diff --git a/.gitignore b/.gitignore
index 1fc794d..553a077 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,5 @@ project/sbt_project_definition.iml
*~
*#
.#*
+rat.out
+TAGS
diff --git a/bin/run-rat.sh b/bin/run-rat.sh
index 28c0ccd..1b7bc31 100755
--- a/bin/run-rat.sh
+++ b/bin/run-rat.sh
@@ -23,7 +23,7 @@ else
JAVA="$JAVA_HOME/bin/java"
fi
-rat_command="$JAVA -jar $base_dir/lib_managed/scala_2.8.0/compile/apache-rat-0.8.jar --dir $base_dir "
+rat_command="$JAVA -jar $base_dir/lib/apache-rat-0.8.jar --dir $base_dir "
for f in $(cat $rat_excludes_file);
do
diff --git a/contrib/hadoop-consumer/build.sbt b/contrib/hadoop-consumer/build.sbt
new file mode 100644
index 0000000..02e95eb
--- /dev/null
+++ b/contrib/hadoop-consumer/build.sbt
@@ -0,0 +1 @@
+crossPaths := false
diff --git a/contrib/hadoop-producer/build.sbt b/contrib/hadoop-producer/build.sbt
new file mode 100644
index 0000000..02e95eb
--- /dev/null
+++ b/contrib/hadoop-producer/build.sbt
@@ -0,0 +1 @@
+crossPaths := false
diff --git a/core/build.sbt b/core/build.sbt
new file mode 100644
index 0000000..df58410
--- /dev/null
+++ b/core/build.sbt
@@ -0,0 +1,24 @@
+import sbt._
+import Keys._
+
+name := "kafka"
+
+resolvers ++= Seq(
+ "SonaType ScalaTest repo" at "https://oss.sonatype.org/content/groups/public/org/scalatest/"
+)
+
+libraryDependencies ++= Seq(
+ "org.apache.zookeeper" % "zookeeper" % "3.3.4",
+ "com.github.sgroschupf" % "zkclient" % "0.1",
+ "org.xerial.snappy" % "snappy-java" % "1.0.4.1",
+ "org.easymock" % "easymock" % "3.0" % "test",
+ "junit" % "junit" % "4.1" % "test"
+)
+
+libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
+ deps :+ (sv match {
+ case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test"
+ case _ => "org.scalatest" %% "scalatest" % "1.8" % "test"
+ })
+}
+
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index dc44e75..819bc6c 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -24,7 +24,7 @@ import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging}
-import scala.collection.Map
+import scala.collection.{Map,SortedMap}
import scala.collection.mutable.{ListBuffer, HashMap}
class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing
@@ -101,13 +101,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
- var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
+ var dataPerBroker: SortedMap[(String, Int), Seq[ProducerData[K,Message]]] = null
ret.get(leaderBrokerId) match {
case Some(element) =>
- dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
+ dataPerBroker = element.asInstanceOf[SortedMap[(String, Int), Seq[ProducerData[K,Message]]]]
case None =>
- dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
- ret.put(leaderBrokerId, dataPerBroker)
+ dataPerBroker = SortedMap[(String, Int), Seq[ProducerData[K,Message]]]()
}
val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
@@ -117,9 +116,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
case None =>
dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
- dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
+ dataPerBroker += (topicAndPartition -> dataPerTopicPartition)
}
dataPerTopicPartition.append(event)
+ ret.put(leaderBrokerId, dataPerBroker)
}
ret
}
diff --git a/examples/build.sbt b/examples/build.sbt
new file mode 100644
index 0000000..d12d701
--- /dev/null
+++ b/examples/build.sbt
@@ -0,0 +1,3 @@
+name := "kafka-java-examples"
+
+crossPaths := false
diff --git a/project/Build.scala b/project/Build.scala
new file mode 100644
index 0000000..7124fa9
--- /dev/null
+++ b/project/Build.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import sbt._
+import Keys._
+import java.io.File
+
+object KafkaBuild extends Build {
+ val commonSettings = Seq(
+ version := "0.8.0",
+ organization := "org.apache.kafka",
+ scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"),
+ crossScalaVersions := Seq("2.8.0", "2.9.1"),
+ scalaVersion := "2.8.0",
+ javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.5"),
+ parallelExecution in Test := false, // Prevent tests from overrunning each other
+ libraryDependencies ++= Seq(
+ "log4j" % "log4j" % "1.2.15",
+ "net.sf.jopt-simple" % "jopt-simple" % "3.2"
+ ),
+ // The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+ // some dependencies on various sun and javax packages.
+ ivyXML :=
+
+
+
+
+
+
+ )
+
+ val hadoopSettings = Seq(
+ javacOptions ++= Seq("-Xlint:deprecation"),
+ libraryDependencies ++= Seq(
+ "org.apache.avro" % "avro" % "1.4.0",
+ "org.apache.pig" % "pig" % "0.8.0",
+ "commons-logging" % "commons-logging" % "1.0.4",
+ "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5",
+ "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5",
+ "org.apache.hadoop" % "hadoop-core" % "0.20.2"
+ ),
+ ivyXML :=
+
+
+ )
+
+ val runRat = TaskKey[Unit]("run-rat-task", "Runs Apache rat on Kafka")
+ val runRatTask = runRat := {
+ "bin/run-rat.sh" !
+ }
+
+ lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib).settings((commonSettings ++ runRatTask): _*).settings(Release.releaseZipTask)
+ lazy val core = Project(id = "core", base = file("core")).settings((commonSettings) :_*)
+ lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core)
+ lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)
+
+ lazy val contrib = Project(id = "contrib", base = file("contrib")).aggregate(hadoopProducer, hadoopConsumer).settings(commonSettings :_*)
+ lazy val hadoopProducer = Project(id = "hadoop producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
+ lazy val hadoopConsumer = Project(id = "hadoop consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
+}
diff --git a/project/Release.scala b/project/Release.scala
new file mode 100644
index 0000000..44ded64
--- /dev/null
+++ b/project/Release.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import sbt._
+import Keys._
+import java.io.File
+
+object Release {
+ val releaseZip = TaskKey[Unit]("release-zip", "Compiles, packages and zips together all dependencies, configs, scripts and generated jars")
+ val releaseZipTask = releaseZip <<= (streams, scalaVersion, baseDirectory,target,artifact,moduleName) {
+ (streams: Task[TaskStreams], scalaVersion: String, baseDirectory: File, target: File, artifact: Artifact, moduleName: String) => streams map {
+ s => {
+ val distDir = new File(target, "dist-" + scalaVersion)
+ val configSrc = new File(baseDirectory, "config")
+ val binSrc = new File(baseDirectory, "bin")
+ s.log.info("Generating dist structure for Scala " + scalaVersion + " under " + distDir)
+ }
+ }
+ }
+}
diff --git a/project/build.properties b/project/build.properties
index a8895d3..a268eb4 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -14,11 +14,5 @@
# limitations under the License.
#Project properties
#Mon Feb 28 11:55:49 PST 2011
-project.name=Kafka
-sbt.version=0.7.5
-project.version=0.8.0
-build.scala.versions=2.8.0
-contrib.root.dir=contrib
-lib.dir=lib
-target.dir=target/scala_2.8.0
-dist.dir=dist
+sbt.version=0.11.3
+
diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
deleted file mode 100644
index 5ad1739..0000000
--- a/project/build/KafkaProject.scala
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import sbt._
-import scala.xml.{Node, Elem, NodeSeq}
-import scala.xml.transform.{RewriteRule, RuleTransformer}
-
-class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
- lazy val core = project("core", "core-kafka", new CoreKafkaProject(_))
- lazy val examples = project("examples", "java-examples", new KafkaExamplesProject(_), core)
- lazy val contrib = project("contrib", "contrib", new ContribProject(_))
- lazy val perf = project("perf", "perf", new KafkaPerfProject(_))
-
- lazy val releaseZipTask = core.packageDistTask
-
- val releaseZipDescription = "Compiles every sub project, runs unit tests, creates a deployable release zip file with dependencies, config, and scripts."
- lazy val releaseZip = releaseZipTask dependsOn(core.corePackageAction, core.test, examples.examplesPackageAction,
- contrib.producerPackageAction, contrib.consumerPackageAction) describedAs releaseZipDescription
-
- val runRatDescription = "Runs Apache rat on Kafka"
- lazy val runRatTask = task {
- Runtime.getRuntime().exec("bin/run-rat.sh")
- None
- } describedAs runRatDescription
-
- val rat = "org.apache.rat" % "apache-rat" % "0.8"
-
- class CoreKafkaProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject with CoreDependencies with TestDependencies with CompressionDependencies {
- val corePackageAction = packageAllAction
-
- //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
- // some dependencies on various sun and javax packages.
- override def ivyXML =
-
-
-
-
-
-
-
-
-
-
-
-
- def zkClientDep =
-
- zkclient
- zkclient
- 20120522
- compile
-
-
- object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
- override def transform(node: Node): Seq[Node] = node match {
- case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
- Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep :_*)
- }
- case other => other
- }
- })
-
- override def pomPostProcess(pom: Node): Node = {
- ZkClientDepAdder(pom)
- }
-
- override def artifactID = "kafka"
- override def filterScalaJars = false
-
- // build the executable jar's classpath.
- // (why is it necessary to explicitly remove the target/{classes,resources} paths? hm.)
- def dependentJars = {
- val jars =
- publicClasspath +++ mainDependencies.scalaJars --- mainCompilePath --- mainResourcesOutputPath
- if (jars.get.find { jar => jar.name.startsWith("scala-library-") }.isDefined) {
- // workaround bug in sbt: if the compiler is explicitly included, don't include 2 versions
- // of the library.
- jars --- jars.filter { jar =>
- jar.absolutePath.contains("/boot/") && jar.name == "scala-library.jar"
- }
- } else {
- jars
- }
- }
-
- def dependentJarNames = dependentJars.getFiles.map(_.getName).filter(_.endsWith(".jar"))
- override def manifestClassPath = Some(dependentJarNames.map { "libs/" + _ }.mkString(" "))
-
- def distName = (artifactID + "-" + projectVersion.value)
- def distPath = "dist" / distName ##
-
- def configPath = "config" ##
- def configOutputPath = distPath / "config"
-
- def binPath = "bin" ##
- def binOutputPath = distPath / "bin"
-
- def distZipName = {
- "%s-%s.zip".format(artifactID, projectVersion.value)
- }
-
- lazy val packageDistTask = task {
- distPath.asFile.mkdirs()
- (distPath / "libs").asFile.mkdirs()
- binOutputPath.asFile.mkdirs()
- configOutputPath.asFile.mkdirs()
-
- FileUtilities.copyFlat(List(jarPath), distPath, log).left.toOption orElse
- FileUtilities.copyFlat(dependentJars.get, distPath / "libs", log).left.toOption orElse
- FileUtilities.copy((configPath ***).get, configOutputPath, log).left.toOption orElse
- FileUtilities.copy((binPath ***).get, binOutputPath, log).left.toOption orElse
- FileUtilities.zip((("dist" / distName) ##).get, "dist" / distZipName, true, log)
- None
- }
-
- val PackageDistDescription = "Creates a deployable zip file with dependencies, config, and scripts."
- lazy val packageDist = packageDistTask dependsOn(`package`, `test`) describedAs PackageDistDescription
-
- val cleanDist = cleanTask("dist" ##) describedAs("Erase any packaged distributions.")
- override def cleanAction = super.cleanAction dependsOn(cleanDist)
-
- override def javaCompileOptions = super.javaCompileOptions ++
- List(JavaCompileOption("-source"), JavaCompileOption("1.5"))
-
- override def packageAction = super.packageAction dependsOn (testCompileAction)
-
- }
-
- class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject
- with CoreDependencies {
- val perfPackageAction = packageAllAction
- val dependsOnCore = core
-
- //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
- // some dependencies on various sun and javax packages.
- override def ivyXML =
-
-
-
-
-
-
-
-
- override def artifactID = "kafka-perf"
- override def filterScalaJars = false
- override def javaCompileOptions = super.javaCompileOptions ++
- List(JavaCompileOption("-Xlint:unchecked"))
- }
-
- class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject
- with CoreDependencies {
- val examplesPackageAction = packageAllAction
- val dependsOnCore = core
- //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
- // some dependencies on various sun and javax packages.
- override def ivyXML =
-
-
-
-
-
-
-
-
- override def artifactID = "kafka-java-examples"
- override def filterScalaJars = false
- override def javaCompileOptions = super.javaCompileOptions ++
- List(JavaCompileOption("-Xlint:unchecked"))
- }
-
- class ContribProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
- lazy val hadoopProducer = project("hadoop-producer", "hadoop producer",
- new HadoopProducerProject(_), core)
- lazy val hadoopConsumer = project("hadoop-consumer", "hadoop consumer",
- new HadoopConsumerProject(_), core)
-
- val producerPackageAction = hadoopProducer.producerPackageAction
- val consumerPackageAction = hadoopConsumer.consumerPackageAction
-
- class HadoopProducerProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject
- with CoreDependencies with HadoopDependencies {
- val producerPackageAction = packageAllAction
- override def ivyXML =
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- }
-
- class HadoopConsumerProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject
- with CoreDependencies {
- val consumerPackageAction = packageAllAction
- override def ivyXML =
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- val jodaTime = "joda-time" % "joda-time" % "1.6"
- }
- }
-
- trait TestDependencies {
- val easymock = "org.easymock" % "easymock" % "3.0" % "test"
- val junit = "junit" % "junit" % "4.1" % "test"
- val scalaTest = "org.scalatest" % "scalatest" % "1.2" % "test"
- }
-
- trait CoreDependencies {
- val log4j = "log4j" % "log4j" % "1.2.15"
- val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
- }
-
- trait HadoopDependencies {
- val avro = "org.apache.avro" % "avro" % "1.4.0"
- val commonsLogging = "commons-logging" % "commons-logging" % "1.0.4"
- val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5"
- val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5"
- val hadoop = "org.apache.hadoop" % "hadoop-core" % "0.20.2"
- }
-
- trait CompressionDependencies {
- val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1"
- }
-
-}
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000..69cd6a3
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1,10 @@
+resolvers ++= Seq(
+ "sbt-idea-repo" at "http://mpeltonen.github.com/maven/",
+ "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
+ "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
+)
+
+libraryDependencies ++= Seq(
+ Defaults.sbtPluginExtra("com.eed3si9n" % "sbt-assembly" % "0.7.2", "0.11.2", "2.9.1" ),
+ Defaults.sbtPluginExtra("com.github.mpeltonen" % "sbt-idea" % "0.11.0", "0.11.2", "2.9.1")
+)
diff --git a/project/plugins/Plugins.scala b/project/plugins/Plugins.scala
deleted file mode 100644
index 0777d82..0000000
--- a/project/plugins/Plugins.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import sbt._
-
-class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
- val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/"
- val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT"
-}