Details
-
Bug
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
1.6.3
-
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.5.6</flink.version> <slf4j.version>1.7.7</slf4j.version> <log4j.version>1.2.17</log4j.version> <scala.binary.version>2.11</scala.binary.version> <scala.version>2.11.12</scala.version></properties>
parameters.setBoolean("recursive.file.enumeration",true)
<properties> <project.build.sourceEncoding> UTF-8 </project.build.sourceEncoding> <flink.version> 1.5.6 </flink.version> <slf4j.version> 1.7.7 </slf4j.version> <log4j.version> 1.2.17 </log4j.version> <scala.binary.version> 2.11 </scala.binary.version> <scala.version> 2.11.12 </scala.version> </properties> parameters.setBoolean("recursive.file .enumeration",true)
Description
Symptom:
flink program can sucessfully read and process single ORC file from HDFS,whatever given reading path is file's parent folder or specific file path. However,I put them together in the same folder and program reads that folder, the following error always occurs.
val configHadoop = new org.apache.hadoop.conf.Configuration()
configHadoop.set("HADOOP_USER_NAME", "user")
configHadoop.set("fs.defaultFS", "xx.xxx.xx.xx")
val env = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv = TableEnvironment.getTableEnvironment(env)
val orcTableSource = OrcTableSource.builder()
// path to ORC file(s). NOTE: By default, directories are recursively scanned. .path(inPath)
// schema of ORC files .forOrcSchema("struct<storage_time:String,storage_source_msg:String,rev_name:String,where2:String,rev_phone:String,bind_email:String,ord_pin:String,ord_tm:String>")
// Hadoop configuration .withConfiguration(configHadoop)
// build OrcTableSource .build()
------------The following is stack info --------------------
Root exception
Timestamp: 2019-08-08, 20:15:05
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at com.jd.risk.flink.analysis.framework.core.EventOfflineServiceFrameWork.startService(EventOfflineServiceFrameWork.scala:41)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Index: 97, Size: 17
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Index: 97, Size: 17
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Index: 97, Size: 17
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 17
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:335)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:350)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:99)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:84)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:99)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:99)
at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1375)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
CHAIN GroupReduce (GroupReduce at com.jd.risk.flink.analysis.framework.core.EventOfflineServiceFrameWork.startService(EventOfflineServiceFrameWork.scala:41)) -> Map (Key Extractor) (305/480)
Timestamp: 2019-08-08, 20:15:05 Location: LF-BCC-POD0-172-21-60-234.hadoop.jd.local:15837
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at com.jd.risk.flink.analysis.framework.core.EventOfflineServiceFrameWork.startService(EventOfflineServiceFrameWork.scala:41)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Index: 97, Size: 17
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Index: 97, Size: 17
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Index: 97, Size: 17
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 17
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:335)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:350)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:99)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:84)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:99)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:99)
at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1375)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)