diff --git a/.gitignore b/.gitignore index 99b32a6..4d3b0ee 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,6 @@ dist *classes target/ -build/ -.gradle/ lib_managed/ src_managed/ project/boot/ diff --git a/README.md b/README.md index 7bd762f..dece147 100644 --- a/README.md +++ b/README.md @@ -31,8 +31,8 @@ The release file can be found inside ./core/build/distributions/. ### Cleaning the build ### ./gradlew clean -### Running a task on a particular version of Scala (either 2.8.0, 2.8.2, 2.9.1, 2.9.2 or 2.10.1) ### -#### (If building a jar with a version other than 2.8.0, the scala version variable in bin/kafka-run-class.sh needs to be changed to run quick start.) #### +### Running a task on a particular version of Scala #### +either 2.8.0, 2.8.2, 2.9.1, 2.9.2 or 2.10.1) (If building a jar with a version other than 2.8.0, the scala version variable in bin/kafka-run-class.sh needs to be changed to run quick start.) ./gradlew -PscalaVersion=2.9.1 jar ./gradlew -PscalaVersion=2.9.1 test ./gradlew -PscalaVersion=2.9.1 releaseTarGz diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index d29e556..e4bc243 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -81,12 +81,14 @@ class TopicDeletionManager(controller: KafkaController, val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false) var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable + @volatile var isShuttingDown = false /** * Invoked at the end of new controller initiation */ def start() { if(isDeleteTopicEnabled) { + isShuttingDown = false deleteTopicsThread = new DeleteTopicsThread() deleteTopicStateChanged.set(true) deleteTopicsThread.start() @@ -98,10 +100,13 @@ class TopicDeletionManager(controller: KafkaController, */ def shutdown() { if(isDeleteTopicEnabled) { + isShuttingDown = true + resumeTopicDeletionThread() deleteTopicsThread.shutdown() topicsToBeDeleted.clear() partitionsToBeDeleted.clear() topicsIneligibleForDeletion.clear() + isShuttingDown = false } } @@ -202,7 +207,7 @@ class TopicDeletionManager(controller: KafkaController, */ private def awaitTopicDeletionNotification() { inLock(deleteLock) { - while(!deleteTopicStateChanged.compareAndSet(true, false)) { + while(!isShuttingDown && !deleteTopicStateChanged.compareAndSet(true, false)) { info("Waiting for signal to start or continue topic deletion") deleteTopicsCond.await() } @@ -360,15 +365,20 @@ class TopicDeletionManager(controller: KafkaController, } } - class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { + class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread", isInterruptible = false) { val zkClient = controllerContext.zkClient override def doWork() { awaitTopicDeletionNotification() + if(!isRunning.get) + return + inLock(controllerContext.controllerLock) { val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted - if(topicsQueuedForDeletion.size > 0) + + if(!topicsQueuedForDeletion.isEmpty) info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(",")) + topicsQueuedForDeletion.foreach { topic => // if all replicas are marked as deleted successfully, then topic deletion is done if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) { diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 514941c..6a98134 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -75,11 +75,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To def grabFilthiestLog(): Option[LogToClean] = { inLock(lock) { val lastClean = allCleanerCheckpoints() - val dirtyLogs = logs.filter(l => l._2.config.compact) // skip any logs marked for delete rather than dedupe - .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress - .map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each - lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset))) - .filter(l => l.totalBytes > 0) // skip any empty logs + val dirtyLogs = logs.filter(l => l._2.config.compact) // skip any logs marked for delete rather than dedupe + .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress + .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each + .filter(l => l.totalBytes > 0) // skip any empty logs if(!dirtyLogs.isEmpty) this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d0bbeb6..b0506d4 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -116,7 +116,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */ val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue)) - /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "compact" */ + /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */ val logCleanupPolicy = props.getString("log.cleanup.policy", "delete") /* the number of background threads to use for log cleaning */ diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala index 7af2f43..19f61a9 100644 --- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala +++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala @@ -90,7 +90,7 @@ class OffsetCheckpoint(val file: File) extends Logging { val topic = pieces(0) val partition = pieces(1).toInt val offset = pieces(2).toLong - offsets += (TopicAndPartition(topic, partition) -> offset) + offsets += (TopicAndPartition(pieces(0), partition) -> offset) line = reader.readLine() } if(offsets.size != expectedSize) diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 5417628..89a88a7 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -159,7 +159,7 @@ class OffsetManager(val config: OffsetManagerConfig, def offsetsTopicConfig: Properties = { val props = new Properties props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString) - props.put(LogConfig.CleanupPolicyProp, "compact") + props.put(LogConfig.CleanupPolicyProp, "dedupe") props } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 5bfa764..9aeb69d 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -92,7 +92,7 @@ class LogCleanerIntegrationTest extends JUnitSuite { def makeCleaner(parts: Int, minDirtyMessages: Int = 0, numThreads: Int = 1, - defaultPolicy: String = "compact", + defaultPolicy: String = "dedupe", policyOverrides: Map[String, String] = Map()): LogCleaner = { // create partitions and add them to the pool diff --git a/gradlew.bat b/gradlew.bat deleted file mode 100644 index 84974e2..0000000 --- a/gradlew.bat +++ /dev/null @@ -1,90 +0,0 @@ -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS=-Xmx1024m -Xms256m -XX:MaxPermSize=512m - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto init - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:init -@rem Get command-line arguments, handling Windowz variants - -if not "%OS%" == "Windows_NT" goto win9xME_args -if "%@eval[2+2]" == "4" goto 4NT_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* -goto execute - -:4NT_args -@rem Get arguments from the 4NT Shell from JP Software -set CMD_LINE_ARGS=%$ - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega