  Beam / BEAM-11806

KafkaIO - Partition Recognition in WriteRecords


    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.29.0
    • Component/s: io-java-kafka
    • Labels:
      None

      Description

      At present, the `WriteRecords` transform in KafkaIO ignores the `partition` property of the `ProducerRecord` instances it consumes. This ticket would add support so that an explicitly set partition is honored, while the default behavior is preserved for records that do not set one.

      The limitation is easy to spot in the `KafkaWriter` class that backs the `WriteRecords` transform, which always passes `null` as the partition:

      producer.send(
              // The second argument (null) is the partition, so any partition set on the incoming record is dropped
              new ProducerRecord<>(
                  topicName, null, timestampMillis, record.key(), record.value(), record.headers()),
              new SendCallback());
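      Supporting the partition essentially means forwarding the incoming record's own partition instead of the hard-coded `null`. Kafka's `ProducerRecord.partition()` returns a nullable `Integer`, so records that never set one would still fall through to the producer's partitioner. The dependency-free Java sketch below models both behaviors; `OutgoingRecord` and both method names are hypothetical stand-ins, not Beam or Kafka API, and this is not necessarily how the 2.29.0 change was written.

```java
public class PartitionPassThrough {

    // Hypothetical stand-in for Kafka's ProducerRecord, reduced to the fields relevant here.
    public static final class OutgoingRecord {
        private final String topic;
        private final Integer partition; // null means "no explicit partition"
        private final String key;
        private final String value;

        public OutgoingRecord(String topic, Integer partition, String key, String value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        public String topic() { return topic; }
        public Integer partition() { return partition; }
        public String key() { return key; }
        public String value() { return value; }
    }

    // Mirrors the quoted KafkaWriter behavior: the partition is always replaced with null.
    public static OutgoingRecord rewriteIgnoringPartition(OutgoingRecord in, String topicName) {
        return new OutgoingRecord(topicName, null, in.key(), in.value());
    }

    // Sketch of the requested behavior: forward whatever partition the caller set.
    // A null partition still leaves the choice to the producer's configured partitioner.
    public static OutgoingRecord rewriteKeepingPartition(OutgoingRecord in, String topicName) {
        return new OutgoingRecord(topicName, in.partition(), in.key(), in.value());
    }

    public static void main(String[] args) {
        OutgoingRecord explicit = new OutgoingRecord("events", 3, "k", "v");
        OutgoingRecord unset = new OutgoingRecord("events", null, "k", "v");
        System.out.println(rewriteIgnoringPartition(explicit, "events").partition()); // null
        System.out.println(rewriteKeepingPartition(explicit, "events").partition());  // 3
        System.out.println(rewriteKeepingPartition(unset, "events").partition());     // null
    }
}
```

      Keeping `null` as the "not set" value is what lets the change stay backward compatible: existing pipelines that never set a partition behave exactly as before.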
      

      Because `WriteRecords` ignores the partition, a user who wants an explicitly defined partitioning strategy, rather than leaving partition assignment to the producer's default partitioner, has to write a custom DoFn that creates its own KafkaProducer (preferably within a @StartBundle method), similar to the following Kotlin example:

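      import org.apache.beam.sdk.transforms.DoFn
      import org.apache.kafka.clients.producer.KafkaProducer
      import org.apache.kafka.clients.producer.ProducerRecord

      private class ExampleProducerDoFn(...): DoFn<...>() {
              private lateinit var producer: KafkaProducer<...>
      
              @StartBundle
              fun startBundle(context: StartBundleContext) {
                  val options = context.pipelineOptions.`as`(YourPipelineOptions::class.java)
                  // getKafkaProducer is your own helper that builds a KafkaProducer from the options
                  producer = getKafkaProducer(options)
              }
      
              @ProcessElement
              fun processElement(context: ProcessContext) {
                  // Omitted for brevity
                  
                  // Produce the record to a specific topic at a specific partition
                  producer.send(ProducerRecord(
                      "your_topic_here",
                      your_partition_here,
                      context.element().kv.key,
                      context.element().kv.value
                  ))
              }
      
              @FinishBundle
              fun finishBundle() {
                  // close() blocks until previously sent records complete, then releases the
                  // producer; without it, every bundle would leak the producer created above
                  producer.close()
              }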
      }
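      The example leaves `your_partition_here` to the user. One hypothetical strategy, sketched below in plain Java, pins selected keys to fixed partitions and hashes everything else. The class name, routing table, key names, and partition count are illustrative assumptions. The fallback uses `String.hashCode` for simplicity, which is not the murmur2 hash Kafka's default partitioner applies to the serialized key, so the two will generally assign different partitions.

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitPartitioner {

    // Hypothetical routing table: these keys always go to a fixed partition.
    private final Map<String, Integer> pinned;
    private final int numPartitions;

    public ExplicitPartitioner(Map<String, Integer> pinned, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        for (int p : pinned.values()) {
            if (p < 0 || p >= numPartitions) {
                throw new IllegalArgumentException("pinned partition out of range: " + p);
            }
        }
        this.pinned = new HashMap<>(pinned);
        this.numPartitions = numPartitions;
    }

    // Returns the value to pass as ProducerRecord's partition argument.
    public int partitionFor(String key) {
        Integer fixed = pinned.get(key);
        if (fixed != null) {
            return fixed;
        }
        // Deterministic fallback; floorMod keeps the result non-negative for negative hashes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        Map<String, Integer> table = new HashMap<>();
        table.put("priority-customer", 0);
        ExplicitPartitioner partitioner = new ExplicitPartitioner(table, 6);
        System.out.println(partitioner.partitionFor("priority-customer")); // 0
        int other = partitioner.partitionFor("someone-else");
        System.out.println(other >= 0 && other < 6); // true
    }
}
```

      Once `WriteRecords` honors the partition, the same value can be set on the `ProducerRecord` handed to the transform instead of on a hand-rolled producer.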
      


            People

            • Assignee: Rion Williams (rionmonster)
            • Reporter: Rion Williams (rionmonster)


                Time Tracking

                • Estimated: Not Specified
                • Remaining: 0h
                • Logged: 1.5h
