Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.3.2, 1.4.0
-
None
-
linux: 3.10.0-514.el7.x86_64
flink:- version: 1.4
- rocksdb backend state
- checkpoint interval 5s
- keyed cep
language: Java8
Description
Here is my exception log:
java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:291) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Error while adding data to RocksDB at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:309) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:247) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:277) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:886) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288) ... 7 more Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Behavior{schema='logon', detector='SlidingWindowAnomalyDetector', measure='count', field='activity', dimension='Logoff', description='null', icons=null, startTimestamp=1465297200000, endTimestamp=1465297203600, count=11.0, anomalyScore=100, adHashCode=-1866791453, timeMap={1465297200000=11.0}, user='LMR0049', logQuery=null, group='null'}, 1465300799999, 0), [SharedBufferEdge(null, 199)], 1) at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:943) at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:806) at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888) at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820) at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100) ... 13 more
Main job code:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); env.setStateBackend(new RocksDBStateBackend(getString("flink.backend-state-dir"))); // ......... DataStream<Behavior> behaviorStream = anomalyStream .assignTimestampsAndWatermarks(new AnomalyTimestampExtractor(Time.seconds(0))) .keyBy((KeySelector<AnomalySlice, String>) value -> value.entity) .window(SlidingEventTimeWindows.of(Time.seconds(getLong("flink.window.window-size")), Time.seconds(getLong("flink.window.slice-size")))) .apply(new BehaviorBuilderFunction()) .filter(new WhitelistFilterFunction()) // non-keyed stream will result in pattern operator parallelism equal to 1. .keyBy((KeySelector<Behavior, String>) Behavior::getUser); // cep on behavior stream List<Pattern> allPatterns = PatternsHolder.getAllPatterns(); for (Pattern pa : allPatterns) { PatternStream<Behavior> ps = CEP.pattern(behaviorStream, pa); ps.select(new AlertGenerator(pa.getName())).name(pa.getName()); }
keyed stream event:
public class Behavior implements Serializable { private static final long serialVersionUID = 7786674623147772721L; static int ANOMALY_SCORE_THRESHOLD = 40; static int ANOMALY_COUNT_THRESHOLD = 3; public final String schema; public final String detector; private String measure = UEBAConstants.DEFAULT_MEASURE_FIELD; public final String dimension; public final String field; //dim value private String user; public String group; public double count; public int anomalyScore; protected String description; private Icon[] icons; private int adHashCode; private long startTimestamp; private long endTimestamp; private Map<Long, Double> timeMap; public ArrayList<HashMap<String, Object>> logQuery; public Behavior(String schema, String detector, String field, String dimension, String user, long fromMillis, long toMillis, double count, int anomalyScore, ArrayList<HashMap<String, Object>> logQuery) { this.schema = schema; this.detector = detector; this.field = field; this.dimension = dimension; this.user = user; this.startTimestamp = fromMillis; this.endTimestamp = toMillis; this.count = count; this.anomalyScore = anomalyScore; this.logQuery = logQuery; timeMap = new HashMap<>(); timeMap.put(fromMillis, count); } public Behavior(String schema, String detector, String field, String dimension, long fromMillis, long toMillis, double count, int anomalyScore) { this.schema = schema; this.detector = detector; this.field = field; this.dimension = dimension; this.startTimestamp = fromMillis; this.endTimestamp = toMillis; this.count = count; this.anomalyScore = anomalyScore; timeMap = new HashMap<>(); timeMap.put(fromMillis, count); } public String getGroup() { return group; } public void setGroup(String group) { this.group = group; } public void setAdHashCode(int hashCode) { this.adHashCode = hashCode; } public void setMeasure(String measure) { this.measure = measure; } public String getMeasure() { return measure; } // anomalyScore is using weighted average, may not be wise. public void add(long fromMillis, long toMillis, double count, int anomalyScore, ArrayList<HashMap<String, Object>> logQuery) { double sum = this.count * this.anomalyScore + count * anomalyScore; this.count += count; this.anomalyScore = (int) (sum / this.count); if (fromMillis < this.startTimestamp) { this.startTimestamp = fromMillis; } if (toMillis > this.endTimestamp) { this.endTimestamp = toMillis; } if (!timeMap.containsKey(fromMillis)) { timeMap.put(fromMillis, 0.0); } timeMap.put(fromMillis, timeMap.get(fromMillis) + count); if (logQuery != null) { this.logQuery.addAll(logQuery); } } public void add(long fromMillis, long toMillis, double count, int anomalyScore) { double sum = this.count * this.anomalyScore + count * anomalyScore; this.count += count; this.anomalyScore = (int) (sum / this.count); if (fromMillis < this.startTimestamp) { this.startTimestamp = fromMillis; } if (toMillis > this.endTimestamp) { this.endTimestamp = toMillis; } if (!timeMap.containsKey(fromMillis)) { timeMap.put(fromMillis, 0.0); } timeMap.put(fromMillis, timeMap.get(fromMillis) + count); } public Long[] getTimestamps() { return timeMap.keySet().toArray(new Long[timeMap.size()]); } public String dimension() { return dimension; } public long startTimestamp() { return startTimestamp; } public long endTimestamp() { return endTimestamp; } public double count() { return count; } public int anomalyScore() { return anomalyScore; } public boolean isAnomaly() { return anomalyScore() >= ANOMALY_SCORE_THRESHOLD && count() >= ANOMALY_COUNT_THRESHOLD; } public String getUser() { return user; } public void setUser(String user) { this.user = user; } public void describeAs(String description, Icon... icons) { this.description = description; this.icons = icons; } public String setVisualizeInterfaceParameter(String group, long visualizeStartTimestamp, long visualizeEndTimestamp) { String requestParameterString = "/get_alert_visualize?detectorName=" + detector + "&groupField=" + group + "&user=" + user + "&field=" + field + "&measureField=" + measure + "&schemaName=" + schema + "&dimensionField=" + dimension + "&visualizeStartTimestamp=" + visualizeStartTimestamp + "&visualizeEndTimestamp=" + visualizeEndTimestamp; return requestParameterString; } @Override public int hashCode() { int result; long temp; result = schema != null ? schema.hashCode() : 0; result = 31 * result + (detector != null ? detector.hashCode() : 0); result = 31 * result + (measure != null ? measure.hashCode() : 0); result = 31 * result + (field != null ? field.hashCode() : 0); result = 31 * result + (dimension != null ? dimension.hashCode() : 0); result = 31 * result + (description != null ? description.hashCode() : 0); result = 31 * result + Arrays.hashCode(icons); result = 31 * result + (int) (startTimestamp ^ (startTimestamp >>> 32)); result = 31 * result + (int) (endTimestamp ^ (endTimestamp >>> 32)); temp = Double.doubleToLongBits(count); result = 31 * result + (int) (temp ^ (temp >>> 32)); result = 31 * result + anomalyScore; result = 31 * result + adHashCode; result = 31 * result + (timeMap != null ? timeMap.hashCode() : 0); result = 31 * result + (user != null ? user.hashCode() : 0); result = 31 * result + (logQuery != null ? logQuery.hashCode() : 0); result = 31 * result + (group != null ? group.hashCode() : 0); return result; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Behavior behavior = (Behavior) o; if (startTimestamp != behavior.startTimestamp) return false; if (endTimestamp != behavior.endTimestamp) return false; if (Double.compare(behavior.count, count) != 0) return false; if (anomalyScore != behavior.anomalyScore) return false; if (adHashCode != behavior.adHashCode) return false; if (schema != null ? !schema.equals(behavior.schema) : behavior.schema != null) return false; if (detector != null ? !detector.equals(behavior.detector) : behavior.detector != null) return false; if (measure != null ? !measure.equals(behavior.measure) : behavior.measure != null) return false; if (field != null ? !field.equals(behavior.field) : behavior.field != null) return false; if (dimension != null ? !dimension.equals(behavior.dimension) : behavior.dimension != null) return false; if (description != null ? !description.equals(behavior.description) : behavior.description != null) return false; // Probably incorrect - comparing Object[] arrays with Arrays.equals if (!Arrays.equals(icons, behavior.icons)) return false; if (timeMap != null ? !timeMap.equals(behavior.timeMap) : behavior.timeMap != null) return false; if (user != null ? !user.equals(behavior.user) : behavior.user != null) return false; if (logQuery != null ? !logQuery.equals(behavior.logQuery) : behavior.logQuery != null) return false; return group != null ? group.equals(behavior.group) : behavior.group == null; } @Override public String toString() { return "Behavior{" + "schema='" + schema + '\'' + ", detector='" + detector + '\'' + ", measure='" + measure + '\'' + ", field='" + field + '\'' + ", dimension='" + dimension + '\'' + ", description='" + description + '\'' + ", icons=" + Arrays.toString(icons) + ", startTimestamp=" + startTimestamp + ", endTimestamp=" + endTimestamp + ", count=" + count + ", anomalyScore=" + anomalyScore + ", adHashCode=" + adHashCode + ", timeMap=" + timeMap + ", user='" + user + '\'' + ", logQuery=" + logQuery + ", group='" + group + '\'' + '}'; } }
Attachments
Issue Links
- is related to
-
FLINK-6321 RocksDB state backend Checkpointing is not working with KeyedCEP.
- Closed
-
FLINK-8226 Dangling reference generated after NFA clean up timed out SharedBufferEntry
- Resolved