FLINK-7424

`CEP` component makes `KeyedStream` choose the wrong channel

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Library / CEP
    • Labels: None

    Description

      The `CEP` component makes `KeyedStream` choose the wrong channel.

      The original KeySelector works correctly:

      public static KeySelector<HBaseServerLog, Integer> buildKeySelector() {
          return (KeySelector<HBaseServerLog, Integer>) log -> {
              if (log == null) return 0;
              Integer flumeId;
              if ((flumeId = log.getFlumeId()) == null) return 1;
              return flumeId;
          };
      }
      

      After the following change, it throws an exception: "Key group index out of range of key group range [16, 32)".

      public static KeySelector<HBaseServerLog, Integer> buildKeySelector(final int parallelism) {
          return new KeySelector<HBaseServerLog, Integer>() {
              private Random r = new Random(System.nanoTime());
              @Override
              public Integer getKey(HBaseServerLog log) throws Exception {
                  if (log == null) return 0;
                  Integer flumeId;
                  if ((flumeId = log.getFlumeId()) == null) return 1;
                  return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
              }
          };
      }
      

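      However, since the key group is derived via MathUtils.murmurHash(keyHash) % maxParallelism, any key value should map to a valid key group, so the changed selector should not cause a problem on its own. The failure only appears when some `CEP` component code (IterativeCondition/PatternFlatSelectFunction) is added after it: the KeySelector then chooses the wrong channel and an IllegalArgumentException is thrown.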
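
      One possible reading, consistent with the "Not A Problem" resolution, is that the randomized KeySelector is not deterministic: if the key is computed once when the record is routed and again when the downstream keyed operator sets its current key, the two calls can disagree, and the second key may hash into a key group that the receiving subtask does not own. The following is only a minimal standalone sketch of that idea, not Flink code. The class KeyGroupSketch, the mix function (a stand-in, not MathUtils.murmurHash), the flumeId value 20, and the parallelism 8 / maxParallelism 128 settings are all assumptions chosen for illustration.

```java
import java.util.Random;

class KeyGroupSketch {

    // Stand-in hash-mixing step (assumption); NOT Flink's MathUtils.murmurHash.
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h & Integer.MAX_VALUE; // clear the sign bit so % stays non-negative
    }

    // Key group in [0, maxParallelism), following the hash % maxParallelism scheme.
    public static int keyGroupFor(int key, int maxParallelism) {
        return mix(Integer.hashCode(key)) % maxParallelism;
    }

    // Subtask that owns a key group when key groups are split into contiguous ranges.
    public static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static boolean ownsKeyGroup(int subtask, int keyGroup,
                                       int parallelism, int maxParallelism) {
        return subtaskFor(keyGroup, parallelism, maxParallelism) == subtask;
    }

    // Mirrors the two selectors from the report: plain flumeId, or flumeId with a random offset.
    public static int selectKey(int flumeId, int parallelism, Random r, boolean randomize) {
        if (!randomize) {
            return flumeId;
        }
        return Math.max(flumeId + (r.nextBoolean() ? 0 : -1 * r.nextInt(parallelism)), 0);
    }

    // Counts records whose second key evaluation lands outside the subtask
    // that the first key evaluation routed them to.
    public static int countMismatches(boolean randomize, int flumeId, int parallelism,
                                      int maxParallelism, int trials, long seed) {
        Random r = new Random(seed);
        int mismatches = 0;
        for (int i = 0; i < trials; i++) {
            // First evaluation: choose the output channel (subtask).
            int senderKey = selectKey(flumeId, parallelism, r, randomize);
            int subtask = subtaskFor(keyGroupFor(senderKey, maxParallelism),
                                     parallelism, maxParallelism);
            // Second evaluation: the receiving side recomputes the key for the same record.
            int receiverKey = selectKey(flumeId, parallelism, r, randomize);
            int receiverGroup = keyGroupFor(receiverKey, maxParallelism);
            if (!ownsKeyGroup(subtask, receiverGroup, parallelism, maxParallelism)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        int parallelism = 8;       // hypothetical
        int maxParallelism = 128;  // hypothetical: 16 key groups per subtask
        System.out.println("deterministic selector mismatches: "
                + countMismatches(false, 20, parallelism, maxParallelism, 1000, 42L));
        System.out.println("randomized selector mismatches:    "
                + countMismatches(true, 20, parallelism, maxParallelism, 1000, 42L));
    }
}
```

      With these assumed settings, subtask 1 owns key groups 16 through 31, which matches the shape of the [16, 32) range in the error message. A deterministic selector always yields zero mismatches in this sketch, because the same key always maps to the same key group. If this reading is right, it would also explain why the error only surfaces once keyed state is used downstream, as it is by the CEP operator.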

          People

            Assignee: Benedict Jin
            Reporter: Benedict Jin
            Votes: 0
            Watchers: 2