FLINK-3880: Improve performance of Accumulator map


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.0
    • Component/s: None
    • Labels: None

    Description

      I was looking at improving DataSet performance - this is for a job created using the Cascading-Flink planner for Cascading 3.1.

      While doing a quick "poor man's profiler" session with one of the TaskManager processes, I noticed that many (most?) of the threads that were actually running were in this state:

      "DataSource (/working1/terms) (8/20)" daemon prio=10 tid=0x00007f55673e0800 nid=0x666a runnable [0x00007f556abcf000]
         java.lang.Thread.State: RUNNABLE
          at java.util.Collections$SynchronizedMap.get(Collections.java:2037)
          - locked <0x00000006e73fe718> (a java.util.Collections$SynchronizedMap)
          at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.getAccumulator(AbstractRuntimeUDFContext.java:162)
          at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.getLongCounter(AbstractRuntimeUDFContext.java:113)
          at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.getOrInitCounter(FlinkFlowProcess.java:245)
          at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.increment(FlinkFlowProcess.java:128)
          at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.increment(FlinkFlowProcess.java:122)
          at cascading.tap.hadoop.util.MeasuredRecordReader.next(MeasuredRecordReader.java:65)
          at cascading.scheme.hadoop.SequenceFile.source(SequenceFile.java:97)
          at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:166)
          at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:139)
          at com.dataartisans.flink.cascading.runtime.source.TapSourceStage.readNextRecord(TapSourceStage.java:70)
          at com.dataartisans.flink.cascading.runtime.source.TapInputFormat.reachedEnd(TapInputFormat.java:175)
          at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
          at java.lang.Thread.run(Thread.java:745)
      

      It looks like Cascading is asking Flink to increment a counter with each Tuple read, and that in turn is often blocked on getting access to the Accumulator object in a map. The map appears to be a SynchronizedMap, so every lookup from every thread sharing it has to take the same lock. Using a ConcurrentHashMap (for example) would reduce this contention, since its reads do not lock.
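
      To illustrate the suggestion, here is a minimal sketch of a counter registry backed by a ConcurrentHashMap. It is not Flink's code and not necessarily how the issue was fixed: the class AccumulatorMapSketch, its methods, and the counter name "tuples-read" are hypothetical, and LongAdder stands in for Flink's LongCounter. It only shows the access pattern from the stack trace (look up a counter by name, then increment it, once per record) without a shared lock on the read path.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for an accumulator registry; not a Flink class.
public class AccumulatorMapSketch {

    // ConcurrentHashMap.get() does not lock, unlike Collections.synchronizedMap(...).get().
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Mirrors the getOrInitCounter step in the stack trace: look up by name, create on first use.
    LongAdder getOrInitCounter(String name) {
        LongAdder counter = counters.get(name);   // fast path: lock-free read
        if (counter == null) {
            // Slow path, taken only until the counter exists; atomic per key.
            counter = counters.computeIfAbsent(name, k -> new LongAdder());
        }
        return counter;
    }

    void increment(String name, long amount) {
        getOrInitCounter(name).add(amount);
    }

    long value(String name) {
        LongAdder counter = counters.get(name);
        return counter == null ? 0L : counter.sum();
    }

    // Runs `threads` workers that each increment the same counter `perThread` times
    // and returns the final total.
    static long runDemo(int threads, int perThread) {
        AccumulatorMapSketch registry = new AccumulatorMapSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    registry.increment("tuples-read", 1L);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return registry.value("tuples-read");
    }

    public static void main(String[] args) {
        System.out.println(runDemo(8, 100_000)); // prints 800000
    }
}
```

      A caller that increments the same counter for every record could also hold on to the returned counter object instead of looking it up by name each time, which would avoid the map access entirely on the hot path.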

          People

            Assignee: Maximilian Michels (mxm)
            Reporter: Kenneth William Krugler (kkrugler)