Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-2617

ConcurrentModificationException when using HCatRecordReader to access a hive table

    XML | Word | Printable | JSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9, 0.10.0
    • Component/s: API / DataSet
    • Labels:
      None

      Description

      I don't know whether this is an HCat or a Flink problem, but when reading a Hive table in a cluster with many slots (20 threads per container), I systematically run into a ConcurrentModificationException in the copy constructor of a Configuration object, because the object changes while it is being copied.

      From what I understand, this object is returned by TaskAttemptContext.getConfiguration(), and the context itself is created by HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID()).

      Maybe the job.Configuration object passed to the constructor of HadoopInputFormatBase should be cloned somewhere?

      The stack trace is:

      org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
      org.apache.flink.client.program.Client.run(Client.java:413)
      org.apache.flink.client.program.Client.run(Client.java:356)
      org.apache.flink.client.program.Client.run(Client.java:349)
      org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
      com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
      com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
      com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
      com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
      com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
      sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      java.lang.reflect.Method.invoke(Method.java:606)
      org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
      org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
      org.apache.flink.client.program.Client.run(Client.java:315)
      org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
      org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
      org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
      org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
      
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
      scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
      scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
      org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
      org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
      scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
      akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
      akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      akka.actor.ActorCell.invoke(ActorCell.scala:487)
      akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      akka.dispatch.Mailbox.run(Mailbox.scala:221)
      akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      
      Caused by:  java.util.ConcurrentModificationException
      java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
      java.util.HashMap$KeyIterator.next(HashMap.java:960)
      java.util.AbstractCollection.addAll(AbstractCollection.java:341)
      java.util.HashSet.<init>(HashSet.java:117)
      org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
      org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
      org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
      org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
      org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
      org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:182)
      org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:56)
      org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      java.lang.Thread.run(Thread.java:744)
      

      Flink "user" code looks like:

      import java.io.IOException;
      import java.io.Serializable;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.common.functions.RichFlatMapFunction;
      import org.apache.flink.api.common.functions.RichMapFunction;
      import org.apache.flink.api.common.io.FileOutputFormat;
      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.core.fs.FileSystem.WriteMode;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.SinkFunction;
      import org.apache.flink.util.Collector;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.compress.CompressionCodec;
      import org.apache.hadoop.mapreduce.InputFormat;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hive.hcatalog.data.DefaultHCatRecord;
      import org.apache.hive.hcatalog.data.schema.HCatSchema;
      import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
      
      
      (...) 
              final Job job = Job.getInstance();
              @SuppressWarnings({ "unchecked", "rawtypes" })
              final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat = new HadoopInputFormat<NullWritable, 
              DefaultHCatRecord>(
                  (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
                  NullWritable.class, //
                  DefaultHCatRecord.class, //
                  job);
      
              final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());
              @SuppressWarnings("serial")
              final DataSet<T> dataSet = cluster
                  .createInput(inputFormat)
                  .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
                      @Override
                      public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out) throws Exception { // NOPMD
                          final T record = createBean(value.f1, inputSchema);
                          out.collect(record);
                      }
                  }).returns(beanClass);
      (...)            
      

        Attachments

          Activity

            People

            • Assignee:
              fhueske Fabian Hueske
              Reporter:
              ArnaudL Arnaud Linz
            • Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: