Kafka / KAFKA-268

Add another reconnection condition to the syncProducer: the time elapsed since last connection

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7.1
    • Component/s: core
    • Labels:
      None

      Description

      Add another reconnection condition to the syncProducer: the time elapsed since last connection. If it's larger than the pre-specified threshold, close the connection and reopen it.
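
The condition described above can be sketched roughly as follows. This is a minimal illustration, not the actual SyncProducer code: the class name `ReconnectTimer`, its methods, and the injectable `clock` parameter are all hypothetical and exist only to make the idea testable.

```scala
// Hypothetical helper, not part of Kafka: tracks the age of a connection.
class ReconnectTimer(reconnectIntervalMs: Long,
                     clock: () => Long = () => System.currentTimeMillis) {

  // Timestamp (ms) at which the current connection was established.
  private var lastConnectionTime: Long = clock()

  /** True once the connection has been open for at least the threshold. */
  def shouldReconnect: Boolean =
    clock() - lastConnectionTime >= reconnectIntervalMs

  /** Record that a fresh connection has just been established. */
  def markConnected(): Unit = {
    lastConnectionTime = clock()
  }
}
```

A send path would presumably check `shouldReconnect` before each request, close and reopen the connection when it returns true, and then call `markConnected()`.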

        Activity

        Jun Rao added a comment -

        Committed the improvement to trunk.

        Yang Ye added a comment -

        This is clearer, thanks. This part is also included in the shallow_iterator patch.

        Jun Rao added a comment -

        How about the following?
        private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval

        Yang Ye added a comment -

        Sure, what about doing it like this: I added one field to the SyncProducer object, var randomGenerator = new Random()

        and then changed the initialization of lastConnectionTime to:
        private var lastConnectionTime = System.currentTimeMillis - config.reconnectInterval + SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval

        Otherwise, if we write
        private var lastConnectionTime = System.currentTimeMillis - config.reconnectInterval + new Random().nextDouble() * config.reconnectInterval

        or
        private var lastConnectionTime = System.currentTimeMillis - config.reconnectInterval + new Random(System.currentTimeMillis).nextDouble() * config.reconnectInterval

        we may worry about the determinism of the pseudo-random generator.
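
The determinism concern can be illustrated with `java.util.Random`, which is documented to produce the same sequence for the same seed. So two generators created with `new Random(System.currentTimeMillis)` in the same millisecond would yield the same first offset, whereas successive draws from one shared generator differ. The object and method names below are hypothetical and serve only as a sketch.

```scala
import java.util.Random

// Hypothetical demo object, not part of Kafka.
object SeedDemo {
  /** First draw of a fresh generator created with the given seed. */
  def firstDraw(seed: Long): Double = new Random(seed).nextDouble()

  /** n successive draws from a single shared generator. */
  def sharedDraws(rng: Random, n: Int): Seq[Double] =
    Seq.fill(n)(rng.nextDouble())
}
```

Calling `SeedDemo.firstDraw` twice with the same seed returns the same value, which is the situation a shared `SyncProducer.randomGenerator` avoids.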

        Jun Rao added a comment -

        A suggestion: It's possible that a number of producer clients are all started at around the same time. They will then all reconnect at almost exactly the same time, which is probably not good for load balancing. What we want is to spread out the reconnects. One way to do that is to initialize lastConnectionTime to a random timestamp between (now - reconnectTimeInterval) and now.
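
A minimal sketch of this jittered initialization, under the assumption that timestamps are kept as `Long` milliseconds. The object name `ReconnectJitter` and the method `initialLastConnectionTime` are hypothetical, not taken from the patch.

```scala
import scala.util.Random

// Hypothetical helper, not part of Kafka.
object ReconnectJitter {
  // One generator shared by every producer in the process (an assumption).
  val randomGenerator = new Random()

  /** A random timestamp in the window (now - intervalMs, now]. */
  def initialLastConnectionTime(now: Long,
                                intervalMs: Long,
                                rng: Random = randomGenerator): Long =
    // nextDouble() lies in [0, 1), so the subtracted offset lies in [0, intervalMs).
    now - (rng.nextDouble() * intervalMs).toLong
}
```

Because each producer starts with a different simulated connection age, producers launched together should reach the reconnect threshold at different times within the first interval.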

        Jun Rao added a comment -

        Thanks for the patch, Yang. Just committed this.

        Yang Ye added a comment -

        As suggested by the first comment

        Yang Ye added a comment -

        Got them, nice suggestions.

        Jun Rao added a comment -

        Some comments:
        1. SyncProducerConfigShared: change reconnectTimeInterval to reconnectTimeIntervalMs and reconnect.timeInterval to reconnect.time.interval.ms, to be consistent with existing naming convention.
        2. There is no real change to ByteBufferMessageSet and Utils. Please revert them.
        3. The default value of reconnectTimeInterval is too small. Let's set it to 10 minutes. Also, we probably need a way to turn off time-based reconnect. How about using a negative value to indicate that? Let's also add a comment in the code to document that.
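
The naming, default, and disable conventions requested above might look roughly like this. The class `ReconnectConfig` is a hypothetical stand-in for the real config trait, and treating zero as "enabled" is an assumption, since the comment only says that negative values turn the feature off.

```scala
import java.util.Properties

// Hypothetical stand-in for the producer config, not the actual Kafka class.
class ReconnectConfig(props: Properties) {
  /** Reconnect interval in ms; a negative value disables time-based reconnect. */
  val reconnectTimeIntervalMs: Int =
    props.getProperty("reconnect.time.interval.ms",
                      ReconnectConfig.DefaultReconnectIntervalMs.toString).trim.toInt

  /** Assumption: only negative values disable the feature. */
  def timeBasedReconnectEnabled: Boolean = reconnectTimeIntervalMs >= 0
}

object ReconnectConfig {
  // Default of 10 minutes, as suggested in the review comment.
  val DefaultReconnectIntervalMs: Int = 10 * 60 * 1000
}
```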


          People

          • Assignee:
            Unassigned
            Reporter:
            Yang Ye
          • Votes:
            0
            Watchers:
            0

            Dates

            • Created:
              Updated:
              Resolved:
