Beam / BEAM-6101

java.lang.ArrayIndexOutOfBoundsException Error reading state


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.8.0
    • Fix Version/s: Not applicable
    • Component/s: runner-flink
    • Labels:
      None

      Description

      We have had a pipeline running on a Flink 1.5.4 cluster for a while, using Apache Beam as an abstraction layer.

      This pipeline has multiple windows and operations. During every deployment we include '--allowNonRestoredState --fromSavepoint=......' in the flink run command. Our savepoints and state are stored on HDFS.

       

      Today we got the following exception a few minutes after deployment:

      java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
          at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
          at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
          at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error reading state.
          at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
          at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
          at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
          at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
          at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
          at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:125)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:698)
          at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
          at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:567)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:537)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
          ... 7 more
      Caused by: java.lang.RuntimeException: Error reading state.
          at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState$1.read(FlinkStateInternals.java:320)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState$1.read(FlinkStateInternals.java:309)
          at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:998)
          at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
          at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
      Caused by: java.lang.ArrayIndexOutOfBoundsException: 22
          at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
          at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
          at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
          at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
          at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
          at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
          at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
          at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
          at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:317)
          at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
          at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
          at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
          at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
          at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
          at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:113)
          at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:48)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState$1.read(FlinkStateInternals.java:317)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState$1.read(FlinkStateInternals.java:309)
          at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:998)
          at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
          at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
          at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
          at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
          at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
          at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
          at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:125)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:698)
          at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
          at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:567)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:537)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
          at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
          at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
          at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
          at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
          at java.lang.Thread.run(Thread.java:745)
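      The innermost cause is thrown by Avro's ResolvingDecoder while it reads a union branch index (ResolvingDecoder.readIndex, then Symbol$Alternative.getSymbol). In Avro's binary encoding, a union value is prefixed by its zero-based branch index, written as a zig-zag varint long. One possible way to get an index like 22 is for the decoder to be misaligned in the byte stream, for example when the stored bytes were written with a different schema than the one AvroCoder now uses to read them, so that an unrelated byte is interpreted as the branch index. The stdlib-only Java sketch below illustrates that mechanism only; it is not Avro's implementation and not a confirmed root cause (the issue was resolved as Cannot Reproduce). The class and method names are hypothetical, and the two-branch union {"null", "string"} is an assumed example.

```java
/**
 * Hypothetical illustration (not Avro's code): a union branch index decoded
 * from misaligned or schema-mismatched bytes can exceed the reader's union
 * size and surface as an ArrayIndexOutOfBoundsException.
 */
public class UnionIndexDemo {

    /** Decodes an Avro-style zig-zag varint long starting at offset. */
    public static long readZigZagLong(byte[] buf, int offset) {
        long raw = 0;
        int shift = 0;
        int pos = offset;
        while (true) {
            if (pos >= buf.length) {
                throw new IllegalArgumentException("truncated varint");
            }
            int b = buf[pos++] & 0xFF;
            // Low 7 bits carry data; the high bit means "more bytes follow".
            raw |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 63) {
                throw new IllegalArgumentException("varint too long");
            }
        }
        // Zig-zag decoding maps 0->0, 1->-1, 2->1, 3->-2, ...
        return (raw >>> 1) ^ -(raw & 1);
    }

    /** Picks the union branch with a plain array lookup (assumed simplification). */
    public static String resolveBranch(String[] branches, byte[] buf) {
        int index = (int) readZigZagLong(buf, 0);
        // No bounds check: an out-of-range index throws
        // ArrayIndexOutOfBoundsException, as in the reported trace.
        return branches[index];
    }

    public static void main(String[] args) {
        String[] readerUnion = {"null", "string"};
        // 0x02 zig-zag decodes to 1, a valid branch.
        System.out.println(resolveBranch(readerUnion, new byte[] {0x02})); // string
        try {
            // 0x2C (44) zig-zag decodes to 22, far past the 2 branches.
            resolveBranch(readerUnion, new byte[] {0x2C});
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

      The exact exception message text depends on the JVM version, which is why the sketch prints it rather than asserting it.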
      

       


              People

              • Assignee: Unassigned
              • Reporter: jcgarciam JC
              • Votes: 0
              • Watchers: 3
