Pyspark-rdd
PySpark-RDD
システムにPySparkをインストールして構成したので、Apache SparkでPythonでプログラミングできます。 ただし、その前に、Sparkの基本概念であるRDDについて理解しましょう。
RDDは Resilient Distributed Dataset の略で、これらはクラスターで並列処理を行うために複数のノードで実行および動作する要素です。 RDDは不変の要素です。つまり、RDDを作成すると、それを変更することはできません。 RDDもフォールトトレラントであるため、障害が発生した場合は自動的に回復します。 これらのRDDに複数の操作を適用して、特定のタスクを達成できます。
これらのRDDに操作を適用するには、2つの方法があります-
- 変換と
- アクション
これら2つの方法を詳細に理解しましょう。
変換-これらは、新しいRDDを作成するためにRDDに適用される操作です。 Filter、groupBy、およびmapは、変換の例です。
アクション-これらはRDDに適用される操作で、Sparkに計算を実行し、結果をドライバーに送り返すよう指示します。
PySparkで操作を適用するには、最初に PySpark RDD を作成する必要があります。 次のコードブロックには、PySpark RDDクラスの詳細があります-
PySparkを使用していくつかの基本的な操作を実行する方法を見てみましょう。 Pythonファイル内の次のコードは、前述の単語のセットを格納するRDD単語を作成します。
次に、単語に対していくつかの操作を実行します。
カウント()
RDDの要素の数が返されます。
コマンド-count()のコマンドは-
出力-上記のコマンドの出力は-
collect()
RDDのすべての要素が返されます。
コマンド-collect()のコマンドは-
出力-上記のコマンドの出力は-
foreach(f)
foreach内の関数の条件を満たす要素のみを返します。 次の例では、foreachでprint関数を呼び出します。これは、RDDのすべての要素を印刷します。
コマンド-foreach(f)のコマンドは-
出力-上記のコマンドの出力は-
フィルター(f)
フィルター内の機能を満たす要素を含む新しいRDDが返されます。 次の例では、 spark を含む文字列を除外します。
コマンド-filter(f)のコマンドは-
出力-上記のコマンドの出力は-
map(f、preservesPartitioning = False)
RDDの各要素に関数を適用することにより、新しいRDDが返されます。 次の例では、キーと値のペアを形成し、すべての文字列を値1にマッピングします。
コマンド-map(f、preservesPartitioning = False)のコマンドは-
出力-上記のコマンドの出力は-
reduce(f)
指定された可換および連想バイナリ演算を実行した後、RDDの要素が返されます。 次の例では、オペレーターから追加パッケージをインポートし、「num」に適用して単純な追加操作を実行しています。
コマンド-reduce(f)のコマンドは-
出力-上記のコマンドの出力は-
join(other、numPartitions = None)
一致するキーとその特定のキーのすべての値を持つ要素のペアを持つRDDを返します。 次の例では、2つの異なるRDDに2つの要素のペアがあります。 これらの2つのRDDを結合した後、一致するキーとその値を持つ要素を持つRDDを取得します。
コマンド-join(other、numPartitions = None)のコマンドは-
出力-上記のコマンドの出力は-
キャッシュ()
このRDDをデフォルトのストレージレベル(MEMORY_ONLY)で保持します。 RDDがキャッシュされているかどうかも確認できます。
コマンド-cache()のコマンドは-
出力-上記のプログラムの出力は-
これらは、PySpark RDDで実行される最も重要な操作の一部でした。