Beam / BEAM-6101

java.lang.ArrayIndexOutOfBoundsException Error reading state

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.8.0
    • Fix Version/s: Not applicable
    • Component/s: runner-flink
    • Labels: None

    Description

      We have had a pipeline running on a Flink 1.5.4 cluster for a while, using Apache Beam as an abstraction layer.

      The pipeline has multiple windows and operations. On every deployment we include '--allowNonRestoredState --fromSavepoint=......' in the flink run command. Our savepoints and state are stored on HDFS.


      Today we got the following exception a few minutes after deployment:

      java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
          at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
          at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
          at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error reading state.
          at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
          at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
          at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
          at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
          at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
          at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:125)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:698)
          at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
          at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:567)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:537)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
          ... 7 more
      Caused by: java.lang.RuntimeException: Error reading state.
          at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState$1.read(FlinkStateInternals.java:320)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState$1.read(FlinkStateInternals.java:309)
          at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:998)
          at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
          at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
      Caused by: java.lang.ArrayIndexOutOfBoundsException: 22
          at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
          at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
          at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
          at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
          at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
          at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:317)
          at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
          at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
          at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
          at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
          at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
          at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:113)
          at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:48)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState$1.read(FlinkStateInternals.java:317)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState$1.read(FlinkStateInternals.java:309)
          at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:998)
          at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
          at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
          at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
          at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
          at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
          at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
          at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:125)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:698)
          at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
          at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:567)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:537)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
          at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
          at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
          at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
          at java.lang.Thread.run(Thread.java:745)
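The issue was closed as Cannot Reproduce, so the root cause was never confirmed. The innermost frames do show where the exception originates: while decoding an Avro union, ResolvingDecoder reads a branch index from the bytes and Symbol$Alternative.getSymbol uses it to pick one of the union branches of the schema the decoder assumes wrote those bytes. The Java sketch below is a hypothetical, simplified model of that lookup, not Avro's code; the names UnionIndexSketch, readZigZagInt and resolveBranch are made up for illustration. It assumes one possible scenario, bytes that were not written with the expected schema (for example, state in a savepoint encoded by an older version of an Avro type), and shows how a stray byte such as 0x2C decodes to index 22 and overruns a two-branch union.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical, simplified model (not Avro's actual code) of how a union
 * branch index decoded from mismatched bytes can overrun the branch table.
 * Simplified: no guard against overlong or malformed varints.
 */
final class UnionIndexSketch {

    /** Reads an Avro-style zigzag varint int; throws BufferUnderflowException if bytes run out. */
    static int readZigZagInt(ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (b & 0x7F) << shift; // 7 payload bits per byte, least significant group first
            shift += 7;
        } while ((b & 0x80) != 0);      // high bit set means another byte follows
        // Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
        return (raw >>> 1) ^ -(raw & 1);
    }

    /** Reads a branch index and looks it up in the branches the decoder expects. */
    static String resolveBranch(String[] expectedBranches, ByteBuffer buf) {
        int index = readZigZagInt(buf);
        // No range check, mirroring an unchecked array lookup: an index that the
        // expected schema does not have surfaces as ArrayIndexOutOfBoundsException.
        return expectedBranches[index];
    }

    public static void main(String[] args) {
        String[] branches = {"null", "string"};

        // 0x02 zigzag-decodes to 1, selecting "string".
        System.out.println(resolveBranch(branches, ByteBuffer.wrap(new byte[] {0x02})));

        // 0x2C zigzag-decodes to 22: bytes that some other layout wrote at this
        // position are misread as a union index, and the lookup overruns the array.
        try {
            resolveBranch(branches, ByteBuffer.wrap(new byte[] {0x2C}));
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("ArrayIndexOutOfBoundsException: " + e.getMessage());
        }
    }
}
```

Running main prints "string" and then the exception message; the exact message text varies by JDK version. If mismatched state were the cause, comparing the Avro schemas of the types held in pipeline state against the versions that wrote the savepoint would be one way to check, though the report itself does not confirm this.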
            People

              Assignee: Unassigned
              Reporter: jcgarciam (JC)
              Votes: 0
              Watchers: 3