  1. Flink
  2. FLINK-18637

Key group is not in KeyGroupRange

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem

    Description

      I'm getting this error when creating a savepoint. According to https://issues.apache.org/jira/browse/FLINK-16193, it is caused by an unstable hashCode or equals implementation on the key, or by improper use of reinterpretAsKeyedStream.
       
      My key is a string and I don't use reinterpretAsKeyedStream.
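
For context, the following is a minimal sketch of why an unstable hashCode matters for keyed state. It is not Flink code: `keyGroupFor` is a simplified stand-in (Flink additionally applies a murmur hash to `key.hashCode()` before taking the modulo), and `MutableKey` is a hypothetical key type invented for illustration. It only shows that a String key hashes by content and therefore always lands in the same key group, while a key whose hashCode depends on mutable state can move to a different key group.

```java
// Sketch only: illustrates stable vs. unstable keys. Not Flink's implementation.
final class KeyStabilitySketch {

    // Simplified stand-in for key-group assignment. Assumption: the real
    // assignment also mixes the hash (murmur) before the modulo; omitted here.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Hypothetical key type whose hashCode depends on a mutable field.
    static final class MutableKey {
        int id;

        MutableKey(int id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableKey && ((MutableKey) o).id == id;
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(id);
        }
    }

    public static void main(String[] args) {
        int maxParallelism = 128;

        // Two distinct String instances with equal content share a key group.
        String a = "tenant/session-1";
        String b = new String("tenant/session-1");
        System.out.println(keyGroupFor(a, maxParallelism) == keyGroupFor(b, maxParallelism)); // true

        // Mutating the key after it was used changes its key group.
        MutableKey key = new MutableKey(5);
        int before = keyGroupFor(key, maxParallelism);
        key.id = 6;
        int after = keyGroupFor(key, maxParallelism);
        System.out.println(before == after); // false
    }
}
```

Since `meta.toPathString` is reported to return a String, the key in this job would fall in the stable case, provided the same record always produces the same string.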

       

      senv
        .addSource(source)
        .flatMap(…)
        .filterWith { case (metadata, _, _) => … }
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(…))
        .keyingBy { case (meta, _) => meta.toPathString }
        .process(new TruncateLargeSessions(config.sessionSizeLimit))
        .keyingBy { case (meta, _) => meta.toPathString }
        .window(EventTimeSessionWindows.withGap(Time.of(…)))
        .process(new ProcessSession(sessionPlayback, config))
        .addSink(sink)

       

      org.apache.flink.util.FlinkException: Triggering a savepoint for the job 962fc8e984e7ca1ed65a038aa62ce124 failed.
      	at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
      	at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
      	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
      	at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
      	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
      	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
      	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
      Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
      	at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
      	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
      	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
      	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
      	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
      	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
      	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
      	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
      	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
      	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
      	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
      	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
      	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
      	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
      	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:429)
      	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1466)
      	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1379)
      	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:719)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:807)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
      	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:428)
      	... 11 more
      Caused by: java.lang.Exception: Could not materialize checkpoint 15 for operator KeyedProcess (11/216).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
      	... 3 more
      Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 13880 is not in KeyGroupRange{startKeyGroup=24, endKeyGroup=26}.
      	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
      	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
      	... 3 more
      Caused by: java.lang.IllegalArgumentException: Key group 13880 is not in KeyGroupRange{startKeyGroup=24, endKeyGroup=26}.
      	at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
      	at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
      	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:350)
      	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
      	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
      	at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
      	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
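
The numbers in the exception can be sanity-checked with a little arithmetic. The sketch below re-derives, outside of Flink, the key-group range owned by subtask 11 of 216. The formulas are written as I understand them from Flink's `KeyGroupRangeAssignment` and should be treated as an assumption, as should the max parallelism of 512, which the report does not state (it is the default these formulas yield for parallelism 216). The class and method names are hypothetical.

```java
// Sketch only: key-group range arithmetic re-derived outside of Flink.
final class KeyGroupRangeSketch {

    // Assumed bounds for the default max parallelism (2^7 and 2^15).
    static final int DEFAULT_LOWER_BOUND = 1 << 7;
    static final int UPPER_BOUND = 1 << 15;

    // Assumed default: 1.5 x parallelism, rounded up to a power of two, then clamped.
    static int defaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(candidate, DEFAULT_LOWER_BOUND), UPPER_BOUND);
    }

    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // First key group owned by the subtask with the given 0-based index.
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the subtask with the given 0-based index.
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Key groups are numbered 0 .. maxParallelism - 1.
    static boolean isValidKeyGroup(int keyGroup, int maxParallelism) {
        return keyGroup >= 0 && keyGroup < maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 216;
        int subtaskIndex = 10; // "KeyedProcess (11/216)" is 1-based
        int maxParallelism = defaultMaxParallelism(parallelism);

        System.out.println("maxParallelism = " + maxParallelism);
        System.out.println("range = [" + rangeStart(maxParallelism, parallelism, subtaskIndex)
                + ", " + rangeEnd(maxParallelism, parallelism, subtaskIndex) + "]");
        System.out.println("13880 valid? " + isValidKeyGroup(13880, maxParallelism));
    }
}
```

Under these assumptions the computed range is [24, 26], matching the `KeyGroupRange{startKeyGroup=24, endKeyGroup=26}` in the message, while 13880 lies outside the whole 0..511 key-group space. If the assumptions hold, the offending value is not a key group belonging to a neighbouring subtask; it is not a key group that any key could have been assigned to.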
      

       

            People

              Assignee: Unassigned
              Reporter: Ori Popowski (oripwk)