Spring Batchで改行がめちゃくちゃなCSVと戦う
0. まだまだSpring Batchと格闘中です
先日実際に起きたお話。
取引先からダウンロードしたとあるCSVをSpring Batchで書いた取込処理にかけたところ、ファイルの読み込みで死ぬケースが起きたので調べてみたところ、CSVのカラムに改行が入っていたことでFlatFileItemReaderがお亡くなりになっていた。
要はこんな感じである。
実リリース前のテストで発覚したのでよかったものの、これはあかん、ということで、どうするべきかを考えることに。
1. 今回のお題目
....をチャンクモデルでどう扱うかを考えるわけだが、最大の問題は2番目のかつエスケープされていないという箇所。これがもとで、標準のDelimitedLineTokenizerがArrayIndexOutOfBoundsException
でお亡くなりになってしまう。
ちゃんとエスケープされているなら大丈夫だそうなんだが。
※TERASOLUNA Batch Frameworkのファイルアクセスの項を参照。
そういう時にはCSVを読み込むカスタムReaderを作るべしということだそうなので、実際にやってみた。
2. ItemReader / ItemProcessor / ItemWriter
読み込みのときに問題があるので、まずItemReaderから。
先ほどのQiitaの記事ほぼそのままで恐縮だが、使い慣れていたOpenCSVをパーサにしているのと、Builderを追加しているのが違うくらい。
class CsvItemReader<T>() : AbstractItemCountingItemStreamItemReader<T>(),
ResourceAwareItemReaderItemStream<T>, InitializingBean {
var charset: Charset = Charset.defaultCharset()
var linesToSkip: Int = 0;
var delimiter: Char = ','
var quotedChar: Char = '"'
var escapeChar: Char = '"'
private lateinit var resourceToRead: Resource
private lateinit var headers: Array<String>
private lateinit var fieldSetMapper: FieldSetMapper<T>
private var noInput: Boolean = false
private lateinit var csvReader: CSVReader
init {
setName(this.javaClass.simpleName)
}
override fun doOpen() {
Assert.notNull(resourceToRead, "Resource to read is required")
// 例外をスローするとバッチにブレーキがかかる
noInput = true
if (!resourceToRead.exists()) {
throw IllegalStateException("Input resource does not exist : $resourceToRead")
}
if (!resourceToRead.isReadable) {
throw IllegalStateException("Input resource must be readable : $resourceToRead")
}
// ここからOpenCSVの初期化
// CSVParser
val csvParserBuilder = CSVParserBuilder().withSeparator(delimiter)
.withQuoteChar(quotedChar)
.withStrictQuotes(true)
// 同じ値を書き込むと怒られるので、不一致の場合のみにする
if (quotedChar != escapeChar) {
csvParserBuilder.withEscapeChar(escapeChar)
}
csvReader = CSVReaderBuilder(FileReader(resourceToRead.file, charset))
.withCSVParser(csvParserBuilder.build())
.withSkipLines(linesToSkip)
.build()
noInput = false
}
override fun doRead(): T? {
// 読める状態にない、あるいは読んだ内容が空だったときは null を渡すと空データとして処理される
if (noInput) {
return null
}
if (csvReader == null) {
throw ReaderNotOpenException("CSVReader is not initialized")
}
// OpenCSVで行を読む
val line: Array<out String> = csvReader.readNext() ?: return null
// FieldSetMapperに読んだ行を渡してPOJOにマップさせる
val fs: FieldSet = DefaultFieldSet(line, headers)
return fieldSetMapper.mapFieldSet(fs)
}
override fun doClose() {
// 終了処理で呼ばれる
// 各種パーサはここで閉じておくべし
csvReader.close()
}
override fun setResource(resource: Resource) {
// ResourceAwareItemReaderItemStream から。
// これを実装しておくと、 MultiResourceItemReader の委譲先にすることができるようになるっぽい
this.resourceToRead = resource
}
override fun afterPropertiesSet() {
Assert.notNull(this.headers, "header is required")
Assert.notNull(this.fieldSetMapper, "FieldSetMapper is required")
}
fun setHeaders(headers: Array<String>) {
this.headers = headers
}
fun setFieldSetMapper(fieldSetMapper: FieldSetMapper<T>) {
this.fieldSetMapper = fieldSetMapper
}
/**
* 上記CsvItemReaderのビルダ。
*/
class Builder<T>() {
private val reader: CsvItemReader<T> = CsvItemReader()
fun build(): CsvItemReader<T> {
return reader
}
fun withResource(resource: Resource): Builder<T> {
reader.setResource(resource)
return this
}
fun withFieldSetMapper(fieldSetMapper: FieldSetMapper<T>): Builder<T> {
reader.fieldSetMapper = fieldSetMapper
return this
}
fun withHeaders(headers: Array<String>): Builder<T> {
reader.headers = headers
return this
}
fun withCharset(charset: Charset): Builder<T> {
reader.charset = charset
return this
}
fun withLinesToSkip(linesToSkip: Int): Builder<T> {
reader.linesToSkip = linesToSkip
return this
}
fun withDelimiterChar(delimiter: Char): Builder<T> {
reader.delimiter = delimiter
return this
}
fun withQuotedChar(quotedChar: Char): Builder<T> {
reader.quotedChar = quotedChar
return this
}
fun withEscapeChar(escapeChar: Char): Builder<T> {
reader.escapeChar = escapeChar
return this
}
}
}
ItemReaderとセットで使うFieldSetMapperは単純。たぶん、みればわかるレベル。
@Component
class CsvUserMapper: FieldSetMapper<CsvUser> {
override fun mapFieldSet(fieldSet: FieldSet): CsvUser {
// fieldSet の値を順番に抜いてはめて返すだけ
return CsvUser(fieldSet.readString(0), fieldSet.readString(1))
}
}
そのほか、中間処理をうけもつItemProcessorと書き込みを受け持つItemWriterも、話を単純にするためごく単純にしてみた。
- ItemProcessor
@Component
class CsvImporterProcessor : ItemProcessor<CsvUser, AppUser> {
override fun process(item: CsvUser): AppUser? {
// 単にインスタンスを組み替えるだけ
return AppUser(item.username, item.description)
}
}
- ItemWriter
@Component
class CsvItemWriter(private val appUserRepository: AppUserRepository): ItemWriter<AppUser> {
override fun write(items: MutableList<out AppUser>) {
// こちらも右から左に永続化するだけ
// AppUserRepository は AppUser の JpaRepository
appUserRepository.saveAll(items)
}
}
この状態でStepをSpring Beanとして構成してやる。
@Bean
fun csvItemReader(csvUserMapper: CsvUserMapper): ItemReader<CsvUser> {
return CsvItemReader.Builder<CsvUser>()
.withCharset(StandardCharsets.UTF_8)
.withResource(ClassPathResource("/csv/userdata.csv")) // クラスパス内にあるファイルを指定している
.withFieldSetMapper(csvUserMapper)
.withLinesToSkip(1) // 1行飛ばす
.withHeaders(arrayOf("username", "description")) // ヘッダをマップするメンバーの定義
.withDelimiterChar(',') // 区切り記号
.withQuotedChar('"') // 囲み文字
.build()
}
@Bean
fun step1(csvItemReader: ItemReader<CsvUser>, csvItemWriter: CsvItemWriter, csvImporterProcessor: CsvImporterProcessor): Step {
return stepBuilderFactory.get("csvItemReaderStep")
.chunk<CsvUser, AppUser>(10)
.reader(csvItemReader)
.processor(csvImporterProcessor)
.writer(csvItemWriter)
.build()
}
こうしてやることで、ようやく改行を含むカラムをちゃんと読めるようになった。
DBに書き込んだ結果がこちら。
3. ソースコード
https://github.com/f97one/LineBreakAwareCsvImporterDemoをご参照ください。
« WicketとSpringを悪魔合体させる その2 | トップページ | 今年もまたこのイベントがやってきた(通算47回目) »
コメント
« WicketとSpringを悪魔合体させる その2 | トップページ | 今年もまたこのイベントがやってきた(通算47回目) »
こんばんは。
改行が上手く読み込めない問題、よく起こりそうですね。
投稿: 師子乃 | 2020年7月26日 (日) 19時53分