FLINK-20088

[Kinesis][Polling] Issue using Polling consumer at timestamp with empty shard

      Description

      Background

      The consumer fails when a Polling record publisher uses a timestamp sentinel starting position and the first record batch is empty. In that case the consumer tries to recalculate the start position from the timestamp sentinel, which is not a supported operation.

      Reproduction Steps

      Set up an application consuming from Kinesis with the following properties and consume from an empty shard:

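      // Requires java.text.SimpleDateFormat, java.util.Date and java.util.Properties.
      Properties consumerConfig = new Properties();

      // Use the current time as the initial timestamp, formatted with the pattern below.
      String format = "yyyy-MM-dd'T'HH:mm:ss";
      String date = new SimpleDateFormat(format).format(new Date());

      // Start reading AT_TIMESTAMP; on an empty shard the first record batch is empty.
      consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, date);
      consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format);
      consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");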

      Error

      Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
      	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
      	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
      	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
      	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
      	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
      	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
      	at akka.dispatch.OnComplete.internal(Future.scala:264)
      	at akka.dispatch.OnComplete.internal(Future.scala:261)
      	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
      	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
      	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
      	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
      	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
      	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
      	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
      	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
      	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
      	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
      	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
      	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
      	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
      	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
      	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
      	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
      	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
      	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
      	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
      	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
      	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
      	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
      	... 4 more
      Caused by: java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
      	at software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:107)
      	at software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSequenceNumber(StartingPosition.java:90)
      	at software.amazon.kinesis.connectors.flink.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:73)
      	at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:113)
      	at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:98)
      	at software.amazon.kinesis.connectors.flink.internals.ShardConsumer.run(ShardConsumer.java:108)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
      	at java.util.concurrent.FutureTask.run(FutureTask.java)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

       

      Solution

      The fix is to reuse the existing timestamp starting position when the first record batch is empty, instead of recalculating the position from the timestamp sentinel.
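
The condition described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the connector's actual code: the class TimestampSentinelSketch, the nested StartingPosition stand-in, and the methods nextPositionBeforeFix and nextPositionAfterFix are hypothetical names chosen for illustration. Only the sentinel name and the exception message are taken from the stack trace. The real StartingPosition class handles more cases than this model does.

```java
import java.util.Date;

/** Simplified, hypothetical model of how the start position is chosen for the next poll. */
final class TimestampSentinelSketch {

    // Sentinel name as it appears in the stack trace.
    static final String AT_TIMESTAMP_SENTINEL = "AT_TIMESTAMP_SEQUENCE_NUM";

    /** Minimal stand-in for the connector's StartingPosition (assumption, not the real class). */
    static final class StartingPosition {
        final String iteratorType;   // "AT_TIMESTAMP" or "AFTER_SEQUENCE_NUMBER" in this model
        final Object startingMarker; // a Date or a sequence number string

        StartingPosition(String iteratorType, Object startingMarker) {
            this.iteratorType = iteratorType;
            this.startingMarker = startingMarker;
        }

        static StartingPosition fromTimestamp(Date timestamp) {
            return new StartingPosition("AT_TIMESTAMP", timestamp);
        }

        /** Models the failing path: the sentinel alone carries no timestamp to rebuild from. */
        static StartingPosition continueFromSequenceNumber(String sequenceNumber) {
            if (AT_TIMESTAMP_SENTINEL.equals(sequenceNumber)) {
                throw new IllegalArgumentException("Unexpected sentinel type: " + sequenceNumber);
            }
            return new StartingPosition("AFTER_SEQUENCE_NUMBER", sequenceNumber);
        }
    }

    /** Before the fix: always recalculate from the last consumed sequence number. */
    static StartingPosition nextPositionBeforeFix(StartingPosition current, String lastSequenceNumber) {
        return StartingPosition.continueFromSequenceNumber(lastSequenceNumber);
    }

    /** After the fix, as described: keep the timestamp position while nothing has been consumed. */
    static StartingPosition nextPositionAfterFix(StartingPosition current, String lastSequenceNumber) {
        if (AT_TIMESTAMP_SENTINEL.equals(lastSequenceNumber)) {
            return current; // empty batch, so the original AT_TIMESTAMP position is still valid
        }
        return StartingPosition.continueFromSequenceNumber(lastSequenceNumber);
    }

    public static void main(String[] args) {
        StartingPosition start = StartingPosition.fromTimestamp(new Date());
        // Empty first batch: the last consumed sequence number is still the sentinel.
        StartingPosition next = nextPositionAfterFix(start, AT_TIMESTAMP_SENTINEL);
        System.out.println("Next poll uses iterator type: " + next.iteratorType);
    }
}
```

In this model, nextPositionBeforeFix throws the same IllegalArgumentException as in the trace when the first batch is empty, while nextPositionAfterFix returns the original timestamp position unchanged and only switches to a sequence-number position once a record has been consumed.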

              People

              • Assignee: Danny Cranmer (danny.cranmer)
              • Reporter: Danny Cranmer (danny.cranmer)