KAFKA-3746: InvalidReceiveException when connecting to broker over SSL


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 0.9.0.1
    • Fix Version/s: None
    • Component/s: None
    • Labels: None
    • Environment: 3-node cluster on localhost

    Description

      When trying to do KafkaConsumer.poll(), the server closes the connection with InvalidReceiveException. Strangely, this is reproduced only with SSL enabled between consumer and broker. We do not use SSL for inter-broker communication.

      Consumer configuration:

      [2016-05-23T15:07:14.806Z] [] [kafka-thread] [ConsumerConfig] [] [] [] [INFO]: ConsumerConfig values: 
      	metric.reporters = []
      	metadata.max.age.ms = 300000
      	value.deserializer = class com.confyrm.eps.disp.kafka.SignalDeserializer
      	group.id = sds
      	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
      	reconnect.backoff.ms = 50
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	max.partition.fetch.bytes = 1048576
      	bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
      	retry.backoff.ms = 100
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	ssl.keystore.type = JKS
      	ssl.trustmanager.algorithm = PKIX
      	enable.auto.commit = false
      	ssl.key.password = [hidden]
      	fetch.max.wait.ms = 500
      	sasl.kerberos.min.time.before.relogin = 60000
      	connections.max.idle.ms = 540000
      	ssl.truststore.password = [hidden]
      	session.timeout.ms = 30000
      	metrics.num.samples = 2
      	client.id = 
      	ssl.endpoint.identification.algorithm = null
      	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
      	ssl.protocol = TLS
      	check.crcs = true
      	request.timeout.ms = 40000
      	ssl.provider = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.keystore.location = src/main/resources/ssl/kafka.client.keystore.jks
      	heartbeat.interval.ms = 3000
      	auto.commit.interval.ms = 1000
      	receive.buffer.bytes = 32768
      	ssl.cipher.suites = null
      	ssl.truststore.type = JKS
      	security.protocol = SSL
      	ssl.truststore.location = src/main/resources/ssl/kafka.client.truststore.jks
      	ssl.keystore.password = [hidden]
      	ssl.keymanager.algorithm = SunX509
      	metrics.sample.window.ms = 30000
      	fetch.min.bytes = 1
      	send.buffer.bytes = 131072
      	auto.offset.reset = earliest
      

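      For reference, a minimal Java sketch of a consumer equivalent to this configuration (hypothetical: trimmed to the settings relevant to the failure, with placeholder passwords and topic name, and a byte-array deserializer standing in for the custom SignalDeserializer so the sketch is self-contained):

        import java.util.Collections;
        import java.util.Properties;

        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class SslConsumerRepro {
            public static void main(String[] args) {
                Properties props = new Properties();
                // Bootstrap list from the dump above -- note these are the PLAINTEXT ports.
                props.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9094,127.0.0.1:9096");
                props.put("group.id", "sds");
                props.put("enable.auto.commit", "false");
                props.put("auto.offset.reset", "earliest");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                // Stand-in for com.confyrm.eps.disp.kafka.SignalDeserializer from the report.
                props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                // SSL settings from the dump; passwords are placeholders ([hidden] in the report).
                props.put("security.protocol", "SSL");
                props.put("ssl.truststore.location", "src/main/resources/ssl/kafka.client.truststore.jks");
                props.put("ssl.truststore.password", "changeit");
                props.put("ssl.keystore.location", "src/main/resources/ssl/kafka.client.keystore.jks");
                props.put("ssl.keystore.password", "changeit");
                props.put("ssl.key.password", "changeit");

                try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("signals")); // topic name is hypothetical
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000); // fails as described above
                    System.out.println("polled " + records.count() + " records");
                }
            }
        }
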
      Server configuration:

      [2016-05-23 15:04:51,707] INFO KafkaConfig values:
              advertised.host.name = null
              metric.reporters = []
              quota.producer.default = 9223372036854775807
              offsets.topic.num.partitions = 50
              log.flush.interval.messages = 9223372036854775807
              auto.create.topics.enable = true
              controller.socket.timeout.ms = 30000
              log.flush.interval.ms = null
              principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
              replica.socket.receive.buffer.bytes = 65536
              min.insync.replicas = 2
              replica.fetch.wait.max.ms = 500
              num.recovery.threads.per.data.dir = 1
              ssl.keystore.type = JKS
              default.replication.factor = 3
              ssl.truststore.password = [hidden]
              log.preallocate = false
              sasl.kerberos.principal.to.local.rules = [DEFAULT]
              fetch.purgatory.purge.interval.requests = 1000
              ssl.endpoint.identification.algorithm = null
              replica.socket.timeout.ms = 30000
              message.max.bytes = 1000012
              num.io.threads = 10
              offsets.commit.required.acks = -1
              log.flush.offset.checkpoint.interval.ms = 60000
              delete.topic.enable = true
              quota.window.size.seconds = 1
              ssl.truststore.type = JKS
              offsets.commit.timeout.ms = 5000
              quota.window.num = 11
              zookeeper.connect = 127.0.0.1:2181
              authorizer.class.name =
              num.replica.fetchers = 1
              log.retention.ms = null
              log.roll.jitter.hours = 0
              log.cleaner.enable = true
              offsets.load.buffer.size = 5242880
              log.cleaner.delete.retention.ms = 86400000
              ssl.client.auth = none
              controlled.shutdown.max.retries = 3
              queued.max.requests = 500
              offsets.topic.replication.factor = 3
              log.cleaner.threads = 1
              sasl.kerberos.service.name = null
              sasl.kerberos.ticket.renew.jitter = 0.05
              socket.request.max.bytes = 104857600
              ssl.trustmanager.algorithm = PKIX
              zookeeper.session.timeout.ms = 6000
              log.retention.bytes = -1
              sasl.kerberos.min.time.before.relogin = 60000
              zookeeper.set.acl = false
              connections.max.idle.ms = 600000
              offsets.retention.minutes = 1440
              replica.fetch.backoff.ms = 1000
              inter.broker.protocol.version = 0.9.0.X
              log.retention.hours = 24
              num.partitions = 1
              broker.id.generation.enable = true
              listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
              ssl.provider = null
              ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
              log.roll.ms = null
              log.flush.scheduler.interval.ms = 9223372036854775807
              ssl.cipher.suites = null
              log.index.size.max.bytes = 10485760
              ssl.keymanager.algorithm = SunX509
              security.inter.broker.protocol = PLAINTEXT
              replica.fetch.max.bytes = 1048576
              advertised.port = null
              log.cleaner.dedupe.buffer.size = 134217728
              replica.high.watermark.checkpoint.interval.ms = 5000
              log.cleaner.io.buffer.size = 524288
              sasl.kerberos.ticket.renew.window.factor = 0.8
              zookeeper.connection.timeout.ms = 60000
              controlled.shutdown.retry.backoff.ms = 5000
              log.roll.hours = 168
              log.cleanup.policy = delete
              host.name =
              log.roll.jitter.ms = null
              max.connections.per.ip = 2147483647
              offsets.topic.segment.bytes = 104857600
              background.threads = 10
              quota.consumer.default = 9223372036854775807
              request.timeout.ms = 30000
              log.index.interval.bytes = 4096
              log.dir = /tmp/kafka-logs
              log.segment.bytes = 1073741824
              log.cleaner.backoff.ms = 15000
              offset.metadata.max.bytes = 4096
              ssl.truststore.location = /ssl/server.truststore.jks
              group.max.session.timeout.ms = 30000
              ssl.keystore.password = [hidden]
              zookeeper.sync.time.ms = 2000
              port = 9092
              log.retention.minutes = null
              log.segment.delete.delay.ms = 60000
              log.dirs = /data
              controlled.shutdown.enable = true
              compression.type = producer
              max.connections.per.ip.overrides =
              sasl.kerberos.kinit.cmd = /usr/bin/kinit
              log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
              auto.leader.rebalance.enable = true
              leader.imbalance.check.interval.seconds = 300
              log.cleaner.min.cleanable.ratio = 0.5
              replica.lag.time.max.ms = 10000
              num.network.threads = 5
              ssl.key.password = [hidden]
              reserved.broker.max.id = 1000
              metrics.num.samples = 2
              socket.send.buffer.bytes = 102400
              ssl.protocol = TLS
              socket.receive.buffer.bytes = 102400
              ssl.keystore.location = /ssl/server.keystore.jks
              replica.fetch.min.bytes = 1
              unclean.leader.election.enable = true
              group.min.session.timeout.ms = 6000
              log.cleaner.io.buffer.load.factor = 0.9
              offsets.retention.check.interval.ms = 600000
              producer.purgatory.purge.interval.requests = 1000
              metrics.sample.window.ms = 30000
              broker.id = 1
              offsets.topic.compression.codec = 0
              log.retention.check.interval.ms = 300000
              advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
              leader.imbalance.per.broker.percentage = 10
       (kafka.server.KafkaConfig)
      

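      Note the listener layout above: broker 1 serves PLAINTEXT on 9092 and SSL on 9093, while the consumer points security.protocol = SSL at 9092/9094/9096, so the TLS handshake lands on the plaintext ports. Assuming brokers 2 and 3 pair their ports the same way (only broker 1's config is shown), the SSL listeners would be 9093/9095/9097 and the fix is a one-line change to the sketch above:

        // Hypothetical fix: point the SSL consumer at the SSL listener ports.
        // 9095/9097 assume the port pairing of broker 1 holds for brokers 2 and 3.
        props.put("bootstrap.servers", "127.0.0.1:9093,127.0.0.1:9095,127.0.0.1:9097");
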
      Client:

      java.io.IOException: Broken pipe
      	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
      	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
      	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
      	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
      	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
      	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194)
      	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:161)
      	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:45)
      	at org.apache.kafka.common.network.Selector.close(Selector.java:442)
      	at org.apache.kafka.common.network.Selector.poll(Selector.java:310)
      	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
      

      Server:

      [2016-05-23 15:07:16,427] WARN Unexpected error from /127.0.0.1; closing connection (org.apache.kafka.common.network.Selector)
      org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296128 larger than 104857600)
              at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
              at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
              at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
              at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
              at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
              at kafka.network.Processor.run(SocketServer.scala:413)
              at java.lang.Thread.run(Thread.java:745)
      

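      The rejected size decodes the whole failure: 369296128 is 0x16030300, i.e. the first four bytes of a TLS handshake record (content type 22, record version 3.3 = TLSv1.2) read by the plaintext listener as a Kafka request length prefix and rejected against socket.request.max.bytes = 104857600. The broker then closes the socket mid-handshake, which surfaces on the client as the Broken pipe above. This matches the "Not A Problem" resolution: the SSL handshake was sent to a PLAINTEXT port. A small sketch of the arithmetic (class name is hypothetical):

        // Decode the "invalid" receive size from the server log: the plaintext
        // listener read the first four bytes of a TLS ClientHello as a length prefix.
        public class DecodeInvalidSize {
            public static void main(String[] args) {
                int size = 369296128;                    // from the log above
                System.out.printf("0x%08X%n", size);     // prints 0x16030300
                int contentType = (size >>> 24) & 0xFF;  // 0x16 = 22 -> TLS handshake record
                int major = (size >>> 16) & 0xFF;        // 0x03
                int minor = (size >>> 8) & 0xFF;         // 0x03 -> record version 3.3 (TLSv1.2)
                System.out.printf("type=%d version=%d.%d%n", contentType, major, minor);
            }
        }
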
          People

            Assignee: Unassigned
            Reporter: Sergey Alaev (salaev)
            Votes: 0
            Watchers: 2
