2015年9月14日月曜日

embulkのプラグインをJavaで作ってみる

■embulkのプラグイン

データ加工ツールのembulkのプラグインを作ってみます。
Javaで作ります。




■雛型の一覧を表示する

embulkのプラグインを作るには new コマンドで雛型を作ります。
どんな雛型があるか以下のコマンドで確認します。

embulk new

X:\embulk>embulk new
2015-XX-XX 00:00:00.000 +0900: Embulk v0.7.4
Usage: new <category> <name>
categories:
    ruby-input                 Ruby record input plugin    (like "mysql")
    ruby-output                Ruby record output plugin   (like "mysql")
    ruby-filter                Ruby record filter plugin   (like "add-hostname")
    #ruby-file-input           Ruby file input plugin      (like "ftp")          # not implemented yet [#21]
    #ruby-file-output          Ruby file output plugin     (like "ftp")          # not implemented yet [#22]
    ruby-parser                Ruby file parser plugin     (like "csv")
    ruby-formatter             Ruby file formatter plugin  (like "csv")
    #ruby-decoder              Ruby file decoder plugin    (like "gzip")         # not implemented yet [#31]
    #ruby-encoder              Ruby file encoder plugin    (like "gzip")         # not implemented yet [#32]
    java-input                 Java record input plugin    (like "mysql")
    java-output                Java record output plugin   (like "mysql")
    java-filter                Java record filter plugin   (like "add-hostname")
    java-file-input            Java file input plugin      (like "ftp")
    java-file-output           Java file output plugin     (like "ftp")
    java-parser                Java file parser plugin     (like "csv")
    java-formatter             Java file formatter plugin  (like "csv")
    java-decoder               Java file decoder plugin    (like "gzip")
    java-encoder               Java file encoder plugin    (like "gzip")

examples:
    new ruby-output hbase
    new ruby-filter int-to-string


Javaでパーサーを作るので java-parser を使います。

■雛型を作る
雛型を作るには
embulk new java-parser プラグイン名
とします。
実際には
embulk new java-parser sample

X:\embulk>embulk new java-parser sample
2015-XX-XX 00:00:00.000 +0900: Embulk v0.7.4
Creating embulk-parser-sample/
  Creating embulk-parser-sample/README.md
  Creating embulk-parser-sample/LICENSE.txt
  Creating embulk-parser-sample/.gitignore
  Creating embulk-parser-sample/gradle/wrapper/gradle-wrapper.jar
  Creating embulk-parser-sample/gradle/wrapper/gradle-wrapper.properties
  Creating embulk-parser-sample/gradlew.bat
  Creating embulk-parser-sample/gradlew
  Creating embulk-parser-sample/build.gradle
  Creating embulk-parser-sample/lib/embulk/parser/sample.rb
  Creating embulk-parser-sample/src/main/java/org/embulk/parser/sample/SampleParserPlugin.java
  Creating embulk-parser-sample/src/test/java/org/embulk/parser/sample/TestSampleParserPlugin.java
  Creating embulk-parser-sample/lib/embulk/guess/sample.rb

Plugin template is successfully generated.
Next steps:

  $ cd embulk-parser-sample
  $ ./gradlew package

色々なファイルが出来たのでこれらを使ってパーサープラグインを作っていきます

■パーサーを作る

一行づつ読み込んで加工するパーサーを作ってみます。
まずは
embulk-parser-sample/src/main/java/org/embulk/parser/sample/SampleParserPlugin.java
を編集します。

package org.embulk.parser.sample;

import com.google.common.base.Optional;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.ParserPlugin;
import org.embulk.spi.FileInput;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.SchemaConfig;

public class SampleParserPlugin
        implements ParserPlugin
{
    public interface PluginTask                
            extends Task, LineDecoder.DecoderTask   // --- ①
    {
        // configuration option 1 (required integer)    //--- ②
        @Config("option1")
        public int getOption1();

        // configuration option 2 (optional string, null is not allowed)
        @Config("optoin2")
        @ConfigDefault("\"myvalue\"")
        public String getOption2();

        // configuration option 3 (optional string, null is allowed)
        @Config("optoin3")
        @ConfigDefault("null")
        public Optional<String> getOption3();

        // if you get schema from config or data source
        @Config("columns")
        public SchemaConfig getColumns();
    }

    @Override
    public void transaction(ConfigSource config, ParserPlugin.Control control)  //--- ③
    {
        PluginTask task = config.loadConfig(PluginTask.class);

        Schema schema = task.getColumns().toSchema();

        control.run(task.dump(), schema);
    }

    @Override
    public void run(TaskSource taskSource, Schema schema,  //--- ④
            FileInput input, PageOutput output)
    {
        PluginTask task = taskSource.loadTask(PluginTask.class);
LineDecoder lineDecoder = new LineDecoder(input, task);
PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output);
String line = null;

while( input.nextFile() ){ //--- ⑤
while(true){
line = lineDecoder.poll();

if( line == null ){
break;
}
String[] data = line.split(",");
pageBuilder.setLong(0, Long.parseLong(data[0]));   // ID
pageBuilder.setString(1, data[1]);                         // 名前
pageBuilder.setLong(2, Long.parseLong(data[2]));   // 年齢
pageBuilder.setString(3, data[2]);                         // メール
pageBuilder.addRecord();
}
}
pageBuilder.finish();
   
        // Write your code here :)
        throw new UnsupportedOperationException("SampleParserPlugin.run method is not implemented yet");
    }
}

修正点と簡単な説明など

①DocoderTaskからも継承するように修正します。
これは一行づつ読み込むプラグインを使うのに必要になります。
インポート文の追加も必要です
import org.embulk.spi.util.LineDecoder;

public interface PluginTask
extends Task, LineDecoder.DecoderTask
②ここでは設定ファイルの読み込みをしています。
③transaction 実装例を見るとここでカラムの指定などをしているようです。
今回は何もしません。
④run ここで実際のデータを加工します。
⑤でpath_prefixで指定したファイル名分のループします。
LindeDecoderを使って一行づつ読み込んで
読み込んだデータを加工し
pageBuilderに追加します。
一通り追加したら pageBuilder.finish します。

■ビルド
作ったプラグインをビルドするには gradlew コマンドを使います

gradlew gem

但しこのままだとエラーになるのでbuild.gradleの
gemタスクのdoLastの部分をコメントにします。
正しい対処法はわかりませんがこれで動かすことは出来ます。

task gem(type: JRubyExec, dependsOn: ["gemspec", "classpath"]) {
    jrubyArgs "-rrubygems/gem_runner", "-eGem::GemRunner.new.run(ARGV)", "build"
    script "${project.name}.gemspec"
//    doLast { ant.move(file: "${project.name}-${project.version}.gem", todir: "pkg") }
}


X:\embulk\embulk-parser-sample>gradlew gem
:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:jar UP-TO-DATE
:classpath UP-TO-DATE
:gemspec
:gem
Invalid gemspec in [X:\embulk\embulk-parser-sample\embulk-parser-sample.gemspec]: No such file or directory - git
ERROR:  Error loading gemspec. Aborting.

BUILD SUCCESSFUL

Total time: 18.251 secs


■動かしてみる
embulk-parser-sampleにconfig.ymlファイルを作る
parserのtypeに今回作ったsampleを指定します。
(newコマンドで指定したプラグイン名)
path_prefixに対象ファイルのプレフィックスを
option1は必須なので何か数値
カラムはID, 名前, 年齢, メール
in:
  type: file
  path_prefix: sample.csv
  parser:
    type: sample
    option1: 20
    columns:
    - {name: id, type: long}
    - {name: name, type: string}
    - {name: age, type: long}
    - {name: mail, type: string}
exec: {}
out: {type: stdout}

読み込み用のファイルを作る
こんなファイルを作りました。
UTF-8で保存します。
001,山田 太郎,32,yamadatarou@mail.com
002,山田 花子,28,yamadahanako@mail.com

いよいよ動かしてみます。
embulk-parser-sampleディレクトリで以下のコマンドを実行します。
embulk preview -I lib config.yml

X:\embulk\embulk-parser-sample>embulk preview -I lib config.yml
2015-XX-XX 00:00:00.000 +0900: Embulk v0.7.4
2015-XX-XX 00:00:00.000 +0900 [INFO] (preview): Loaded plugin embulk/parser/sample from a load path
2015-XX-XX 00:00:00.000 +0900 [INFO] (preview): Listing local files at directory '.' filtering filename by prefix 'sample.csv'
2015-XX-XX 00:00:00.000 +0900 [INFO] (preview): Loading files [sample.csv]
+---------+-------------+----------+-------------+
| id:long | name:string | age:long | mail:string |
+---------+-------------+----------+-------------+
|       1 |       山田 太郎 |       32 |          32 |
|       2 |       山田 花子 |       28 |          28 |
+---------+-------------+----------+-------------+

うまくいきました。

0 件のコメント:

コメントを投稿