Kafka / KAFKA-202

Make the request processing in Kafka asynchronous

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      We need to handle long-lived requests to support replication. To make this work we need to make the processing mechanism asynchronous from the network threads.

      To accomplish this we will retain the existing pool of network threads but add a new pool of request handling threads. These will do all the disk I/O. There will be a queue mechanism to transfer requests to and from this secondary pool.
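
      As a rough sketch (illustrative only, not the code in the attached patches; every name below is made up for the example), the shape of the change is a bounded queue sitting between the two pools: network threads enqueue requests, and a configurable number of handler threads drain the queue and perform the disk I/O.

      import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

      // Sketch of a bounded channel between the network threads and the I/O threads.
      class RequestChannelSketch[R](numHandlerThreads: Int, maxQueuedRequests: Int, handle: R => Unit) {
        private val requestQueue = new ArrayBlockingQueue[R](maxQueuedRequests)

        // Called by a network (processor) thread; blocks when the queue is full,
        // which gives natural back-pressure at the socket layer.
        def sendRequest(request: R): Unit = requestQueue.put(request)

        // Handler threads pull requests off the queue and do the disk I/O.
        private val handlers = (0 until numHandlerThreads).map { i =>
          val t = new Thread(new Runnable {
            def run(): Unit = {
              while (!Thread.currentThread.isInterrupted) {
                try {
                  val req = requestQueue.poll(300, TimeUnit.MILLISECONDS)
                  if (req != null)
                    handle(req) // e.g. log append or fetch; the response is handed back to the processor
                } catch {
                  case _: InterruptedException => Thread.currentThread.interrupt()
                }
              }
            }
          }, "request-handler-" + i)
          t.setDaemon(true)
          t.start()
          t
        }

        def shutdown(): Unit = handlers.foreach(_.interrupt())
      }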

        Attachments

      1. KAFKA-202-v2.patch (10 kB) - Jay Kreps
      2. KAFKA-202-v3.patch (55 kB) - Jay Kreps
      3. KAFKA-202-v4.patch (64 kB) - Jay Kreps
      4. KAFKA-202-v5.patch (50 kB) - Jay Kreps
      5. KAFKA-202-v6.patch (61 kB) - Jay Kreps
      6. KAFKA-48-socket-server-refactor-draft.patch (44 kB) - Jay Kreps

        Issue Links

          Activity

          Jay Kreps added a comment -

          This patch is ready for review. Note the new config I added for controlling the number of I/O threads.

          Also, this patch removes all JMX monitoring from the socket server; please ignore that for now. The issue was that our JMX tangled together the socket server and the request handling. I want to generalize our stat collection and will open a second JIRA for that.
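
          For context, this kind of setting would normally live in the broker config file. A hypothetical snippet (the property name here is an assumption; later Kafka releases call it num.io.threads, and the name in this patch may differ):

          # hypothetical server.properties entry: size of the new request handler (disk I/O) thread pool
          num.io.threads=8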

          Taylor Gautier added a comment -

          Unfortunately it seems this patch doesn't apply cleanly against the latest code.

          Jay Kreps added a comment -

          Updated patch to current trunk.

          Jun Rao added a comment -

          Overall, the patch looks good. Some comments:

          1. KafkaServer.startup doesn't have to catch the exception and shut down. The caller in KafkaServerStartable already does that, and it also shuts down the embedded consumer appropriately if needed.

          2. There is KafkaRequestHandlers.scala.rej in the patch.

          3. Unit tests seem to fail occasionally, giving the following error.
          [info] == core-kafka / kafka.integration.LazyInitProducerTest ==
          [2012-01-05 21:57:38,773] ERROR Error processing MultiProducerRequest on test:0 (kafka.server.KafkaApis:82)
          java.nio.channels.ClosedChannelException
          at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
          at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
          at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
          at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
          at kafka.log.Log.append(Log.scala:215)
          at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
          at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
          at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
          at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
          at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
          at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64)
          at kafka.server.KafkaApis.handle(KafkaApis.scala:43)
          at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
          at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
          at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
          at java.lang.Thread.run(Thread.java:662)
          [2012-01-05 21:57:38,773] ERROR Error processing ProduceRequest on test:0 (kafka.server.KafkaApis:82)
          java.nio.channels.ClosedChannelException
          at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
          at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
          at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
          at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
          at kafka.log.Log.append(Log.scala:215)
          at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
          at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
          at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
          at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
          at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
          at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
          at java.lang.Thread.run(Thread.java:662)
          [2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while handling producer request: null (kafka.server.KafkaApis:92)
          java.nio.channels.ClosedChannelException
          at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
          at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
          at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
          at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
          at kafka.log.Log.append(Log.scala:215)
          at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
          at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
          at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
          at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
          at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
          at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
          at java.lang.Thread.run(Thread.java:662)
          [2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while handling producer request: null (kafka.server.KafkaApis:92)
          java.nio.channels.ClosedChannelException
          at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
          at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
          at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
          at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
          at kafka.log.Log.append(Log.scala:215)
          at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
          at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
          at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
          at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
          at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
          at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64)
          at kafka.server.KafkaApis.handle(KafkaApis.scala:43)
          at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
          at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
          at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
          at java.lang.Thread.run(Thread.java:662)
          [info] Test Starting: testProduceAndFetch(kafka.integration.LazyInitProducerTest)

          Jay Kreps added a comment -

          Fixed random JVM halts in tests. Removed the stray file from the patch.

          Jay Kreps added a comment -

          Improved version of patch. Fixes a bug where a response for a closed socket can throw an uncaught exception.
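
          Conceptually (illustration only, not the code in the patch), the fix amounts to treating a write to an already-closed client connection as a normal disconnect instead of letting the exception escape the handler thread:

          import java.nio.ByteBuffer
          import java.nio.channels.{CancelledKeyException, ClosedChannelException, SocketChannel}

          // Illustrative helper, not from the patch: swallow "client went away" errors
          // when writing a response rather than letting them kill the thread.
          def sendResponseSafely(channel: SocketChannel, response: ByteBuffer): Unit = {
            try {
              while (response.hasRemaining)
                channel.write(response)
            } catch {
              case _: ClosedChannelException | _: CancelledKeyException =>
                channel.close() // client already disconnected; drop the response quietly
            }
          }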

          Neha Narkhede added a comment (edited) -

          I tried to apply the v5 patch and couldn't find RequestChannel.scala in there. Would you mind uploading a patch with that included?

          Also, if the Kafka cluster is hitting either a network or an I/O bottleneck, processing slows down and the producer queue backs up, causing QueueFullException. To detect this, it seems like it would be helpful to expose the number of queued requests in SocketServerStats. This could either be a separate JIRA or part of this one, your call.
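
          A minimal sketch of that idea (hypothetical names, not existing Kafka code): a standard MBean that reports the current depth of the request queue, registered alongside the existing SocketServerStats bean.

          import java.lang.management.ManagementFactory
          import java.util.concurrent.BlockingQueue
          import javax.management.ObjectName

          // Hypothetical MBean exposing the request queue depth; all names are illustrative.
          trait RequestQueueSizeMBean { def getNumQueuedRequests: Int }

          class RequestQueueSize(queue: BlockingQueue[_]) extends RequestQueueSizeMBean {
            def getNumQueuedRequests: Int = queue.size
          }

          // Registration (object name is an example only):
          // ManagementFactory.getPlatformMBeanServer.registerMBean(
          //   new RequestQueueSize(requestQueue),
          //   new ObjectName("kafka:type=kafka.SocketServerStats,name=NumQueuedRequests"))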

          Jay Kreps added a comment -

          Missed some files in the last patch, checked that this one actually builds correctly on a clean checkout.

          Jun Rao added a comment -

          Unit tests now pass. +1 on the patch.

          We should probably commit this patch to an 0.8 branch.

          Neha Narkhede added a comment -

          Created the 0.8 branch and committed this on the branch.

          Jay Kreps added a comment -

          Hey Neha, 0.8 doesn't build for me and is missing a few files in the patch. Did those get missed in the checkin?

          [info] == core-kafka / compile ==
          [info] Source analysis: 134 new/modified, 0 indirectly invalidated, 0 removed.
          [info] Compiling main sources...
          [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/network/SocketServer.scala:45: not found: type RequestChannel
          [error] val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
          [error] ^
          [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/network/SocketServer.scala:190: not found: type RequestChannel
          [error] val requestChannel: RequestChannel,
          [error] ^
          [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/network/SocketServer.scala:301: not found: value RequestChannel
          [error] val req = RequestChannel.Request(processor = id, requestKey = key, request = request, start = time.nanoseconds)
          [error] ^
          [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:26: RequestChannel is not a member of kafka.network
          [error] import kafka.network.{SocketServerStats, SocketServer, RequestChannel}
          [error] ^
          [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:40: not found: type KafkaRequestHandlerPool
          [error] var requestHandlerPool: KafkaRequestHandlerPool = null
          [error] ^
          [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:69: not found: type KafkaRequestHandlerPool
          [error] requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, new KafkaApis(logManager).handle, config.numIoThreads)
          [error] ^
          [error] 6 errors found
          [info] == core-kafka / compile ==
          [error] Error running compile: Compilation failed
          [info]
          [info] Total time: 21 s, completed Jan 13, 2012 12:46:07 PM
          [info]
          [info] Total session time: 24 s, completed Jan 13, 2012 12:46:07 PM
          [error] Error during build

          Neha Narkhede added a comment -

          My bad. Forgot to do the svn add on those. Fixed that, should build fine now.

          Jay Kreps added a comment -

          Thanks Neha! Working great now!


            People

            • Assignee: Neha Narkhede
            • Reporter: Jay Kreps
            • Votes: 0
            • Watchers: 1

              Dates

              • Created:
                Updated:
                Resolved:
