Pyspark-rdd

提供:Dev Guides
移動先:案内検索

PySpark-RDD

システムにPySparkをインストールして構成したので、Apache SparkでPythonでプログラミングできます。 ただし、その前に、Sparkの基本概念であるRDDについて理解しましょう。

RDDは Resilient Distributed Dataset の略で、これらはクラスターで並列処理を行うために複数のノードで実行および動作する要素です。 RDDは不変の要素です。つまり、RDDを作成すると、それを変更することはできません。 RDDもフォールトトレラントであるため、障害が発生した場合は自動的に回復します。 これらのRDDに複数の操作を適用して、特定のタスクを達成できます。

これらのRDDに操作を適用するには、2つの方法があります-

  • 変換と
  • アクション

これら2つの方法を詳細に理解しましょう。

変換-これらは、新しいRDDを作成するためにRDDに適用される操作です。 Filter、groupBy、およびmapは、変換の例です。

アクション-これらはRDDに適用される操作で、Sparkに計算を実行し、結果をドライバーに送り返すよう指示します。

PySparkで操作を適用するには、最初に PySpark RDD を作成する必要があります。 次のコードブロックには、PySpark RDDクラスの詳細があります-

class pyspark.RDD (
   jrdd,
   ctx,
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

PySparkを使用していくつかの基本的な操作を実行する方法を見てみましょう。 Pythonファイル内の次のコードは、前述の単語のセットを格納するRDD単語を作成します。

words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)

次に、単語に対していくつかの操作を実行します。

カウント()

RDDの要素の数が返されます。

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

コマンド-count()のコマンドは-

$SPARK_HOME/bin/spark-submit count.py

出力-上記のコマンドの出力は-

Number of elements in RDD → 8

collect()

RDDのすべての要素が返されます。

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

コマンド-collect()のコマンドは-

$SPARK_HOME/bin/spark-submit collect.py

出力-上記のコマンドの出力は-

Elements in RDD -> [
   'scala',
   'java',
   'hadoop',
   'spark',
   'akka',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

foreach(f)

foreach内の関数の条件を満たす要素のみを返します。 次の例では、foreachでprint関数を呼び出します。これは、RDDのすべての要素を印刷します。

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------

コマンド-foreach(f)のコマンドは-

$SPARK_HOME/bin/spark-submit foreach.py

出力-上記のコマンドの出力は-

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

フィルター(f)

フィルター内の機能を満たす要素を含む新しいRDDが返されます。 次の例では、 spark を含む文字列を除外します。

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

コマンド-filter(f)のコマンドは-

$SPARK_HOME/bin/spark-submit filter.py

出力-上記のコマンドの出力は-

Fitered RDD -> [
   'spark',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

map(f、preservesPartitioning = False)

RDDの各要素に関数を適用することにより、新しいRDDが返されます。 次の例では、キーと値のペアを形成し、すべての文字列を値1にマッピングします。

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

コマンド-map(f、preservesPartitioning = False)のコマンドは-

$SPARK_HOME/bin/spark-submit map.py

出力-上記のコマンドの出力は-

Key value pair -> [
   ('scala', 1),
   ('java', 1),
   ('hadoop', 1),
   ('spark', 1),
   ('akka', 1),
   ('spark vs hadoop', 1),
   ('pyspark', 1),
   ('pyspark and spark', 1)
]

reduce(f)

指定された可換および連想バイナリ演算を実行した後、RDDの要素が返されます。 次の例では、オペレーターから追加パッケージをインポートし、「num」に適用して単純な追加操作を実行しています。

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

コマンド-reduce(f)のコマンドは-

$SPARK_HOME/bin/spark-submit reduce.py

出力-上記のコマンドの出力は-

Adding all the elements -> 15

join(other、numPartitions = None)

一致するキーとその特定のキーのすべての値を持つ要素のペアを持つRDDを返します。 次の例では、2つの異なるRDDに2つの要素のペアがあります。 これらの2つのRDDを結合した後、一致するキーとその値を持つ要素を持つRDDを取得します。

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

コマンド-join(other、numPartitions = None)のコマンドは-

$SPARK_HOME/bin/spark-submit join.py

出力-上記のコマンドの出力は-

Join RDD -> [
   ('spark', (1, 2)),
   ('hadoop', (4, 5))
]

キャッシュ()

このRDDをデフォルトのストレージレベル(MEMORY_ONLY)で保持します。 RDDがキャッシュされているかどうかも確認できます。

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

コマンド-cache()のコマンドは-

$SPARK_HOME/bin/spark-submit cache.py

出力-上記のプログラムの出力は-

Words got cached -> True

これらは、PySpark RDDで実行される最も重要な操作の一部でした。