Kafka / KAFKA-10627

Connect TimestampConverter transform does not support multiple formats for the same field and only allows one field to be transformed at a time



    • Type: New Feature
    • Status: Patch Available
    • Priority: Minor
    • Resolution: Unresolved
    • Component: connect


      Some of the limitations of the TimestampConverter transform are causing issues for us, since we have many different producers from different systems producing events to some of our topics. We try our best to govern the data formats, including strict use of Avro schemas, but there are still variations in timestamp data types that the schemas allow.

      In the end, multiple formats arrive in the same timestamp fields (for example, with and without milliseconds, with and without a timezone specifier, etc.).

      And then you get failed events in Connect with messages like this:

      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
      	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.connect.errors.DataException: Could not parse timestamp: value (2020-10-06T12:12:27Z) does not match pattern (yyyy-MM-dd'T'HH:mm:ss.SSSX)
      	at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
      	at org.apache.kafka.connect.transforms.TimestampConverter.convertTimestamp(TimestampConverter.java:450)
      	at org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:375)
      	at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:362)
      	at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:279)
      	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
      	... 14 more
      Caused by: java.text.ParseException: Unparseable date: "2020-10-06T12:12:27Z"
      	at java.text.DateFormat.parse(DateFormat.java:366)
      	at org.apache.kafka.connect.transforms.TimestampConverter$1.toRaw(TimestampConverter.java:120)
      	... 21 more
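      The failure above can be reproduced outside of Connect: a single SimpleDateFormat pattern with a mandatory .SSS fractional-seconds section rejects any value that omits the milliseconds. A minimal sketch (the class name is mine, just for illustration):

      ```java
      import java.text.ParseException;
      import java.text.SimpleDateFormat;

      public class SimpleDateFormatLimitation {
          public static void main(String[] args) {
              // The same single pattern TimestampConverter would build from its "format" config:
              SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
              try {
                  // Fails: the value has no fractional seconds, but the literal '.' and SSS are mandatory.
                  format.parse("2020-10-06T12:12:27Z");
                  System.out.println("parsed");
              } catch (ParseException e) {
                  System.out.println("Unparseable date: 2020-10-06T12:12:27Z");
              }
          }
      }
      ```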


      My thinking is that a good solution might be to switch from java.util.Date to the java.time API, and then from SimpleDateFormat to DateTimeFormatter, which allows more sophisticated patterns (including optional sections) in the config to match multiple allowable formats.

      For example instead of effectively doing this:

      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");

      It can be something like this:

      DateTimeFormatter format = DateTimeFormatter.ofPattern("[yyyy-MM-dd[['T'][ ]HH:mm:ss[.SSSSSSSz][.SSS[XXX][X]]]]");
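      As a sketch of the idea (the simplified pattern and helper class below are illustrative, not actual transform code), a single DateTimeFormatter with optional sections can accept several variants of the same timestamp, using parseBest to handle values with and without an offset:

      ```java
      import java.time.Instant;
      import java.time.LocalDateTime;
      import java.time.OffsetDateTime;
      import java.time.ZoneOffset;
      import java.time.format.DateTimeFormatter;
      import java.time.temporal.TemporalAccessor;

      public class FlexibleTimestampParser {

          // Optional sections: fractional seconds and the zone offset may each be absent.
          private static final DateTimeFormatter FORMAT =
                  DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX][X]");

          /** Parses to an Instant, assuming UTC when no offset is present in the value. */
          public static Instant parse(String text) {
              TemporalAccessor parsed =
                      FORMAT.parseBest(text, OffsetDateTime::from, LocalDateTime::from);
              if (parsed instanceof OffsetDateTime) {
                  return ((OffsetDateTime) parsed).toInstant();
              }
              return ((LocalDateTime) parsed).toInstant(ZoneOffset.UTC);
          }

          public static void main(String[] args) {
              // All of these variants parse with the single formatter above:
              System.out.println(parse("2020-10-06T12:12:27Z"));
              System.out.println(parse("2020-10-06T12:12:27.123Z"));
              System.out.println(parse("2020-10-06T12:12:27.123+02:00"));
              System.out.println(parse("2020-10-06T12:12:27"));
          }
      }
      ```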

      Also, if there are multiple timestamp fields in the schema/events, today you have to chain multiple TimestampConverter transforms together, and I can see a bit of a performance impact if there are many timestamps on large events and the topic has a lot of events coming through.

      So it would actually be great if the field name could instead be a comma-separated list of field names (much like the Cast and ReplaceField transforms accept), and the transform would loop through each field in the list and apply the same logic (parse the field based on the string and produce the requested output type).
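      A rough sketch of that loop, with record values modeled as a plain Map rather than the real Connect Struct API (the class and method names here are hypothetical):

      ```java
      import java.util.Arrays;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.function.Function;

      public class MultiFieldApply {

          /**
           * Hypothetical helper: applies the same conversion to every field named in a
           * comma-separated config value, mirroring how Cast/ReplaceField accept field lists.
           */
          public static Map<String, Object> convertFields(Map<String, Object> value,
                                                          String fieldsConfig,
                                                          Function<Object, Object> convert) {
              // Split "a, b, c" into field names, tolerating whitespace around commas.
              List<String> fields = Arrays.asList(fieldsConfig.split("\\s*,\\s*"));
              Map<String, Object> updated = new HashMap<>(value);
              for (String field : fields) {
                  if (updated.containsKey(field)) {
                      updated.put(field, convert.apply(updated.get(field)));
                  }
              }
              return updated;
          }
      }
      ```

      A single pass over the listed fields avoids re-running the whole transformation chain once per timestamp field.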






              joshuagrisham Joshua Grisham