diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 2e94fee..b1892f8 100644 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -17,22 +17,56 @@ package kafka - +import scala.collection.JavaConversions._ +import joptsimple.OptionParser import metrics.KafkaMetricsReporter import server.{KafkaConfig, KafkaServerStartable, KafkaServer} import utils.{Utils, Logging} +import java.util.Properties + object Kafka extends Logging { - def main(args: Array[String]): Unit = { - if (args.length != 1) { - println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName())) + def parseArguments(args: Array[String]): Properties = { + val prop = new Properties + val parser = new OptionParser + + val setOpt = parser.accepts("set", "Optional properties that should override values in server.properties file") + .withRequiredArg() + .ofType(classOf[String]) + + val options = parser.parse(args: _*) + + options.valuesOf(setOpt).toList.foreach { v => + debug("Overriding property: " + v) + val s = v.split("=") + if(s.length != 2) { + throw new IllegalArgumentException("Not a java property in form key=value: " + s) + } + prop.setProperty(s(0), s.last) + } + + prop + } + + def getKafkaConfigFromArgs(args: Array[String]): KafkaConfig = { + if (args.length == 0) { + println("USAGE: java [options] %s [kafka options] server.properties".format(classOf[KafkaServer].getSimpleName())) System.exit(1) } - + + val props = Utils.loadProps(args.last) + + if(args.length > 1) { + props.putAll(parseArguments(args.slice(0, args.length - 1))) + } + + new KafkaConfig(props) + } + + def main(args: Array[String]): Unit = { try { - val props = Utils.loadProps(args(0)) - val serverConfig = new KafkaConfig(props) + val serverConfig = getKafkaConfigFromArgs(args) KafkaMetricsReporter.startReporters(serverConfig.props) val kafkaServerStartable = new KafkaServerStartable(serverConfig) diff --git a/core/src/test/scala/unit/kafka/KafkaTest.scala b/core/src/test/scala/unit/kafka/KafkaTest.scala new file mode 100644 index 0000000..fe5d878 --- /dev/null +++ b/core/src/test/scala/unit/kafka/KafkaTest.scala @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka + +import java.io.{FileOutputStream, File} + +import kafka.Kafka +import org.junit.Test +import junit.framework.Assert._ + +class KafkaTest { + + @Test + def testParseArguments(): Unit = { + val prop = Kafka.parseArguments(Array("--set", "x=y", "--set", "a=b")) + + assertTrue(prop.containsKey("x")) + assertEquals("y", prop.get("x")) + + assertTrue(prop.containsKey("a")) + assertEquals("b", prop.get("a")) + } + + @Test + def testGetKafkaConfigFromArgs(): Unit = { + val propertiesFile = prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere")) + + // We should load configuration file without any arguments + val config1 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile)) + assertEquals(1, config1.brokerId) + + // We should be able to override given property on command line + val config2 = Kafka.getKafkaConfigFromArgs(Array("--set", "broker.id=2", propertiesFile)) + assertEquals(2, config2.brokerId) + + // We should be also able to set completely new property + val config3 = Kafka.getKafkaConfigFromArgs(Array("--set", "port=1987", propertiesFile)) + assertEquals(1, config3.brokerId) + assertEquals(1987, config3.port) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testGetKafkaConfigFromArgsWrongSetValue(): Unit = { + val propertiesFile = prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere")) + Kafka.getKafkaConfigFromArgs(Array("--set", "Koprivnice is in Czech Republic", propertiesFile)) + } + + def prepareConfig(lines : Array[String]): String = { + val file = File.createTempFile("kafkatest", ".properties") + file.deleteOnExit() + + val writer = new FileOutputStream(file) + lines.foreach { l => + writer.write(l.getBytes) + writer.write("\n".getBytes) + } + + writer.close + + file.getAbsolutePath + } +}