Description
We embed the Kafka client in IBM App Connect Enterprise in order to provide Kafka consume and produce functionality. The product is similar to an application server in that it may host multiple workloads, including some that do not use the Kafka functionality.
When the Kafka server is installed in an OpenShift environment, we are seeing cases where the clients fail with OutOfMemoryErrors caused by single large (1.2 GB) byte buffers being allocated by the client.
From research, this appears to be a known issue when a plaintext client is configured to connect to a TLS-secured endpoint. In this instance, however, we see successful communication via TLS, and then, when the Kafka server is restarted (or connectivity is broken), both the consumers and producers can throw OutOfMemoryErrors with the following stacks:
Producer
------------
4XESTACKTRACE at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57(Compiled Code))
4XESTACKTRACE at java/nio/ByteBuffer.allocate(ByteBuffer.java:335(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:102(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543(Compiled Code))
4XESTACKTRACE at org/apache/kafka/common/network/Selector.poll(Selector.java:481(Compiled Code))
4XESTACKTRACE at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:571(Compiled Code))
4XESTACKTRACE at org/apache/kafka/clients/producer/internals/Sender.runOnce(Sender.java:328(Compiled Code))
4XESTACKTRACE at org/apache/kafka/clients/producer/internals/Sender.run(Sender.java:243(Compiled Code))
4XESTACKTRACE at java/lang/Thread.run(Thread.java:830)
Consumer
-------------
3XMTHREADINFO3 Java callstack:
4XESTACKTRACE at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57)
4XESTACKTRACE at java/nio/ByteBuffer.allocate(ByteBuffer.java:335)
4XESTACKTRACE at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30)
4XESTACKTRACE at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:113)
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475)
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572)
4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250)
4XESTACKTRACE at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181)
4XESTACKTRACE at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543)
4XESTACKTRACE at org/apache/kafka/common/network/Selector.poll(Selector.java:481)
4XESTACKTRACE at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:551)
4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/Fetcher.getTopicMetadata(Fetcher.java:374)
4XESTACKTRACE at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1949)
4XESTACKTRACE at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1917)
4XESTACKTRACE at com/ibm/broker/connector/kafka/KafkaIIBConsumer.initialise(KafkaIIBConsumer.java:177)
4XESTACKTRACE at com/ibm/broker/connector/kafka/KafkaIIBConsumer.start(KafkaIIBConsumer.java:512)
5XESTACKTRACE (entered lock: com/ibm/broker/connector/kafka/KafkaIIBConsumer@0x00000000C0A94038, entry count: 1)
4XESTACKTRACE at com/ibm/broker/connector/kafka/KafkaInputConnector.start(KafkaInputConnector.java:250)
We believe that what is happening is that when the Kafka server goes down, the route in the OpenShift (RHOS) environment remains available for a short period, and the SaslClientAuthenticator receives rogue packets whose leading bytes it interprets as the length of a response to read off the stream.
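For illustration, the minimal sketch below (our own code, not the actual Kafka client code) shows how a handful of rogue bytes can turn into a ~1.2 GB allocation. We have not captured the actual packets, but an HTTP-style response from a stale route would produce exactly the 1.2 GB figure we observe:

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RogueSizeSketch {
    public static void main(String[] args) {
        // The wire protocol prefixes each response with a 4-byte size. If the
        // bytes that arrive are not a genuine Kafka response, that "size" can
        // be an arbitrary large number. For example, the ASCII bytes "HTTP"
        // decode to 1,213,486,160 (~1.2 GB), matching the allocation we see.
        ByteBuffer sizeBuf = ByteBuffer.wrap("HTTP".getBytes(StandardCharsets.US_ASCII));
        int claimedSize = sizeBuf.getInt();            // 1213486160
        System.out.println("claimed response size = " + claimedSize);
        ByteBuffer payload = ByteBuffer.allocate(claimedSize); // OutOfMemoryError on most heaps
        System.out.println("allocated " + payload.capacity() + " bytes");
    }
}
{code}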
For the consumer code, since there is application code on the stack, we were able to implement a workaround by catching the OOM, but on the producer side the entire stack is Kafka client code.
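Roughly, the consumer-side workaround looks like the sketch below; the class and method names are illustrative rather than our actual KafkaIIBConsumer code:

{code:java}
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class ConsumerWorkaroundSketch {
    // Returns null on the rogue-size OOM so the caller can back off and retry
    // later, rather than letting the whole application server fail.
    static List<PartitionInfo> safePartitionsFor(KafkaConsumer<?, ?> consumer, String topic) {
        try {
            return consumer.partitionsFor(topic);
        } catch (OutOfMemoryError oom) {
            // The client interpreted rogue bytes as a huge response size during
            // (re)authentication; treat it as a transient connection failure.
            return null;
        }
    }
}
{code}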
I looked at the SaslClientAuthenticator code and can see that its use of the network receive buffer is unbounded, so I applied two patches to this code. The first limits the buffer size for authentication to 10 MB; the second catches the OOM and fails authentication instead.
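In outline, the two patches behave like the self-contained sketch below. This is simplified stand-in code rather than the actual SaslClientAuthenticator diff; the constant name and exception wording are ours:

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.apache.kafka.common.errors.SaslAuthenticationException;

public class BoundedAuthReceiveSketch {
    // Patch 1: cap the size accepted for a response while authenticating.
    private static final int MAX_AUTH_RECEIVE_BYTES = 10 * 1024 * 1024; // 10 MB

    static ByteBuffer receiveAuthResponse(ReadableByteChannel channel) throws IOException {
        ByteBuffer sizeBuf = ByteBuffer.allocate(4);
        while (sizeBuf.hasRemaining() && channel.read(sizeBuf) >= 0) { /* read size prefix */ }
        sizeBuf.flip();
        int size = sizeBuf.getInt();
        if (size < 0 || size > MAX_AUTH_RECEIVE_BYTES) {
            // Reject implausible sizes before allocating anything.
            throw new SaslAuthenticationException(
                    "SASL response size " + size + " exceeds limit of " + MAX_AUTH_RECEIVE_BYTES + " bytes");
        }
        ByteBuffer payload;
        try {
            payload = ByteBuffer.allocate(size);
        } catch (OutOfMemoryError oom) {
            // Patch 2: if the allocation still fails, fail this authentication
            // attempt instead of letting the error take down the whole JVM.
            throw new SaslAuthenticationException("Failed to allocate buffer for SASL response", oom);
        }
        while (payload.hasRemaining() && channel.read(payload) >= 0) { /* read payload */ }
        payload.flip();
        return payload;
    }
}
{code}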
Using the patched client, the customer has gone from being able to recreate this on at least one app server for every Kafka server restart to not being able to reproduce the issue at all.
I am happy to submit a PR, but I wanted to get feedback before doing so. For instance, is 10 MB a suitable maximum buffer size for authentication? Should the maximum perhaps be configurable instead, and if so, what is the best practice for providing this configuration?
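For reference, if the limit were exposed as a client configuration, I would imagine it being supplied in the same style as the existing client properties. The property name below is purely hypothetical and does not exist today; it is only meant to illustrate the shape of such a setting:

{code:java}
import java.util.Properties;

public class HypotheticalConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        // Hypothetical property (does not exist today) showing how a
        // configurable maximum auth receive size might be supplied.
        props.put("sasl.authentication.max.receive.size", String.valueOf(10 * 1024 * 1024));
    }
}
{code}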
Secondly, catching the OOM does not feel like best practice to me; however, without it the entire application fails because of the aggressive allocation of byte buffers in the SaslClientAuthenticator. Is there any alternative I should be considering?
I realise that this issue has been raised before and, in the case of a misconfiguration, it looks like it was not considered a bug. In this instance, however, at least in the customer's environment, the configuration is actually correct, and the error leaves the clients unable to tolerate a server failure.
I would appreciate any guidance you can give me on how to get a change committed to prevent this problem.