KAFKA-156

Messages should not be dropped when brokers are unavailable

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      When none of the brokers are available, the producer should spool messages to disk and keep retrying until the brokers come back.

      This will also enable broker upgrades/maintenance without message loss.
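
      For illustration, a rough sketch of what such spool-and-retry behavior could look like from the producer side. None of these names (SpoolingProducer, produce, flushSpool) exist in Kafka; the send function stands in for whatever mechanism the producer uses to reach a broker.

      import java.io.{File, FileOutputStream, PrintWriter}
      import scala.io.Source

      // Hypothetical wrapper: try to send, append to a local spool file when no broker
      // is reachable, and replay the spool in order once a broker comes back.
      class SpoolingProducer(send: String => Boolean, spoolFile: File) {

        def produce(message: String): Unit = {
          if (!send(message)) appendToSpool(message)
        }

        /** Called periodically (or when a broker reappears) to drain the spool in order. */
        def flushSpool(): Unit = {
          if (!spoolFile.exists()) return
          val source = Source.fromFile(spoolFile)
          val pending = try source.getLines().toList finally source.close()
          val unsent = pending.dropWhile(send)          // stop at the first message that still fails
          val out = new PrintWriter(new FileOutputStream(spoolFile))
          try unsent.foreach(m => out.println(m)) finally out.close()
        }

        private def appendToSpool(message: String): Unit = {
          val out = new PrintWriter(new FileOutputStream(spoolFile, true))
          try out.println(message) finally out.close()
        }
      }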

          Activity

          Steve Morin added a comment -

          Scott Carey - I completely agree with your use case here. The ability to do rolling restarts, and to have processes spill to disk when there are outages, would let us in some cases withstand hours instead of minutes in the event of a cluster failure or a strange network outage.

          Scott Carey added a comment -

          Jay –

          I agree, the duplication issue does not depend on whether there is a disk or memory queue. However, in both cases one can choose to either duplicate messages or drop them on failures. In the in-memory case, biasing it to drop a message rather than duplicate it on a failure is more acceptable than in the on-disk case, because an in-memory queue is more likely to suffer loss than a disk queue. For example, a producer may crash or be kill -9'd, and we would expect in-flight, in-memory data to be lost.
          My thoughts on this issue are biased by our legacy system – each producer-equivalent would log locally and then the equivalent of the broker would 'harvest' these logs with no possible duplication. Loss is possible if the disks fail on the client, but that would take down the whole app anyway. Furthermore, we use SSDs on those servers (since late 2008!) and have not had a single SSD drive failure where data was lost (we had a couple have their performance degrade to abysmal levels, but the data was still there).
          Additionally, we are able to restart / service the nodes that collect the data without data loss because of the local spooling. Replication in Kafka will allow us to do rolling restarts of brokers and achieve similar operational utility. The need for 'spill to disk' is certainly less with replication active. However, it doesn't take long to fill our entire memory buffer with messages on some of our clients – even a 10 second window of unavailability means losing messages unless we can spill to disk.

          On your proposal:

          • What happens if there is a 'bubble' in sequence ids from the broker perspective? What does the broker do? How does the client know to re-send?
          • What happens when two clients assign themselves the same id?

          Answer to question on my proposal:

          • It is assumed that the final batch commit is idempotent, so if the client fails to get the final ACK (step 4, "Acknowledge Batch Commit"), it will go back to step 3 and send the batch commit message again. If it is the same broker, it can simply acknowledge since it already committed it. If it is a replica, then there are two cases:
            a) The other broker has the UUID info (which is replicated?) and can restart the process at the right point.
            b) Failover to another broker starts the process over at step 1 with the same UUID, and when the broker that crashed comes online the brokers in the replica set reconcile to remove the duplicate. There are a limited number of in flight or recently in flight batches.

          I think b will work, but I don't know enough about how a broker replica set reconciles in 0.8 when one fails. If we assume strict ordering on whether the replica or the client gets the ACK for a batch commit first, a repair process should be consistent.

          A two-phase produce doesn't have to be serial from batch to batch – a few pipelined requests could be supported, but too many could be used for DOS. A high-water-mark approach is more difficult to pipeline, but probably does not need it.

          One idea I had is far more radical. It boils down to these questions:
          Why even have a separate producer protocol at all? Why isn't the consumer protocol good enough for getting data to the brokers?

          I admit, this is tricky and I have not thought through it well; but I think it is worth sharing. The consumer protocol is highly reliable and makes it easy to enforce once-only semantics. If there was some sort of client-initiated broker 'pull' with the consumer protocol, there might be some opportunities for overall simplification in the protocol and more sharing in the code.
          A producer would be required to assign an offset id and increment it per message. The producer would trigger the broker to initiate a request to read all of the batches from that starting ID to the "end", commit them, then start from the last offset to the "end", and repeat. This makes a producer like a broker – except that it wants to drop data a lot faster, and therefore needs to know how far along the broker is in pulling data down. Perhaps it can safely assume that if batch "1 to 50" was requested, and subsequently batch "51 to 100" is requested, the request for the latter batch indicates that the first has successfully been committed, but that serializes batches and prevents pipelining. Alternatively, the "1 to 50 is committed" message can ride with the "get 51 to 100" request.
          What I find useful here is the same thing that is great about the consumer protocol: putting the burden on the one obtaining the data to track progress is cleaner in the face of failure.
          This bears similarity to your proposal, but with inversion of control – the broker asks for the next batch when it is ready. If there is a broker failure, the replica can pull in messages, and duplicate removal can occur when they reconcile (and the offset id in the topic will be consistent, since both sides use offsets). Producers are then responsible for buffering up to the threshold they can tolerate, and can spool to disk if they please (perhaps re-using some broker code to do so).
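
          As a purely illustrative sketch of that inversion of control, the producer could keep an offset-addressed buffer and serve fetch requests from the broker, trimming whatever the broker reports as committed. All names here are hypothetical and not part of any Kafka protocol:

          import scala.collection.mutable

          // Hypothetical producer-side buffer for the "broker pulls from the producer" idea.
          // The producer assigns a monotonically increasing offset per message; the broker
          // fetches ranges, and each fetch carries how far it has already committed so the
          // producer can discard acknowledged messages.
          class PullableProducerBuffer {
            private val buffer = mutable.TreeMap[Long, Array[Byte]]()
            private var nextOffset = 0L

            def append(message: Array[Byte]): Long = synchronized {
              val offset = nextOffset
              buffer(offset) = message
              nextOffset += 1
              offset
            }

            /** Broker asks for messages in [from, until); committedUpTo rides along with the request. */
            def fetch(from: Long, until: Long, committedUpTo: Long): Seq[(Long, Array[Byte])] = synchronized {
              val acknowledged = buffer.keys.takeWhile(_ < committedUpTo).toList
              acknowledged.foreach(buffer.remove)          // drop what the broker already committed
              buffer.range(from, until).toSeq
            }
          }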

          Jay Kreps added a comment -

          Scott--I am interested in implementing a deduplication scheme similar to what you propose. I think this would have several uses. This would definitely be post 0.8.

          I do think we are conflating the storage location (disk versus memory) with the deduplication/commit mechanism. I claim the scheme you propose just avoids duplicates and is unrelated to writing data to disk.

          I have to say I am a little skeptical of the "fall back to disk thing". In our usage we have many thousands of servers and a small number of kafka servers with nice disks--I think this is fairly standard. The idea that involving thousands of crappy local disks in the data pipeline will decrease the empirical frequency of data loss seems dubious to me.

          Regardless of whether you buffer on disk or buffer in memory (as the client currently does), as long as the client has sufficient space to buffer until the server is available again, there is no data loss. And indeed the replication fail-over is very fast, so this really does work. As you point out, though, that does lead to the possibility of duplicate messages, which is where your proposal comes in.

          I had thought of a similar thing. Here was my idea:
          1. Client provides a unique instance id for itself.
          2. Each message contains the instance id and a per-client sequence number
          3. Broker maintains a per-client highwater mark on the sequence number, periodically checkpointed to disk
          4. In the event of a hard crash the broker rebuilds the highwater marks from the last checkpoint and the log
          5. Broker discards any request containing a message from a client that has a sequence number less than or equal to the high-water mark.

          The advantage of this approach would be that it doesn't require a multi-phase produce, the disadvantage is that it requires assigning client ids.
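
          A minimal sketch of the broker-side bookkeeping this scheme would imply (ProducerDedupTable, shouldAppend, and checkpoint are made-up names for illustration, not existing Kafka code):

          import scala.collection.mutable

          // Hypothetical per-client high-water-mark table: requests whose sequence number
          // is at or below the mark are treated as duplicates and discarded.
          class ProducerDedupTable {
            private val highWaterMarks = mutable.Map[String, Long]()

            /** Returns true if the message should be appended, false if it is a duplicate. */
            def shouldAppend(clientId: String, sequence: Long): Boolean = synchronized {
              val hwm = highWaterMarks.getOrElse(clientId, -1L)
              if (sequence <= hwm) {
                false                                   // duplicate or stale: discard
              } else {
                highWaterMarks(clientId) = sequence
                true
              }
            }

            /** Snapshot taken for the periodic checkpoint; the log rebuilds anything newer after a crash. */
            def checkpoint(): Map[String, Long] = synchronized { highWaterMarks.toMap }
          }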

          One question about your proposal. Let's say that the broker fails before sending the "Acknowledge Batch Commit" and ownership of that partition fails over to another broker, but the client doesn't know whether the transaction was committed (and the broker died just before sending the ack) or was not committed. How can the producer then send to the other broker, which won't have the same UUID info?

          Rob Withers added a comment - edited

          Is there any code for this or for the SyncProducer available? I like the idea of just having a log to write to and read from. Perhaps try to send an event to a Broker manager. Are there any projects with replicated broker/consumer/producer management out there?

          thanks,
          rob

          Scott Carey added a comment -

          To support lossless transmission, the producer protocol will need to change to have a two-phase exchange.

          Rather than sending a message batch and receiving a response with the offset of the first message, as described for the Producer Request and Producer Response at https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-ProduceAPI, the process would have two exchanges per batch:

          Option 1: Broker assigns batch ids as the batches come in. Drawback: broker must track UUIDs and hold state on them for potentially a long time.

          1. "Batch Prepare": Producer send batch to broker
          2. "Acknowledge Batch Prepare": Broker commits batch to staging area, and assigns a UUID (or similar unique id) to the batch, and returns the UUID to the producer
          3. "Batch Commit": Producer sends message to broker to commit the batch, with the UUID token to identify the batch;
          4. "Acknowledge Batch Commit": Broker commits batch from staging area to topic atomically (or idempotently and non-atomically), and returns an acknowledgement with the offset

          If the producer crashes or loses connection between steps 1 and 2 or 2 and 3 (or the network breaks and restores), it can send the batch again, get a new UUID, and start over, orphaning the first batch. The client needs to be able to clear out orphaned batches it created, or they must expire after a long time.
          If the producer crashes or has network issues between steps 3 and 4, then upon recovery it will attempt step 3 again, which is idempotent and safe. The broker has to keep the in-flight UUIDs and used UUIDs for a while, because a client may have a large time lag in recovering from a failed step 3/4 exchange, and steps 3 and 4 may occur multiple times as a result.
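
          For illustration only, broker-side handling of Option 1 could look roughly like this (BatchStagingArea and its methods are hypothetical, not an existing API):

          import java.util.UUID
          import scala.collection.mutable

          // Hypothetical staging area for Option 1: the broker assigns a UUID on "Batch Prepare",
          // holds the batch in staging, commits it idempotently on "Batch Commit", and must
          // eventually expire orphaned batches that were prepared but never committed.
          class BatchStagingArea(expiryMs: Long) {
            private case class Staged(batch: Seq[Array[Byte]], stagedAt: Long)
            private val staging   = mutable.Map[UUID, Staged]()
            private val committed = mutable.Map[UUID, Long]()   // UUID -> offset of the first message

            def prepare(batch: Seq[Array[Byte]]): UUID = synchronized {
              val id = UUID.randomUUID()
              staging(id) = Staged(batch, System.currentTimeMillis())
              id
            }

            /** Idempotent: committing an already-committed UUID just returns the original offset. */
            def commit(id: UUID, appendToLog: Seq[Array[Byte]] => Long): Option[Long] = synchronized {
              committed.get(id).orElse {
                staging.remove(id).map { staged =>
                  val offset = appendToLog(staged.batch)
                  committed(id) = offset
                  offset
                }
              }
            }

            /** Drop prepared batches the producer abandoned (e.g. it started over with a new UUID). */
            def expireOrphans(now: Long): Unit = synchronized {
              val expired = staging.collect { case (id, s) if now - s.stagedAt >= expiryMs => id }.toList
              expired.foreach(staging.remove)
            }
          }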

          Option 2: Pre-assigned batch ids. Benefit: failure between steps 1 and 3 does not orphan a batch.
          0a. "Request Batch IDs": Producer requests a set of batch ids that are unique for use
          0b. "Receive Batch IDs": Broker returns UUIDs for use as batch ids later.
          1. "Batch Prepare": Producer send batch to broker with one of the UUIDs.
          2. "Acknowledge Batch Prepare": Broker commits batch to staging area tagged with the UUID
          3. "Batch Commit": Producer sends message to broker to commit the batch, with the UUID token to identify the batch;
          4. "Acknowledge Batch Commit": Broker commits batch from staging area to topic atomically (or idempotently and non-atomically), and returns an acknowledgement with the offset

          If the producer crashes or loses connection between steps 1 and 2 or 2 and 3, it can attempt step 3, optimistically assuming the broker got the batch and it has been staged. If it did not and step 3 fails, the producer can start over with step 1 using the same UUID.
          If the producer crashes or loses connection between steps 3 and 4, then upon recovery it will attempt step 3 again, which is idempotent and safe. If step 3 fails, it can assume the batch has already been committed. The broker has to track in-flight UUIDs and recently committed UUIDs and the corresponding offsets for a while, because a client may have a large time lag in recovering after a failure between steps 3 and 4, and steps 3 and 4 may occur more than once for a given batch.
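
          A sketch of the producer-side retry logic for Option 2, just to make the failure handling concrete; prepare and commitBatch stand in for the two broker exchanges and are not real APIs:

          import java.util.UUID

          // Hypothetical client-side view of Option 2. Both RPC stand-ins return false on a
          // network error or broker failure. Every failure path re-enters at an idempotent step,
          // so a batch is neither lost nor duplicated as long as the same UUID is reused.
          class TwoPhaseSender(prepare: (UUID, Seq[Array[Byte]]) => Boolean,
                               commitBatch: UUID => Boolean) {

            /** Returns true once the batch is known to be committed exactly once. */
            def sendBatch(id: UUID, batch: Seq[Array[Byte]]): Boolean = {
              if (!prepare(id, batch)) {
                // Steps 1-2 failed or the ack was lost. Optimistically try step 3: if the broker
                // did stage this UUID, commit is idempotent; otherwise start over with the same UUID.
                if (commitBatch(id)) return true
                if (!prepare(id, batch)) return false   // broker still unreachable; spool and retry later
              }
              // Steps 3-4: idempotent, so it is safe to repeat after a lost ack.
              commitBatch(id)
            }
          }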

          If it is tolerable to lose or duplicate up to one batch per failure (network, consumer, or broker), none of the above is required, and the described protocol is sufficient. Since there is always the possibility of message loss if the producer crashes, this may be acceptable; however, it would be nice not to worry about data loss due to broker or network failure, and to leave the only loss window at the producer side.

          With two-phase batches, a producer can safely spool data to disk in the event of a serious error and recover, or spool to disk in all conditions prior to sending downstream to the broker. Each batch from disk would be tagged with a UUID, and a log can be kept of the current state relative to the four steps above, so that recovery can start at the right spot and no batches are missed or duplicated.

          Scott Carey added a comment -

          I am positive that the producer wire protocol has to have built-in features to prevent dropped messages when brokers are unavailable. There is no way to achieve 'optimal transmission' without two-phase commit or idempotence between the producer and broker. I define 'optimal transmission' as the guarantee that data is not duplicated or lost after some well-known point has been reached as viewed by the producer. Prior to this point (for example, when the message is in an in-memory queue), there can be no guarantees from any system.

          "FWIW Clearspring has a pipeline with: ConcurrentQueue --> spill to disk queue with max size (then drops messages) --> SyncProducer with retry/backoff. "

          Such a system can get as close as only losing or duplicating one 'batch' of messages, where that batch size is >= 1 message. At best, when reading from the data spilled to disk, a crash at either end between sending a batch and receiving acknowledgement will leave that batch in limbo. The batch needs an identifier that both sides can persist or generate to identify the batch in case one side has to recover from a crash (two-phase commit). Many database systems have this (see http://www.postgresql.org/docs/9.2/static/sql-prepare-transaction.html): as a client you can name a transaction so that, after you get an acknowledgement from the prepare commit, the client can log that it has been prepared and send the commit command; if it crashes before getting the acknowledgement, upon recovery it can look up the identifier for the in-flight commit and check with the system to see if it succeeded or not.

          We have an internal system that we are attempting to replace with Kafka, but Kafka does not guarantee delivery the way our system does. We spool data on our producers into batches (a file per batch), and then transfer these batches into the downstream system. This system stores these batches in a staging area, so that if either side crashes before the batch transfer completes, recovery is simple. Upon validating that the batch (which is uniquely named) is identical on both sides, the producer can remove it locally and promote it from the staging area to the completed area (atomically). This again is safe if either side crashes, since an item in the staging area that does not exist on the producer indicates it has successfully been moved.

          Kafka will have to mimic this sort of safety at each stage. On the consumer side, batch offsets + partition and topic information serve as unique identifiers for a batch that allow only-once semantics. On the producer side, is there something equivalent?

          Replication mitigates the problem significantly, but there is still the possibility that an item is dropped or duplicated if there is a transient network issue that TCP/IP does not handle, if the broker does not hand out unique batch ids for each batch (I am unsure whether it does).

          If messages are spooled to disk when a broker is unavailable, the process of reading back items from that log and sending them to the broker without loss or duplication is tricky. Each batch of messages will need an identifier shared between the broker and producer, and the batch will need to be marked with the identifier safely to disk prior to sending the batch to the broker. After acknowledgement the producer can delete the batch or mark it complete. If it crashes between sending the batch to the broker and receiving a response (or otherwise fails to get acknowledgement) it must be able to ask the broker whether the batch with the given identifier was received, or alternatively, it can send the batch twice and the broker will ignore the duplicate send based on the identifier.

          Does the producer wire protocol include batch ids generated by the broker so that this can be implemented? It does not seem to be the case here: https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-ProduceAPI
          This protocol does not seem to support "only once" message semantics.

          Jun Rao added a comment -

          Yes, this is likely a post 0.8 item.

          Matan Safriel added a comment - edited

          I was led to believe this is most probably not planned for 0.8 now.
          Just to comment that currently in 0.8, a message set sent asynchronously may partially fail, in the sense that only some of its destinations (determined by topic & partition) will fail while others succeed. That seems to mean a 'one log spool' for all messages (if all messages are spooled before attempting to send them) requires non-sequential management of such a log, or overhead disproportionate to the fraction of failed messages. Perhaps a log per topic would simplify the algorithm, but then there are partitions as well...

          David Almroth added a comment -

          This feature is very important for me. I am interested in migrating from Scribe to Kafka, and this feature is the only thing keeping me from starting now.

          It looks like issue https://issues.apache.org/jira/browse/KAFKA-789 is a duplicate of this.

          Jay Kreps added a comment -

          I agree it is a good feature! It overlaps somewhat with broker replication which makes the partitions themselves more highly available, but they solve slightly different problems. The weakness of the spooling is that you can lose the spooled data if you lose the client filesystem, but it has the advantage of lower resource requirements overall, I think. It would be good to support both. I think the next step would be to have a more detailed design of how this would impact the producer code.

          Sharad Agarwal added a comment -

          "because managing disk space on all machines is something we would like to avoid."

          True, but I assume the disk buffer will be bounded, which will keep managing the disk space simple.

          IMO this feature is critical for moving critical data pipelines to Kafka. Without this:

          • In case of temporary network glitches, messages would be lost
          • Rolling upgrades are required, which may not be that easy operationally. Otherwise we could simply restart the whole cluster.

          The other systems (flume, scribe) do manage this via producer/agent side spooling.

          "If we spool, when the broker comes back, should we deliver the spool messages first, or deliver them in parallel with new incoming messages?"

          We should always deliver messages in the order they were produced, so it should always be from the spool first.
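
          To illustrate that ordering rule (a sketch only; the in-memory queue below stands in for the on-disk spool): while anything is still spooled, new messages also go to the back of the spool, so the broker always sees messages in produce order once it comes back.

          import scala.collection.mutable

          // Hypothetical ordering rule: never send a new message directly while older
          // spooled messages are still waiting.
          class OrderedSpool(send: String => Boolean) {
            private val spool = mutable.Queue[String]()

            def produce(message: String): Unit = synchronized {
              if (spool.nonEmpty || !send(message)) spool.enqueue(message)
            }

            /** Drain the spool before any newer message is allowed to bypass it. */
            def drain(): Unit = synchronized {
              while (spool.nonEmpty && send(spool.head)) spool.dequeue()
            }
          }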

          Chris Burroughs added a comment -

          Perhaps the producers themselves should just take some sort of MessageSendErrorHandler, and implementations could queue, retry with backoff, write to disk, drop but log, etc.

          FWIW Clearspring has a pipeline with: ConcurrentQueue --> spill to disk queue with max size (then drops messages) --> SyncProducer with retry/backoff.
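
          A rough sketch of that kind of pipeline (the bounded in-memory queue below stands in for the spill-to-disk queue, and the send function and backoff numbers are placeholders, not Clearspring's actual code):

          import java.util.concurrent.LinkedBlockingQueue

          // Hypothetical pipeline: bounded queue that drops when full, drained by a sender
          // loop that retries with backoff, roughly mirroring queue -> spill -> SyncProducer.
          class BoundedRetryPipeline(send: String => Boolean, maxSpill: Int) {
            private val spill = new LinkedBlockingQueue[String](maxSpill)

            def produce(message: String): Unit = {
              if (!spill.offer(message)) {
                // Queue is full: the message is dropped. A real implementation would count or log this.
              }
            }

            /** Sender loop: take the next message and retry with backoff until it goes through. */
            def run(): Unit = {
              while (true) {
                val message = spill.take()
                var attempt = 0
                while (!send(message)) {
                  attempt += 1
                  Thread.sleep(math.min(attempt * 100L, 5000L))   // linear backoff, capped at 5 seconds
                }
              }
            }
          }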

          Jay Kreps added a comment -

          To clarify what I was saying, the single log would likely have the message content as well as the key (if any), and the topic...essentially all the metadata about the send. I think there is no reason to keep more than one log since this is effectively just the "unsent messages" log.
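
          For example, each record in that single log might carry something like the following (a hypothetical shape, not an existing Kafka class):

          // Hypothetical on-disk record for the single "unsent messages" log: everything
          // needed to replay the send later (topic, optional key, and the message payload).
          case class UnsentRecord(topic: String, key: Option[Array[Byte]], payload: Array[Byte])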

          Jay Kreps added a comment -

          This would be a good feature to have. I think we will probably not use it at LinkedIn, because managing disk space on all machines is something we would like to avoid. I think likely we will just use replication in cases where we want to guarantee delivery. However many people have asked for this.

          There are a lot of implementation details to work out. I think the sanest thing to do would be to use the Log.scala class to create a single log for the client. In the case of the async producer, it should really log the event to this log immediately, and a background thread should pull from this log instead of using an in-memory blocking queue. This would "guarantee" delivery (assuming nothing happens to the filesystem of the sender machine). The sync producer could probably just log events that fail? That kind of changes the semantics though. It would be good to see a concrete proposal of how all this would plug together, what guarantees it would give, etc.
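
          A sketch of that flow, with ClientLog standing in for whatever Log.scala-backed storage would be used; these method names are placeholders, not the real kafka.log.Log API:

          // Hypothetical shape of a log-backed async producer: the caller appends every event
          // to a local log, and a background thread drains that log toward the broker.
          trait ClientLog {
            def append(event: Array[Byte]): Unit
            def readFrom(position: Long): Iterator[(Long, Array[Byte])]
          }

          class LogBackedAsyncProducer(log: ClientLog, send: Array[Byte] => Boolean) extends Runnable {

            /** Caller's path: persist the event locally before anything is sent. */
            def produce(event: Array[Byte]): Unit = log.append(event)

            /** Background thread: pull from the on-disk log instead of an in-memory blocking queue. */
            def run(): Unit = {
              var position = 0L
              while (true) {
                var progressed = false
                for ((pos, event) <- log.readFrom(position)) {
                  while (!send(event)) Thread.sleep(1000)   // broker unavailable: keep retrying
                  position = pos + 1
                  progressed = true
                }
                if (!progressed) Thread.sleep(100)          // nothing new yet; poll again shortly
              }
            }
          }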

          Jun Rao added a comment -

          If we spool, when the broker comes back, should we deliver the spool messages first, or deliver them in parallel with new incoming messages?

          Chris Burroughs added a comment -

          Sorry, wrong "we". I meant this is what the company I work for does when calling SyncProducer.

          Neha Narkhede added a comment -

          Chris,

          We currently don't do this with any Producer in Kafka. The scenario that Sharad has specified is when no broker is available. Today, in Kafka, the producers would just throw ConnectionRefused/NoBrokerPartitionsAvailable exceptions back to the caller, instead of spooling data to disk and then retrying later.

          Chris Burroughs added a comment -

          This is more or less what we do with SyncProducer. But I'm not sure there is one right choice for durability, shedding load, exponential vs linear backoff, etc.


            People

            • Assignee:
              Unassigned
            • Reporter:
              Sharad Agarwal
            • Votes:
              11
            • Watchers:
              27
