diff --git a/.gitignore b/.gitignore
index 99b32a6..4d3b0ee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,8 +1,6 @@
 dist
 *classes
 target/
-build/
-.gradle/
 lib_managed/
 src_managed/
 project/boot/
diff --git a/README.md b/README.md
index 7bd762f..dece147 100644
--- a/README.md
+++ b/README.md
@@ -31,8 +31,8 @@ The release file can be found inside ./core/build/distributions/.
 
 ### Cleaning the build ###
 ./gradlew clean
-### Running a task on a particular version of Scala (either 2.8.0, 2.8.2, 2.9.1, 2.9.2 or 2.10.1) ###
-#### (If building a jar with a version other than 2.8.0, the scala version variable in bin/kafka-run-class.sh needs to be changed to run quick start.) ####
+### Running a task on a particular version of Scala ###
+(either 2.8.0, 2.8.2, 2.9.1, 2.9.2 or 2.10.1) (If building a jar with a version other than 2.8.0, the scala version variable in bin/kafka-run-class.sh needs to be changed to run quick start.)
 ./gradlew -PscalaVersion=2.9.1 jar
 ./gradlew -PscalaVersion=2.9.1 test
 ./gradlew -PscalaVersion=2.9.1 releaseTarGz
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 514941c..6a98134 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -75,11 +75,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
-                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
-                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
-                          .filter(l => l.totalBytes > 0)             // skip any empty logs
+      val dirtyLogs = logs.filter(l => l._2.config.compact)                               // skip any logs marked for delete rather than dedupe
+                          .filterNot(l => inProgress.contains(l._1))                      // skip any logs already in-progress
+                          .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
+                          .filter(l => l.totalBytes > 0)                                  // skip any empty logs
       if(!dirtyLogs.isEmpty)
         this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
       val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d96229e..db16ca7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -52,20 +52,116 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
   private val delayedRequestMetrics = new DelayedRequestMetrics
 
-  /* following 3 data structures are updated by the update metadata request
-   * and is queried by the topic metadata request. */
   var metadataCache = new MetadataCache
-  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private val partitionMetadataLock = new ReentrantReadWriteLock()
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   class MetadataCache {
     private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
       new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
+    private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+    private val partitionMetadataLock = new ReentrantReadWriteLock()
+
+    def getTopicMetadata(topics: Set[String]): Tuple2[mutable.ListBuffer[TopicMetadata], mutable.ListBuffer[String]] = {
+      val isAllTopics = topics.isEmpty
+      val topicsRequested = if(isAllTopics) cache.keySet else topics
+      val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+      val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
+      inLock(partitionMetadataLock.readLock()) {
+        for (topic <- topicsRequested) {
+          if (isAllTopics || this.containsTopic(topic)) {
+            val partitionStateInfos = cache(topic)
+            val partitionMetadata = partitionStateInfos.map {
+              case (partitionId, partitionState) =>
+                val replicas = partitionState.allReplicas
+                val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+                var leaderInfo: Option[Broker] = None
+                var isrInfo: Seq[Broker] = Nil
+                val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+                val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+                val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+                debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+                try {
+                  leaderInfo = aliveBrokers.get(leader)
+                  if (!leaderInfo.isDefined)
+                    throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
+                  isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                  if (replicaInfo.size < replicas.size)
+                    throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                      replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                  if (isrInfo.size < isr.size)
+                    throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                      isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+                } catch {
+                  case e: Throwable =>
+                    debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage))
+                    new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+                      ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+                }
+            }
+            topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+          } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) {
+            topicsToBeCreated += topic
+          } else {
+            topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+          }
+        }
+      }
+      (topicResponses, topicsToBeCreated)
+    }
 
     def addPartitionInfo(topic: String, partitionId: Int, stateInfo: PartitionStateInfo) {
+      inLock(partitionMetadataLock.writeLock()) {
+        addPartitionInfoInternal(topic, partitionId, stateInfo)
+      }
+    }
+
+    def getPartitionInfos(topic: String) = {
+      inLock(partitionMetadataLock.readLock()) {
+        cache(topic)
+      }
+    }
+
+    def containsTopicAndPartition(topic: String,
+                                  partitionId: Int): Boolean = {
+      inLock(partitionMetadataLock.readLock()) {
+        cache.get(topic) match {
+          case Some(partitionInfos) => partitionInfos.contains(partitionId)
+          case None => false
+        }
+      }
+    }
+
+    def containsTopic(topic: String) = cache.contains(topic)
+
+    def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+                    brokerId: Int,
+                    stateChangeLogger: StateChangeLogger) {
+      inLock(partitionMetadataLock.writeLock()) {
+        updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+        updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
+          if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
+            removePartitionInfo(tp.topic, tp.partition)
+            stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+              "sent by controller %d epoch %d with correlation id %d")
+              .format(brokerId, tp, updateMetadataRequest.controllerId,
+                updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+          } else {
+            addPartitionInfo(tp.topic, tp.partition, info)
+            stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+              "sent by controller %d epoch %d with correlation id %d")
+              .format(brokerId, info, tp, updateMetadataRequest.controllerId,
+                updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+          }
+        }
+      }
+    }
+
+    private def addPartitionInfoInternal(topic: String,
+                                         partitionId: Int,
+                                         stateInfo: PartitionStateInfo) {
       cache.get(topic) match {
         case Some(infos) => infos.put(partitionId, stateInfo)
         case None => {
@@ -76,7 +172,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    def removePartitionInfo(topic: String, partitionId: Int) = {
+    private def removePartitionInfo(topic: String, partitionId: Int) = {
       cache.get(topic) match {
         case Some(infos) => {
           infos.remove(partitionId)
@@ -88,42 +184,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case None => false
       }
     }
-
-    def getPartitionInfos(topic: String) = cache(topic)
-
-    def containsTopicAndPartition(topic: String,
-                                  partitionId: Int): Boolean = {
-      cache.get(topic) match {
-        case Some(partitionInfos) => partitionInfos.contains(partitionId)
-        case None => false
-      }
-    }
-
-    def allTopics = cache.keySet
-
-    def removeTopic(topic: String) = cache.remove(topic)
-
-    def containsTopic(topic: String) = cache.contains(topic)
-
-    def updateCache(updateMetadataRequest: UpdateMetadataRequest,
-                    brokerId: Int,
-                    stateChangeLogger: StateChangeLogger) = {
-      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
-        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
-          removePartitionInfo(tp.topic, tp.partition)
-          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, tp, updateMetadataRequest.controllerId,
-              updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        } else {
-          addPartitionInfo(tp.topic, tp.partition, info)
-          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, info, tp, updateMetadataRequest.controllerId,
-              updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        }
-      }
-    }
   }
 
   /**
@@ -156,10 +216,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   // ensureTopicExists is only for client facing requests
   private def ensureTopicExists(topic: String) = {
-    inLock(partitionMetadataLock.readLock()) {
-      if (!metadataCache.containsTopic(topic))
-        throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
-    }
+    if (!metadataCache.containsTopic(topic))
+      throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
@@ -203,12 +261,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       stateChangeLogger.warn(stateControllerEpochErrorMessage)
       throw new ControllerMovedException(stateControllerEpochErrorMessage)
     }
-    inLock(partitionMetadataLock.writeLock()) {
-      replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
-      // cache the list of alive brokers in the cluster
-      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
-      metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
-    }
+    replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
+    metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
   }
@@ -659,54 +713,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
     val config = replicaManager.config
-
-    // Returning all topics when requested topics are empty
-    val isAllTopics = topics.isEmpty
-    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
-    val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
-
-    inLock(partitionMetadataLock.readLock()) {
-      val topicsRequested = if (isAllTopics) metadataCache.allTopics else topics
-      for (topic <- topicsRequested) {
-        if (isAllTopics || metadataCache.containsTopic(topic)) {
-          val partitionStateInfos = metadataCache.getPartitionInfos(topic)
-          val partitionMetadata = partitionStateInfos.map {
-            case (partitionId, partitionState) =>
-              val replicas = partitionState.allReplicas
-              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-              var leaderInfo: Option[Broker] = None
-              var isrInfo: Seq[Broker] = Nil
-              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-              try {
-                leaderInfo = aliveBrokers.get(leader)
-                if (!leaderInfo.isDefined)
-                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-                if (replicaInfo.size < replicas.size)
-                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                if (isrInfo.size < isr.size)
-                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-              } catch {
-                case e: Throwable =>
-                  debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage))
-                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
-                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-              }
-          }
-          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
-        } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) {
-          topicsToBeCreated += topic
-        } else {
-          topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
-        }
-      }
-    }
+    val (topicResponses, topicsToBeCreated) = metadataCache.getTopicMetadata(topics)
 
     topicResponses.appendAll(topicsToBeCreated.map { topic =>
       try {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d0bbeb6..b0506d4 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -116,7 +116,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
 
-  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "compact" */
+  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
   val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
 
   /* the number of background threads to use for log cleaning */
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 7af2f43..19f61a9 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -90,7 +90,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
             val topic = pieces(0)
             val partition = pieces(1).toInt
             val offset = pieces(2).toLong
-            offsets += (TopicAndPartition(topic, partition) -> offset)
+            offsets += (TopicAndPartition(pieces(0), partition) -> offset)
             line = reader.readLine()
           }
           if(offsets.size != expectedSize)
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 5417628..89a88a7 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -159,7 +159,7 @@ class OffsetManager(val config: OffsetManagerConfig,
 
   def offsetsTopicConfig: Properties = {
     val props = new Properties
    props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
-    props.put(LogConfig.CleanupPolicyProp, "compact")
+    props.put(LogConfig.CleanupPolicyProp, "dedupe")
     props
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5bfa764..9aeb69d 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -92,7 +92,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
   def makeCleaner(parts: Int,
                   minDirtyMessages: Int = 0,
                   numThreads: Int = 1,
-                  defaultPolicy: String = "compact",
+                  defaultPolicy: String = "dedupe",
                   policyOverrides: Map[String, String] = Map()): LogCleaner = {
 
     // create partitions and add them to the pool
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a71e48d..b5936d4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -29,17 +29,18 @@ import kafka.log.{LogManager, LogConfig, Log}
 class ReplicaManagerTest extends JUnit3Suite {
 
   @Test
-  def testHighWaterMarkDirectoryMapping() {
+  def testHighwaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1)
-    val topic = "test-topic"
+    val dir = "/tmp/kafka-logs/"
+    new File(dir).mkdir()
+    props.setProperty("log.dirs", dir)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = EasyMock.createMock(classOf[LogManager])
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
-    val partition = rm.getOrCreatePartition(topic, 1, 1)
-    val logFilename = config.logDirs.head + File.separator + topic + "-1"
-    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File(logFilename), new LogConfig(), 0L, null))))
+    val partition = rm.getOrCreatePartition("test-topic", 1, 1)
+    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null))))
     rm.checkpointHighWatermarks()
   }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 53d01aa..71ab6e1 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -163,7 +163,7 @@ object TestUtils extends Logging {
     props.put("group.id", groupId)
     props.put("consumer.id", consumerId)
     props.put("consumer.timeout.ms", consumerTimeout.toString)
-    props.put("zookeeper.session.timeout.ms", "6000")
+    props.put("zookeeper.session.timeout.ms", "400")
     props.put("zookeeper.sync.time.ms", "200")
     props.put("auto.commit.interval.ms", "1000")
     props.put("rebalance.max.retries", "4")
diff --git a/gradlew.bat b/gradlew.bat
deleted file mode 100644
index 84974e2..0000000
--- a/gradlew.bat
+++ /dev/null
@@ -1,90 +0,0 @@
-@if "%DEBUG%" == "" @echo off
-@rem ##########################################################################
-@rem
-@rem  Gradle startup script for Windows
-@rem
-@rem ##########################################################################
-
-@rem Set local scope for the variables with windows NT shell
-if "%OS%"=="Windows_NT" setlocal
-
-@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-set DEFAULT_JVM_OPTS=-Xmx1024m -Xms256m -XX:MaxPermSize=512m
-
-set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.
-set APP_BASE_NAME=%~n0
-set APP_HOME=%DIRNAME%
-
-@rem Find java.exe
-if defined JAVA_HOME goto findJavaFromJavaHome
-
-set JAVA_EXE=java.exe
-%JAVA_EXE% -version >NUL 2>&1
-if "%ERRORLEVEL%" == "0" goto init
-
-echo.
-echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
-echo.
-echo Please set the JAVA_HOME variable in your environment to match the
-echo location of your Java installation.
-
-goto fail
-
-:findJavaFromJavaHome
-set JAVA_HOME=%JAVA_HOME:"=%
-set JAVA_EXE=%JAVA_HOME%/bin/java.exe
-
-if exist "%JAVA_EXE%" goto init
-
-echo.
-echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
-echo.
-echo Please set the JAVA_HOME variable in your environment to match the
-echo location of your Java installation.
-
-goto fail
-
-:init
-@rem Get command-line arguments, handling Windowz variants
-
-if not "%OS%" == "Windows_NT" goto win9xME_args
-if "%@eval[2+2]" == "4" goto 4NT_args
-
-:win9xME_args
-@rem Slurp the command line arguments.
-set CMD_LINE_ARGS=
-set _SKIP=2
-
-:win9xME_args_slurp
-if "x%~1" == "x" goto execute
-
-set CMD_LINE_ARGS=%*
-goto execute
-
-:4NT_args
-@rem Get arguments from the 4NT Shell from JP Software
-set CMD_LINE_ARGS=%$
-
-:execute
-@rem Setup the command line
-
-set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
-
-@rem Execute Gradle
-"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
-
-:end
-@rem End local scope for the variables with windows NT shell
-if "%ERRORLEVEL%"=="0" goto mainEnd
-
-:fail
-rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
-rem the _cmd.exe /c_ return code!
-if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
-exit /b 1
-
-:mainEnd
-if "%OS%"=="Windows_NT" endlocal
-
-:omega