Apache Mesosで構築したクラスタ上で、Apache Sparkの分散処理を実行する。

クラスタマネージャーApache Mesosを使って、Amazon EC2にクラスタを構築。」で構築したクラスタを使って、Sparkの分散処理を実行してみます。

Apache SparkをAmazon EC2にインスール

今回は、SparkをAmazon EC2のインスタンスにインスールしていきます。やっていることは下記と同じで、ビルド済みのパッケージをダウンロードして配置しているだけです。

高速な分散処理エンジンApache Sparkの操作を対話シェルで試してみる!

cd /tmp
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.2.0-bin-hadoop2.4.tgz
tar xzf spark-*.tgz
rm spark-*.tgz
sudo mv spark-* /opt/spark

これでSparkが配置完了です。

Apache Mesosを利用するためのApache Sparkの設定

Mesosで構築したクラスタを利用するために、Sparkの設定を変更していきます。

spark-env.shに、MESOS_NATIVE_LIBRARYとSPARK_EXECUTOR_URIの環境変数を設定します。

cp /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh
cat << EOT >> /opt/spark/conf/spark-env.sh
export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export SPARK_EXECUTOR_URI=http://d3kbcqa49mib13.cloudfront.net/spark-1.2.0-bin-hadoop2.4.tgz
EOT

spark-defaults.confに、spark.mesos.coarseのプロパティを追加します。

cp /opt/spark/conf/spark-defaults.conf.template /opt/spark/conf/spark-defaults.conf
cat << EOT >> /opt/spark/conf/spark-defaults.conf
spark.mesos.coarse=true
EOT

このspark.mesos.coarseを設定しないと、次のようなエラーが出てタスクが処理されませんでした。

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

設定はこれで終わりです。

Apache Mesos上にApche Sparkの起動

インスールが完了したら、Sparkを起動します。

今回もSparkの対話シェルを起動しますが、–masterオプションを指定しています。ここには、「クラスタマネージャーApache Mesosを使って、Amazon EC2にクラスタ構築。」で構築したMesosのURIを指定します。

/opt/spark/bin/spark-shell --master mesos://172.31.1.168:5050/

対話シェルが起動して、プロンプトが表示されたら、4040ポートでブラウザを開くと、Sparkの管理画面を見ることができます。

スクリーンショット 2015-01-05 15.15.21

そして、5050ポートでMesosの管理画面を開くと、FrameworksのメニューにSpark shellが認識されているのが分かります!

スクリーンショット 2015-01-05 15.14.30

SparkとMesosの連携が確認できました!

クラスタ2台の構成にしてSparkのコマンドを実行すると、次のようにタスクが分散されて実行されるのを見ることができます。

スクリーンショット 2015-01-05 20.19.34

Mesosで構築したクラスタ上でSparkの動作が確認できました!

動作が確認できたところで、分散によって処理が高速化されるのかが気になってきます。

ノード数を変えて処理時間を比較

シンプルな問題として、1〜10億までの総和を計算してみます。

まず、クラスタのノード数を1台にして、実行。総和の計算には、39.6秒かかりました。

scala> sc.parallelize(1 to 1000000000).sum()
// ...(略)...
15/01/05 11:24:03 INFO DAGScheduler: Job 8 finished: sum at <console>:13, took 39.586944 s
res6: Double = 5.0000000007595942E17

ノードを追加して、2台にします。19.7秒で、約半分の時間になりました!

scala> sc.parallelize(1 to 1000000000).sum()
// ...(略)...
15/01/05 11:20:54 INFO DAGScheduler: Job 6 finished: sum at <console>:13, took 19.719305 s
res6: Double = 5.0000000007595942E17

更にノードを1台追加して、3台にしました。総和の計算時間は、14.5秒。1台の時に比べて、約3倍の速度になりました。

scala> sc.parallelize(1 to 1000000000).sum()
// ...(略)...
15/01/05 11:26:38 INFO DAGScheduler: Job 9 finished: sum at <console>:13, took 14.491517 s
res6: Double = 5.0000000007595942E17

ノード数に比例して高速化する様子を確認することができました!

ノード数 処理時間
1台 39.6 sec
2台 19.7 sec
3台 14.5 sec

実際に使うときは、こんな分散しやすい問題ばかりではないと思いますが、それでも十分に高速化な処理が期待できそうです。

About katty0324

Scroll To Top