Pyspark-quick-guide
PySpark-はじめに
この章では、Apache Sparkとは何か、PySparkはどのように開発されたのかを理解します。
Spark –概要
Apache Sparkは、超高速のリアルタイム処理フレームワークです。 メモリ内の計算を実行して、データをリアルタイムで分析します。 Apache Hadoop MapReduce がバッチ処理のみを実行しており、リアルタイム処理機能が欠けていたため、それが明らかになりました。 そのため、Apache Sparkは、ストリーム処理をリアルタイムで実行でき、バッチ処理も行えるため導入されました。
リアルタイムおよびバッチ処理とは別に、Apache Sparkは対話型クエリと反復アルゴリズムもサポートしています。 Apache Sparkには、アプリケーションをホストできる独自のクラスターマネージャーがあります。 ストレージと処理の両方にApache Hadoopを活用します。 ストレージには HDFS (Hadoop分散ファイルシステム)を使用し、 YARN でもSparkアプリケーションを実行できます。
PySpark –概要
Apache Sparkは* Scalaプログラミング言語*で記述されています。 SparkでPythonをサポートするために、Apache Spark CommunityはPySparkというツールをリリースしました。 PySparkを使用すると、Pythonプログラミング言語で RDD を操作することもできます。 これを実現できるのは、 Py4j というライブラリーがあるためです。
PySparkは、Python APIをスパークコアにリンクし、Sparkコンテキストを初期化する PySpark Shell を提供します。 多くのデータサイエンティストと分析の専門家は、その豊富なライブラリセットのためにPythonを使用しています。 PythonとSparkを統合することは、彼らにとって大きな恩恵です。
PySpark-環境設定
この章では、PySparkの環境設定について理解します。
注-これは、コンピューターにJavaおよびScalaがインストールされていることを考慮しています。
次の手順でPySparkをダウンロードして設定しましょう。
ステップ1 *-公式のApache Spark downloadページにアクセスし、そこから入手できる最新バージョンのApache Sparkをダウンロードします。 このチュートリアルでは、 *spark-2.1.0-bin-hadoop2.7 を使用しています。
- ステップ2 *-次に、ダウンロードしたSpark tarファイルを抽出します。 デフォルトでは、ダウンロードディレクトリにダウンロードされます。
ディレクトリ spark-2.1.0-bin-hadoop2.7 が作成されます。 PySparkを開始する前に、次の環境を設定して、Sparkパスと* Py4jパス*を設定する必要があります。
または、上記の環境をグローバルに設定するには、それらを* .bashrcファイル*に入れます。 次に、環境が機能するように次のコマンドを実行します。
すべての環境が設定されたので、次のコマンドを実行してSparkディレクトリに移動し、PySparkシェルを呼び出しましょう-
これにより、PySparkシェルが起動します。
PySpark-SparkContext
SparkContextは、spark機能へのエントリポイントです。 Sparkアプリケーションを実行すると、ドライバープログラムが起動します。これにはメイン関数があり、ここでSparkContextが開始されます。 次に、ドライバープログラムは、ワーカーノードのエグゼキューター内で操作を実行します。
SparkContextはPy4Jを使用して JVM を起動し、 JavaSparkContext を作成します。 デフォルトでは、PySparkは 'sc' としてSparkContextを使用できるため、新しいSparkContextの作成は機能しません。
次のコードブロックには、PySparkクラスとSparkContextが取得できるパラメーターの詳細が含まれています。
パラメーター
以下は、SparkContextのパラメーターです。
- マスター-接続先のクラスターのURLです。
- appName -ジョブの名前。
- sparkHome -Sparkインストールディレクトリ。
- pyFiles -クラスターに送信してPYTHONPATHに追加する.zipまたは.pyファイル。
- 環境-ワーカーノードの環境変数。
- batchSize -単一のJavaオブジェクトとして表されるPythonオブジェクトの数。 バッチ処理を無効にする場合は1、オブジェクトサイズに基づいてバッチサイズを自動的に選択する場合は0、無制限のバッチサイズを使用する場合は-1を設定します。
- Serializer -RDDシリアライザー。
- Conf -すべてのSparkプロパティを設定するためのL \ {SparkConf}のオブジェクト。
- ゲートウェイ-既存のゲートウェイとJVMを使用します。それ以外の場合は、新しいJVMを初期化します。
- JSC -JavaSparkContextインスタンス。
- profiler_cls -プロファイリングに使用されるカスタムプロファイラーのクラス(デフォルトはpyspark.profiler.BasicProfilerです)。
上記のパラメーターの中で、 master および appname が主に使用されます。 PySparkプログラムの最初の2行は、次のようになります-
SparkContextの例– PySparkシェル
SparkContextについて十分に理解したので、PySparkシェルで簡単な例を実行してみましょう。 この例では、 README.md ファイル内の文字「a」または「b」の行数をカウントします。 したがって、ファイルに5行があり、3行に文字 'a’がある場合、出力は→ Line with a:3 になります。 同じことが文字「b」についても行われます。
注-次の例では、PySparkシェルの起動時にSparkがscという名前のSparkContextオブジェクトを自動的に作成するため、SparkContextオブジェクトは作成しません。 別のSparkContextオブジェクトを作成しようとすると、次のエラーが発生します-「ValueError:複数のSparkContextsを一度に実行できません」。
SparkContextの例-Pythonプログラム
Pythonプログラムを使用して同じ例を実行してみましょう。 firstapp.py というPythonファイルを作成し、そのファイルに次のコードを入力します。
次に、ターミナルで次のコマンドを実行して、このPythonファイルを実行します。 上記と同じ出力が得られます。
PySpark-RDD
システムにPySparkをインストールして構成したので、Apache SparkでPythonでプログラミングできます。 ただし、その前に、Sparkの基本概念であるRDDについて理解しましょう。
RDDは Resilient Distributed Dataset の略で、これらはクラスターで並列処理を行うために複数のノードで実行および動作する要素です。 RDDは不変の要素です。つまり、RDDを作成すると、それを変更することはできません。 RDDもフォールトトレラントであるため、障害が発生した場合は自動的に回復します。 これらのRDDに複数の操作を適用して、特定のタスクを達成できます。
これらのRDDに操作を適用するには、2つの方法があります-
- 変換と
- アクション
これら2つの方法を詳細に理解しましょう。
変換-これらは、新しいRDDを作成するためにRDDに適用される操作です。 Filter、groupBy、およびmapは、変換の例です。
アクション-これらはRDDに適用される操作で、Sparkに計算を実行し、結果をドライバーに送り返すよう指示します。
PySparkで操作を適用するには、最初に PySpark RDD を作成する必要があります。 次のコードブロックには、PySpark RDDクラスの詳細があります-
PySparkを使用していくつかの基本的な操作を実行する方法を見てみましょう。 Pythonファイル内の次のコードは、前述の単語のセットを格納するRDD単語を作成します。
次に、単語に対していくつかの操作を実行します。
カウント()
RDDの要素の数が返されます。
コマンド-count()のコマンドは-
出力-上記のコマンドの出力は-
collect()
RDDのすべての要素が返されます。
コマンド-collect()のコマンドは-
出力-上記のコマンドの出力は-
foreach(f)
foreach内の関数の条件を満たす要素のみを返します。 次の例では、foreachでprint関数を呼び出します。これは、RDDのすべての要素を印刷します。
コマンド-foreach(f)のコマンドは-
出力-上記のコマンドの出力は-
フィルター(f)
フィルター内の機能を満たす要素を含む新しいRDDが返されます。 次の例では、 spark を含む文字列を除外します。
コマンド-filter(f)のコマンドは-
出力-上記のコマンドの出力は-
map(f、preservesPartitioning = False)
RDDの各要素に関数を適用することにより、新しいRDDが返されます。 次の例では、キーと値のペアを形成し、すべての文字列を値1にマッピングします。
コマンド-map(f、preservesPartitioning = False)のコマンドは-
出力-上記のコマンドの出力は-
reduce(f)
指定された可換および連想バイナリ演算を実行した後、RDDの要素が返されます。 次の例では、オペレーターから追加パッケージをインポートし、「num」に適用して単純な追加操作を実行しています。
コマンド-reduce(f)のコマンドは-
出力-上記のコマンドの出力は-
join(other、numPartitions = None)
一致するキーとその特定のキーのすべての値を持つ要素のペアを持つRDDを返します。 次の例では、2つの異なるRDDに2つの要素のペアがあります。 これらの2つのRDDを結合した後、一致するキーとその値を持つ要素を持つRDDを取得します。
コマンド-join(other、numPartitions = None)のコマンドは-
出力-上記のコマンドの出力は-
キャッシュ()
このRDDをデフォルトのストレージレベル(MEMORY_ONLY)で保持します。 RDDがキャッシュされているかどうかも確認できます。
コマンド-cache()のコマンドは-
出力-上記のプログラムの出力は-
これらは、PySpark RDDで実行される最も重要な操作の一部でした。
PySpark-放送およびアキュムレーター
並列処理の場合、Apache Sparkは共有変数を使用します。 共有変数のコピーは、ドライバーがクラスター上のエグゼキューターにタスクを送信するときにクラスターの各ノードで実行されるため、タスクの実行に使用できます。
Apache Sparkでサポートされる共有変数には2つのタイプがあります-
- 放送
- アキュムレータ
それらを詳細に理解しましょう。
放送
ブロードキャスト変数は、すべてのノードにわたってデータのコピーを保存するために使用されます。 この変数はすべてのマシンでキャッシュされ、タスクを備えたマシンでは送信されません。 次のコードブロックには、PySparkのBroadcastクラスの詳細が含まれています。
次の例は、Broadcast変数の使用方法を示しています。 ブロードキャスト変数には、値と呼ばれる属性があり、データを保存し、ブロードキャストされた値を返すために使用されます。
コマンド-ブロードキャスト変数のコマンドは次のとおりです-
出力-次のコマンドの出力を以下に示します。
アキュムレータ
アキュムレーター変数は、連想演算および可換演算を介して情報を集約するために使用されます。 たとえば、加算操作またはカウンター(MapReduce内)にアキュムレーターを使用できます。 次のコードブロックには、PySparkのAccumulatorクラスの詳細が含まれています。
次の例は、Accumulator変数の使用方法を示しています。 アキュムレータ変数には、ブロードキャスト変数が持っているものと同様の値と呼ばれる属性があります。 データを保存し、アキュムレータの値を返すために使用されますが、ドライバープログラムでのみ使用できます。
この例では、アキュムレーター変数は複数のワーカーによって使用され、累積値を返します。
コマンド-アキュムレータ変数のコマンドは次のとおりです-
出力-上記のコマンドの出力を以下に示します。
PySpark-SparkConf
ローカル/クラスター上でSparkアプリケーションを実行するには、いくつかの構成とパラメーターを設定する必要があります。これがSparkConfで役立ちます。 Sparkアプリケーションを実行するための構成を提供します。 次のコードブロックには、PySparkのSparkConfクラスの詳細が含まれています。
最初に、SparkConf()でSparkConfオブジェクトを作成します。このオブジェクトは、* spark。** Javaシステムプロパティからも値をロードします。 SparkConfオブジェクトを使用してさまざまなパラメーターを設定できるようになり、それらのパラメーターはシステムプロパティよりも優先されます。
SparkConfクラスには、チェーンをサポートするセッターメソッドがあります。 たとえば、* conf.setAppName(“ PySpark App”)。setMaster(“ local”)*と書くことができます。 SparkConfオブジェクトをApache Sparkに渡すと、ユーザーは変更できません。
以下は、SparkConfの最も一般的に使用される属性の一部です-
- * set(key、value)*-構成プロパティを設定します。
- * setMaster(value)*-マスターURLを設定します。
- * setAppName(value)*-アプリケーション名を設定します。
- * get(key、defaultValue = None)*-キーの設定値を取得します。
- * setSparkHome(value)*-ワーカーノードでSparkインストールパスを設定します。
PySparkプログラムでSparkConfを使用する次の例を考えてみましょう。 この例では、sparkアプリケーション名を PySpark App に設定し、sparkアプリケーションのマスターURLを→ spark://master:7077 に設定しています。
次のコードブロックには行があり、Pythonファイルに追加されると、PySparkアプリケーションを実行するための基本的な構成を設定します。
PySpark-SparkFiles
Apache Sparkでは、 sc.addFile (scはデフォルトのSparkContext)を使用してファイルをアップロードし、 SparkFiles.get を使用してワーカーのパスを取得できます。 したがって、SparkFilesは* SparkContext.addFile()*によって追加されたファイルへのパスを解決します。
SparkFilesには次のクラスメソッドが含まれています-
- get(ファイル名)
- getrootdirectory()
それらを詳細に理解しましょう。
get(ファイル名)
SparkContext.addFile()を介して追加されるファイルのパスを指定します。
getrootdirectory()
SparkContext.addFile()を介して追加されたファイルを含むルートディレクトリへのパスを指定します。
コマンド-コマンドは次のとおりです-
出力-上記のコマンドの出力は-
PySpark-StorageLevel
StorageLevelは、RDDの保存方法を決定します。 Apache Sparkでは、StorageLevelはRDDをメモリに保存するか、ディスクに保存するか、またはその両方を決定します。 また、RDDをシリアル化するかどうか、およびRDDパーティションを複製するかどうかも決定します。
次のコードブロックには、StorageLevelのクラス定義があります-
さて、RDDのストレージを決定するために、以下に示すさまざまなストレージレベルがあります-
- DISK_ONLY = StorageLevel(True、False、False、False、1)
- DISK_ONLY_2 = StorageLevel(True、False、False、False、2)
- MEMORY_AND_DISK = StorageLevel(True、True、False、False、1)
- MEMORY_AND_DISK_2 = StorageLevel(True、True、False、False、2)
- MEMORY_AND_DISK_SER = StorageLevel(True、True、False、False、1)
- MEMORY_AND_DISK_SER_2 = StorageLevel(True、True、False、False、2)
- MEMORY_ONLY = StorageLevel(False、True、False、False、1)
- MEMORY_ONLY_2 = StorageLevel(False、True、False、False、2)
- MEMORY_ONLY_SER = StorageLevel(False、True、False、False、1)
- MEMORY_ONLY_SER_2 = StorageLevel(False、True、False、False、2)
- OFF_HEAP = StorageLevel(True、True、True、False、1)
次のStorageLevelの例を考えてみましょう。ここでは、ストレージレベル* MEMORY_AND_DISK_2、*を使用します。これは、RDDパーティションに2のレプリケーションがあることを意味します。
コマンド-コマンドは次のとおりです-
出力-上記のコマンドの出力は以下のとおりです-
PySpark-MLlib
Apache Sparkは、 MLlib と呼ばれる機械学習APIを提供します。 PySparkには、Pythonでこの機械学習APIもあります。 以下に記載されているさまざまな種類のアルゴリズムをサポートしています-
- mllib.classification - spark.mllib パッケージは、バイナリ分類、マルチクラス分類、および回帰分析のためのさまざまなメソッドをサポートしています。 分類で最も人気のあるアルゴリズムのいくつかは、 Random Forest、Naive Bayes、Decision Tree などです。
- mllib.clustering -クラスタリングは教師なしの学習問題であり、類似性の概念に基づいてエンティティのサブセットを互いにグループ化することを目的としています。
- mllib.fpm -頻繁なパターンマッチングは、大規模なデータセットを分析するための最初のステップに通常含まれる、頻繁なアイテム、アイテムセット、サブシーケンス、またはその他の下位構造をマイニングすることです。 これは何年もの間、データマイニングの活発な研究トピックです。
- mllib.linalg -線形代数用のMLlibユーティリティ。
- mllib.recommendation -レコメンダーシステムでは、協調フィルタリングが一般的に使用されます。 これらの手法は、ユーザーアイテムの関連付けマトリックスの不足しているエントリを埋めることを目的としています。
- spark.mllib -現在、モデルベースの協調フィルタリングをサポートしています。このフィルタリングでは、ユーザーと製品は、不足しているエントリの予測に使用できる潜在的な要因の小さなセットで記述されます。 spark.mllibは、交互最小二乗(ALS)アルゴリズムを使用してこれらの潜在的要因を学習します。
- mllib.regression -線形回帰は回帰アルゴリズムのファミリーに属します。 回帰の目的は、変数間の関係と依存関係を見つけることです。 線形回帰モデルとモデルの要約を操作するためのインターフェイスは、ロジスティック回帰の場合に似ています。
mllibパッケージの一部として、他のアルゴリズム、クラス、および関数もあります。 今のところ、 pyspark.mllib のデモを理解しましょう。
次の例は、ALSアルゴリズムを使用して推奨モデルを構築し、トレーニングデータで評価する協調フィルタリングの例です。
使用されるデータセット-test.data
コマンド-コマンドは次のようになります-
出力-上記のコマンドの出力は-
PySpark-シリアライザー
シリアル化は、Apache Sparkのパフォーマンスチューニングに使用されます。 ネットワーク経由で送信されるデータ、ディスクに書き込まれるデータ、またはメモリに保存されるデータはすべてシリアル化する必要があります。 シリアル化は、コストのかかる操作で重要な役割を果たします。
PySparkは、パフォーマンスチューニングのためにカスタムシリアライザーをサポートしています。 次の2つのシリアライザーはPySparkでサポートされています-
MarshalSerializer
PythonのMarshal Serializerを使用してオブジェクトをシリアル化します。 このシリアライザーはPickleSerializerよりも高速ですが、サポートするデータ型の数は少なくなります。
PickleSerializer
PythonのPickle Serializerを使用してオブジェクトをシリアル化します。 このシリアライザーは、ほぼすべてのPythonオブジェクトをサポートしますが、より専門的なシリアライザーほど高速ではない場合があります。
PySparkのシリアル化の例を見てみましょう。 ここでは、MarshalSerializerを使用してデータをシリアル化します。
コマンド-コマンドは次のとおりです-
出力-上記のコマンドの出力は-