Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Not A Problem
-
None
-
None
-
None
Description
`CEP` component make `KeyedStream` choose wrong channel
Origin KeySelector is perfect right.
public static KeySelector<HBaseServerLog, Integer> buildKeySelector() { return (KeySelector<HBaseServerLog, Integer>) log -> { if (log == null) return 0; Integer flumeId; if ((flumeId = log.getFlumeId()) == null) return 1; return flumeId; }; }
After some changes, it will throw Key group index out of range of key group range [16, 32) exception.
public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) { return new KeySelector<HBaseServerLog, Integer>() { private Random r = new Random(System.nanoTime()); @Override public Integer getKey(HBaseServerLog log) throws Exception { if (log == null) return 0; Integer flumeId; if ((flumeId = log.getFlumeId()) == null) return 1; return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0); } }; }
But, after MathUtils.murmurHash(keyHash) % maxParallelism process, it shouldn't be wrong. Actually, when we add some `CEP` component (IterativeCondition/PatternFlatSelectFunction) code after it. It make the KeySelector choose wrong channel and throw IllegalArgumentException.