Apache SparkをJavaアプリケーションから使う。

高速な分散処理エンジンApache Sparkの操作を対話シェルで試してみる!」に続いて、Apache Sparkの操作をJavaからしてみます。

Apache SparkをJavaから操作

下記のSparkのプログラミングガイドを参考にしました。

スクリーンショット 2015-01-04 11.12.06

Spark Programming Guide – Spark 1.2.0 Documentation

前回使ったSparkの対話シェルはScalaでしたが、今回は使い慣れているJavaで試してみます。Javaの対話シェルは用意されていませんが、SparkのJavaライブラリは用意されています。

JavaでSparkContextの用意

ライブラリはMavenから導入できます。最新バージョンのSparkで試します。

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.10</artifactId>
	<version>1.2.0</version>
</dependency>

ライブラリが導入できたら、SparkContextを作成します。

高速な分散処理エンジンApache Sparkの操作を対話シェルで試してみる!」では、対話シェルの起動時点でscという変数にSparkContextが用意されていました。

今回は、プログラムで接続先などを指定して、SparkContextを作成します。クラスタを指定せずにスタンドアローンで使う場合は、setMasterメソッドにはlocalを指定します。

SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

これで分析の準備ができました。

ファイルの行数を数えてみる

比較しやすいように、前回と同じ分析をしてみます。

まずは、README.mdのファイルの行数を数えます。

long lineCount = sparkContext.textFile("README.md").count();
System.out.println("lineCount: " + lineCount);

Scalaで書いた場合とほぼ同じです。

結果は98行となりました。

lineCount: 98

当然ですが、Scalaの対話シェルで実行した場合と結果も同じになりました。

mapとreduceで複雑な分析をする

続いて、もう少し複雑な処理をしてみます。

次のプログラムは、1行に含まれる単語数の最大値を求めます。ラムダ式が使いたかったので、Java 8を利用しています。

long maxWords = sparkContext.textFile("README.md").map(line -> line.split(" ").length).reduce((a, b) -> Math.max(a, b));
System.out.println("maxWords: " + maxWords);

ラムダ式の表現は、JavaとScalaで少し違いますが、書いていることは同じです。

結果は、14行となりました。

maxWords: 14

こちらも対話シェルで実行した場合と同じになっています。

Sparkの操作のインタフェースは、Java 8のStream APIとほぼ同じなので、Stream APIを使ったことがあれば、すぐに手に馴染むと思います。

About katty0324

Scroll To Top