高速な分散処理エンジンApache Sparkの操作を対話シェルで試してみる!

最近、規模の大きなデータを、高速に処理する方法を調べています。

Apache Sparkは高速な分散処理エンジン

「高速に」といっても、「スループットが高い」という意味と、「レスポンスが早い」という意味があります。

「スループットが高い」というのは、一定時間にどれだけたくさんの処理ができるかです。「レスポンスが早い」というのは、処理を要求してから完成までにかかる時間が短いという意味です。

分散処理といえばHadoopが浮かびますが、Hadoopはスループット重視です。レスポンスタイムを重視したい場合には向きません。レスポンスを重視したい場合というのはたとえば、ユーザーからリクエストを受けてすぐに分析結果を返したい場合のような場合です。

Apache Sparkは、レスポンスタイムを重視した分散処理エンジンです。

スクリーンショット 2015-01-03 17.09.12

Apache Spark™ – Lightning-Fast Cluster Computing

先日書いたPrestoもレスポンスタイムを重視した分散処理エンジンです。

分散SQLクエリエンジンPrestoをMac OS Xにインストール

PrestoはSQLクエリを処理することに特化していますが、Sparkは汎用の処理ができるように設計されています。

Apache Sparkのダウンロード

Apache Sparkはオープンソースです。GitHubからソースコードをダウンロードできます。

apache/spark

Sparkは主にScalaで書かれていて、Mavenでビルドすることができます。

mvn -DskipTests clean package

または、Apache Sparkのダウンロードページから、ビルド済みのパッケージをダウンロードすることもできます。ダウンロードは下記から。

Downloads | Apache Spark

今回は試しに、「Apache Spark 1.2.0 Prebuild for Hadoop 2.4」をダウンロードしてみました。

対話シェルの起動

ダウンロードして解凍したら、Sparkシェルを起動してみます。

$ ./bin/spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
# ...(略)...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/
 
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25)
Type in expressions to have them evaluated.
Type :help for more information.
# ...(略)...
Spark context available as sc.
 
scala>

プロンプトが表示されて、入力を受け付ける状態になりました。

「Spark context available as sc.」というメッセージが出ていますが、SparkContextの変数がscという名前で定義されているので、これを利用することができます。

対話シェルにコマンドを打ち込んでみる

Sparkが起動したら、さっそく対話シェルにコマンドを打ち込んでみます。下記のクイックスタートを参考にしています。

Quick Start – Spark 1.2.0 Documentation

ここでは、試しにSparkのREADME.mdを分析してみます。たとえば、README.mdを読み込んで、その行数を数えてみます。

scala> sc.textFile("README.md").count()
res9: Long = 98

結果は98行と出ました。

bashでこのファイルの行数を数えてみると確かに98行のようです。

$ cat README.md | wc -l
      98

mapとreduceで複雑な分析をする

countという単純な操作だけでなく、mapメソッドとreduceメソッドで複雑な分析をすることもできます。

たとえば、mapメソッドで、行をスペース区切りにして、1行に含まれる単語数に変換します。そして、reduceメソッドで、1行に含まれる単語数の最大値を求めます。

scala> sc.textFile("README.md").map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res10: Int = 14

結果、最大で14単語を含む行があるということが分かりました。

reduceメソッドで用いる関数は、下記のように、java.lang.Mathパッケージをインポートして、置き換えることもできます。

scala> import java.lang.Math
import java.lang.Math
 
scala> sc.textFile("README.md").map(line => line.split(" ").size).reduce((a, b) => Math.max(a,b))
res11: Int = 14

こちらも結果は14単語です。

これから、Javaアプリケーションでの利用を考えていく予定です。

About katty0324

Scroll To Top