Kafka / KAFKA-130

Provide a default producer for receiving messages from STDIN

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.6
    • Fix Version/s: 0.7
    • Component/s: None
    • Labels: None

      Description

      It would be useful to provide a default producer we can fire up that reads from STDIN and sends one message per line to the broker.

      The most obvious use case for this is to pipe a tail -f command into it, to tail log files as they're generated. Making it depend on STDIN seems more flexible than a producer that just tails files though.

        Activity

        Jun Rao made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Fix Version/s 0.7 [ 12317243 ]
        Resolution Fixed [ 1 ]
        Jun Rao added a comment -

        +1

        Jay Kreps made changes -
        Attachment kafka-console-producer.patch [ 12497421 ]
        Jay Kreps added a comment -

        Added to the --help info on --property.

        Jay Kreps added a comment -

        I will add some docs, can I get a +1 on this?

        Jun Rao added a comment -

        Ok, that makes sense. Just document the format in the description of the property and we can commit the patch.

        Jay Kreps added a comment -

        Hey Jun, the idea of the properties option is this: we want to give the user a way to plug in code for parsing the input file, and that code may require some options of its own. For example, you might need to give the charset of the input file or some other info. To make this possible there needs to be a way to pass config through this code to the user's plug-in message reader. For our usage, the thing I had in mind was being able to pass in an Avro schema or a URL to get one. This is all fairly advanced functionality, so the user doesn't need to mess with it unless they want to read in messages a particular way.

        LineMessageReader is a default implementation of this interface that just turns one line into an input to the producer as a string. Used in combination with the StringEncoder, it does the normal logging thing of sending one line as a message.

        Hopefully that makes sense...
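
        To make the charset example above concrete, here is a minimal sketch of a plug-in reader that takes its configuration from the Properties passed to init. It is not taken from the patch: the class name CharsetLineReader and the property key "charset" are hypothetical, the interface is redeclared locally so the snippet stands alone, and returning null at end of input is an assumption about how a reader might signal that it is done.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Local copy of the MessageReader shape described in this issue.
interface MessageReader {
    void init(InputStream in, Properties props);
    Object readMessage();
    void close();
}

// Hypothetical plug-in reader: decodes each line with a charset named in props.
class CharsetLineReader implements MessageReader {
    private BufferedReader reader;

    @Override
    public void init(InputStream in, Properties props) {
        // "charset" is an illustrative key, not one defined by the patch.
        String name = props.getProperty("charset", "UTF-8");
        reader = new BufferedReader(new InputStreamReader(in, Charset.forName(name)));
    }

    @Override
    public Object readMessage() {
        try {
            // readLine() returns null at end of input; treating that as
            // "no more messages" is an assumption of this sketch.
            return reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class CharsetLineReaderDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("charset", "ISO-8859-1");

        // One Latin-1 encoded line standing in for the input file.
        byte[] input = "caf\u00e9\n".getBytes(StandardCharsets.ISO_8859_1);

        MessageReader reader = new CharsetLineReader();
        reader.init(new ByteArrayInputStream(input), props);
        System.out.println(reader.readMessage());
        reader.close();
    }
}
```

        The reader never sees the command line itself; it only receives whatever key/value pairs were collected into the Properties object, which is what lets a custom reader define its own options.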

        Show
        Jay Kreps added a comment - Hey Jun, the idea of the properties option is this: we want to give a way for the user to plug in code for parsing the input file, and that code may require some options of its own. For example you might need to give the charset of the input file or some other info. To make this possible there needs to be a way to pass config through this code to the user's plug-in message reader. Obviously for our usage, the thing I had in mind was being able to pass in an avro schema or url to get one. This is all fairly advanced functionality, so the user doesn't need to mess with it unless they want to read in messages a particular way. LineMessageReader is a default implementation of this interface that just turns one line into an input to the producer as a string, used in combination with the StringEncoder it does the normal logging thing of sending one line as a message. Hopefully that makes sense...
        Jun Rao added a comment -

        Thanks for the patch Jay. It looks good. About the propertyOpt, could you add a description about its purpose and format? Also, it's not used in the init method of LineMessageReader.

        Jay Kreps made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Assignee Jay Kreps [ jkreps ]
        Jay Kreps added a comment -

        Usage is the following:

        jkreps-mn:kafka-git jkreps$ bin/kafka-run-class.sh kafka.producer.ConsoleProducer
        Missing required argument "[topic]"
        Option                                  Description
        ------                                  -----------
        --batch-size <Integer: size>            Number of messages to send in a single
                                                  batch if they are not being sent
                                                  synchronously. (default: 200)
        --compress                              If set, messages batches are sent
                                                  compressed
        --line-reader <reader_class>            The class name of the class to use for
                                                  reading lines from standard in. By
                                                  default each line is read as a
                                                  seperate message. (default: kafka.
                                                  producer.
                                                  ConsoleProducer$LineMessageReader)
        --message-encoder <encoder_class>       The class name of the message encoder
                                                  implementation to use. (default:
                                                  kafka.serializer.StringEncoder)
        --property <prop>
        --sync                                  If set message send requests to the
                                                  brokers are synchronously, one at a
                                                  time as they arrive.
        --timeout <Long: timeout_ms>            If set and the producer is running in
                                                  asynchronous mode, this gives the
                                                  maximum amount of time a message
                                                  will queue awaiting suffient batch
                                                  size. The value is given in ms.
                                                  (default: 1000)
        --topic <topic>                         REQUIRED: The topic id to produce
                                                  messages to.
        --zookeeper <connection_string>         REQUIRED: The zookeeper connection
                                                  string for the kafka zookeeper
                                                  instance in the form HOST:PORT
                                                  [/CHROOT].
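
        The --batch-size and --timeout descriptions together imply a "send when the batch is full, or when the oldest queued message has waited long enough" rule for asynchronous mode. The sketch below illustrates that rule only; it is not Kafka's producer code. The class BatchBuffer, its methods, and the explicit nowMs clock parameter are all hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative size-or-timeout batching; not taken from the Kafka producer.
class BatchBuffer {
    private final int batchSize;
    private final long timeoutMs;
    private final List<String> pending = new ArrayList<>();
    private long oldestMs; // arrival time of the first message in the pending batch

    BatchBuffer(int batchSize, long timeoutMs) {
        this.batchSize = batchSize;
        this.timeoutMs = timeoutMs;
    }

    // Queues a message. Returns a full batch to send, or an empty list if
    // the batch is not yet full.
    List<String> add(String message, long nowMs) {
        if (pending.isEmpty()) {
            oldestMs = nowMs;
        }
        pending.add(message);
        return pending.size() >= batchSize ? drain() : new ArrayList<>();
    }

    // Called periodically. Returns a partial batch once the oldest queued
    // message has waited at least timeoutMs, otherwise an empty list.
    List<String> poll(long nowMs) {
        boolean expired = !pending.isEmpty() && nowMs - oldestMs >= timeoutMs;
        return expired ? drain() : new ArrayList<>();
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}

class BatchBufferDemo {
    public static void main(String[] args) {
        // Defaults from the help text: batch size 200, timeout 1000 ms.
        BatchBuffer buffer = new BatchBuffer(200, 1000);
        long start = System.currentTimeMillis();
        buffer.add("first line", start);
        // Nothing is sent yet: one message is far below the batch size.
        System.out.println("after 10 ms: " + buffer.poll(start + 10).size() + " to send");
        // Once the timeout has elapsed the partial batch is released.
        System.out.println("after 1000 ms: " + buffer.poll(start + 1000).size() + " to send");
    }
}
```

        With --sync set, the help text says each message is sent as it arrives, so neither option would apply in that mode.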

        It reads from standard input so it can take interactive input or else you can pipe stuff to it:
        cat ~/some_log_file.log | bin/kafka-run-class.sh kafka.producer.ConsoleProducer --topic test --zookeeper localhost:2181

        By default it reads lines from the log and sends them as string messages to Kafka.

        Both the class used to read objects and the Kafka producer serializer can be customized from the arguments. The serializer is the normal kafka.serializer.Encoder interface, and the class used to read messages from System.in should extend the MessageReader abstract class (trait), which has the following methods:

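        import java.io.InputStream
        import java.util.Properties

        trait MessageReader {
          def init(inputStream: InputStream, props: Properties) {}  // optional setup; no-op by default
          def readMessage(): AnyRef                                 // returns the next message to send
          def close() {}                                            // optional cleanup; no-op by default
        }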

        AKA:

        interface MessageReader {
          public void init(InputStream in, Properties props);
          public Object readMessage();
          public void close();
        }
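
        As a rough illustration of how such a reader could be driven, the sketch below implements a one-line-per-message reader against the interface and pumps it until the input runs out. It is not the code from the patch: SimpleLineReader and ReaderPump are hypothetical names, the sink callback stands in for the real producer send, and using a null return from readMessage to mean "end of input" is an assumption.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.function.Consumer;

// Local copy of the MessageReader shape described in this issue.
interface MessageReader {
    void init(InputStream in, Properties props);
    Object readMessage();
    void close();
}

// Hypothetical default-style reader: each input line becomes one String message.
class SimpleLineReader implements MessageReader {
    private BufferedReader reader;

    @Override
    public void init(InputStream in, Properties props) {
        // This reader needs no options, so props is ignored.
        reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    @Override
    public Object readMessage() {
        try {
            return reader.readLine(); // null once the stream is exhausted
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class ReaderPump {
    // Initializes the reader, passes every message to sink until readMessage()
    // returns null, then closes the reader. Returns the number of messages read.
    static int pump(MessageReader reader, InputStream in, Properties props,
                    Consumer<Object> sink) {
        reader.init(in, props);
        int count = 0;
        try {
            Object message;
            while ((message = reader.readMessage()) != null) {
                sink.accept(message);
                count++;
            }
        } finally {
            reader.close();
        }
        return count;
    }

    public static void main(String[] args) {
        // Echoes each stdin line; a real producer would send it to the broker.
        int n = pump(new SimpleLineReader(), System.in, new Properties(),
                m -> System.out.println("message: " + m));
        System.err.println(n + " messages read");
    }
}
```

        Because the loop only depends on the interface, swapping in a different reader class changes how input is split into messages without touching the sending side.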
        Jay Kreps made changes -
        Attachment kafka-console-producer.patch [ 12493954 ]
        Jay Kreps added a comment -

        Add a console producer for kafka that reads from standard in.

        Felix GV created issue -

          People

          • Assignee: Jay Kreps
          • Reporter: Felix GV
          • Votes: 0
          • Watchers: 2
