Details
Type: Improvement
Status: Resolved
Priority: P2
Resolution: Not A Problem
None
Description
In the Java SDK we currently ask users to define a state spec as:
new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {

  // A state cell holding a single Integer per key+window
  @StateId("index")
  private final StateSpec<ValueState<Integer>> indexSpec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("index") ValueState<Integer> index) {
    int current = firstNonNull(index.read(), 0);
    context.output(KV.of(current, context.element()));
    index.write(current + 1);
  }
}
The suggestion is to move the id from the @StateId field annotation into the StateSpec itself, so the code would look like:
new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {

  // A state cell holding a single Integer per key+window
  private final StateSpec<ValueState<Integer>> indexSpec =
      StateSpecs.value("index", VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("index") ValueState<Integer> index) {
    int current = firstNonNull(index.read(), 0);
    context.output(KV.of(current, context.element()));
    index.write(current + 1);
  }
}
We should also remove the coder from the StateSpec hashCode and equals methods, so that two specs are not considered different just because their coders differ.
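To make the last point concrete, here is a minimal, self-contained sketch of a spec that carries its own id and leaves the coder out of equality. It does not use the Beam API: `NamedValueSpec`, `SketchCoder`, `VarIntSketchCoder` and `BigEndianIntSketchCoder` are hypothetical names invented for illustration, and comparing on the id alone is an assumption about what "remove the coder" would mean in practice (a real implementation might also compare the kind of state).

```java
import java.util.Objects;

// Hypothetical stand-in for a coder; this is NOT Beam's Coder class.
interface SketchCoder<T> {}

final class VarIntSketchCoder implements SketchCoder<Integer> {}

final class BigEndianIntSketchCoder implements SketchCoder<Integer> {}

// Hypothetical state spec that carries its own id, as the description proposes.
final class NamedValueSpec<T> {
  private final String id;
  private final SketchCoder<T> coder;

  NamedValueSpec(String id, SketchCoder<T> coder) {
    this.id = Objects.requireNonNull(id, "id");
    this.coder = Objects.requireNonNull(coder, "coder");
  }

  String id() {
    return id;
  }

  SketchCoder<T> coder() {
    return coder;
  }

  // The coder is deliberately excluded: two specs with the same id are equal
  // even if they were built with different coders (assumption of this sketch).
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof NamedValueSpec)) {
      return false;
    }
    return id.equals(((NamedValueSpec<?>) other).id);
  }

  // Must stay consistent with equals, so it also ignores the coder.
  @Override
  public int hashCode() {
    return id.hashCode();
  }
}
```

With this sketch, `new NamedValueSpec<>("index", new VarIntSketchCoder())` and `new NamedValueSpec<>("index", new BigEndianIntSketchCoder())` compare equal and hash identically, while a spec with the id "count" does not equal either of them.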