Ubuntu16.04でTransporterを使用して、変換されたデータをMongoDBからElasticsearchに同期する方法
序章
Transporterは、さまざまなデータストア間でデータを移動するためのオープンソースツールです。 開発者は、データベース間でのデータの移動、ファイルからデータベースへのデータの移動、またはその逆などのタスクのために1回限りのスクリプトを作成することがよくありますが、Transporterなどのツールを使用することにはいくつかの利点があります。
Transporterでは、パイプラインを構築します。これは、ソース(データが読み取られる場所)からシンク(データが書き込まれる場所)へのデータのフローを定義します。 )。 ソースとシンクは、SQLまたはNoSQLデータベース、フラットファイル、またはその他のリソースです。 Transporterはプラグイン可能な拡張機能であるadaptorsを使用してこれらのリソースと通信し、プロジェクトにはデフォルトで人気のあるデータベース用のいくつかのアダプターが含まれています。
Transporterでは、データの移動に加えて、トランスフォーマーを使用してパイプラインを移動するときにデータを変更することもできます。 アダプターと同様に、デフォルトでいくつかのトランスフォーマーが含まれています。 独自のトランスフォーマーを作成して、データの変更をカスタマイズすることもできます。
このチュートリアルでは、Transporterの組み込みアダプターとJavaScriptで記述されたカスタムトランスフォーマーを使用して、MongoDBデータベースからElasticsearchにデータを移動および処理する例について説明します。
前提条件
このチュートリアルに従うには、次のものが必要です。
- このUbuntu16.04初期サーバーセットアップチュートリアルに従ってセットアップされた1つのUbuntu16.04サーバー。これには、sudo非rootユーザーとファイアウォールが含まれます。
- このMongoDBonUbuntu 16.04チュートリアル、または既存のMongoDBインストールに従ってインストールされたMongoDB。
- Elasticsearchは、このElasticsearch on Ubuntu 16.04チュートリアル、または既存のElasticsearchインストールに従ってインストールされます。
トランスポーターパイプラインはJavaScriptで記述されています。 このチュートリアルに従うためにJavaScriptの予備知識や経験は必要ありませんが、これらのJavaScriptチュートリアルで詳細を学ぶことができます。
ステップ1—トランスポーターのインストール
Transporterは、最も一般的なオペレーティングシステム用のバイナリを提供します。 Ubuntuのインストールプロセスには、Linuxバイナリをダウンロードして実行可能にするという2つのステップが含まれます。
まず、GitHubのTransporterの最新リリースページから最新バージョンへのリンクを取得します。 -linux-amd64
で終わるリンクをコピーします。 このチュートリアルでは、執筆時点で最新のv0.5.2を使用しています。
バイナリをホームディレクトリにダウンロードします。
cd wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64
それを/usr/ local /binまたはお好みのインストールディレクトリに移動します。
mv transporter-*-linux-amd64 /usr/local/bin/transporter
次に、実行可能にして、実行できるようにします。
chmod +x /usr/local/bin/transporter
バイナリを実行することにより、Transporterが正しくセットアップされていることをテストできます。
transporter
使用法ヘルプの出力とバージョン番号が表示されます。
OutputUSAGE transporter <command> [flags] COMMANDS run run pipeline loaded from a file . . . VERSION 0.5.2
Transporterを使用してMongoDBからElasticsearchにデータを移動するには、移動するMongoDBのデータと、Transporterに移動方法を指示するパイプラインの2つが必要です。 次のステップでいくつかのサンプルデータを作成しますが、移動するMongoDBデータベースがすでにある場合は、次のステップをスキップして、ステップ3に直接進むことができます。
ステップ2—サンプルデータをMongoDBに追加する(オプション)
このステップでは、MongoDBに単一のコレクションを含むサンプルデータベースを作成し、そのコレクションにいくつかのドキュメントを追加します。 次に、チュートリアルの残りの部分で、このサンプルデータをTransporterパイプラインを使用して移行および変換します。
まず、MongoDBデータベースに接続します。
mongo
これにより、プロンプトがmongo>
に変更され、MongoDBシェルを使用していることが示されます。
ここから、作業するデータベースを選択します。 これをmy_application
と呼びます。
use my_application
MongoDB
では、データベースまたはコレクションを明示的に作成する必要はありません。 名前で選択したデータベースへのデータの追加を開始すると、そのデータベースが自動的に作成されます。
したがって、my_application
データベースを作成するには、2つのドキュメントをusers
コレクションに保存します。1つはSammySharkを表し、もう1つはGillyGlowfishを表します。 これがテストデータになります。
db.users.save({"firstName": "Sammy", "lastName": "Shark"}); db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});
ドキュメントを追加したら、users
コレクションにクエリを実行してレコードを表示できます。
db.users.find().pretty();
出力は以下の出力のようになりますが、_id
列は異なります。 MongoDBは、コレクション内のドキュメントを一意に識別するためにオブジェクトIDを自動的に追加します。
output{ "_id" : ObjectId("59299ac7f80b31254a916456"), "firstName" : "Sammy", "lastName" : "Shark" } { "_id" : ObjectId("59299ac7f80b31254a916457"), "firstName" : "Gilly", "lastName" : "Glowfish" }
CTRL+C
を押して、MongoDBシェルを終了します。
次に、このデータをMongoDBからElasticsearchに移動するためのTransporterパイプラインを作成しましょう。
ステップ3—基本的なパイプラインを作成する
Transporterのパイプラインは、デフォルトでpipeline.js
という名前のJavaScriptファイルによって定義されます。 組み込みのinit
コマンドは、ソースとシンクを指定して、正しいディレクトリに基本的な構成ファイルを作成します。
MongoDBをソースとして、Elasticsearchをシンクとして、スターターpipeline.js
を初期化します。
transporter init mongodb elasticsearch
次の出力が表示されます。
OutputWriting pipeline.js...
このステップではpipeline.js
を変更する必要はありませんが、どのように機能するかを見てみましょう。
ファイルは次のようになりますが、コマンドcat pipeline.js
、less pipeline.js
(q
を押してless
を終了)を使用してファイルの内容を表示することもできます。または、お気に入りのテキストエディタで開きます。
パイプライン.js
var source = mongodb({ "uri": "${MONGODB_URI}" // "timeout": "30s", // "tail": false, // "ssl": false, // "cacerts": ["/path/to/cert.pem"], // "wc": 1, // "fsync": false, // "bulk": false, // "collection_filters": "{}", // "read_preference": "Primary" }) var sink = elasticsearch({ "uri": "${ELASTICSEARCH_URI}" // "timeout": "10s", // defaults to 30s // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch }) t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")
var source
およびvar sink
で始まる行は、MongoDBおよびElasticsearchアダプターのJavaScript変数をそれぞれ定義します。 このステップの後半で、これらのアダプターが必要とするMONGODB_URI
およびELASTICSEARCH_URI
環境変数を定義します。
//
で始まる行はコメントです。 これらは、パイプラインに設定できるいくつかの一般的な構成オプションを強調していますが、ここで作成する基本的なパイプラインには使用していません。
最後の行は、ソースとシンクを接続します。 変数transporter
またはt
を使用すると、パイプラインにアクセスできます。 .Source()
および.Save()
関数を使用して、ファイルで以前に定義されたsource
およびsink
変数を使用してソースとシンクを追加します。
Source()
およびSave()
関数の3番目の引数は、namespace.
です。最後の引数として/.*/
を渡すことは、MongoDBからすべてのデータを転送することを意味します。 Elasticsearchの同じ名前空間に保存します。
このパイプラインを実行する前に、 MongoDBURIおよびElasticsearchURIの環境変数を設定する必要があります。 使用している例では、両方ともデフォルト設定でローカルにホストされていますが、既存のMongoDBまたはElasticsearchインスタンスを使用している場合は、これらのオプションをカスタマイズしてください。
export MONGODB_URI='mongodb://localhost/my_application' export ELASTICSEARCH_URI='http://localhost:9200/my_application'
これで、パイプラインを実行する準備が整いました。
transporter run pipeline.js
次のように終了する出力が表示されます。
Output. . . INFO[0001] metrics source records: 2 path=source ts=1522942118483391242 INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960 INFO[0001] exit map[source:mongodb sink:elasticsearch] ts=1522942118483396878
最後から2番目と3番目の行で、この出力は、ソースに2つのレコードが存在し、2つのレコードがシンクに移動されたことを示しています。
両方のレコードが処理されたことを確認するために、Elasticsearchにmy_application
データベースの内容を照会できます。これは現在存在しているはずです。
curl $ELASTICSEARCH_URI/_search?pretty=true
?pretty=true
パラメーターを使用すると、出力が読みやすくなります。
Output{ "took" : 5, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "lastName" : "Shark" } } ] } }
MongoDBのデータベースとコレクションは、Elasticsearchのインデックスとタイプに類似しています。 それを念頭に置いて、次のことを確認してください。
_index
フィールドがmy_application,
元のMongoDBデータベースの名前に設定されています)。_type
フィールドがusers,
に設定されたMongoDBコレクションの名前。firstName
フィールドとlastName
フィールドには、それぞれ「サミー」、「サメ」、「ギリー」、「グローフィッシュ」と入力されています。
これにより、MongoDBの両方のレコードがTransporterを介して正常に処理され、Elasticsearchに読み込まれたことが確認されます。 この基本的なパイプラインに基づいて構築するために、入力データを変換できる中間処理ステップを追加します。
ステップ4—トランスフォーマーを作成する
名前が示すように、トランスフォーマーは、ソースデータをシンクにロードする前に変更します。 たとえば、新しいフィールドを追加したり、フィールドを削除したり、フィールドのデータを変更したりできます。 Transporterには、事前定義されたトランスフォーマーとカスタムトランスポーターのサポートが付属しています。
通常、カスタムトランスフォーマーはJavaScript関数として記述され、別のファイルに保存されます。 それらを使用するには、pipeline.js
のトランスフォーマーファイルへの参照を追加します。 Transporterには、OttoとGojaの両方のJavaScriptエンジンが含まれています。 Gojaは新しく、一般的に高速であるため、ここで使用します。 唯一の機能上の違いは構文です。
transform.js
というファイルを作成します。このファイルを使用して、変換関数を記述します。
nano transform.js
これが使用する関数です。この関数は、fullName
という新しいフィールドを作成します。このフィールドの値は、スペースで区切られたfirstName
フィールドとlastName
フィールドを連結したものになります。 (Sammy Shark
など)。
transform.js
function transform(msg) { msg.data.fullName = msg.data.firstName + " " + msg.data.lastName; return msg }
このファイルの行を見ていきましょう。
- ファイルの最初の行である
function transform(msg),
は、関数定義です。 msg
は、ソースドキュメントの詳細を含むJavaScriptオブジェクトです。 このオブジェクトを使用して、パイプラインを通過するデータにアクセスします。- 関数の最初の行は2つの既存のフィールドを連結し、はその値を新しい
fullName
フィールドに割り当てます。 - 関数の最後の行は、パイプラインの残りの部分で使用するために、新しく変更された
msg
オブジェクトを返します。
ファイルを保存して閉じます。
次に、このトランスフォーマーを使用するようにパイプラインを変更する必要があります。 pipeline.js
ファイルを開いて編集します。
nano pipeline.js
最後の行で、Transform()
関数への呼び出しを追加して、Source()
とSave()
への呼び出しの間のパイプラインにトランスフォーマーを追加する必要があります。
〜/transporter/pipeline.js
. . . t.Source("source", source, "/.*/") .Transform(goja({"filename": "transform.js"})) .Save("sink", sink, "/.*/")
Transform()
に渡される引数は、変換のタイプであり、この場合はGojaです。 goja
関数を使用して、相対パスを使用してトランスフォーマーのファイル名を指定します。
ファイルを保存して閉じます。 パイプラインを再実行してトランスフォーマーをテストする前に、Elasticsearchの既存のデータを前のテストからクリアしましょう。
curl -XDELETE $ELASTICSEARCH_URI
コマンドの成功を確認するこの出力が表示されます。
Output{"acknowledged":true}
次に、パイプラインを再実行します。
transporter run pipeline.js
出力は前のテストと非常によく似ており、パイプラインが以前と同じように正常に完了したかどうかを最後の数行で確認できます。 確かに、Elasticsearchを再度チェックして、データが期待する形式で存在するかどうかを確認できます。
curl $ELASTICSEARCH_URI/_search?pretty=true
新しい出力にfullName
フィールドが表示されます。
Output{ "took" : 9, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e9c6687d9f638ced4fe", "_score" : 1.0, "_source" : { "firstName" : "Gilly", "fullName" : "Gilly Glowfish", "lastName" : "Glowfish" } }, { "_index" : "my_application", "_type" : "users", "_id" : "5ac63e986687d9f638ced4fd", "_score" : 1.0, "_source" : { "firstName" : "Sammy", "fullName" : "Sammy Shark", "lastName" : "Shark" } } ] } }
fullName
フィールドが両方のドキュメントに追加されており、値が正しく設定されていることに注意してください。 これで、トランスポーターパイプラインにカスタム変換を追加する方法がわかりました。
結論
MongoDBからElasticsearchにデータをコピーおよび変更するためのトランスフォーマーを備えた基本的なTransporterパイプラインを構築しました。 同じ方法でより複雑な変換を適用したり、同じパイプラインで複数の変換をチェーンしたりすることができます。 MongoDBとElasticsearchは、Transporterがサポートするアダプターの2つにすぎません。 また、フラットファイル、PostgresなどのSQLデータベース、およびその他の多くのデータソースもサポートしています。
GitHubのTransporterプロジェクトをチェックして、APIの最新の変更を更新し、 Transporter wiki にアクセスして、アダプター、トランスフォーマー、およびトランスフォーマーの他の機能。