Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 0.9.0.1
Fix Version/s: None
Component/s: None
Labels: None
Environment: 3-node cluster on localhost
Description
When calling KafkaConsumer.poll(), the server closes the connection with InvalidReceiveException. Strangely, this is reproduced only with SSL enabled between consumer and broker. We do not use SSL for inter-broker communication.
Consumer configuration:
[2016-05-23T15:07:14.806Z] [] [kafka-thread] [ConsumerConfig] [] [] [] [INFO]: ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    value.deserializer = class com.confyrm.eps.disp.kafka.SignalDeserializer
    group.id = sds
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    enable.auto.commit = false
    ssl.key.password = [hidden]
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = [hidden]
    session.timeout.ms = 30000
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    check.crcs = true
    request.timeout.ms = 40000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = src/main/resources/ssl/kafka.client.keystore.jks
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = SSL
    ssl.truststore.location = src/main/resources/ssl/kafka.client.truststore.jks
    ssl.keystore.password = [hidden]
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    auto.offset.reset = earliest
Server configuration:
[2016-05-23 15:04:51,707] INFO KafkaConfig values:
    advertised.host.name = null
    metric.reporters = []
    quota.producer.default = 9223372036854775807
    offsets.topic.num.partitions = 50
    log.flush.interval.messages = 9223372036854775807
    auto.create.topics.enable = true
    controller.socket.timeout.ms = 30000
    log.flush.interval.ms = null
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    replica.socket.receive.buffer.bytes = 65536
    min.insync.replicas = 2
    replica.fetch.wait.max.ms = 500
    num.recovery.threads.per.data.dir = 1
    ssl.keystore.type = JKS
    default.replication.factor = 3
    ssl.truststore.password = [hidden]
    log.preallocate = false
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    fetch.purgatory.purge.interval.requests = 1000
    ssl.endpoint.identification.algorithm = null
    replica.socket.timeout.ms = 30000
    message.max.bytes = 1000012
    num.io.threads = 10
    offsets.commit.required.acks = -1
    log.flush.offset.checkpoint.interval.ms = 60000
    delete.topic.enable = true
    quota.window.size.seconds = 1
    ssl.truststore.type = JKS
    offsets.commit.timeout.ms = 5000
    quota.window.num = 11
    zookeeper.connect = 127.0.0.1:2181
    authorizer.class.name =
    num.replica.fetchers = 1
    log.retention.ms = null
    log.roll.jitter.hours = 0
    log.cleaner.enable = true
    offsets.load.buffer.size = 5242880
    log.cleaner.delete.retention.ms = 86400000
    ssl.client.auth = none
    controlled.shutdown.max.retries = 3
    queued.max.requests = 500
    offsets.topic.replication.factor = 3
    log.cleaner.threads = 1
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    socket.request.max.bytes = 104857600
    ssl.trustmanager.algorithm = PKIX
    zookeeper.session.timeout.ms = 6000
    log.retention.bytes = -1
    sasl.kerberos.min.time.before.relogin = 60000
    zookeeper.set.acl = false
    connections.max.idle.ms = 600000
    offsets.retention.minutes = 1440
    replica.fetch.backoff.ms = 1000
    inter.broker.protocol.version = 0.9.0.X
    log.retention.hours = 24
    num.partitions = 1
    broker.id.generation.enable = true
    listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    log.roll.ms = null
    log.flush.scheduler.interval.ms = 9223372036854775807
    ssl.cipher.suites = null
    log.index.size.max.bytes = 10485760
    ssl.keymanager.algorithm = SunX509
    security.inter.broker.protocol = PLAINTEXT
    replica.fetch.max.bytes = 1048576
    advertised.port = null
    log.cleaner.dedupe.buffer.size = 134217728
    replica.high.watermark.checkpoint.interval.ms = 5000
    log.cleaner.io.buffer.size = 524288
    sasl.kerberos.ticket.renew.window.factor = 0.8
    zookeeper.connection.timeout.ms = 60000
    controlled.shutdown.retry.backoff.ms = 5000
    log.roll.hours = 168
    log.cleanup.policy = delete
    host.name =
    log.roll.jitter.ms = null
    max.connections.per.ip = 2147483647
    offsets.topic.segment.bytes = 104857600
    background.threads = 10
    quota.consumer.default = 9223372036854775807
    request.timeout.ms = 30000
    log.index.interval.bytes = 4096
    log.dir = /tmp/kafka-logs
    log.segment.bytes = 1073741824
    log.cleaner.backoff.ms = 15000
    offset.metadata.max.bytes = 4096
    ssl.truststore.location = /ssl/server.truststore.jks
    group.max.session.timeout.ms = 30000
    ssl.keystore.password = [hidden]
    zookeeper.sync.time.ms = 2000
    port = 9092
    log.retention.minutes = null
    log.segment.delete.delay.ms = 60000
    log.dirs = /data
    controlled.shutdown.enable = true
    compression.type = producer
    max.connections.per.ip.overrides =
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    auto.leader.rebalance.enable = true
    leader.imbalance.check.interval.seconds = 300
    log.cleaner.min.cleanable.ratio = 0.5
    replica.lag.time.max.ms = 10000
    num.network.threads = 5
    ssl.key.password = [hidden]
    reserved.broker.max.id = 1000
    metrics.num.samples = 2
    socket.send.buffer.bytes = 102400
    ssl.protocol = TLS
    socket.receive.buffer.bytes = 102400
    ssl.keystore.location = /ssl/server.keystore.jks
    replica.fetch.min.bytes = 1
    unclean.leader.election.enable = true
    group.min.session.timeout.ms = 6000
    log.cleaner.io.buffer.load.factor = 0.9
    offsets.retention.check.interval.ms = 600000
    producer.purgatory.purge.interval.requests = 1000
    metrics.sample.window.ms = 30000
    broker.id = 1
    offsets.topic.compression.codec = 0
    log.retention.check.interval.ms = 300000
    advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
    leader.imbalance.per.broker.percentage = 10
 (kafka.server.KafkaConfig)
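One thing that may be worth cross-checking in the two configuration dumps is whether each port in the consumer's bootstrap.servers belongs to a listener whose protocol matches the consumer's security.protocol. The sketch below is only an illustration under stated assumptions: `parse_listeners` and `check_bootstrap` are hypothetical helper names, not Kafka APIs, and the only listener layout used is the one from the broker shown here (broker.id = 1). The listeners of the other two brokers are not part of this report, so ports 9094 and 9096 cannot be judged from it.

```python
def parse_listeners(listeners):
    """Map port -> protocol for a Kafka 'listeners' value (hypothetical helper)."""
    by_port = {}
    for entry in listeners.split(","):
        protocol, _, address = entry.strip().partition("://")
        port = int(address.rsplit(":", 1)[1])
        by_port[port] = protocol
    return by_port


def check_bootstrap(bootstrap_servers, client_protocol, listeners):
    """Return (server, broker_protocol) pairs where the protocols disagree.

    Ports that this broker does not listen on are skipped, because nothing
    is known about them from this broker's configuration alone.
    """
    by_port = parse_listeners(listeners)
    findings = []
    for server in bootstrap_servers:
        port = int(server.rsplit(":", 1)[1])
        broker_protocol = by_port.get(port)
        if broker_protocol is not None and broker_protocol != client_protocol:
            findings.append((server, broker_protocol))
    return findings


# Values copied from the consumer and broker dumps in this report.
findings = check_bootstrap(
    ["127.0.0.1:9092", "127.0.0.1:9094", "127.0.0.1:9096"],
    "SSL",
    "PLAINTEXT://localhost:9092,SSL://localhost:9093",
)
for server, broker_protocol in findings:
    print(f"{server}: broker listener is {broker_protocol}, client uses SSL")
```

With the values from this report, the check flags 127.0.0.1:9092, which the broker exposes as PLAINTEXT while the consumer is configured with security.protocol = SSL. That is a possible reading of the configuration, not a confirmed diagnosis.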
Client:
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:161)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:45)
    at org.apache.kafka.common.network.Selector.close(Selector.java:442)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:310)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
Server:
[2016-05-23 15:07:16,427] WARN Unexpected error from /127.0.0.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296128 larger than 104857600)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
    at kafka.network.Processor.run(SocketServer.scala:413)
    at java.lang.Thread.run(Thread.java:745)
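The rejected size in the server log can be examined byte by byte. Kafka requests start with a 4-byte big-endian size, so the broker interprets the first four bytes on the connection as that number. The sketch below converts the reported value back into those bytes and tests whether they resemble the start of a TLS record header (content type 0x16 for handshake, followed by a 0x03 major version). `describe_size_field` is a hypothetical helper name, and reading the result as "TLS bytes arriving on a non-TLS listener" is an assumption that the report itself does not state.

```python
import struct


def describe_size_field(size):
    """Return the 4 bytes behind a Kafka size prefix and a TLS-likeness flag."""
    raw = struct.pack(">i", size)  # 4-byte big-endian signed int, as on the wire
    content_type, major, minor = raw[0], raw[1], raw[2]
    # TLS record header: type 0x16 = handshake, then version 0x03 0x00..0x04.
    looks_like_tls = content_type == 0x16 and major == 0x03 and minor <= 0x04
    return raw.hex(), looks_like_tls


# Size taken from the InvalidReceiveException in the server log.
hex_bytes, looks_like_tls = describe_size_field(369296128)
print(hex_bytes, looks_like_tls)  # prints: 16030300 True
```

The bytes 16 03 03 match a TLS handshake record with record version TLS 1.2, and the trailing 00 would be the high byte of the record length. This is consistent with a client speaking SSL to a port that expects plaintext Kafka requests, though other causes are not ruled out by this check alone.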