Beam / BEAM-7955

Dynamic Writer - shard count computed for late events is combined with the window's earlier shard count

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.10.0
    • Fix Version/s: None
    • Component/s: runner-dataflow
    • Labels: None

    Description

      The Dataflow runner attempts to combine the shard count computed for a window's on-time pane with the shard counts computed for subsequent panes containing late events, even when the window's accumulation mode is set to DISCARDING_FIRED_PANES. This results in an IllegalArgumentException thrown by View.SingletonCombineFn.
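
      A minimal sketch of the kind of windowing configuration under which this happens; the window size, allowed lateness, and trigger below are illustrative assumptions rather than values from the report, and `messages` stands for the incoming PCollection[String]:

      import org.apache.beam.sdk.transforms.windowing.{AfterPane, AfterWatermark, FixedWindows, Window}
      import org.joda.time.Duration

      // Discarding mode: each fired pane should be independent, so the shard
      // count computed for a late pane should not be merged with the on-time one.
      val windowed = messages.apply(
        Window.into[String](FixedWindows.of(Duration.standardMinutes(5)))
          .triggering(
            AfterWatermark.pastEndOfWindow()
              .withLateFirings(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.standardMinutes(10))
          .discardingFiredPanes())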

      Steps to reproduce this behaviour (see the sketch after this list):

      • create a dynamic writer with the `withSharding()` option
      • send a stream of messages to the Dataflow job via PubSub
      • retain some of the messages
      • let the rest of the messages flow to the job until the watermark passes the window's end
      • release the retained messages
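
      A sketch of the writer wiring from these steps, assuming Scala 2.12 SAM conversion for Beam's SerializableFunction; the destination routing, output path, and shard size are hypothetical, `windowed` is the PCollection from the sketch above, and RecordCountSharding is the transform shown under "Sharding implementation" below:

      import org.apache.beam.sdk.coders.StringUtf8Coder
      import org.apache.beam.sdk.io.{FileIO, TextIO}

      windowed.apply(
        FileIO.writeDynamic[String, String]()
          .by((msg: String) => msg.take(1))      // hypothetical routing: first character
          .via(TextIO.sink())
          .withDestinationCoder(StringUtf8Coder.of())
          .to("gs://my-bucket/output")           // hypothetical output location
          .withNaming((dest: String) => FileIO.Write.defaultNaming(dest, ".txt"))
          .withSharding(RecordCountSharding[String](recordsPerShard = 100)))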

      If all PubSub traffic is halted and only released after the window's end, Beam does not attempt to combine the shard counts. The exception occurs only when part of the messages arrive on time and the rest arrive as late events.

      Stacktrace:

      java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
              org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:358)
              org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:448)
              org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:429)
              org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:925)
              org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
              org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
              org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
              org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
              org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
              org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
              org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
              org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
              org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
              org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
              org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
              java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              java.lang.Thread.run(Thread.java:745)
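
      The exception originates in View.SingletonCombineFn, which is only reached when the combining state for a window already holds a value. A minimal emulation of that contract (plain Scala, not Beam code) shows why a late pane's shard count triggers it:

      object SingletonViewContract {
        // Emulates the singleton view's per-window combining state: the first
        // value (the on-time pane's shard count) is accepted, and any second
        // value (a late pane's shard count) throws, as in the stacktrace above.
        def addInput(state: Option[Int], value: Int): Option[Int] = state match {
          case None => Some(value)
          case Some(_) =>
            throw new IllegalArgumentException(
              "PCollection with more than one element accessed as a singleton view.")
        }

        def main(args: Array[String]): Unit = {
          val afterOnTimePane = addInput(None, 3) // shard count from the on-time pane
          addInput(afterOnTimePane, 1)            // late pane's shard count -> throws
        }
      }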
      

      Sharding implementation:

      import org.apache.beam.sdk.transforms.{Combine, Contextful, Count, MapElements, PTransform, View}
      import org.apache.beam.sdk.values.{PCollection, PCollectionView, TypeDescriptors}

      class RecordCountSharding[T](recordsPerShard: Int) extends PTransform[PCollection[T], PCollectionView[java.lang.Integer]] {
        import RecordCountSharding._

        override def expand(input: PCollection[T]): PCollectionView[java.lang.Integer] = {
          // Count the elements of each window/pane; withoutDefaults() avoids
          // emitting a default zero for windows that never receive data.
          val count = input.apply(
            Combine.globally(Count.combineFn[T]()).withoutDefaults()
          )

          // Derive the shard count from the element count.
          val shardsNum = count.apply(
            MapElements.into(TypeDescriptors.integers())
              .via(Contextful.fn[java.lang.Long, java.lang.Integer] { count: java.lang.Long =>
                Int.box(getShardsNum(count, recordsPerShard))
              })
          )

          // Expose the shard count as a singleton side input; this is the view
          // that ends up receiving one value per fired pane.
          shardsNum.apply(View.asSingleton().withDefaultValue(ShardsNumForEmptyWindows))
        }
      }

      object RecordCountSharding {
        val ShardsNumForEmptyWindows: java.lang.Integer = 0

        def apply[T](recordsPerShard: Int): RecordCountSharding[T] = {
          if (recordsPerShard <= 0) {
            throw new IllegalArgumentException(s"recordsPerShard must be greater than 0! Got $recordsPerShard")
          }
          new RecordCountSharding[T](recordsPerShard)
        }

        // ceil(count / recordsPerShard), e.g. 250 records at 100 per shard -> 3 shards.
        def getShardsNum(count: Long, recordsPerShard: Int): Int = {
          (count.toFloat / recordsPerShard.toFloat).ceil.toInt
        }
      }
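
      For concreteness, a few worked values of getShardsNum (the Float division is adequate here, though it loses precision for counts above 2^24):

      RecordCountSharding.getShardsNum(250, 100) // ceil(2.5)  == 3
      RecordCountSharding.getShardsNum(100, 100) // ceil(1.0)  == 1
      RecordCountSharding.getShardsNum(1, 100)   // ceil(0.01) == 1
      RecordCountSharding.getShardsNum(0, 100)   // ceil(0.0)  == 0 (empty-pane default)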
      

People

    Assignee: Unassigned
    Reporter: mariusz.r.allegro (Mariusz Rebandel)
    Votes: 0
    Watchers: 3
