
KAFKA-1282: Disconnect idle socket connection in Selector

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.2.0
    • Fix Version/s: 0.9.0.0
    • Component/s: producer
    • Labels:

      Description

      To reduce the number of socket connections, it would be useful for the new producer to close socket connections that are idle. We can introduce a new producer config for the idle time.

          Activity

          Nicolae Marasoiu added a comment -

          Right, the limitation is more critical on the client side of a client-server connection, due to the port count limitation and/or the socket/file count restrictions of the client environment.

          On the other hand, the brokers could close the connections too under such conditions, rather than relying on the clients (producers) to protect them.

          However, is there any other reason to reduce the socket connection count? To make the NIO select lighter on the server by having fewer connections? I think epoll handles this quite well.

          I would like to work on this, but I also want to understand the original problem(s)/concern(s), to see whether there are more suitable solutions to the particular concern.

          Jay Kreps added a comment -

          The goal is just to reduce server connection count. In our environment there might be a single Kafka producer in each process we run, publishing to a small Kafka cluster (say ~20 servers). However, there are tens of thousands of client processes. Connections can end up going unused when leadership migrates, and we should eventually close these out rather than retaining them indefinitely.

          As you say, it is not critical, since the server seems to do a good job of dealing with high connection counts, but it still seems like a good thing to do.

          I agree that doing this on the server might be better. This does mean it is possible that the server will attempt to close the socket while the client is attempting to send something. But if the timeout is 10 mins, it is unlikely that this will happen often (i.e. if nothing was sent in the last 10 mins, it will not likely happen in the 0.5 ms it takes to do the close). The advantage of doing it on the server is that it will work for all clients.

          This change would be in core/.../kafka/network/SocketServer.scala.

          The only gotcha is that we likely need to avoid iterating over all connections to avoid latency impact (there could be 100k connections). One way to do this would be to use java.util.LinkedHashMap to implement an LRU hash map of the SelectionKeys, and access this every time the selection key comes up in a select operation. (There are a ton of details in LinkedHashMap; it needs to be in "access order" mode, etc.) Then every 5-10 select loop iterations we would iterate the map, expiring connections until we come to a connection that doesn't need expiring, then stop.
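
          Something roughly like the following is what such an access-ordered LRU of SelectionKeys could look like (an illustrative Scala sketch, untested; the object and method names are made up, not from an actual patch):

            import java.nio.channels.SelectionKey
            import java.util.concurrent.TimeUnit

            object IdleConnectionSketch {
              // Access-ordered LinkedHashMap: put()/get() move an entry to the tail,
              // so the head is always the least recently used connection.
              val idleTimeoutNanos: Long = TimeUnit.MINUTES.toNanos(10)
              val lruConnections =
                new java.util.LinkedHashMap[SelectionKey, java.lang.Long](16, 0.75f, true)

              // Refresh a connection's recency whenever its key comes up in a select.
              def touch(key: SelectionKey, nowNanos: Long): Unit =
                lruConnections.put(key, nowNanos)

              // Run every N select-loop iterations: expire entries from the head until
              // we reach a connection that is still fresh, then stop.
              def closeIdleConnections(nowNanos: Long): Unit = {
                val it = lruConnections.entrySet().iterator()
                var done = false
                while (it.hasNext && !done) {
                  val entry = it.next()
                  if (nowNanos - entry.getValue.longValue > idleTimeoutNanos) {
                    entry.getKey.channel().close() // close the idle socket
                    it.remove()                    // and drop it from the LRU map
                  } else {
                    done = true                    // first fresh entry: stop scanning
                  }
                }
              }
            }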

          Nicolae Marasoiu added a comment -

          Beautiful, I can't wait to work this out, so I'll take this and start coding, right?

          Nicolae Marasoiu added a comment -

          Jun Rao, you agree with the approach, don't you?

          Jun Rao added a comment -

          Yes. Thanks for picking it up.

          Neha Narkhede added a comment -

          Hey Nicolae Marasoiu, are you actively working on this patch yet? If not, do you mind if we have someone else pick it up?

          nicu marasoiu added a comment (edited)

          Hi,

          I will spend up to 4 hours per day next week (11-15 August), when I have this time.
          So I would like to keep this nice task.
          My estimate is that I will have a first working solution ready to put up for review in ~3 days, so Thursday.

          Does that sound good?

          nicu marasoiu added a comment -

          I attached a first version of the patch.
          I am still thinking about any other implications, but wanted to share a first draft to collect some feedback already.
          Thanks

          Neha Narkhede added a comment -

          Thanks for picking this up nicu marasoiu. Assigning to myself for review.

          Neha Narkhede added a comment -

          Took a look at the patch. How about the following -
          1. Limit the LRU cache size to the number of active connections that should be supported by the Kafka server. I'm guessing this should be a config.
          2. Override removeEldestEntry to evict the oldest entry if the cache size exceeds the configured number of LRU connections (see the sketch after this list).

          That way, we don't have to traverse the map several times in the main loop, which can be expensive.
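
          For reference, a size-bounded variant of that override might look roughly like this (illustrative Scala sketch; maxConnections and the object name are made-up placeholders, not a real config):

            import java.nio.channels.SelectionKey
            import java.util.Map.Entry

            object SizeBoundedLruSketch {
              // Hypothetical config: maximum number of connections to track before the
              // least recently used one is evicted.
              val maxConnections = 100000

              val lruConnections =
                new java.util.LinkedHashMap[SelectionKey, java.lang.Long](16, 0.75f, true) {
                  // LinkedHashMap calls this after every put(); returning true makes the
                  // map drop the eldest (least recently used) entry automatically.
                  override def removeEldestEntry(eldest: Entry[SelectionKey, java.lang.Long]): Boolean =
                    size() > maxConnections
                }
            }

          Note that returning true here only drops the map entry; the underlying socket would still need to be closed, a point that comes up again later in this thread.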

          nicu marasoiu added a comment (edited)

          Traversing is quite cheap (it traverses a linked list underneath, and only a prefix of it) and can be done every 1000 selects.
          The intent of your suggestion is to optimize, I understand, but the effect, as I see it, is different behavior (it changes expiration by time into expiration by connection count), for a small performance benefit (I think traversing is much cheaper than the blocking close on each channel, which would happen either way).
          The idea of a limited connection count can be used as a complement to the existing traversal, but if you mean to take out the traversal every n selects, that changes expiration by time into expiration by connection count; is that an agreed requirements change with Jun Rao? I must warn that, in my view, it is dangerous to configure a maximum connection count per broker: in the event that many brokers go down and many clients need to use the system, the connection thrashing would not help anybody and would be worse than not having connection expiration at all in such a scenario, which is the relevant one for a highly available system.

          nicu marasoiu added a comment -

          To make the ~O(1) cost of "traversing" clearer: typically only the first element in the linked list is accessed, and it will typically have been used within the last 10 minutes, in which case nothing further happens. Of course, this holds as long as the low-volume topics do not generate many connections, which they won't with this cleanup in place. I am also checking now that map() and the rest are lazy; otherwise I can make sure that only the relevant "prefix/first" part of the collection is iterated, typically the first element only.

          Neha Narkhede added a comment -

          My suggestion was not only meant to address the performance concern, which is nevertheless somewhat of an issue. The motivation was that there is an upper bound on the number of open connections you can support on the broker. That number is the # of open file handles configured on the box. Since that number is known anyway, you probably would want to configure your server so that the connections never exceed a certain percentage of that upper limit. Currently, if the server runs out of open file handles, it effectively stays alive, but is unable to serve any data and becomes a 'zombie'.

          But a downside of the expiration based on the connection count is that it doesn't necessarily achieve the goal of expiring really old connections. Instead it tries to solve the problem of preventing the broker from running out of available file handles, in which case we probably need a fairer strategy for expiring connections.

          Thinking more, I think it might be sufficient to override removeEldestEntry and check if the oldest entry is older than the threshold and let the map remove it. If the oldest entry is not above the threshold, traversing the map doesn't buy you anything. The downside is that if no new activity takes place on any of the connections all of a sudden, the server wouldn't proactively drop all connections, which is less of a concern.

          The advantage is that you will still get the same benefit of expiring older connections and it removes the need to traverse.
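
          Concretely, the removeEldestEntry-with-threshold idea above could look something like this variant of the override (illustrative Scala sketch; the threshold value and names are placeholders, and the evicted entry's socket would of course still need to be closed):

            import java.nio.channels.SelectionKey
            import java.util.Map.Entry
            import java.util.concurrent.TimeUnit

            object TimeBoundedLruSketch {
              // Illustrative threshold; the real value would come from configuration.
              val connectionsMaxIdleNanos: Long = TimeUnit.MINUTES.toNanos(10)

              val lruConnections =
                new java.util.LinkedHashMap[SelectionKey, java.lang.Long](16, 0.75f, true) {
                  // Values are last-access timestamps in nanoseconds; the map drops the
                  // eldest entry only once it has been idle longer than the threshold.
                  override def removeEldestEntry(eldest: Entry[SelectionKey, java.lang.Long]): Boolean =
                    System.nanoTime() - eldest.getValue.longValue > connectionsMaxIdleNanos
                }
            }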

          Nicolae Marasoiu added a comment (edited)

          Hi, I am sorry, but traversing will be limited to the connections that will actually be expired, so there is no traversing of non-expiring connections (please see the detailed example below).

          I do agree, on the other hand, that there will be polling on the first entry until it expires, but this is how we can implement the requirement exactly as intended (expiration taking into account just time, as per the stated "stale connections" issue, not connection count or activity as well), and it can be done every 1000 selects.

          If we want to protect brokers from becoming zombies, that is a different concern, I feel. However, I completely agree that we can do the LRU limiting as well to avoid the zombie state (as part of this jira or another one). Both expiration mechanisms can be at work and solve both problems with no overhead in doing so (there would just be 2 contexts in which an evict+close would be performed, if we do not count the evict done in a normal close call).

          Jun Rao, Jay Kreps, what do you think?

          Say the server holds 100K connections, and say 100 connections have not been used in the last 10 minutes.

          What the program does (or I will make sure it does) is just iterate through the first 101 connections: the first 100 will be expired and it will stop at number 101.
          I think this exactly achieves the expected behavior of the jira task, as intended, and there is really no performance penalty to it!

          I will rewrite this with a loop/(tail-)recursive function to check the first entry, call close if it is stale (which also removes it from the map anyway), and then move on to the next entry. This avoids copying the first 100 SelectionKeys, as well as any overhead/eagerness in the map function.

          nicu marasoiu added a comment -

          After discussion with Neha, we agreed that the removeEldestEntry approach works better, in the sense that it avoids the disruption caused by potentially many connections coming up for close at once, and evens out that overhead. The disadvantage remains that an inactive server will not close connections, but that seems smaller than the advantages of leveling the closing overhead and of the performance gained by not traversing and not polling the oldest entry.

          Jun Rao added a comment -

          Thanks for the patch. Looks good to me overall. Some minor comments below.

          1. Could we make connectionsLruTimeout a broker side configuration?

          2. Do we need to insert the key to lruConnections in write()? It seems to me doing that in read() (for incoming requests) is enough.

          3. The patch doesn't seem to apply for me. Could you rebase?

          git apply -p0 ~/Downloads/KAFKA-1282_Disconnect_idle_socket_connection_in_Selector.patch
          /Users/jrao/Downloads/KAFKA-1282_Disconnect_idle_socket_connection_in_Selector.patch:13: trailing whitespace.
          import java.util
          /Users/jrao/Downloads/KAFKA-1282_Disconnect_idle_socket_connection_in_Selector.patch:21: trailing whitespace.
          import java.util.Map.Entry
          /Users/jrao/Downloads/KAFKA-1282_Disconnect_idle_socket_connection_in_Selector.patch:30: trailing whitespace.
          private val connectionsLruTimeout: Long = TimeUnit.MINUTES.toNanos(10)
          /Users/jrao/Downloads/KAFKA-1282_Disconnect_idle_socket_connection_in_Selector.patch:31: trailing whitespace.
          private var currentTime: Long = SystemTime.nanoseconds
          /Users/jrao/Downloads/KAFKA-1282_Disconnect_idle_socket_connection_in_Selector.patch:32: trailing whitespace.
          private val lruConnections = new util.LinkedHashMap[SelectionKey, Long](100, .75F, true) {
          error: patch failed: core/src/main/scala/kafka/network/SocketServer.scala:16
          error: core/src/main/scala/kafka/network/SocketServer.scala: patch does not apply

          nicu marasoiu added a comment -

          Hi, thank you. On point 2: I agree for producers, but I am not sure whether the same SocketServer is used to serve consumers as well; in that case, for consumers, the read/write ratio may be well in favor of writes, making it perhaps risky to account only for the reads?

          Jun Rao added a comment -

          Nicu,

          Similar to producers, consumers just issue fetch requests. The SocketServer first reads the fetch request from the network and then writes the fetch response to the network once the fetch request is served by the broker. So, there is a 1-to-1 mapping between reads and writes, and writes typically happen within a second after the reads.

          nicu marasoiu added a comment -

          Uploaded with parametrization and no more access-touch from write().

          nicu marasoiu added a comment -

          Neha Narkhede Hi, I implemented what we discussed and applied Jun Rao's suggestions; can you check and perhaps commit it if it looks good? I hope for more tasks like this; do you have any suggestions?

          Jun Rao added a comment -

          Thanks for the patch. The following should be * 1000000, right?

          private val connectionsLruTimeout: Long = connectionsMaxIdleMs * 1000
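
          (For reference, using TimeUnit makes the milliseconds-to-nanoseconds conversion explicit and avoids this kind of factor slip; a possible form of that line, e.g. inside the same class, assuming the LRU map keeps SystemTime.nanoseconds timestamps and with an illustrative default value:)

            import java.util.concurrent.TimeUnit

            // connectionsMaxIdleMs is the broker config value in milliseconds; the value
            // shown here is just an illustrative default of 10 minutes.
            val connectionsMaxIdleMs: Long = 10 * 60 * 1000L

            // Convert once to nanoseconds, since the LRU map stores SystemTime.nanoseconds.
            val connectionsLruTimeout: Long = TimeUnit.MILLISECONDS.toNanos(connectionsMaxIdleMs)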

          nicu marasoiu added a comment -

          Patch updated. The maximum idleness of a connection since the last read on it is now configurable. When creating N new connections, the server will also close at most N idle connections, if they have been idle for more than the mentioned threshold (default 10 minutes).

          Jun Rao added a comment -

          Looking at the patch again, in removeEldestEntry(), shouldn't we close the socket for eldest if the entry is to be removed? Right now, it seems that we only remove the entry from LRU w/o actually closing the idle socket connection.

          nicu marasoiu added a comment (edited)

          I am sorry; yes, that was the intent! I will write unit tests from now on to avoid such slips.

          Moreover, removeEldestEntry will return false all the time, because it keeps the responsibility of mutating the map to itself, as part of calling the close method.

          Attached the patch, tests pass.

          Jun Rao added a comment -

          Thanks for the latest patch. I was trying to do some local testing. The following are my observations.

          1. I first started a local ZK and broker (setting connections.max.idle.ms to 10 secs). I then started a console-producer and a console-consumer. Then, I typed in something in the console-producer every 15 secs. However, I don't see the producer connection get killed. I added some instrumentation. It doesn't seem that removeEldestEntry() is called on every fetch request.

          2. As I was debugging this, I realized that it's kind of weird to kill idle connections only when there is another non-idle connection. This makes debugging harder since one can't just test this out with a single connection. It's much simpler to understand if the idle connection can just be killed after the connection idle time, independent of other connections to the broker. To address the concern of closing many sockets in one iteration of the selector, we can calculate the time at which a socket entry is expected to be killed (this is the access time of the oldest entry + maxIdleTime, or maxIdleTime if no entry exists). When that time comes during the iteration of the selector, we can just check the oldest entry and see if it needs to be closed (see the sketch after this list).

          3. It would be good to check whether our clients (especially the producer, both old and new) can handle a closed idle connection properly. For example, when detecting an already closed socket, the producer should be able to resend the message, and therefore we shouldn't see any data loss.
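
          A rough sketch of the scheduling idea in point 2 (illustrative Scala, untested; the names are made up), closing at most one connection per select iteration:

            import java.nio.channels.SelectionKey
            import java.util.concurrent.TimeUnit

            object IdleCloseSchedulingSketch {
              val connectionsMaxIdleNanos: Long = TimeUnit.SECONDS.toNanos(10) // illustrative value
              val lruConnections =
                new java.util.LinkedHashMap[SelectionKey, java.lang.Long](16, 0.75f, true)

              // Time at which the oldest connection could possibly have become idle;
              // until then there is nothing to check.
              var nextIdleCloseCheckTime: Long = System.nanoTime() + connectionsMaxIdleNanos

              // Called once per select-loop iteration; closes at most one connection.
              def maybeCloseOldestConnection(nowNanos: Long): Unit = {
                if (nowNanos > nextIdleCloseCheckTime) {
                  if (lruConnections.isEmpty) {
                    nextIdleCloseCheckTime = nowNanos + connectionsMaxIdleNanos
                  } else {
                    val it = lruConnections.entrySet().iterator()
                    val oldest = it.next()
                    if (nowNanos > oldest.getValue.longValue + connectionsMaxIdleNanos) {
                      // The oldest connection is past the idle threshold: close it now
                      // and check again on the next iteration.
                      oldest.getKey.channel().close()
                      it.remove()
                    } else {
                      // The oldest connection is still fresh: the next check is due when
                      // it would become idle.
                      nextIdleCloseCheckTime = oldest.getValue.longValue + connectionsMaxIdleNanos
                    }
                  }
                }
              }
            }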

          nicu marasoiu added a comment -

          Hi, I am not completely sure I fully understood your solution in point 2:

          Do you mean to close at most one connection per iteration? That is ok; the worst-case scenario is closing 100K old connections in 10 hours, one per select.

          As for storing the time-to-close in a local variable, accessing the oldest entry every iteration is O(1) and very cheap, so I would skip that optimization.

          nicu marasoiu added a comment -

          Jun Rao, hi, can you answer please? I agree with what you say, if I understood all of it; I am doing a small patch right now.

          nicu marasoiu added a comment -

          Neha Narkhede Hi, can you also check the new idea? It is consistent with my initial approach and solves the potential overhead of closing too many connections on a single iteration.

          Neha Narkhede added a comment -

          Thanks for the patch, nicu marasoiu! Looks good overall. A few review comments -

          1. Do we really need connectionsLruTimeout in addition to connectionsMaxIdleMs? It seems to me that we are translating the idle connection timeout plugged in by the user into a value 1000000x what is configured. That's probably why Jun saw the behavior he reported earlier.
          2. I don't really share Jun's concern in #2, and we can state that more clearly in the comment that describes the new config in KafkaConfig. Connections that are idle for more than connections.max.idle.ms may get killed. I don't think the users particularly care about a hard guarantee of their connections getting killed here. So the simplicity of this approach is well justified.
          3. I do think that adding a produce and fetch test where the connections get killed would be great.

          Jun Rao added a comment -

          Nicu,

          On #2, I wasn't worried about any performance optimization. My concern is mostly about testing and ease of understanding. Since removeEldestEntry is only called on update, you can't test the logic with a single connection to the broker. It's a bit weird that if there is only a single idle connection, that connection is never killed, but as soon as a second connection is added, the idle connection is killed. From the user's perspective, it's simpler to understand how idle connections are killed if it is not tied to the # of connections.

          Also, could you explain how you fixed #1 in the latest patch? It wasn't obvious to me.

          Nicolae Marasoiu added a comment (edited)

          Hi,

          I have understood what you say, and I agree it is highly unintuitive and we should change that. I just saw that you proposed a solution which included a precomputation of the time to close, and it was a bit confusing; it looked like an attempt at micro-optimization.

          I have not made any patch yet, as I waited for feedback from Neha too, but I will do the patch today: the idea of closing at most one old connection per selector iteration looks ok to me.

          So the solution will look more like the previous patch, but instead of traversing n+1 entries to close n old connections, it will just pick the oldest and check if it is time to close.

          For #1, the way Neha and I discussed it, and the way you understood it works (for the latest patch), is that an old connection is considered for close only when a new connection is being opened (or when there is activity on an existing connection). But this will no longer be the case.

          nicu marasoiu added a comment -

          Attached patch: on every select iteration, zero or one connection is closed for having been idle too long.
          The unit tests pass, but for the moment I am blocked by:
          ./kafka-console-producer.sh
          Error: Could not find or load main class kafka.tools.ConsoleProducer

          Jun Rao added a comment -

          Did you do "./gradlew jar" first?

          nicu marasoiu added a comment (edited)

          Hi,

          Unfortunately the client used in the console producer is not very robust with respect to disconnections, as I will detail below. Is this the "old" Scala producer, and can we hope for resilient behaviour that I can test with the new Java producer?

          More specifically, the connection is closed from the broker side, but the producer is unaware of this. The first message after the close is lost (and is not retried later). The second message sees the broken channel, outputs the exception below, reconnects, and is successfully retried; I can see it consumed.

          [2014-09-17 12:44:12,009] WARN Failed to send producer request with correlation id 15 to broker 0 with data for partitions [topi,0] (kafka.producer.async.DefaultEventHandler)
          java.io.IOException: Broken pipe
          at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
          at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
          at sun.nio.ch.IOUtil.write(IOUtil.java:149)
          at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:483)
          at java.nio.channels.SocketChannel.write(SocketChannel.java:493)
          at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
          at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
          at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
          at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)

          nicu marasoiu added a comment -

          Re-attached the fixed patch, but we may have a blocker for the whole broker-side solution; please see the comment above/below (the first message after a disconnect is lost on the client used in the console producer).

          nicu marasoiu added a comment -

          Here is a timeline:

          he -> produced
          he -> consumed
          [ wait beyond timeout here, connection got closed underneath by the other side]
          [2014-09-17 15:02:28,689] INFO Got user-level KeeperException when processing sessionid:0x148837ce1800001 type:setData cxid:0x24 zxid:0xec txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-87959/offsets/topi/0 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-87959/offsets/topi/0 (org.apache.zookeeper.server.PrepRequestProcessor)
          [2014-09-17 15:02:28,691] INFO Got user-level KeeperException when processing sessionid:0x148837ce1800001 type:create cxid:0x25 zxid:0xed txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-87959/offsets Error:KeeperErrorCode = NoNode for /consumers/console-consumer-87959/offsets (org.apache.zookeeper.server.PrepRequestProcessor)
          dddddddddddddd --> produce attempt (never retried, or never reached the broker or at least never reached the consumer)
          [ many seconds wait, to see if the message is being retried, apparently not, even though the default retry is 3 times]
          wwwwwwwwwwwwwwwww --> new attempt (immediately I see the message below with the stack trace, and the reconnect + retry is instantly successful)
          [2014-09-17 15:03:12,599] WARN Failed to send producer request with correlation id 9 to broker 0 with data for partitions [topi,0] (kafka.producer.async.DefaultEventHandler)
          java.io.IOException: Broken pipe
          at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
          at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
          at sun.nio.ch.IOUtil.write(IOUtil.java:149)
          at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:483)
          at java.nio.channels.SocketChannel.write(SocketChannel.java:493)
          at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
          at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
          at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
          at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
          at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
          at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
          at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
          at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
          at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
          at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
          at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
          at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
          at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
          at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
          at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
          at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
          at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
          at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
          at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
          at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
          at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
          at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
          at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
          at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
          at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
          at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
          at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
          at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
          at scala.collection.immutable.Stream.foreach(Stream.scala:547)
          at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
          at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
          [2014-09-17 15:03:12,712] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
          wwwwwwwwwwwwwwwww

          Nicolae Marasoiu added a comment -

          In fact, this is something that needs fixing in the producer(s) anyway, but the issue is with the currently deployed producers.
          One of the main reasons to go with a broker-side close of the idle connections was that it is easier to redeploy brokers than producers.
          But if this is indeed a bug in the producer(s), as I reproduced, those producers would need to be redeployed.
          So moving this to the producer side as a configuration may again be an option on the table.

          Jun Rao added a comment -

          Interesting. The data loss may have to do with ack=0, which is the default in console producer. Could you try ack=1?

          nicu marasoiu added a comment (edited)

          Indeed, ack=1 solves it most of the time, but not always:

          • in 6 of 7 tests it gets a reset-by-peer and a socket timeout on fetching metadata, then reconnects and sends the message.
          • in one test, after leaving the laptop overnight, I entered:
            sdfgsdfgdsfg --> that never returned, no exception, nothing at all reported
            aaaaaaaaaaa
            aaaaaaaaaaa
            ff
            ff

          The "ok" flow, which reproduces most of the time with ack=1, is (sometimes with just one of the 2 exceptions):
          gffhgfhgfjfgjhfhjfgjhf
          [2014-09-18 08:22:35,057] WARN Failed to send producer request with correlation id 43 to broker 0 with data for partitions [topi,0] (kafka.producer.async.DefaultEventHandler)
          java.io.IOException: Connection reset by peer
          at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
          ..
          at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
          [2014-09-18 08:22:36,663] WARN Fetching topic metadata with correlation id 44 for topics [Set(topi)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
          java.net.SocketTimeoutException
          at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
          ..
          [2014-09-18 08:22:36,664] ERROR fetching topic metadata for topics [Set(topi)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.utils.Utils$)
          kafka.common.KafkaException: fetching topic metadata for topics [Set(topi)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
          at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:71)
          ..
          Caused by: java.net.SocketTimeoutException
          at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
          .. kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
          at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
          at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
          at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
          at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
          at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:57)
          ... 12 more
          gffhgfhgfjfgjhfhjfgjhf

          Neha Narkhede added a comment -

          Thanks for the updated patch. Overall, it looks great. A few comments:
          1. Can you rename initialNextIdleCloseCheckTimeValue to nextIdleCloseCheckTimeValue?
          2. It will be easier to understand the code if we rename currentTime to currentTimeNanos.

          nicu marasoiu added a comment -

          Attached. I renamed the time variable, and for the "initial/reset value of the nextIdleCheck" I just inlined the function; I think the code is clearer this way.

          Jun Rao added a comment -

          Do you think you can reproduce the data loss issue you saw in 1 out of your 7 tests? With ack=1 and retries, this shouldn't happen. Perhaps it's useful to enable trace logging in the producer to see exactly what is happening there.

          Could you also do the same test with the new producer enabled in the console producer?

          Neha Narkhede added a comment -

          +1 on your latest patch. I'm leaning towards accepting the patch since the test above points to an issue that seems unrelated to the patch. nicu marasoiu, it will be great if you can follow Jun's suggestion to reproduce the issue. Then file a JIRA to track it. I'm guessing killing idle connections shouldn't lead to data loss.

          Neha Narkhede added a comment -

          Pushed the latest patch to trunk.

          Jun Rao added a comment -

          Nicu,

          I was doing some manual testing of this feature. What I observed is that sometimes the idle connections are not closed. Here is what I did:

          1. Configure a small connections.max.idle.ms = 10000.
          2. start ZK and Kafka broker
          3. start a console consumer
          bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic1 --from-beginning
          4. start a console producer and type in something every 15 seconds or so.
          bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1 --request-required-acks 1

          What I observed was that initially the producer connections were correctly killed by the broker after being idle for 10 seconds; the next producer send would hit an IOException and trigger a resend. However, after typing in 10 or so messages, at some point no idle connections were killed by the broker any more and the producer sends always succeeded.
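
          For anyone repeating step 1, here is a minimal sketch of how that broker setting might look, assuming it is placed in the broker's server.properties file; 10000 ms is just the value used in this manual test, not a recommended default:

            # server.properties -- idle-connection timeout used for this manual test
            connections.max.idle.ms=10000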

          nicu marasoiu added a comment -

          Indeed, I can reproduce this. I did see an instance where no exception was thrown by the producer, but the broker still reported a new connection being accepted, which suggests the close took place. However, checking with required-acks 0, I can see that after some time the connection is not closed anymore.

          nicu marasoiu added a comment -

          Fixed it - at some point I had mistakenly dropped the fact that the linked hash map needs to be in access order.
          I tested with your scenario and it looks OK now.
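
          To illustrate why access order matters here, below is a minimal, self-contained Scala sketch of the LRU-based expiry idea; the names (IdleConnectionReaper, touch, expireIdle) are hypothetical and this is not the actual SocketServer code. With accessOrder=true the LinkedHashMap iterates from least- to most-recently used, so the expiry loop can stop at the first entry that is not yet idle; with the default insertion order that early exit is not valid and idle connections can be missed.

            import java.util.{LinkedHashMap => JLinkedHashMap}

            // Hypothetical sketch of access-ordered LRU expiry for idle connections.
            class IdleConnectionReaper(idleTimeoutNanos: Long) {
              // accessOrder = true: put()/get() move an entry to the tail, so the
              // iteration order is least-recently-used first.
              private val lruConnections =
                new JLinkedHashMap[String, java.lang.Long](16, 0.75f, true)

              // Record activity on a connection, e.g. whenever its key comes up in select().
              def touch(connectionId: String, nowNanos: Long): Unit =
                lruConnections.put(connectionId, nowNanos)

              // Walk from the oldest entry and stop at the first one that is not idle yet.
              // Returns the ids of connections that should be closed.
              def expireIdle(nowNanos: Long): Seq[String] = {
                val expired = Seq.newBuilder[String]
                val it = lruConnections.entrySet().iterator()
                var done = false
                while (it.hasNext && !done) {
                  val entry = it.next()
                  if (nowNanos - entry.getValue.longValue >= idleTimeoutNanos) {
                    expired += entry.getKey
                    it.remove()
                  } else {
                    done = true // everything after this entry was used more recently
                  }
                }
                expired.result()
              }
            }

            object IdleConnectionReaperDemo extends App {
              val reaper = new IdleConnectionReaper(idleTimeoutNanos = 10L * 1000 * 1000) // 10 ms
              reaper.touch("conn-1", System.nanoTime())
              Thread.sleep(20)
              reaper.touch("conn-2", System.nanoTime())
              // conn-1 has been idle for ~20 ms, conn-2 is fresh.
              println(reaper.expireIdle(System.nanoTime())) // expected: List(conn-1)
            }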

          Neha Narkhede added a comment -

          Good catch, nicu marasoiu. +1 on your change.

          Jun Rao added a comment -

          Nicu,

          Thanks for the patch. Do you think it's easy to add a unit test on Processor?

          nicu marasoiu added a comment -

          Yes, I would like to; I will add a few tests this week.

          Joel Koshy added a comment -

          This is already in 0.8.2, so I think we should incorporate the follow-ups there as well.

          Neha Narkhede added a comment -

          nicu marasoiu, Jun Rao: this is marked for 0.8.2. Is anyone working on this or planning to work on it?

          Nicolae Marasoiu added a comment -

          I will do unit tests tomorrow or the day after. The fix should otherwise be OK,
          and ready to be pushed to trunk and 0.8.2. I will announce when I am done with
          the unit tests.

          nicu marasoiu added a comment - - edited

          Hi Jun Rao, Neha Narkhede, I added a test; please review. The patch has two variations (the latest two patches), explained in point 2 below; the latest one also implements 1' below.

          1. I wanted to sleep on MockTime, but here we actually need to physically wait at least one epoll/select cycle. Since I set a 10ms idle time and it works, mocked time would not bring benefits, i.e. only the select time needs to be waited out.

          1'. Because select times can be large and are not deterministically bounded, I implemented a mechanism that retries a few times, waiting 50% longer each time (a sketch of this retry loop follows point 3 below).

          2. It seems to work with a low (10ms) idle timeout for all current test methods. However, I am attaching a patch with a separate test class for this (and another utils class for reuse) to isolate configuration between groups of test methods.

          3. Shall I do a multiple connections test?
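
          As a rough illustration of the retry approach from 1', here is a small self-contained Scala sketch; the helper name (waitUntilTrue) and its defaults are made up for this example and are not the actual test code. It polls a condition a bounded number of times, waiting 50% longer between attempts so slow select cycles still get a chance to complete:

            // Hypothetical test helper: poll a condition, growing the wait 50% per attempt.
            object RetryWait {
              // Returns true as soon as the condition holds; otherwise waits and retries,
              // increasing the wait by 50% each attempt.
              def waitUntilTrue(condition: () => Boolean,
                                initialWaitMs: Long = 20L,
                                maxAttempts: Int = 8): Boolean = {
                var waitMs = initialWaitMs
                var attempt = 0
                while (attempt < maxAttempts) {
                  if (condition()) return true
                  Thread.sleep(waitMs)
                  waitMs = waitMs * 3 / 2 // wait 50% longer next time
                  attempt += 1
                }
                condition()
              }

              def main(args: Array[String]): Unit = {
                // Stand-in condition: "roughly 100 ms have elapsed" (in the real test this
                // would be "the idle connection has been closed by the broker").
                val deadline = System.currentTimeMillis() + 100
                val ok = waitUntilTrue(() => System.currentTimeMillis() >= deadline)
                println(s"condition met: $ok")
              }
            }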

          Jun Rao added a comment -

          nicu marasoiu, thanks for the patch. We are changing SocketServer to reuse Selector right now in KAFKA-1928. Once that's done, the idle connection logic will be moved into Selector and should be easier to test since Selector supports mock time. That patch is almost ready. Perhaps you can wait until it's committed and submit a new patch.

          nicu marasoiu added a comment -

          Hi,

          I noticed that the dependencies are done, so I will resume this task.
          The contributions for this task have been:

          • a fix
          • unit test(s)

          As far as the fix is concerned, I noticed that it is already present in the current Selector: lruConnections is a LinkedHashMap with accessOrder=true. This was the only fix needed, and I am fully convinced that the fix is already in place.

          I already have a unit test as well; I will try to post a patch here this week.

          Just wanted to mention that the old connections should be closed by Kafka installations using the new reusable network code.

          Thanks
          Nicu

          Jun Rao added a comment -

          nicu marasoiu, yes, the actual problem is now fixed in trunk. We just need to add a unit test. I created a follow-up JIRA, KAFKA-2661, for that. Resolving this JIRA.


            People

            • Assignee:
              nicu marasoiu
              Reporter:
              Jun Rao
              Reviewer:
              Neha Narkhede
            • Votes:
              0
              Watchers:
              8
