SpringBatchで複数ファイルをマルチスレッドでDBに突っ込む

■ INPUT FILE(hoge1~5.csv)
hoge1.csv

1,aaa
2,bbb
・・・

hoge5.csv

51,aaa
52,bbb
・・・

~hoge2.csvからhoge4.csv同じような感じなので省略~
 
■ OUTPUT DB(hogeテーブル)

mysql> create database hoge;
Query OK, 1 row affected (0.00 sec)
mysql> use hoge
Database changed
mysql> create table hoge (
 -> hoge_id int unsigned not null,
 -> hoge_name varchar(20) not null,
 -> hoge_value varchar(20) not null
 -> );
Query OK, 0 rows affected (0.11 sec)

 
■ Job定義XML(長いので抜粋…)
DBまわり

<import resource="HogeDB.xml"/>

Job。Stepにparent付けてマスターを指定

<batch:job id="hogeMultiJob" job-repository="jobRepository">
 <batch:step id="hogeStep" parent="hogeStepMaster" />
</batch:job>

parentで指定したマスター。
パーティショニングするところで結構複雑なので中身の詳細は追って。。。

<bean id="hogeStepMaster" class="org.springframework.batch.core.partition.support.PartitionStep">
 <property name="jobRepository" ref="jobRepository" />
 <property name="stepExecutionSplitter">
  <bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
   <constructor-arg ref="jobRepository" />
   <constructor-arg ref="step1" />
   <constructor-arg>
    <bean class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
     <property name="resources" value="file:/home/eshinohara/springsource/~略~/input/hoge*.csv" />
    </bean>
   </constructor-arg>
  </bean>
 </property>
 <property name="partitionHandler">
  <bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
   <property name="taskExecutor" ref="asyncTaskExecutor" />
   <property name="step" ref="step1" />
   <property name="gridSize" value="5"/>
  </bean>
 </property>
</bean>

非同期なTaskExecutorのbean定義

<bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

パーティションハンドラから呼ばれるマルチスレッドで動くStep。

<batch:step id="step1">
 <batch:tasklet transaction-manager="jobRepository-transactionManager">
  <batch:chunk reader="hogeFileItemReader"  processor="hogeProcessor" writer="hogeDBWriter"  commit-interval="10" />
 </batch:tasklet>
</batch:step>

CSVをカンマ区切りで読込み。
scopeをstepにしてLate Binding→Late Binding of Job and Step Attributes

<bean id="hogeFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
 <property name="resource" value="#{stepExecutionContext[fileName]}" />
 <property name="strict" value="false" />
 <property name="lineMapper">
  <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
   <property name="lineTokenizer">
    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
     <property name="delimiter" value=","/>
     <property name="names" value="ID,name" />
    </bean>
   </property>
   <property name="fieldSetMapper">
    <bean class="org.springframework.sample.batch.example.HogeFieldSetMapper" />
   </property>
  </bean>
 </property>
</bean>

DBに書き出し

<bean id="hogeDBWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
 <property name="assertUpdates" value="false" />
  <property name="itemSqlParameterSourceProvider">
    <bean name="sqlParameterSourceProvider" id="sqlParameterSourceProvider"
      class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
  </property>
  <property name="sql" value="insert into hoge (hoge_id, hoge_name, hoge_value) values (:id, :hogeName, :hogeValue)"/>
  <property name="dataSource" ref="hoge-dataSource" />
</bean>

 
■プログラム
Hoge.java … setter,getter持ってるアレ。
HogeFieldSetMapper … ファイルから読み込んだデータをオブジェクト(上のHoge)にセット。
HogeProcessor … なんとなく使ってみたかっただけ。ファイルから取得した値に文字列追加して別カラムにセット。
 
■ 実行結果ログ(log4j.xmlに%tでスレッドを記載)
ジョブ定義XML読み込み

2010-11-19 21:16:38,202 INFO main [org.springframework.beans.factory.xml.XmlBeanDefinitionReader] - <Loading XML bean definitions from class path resource [FileToTableMultiJob.xml]>

マルチスレッドで実行

2010-11-19 21:16:39,803 DEBUG SimpleAsyncTaskExecutor-1 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=2, name=step1:partition3, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
2010-11-19 21:16:39,808 DEBUG SimpleAsyncTaskExecutor-4 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=4, name=step1:partition4, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
2010-11-19 21:16:39,811 DEBUG SimpleAsyncTaskExecutor-3 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=3, name=step1:partition2, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
2010-11-19 21:16:39,815 DEBUG SimpleAsyncTaskExecutor-2 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=6, name=step1:partition0, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
2010-11-19 21:16:39,822 DEBUG SimpleAsyncTaskExecutor-5 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=5, name=step1:partition1, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>

処理完了

2010-11-19 21:16:39,893 DEBUG main [org.springframework.batch.core.job.flow.support.SimpleFlow] - <Completed state=hogeMultiJob.end1 with status=COMPLETED>

 
====
 
バージョンによってJob定義の記述とかもルールがチョコチョコ変わってたり、
そもそも中身よくわかってなくて、英語のフォーラムみならが、
パズルみたいにbeanを組み合わせていったのであんまり自信ないですが、
それっぽく動いてるように見えるので、参考までに載せておきました。。
 
来週からはもっと中身を深堀して、レコードをスキップしたり、バリデーションしたり、
リランどうするんだ?とか、その辺みていこうと思います。。
 
にしても、確かに、自分でコードはあんまり書かなくてもよいけどXML地獄だな。。。汗

SpringBatchで複数ファイルをマルチスレッドでDBに突っ込む」への3件のフィードバック

コメントを残す

以下に詳細を記入するか、アイコンをクリックしてログインしてください。

WordPress.com ロゴ

WordPress.com アカウントを使ってコメントしています。 ログアウト /  変更 )

Google フォト

Google アカウントを使ってコメントしています。 ログアウト /  変更 )

Twitter 画像

Twitter アカウントを使ってコメントしています。 ログアウト /  変更 )

Facebook の写真

Facebook アカウントを使ってコメントしています。 ログアウト /  変更 )

%s と連携中