CAMEL-13391

camel-kafka: non-String message headers are extracted as null (type converter issue)

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Not A Problem
    • Affects Version/s: 2.21.2
    • Fix Version/s: None
    • Component/s: camel-kafka
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      When reading non-String message headers (e.g. Boolean, Integer) from a consumed message, the headers are returned as null.

      Headers of type String are converted correctly.
      This looks like a type converter issue: message headers in a consumed message arrive in byte[] format, and requesting them as Integer or Boolean yields null.

      Below is a simple program to demonstrate:

      package com.test.KafkaSimpleProject;
      
      import org.apache.camel.CamelContext;
      import org.apache.camel.Message;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.impl.DefaultCamelContext;
      
      public class App2 {
          public static void main(String[] args) throws Exception {
              CamelContext context = new DefaultCamelContext();
              try {
                  context.addRoutes(new RouteBuilder() {
                      @Override
                      public void configure() throws Exception {
      
                          //producer route
                          from("timer:foo?period=3000&repeatCount=1000&delay=3000")
                              .setBody()
                              .constant("This is kafka message")
                              .setHeader("name")
                              .simple("ApacheCamelkafka", String.class)
                              .setHeader("contact")
                              .simple("123123", Integer.class)
                              .setHeader("trueFalse")
                              .simple("false", Boolean.class)
                              .to("kafka:test?brokers=localhost:9092");
      
                          //Consumer route
                          from("kafka:test?brokers=localhost:9092")
                              .process(new org.apache.camel.Processor() {
                                  public void process(org.apache.camel.Exchange exchange)
                                  throws Exception {
      
                                      Message in = exchange.getIn();
                                      String name = in.getHeader("name", String.class);
                                      Integer contact = in.getHeader("contact", Integer.class);
                                      Boolean trueFalse = in.getHeader("trueFalse", Boolean.class);
                                      System.out.println("Name:" + name);
                                      System.out.println("contact:" + contact);
                                      System.out.println("trueFalse:" + trueFalse);
      
                                  }
                              });
                      }
                  });
                  context.start();
                  Thread.sleep(20000000);
              } finally {
                  context.stop();
              }
          }
      }
      

      Expected output:

      Name: ApacheCamelkafka

      contact: 123123

      trueFalse: false

      Actual output:

      Name: ApacheCamelkafka

      contact: null

      trueFalse: null
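
      Since the consumed headers arrive as byte[], one possible workaround is to decode them by hand. The sketch below is only an illustration under stated assumptions: it assumes the Integer header was written as a 4-byte big-endian value and the Boolean header as its text form ("true"/"false"). The class HeaderBytes and its methods are hypothetical helpers, not part of Camel or Kafka, and the actual wire encoding should be checked against the producer side.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not a Camel or Kafka API.
class HeaderBytes {

    // Assumption: the integer was serialized as 4 bytes, big-endian.
    static Integer toInteger(byte[] raw) {
        if (raw == null || raw.length != Integer.BYTES) {
            return null; // not in the assumed encoding
        }
        return ByteBuffer.wrap(raw).getInt();
    }

    // Assumption: the boolean was serialized as the text "true" or "false".
    static Boolean toBoolean(byte[] raw) {
        if (raw == null) {
            return null;
        }
        return Boolean.valueOf(new String(raw, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Simulate the raw header bytes under the assumed encodings.
        byte[] contact = ByteBuffer.allocate(Integer.BYTES).putInt(123123).array();
        byte[] trueFalse = "false".getBytes(StandardCharsets.UTF_8);

        System.out.println("contact: " + toInteger(contact));
        System.out.println("trueFalse: " + toBoolean(trueFalse));
    }
}
```

      In the consumer processor above, the raw value could be fetched with in.getHeader("contact", byte[].class) and passed to HeaderBytes.toInteger, instead of requesting Integer.class directly.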

       


          People

            Assignee: Unassigned
            Reporter: Abdulhamid (abdulhamid)
            Votes: 0
            Watchers: 2
