FLINK-34414

EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.17.2
    • Fix Version/s: None
    • Component/s: Connectors / Pulsar
    • Labels: None

    Description

      I am using the Pulsar connector for Flink (version 4.1.0-1.17) with a Flink job (version 1.17.2). When an exception is thrown within the job, the job restarts from the last checkpoint, but the sink writes more events to the output topic than it should, even though EXACTLY_ONCE guarantees are set everywhere. More specifically, the job's flow is:

      • a Pulsar source that reads from the input topic,
      • a simple processing function,
      • and a sink that writes to the output topic.

      Here is a fragment of the source creation:

          .setDeserializationSchema(Schema.AVRO(inClass), inClass)
          .setSubscriptionName(subscription)
          .enableSchemaEvolution()
          .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
          .setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
          .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1)
          .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, false);
      

      Here is the fragment of the sink creation:

          .setSerializationSchema(Schema.AVRO(outClass), outClass)
          .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
          .setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, DeliveryGuarantee.EXACTLY_ONCE)
          .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
      

      And here is the Flink environment preparation:

          environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
          environment.enableCheckpointing(CHECKPOINTING_INTERVAL, CheckpointingMode.EXACTLY_ONCE);
      

      After sending 1000 events to the input topic, I got 1048 events on the output topic.
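
      One way to check whether the 48 surplus events are replays of events that were already written is to group the output by a unique event key and count the extra copies. The following is only a minimal sketch: it assumes each event carries a unique string id (the report does not state this), and `DuplicateCounter` and its methods are hypothetical names, not part of Flink or the Pulsar connector.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DuplicateCounter {

    /** For each id seen more than once, returns how many extra copies were seen. */
    public static Map<String, Integer> extraCopies(List<String> outputIds) {
        // Count occurrences of every id read from the output topic.
        Map<String, Integer> counts = new HashMap<>();
        for (String id : outputIds) {
            counts.merge(id, 1, Integer::sum);
        }
        // Keep only ids that occur more than once; store occurrences minus one.
        Map<String, Integer> extras = new HashMap<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                extras.put(entry.getKey(), entry.getValue() - 1);
            }
        }
        return extras;
    }

    /** Total number of surplus events across all ids. */
    public static int totalExtra(List<String> outputIds) {
        int total = 0;
        for (int extra : extraCopies(outputIds).values()) {
            total += extra;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical ids; in practice these would be read from the output topic.
        List<String> ids = List.of("e1", "e2", "e2", "e3", "e3", "e3");
        System.out.println(totalExtra(ids)); // prints 3
    }
}
```

      If the total equals the surplus (48 here), every extra event is a duplicate of an input event, which would point at replay after the restart rather than at spurious events.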

      I ran the job on my local Kubernetes cluster, using the Flink Kubernetes Operator.

      Here is the MRE for this problem (note that it has an internal dependency, which can be commented out together with the code that relies on it): https://github.com/trojczak/flink-pulsar-connector-problem

          People

            Assignee: Unassigned
            Reporter: Rafał Trójczak (rtrojczak)