Index: system_test/broker_failure/config/server_source1.properties =================================================================== --- system_test/broker_failure/config/server_source1.properties (revision 1353697) +++ system_test/broker_failure/config/server_source1.properties (working copy) @@ -41,7 +41,8 @@ socket.receive.buffer=1048576 # the maximum size of a log segment -log.file.size=10000000 +#log.file.size=1000000 +log.file.size=10000 # the interval between running cleanup on the logs log.cleanup.interval.mins=1 Index: system_test/broker_failure/config/log4j.properties =================================================================== --- system_test/broker_failure/config/log4j.properties (revision 1353697) +++ system_test/broker_failure/config/log4j.properties (working copy) @@ -82,4 +82,5 @@ # to print socket buffer size validated by SimpleConsumer log4j.logger.kafka.consumer.SimpleConsumer=TRACE +log4j.logger.kafka.consumer.FetcherRunnable=TRACE Index: system_test/broker_failure/bin/kafka-run-class.sh =================================================================== --- system_test/broker_failure/bin/kafka-run-class.sh (revision 1353697) +++ system_test/broker_failure/bin/kafka-run-class.sh (working copy) @@ -28,6 +28,11 @@ CLASSPATH=$CLASSPATH:$file done +for file in $kafka_inst_dir/examples/target/scala_2.8.0/*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + for file in $kafka_inst_dir/core/target/scala_2.8.0/*.jar; do CLASSPATH=$CLASSPATH:$file Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1353697) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -301,7 +301,6 @@ * The byte offset of the message that will be appended next. */ def nextAppendOffset: Long = { - flush val last = segments.view.last last.start + last.size } @@ -329,6 +328,7 @@ */ def roll() { lock synchronized { + flush val newOffset = nextAppendOffset val newFile = new File(dir, nameFromOffset(newOffset)) if (newFile.exists) { Index: core/src/main/scala/kafka/consumer/FetcherRunnable.scala =================================================================== --- core/src/main/scala/kafka/consumer/FetcherRunnable.scala (revision 1353697) +++ core/src/main/scala/kafka/consumer/FetcherRunnable.scala (working copy) @@ -67,7 +67,8 @@ val fetchRequest = builder.build() val start = System.currentTimeMillis val response = simpleConsumer.fetch(fetchRequest) - trace("Fetch completed in " + (System.currentTimeMillis - start) + " ms with max wait of " + config.maxFetchWaitMs) + trace("Fetch request %s completed in %d ms with max wait of %d".format(fetchRequest, + (System.currentTimeMillis - start), config.maxFetchWaitMs)) var read = 0L for(infopti <- partitionTopicInfos) { Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (revision 1353697) +++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (working copy) @@ -62,7 +62,17 @@ .withRequiredArg .describedAs("size") .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024 * 1024) + .defaultsTo(1024 * 1024) + val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.") + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") .withRequiredArg .describedAs("size") @@ -116,6 +126,8 @@ props.put("groupid", options.valueOf(groupIdOpt)) props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString) props.put("fetch.size", options.valueOf(fetchSizeOpt).toString) + props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString) + props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString) props.put("auto.commit", "true") props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1353697) +++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy) @@ -33,7 +33,7 @@ val AutoOffsetReset = OffsetRequest.SmallestTimeString val ConsumerTimeoutMs = -1 val MinFetchBytes = 1 - val MaxFetchWaitMs = 3000 + val MaxFetchWaitMs = 100 val MirrorTopicsWhitelist = "" val MirrorTopicsBlacklist = "" val MirrorConsumerNumThreads = 1