KAFKA-6290

Kafka Connect cast transformation should support logical types


    Details

    • Flags:
      Important

      Description

      I am facing the same issue when consuming from Kafka to HDFS with a Cast transform. Any pointers, please?

      My Connector :
      *********************

      {
       "name": "hdfs-sink-avro-cast-test-stndln",
       "config": {
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "internal.key.converter.schemas.enable": "false",
        "internal.value.converter.schemas.enable": "false",
        "offset.storage.file.filename": "/tmp/connect.offsets.avroHdfsConsumer.casttest.stndln",
        "offset.flush.interval.ms": "500",
        "parse.key": "true",
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "hadoop.home": "/usr/lib/hadoop",
        "hdfs.url": "hdfs://ip-127-34-56-789.us-east-1.compute.interna:8020",
        "topics": "avro_raw_KFK_SRP_USER_TEST_V,avro_raw_KFK_SRP_PG_HITS_TEST_V",
        "tasks.max": "1",
        "topics.dir": "/home/hadoop/kafka/data/streams/in/raw/casttest1",
        "logs.dir": "/home/hadoop/kafka/wal/streams/in/raw/casttest1",
        "hive.integration": "true",
        "hive.metastore.uris": "thrift://ip-127-34-56-789.us-east-1.compute.internal:9083",
        "schema.compatibility": "BACKWARD",
        "flush.size": "10000",
        "rotate.interval.ms": "1000",
        "mode": "timestamp",
        "transforms": "Cast",
        "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.Cast.spec": "residuals:float64,comp:float64"
       }
      }
      

      Exception :
      *************

      
      [2017-11-16 01:14:39,719] ERROR Task hdfs-sink-avro-cast-test-stndln-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
      org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.util.Date for field: "null"
              at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
              at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
              at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
              at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
              at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
              at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:414)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2017-11-16 01:14:39,719] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)
      [2017-11-16 01:14:39,719] INFO Shutting down Hive executor service. (io.confluent.connect.hdfs.DataWriter:309)
      
      
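For context, the failure above comes from Connect's schema validation, not from HDFS itself. As I read the trace, the Cast transform rebuilds the record schema and re-validates every field; a Timestamp field is INT64 on the wire but carried as java.util.Date in memory, so once the logical-type information is dropped, the plain INT64 check rejects the Date value. The sketch below is a simplified, hypothetical model of that check (the class and method names are mine, not the real org.apache.kafka.connect.data.ConnectSchema API):

```java
import java.util.Date;

// Hypothetical, simplified model of per-type value validation in Kafka
// Connect. A Timestamp logical-type field is stored as INT64 but holds a
// java.util.Date at runtime; once the Cast transform rebuilds the schema
// as a plain INT64 (losing the logical-type name), the Date no longer
// matches the expected Java class and validation fails, as in the
// DataException above.
public class CastLogicalTypeSketch {
    public enum Type { INT64, FLOAT64 }

    // Plain Java class each primitive schema type accepts.
    public static Class<?> javaClassFor(Type t) {
        switch (t) {
            case INT64:   return Long.class;
            case FLOAT64: return Double.class;
            default:      throw new IllegalArgumentException("unhandled: " + t);
        }
    }

    public static boolean isValid(Type schemaType, Object value) {
        return javaClassFor(schemaType).isInstance(value);
    }

    public static void main(String[] args) {
        Object timestampValue = new Date(0L); // what a Timestamp field carries in memory
        // Plain INT64 validation rejects the Date -> DataException in the real runtime.
        System.out.println(isValid(Type.INT64, timestampValue)); // false
        System.out.println(isValid(Type.INT64, 0L));             // true
    }
}
```

As the issue title says, the Cast transform should support logical types (Timestamp, Date, Time, Decimal) instead of validating their values against the raw primitive type.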

              People

              • Assignee:
                Nigel Liang
              • Reporter:
                Sudhir Pradhan
              • Reviewer:
                Randall Hauch
              • Votes:
                1
              • Watchers:
                4
