Pyspark-broadcast-and-accumulator
PySpark-放送およびアキュムレーター
並列処理の場合、Apache Sparkは共有変数を使用します。 共有変数のコピーは、ドライバーがクラスター上のエグゼキューターにタスクを送信するときにクラスターの各ノードで実行されるため、タスクの実行に使用できます。
Apache Sparkでサポートされる共有変数には2つのタイプがあります-
- 放送
- アキュムレータ
それらを詳細に理解しましょう。
放送
ブロードキャスト変数は、すべてのノードにわたってデータのコピーを保存するために使用されます。 この変数はすべてのマシンでキャッシュされ、タスクを備えたマシンでは送信されません。 次のコードブロックには、PySparkのBroadcastクラスの詳細が含まれています。
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
次の例は、Broadcast変数の使用方法を示しています。 ブロードキャスト変数には、値と呼ばれる属性があり、データを保存し、ブロードキャストされた値を返すために使用されます。
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
コマンド-ブロードキャスト変数のコマンドは次のとおりです-
$SPARK_HOME/bin/spark-submit broadcast.py
出力-次のコマンドの出力を以下に示します。
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
アキュムレータ
アキュムレーター変数は、連想演算および可換演算を介して情報を集約するために使用されます。 たとえば、加算操作またはカウンター(MapReduce内)にアキュムレーターを使用できます。 次のコードブロックには、PySparkのAccumulatorクラスの詳細が含まれています。
class pyspark.Accumulator(aid, value, accum_param)
次の例は、Accumulator変数の使用方法を示しています。 アキュムレータ変数には、ブロードキャスト変数が持っているものと同様の値と呼ばれる属性があります。 データを保存し、アキュムレータの値を返すために使用されますが、ドライバープログラムでのみ使用できます。
この例では、アキュムレーター変数は複数のワーカーによって使用され、累積値を返します。
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
コマンド-アキュムレータ変数のコマンドは次のとおりです-
$SPARK_HOME/bin/spark-submit accumulator.py
出力-上記のコマンドの出力を以下に示します。
Accumulated value is -> 150