From bc6b164ac791677631bfcc7e02bea0a7650e96d9 Mon Sep 17 00:00:00 2001
From: Jaikiran Pai
Date: Thu, 29 Jan 2015 11:40:32 +0530
Subject: [PATCH] KAFKA-1906 Default the Kafka data log directory to
 $KAFKA_HOME/data/kafka-logs directory, where KAFKA_HOME is the Kafka
 installation directory

---
 bin/kafka-run-class.sh                              | 20 +++++++++++---------
 bin/windows/kafka-run-class.bat                     | 19 ++++++++++---------
 config/server.properties                            |  4 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala  | 17 ++++++++++++++++-
 .../scala/unit/kafka/server/KafkaConfigTest.scala   | 20 +++++++++++++++++++-
 5 files changed, 58 insertions(+), 22 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 8c3fa28..3091e33 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -21,10 +21,12 @@ then
 fi
 
 base_dir=$(dirname $0)/..
+KAFKA_HOME=`cd $base_dir; pwd`
+export KAFKA_HOME
 
 # create logs directory
 if [ "x$LOG_DIR" = "x" ]; then
-    LOG_DIR="$base_dir/logs"
+    LOG_DIR="$KAFKA_HOME/logs"
 fi
 
 if [ ! -d "$LOG_DIR" ]; then
@@ -40,38 +42,38 @@ if [ -z "$SCALA_BINARY_VERSION" ]; then
 fi
 
 # run ./gradlew copyDependantLibs to get all dependant jars in a local dir
-for file in $base_dir/core/build/dependant-libs-${SCALA_VERSION}*/*.jar;
+for file in $KAFKA_HOME/core/build/dependant-libs-${SCALA_VERSION}*/*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/examples/build/libs//kafka-examples*.jar;
+for file in $KAFKA_HOME/examples/build/libs//kafka-examples*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar;
+for file in $KAFKA_HOME/contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar;
+for file in $KAFKA_HOME/contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/clients/build/libs/kafka-clients*.jar;
+for file in $KAFKA_HOME/clients/build/libs/kafka-clients*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
 
 # classpath addition for release
-for file in $base_dir/libs/*.jar;
+for file in $KAFKA_HOME/libs/*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
+for file in $KAFKA_HOME/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
@@ -88,7 +90,7 @@ fi
 
 # Log4j settings
 if [ -z "$KAFKA_LOG4J_OPTS" ]; then
-  KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/config/tools-log4j.properties"
+  KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$KAFKA_HOME/config/tools-log4j.properties"
 fi
 
 KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat
index 4aa2ab8..aa6c701 100644
--- a/bin/windows/kafka-run-class.bat
+++ b/bin/windows/kafka-run-class.bat
@@ -25,6 +25,7 @@ rem Using pushd popd to set BASE_DIR to the absolute path
 pushd %~dp0..\..
 set BASE_DIR=%CD%
 popd
+set KAFKA_HOME=%BASE_DIR%
 set CLASSPATH=
 
 IF ["%SCALA_VERSION%"] EQU [""] (
@@ -36,42 +37,42 @@ IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
 )
 
 rem Classpath addition for kafka-core dependencies
-for %%i in (%BASE_DIR%\core\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
+for %%i in (%KAFKA_HOME%\core\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
 	call :concat %%i
 )
 
 rem Classpath addition for kafka-perf dependencies
-for %%i in (%BASE_DIR%\perf\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
+for %%i in (%KAFKA_HOME%\perf\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
 	call :concat %%i
 )
 
 rem Classpath addition for kafka-clients
-for %%i in (%BASE_DIR%\clients\build\libs\kafka-clients-*.jar) do (
+for %%i in (%KAFKA_HOME%\clients\build\libs\kafka-clients-*.jar) do (
 	call :concat %%i
 )
 
 rem Classpath addition for kafka-examples
-for %%i in (%BASE_DIR%\examples\build\libs\kafka-examples-*.jar) do (
+for %%i in (%KAFKA_HOME%\examples\build\libs\kafka-examples-*.jar) do (
 	call :concat %%i
 )
 
 rem Classpath addition for contrib/hadoop-consumer
-for %%i in (%BASE_DIR%\contrib\hadoop-consumer\build\libs\kafka-hadoop-consumer-*.jar) do (
+for %%i in (%KAFKA_HOME%\contrib\hadoop-consumer\build\libs\kafka-hadoop-consumer-*.jar) do (
 	call :concat %%i
 )
 
 rem Classpath addition for contrib/hadoop-producer
-for %%i in (%BASE_DIR%\contrib\hadoop-producer\build\libs\kafka-hadoop-producer-*.jar) do (
+for %%i in (%KAFKA_HOME%\contrib\hadoop-producer\build\libs\kafka-hadoop-producer-*.jar) do (
 	call :concat %%i
 )
 
 rem Classpath addition for release
-for %%i in (%BASE_DIR%\libs\*.jar) do (
+for %%i in (%KAFKA_HOME%\libs\*.jar) do (
 	call :concat %%i
 )
 
 rem Classpath addition for core
-for %%i in (%BASE_DIR%\core\build\libs\kafka_%SCALA_BINARY_VERSION%*.jar) do (
+for %%i in (%KAFKA_HOME%\core\build\libs\kafka_%SCALA_BINARY_VERSION%*.jar) do (
 	call :concat %%i
 )
 
@@ -87,7 +88,7 @@ IF ["%JMX_PORT%"] NEQ [""] (
 )
 rem Log4j settings
 IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
-	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/tools-log4j.properties
+	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%KAFKA_HOME%/config/tools-log4j.properties
 )
 
 rem Generic jvm settings you want to add
diff --git a/config/server.properties b/config/server.properties
index 80ee2fc..4b44707 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -56,8 +56,8 @@ socket.request.max.bytes=104857600
 
 ############################# Log Basics #############################
 
-# A comma seperated list of directories under which to store log files
-log.dirs=/tmp/kafka-logs
+# A comma separated list of directories under which to store log files
+#log.dirs=
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9efa15c..33743db 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.io.File
 import java.util.Properties
 
 import kafka.api.ApiVersion
@@ -34,6 +35,9 @@ object Defaults {
   val ZkSyncTimeMs = 2000
 
   /** ********* General Configuration ***********/
+  /* The Kafka home */
+  private val kafkaHome = System.getenv("KAFKA_HOME")
+
   val MaxReservedBrokerId = 1000
   val BrokerId = -1
   val MessageMaxBytes = 1000000 + MessageSet.LogOverhead
@@ -54,7 +58,17 @@ object Defaults {
 
   /** ********* Log Configuration ***********/
   val NumPartitions = 1
-  val LogDir = "/tmp/kafka-logs"
+  /* the directories in which the log data is kept */
+  val LogDir: String =
+    if (kafkaHome != null) {
+      // default to KAFKA_HOME/data/kafka-logs
+      kafkaHome + File.separator + "data" + File.separator + "kafka-logs"
+    } else {
+      // if KAFKA_HOME isn't set (maybe the process was triggered with some other script than the regular Kafka startup scripts)
+      // we don't default the log dir and instead expect the log.dir or log.dirs config property to be explicitly set
+      ""
+    }
+
   val LogSegmentBytes = 1 * 1024 * 1024 * 1024
   val LogRollHours = 24 * 7
   val LogRollJitterHours = 0
@@ -751,6 +765,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
   val advertisedHostName: String = _advertisedHostName.getOrElse(hostName)
   val advertisedPort: Int = _advertisedPort.getOrElse(port)
   val advertisedListeners = getAdvertisedListeners()
+
   val logDirs = CoreUtils.parseCsvList(_logDirs.getOrElse(_logDir))
 
   val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * logRollTimeHours)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2428dbd..c193684 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -17,11 +17,12 @@
 
 package kafka.server
 
+import java.io.File
 import java.util.Properties
 
 import junit.framework.Assert._
 import kafka.api.{ApiVersion, KAFKA_082}
-import kafka.utils.{TestUtils, CoreUtils}
+import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
@@ -204,6 +205,7 @@ class KafkaConfigTest extends JUnit3Suite {
   @Test
   def testListenerDefaults() {
     val props = new Properties()
+    props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put(KafkaConfig.BrokerIdProp, "1")
     props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
 
@@ -233,6 +235,7 @@ class KafkaConfigTest extends JUnit3Suite {
   @Test
   def testVersionConfiguration() {
     val props = new Properties()
+    props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put(KafkaConfig.BrokerIdProp, "1")
     props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
     val conf = KafkaConfig.fromProps(props)
@@ -348,4 +351,19 @@ class KafkaConfigTest extends JUnit3Suite {
       KafkaConfig.fromProps(props)
     }
   }
+
+  /**
+   * Tests that when the log.dirs property is explicitly specified, it gets used in the {@link KafkaConfig}
+   */
+  @Test
+  def testLogDirsProvided() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    val kafkaLogDir = System.getProperty("java.io.tmpdir") + File.separator + "foo" + File.separator + "bar"
+    props.put("log.dirs", kafkaLogDir)
+    val serverConfig = KafkaConfig.fromProps(props)
+
+    assertEquals("Unexpected number of dirs for log.dirs in Kafka config", 1, serverConfig.logDirs.size)
+    assertEquals("Unexpected value for log.dirs in Kafka config", kafkaLogDir, serverConfig.logDirs(0))
+  }
+
 }
-- 
1.9.1
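For reviewers who want to see the new default resolution in isolation, the standalone sketch below mirrors the logic this patch adds to Defaults.LogDir: with KAFKA_HOME exported by the startup scripts the default becomes $KAFKA_HOME/data/kafka-logs, otherwise it is empty and log.dir or log.dirs must be set explicitly. This is only an illustrative snippet; the object name VerifyLogDirDefault and its main method are hypothetical and not part of the patch or the Kafka code base.

import java.io.File

// Illustrative sketch only: reproduces the Defaults.LogDir resolution introduced by this patch.
// "VerifyLogDirDefault" is a hypothetical helper, not part of Kafka.
object VerifyLogDirDefault {
  def main(args: Array[String]): Unit = {
    // the patched bin/kafka-run-class.sh and kafka-run-class.bat export KAFKA_HOME before the JVM starts
    val kafkaHome = System.getenv("KAFKA_HOME")
    val defaultLogDir =
      if (kafkaHome != null)
        // default data directory under the installation: $KAFKA_HOME/data/kafka-logs
        kafkaHome + File.separator + "data" + File.separator + "kafka-logs"
      else
        // no KAFKA_HOME means no default; the broker config must supply log.dir or log.dirs
        ""
    println("default log dir resolves to: '" + defaultLogDir + "'")
  }
}

Run with KAFKA_HOME exported this prints the data/kafka-logs path under the installation; run without it, it prints an empty string, which is why the tests above pass an explicit log.dir or log.dirs.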