diff --git a/.gitignore b/.gitignore
index 1fc794d..6d53774 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,4 @@ project/sbt_project_definition.iml
*~
*#
.#*
+rat.out
diff --git a/bin/run-rat.sh b/bin/run-rat.sh
old mode 100644
new mode 100755
index 28c0ccd..1b7bc31
--- a/bin/run-rat.sh
+++ b/bin/run-rat.sh
@@ -23,7 +23,7 @@ else
JAVA="$JAVA_HOME/bin/java"
fi
-rat_command="$JAVA -jar $base_dir/lib_managed/scala_2.8.0/compile/apache-rat-0.8.jar --dir $base_dir "
+rat_command="$JAVA -jar $base_dir/lib/apache-rat-0.8.jar --dir $base_dir "
for f in $(cat $rat_excludes_file);
do
diff --git a/contrib/hadoop-consumer/build.sbt b/contrib/hadoop-consumer/build.sbt
new file mode 100644
index 0000000..02e95eb
--- /dev/null
+++ b/contrib/hadoop-consumer/build.sbt
@@ -0,0 +1 @@
+crossPaths := false
diff --git a/contrib/hadoop-producer/build.sbt b/contrib/hadoop-producer/build.sbt
new file mode 100644
index 0000000..02e95eb
--- /dev/null
+++ b/contrib/hadoop-producer/build.sbt
@@ -0,0 +1 @@
+crossPaths := false
diff --git a/core/build.sbt b/core/build.sbt
new file mode 100644
index 0000000..df58410
--- /dev/null
+++ b/core/build.sbt
@@ -0,0 +1,24 @@
+import sbt._
+import Keys._
+
+name := "kafka"
+
+resolvers ++= Seq(
+ "SonaType ScalaTest repo" at "https://oss.sonatype.org/content/groups/public/org/scalatest/"
+)
+
+libraryDependencies ++= Seq(
+ "org.apache.zookeeper" % "zookeeper" % "3.3.4",
+ "com.github.sgroschupf" % "zkclient" % "0.1",
+ "org.xerial.snappy" % "snappy-java" % "1.0.4.1",
+ "org.easymock" % "easymock" % "3.0" % "test",
+ "junit" % "junit" % "4.1" % "test"
+)
+
+libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
+ deps :+ (sv match {
+ case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test"
+ case _ => "org.scalatest" %% "scalatest" % "1.8" % "test"
+ })
+}
+
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index f72eed1..e0970ba 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -62,8 +62,9 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
}
catch {
case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining))
- if (attemptsRemaining == 0)
+ if (attemptsRemaining == 0) {
throw e
+ }
}
}
}
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 5268e12..2ce795a 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -20,6 +20,7 @@ package kafka.producer
import junit.framework.Assert
import java.util.Properties
import org.easymock.EasyMock
+import org.easymock.IArgumentMatcher
import kafka.api.ProducerRequest
import org.apache.log4j.{Logger, Level}
import org.junit.Test
@@ -250,17 +251,37 @@ class AsyncProducerTest extends JUnitSuite {
}
+ // A simple utility matcher for EasyMock to disregarding element order for arrays
+ class ArrayEqualsAnyOrder(private val expected: Array[_]) extends IArgumentMatcher {
+ def matches(actual: AnyRef): Boolean = actual match {
+ case a: Array[_] => expected.toSet == a.toSet
+ case _ => false
+ }
+
+ def appendTo(buffer: StringBuffer) {
+ buffer.append("aryEqualsAnyOrder(")
+ buffer.append(expected.mkString("[",",","]"))
+ buffer.append(")")
+ }
+ }
+
+ // Application function for the new EasyMock matcher
+ def aryEqualsAnyOrder(expected: Array[_]) = {
+ EasyMock.reportMatcher(new ArrayEqualsAnyOrder(expected))
+ null
+ }
+
@Test
def testCollateAndSerializeEvents() {
val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
- getMessageSetOfSize(List(message2), 5)),
- new ProducerRequest(topic1, 0,
- getMessageSetOfSize(List(message1), 5)),
- new ProducerRequest(topic1, 1,
- getMessageSetOfSize(List(message1), 5)),
- new ProducerRequest(topic2, 0,
- getMessageSetOfSize(List(message2), 5)))))
+ basicProducer.multiSend(aryEqualsAnyOrder(Array(new ProducerRequest(topic2, 1,
+ getMessageSetOfSize(List(message2), 5)),
+ new ProducerRequest(topic1, 0,
+ getMessageSetOfSize(List(message1), 5)),
+ new ProducerRequest(topic1, 1,
+ getMessageSetOfSize(List(message1), 5)),
+ new ProducerRequest(topic2, 0,
+ getMessageSetOfSize(List(message2), 5)))))
EasyMock.expectLastCall
basicProducer.close
@@ -290,7 +311,6 @@ class AsyncProducerTest extends JUnitSuite {
producer.close
EasyMock.verify(basicProducer)
-
}
private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
diff --git a/examples/build.sbt b/examples/build.sbt
new file mode 100644
index 0000000..d12d701
--- /dev/null
+++ b/examples/build.sbt
@@ -0,0 +1,3 @@
+name := "kafka-java-examples"
+
+crossPaths := false
diff --git a/project/Build.scala b/project/Build.scala
new file mode 100644
index 0000000..7bc767d
--- /dev/null
+++ b/project/Build.scala
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import sbt._
+import Keys._
+import java.io.File
+
+object KafkaBuild extends Build {
+ val commonSettings = Seq(
+ version := "0.7.0",
+ organization := "org.apache.kafka",
+ scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"),
+ crossScalaVersions := Seq("2.8.0", "2.9.1", "2.9.2"),
+ javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.5"),
+ parallelExecution in Test := false, // Prevent tests from overrunning each other
+ libraryDependencies ++= Seq(
+ "log4j" % "log4j" % "1.2.15",
+ "net.sf.jopt-simple" % "jopt-simple" % "3.2"
+ ),
+ // The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+ // some dependencies on various sun and javax packages.
+ ivyXML :=
+
+
+
+
+
+
+ )
+
+ val hadoopSettings = Seq(
+ javacOptions ++= Seq("-Xlint:deprecation"),
+ libraryDependencies ++= Seq(
+ "org.apache.avro" % "avro" % "1.4.0",
+ "commons-logging" % "commons-logging" % "1.0.4",
+ "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5",
+ "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5",
+ "org.apache.hadoop" % "hadoop-core" % "0.20.2"
+ )
+ )
+
+ val runRat = TaskKey[Unit]("run-rat-task", "Runs Apache rat on Kafka")
+ val runRatTask = runRat := {
+ "bin/run-rat.sh" !
+ }
+
+ val coreSettings: Seq[Project.Setting[_]] = Seq(
+// artifactName := {
+// (config: String, module: ModuleID, artifact: Artifact) =>
+// "kafka-" + module.revision + "." + artifact.extension
+// }
+ )
+
+ lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib).settings(runRatTask).settings(Release.releaseZipTask)
+ lazy val core = Project(id = "core", base = file("core")).settings((commonSettings ++ coreSettings) :_*)
+ lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core)
+ lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)
+
+ lazy val contrib = Project(id = "contrib", base = file("contrib")) aggregate (hadoopProducer, hadoopConsumer)
+ lazy val hadoopProducer = Project(id = "hadoop producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
+ lazy val hadoopConsumer = Project(id = "hadoop consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
+}
diff --git a/project/Release.scala b/project/Release.scala
new file mode 100644
index 0000000..44ded64
--- /dev/null
+++ b/project/Release.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import sbt._
+import Keys._
+import java.io.File
+
+object Release {
+ val releaseZip = TaskKey[Unit]("release-zip", "Compiles, packages and zips together all dependencies, configs, scripts and generated jars")
+ val releaseZipTask = releaseZip <<= (streams, scalaVersion, baseDirectory,target,artifact,moduleName) {
+ (streams: Task[TaskStreams], scalaVersion: String, baseDirectory: File, target: File, artifact: Artifact, moduleName: String) => streams map {
+ s => {
+ val distDir = new File(target, "dist-" + scalaVersion)
+ val configSrc = new File(baseDirectory, "config")
+ val binSrc = new File(baseDirectory, "bin")
+ s.log.info("Generating dist structure for Scala " + scalaVersion + " under " + distDir)
+ }
+ }
+ }
+}
diff --git a/project/build.properties b/project/build.properties
index 36df233..6a46e98 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -14,11 +14,4 @@
# limitations under the License.
#Project properties
#Mon Feb 28 11:55:49 PST 2011
-project.name=Kafka
-sbt.version=0.7.5
-project.version=0.7.0
-build.scala.versions=2.8.0
-contrib.root.dir=contrib
-lib.dir=lib
-target.dir=target/scala_2.8.0
-dist.dir=dist
+sbt.version=0.11.3
diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
deleted file mode 100644
index ae0494b..0000000
--- a/project/build/KafkaProject.scala
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import sbt._
-import scala.xml.{Node, Elem, NodeSeq}
-import scala.xml.transform.{RewriteRule, RuleTransformer}
-
-class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
- lazy val core = project("core", "core-kafka", new CoreKafkaProject(_))
- lazy val examples = project("examples", "java-examples", new KafkaExamplesProject(_), core)
- lazy val contrib = project("contrib", "contrib", new ContribProject(_))
- lazy val perf = project("perf", "perf", new KafkaPerfProject(_))
-
- lazy val releaseZipTask = core.packageDistTask
-
- val releaseZipDescription = "Compiles every sub project, runs unit tests, creates a deployable release zip file with dependencies, config, and scripts."
- lazy val releaseZip = releaseZipTask dependsOn(core.corePackageAction, core.test, examples.examplesPackageAction,
- contrib.producerPackageAction, contrib.consumerPackageAction) describedAs releaseZipDescription
-
- val runRatDescription = "Runs Apache rat on Kafka"
- lazy val runRatTask = task {
- Runtime.getRuntime().exec("bin/run-rat.sh")
- None
- } describedAs runRatDescription
-
- val rat = "org.apache.rat" % "apache-rat" % "0.8"
-
- class CoreKafkaProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject with CoreDependencies with TestDependencies with CompressionDependencies {
- val corePackageAction = packageAllAction
-
- //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
- // some dependencies on various sun and javax packages.
- override def ivyXML =
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- override def artifactID = "kafka"
- override def filterScalaJars = false
-
- // build the executable jar's classpath.
- // (why is it necessary to explicitly remove the target/{classes,resources} paths? hm.)
- def dependentJars = {
- val jars =
- publicClasspath +++ mainDependencies.scalaJars --- mainCompilePath --- mainResourcesOutputPath
- if (jars.get.find { jar => jar.name.startsWith("scala-library-") }.isDefined) {
- // workaround bug in sbt: if the compiler is explicitly included, don't include 2 versions
- // of the library.
- jars --- jars.filter { jar =>
- jar.absolutePath.contains("/boot/") && jar.name == "scala-library.jar"
- }
- } else {
- jars
- }
- }
-
- def dependentJarNames = dependentJars.getFiles.map(_.getName).filter(_.endsWith(".jar"))
- override def manifestClassPath = Some(dependentJarNames.map { "libs/" + _ }.mkString(" "))
-
- def distName = (artifactID + "-" + projectVersion.value)
- def distPath = "dist" / distName ##
-
- def configPath = "config" ##
- def configOutputPath = distPath / "config"
-
- def binPath = "bin" ##
- def binOutputPath = distPath / "bin"
-
- def distZipName = {
- "%s-%s.zip".format(artifactID, projectVersion.value)
- }
-
- lazy val packageDistTask = task {
- distPath.asFile.mkdirs()
- (distPath / "libs").asFile.mkdirs()
- binOutputPath.asFile.mkdirs()
- configOutputPath.asFile.mkdirs()
-
- FileUtilities.copyFlat(List(jarPath), distPath, log).left.toOption orElse
- FileUtilities.copyFlat(dependentJars.get, distPath / "libs", log).left.toOption orElse
- FileUtilities.copy((configPath ***).get, configOutputPath, log).left.toOption orElse
- FileUtilities.copy((binPath ***).get, binOutputPath, log).left.toOption orElse
- FileUtilities.zip((("dist" / distName) ##).get, "dist" / distZipName, true, log)
- None
- }
-
- val PackageDistDescription = "Creates a deployable zip file with dependencies, config, and scripts."
- lazy val packageDist = packageDistTask dependsOn(`package`, `test`) describedAs PackageDistDescription
-
- val cleanDist = cleanTask("dist" ##) describedAs("Erase any packaged distributions.")
- override def cleanAction = super.cleanAction dependsOn(cleanDist)
-
- override def javaCompileOptions = super.javaCompileOptions ++
- List(JavaCompileOption("-source"), JavaCompileOption("1.5"))
-
- override def packageAction = super.packageAction dependsOn (testCompileAction)
-
- }
-
- class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject
- with CoreDependencies {
- val perfPackageAction = packageAllAction
- val dependsOnCore = core
-
- //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
- // some dependencies on various sun and javax packages.
- override def ivyXML =
-
-
-
-
-
-
-
-
- override def artifactID = "kafka-perf"
- override def filterScalaJars = false
- override def javaCompileOptions = super.javaCompileOptions ++
- List(JavaCompileOption("-Xlint:unchecked"))
- }
-
- class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject
- with CoreDependencies {
- val examplesPackageAction = packageAllAction
- val dependsOnCore = core
- //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
- // some dependencies on various sun and javax packages.
- override def ivyXML =
-
-
-
-
-
-
-
-
- override def artifactID = "kafka-java-examples"
- override def filterScalaJars = false
- override def javaCompileOptions = super.javaCompileOptions ++
- List(JavaCompileOption("-Xlint:unchecked"))
- }
-
- class ContribProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
- lazy val hadoopProducer = project("hadoop-producer", "hadoop producer",
- new HadoopProducerProject(_), core)
- lazy val hadoopConsumer = project("hadoop-consumer", "hadoop consumer",
- new HadoopConsumerProject(_), core)
-
- val producerPackageAction = hadoopProducer.producerPackageAction
- val consumerPackageAction = hadoopConsumer.consumerPackageAction
-
- class HadoopProducerProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject
- with CoreDependencies with HadoopDependencies {
- val producerPackageAction = packageAllAction
- override def ivyXML =
-
-
-
-
-
-
-
-
-
-
-
-
- }
-
- class HadoopConsumerProject(info: ProjectInfo) extends DefaultProject(info)
- with IdeaProject
- with CoreDependencies {
- val consumerPackageAction = packageAllAction
- override def ivyXML =
-
-
-
-
-
-
-
-
-
-
-
-
-
- val jodaTime = "joda-time" % "joda-time" % "1.6"
- }
- }
-
- trait TestDependencies {
- val easymock = "org.easymock" % "easymock" % "3.0" % "test"
- val junit = "junit" % "junit" % "4.1" % "test"
- val scalaTest = "org.scalatest" % "scalatest" % "1.2" % "test"
- }
-
- trait CoreDependencies {
- val log4j = "log4j" % "log4j" % "1.2.15"
- val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
- }
-
- trait HadoopDependencies {
- val avro = "org.apache.avro" % "avro" % "1.4.0"
- val commonsLogging = "commons-logging" % "commons-logging" % "1.0.4"
- val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5"
- val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5"
- val hadoop = "org.apache.hadoop" % "hadoop-core" % "0.20.2"
- }
-
- trait CompressionDependencies {
- val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1"
- }
-
-}
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000..c0c73ef
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1,8 @@
+resolvers ++= Seq(
+ "sbt-idea-repo" at "http://mpeltonen.github.com/maven/"
+)
+
+libraryDependencies ++= Seq(
+ Defaults.sbtPluginExtra("com.eed3si9n" % "sbt-assembly" % "0.7.2", "0.11.2", "2.9.1" ),
+ Defaults.sbtPluginExtra("com.github.mpeltonen" % "sbt-idea" % "0.11.0", "0.11.2", "2.9.1")
+)
diff --git a/project/plugins/Plugins.scala b/project/plugins/Plugins.scala
deleted file mode 100644
index 0777d82..0000000
--- a/project/plugins/Plugins.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import sbt._
-
-class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
- val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/"
- val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT"
-}