  1. Log4j 2
  2. LOG4J2-2678

Add LogEvent timestamp to ProducerRecord in KafkaAppender

Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.0.0
    • Component/s: Appenders
    • Labels: None

    Description

      As I mentioned on the mailing list, I'm creating this issue to propose an option to send the LogEvent time as the record timestamp when using the Log4j KafkaAppender.

      As far as I can see in the source code, the message is handed to KafkaManager from tryAppend:

      private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
          final Layout<? extends Serializable> layout = getLayout();
          byte[] data;
          if (layout instanceof SerializedLayout) {
              final byte[] header = layout.getHeader();
              final byte[] body = layout.toByteArray(event);
              data = new byte[header.length + body.length];
              System.arraycopy(header, 0, data, 0, header.length);
              System.arraycopy(body, 0, data, header.length, body.length);
          } else {
              data = layout.toByteArray(event);
          }
          manager.send(data); // proposed: manager.send(data, event.getTimeMillis())
      }
      

      manager.send() is implemented as follows; note the line where the ProducerRecord is created:

      public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
          if (producer != null) {
              byte[] newKey = null;

              if (key != null && key.contains("${")) {
                  newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
              } else if (key != null) {
                  newKey = key.getBytes(StandardCharsets.UTF_8);
              }

              final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
              if (syncSend) {
                  final Future<RecordMetadata> response = producer.send(newRecord);
                  response.get(timeoutMillis, TimeUnit.MILLISECONDS);
              } else {
                  producer.send(newRecord, new Callback() {
                      @Override
                      public void onCompletion(final RecordMetadata metadata, final Exception e) {
                          if (e != null) {
                              LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                          }
                      }
                  });
              }
          }
      }
      

      ProducerRecord also has constructors that take additional parameters; in particular, I'm looking at this one:

      public ProducerRecord(java.lang.String topic,
                            java.lang.Integer partition,
                            java.lang.Long timestamp,
                            K key,
                            V value)
      

      which would allow us to set the timestamp to LogEvent#getTimeMillis(), but would also require us to pass the partition the record should be sent to. However, the partitioning logic in KafkaProducer is such that, if partition is null, the configured partitioner is used (DefaultPartitioner, or the one set by the 'partitioner.class' property), so we could simply pass null.
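
      To make the idea concrete, here is a minimal sketch of building a record that carries the event time while leaving the partition as null. SketchRecord is a hypothetical stand-in that only mirrors the argument order of the five-argument ProducerRecord constructor above, so the example stays free of the Kafka dependency; the class and method names are my own assumptions, not part of Log4j or Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in mirroring the argument order of
// ProducerRecord(topic, partition, timestamp, key, value).
// It is NOT the Kafka class; it only keeps this sketch dependency-free.
final class SketchRecord<K, V> {
    final String topic;
    final Integer partition; // null: leave the choice to the configured partitioner
    final Long timestamp;    // null: leave the timestamp to the producer
    final K key;
    final V value;

    SketchRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

final class TimestampedRecordSketch {
    // Builds a record that carries the event time but does not pin a partition.
    static SketchRecord<byte[], byte[]> build(String topic, String key, byte[] msg, long eventTimeMillis) {
        byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
        return new SketchRecord<>(topic, null, Long.valueOf(eventTimeMillis), keyBytes, msg);
    }
}
```

      In the real code, the same argument order would presumably be passed to new ProducerRecord<>(topic, null, timestamp, newKey, msg) inside send().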

      In terms of interface, we could add a single flag in the KafkaAppender definition, something like:

      <Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
      

      If the 'timestamp' flag is false, then the record would be sent with the timestamp parameter of the ProducerRecord as null, leaving the behaviour as it is right now.
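
      The flag handling could be as small as the sketch below. The names sendEventTimestamp and resolveTimestamp are hypothetical and chosen only for illustration; the point is that a null timestamp leaves the record to be stamped by the producer, as it is today.

```java
final class TimestampFlagSketch {
    // Returns the event time when the (hypothetical) flag is enabled,
    // or null so that the producer assigns the timestamp as it does today.
    static Long resolveTimestamp(boolean sendEventTimestamp, long eventTimeMillis) {
        return sendEventTimestamp ? Long.valueOf(eventTimeMillis) : null;
    }
}
```

      The result would then be passed as the timestamp argument of the ProducerRecord constructor, so timestamp="false" and an absent attribute behave the same way.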

            People

              Assignee: Unassigned
              Reporter: Federico D'Ambrosio (fedexist)
              Votes: 0
              Watchers: 5

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 2h 50m