Details
- Bug
- Status: Closed
- Blocker
- Resolution: Fixed
- 1.0.3, 1.1.4, 1.2.1, 1.3.0, 1.4.0
Description
The StateDescriptor contains the TypeSerializer that is used to serialize the state. The serializer instance is not duplicated when it is accessed. Therefore, a StateDescriptor cannot be safely shared if its TypeSerializer is stateful, as is the case with the KryoSerializer.
This problem can easily arise when a user defines a stateful operator that declares its StateDescriptor statically. The workaround is to not define a static StateDescriptor. However, I would still make this a blocker, because it is extremely hard for the user to debug failures caused by the TypeSerializer being used concurrently.
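To make the difference concrete, here is a minimal, self-contained sketch. It is not Flink code: SketchSerializer, BufferedStringSerializer and SketchDescriptor are hypothetical stand-ins for TypeSerializer, a stateful serializer such as the KryoSerializer, and StateDescriptor. It only contrasts handing out the one shared serializer instance with handing out a duplicate per access, assuming the serializer offers a duplicate() method.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializer type that can copy itself.
interface SketchSerializer<T> {
    byte[] serialize(T value);

    // Returns an independent copy that shares no mutable state with this one.
    SketchSerializer<T> duplicate();
}

// Stateful serializer: it reuses one internal buffer across calls, so a
// single instance must not be used by several threads at the same time.
final class BufferedStringSerializer implements SketchSerializer<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    public byte[] serialize(String value) {
        scratch.setLength(0);
        scratch.append(value.length()).append(':').append(value);
        return scratch.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public SketchSerializer<String> duplicate() {
        return new BufferedStringSerializer();
    }
}

// Hypothetical stand-in for a descriptor that owns a serializer.
final class SketchDescriptor<T> {
    private final String name;
    private final SketchSerializer<T> prototype;

    SketchDescriptor(String name, SketchSerializer<T> prototype) {
        this.name = name;
        this.prototype = prototype;
    }

    String getName() {
        return name;
    }

    // Behaviour described in this report: every caller gets the same
    // instance, so a statically shared descriptor shares serializer state.
    SketchSerializer<T> getSharedSerializer() {
        return prototype;
    }

    // Defensive variant: every caller gets its own copy.
    SketchSerializer<T> getDuplicatedSerializer() {
        return prototype.duplicate();
    }
}
```

With the duplicating accessor, a descriptor held in a static field would still give each operator instance its own serializer. Not declaring the descriptor statically has a similar effect, since each operator instance then builds its own descriptor and serializer.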
The following operator produces the problem:
private static final class StatefulMapper
        extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
        implements CheckpointedFunction {

    private static final long serialVersionUID = -1175717056869107847L;

    // Static descriptor: shared by all parallel instances in the same JVM.
    private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE =
        new ValueStateDescriptor<PojoType>("pojoType", PojoType.class);

    private transient ValueState<PojoType> valueState;

    public StatefulMapper() {
        valueState = null;
    }

    @Override
    public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
        PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, 2.0));
        valueState.update(pojoType);
        return tuple;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {}

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        valueState = functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE);
    }
}