Details
Bug
Status: Resolved
Blocker
Resolution: Fixed
1.6.3, 1.6.4, 1.7.2
Flink 1.6.2 (Standalone Cluster)
Oracle JDK 1.8u151
CentOS 7.4
Description
We are using Flink 1.6.2 in our production environment, and the Kafka producer occasionally throws a NullPointerException.
We found that at line 175 of flink/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java, NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR is created as a static variable.
Then, at line 837,
context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
is called, which leads to line 734 of
flink/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java:
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
In the function initializeSerializerUnlessSet (line 283 of flink/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java):
if (serializer == null) {
    checkState(typeInfo != null, "no serializer and no type info");
    // instantiate the serializer
    serializer = typeInfo.createSerializer(executionConfig);
    // we can drop the type info now, no longer needed
    typeInfo = null;
}
"serializer = typeInfo.createSerializer(executionConfig);" is the line which throws the exception.
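We think this is because multiple subtasks of the same producer running in the same TaskManager share a single NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR. If so, the check in initializeSerializerUnlessSet is not thread safe: one subtask can set typeInfo to null after another subtask has already passed the "serializer == null" check but before it calls typeInfo.createSerializer(executionConfig), and that call then throws the NullPointerException.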
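The suspected interleaving can be sketched outside Flink. The snippet below is only an illustration under stated assumptions: LazyInitRaceSketch, UnsafeDescriptor, SynchronizedDescriptor and the pauseAfterNullCheck hook are hypothetical names invented for this example, and a Supplier<String> stands in for the type info. It is not Flink's StateDescriptor. The hook forces a second thread to finish its initialization right after the first thread has passed the null check, which is the ordering described above. The synchronized variant is one possible way to avoid the race and is not necessarily what the eventual fix does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for a descriptor shared by several subtasks; not Flink code. */
class LazyInitRaceSketch {

    /** Same unsynchronized check-then-act shape as the snippet quoted in the report. */
    static class UnsafeDescriptor {
        private Supplier<String> typeInfo = () -> "serializer";
        private String serializer;

        // pauseAfterNullCheck exists only to force the interleaving deterministically.
        void initializeSerializerUnlessSet(Runnable pauseAfterNullCheck) {
            if (serializer == null) {
                pauseAfterNullCheck.run();     // another thread may run here
                serializer = typeInfo.get();   // throws if typeInfo was already dropped
                typeInfo = null;
            }
        }
    }

    /** One possible remedy (an assumption): make the whole check-then-act atomic. */
    static class SynchronizedDescriptor {
        private Supplier<String> typeInfo = () -> "serializer";
        private String serializer;

        synchronized void initializeSerializerUnlessSet() {
            if (serializer == null) {
                serializer = typeInfo.get();
                typeInfo = null;
            }
        }
    }

    /** Forces the bad ordering; returns true if a NullPointerException was thrown. */
    static boolean unsafeInterleavingThrows() {
        UnsafeDescriptor shared = new UnsafeDescriptor();
        try {
            shared.initializeSerializerUnlessSet(() -> {
                // A second "subtask" completes its initialization while the first is paused.
                Thread other = new Thread(() -> shared.initializeSerializerUnlessSet(() -> { }));
                other.start();
                try {
                    other.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            });
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Runs several threads against the synchronized variant and counts exceptions. */
    static int synchronizedFailures(int threads) {
        SynchronizedDescriptor shared = new SynchronizedDescriptor();
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger failures = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                    shared.initializeSerializerUnlessSet();
                } catch (Exception e) {
                    failures.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("unsafe interleaving throws NPE: " + unsafeInterleavingThrows());
        System.out.println("synchronized failures: " + synchronizedFailures(8));
    }
}
```

In a real job the ordering is not forced, which would be consistent with the exception appearing only occasionally.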
Issue Links
- is blocked by FLINK-12688 Make serializer lazy initialization thread safe in StateDescriptor (Closed)