FLINK-6775

StateDescriptor cannot be shared by multiple subtasks


    Description

      The StateDescriptor holds the TypeSerializer that is used to serialize the state. The serializer instance is not duplicated when it is accessed, so every caller receives the same object. A StateDescriptor therefore cannot be shared safely when its TypeSerializer is stateful, as is the case for the KryoSerializer.

      This problem arises easily when a user writes a stateful operator that defines its StateDescriptor statically, because the static instance is then shared by all subtasks running in the same JVM. The workaround is to not define the StateDescriptor as a static field. I would still make this a blocker, however, because failures caused by concurrent use of the TypeSerializer are extremely hard for the user to debug.

      The following operator produces the problem:

      private static final class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements CheckpointedFunction {
          private static final long serialVersionUID = -1175717056869107847L;

          // Problematic: the static descriptor, and with it the serializer it holds, is shared by all instances of this operator.
          private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE = new ValueStateDescriptor<>("pojoType", PojoType.class);

          private transient ValueState<PojoType> valueState;

          public StatefulMapper() {
              valueState = null;
          }

          @Override
          public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
              PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, 2.0));
              // Every update serializes the POJO through the serializer obtained from the shared descriptor.
              valueState.update(pojoType);
              return tuple;
          }

          @Override
          public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {}

          @Override
          public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
              // Each subtask registers its state with the same static descriptor.
              valueState = functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE);
          }
      }
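
      The sharing described in this report can be modelled without Flink. The following is only a minimal sketch under stated assumptions, not Flink code: StatefulSerializer, Descriptor and Subtask are hypothetical stand-ins for a stateful TypeSerializer, a StateDescriptor and an operator instance, and getDuplicatedSerializer() is an invented name for one possible way of handing out copies. It compares a static descriptor, a per-instance descriptor (the workaround mentioned above), and a descriptor that duplicates its serializer on access.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SharedSerializerSketch {

    /** Hypothetical stand-in for a stateful serializer such as the KryoSerializer. */
    static final class StatefulSerializer {
        // Mutable scratch buffer reused across calls; this is what makes
        // a single instance unsafe to use from several threads at once.
        private byte[] scratch = new byte[16];

        byte[] serialize(String value) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > scratch.length) {
                scratch = new byte[bytes.length];
            }
            System.arraycopy(bytes, 0, scratch, 0, bytes.length);
            return Arrays.copyOf(scratch, bytes.length);
        }

        // Returns an independent instance with its own scratch buffer.
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    /** Hypothetical stand-in for a StateDescriptor that owns one serializer. */
    static final class Descriptor {
        private final StatefulSerializer serializer = new StatefulSerializer();

        // Behaviour described in the report: the same instance on every access.
        StatefulSerializer getSerializer() {
            return serializer;
        }

        // Assumed alternative: hand out a fresh copy on every access.
        StatefulSerializer getDuplicatedSerializer() {
            return serializer.duplicate();
        }
    }

    // Static descriptor: a single object for all Subtask instances in this JVM.
    static final Descriptor STATIC_DESCRIPTOR = new Descriptor();

    /** Hypothetical operator instance; one would exist per parallel subtask. */
    static final class Subtask {
        // Workaround from the report: a non-static descriptor per instance.
        private final Descriptor ownDescriptor = new Descriptor();

        StatefulSerializer fromStatic() {
            return STATIC_DESCRIPTOR.getSerializer();
        }

        StatefulSerializer fromOwn() {
            return ownDescriptor.getSerializer();
        }
    }

    public static void main(String[] args) {
        Subtask a = new Subtask();
        Subtask b = new Subtask();
        // true: both subtasks end up with the very same serializer object
        System.out.println(a.fromStatic() == b.fromStatic());
        // false: each subtask has its own descriptor and serializer
        System.out.println(a.fromOwn() == b.fromOwn());
        // false: duplicating on access yields independent serializers
        System.out.println(STATIC_DESCRIPTOR.getDuplicatedSerializer()
                == STATIC_DESCRIPTOR.getDuplicatedSerializer());
    }
}
```

      The sketch deliberately checks object identity rather than trying to provoke a race, since a concurrent failure would not reproduce reliably. In the model, the static descriptor is the only variant in which two subtasks receive the same serializer object.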
      

            People

              trohrmann Till Rohrmann