KAFKA-8195

Unstable Producer After Send Failure in Transaction

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.0.1, 2.2.0, 2.3.0
    • Fix Version/s: None
    • Component/s: clients
    • Labels: None

    Description

      This journey started with this Stack Overflow question.

      I easily reproduced his issue (see my comments on his question).

      My first step was to take Spring out of the picture and replicate the issue with the native Producer APIs. The following code shows the result; I have attached logs and stack traces.

      There are four methods in the test; the first performs 2 transactions, successfully, on the same Producer instance.

      The second aborts 2 transactions, successfully, on the same Producer instance; these are application-level failures after performing a send.

      There are two flavors of the problem:

      The third attempts to send 2 messages that are too large for the topic, on the same Producer. The first transaction aborts as expected; the second send gets a TimeoutException when attempting to get the send metadata, and the thread then hangs in abortTransaction(). See hang.same.producer.log, which also includes the stack trace showing the hang.

      The fourth test is similar to the third, but it closes the producer after the first failure; this time, we time out in initTransactions().

      Subsequent executions of this test get the timeout on the first attempt; that transactional.id seems to be unusable. Removing the logs was one way I found to get past the problem.
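
      For reference, the send failures in the third and fourth tests follow from the topic setting max.message.bytes = 10 used in the test code: each payload is already longer than 10 bytes before any record or batch framing is added. The following is a minimal sketch of that arithmetic using only the JDK. The class PayloadSizeCheck and its methods are illustrative names, not part of the test application, and the broker's real check applies to the whole record batch, so the size it sees is larger than the figure computed here.

```java
import java.nio.charset.StandardCharsets;

public class PayloadSizeCheck {

    // Topic-level limit configured by the third and fourth tests (max.message.bytes = 10).
    public static final int MAX_MESSAGE_BYTES = 10;

    // Size of the value in UTF-8, the default encoding of StringSerializer.
    public static int serializedSize(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length;
    }

    // True when the bare payload alone is already over the topic limit.
    public static boolean exceedsLimit(String value) {
        return serializedSize(value) > MAX_MESSAGE_BYTES;
    }

    public static void main(String[] args) {
        // Payloads taken from the third and fourth test methods.
        for (String payload : new String[] {"baaaaaaaaaaaaaaaaz", "quuuuuuuuuuuuuuux"}) {
            System.out.println(payload + ": " + serializedSize(payload)
                    + " bytes, exceeds limit: " + exceedsLimit(payload));
        }
    }
}
```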

      Test code

      	import java.util.Collections;
      	import java.util.HashMap;
      	import java.util.Map;
      	import java.util.concurrent.ExecutionException;
      	import java.util.concurrent.TimeUnit;

      	import org.apache.kafka.clients.admin.AdminClient;
      	import org.apache.kafka.clients.admin.NewTopic;
      	import org.apache.kafka.clients.producer.KafkaProducer;
      	import org.apache.kafka.clients.producer.Producer;
      	import org.apache.kafka.clients.producer.ProducerConfig;
      	import org.apache.kafka.clients.producer.ProducerRecord;
      	import org.apache.kafka.clients.producer.RecordMetadata;
      	import org.apache.kafka.common.config.TopicConfig;
      	import org.apache.kafka.common.errors.AuthorizationException;
      	import org.apache.kafka.common.errors.OutOfOrderSequenceException;
      	import org.apache.kafka.common.errors.ProducerFencedException;
      	import org.apache.kafka.common.errors.TopicExistsException;
      	import org.springframework.boot.ApplicationRunner;
      	import org.springframework.kafka.config.TopicBuilder;
      	import org.springframework.kafka.core.DefaultKafkaProducerFactory;

      	public ApplicationRunner runner(AdminClient client, DefaultKafkaProducerFactory<String, String> pf) {
      		return args -> {
      			try {
      				Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
      				int committed = testGoodTx(client, configs);
      				System.out.println("Successes (same producer) committed: " + committed);
      				int rolledBack = testAppFailureTx(client, configs);
      				System.out.println("App failures (same producer) rolled back: " + rolledBack);
      				
      				// first flavor - hung thread in abortTransaction()
      				// rolledBack = testSendFailureTxSameProducer(client, configs);
      				// System.out.println("Send failures (same producer) rolled back: " + rolledBack);
      				
      				// second flavor - timeout in initTransactions()
      				rolledBack = testSendFailureTxNewProducer(client, configs);
      				System.out.println("Send failures (new producer) rolled back: " + rolledBack);
      			}
      			catch (Exception e) {
      				e.printStackTrace();
      			}
      		};
      	}
      
      	private int testGoodTx(AdminClient client, Map<String, Object> configs)
      			throws ExecutionException {
      
      		int commits = 0;
      		NewTopic topic = TopicBuilder.name("so55510898a")
      				.partitions(1)
      				.replicas(1)
      				.build();
      		createTopic(client, topic);
      		configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txa-");
      		Producer<String, String> producer = new KafkaProducer<>(configs);
      		try {
      			producer.initTransactions();
      			for (int i = 0; i < 2; i++) {
      				producer.beginTransaction();
      				RecordMetadata recordMetadata = producer.send(
      						new ProducerRecord<>("so55510898a", "foooooooooooooooooooooo")).get(10, TimeUnit.SECONDS);
      				System.out.println(recordMetadata);
      				producer.commitTransaction();
      				commits++;
      			}
      		}
      		catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
      			// We can't recover from these exceptions, so our only option is to close the producer and exit.
      		}
      		catch (Exception e) {
      			System.out.println("aborting");
      			producer.abortTransaction();
      		}
      		finally {
      			producer.close();
      		}
      		return commits;
      	}
      
      	private int testAppFailureTx(AdminClient client, Map<String, Object> configs)
      			throws ExecutionException {
      
      		int aborts = 0;
      		NewTopic topic = TopicBuilder.name("so55510898b")
      				.partitions(1)
      				.replicas(1)
      				.build();
      		createTopic(client, topic);
      		configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txb-");
      		Producer<String, String> producer = new KafkaProducer<>(configs);
      		producer.initTransactions();
      		for (int i = 0; i < 2; i++) {
      			try {
      				producer.beginTransaction();
      				RecordMetadata recordMetadata = producer.send(
      						new ProducerRecord<>("so55510898b", "baaaaaaaaaaaaaaaar")).get(10, TimeUnit.SECONDS);
      				System.out.println(recordMetadata);
      				throw new RuntimeException("App failed after send");
      			}
      			catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
      				// We can't recover from these exceptions, so our only option is to close the producer and exit.
      			}
      			catch (Exception e) {
      				System.out.println("aborting");
      				producer.abortTransaction();
      				aborts++;
      			}
      		}
      		producer.close();
      		return aborts;
      	}
      
      	private int testSendFailureTxSameProducer(AdminClient client, Map<String, Object> configs)
      			throws ExecutionException {
      
      		int aborts = 0;
      		NewTopic topic = TopicBuilder.name("so55510898c")
      				.partitions(1)
      				.replicas(1)
      				.config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "10")
      				.build();
      		configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txc-");
      		createTopic(client, topic);
      		Producer<String, String> producer = new KafkaProducer<>(configs);
      		producer.initTransactions();
      		for (int i = 0; i < 2; i++) {
      			try {
      				producer.beginTransaction();
      				RecordMetadata recordMetadata = producer.send(
      						new ProducerRecord<>("so55510898c", "baaaaaaaaaaaaaaaaz")).get(10, TimeUnit.SECONDS);
      				System.out.println(recordMetadata);
      				throw new RuntimeException("App failed after send");
      			}
      			catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
      				// We can't recover from these exceptions, so our only option is to close the producer and exit.
      			}
      			catch (Exception e) {
      				System.out.println("aborting");
      				e.printStackTrace();
      				producer.abortTransaction();
      				aborts++;
      			}
      		}
      		producer.close();
      		return aborts;
      	}
      
      	private int testSendFailureTxNewProducer(AdminClient client, Map<String, Object> configs)
      			throws ExecutionException {
      
      		int aborts = 0;
      		NewTopic topic = TopicBuilder.name("so55510898d")
      				.partitions(1)
      				.replicas(1)
      				.config(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "10")
      				.build();
      		createTopic(client, topic);
      		configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txd-");
      		for (int i = 0; i < 2; i++) {
      			Producer<String, String> producer = new KafkaProducer<>(configs);
      			try {
      				try {
      					producer.initTransactions();
      				}
      				catch (Exception e) {
      					e.printStackTrace();
      					return aborts;
      				}
      				producer.beginTransaction();
      				RecordMetadata recordMetadata = producer.send(
      						new ProducerRecord<>("so55510898d", "quuuuuuuuuuuuuuux")).get(10, TimeUnit.SECONDS);
      				System.out.println(recordMetadata);
      				throw new RuntimeException("App failed after send");
      			}
      			catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
      				// We can't recover from these exceptions, so our only option is to close the producer and exit.
      			}
      			catch (Exception e) {
      				System.out.println("aborting");
      				e.printStackTrace();
      				producer.abortTransaction();
      				aborts++;
      
      			}
      			finally {
      				producer.close();
      			}
      		}
      		return aborts;
      	}
      
      	private void createTopic(AdminClient client, NewTopic topic) throws ExecutionException {
      		try {
      			client.createTopics(Collections.singletonList(topic)).all().get();
      			return;
      		}
      		catch (InterruptedException e) {
      			Thread.currentThread().interrupt();
      			return;
      		}
      		catch (ExecutionException e) {
      			if (!(e.getCause() instanceof TopicExistsException)) {
      				throw e;
      			}
      		}
      	}
      

      hang.same.producer.log

      2019-04-05 12:24:42.235  INFO 15404 --- [           main] com.example.So55510898Application        : Starting So55510898Application on Gollum2.local with PID 15404 (/Users/grussell/Development/stsws/so55510898/target/classes started by grussell in /Users/grussell/Development/stsws/so55510898)
      2019-04-05 12:24:42.237  INFO 15404 --- [           main] com.example.So55510898Application        : No active profile set, falling back to default profiles: default
      2019-04-05 12:24:42.546  INFO 15404 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$b2eeb124] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
      2019-04-05 12:24:42.639  INFO 15404 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
      	bootstrap.servers = [localhost:9092]
      	client.dns.lookup = default
      	client.id =
      	connections.max.idle.ms = 300000
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	receive.buffer.bytes = 65536
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 120000
      	retries = 5
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      
      2019-04-05 12:24:42.671  INFO 15404 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.2.0
      2019-04-05 12:24:42.672  INFO 15404 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 05fcfde8f69b0349
      2019-04-05 12:24:42.853  INFO 15404 --- [           main] com.example.So55510898Application        : Started So55510898Application in 0.8 seconds (JVM running for 1.233)
      2019-04-05 12:24:43.058  INFO 15404 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
      	acks = all
      	batch.size = 16384
      	bootstrap.servers = [localhost:9092]
      	buffer.memory = 33554432
      	client.dns.lookup = default
      	client.id =
      	compression.type = none
      	connections.max.idle.ms = 540000
      	delivery.timeout.ms = 120000
      	enable.idempotence = true
      	interceptor.classes = []
      	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
      	linger.ms = 0
      	max.block.ms = 5000
      	max.in.flight.requests.per.connection = 5
      	max.request.size = 1048576
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
      	receive.buffer.bytes = 32768
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 30000
      	retries = 2
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	transaction.timeout.ms = 60000
      	transactional.id = txa-
      	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
      
      2019-04-05 12:24:43.063  INFO 15404 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1, transactionalId=txa-] Instantiated a transactional producer.
      2019-04-05 12:24:43.074  INFO 15404 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.2.0
      2019-04-05 12:24:43.075  INFO 15404 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 05fcfde8f69b0349
      2019-04-05 12:24:43.075  INFO 15404 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=txa-] ProducerId set to -1 with epoch -1
      2019-04-05 12:24:43.176  INFO 15404 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: 0q-Ge1ROSyKNehyv1sVJCQ
      2019-04-05 12:24:43.927  INFO 15404 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=txa-] ProducerId set to 0 with epoch 0
      so55510898a-0@0
      so55510898a-0@2
      2019-04-05 12:24:44.034  INFO 15404 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1, transactionalId=txa-] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      Successes (same producer) committed: 2
      2019-04-05 12:24:44.062  INFO 15404 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
      	acks = all
      	batch.size = 16384
      	bootstrap.servers = [localhost:9092]
      	buffer.memory = 33554432
      	client.dns.lookup = default
      	client.id =
      	compression.type = none
      	connections.max.idle.ms = 540000
      	delivery.timeout.ms = 120000
      	enable.idempotence = true
      	interceptor.classes = []
      	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
      	linger.ms = 0
      	max.block.ms = 5000
      	max.in.flight.requests.per.connection = 5
      	max.request.size = 1048576
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
      	receive.buffer.bytes = 32768
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 30000
      	retries = 2
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	transaction.timeout.ms = 60000
      	transactional.id = txb-
      	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
      
      2019-04-05 12:24:44.062  INFO 15404 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2, transactionalId=txb-] Instantiated a transactional producer.
      2019-04-05 12:24:44.066  INFO 15404 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.2.0
      2019-04-05 12:24:44.066  INFO 15404 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 05fcfde8f69b0349
      2019-04-05 12:24:44.066  INFO 15404 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=txb-] ProducerId set to -1 with epoch -1
      2019-04-05 12:24:44.171  INFO 15404 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : Cluster ID: 0q-Ge1ROSyKNehyv1sVJCQ
      2019-04-05 12:24:44.339  INFO 15404 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=txb-] ProducerId set to 1 with epoch 0
      so55510898b-0@0
      aborting
      so55510898b-0@2
      aborting
      2019-04-05 12:24:44.384  INFO 15404 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2, transactionalId=txb-] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      App failures (same producer) rolled back: 2
      2019-04-05 12:24:44.416  INFO 15404 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
      	acks = all
      	batch.size = 16384
      	bootstrap.servers = [localhost:9092]
      	buffer.memory = 33554432
      	client.dns.lookup = default
      	client.id =
      	compression.type = none
      	connections.max.idle.ms = 540000
      	delivery.timeout.ms = 120000
      	enable.idempotence = true
      	interceptor.classes = []
      	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
      	linger.ms = 0
      	max.block.ms = 5000
      	max.in.flight.requests.per.connection = 5
      	max.request.size = 1048576
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
      	receive.buffer.bytes = 32768
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 30000
      	retries = 2
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	transaction.timeout.ms = 60000
      	transactional.id = txc-
      	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
      
      2019-04-05 12:24:44.417  INFO 15404 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-3, transactionalId=txc-] Instantiated a transactional producer.
      2019-04-05 12:24:44.419  INFO 15404 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.2.0
      2019-04-05 12:24:44.420  INFO 15404 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 05fcfde8f69b0349
      2019-04-05 12:24:44.420  INFO 15404 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-3, transactionalId=txc-] ProducerId set to -1 with epoch -1
      2019-04-05 12:24:44.522  INFO 15404 --- [ad | producer-3] org.apache.kafka.clients.Metadata        : Cluster ID: 0q-Ge1ROSyKNehyv1sVJCQ
      2019-04-05 12:24:44.634  INFO 15404 --- [ad | producer-3] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-3, transactionalId=txc-] ProducerId set to 2 with epoch 0
      aborting
      java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:81)
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
      	at com.example.So55510898Application.testSendFailureTxSameProducer(So55510898Application.java:176)
      	at com.example.So55510898Application.lambda$0(So55510898Application.java:84)
      	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)
      	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
      	at com.example.So55510898Application.main(So55510898Application.java:36)
      Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
      aborting
      java.util.concurrent.TimeoutException: Timeout after waiting for 10000 ms.
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:78)
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
      	at com.example.So55510898Application.testSendFailureTxSameProducer(So55510898Application.java:176)
      	at com.example.So55510898Application.lambda$0(So55510898Application.java:84)
      	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)
      	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
      	at com.example.So55510898Application.main(So55510898Application.java:36)
      
      Thread stuck in abort after RecordTooLargeException:
      
      "main" #1 prio=5 os_prio=31 tid=0x00007fc36b001800 nid=0x2603 waiting on condition [0x0000700000b36000]
         java.lang.Thread.State: WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000076b5b3208> (a java.util.concurrent.CountDownLatch$Sync)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
      	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
      	at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
      	at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:718)
      	at com.example.So55510898Application.testSendFailureTxSameProducer(So55510898Application.java:186)
      	at com.example.So55510898Application.lambda$0(So55510898Application.java:84)
      	at com.example.So55510898Application$$Lambda$204/1026483832.run(Unknown Source)
      	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)
      	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
      	at com.example.So55510898Application.main(So55510898Application.java:36)
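
      The dump shows the main thread parked in an untimed CountDownLatch.await() underneath abortTransaction(), so nothing on the calling side ever gives up. While the underlying problem is open, one way an application might protect its own thread is to run the blocking call on a worker thread and bound the wait. The following is a minimal sketch using only the JDK: BoundedCall and callWithTimeout are hypothetical names, and a latch that is never released stands in for the hung abortTransaction(). It does not fix the producer, and whether the real abortTransaction() reacts to the interrupt, or leaves the producer in a closable state afterwards, has not been verified here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {

    // Runs the blocking action on a worker thread and waits at most timeoutMs.
    // Returns true if the action finished in time, false if it was abandoned.
    public static boolean callWithTimeout(Runnable blockingAction, long timeoutMs)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // do not keep the JVM alive if the action never returns
            return t;
        });
        Future<?> future = executor.submit(blockingAction);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        }
        catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker thread
            return false;
        }
        finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch neverReleased = new CountDownLatch(1);
        boolean finished = callWithTimeout(() -> {
            try {
                neverReleased.await(); // stand-in for the hung abortTransaction()
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 200);
        System.out.println("finished in time: " + finished);
    }
}
```

      In the test application, the Runnable passed to callWithTimeout would wrap producer.abortTransaction(); what to do with the producer once the call is abandoned is left open.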
      

      inittrans.timeout.log

      2019-04-05 12:42:30.007  INFO 21503 --- [           main] com.example.So55510898Application        : Starting So55510898Application on Gollum2.local with PID 21503 (/Users/grussell/Development/stsws/so55510898/target/classes started by grussell in /Users/grussell/Development/stsws/so55510898)
      2019-04-05 12:42:30.009  INFO 21503 --- [           main] com.example.So55510898Application        : No active profile set, falling back to default profiles: default
      2019-04-05 12:42:30.323  INFO 21503 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$e567a345] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
      2019-04-05 12:42:30.414  INFO 21503 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
      	bootstrap.servers = [localhost:9092]
      	client.dns.lookup = default
      	client.id =
      	connections.max.idle.ms = 300000
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	receive.buffer.bytes = 65536
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 120000
      	retries = 5
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      
      2019-04-05 12:42:30.443  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.2.0
      2019-04-05 12:42:30.444  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 05fcfde8f69b0349
      2019-04-05 12:42:30.591  INFO 21503 --- [           main] com.example.So55510898Application        : Started So55510898Application in 0.769 seconds (JVM running for 1.163)
      2019-04-05 12:42:30.806  INFO 21503 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
      	acks = all
      	batch.size = 16384
      	bootstrap.servers = [localhost:9092]
      	buffer.memory = 33554432
      	client.dns.lookup = default
      	client.id =
      	compression.type = none
      	connections.max.idle.ms = 540000
      	delivery.timeout.ms = 120000
      	enable.idempotence = true
      	interceptor.classes = []
      	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
      	linger.ms = 0
      	max.block.ms = 5000
      	max.in.flight.requests.per.connection = 5
      	max.request.size = 1048576
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
      	receive.buffer.bytes = 32768
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 30000
      	retries = 2
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	transaction.timeout.ms = 60000
      	transactional.id = txa-
      	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
      
      2019-04-05 12:42:30.811  INFO 21503 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1, transactionalId=txa-] Instantiated a transactional producer.
      2019-04-05 12:42:30.825  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.2.0
      2019-04-05 12:42:30.825  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 05fcfde8f69b0349
      2019-04-05 12:42:30.826  INFO 21503 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=txa-] ProducerId set to -1 with epoch -1
      2019-04-05 12:42:30.927  INFO 21503 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: u1qliw8hRg-kG3RA3GnspQ
      2019-04-05 12:42:31.792  INFO 21503 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=txa-] ProducerId set to 0 with epoch 0
      so55510898a-0@0
      so55510898a-0@2
      2019-04-05 12:42:31.901  INFO 21503 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1, transactionalId=txa-] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      Successes (same producer) committed: 2
      2019-04-05 12:42:31.938  INFO 21503 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
      	acks = all
      	batch.size = 16384
      	bootstrap.servers = [localhost:9092]
      	buffer.memory = 33554432
      	client.dns.lookup = default
      	client.id =
      	compression.type = none
      	connections.max.idle.ms = 540000
      	delivery.timeout.ms = 120000
      	enable.idempotence = true
      	interceptor.classes = []
      	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
      	linger.ms = 0
      	max.block.ms = 5000
      	max.in.flight.requests.per.connection = 5
      	max.request.size = 1048576
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
      	receive.buffer.bytes = 32768
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 30000
      	retries = 2
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	transaction.timeout.ms = 60000
      	transactional.id = txb-
      	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
      
      2019-04-05 12:42:31.938  INFO 21503 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2, transactionalId=txb-] Instantiated a transactional producer.
      2019-04-05 12:42:31.943  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.2.0
      2019-04-05 12:42:31.943  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 05fcfde8f69b0349
      2019-04-05 12:42:31.943  INFO 21503 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=txb-] ProducerId set to -1 with epoch -1
      2019-04-05 12:42:32.046  INFO 21503 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : Cluster ID: u1qliw8hRg-kG3RA3GnspQ
      2019-04-05 12:42:32.157  INFO 21503 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2, transactionalId=txb-] ProducerId set to 1 with epoch 0
      so55510898b-0@0
      aborting
      so55510898b-0@2
      aborting
      2019-04-05 12:42:32.200  INFO 21503 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2, transactionalId=txb-] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      App failures (same producer) rolled back: 2
      2019-04-05 12:42:32.231  INFO 21503 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
      	acks = all
      	batch.size = 16384
      	bootstrap.servers = [localhost:9092]
      	buffer.memory = 33554432
      	client.dns.lookup = default
      	client.id =
      	compression.type = none
      	connections.max.idle.ms = 540000
      	delivery.timeout.ms = 120000
      	enable.idempotence = true
      	interceptor.classes = []
      	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
      	linger.ms = 0
      	max.block.ms = 5000
      	max.in.flight.requests.per.connection = 5
      	max.request.size = 1048576
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
      	receive.buffer.bytes = 32768
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 30000
      	retries = 2
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	transaction.timeout.ms = 60000
      	transactional.id = txd-
      	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
      
      2019-04-05 12:42:32.231  INFO 21503 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-3, transactionalId=txd-] Instantiated a transactional producer.
      2019-04-05 12:42:32.234  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.2.0
      2019-04-05 12:42:32.234  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 05fcfde8f69b0349
      2019-04-05 12:42:32.234  INFO 21503 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-3, transactionalId=txd-] ProducerId set to -1 with epoch -1
      2019-04-05 12:42:32.339  INFO 21503 --- [ad | producer-3] org.apache.kafka.clients.Metadata        : Cluster ID: u1qliw8hRg-kG3RA3GnspQ
      2019-04-05 12:42:32.449  INFO 21503 --- [ad | producer-3] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-3, transactionalId=txd-] ProducerId set to 2 with epoch 0
      aborting
      java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:81)
      	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
      	at com.example.So55510898Application.testSendFailureTxNewProducer(So55510898Application.java:222)
      	at com.example.So55510898Application.lambda$0(So55510898Application.java:87)
      	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)
      	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
      	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
      	at com.example.So55510898Application.main(So55510898Application.java:36)
      Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
      2019-04-05 12:42:32.463  INFO 21503 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-3, transactionalId=txd-] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      2019-04-05 12:42:32.466  INFO 21503 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
      	acks = all
      	batch.size = 16384
      	bootstrap.servers = [localhost:9092]
      	buffer.memory = 33554432
      	client.dns.lookup = default
      	client.id =
      	compression.type = none
      	connections.max.idle.ms = 540000
      	delivery.timeout.ms = 120000
      	enable.idempotence = true
      	interceptor.classes = []
      	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
      	linger.ms = 0
      	max.block.ms = 5000
      	max.in.flight.requests.per.connection = 5
      	max.request.size = 1048576
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
      	receive.buffer.bytes = 32768
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 30000
      	retries = 2
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	send.buffer.bytes = 131072
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      	ssl.endpoint.identification.algorithm = https
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLS
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	transaction.timeout.ms = 60000
      	transactional.id = txd-
      	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
      
      2019-04-05 12:42:32.466  INFO 21503 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-4, transactionalId=txd-] Instantiated a transactional producer.
      2019-04-05 12:42:32.470  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.2.0
      2019-04-05 12:42:32.470  INFO 21503 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 05fcfde8f69b0349
      2019-04-05 12:42:32.470  INFO 21503 --- [           main] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-4, transactionalId=txd-] ProducerId set to -1 with epoch -1
      2019-04-05 12:42:32.574  INFO 21503 --- [ad | producer-4] org.apache.kafka.clients.Metadata        : Cluster ID: u1qliw8hRg-kG3RA3GnspQ
      org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 5000ms.
      2019-04-05 12:42:37.472  INFO 21503 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-4, transactionalId=txd-] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
      Send failures (new producer) rolled back: 1
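
The 5000 ms in the `TimeoutException` above appears to match the `max.block.ms = 5000` entry in the logged `ProducerConfig`, since `initTransactions()` blocks for at most that long. As a minimal sketch, the settings from the dump that seem most relevant to this failure can be collected as plain properties. The class name `TxProducerSettings` and the method `fromLog` are hypothetical and not part of the test code in this issue; every value is copied from the log above.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of the reporter's test code.
final class TxProducerSettings {

    // Collects the subset of producer settings from the logged ProducerConfig dump
    // that bear on the transactional timeout seen in this run.
    static Properties fromLog(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("transactional.id", transactionalId);
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        props.setProperty("retries", "2");
        // Upper bound on how long blocking producer calls wait; the log shows
        // initTransactions() giving up after this many milliseconds.
        props.setProperty("max.block.ms", "5000");
        props.setProperty("transaction.timeout.ms", "60000");
        props.setProperty("max.request.size", "1048576");
        return props;
    }

    public static void main(String[] args) {
        Properties props = fromLog("txd-");
        System.out.println("transactional.id = " + props.getProperty("transactional.id"));
        System.out.println("max.block.ms = " + props.getProperty("max.block.ms"));
        System.out.println("transaction.timeout.ms = " + props.getProperty("transaction.timeout.ms"));
    }
}
```

These properties could be passed to a `KafkaProducer` constructor to recreate the logged configuration, together with the `key.serializer` and `value.serializer` entries from the dump, which are omitted here to keep the sketch free of client-library dependencies.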
      

          People

            Assignee: Unassigned
            Reporter: Gary Russell (grussell)