KAFKA-966

Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: consumer
    • Labels:
      None

      Description

      Enhancement request.

      The high-level consumer is very close to handling a lot of the situations a 'typical' client would need, except for the case where the message received from Kafka is valid but the business logic that wants to consume it has a problem.

      For example, suppose I want to write the value to a MongoDB or Cassandra database and the database is not available. I won't know the database is down until I go to do the write, but by then it is too late to NOT read the message from Kafka. Thus if I call shutdown() to stop reading, that message is lost, since the offset Kafka writes to ZooKeeper is the next offset.

      Ideally I'd like to be able to tell Kafka: close the KafkaStream, but set the next offset to read for this partition to this message when I start up again. And if there are any messages in the BlockingQueue for other partitions, find the lowest offset and use it as that partition's offset, since I haven't consumed them yet.

      Thus I can cleanly shut down my processing, resolve whatever the issue is, and restart the process.

      Another idea might be to allow a 'peek' at the next message and, if I succeed in writing to the database, call 'next' to remove it from the queue.

      I understand this won't deal with a 'kill -9' or hard failure of the JVM leading to the latest offsets not being written to ZooKeeper but it addresses a likely common scenario for consumers. Nor will it add true transactional support since the ZK update could fail.
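      A minimal sketch of the consume loop being described, against the 0.8 high-level consumer API; the topic name, group id, ZooKeeper address and writeToDatabase() helper are stand-ins for illustration only:

      import java.util.Collections;
      import java.util.List;
      import java.util.Map;
      import java.util.Properties;
      import kafka.consumer.Consumer;
      import kafka.consumer.ConsumerConfig;
      import kafka.consumer.ConsumerIterator;
      import kafka.consumer.KafkaStream;
      import kafka.javaapi.consumer.ConsumerConnector;
      import kafka.message.MessageAndMetadata;

      public class MongoWriterExample {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put("zookeeper.connect", "localhost:2181"); // assumed ZK address
              props.put("group.id", "mongo-writer");            // hypothetical consumer group
              props.put("auto.commit.enable", "true");          // the default behaviour described above

              ConsumerConnector connector =
                      Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
              Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                      connector.createMessageStreams(Collections.singletonMap("events", 1));
              ConsumerIterator<byte[], byte[]> it = streams.get("events").get(0).iterator();

              while (it.hasNext()) {
                  MessageAndMetadata<byte[], byte[]> msg = it.next(); // offset is now 'consumed'
                  try {
                      writeToDatabase(msg.message());                 // hypothetical MongoDB/Cassandra write
                  } catch (Exception databaseDown) {
                      // Too late to 'nak': the iterator has already advanced, and the next
                      // auto-commit can record this offset, so shutdown() here loses the message.
                      connector.shutdown();
                      return;
                  }
              }
          }

          static void writeToDatabase(byte[] value) { /* stand-in for the real write */ }
      }

      With auto-commit on, the offset of the already-fetched message can be written to ZooKeeper even though the database write failed, which is the loss described above.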

        Activity

        Neha Narkhede added a comment -

        Yes, it should.

        Jason Rosenberg added a comment -

        Cool,

        Will this work for consumers set up to consume over a whitelist topic filter, which might include an open-ended number of topics/partitions?

        Jason

        Neha Narkhede added a comment -

        Jason,

        You might want to check out the new proposal for the Consumer API. It allows you to invoke commitOffsets() on either particular partitions or all partitions that the consumer is consuming - https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI

        This should get you some sort of transactionality for message consumption, where you can invoke commitOffsets() only after your application has processed the messages successfully. Since commitOffsets() in the new consumer proposal will use Kafka as the offset storage mechanism, it will be fast and scalable, unlike Zookeeper offset storage.
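
        As a rough illustration of that pattern, here is a sketch using the consumer API as it eventually shipped (org.apache.kafka.clients.consumer.KafkaConsumer), which may differ in detail from the wiki proposal; the topic, group id and process() helper are placeholders:

        import java.time.Duration;
        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.OffsetAndMetadata;
        import org.apache.kafka.common.TopicPartition;

        public class CommitAfterProcessing {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                props.put("group.id", "mongo-writer");
                props.put("enable.auto.commit", "false"); // commit only after successful processing
                props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

                try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("events"));
                    while (true) {
                        for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
                            process(rec.value()); // the DB write; an exception here stops us before the commit
                            // commit just this record's partition, up to and including its offset
                            consumer.commitSync(Collections.singletonMap(
                                    new TopicPartition(rec.topic(), rec.partition()),
                                    new OffsetAndMetadata(rec.offset() + 1)));
                        }
                    }
                }
            }

            static void process(byte[] value) { /* hypothetical downstream write */ }
        }

        Committing after every record is shown only for clarity; in practice commits would normally be batched.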

        Jason Rosenberg added a comment -

        I think a nice API would be to have an enhanced version of 'auto.commit.enable', which allows you to 'mark for commit' a message after you are done processing it. So, you'd call 'markForCommit()' once you have successfully processed a message (and this would imply, of course, that you are marking all previous messages up to that offset). So, when a subsequent invocation of commitOffsets() runs, it will only commit up to the last offset 'markedForCommit'. This would apply both to auto-initiated commits (if auto.commit.enable is on) and to manual invocations of commitOffsets().

        This would be a change in the PartitionTopicInfo class, I'd expect: in addition to tracking 'fetchedOffset' and 'consumedOffset', it could track 'markedForCommitOffset' (maybe with a better name).

        Then, when commitOffsets() runs, if 'reliable.auto.commit.enable' (or whatever we call it) is enabled, then it would use the 'markedForCommitOffset' rather than the 'consumedOffset' when committing.

        Thoughts?
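
        None of this exists in Kafka; purely as a sketch of the proposed semantics, a tracker like the following could hold the per-partition 'marked for commit' offset that a reliable commitOffsets() would use instead of the consumed offset (all names hypothetical):

        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;

        // Hypothetical sketch of the proposed semantics only; none of these names exist in Kafka.
        public class MarkForCommitTracker {
            // highest offset per "topic-partition" that the application has fully processed
            private final Map<String, Long> markedForCommit = new ConcurrentHashMap<>();

            // Called by the application once a message has been processed successfully,
            // i.e. the proposed markForCommit() call.
            public void markForCommit(String topic, int partition, long offset) {
                markedForCommit.merge(topic + "-" + partition, offset, Math::max);
            }

            // What a 'reliable' commitOffsets() would write for this partition: the next offset
            // to resume from (marked + 1) rather than the consumed offset, or nothing if unmarked.
            public Long offsetToCommit(String topic, int partition) {
                Long marked = markedForCommit.get(topic + "-" + partition);
                return marked == null ? null : marked + 1;
            }
        }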

        Joel Koshy added a comment -

        Yes, if you need to implement support for transactions across partitions that are potentially owned by different consumer instances, then this approach wouldn't work. I'm not sure if it is feasible in your case, but if there is a group of messages that need to be committed together, you could send them with a key and partition those messages into the same partition. That way, exactly one consumer thread will be responsible for those messages.
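
        A small sketch of that keying approach, using the newer Java producer API for brevity (topic and key are made up); with the default partitioner, records sharing a key hash to the same partition, so one consumer thread owns the whole group:

        import java.util.Properties;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.Producer;
        import org.apache.kafka.clients.producer.ProducerRecord;

        public class KeyedSend {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

                try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                    String groupKey = "order-12345"; // hypothetical business key for messages that must commit together
                    // Same key => same partition, so a single consumer thread sees all three messages in order.
                    producer.send(new ProducerRecord<>("events", groupKey, "step-1"));
                    producer.send(new ProducerRecord<>("events", groupKey, "step-2"));
                    producer.send(new ProducerRecord<>("events", groupKey, "step-3"));
                }
            }
        }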

        Chris Curtin added a comment -

        Not to be dense, but wouldn't managing the offsets that way remove the ability to easily multi-thread the consumer? The commitOffsets method is on the ConsumerConnector, not the KafkaStream, so to do a multi-threaded client I'd need to write logic to checkpoint all the threads to make sure they are all okay before committing back to Kafka.

        commitOffsets would also require that all the messages on all partitions succeed or be rolled back together, so a failure on one message could stop everything. In a multi-partition model where the partitions end up in different shards, databases, etc., that makes the consumer a lot more complicated.
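
        One possible shape for that checkpointing logic, sketched with a CyclicBarrier whose barrier action performs the connector-wide commit; the batch size and processBatch() helper are assumptions, not an existing Kafka facility:

        import java.util.List;
        import java.util.concurrent.BrokenBarrierException;
        import java.util.concurrent.CyclicBarrier;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import kafka.consumer.KafkaStream;
        import kafka.javaapi.consumer.ConsumerConnector;

        public class CheckpointedWorkers {
            public static void run(ConsumerConnector connector,
                                   List<KafkaStream<byte[], byte[]>> streams) {
                // When every worker reaches the barrier, all in-flight messages have been
                // processed, so it is safe to commit offsets for all partitions at once.
                CyclicBarrier checkpoint = new CyclicBarrier(streams.size(), connector::commitOffsets);
                ExecutorService pool = Executors.newFixedThreadPool(streams.size());
                for (KafkaStream<byte[], byte[]> stream : streams) {
                    pool.submit(() -> {
                        try {
                            while (true) {
                                processBatch(stream, 100); // hypothetical: handle up to 100 messages
                                checkpoint.await();        // wait for the other threads before the commit
                            }
                        } catch (InterruptedException | BrokenBarrierException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                }
            }

            static void processBatch(KafkaStream<byte[], byte[]> stream, int max) {
                // Iterate the stream and write to the downstream store. It would need to return
                // even when the stream is idle (e.g. via consumer.timeout.ms) so the barrier can trip.
            }
        }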

        Joel Koshy added a comment -

        One way to accomplish this is to turn off autocommit and checkpoint offsets only after a message (or batch of messages) has been written to the DB.

        One caveat, though, is that rebalances (e.g., if a new consumer instance shows up) will result in offsets being committed, so there would be an issue if the DB is unavailable, a rebalance occurs at the same time, and there are unprocessed messages that have already been pulled out of the iterator.
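
        A sketch of that approach against the 0.8 high-level consumer, with auto-commit disabled and commitOffsets() invoked only after a successful batch write; the topic, batch size and writeBatchToDb() helper are placeholders:

        import java.util.ArrayList;
        import java.util.Collections;
        import java.util.List;
        import java.util.Properties;
        import kafka.consumer.Consumer;
        import kafka.consumer.ConsumerConfig;
        import kafka.consumer.ConsumerIterator;
        import kafka.javaapi.consumer.ConsumerConnector;

        public class CommitAfterDbWrite {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("zookeeper.connect", "localhost:2181");
                props.put("group.id", "mongo-writer");
                props.put("auto.commit.enable", "false"); // offsets move only when we say so

                ConsumerConnector connector =
                        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                ConsumerIterator<byte[], byte[]> it = connector
                        .createMessageStreams(Collections.singletonMap("events", 1))
                        .get("events").get(0).iterator();

                List<byte[]> batch = new ArrayList<>();
                while (it.hasNext()) {
                    batch.add(it.next().message());
                    if (batch.size() >= 100) {
                        writeBatchToDb(batch);     // if this throws, nothing was committed
                        connector.commitOffsets(); // checkpoint only after a successful write
                        batch.clear();
                    }
                }
                // Per the caveat above, a rebalance can still commit offsets for messages that
                // were pulled from the iterator but not yet written to the DB.
            }

            static void writeBatchToDb(List<byte[]> batch) { /* hypothetical bulk insert */ }
        }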


          People

          • Assignee:
            Neha Narkhede
          • Reporter:
            Chris Curtin
          • Votes:
            2
          • Watchers:
            5
