Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-25103

KeyedBroadcastProcessFunction run set 6, parallelism ValueState variables A

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 1.14.0
    • None
    • Runtime / Task
    • None

    Description

      KeyedBroadcastProcessFunction run set 6, parallelism ValueState variables A, excuse me how A stored in the six tasks. When I was running, I observed that some tasks fetched variable A was null, while others had values .The following code :
      ....
      setParallelism(9);
      ......
      public class dealStreamProcessFunction extends KeyedBroadcastProcessFunction<String, StandardEvent, List<String>, StandardEvent> {
      private static final Logger logger = LoggerFactory.getLogger(dealStreamProcessFunction.class);

      private transient ValueState<List<StandardEvent>> listState;
      private transient ValueState<Boolean> runingFlagState;
      private transient ValueState<InferenceEngine> engineState;
      MapStateDescriptor<String, List<String>> ruleStateDescriptor = new MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE
      , BasicTypeInfo.STRING_TYPE_INFO
      , new ListTypeInfo<>(String.class));
      InferenceEngine engine;

      /**

      • open方法只会执行一次
      • 可以在这实现初始化的功能
        *
      • @param parameters
      • @throws Exception
        */
        @Override
        public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor = new ValueStateDescriptor<List<StandardEvent>>(
        "recent-operator",
        TypeInformation.of(new TypeHint<List<StandardEvent>>() {
        }));

      ValueStateDescriptor<Boolean> runingFlagDescriptor = new ValueStateDescriptor<Boolean>(
      "runingFlag",
      Boolean.class);

      ValueStateDescriptor<InferenceEngine> engineDescriptor = new ValueStateDescriptor<InferenceEngine>(
      "runingFlag1",
      InferenceEngine.class);
      engineState = getRuntimeContext().getState(engineDescriptor);
      listState = getRuntimeContext().getState(recentOperatorsDescriptor);
      runingFlagState = getRuntimeContext().getState(runingFlagDescriptor);

      logger.info("KeyedBroadcastProcessFunction open");
      }

      @Override
      public void processElement(StandardEvent standardEvent, ReadOnlyContext readOnlyContext, Collector<StandardEvent> collector) throws Exception {
      if(standardEvent == null)

      { return; }

      List<String> list = null;
      list = readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE);
      if (list == null) {
      logger.info("RulesBroadcastState is null..............");
      List<StandardEvent> lst = listState.value();
      if (lst == null)

      { lst = new ArrayList<>(); }

      lst.add(standardEvent);
      listState.update(lst);
      return;
      }
      //第一次进来
      if (runingFlagState.value() == null)

      { logger.info("runingFlagState.value() == null"); runingFlagState.update(true); }

      if (((runingFlagState.value() && list.get(0).equals("1")) || list.get(0).equals("0"))) {
      logger.info("action update.....:" + list.size() + ":" + runingFlagState.value() + ":" + list.get(0));
      String flag = list.get(0);
      list.remove(0);
      InferenceEngine engine1 = InferenceEngine.compile(RuleReader.parseRules(list));
      engineState.update(engine1);
      if (runingFlagState.value() && flag.equals("1"))

      { runingFlagState.update(false); }

      }

      if (engineState.value() != null) {
      List<StandardEvent> listTmp = listState.value();
      if (listTmp != null) {
      for (StandardEvent standardEventTmp : listTmp)

      { logger.info("listState.....:" + standardEventTmp); match(standardEventTmp, collector); }

      listState.clear();
      }
      match(standardEvent, collector);
      } else

      { logger.info("processElement engine is null.....:"); }

      }

      private void match(StandardEvent standardEvent, Collector<StandardEvent> collector) throws IOException {
      PatternMatcher matcher = engineState.value().matcher(standardEvent);
      if (matcher.find()) {
      List<Action> actions = matcher.getActions();
      for (Action action : actions) {
      if (standardEvent != null)

      { if(collector != null) collector.collect(standardEvent); else logger.info("collector is null:" ); }

      }
      } else

      { logger.info("no matcher:" + standardEvent); }

      }

      @Override
      public void processBroadcastElement(List<String> strings, Context context, Collector<StandardEvent> collector) throws Exception {
      BroadcastState<String, List<String>> broadcastState = context.getBroadcastState(ruleStateDescriptor);
      logger.info("processBroadcastElement.....:" + strings.size());
      if (broadcastState.contains(ContextInfo.RULE_SBROAD_CAST_STATE))

      { List<String> oldList = broadcastState.get(ContextInfo.RULE_SBROAD_CAST_STATE); logger.info("get State:" + oldList.size() + " replaced with State:" + strings.size()); }

      else {
      logger.info("do not find old State, put first counterState {}", strings.size());
      }
      broadcastState.put(ContextInfo.RULE_SBROAD_CAST_STATE, strings);
      }
      }

      Attachments

        Activity

          People

            Unassigned Unassigned
            wangbaohua wangbaohua
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: