Apache-presto-kafka-connector

提供:Dev Guides
移動先:案内検索

Apache Presto-KAFKAコネクター

PrestoのKafkaコネクタを使用すると、Prestoを使用してApache Kafkaのデータにアクセスできます。

前提条件

次のApacheプロジェクトの最新バージョンをダウンロードしてインストールします。

  • Apache ZooKeeper
  • アパッチカフカ

ZooKeeperを起動します

次のコマンドを使用して、ZooKeeperサーバーを起動します。

$ bin/zookeeper-server-start.sh config/zookeeper.properties

現在、ZooKeeperは2181でポートを開始します。

カフカを開始

次のコマンドを使用して、別の端末でKafkaを起動します。

$ bin/kafka-server-start.sh config/server.properties

kafkaの起動後、ポート番号9092が使用されます。

TPCHデータ

ダウンロードtpch-kafka

$  curl -o kafka-tpch
https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_
0811-1.0.sh

これで、上記のコマンドを使用してMavenセントラルからローダーをダウンロードしました。 次のような応答が返されます。

% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
  5 21.6M    5 1279k    0     0  83898      0  0:04:30  0:00:15  0:04:15  129k
  6 21.6M    6 1407k    0     0  86656      0  0:04:21  0:00:16  0:04:05  131k
 24 21.6M   24 5439k    0     0   124k      0  0:02:57  0:00:43  0:02:14  175k
 24 21.6M   24 5439k    0     0   124k      0  0:02:58  0:00:43  0:02:15  160k
 25 21.6M   25 5736k    0     0   128k      0  0:02:52  0:00:44  0:02:08  181k
 ………………………..

次に、次のコマンドを使用して実行可能にします。

$ chmod 755 kafka-tpch

tpch-kafkaを実行します

次のコマンドを使用して、kafka-tpchプログラムを実行し、tpchデータを含む多数のトピックをプリロードします。

問い合わせ

$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny

結果

2016-07-13T16:15:52.083+0530 INFO main io.airlift.log.Logging Logging
to stderr
2016-07-13T16:15:52.124+0530 INFO main de.softwareforge.kafka.LoadCommand
Processing tables: [customer, orders, lineitem, part, partsupp, supplier,
nation, region]
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-1
de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-2
de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-3
de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-4
de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
………………………
……………………….

現在、Kafkaテーブルの顧客、注文、サプライヤーなどは、tpchを使用してロードされます。

構成設定を追加

Prestoサーバーに次のKafkaコネクター構成設定を追加しましょう。

connector.name = kafka

kafka.nodes = localhost:9092

kafka.table-names = tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,
tpch.supplier,tpch.nation,tpch.region

kafka.hide-internal-columns = false

上記の構成では、KafkaテーブルはKafka-tpchプログラムを使用してロードされます。

Presto CLIを起動します

次のコマンドを使用してPresto CLIを起動します。

$ ./presto --server localhost:8080 --catalog kafka —schema tpch;
  • 「tpch」*はKafkaコネクタのスキーマであり、次のような応答を受け取ります。
presto:tpch>

リスト表

次のクエリは、*“ tpch” *スキーマ内のすべてのテーブルをリストします。

問い合わせ

presto:tpch> show tables;

結果

  Table
----------
 customer
 lineitem
 nation
 orders
 part
 partsupp
 region
 supplier

顧客テーブルの説明

次のクエリでは、*“ customer” *テーブルについて説明しています。

問い合わせ

presto:tpch> describe customer;

結果

  Column           |  Type   |                   Comment
-------------------+---------+---------------------------------------------
 _partition_id     | bigint  | Partition Id
 _partition_offset | bigint  | Offset for the message within the partition
 _segment_start    | bigint  | Segment start offset
 _segment_end      | bigint  | Segment end offset
 _segment_count    | bigint  | Running message count per segment
 _key              | varchar | Key text
 _key_corrupt      | boolean | Key data is corrupt
 _key_length       | bigint  | Total number of key bytes
 _message          | varchar | Message text
 _message_corrupt  | boolean | Message data is corrupt
 _message_length   | bigint  | Total number of message bytes