■ INPUT FILE(hoge1~5.csv)
hoge1.csv
1,aaa 2,bbb ・・・
hoge5.csv
51,aaa 52,bbb ・・・
~hoge2.csvからhoge4.csv同じような感じなので省略~
■ OUTPUT DB(hogeテーブル)
mysql> create database hoge; Query OK, 1 row affected (0.00 sec) mysql> use hoge Database changed mysql> create table hoge ( -> hoge_id int unsigned not null, -> hoge_name varchar(20) not null, -> hoge_value varchar(20) not null -> ); Query OK, 0 rows affected (0.11 sec)
■ Job定義XML(長いので抜粋…)
DBまわり
<import resource="HogeDB.xml"/>
Job。Stepにparent付けてマスターを指定
<batch:job id="hogeMultiJob" job-repository="jobRepository"> <batch:step id="hogeStep" parent="hogeStepMaster" /> </batch:job>
parentで指定したマスター。
パーティショニングするところで結構複雑なので中身の詳細は追って。。。
<bean id="hogeStepMaster" class="org.springframework.batch.core.partition.support.PartitionStep"> <property name="jobRepository" ref="jobRepository" /> <property name="stepExecutionSplitter"> <bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter"> <constructor-arg ref="jobRepository" /> <constructor-arg ref="step1" /> <constructor-arg> <bean class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"> <property name="resources" value="file:/home/eshinohara/springsource/~略~/input/hoge*.csv" /> </bean> </constructor-arg> </bean> </property> <property name="partitionHandler"> <bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler"> <property name="taskExecutor" ref="asyncTaskExecutor" /> <property name="step" ref="step1" /> <property name="gridSize" value="5"/> </bean> </property> </bean>
非同期なTaskExecutorのbean定義
<bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
パーティションハンドラから呼ばれるマルチスレッドで動くStep。
<batch:step id="step1"> <batch:tasklet transaction-manager="jobRepository-transactionManager"> <batch:chunk reader="hogeFileItemReader" processor="hogeProcessor" writer="hogeDBWriter" commit-interval="10" /> </batch:tasklet> </batch:step>
CSVをカンマ区切りで読込み。
scopeをstepにしてLate Binding→Late Binding of Job and Step Attributes
<bean id="hogeFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"> <property name="resource" value="#{stepExecutionContext[fileName]}" /> <property name="strict" value="false" /> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> <property name="delimiter" value=","/> <property name="names" value="ID,name" /> </bean> </property> <property name="fieldSetMapper"> <bean class="org.springframework.sample.batch.example.HogeFieldSetMapper" /> </property> </bean> </property> </bean>
DBに書き出し
<bean id="hogeDBWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter"> <property name="assertUpdates" value="false" /> <property name="itemSqlParameterSourceProvider"> <bean name="sqlParameterSourceProvider" id="sqlParameterSourceProvider" class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" /> </property> <property name="sql" value="insert into hoge (hoge_id, hoge_name, hoge_value) values (:id, :hogeName, :hogeValue)"/> <property name="dataSource" ref="hoge-dataSource" /> </bean>
■プログラム
Hoge.java … setter,getter持ってるアレ。
HogeFieldSetMapper … ファイルから読み込んだデータをオブジェクト(上のHoge)にセット。
HogeProcessor … なんとなく使ってみたかっただけ。ファイルから取得した値に文字列追加して別カラムにセット。
■ 実行結果ログ(log4j.xmlに%tでスレッドを記載)
ジョブ定義XML読み込み
2010-11-19 21:16:38,202 INFO main [org.springframework.beans.factory.xml.XmlBeanDefinitionReader] - <Loading XML bean definitions from class path resource [FileToTableMultiJob.xml]>
マルチスレッドで実行
2010-11-19 21:16:39,803 DEBUG SimpleAsyncTaskExecutor-1 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=2, name=step1:partition3, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=> 2010-11-19 21:16:39,808 DEBUG SimpleAsyncTaskExecutor-4 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=4, name=step1:partition4, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=> 2010-11-19 21:16:39,811 DEBUG SimpleAsyncTaskExecutor-3 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=3, name=step1:partition2, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=> 2010-11-19 21:16:39,815 DEBUG SimpleAsyncTaskExecutor-2 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=6, name=step1:partition0, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=> 2010-11-19 21:16:39,822 DEBUG SimpleAsyncTaskExecutor-5 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=5, name=step1:partition1, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
処理完了
2010-11-19 21:16:39,893 DEBUG main [org.springframework.batch.core.job.flow.support.SimpleFlow] - <Completed state=hogeMultiJob.end1 with status=COMPLETED>
====
バージョンによってJob定義の記述とかもルールがチョコチョコ変わってたり、
そもそも中身よくわかってなくて、英語のフォーラムみならが、
パズルみたいにbeanを組み合わせていったのであんまり自信ないですが、
それっぽく動いてるように見えるので、参考までに載せておきました。。
来週からはもっと中身を深堀して、レコードをスキップしたり、バリデーションしたり、
リランどうするんだ?とか、その辺みていこうと思います。。
にしても、確かに、自分でコードはあんまり書かなくてもよいけどXML地獄だな。。。汗
コメント
[…] 先日、SpringBatchで複数ファイルをマルチスレッドでDBに突っ込むってエントリを書きましたが、 そういえば、時間の計測してなかったと思って、チョロっとやってみました。 # クアッド […]
[…] 手っ取り早いのかな…。前にSpringBatch試した時も情報少なくて苦労したしなぁ。。 […]
[…] Check1年近く前くらいにSpring Batchを検証して↓とか書いたりしたのですが、 SpringBatchで複数ファイルをマルチスレッドでDBに突っ込む | shinodogg.com 結局ヘビー過ぎて導入は見送りました。 […]