Kafka / KAFKA-933

Hadoop example running DataGenerator causes kafka.message.Message cannot be cast to [B exception

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.1
    • Component/s: contrib
    • Labels:
    • Environment:

      Description

      Working off the git master codebase (https://github.com/apache/kafka) and following the instructions at

      https://github.com/apache/kafka/blob/trunk/contrib/hadoop-consumer/README

      running

      ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties

      throws an exception:

      Exception in thread "main" java.lang.ClassCastException: kafka.message.Message cannot be cast to [B
      at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
      at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:129)
      at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:124)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
      at scala.collection.Iterator$class.foreach(Iterator.scala:772)
      at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
      at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
      at scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:615)
      at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:124)
      at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
      at kafka.producer.Producer.send(Producer.scala:74)
      at kafka.javaapi.producer.Producer.send(Producer.scala:41)
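
The top frames of the trace suggest a likely mechanism: the producer's encoder is a generic type whose parameter is erased at runtime, so an encoder written for byte arrays accepts any object at compile time and only fails when its compiler-generated bridge method casts the value to byte[] (shown as [B). The Kafka-free sketch below reproduces that class of failure. Encoder, PassThroughEncoder, Wrapper and serialize are hypothetical stand-ins for illustration, not Kafka's actual classes.

```java
import java.nio.charset.StandardCharsets;

public class ErasureCastDemo {
    /** Hypothetical stand-in for a generic serializer interface. */
    interface Encoder<T> {
        byte[] toBytes(T value);
    }

    /** Hypothetical stand-in for an encoder that passes byte arrays through unchanged. */
    static class PassThroughEncoder implements Encoder<byte[]> {
        @Override
        public byte[] toBytes(byte[] value) {
            return value;
        }
    }

    /** Hypothetical stand-in for a wrapper type such as a message object. */
    static class Wrapper {
        final byte[] payload;

        Wrapper(byte[] payload) {
            this.payload = payload;
        }
    }

    /** Applies an encoder whose type parameter is unknown here, as a config-driven caller might. */
    @SuppressWarnings("unchecked")
    static <T> byte[] serialize(Object encoder, T value) {
        // Unchecked cast: erasure hides the mismatch between T and byte[].
        Encoder<T> typed = (Encoder<T>) encoder;
        // The bridge method of PassThroughEncoder casts value to byte[] and fails here.
        return typed.toBytes(value);
    }

    /** Returns true if handing a Wrapper to the byte-array encoder throws ClassCastException. */
    static boolean wrapperFails() {
        try {
            serialize(new PassThroughEncoder(), new Wrapper(new byte[] {1}));
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] raw = "42".getBytes(StandardCharsets.UTF_8);
        // Raw bytes match what the encoder expects, so this call succeeds.
        System.out.println(serialize(new PassThroughEncoder(), raw).length);
        // A wrapper object compiles but fails at runtime.
        System.out.println(wrapperFails());
    }
}
```

If this is indeed what happens in the producer, the value type of each message has to match whatever the configured encoder expects, which is consistent with the fixes discussed in the comments.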

        Activity

        Andrew Milkowski added a comment (edited)

        The workaround was to patch DataGenerator to send the message string instead of the Message object:

        public void run() throws Exception {
            List<KeyedMessage<Integer, String>> list = new ArrayList<KeyedMessage<Integer, String>>();
            for (int i = 0; i < 50; i++) {
                Long timestamp = RANDOM.nextLong();
                if (timestamp < 0) timestamp = -timestamp;
                String messageStr = timestamp.toString();
                log.info(" creating message: " + messageStr);
                list.add(new KeyedMessage<Integer, String>(topic, null, messageStr));
            }

            log.info(" send " + list.size() + " " + topic + " count events to " + uri);
            producer.send(list);
            producer.close();

            generateOffsets();
        }

        drunkedcat added a comment (edited)

        As of 0.8.0-beta1, the following change works:

          byte[] bytes = timestamp.toString().getBytes("UTF8");
        - Message message = new Message(bytes);
        - list.add(new KeyedMessage<Integer, Message>(_topic, null, message));
        + list.add(new KeyedMessage<Integer, byte[]>(_topic, null, bytes));
          }
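
The change above hands the producer raw bytes rather than a wrapper object. As a minimal, Kafka-free sketch of preparing such payloads, the class below builds the same kind of timestamp strings and encodes them as byte arrays. PayloadSketch and buildPayloads are hypothetical names. Two details differ from the snippet above and are assumptions of this sketch: it uses StandardCharsets.UTF_8 instead of the "UTF8" charset name (which avoids the checked UnsupportedEncodingException), and it masks the sign bit instead of negating, because negating Long.MIN_VALUE overflows and stays negative.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class PayloadSketch {
    /** Builds count payloads, each the UTF-8 bytes of a non-negative random long. */
    static List<byte[]> buildPayloads(Random random, int count) {
        List<byte[]> payloads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // Clearing the sign bit always yields a non-negative value.
            long timestamp = random.nextLong() & Long.MAX_VALUE;
            payloads.add(Long.toString(timestamp).getBytes(StandardCharsets.UTF_8));
        }
        return payloads;
    }

    public static void main(String[] args) {
        for (byte[] payload : buildPayloads(new Random(), 3)) {
            // Decode again only to show that the bytes round-trip to the original text.
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

Each byte[] produced this way could then be wrapped in a KeyedMessage<Integer, byte[]> as in the change above.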

        Jun Rao added a comment

        Thanks for the patch. Committed to trunk.


          People

          • Assignee: Unassigned
          • Reporter: Andrew Milkowski
          • Votes: 0
          • Watchers: 3
