KAFKA-937: ConsumerFetcherThread can deadlock

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:
      None

      Description

      We have the following access pattern that can introduce a deadlock:

      AbstractFetcherThread.processPartitionsWithError() ->
      ConsumerFetcherThread.processPartitionsWithError() ->
      ConsumerFetcherManager.addPartitionsWithError() waits for the manager's lock ->
      LeaderFinderThread holds that lock while calling AbstractFetcherManager.shutdownIdleFetcherThreads() ->
      AbstractFetcherManager calls fetcher.shutdown, which has to wait until AbstractFetcherThread.processPartitionsWithError() completes.
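
      For illustration, here is a minimal, self-contained sketch of this kind of deadlock. The names (managerLock, fetcher, leaderFinder) are illustrative only, not Kafka's actual classes: one thread holds the manager's lock while waiting for the other thread to finish, and the other thread blocks on that same lock.

      // Hypothetical sketch, not Kafka code: reproduces the lock-vs-wait cycle described above.
      object DeadlockSketch extends App {
        val managerLock = new Object
        @volatile var fetcherDone = false

        val fetcher = new Thread(new Runnable {
          def run(): Unit = {
            // The "fetcher" hit errors and tries to hand its partitions back to the manager...
            managerLock.synchronized { /* addPartitionsWithError(...) */ }
            fetcherDone = true          // ...but never gets here: the lock is held by leaderFinder.
          }
        }, "consumer-fetcher")

        val leaderFinder = new Thread(new Runnable {
          def run(): Unit = managerLock.synchronized {   // lock held for the whole pass
            fetcher.start()
            Thread.sleep(100)                            // let the fetcher reach the lock
            // analogue of shutdownIdleFetcherThreads(): wait for the fetcher to finish
            while (!fetcherDone) Thread.sleep(10)        // never completes -> deadlock
          }
        }, "leader-finder")

        leaderFinder.start()
      }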

      1. kafka-937.patch
        8 kB
        Jun Rao
      2. kafka-937_delta.patch
        1 kB
        Jun Rao
      3. kafka-937_ConsumerOffsetChecker.patch
        1 kB
        Jun Rao

        Activity

        Jun Rao added a comment -

        Attached a patch. The fix is to make sure that the fetcher thread never gets blocked, no matter what other threads such as the LeaderFinderThread do. Specifically, the LeaderFinderThread no longer holds the lock when calling addFetcher() or shutdownIdleFetcherThreads(). This way, ConsumerFetcherManager.addPartitionsWithError() never blocks, which in turn means that the ConsumerFetcherThread never blocks and can complete its shutdown if required.

        Double-checked the other paths and don't see any other potential deadlocks.

        Also fixed another potential socket leak through SimpleConsumer. When we shut down a fetcher, we first interrupt the fetcher thread and close the SimpleConsumer. However, after that, it is possible for the fetcher thread to make another fetch request on the SimpleConsumer, which would establish the socket connection again. Added a fix in SimpleConsumer so that after it is closed, no new socket connections are established and the fetch call gets a ClosedChannelException instead.
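
        As an aside, a hedged sketch of the locking pattern described here (assumed names and signatures, not the actual patch): the leader finder computes what it needs while holding the lock, then performs the potentially blocking calls after releasing it, so that addPartitionsWithError() can always acquire the lock.

        // Illustrative only; findLeaders/addFetcher/shutdownIdleFetcherThreads are stand-ins.
        class LeaderFinderSketch(managerLock: Object) {
          def doWork(findLeaders: () => Map[String, Int],        // partition -> leader broker id
                     addFetcher: (String, Int) => Unit,
                     shutdownIdleFetcherThreads: () => Unit): Unit = {
            // 1. Take a consistent snapshot while holding the lock.
            val leaders = managerLock.synchronized { findLeaders() }
            // 2. Do the blocking work outside the lock, so other threads are never stuck on it.
            leaders.foreach { case (partition, leader) => addFetcher(partition, leader) }
            shutdownIdleFetcherThreads()
          }
        }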

        Joel Koshy added a comment -

        +1 on the patch.

        Additionally, can you make this small (unrelated) change: make the console consumer's autoCommitIntervalOpt default to ConsumerConfig.AutoCommitInterval?

        I think it is worth documenting the typical path of getting into the above deadlock:

        • Assume at least two fetchers, F1 and F2.
        • One or more partitions on F1 go into error and the leader finder thread L is notified.
        • L unblocks and proceeds to handle the partitions without a leader. It holds the ConsumerFetcherManager's lock at this point.
        • All partitions on F2 go into error.
        • F2's handlePartitionsWithError removes the partitions from its fetcher's partitionMap. (At this point, F2 is by definition an idle fetcher thread.)
        • L tries to shut down idle fetcher threads, i.e., tries to shut down F2.
        • However, F2 at this point is trying to call addPartitionsWithError, which needs to acquire the ConsumerFetcherManager's lock (currently held by L).

        It is relatively rare in the sense that it can happen only if all partitions on a fetcher are in error. This could happen, for example, if all the leaders for those partitions move or become unavailable. Another instance where this may be seen in practice is mirroring: we ran into it when running the mirror maker with a very large number of producers and we ran out of file handles. Running out of file handles can easily lead to exceptions on most or all fetches and result in an error state for all partitions.

        Jun Rao added a comment -

        Thanks for the review. Committed to 0.8.

        Jun Rao added a comment -

        Just realized a rare corner-case issue: addFetcher() may call ConsumerFetcherThread.handleOffsetOutOfRange(), which can get an exception from SimpleConsumer.earliestOrLatestOffset(). In this case, we shouldn't kill the leaderFinderThread unless it is being shut down. Attached a patch.
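
        An illustrative sketch of that guard (assumed names, not the delta patch itself): an exception while adding fetchers only propagates if the leader-finder thread is actually being shut down; otherwise it is logged and the lookup is retried on the next iteration.

        // Hypothetical sketch of the guard; isRunning and the callback are stand-ins.
        object LeaderFinderGuardSketch {
          @volatile var isRunning = true

          def doWork(addFetchersForPartitionsWithoutLeader: () => Unit): Unit = {
            try {
              addFetchersForPartitionsWithoutLeader()  // may throw, e.g. from earliestOrLatestOffset()
            } catch {
              case t: Throwable =>
                if (!isRunning) throw t                // shutting down: let the thread exit
                else println("Failed to add leader for some partitions; will retry: " + t.getMessage)
            }
          }
        }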

        Alexey Ozeritskiy added a comment -

        That patch breaks kafka.tools.ConsumerOffsetChecker:

        [2013-06-24 18:11:17,638] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
        java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)

        Jun Rao added a comment -

        Alexey,

        This issue seems to be unrelated to this patch. The exception is thrown in SimpleConsumer, and this patch doesn't touch SimpleConsumer. Could you describe how you ran into this issue and how reproducible it is?

        Alexey Ozeritskiy added a comment -

        kafka.tools.ConsumerOffsetChecker uses SimpleConsumer for the OffsetRequest.

        To reproduce, just pull the latest code and run:
        bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group --zkconnect zk-servers --topic topic

        The problem is in the following diff:

        diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
        index bdeee91..1c28328 100644
        --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
        +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
        @@ -37,6 +37,7 @@ class SimpleConsumer(val host: String,
           private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
           val brokerInfo = "host_%s-port_%s".format(host, port)
           private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
        +  private var isClosed = false

           private def connect(): BlockingChannel = {
             close
        @@ -58,7 +59,8 @@ class SimpleConsumer(val host: String,

           def close() {
             lock synchronized {
        -      disconnect()
        +      disconnect()
        +      isClosed = true
             }
           }

        @@ -123,7 +125,7 @@ class SimpleConsumer(val host: String,
           def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)

           private def getOrMakeConnection() {
        -    if(!blockingChannel.isConnected) {
        +    if(!isClosed && !blockingChannel.isConnected) {
               connect()
             }

        SimpleConsumer stops working after close (ConsumerOffsetChecker.scala, line 77)

        Alexey Ozeritskiy added a comment - edited

        This patch does touch SimpleConsumer. Proof: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blobdiff;f=core/src/main/scala/kafka/consumer/SimpleConsumer.scala;h=1c283280873eef597018f2f0a5ddfec942356c18;hp=bdeee9174a32a02209d769c18a0337ade0356e99;hb=5bd33c1517bb2e7734166dc3e787ac90a4ef8f86;hpb=640026467cf705fbcf6fd6bcada058b18a95bff5
        Jun Rao added a comment -

        Thanks for reporting this. It is actually a real issue. However, the problem is not the change in SimpleConsumer itself, but how ConsumerOffsetChecker uses SimpleConsumer: it should only close a SimpleConsumer once it is no longer needed. Could you try the attached patch?
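
        A hedged sketch of that usage pattern (the OffsetClient trait and helper names below are assumptions, not the tool's actual API): keep one connection per broker, reuse it for every partition, and close the connections only after all partitions have been processed, never between offset requests, since a closed consumer cannot reconnect.

        // Illustrative only: reuse one client per broker; close once, at the very end.
        import scala.collection.mutable

        object OffsetCheckerSketch {
          trait OffsetClient {
            def latestOffset(topic: String, partition: Int): Long
            def close(): Unit
          }

          def checkOffsets(partitions: Seq[(String, Int, Int)],  // (topic, partition, brokerId)
                           newClient: Int => OffsetClient): Unit = {
            val clients = mutable.Map[Int, OffsetClient]()
            try {
              for ((topic, partition, brokerId) <- partitions) {
                val client = clients.getOrElseUpdate(brokerId, newClient(brokerId))
                println(topic + "-" + partition + " latest offset: " + client.latestOffset(topic, partition))
              }
            } finally {
              clients.values.foreach(_.close())                  // close once, after all partitions
            }
          }
        }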

        Alexey Ozeritskiy added a comment -

        This patch works, thanks.

        Jun Rao added a comment -

        Alexey,

        Thanks for the review. Committed the ConsumerOffsetChecker patch to 0.8.

        Joel Koshy added a comment -

        The delta patch slipped through the cracks. We hit that issue recently - a network glitch led to the leader-finder-thread hitting an exception while adding fetchers, and the thread quit:

        leader-finder-thread], Error due to 
        java.net.ConnectException: Connection timed out
                at sun.nio.ch.Net.connect(Native Method)
                at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507)
                at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
                at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
                at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:129)
                at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
                at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
                at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:144)
                at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
                at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:180)
                at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:80)
                at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:95)
                at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:92)
                at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
                at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
                at scala.collection.Iterator$class.foreach(Iterator.scala:631)
                at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
                at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
                at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
                at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
                at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:92)
                at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
        

        +1 on kafka-937_delta, with one minor comment: change the log message to indicate that it will attempt to look up the leader again and add the fetchers; right now it just says "failed to add".

        Jun Rao added a comment -

        Thanks for the review. Committed the delta patch to 0.8, after fixing the logging.


          People

          • Assignee:
            Jun Rao
          • Reporter:
            Jun Rao
          • Votes:
            0
          • Watchers:
            3
