diff --git a/.gitignore b/.gitignore index 1fc794d..553a077 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ project/sbt_project_definition.iml *~ *# .#* +rat.out +TAGS diff --git a/bin/run-rat.sh b/bin/run-rat.sh index 28c0ccd..1b7bc31 100755 --- a/bin/run-rat.sh +++ b/bin/run-rat.sh @@ -23,7 +23,7 @@ else JAVA="$JAVA_HOME/bin/java" fi -rat_command="$JAVA -jar $base_dir/lib_managed/scala_2.8.0/compile/apache-rat-0.8.jar --dir $base_dir " +rat_command="$JAVA -jar $base_dir/lib/apache-rat-0.8.jar --dir $base_dir " for f in $(cat $rat_excludes_file); do diff --git a/contrib/hadoop-consumer/build.sbt b/contrib/hadoop-consumer/build.sbt new file mode 100644 index 0000000..02e95eb --- /dev/null +++ b/contrib/hadoop-consumer/build.sbt @@ -0,0 +1 @@ +crossPaths := false diff --git a/contrib/hadoop-producer/build.sbt b/contrib/hadoop-producer/build.sbt new file mode 100644 index 0000000..02e95eb --- /dev/null +++ b/contrib/hadoop-producer/build.sbt @@ -0,0 +1 @@ +crossPaths := false diff --git a/core/build.sbt b/core/build.sbt new file mode 100644 index 0000000..df58410 --- /dev/null +++ b/core/build.sbt @@ -0,0 +1,24 @@ +import sbt._ +import Keys._ + +name := "kafka" + +resolvers ++= Seq( + "SonaType ScalaTest repo" at "https://oss.sonatype.org/content/groups/public/org/scalatest/" +) + +libraryDependencies ++= Seq( + "org.apache.zookeeper" % "zookeeper" % "3.3.4", + "com.github.sgroschupf" % "zkclient" % "0.1", + "org.xerial.snappy" % "snappy-java" % "1.0.4.1", + "org.easymock" % "easymock" % "3.0" % "test", + "junit" % "junit" % "4.1" % "test" +) + +libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) => + deps :+ (sv match { + case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test" + case _ => "org.scalatest" %% "scalatest" % "1.8" % "test" + }) +} + diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index dc44e75..819bc6c 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -24,7 +24,7 @@ import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet} import kafka.producer._ import kafka.serializer.Encoder import kafka.utils.{Utils, Logging} -import scala.collection.Map +import scala.collection.{Map,SortedMap} import scala.collection.mutable.{ListBuffer, HashMap} class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing @@ -101,13 +101,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig, // postpone the failure until the send operation, so that requests for other brokers are handled correctly val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1) - var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null + var dataPerBroker: SortedMap[(String, Int), Seq[ProducerData[K,Message]]] = null ret.get(leaderBrokerId) match { case Some(element) => - dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]] + dataPerBroker = element.asInstanceOf[SortedMap[(String, Int), Seq[ProducerData[K,Message]]]] case None => - dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]] - ret.put(leaderBrokerId, dataPerBroker) + dataPerBroker = SortedMap[(String, Int), Seq[ProducerData[K,Message]]]() } val topicAndPartition = (event.getTopic, brokerPartition.partitionId) @@ -117,9 +116,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig, dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]] case None => dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]] - dataPerBroker.put(topicAndPartition, dataPerTopicPartition) + dataPerBroker += (topicAndPartition -> dataPerTopicPartition) } dataPerTopicPartition.append(event) + ret.put(leaderBrokerId, dataPerBroker) } ret } diff --git a/examples/build.sbt b/examples/build.sbt new file mode 100644 index 0000000..d12d701 --- /dev/null +++ b/examples/build.sbt @@ -0,0 +1,3 @@ +name := "kafka-java-examples" + +crossPaths := false diff --git a/project/Build.scala b/project/Build.scala new file mode 100644 index 0000000..7124fa9 --- /dev/null +++ b/project/Build.scala @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import sbt._ +import Keys._ +import java.io.File + +object KafkaBuild extends Build { + val commonSettings = Seq( + version := "0.8.0", + organization := "org.apache.kafka", + scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"), + crossScalaVersions := Seq("2.8.0", "2.9.1"), + scalaVersion := "2.8.0", + javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.5"), + parallelExecution in Test := false, // Prevent tests from overrunning each other + libraryDependencies ++= Seq( + "log4j" % "log4j" % "1.2.15", + "net.sf.jopt-simple" % "jopt-simple" % "3.2" + ), + // The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required + // some dependencies on various sun and javax packages. + ivyXML := + + + + + + + ) + + val hadoopSettings = Seq( + javacOptions ++= Seq("-Xlint:deprecation"), + libraryDependencies ++= Seq( + "org.apache.avro" % "avro" % "1.4.0", + "org.apache.pig" % "pig" % "0.8.0", + "commons-logging" % "commons-logging" % "1.0.4", + "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5", + "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5", + "org.apache.hadoop" % "hadoop-core" % "0.20.2" + ), + ivyXML := + + + ) + + val runRat = TaskKey[Unit]("run-rat-task", "Runs Apache rat on Kafka") + val runRatTask = runRat := { + "bin/run-rat.sh" ! + } + + lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib).settings((commonSettings ++ runRatTask): _*).settings(Release.releaseZipTask) + lazy val core = Project(id = "core", base = file("core")).settings((commonSettings) :_*) + lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core) + lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core) + + lazy val contrib = Project(id = "contrib", base = file("contrib")).aggregate(hadoopProducer, hadoopConsumer).settings(commonSettings :_*) + lazy val hadoopProducer = Project(id = "hadoop producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core) + lazy val hadoopConsumer = Project(id = "hadoop consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core) +} diff --git a/project/Release.scala b/project/Release.scala new file mode 100644 index 0000000..44ded64 --- /dev/null +++ b/project/Release.scala @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import sbt._ +import Keys._ +import java.io.File + +object Release { + val releaseZip = TaskKey[Unit]("release-zip", "Compiles, packages and zips together all dependencies, configs, scripts and generated jars") + val releaseZipTask = releaseZip <<= (streams, scalaVersion, baseDirectory,target,artifact,moduleName) { + (streams: Task[TaskStreams], scalaVersion: String, baseDirectory: File, target: File, artifact: Artifact, moduleName: String) => streams map { + s => { + val distDir = new File(target, "dist-" + scalaVersion) + val configSrc = new File(baseDirectory, "config") + val binSrc = new File(baseDirectory, "bin") + s.log.info("Generating dist structure for Scala " + scalaVersion + " under " + distDir) + } + } + } +} diff --git a/project/build.properties b/project/build.properties index a8895d3..a268eb4 100644 --- a/project/build.properties +++ b/project/build.properties @@ -14,11 +14,5 @@ # limitations under the License. #Project properties #Mon Feb 28 11:55:49 PST 2011 -project.name=Kafka -sbt.version=0.7.5 -project.version=0.8.0 -build.scala.versions=2.8.0 -contrib.root.dir=contrib -lib.dir=lib -target.dir=target/scala_2.8.0 -dist.dir=dist +sbt.version=0.11.3 + diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala deleted file mode 100644 index 5ad1739..0000000 --- a/project/build/KafkaProject.scala +++ /dev/null @@ -1,268 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import sbt._ -import scala.xml.{Node, Elem, NodeSeq} -import scala.xml.transform.{RewriteRule, RuleTransformer} - -class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject { - lazy val core = project("core", "core-kafka", new CoreKafkaProject(_)) - lazy val examples = project("examples", "java-examples", new KafkaExamplesProject(_), core) - lazy val contrib = project("contrib", "contrib", new ContribProject(_)) - lazy val perf = project("perf", "perf", new KafkaPerfProject(_)) - - lazy val releaseZipTask = core.packageDistTask - - val releaseZipDescription = "Compiles every sub project, runs unit tests, creates a deployable release zip file with dependencies, config, and scripts." - lazy val releaseZip = releaseZipTask dependsOn(core.corePackageAction, core.test, examples.examplesPackageAction, - contrib.producerPackageAction, contrib.consumerPackageAction) describedAs releaseZipDescription - - val runRatDescription = "Runs Apache rat on Kafka" - lazy val runRatTask = task { - Runtime.getRuntime().exec("bin/run-rat.sh") - None - } describedAs runRatDescription - - val rat = "org.apache.rat" % "apache-rat" % "0.8" - - class CoreKafkaProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject with CoreDependencies with TestDependencies with CompressionDependencies { - val corePackageAction = packageAllAction - - //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required - // some dependencies on various sun and javax packages. - override def ivyXML = - - - - - - - - - - - - - def zkClientDep = - - zkclient - zkclient - 20120522 - compile - - - object ZkClientDepAdder extends RuleTransformer(new RewriteRule() { - override def transform(node: Node): Seq[Node] = node match { - case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => { - Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep :_*) - } - case other => other - } - }) - - override def pomPostProcess(pom: Node): Node = { - ZkClientDepAdder(pom) - } - - override def artifactID = "kafka" - override def filterScalaJars = false - - // build the executable jar's classpath. - // (why is it necessary to explicitly remove the target/{classes,resources} paths? hm.) - def dependentJars = { - val jars = - publicClasspath +++ mainDependencies.scalaJars --- mainCompilePath --- mainResourcesOutputPath - if (jars.get.find { jar => jar.name.startsWith("scala-library-") }.isDefined) { - // workaround bug in sbt: if the compiler is explicitly included, don't include 2 versions - // of the library. - jars --- jars.filter { jar => - jar.absolutePath.contains("/boot/") && jar.name == "scala-library.jar" - } - } else { - jars - } - } - - def dependentJarNames = dependentJars.getFiles.map(_.getName).filter(_.endsWith(".jar")) - override def manifestClassPath = Some(dependentJarNames.map { "libs/" + _ }.mkString(" ")) - - def distName = (artifactID + "-" + projectVersion.value) - def distPath = "dist" / distName ## - - def configPath = "config" ## - def configOutputPath = distPath / "config" - - def binPath = "bin" ## - def binOutputPath = distPath / "bin" - - def distZipName = { - "%s-%s.zip".format(artifactID, projectVersion.value) - } - - lazy val packageDistTask = task { - distPath.asFile.mkdirs() - (distPath / "libs").asFile.mkdirs() - binOutputPath.asFile.mkdirs() - configOutputPath.asFile.mkdirs() - - FileUtilities.copyFlat(List(jarPath), distPath, log).left.toOption orElse - FileUtilities.copyFlat(dependentJars.get, distPath / "libs", log).left.toOption orElse - FileUtilities.copy((configPath ***).get, configOutputPath, log).left.toOption orElse - FileUtilities.copy((binPath ***).get, binOutputPath, log).left.toOption orElse - FileUtilities.zip((("dist" / distName) ##).get, "dist" / distZipName, true, log) - None - } - - val PackageDistDescription = "Creates a deployable zip file with dependencies, config, and scripts." - lazy val packageDist = packageDistTask dependsOn(`package`, `test`) describedAs PackageDistDescription - - val cleanDist = cleanTask("dist" ##) describedAs("Erase any packaged distributions.") - override def cleanAction = super.cleanAction dependsOn(cleanDist) - - override def javaCompileOptions = super.javaCompileOptions ++ - List(JavaCompileOption("-source"), JavaCompileOption("1.5")) - - override def packageAction = super.packageAction dependsOn (testCompileAction) - - } - - class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject - with CoreDependencies { - val perfPackageAction = packageAllAction - val dependsOnCore = core - - //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required - // some dependencies on various sun and javax packages. - override def ivyXML = - - - - - - - - - override def artifactID = "kafka-perf" - override def filterScalaJars = false - override def javaCompileOptions = super.javaCompileOptions ++ - List(JavaCompileOption("-Xlint:unchecked")) - } - - class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject - with CoreDependencies { - val examplesPackageAction = packageAllAction - val dependsOnCore = core - //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required - // some dependencies on various sun and javax packages. - override def ivyXML = - - - - - - - - - override def artifactID = "kafka-java-examples" - override def filterScalaJars = false - override def javaCompileOptions = super.javaCompileOptions ++ - List(JavaCompileOption("-Xlint:unchecked")) - } - - class ContribProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject { - lazy val hadoopProducer = project("hadoop-producer", "hadoop producer", - new HadoopProducerProject(_), core) - lazy val hadoopConsumer = project("hadoop-consumer", "hadoop consumer", - new HadoopConsumerProject(_), core) - - val producerPackageAction = hadoopProducer.producerPackageAction - val consumerPackageAction = hadoopConsumer.consumerPackageAction - - class HadoopProducerProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject - with CoreDependencies with HadoopDependencies { - val producerPackageAction = packageAllAction - override def ivyXML = - - - - - - - - - - - - - - - - } - - class HadoopConsumerProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject - with CoreDependencies { - val consumerPackageAction = packageAllAction - override def ivyXML = - - - - - - - - - - - - - - - - - val jodaTime = "joda-time" % "joda-time" % "1.6" - } - } - - trait TestDependencies { - val easymock = "org.easymock" % "easymock" % "3.0" % "test" - val junit = "junit" % "junit" % "4.1" % "test" - val scalaTest = "org.scalatest" % "scalatest" % "1.2" % "test" - } - - trait CoreDependencies { - val log4j = "log4j" % "log4j" % "1.2.15" - val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2" - } - - trait HadoopDependencies { - val avro = "org.apache.avro" % "avro" % "1.4.0" - val commonsLogging = "commons-logging" % "commons-logging" % "1.0.4" - val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5" - val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5" - val hadoop = "org.apache.hadoop" % "hadoop-core" % "0.20.2" - } - - trait CompressionDependencies { - val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1" - } - -} diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..69cd6a3 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,10 @@ +resolvers ++= Seq( + "sbt-idea-repo" at "http://mpeltonen.github.com/maven/", + "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/", + "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/" +) + +libraryDependencies ++= Seq( + Defaults.sbtPluginExtra("com.eed3si9n" % "sbt-assembly" % "0.7.2", "0.11.2", "2.9.1" ), + Defaults.sbtPluginExtra("com.github.mpeltonen" % "sbt-idea" % "0.11.0", "0.11.2", "2.9.1") +) diff --git a/project/plugins/Plugins.scala b/project/plugins/Plugins.scala deleted file mode 100644 index 0777d82..0000000 --- a/project/plugins/Plugins.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import sbt._ - -class Plugins(info: ProjectInfo) extends PluginDefinition(info) { - val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/" - val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT" -}