Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-16628

Complex JSON sink into kafka error

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.10.0
    • Fix Version/s: None
    • Component/s: Table SQL / Client
    • Labels:
      None

      Description

      // the kafka data
      
      {"svt":"2020-02-24T12:58:09.209+0800""ops":{"id":"281708d0-4092-4c21-9233-931950b6eccf"}}

       

      //source table
      
      CREATE TABLE source_kafka_sasl (
          svt STRING,
          ops ROW<id STRING>
      ) WITH ()
      
      //sink table
      
      CREATE TABLE sink_kafka_sasl (
          svt STRING,
          ops ROW<id STRING>
      ) WITH ()
      
      // operating
      insert into sink_kafka_sasl
      select svt,ops from source_kafka_sasl
      
      Caused by: java.lang.RuntimeException: Could not serialize row '2020-02-24T12:58:09.209+0800,281708d0-4092-4c21-9233-931950b6eccf'. Make sure that the schema matches the input.
      	at org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:141)
      	at org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:68)
      	at org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:47)
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:771)
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
      	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
      	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
      	at SinkConversion$3.processElement(Unknown Source)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
      	at SourceConversion$1.processElement(Unknown Source)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
      	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
      	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
      	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
      	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
      	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
      	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
      Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
      	at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:337)
      	at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
      	at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345)
      	at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189)
      	at org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:138)
      	... 32 more

       

       

      how can i sink a json like {"svt":"2020-02-24T12:58:09.209+0800","ops":{"id":"281708d0-4092-4c21-9233-931950b6eccf"}} into kafka?

       

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                jark Jark Wu
                Reporter:
                jackray jackray wang
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: