Pyspark-quick-guide

提供:Dev Guides
移動先:案内検索

PySpark-はじめに

この章では、Apache Sparkとは何か、PySparkはどのように開発されたのかを理解します。

Spark –概要

Apache Sparkは、超高速のリアルタイム処理フレームワークです。 メモリ内の計算を実行して、データをリアルタイムで分析します。 Apache Hadoop MapReduce がバッチ処理のみを実行しており、リアルタイム処理機能が欠けていたため、それが明らかになりました。 そのため、Apache Sparkは、ストリーム処理をリアルタイムで実行でき、バッチ処理も行えるため導入されました。

リアルタイムおよびバッチ処理とは別に、Apache Sparkは対話型クエリと反復アルゴリズムもサポートしています。 Apache Sparkには、アプリケーションをホストできる独自のクラスターマネージャーがあります。 ストレージと処理の両方にApache Hadoopを活用します。 ストレージには HDFS (Hadoop分散ファイルシステム)を使用し、 YARN でもSparkアプリケーションを実行できます。

PySpark –概要

Apache Sparkは* Scalaプログラミング言語*で記述されています。 SparkでPythonをサポートするために、Apache Spark CommunityはPySparkというツールをリリースしました。 PySparkを使用すると、Pythonプログラミング言語で RDD を操作することもできます。 これを実現できるのは、 Py4j というライブラリーがあるためです。

PySparkは、Python APIをスパークコアにリンクし、Sparkコンテキストを初期化する PySpark Shell を提供します。 多くのデータサイエンティストと分析の専門家は、その豊富なライブラリセットのためにPythonを使用しています。 PythonとSparkを統合することは、彼らにとって大きな恩恵です。

PySpark-環境設定

この章では、PySparkの環境設定について理解します。

-これは、コンピューターにJavaおよびScalaがインストールされていることを考慮しています。

次の手順でPySparkをダウンロードして設定しましょう。

ステップ1 *-公式のApache Spark downloadページにアクセスし、そこから入手できる最新バージョンのApache Sparkをダウンロードします。 このチュートリアルでは、 *spark-2.1.0-bin-hadoop2.7 を使用しています。

  • ステップ2 *-次に、ダウンロードしたSpark tarファイルを抽出します。 デフォルトでは、ダウンロードディレクトリにダウンロードされます。
# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

ディレクトリ spark-2.1.0-bin-hadoop2.7 が作成されます。 PySparkを開始する前に、次の環境を設定して、Sparkパスと* Py4jパス*を設定する必要があります。

export SPARK_HOME =/home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH

または、上記の環境をグローバルに設定するには、それらを* .bashrcファイル*に入れます。 次に、環境が機能するように次のコマンドを実行します。

# source .bashrc

すべての環境が設定されたので、次のコマンドを実行してSparkディレクトリに移動し、PySparkシェルを呼び出しましょう-

# ./bin/pyspark

これにより、PySparkシェルが起動します。

Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
    /__/__  ___ _____//__
    _\ \/_ \/_ `/__/ '_/
  /__/.__/\_,_/_//_/\_\   version 2.1.0
     /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<

PySpark-SparkContext

SparkContextは、spark機能へのエントリポイントです。 Sparkアプリケーションを実行すると、ドライバープログラムが起動します。これにはメイン関数があり、ここでSparkContextが開始されます。 次に、ドライバープログラムは、ワーカーノードのエグゼキューター内で操作を実行します。

SparkContextはPy4Jを使用して JVM を起動し、 JavaSparkContext を作成します。 デフォルトでは、PySparkは 'sc' としてSparkContextを使用できるため、新しいSparkContextの作成は機能しません。

SparkContext

次のコードブロックには、PySparkクラスとSparkContextが取得できるパラメーターの詳細が含まれています。

class pyspark.SparkContext (
   master = None,
   appName = None,
   sparkHome = None,
   pyFiles = None,
   environment = None,
   batchSize = 0,
   serializer = PickleSerializer(),
   conf = None,
   gateway = None,
   jsc = None,
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

パラメーター

以下は、SparkContextのパラメーターです。

  • マスター-接続先のクラスターのURLです。
  • appName -ジョブの名前。
  • sparkHome -Sparkインストールディレクトリ。
  • pyFiles -クラスターに送信してPYTHONPATHに追加する.zipまたは.pyファイル。
  • 環境-ワーカーノードの環境変数。
  • batchSize -単一のJavaオブジェクトとして表されるPythonオブジェクトの数。 バッチ処理を無効にする場合は1、オブジェクトサイズに基づいてバッチサイズを自動的に選択する場合は0、無制限のバッチサイズを使用する場合は-1を設定します。
  • Serializer -RDDシリアライザー。
  • Conf -すべてのSparkプロパティを設定するためのL \ {SparkConf}のオブジェクト。
  • ゲートウェイ-既存のゲートウェイとJVMを使用します。それ以外の場合は、新しいJVMを初期化します。
  • JSC -JavaSparkContextインスタンス。
  • profiler_cls -プロファイリングに使用されるカスタムプロファイラーのクラス(デフォルトはpyspark.profiler.BasicProfilerです)。

上記のパラメーターの中で、 master および appname が主に使用されます。 PySparkプログラムの最初の2行は、次のようになります-

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContextの例– PySparkシェル

SparkContextについて十分に理解したので、PySparkシェルで簡単な例を実行してみましょう。 この例では、 README.md ファイル内の文字「a」または「b」の行数をカウントします。 したがって、ファイルに5行があり、3行に文字 'a’がある場合、出力は→ Line with a:3 になります。 同じことが文字「b」についても行われます。

-次の例では、PySparkシェルの起動時にSparkがscという名前のSparkContextオブジェクトを自動的に作成するため、SparkContextオブジェクトは作成しません。 別のSparkContextオブジェクトを作成しようとすると、次のエラーが発生します-「ValueError:複数のSparkContextsを一度に実行できません」

PySpark Shell

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

SparkContextの例-Pythonプログラム

Pythonプログラムを使用して同じ例を実行してみましょう。 firstapp.py というPythonファイルを作成し、そのファイルに次のコードを入力します。

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

次に、ターミナルで次のコマンドを実行して、このPythonファイルを実行します。 上記と同じ出力が得られます。

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30

PySpark-RDD

システムにPySparkをインストールして構成したので、Apache SparkでPythonでプログラミングできます。 ただし、その前に、Sparkの基本概念であるRDDについて理解しましょう。

RDDは Resilient Distributed Dataset の略で、これらはクラスターで並列処理を行うために複数のノードで実行および動作する要素です。 RDDは不変の要素です。つまり、RDDを作成すると、それを変更することはできません。 RDDもフォールトトレラントであるため、障害が発生した場合は自動的に回復します。 これらのRDDに複数の操作を適用して、特定のタスクを達成できます。

これらのRDDに操作を適用するには、2つの方法があります-

  • 変換と
  • アクション

これら2つの方法を詳細に理解しましょう。

変換-これらは、新しいRDDを作成するためにRDDに適用される操作です。 Filter、groupBy、およびmapは、変換の例です。

アクション-これらはRDDに適用される操作で、Sparkに計算を実行し、結果をドライバーに送り返すよう指示します。

PySparkで操作を適用するには、最初に PySpark RDD を作成する必要があります。 次のコードブロックには、PySpark RDDクラスの詳細があります-

class pyspark.RDD (
   jrdd,
   ctx,
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

PySparkを使用していくつかの基本的な操作を実行する方法を見てみましょう。 Pythonファイル内の次のコードは、前述の単語のセットを格納するRDD単語を作成します。

words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)

次に、単語に対していくつかの操作を実行します。

カウント()

RDDの要素の数が返されます。

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

コマンド-count()のコマンドは-

$SPARK_HOME/bin/spark-submit count.py

出力-上記のコマンドの出力は-

Number of elements in RDD → 8

collect()

RDDのすべての要素が返されます。

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

コマンド-collect()のコマンドは-

$SPARK_HOME/bin/spark-submit collect.py

出力-上記のコマンドの出力は-

Elements in RDD -> [
   'scala',
   'java',
   'hadoop',
   'spark',
   'akka',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

foreach(f)

foreach内の関数の条件を満たす要素のみを返します。 次の例では、foreachでprint関数を呼び出します。これは、RDDのすべての要素を印刷します。

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------

コマンド-foreach(f)のコマンドは-

$SPARK_HOME/bin/spark-submit foreach.py

出力-上記のコマンドの出力は-

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

フィルター(f)

フィルター内の機能を満たす要素を含む新しいRDDが返されます。 次の例では、 spark を含む文字列を除外します。

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

コマンド-filter(f)のコマンドは-

$SPARK_HOME/bin/spark-submit filter.py

出力-上記のコマンドの出力は-

Fitered RDD -> [
   'spark',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

map(f、preservesPartitioning = False)

RDDの各要素に関数を適用することにより、新しいRDDが返されます。 次の例では、キーと値のペアを形成し、すべての文字列を値1にマッピングします。

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

コマンド-map(f、preservesPartitioning = False)のコマンドは-

$SPARK_HOME/bin/spark-submit map.py

出力-上記のコマンドの出力は-

Key value pair -> [
   ('scala', 1),
   ('java', 1),
   ('hadoop', 1),
   ('spark', 1),
   ('akka', 1),
   ('spark vs hadoop', 1),
   ('pyspark', 1),
   ('pyspark and spark', 1)
]

reduce(f)

指定された可換および連想バイナリ演算を実行した後、RDDの要素が返されます。 次の例では、オペレーターから追加パッケージをインポートし、「num」に適用して単純な追加操作を実行しています。

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

コマンド-reduce(f)のコマンドは-

$SPARK_HOME/bin/spark-submit reduce.py

出力-上記のコマンドの出力は-

Adding all the elements -> 15

join(other、numPartitions = None)

一致するキーとその特定のキーのすべての値を持つ要素のペアを持つRDDを返します。 次の例では、2つの異なるRDDに2つの要素のペアがあります。 これらの2つのRDDを結合した後、一致するキーとその値を持つ要素を持つRDDを取得します。

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

コマンド-join(other、numPartitions = None)のコマンドは-

$SPARK_HOME/bin/spark-submit join.py

出力-上記のコマンドの出力は-

Join RDD -> [
   ('spark', (1, 2)),
   ('hadoop', (4, 5))
]

キャッシュ()

このRDDをデフォルトのストレージレベル(MEMORY_ONLY)で保持します。 RDDがキャッシュされているかどうかも確認できます。

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

コマンド-cache()のコマンドは-

$SPARK_HOME/bin/spark-submit cache.py

出力-上記のプログラムの出力は-

Words got cached -> True

これらは、PySpark RDDで実行される最も重要な操作の一部でした。

PySpark-放送およびアキュムレーター

並列処理の場合、Apache Sparkは共有変数を使用します。 共有変数のコピーは、ドライバーがクラスター上のエグゼキューターにタスクを送信するときにクラスターの各ノードで実行されるため、タスクの実行に使用できます。

Apache Sparkでサポートされる共有変数には2つのタイプがあります-

  • 放送
  • アキュムレータ

それらを詳細に理解しましょう。

放送

ブロードキャスト変数は、すべてのノードにわたってデータのコピーを保存するために使用されます。 この変数はすべてのマシンでキャッシュされ、タスクを備えたマシンでは送信されません。 次のコードブロックには、PySparkのBroadcastクラスの詳細が含まれています。

class pyspark.Broadcast (
   sc = None,
   value = None,
   pickle_registry = None,
   path = None
)

次の例は、Broadcast変数の使用方法を示しています。 ブロードキャスト変数には、値と呼ばれる属性があり、データを保存し、ブロードキャストされた値を返すために使用されます。

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

コマンド-ブロードキャスト変数のコマンドは次のとおりです-

$SPARK_HOME/bin/spark-submit broadcast.py

出力-次のコマンドの出力を以下に示します。

Stored data -> [
   'scala',
   'java',
   'hadoop',
   'spark',
   'akka'
]
Printing a particular element in RDD -> hadoop

アキュムレータ

アキュムレーター変数は、連想演算および可換演算を介して情報を集約するために使用されます。 たとえば、加算操作またはカウンター(MapReduce内)にアキュムレーターを使用できます。 次のコードブロックには、PySparkのAccumulatorクラスの詳細が含まれています。

class pyspark.Accumulator(aid, value, accum_param)

次の例は、Accumulator変数の使用方法を示しています。 アキュムレータ変数には、ブロードキャスト変数が持っているものと同様の値と呼ばれる属性があります。 データを保存し、アキュムレータの値を返すために使用されますが、ドライバープログラムでのみ使用できます。

この例では、アキュムレーター変数は複数のワーカーによって使用され、累積値を返します。

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
   global num
   num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

コマンド-アキュムレータ変数のコマンドは次のとおりです-

$SPARK_HOME/bin/spark-submit accumulator.py

出力-上記のコマンドの出力を以下に示します。

Accumulated value is -> 150

PySpark-SparkConf

ローカル/クラスター上でSparkアプリケーションを実行するには、いくつかの構成とパラメーターを設定する必要があります。これがSparkConfで役立ちます。 Sparkアプリケーションを実行するための構成を提供します。 次のコードブロックには、PySparkのSparkConfクラスの詳細が含まれています。

class pyspark.SparkConf (
   loadDefaults = True,
   _jvm = None,
   _jconf = None
)

最初に、SparkConf()でSparkConfオブジェクトを作成します。このオブジェクトは、* spark。** Javaシステムプロパティからも値をロードします。 SparkConfオブジェクトを使用してさまざまなパラメーターを設定できるようになり、それらのパラメーターはシステムプロパティよりも優先されます。

SparkConfクラスには、チェーンをサポートするセッターメソッドがあります。 たとえば、* conf.setAppName(“ PySpark App”)。setMaster(“ local”)*と書くことができます。 SparkConfオブジェクトをApache Sparkに渡すと、ユーザーは変更できません。

以下は、SparkConfの最も一般的に使用される属性の一部です-

  • * set(key、value)*-構成プロパティを設定します。
  • * setMaster(value)*-マスターURLを設定します。
  • * setAppName(value)*-アプリケーション名を設定します。
  • * get(key、defaultValue = None)*-キーの設定値を取得します。
  • * setSparkHome(value)*-ワーカーノードでSparkインストールパスを設定します。

PySparkプログラムでSparkConfを使用する次の例を考えてみましょう。 この例では、sparkアプリケーション名を PySpark App に設定し、sparkアプリケーションのマスターURLを→ spark://master:7077 に設定しています。

次のコードブロックには行があり、Pythonファイルに追加されると、PySparkアプリケーションを実行するための基本的な構成を設定します。

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------

PySpark-SparkFiles

Apache Sparkでは、 sc.addFile (scはデフォルトのSparkContext)を使用してファイルをアップロードし、 SparkFiles.get を使用してワーカーのパスを取得できます。 したがって、SparkFilesは* SparkContext.addFile()*によって追加されたファイルへのパスを解決します。

SparkFilesには次のクラスメソッドが含まれています-

  • get(ファイル名)
  • getrootdirectory()

それらを詳細に理解しましょう。

get(ファイル名)

SparkContext.addFile()を介して追加されるファイルのパスを指定します。

getrootdirectory()

SparkContext.addFile()を介して追加されたファイルを含むルートディレクトリへのパスを指定します。

----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------

コマンド-コマンドは次のとおりです-

$SPARK_HOME/bin/spark-submit sparkfiles.py

出力-上記のコマンドの出力は-

Absolute Path ->
  /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R

PySpark-StorageLevel

StorageLevelは、RDDの保存方法を決定します。 Apache Sparkでは、StorageLevelはRDDをメモリに保存するか、ディスクに保存するか、またはその両方を決定します。 また、RDDをシリアル化するかどうか、およびRDDパーティションを複製するかどうかも決定します。

次のコードブロックには、StorageLevelのクラス定義があります-

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

さて、RDDのストレージを決定するために、以下に示すさまざまなストレージレベルがあります-

  • DISK_ONLY = StorageLevel(True、False、False、False、1)
  • DISK_ONLY_2 = StorageLevel(True、False、False、False、2)
  • MEMORY_AND_DISK = StorageLevel(True、True、False、False、1)
  • MEMORY_AND_DISK_2 = StorageLevel(True、True、False、False、2)
  • MEMORY_AND_DISK_SER = StorageLevel(True、True、False、False、1)
  • MEMORY_AND_DISK_SER_2 = StorageLevel(True、True、False、False、2)
  • MEMORY_ONLY = StorageLevel(False、True、False、False、1)
  • MEMORY_ONLY_2 = StorageLevel(False、True、False、False、2)
  • MEMORY_ONLY_SER = StorageLevel(False、True、False、False、1)
  • MEMORY_ONLY_SER_2 = StorageLevel(False、True、False、False、2)
  • OFF_HEAP = StorageLevel(True、True、True、False、1)

次のStorageLevelの例を考えてみましょう。ここでは、ストレージレベル* MEMORY_AND_DISK_2、*を使用します。これは、RDDパーティションに2のレプリケーションがあることを意味します。

------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
   "local",
   "storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------

コマンド-コマンドは次のとおりです-

$SPARK_HOME/bin/spark-submit storagelevel.py

出力-上記のコマンドの出力は以下のとおりです-

Disk Memory Serialized 2x Replicated

PySpark-MLlib

Apache Sparkは、 MLlib と呼ばれる機械学習APIを提供します。 PySparkには、Pythonでこの機械学習APIもあります。 以下に記載されているさまざまな種類のアルゴリズムをサポートしています-

  • mllib.classification - spark.mllib パッケージは、バイナリ分類、マルチクラス分類、および回帰分析のためのさまざまなメソッドをサポートしています。 分類で最も人気のあるアルゴリズムのいくつかは、 Random Forest、Naive Bayes、Decision Tree などです。
  • mllib.clustering -クラスタリングは教師なしの学習問題であり、類似性の概念に基づいてエンティティのサブセットを互いにグループ化することを目的としています。
  • mllib.fpm -頻繁なパターンマッチングは、大規模なデータセットを分析するための最初のステップに通常含まれる、頻繁なアイテム、アイテムセット、サブシーケンス、またはその他の下位構造をマイニングすることです。 これは何年もの間、データマイニングの活発な研究トピックです。
  • mllib.linalg -線形代数用のMLlibユーティリティ。
  • mllib.recommendation -レコメンダーシステムでは、協調フィルタリングが一般的に使用されます。 これらの手法は、ユーザーアイテムの関連付けマトリックスの不足しているエントリを埋めることを目的としています。
  • spark.mllib -現在、モデルベースの協調フィルタリングをサポートしています。このフィルタリングでは、ユーザーと製品は、不足しているエントリの予測に使用できる潜在的な要因の小さなセットで記述されます。 spark.mllibは、交互最小二乗(ALS)アルゴリズムを使用してこれらの潜在的要因を学習します。
  • mllib.regression -線形回帰は回帰アルゴリズムのファミリーに属します。 回帰の目的は、変数間の関係と依存関係を見つけることです。 線形回帰モデルとモデルの要約を操作するためのインターフェイスは、ロジスティック回帰の場合に似ています。

mllibパッケージの一部として、他のアルゴリズム、クラス、および関数もあります。 今のところ、 pyspark.mllib のデモを理解しましょう。

次の例は、ALSアルゴリズムを使用して推奨モデルを構築し、トレーニングデータで評価する協調フィルタリングの例です。

使用されるデータセット-test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)

   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))

   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------

コマンド-コマンドは次のようになります-

$SPARK_HOME/bin/spark-submit recommend.py

出力-上記のコマンドの出力は-

Mean Squared Error = 1.20536041839e-05

PySpark-シリアライザー

シリアル化は、Apache Sparkのパフォーマンスチューニングに使用されます。 ネットワーク経由で送信されるデータ、ディスクに書き込まれるデータ、またはメモリに保存されるデータはすべてシリアル化する必要があります。 シリアル化は、コストのかかる操作で重要な役割を果たします。

PySparkは、パフォーマンスチューニングのためにカスタムシリアライザーをサポートしています。 次の2つのシリアライザーはPySparkでサポートされています-

MarshalSerializer

PythonのMarshal Serializerを使用してオブジェクトをシリアル化します。 このシリアライザーはPickleSerializerよりも高速ですが、サポートするデータ型の数は少なくなります。

class pyspark.MarshalSerializer

PickleSerializer

PythonのPickle Serializerを使用してオブジェクトをシリアル化します。 このシリアライザーは、ほぼすべてのPythonオブジェクトをサポートしますが、より専門的なシリアライザーほど高速ではない場合があります。

class pyspark.PickleSerializer

PySparkのシリアル化の例を見てみましょう。 ここでは、MarshalSerializerを使用してデータをシリアル化します。

--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------

コマンド-コマンドは次のとおりです-

$SPARK_HOME/bin/spark-submit serializing.py

出力-上記のコマンドの出力は-

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]