Details
-
Improvement
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.14.0
-
None
-
None
Description
A KeyedBroadcastProcessFunction is run with its parallelism set to 6 and uses a ValueState variable A. How is A stored across the six tasks? While the job was running, I observed that some tasks read A as null, while others read a value. The code is as follows:
....
setParallelism(9);
......
/**
 * Keyed broadcast process function quoted in the issue report.
 *
 * <p>Type arguments as declared: key {@code String}, keyed input {@code StandardEvent},
 * broadcast input {@code List<String>}, output {@code StandardEvent}.
 *
 * <p>Visible flow: {@link #processBroadcastElement} stores the incoming list of strings in
 * broadcast state under {@code ContextInfo.RULE_SBROAD_CAST_STATE}; {@link #processElement}
 * reads that list, writes the event list back to {@code listState} and returns when no list
 * is present, otherwise compiles an {@code InferenceEngine} from it into {@code engineState}
 * and calls {@code match}.
 *
 * <p>NOTE(review): this snippet looks like it lost statements when it was pasted/extracted —
 * several {@code if}/{@code else}/{@code for} headers below have no body of their own.
 * Those spots are marked inline; do not treat the control flow shown here as the author's
 * real control flow without checking the original source.
 *
 * <p>NOTE(review): the three {@code ValueState} fields are obtained from
 * {@code getRuntimeContext().getState(...)}; presumably they are keyed state scoped to the
 * current key, which would explain one task seeing {@code null} and another seeing a value —
 * confirm against the Flink state documentation.
 */
public class dealStreamProcessFunction extends KeyedBroadcastProcessFunction<String, StandardEvent, List<String>, StandardEvent> {
private static final Logger logger = LoggerFactory.getLogger(dealStreamProcessFunction.class);
// Event list state; processElement reads it and writes it back when the rule list is missing,
// and reads then clears it once an engine is available.
private transient ValueState<List<StandardEvent>> listState;
// Boolean flag state; processElement reads it when deciding whether to compile a new engine.
private transient ValueState<Boolean> runingFlagState;
// Compiled engine state; updated in processElement and read in match().
private transient ValueState<InferenceEngine> engineState;
// Descriptor of the broadcast map state (String key -> list of strings) used by both
// processElement and processBroadcastElement.
MapStateDescriptor<String, List<String>> ruleStateDescriptor = new MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE
, BasicTypeInfo.STRING_TYPE_INFO
, new ListTypeInfo<>(String.class));
// NOTE(review): not referenced anywhere else in this snippet; engineState is used instead.
InferenceEngine engine;
/**
 * Creates the three value-state descriptors and obtains their state handles from the
 * runtime context.
 *
 * <p>Original author's note (translated): the open method is executed only once, so
 * initialization can be implemented here.
 *
 * @param parameters passed through to {@code super.open}
 * @throws Exception declared by the overridden method
 */
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// State named "recent-operator": a list of StandardEvent.
ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor = new ValueStateDescriptor<List<StandardEvent>>(
"recent-operator",
TypeInformation.of(new TypeHint<List<StandardEvent>>() {
}));
// State named "runingFlag": a Boolean.
ValueStateDescriptor<Boolean> runingFlagDescriptor = new ValueStateDescriptor<Boolean>(
"runingFlag",
Boolean.class);
// State named "runingFlag1": despite the name, this one holds the InferenceEngine.
ValueStateDescriptor<InferenceEngine> engineDescriptor = new ValueStateDescriptor<InferenceEngine>(
"runingFlag1",
InferenceEngine.class);
engineState = getRuntimeContext().getState(engineDescriptor);
listState = getRuntimeContext().getState(recentOperatorsDescriptor);
runingFlagState = getRuntimeContext().getState(runingFlagDescriptor);
logger.info("KeyedBroadcastProcessFunction open");
}
/**
 * Handles one keyed event: looks up the rule list in broadcast state, writes the event list
 * back to {@code listState} and returns when the rule list is absent, otherwise
 * (conditionally) compiles the engine, then runs {@code match} when an engine is present.
 *
 * @param standardEvent the incoming keyed element
 * @param readOnlyContext context used here to read the broadcast state
 * @param collector output collector, forwarded to {@code match}
 * @throws Exception declared by the overridden method
 */
@Override
public void processElement(StandardEvent standardEvent, ReadOnlyContext readOnlyContext, Collector<StandardEvent> collector) throws Exception {
// NOTE(review): the statement guarded by this null check appears to be missing from the snippet.
if(standardEvent == null)
List<String> list = null;
list = readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE);
if (list == null) {
// No rule list in broadcast state: write the event list back to listState and stop.
logger.info("RulesBroadcastState is null..............");
List<StandardEvent> lst = listState.value();
// NOTE(review): as shown, add() is only reached when lst is null and would dereference null;
// presumably a line initializing lst was dropped here — confirm against the original.
if (lst == null)
lst.add(standardEvent);
listState.update(lst);
return;
}
// First time through
// NOTE(review): the statement guarded by this check appears to be missing; presumably it
// initialized runingFlagState, since the next condition reads its value as a boolean.
if (runingFlagState.value() == null)
if (((runingFlagState.value() && list.get(0).equals("1")) || list.get(0).equals("0"))) {
logger.info("action update.....:" + list.size() + ":" + runingFlagState.value() + ":" + list.get(0));
// The first list entry is read as a flag ("1" / "0") and removed; the remainder is parsed as rules.
String flag = list.get(0);
// NOTE(review): this removes an element from the list object obtained from the read-only
// broadcast state — verify against the Flink broadcast-state guidance that this is intended.
list.remove(0);
InferenceEngine engine1 = InferenceEngine.compile(RuleReader.parseRules(list));
engineState.update(engine1);
// NOTE(review): the body of this if appears to be missing from the snippet.
if (runingFlagState.value() && flag.equals("1"))
}
if (engineState.value() != null) {
List<StandardEvent> listTmp = listState.value();
if (listTmp != null) {
// NOTE(review): the loop body appears to be missing; as shown, clear() is the loop body.
for (StandardEvent standardEventTmp : listTmp)
listState.clear();
}
match(standardEvent, collector);
// NOTE(review): the else branch appears to be missing from the snippet.
} else
}
/**
 * Obtains a matcher for the event from the engine held in {@code engineState} and, when it
 * finds a match, iterates over the matcher's actions.
 *
 * @param standardEvent the event to match
 * @param collector output collector (not used in the visible lines)
 * @throws IOException declared by this method
 */
private void match(StandardEvent standardEvent, Collector<StandardEvent> collector) throws IOException {
PatternMatcher matcher = engineState.value().matcher(standardEvent);
if (matcher.find()) {
List<Action> actions = matcher.getActions();
for (Action action : actions) {
// NOTE(review): the statement guarded by this check appears to be missing from the snippet.
if (standardEvent != null)
}
// NOTE(review): the else branch appears to be missing from the snippet.
} else
}
/**
 * Handles one broadcast element: logs its size and stores it in broadcast state under
 * {@code ContextInfo.RULE_SBROAD_CAST_STATE}.
 *
 * @param strings the broadcast list to store
 * @param context context used here to obtain the writable broadcast state
 * @param collector output collector (not used in the visible lines)
 * @throws Exception declared by the overridden method
 */
@Override
public void processBroadcastElement(List<String> strings, Context context, Collector<StandardEvent> collector) throws Exception {
BroadcastState<String, List<String>> broadcastState = context.getBroadcastState(ruleStateDescriptor);
logger.info("processBroadcastElement.....:" + strings.size());
// NOTE(review): the branch taken when the key already exists appears to be missing from the snippet.
if (broadcastState.contains(ContextInfo.RULE_SBROAD_CAST_STATE))
else {
logger.info("do not find old State, put first counterState {}", strings.size());
}
// Stores the new list under the key after the check above.
broadcastState.put(ContextInfo.RULE_SBROAD_CAST_STATE, strings);
}
}