Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
Affects Version/s: None
Fix Version/s: None
Description
I don't know whether it's an HCatalog or a Flink problem, but when reading a Hive table on a cluster with many slots (20 threads per container), I consistently run into a ConcurrentModificationException in the copy constructor of a Configuration object that changes during the copy.
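To illustrate the failure mode, here is a toy repro in plain Java (my own sketch, not Flink or HCatalog code): the HashSet.<init> frame in the trace below shows the Configuration copy constructor copying a HashSet field, and copying a HashSet while another thread mutates it throws exactly this exception:

import java.util.HashSet;
import java.util.Set;

public class CopyRace {
    public static void main(String[] args) {
        final Set<String> shared = new HashSet<String>();
        // Writer thread mutates the set, like a subtask mutating the shared Configuration.
        Thread writer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; ; i++) {
                    shared.add("key-" + i);
                }
            }
        });
        writer.setDaemon(true);
        writer.start();
        // The HashSet copy constructor iterates over the source set; if the
        // writer mutates it mid-iteration, this throws
        // java.util.ConcurrentModificationException, as in the trace below.
        for (int i = 0; i < 1000000; i++) {
            new HashSet<String>(shared);
        }
    }
}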
From what I understand, the Configuration being copied comes from TaskAttemptContext.getConfiguration(); the context is created by HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
Maybe the Job's Configuration object passed to the HadoopInputFormatBase constructor should be cloned somewhere?
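For what it's worth, a minimal sketch of that idea (hypothetical code; the field and parameter names are assumed, and this is not necessarily how a real fix should look):

public HadoopInputFormatBase(InputFormat<K, V> mapreduceInputFormat,
        Class<K> key, Class<V> value, Job job) {
    this.mapreduceInputFormat = mapreduceInputFormat;
    this.keyClass = key;
    this.valueClass = value;
    // Defensive copy: this constructor runs single-threaded on the client,
    // so copying is safe here, and each input format then owns a private
    // Configuration instead of sharing the Job's mutable one with all subtasks.
    this.configuration = new org.apache.hadoop.conf.Configuration(job.getConfiguration());
}

Whether such a copy belongs in the constructor or right before instantiateTaskAttemptContext in open() is for the Flink developers to judge.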
The stack trace is:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
    at com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
    at com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
    at com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
    at com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
    at java.util.HashMap$KeyIterator.next(HashMap.java:960)
    at java.util.AbstractCollection.addAll(AbstractCollection.java:341)
    at java.util.HashSet.<init>(HashSet.java:117)
    at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
    at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
    at org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
    at org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
    at org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:182)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:56)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:744)
Flink "user" code looks like:
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

(...)

final Job job = Job.getInstance();

@SuppressWarnings({ "unchecked", "rawtypes" })
final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
        new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
                (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter),
                NullWritable.class,
                DefaultHCatRecord.class,
                job);

final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());

@SuppressWarnings("serial")
final DataSet<T> dataSet = cluster
        .createInput(inputFormat)
        .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
            @Override
            public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value,
                    Collector<T> out) throws Exception { // NOPMD
                final T record = createBean(value.f1, inputSchema);
                out.collect(record);
            }
        }).returns(beanClass);

(...)