diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index 4b1d117..acc51fa 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -74,7 +74,7 @@ public class DataGenerator {
         producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
         producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
         producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
-
+        producerProps.put("strict.verification", "true");
 
         _producer = new Producer(new ProducerConfig(producerProps));
     }
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 417b4b3..dc6e329 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -16,25 +16,22 @@
  */
 package kafka.bridge.hadoop;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.*;
-
 import kafka.common.KafkaException;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.ProducerConfig;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
 public class KafkaOutputFormat extends OutputFormat
 {
   private Logger log = Logger.getLogger(KafkaOutputFormat.class);
@@ -138,6 +135,7 @@ public class KafkaOutputFormat extends OutputFormat
     }
     else
       throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
+    props.put("strict.verification", "true");
     Producer producer = new Producer(new ProducerConfig(props));
     return new KafkaRecordWriter(producer, topic, queueBytes);
   }
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index cc526ec..924882e 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -84,6 +84,7 @@ object ClientUtils extends Logging{
     props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
     props.put("client.id", clientId)
     props.put("request.timeout.ms", timeoutMs.toString)
+    props.put("strict.verification", "true")
     val producerConfig = new ProducerConfig(props)
     fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
   }
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 719beb5..f13ef4b 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -166,6 +166,7 @@ object ConsoleConsumer extends Logging {
     props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString) props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString) + props.put("strict.verification", "true") val config = new ConsumerConfig(props) val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index c8c4212..9051087 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -75,7 +75,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( def this(originalProps: Properties) { this(new VerifiableProperties(originalProps)) - props.verify() + props.verify(strictVerification) } /** a string that uniquely identifies a set of consumers within the same consumer group */ diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 5539bce..704b733 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -140,6 +140,7 @@ object ConsoleProducer { props.put("key.serializer.class", keyEncoderClass) props.put("serializer.class", valueEncoderClass) props.put("send.buffer.bytes", socketBuffer.toString) + props.put("strict.verification", "true") val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]] reader.init(System.in, cmdLineProps) diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 88ae784..c1fedf1 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -80,6 +80,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout) if(queueSize != null) props.put("queue.buffering.max.messages", queueSize) if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString) + props.put("strict.verification", "true") val config : ProducerConfig = new ProducerConfig(props) producer = new Producer[String, String](config) LogLog.debug("Kafka producer connected to " + config.brokerList) diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 7947b18..338968b 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -19,7 +19,7 @@ package kafka.producer import async.AsyncProducerConfig import java.util.Properties -import kafka.utils.{Utils, VerifiableProperties} +import kafka.utils.{BaseConfig, Utils, VerifiableProperties} import kafka.message.{CompressionCodec, NoCompressionCodec} import kafka.common.{InvalidConfigException, Config} @@ -49,12 +49,12 @@ object ProducerConfig extends Config { } class ProducerConfig private (val props: VerifiableProperties) - extends AsyncProducerConfig with SyncProducerConfigShared { + extends BaseConfig(props) with AsyncProducerConfig with SyncProducerConfigShared { import ProducerConfig._ def this(originalProps: Properties) { this(new VerifiableProperties(originalProps)) - props.verify() + props.verify(strictVerification) } /** This is for bootstrapping and the producer will only 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 41c9626..c32c0e3 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -29,7 +29,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   def this(originalProps: Properties) {
     this(new VerifiableProperties(originalProps))
-    props.verify()
+    props.verify(strictVerification)
   }
 
   /*********** General Configuration ***********/
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 814d61a..a3f7391 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -48,6 +48,7 @@ object ReplayLogProducer extends Logging {
     consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString)
     consumerProps.put("fetch.message.max.bytes", (1024*1024).toString)
     consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
+    consumerProps.put("strict.verification", "true")
     val consumerConfig = new ConsumerConfig(consumerProps)
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
     val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
@@ -145,7 +146,8 @@ object ReplayLogProducer extends Logging {
       props.put("compression.codec", config.compressionCodec.codec.toString)
       props.put("batch.num.messages", config.batchSize.toString)
       props.put("queue.enqueue.timeout.ms", "-1")
-
+      props.put("strict.verification", "true")
+
       if(config.isAsync)
         props.put("producer.type", "async")
diff --git a/core/src/main/scala/kafka/utils/BaseConfig.scala b/core/src/main/scala/kafka/utils/BaseConfig.scala
new file mode 100644
index 0000000..0273dd6
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/BaseConfig.scala
@@ -0,0 +1,11 @@
+package kafka.utils
+
+/**
+ * Base class for configurations, exposing the common "strict.verification"
+ * switch that controls how unknown properties are treated during verification.
+ */
+class BaseConfig(props: VerifiableProperties) {
+
+  /** strict verification of configuration properties */
+  val strictVerification = props.getBoolean("strict.verification", false)
+}
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index d694ba9..3209ef2 100644
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -176,13 +176,16 @@ class VerifiableProperties(val props: Properties) extends Logging {
     }
   }
 
-  def verify() {
+  def verify(strict: Boolean) {
     info("Verifying properties")
     val specifiedProperties = props.propertyNames()
     while (specifiedProperties.hasMoreElements) {
       val key = specifiedProperties.nextElement().asInstanceOf[String]
       if (!referenceSet.contains(key))
-        warn("Property %s is not valid".format(key))
+        if (strict)
+          throw new VerificationException("Property %s is not valid".format(key))
+        else
+          warn("Property %s is not valid".format(key))
       else
         info("Property %s is overridden to %s".format(key, props.getProperty(key)))
     }
diff --git a/core/src/main/scala/kafka/utils/VerificationException.scala b/core/src/main/scala/kafka/utils/VerificationException.scala
new file mode 100644
index 0000000..df7aac4
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/VerificationException.scala
@@ -0,0 +1,9 @@
+package kafka.utils
+
+/**
+ * Thrown by VerifiableProperties.verify when strict verification is enabled
+ * and an unrecognized configuration property is encountered.
+ */
+class VerificationException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
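Together, BaseConfig, verify(strict), and VerificationException define the new contract: unknown keys still only produce a warning by default, but throw once strict verification is requested. A minimal sketch against VerifiableProperties directly (note that only keys actually read through the getters land in the reference set; the key names here are invented):

import java.util.Properties
import kafka.utils.{VerifiableProperties, VerificationException}

object VerifyContractExample {
  def main(args: Array[String]) {
    val raw = new Properties()
    raw.put("known.key", "42")
    raw.put("unknwon.key", "x")    // hypothetical typo that nothing ever reads

    val props = new VerifiableProperties(raw)
    props.getInt("known.key", 0)   // reading a key registers it in the reference set

    props.verify(false)            // lenient: warns "Property unknwon.key is not valid"
    try {
      props.verify(true)           // strict: the same unknown key now throws
    } catch {
      case e: VerificationException => println(e.getMessage)
    }
  }
}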
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d53d511..86b9e16 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -368,7 +368,7 @@ object ZkUtils extends Logging {
       case e2 => throw e2
     }
   }
-  
+
   def deletePath(client: ZkClient, path: String): Boolean = {
     try {
       client.delete(path)
@@ -391,7 +391,7 @@ object ZkUtils extends Logging {
       case e2 => throw e2
     }
   }
-  
+
   def maybeDeletePath(zkUrl: String, dir: String) {
     try {
       val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
@@ -621,7 +621,7 @@ object ZkUtils extends Logging {
       partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId))
     }.flatten
   }
-  
+
   def getPartitionsUndergoingPreferredReplicaElection(zkClient: ZkClient): Set[TopicAndPartition] = {
     // read the partitions and their new replica list
     val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1
@@ -763,7 +763,7 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group)
 }
 
 
-class ZKConfig(props: VerifiableProperties) {
+class ZKConfig(props: VerifiableProperties) extends BaseConfig(props) {
   /** ZK host string */
   val zkConnect = props.getString("zookeeper.connect")
diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
index c4aed10..97c0bc8 100644
--- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
+++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
@@ -40,7 +40,8 @@ object TestEndToEndLatency {
     consumerProps.put("auto.offset.reset", "largest")
     consumerProps.put("zookeeper.connect", zkConnect)
     consumerProps.put("socket.timeout.ms", 1201000.toString)
-
+    consumerProps.put("strict.verification", "true")
+
     val config = new ConsumerConfig(consumerProps)
     val connector = Consumer.create(config)
     var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
@@ -49,6 +50,7 @@ object TestEndToEndLatency {
     val producerProps = new Properties()
     producerProps.put("metadata.broker.list", brokerList)
     producerProps.put("producer.type", "sync")
+    producerProps.put("strict.verification", "true")
     val producer = new Producer[Any, Any](new ProducerConfig(producerProps))
 
     val message = new Message("hello there beautiful".getBytes)
@@ -68,4 +70,4 @@ object TestEndToEndLatency {
     connector.shutdown()
     System.exit(0)
   }
-}
\ No newline at end of file
+}
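Because ZKConfig now extends BaseConfig (the ZkUtils.scala hunk above), every ZK-backed config, including ConsumerConfig and KafkaConfig, inherits the switch. A consumer-side sketch, with an invented group name and a deliberately misspelled key:

import java.util.Properties
import kafka.consumer.ConsumerConfig
import kafka.utils.VerificationException

object StrictConsumerConfigExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")
    props.put("group.id", "strict-demo")
    props.put("strict.verification", "true")
    props.put("autocommit.interval.ms", "1000")  // hypothetical typo of "auto.commit.interval.ms"

    try {
      new ConsumerConfig(props)  // verify(strictVerification) rejects the typo
    } catch {
      case e: VerificationException => println("rejected: " + e.getMessage)
    }
  }
}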
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index fcfc583..40ad8fb 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -351,6 +351,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     props.put("compression.codec", compression.codec.toString)
     props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
     props.put("serializer.class", classOf[StringEncoder].getName.toString)
+    props.put("strict.verification", "true")
     val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
     val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x)
     producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
@@ -370,6 +371,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
     props.put("serializer.class", classOf[StringEncoder].getName)
+    props.put("strict.verification", "true")
     val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index f764151..23e0573 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -30,6 +30,7 @@ import scala.collection._
 import kafka.admin.CreateTopicCommand
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.utils.{TestUtils, Utils}
+import kafka.message.DefaultCompressionCodec
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -107,7 +108,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   def testDefaultEncoderProducerAndFetchWithCompression() {
     val topic = "test-topic"
     val props = producer.config.props.props
-    props.put("compression", "true")
+    props.put("compression.codec", DefaultCompressionCodec.name)
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 1781bc0..476f2b3 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -66,6 +66,7 @@ class AsyncProducerTest extends JUnit3Suite {
     props.put("queue.buffering.max.messages", "10")
     props.put("batch.num.messages", "1")
     props.put("queue.enqueue.timeout.ms", "0")
+    props.put("strict.verification", "true")
     val config = new ProducerConfig(props)
 
     val produceData = getProduceData(12)
@@ -89,6 +90,7 @@ class AsyncProducerTest extends JUnit3Suite {
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("producer.type", "async")
     props.put("batch.num.messages", "1")
+    props.put("strict.verification", "true")
     val config = new ProducerConfig(props)
 
     val produceData = getProduceData(10)
@@ -166,6 +168,8 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val props = new Properties()
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("strict.verification", "true")
+
     val broker1 = new Broker(0, "localhost", 9092)
     val broker2 = new Broker(1, "localhost", 9093)
     broker1
@@ -216,6 +220,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m))
     val props = new Properties()
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("strict.verification", "true")
     val config = new ProducerConfig(props)
     // form expected partitions metadata
     val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
@@ -242,6 +247,8 @@ class AsyncProducerTest extends JUnit3Suite {
     producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
Message("msg1".getBytes))) val props = new Properties() props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("strict.verification", "true") + val config = new ProducerConfig(props) // form expected partitions metadata @@ -272,6 +279,7 @@ class AsyncProducerTest extends JUnit3Suite { def testNoBroker() { val props = new Properties() props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("strict.verification", "true") val config = new ProducerConfig(props) // create topic metadata with 0 partitions @@ -303,6 +311,8 @@ class AsyncProducerTest extends JUnit3Suite { def testIncompatibleEncoder() { val props = new Properties() props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("strict.verification", "true") + val config = new ProducerConfig(props) val producer=new Producer[String, String](config) @@ -320,6 +330,8 @@ class AsyncProducerTest extends JUnit3Suite { def testRandomPartitioner() { val props = new Properties() props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("strict.verification", "true") + val config = new ProducerConfig(props) // create topic metadata with 0 partitions @@ -360,6 +372,7 @@ class AsyncProducerTest extends JUnit3Suite { val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs)) props.put("producer.type", "async") props.put("batch.num.messages", "5") + props.put("strict.verification", "true") val config = new ProducerConfig(props) @@ -397,6 +410,7 @@ class AsyncProducerTest extends JUnit3Suite { props.put("request.required.acks", "1") props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) + props.put("strict.verification", "true") val config = new ProducerConfig(props) @@ -471,6 +485,8 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("producer.type", "async") + props.put("strict.verification", "true") + try { new ProducerConfig(props) fail("should complain about wrong config") diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index b511d90..ba6b6b1 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -100,6 +100,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val props1 = new util.Properties() props1.put("metadata.broker.list", "localhost:80,localhost:81") props1.put("serializer.class", "kafka.serializer.StringEncoder") + props1.put("strict.verification", "true") val producerConfig1 = new ProducerConfig(props1) val producer1 = new Producer[String, String](producerConfig1) try{ @@ -115,6 +116,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val props2 = new util.Properties() props2.put("metadata.broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1))) props2.put("serializer.class", "kafka.serializer.StringEncoder") + props2.put("strict.verification", "true") val producerConfig2= new ProducerConfig(props2) val producer2 = new Producer[String, String](producerConfig2) try{ @@ -128,6 +130,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val props3 = new util.Properties() 
props3.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) props3.put("serializer.class", "kafka.serializer.StringEncoder") + props3.put("strict.verification", "true") val producerConfig3 = new ProducerConfig(props3) val producer3 = new Producer[String, String](producerConfig3) try{ @@ -147,11 +150,13 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props1.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) props1.put("request.required.acks", "2") props1.put("request.timeout.ms", "1000") + props1.put("strict.verification", "true") val props2 = new util.Properties() props2.putAll(props1) props2.put("request.required.acks", "3") props2.put("request.timeout.ms", "1000") + props2.put("strict.verification", "true") val producerConfig1 = new ProducerConfig(props1) val producerConfig2 = new ProducerConfig(props2) @@ -206,6 +211,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("request.timeout.ms", "2000") props.put("request.required.acks", "1") props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put("strict.verification", "true") val topic = "new-topic" // create topic @@ -266,6 +272,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) props.put("request.required.acks", "1") props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout") + props.put("strict.verification", "true") + val config = new ProducerConfig(props) val producer = new Producer[String, String](config) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index db46247..d91fbab 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -51,6 +51,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs)) producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString) producerProps.put("request.required.acks", "-1") + producerProps.put("strict.verification", "true") + def testHWCheckpointNoFailuresSingleLogSegment { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a4dcca6..61865d1 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -131,6 +131,8 @@ object TestUtils extends Logging { props.put("log.flush.interval.messages", "1") props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") + props.put("strict.verification", "true") + props } @@ -149,6 +151,7 @@ object TestUtils extends Logging { props.put("auto.commit.interval.ms", "1000") props.put("rebalance.max.retries", "4") props.put("auto.offset.reset", "smallest") + props.put("strict.verification", "true") props } @@ -296,10 +299,10 @@ object TestUtils extends Logging { val props = new Properties() props.put("metadata.broker.list", brokerList) props.put("send.buffer.bytes", "65536") - props.put("connect.timeout.ms", "100000") - props.put("reconnect.interval", "10000") props.put("serializer.class", encoder.getClass.getCanonicalName) 
props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName) + props.put("strict.verification", "true") + new Producer[K, V](new ProducerConfig(props)) } @@ -312,6 +315,7 @@ object TestUtils extends Logging { props.put("request.timeout.ms", "500") props.put("request.required.acks", "-1") props.put("serializer.class", classOf[StringEncoder].getName.toString) + props.put("strict.verification", "true") props } diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 63f099a..f7e260c 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -17,15 +17,15 @@ package kafka.examples; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.Message; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; public class Consumer extends Thread @@ -48,6 +48,7 @@ public class Consumer extends Thread props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); + props.put("strict.verification", "true"); return new ConsumerConfig(props); diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 96e9893..ae5c139 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -17,10 +17,11 @@ package kafka.examples; -import java.util.Properties; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; +import java.util.Properties; + public class Producer extends Thread { private final kafka.javaapi.producer.Producer producer; @@ -31,6 +32,8 @@ public class Producer extends Thread { props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "localhost:9092"); + props.put("strict.verification", "true"); + // Use random partitioner. Don't need the key type. Just set it to Integer. // The message is of type String. 
     producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props));
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index 3158a22..3737073 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -130,6 +130,7 @@ object ConsumerPerformance {
     props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
     props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", "5000")
+    props.put("strict.verification", "true")
     val consumerConfig = new ConsumerConfig(props)
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val topic = options.valueOf(topicOpt)
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index ad2ac26..6494fad 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -190,7 +190,7 @@ object ProducerPerformance extends Logging {
     props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
     props.put("serializer.class", classOf[DefaultEncoder].getName.toString)
     props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString)
-
+    props.put("strict.verification", "true")
     val producerConfig = new ProducerConfig(props)
     val producer = new Producer[Long, Array[Byte]](producerConfig)
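Finally, since KafkaConfig takes the same path through verify(strictVerification), the broker itself can be made to fail fast on unrecognized settings. A minimal sketch; the values are placeholders:

import java.util.Properties
import kafka.server.KafkaConfig

object StrictBrokerConfigExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("broker.id", "0")
    props.put("zookeeper.connect", "localhost:2181")
    props.put("strict.verification", "true")

    // Any key KafkaConfig never reads now aborts construction with a
    // VerificationException instead of a log warning.
    val config = new KafkaConfig(props)
    println("broker config verified, id = " + config.brokerId)
  }
}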