diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 6d6b4ab..c08eab0 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -90,7 +90,7 @@ class Partition(val topic: String,
       if (isReplicaLocal(replicaId)) {
         val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
         val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
-        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
+        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
         val offsetMap = checkpoint.read
         if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
           warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 27b0ec8..57386b1 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -70,11 +70,7 @@ object ConsoleProducer {
                            .describedAs("broker-list")
                            .ofType(classOf[String])
     val syncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.")
-    val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." +
-                                             "If specified without value, than it defaults to 'gzip'")
-                                    .withOptionalArg()
-                                    .describedAs("compression-codec")
-                                    .ofType(classOf[String])
+    val compressOpt = parser.accepts("compress", "If set, message batches are sent compressed")
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
                              .withRequiredArg
                              .describedAs("size")
@@ -182,12 +178,7 @@ object ConsoleProducer {
     val topic = options.valueOf(topicOpt)
     val brokerList = options.valueOf(brokerListOpt)
     val sync = options.has(syncOpt)
-    val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
-    val compressionCodec = if (options.has(compressionCodecOpt))
-                             if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty)
-                               DefaultCompressionCodec.name
-                             else compressionCodecOptionValue
-                           else NoCompressionCodec.name
+    val compress = options.has(compressOpt)
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
     val queueSize = options.valueOf(queueSizeOpt)
@@ -264,7 +255,8 @@ object ConsoleProducer {
   class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
     val props = new Properties()
     props.put("metadata.broker.list", producerConfig.brokerList)
-    props.put("compression.type", producerConfig.compressionCodec)
+    val compression = if(producerConfig.compress) DefaultCompressionCodec.name else NoCompressionCodec.name
+    props.put("compression.type", compression)
     props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
     props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
     props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)
@@ -295,7 +287,8 @@ object ConsoleProducer {
   class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer {
     val props = new Properties()
     props.put("metadata.broker.list", producerConfig.brokerList)
-    props.put("compression.codec", producerConfig.compressionCodec)
+    val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
+    props.put("compression.codec", codec.toString)
     props.put("producer.type", if(producerConfig.sync) "sync" else "async")
     props.put("batch.num.messages", producerConfig.batchSize.toString)
     props.put("message.send.max.retries", producerConfig.messageSendMaxRetries.toString)
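The ConsoleProducer change above collapses the string-valued --compression-codec option into a boolean --compress flag, which the two wrappers then translate differently: the new producer expects a codec name for compression.type, while the old producer expects a numeric codec id for compression.codec. A minimal sketch of that mapping in isolation (buildCompressionProps is a hypothetical helper, not part of the patch; the property keys and codec objects are the ones used above):

    import java.util.Properties
    import kafka.message.{DefaultCompressionCodec, NoCompressionCodec}

    // Hypothetical helper: derive compression properties from the --compress flag.
    def buildCompressionProps(compress: Boolean, useNewProducer: Boolean): Properties = {
      val props = new Properties()
      if (useNewProducer)
        // The new producer takes a codec name, e.g. "gzip" or "none".
        props.put("compression.type",
                  if (compress) DefaultCompressionCodec.name else NoCompressionCodec.name)
      else
        // The old producer takes the numeric codec id, serialized as a string.
        props.put("compression.codec",
                  (if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec).toString)
      props
    }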
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0f5aaa9..5588f59 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -462,7 +462,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def checkpointHighWatermarks() {
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
       try {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b4bee33..713bb20 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -201,6 +201,7 @@ class LogManagerTest extends JUnit3Suite {
   /**
    * Test that it is not possible to open two log managers using the same data directory
    */
+  @Test
  def testTwoLogManagersUsingSameDirFails() {
    try {
      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
@@ -209,24 +210,74 @@ class LogManagerTest extends JUnit3Suite {
       case e: KafkaException => // this is good
     }
   }
+
+  private def verifyCheckpointRecovery(topicAndPartitions: Seq[TopicAndPartition],
+                                       logManager: LogManager) {
+    val logs = topicAndPartitions.map(logManager.createLog(_, logConfig))
+    logs.foreach(log => {
+      for(i <- 0 until 50)
+        log.append(TestUtils.singleMessageSet("test".getBytes()))
+
+      log.flush()
+    })
+
+    logManager.checkpointRecoveryPointOffsets()
+    val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
+
+    topicAndPartitions.zip(logs).foreach {
+      case (tp, log) =>
+        assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
+    }
+  }
 
   /**
    * Test that recovery points are correctly written out to disk
    */
+  @Test
   def testCheckpointRecoveryPoints() {
-    val topicA = TopicAndPartition("test-a", 1)
-    val topicB = TopicAndPartition("test-b", 1)
-    val logA = this.logManager.createLog(topicA, logConfig)
-    val logB = this.logManager.createLog(topicB, logConfig)
-    for(i <- 0 until 50)
-      logA.append(TestUtils.singleMessageSet("test".getBytes()))
-    for(i <- 0 until 100)
-      logB.append(TestUtils.singleMessageSet("test".getBytes()))
-    logA.flush()
-    logB.flush()
-    logManager.checkpointRecoveryPointOffsets()
-    val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint)
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint)
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1), TopicAndPartition("test-b", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery point directory mapping works with a trailing slash
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithTrailingSlash() {
+    logManager.shutdown()
+    logDir = TestUtils.tempDir()
+    logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)),
+                                topicConfigs = Map(),
+                                defaultConfig = logConfig,
+                                cleanerConfig = cleanerConfig,
+                                flushCheckMs = 1000L,
+                                flushCheckpointMs = 100000L,
+                                retentionCheckMs = 1000L,
+                                scheduler = time.scheduler,
+                                time = time)
+    logManager.startup()
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery point directory mapping works with a relative directory
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithRelativeDirectory() {
+    logManager.shutdown()
+    logDir = new File("data" + File.separator + logDir.getName)
+    logDir.mkdirs()
+    logDir.deleteOnExit()
+    logManager = new LogManager(logDirs = Array(logDir),
+                                topicConfigs = Map(),
+                                defaultConfig = logConfig,
+                                cleanerConfig = cleanerConfig,
+                                flushCheckMs = 1000L,
+                                flushCheckpointMs = 100000L,
+                                retentionCheckMs = 1000L,
+                                scheduler = time.scheduler,
+                                time = time)
+    logManager.startup()
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
   }
 }
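The LogManagerTest additions above pin down the failure mode behind the getParent change: the high-watermark checkpoint map is keyed by log-directory path, so a relative log.dirs entry (or a raw config string with a trailing slash) can produce a lookup key that names the same physical directory but misses the map. A standalone illustration of the normalization the patch switches to (plain java.io.File; the paths are examples only):

    import java.io.File

    // A log directory configured with a relative path, as in the new tests.
    val logDir = new File("data" + File.separator + "kafka-logs")
    val log = new File(logDir, "test-topic-1")

    // getParent echoes the relative string the File was built from...
    println(log.getParent)                      // data/kafka-logs
    // ...whereas getParentFile.getAbsolutePath resolves it against the working
    // directory, producing a stable key for an absolute-path-keyed map.
    println(log.getParentFile.getAbsolutePath)  // e.g. /home/user/data/kafka-logs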
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 67497dd..2639309 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -6,14 +6,14 @@
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.log4j
 
@@ -56,7 +56,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
     server = TestUtils.createServer(config);
-    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
+    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 1024, "")
   }
 
   @After
@@ -72,8 +72,8 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -82,15 +82,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }
 
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -99,15 +99,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
      case e: MissingConfigException =>
     }
 
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -116,15 +116,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }
 
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") @@ -132,7 +132,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with // serializer missing try { PropertyConfigurator.configure(props) - }catch { + } catch { case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder") } } @@ -141,26 +141,26 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with def testLog4jAppends() { PropertyConfigurator.configure(getLog4jConfig) - for(i <- 1 to 5) + for (i <- 1 to 5) info("test") - val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build()) - val fetchMessage = response.messageSet("test-topic", 0) - - var count = 0 - for(message <- fetchMessage) { - count = count + 1 - } - - assertEquals(5, count) + assertTrue(TestUtils.waitUntilTrue(() => { + var count = 0 + val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024 * 1024).build()) + val fetchMessage = response.messageSet("test-topic", 0) + for (message <- fetchMessage) { + count = count + 1 + } + count == 5 + }, 2000)); } private def getLog4jConfig: Properties = { var props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") + props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index b5936d4..02109bf 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -24,23 +24,52 @@ import java.util.concurrent.atomic.AtomicBoolean import java.io.File import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient -import kafka.cluster.Replica -import kafka.log.{LogManager, LogConfig, Log} +import kafka.log.{CleanerConfig, LogManager, LogConfig} class ReplicaManagerTest extends JUnit3Suite { @Test def testHighwaterMarkDirectoryMapping() { val props = TestUtils.createBrokerConfig(1) - val dir = "/tmp/kafka-logs/" - new File(dir).mkdir() - props.setProperty("log.dirs", dir) + val dir = TestUtils.tempDir() + props.setProperty("log.dirs", dir + File.separator) val config = new KafkaConfig(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val mockLogMgr = EasyMock.createMock(classOf[LogManager]) + val mockLogMgr = createLogManager(dir) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val partition = rm.getOrCreatePartition("test-topic", 1, 1) - partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b5936d4..02109bf 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,23 +24,52 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.Replica
-import kafka.log.{LogManager, LogConfig, Log}
+import kafka.log.{CleanerConfig, LogManager, LogConfig}
 
 class ReplicaManagerTest extends JUnit3Suite {
 
   @Test
   def testHighwaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1)
-    val dir = "/tmp/kafka-logs/"
-    new File(dir).mkdir()
-    props.setProperty("log.dirs", dir)
+    val dir = TestUtils.tempDir()
+    props.setProperty("log.dirs", dir + File.separator)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+    val mockLogMgr = createLogManager(dir)
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
     val partition = rm.getOrCreatePartition("test-topic", 1, 1)
-    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null))))
+    partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
+  }
+
+  @Test
+  def testHighwaterMarkRelativeDirectoryMapping() {
+    val props = TestUtils.createBrokerConfig(1)
+    val dir = new File("data/kafka-logs/")
+    dir.mkdirs()
+    dir.deleteOnExit()
+    props.setProperty("log.dirs", "data/kafka-logs")
+    val config = new KafkaConfig(props)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val mockLogMgr = createLogManager(dir)
+    val time: MockTime = new MockTime()
+    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
+    val partition = rm.getOrCreatePartition("test-topic", 1, 1)
+    partition.getOrCreateReplica(1)
+    rm.checkpointHighWatermarks()
+  }
+
+  private def createLogManager(logDir: File): LogManager = {
+    val time = new MockTime()
+    new LogManager(logDirs = Array(logDir),
+                   topicConfigs = Map(),
+                   defaultConfig = new LogConfig(),
+                   cleanerConfig = CleanerConfig(enableCleaner = false),
+                   flushCheckMs = 1000L,
+                   flushCheckpointMs = 100000L,
+                   retentionCheckMs = 1000L,
+                   scheduler = time.scheduler,
+                   time = time)
   }
 }
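Both ReplicaManagerTest cases end with rm.checkpointHighWatermarks(), which persists each replica's high watermark into a small text file (replication-offset-checkpoint) in the matching log directory; the tests pass only if the directory-to-checkpoint mapping resolves despite the trailing slash or the relative path. For orientation, a reader sketch assuming the version-0 checkpoint layout of a version line, an entry count, and one "topic partition offset" row per entry (readCheckpoint is a hypothetical helper, not part of the patch):

    import scala.io.Source

    // Parse a version-0 offset checkpoint file into a (topic, partition) -> offset map.
    def readCheckpoint(path: String): Map[(String, Int), Long] = {
      val source = Source.fromFile(path)
      try {
        val lines = source.getLines().toList
        require(lines.head.trim.toInt == 0, "this sketch only handles version 0")
        val expected = lines(1).trim.toInt
        lines.drop(2).take(expected).map { line =>
          val Array(topic, partition, offset) = line.trim.split("\\s+")
          (topic, partition.toInt) -> offset.toLong
        }.toMap
      } finally {
        source.close()
      }
    }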