Camel / CAMEL-14233

camel-kafka - topic overriding not possible when using aggregation

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.24.2
    • Fix Version/s: 3.1.0
    • Component/s: camel-kafka
    • Labels:
      None
    • Estimated Complexity:
      Unknown

      Description

      When exchanges are aggregated, for example with GroupedExchangeAggregationStrategy:

       

      from(..)
      .process(/* a processor that sets the KafkaConstants.TOPIC header */)
      .aggregate(new GroupedExchangeAggregationStrategy())
      .to("kafka:...")
      

      it is not possible to override the topic per exchange with the KafkaConstants.TOPIC header. In the createRecorder method of the KafkaProducer class, the topic is read from the header of the aggregated Exchange, where it may be absent because it was set only on the Exchanges that were aggregated into it. When ProducerRecords are created from an Iterable body, the topic should instead be taken from the header of each Exchange separately:

      protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws Exception {
          String topic = endpoint.getConfiguration().getTopic();
      
          if (!endpoint.getConfiguration().isBridgeEndpoint()) {
              String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
              boolean allowHeader = true;
      
              // when we do not bridge then detect if we try to send back to ourselves
              // which we most likely do not want to do
              if (headerTopic != null && endpoint.getConfiguration().isCircularTopicDetection()) {
                  Endpoint from = exchange.getFromEndpoint();
                  if (from instanceof KafkaEndpoint) {
                      String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic();
                      allowHeader = !headerTopic.equals(fromTopic);
                      if (!allowHeader) {
                          log.debug("Circular topic detected from message header."
                                  + " Cannot send to same topic as the message comes from: {}"
                                  + ". Will use endpoint configured topic: {}", from, topic);
                      }
                  }
              }
              if (allowHeader && headerTopic != null) {
                  topic = headerTopic;
              }
          }
      
          if (topic == null) {
              // if topic property was not received from configuration or header parameters take it from the remaining URI
              topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true);
          }
      
      ...
      
          Object msg = exchange.getIn().getBody();
      
          // is the message body a list or something that contains multiple values
          Iterator<Object> iterator = null;
          if (msg instanceof Iterable) {
              iterator = ((Iterable<Object>) msg).iterator();
          } else if (msg instanceof Iterator) {
              iterator = (Iterator<Object>) msg;
          }
          if (iterator != null) {
              final Iterator<Object> msgList = iterator;
      

      The msgTopic variable below should instead be set from the KafkaConstants.TOPIC header of the next Exchange in the collection:

              final String msgTopic = topic;
      
              return new Iterator<ProducerRecord>() {
                  @Override
                  public boolean hasNext() {
                      return msgList.hasNext();
                  }
      
                  @Override
                  public ProducerRecord next() {
      
                      // must convert each entry of the iterator into the value according to the serializer
                      Object next = msgList.next();
                      Object value = tryConvertToSerializedType(exchange, next, endpoint.getConfiguration().getSerializerClass());
      
                      if (hasPartitionKey && hasMessageKey) {
                          return new ProducerRecord(msgTopic, partitionKey, null, key, value, propagatedHeaders);
                      } else if (hasMessageKey) {
                          return new ProducerRecord(msgTopic, null, null, key, value, propagatedHeaders);
                      } else {
                          return new ProducerRecord(msgTopic, null, null, null, value, propagatedHeaders);
                      }
                  }
      
                  @Override
                  public void remove() {
                      msgList.remove();
                  }
              };
          }
      
      ...
      }
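
      The requested behaviour can be sketched in plain Java, without Camel or Kafka on the classpath. This is only an illustration of the idea, not the patch that was applied: Item and Record are hypothetical stand-ins for Exchange and ProducerRecord, TOPIC_HEADER stands in for KafkaConstants.TOPIC (its value here is an assumption), and the bridge-endpoint and circular-topic checks from createRecorder are left out. The point is that the topic is resolved inside next() for each element, falling back to the topic already chosen for the aggregated exchange.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class PerMessageTopicSketch {

    // Stand-in for KafkaConstants.TOPIC; the literal value is an assumption.
    static final String TOPIC_HEADER = "kafka.TOPIC";

    // Hypothetical stand-in for a Camel Exchange: headers plus a body.
    static final class Item {
        final Map<String, Object> headers = new HashMap<>();
        final Object body;

        Item(Object body) {
            this.body = body;
        }

        Item header(String name, Object value) {
            headers.put(name, value);
            return this;
        }
    }

    // Hypothetical stand-in for Kafka's ProducerRecord: topic and value only.
    static final class Record {
        final String topic;
        final Object value;

        Record(String topic, Object value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // An element's own topic header wins; otherwise use the fallback topic.
    static String resolveTopic(Object element, String fallbackTopic) {
        if (element instanceof Item) {
            Object headerTopic = ((Item) element).headers.get(TOPIC_HEADER);
            if (headerTopic instanceof String) {
                return (String) headerTopic;
            }
        }
        return fallbackTopic;
    }

    // Lazily converts each element into a record, choosing the topic per element.
    static Iterator<Record> createRecords(Iterable<?> elements, final String fallbackTopic) {
        final Iterator<?> source = elements.iterator();
        return new Iterator<Record>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public Record next() {
                Object next = source.next();
                String topic = resolveTopic(next, fallbackTopic);
                Object value = (next instanceof Item) ? ((Item) next).body : next;
                return new Record(topic, value);
            }
        };
    }

    public static void main(String[] args) {
        List<Object> batch = new ArrayList<>();
        batch.add(new Item("a").header(TOPIC_HEADER, "orders")); // overrides the topic
        batch.add(new Item("b"));                                // no header: fallback
        batch.add("c");                                          // plain body: fallback

        Iterator<Record> records = createRecords(batch, "default-topic");
        while (records.hasNext()) {
            Record record = records.next();
            System.out.println(record.topic + " <- " + record.value);
        }
    }
}
```

      In the real producer the same per-element lookup would sit where msgTopic is currently captured once, so that elements without the header keep today's behaviour.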
      
      

       

        Attachments

        1. kafka2.patch
          8 kB
          Rafał Gała
        2. metadata.patch
          15 kB
          Rafał Gała

              People

              • Assignee:
                Omar Al-Safi (omarsmak)
              • Reporter:
                Rafał Gała (rgala)
              • Votes:
                0
              • Watchers:
                4

                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 20m