Details
- Bug
- Status: Closed
- Minor
- Resolution: Won't Fix
- 1.6.0
- None
- None
Description
We use Flink SQL to process collected user behavior data. Some records contain malformed values, such as:
{
  "event_id": "session_start",
  "timestamp": "-", // error data
  "viewport_height": "667",
  "viewport_width": "-" // error data
}
Such a record causes the following exception:
2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO executiongraph.ExecutionGraph (ExecutionGraph.java:tryRestartOrFail(1511)) - Could not restart the job Flink Streaming Job (6f0248219c631158f6e38f2dca0beb91) because the restart strategy prevented it.
java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp
    at org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:27)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO checkpoint.CheckpointCoordinator (CheckpointCoordinator.java:shutdown(320)) - Stopping checkpoint coordinator for job 6f0248219c631158f6e38f2dca0beb91.
2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO checkpoint.StandaloneCompletedCheckpointStore (StandaloneCompletedCheckpointStore.java:shutdown(102)) - Shutting down
We have Flink checkpointing enabled, and this uncaught exception leaves the job unable to restart. As a workaround, when a malformed value triggers an exception we set that field to null, as shown in the image below. I hope the Flink committers can provide a better solution.
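For illustration, the set-to-null workaround could look roughly like the sketch below. This is not Flink code and not the code from the image: `LenientFields` and its methods are hypothetical helper names, and the timestamp format (`yyyy-MM-dd HH:mm:ss`, as accepted by `java.sql.Timestamp.valueOf`) is an assumption, since the report does not show a valid timestamp value.

```java
import java.sql.Timestamp;

/**
 * Hypothetical helpers (not part of Flink) that turn malformed raw
 * field values such as "-" into null instead of throwing.
 */
public class LenientFields {

    /**
     * Parses a timestamp string, assuming the "yyyy-MM-dd HH:mm:ss" format.
     * Returns null when the value is missing or malformed.
     */
    public static Timestamp parseTimestamp(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Timestamp.valueOf(raw.trim());
        } catch (IllegalArgumentException e) {
            // Malformed value such as "-": emit null rather than fail the job.
            return null;
        }
    }

    /** Parses an integer field; returns null when the value is missing or malformed. */
    public static Integer parseInt(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

Such helpers would be called while building each row, before the record is emitted downstream, so that a field declared as a timestamp never carries a raw String.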