« WicketとSpringを悪魔合体させる その2 | トップページ | 今年もまたこのイベントがやってきた(通算47回目) »

2020年5月 6日 (水)

Spring Batchで改行がめちゃくちゃなCSVと戦う

0. まだまだSpring Batchと格闘中です

先日実際に起きたお話。

取引先からダウンロードしたとあるCSVをSpring Batchで書いた取込処理にかけたところ、ファイルの読み込みで死ぬケースが起きたので調べてみたところ、CSVのカラムに改行が入っていたことでFlatFileItemReaderがお亡くなりになっていた。
要はこんな感じである。

Csv_with_messy_line_breaks

実リリース前のテストで発覚したのでよかったものの、これはあかん、ということで、どうするべきかを考えることに。

1. 今回のお題目

  • 改行を含む可能性があるのは、特定のカラムだけ
  • 改行コードはバラバラ(CRLF、LF、CRのいずれもありうる)、かつエスケープされていない
  • データのカラムはダブルクォーテーションで囲まれている
  • 1行目はヘッダ行なので読み飛ばす必要がある
  • ....をチャンクモデルでどう扱うかを考えるわけだが、最大の問題は2番目のかつエスケープされていないという箇所。これがもとで、標準のDelimitedLineTokenizerArrayIndexOutOfBoundsExceptionでお亡くなりになってしまう。

    ちゃんとエスケープされているなら大丈夫だそうなんだが。
    TERASOLUNA Batch Frameworkのファイルアクセスの項を参照。

    そういう時にはCSVを読み込むカスタムReaderを作るべしということだそうなので、実際にやってみた。

    2. ItemReader / ItemProcessor / ItemWriter

    読み込みのときに問題があるので、まずItemReaderから。
    先ほどのQiitaの記事ほぼそのままで恐縮だが、使い慣れていたOpenCSVをパーサにしているのと、Builderを追加しているのが違うくらい。

      class CsvItemReader<T>() : AbstractItemCountingItemStreamItemReader<T>(),
          ResourceAwareItemReaderItemStream<T>, InitializingBean {
    
        var charset: Charset = Charset.defaultCharset()
        var linesToSkip: Int = 0;
        var delimiter: Char = ','
        var quotedChar: Char = '"'
        var escapeChar: Char = '"'
    
        private lateinit var resourceToRead: Resource
        private lateinit var headers: Array<String>
        private lateinit var fieldSetMapper: FieldSetMapper<T>
    
        private var noInput: Boolean = false
        private lateinit var csvReader: CSVReader
    
        init {
          setName(this.javaClass.simpleName)
        }
    
        override fun doOpen() {
          Assert.notNull(resourceToRead, "Resource to read is required")
    
          // 例外をスローするとバッチにブレーキがかかる
          noInput = true
          if (!resourceToRead.exists()) {
            throw IllegalStateException("Input resource does not exist : $resourceToRead")
          }
          if (!resourceToRead.isReadable) {
            throw IllegalStateException("Input resource must be readable : $resourceToRead")
          }
    
          // ここからOpenCSVの初期化
          // CSVParser
          val csvParserBuilder = CSVParserBuilder().withSeparator(delimiter)
              .withQuoteChar(quotedChar)
              .withStrictQuotes(true)
          // 同じ値を書き込むと怒られるので、不一致の場合のみにする
          if (quotedChar != escapeChar) {
            csvParserBuilder.withEscapeChar(escapeChar)
          }
    
          csvReader = CSVReaderBuilder(FileReader(resourceToRead.file, charset))
              .withCSVParser(csvParserBuilder.build())
              .withSkipLines(linesToSkip)
              .build()
    
          noInput = false
        }
    
        override fun doRead(): T? {
          // 読める状態にない、あるいは読んだ内容が空だったときは null を渡すと空データとして処理される
          if (noInput) {
            return null
          }
    
          if (csvReader == null) {
            throw ReaderNotOpenException("CSVReader is not initialized")
          }
    
          // OpenCSVで行を読む
          val line: Array<out String> = csvReader.readNext() ?: return null
    
          // FieldSetMapperに読んだ行を渡してPOJOにマップさせる
          val fs: FieldSet = DefaultFieldSet(line, headers)
          return fieldSetMapper.mapFieldSet(fs)
        }
    
        override fun doClose() {
          // 終了処理で呼ばれる
          // 各種パーサはここで閉じておくべし
          csvReader.close()
        }
    
        override fun setResource(resource: Resource) {
          // ResourceAwareItemReaderItemStream から。
          // これを実装しておくと、 MultiResourceItemReader の委譲先にすることができるようになるっぽい
          this.resourceToRead = resource
        }
    
        override fun afterPropertiesSet() {
          Assert.notNull(this.headers, "header is required")
          Assert.notNull(this.fieldSetMapper, "FieldSetMapper is required")
        }
    
        fun setHeaders(headers: Array<String>) {
          this.headers = headers
        }
    
        fun setFieldSetMapper(fieldSetMapper: FieldSetMapper<T>) {
          this.fieldSetMapper = fieldSetMapper
        }
    
        /**
         * 上記CsvItemReaderのビルダ。
         */
        class Builder<T>() {
          private val reader: CsvItemReader<T> = CsvItemReader()
    
          fun build(): CsvItemReader<T> {
            return reader
          }
    
          fun withResource(resource: Resource): Builder<T> {
            reader.setResource(resource)
            return this
          }
    
          fun withFieldSetMapper(fieldSetMapper: FieldSetMapper<T>): Builder<T> {
            reader.fieldSetMapper = fieldSetMapper
            return this
          }
    
          fun withHeaders(headers: Array<String>): Builder<T> {
            reader.headers = headers
            return this
          }
    
          fun withCharset(charset: Charset): Builder<T> {
            reader.charset = charset
            return this
          }
    
          fun withLinesToSkip(linesToSkip: Int): Builder<T> {
            reader.linesToSkip = linesToSkip
            return this
          }
    
          fun withDelimiterChar(delimiter: Char): Builder<T> {
            reader.delimiter = delimiter
            return this
          }
    
          fun withQuotedChar(quotedChar: Char): Builder<T> {
            reader.quotedChar = quotedChar
            return this
          }
    
          fun withEscapeChar(escapeChar: Char): Builder<T> {
            reader.escapeChar = escapeChar
            return this
          }
        }
      }

    ItemReaderとセットで使うFieldSetMapperは単純。たぶん、みればわかるレベル。

      @Component
      class CsvUserMapper: FieldSetMapper<CsvUser> {
        override fun mapFieldSet(fieldSet: FieldSet): CsvUser {
          // fieldSet の値を順番に抜いてはめて返すだけ
          return CsvUser(fieldSet.readString(0), fieldSet.readString(1))
        }
      }

    そのほか、中間処理をうけもつItemProcessorと書き込みを受け持つItemWriterも、話を単純にするためごく単純にしてみた。

    • ItemProcessor
      @Component
      class CsvImporterProcessor : ItemProcessor<CsvUser, AppUser> {
        override fun process(item: CsvUser): AppUser? {
          // 単にインスタンスを組み替えるだけ
          return AppUser(item.username, item.description)
        }
      }
    • ItemWriter
      @Component
      class CsvItemWriter(private val appUserRepository: AppUserRepository): ItemWriter<AppUser> {
        override fun write(items: MutableList<out AppUser>) {
          // こちらも右から左に永続化するだけ
          // AppUserRepository は AppUser の JpaRepository
          appUserRepository.saveAll(items)
        }
      }

    この状態でStepをSpring Beanとして構成してやる。

      @Bean
      fun csvItemReader(csvUserMapper: CsvUserMapper): ItemReader<CsvUser> {
        return CsvItemReader.Builder<CsvUser>()
                .withCharset(StandardCharsets.UTF_8)
                .withResource(ClassPathResource("/csv/userdata.csv")) // クラスパス内にあるファイルを指定している
                .withFieldSetMapper(csvUserMapper)
                .withLinesToSkip(1) // 1行飛ばす
                .withHeaders(arrayOf("username", "description"))  // ヘッダをマップするメンバーの定義
                .withDelimiterChar(',') // 区切り記号
                .withQuotedChar('"')    // 囲み文字
                .build()
      }
    
      @Bean
      fun step1(csvItemReader: ItemReader<CsvUser>, csvItemWriter: CsvItemWriter, csvImporterProcessor: CsvImporterProcessor): Step {
        return stepBuilderFactory.get("csvItemReaderStep")
                .chunk<CsvUser, AppUser>(10)
                .reader(csvItemReader)
                .processor(csvImporterProcessor)
                .writer(csvItemWriter)
                .build()
      }

    こうしてやることで、ようやく改行を含むカラムをちゃんと読めるようになった。
    DBに書き込んだ結果がこちら。

    Insert_result_1

    3. ソースコード

    https://github.com/f97one/LineBreakAwareCsvImporterDemoをご参照ください。

    « WicketとSpringを悪魔合体させる その2 | トップページ | 今年もまたこのイベントがやってきた(通算47回目) »

    コメント

    こんばんは。

    改行が上手く読み込めない問題、よく起こりそうですね。

    コメントを書く

    (ウェブ上には掲載しません)

    « WicketとSpringを悪魔合体させる その2 | トップページ | 今年もまたこのイベントがやってきた(通算47回目) »

    2020年7月
          1 2 3 4
    5 6 7 8 9 10 11
    12 13 14 15 16 17 18
    19 20 21 22 23 24 25
    26 27 28 29 30 31  

    最近のトラックバック

    無料ブログはココログ