Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-8248

RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.2, 1.4.0
    • Fix Version/s: 1.5.0, 1.4.1
    • Labels:
      None
    • Environment:

      linux: 3.10.0-514.el7.x86_64
      flink:

      • version: 1.4
      • rocksdb backend state
      • checkpoint interval 5s
      • keyed cep
        language: Java8

      Description

      Here is my exception log:

      java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291)
      	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
      	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
      	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
      	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309)
      	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247)
      	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277)
      	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
      	... 7 more
      Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', detector='SlidingWindowAnomalyDetector', measure='count', field='activity', dimension='Logoff', description='null', icons=null, startTimestamp=1465297200000, endTimestamp=1465297203600, count=11.0, anomalyScore=100, adHashCode=-1866791453, timeMap={1465297200000=11.0}, user='LMR0049', logQuery=null, group='null'}, 1465300799999, 0), [SharedBufferEdge(null, 199)], 1)
      	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
      	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943)
      	at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806)
      	at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
      	at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
      	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
      	... 13 more
      

      Main job code:

      // Reproduction job: event-time CEP on a keyed stream with a RocksDB state
      // backend and exactly-once checkpointing every 5s — the combination that
      // triggers the SharedBuffer serialization failure shown above.
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
              env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
              // RocksDB keyed-state backend: the CEP NFA state is (de)serialized on every access.
              env.setStateBackend(new RocksDBStateBackend(getString("flink.backend-state-dir")));
      // .........
      // Sliding event-time windows build Behavior aggregates per entity, then the
      // stream is re-keyed by user for the CEP operators below.
      DataStream<Behavior> behaviorStream = anomalyStream
                      .assignTimestampsAndWatermarks(new AnomalyTimestampExtractor(Time.seconds(0)))
                      .keyBy((KeySelector<AnomalySlice, String>) value -> value.entity)
                      .window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")),
                              Time.seconds(getLong("flink.window.slice-size"))))
                      .apply(new BehaviorBuilderFunction())
                      .filter(new WhitelistFilterFunction())
                      // Key by user: a non-keyed stream would force pattern operator parallelism of 1.
                      .keyBy((KeySelector<Behavior, String>) Behavior::getUser);

              // One CEP pattern operator per configured pattern, all reading the same keyed stream.
              // NOTE(review): Pattern is used as a raw type here — confirm its generic arguments.
              List<Pattern> allPatterns = PatternsHolder.getAllPatterns();
              for (Pattern pa : allPatterns) {
                  PatternStream<Behavior> ps = CEP.pattern(behaviorStream, pa);
                  ps.select(new AlertGenerator(pa.getName())).name(pa.getName());
              }
      

      Keyed-stream event class:

      /**
       * An aggregated anomaly event flowing through the keyed CEP pipeline.
       *
       * <p>Mutable accumulator: {@link #add} folds further observations into the
       * count, score, time span and per-window {@code timeMap}.
       *
       * <p>NOTE(review): {@link #hashCode()} and {@link #equals(Object)} include
       * fields that {@code add(...)} mutates ({@code count}, {@code anomalyScore},
       * {@code timeMap}, timestamps). If an instance is mutated after being stored
       * in a hash-based structure (e.g. the CEP {@code SharedBuffer}), lookups will
       * fail — this matches the "Could not find id for entry" failure in this
       * issue. Confirm before relying on hash-based storage of live instances.
       */
      public class Behavior implements Serializable {
          private static final long serialVersionUID = 7786674623147772721L;

          static int ANOMALY_SCORE_THRESHOLD = 40;
          static int ANOMALY_COUNT_THRESHOLD = 3;

          public final String schema;
          public final String detector;
          private String measure = UEBAConstants.DEFAULT_MEASURE_FIELD;
          public final String dimension;
          public final String field; //dim value
          private String user;
          public String group;
          public double count;
          public int anomalyScore;
          protected String description;
          private Icon[] icons;
          private int adHashCode;
          private long startTimestamp;
          private long endTimestamp;
          // Per-window counts keyed by window start time (millis).
          private Map<Long, Double> timeMap;
          public ArrayList<HashMap<String, Object>> logQuery;

          /**
           * Full constructor including the user key and the raw log query payload.
           */
          public Behavior(String schema, String detector, String field, String dimension, String user,
                          long fromMillis, long toMillis, double count, int anomalyScore, ArrayList<HashMap<String,
                  Object>> logQuery) {
              this.schema = schema;
              this.detector = detector;
              this.field = field;
              this.dimension = dimension;
              this.user = user;
              this.startTimestamp = fromMillis;
              this.endTimestamp = toMillis;
              this.count = count;
              this.anomalyScore = anomalyScore;
              this.logQuery = logQuery;
              timeMap = new HashMap<>();
              timeMap.put(fromMillis, count);
          }

          /**
           * Reduced constructor; {@code user} and {@code logQuery} stay unset
           * ({@code logQuery} remains {@code null} until {@link #add} supplies one).
           */
          public Behavior(String schema, String detector, String field, String dimension,
                          long fromMillis, long toMillis, double count, int anomalyScore) {
              this.schema = schema;
              this.detector = detector;
              this.field = field;
              this.dimension = dimension;
              this.startTimestamp = fromMillis;
              this.endTimestamp = toMillis;
              this.count = count;
              this.anomalyScore = anomalyScore;
              timeMap = new HashMap<>();
              timeMap.put(fromMillis, count);
          }

          public String getGroup() {
              return group;
          }

          public void setGroup(String group) {
              this.group = group;
          }

          public void setAdHashCode(int hashCode) {
              this.adHashCode = hashCode;
          }

          public void setMeasure(String measure) {
              this.measure = measure;
          }

          public String getMeasure() {
              return measure;
          }

          /**
           * Folds another observation into this behavior and appends its log query.
           *
           * <p>anomalyScore is combined as a count-weighted average, which may not be wise.
           *
           * @param logQuery extra raw-log query entries to append; may be {@code null}
           */
          public void add(long fromMillis, long toMillis, double count, int anomalyScore, ArrayList<HashMap<String,
                  Object>> logQuery) {
              accumulate(fromMillis, toMillis, count, anomalyScore);
              if (logQuery != null) {
                  // FIX: the reduced constructor leaves this.logQuery null, which
                  // previously caused a NullPointerException on addAll.
                  if (this.logQuery == null) {
                      this.logQuery = new ArrayList<>();
                  }
                  this.logQuery.addAll(logQuery);
              }
          }

          /**
           * Folds another observation into this behavior (no log query payload).
           */
          public void add(long fromMillis, long toMillis, double count, int anomalyScore) {
              accumulate(fromMillis, toMillis, count, anomalyScore);
          }

          /**
           * Shared accumulation logic for both {@code add} overloads: count-weighted
           * score average, span extension, and per-window count merge.
           */
          private void accumulate(long fromMillis, long toMillis, double count, int anomalyScore) {
              double sum = this.count * this.anomalyScore + count * anomalyScore;
              this.count += count;
              // Guard the weighted average against a zero total count; the previous
              // unguarded division produced a meaningless (int) cast of Infinity/NaN.
              this.anomalyScore = this.count != 0 ? (int) (sum / this.count) : 0;

              if (fromMillis < this.startTimestamp) {
                  this.startTimestamp = fromMillis;
              }
              if (toMillis > this.endTimestamp) {
                  this.endTimestamp = toMillis;
              }
              // Equivalent to the old containsKey + double put, in one call.
              timeMap.merge(fromMillis, count, Double::sum);
          }

          public Long[] getTimestamps() {
              return timeMap.keySet().toArray(new Long[timeMap.size()]);
          }

          public String dimension() {
              return dimension;
          }

          public long startTimestamp() {
              return startTimestamp;
          }

          public long endTimestamp() {
              return endTimestamp;
          }

          public double count() {
              return count;
          }

          public int anomalyScore() {
              return anomalyScore;
          }

          /** True when both the score and count thresholds are reached. */
          public boolean isAnomaly() {
              return anomalyScore() >= ANOMALY_SCORE_THRESHOLD && count() >= ANOMALY_COUNT_THRESHOLD;
          }

          public String getUser() {
              return user;
          }

          public void setUser(String user) {
              this.user = user;
          }

          public void describeAs(String description, Icon... icons) {
              this.description = description;
              this.icons = icons;
          }

          /**
           * Builds the alert-visualization request path from this behavior's fields.
           */
          public String setVisualizeInterfaceParameter(String group, long visualizeStartTimestamp, long
                  visualizeEndTimestamp) {

              String requestParameterString = "/get_alert_visualize?detectorName=" + detector + "&groupField=" + group +
                      "&user=" + user + "&field=" + field + "&measureField=" + measure + "&schemaName=" + schema +
                      "&dimensionField=" + dimension + "&visualizeStartTimestamp=" + visualizeStartTimestamp +
                      "&visualizeEndTimestamp=" + visualizeEndTimestamp;
              return requestParameterString;

          }

          // WARNING(review): depends on mutable state (count, score, timestamps,
          // timeMap); see class-level note. Values preserved as-is for state
          // compatibility — do not "fix" without migrating persisted CEP state.
          @Override
          public int hashCode() {
              int result;
              long temp;
              result = schema != null ? schema.hashCode() : 0;
              result = 31 * result + (detector != null ? detector.hashCode() : 0);
              result = 31 * result + (measure != null ? measure.hashCode() : 0);
              result = 31 * result + (field != null ? field.hashCode() : 0);
              result = 31 * result + (dimension != null ? dimension.hashCode() : 0);
              result = 31 * result + (description != null ? description.hashCode() : 0);
              result = 31 * result + Arrays.hashCode(icons);
              result = 31 * result + (int) (startTimestamp ^ (startTimestamp >>> 32));
              result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32));
              temp = Double.doubleToLongBits(count);
              result = 31 * result + (int) (temp ^ (temp >>> 32));
              result = 31 * result + anomalyScore;
              result = 31 * result + adHashCode;
              result = 31 * result + (timeMap != null ? timeMap.hashCode() : 0);
              result = 31 * result + (user != null ? user.hashCode() : 0);
              result = 31 * result + (logQuery != null ? logQuery.hashCode() : 0);
              result = 31 * result + (group != null ? group.hashCode() : 0);
              return result;
          }

          @Override
          public boolean equals(Object o) {
              if (this == o) return true;
              if (o == null || getClass() != o.getClass()) return false;

              Behavior behavior = (Behavior) o;

              if (startTimestamp != behavior.startTimestamp) return false;
              if (endTimestamp != behavior.endTimestamp) return false;
              if (Double.compare(behavior.count, count) != 0) return false;
              if (anomalyScore != behavior.anomalyScore) return false;
              if (adHashCode != behavior.adHashCode) return false;
              if (schema != null ? !schema.equals(behavior.schema) : behavior.schema != null)
                  return false;
              if (detector != null ? !detector.equals(behavior.detector) : behavior.detector != null)
                  return false;
              if (measure != null ? !measure.equals(behavior.measure) : behavior.measure != null)
                  return false;
              if (field != null ? !field.equals(behavior.field) : behavior.field != null) return false;
              if (dimension != null ? !dimension.equals(behavior.dimension) : behavior.dimension != null)
                  return false;
              if (description != null ? !description.equals(behavior.description) : behavior.description != null)
                  return false;
              // Element-wise comparison is intended here: Icon[] contents, not identity.
              if (!Arrays.equals(icons, behavior.icons)) return false;
              if (timeMap != null ? !timeMap.equals(behavior.timeMap) : behavior.timeMap != null)
                  return false;
              if (user != null ? !user.equals(behavior.user) : behavior.user != null) return false;
              if (logQuery != null ? !logQuery.equals(behavior.logQuery) : behavior.logQuery != null)
                  return false;
              return group != null ? group.equals(behavior.group) : behavior.group == null;

          }

          @Override
          public String toString() {
              return "Behavior{" +
                      "schema='" + schema + '\'' +
                      ", detector='" + detector + '\'' +
                      ", measure='" + measure + '\'' +
                      ", field='" + field + '\'' +
                      ", dimension='" + dimension + '\'' +
                      ", description='" + description + '\'' +
                      ", icons=" + Arrays.toString(icons) +
                      ", startTimestamp=" + startTimestamp +
                      ", endTimestamp=" + endTimestamp +
                      ", count=" + count +
                      ", anomalyScore=" + anomalyScore +
                      ", adHashCode=" + adHashCode +
                      ", timeMap=" + timeMap +
                      ", user='" + user + '\'' +
                      ", logQuery=" + logQuery +
                      ", group='" + group + '\'' +
                      '}';
          }
      }
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                sonice_lj jia liu
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: