Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Duplicate
- Affects Version/s: 2.0.0
- Fix Version/s: None
- Component/s: None
Description
Producing records with a nullable `timestamp-millis` (Connect `Timestamp`) field under two versions of the same schema, then sinking the topic with the HDFS connector using `schema.compatibility=BACKWARD`, kills the sink task with `java.lang.ClassCastException: java.util.Date cannot be cast to java.lang.Number` in `SchemaProjector`.

Steps to reproduce:
~~~~~~
bash-4.2# kafka-avro-console-producer --broker-list localhost:9092 --topic connect_10oct_03 --property schema.registry.url=http://localhost:8081 --property value.schema='{"type":"record","name":"myrecord","fields":[
{"name":"f1","type":"string"},{"name":"f2","type":["null",
{"type":"long","logicalType":"timestamp-millis","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp"}],"default":null}]}'
{"f1": "value1","f2": {"null":null}}
{"f1": "value1","f2": {"long":1022}}
bash-4.2# kafka-avro-console-producer --broker-list localhost:9092 --topic connect_10oct_03 --property schema.registry.url=http://localhost:8081 --property value.schema='{"type":"record","name":"myrecord","fields":[
{"name":"f1","type":"string"},{"name":"f2","type":["null",
{"type":"long","logicalType":"timestamp-millis","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp"}],"default":null},{"name":"f3","type":"string","default":"green"}]}'
{"f1": "value1","f2":
,"f3":"toto"}
{"f1": "value1","f2":
,"f3":"toto"}
{"f1": "value1","f2":
,"f3":"toto"}
{"f1": "value1","f2":
,"f3":"tutu"}
bash-4.2# kafka-avro-console-producer --broker-list localhost:9092 --topic connect_10oct_03 --property schema.registry.url=http://localhost:8081 --property value.schema='{"type":"record","name":"myrecord","fields":[
{"name":"f1","type":"string"},{"name":"f2","type":["null",
{"type":"long","logicalType":"timestamp-millis","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp"}],"default":null}]}'
{"f1": "value1","f2": {"null":null}}
{"f1": "value1","f2": {"long":1022}}
bash-4.2# curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d '{"name":"hdfs-sink-connector-10oct-03", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"1", "topics":"connect_10oct_03", "hdfs.url": "hdfs://localhost:8020/tmp/", "flush.size":"1", "hive.integration": "true", "hive.metastore.uris": "thrift://localhost:9083", "hive.database": "rohit", "schema.compatibility": "BACKWARD"}}'
{"name":"hdfs-sink-connector-10oct-03","config":
,"tasks":[],"type":null}bash-4.2#
bash-4.2#
bash-4.2# curl http://localhost:8083/connectors/hdfs-sink-connector-10oct-03/status
{"name":"hdfs-sink-connector-10oct-03","connector":
,"tasks":[
],"type":"sink"}bash-4.2#
bash-4.2#
~~~~~~
From connect logs:
---------------------------
[2018-10-10 12:31:44,816] ERROR WorkerSinkTask{id=hdfs-sink-connector-10oct-03-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.util.Date cannot be cast to java.lang.Number
at org.apache.kafka.connect.data.SchemaProjector.projectPrimitive(SchemaProjector.java:164)
at org.apache.kafka.connect.data.SchemaProjector.projectRequiredSchema(SchemaProjector.java:91)
at org.apache.kafka.connect.data.SchemaProjector.project(SchemaProjector.java:73)
at org.apache.kafka.connect.data.SchemaProjector.projectStruct(SchemaProjector.java:110)
at org.apache.kafka.connect.data.SchemaProjector.projectRequiredSchema(SchemaProjector.java:93)
at org.apache.kafka.connect.data.SchemaProjector.project(SchemaProjector.java:73)
at io.confluent.connect.storage.schema.StorageSchemaCompatibility.projectInternal(StorageSchemaCompatibility.java:196)
at io.confluent.connect.storage.schema.StorageSchemaCompatibility.projectInternal(StorageSchemaCompatibility.java:184)
at io.confluent.connect.storage.schema.StorageSchemaCompatibility.project(StorageSchemaCompatibility.java:156)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:378)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
... 10 more
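The failure can be reduced to a minimal, self-contained sketch. The two methods below are hypothetical stand-ins, not the real Connect code: `projectNaive` mirrors what the stack trace shows `SchemaProjector.projectPrimitive` doing (casting every value of an INT64 schema to `java.lang.Number`), while Connect represents the `Timestamp` logical type as `java.util.Date`; `projectAware` sketches the kind of logical-type check the duplicate issue (KAFKA-7476) calls for:

```java
import java.util.Date;

public class ProjectionSketch {
    // Hypothetical stand-in for SchemaProjector.projectPrimitive: it assumes
    // every value of a numeric schema is a java.lang.Number, but Connect
    // represents the Timestamp logical type (base type INT64) as
    // java.util.Date, so the cast throws ClassCastException.
    static Object projectNaive(Object value) {
        Number number = (Number) value; // fails for java.util.Date
        return number.longValue();
    }

    // Sketch of a logical-type-aware projection: when source and target
    // schemas share the Timestamp logical type, pass the Date through
    // instead of forcing a numeric cast.
    static Object projectAware(Object value) {
        if (value instanceof Date) {
            return value;
        }
        return ((Number) value).longValue();
    }

    public static void main(String[] args) {
        Date timestamp = new Date(1022L); // the f2 value from the steps above
        try {
            projectNaive(timestamp);
        } catch (ClassCastException e) {
            // Same failure mode as the WorkerSinkTask stack trace
            System.out.println("naive projection failed: " + e);
        }
        System.out.println("aware projection: " + projectAware(timestamp));
    }
}
```

This is only an illustration of the failure mode; the actual fix lands in `org.apache.kafka.connect.data.SchemaProjector` and must also handle the other Date-based logical types (`Date`, `Time`).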
Attachments
Issue Links
- duplicates KAFKA-7476: SchemaProjector is not properly handling Date-based logical types (Resolved)