(org.apache.kafka.streams.StreamsConfig) [2017-12-12 10:45:38,889] INFO [main] StreamsConfig values: application.id = streams-soak-test-app application.server = bootstrap.servers = [r0.streams-soak-4-0-0.confluent.aws.devel.cpdev.cloud:9092, r1.streams-soak-4-0-0.confluent.aws.devel.cpdev.cloud:9093, r2.streams-soak-4-0-0.confluent.aws.devel.cpdev.cloud:9094] buffered.records.per.partition = 1000 cache.max.bytes.buffering = 10485760 client.id = streams-saak-test-client-StreamThread-3-consumer commit.interval.ms = 30000 connections.max.idle.ms = 540000 default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde key.serde = null metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = DEBUG metrics.sample.window.ms = 30000 num.standby.replicas = 0 num.stream.threads = 1 partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper poll.ms = 100 processing.guarantee = at_least_once receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 replication.factor = 3 request.timeout.ms = 40000 retries = 0 retry.backoff.ms = 100 rocksdb.config.setter = null security.protocol = SASL_SSL send.buffer.bytes = 131072 state.cleanup.delay.ms = 600000 state.dir = /tmp/kafka-streams timestamp.extractor = null value.serde = null windowstore.changelog.additional.retention.ms = 86400000 zookeeper.connect = (org.apache.kafka.streams.StreamsConfig) [2017-12-12 10:45:38,889] DEBUG [main] stream-thread [main] Configs: (org.apache.kafka.streams.processor.internals.InternalTopicManager) [2017-12-12 10:45:38,898] DEBUG [main] stream-client [streams-saak-test-client] Starting Streams client (org.apache.kafka.streams.KafkaStreams) [2017-12-12 10:45:38,898] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Starting (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:38,899] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] State transition from CREATED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:38,899] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Starting (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:38,899] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] State transition from CREATED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:38,900] INFO [main] stream-client [streams-saak-test-client] Started Streams client (org.apache.kafka.streams.KafkaStreams) [2017-12-12 10:45:38,900] INFO [main] Soak test kafka streams app kicked off (org.apache.kafka.streams.StreamsSoakTest) [2017-12-12 10:45:38,900] INFO [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Starting (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:38,900] INFO [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] State transition from CREATED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:39,508] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. current assigned active tasks: [] current assigned standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:39,508] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:39,508] INFO [streams-saak-test-client-StreamThread-1] stream-client [streams-saak-test-client] State transition from RUNNING to REBALANCING (org.apache.kafka.streams.KafkaStreams) [2017-12-12 10:45:39,509] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Suspending all active tasks [] and standby tasks [] (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:45:39,509] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] partition revocation took 1 ms. suspended active tasks: [] suspended standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:39,511] DEBUG [streams-saak-test-client-StreamThread-1] [node-name-repartition, windowed-node-counts, host-id-repartition, cluster-id-repartition, logs]found stream-thread [streams-saak-test-client-StreamThread-1] topics possibly matching regex (org.apache.kafka.streams.processor.internals.InternalTopologyBuilder) [2017-12-12 10:45:39,511] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] updating builder with SubscriptionUpdates{updatedTopicSubscriptions=[node-name-repartition, windowed-node-counts, host-id-repartition, cluster-id-repartition, logs]} topic(s) with possible matching regex subscription(s) (org.apache.kafka.streams.processor.internals.InternalTopologyBuilder) [2017-12-12 10:45:40,343] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. current assigned active tasks: [] current assigned standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:40,343] INFO [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] State transition from RUNNING to PARTITIONS_REVOKED (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:40,343] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Suspending all active tasks [] and standby tasks [] (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:45:40,344] INFO [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] partition revocation took 1 ms. suspended active tasks: [] suspended standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:46,152] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. current assigned active tasks: [] current assigned standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:46,152] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] State transition from RUNNING to PARTITIONS_REVOKED (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:46,152] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Suspending all active tasks [] and standby tasks [] (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:45:46,152] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] partition revocation took 0 ms. suspended active tasks: [] suspended standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:47,877] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3-consumer] Constructed client metadata {039436d4-a7a9-492b-80c1-53315b47a075=ClientMetadata{hostInfo=null, consumers=[streams-saak-test-client-StreamThread-3-consumer-0266102c-68ba-4960-a857-5abf49879ce5, streams-saak-test-client-StreamThread-1-consumer-5b717839-d984-4630-8856-101d2b6ae5e3], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([0_1, 1_0, 2_0, 1_2, 2_1, 0_4, 0_5, 1_5, 0_7, 1_6, 3_5, 0_8, 2_6, 3_6, 1_9, 2_8, 2_9, 3_8]) prevAssignedTasks: ([0_0, 0_1, 1_0, 2_0, 0_2, 1_2, 0_3, 2_1, 0_4, 3_1, 0_5, 2_3, 3_2, 1_5, 0_6, 2_4, 3_3, 0_7, 1_6, 3_4, 2_5, 3_5, 1_7, 0_8, 2_6, 3_6, 0_9, 2_7, 1_9, 2_8, 3_7, 2_9, 3_8, 3_9]) capacity: 2]}, 8b76fea0-2830-47a9-9142-79c24af3ad9f=ClientMetadata{hostInfo=null, consumers=[streams-saak-test-client-StreamThread-1-consumer-2de01464-3748-4887-ad59-392189289aef, streams-saak-test-client-StreamThread-2-consumer-c739ae10-f59e-46ed-9bbc-83bd6e4632ef], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([1_1, 2_2, 1_3, 2_3, 1_4, 2_4, 0_6, 0_7, 3_4, 3_5, 2_6, 1_8, 0_9]) prevAssignedTasks: ([0_0, 0_1, 1_1, 0_2, 0_3, 2_1, 3_0, 2_2, 1_3, 2_3, 1_4, 0_5, 3_2, 2_4, 0_6, 3_3, 0_7, 3_4, 2_5, 3_5, 0_8, 2_6, 1_8, 3_6, 0_9, 2_7, 1_9, 2_8, 3_7, 3_8, 2_9, 3_9]) capacity: 2]}, ce00caec-66f7-4c9b-9500-3cae42220478=ClientMetadata{hostInfo=null, consumers=[streams-saak-test-client-StreamThread-3-consumer-b01571a5-5f63-48d2-80ce-33b8d18bc105, streams-saak-test-client-StreamThread-2-consumer-f01bf5a2-5375-44dd-9dd4-e6e4f4d2da5f, streams-saak-test-client-StreamThread-1-consumer-14b8634d-cab3-4e78-8b4d-883a30fe6bb7], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([3_1, 0_4, 1_3, 1_4, 0_5, 2_3, 3_2, 0_6, 1_5, 2_4, 3_3, 1_6, 3_4, 2_5, 0_7, 1_7, 0_8, 3_5, 2_6, 3_6, 0_9, 1_8, 2_7, 1_9, 2_8, 3_7, 3_8, 2_9, 3_9]) capacity: 3]}} from the member subscriptions. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor) [2017-12-12 10:45:47,879] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3-consumer] Starting to validate internal topics in partition assignor. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor) [2017-12-12 10:45:47,879] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3-consumer] Completed validating internal topics in partition assignor. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor) [2017-12-12 10:45:47,879] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3-consumer] Created repartition topics [] from the parsed topology. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor) [2017-12-12 10:45:47,879] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3-consumer] Starting to validate internal topics in partition assignor. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor) [2017-12-12 10:45:57,896] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] at state PARTITIONS_REVOKED: partitions [host-id-repartition-3, host-id-repartition-1, host-id-repartition-7, node-name-repartition-7, windowed-node-counts-7, cluster-id-repartition-7, host-id-repartition-8] assigned at the end of consumer rebalance. current suspended active tasks: [] current suspended standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:57,896] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] at state PARTITIONS_REVOKED: partitions [host-id-repartition-2, logs-9, cluster-id-repartition-5, host-id-repartition-9, cluster-id-repartition-8, logs-3, cluster-id-repartition-9] assigned at the end of consumer rebalance. current suspended active tasks: [] current suspended standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:57,896] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:57,896] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:57,896] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Adding assigned tasks as active: {0_3=[logs-3], 3_2=[host-id-repartition-2], 2_5=[cluster-id-repartition-5], 0_9=[logs-9], 2_8=[cluster-id-repartition-8], 2_9=[cluster-id-repartition-9], 3_9=[host-id-repartition-9]} (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:45:57,896] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Adding assigned tasks as active: {3_1=[host-id-repartition-1], 3_3=[host-id-repartition-3], 1_7=[node-name-repartition-7, windowed-node-counts-7], 2_7=[cluster-id-repartition-7], 3_7=[host-id-repartition-7], 3_8=[host-id-repartition-8]} (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:45:57,901] DEBUG [streams-saak-test-client-StreamThread-2] task [0_3] Created state store manager for task 0_3 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,902] DEBUG [streams-saak-test-client-StreamThread-1] task [3_1] Created state store manager for task 3_1 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,908] DEBUG [streams-saak-test-client-StreamThread-1] task [3_1] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,908] DEBUG [streams-saak-test-client-StreamThread-2] task [0_3] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,909] DEBUG [streams-saak-test-client-StreamThread-1] task [3_3] Created state store manager for task 3_3 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,909] DEBUG [streams-saak-test-client-StreamThread-1] task [3_3] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,910] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Created state store manager for task 1_7 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,910] DEBUG [streams-saak-test-client-StreamThread-2] task [3_2] Created state store manager for task 3_2 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,910] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,911] DEBUG [streams-saak-test-client-StreamThread-2] task [3_2] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,911] DEBUG [streams-saak-test-client-StreamThread-1] task [2_7] Created state store manager for task 2_7 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,912] DEBUG [streams-saak-test-client-StreamThread-1] task [2_7] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,912] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Created state store manager for task 2_5 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,912] DEBUG [streams-saak-test-client-StreamThread-1] task [3_7] Created state store manager for task 3_7 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,912] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,913] DEBUG [streams-saak-test-client-StreamThread-1] task [3_7] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,913] DEBUG [streams-saak-test-client-StreamThread-2] task [0_9] Created state store manager for task 0_9 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,913] DEBUG [streams-saak-test-client-StreamThread-1] task [3_8] Created state store manager for task 3_8 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,914] DEBUG [streams-saak-test-client-StreamThread-1] task [3_8] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,914] DEBUG [streams-saak-test-client-StreamThread-2] task [0_9] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,914] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] partition assignment took 18 ms. current active tasks: [3_1, 3_3, 1_7, 2_7, 3_7, 3_8] current standby tasks: [] previous active tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:57,915] DEBUG [streams-saak-test-client-StreamThread-2] task [2_8] Created state store manager for task 2_8 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,915] DEBUG [streams-saak-test-client-StreamThread-2] task [2_8] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,916] DEBUG [streams-saak-test-client-StreamThread-2] task [2_9] Created state store manager for task 2_9 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,917] DEBUG [streams-saak-test-client-StreamThread-2] task [2_9] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,917] DEBUG [streams-saak-test-client-StreamThread-2] task [3_9] Created state store manager for task 3_9 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,917] DEBUG [streams-saak-test-client-StreamThread-2] task [3_9] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:57,917] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] partition assignment took 21 ms. current active tasks: [0_3, 3_2, 2_5, 0_9, 2_8, 2_9, 3_9] current standby tasks: [] previous active tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:57,919] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Initializing stream tasks [3_1, 3_3, 1_7, 2_7, 3_7, 3_8] (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:57,919] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] No custom setting defined for topic 'cluster-id-repartition' using original config 'earliest' for offset reset (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:45:57,919] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Initializing stream tasks [0_3, 3_2, 2_5, 0_9, 2_8, 2_9, 3_9] (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:57,921] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Acquired state dir lock for task 3_1 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:57,940] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] transitioning stream task 0_3 to running (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:57,940] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Acquired state dir lock for task 3_2 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:58,025] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,025] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,056] DEBUG [streams-saak-test-client-StreamThread-1] task [3_1] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,056] DEBUG [streams-saak-test-client-StreamThread-2] task [3_2] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,067] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Transitioning stream task 3_1 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:58,067] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Transitioning stream task 3_2 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:58,067] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Acquired state dir lock for task 3_3 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:58,067] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Acquired state dir lock for task 2_5 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:58,072] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,072] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,107] DEBUG [streams-saak-test-client-StreamThread-1] task [3_3] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,107] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000039 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,115] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Transitioning stream task 2_5 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:58,115] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Transitioning stream task 3_3 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:58,115] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Acquired state dir lock for task 1_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:58,121] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,124] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] transitioning stream task 0_9 to running (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:58,125] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Acquired state dir lock for task 2_8 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:58,127] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,142] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,144] DEBUG [streams-saak-test-client-StreamThread-2] task [2_8] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000039 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,146] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Transitioning stream task 2_8 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:58,146] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Acquired state dir lock for task 2_9 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:58,148] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,158] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,166] DEBUG [streams-saak-test-client-StreamThread-2] task [2_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000039 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,167] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Transitioning stream task 2_9 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:58,167] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Acquired state dir lock for task 3_9 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:58,170] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,174] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000007 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,176] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,185] DEBUG [streams-saak-test-client-StreamThread-2] task [3_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,188] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Transitioning stream task 3_9 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:58,191] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,208] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,223] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000013 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,223] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,239] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,256] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,271] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000018 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,272] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,287] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,303] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,318] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000024 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,319] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:58,349] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Registering state store windowed-node-counts-STATE-STORE-0000000029 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:58,359] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Transitioning stream task 1_7 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:58,359] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Acquired state dir lock for task 2_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:58,990] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:59,006] DEBUG [streams-saak-test-client-StreamThread-1] task [2_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000039 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:59,008] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Transitioning stream task 2_7 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:59,008] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Acquired state dir lock for task 3_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:59,040] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:59,056] DEBUG [streams-saak-test-client-StreamThread-1] task [3_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:59,058] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Transitioning stream task 3_7 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:59,058] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Acquired state dir lock for task 3_8 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:45:59,060] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:45:59,076] DEBUG [streams-saak-test-client-StreamThread-1] task [3_8] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:45:59,077] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Transitioning stream task 3_8 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:45:59,498] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3-consumer] Completed validating internal topics in partition assignor. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor) [2017-12-12 10:45:59,498] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3-consumer] Created state changelog topics [InternalTopicMetadata(config=InternalTopicConfig(name=streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000024-changelog, logConfig={cleanup.policy=compact,delete, retention.ms=21600000, retention.bytes=104857600}, cleanupPolicies=[compact], retentionMs=null), numPartitions=10), InternalTopicMetadata(config=InternalTopicConfig(name=streams-soak-test-app-windowed-node-counts-STATE-STORE-0000000029-changelog, logConfig={}, cleanupPolicies=[compact], retentionMs=null), numPartitions=10), InternalTopicMetadata(config=InternalTopicConfig(name=streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog, logConfig={cleanup.policy=compact,delete, retention.ms=21600000, retention.bytes=104857600}, cleanupPolicies=[delete, compact], retentionMs=null), numPartitions=10), InternalTopicMetadata(config=InternalTopicConfig(name=streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000039-changelog, logConfig={cleanup.policy=compact,delete, retention.ms=21600000, retention.bytes=104857600}, cleanupPolicies=[compact], retentionMs=null), numPartitions=10), InternalTopicMetadata(config=InternalTopicConfig(name=streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog, logConfig={cleanup.policy=compact,delete, retention.ms=21600000, retention.bytes=104857600}, cleanupPolicies=[compact], retentionMs=null), numPartitions=10), InternalTopicMetadata(config=InternalTopicConfig(name=streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog, logConfig={cleanup.policy=compact,delete, retention.ms=21600000, retention.bytes=104857600}, cleanupPolicies=[delete, compact], retentionMs=null), numPartitions=10), InternalTopicMetadata(config=InternalTopicConfig(name=streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000018-changelog, logConfig={cleanup.policy=compact,delete, retention.ms=21600000, retention.bytes=104857600}, cleanupPolicies=[delete, compact], retentionMs=null), numPartitions=10)] from the parsed topology. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor) [2017-12-12 10:45:59,498] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3-consumer] Assigning tasks [0_0, 0_1, 1_0, 0_2, 1_1, 2_0, 0_3, 1_2, 2_1, 3_0, 0_4, 1_3, 2_2, 3_1, 0_5, 1_4, 2_3, 3_2, 0_6, 1_5, 2_4, 3_3, 0_7, 1_6, 2_5, 3_4, 0_8, 1_7, 2_6, 3_5, 0_9, 1_8, 2_7, 3_6, 1_9, 2_8, 3_7, 2_9, 3_8, 3_9] to clients {039436d4-a7a9-492b-80c1-53315b47a075=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([0_1, 1_0, 2_0, 1_2, 2_1, 0_4, 0_5, 1_5, 0_7, 1_6, 3_5, 0_8, 2_6, 3_6, 1_9, 2_8, 2_9, 3_8]) prevAssignedTasks: ([0_0, 0_1, 1_0, 2_0, 0_2, 1_2, 0_3, 2_1, 0_4, 3_1, 0_5, 2_3, 3_2, 1_5, 0_6, 2_4, 3_3, 0_7, 1_6, 3_4, 2_5, 3_5, 1_7, 0_8, 2_6, 3_6, 0_9, 2_7, 1_9, 2_8, 3_7, 2_9, 3_8, 3_9]) capacity: 2], 8b76fea0-2830-47a9-9142-79c24af3ad9f=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([1_1, 2_2, 1_3, 2_3, 1_4, 2_4, 0_6, 0_7, 3_4, 3_5, 2_6, 1_8, 0_9]) prevAssignedTasks: ([0_0, 0_1, 1_1, 0_2, 0_3, 2_1, 3_0, 2_2, 1_3, 2_3, 1_4, 0_5, 3_2, 2_4, 0_6, 3_3, 0_7, 3_4, 2_5, 3_5, 0_8, 2_6, 1_8, 3_6, 0_9, 2_7, 1_9, 2_8, 3_7, 3_8, 2_9, 3_9]) capacity: 2], ce00caec-66f7-4c9b-9500-3cae42220478=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([3_1, 0_4, 1_3, 1_4, 0_5, 2_3, 3_2, 0_6, 1_5, 2_4, 3_3, 1_6, 3_4, 2_5, 0_7, 1_7, 0_8, 3_5, 2_6, 3_6, 0_9, 1_8, 2_7, 1_9, 2_8, 3_7, 3_8, 2_9, 3_9]) capacity: 3]} with number of replicas 0 (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor) [2017-12-12 10:45:59,501] INFO [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3-consumer] Assigned tasks to clients as {039436d4-a7a9-492b-80c1-53315b47a075=[activeTasks: ([0_1, 1_0, 2_0, 0_2, 1_2, 2_1, 0_4, 0_5, 1_5, 1_6, 0_8, 3_9]) standbyTasks: ([]) assignedTasks: ([0_1, 1_0, 2_0, 0_2, 1_2, 2_1, 0_4, 0_5, 1_5, 1_6, 0_8, 3_9]) prevActiveTasks: ([0_1, 1_0, 2_0, 1_2, 2_1, 0_4, 0_5, 1_5, 0_7, 1_6, 3_5, 0_8, 2_6, 3_6, 1_9, 2_8, 2_9, 3_8]) prevAssignedTasks: ([0_0, 0_1, 1_0, 2_0, 0_2, 1_2, 0_3, 2_1, 0_4, 3_1, 0_5, 2_3, 3_2, 1_5, 0_6, 2_4, 3_3, 0_7, 1_6, 3_4, 2_5, 3_5, 1_7, 0_8, 2_6, 3_6, 0_9, 2_7, 1_9, 2_8, 3_7, 2_9, 3_8, 3_9]) capacity: 2], 8b76fea0-2830-47a9-9142-79c24af3ad9f=[activeTasks: ([1_1, 0_3, 2_2, 1_3, 2_3, 1_4, 2_4, 0_6, 0_7, 3_4, 3_5]) standbyTasks: ([]) assignedTasks: ([1_1, 0_3, 2_2, 1_3, 2_3, 1_4, 2_4, 0_6, 0_7, 3_4, 3_5]) prevActiveTasks: ([1_1, 2_2, 1_3, 2_3, 1_4, 2_4, 0_6, 0_7, 3_4, 3_5, 2_6, 1_8, 0_9]) prevAssignedTasks: ([0_0, 0_1, 1_1, 0_2, 0_3, 2_1, 3_0, 2_2, 1_3, 2_3, 1_4, 0_5, 3_2, 2_4, 0_6, 3_3, 0_7, 3_4, 2_5, 3_5, 0_8, 2_6, 1_8, 3_6, 0_9, 2_7, 1_9, 2_8, 3_7, 3_8, 2_9, 3_9]) capacity: 2], ce00caec-66f7-4c9b-9500-3cae42220478=[activeTasks: ([0_0, 3_0, 3_1, 3_2, 3_3, 2_5, 1_7, 2_6, 0_9, 1_8, 2_7, 3_6, 1_9, 2_8, 3_7, 2_9, 3_8]) standbyTasks: ([]) assignedTasks: ([0_0, 3_0, 3_1, 3_2, 3_3, 2_5, 1_7, 2_6, 0_9, 1_8, 2_7, 3_6, 1_9, 2_8, 3_7, 2_9, 3_8]) prevActiveTasks: ([]) prevAssignedTasks: ([3_1, 0_4, 1_3, 1_4, 0_5, 2_3, 3_2, 0_6, 1_5, 2_4, 3_3, 1_6, 3_4, 2_5, 0_7, 1_7, 0_8, 3_5, 2_6, 3_6, 0_9, 1_8, 2_7, 1_9, 2_8, 3_7, 3_8, 2_9, 3_9]) capacity: 3]}. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor) [2017-12-12 10:46:04,653] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Start restoring state stores from changelog topics [streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000039-changelog-9, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-9, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-2, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000039-changelog-8] (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:04,653] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000039-changelog-9 from offset 0 to endOffset 90574 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:04,668] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:04,689] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:04,707] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-9 from offset 0 to endOffset 94420 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:04,720] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:04,740] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:04,756] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-2 from offset 84873 to endOffset 89218 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:04,768] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:04,789] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:04,803] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000039-changelog-8 from offset 46472 to endOffset 46576 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:04,817] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:04,838] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:04,861] WARN [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch. (org.apache.kafka.streams.processor.internals.StoreChangelogReader) org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-9=0} at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:883) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1173) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1106) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [2017-12-12 10:46:04,863] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Reinitializing StreamTask StreamsTask taskId: 3_9 ProcessorTopology: KSTREAM-SOURCE-0000000046: topics: [host-id-repartition] children: [KSTREAM-AGGREGATE-0000000049] KSTREAM-AGGREGATE-0000000049: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000048] children: [KTABLE-TOSTREAM-0000000050] KTABLE-TOSTREAM-0000000050: children: [KSTREAM-SINK-0000000051] KSTREAM-SINK-0000000051: topic: host-counts Partitions [host-id-repartition-9] (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:04,879] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:04,898] DEBUG [streams-saak-test-client-StreamThread-2] task [3_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,898] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] transitioning stream task 2_5 to running (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:46:04,898] DEBUG [streams-saak-test-client-StreamThread-2] task [0_3] Committing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,899] DEBUG [streams-saak-test-client-StreamThread-2] task [0_3] Flushing producer (org.apache.kafka.streams.processor.internals.RecordCollectorImpl) [2017-12-12 10:46:04,901] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Committing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,901] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Flushing all stores registered in the state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,901] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Flushing producer (org.apache.kafka.streams.processor.internals.RecordCollectorImpl) [2017-12-12 10:46:04,903] DEBUG [streams-saak-test-client-StreamThread-2] task [0_9] Committing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,903] DEBUG [streams-saak-test-client-StreamThread-2] task [0_9] Flushing producer (org.apache.kafka.streams.processor.internals.RecordCollectorImpl) [2017-12-12 10:46:04,906] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Committed all active tasks [0_3, 3_2, 2_5, 0_9, 2_8, 2_9, 3_9] and standby tasks [] in 26005ms (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:46:04,906] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] at state PARTITIONS_ASSIGNED: partitions [host-id-repartition-2, logs-9, cluster-id-repartition-5, host-id-repartition-9, cluster-id-repartition-8, logs-3, cluster-id-repartition-9] revoked at the beginning of consumer rebalance. current assigned active tasks: [0_3, 3_2, 2_5, 0_9, 2_8, 2_9, 3_9] current assigned standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:46:04,906] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:46:04,906] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Suspending all active tasks [0_3, 2_5, 0_9] and standby tasks [] (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:46:04,906] DEBUG [streams-saak-test-client-StreamThread-2] task [0_3] Suspending (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,931] DEBUG [streams-saak-test-client-StreamThread-2] task [0_3] Committing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,931] DEBUG [streams-saak-test-client-StreamThread-2] task [0_3] Flushing producer (org.apache.kafka.streams.processor.internals.RecordCollectorImpl) [2017-12-12 10:46:04,934] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Suspending (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,934] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Committing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,934] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Flushing all stores registered in the state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,934] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Flushing producer (org.apache.kafka.streams.processor.internals.RecordCollectorImpl) [2017-12-12 10:46:04,936] DEBUG [streams-saak-test-client-StreamThread-2] task [0_9] Suspending (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,936] DEBUG [streams-saak-test-client-StreamThread-2] task [0_9] Committing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,936] DEBUG [streams-saak-test-client-StreamThread-2] task [0_9] Flushing producer (org.apache.kafka.streams.processor.internals.RecordCollectorImpl) [2017-12-12 10:46:04,939] DEBUG [streams-saak-test-client-StreamThread-2] task [3_2] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,939] DEBUG [streams-saak-test-client-StreamThread-2] task [3_2] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,939] DEBUG [streams-saak-test-client-StreamThread-2] task [3_2] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000048 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,949] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Released state dir lock for task 3_2 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:04,950] DEBUG [streams-saak-test-client-StreamThread-2] task [2_8] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,950] DEBUG [streams-saak-test-client-StreamThread-2] task [2_8] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,950] DEBUG [streams-saak-test-client-StreamThread-2] task [2_8] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000039 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,959] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Released state dir lock for task 2_8 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:04,960] DEBUG [streams-saak-test-client-StreamThread-2] task [2_9] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,960] DEBUG [streams-saak-test-client-StreamThread-2] task [2_9] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,960] DEBUG [streams-saak-test-client-StreamThread-2] task [2_9] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000039 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,969] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Released state dir lock for task 2_9 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:04,969] DEBUG [streams-saak-test-client-StreamThread-2] task [3_9] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:04,969] DEBUG [streams-saak-test-client-StreamThread-2] task [3_9] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,969] DEBUG [streams-saak-test-client-StreamThread-2] task [3_9] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000048 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:04,978] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Released state dir lock for task 3_9 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:04,978] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] partition revocation took 72 ms. suspended active tasks: [0_3, 2_5, 0_9] suspended standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:46:59,019] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Start restoring state stores from changelog topics [streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-7, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-7, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-8, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-7, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000018-changelog-7, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000024-changelog-7, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-1, windowed-node-counts-7, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000039-changelog-7, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-3] (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,019] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-7 from offset 0 to endOffset 482080 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,020] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-7 from offset 0 to endOffset 454480 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,020] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-8 from offset 137020 to endOffset 137396 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,033] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,055] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,069] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-7 from offset 0 to endOffset 289514 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,082] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,103] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,119] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000018-changelog-7 from offset 0 to endOffset 4256022 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,119] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000024-changelog-7 from offset 0 to endOffset 15463608 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,119] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-1 from offset 113860 to endOffset 113980 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,132] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,152] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,167] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition windowed-node-counts-7 from offset 0 to endOffset 22227857 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,180] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,201] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,217] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000039-changelog-7 from offset 29850 to endOffset 29959 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,230] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,251] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,266] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-3 from offset 78976 to endOffset 79245 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,279] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,300] INFO [streams-saak-test-client-StreamThread-1] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:46:59,319] WARN [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch. (org.apache.kafka.streams.processor.internals.StoreChangelogReader) org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-7=0} at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:883) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1173) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1106) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [2017-12-12 10:46:59,320] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Reinitializing StreamTask StreamsTask taskId: 1_7 ProcessorTopology: KSTREAM-SOURCE-0000000005: topics: [node-name-repartition] children: [KSTREAM-AGGREGATE-0000000008, KSTREAM-AGGREGATE-0000000014, KSTREAM-AGGREGATE-0000000019, KSTREAM-AGGREGATE-0000000025, KSTREAM-JOIN-0000000032] KSTREAM-AGGREGATE-0000000008: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000007] children: [KTABLE-TOSTREAM-0000000009] KTABLE-TOSTREAM-0000000009: children: [KSTREAM-MAP-0000000010] KSTREAM-MAP-0000000010: children: [KSTREAM-SINK-0000000011] KSTREAM-SINK-0000000011: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000014: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000013] children: [KTABLE-TOSTREAM-0000000015] KTABLE-TOSTREAM-0000000015: children: [KSTREAM-SINK-0000000016] KSTREAM-SINK-0000000016: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000019: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000018] children: [KTABLE-TOSTREAM-0000000020] KTABLE-TOSTREAM-0000000020: children: [KSTREAM-MAP-0000000021] KSTREAM-MAP-0000000021: children: [KSTREAM-SINK-0000000022] KSTREAM-SINK-0000000022: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000025: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000024] children: [KTABLE-TOSTREAM-0000000026] KTABLE-TOSTREAM-0000000026: children: [KSTREAM-MAP-0000000027] KSTREAM-MAP-0000000027: children: [KSTREAM-SINK-0000000028] KSTREAM-SINK-0000000028: topic: windowed-node-counts KSTREAM-JOIN-0000000032: states: [windowed-node-counts-STATE-STORE-0000000029] children: [KSTREAM-SINK-0000000033] KSTREAM-SINK-0000000033: topic: joined-counts KSTREAM-SOURCE-0000000030: topics: [windowed-node-counts] children: [KTABLE-SOURCE-0000000031] KTABLE-SOURCE-0000000031: states: [windowed-node-counts-STATE-STORE-0000000029] Partitions [node-name-repartition-7, windowed-node-counts-7] (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:46:59,341] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000007 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,341] ERROR [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread) java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:752) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:750) at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:74) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:229) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [2017-12-12 10:46:59,342] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:46:59,342] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:46:59,342] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Shutting down all active tasks [], standby tasks [], suspended tasks [], and suspended standby tasks [] (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:46:59,342] DEBUG [streams-saak-test-client-StreamThread-1] task [3_1] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:59,342] DEBUG [streams-saak-test-client-StreamThread-1] task [3_1] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,342] DEBUG [streams-saak-test-client-StreamThread-1] task [3_1] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000048 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,348] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Released state dir lock for task 3_1 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:59,348] DEBUG [streams-saak-test-client-StreamThread-1] task [3_3] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:59,348] DEBUG [streams-saak-test-client-StreamThread-1] task [3_3] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,348] DEBUG [streams-saak-test-client-StreamThread-1] task [3_3] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000048 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,354] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Released state dir lock for task 3_3 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:59,354] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:59,354] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,354] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000013 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,371] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000018 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,388] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000024 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,405] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Closing storage engine windowed-node-counts-STATE-STORE-0000000029 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,411] DEBUG [streams-saak-test-client-StreamThread-1] task [1_7] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000007 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,411] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Released state dir lock for task 1_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:59,411] DEBUG [streams-saak-test-client-StreamThread-1] task [2_7] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:59,411] DEBUG [streams-saak-test-client-StreamThread-1] task [2_7] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,411] DEBUG [streams-saak-test-client-StreamThread-1] task [2_7] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000039 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,417] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Released state dir lock for task 2_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:59,417] DEBUG [streams-saak-test-client-StreamThread-1] task [3_7] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:59,417] DEBUG [streams-saak-test-client-StreamThread-1] task [3_7] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,417] DEBUG [streams-saak-test-client-StreamThread-1] task [3_7] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000048 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,423] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Released state dir lock for task 3_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:59,423] DEBUG [streams-saak-test-client-StreamThread-1] task [3_8] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:46:59,423] DEBUG [streams-saak-test-client-StreamThread-1] task [3_8] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,423] DEBUG [streams-saak-test-client-StreamThread-1] task [3_8] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000048 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:46:59,429] DEBUG [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Released state dir lock for task 3_8 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:46:59,434] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:46:59,434] INFO [streams-saak-test-client-StreamThread-1] stream-thread [streams-saak-test-client-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:46:59,434] ERROR [streams-saak-test-client-StreamThread-1] Thread StreamsThread threadId: streams-saak-test-client-StreamThread-1 Active tasks: Running: Suspended: Restoring: StreamsTask taskId: 3_1 ProcessorTopology: KSTREAM-SOURCE-0000000046: topics: [host-id-repartition] children: [KSTREAM-AGGREGATE-0000000049] KSTREAM-AGGREGATE-0000000049: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000048] children: [KTABLE-TOSTREAM-0000000050] KTABLE-TOSTREAM-0000000050: children: [KSTREAM-SINK-0000000051] KSTREAM-SINK-0000000051: topic: host-counts Partitions [host-id-repartition-1] StreamsTask taskId: 3_3 ProcessorTopology: KSTREAM-SOURCE-0000000046: topics: [host-id-repartition] children: [KSTREAM-AGGREGATE-0000000049] KSTREAM-AGGREGATE-0000000049: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000048] children: [KTABLE-TOSTREAM-0000000050] KTABLE-TOSTREAM-0000000050: children: [KSTREAM-SINK-0000000051] KSTREAM-SINK-0000000051: topic: host-counts Partitions [host-id-repartition-3] StreamsTask taskId: 1_7 ProcessorTopology: KSTREAM-SOURCE-0000000005: topics: [node-name-repartition] children: [KSTREAM-AGGREGATE-0000000008, KSTREAM-AGGREGATE-0000000014, KSTREAM-AGGREGATE-0000000019, KSTREAM-AGGREGATE-0000000025, KSTREAM-JOIN-0000000032] KSTREAM-AGGREGATE-0000000008: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000007] children: [KTABLE-TOSTREAM-0000000009] KTABLE-TOSTREAM-0000000009: children: [KSTREAM-MAP-0000000010] KSTREAM-MAP-0000000010: children: [KSTREAM-SINK-0000000011] KSTREAM-SINK-0000000011: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000014: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000013] children: [KTABLE-TOSTREAM-0000000015] KTABLE-TOSTREAM-0000000015: children: [KSTREAM-SINK-0000000016] KSTREAM-SINK-0000000016: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000019: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000018] children: [KTABLE-TOSTREAM-0000000020] KTABLE-TOSTREAM-0000000020: children: [KSTREAM-MAP-0000000021] KSTREAM-MAP-0000000021: children: [KSTREAM-SINK-0000000022] KSTREAM-SINK-0000000022: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000025: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000024] children: [KTABLE-TOSTREAM-0000000026] KTABLE-TOSTREAM-0000000026: children: [KSTREAM-MAP-0000000027] KSTREAM-MAP-0000000027: children: [KSTREAM-SINK-0000000028] KSTREAM-SINK-0000000028: topic: windowed-node-counts KSTREAM-JOIN-0000000032: states: [windowed-node-counts-STATE-STORE-0000000029] children: [KSTREAM-SINK-0000000033] KSTREAM-SINK-0000000033: topic: joined-counts KSTREAM-SOURCE-0000000030: topics: [windowed-node-counts] children: [KTABLE-SOURCE-0000000031] KTABLE-SOURCE-0000000031: states: [windowed-node-counts-STATE-STORE-0000000029] Partitions [node-name-repartition-7, windowed-node-counts-7] StreamsTask taskId: 2_7 ProcessorTopology: KSTREAM-SOURCE-0000000037: topics: [cluster-id-repartition] children: [KSTREAM-AGGREGATE-0000000040] KSTREAM-AGGREGATE-0000000040: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000039] children: [KTABLE-TOSTREAM-0000000041] KTABLE-TOSTREAM-0000000041: children: [KSTREAM-SINK-0000000042] KSTREAM-SINK-0000000042: topic: cluster-id-counts Partitions [cluster-id-repartition-7] StreamsTask taskId: 3_7 ProcessorTopology: KSTREAM-SOURCE-0000000046: topics: [host-id-repartition] children: [KSTREAM-AGGREGATE-0000000049] KSTREAM-AGGREGATE-0000000049: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000048] children: [KTABLE-TOSTREAM-0000000050] KTABLE-TOSTREAM-0000000050: children: [KSTREAM-SINK-0000000051] KSTREAM-SINK-0000000051: topic: host-counts Partitions [host-id-repartition-7] StreamsTask taskId: 3_8 ProcessorTopology: KSTREAM-SOURCE-0000000046: topics: [host-id-repartition] children: [KSTREAM-AGGREGATE-0000000049] KSTREAM-AGGREGATE-0000000049: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000048] children: [KTABLE-TOSTREAM-0000000050] KTABLE-TOSTREAM-0000000050: children: [KSTREAM-SINK-0000000051] KSTREAM-SINK-0000000051: topic: host-counts Partitions [host-id-repartition-8] New: Standby tasks: Running: Suspended: Restoring: New: encountered an error processing soak test (org.apache.kafka.streams.StreamsSoakTest) java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:752) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:750) at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:74) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:229) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [2017-12-12 10:47:08,193] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] at state PARTITIONS_REVOKED: partitions [host-id-repartition-1, windowed-node-counts-2, node-name-repartition-7, cluster-id-repartition-5, windowed-node-counts-7, node-name-repartition-2, cluster-id-repartition-7, host-id-repartition-9, cluster-id-repartition-8, cluster-id-repartition-9] assigned at the end of consumer rebalance. current suspended active tasks: [] current suspended standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:08,193] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] at state PARTITIONS_REVOKED: partitions [logs-9, host-id-repartition-7, logs-8, windowed-node-counts-5, node-name-repartition-9, logs-4, windowed-node-counts-9, logs-3, host-id-repartition-8, node-name-repartition-5] assigned at the end of consumer rebalance. current suspended active tasks: [0_3, 2_5, 0_9] current suspended standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:08,193] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:08,193] INFO [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:08,193] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Closing suspended and not re-assigned stream task 2_5 (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,193] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Adding assigned tasks as active: {1_2=[windowed-node-counts-2, node-name-repartition-2], 3_1=[host-id-repartition-1], 2_5=[cluster-id-repartition-5], 1_7=[node-name-repartition-7, windowed-node-counts-7], 2_7=[cluster-id-repartition-7], 2_8=[cluster-id-repartition-8], 2_9=[cluster-id-repartition-9], 3_9=[host-id-repartition-9]} (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:47:08,193] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,193] DEBUG [streams-saak-test-client-StreamThread-2] task [2_5] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000039 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,194] DEBUG [streams-saak-test-client-StreamThread-3] task [1_2] Created state store manager for task 1_2 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,194] DEBUG [streams-saak-test-client-StreamThread-3] task [1_2] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,195] DEBUG [streams-saak-test-client-StreamThread-3] task [3_1] Created state store manager for task 3_1 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,195] DEBUG [streams-saak-test-client-StreamThread-3] task [3_1] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,195] DEBUG [streams-saak-test-client-StreamThread-3] task [2_5] Created state store manager for task 2_5 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,195] DEBUG [streams-saak-test-client-StreamThread-3] task [2_5] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,196] DEBUG [streams-saak-test-client-StreamThread-3] task [1_7] Created state store manager for task 1_7 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,197] DEBUG [streams-saak-test-client-StreamThread-3] task [1_7] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,197] DEBUG [streams-saak-test-client-StreamThread-3] task [2_7] Created state store manager for task 2_7 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,197] DEBUG [streams-saak-test-client-StreamThread-3] task [2_7] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,198] DEBUG [streams-saak-test-client-StreamThread-3] task [2_8] Created state store manager for task 2_8 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,198] DEBUG [streams-saak-test-client-StreamThread-3] task [2_8] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,198] DEBUG [streams-saak-test-client-StreamThread-3] task [2_9] Created state store manager for task 2_9 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,198] DEBUG [streams-saak-test-client-StreamThread-3] task [2_9] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,199] DEBUG [streams-saak-test-client-StreamThread-3] task [3_9] Created state store manager for task 3_9 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,199] DEBUG [streams-saak-test-client-StreamThread-3] task [3_9] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,199] INFO [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] partition assignment took 6 ms. current active tasks: [1_2, 3_1, 2_5, 1_7, 2_7, 2_8, 2_9, 3_9] current standby tasks: [] previous active tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:08,201] INFO [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] No custom setting defined for topic 'cluster-id-repartition' using original config 'earliest' for offset reset (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:08,201] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Initializing stream tasks [1_2, 3_1, 2_5, 1_7, 2_7, 2_8, 2_9, 3_9] (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,202] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Acquired state dir lock for task 1_2 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,203] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Released state dir lock for task 2_5 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,203] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Adding assigned tasks as active: {0_3=[logs-3], 0_4=[logs-4], 1_5=[windowed-node-counts-5, node-name-repartition-5], 0_8=[logs-8], 0_9=[logs-9], 3_7=[host-id-repartition-7], 1_9=[node-name-repartition-9, windowed-node-counts-9], 3_8=[host-id-repartition-8]} (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:47:08,203] DEBUG [streams-saak-test-client-StreamThread-2] task [0_3] Resuming (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:08,206] DEBUG [streams-saak-test-client-StreamThread-3] task [1_2] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000007 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,207] DEBUG [streams-saak-test-client-StreamThread-3] task [1_2] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000013 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,208] DEBUG [streams-saak-test-client-StreamThread-3] task [1_2] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000018 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,210] DEBUG [streams-saak-test-client-StreamThread-3] task [1_2] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000024 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,211] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] transitioning stream task 0_3 to running (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,211] DEBUG [streams-saak-test-client-StreamThread-2] task [0_9] Resuming (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:08,212] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,215] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] transitioning stream task 0_9 to running (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,216] DEBUG [streams-saak-test-client-StreamThread-2] task [0_4] Created state store manager for task 0_4 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,216] DEBUG [streams-saak-test-client-StreamThread-2] task [0_4] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,216] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Created state store manager for task 1_5 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,216] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,217] DEBUG [streams-saak-test-client-StreamThread-2] task [0_8] Created state store manager for task 0_8 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,217] DEBUG [streams-saak-test-client-StreamThread-2] task [0_8] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,217] DEBUG [streams-saak-test-client-StreamThread-2] task [3_7] Created state store manager for task 3_7 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,217] DEBUG [streams-saak-test-client-StreamThread-2] task [3_7] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,218] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Created state store manager for task 1_9 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,218] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,219] DEBUG [streams-saak-test-client-StreamThread-2] task [3_8] Created state store manager for task 3_8 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,219] DEBUG [streams-saak-test-client-StreamThread-2] task [3_8] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,219] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] partition assignment took 26 ms. current active tasks: [0_3, 0_4, 1_5, 0_8, 0_9, 3_7, 1_9, 3_8] current standby tasks: [] previous active tasks: [0_3, 2_5, 0_9] (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:08,238] DEBUG [streams-saak-test-client-StreamThread-3] task [1_2] Registering state store windowed-node-counts-STATE-STORE-0000000029 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,244] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Transitioning stream task 1_2 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,244] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Acquired state dir lock for task 3_1 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,246] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,262] DEBUG [streams-saak-test-client-StreamThread-3] task [3_1] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,263] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Transitioning stream task 3_1 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,263] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Acquired state dir lock for task 2_5 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,265] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,281] DEBUG [streams-saak-test-client-StreamThread-3] task [2_5] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000039 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,282] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Transitioning stream task 2_5 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,283] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Acquired state dir lock for task 1_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,285] DEBUG [streams-saak-test-client-StreamThread-3] task [1_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000007 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,286] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,301] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,317] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,333] DEBUG [streams-saak-test-client-StreamThread-3] task [1_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000013 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,333] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,349] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,365] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,381] DEBUG [streams-saak-test-client-StreamThread-3] task [1_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000018 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,382] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,397] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,413] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,428] DEBUG [streams-saak-test-client-StreamThread-3] task [1_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000024 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,429] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,445] DEBUG [streams-saak-test-client-StreamThread-3] task [1_7] Registering state store windowed-node-counts-STATE-STORE-0000000029 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,450] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Transitioning stream task 1_7 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,450] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Acquired state dir lock for task 2_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,457] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,472] DEBUG [streams-saak-test-client-StreamThread-3] task [2_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000039 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,473] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Transitioning stream task 2_7 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,473] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Acquired state dir lock for task 2_8 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,475] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,491] DEBUG [streams-saak-test-client-StreamThread-3] task [2_8] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000039 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,492] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Transitioning stream task 2_8 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,492] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Acquired state dir lock for task 2_9 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,495] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,500] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Initializing stream tasks [0_4, 1_5, 0_8, 3_7, 1_9, 3_8] (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,503] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] transitioning stream task 0_4 to running (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,503] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Acquired state dir lock for task 1_5 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,511] DEBUG [streams-saak-test-client-StreamThread-3] task [2_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000039 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,512] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Transitioning stream task 2_9 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,512] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Acquired state dir lock for task 3_9 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:08,514] INFO [streams-saak-test-client-StreamThread-3] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,526] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,530] DEBUG [streams-saak-test-client-StreamThread-3] task [3_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,531] DEBUG [streams-saak-test-client-StreamThread-3] stream-thread [streams-saak-test-client-StreamThread-3] Transitioning stream task 3_9 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,542] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,558] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,573] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000007 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,573] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,589] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,604] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,619] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000013 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,620] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,635] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,650] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,666] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000018 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,666] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,682] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,697] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,713] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000024 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,714] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:08,729] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Registering state store windowed-node-counts-STATE-STORE-0000000029 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:08,733] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Transitioning stream task 1_5 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,737] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] transitioning stream task 0_8 to running (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:08,737] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Acquired state dir lock for task 3_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:09,208] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,225] DEBUG [streams-saak-test-client-StreamThread-2] task [3_7] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,226] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Transitioning stream task 3_7 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:09,226] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Acquired state dir lock for task 1_9 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:09,238] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,253] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,269] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,284] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000007 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,285] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,301] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,316] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,333] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000013 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,333] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,348] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,364] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,380] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000018 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,380] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,396] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,411] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,428] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000024 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,429] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,444] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Registering state store windowed-node-counts-STATE-STORE-0000000029 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,455] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Transitioning stream task 1_9 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:09,455] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Acquired state dir lock for task 3_8 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:09,459] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,475] DEBUG [streams-saak-test-client-StreamThread-2] task [3_8] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000048 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,476] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Transitioning stream task 3_8 to restoring (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) [2017-12-12 10:47:09,606] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Start restoring state stores from changelog topics [streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-5, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-9, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-8, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000018-changelog-5, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-7, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-5, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-9, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000018-changelog-9, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000024-changelog-5, windowed-node-counts-5, streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000024-changelog-9, windowed-node-counts-9] (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,607] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-5 from offset 0 to endOffset 192968 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,607] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-9 from offset 0 to endOffset 480555 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,607] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-8 from offset 137020 to endOffset 137396 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,620] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,640] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,656] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000018-changelog-5 from offset 0 to endOffset 2566541 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,656] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-7 from offset 0 to endOffset 289514 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,669] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,690] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,705] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-5 from offset 0 to endOffset 238643 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,705] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-9 from offset 0 to endOffset 387096 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,705] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000018-changelog-9 from offset 0 to endOffset 5156759 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,705] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000024-changelog-5 from offset 0 to endOffset 10917490 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,705] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition windowed-node-counts-5 from offset 0 to endOffset 20805837 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,719] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,739] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,754] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000024-changelog-9 from offset 0 to endOffset 48958668 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,754] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring partition windowed-node-counts-9 from offset 0 to endOffset 88944752 (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,771] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,799] INFO [streams-saak-test-client-StreamThread-2] Using 5000000 for max log size 5 for max number logs and /mnt/data/deploy/streams/logs for log dir dropping stats every 800 seconds (org.apache.kafka.streams.logging.RocksDbLoggingConfig) [2017-12-12 10:47:09,818] WARN [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch. (org.apache.kafka.streams.processor.internals.StoreChangelogReader) org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000024-changelog-9=0} at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:883) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1173) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1106) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [2017-12-12 10:47:09,818] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Reinitializing StreamTask StreamsTask taskId: 1_9 ProcessorTopology: KSTREAM-SOURCE-0000000005: topics: [node-name-repartition] children: [KSTREAM-AGGREGATE-0000000008, KSTREAM-AGGREGATE-0000000014, KSTREAM-AGGREGATE-0000000019, KSTREAM-AGGREGATE-0000000025, KSTREAM-JOIN-0000000032] KSTREAM-AGGREGATE-0000000008: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000007] children: [KTABLE-TOSTREAM-0000000009] KTABLE-TOSTREAM-0000000009: children: [KSTREAM-MAP-0000000010] KSTREAM-MAP-0000000010: children: [KSTREAM-SINK-0000000011] KSTREAM-SINK-0000000011: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000014: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000013] children: [KTABLE-TOSTREAM-0000000015] KTABLE-TOSTREAM-0000000015: children: [KSTREAM-SINK-0000000016] KSTREAM-SINK-0000000016: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000019: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000018] children: [KTABLE-TOSTREAM-0000000020] KTABLE-TOSTREAM-0000000020: children: [KSTREAM-MAP-0000000021] KSTREAM-MAP-0000000021: children: [KSTREAM-SINK-0000000022] KSTREAM-SINK-0000000022: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000025: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000024] children: [KTABLE-TOSTREAM-0000000026] KTABLE-TOSTREAM-0000000026: children: [KSTREAM-MAP-0000000027] KSTREAM-MAP-0000000027: children: [KSTREAM-SINK-0000000028] KSTREAM-SINK-0000000028: topic: windowed-node-counts KSTREAM-JOIN-0000000032: states: [windowed-node-counts-STATE-STORE-0000000029] children: [KSTREAM-SINK-0000000033] KSTREAM-SINK-0000000033: topic: joined-counts KSTREAM-SOURCE-0000000030: topics: [windowed-node-counts] children: [KTABLE-SOURCE-0000000031] KTABLE-SOURCE-0000000031: states: [windowed-node-counts-STATE-STORE-0000000029] Partitions [node-name-repartition-9, windowed-node-counts-9] (org.apache.kafka.streams.processor.internals.StoreChangelogReader) [2017-12-12 10:47:09,840] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Registering state store KSTREAM-AGGREGATE-STATE-STORE-0000000024 to its state manager (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,840] ERROR [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread) java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:752) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:750) at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:74) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:229) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [2017-12-12 10:47:09,840] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:09,840] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:09,840] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Shutting down all active tasks [0_3, 0_4, 0_8, 0_9], standby tasks [], suspended tasks [0_3, 2_5, 0_9], and suspended standby tasks [] (org.apache.kafka.streams.processor.internals.TaskManager) [2017-12-12 10:47:09,840] DEBUG [streams-saak-test-client-StreamThread-2] task [0_3] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:09,873] DEBUG [streams-saak-test-client-StreamThread-2] task [0_4] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:09,874] DEBUG [streams-saak-test-client-StreamThread-2] task [0_8] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:09,874] DEBUG [streams-saak-test-client-StreamThread-2] task [0_9] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:09,874] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:09,874] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,874] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000007 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,894] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000013 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,911] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000018 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,928] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000024 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,945] DEBUG [streams-saak-test-client-StreamThread-2] task [1_5] Closing storage engine windowed-node-counts-STATE-STORE-0000000029 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,951] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Released state dir lock for task 1_5 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:09,951] DEBUG [streams-saak-test-client-StreamThread-2] task [3_7] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:09,951] DEBUG [streams-saak-test-client-StreamThread-2] task [3_7] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,951] DEBUG [streams-saak-test-client-StreamThread-2] task [3_7] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000048 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,957] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Released state dir lock for task 3_7 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:09,957] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:09,957] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,957] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000007 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,974] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000013 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:09,991] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000018 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:10,008] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Closing storage engine windowed-node-counts-STATE-STORE-0000000029 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:10,013] DEBUG [streams-saak-test-client-StreamThread-2] task [1_9] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000024 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:10,014] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Released state dir lock for task 1_9 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:10,014] DEBUG [streams-saak-test-client-StreamThread-2] task [3_8] Closing (org.apache.kafka.streams.processor.internals.StreamTask) [2017-12-12 10:47:10,014] DEBUG [streams-saak-test-client-StreamThread-2] task [3_8] Closing its state manager and all the registered state stores (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:10,014] DEBUG [streams-saak-test-client-StreamThread-2] task [3_8] Closing storage engine KSTREAM-AGGREGATE-STATE-STORE-0000000048 (org.apache.kafka.streams.processor.internals.ProcessorStateManager) [2017-12-12 10:47:10,019] DEBUG [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Released state dir lock for task 3_8 (org.apache.kafka.streams.processor.internals.StateDirectory) [2017-12-12 10:47:10,023] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:10,023] INFO [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread) [2017-12-12 10:47:10,024] ERROR [streams-saak-test-client-StreamThread-2] Thread StreamsThread threadId: streams-saak-test-client-StreamThread-2 Active tasks: Running: Suspended: Restoring: StreamsTask taskId: 1_5 ProcessorTopology: KSTREAM-SOURCE-0000000005: topics: [node-name-repartition] children: [KSTREAM-AGGREGATE-0000000008, KSTREAM-AGGREGATE-0000000014, KSTREAM-AGGREGATE-0000000019, KSTREAM-AGGREGATE-0000000025, KSTREAM-JOIN-0000000032] KSTREAM-AGGREGATE-0000000008: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000007] children: [KTABLE-TOSTREAM-0000000009] KTABLE-TOSTREAM-0000000009: children: [KSTREAM-MAP-0000000010] KSTREAM-MAP-0000000010: children: [KSTREAM-SINK-0000000011] KSTREAM-SINK-0000000011: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000014: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000013] children: [KTABLE-TOSTREAM-0000000015] KTABLE-TOSTREAM-0000000015: children: [KSTREAM-SINK-0000000016] KSTREAM-SINK-0000000016: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000019: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000018] children: [KTABLE-TOSTREAM-0000000020] KTABLE-TOSTREAM-0000000020: children: [KSTREAM-MAP-0000000021] KSTREAM-MAP-0000000021: children: [KSTREAM-SINK-0000000022] KSTREAM-SINK-0000000022: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000025: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000024] children: [KTABLE-TOSTREAM-0000000026] KTABLE-TOSTREAM-0000000026: children: [KSTREAM-MAP-0000000027] KSTREAM-MAP-0000000027: children: [KSTREAM-SINK-0000000028] KSTREAM-SINK-0000000028: topic: windowed-node-counts KSTREAM-JOIN-0000000032: states: [windowed-node-counts-STATE-STORE-0000000029] children: [KSTREAM-SINK-0000000033] KSTREAM-SINK-0000000033: topic: joined-counts KSTREAM-SOURCE-0000000030: topics: [windowed-node-counts] children: [KTABLE-SOURCE-0000000031] KTABLE-SOURCE-0000000031: states: [windowed-node-counts-STATE-STORE-0000000029] Partitions [windowed-node-counts-5, node-name-repartition-5] StreamsTask taskId: 3_7 ProcessorTopology: KSTREAM-SOURCE-0000000046: topics: [host-id-repartition] children: [KSTREAM-AGGREGATE-0000000049] KSTREAM-AGGREGATE-0000000049: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000048] children: [KTABLE-TOSTREAM-0000000050] KTABLE-TOSTREAM-0000000050: children: [KSTREAM-SINK-0000000051] KSTREAM-SINK-0000000051: topic: host-counts Partitions [host-id-repartition-7] StreamsTask taskId: 1_9 ProcessorTopology: KSTREAM-SOURCE-0000000005: topics: [node-name-repartition] children: [KSTREAM-AGGREGATE-0000000008, KSTREAM-AGGREGATE-0000000014, KSTREAM-AGGREGATE-0000000019, KSTREAM-AGGREGATE-0000000025, KSTREAM-JOIN-0000000032] KSTREAM-AGGREGATE-0000000008: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000007] children: [KTABLE-TOSTREAM-0000000009] KTABLE-TOSTREAM-0000000009: children: [KSTREAM-MAP-0000000010] KSTREAM-MAP-0000000010: children: [KSTREAM-SINK-0000000011] KSTREAM-SINK-0000000011: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000014: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000013] children: [KTABLE-TOSTREAM-0000000015] KTABLE-TOSTREAM-0000000015: children: [KSTREAM-SINK-0000000016] KSTREAM-SINK-0000000016: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000019: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000018] children: [KTABLE-TOSTREAM-0000000020] KTABLE-TOSTREAM-0000000020: children: [KSTREAM-MAP-0000000021] KSTREAM-MAP-0000000021: children: [KSTREAM-SINK-0000000022] KSTREAM-SINK-0000000022: topic: windowed-node-counts KSTREAM-AGGREGATE-0000000025: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000024] children: [KTABLE-TOSTREAM-0000000026] KTABLE-TOSTREAM-0000000026: children: [KSTREAM-MAP-0000000027] KSTREAM-MAP-0000000027: children: [KSTREAM-SINK-0000000028] KSTREAM-SINK-0000000028: topic: windowed-node-counts KSTREAM-JOIN-0000000032: states: [windowed-node-counts-STATE-STORE-0000000029] children: [KSTREAM-SINK-0000000033] KSTREAM-SINK-0000000033: topic: joined-counts KSTREAM-SOURCE-0000000030: topics: [windowed-node-counts] children: [KTABLE-SOURCE-0000000031] KTABLE-SOURCE-0000000031: states: [windowed-node-counts-STATE-STORE-0000000029] Partitions [node-name-repartition-9, windowed-node-counts-9] StreamsTask taskId: 3_8 ProcessorTopology: KSTREAM-SOURCE-0000000046: topics: [host-id-repartition] children: [KSTREAM-AGGREGATE-0000000049] KSTREAM-AGGREGATE-0000000049: states: [KSTREAM-AGGREGATE-STATE-STORE-0000000048] children: [KTABLE-TOSTREAM-0000000050] KTABLE-TOSTREAM-0000000050: children: [KSTREAM-SINK-0000000051] KSTREAM-SINK-0000000051: topic: host-counts Partitions [host-id-repartition-8] New: Standby tasks: Running: Suspended: Restoring: New: encountered an error processing soak test (org.apache.kafka.streams.StreamsSoakTest) java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:752) at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:750) at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:74) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:229) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)