Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6683

ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.0
    • Fix Version/s: 1.1.0
    • Component/s: replication
    • Labels:
      None
    • Environment:
      os: GNU/Linux
      arch: x86_64
      Kernel: 4.9.77
      jvm: OpenJDK 1.8.0

      Description

      We have been experiencing this issue lately when restarting or replacing brokers of our Kafka clusters during maintenance operations.

      Having restarted or replaced a broker, after some minutes performing normally it may suddenly throw the following exception and stop replicating some partitions:

      2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
      java.lang.IllegalArgumentException: Attempted to complete a transaction which was not started
              at kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
              at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
              at kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
              at scala.collection.immutable.List.foreach(List.scala:389)
              at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
              at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
              at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
              at kafka.log.Log.loadProducersFromLog(Log.scala:540)
              at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
              at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
              at scala.collection.Iterator.foreach(Iterator.scala:929)
              at scala.collection.Iterator.foreach$(Iterator.scala:929)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
              at scala.collection.IterableLike.foreach(IterableLike.scala:71)
              at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
              at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
              at kafka.log.Log.loadProducerState(Log.scala:514)
              at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
              at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
              at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
              at kafka.log.Log.truncateTo(Log.scala:1467)
              at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
              at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
              at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
              at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
              at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
              at kafka.log.LogManager.truncateTo(LogManager.scala:445)
              at kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
              at scala.collection.Iterator.foreach(Iterator.scala:929)
              at scala.collection.Iterator.foreach$(Iterator.scala:929)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
              at scala.collection.IterableLike.foreach(IterableLike.scala:71)
              at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
              at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
              at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
              at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
              at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
              at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
              at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
      [2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
      

      As during system updates all brokers in a cluster are restarted, it happened some times the issue to manifest in different brokers holding replicas for the same partition at the same time, which caused downtime due not enough ISR replica.

      It is necessary to restart the faulted broker in order to recover partition replication, but after hitting this issue we often face that after restarting the broker it shuts itself down with the following error among lots of warnings due corrupted indices:

      [2018-03-05 16:02:22,450] ERROR There was an error in one of the threads during logs loading: org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20 (zombie): 21 (current) (kafka.log.LogManager)
      [2018-03-05 16:02:22,453] FATAL [KafkaServer id=10] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
      org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20 (zombie): 21 (current)
      

      When this happened the only way to keep Kafka up has been to delete all the data inside the log directory (/var/lib/kafka in our case).

      The problem manifest randomly but we managed to reproduce the ReplicaFetcher crashing (although not the failed startup) out of our production cluster by doing this:
       1 - Setup a Kafka cluster running 3 brokers (see attached configuration): 10, 11 and 12
       2 - Create a topic with the following settings: Topic:mytopic2, PartitionCount:12, ReplicationFactor:3, Configs:segment.bytes=52428800,retention.ms=1800000
       3 - Run some producers like this:

      while true
      do
       ./kafka-producer-perf-test.sh --topic mytopic2 --record-size=2048 --producer-props bootstrap.servers=ec2-XXX-XXX-XXX-XXX.eu-west-1.compute.amazonaws.com:9092 enable.idempotence=true --throughput 50 --num-records 6000 --transactional-id pruebatrans4 --transaction-duration-ms 100
      done
      

       4 - Run some consumer on mytopic2.
       5 - Wait for some time for semegments to be rotated.
       6 - Stop broker 11, remove everything inside /var/lib/kafka, start it again.
       7 - Wait for data to be replicated and all replicas be in ISR.
       8 - Stop broker 12, remove everything inside /var/lib/kafka, start it again.
       9 - Wait for data to be replicated and all replicas be in ISR.
       10 - Wait for the issue to manifest. If it manifests, after some minutes of normal behaviour, broker 11 may suddenly stop replicating and some partitions may appear underreplicated.

      If replication after restarting node 12 takes long enough, node 11 may crash its ReplicaFetcher before replicas in 12 are available causing partitions to go offline. Whe have manage to reproduce the issue without deleting log data in steps 6 and 8 but it seems more likely to manifest if we do it. The broker experiencing the issue is quite random, but most of the time seems to be one of the already restarted brokers but not necessary the latest one.

        Attachments

        1. server.properties
          0.9 kB
          Chema Sanchez

          Issue Links

            Activity

              People

              • Assignee:
                hachikuji Jason Gustafson
                Reporter:
                chema.sanchez Chema Sanchez
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: