Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Invalid
- Affects Version/s: 1.0.0
- Fix Version/s: None
- Component/s: None
Description
A simple Kafka Streams application uses a KStream to consume data, as shown below.
Memory usage is very high when the consumed topic contains a large amount of data, sometimes reaching 20 GB.
This is very strange. The program does not do anything: it just reads the data and prints it to the screen. Why is memory usage so high when there is a lot of data in the topic?
The program code:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;

public class TestMain {

    public static StreamsBuilder builder = new StreamsBuilder();

    public static void kafkaStreamStart() {
        KStream<String, String> stream = builder.stream(Arrays.asList("wk_wangxin_po"));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testwang_xin");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "zktj-kafka-broker-out-1:29092,zktj-kafka-broker-out-2:29092,zktj-kafka-broker-out-3:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        System.setProperty("java.security.auth.login.config", "./conf/kafka_client_jaas.conf");

        // Print each consumed record to the screen.
        stream.foreach(new ForeachAction<String, String>() {
            @Override
            public void apply(String key, String value) {
                System.out.println(key + " : " + value);
            }
        });

        Topology topo = builder.build();
        KafkaStreams streams = new KafkaStreams(topo, props);
        streams.start();
    }

    public static void main(String[] args) {
        kafkaStreamStart();
    }
}
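For reference, if the growth turns out to come from fetch and record buffering in the embedded consumer (rather than an actual leak), the footprint can usually be bounded through configuration. A minimal sketch of such tuning, added to the same props object; the byte values below are illustrative assumptions, not verified recommendations for this workload:

```java
// Bound the Streams record cache (used for buffering before stores/downstream).
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Cap how much data the embedded consumer fetches per partition and per request.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 1 * 1024 * 1024);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 16 * 1024 * 1024);
```

Whether the 20 GB is JVM heap or native memory would need to be checked separately (for example with jmap or a heap dump) to confirm which pool is actually growing.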