From 6bd973e8ec4dbbee3b0679b4ee76bd887bfdde07 Mon Sep 17 00:00:00 2001 From: "vishal.garg" Date: Sun, 7 Jun 2015 23:18:02 -0700 Subject: [PATCH] # ignite-428 --- modules/kafka/pom.xml | 128 ++++++++ .../org/apache/ignite/kafka/KafkaStreamer.java | 226 ++++++++++++++ .../apache/ignite/kafka/KafkaEmbeddedBroker.java | 329 +++++++++++++++++++++ .../ignite/kafka/KafkaIgniteStreamerTest.java | 184 ++++++++++++ .../org/apache/ignite/kafka/SimplePartitioner.java | 42 +++ 5 files changed, 909 insertions(+) create mode 100644 modules/kafka/pom.xml create mode 100644 modules/kafka/src/main/java/org/apache/ignite/kafka/KafkaStreamer.java create mode 100644 modules/kafka/src/test/java/org/apache/ignite/kafka/KafkaEmbeddedBroker.java create mode 100644 modules/kafka/src/test/java/org/apache/ignite/kafka/KafkaIgniteStreamerTest.java create mode 100644 modules/kafka/src/test/java/org/apache/ignite/kafka/SimplePartitioner.java diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml new file mode 100644 index 0000000..165ec1c --- /dev/null +++ b/modules/kafka/pom.xml @@ -0,0 +1,128 @@ + + + + + + + 4.0.0 + + + org.apache.ignite + ignite-parent + 1 + ../../parent + + + ignite-kafka + 1.1.1-SNAPSHOT + + + + org.apache.ignite + ignite-core + ${project.version} + + + org.apache.kafka + kafka_2.10 + 0.8.2.1 + + + com.sun.jmx + jmxri + + + com.sun.jdmk + jmxtools + + + net.sf.jopt-simple + jopt-simple + + + org.slf4j + slf4j-simple + + + org.apache.zookeeper + zookeeper + + + + + org.apache.zookeeper + zookeeper + 3.4.5 + + + + commons-io + commons-io + 2.4 + + + + org.apache.ignite + ignite-log4j + ${project.version} + + + + org.ow2.asm + asm-all + 4.2 + + + + + log4j + log4j + + + + org.gridgain + ignite-shmem + 1.0.0 + + + + commons-beanutils + commons-beanutils + 1.8.3 + test + + + + org.apache.ignite + ignite-spring + ${project.version} + test + + + + org.apache.ignite + ignite-core + ${project.version} + test-jar + test + + + + diff --git 
a/modules/kafka/src/main/java/org/apache/ignite/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/kafka/KafkaStreamer.java new file mode 100644 index 0000000..4674a00 --- /dev/null +++ b/modules/kafka/src/main/java/org/apache/ignite/kafka/KafkaStreamer.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ignite.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.stream.StreamAdapter;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;

import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;

/**
 * Server that subscribes to topic messages from a Kafka broker and streams them
 * as key-value pairs into an {@link org.apache.ignite.IgniteDataStreamer} instance.
 * <p>
 * Uses Kafka's High Level Consumer API to read messages from Kafka.
 *
 * @param <K> Cache key type.
 * @param <V> Cache value type.
 * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">
 *     Consumer Group Example</a>
 */
public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<K, V>, K, V> {
    /** Logger (initialized lazily in {@link #start()} from the Ignite instance). */
    private IgniteLogger log;

    /** Executor used to submit kafka streams. */
    private ExecutorService executor;

    /** Topic to subscribe to. */
    private String topic;

    /** Number of threads (and Kafka streams) used to consume the topic. */
    private int threads;

    /** Kafka consumer config. */
    private ConsumerConfig consumerConfig;

    /** Key decoder. */
    private Decoder<K> keyDecoder;

    /** Value decoder. */
    private Decoder<V> valueDecoder;

    /** Kafka consumer connector (created in {@link #start()}). */
    private ConsumerConnector consumer;

    /**
     * Sets the topic to subscribe to.
     *
     * @param topic Topic name.
     */
    public void setTopic(final String topic) {
        this.topic = topic;
    }

    /**
     * Sets the number of consumer threads.
     *
     * @param threads Thread count; must be positive.
     */
    public void setThreads(final int threads) {
        this.threads = threads;
    }

    /**
     * Sets the Kafka consumer config.
     *
     * @param consumerConfig Consumer configuration.
     */
    public void setConsumerConfig(final ConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    /**
     * Sets the key decoder.
     *
     * @param keyDecoder Decoder for message keys.
     */
    public void setKeyDecoder(final Decoder<K> keyDecoder) {
        this.keyDecoder = keyDecoder;
    }

    /**
     * Sets the value decoder.
     *
     * @param valueDecoder Decoder for message payloads.
     */
    public void setValueDecoder(final Decoder<V> valueDecoder) {
        this.valueDecoder = valueDecoder;
    }

    /**
     * Starts the streamer: connects to Kafka, opens {@code threads} streams on the
     * configured topic and pumps every received message into the data streamer.
     *
     * @throws IgniteException If failed.
     */
    public void start() {
        A.notNull(getStreamer(), "streamer");
        A.notNull(getIgnite(), "ignite");
        A.notNull(topic, "topic");
        A.notNull(keyDecoder, "key decoder");
        A.notNull(valueDecoder, "value decoder");
        A.notNull(consumerConfig, "kafka consumer config");
        A.ensure(threads > 0, "threads > 0");

        log = getIgnite().log();

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<>();

        // Autoboxing instead of the deprecated 'new Integer(...)' allocation.
        topicCountMap.put(topic, threads);

        Map<String, List<KafkaStream<K, V>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        List<KafkaStream<K, V>> streams = consumerMap.get(topic);

        // Now launch all the consumer threads.
        executor = Executors.newFixedThreadPool(threads);

        // Now create an object to consume the messages.
        for (final KafkaStream<K, V> stream : streams) {
            executor.submit(new Runnable() {
                @Override public void run() {
                    ConsumerIterator<K, V> it = stream.iterator();

                    // Blocks until the consumer connector is shut down.
                    while (it.hasNext()) {
                        final MessageAndMetadata<K, V> messageAndMetadata = it.next();

                        getStreamer().addData(messageAndMetadata.key(), messageAndMetadata.message());
                    }
                }
            });
        }
    }

    /**
     * Stops the streamer: shuts down the consumer connector (which unblocks the
     * worker threads) and then the executor, waiting briefly for termination.
     * Safe to call even if {@link #start()} was never invoked.
     */
    public void stop() {
        if (consumer != null)
            consumer.shutdown();

        // Guard the await as well: the original dereferenced 'executor' outside
        // this check and threw NPE when stop() was called before start().
        if (executor != null) {
            executor.shutdown();

            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    if (log.isDebugEnabled())
                        log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so callers can observe it.
                Thread.currentThread().interrupt();

                if (log.isDebugEnabled())
                    log.debug("Interrupted during shutdown, exiting uncleanly");
            }
        }
    }

    /**
     * Creates default consumer config.
     *
     * @param zooKeeper ZooKeeper address &lt;server:port&gt;.
     * @param groupId Group Id for kafka subscriber.
     * @return {@link ConsumerConfig} kafka consumer configuration.
     */
    public static final ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String groupId) {
        A.notNull(zooKeeper, "zookeeper");
        A.notNull(groupId, "groupId");

        Properties props = new Properties();

        props.put("zookeeper.connect", zooKeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        // Read from the earliest available offset when no committed offset exists.
        props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }

    /**
     * @return New cache configuration with modified defaults (transactional,
     *     fully synchronous writes, near cache enabled, eviction disabled).
     */
    public static CacheConfiguration defaultCacheConfiguration() {
        CacheConfiguration cfg = new CacheConfiguration();

        cfg.setStartSize(1024);
        cfg.setAtomicWriteOrderMode(PRIMARY);
        cfg.setAtomicityMode(TRANSACTIONAL);
        cfg.setNearConfiguration(new NearCacheConfiguration());
        cfg.setWriteSynchronizationMode(FULL_SYNC);
        cfg.setEvictionPolicy(null);

        return cfg;
    }
}
package org.apache.ignite.kafka;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.FileUtils;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;

import kafka.admin.AdminUtils;
import kafka.api.LeaderAndIsr;
import kafka.api.PartitionStateInfo;
import kafka.api.Request;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

/**
 * Kafka Embedded Broker: starts an in-process ZooKeeper server plus an
 * in-process Kafka broker for tests, and tears both down on {@link #shutdown()}.
 */
public class KafkaEmbeddedBroker {
    /** Default ZooKeeper Host. */
    private static final String ZK_HOST = "localhost";

    /** Broker Port. */
    private static final int BROKER_PORT = 9092;

    /** ZooKeeper Connection Timeout. */
    private static final int ZK_CONNECTION_TIMEOUT = 6000;

    /** ZooKeeper Session Timeout. */
    private static final int ZK_SESSION_TIMEOUT = 6000;

    /** ZooKeeper port (0 = ephemeral; replaced with the actual port after startup). */
    private static int zkPort = 0;

    /** Is ZooKeeper Ready. */
    private boolean zkReady;

    /** Is Broker Ready. */
    private boolean brokerReady;

    /** Kafka Config. */
    private KafkaConfig brokerConfig;

    /** Kafka Server. */
    private KafkaServer kafkaServer;

    /** ZooKeeper Client. */
    private ZkClient zkClient;

    /** Embedded ZooKeeper. */
    private EmbeddedZooKeeper zooKeeper;

    /**
     * Starts embedded ZooKeeper first (the broker needs it), then the Kafka broker.
     */
    public KafkaEmbeddedBroker() {
        try {
            setupEmbeddedZookeeper();
            setupEmbeddedKafkaServer();
        }
        catch (IOException | InterruptedException e) {
            // Keep the original exception as the cause instead of flattening
            // it into the message only.
            throw new RuntimeException("failed to start Kafka Broker " + e, e);
        }
    }

    /**
     * Gets ZooKeeper Address.
     *
     * @return ZooKeeper address in {@code host:port} form.
     */
    public static final String getZKAddress() {
        return ZK_HOST + ":" + zkPort;
    }

    /**
     * Creates a Topic and waits until its metadata is propagated to the broker.
     *
     * @param topic Topic name.
     * @param partitions Number of partitions for the topic.
     * @param replicationFactor Replication factor.
     * @throws TimeoutException If metadata did not propagate within the timeout.
     * @throws InterruptedException If interrupted while waiting.
     */
    public void createTopic(String topic, final int partitions, final int replicationFactor)
        throws TimeoutException, InterruptedException {
        AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());

        waitUntilMetadataIsPropagated(topic, 0, 10000, 100);
    }

    /**
     * Sends messages to the Kafka broker.
     *
     * @param keyedMessages Messages to send.
     * @return The producer used to send the messages (caller is responsible for closing it).
     */
    public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
        Producer<String, String> producer = new Producer<>(getProducerConfig());

        producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));

        return producer;
    }

    /**
     * Shuts down Kafka Broker, deletes its log directories, closes the ZooKeeper
     * client and stops the embedded ZooKeeper server.
     *
     * @throws IOException If log directory cleanup fails.
     */
    public void shutdown() throws IOException {
        brokerReady = false;
        zkReady = false;

        if (kafkaServer != null)
            kafkaServer.shutdown();

        List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerConfig.logDirs());

        for (String logDir : logDirs)
            FileUtils.deleteDirectory(new File(logDir));

        if (zkClient != null) {
            zkClient.close();

            zkClient = null;
        }

        if (zooKeeper != null) {
            try {
                zooKeeper.shutdown();
            }
            catch (IOException e) {
                // Best-effort cleanup: shutdown failures are deliberately ignored.
            }

            zooKeeper = null;
        }
    }

    /**
     * @return ZooKeeper client, failing fast if ZooKeeper is not set up yet.
     */
    private ZkClient getZkClient() {
        A.ensure(zkReady, "Zookeeper not setup yet");
        A.notNull(zkClient, "Zookeeper client is not yet initialized");

        return zkClient;
    }

    /**
     * Checks whether the partition's metadata (leader + ISR) is visible on both
     * the broker's metadata cache and ZooKeeper.
     *
     * @param topic Topic name.
     * @param partition Partition index.
     * @return {@code true} if metadata is fully propagated.
     */
    private boolean isMetadataPropagated(final String topic, final int partition) {
        final scala.Option<PartitionStateInfo> partitionStateOption =
            kafkaServer.apis().metadataCache().getPartitionInfo(topic, partition);

        if (partitionStateOption.isDefined()) {
            final PartitionStateInfo partitionState = partitionStateOption.get();
            final LeaderAndIsr leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch().leaderAndIsr();

            if (ZkUtils.getLeaderForPartition(getZkClient(), topic, partition) != null
                && Request.isValidBrokerId(leaderAndInSyncReplicas.leader())
                && leaderAndInSyncReplicas.isr().size() >= 1)
                return true;
        }

        return false;
    }

    /**
     * Polls until partition metadata is propagated or the timeout expires.
     *
     * @param topic Topic name.
     * @param partition Partition index.
     * @param timeout Total timeout in milliseconds.
     * @param interval Poll interval in milliseconds.
     * @throws TimeoutException If metadata did not propagate in time.
     * @throws InterruptedException If interrupted while sleeping.
     */
    private void waitUntilMetadataIsPropagated(final String topic, final int partition, final long timeout,
        final long interval) throws TimeoutException, InterruptedException {
        int attempt = 1;

        final long startTime = System.currentTimeMillis();

        while (true) {
            if (isMetadataPropagated(topic, partition))
                return;

            final long duration = System.currentTimeMillis() - startTime;

            if (duration < timeout)
                Thread.sleep(interval);
            else
                throw new TimeoutException("metadata propagate timed out, attempt=" + attempt);

            attempt++;
        }
    }

    /**
     * Starts the embedded Kafka broker; requires ZooKeeper to be up.
     *
     * @throws IOException If broker config creation fails.
     */
    private void setupEmbeddedKafkaServer() throws IOException {
        A.ensure(zkReady, "Zookeeper should be setup before hand");

        brokerConfig = new KafkaConfig(getBrokerConfig());
        kafkaServer = new KafkaServer(brokerConfig, SystemTime$.MODULE$);

        kafkaServer.startup();

        brokerReady = true;
    }

    /**
     * Starts the embedded ZooKeeper server and connects a client to it.
     *
     * @throws IOException If startup fails.
     * @throws InterruptedException If interrupted during startup.
     */
    private void setupEmbeddedZookeeper() throws IOException, InterruptedException {
        // Assign the field: the original declared a shadowing local here, which
        // left the field null so shutdown() never stopped the ZooKeeper server.
        zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);

        zooKeeper.startup();

        zkPort = zooKeeper.getActualPort();

        zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);

        zkReady = true;
    }

    /**
     * @return Broker address in {@code host:port} form.
     */
    private static final String getBrokerAddress() {
        return ZK_HOST + ":" + BROKER_PORT;
    }

    /**
     * @return Properties for the embedded broker (fresh temp log dir per call).
     * @throws IOException If the temp log directory cannot be created.
     */
    private static Properties getBrokerConfig() throws IOException {
        Properties props = new Properties();

        props.put("broker.id", "0");
        props.put("host.name", ZK_HOST);
        props.put("port", "" + BROKER_PORT);
        props.put("log.dir", createTempDir("_cfg").getAbsolutePath());
        props.put("zookeeper.connect", getZKAddress());
        props.put("log.flush.interval.messages", "1");
        props.put("replica.socket.timeout.ms", "1500");

        return props;
    }

    /**
     * @return Producer configuration pointing at the embedded broker and using
     *     {@link SimplePartitioner} with string serialization.
     */
    static ProducerConfig getProducerConfig() {
        Properties props = new Properties();

        props.put("metadata.broker.list", getBrokerAddress());
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "org.apache.ignite.kafka.SimplePartitioner");

        return new ProducerConfig(props);
    }

    /**
     * Creates a temporary directory.
     *
     * @param prefix Directory name prefix.
     * @return Newly created temp directory.
     * @throws IOException If creation fails.
     */
    private static File createTempDir(final String prefix) throws IOException {
        final Path path = Files.createTempDirectory(prefix);

        return path.toFile();
    }

    /**
     * Embedded ZooKeeper server bound to an arbitrary (possibly ephemeral) port.
     */
    private static class EmbeddedZooKeeper {
        /** ZooKeeper host. */
        private final String zkHost;

        /** Requested ZooKeeper port (0 = let the OS pick). */
        private final int zkPort;

        /** NIO connection factory. */
        private NIOServerCnxnFactory factory;

        /** Snapshot directory. */
        private File snapshotDir;

        /** Log directory. */
        private File logDir;

        /**
         * @param zkHost Host to bind to.
         * @param zkPort Port to bind to (0 for ephemeral).
         */
        EmbeddedZooKeeper(final String zkHost, final int zkPort) {
            this.zkHost = zkHost;
            this.zkPort = zkPort;
        }

        /**
         * Starts up ZooKeeper with fresh temp snapshot/log directories.
         *
         * @throws IOException If startup fails.
         * @throws InterruptedException If interrupted during startup.
         */
        void startup() throws IOException, InterruptedException {
            snapshotDir = createTempDir("_ss");
            logDir = createTempDir("_log");

            ZooKeeperServer zooServer = new ZooKeeperServer(snapshotDir, logDir, 500);

            factory = new NIOServerCnxnFactory();

            factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
            factory.startup(zooServer);
        }

        /**
         * @return Port the server actually bound to (differs from the requested
         *     port when an ephemeral port was requested).
         */
        int getActualPort() {
            return factory.getLocalPort();
        }

        /**
         * Shuts down ZooKeeper and removes its temp directories.
         *
         * @throws IOException If directory cleanup fails.
         */
        void shutdown() throws IOException {
            if (factory != null) {
                factory.shutdown();

                FileUtils.deleteDirectory(snapshotDir);
                FileUtils.deleteDirectory(logDir);
            }
        }
    }
}
package org.apache.ignite.kafka;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;

/**
 * Tests {@link KafkaStreamer}: produces messages into an embedded Kafka broker
 * and verifies that the streamer loads them all into an Ignite cache.
 */
public class KafkaIgniteStreamerTest extends GridCommonAbstractTest {
    /** Embedded Kafka broker, recreated for each test. */
    private KafkaEmbeddedBroker embeddedBroker;

    /** Number of messages produced per test. */
    private static final int CNT = 100;

    /** Test Topic. */
    private static final String TOPIC_NAME = "page_visits";

    /** Kafka Partition count. */
    private static final int PARTITIONS = 4;

    /** Kafka Replication Factor. */
    private static final int REPLICATION_FACTOR = 1;

    /** Topic message key prefix (keys are synthetic IPv4 addresses). */
    private static final String KEY_PREFIX = "192.168.2.";

    /** Topic message value URL fragment. */
    private static final String VALUE_URL = ",www.example.com,";

    /** Constructor — starts the grid. */
    public KafkaIgniteStreamerTest() {
        super(true);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        grid().getOrCreateCache(defaultCacheConfiguration());

        embeddedBroker = new KafkaEmbeddedBroker();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        grid().cache(null).clear();

        embeddedBroker.shutdown();
    }

    /**
     * End-to-end test: create topic, produce messages, consume them through
     * {@link KafkaStreamer} and verify cache contents.
     *
     * @throws TimeoutException If topic metadata propagation times out.
     * @throws InterruptedException If interrupted while waiting.
     */
    public void testKafkaStreamer() throws TimeoutException, InterruptedException {
        embeddedBroker.createTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);

        Map<String, String> keyValueMap = produceStream(TOPIC_NAME);

        consumerStream(TOPIC_NAME, keyValueMap);
    }

    /**
     * Produces {@link #CNT} random keyed messages into the given topic.
     *
     * @param topic Topic name.
     * @return Map of every produced key to its message value.
     */
    private Map<String, String> produceStream(final String topic) {
        final Map<String, String> keyValueMap = new HashMap<>();

        // Produce.
        final Random rnd = new Random();
        final List<KeyedMessage<String, String>> messages = new ArrayList<>();

        for (long nEvents = 0; nEvents < CNT; nEvents++) {
            long runtime = new Date().getTime();
            String ip = KEY_PREFIX + rnd.nextInt(255);
            String msg = runtime + VALUE_URL + ip;

            messages.add(new KeyedMessage<>(topic, ip, msg));
            keyValueMap.put(ip, msg);
        }

        final Producer<String, String> producer = embeddedBroker.sendMessages(messages);

        producer.close();

        return keyValueMap;
    }

    /**
     * Runs {@link KafkaStreamer} against the topic, waits for all put events,
     * then asserts the cache holds exactly the produced key/value pairs.
     *
     * @param topic Topic name.
     * @param keyValueMap Expected cache contents.
     * @throws TimeoutException If waiting times out.
     * @throws InterruptedException If interrupted while waiting.
     */
    private void consumerStream(final String topic, final Map<String, String> keyValueMap)
        throws TimeoutException, InterruptedException {
        KafkaStreamer<String, String> kafkaStmr = null;

        final Ignite ignite = grid();

        try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
            stmr.allowOverwrite(true);
            stmr.autoFlushFrequency(10);

            // Configure kafka streamer.
            kafkaStmr = new KafkaStreamer<>();

            // Get the cache.
            IgniteCache<String, String> cache = ignite.cache(null);

            // Set ignite instance.
            kafkaStmr.setIgnite(ignite);

            // Set data streamer instance.
            kafkaStmr.setStreamer(stmr);

            // Set the topic.
            kafkaStmr.setTopic(topic);

            // Set the number of threads.
            kafkaStmr.setThreads(4);

            // Set the consumer configuration.
            kafkaStmr.setConsumerConfig(KafkaStreamer.createDefaultConsumerConfig(KafkaEmbeddedBroker.getZKAddress(),
                "groupX"));

            // Set the decoders (same string decoder for keys and values).
            StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties());

            kafkaStmr.setKeyDecoder(stringDecoder);
            kafkaStmr.setValueDecoder(stringDecoder);

            // Start kafka streamer.
            kafkaStmr.start();

            // Count down once per cache put until all messages arrived.
            final CountDownLatch latch = new CountDownLatch(CNT);

            IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
                @Override public boolean apply(UUID uuid, CacheEvent evt) {
                    latch.countDown();

                    return true;
                }
            };

            ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);

            latch.await();

            for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
                final String key = entry.getKey();
                final String value = entry.getValue();

                final String cacheValue = cache.get(key);

                assertEquals(value, cacheValue);
            }
        }
        finally {
            // Guard against NPE: an exception before assignment would otherwise
            // make this finally block fail and mask the original error.
            if (kafkaStmr != null)
                kafkaStmr.stop();
        }
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.kafka; + +import kafka.producer.Partitioner; +import kafka.utils.VerifiableProperties; + +/** + * Simple Partitioner for Kafka. + */ +@SuppressWarnings("UnusedDeclaration") +public class SimplePartitioner + implements Partitioner { + public SimplePartitioner(VerifiableProperties properties) { + } + + public int partition(Object key, int numberOfPartitions) { + int partition = 0; + String keyStr = (String) key; + String[] keyValues = keyStr.split("\\."); + Integer intKey = Integer.parseInt(keyValues[3]); + if (intKey > 0) { + partition = intKey % numberOfPartitions; + } + return partition; + } +} -- 1.7.12.4 (Apple Git-37)