Description
the KafkaAppender can't deserialize objects when the SerializedLayout is used:
java.io.StreamCorruptedException: invalid stream header: 7372003E
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at com.example.inceptiontest.TestSerializer.deserializeLogEvent(TestSerializer.java:35)
at com.example.inceptiontest.TestSerializer.serialize(TestSerializer.java:18)
at com.example.inceptiontest.TestSerializer.serialize(TestSerializer.java:10)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:326)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.send(KafkaManager.java:80)
at org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender.append(KafkaAppender.java:71)
at org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:148)
at org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:121)
at org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:112)
at org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:80)
at org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:390)
at org.apache.logging.log4j.core.config.LoggerConfig.processLogEvent(LoggerConfig.java:378)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:362)
at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:352)
at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:59)
at org.apache.logging.log4j.core.Logger.logMessage(Logger.java:138)
at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:1016)
at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:993)
at org.apache.logging.log4j.spi.AbstractLogger.error(AbstractLogger.java:397)
at com.example.inceptiontest.InceptionTest.main(InceptionTest.java:14)
Looking through past issues, this seems related to a similar issue with SocketAppender at https://issues.apache.org/jira/browse/LOG4J2-181. My understanding in the previous case was that the headers were not being set by the manager, and it seems to be the same case here.
This is my xml configuration:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d
[%t] %-5level %logger
{36} - %msg%n" />
</Console>
<Kafka name="Kafka" topic="test">
<SerializedLayout/>
<Property name="bootstrap.servers">localhost:9092</Property>
<Property name="key.serializer">com.example.inceptiontest.TestSerializer</Property>
<Property name="value.serializer">com.example.inceptiontest.TestSerializer</Property>
</Kafka>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console" />
<AppenderRef ref="Kafka"/>
</Root>
<Logger name="org.apache.kafka" level="INFO" /> <!-- avoid recursive logging -->
</Loggers>
</Configuration>
Attachments
Issue Links
- relates to
-
LOG4J2-181 SocketAppender reconnection doesn't work
- Closed