KAFKA-1650

Mirror Maker could lose data on unclean shutdown.

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0.0
    • Component/s: None
    • Labels:
      None

      Description

      Currently, if Mirror Maker is shut down uncleanly, the data in the data channel and buffer could be lost. With the new producer's callback, this issue could be solved.

      1. KAFKA-1650_2014-10-06_10:17:46.patch (46 kB, Jiangjie Qin)
      2. KAFKA-1650_2014-11-12_09:51:30.patch (56 kB, Jiangjie Qin)
      3. KAFKA-1650_2014-11-17_18:44:37.patch (60 kB, Jiangjie Qin)
      4. KAFKA-1650_2014-11-20_12:00:16.patch (81 kB, Jiangjie Qin)
      5. KAFKA-1650_2014-11-24_08:15:17.patch (88 kB, Jiangjie Qin)
      6. KAFKA-1650_2014-12-03_15:02:31.patch (90 kB, Jiangjie Qin)
      7. KAFKA-1650_2014-12-03_19:02:13.patch (91 kB, Jiangjie Qin)
      8. KAFKA-1650_2014-12-04_11:59:07.patch (93 kB, Jiangjie Qin)
      9. KAFKA-1650_2014-12-06_18:58:57.patch (112 kB, Jiangjie Qin)
      10. KAFKA-1650_2014-12-08_01:36:01.patch (124 kB, Jiangjie Qin)
      11. KAFKA-1650_2014-12-16_08:03:45.patch (124 kB, Jiangjie Qin)
      12. KAFKA-1650_2014-12-17_12:29:23.patch (149 kB, Jiangjie Qin)
      13. KAFKA-1650_2014-12-18_18:48:18.patch (155 kB, Jiangjie Qin)
      14. KAFKA-1650_2014-12-18_22:17:08.patch (157 kB, Jiangjie Qin)
      15. KAFKA-1650_2014-12-18_22:53:26.patch (159 kB, Jiangjie Qin)
      16. KAFKA-1650_2014-12-18_23:41:16.patch (165 kB, Jiangjie Qin)
      17. KAFKA-1650_2014-12-22_19:07:24.patch (186 kB, Jiangjie Qin)
      18. KAFKA-1650_2014-12-23_07:04:28.patch (194 kB, Jiangjie Qin)
      19. KAFKA-1650_2014-12-23_16:44:06.patch (196 kB, Jiangjie Qin)
      20. KAFKA-1650.patch (28 kB, Jiangjie Qin)

          Activity

          becket_qin Jiangjie Qin added a comment -

          Created reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          guozhang Guozhang Wang added a comment -

          Jiangjie, could you also consider fixing KAFKA-1385 in your patch?

          nehanarkhede Neha Narkhede added a comment -

          Joel Koshy Feel free to reassign for review.

          becket_qin Jiangjie Qin added a comment -

          Guozhang Wang I checked KAFKA-1385 and it seems that Mirror Maker code has changed a lot since then. I checked my code and it should not have that issue.

          guozhang Guozhang Wang added a comment -

          Cool, in this case could you close that ticket?

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment - - edited

          Jun Rao, could you also help review the patch? Joel thought it is a fairly big patch with a new design for mirror maker. Thanks.
          Some major changes are listed below to expedite the review:
          1. ZookeeperConsumerConnector auto commit is turned off.
          2. A Map<SourceTopicPartition, Map<TargetTopicPartition, Offset>> is used to keep track of the offsets of messages successfully sent to the target cluster. The map is updated in the new producer's callback.
          3. ZookeeperConsumerConnector has been modified to take a rebalance listener and to accept an external offset map when committing offsets.
          4. Mirror maker wires in a consumer rebalance listener to avoid duplicates on consumer rebalance. On consumer rebalance, before releasing the partition ownership, it first clears the messages in the data channel, waits until the messages already taken by the producer have received their callbacks, commits offsets, and then continues the rebalance.
          5. An offset commit thread is responsible for committing offsets periodically.
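
          The bookkeeping behind items 1, 2 and 5 can be sketched roughly as follows. This is a minimal illustration and not the patch's code: AckedOffsetTracker and its methods are hypothetical names, partitions are plain strings, and instead of the nested map described above it keeps a sorted set of in-flight offsets per source partition. What it illustrates is that an offset becomes committable only once the producer callback has reported success for every earlier message from that source partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical helper (not part of Kafka): tracks which source offsets are safe to commit. */
final class AckedOffsetTracker {
    // Offsets handed to the producer but not yet acknowledged, per source partition.
    private final Map<String, TreeSet<Long>> inFlight = new HashMap<>();
    // One past the highest offset handed to the producer, per source partition.
    private final Map<String, Long> nextOffset = new HashMap<>();

    /** Call before handing a consumed message to the producer. */
    synchronized void onSend(String sourcePartition, long offset) {
        inFlight.computeIfAbsent(sourcePartition, k -> new TreeSet<>()).add(offset);
        nextOffset.merge(sourcePartition, offset + 1, Long::max);
    }

    /** Call from the producer callback when the send succeeded. */
    synchronized void onAck(String sourcePartition, long offset) {
        TreeSet<Long> pending = inFlight.get(sourcePartition);
        if (pending != null) {
            pending.remove(offset);
        }
    }

    /**
     * Offset the commit thread may commit: the smallest unacknowledged offset,
     * or one past the last sent offset if everything was acknowledged.
     * Returns -1 if nothing has been sent for this partition.
     */
    synchronized long committableOffset(String sourcePartition) {
        TreeSet<Long> pending = inFlight.get(sourcePartition);
        if (pending != null && !pending.isEmpty()) {
            return pending.first();
        }
        return nextOffset.getOrDefault(sourcePartition, -1L);
    }
}
```

          In this sketch a consumer thread would call onSend before passing a message to the producer, the producer callback would call onAck on success, and the offset commit thread would periodically commit committableOffset for each source partition. After an unclean shutdown, unacknowledged messages are then re-consumed rather than lost, at the cost of possible duplicates.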

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          joestein Joe Stein added a comment -

          I am getting a local failure running ./gradlew test

          kafka.server.ServerShutdownTest > testCleanShutdownAfterFailedStartup FAILED
          java.lang.NullPointerException
          at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
          at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
          at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114)
          at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
          at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
          at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113)
          at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108)
          at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147)
          at kafka.server.ServerShutdownTest.testCleanShutdownAfterFailedStartup(ServerShutdownTest.scala:141)

          kafka.server.ServerShutdownTest > testCleanShutdownWithDeleteTopicEnabled FAILED
          java.lang.NullPointerException
          at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
          at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
          at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114)
          at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
          at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
          at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113)
          at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108)
          at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147)
          at kafka.server.ServerShutdownTest.testCleanShutdownWithDeleteTopicEnabled(ServerShutdownTest.scala:114)

          kafka.server.ServerShutdownTest > testConsecutiveShutdown PASSED

          kafka.server.ServerShutdownTest > testCleanShutdown FAILED
          java.lang.NullPointerException
          at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
          at kafka.server.ServerShutdownTest$$anonfun$verifyNonDaemonThreadsStatus$2.apply(ServerShutdownTest.scala:147)
          at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:114)
          at scala.collection.TraversableOnce$$anonfun$count$1.apply(TraversableOnce.scala:113)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
          at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
          at scala.collection.TraversableOnce$class.count(TraversableOnce.scala:113)
          at scala.collection.mutable.ArrayOps$ofRef.count(ArrayOps.scala:108)
          at kafka.server.ServerShutdownTest.verifyNonDaemonThreadsStatus(ServerShutdownTest.scala:147)
          at kafka.server.ServerShutdownTest.testCleanShutdown(ServerShutdownTest.scala:101)

          The CI is broken too; I ran another build just now to triple-check: https://builds.apache.org/view/All/job/Kafka-trunk/352/

          I am a bit lost on this ticket. It looks like the code was committed to trunk (commit 2801629964882015a9148e1c0ade22da46376faa), but this JIRA is not marked resolved and has no fix version (and more patches were posted after the commit), and tests are failing. Guozhang Wang, can you take a look please? (It looks like your commit.)

          becket_qin Jiangjie Qin added a comment -

          Joe Stein It was my bad. In the ConsumerRebalanceListenerTest in ZookeeperConsumerConnectorTest, I forgot to shut down the ZKconsumerConnector, which causes the later test failures. This problem is fixed by KAFKA-1815. Could you help check that in?

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          becket_qin Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/25995/diff/
          against branch origin/trunk

          jjkoshy Joel Koshy added a comment -

          Committed to trunk

          githubbot ASF GitHub Bot added a comment -

          GitHub user ottomata opened a pull request:

          https://github.com/apache/kafka/pull/1654

          MINOR: Update MirrorMaker docs to remove multiple --consumer.config options

          See:
          https://issues.apache.org/jira/browse/KAFKA-1650
          https://mail-archives.apache.org/mod_mbox/kafka-users/201512.mbox/%3CCAHwHRrUeTq_-EHXiUXdrbgHcRt-0E_t0+5kOYaF9Qy4aNVqYkA@mail.gmail.com%3E

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/ottomata/kafka mirror-maker-doc-fix

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/1654.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1654


          commit 1ecc0a6491fdfdaf7a9bfa9611d5938599fdad3f
          Author: Andrew Otto <acotto@gmail.com>
          Date: 2016-07-22T19:03:44Z

          Update MirrorMaker docs to remove multiple --consumer.config options

          See:
          https://issues.apache.org/jira/browse/KAFKA-1650
          https://mail-archives.apache.org/mod_mbox/kafka-users/201512.mbox/%3CCAHwHRrUeTq_-EHXiUXdrbgHcRt-0E_t0+5kOYaF9Qy4aNVqYkA@mail.gmail.com%3E

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/kafka/pull/1654


            People

            • Assignee: becket_qin Jiangjie Qin
            • Reporter: becket_qin Jiangjie Qin
            • Reviewer: Joel Koshy
            • Votes: 0
            • Watchers: 7
