Flink / FLINK-10478

Kafka Producer wrongly formats "%" for transaction ID

Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 1.4.2
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Environment:

      Flink 1.4.2

      Scala 2.11.12

      jdk1.8.0_162

      Running on a local embedded Flink mini cluster (the same failure also occurred on a standalone cluster running different code)

    Description

      With exactly-once semantics enabled, the Kafka Producer uses its task name as the basis for the transaction ID. Because the Producer passes that name directly into a format string, the job fails whenever the name contains "%".
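
The mechanism can be illustrated without Flink or Kafka. The sketch below assumes the ID is built by appending a "-%d" suffix to the task name and handing the result to a format call; that shape is inferred from the "%-%" in the exception message reported in this issue, not copied from the Flink source. `FormatStringPitfall` and `buildId` are hypothetical names used only for illustration.

```scala
import scala.util.Try

object FormatStringPitfall {
  // Hypothetical helper: the prefix becomes part of the format string,
  // so any "%" inside it is parsed as the start of a format specifier.
  def buildId(prefix: String, id: Long): String =
    (prefix + "-%d").format(id)

  def main(args: Array[String]): Unit = {
    // A prefix without "%" formats normally.
    println(buildId("Map -> Sink: test", 3L))
    // A prefix ending in "%" turns "%-%" into an invalid specifier;
    // Try captures the resulting java.util.IllegalFormatException subclass.
    println(Try(buildId("Map -> Sink: 100%", 3L)))
  }
}
```

The second call fails before any ID is produced, which matches the point in the job lifecycle where the reported exception is raised (state initialization of the sink).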

      Code to reproduce:

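      // Imports assume the Flink 1.4.x package layout.
      import org.apache.flink.api.common.restartstrategy.RestartStrategies
      import org.apache.flink.api.common.serialization.SimpleStringSchema
      import org.apache.flink.streaming.api.TimeCharacteristic
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
      import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

      object ExampleRunner {
        def main(args: Array[String]): Unit = {
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
          // Exactly-once delivery relies on checkpointing.
          env.enableCheckpointing(1000)
          env.getConfig.disableSysoutLogging()
          // Do not restart, so the failure surfaces immediately.
          env.setRestartStrategy(RestartStrategies.noRestart)

          val p = new java.util.Properties
          p.setProperty("bootstrap.servers", "192.168.1.100:9092")
          p.setProperty("transaction.timeout.ms", (10 * 60 * 1000).toString)

          env
            .fromCollection(100 to 200)
            .map(_.toString)
            .addSink(new FlinkKafkaProducer011(
              "test",
              new KeyedSerializationSchemaWrapper(new SimpleStringSchema),
              p,
              Semantic.EXACTLY_ONCE))
            .name("100%") // the "%" in the sink name triggers the failure
          env.execute()
        }
      }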
      

      Raised exception:

      2018-10-02 17:00:12.918 [Map -> Sink: 100% (1/8)] INFO  o.a.flink.runtime.taskmanager.Task  - Map -> Sink: 100% (1/8) (25190aeccdce738afdd00e9320903d7b) switched from RUNNING to FAILED.
      java.util.MissingFormatWidthException: %-%
      	at java.util.Formatter$FormatSpecifier.checkText(Formatter.java:3040)
      	at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2733)
      	at java.util.Formatter.parse(Formatter.java:2560)
      	at java.util.Formatter.format(Formatter.java:2501)
      	at java.util.Formatter.format(Formatter.java:2455)
      	at java.lang.String.format(String.java:2940)
      	at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateTransactionalId(TransactionalIdsGenerator.java:91)
      	at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToUse(TransactionalIdsGenerator.java:72)
      	at org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator.generateIdsToAbort(TransactionalIdsGenerator.java:85)
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:850)
      	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
      	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
      	at java.lang.Thread.run(Thread.java:748)
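
The trace ends in `String.format` called from `generateTransactionalId`, so the usual remedies for a data value leaking into a format string apply. The sketch below shows two of them under the assumption that the ID has the form "<prefix>-<number>"; `SafeTransactionalId`, `viaArgument`, and `viaEscaping` are hypothetical names, and neither variant is claimed to be the change actually made in Flink.

```scala
object SafeTransactionalId {
  // Option 1: pass the prefix as an argument, so it is treated as data
  // and never parsed as part of the format string.
  def viaArgument(prefix: String, id: Long): String =
    "%s-%d".format(prefix, id)

  // Option 2: escape literal percent signs ("%" becomes "%%") before the
  // prefix is concatenated into the format string.
  def viaEscaping(prefix: String, id: Long): String =
    (prefix.replace("%", "%%") + "-%d").format(id)

  def main(args: Array[String]): Unit = {
    val prefix = "Map -> Sink: 100%"
    println(viaArgument(prefix, 3L)) // Map -> Sink: 100%-3
    println(viaEscaping(prefix, 3L)) // Map -> Sink: 100%-3
  }
}
```

On the user side, choosing an operator name without "%" (for example `.name("100 percent")`) should avoid the problem on affected versions, since the task name then contains nothing the formatter can misread.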
      

            People

              Assignee: leesf (xleesf)
              Reporter: Obi Tetsuya (woby)

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 10m