sstableloaderでCassandraにデータをバルクインポートする。

サービスのデータの一部を、Cassandraへ移行することを検討しています。
そんな時問題になるのが、どうやってデータを移動させるか、です。

sstableloaderでデータインポート

Cassandraへのデータの移行方法として候補に上がるのが、sstableloaderを用いる方法です。
スクリーンショット 2014-11-16 4.07.03
Cassandra bulk loader (sstableloader) | DataStax Cassandra 2.1 Documentation
SSTableは、Cassandraのデータ形式です。Cassandraのデータ形式でインポートするので、効率が良さそうです。

どうやってSSTableを用意するか?

既存のCassandraデータベースで使用されているSSTableを、他のCassandraにインポートする場合は簡単です。
しかし、MySQLなど他の種類のデータベースから移行する場合は、そのデータベースのデータをSSTableに変換する必要があります。
SSTableはバイナリなので簡単には作ることができませんが、CassandraのJavaライブラリ内に用意されているSSTableWriterというクラスを使えば、SSTableの形式を理解しなくても生成することができます。
早速SSTableWriterを使ってSSTableを作成し、sstableloaderを使ってCassandraにインポートしてみます。
下記を参考にしました。
Using the Cassandra Bulk Loader, Updated | Planet Cassandra

CQLSSTableWriterを使ってSSTableを作成

今回は仮に、下記のようなテーブルにインポートすることを想定しています。

create table client (
  application_id int,
  created timestamp,
  id varchar,
  primary key (application_id, created)
) with clustering order by (created desc);

まずは、MavenでCassandraのライブラリを導入します。


	org.apache.cassandra
	cassandra-all
	2.1.1

定数で設定を用意しておきます。

private static final String KEY_SPACE = "test_key_space";
private static final String COLUMN_FAMILY = "client";

ここから、実際の処理です。
CQLSSTableWriter.Builderを使って、CQLSSTableWriterインスタンスを作成します。設定に、保存先とインポート先のテーブルの定義、インサートのためのクエリを設定しておきます。パーティショナの設定は、Cassandraの設定と合わせます。

Config.setClientMode(true);
String directory = String.format("/Users/kataoka/%s/%s/", KEY_SPACE, COLUMN_FAMILY);
String table = String.format("create columnfamily %s.%s (application_id int, created timestamp, id varchar, primary key (application_id, created)) with clustering order by (created desc);", KEY_SPACE, COLUMN_FAMILY);
String statement = String.format("insert into %s.%s (application_id, created, id) values (?, ?, ?);", KEY_SPACE, COLUMN_FAMILY);
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
builder.inDirectory(directory).forTable(table).using(statement).withPartitioner(new Murmur3Partitioner());

CQLSSTableWriterインスタンスを作成したら、addRowメソッドでデータを投げ込んでいきます。今回はダミーデータを入れていきます。

CQLSSTableWriter writer = builder.build();
for (int i = 0; i < 1000000; i++) {
	writer.addRow(new Integer(1), new Date(new Random().nextLong() % new Date().getTime()), RandomStringUtils.randomAlphabetic(16));
}
writer.close();

これでSSTableが生成されます。保存先に指定したディレクトリには、複数のファイルが生成されます。

ls /Users/kataoka/test_key_space/client/
test_key_space-client-ka-1-CompressionInfo.db
test_key_space-client-ka-1-Digest.sha1
test_key_space-client-ka-1-Index.db
test_key_space-client-ka-1-TOC.txt
test_key_space-client-ka-1-Data.db
test_key_space-client-ka-1-Filter.db
test_key_space-client-ka-1-Statistics.db

これでSSTableの作成が完了です。続いて、SSTableのロードをします。

SSTableのロード

SSTableのロードは、ツールが用意されているので一発です。
sstableloaderの引数に、先ほどのSSTableの保存ディレクトリを指定して、実行します。

$ sstableloader -d 127.0.0.1 /Users/kataoka/test_key_space/client/
Established connection to initial hosts
Opening sstables and calculating sections to stream
Streaming relevant part of /Users/kataoka/test_key_space/client/test_key_space-client-ka-1-Data.db to [/127.0.0.1]
progress: [/127.0.0.1]0:1/1 100% total: 100% 0  MB/s(avg: 2 MB/s)
Summary statistics:
   Connections per host:         : 1
   Total files transferred:      : 1
   Total bytes transferred:      : 42437199
   Total duration (ms):          : 7247
   Average transfer rate (MB/s): : 2
   Peak transfer rate (MB/s):    : 8

少し待つと、インポートが完了します。
保存されたデータを確認してみます。

cqlsh:test_key_space> select * from client where application_id = 1 order by created desc limit 3;
 application_id | created                  | id
----------------+--------------------------+------------------
              1 | 2014-11-16 03:41:02+0900 | MBNaDIRCcpOrNpNM
              1 | 2014-11-16 02:13:43+0900 | nNSUQDqPRWRtHGca
              1 | 2014-11-16 01:36:27+0900 | KeBqgZvwpGhqQlRi
(3 rows)

インポートされています!

処理時間を確認

今回は、100万件のデータのSSTableを生成し、インポートしてみました。

処理 時間
SSTableの生成 20.3秒
SSTableのインポート 9.6秒

これなら、それなりの規模のデータも移行できそうです。

タイトルとURLをコピーしました