Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-2384

Deadlock during partition spilling

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 0.9, 0.10.0
    • 0.9.1, 0.10.0
    • Runtime / Coordination
    • None

    Description

      A user reported a deadlock when spilling partitions. The stack trace is here:

      2015-07-20 11:41:23
      Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.72-b04 mixed mode):
      
      "Attach Listener" daemon prio=10 tid=0x00007f9370006000 nid=0x76b2 waiting on condition [0x0000000000000000]
         java.lang.Thread.State: RUNNABLE
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (8/8)" daemon prio=10 tid=0x00007f9394019000 nid=0x74b2 waiting for monitor entry [0x00007f93a23e2000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (3/8)" daemon prio=10 tid=0x00007f92b044f000 nid=0x74b1 waiting for monitor entry [0x00007f93a24e3000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (7/8)" daemon prio=10 tid=0x00007f92b044d000 nid=0x74b0 waiting for monitor entry [0x00007f93a25e4000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (6/8)" daemon prio=10 tid=0x00007f92b044b000 nid=0x74af waiting for monitor entry [0x00007f93a26e5000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (5/8)" daemon prio=10 tid=0x00007f92b0449000 nid=0x74ae waiting for monitor entry [0x00007f93a27e6000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (4/8)" daemon prio=10 tid=0x00007f92b0446800 nid=0x74ad waiting for monitor entry [0x00007f93a28e7000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (2/8)" daemon prio=10 tid=0x00007f92b0444800 nid=0x74ac waiting for monitor entry [0x00007f93a29e8000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (5/8)" daemon prio=10 tid=0x00007f92b0443000 nid=0x74ab waiting for monitor entry [0x00007f93a2ae9000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (3/8)" daemon prio=10 tid=0x00007f92b0441800 nid=0x74aa waiting for monitor entry [0x00007f93a2bea000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (2/8)" daemon prio=10 tid=0x00007f92b043f800 nid=0x74a9 waiting for monitor entry [0x00007f93a2ceb000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (4/8)" daemon prio=10 tid=0x00007f92b043e000 nid=0x74a8 waiting for monitor entry [0x00007f93a2dec000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (6/8)" daemon prio=10 tid=0x00007f92b043c000 nid=0x74a7 waiting for monitor entry [0x00007f93a2eed000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (8/8)" daemon prio=10 tid=0x00007f92b043a800 nid=0x74a6 waiting for monitor entry [0x00007f93a2fee000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (1/8)" daemon prio=10 tid=0x00007f92b0439000 nid=0x74a5 waiting for monitor entry [0x00007f93a30ef000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (1/8)" daemon prio=10 tid=0x00007f92b0436800 nid=0x74a4 waiting for monitor entry [0x00007f93a31f0000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:269)
      	- waiting to lock <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (7/8)" daemon prio=10 tid=0x00007f92b0434800 nid=0x74a3 waiting for monitor entry [0x00007f93a32f1000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:148)
      	- waiting to lock <0x00000005eb263380> (a java.util.ArrayList)
      	at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:346)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:248)
      	- locked <0x00000005eb2633e8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:268)
      	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:199)
      	- locked <0x000000052e0bc3e8> (a java.lang.Object)
      	at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:284)
      	- locked <0x000000052e0804c0> (a java.lang.Object)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (5/8)" daemon prio=10 tid=0x00007f92b0432800 nid=0x74a2 in Object.wait() [0x00007f93a33f2000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d5c5e168> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d5c5e2e8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (8/8)" daemon prio=10 tid=0x00007f92b0430800 nid=0x74a1 in Object.wait() [0x00007f93a34f3000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d5c62bc0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d5c62cb0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (3/8)" daemon prio=10 tid=0x00007f92b042e800 nid=0x74a0 in Object.wait() [0x00007f93a35f4000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d5ee62a0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d5ee6390> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (7/8)" daemon prio=10 tid=0x00007f92b042c800 nid=0x749f in Object.wait() [0x00007f93a36f5000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d5f00400> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d5f004f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (8/8)" daemon prio=10 tid=0x00007f92b042a800 nid=0x749e in Object.wait() [0x00007f93a37f6000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d61f38f0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d61f39e8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (6/8)" daemon prio=10 tid=0x00007f92b0428000 nid=0x749d in Object.wait() [0x00007f93a38f7000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d620e278> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d61f83d0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (2/8)" daemon prio=10 tid=0x00007f92b0426000 nid=0x749c in Object.wait() [0x00007f93a39f8000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d60d0350> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d60d1a20> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (4/8)" daemon prio=10 tid=0x00007f92b0424800 nid=0x749b in Object.wait() [0x00007f93a3af9000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d6022528> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d60cbba8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (3/8)" daemon prio=10 tid=0x00007f92b0423800 nid=0x749a in Object.wait() [0x00007f93a3bfa000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d601c6c0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d601dd80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (5/8)" daemon prio=10 tid=0x00007f92b0011800 nid=0x7499 in Object.wait() [0x00007f93a3cfb000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d5c3aad0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d5c3c920> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (1/8)" daemon prio=10 tid=0x00007f92b0010000 nid=0x7498 in Object.wait() [0x00007f93a3dfc000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d5ede458> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d5edfa70> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (6/8)" daemon prio=10 tid=0x00007f92b000e800 nid=0x7497 in Object.wait() [0x00007f93a3efd000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d5a514f8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d5c36408> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (4/8)" daemon prio=10 tid=0x00007f92b000e000 nid=0x7496 in Object.wait() [0x00007f93c27bd000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d5a4aeb8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d5a4ccd0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (2/8)" daemon prio=10 tid=0x00007f92b000d000 nid=0x7495 in Object.wait() [0x00007f93b27e6000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d56d6798> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d56d85e8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (1/8)" daemon prio=10 tid=0x00007f92b000c800 nid=0x7494 in Object.wait() [0x00007f93b25e4000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d5272a78> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d5274880> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f933002d000 nid=0x7489 waiting on condition [0x00007f93a3ffe000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x0000000764af9fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:569)
      	at org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger$CombiningSpillingThread.go(CombiningUnilateralSortMerger.java:331)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f9348041000 nid=0x7488 waiting on condition [0x00007f93a81c0000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x0000000716cafde0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:569)
      	at org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger$CombiningSpillingThread.go(CombiningUnilateralSortMerger.java:331)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f933c04d000 nid=0x7487 waiting on condition [0x00007f93a82c1000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000076bbe9c30> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:569)
      	at org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger$CombiningSpillingThread.go(CombiningUnilateralSortMerger.java:331)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f934404b000 nid=0x7484 waiting on condition [0x00007f93a83c2000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x0000000751c21f50> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:569)
      	at org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger$CombiningSpillingThread.go(CombiningUnilateralSortMerger.java:331)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f9344049000 nid=0x7483 waiting on condition [0x00007f93a84c3000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d6ffc520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9344047000 nid=0x7482 waiting on condition [0x00007f93a85c4000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d61f9098> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f933c04b000 nid=0x7481 waiting on condition [0x00007f93a86c5000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d6eb6750> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f934803f000 nid=0x7480 waiting on condition [0x00007f93a87c6000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d305c058> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f933002a800 nid=0x747f waiting on condition [0x00007f93a88c7000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007cff493d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f9334028800 nid=0x747e waiting on condition [0x00007f93a89c8000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d00bf230> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:522)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f931804a800 nid=0x747d waiting on condition [0x00007f93a8ac9000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d00b8f28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:522)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f9318047800 nid=0x747c waiting on condition [0x00007f93a8bca000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d00b8aa0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f935000f000 nid=0x747b waiting on condition [0x00007f93a8ccb000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000076e8f53b0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:569)
      	at org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger$CombiningSpillingThread.go(CombiningUnilateralSortMerger.java:331)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f935000d000 nid=0x7478 waiting on condition [0x00007f93a8dcc000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d00343e8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f935001b000 nid=0x7477 waiting on condition [0x00007f93a8ecd000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d0034210> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9330029000 nid=0x7476 waiting on condition [0x00007f93a8fce000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007cff49200> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f9344045000 nid=0x7475 waiting on condition [0x00007f93a90cf000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000076834ecf0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:569)
      	at org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger$CombiningSpillingThread.go(CombiningUnilateralSortMerger.java:331)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f9344044000 nid=0x7474 waiting on condition [0x00007f93a91d0000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d0184b80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9344043800 nid=0x7473 waiting on condition [0x00007f93a92d1000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d01849a8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f934803c800 nid=0x7472 waiting on condition [0x00007f93a93d2000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d305be80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f933c049000 nid=0x7471 waiting on condition [0x00007f93a94d3000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d639b458> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f932405a800 nid=0x7470 waiting on condition [0x00007f93a95d4000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d0208290> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:522)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f9324006000 nid=0x746f waiting on condition [0x00007f93a96d5000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d0207f40> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9324850000 nid=0x746e waiting on condition [0x00007f93a97d6000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d0207d68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f9334026800 nid=0x746d waiting on condition [0x00007f93a98d7000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007258b0d50> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:569)
      	at org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger$CombiningSpillingThread.go(CombiningUnilateralSortMerger.java:331)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f933565a800 nid=0x746c waiting on condition [0x00007f93a99d8000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007cff082a8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9335658800 nid=0x746b waiting on condition [0x00007f93a9ad9000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007cff080d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f931403b800 nid=0x746a waiting on condition [0x00007f93a9bda000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000077c6e66f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:569)
      	at org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger$CombiningSpillingThread.go(CombiningUnilateralSortMerger.java:331)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f9314039800 nid=0x7469 waiting on condition [0x00007f93a9cdb000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007cff48e20> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9314037800 nid=0x7468 waiting on condition [0x00007f93a9ddc000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007cff0cb00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f9318044000 nid=0x7467 waiting on condition [0x00007f93a9edd000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007cff4dac8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:522)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f9318042000 nid=0x7466 waiting on condition [0x00007f93a9fde000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007cff4de18> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9318040000 nid=0x7465 waiting on condition [0x00007f93aa0df000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007cff4dbf0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f931803f800 nid=0x7464 waiting on condition [0x00007f93aa1e0000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d00b8bc8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f9335655000 nid=0x7463 waiting on condition [0x00007f93aa2e1000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d00bf580> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f9314035800 nid=0x7462 waiting on condition [0x00007f93aa3e2000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d012d5a8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:522)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f9314033800 nid=0x7461 waiting on condition [0x00007f93aa4e3000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d012d8f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9314030800 nid=0x7460 waiting on condition [0x00007f93aa5e4000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d012d6d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f930c057800 nid=0x745f waiting on condition [0x00007f93aa6e5000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d01a39c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:522)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f930c055800 nid=0x745e waiting on condition [0x00007f93aa7e6000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d01a3d10> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f930c8ed800 nid=0x745d waiting on condition [0x00007f93aa8e7000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d01a3ae8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f932802f000 nid=0x745c waiting on condition [0x00007f93aa9e8000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d02a96d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:522)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9335653800 nid=0x745b waiting on condition [0x00007f93aaae9000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d00bf358> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f932802d800 nid=0x745a waiting on condition [0x00007f93aabea000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d02a9cf8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f932802c800 nid=0x7459 waiting on condition [0x00007f93aaceb000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d02a99b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger spilling thread" daemon prio=10 tid=0x00007f9350007800 nid=0x7458 waiting on condition [0x00007f93aadec000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d02b8d00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
      	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:64)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.getNextReturnedBlock(AsynchronousBlockWriter.java:27)
      	at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:206)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
      	at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
      	at org.apache.flink.types.StringValue.copyString(StringValue.java:835)
      	at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:78)
      	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:71)
      	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:522)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger sorting thread" daemon prio=10 tid=0x00007f9350004800 nid=0x7457 waiting on condition [0x00007f93aaeed000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d02b9208> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SortingThread.go(UnilateralSortMerger.java:1099)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "SortMerger Reading Thread" daemon prio=10 tid=0x00007f9350010000 nid=0x7456 waiting on condition [0x00007f93aafee000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000007d02b8fe0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:910)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (4/8)" daemon prio=10 tid=0x00007f93d8b39000 nid=0x7455 in Object.wait() [0x00007f93ab0ef000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d61fb0f8> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (3/8)" daemon prio=10 tid=0x00007f93d8a7f000 nid=0x7454 in Object.wait() [0x00007f93ab1f0000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d012d110> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (8/8)" daemon prio=10 tid=0x00007f93d8a7d000 nid=0x7453 in Object.wait() [0x00007f93ab2f1000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d02be970> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (2/8)" daemon prio=10 tid=0x00007f93d8a7b000 nid=0x7452 in Object.wait() [0x00007f93ab3f2000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d01a3528> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (5/8)" daemon prio=10 tid=0x00007f93d8a79000 nid=0x7451 in Object.wait() [0x00007f93ab4f3000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d00bee18> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (1/8)" daemon prio=10 tid=0x00007f93d8b78800 nid=0x7450 in Object.wait() [0x00007f93ab5f4000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d02af720> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (6/8)" daemon prio=10 tid=0x00007f93d8b76800 nid=0x744f in Object.wait() [0x00007f93ab6f5000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d020e100> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (8/8)" daemon prio=10 tid=0x00007f93d8b74800 nid=0x744e in Object.wait() [0x00007f93ab7f6000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d63a53e0> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (4/8)" daemon prio=10 tid=0x00007f93d8b72800 nid=0x744d in Object.wait() [0x00007f93ab8f7000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d02079f8> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (5/8)" daemon prio=10 tid=0x00007f93d8f38000 nid=0x744c in Object.wait() [0x00007f93ab9f8000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d305e358> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (7/8)" daemon prio=10 tid=0x00007f92b0009800 nid=0x744b in Object.wait() [0x00007f93abaf9000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007cff4d620> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (1/8)" daemon prio=10 tid=0x00007f92b0007800 nid=0x744a in Object.wait() [0x00007f93abbfa000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d0186e80> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (6/8)" daemon prio=10 tid=0x00007f92b0005000 nid=0x7449 in Object.wait() [0x00007f93abcfb000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d00b8730> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (3/8)" daemon prio=10 tid=0x00007f92b0003000 nid=0x7448 in Object.wait() [0x00007f93abdfc000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007cff0c640> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Reduce(Distinct at compactDataSources(SicrasReconciledGenerator.java:214)) (2/8)" daemon prio=10 tid=0x00007f92b0017000 nid=0x7447 in Object.wait() [0x00007f93abefd000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007cff48cc8> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CoGroup (CoGroup at compactDataSources(SicrasReconciledGenerator.java:232)) (7/8)" daemon prio=10 tid=0x00007f93d8b7f000 nid=0x7446 in Object.wait() [0x00007f93abffe000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:603)
      	- locked <0x00000007d0033ea0> (a java.lang.Object)
      	at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
      	at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "flink-akka.actor.default-dispatcher-19" daemon prio=10 tid=0x0000000000a39000 nid=0x7445 waiting on condition [0x00007f93b0170000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-18" daemon prio=10 tid=0x00007f9318002800 nid=0x7444 waiting on condition [0x00007f93b0271000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "Map (Projection [0, 1, 2, 3, 4]) (8/8)" daemon prio=10 tid=0x00007f92ac014000 nid=0x7443 in Object.wait() [0x00007f93b0372000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d0044f70> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6532638> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (6/8)" daemon prio=10 tid=0x00007f9340093000 nid=0x7442 in Object.wait() [0x00007f93c33c9000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d0044470> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d63c2110> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (5/8)" daemon prio=10 tid=0x00007f9340091800 nid=0x7441 in Object.wait() [0x00007f93c28be000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d00ba228> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6536ae8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (7/8)" daemon prio=10 tid=0x00007f9340090000 nid=0x7440 in Object.wait() [0x00007f93c32c8000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007cff4f3d0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d693df48> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (3/8)" daemon prio=10 tid=0x00007f934008e800 nid=0x743f in Object.wait() [0x00007f93c2cc2000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d0044a10> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d31cefb8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (6/8)" daemon prio=10 tid=0x00007f934008d000 nid=0x743e in Object.wait() [0x00007f93c39ce000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d0035470> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6967070> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (2/8)" daemon prio=10 tid=0x00007f934008b800 nid=0x743d in Object.wait() [0x00007f93c2dc3000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d0043390> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d69423f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (4/8)" daemon prio=10 tid=0x00007f934008a000 nid=0x743c in Object.wait() [0x00007f93b2bea000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d0043710> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6be6b50> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 1, 2, 3, 4]) (1/8)" daemon prio=10 tid=0x00007f9340089000 nid=0x743b in Object.wait() [0x00007f93b2fee000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d0044e50> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6c73218> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (8/8)" daemon prio=10 tid=0x00007f9340086800 nid=0x743a in Object.wait() [0x00007f93b2ceb000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d00434b0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6ca4708> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (7/8)" daemon prio=10 tid=0x00007f9341571800 nid=0x7439 in Object.wait() [0x00007f93b2ae9000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007cff4a460> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6c77838> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (5/8)" daemon prio=10 tid=0x00007f9341571000 nid=0x7438 in Object.wait() [0x00007f93b24e3000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d0043eb0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d31c9920> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (2/8)" daemon prio=10 tid=0x00007f9341570800 nid=0x7437 in Object.wait() [0x00007f93b30ef000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007cff0da68> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6c4e130> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (4/8)" daemon prio=10 tid=0x00007f9341575800 nid=0x7436 in Object.wait() [0x00007f93b23e2000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d63a7f48> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6937b70> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (3/8)" daemon prio=10 tid=0x00007f9341574800 nid=0x7435 in Object.wait() [0x00007f93b29e8000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007cff09448> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d6799598> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Map (Projection [0, 5, 4]) (1/8)" daemon prio=10 tid=0x00007f934156b800 nid=0x7434 in Object.wait() [0x00007f93b21e0000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000007d0044350> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000007d67931a8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
      	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
      	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "ForkJoinPool-1-worker-7" daemon prio=10 tid=0x00007f9320010000 nid=0x7432 waiting on condition [0x00007f93b26e5000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x0000000500925a30> (a scala.concurrent.forkjoin.ForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "org.apache.hadoop.hdfs.PeerCache@223cc532" daemon prio=10 tid=0x00007f9370019800 nid=0x742e waiting on condition [0x00007f93b06c5000]
         java.lang.Thread.State: TIMED_WAITING (sleeping)
      	at java.lang.Thread.sleep(Native Method)
      	at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:245)
      	at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:41)
      	at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:119)
      	at java.lang.Thread.run(Thread.java:745)
      
      "flink-akka.actor.default-dispatcher-16" daemon prio=10 tid=0x00007f93d8101800 nid=0x742c waiting on condition [0x00007f93b08c7000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-17" daemon prio=10 tid=0x00007f9340063800 nid=0x742b waiting on condition [0x00007f93b09c8000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-15" daemon prio=10 tid=0x00007f93a400f000 nid=0x742a waiting on condition [0x00007f93b0ac9000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f92b0014800 nid=0x7429 waiting on condition [0x00007f93b0bca000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-13" daemon prio=10 tid=0x00007f92c4019000 nid=0x7428 waiting on condition [0x00007f93b0ccb000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-12" daemon prio=10 tid=0x00007f92c4018800 nid=0x7427 waiting on condition [0x00007f93b0dcc000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-10" daemon prio=10 tid=0x00007f9338086000 nid=0x7426 waiting on condition [0x00007f93b0ecd000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-9" daemon prio=10 tid=0x00007f92ac010000 nid=0x7425 waiting on condition [0x00007f93b0fce000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-8" daemon prio=10 tid=0x00007f9334013800 nid=0x7423 waiting on condition [0x00007f93c29bf000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (8/8)" daemon prio=10 tid=0x00007f92c4011800 nid=0x7422 in Object.wait() [0x00007f93b11d0000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d2a108> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005f8d7bc08> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (7/8)" daemon prio=10 tid=0x00007f92c400f800 nid=0x7421 in Object.wait() [0x00007f93b12d1000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d2bb08> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fa8d9b38> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (6/8)" daemon prio=10 tid=0x00007f92c400d800 nid=0x7420 in Object.wait() [0x00007f93b13d2000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d39e08> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fdbdd738> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (5/8)" daemon prio=10 tid=0x00007f92c400b800 nid=0x741f in Object.wait() [0x00007f93b14d3000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d44e00> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fdc351b8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (4/8)" daemon prio=10 tid=0x00007f9338082800 nid=0x741e in Object.wait() [0x00007f93b15d4000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d216c8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005f6d64cf8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (3/8)" daemon prio=10 tid=0x00007f9338080800 nid=0x741d in Object.wait() [0x00007f93b16d5000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d39728> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fda875f8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (2/8)" daemon prio=10 tid=0x00007f933807e800 nid=0x741c in Object.wait() [0x00007f93b17d6000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d2ae88> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005f8db3b60> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (1/8)" daemon prio=10 tid=0x00007f933807d000 nid=0x741b in Object.wait() [0x00007f93b18d7000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d22cd8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005f8778d90> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (8/8)" daemon prio=10 tid=0x00007f933807b000 nid=0x741a in Object.wait() [0x00007f93b19d8000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d221a8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005f77c9f90> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (7/8)" daemon prio=10 tid=0x00007f9338078800 nid=0x7419 in Object.wait() [0x00007f93b1ad9000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d387f8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fa98d3f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (6/8)" daemon prio=10 tid=0x00007f9338077000 nid=0x7418 in Object.wait() [0x00007f93b1bda000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d458e0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fc0a0df0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (5/8)" daemon prio=10 tid=0x00007f9338075000 nid=0x7416 in Object.wait() [0x00007f93b1cdb000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f7fe5ad8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005f7fe6268> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (4/8)" daemon prio=10 tid=0x00007f9338073000 nid=0x7415 in Object.wait() [0x00007f93b1ddc000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f847f5f8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005f847f268> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (3/8)" daemon prio=10 tid=0x00007f9338070800 nid=0x7414 in Object.wait() [0x00007f93b1edd000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f874b988> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060e1bb7b8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (2/8)" daemon prio=10 tid=0x00007f933806e800 nid=0x7413 in Object.wait() [0x00007f93b1fde000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4b40a0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000006035cc3a0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (1/8)" daemon prio=10 tid=0x00007f933806c800 nid=0x7412 in Object.wait() [0x00007f93b20df000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005f6d3b668> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005f6d3b978> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (8/8)" daemon prio=10 tid=0x00007f9338049800 nid=0x73ff in Object.wait() [0x00007f93b31f0000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb250e78> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060975c508> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (7/8)" daemon prio=10 tid=0x00007f9338047000 nid=0x73fd in Object.wait() [0x00007f93b32f1000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4a0ec8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000006083de0d8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (6/8)" daemon prio=10 tid=0x00007f9338045000 nid=0x73fb in Object.wait() [0x00007f93b33f2000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb3c4c38> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed44890> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (5/8)" daemon prio=10 tid=0x00007f9338043000 nid=0x73fa in Object.wait() [0x00007f93b34f3000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4252e8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x0000000609d351c8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (4/8)" daemon prio=10 tid=0x00007f9338041000 nid=0x73f9 in Object.wait() [0x00007f93b35f4000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4b48c8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000006086e8288> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (3/8)" daemon prio=10 tid=0x00007f933803f000 nid=0x73f8 in Object.wait() [0x00007f93b36f5000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb253ea8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed47a80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (2/8)" daemon prio=10 tid=0x00007f933803d000 nid=0x73f6 in Object.wait() [0x00007f93b37f6000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4289d8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fc3fb418> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (1/8)" daemon prio=10 tid=0x00007f933803b000 nid=0x73f5 in Object.wait() [0x00007f93b38f7000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb485f38> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005f8ebb670> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (8/8)" daemon prio=10 tid=0x00007f9338038800 nid=0x73f4 in Object.wait() [0x00007f93b39f8000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb3c3950> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fc3fd830> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (7/8)" daemon prio=10 tid=0x00007f9338036800 nid=0x73e9 in Object.wait() [0x00007f93b3af9000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb49b7c8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fb452750> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (6/8)" daemon prio=10 tid=0x00007f9338034800 nid=0x73e8 in Object.wait() [0x00007f93b3bfa000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb3a19a8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fbae66a8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (5/8)" daemon prio=10 tid=0x00007f9338032800 nid=0x73e7 in Object.wait() [0x00007f93b3cfb000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4b2610> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed4b780> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (4/8)" daemon prio=10 tid=0x00007f9338031000 nid=0x73e4 in Object.wait() [0x00007f93b3dfc000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb28c338> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005eb28db80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (3/8)" daemon prio=10 tid=0x00007f933802f000 nid=0x73e3 in Object.wait() [0x00007f93b3efd000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb49d8b0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x0000000608854c58> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (2/8)" daemon prio=10 tid=0x00007f933802d000 nid=0x73e2 in Object.wait() [0x00007f93b3ffe000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb419c40> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005eb41b010> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (1/8)" daemon prio=10 tid=0x00007f933802b000 nid=0x73e0 in Object.wait() [0x00007f93c0197000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb429be8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fca1a610> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (8/8)" daemon prio=10 tid=0x00007f9338029000 nid=0x73df in Object.wait() [0x00007f93c0298000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4851a8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fdc7b2e0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (7/8)" daemon prio=10 tid=0x00007f9338027000 nid=0x73dd in Object.wait() [0x00007f93c0399000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb49e140> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fcb9d558> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (6/8)" daemon prio=10 tid=0x00007f9338022800 nid=0x73d8 in Object.wait() [0x00007f93c059b000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4a1378> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x0000000608890f40> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (5/8)" daemon prio=10 tid=0x00007f9338020800 nid=0x73d7 in Object.wait() [0x00007f93c069c000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4863c8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed4f528> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (4/8)" daemon prio=10 tid=0x00007f933801e800 nid=0x73d6 in Object.wait() [0x00007f93c079d000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb3c14e0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed58490> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (3/8)" daemon prio=10 tid=0x00007f933801c800 nid=0x73d5 in Object.wait() [0x00007f93c089e000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb49e7d8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed5bd00> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (2/8)" daemon prio=10 tid=0x00007f933801a800 nid=0x73d3 in Object.wait() [0x00007f93c099f000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb426548> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed5e598> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (1/8)" daemon prio=10 tid=0x00007f9338018800 nid=0x73d2 in Object.wait() [0x00007f93c0aa0000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb35d640> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060a7523f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (8/8)" daemon prio=10 tid=0x00007f9338016800 nid=0x73d1 in Object.wait() [0x00007f93c0ba1000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb3a1310> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060a03f8b0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (7/8)" daemon prio=10 tid=0x00007f9338014000 nid=0x73d0 in Object.wait() [0x00007f93c0ca2000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb35e850> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed69330> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (6/8)" daemon prio=10 tid=0x00007f9338012000 nid=0x73cf in Object.wait() [0x00007f93c0da3000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb49cd88> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed6b9a8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (5/8)" daemon prio=10 tid=0x00007f9338010800 nid=0x73ce in Object.wait() [0x00007f93c0ea4000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb268338> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ecfb270> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "flink-akka.actor.default-dispatcher-6" daemon prio=10 tid=0x00007f934005f800 nid=0x73cd waiting on condition [0x00007f93c0fa5000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-akka.actor.default-dispatcher-5" daemon prio=10 tid=0x00007f934005d000 nid=0x73cc waiting on condition [0x00007f93c10a6000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (4/8)" daemon prio=10 tid=0x00007f933800e800 nid=0x73cb in Object.wait() [0x00007f93c11a7000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb488af8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005eb4a1c30> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (3/8)" daemon prio=10 tid=0x00007f933800c800 nid=0x73ca in Object.wait() [0x00007f93c12a8000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb49c498> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060e5cfaa0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (2/8)" daemon prio=10 tid=0x00007f933800b000 nid=0x73c9 in Object.wait() [0x00007f93c13a9000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4873b0> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x000000060ed76ad8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (1/8)" daemon prio=10 tid=0x00007f9338009000 nid=0x73c8 in Object.wait() [0x00007f93c14aa000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb433e98> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005fcbc8fc8> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (8/8)" daemon prio=10 tid=0x00007f9338007800 nid=0x73c7 in Object.wait() [0x00007f93c15ab000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126)
      	- locked <0x000000061576a130> (a java.lang.Object)
      	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.finish(SpillableSubpartition.java:103)
      	- locked <0x00000005eb4b2088> (a java.util.ArrayList)
      	at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:265)
      	- locked <0x00000005eb4b2050> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:574)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (7/8)" daemon prio=10 tid=0x00007f934005b000 nid=0x73c4 in Object.wait() [0x00007f93c16ac000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126)
      	- locked <0x0000000612907c48> (a java.lang.Object)
      	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.finish(SpillableSubpartition.java:103)
      	- locked <0x00000005eb263380> (a java.util.ArrayList)
      	at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:265)
      	- locked <0x00000005eb263348> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:574)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (6/8)" daemon prio=10 tid=0x00007f9340059000 nid=0x73c3 in Object.wait() [0x00007f93c17ad000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126)
      	- locked <0x000000061576fdf8> (a java.lang.Object)
      	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.finish(SpillableSubpartition.java:103)
      	- locked <0x00000005eb4b2588> (a java.util.ArrayList)
      	at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:265)
      	- locked <0x00000005eb4b2550> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:574)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (5/8)" daemon prio=10 tid=0x00007f9340057800 nid=0x73c2 in Object.wait() [0x00007f93c18ae000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126)
      	- locked <0x00000006128fbb20> (a java.lang.Object)
      	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.finish(SpillableSubpartition.java:103)
      	- locked <0x00000005eb36cb20> (a java.util.ArrayList)
      	at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:265)
      	- locked <0x00000005eb36cae8> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:574)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (4/8)" daemon prio=10 tid=0x00007f9340055000 nid=0x73c1 in Object.wait() [0x00007f93c19af000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126)
      	- locked <0x00000006158cda50> (a java.lang.Object)
      	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.finish(SpillableSubpartition.java:103)
      	- locked <0x00000005eb4a0da0> (a java.util.ArrayList)
      	at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:265)
      	- locked <0x00000005eb4a0d68> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:574)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (3/8)" daemon prio=10 tid=0x00007f9340053000 nid=0x73c0 in Object.wait() [0x00007f93c1ab0000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126)
      	- locked <0x0000000615758600> (a java.lang.Object)
      	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.finish(SpillableSubpartition.java:103)
      	- locked <0x00000005eb3a2650> (a java.util.ArrayList)
      	at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:265)
      	- locked <0x00000005eb3a2618> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:574)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (2/8)" daemon prio=10 tid=0x00007f9340051000 nid=0x73bf in Object.wait() [0x00007f93c1bb1000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126)
      	- locked <0x000000061587c900> (a java.lang.Object)
      	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.finish(SpillableSubpartition.java:103)
      	- locked <0x00000005eb39fe20> (a java.util.ArrayList)
      	at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:265)
      	- locked <0x00000005eb39fde8> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:574)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (1/8)" daemon prio=10 tid=0x00007f934004f000 nid=0x73be in Object.wait() [0x00007f93c1cb2000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126)
      	- locked <0x00000006158cf7f0> (a java.lang.Object)
      	at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.finish(SpillableSubpartition.java:103)
      	- locked <0x00000005eb3a0528> (a java.util.ArrayList)
      	at org.apache.flink.runtime.io.network.partition.ResultPartition.finish(ResultPartition.java:265)
      	- locked <0x00000005eb3a04f0> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:574)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (8/8)" daemon prio=10 tid=0x00007f934004d000 nid=0x73bd in Object.wait() [0x00007f93c1db3000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb39e4f8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005ebbceb78> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (7/8)" daemon prio=10 tid=0x00007f934004b000 nid=0x73bc in Object.wait() [0x00007f93c1eb4000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4b2a80> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005eb617018> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (6/8)" daemon prio=10 tid=0x00007f9340049000 nid=0x73bb in Object.wait() [0x00007f93c1fb5000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb3c5e48> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005eb3c7630> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (5/8)" daemon prio=10 tid=0x00007f9340047800 nid=0x73ba in Object.wait() [0x00007f93c20b6000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb4350f8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005eb4368e0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (4/8)" daemon prio=10 tid=0x00007f9340046000 nid=0x73b9 in Object.wait() [0x00007f93c21b7000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb39f218> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005eb585f20> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (3/8)" daemon prio=10 tid=0x00007f9340041000 nid=0x73b8 in Object.wait() [0x00007f93c22b8000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb485860> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005ebf58178> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (2/8)" daemon prio=10 tid=0x00007f934003f000 nid=0x73b7 in Object.wait() [0x00007f93c23b9000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb3208f8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005eb322680> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readFlinkTuplesFromThriftParquet(ParquetThriftEntitons.java:96)) (1/8)" daemon prio=10 tid=0x00007f934003d800 nid=0x73b6 in Object.wait() [0x00007f93c24ba000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
      	- locked <0x00000005eb49d218> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
      	- locked <0x00000005eb4ae638> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
      	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:125)
      	at it.okkam.flink.entitons.io.ParquetThriftEntitons$1.flatMap(ParquetThriftEntitons.java:1)
      	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
      	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      	at java.lang.Thread.run(Thread.java:745)
      
      "IOManager reader thread #1" daemon prio=10 tid=0x00007f93d8b7c000 nid=0x73a9 waiting on condition [0x00007f93c2ec4000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000005eb23a0e8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
      	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      	at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:349)
      
      "IOManager writer thread #1" daemon prio=10 tid=0x00007f93d8b7b000 nid=0x73a8 waiting for monitor entry [0x00007f93c2fc5000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:176)
      	- waiting to lock <0x00000005eb2633e8> (a java.util.ArrayDeque)
      	at org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119)
      	- locked <0x00000007d71c8b60> (a java.lang.Object)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter$RecyclingCallback.requestSuccessful(AsynchronousBufferFileWriter.java:56)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter$RecyclingCallback.requestSuccessful(AsynchronousBufferFileWriter.java:52)
      	at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.handleProcessedBuffer(AsynchronousFileIOChannel.java:199)
      	at org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest.requestDone(AsynchronousFileIOChannel.java:378)
      	at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:483)
      
      "BLOB Server listener at 37556" daemon prio=10 tid=0x00007f93d8f6e800 nid=0x73a6 runnable [0x00007f93c31c7000]
         java.lang.Thread.State: RUNNABLE
      	at java.net.PlainSocketImpl.socketAccept(Native Method)
      	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
      	at java.net.ServerSocket.implAccept(ServerSocket.java:530)
      	at java.net.ServerSocket.accept(ServerSocket.java:498)
      	at org.apache.flink.runtime.blob.BlobServer.run(BlobServer.java:177)
      
      "flink-akka.actor.default-dispatcher-2" daemon prio=10 tid=0x00007f93d8f25000 nid=0x73a3 waiting on condition [0x00007f93c34ca000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000052e0825c0> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
      	at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      "flink-scheduler-1" daemon prio=10 tid=0x00007f93d8ea2000 nid=0x73a2 sleeping[0x00007f93c35cb000]
         java.lang.Thread.State: TIMED_WAITING (sleeping)
      	at java.lang.Thread.sleep(Native Method)
      	at akka.actor.LightArrayRevolverScheduler.waitNanos(Scheduler.scala:226)
      	at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:405)
      	at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
      	at java.lang.Thread.run(Thread.java:745)
      
      "Service Thread" daemon prio=10 tid=0x00007f93d80c5000 nid=0x739b runnable [0x0000000000000000]
         java.lang.Thread.State: RUNNABLE
      
      "C2 CompilerThread1" daemon prio=10 tid=0x00007f93d80c2800 nid=0x739a waiting on condition [0x0000000000000000]
         java.lang.Thread.State: RUNNABLE
      
      "C2 CompilerThread0" daemon prio=10 tid=0x00007f93d80c0000 nid=0x7399 waiting on condition [0x0000000000000000]
         java.lang.Thread.State: RUNNABLE
      
      "JDWP Command Reader" daemon prio=10 tid=0x00007f9364001000 nid=0x7396 runnable [0x0000000000000000]
         java.lang.Thread.State: RUNNABLE
      
      "JDWP Event Helper Thread" daemon prio=10 tid=0x00007f93d80bd800 nid=0x7395 runnable [0x0000000000000000]
         java.lang.Thread.State: RUNNABLE
      
      "JDWP Transport Listener: dt_socket" daemon prio=10 tid=0x00007f93d80ba000 nid=0x7394 runnable [0x0000000000000000]
         java.lang.Thread.State: RUNNABLE
      
      "Signal Dispatcher" daemon prio=10 tid=0x00007f93d80a9000 nid=0x7390 runnable [0x0000000000000000]
         java.lang.Thread.State: RUNNABLE
      
      "Finalizer" daemon prio=10 tid=0x00007f93d8081000 nid=0x738e in Object.wait() [0x00007f93c93a0000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
      	- locked <0x00000005000ae7c0> (a java.lang.ref.ReferenceQueue$Lock)
      	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
      	at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
      
      "Reference Handler" daemon prio=10 tid=0x00007f93d807f000 nid=0x738d in Object.wait() [0x00007f93c94a1000]
         java.lang.Thread.State: WAITING (on object monitor)
      	at java.lang.Object.wait(Native Method)
      	at java.lang.Object.wait(Object.java:503)
      	at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
      	- locked <0x00000005000b0710> (a java.lang.ref.Reference$Lock)
      
      "main" prio=10 tid=0x00007f93d8012000 nid=0x7382 waiting on condition [0x00007f93de012000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x00000005eb262490> (a scala.concurrent.impl.Promise$CompletionLatch)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
      	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
      	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
      	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
      	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
      	at scala.concurrent.Await$.result(package.scala:107)
      	at scala.concurrent.Await.result(package.scala)
      	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:143)
      	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:205)
      	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:195)
      	at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:179)
      	- locked <0x0000000500870ac0> (a java.lang.Object)
      	at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
      	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
      	at it.sicras.persistence.flink.apps.SicrasReconciledGenerator.main(SicrasReconciledGenerator.java:170)
      
      "VM Thread" prio=10 tid=0x00007f93d807a800 nid=0x738c runnable 
      
      "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f93d8028000 nid=0x7384 runnable 
      
      "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f93d802a000 nid=0x7385 runnable 
      
      "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f93d802b800 nid=0x7386 runnable 
      
      "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f93d802d800 nid=0x7387 runnable 
      
      "GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f93d802f800 nid=0x7388 runnable 
      
      "GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f93d8031800 nid=0x7389 runnable 
      
      "GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f93d8033000 nid=0x738a runnable 
      
      "GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f93d8035000 nid=0x738b runnable 
      
      "VM Periodic Task Thread" prio=10 tid=0x00007f93d80d8000 nid=0x739c waiting on condition 
      
      JNI global references: 7826
      

      Attachments

        Activity

          People

            uce Ufuk Celebi
            uce Ufuk Celebi
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: