diff --git a/.gitignore b/.gitignore index 1fc794d..6d53774 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ project/sbt_project_definition.iml *~ *# .#* +rat.out diff --git a/bin/run-rat.sh b/bin/run-rat.sh old mode 100644 new mode 100755 index 28c0ccd..1b7bc31 --- a/bin/run-rat.sh +++ b/bin/run-rat.sh @@ -23,7 +23,7 @@ else JAVA="$JAVA_HOME/bin/java" fi -rat_command="$JAVA -jar $base_dir/lib_managed/scala_2.8.0/compile/apache-rat-0.8.jar --dir $base_dir " +rat_command="$JAVA -jar $base_dir/lib/apache-rat-0.8.jar --dir $base_dir " for f in $(cat $rat_excludes_file); do diff --git a/contrib/hadoop-consumer/build.sbt b/contrib/hadoop-consumer/build.sbt new file mode 100644 index 0000000..02e95eb --- /dev/null +++ b/contrib/hadoop-consumer/build.sbt @@ -0,0 +1 @@ +crossPaths := false diff --git a/contrib/hadoop-producer/build.sbt b/contrib/hadoop-producer/build.sbt new file mode 100644 index 0000000..02e95eb --- /dev/null +++ b/contrib/hadoop-producer/build.sbt @@ -0,0 +1 @@ +crossPaths := false diff --git a/core/build.sbt b/core/build.sbt new file mode 100644 index 0000000..df58410 --- /dev/null +++ b/core/build.sbt @@ -0,0 +1,24 @@ +import sbt._ +import Keys._ + +name := "kafka" + +resolvers ++= Seq( + "SonaType ScalaTest repo" at "https://oss.sonatype.org/content/groups/public/org/scalatest/" +) + +libraryDependencies ++= Seq( + "org.apache.zookeeper" % "zookeeper" % "3.3.4", + "com.github.sgroschupf" % "zkclient" % "0.1", + "org.xerial.snappy" % "snappy-java" % "1.0.4.1", + "org.easymock" % "easymock" % "3.0" % "test", + "junit" % "junit" % "4.1" % "test" +) + +libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) => + deps :+ (sv match { + case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test" + case _ => "org.scalatest" %% "scalatest" % "1.8" % "test" + }) +} + diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index f72eed1..e0970ba 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -62,8 +62,9 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig, } catch { case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining)) - if (attemptsRemaining == 0) + if (attemptsRemaining == 0) { throw e + } } } } diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 5268e12..2ce795a 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -20,6 +20,7 @@ package kafka.producer import junit.framework.Assert import java.util.Properties import org.easymock.EasyMock +import org.easymock.IArgumentMatcher import kafka.api.ProducerRequest import org.apache.log4j.{Logger, Level} import org.junit.Test @@ -250,17 +251,37 @@ class AsyncProducerTest extends JUnitSuite { } + // A simple utility matcher for EasyMock to disregarding element order for arrays + class ArrayEqualsAnyOrder(private val expected: Array[_]) extends IArgumentMatcher { + def matches(actual: AnyRef): Boolean = actual match { + case a: Array[_] => expected.toSet == a.toSet + case _ => false + } + + def appendTo(buffer: StringBuffer) { + buffer.append("aryEqualsAnyOrder(") + buffer.append(expected.mkString("[",",","]")) + buffer.append(")") + } + } + + // Application function for the new EasyMock matcher + def aryEqualsAnyOrder(expected: Array[_]) = { + EasyMock.reportMatcher(new ArrayEqualsAnyOrder(expected)) + null + } + @Test def testCollateAndSerializeEvents() { val basicProducer = EasyMock.createMock(classOf[SyncProducer]) - basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1, - getMessageSetOfSize(List(message2), 5)), - new ProducerRequest(topic1, 0, - getMessageSetOfSize(List(message1), 5)), - new ProducerRequest(topic1, 1, - getMessageSetOfSize(List(message1), 5)), - new ProducerRequest(topic2, 0, - getMessageSetOfSize(List(message2), 5))))) + basicProducer.multiSend(aryEqualsAnyOrder(Array(new ProducerRequest(topic2, 1, + getMessageSetOfSize(List(message2), 5)), + new ProducerRequest(topic1, 0, + getMessageSetOfSize(List(message1), 5)), + new ProducerRequest(topic1, 1, + getMessageSetOfSize(List(message1), 5)), + new ProducerRequest(topic2, 0, + getMessageSetOfSize(List(message2), 5))))) EasyMock.expectLastCall basicProducer.close @@ -290,7 +311,6 @@ class AsyncProducerTest extends JUnitSuite { producer.close EasyMock.verify(basicProducer) - } private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = { diff --git a/examples/build.sbt b/examples/build.sbt new file mode 100644 index 0000000..d12d701 --- /dev/null +++ b/examples/build.sbt @@ -0,0 +1,3 @@ +name := "kafka-java-examples" + +crossPaths := false diff --git a/project/Build.scala b/project/Build.scala new file mode 100644 index 0000000..7bc767d --- /dev/null +++ b/project/Build.scala @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import sbt._ +import Keys._ +import java.io.File + +object KafkaBuild extends Build { + val commonSettings = Seq( + version := "0.7.0", + organization := "org.apache.kafka", + scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"), + crossScalaVersions := Seq("2.8.0", "2.9.1", "2.9.2"), + javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.5"), + parallelExecution in Test := false, // Prevent tests from overrunning each other + libraryDependencies ++= Seq( + "log4j" % "log4j" % "1.2.15", + "net.sf.jopt-simple" % "jopt-simple" % "3.2" + ), + // The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required + // some dependencies on various sun and javax packages. + ivyXML := + + + + + + + ) + + val hadoopSettings = Seq( + javacOptions ++= Seq("-Xlint:deprecation"), + libraryDependencies ++= Seq( + "org.apache.avro" % "avro" % "1.4.0", + "commons-logging" % "commons-logging" % "1.0.4", + "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5", + "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5", + "org.apache.hadoop" % "hadoop-core" % "0.20.2" + ) + ) + + val runRat = TaskKey[Unit]("run-rat-task", "Runs Apache rat on Kafka") + val runRatTask = runRat := { + "bin/run-rat.sh" ! + } + + val coreSettings: Seq[Project.Setting[_]] = Seq( +// artifactName := { +// (config: String, module: ModuleID, artifact: Artifact) => +// "kafka-" + module.revision + "." + artifact.extension +// } + ) + + lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib).settings(runRatTask).settings(Release.releaseZipTask) + lazy val core = Project(id = "core", base = file("core")).settings((commonSettings ++ coreSettings) :_*) + lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core) + lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core) + + lazy val contrib = Project(id = "contrib", base = file("contrib")) aggregate (hadoopProducer, hadoopConsumer) + lazy val hadoopProducer = Project(id = "hadoop producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core) + lazy val hadoopConsumer = Project(id = "hadoop consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core) +} diff --git a/project/Release.scala b/project/Release.scala new file mode 100644 index 0000000..44ded64 --- /dev/null +++ b/project/Release.scala @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import sbt._ +import Keys._ +import java.io.File + +object Release { + val releaseZip = TaskKey[Unit]("release-zip", "Compiles, packages and zips together all dependencies, configs, scripts and generated jars") + val releaseZipTask = releaseZip <<= (streams, scalaVersion, baseDirectory,target,artifact,moduleName) { + (streams: Task[TaskStreams], scalaVersion: String, baseDirectory: File, target: File, artifact: Artifact, moduleName: String) => streams map { + s => { + val distDir = new File(target, "dist-" + scalaVersion) + val configSrc = new File(baseDirectory, "config") + val binSrc = new File(baseDirectory, "bin") + s.log.info("Generating dist structure for Scala " + scalaVersion + " under " + distDir) + } + } + } +} diff --git a/project/build.properties b/project/build.properties index 36df233..6a46e98 100644 --- a/project/build.properties +++ b/project/build.properties @@ -14,11 +14,4 @@ # limitations under the License. #Project properties #Mon Feb 28 11:55:49 PST 2011 -project.name=Kafka -sbt.version=0.7.5 -project.version=0.7.0 -build.scala.versions=2.8.0 -contrib.root.dir=contrib -lib.dir=lib -target.dir=target/scala_2.8.0 -dist.dir=dist +sbt.version=0.11.3 diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala deleted file mode 100644 index ae0494b..0000000 --- a/project/build/KafkaProject.scala +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import sbt._ -import scala.xml.{Node, Elem, NodeSeq} -import scala.xml.transform.{RewriteRule, RuleTransformer} - -class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject { - lazy val core = project("core", "core-kafka", new CoreKafkaProject(_)) - lazy val examples = project("examples", "java-examples", new KafkaExamplesProject(_), core) - lazy val contrib = project("contrib", "contrib", new ContribProject(_)) - lazy val perf = project("perf", "perf", new KafkaPerfProject(_)) - - lazy val releaseZipTask = core.packageDistTask - - val releaseZipDescription = "Compiles every sub project, runs unit tests, creates a deployable release zip file with dependencies, config, and scripts." - lazy val releaseZip = releaseZipTask dependsOn(core.corePackageAction, core.test, examples.examplesPackageAction, - contrib.producerPackageAction, contrib.consumerPackageAction) describedAs releaseZipDescription - - val runRatDescription = "Runs Apache rat on Kafka" - lazy val runRatTask = task { - Runtime.getRuntime().exec("bin/run-rat.sh") - None - } describedAs runRatDescription - - val rat = "org.apache.rat" % "apache-rat" % "0.8" - - class CoreKafkaProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject with CoreDependencies with TestDependencies with CompressionDependencies { - val corePackageAction = packageAllAction - - //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required - // some dependencies on various sun and javax packages. - override def ivyXML = - - - - - - - - - - - - - - - override def artifactID = "kafka" - override def filterScalaJars = false - - // build the executable jar's classpath. - // (why is it necessary to explicitly remove the target/{classes,resources} paths? hm.) - def dependentJars = { - val jars = - publicClasspath +++ mainDependencies.scalaJars --- mainCompilePath --- mainResourcesOutputPath - if (jars.get.find { jar => jar.name.startsWith("scala-library-") }.isDefined) { - // workaround bug in sbt: if the compiler is explicitly included, don't include 2 versions - // of the library. - jars --- jars.filter { jar => - jar.absolutePath.contains("/boot/") && jar.name == "scala-library.jar" - } - } else { - jars - } - } - - def dependentJarNames = dependentJars.getFiles.map(_.getName).filter(_.endsWith(".jar")) - override def manifestClassPath = Some(dependentJarNames.map { "libs/" + _ }.mkString(" ")) - - def distName = (artifactID + "-" + projectVersion.value) - def distPath = "dist" / distName ## - - def configPath = "config" ## - def configOutputPath = distPath / "config" - - def binPath = "bin" ## - def binOutputPath = distPath / "bin" - - def distZipName = { - "%s-%s.zip".format(artifactID, projectVersion.value) - } - - lazy val packageDistTask = task { - distPath.asFile.mkdirs() - (distPath / "libs").asFile.mkdirs() - binOutputPath.asFile.mkdirs() - configOutputPath.asFile.mkdirs() - - FileUtilities.copyFlat(List(jarPath), distPath, log).left.toOption orElse - FileUtilities.copyFlat(dependentJars.get, distPath / "libs", log).left.toOption orElse - FileUtilities.copy((configPath ***).get, configOutputPath, log).left.toOption orElse - FileUtilities.copy((binPath ***).get, binOutputPath, log).left.toOption orElse - FileUtilities.zip((("dist" / distName) ##).get, "dist" / distZipName, true, log) - None - } - - val PackageDistDescription = "Creates a deployable zip file with dependencies, config, and scripts." - lazy val packageDist = packageDistTask dependsOn(`package`, `test`) describedAs PackageDistDescription - - val cleanDist = cleanTask("dist" ##) describedAs("Erase any packaged distributions.") - override def cleanAction = super.cleanAction dependsOn(cleanDist) - - override def javaCompileOptions = super.javaCompileOptions ++ - List(JavaCompileOption("-source"), JavaCompileOption("1.5")) - - override def packageAction = super.packageAction dependsOn (testCompileAction) - - } - - class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject - with CoreDependencies { - val perfPackageAction = packageAllAction - val dependsOnCore = core - - //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required - // some dependencies on various sun and javax packages. - override def ivyXML = - - - - - - - - - override def artifactID = "kafka-perf" - override def filterScalaJars = false - override def javaCompileOptions = super.javaCompileOptions ++ - List(JavaCompileOption("-Xlint:unchecked")) - } - - class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject - with CoreDependencies { - val examplesPackageAction = packageAllAction - val dependsOnCore = core - //The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required - // some dependencies on various sun and javax packages. - override def ivyXML = - - - - - - - - - override def artifactID = "kafka-java-examples" - override def filterScalaJars = false - override def javaCompileOptions = super.javaCompileOptions ++ - List(JavaCompileOption("-Xlint:unchecked")) - } - - class ContribProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject { - lazy val hadoopProducer = project("hadoop-producer", "hadoop producer", - new HadoopProducerProject(_), core) - lazy val hadoopConsumer = project("hadoop-consumer", "hadoop consumer", - new HadoopConsumerProject(_), core) - - val producerPackageAction = hadoopProducer.producerPackageAction - val consumerPackageAction = hadoopConsumer.consumerPackageAction - - class HadoopProducerProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject - with CoreDependencies with HadoopDependencies { - val producerPackageAction = packageAllAction - override def ivyXML = - - - - - - - - - - - - - } - - class HadoopConsumerProject(info: ProjectInfo) extends DefaultProject(info) - with IdeaProject - with CoreDependencies { - val consumerPackageAction = packageAllAction - override def ivyXML = - - - - - - - - - - - - - - val jodaTime = "joda-time" % "joda-time" % "1.6" - } - } - - trait TestDependencies { - val easymock = "org.easymock" % "easymock" % "3.0" % "test" - val junit = "junit" % "junit" % "4.1" % "test" - val scalaTest = "org.scalatest" % "scalatest" % "1.2" % "test" - } - - trait CoreDependencies { - val log4j = "log4j" % "log4j" % "1.2.15" - val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2" - } - - trait HadoopDependencies { - val avro = "org.apache.avro" % "avro" % "1.4.0" - val commonsLogging = "commons-logging" % "commons-logging" % "1.0.4" - val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5" - val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5" - val hadoop = "org.apache.hadoop" % "hadoop-core" % "0.20.2" - } - - trait CompressionDependencies { - val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1" - } - -} diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..c0c73ef --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,8 @@ +resolvers ++= Seq( + "sbt-idea-repo" at "http://mpeltonen.github.com/maven/" +) + +libraryDependencies ++= Seq( + Defaults.sbtPluginExtra("com.eed3si9n" % "sbt-assembly" % "0.7.2", "0.11.2", "2.9.1" ), + Defaults.sbtPluginExtra("com.github.mpeltonen" % "sbt-idea" % "0.11.0", "0.11.2", "2.9.1") +) diff --git a/project/plugins/Plugins.scala b/project/plugins/Plugins.scala deleted file mode 100644 index 0777d82..0000000 --- a/project/plugins/Plugins.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import sbt._ - -class Plugins(info: ProjectInfo) extends PluginDefinition(info) { - val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/" - val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT" -}