KAFKA-1898

compatibility testing framework


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved

    Description

      There are a few different scenarios where you want or need to know the status/state of a client library that works with Kafka. Client library development is not just about supporting the wire protocol but also about the implementations around specific interactions of the API. The API has blossomed into a robust set of producer, consumer, broker and administrative calls, all of which have layers of logic above them. A client library may choose to deviate from the path the project sets out, and that is OK. The goal of this ticket is to have a system for Kafka that can help explain what a library is or isn't doing (regardless of what it claims).

      The idea behind this stems from being able to quickly, easily and succinctly analyze the topic message data. Once you can analyze the messages in the topic(s), you can gather a lot of information about what the client library is doing, is not doing, and so on. There are a few components to this.

      1) dataset-generator

      Test Kafka dataset generation tool. It generates a random text file with the given parameters (a rough sketch of the generation loop follows the usage example below):

      --filename, -f - output file name.
      --filesize, -s - desired size of output file. The actual size will always be a bit larger (with a maximum size of $filesize + $max.length - 1)
      --min.length, -l - minimum generated entry length.
      --max.length, -h - maximum generated entry length.

      Usage:

      ./gradlew build
      java -jar dataset-generator/build/libs/dataset-generator*.jar -s 100000 -l 2 -h 20
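      For illustration, here is a minimal sketch of what the generation loop could look like; the object name and structure are assumptions, not the actual tool:

      // Hypothetical sketch only; names and structure are assumptions, not the actual generator.
      import java.io.{BufferedWriter, FileWriter}

      import scala.util.Random

      object DatasetGeneratorSketch {
        def generate(filename: String, fileSize: Long, minLength: Int, maxLength: Int): Unit = {
          val writer = new BufferedWriter(new FileWriter(filename))
          val random = new Random()
          var written = 0L
          try {
            // Append random entries until the requested size is reached; the file may
            // overshoot by at most maxLength - 1 bytes, matching the note on --filesize.
            while (written < fileSize) {
              val length = minLength + random.nextInt(maxLength - minLength + 1)
              val entry = random.alphanumeric.take(length).mkString
              writer.write(entry)
              writer.newLine()
              written += entry.length + 1
            }
          } finally {
            writer.close()
          }
        }
      }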

      2) dataset-producer

      Test Kafka dataset producer tool, able to produce a given dataset to a Kafka or Syslog server. The idea here is that you already have lots of datasets that you want to test different things against: you might have different message sizes, formats, etc., and want a repeatable benchmark to run and re-run the testing on. You could just take a day's worth of data and choose to replay it. The CCTK idea is that you always start from CONSUME in the state of your library; if your library is only producing, then you will fail a bunch of tests, and that might be OK for people. A rough sketch of the produce loop follows the usage example below.

      Accepts the following parameters:

      --filename, -f - input file name.
      
      --kafka, -k - Kafka broker address in host:port format. If this parameter is set, --producer.config and --topic must be set too (otherwise they're ignored).
      
      --producer.config, -p - Kafka producer properties file location.
      
      --topic, -t - Kafka topic to produce to.
      
      --syslog, -s - Syslog server address. Format: protocol://host:port (tcp://0.0.0.0:5140 or udp://0.0.0.0:5141 for example)
      
      --loop, -l - flag to loop through file until shut off manually. False by default.
      
      Usage:
      
      ./gradlew build
      java -jar dataset-producer/build/libs/dataset-producer-*.jar --filename dataset --syslog tcp://0.0.0.0:5140 --loop true
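      For illustration, here is a minimal sketch of what the Kafka path of the produce loop could look like, using the standard producer API; the object name and structure are assumptions, not the actual tool:

      // Hypothetical sketch of the Kafka produce path; names and structure are assumptions.
      import java.util.Properties

      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

      import scala.io.Source

      object DatasetProducerSketch {
        def produce(filename: String, brokers: String, topic: String, loop: Boolean): Unit = {
          val props = new Properties()
          props.put("bootstrap.servers", brokers)
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](props)
          try {
            // Replay the dataset file once, or keep replaying it until shut off when --loop is set.
            do {
              val source = Source.fromFile(filename)
              try {
                source.getLines().foreach(line => producer.send(new ProducerRecord[String, String](topic, line)))
              } finally {
                source.close()
              }
            } while (loop)
          } finally {
            producer.close()
          }
        }
      }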
      
      

      3) extract

      This step is useful so you can save data and compare tests. It could also be removed if folks are just looking for a real live test (and we could support that too). Here we are taking data out of Kafka and putting it into Cassandra, but other data stores can be used too, and we should come up with a way to abstract this out completely so folks could implement whatever they wanted.
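      As a starting point for that abstraction, the sink could sit behind a small trait like the one below (a hypothetical sketch; the trait and method names are assumptions that mirror the tables the Cassandra version writes to):

      // Hypothetical sketch of a pluggable extract sink; trait and method names are assumptions.
      trait ExtractStore {
        def registerTest(testId: String, sourceTopic: String, destinationTopic: String): Unit
        def saveCounter(testId: String, topic: String, total: Long): Unit
        def saveMessage(testId: String, topic: String, partition: Int, offset: Long, payload: String): Unit
      }

      The Spark/Cassandra reader below would then be just one implementation of this trait; an HBase- or file-backed store could be another.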

      package ly.stealth.shaihulud.reader
      
      import java.util.UUID
      
      import com.datastax.spark.connector._
      import com.datastax.spark.connector.cql.CassandraConnector
      import consumer.kafka.MessageAndMetadata
      import consumer.kafka.client.KafkaReceiver
      import org.apache.spark._
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming._
      import org.apache.spark.streaming.dstream.DStream
      
      object Main extends App with Logging {
        val parser = new scopt.OptionParser[ReaderConfiguration]("spark-reader") {
          head("Spark Reader for Kafka client applications", "1.0")
          opt[String]("testId") unbounded() optional() action { (x, c) =>
            c.copy(testId = x)
          } text ("Test ID")
          opt[String]("source") unbounded() required() action { (x, c) =>
            c.copy(sourceTopic = x)
          } text ("Source topic with initial set of data")
          opt[String]("destination") unbounded() required() action { (x, c) =>
            c.copy(destinationTopic = x)
          } text ("Destination topic with processed set of data")
          opt[Int]("partitions") unbounded() optional() action { (x, c) =>
            c.copy(partitions = x)
          } text ("Partitions in topic")
          opt[String]("zookeeper") unbounded() required() action { (x, c) =>
            c.copy(zookeeper = x)
          } text ("Zookeeper connection host:port")
          opt[Int]("kafka.fetch.size") unbounded() optional() action { (x, c) =>
            c.copy(kafkaFetchSize = x)
          } text ("Maximum KBs to fetch from Kafka")
          checkConfig { c =>
            if (c.testId.isEmpty || c.sourceTopic.isEmpty || c.destinationTopic.isEmpty || c.zookeeper.isEmpty) {
              failure("You haven't provided all required parameters")
            } else {
              success
            }
          }
        }
        val config = parser.parse(args, ReaderConfiguration()) match {
          case Some(c) => c
          case None => sys.exit(1)
        }
      
        val sparkConfig = new SparkConf().setAppName("kafka_client_validator")
                                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val ssc = new StreamingContext(sparkConfig, Seconds(10))
        ssc.checkpoint("reader")
      
        CassandraConnector(sparkConfig).withSessionDo( session => {
          session.execute("CREATE KEYSPACE IF NOT EXISTS kafka_client_validation WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
          session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.tests(test_id text PRIMARY KEY, source_topic text, destination_topic text)")
          session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.counters(test_id text, topic text, total counter, PRIMARY KEY(test_id, topic))")
          // Kafka offsets are 64-bit, so use bigint to match the Long in the Message case class.
          session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.messages(test_id text, topic text, partition int, offset bigint, payload text, PRIMARY KEY(test_id, topic, partition, offset))")
        })
        val test = Test(config.testId, config.sourceTopic, config.destinationTopic)
        ssc.sparkContext.parallelize(Seq(test)).saveToCassandra("kafka_client_validation", "tests")
      
        startStreamForTopic(test.test_id, config.sourceTopic, config)
        startStreamForTopic(test.test_id, config.destinationTopic, config)
      
        ssc.start()
        ssc.awaitTermination()
      
        def startStreamForTopic(testId: String, topic: String, config: ReaderConfiguration) {
          val stream = createKafkaStream(config.zookeeper, topic, config.partitions).repartition(config.partitions).persist(StorageLevel.MEMORY_AND_DISK_SER)
          stream.map(message => {
            Counter(testId, message.getTopic, 1L)
          }).reduce((prev, curr) => {
            Counter(testId, prev.topic, prev.total + curr.total)
          }).foreachRDD(rdd => {
            rdd.saveToCassandra("kafka_client_validation", "counters")
          })
      
          stream.map(message => {
            Message(testId, message.getTopic, message.getPartition.partition, message.getOffset, new String(message.getPayload))
          }).foreachRDD(rdd => {
            rdd.saveToCassandra("kafka_client_validation", "messages")
          })
        }
      
        private def createKafkaStream(zkConnect: String, topic: String, partitions: Int): DStream[MessageAndMetadata] = {
          val zkhosts = zkConnect.split(":")(0)
          val zkports = zkConnect.split(":")(1)
          val kafkaParams = Map("zookeeper.hosts" -> zkhosts,
                                "zookeeper.port" -> zkports,
                                "zookeeper.consumer.connection" -> zkConnect,
                                "zookeeper.broker.path" -> "/brokers",
                                "zookeeper.consumer.path" -> "/consumers",
                                "fetch.size.bytes" -> (config.kafkaFetchSize * 1024).toString,
                                "kafka.topic" -> topic,
                                "kafka.consumer.id" -> "%s-%s".format(topic, UUID.randomUUID().toString))
          val props = new java.util.Properties()
          kafkaParams foreach { case (key, value) => props.put(key, value)}
          val streams = (0 to partitions - 1).map { partitionId => ssc.receiverStream(new KafkaReceiver(StorageLevel.MEMORY_AND_DISK_SER, props, partitionId))}
          ssc.union(streams)
        }
      }
      
      case class Test(test_id: String = "", source_topic: String = "", destination_topic: String = "")
      case class Counter(test_id: String = "", topic: String = "", total: Long = 0L)
      case class Message(test_id: String = "", topic: String = "", partition: Int = 0, offset: Long = 0, payload: String = "")
      
      case class ReaderConfiguration(testId: String = UUID.randomUUID().toString, sourceTopic: String = "", destinationTopic: String = "",
                                        partitions: Int = 1, zookeeper: String = "", kafkaFetchSize: Int = 8)
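      For reference, submitting the reader might look roughly like the following; the jar path and Cassandra host are assumptions, while the options match the parser above:

      ./gradlew build
      spark-submit --class ly.stealth.shaihulud.reader.Main \
        --conf spark.cassandra.connection.host=127.0.0.1 \
        reader/build/libs/reader-*.jar \
        --source source-topic --destination destination-topic --zookeeper localhost:2181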
      
      

      4) validator

      This is pluggable both for how to read the topics and for how to process the results once done.

      Right now we have been checking this out using Spark and Cassandra; we are also looking at Spark with HBase, and at Samza with the Mesos support. The nice thing about using Samza is that we really don't have to use another data store, since it is just so easy to put the results back into a topic.

      Here is roughly what the Spark/Cassandra version looks like for checking whether a consumer/producer a) provides an at-least-once processing guarantee, b) is order preserving, c) etc., etc. While this test is running, as much negative testing as you want can be done against the cluster. It is made to run in an environment where you want to pump through as much data as you can, as fast as you can, and then analyze it once done.
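      A pluggable check could be expressed as something like the trait below (a hypothetical sketch, not existing code); the Spark/Cassandra validator that follows effectively implements two such checks, message-count parity and slice-hash ordering:

      // Hypothetical sketch of a pluggable validation check; the trait is an assumption.
      trait ValidationCheck {
        def name: String
        // True when the destination topic satisfies this check against the source topic.
        def run(testId: String, sourceTopic: String, destinationTopic: String): Boolean
      }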

      package ly.stealth.shaihulud.validator
      
      import java.security.MessageDigest
      import java.util.Iterator
      
      import com.datastax.driver.core.{Cluster, Row, SocketOptions}
      
      object Main extends App {
        val parser = new scopt.OptionParser[ValidatorConfiguration]("spark-validator") {
          head("Spark Validator for Kafka client applications", "1.0")
          opt[String]("test.id") unbounded() required() action { (x, c) =>
            c.copy(testId = x)
          } text ("Test ID")
          opt[String]("cassandra.connect") unbounded() required() action { (x, c) =>
            c.copy(cassandraConnect = x)
          } text ("Cassandra host")
          opt[String]("cassandra.user") unbounded() required() action { (x, c) =>
            c.copy(cassandraUser = x)
          } text ("Cassandra user")
          opt[String]("cassandra.password") unbounded() required() action { (x, c) =>
            c.copy(cassandraPassword = x)
          } text ("Cassandra password")
          checkConfig { c =>
            if (c.testId.isEmpty || c.cassandraConnect.isEmpty || c.cassandraUser.isEmpty || c.cassandraPassword.isEmpty) {
              failure("You haven't provided all required parameters")
            } else {
              success
            }
          }
        }
        val config = parser.parse(args, ValidatorConfiguration()) match {
          case Some(c) => c
          case None => sys.exit(1)
        }
      
        val cluster = new Cluster.Builder()
          .addContactPoints(config.cassandraConnect)
          .withSocketOptions(new SocketOptions().setTcpNoDelay(true))
          .build()
      
        val session = cluster.connect("kafka_client_validation")
      
        val tests = session.execute("SELECT * FROM kafka_client_validation.tests WHERE test_id='%s'".format(config.testId))
        val test = tests.one()
        if (test != null) {
          val testId = test.getString("test_id")
          val sourceTopic = test.getString("source_topic")
          val destinationTopic = test.getString("destination_topic")
      
          val countersQuery = "SELECT * FROM kafka_client_validation.counters WHERE test_id='%s' AND topic='%s'"
          val sourceCounter = session.execute(countersQuery.format(testId, sourceTopic))
          val destinationCounter = session.execute(countersQuery.format(testId, destinationTopic))
      
          println("***** TEST RESULTS *****")
          var sameAmount = false
          val totalInSource = sourceCounter.one().getLong("total")
          val totalInDestination = destinationCounter.one().getLong("total")
          if (totalInSource == totalInDestination) {
            sameAmount = true
          }
          println(" - Destination topic contains the same number of messages as the source topic (%d out of %d): %B"
            .format(totalInSource, totalInDestination, sameAmount))
          val messagesQuery = "SELECT * FROM kafka_client_validation.messages WHERE test_id='%s' AND topic='%s'"
          val sourceMessages = session.execute(messagesQuery.format(testId, sourceTopic))
          val destinationMessages = session.execute(messagesQuery.format(testId, destinationTopic))
          val si = sourceMessages.iterator()
          val di = destinationMessages.iterator()
      
          val portionSize = 1000
          var isOrderPreserved = true
          while ((si.hasNext || di.hasNext) && isOrderPreserved) {
            val sourceHash = this.calculateMD5ForSlice(si, portionSize)
            val destinationHash = this.calculateMD5ForSlice(di, portionSize)
            if (sourceHash != destinationHash) {
              isOrderPreserved = false
            }
          }
          println(" - Destination topic preserves ordering of Source topic: %B".format(isOrderPreserved))
        } else {
          System.err.println("There is no such test '%s'".format(config.testId))
        }
      
        cluster.close()
      
        def calculateMD5ForSlice(it: Iterator[Row], portionSize: Int): String = {
          val sb = new StringBuilder
          var left = portionSize
          while (it.hasNext && left > 0) {
            sb.append(it.next.getString("payload"))
            left = left - 1
          }
      
          // Hex-encode the digest so the comparison does not depend on decoding raw digest bytes with the default charset.
          MessageDigest.getInstance("MD5").digest(sb.toString().getBytes("UTF-8")).map("%02x".format(_)).mkString
        }
      }
      
      case class ValidatorConfiguration(testId: String = "", cassandraConnect: String = "", cassandraUser: String = "", cassandraPassword: String = "")
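      For reference, an invocation might look roughly like the following; the jar path is an assumption, while the options match the parser above:

      ./gradlew build
      java -jar validator/build/libs/validator-*.jar --test.id <test-uuid> --cassandra.connect 127.0.0.1 --cassandra.user cassandra --cassandra.password cassandra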
      
      

      Attachments

        1. cctk.png (25 kB, Joe Stein)


          People

            Assignee: Unassigned
            Reporter: Joe Stein
            Votes: 1
            Watchers: 5
