FLINK-4719

KryoSerializer random exception


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.1
    • Fix Version/s: 1.3.3
    • Component/s: None

      Description

      An exception that somehow involves the KryoSerializer is thrown at random when using POJOs in Flink jobs that read large volumes of data.

      It is thrown in several different places, for example (the exceptions reported here may come from earlier versions of Flink):

      java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
              at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
              at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
              at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
              at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
              at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
              at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
              ... 3 more
      Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
              at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
      Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: java.ttil.HashSet
              at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
              at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
              at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
              at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
              at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
              at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
              at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
              at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
              at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
              at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
              at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
              at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
              at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
              at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
      Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
              at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
              at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
              at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
              at java.lang.Class.forName0(Native Method)
              at java.lang.Class.forName(Class.java:348)
              at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
      
      Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
          at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
          at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
          at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
          at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
          at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
          at java.util.ArrayList.elementData(ArrayList.java:418)
          at java.util.ArrayList.get(ArrayList.java:431)
          at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
          at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
          at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
          at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
          at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
          at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
      
      java.lang.RuntimeException: Cannot instantiate class.
      	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
      	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
      	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
      	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
      	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
      	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
      	at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
      	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
      	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent
      	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Class.java:348)
      	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
      	... 10 more
      
      com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
              at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
              at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
              at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
              at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
              at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
              at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
              at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
              at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
              at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
              at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
              at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
              at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
              at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
              at java.lang.Thread.run(Thread.java:745)
      
      
      Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
      	at java.util.ArrayList.elementData(ArrayList.java:418)
      	at java.util.ArrayList.get(ArrayList.java:431)
      	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
      	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
      	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
      	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
      	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
      	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
      	at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
      	at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
      	at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
      	at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
      	at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
      	at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
      	at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
      	at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
      	at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
      	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
      	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
      	at java.lang.Thread.run(Thread.java:745)
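
      The class names in the traces above look like near misses of real names: `java.ttil.HashSet` is one character away from `java.util.HashSet`. A minimal sketch for comparing a garbled name with the name it presumably stands for is shown below. `VehicleEvent` is an assumption (the trace only shows `VdhicleEvent`), and `ClassNameDiff` and `diff` are hypothetical names used for illustration only; nothing here is part of Flink or Kryo.

```java
import java.util.ArrayList;
import java.util.List;

class ClassNameDiff {

    /** Lists every position at which two names differ, with the character codes and their XOR. */
    static List<String> diff(String seen, String expected) {
        List<String> out = new ArrayList<>();
        if (seen.length() != expected.length()) {
            out.add("length differs: " + seen.length() + " vs " + expected.length());
            return out;
        }
        for (int i = 0; i < seen.length(); i++) {
            char a = seen.charAt(i);
            char b = expected.charAt(i);
            if (a != b) {
                out.add(String.format("index %d: '%c' (0x%02X) vs '%c' (0x%02X), xor=0x%02X",
                        i, a, (int) a, b, (int) b, a ^ b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Name taken from the traces, paired with the real JDK class name.
        System.out.println(diff("java.ttil.HashSet", "java.util.HashSet"));
        // The expected name here is an assumption; only the garbled one appears in the trace.
        System.out.println(diff("it.okkam.flink.test.model.pojo.VdhicleEvent",
                "it.okkam.flink.test.model.pojo.VehicleEvent"));
    }
}
```

      Each pair differs in exactly one character, which may point to corrupted bytes in the serialized data rather than to a class that is really missing from the classpath. That reading is a guess based on the names alone, not something the traces prove.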
      

              People

              • Assignee: Unassigned
              • Reporter: Flavio Pompermaier (f.pompermaier)
              • Votes: 3
              • Watchers: 7
