Details
Type: Bug
Status: Open
Priority: Not a Priority
Resolution: Unresolved
Affects Version/s: 1.4.0
Fix Version/s: None
Description
Currently a ProcessFunction like this:
final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor =
    new MapStateDescriptor<>("timon-one", BasicTypeInfo.INT_TYPE_INFO, source.getType());

final ListStateDescriptor<Integer> secondListStateDescriptor =
    new ListStateDescriptor<Integer>("timon-one", BasicTypeInfo.INT_TYPE_INFO);

new ProcessFunction<Tuple2<Integer, Long>, Object>() {
    private static final long serialVersionUID = -805125545438296619L;

    private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
    private transient ListState<Integer> secondListState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
        secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
    }

    @Override
    public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
        Tuple2<Integer, Long> v = firstMapState.get(value.f0);
        if (v == null) {
            v = new Tuple2<>(value.f0, 0L);
        }
        firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
    }
}
fails with:
java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
    at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
    ... 9 more
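This is cryptic because it does not explain the cause of the problem: both state descriptors are registered under the same name, "timon-one", so the lookup for the list state returns the already-registered map state. The error message should be something along the lines of "Duplicate state name".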
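To illustrate the kind of check being asked for, here is a minimal, self-contained sketch. It does not use Flink and is not how DefaultKeyedStateStore is implemented; the class StateNameRegistry, its Kind enum, and the register method are hypothetical names invented for this example. The idea is only that a name-to-state-type lookup can detect the clash up front and report it in plain words instead of surfacing a ClassCastException.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a keyed state store; not Flink's actual implementation. */
public class StateNameRegistry {

    /** Kinds of state a descriptor can declare (simplified for illustration). */
    public enum Kind { MAP, LIST }

    // Remembers which kind of state each name was first registered with.
    private final Map<String, Kind> registered = new HashMap<>();

    /**
     * Registers a state name. Re-registering a name with the same kind is
     * allowed; reusing it for a different kind fails with a descriptive message.
     */
    public void register(String name, Kind kind) {
        Kind existing = registered.putIfAbsent(name, kind);
        if (existing != null && existing != kind) {
            throw new IllegalStateException(
                "Duplicate state name '" + name + "': already registered as "
                    + existing + " state, cannot register it again as " + kind + " state");
        }
    }

    public static void main(String[] args) {
        StateNameRegistry registry = new StateNameRegistry();
        // Mirrors the report: a map state and a list state sharing one name.
        registry.register("timon-one", Kind.MAP);
        try {
            registry.register("timon-one", Kind.LIST);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the ProcessFunction from the report, the equivalent workaround on the user side would be to give the second descriptor a name different from "timon-one".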