Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Duplicate
-
1.15.0
-
None
-
None
Description
The following is my demo code.
public class WatermarkDemo { private final static String SERVICE_URL = "pulsar://localhost:6650"; private final static String ADMIN_URL = "http://localhost:8080"; public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); PulsarSource<String> source = PulsarSource.builder() .setServiceUrl(SERVICE_URL) .setAdminUrl(ADMIN_URL) .setStartCursor(StartCursor.earliest()) .setTopics("ada") .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema())) .setSubscriptionName("my-subscription") .setSubscriptionType(SubscriptionType.Exclusive) .build(); PulsarSink<String> sink = PulsarSink.builder() .setServiceUrl(SERVICE_URL) .setAdminUrl(ADMIN_URL) .setTopics("beta") .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema())) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); DataStream stream = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Pulsar Source"); stream.sinkTo(sink); env.execute(); } }
It will throw the following error.
Caused by: java.lang.IllegalArgumentException: Invalid timestamp : '0' at org.apache.pulsar.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:203) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.eventTime(TypedMessageBuilderImpl.java:204) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.createMessageBuilder(PulsarWriter.java:216) at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:141) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41) at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) at org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748)
Attachments
Issue Links
- is duplicated by
-
FLINK-29207 Pulsar message eventTime may be incorrectly set to a negative number
- Closed