Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6018

Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()

    Details

      Description

      The code snippet currently in the `AbstractKeyedStateBackend # getPartitionedState` method, as follows:

      line 352: // TODO: This is wrong, it should throw an exception that the initialization has not properly happened
      line 353: if (!stateDescriptor.isSerializerInitialized()) {
      line 354:        stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
      line 354 }
      

      Method `isSerializerInitialized`:

      public boolean isSerializerInitialized() {
      		return serializer != null;
      	}
      

      Method `initializeSerializerUnlessSet`:

      public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
      		if (serializer == null) { 
      			if (typeInfo != null) {
      				serializer = typeInfo.createSerializer(executionConfig);
      			} else {
      				throw new IllegalStateException(
      						"Cannot initialize serializer after TypeInformation was dropped during serialization");
      			}
      		}
      	}
      

      that is, in the `initializeSerializerUnlessSet` method, The `serializer` has been checked by `serializer == null`.So I hope this code has a little improvement to the following:
      approach 1:
      According to the `TODO` information we throw an exception

      if (!stateDescriptor.isSerializerInitialized()) {
      			throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); 
      }
      

      approach 2:
      Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized())

      {` logic. {code}

      stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());

      Meanwhile, If we use the approach 2, I suggest that `AbstractKeyedStateBackend` add a `private final ExecutionConfig executionConfig` property. then we can change the code like this:
      

      stateDescriptor.initializeSerializerUnlessSet(executionConfig);

      
      

      Are the above suggestions reasonable for you?
      Welcome anybody's feedback and corrections.

        Attachments

          Activity

            People

            • Assignee:
              aljoscha Aljoscha Krettek
              Reporter:
              sunjincheng121 sunjincheng
            • Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: