Flink / FLINK-8090

Improve error message when registering different states under the same name.

Details

    Description

      Currently a ProcessFunction like this:

      
      final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
              "timon-one",
              BasicTypeInfo.INT_TYPE_INFO,
              source.getType());

      final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<Integer>(
              "timon-one",
              BasicTypeInfo.INT_TYPE_INFO);

      new ProcessFunction<Tuple2<Integer, Long>, Object>() {
          private static final long serialVersionUID = -805125545438296619L;

          private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
          private transient ListState<Integer> secondListState;

          @Override
          public void open(Configuration parameters) throws Exception {
              super.open(parameters);
              firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
              secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
          }

          @Override
          public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
              Tuple2<Integer, Long> v = firstMapState.get(value.f0);
              if (v == null) {
                  v = new Tuple2<>(value.f0, 0L);
              }
              firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
          }
      }
      

      fails with:

      java.lang.RuntimeException: Error while getting state
      	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
      	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
      	at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
      	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
      	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
      	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState
      	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
      	... 9 more
      

      This is cryptic because it does not explain the cause of the problem: two different states are registered under the same name. The error message should say something along the lines of "Duplicate state name".
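
      To illustrate the kind of message requested here, the following is a minimal sketch in plain Java. It is not Flink code: the class StateNameRegistry, its register method, and the exact message wording are all hypothetical, and the state kinds are represented as simple strings. It only shows how a name-to-kind lookup could detect the conflict up front and report it by name instead of letting a ClassCastException surface later.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not part of Flink: remembers which kind of state was
 * first registered under each name and rejects a conflicting registration
 * with a descriptive message.
 */
final class StateNameRegistry {

    // State name -> kind of state first registered under that name.
    private final Map<String, String> kindsByName = new HashMap<>();

    /**
     * Registers a state name for the given kind (for example "MapState").
     * Registering the same name again with the same kind is allowed.
     */
    void register(String name, String kind) {
        String existing = kindsByName.putIfAbsent(name, kind);
        if (existing != null && !existing.equals(kind)) {
            throw new IllegalStateException(
                    "Duplicate state name '" + name + "': already registered as "
                            + existing + ", cannot register it again as " + kind + ".");
        }
    }

    public static void main(String[] args) {
        StateNameRegistry registry = new StateNameRegistry();
        registry.register("timon-one", "MapState");
        try {
            // Same name, different kind: mirrors the two descriptors above.
            registry.register("timon-one", "ListState");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

      With a check of this shape, the failure would name the offending state ("timon-one") and both conflicting kinds, which is the information missing from the current stack trace.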

          People

            Assignee: Unassigned
            Reporter: Kostas Kloudas (kkl0u)
            Votes: 0
            Watchers: 4

              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 10m