Description
As I mentioned on the mailing list, I'm creating this issue to add the option of sending the LogEvent time as the record timestamp when using the log4j KafkaAppender.
As far as I can see in the source code, the message is handed to KafkaManager in tryAppend:
    private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
        final Layout<? extends Serializable> layout = getLayout();
        byte[] data;
        if (layout instanceof SerializedLayout) {
            final byte[] header = layout.getHeader();
            final byte[] body = layout.toByteArray(event);
            data = new byte[header.length + body.length];
            System.arraycopy(header, 0, data, 0, header.length);
            System.arraycopy(body, 0, data, header.length, body.length);
        } else {
            data = layout.toByteArray(event);
        }
        manager.send(data); // manager.send(data, event.getTimeMillis())
    }
manager.send() is implemented as follows; note where the ProducerRecord is created:
    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
        if (producer != null) {
            byte[] newKey = null;
            if (key != null && key.contains("${")) {
                newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
            } else if (key != null) {
                newKey = key.getBytes(StandardCharsets.UTF_8);
            }
            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
            if (syncSend) {
                final Future<RecordMetadata> response = producer.send(newRecord);
                response.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                producer.send(newRecord, new Callback() {
                    @Override
                    public void onCompletion(final RecordMetadata metadata, final Exception e) {
                        if (e != null) {
                            LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                        }
                    }
                });
            }
        }
    }
ProducerRecord has constructors that take additional parameters. In particular, I'm looking at this one:
public ProducerRecord(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value)
This constructor would allow us to set the timestamp to LogEvent#getTimeMillis(), but it would also force us to pass the partition the record should be sent to. However, the partitioning logic in KafkaProducer is such that, if partition is null, the configured partitioner is used (DefaultPartitioner or the one defined by the 'partitioner.class' property), so we could simply pass null.
In terms of interface, we could add a single flag in the KafkaAppender definition, something like:
<Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
If the 'timestamp' flag is false, then the record would be sent with the timestamp parameter of the ProducerRecord as null, leaving the behaviour as it is right now.
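To make the proposal concrete, here is a minimal sketch of how the flag could select the timestamp when the record is built. It is not the actual KafkaManager code: SketchRecord is a hypothetical stand-in that only mirrors the parameter order of the five-argument ProducerRecord constructor quoted above, so the example compiles without the Kafka client library. buildRecord and sendTimestamp are likewise assumed names.

```java
import java.nio.charset.StandardCharsets;

final class TimestampedSendSketch {

    // Hypothetical stand-in mirroring ProducerRecord(topic, partition, timestamp, key, value).
    static final class SketchRecord {
        final String topic;
        final Integer partition; // null: leave the choice to the configured partitioner
        final Long timestamp;    // null: no explicit timestamp, i.e. current behaviour
        final byte[] key;
        final byte[] value;

        SketchRecord(final String topic, final Integer partition, final Long timestamp,
                     final byte[] key, final byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    // Builds the record; sendTimestamp plays the role of the proposed 'timestamp' attribute.
    static SketchRecord buildRecord(final String topic, final String key, final byte[] msg,
                                    final long eventTimeMillis, final boolean sendTimestamp) {
        final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
        // Flag off: pass null so nothing changes compared to the three-argument constructor.
        final Long timestamp = sendTimestamp ? Long.valueOf(eventTimeMillis) : null;
        // Partition is always null so the partitioner keeps deciding, as described above.
        return new SketchRecord(topic, null, timestamp, keyBytes, msg);
    }
}
```

In the real change, the same decision would presumably sit in KafkaManager.send(), with event.getTimeMillis() passed in from tryAppend as hinted by the commented-out call above.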