Flink / FLINK-36371

Using a BinaryStringData field as the keyBy key causes records with the same key to be hash-shuffled into different tumbling windows

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • 2.0.0
    • None
    • None

    Description

      The pipeline is: Source (Kafka; messages are deserialized to RowData) -> FlatMap --(keyBy)--> TumbleProcessWindow -> Sink.

      The source emits records with two fields: UserName (String) and Amount (Integer).

      The FlatMap receives records of type BinaryRowData and emits them unchanged to the TumbleProcessWindow operator.

      The KeySelector returns the UserName field, which is a BinaryStringData. The keyBy code is:

      .keyBy(rowData -> rowData.getString(0))

      The window result is unexpected: records with the same UserName arrive at the TumbleProcessWindow operator at the same time, yet they are computed in different windows.
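      A processing-time tumbling window is chosen from the record's arrival time alone, so two records that arrive within the same 10-second slot should share one window. The sketch below is a hypothetical `TumblingWindowSketch` class, not Flink code; its `windowStart` arithmetic follows what, as far as I know, Flink's `TimeWindow.getWindowStartWithOffset` does. The timestamps are made up. If two records 5 ms apart always get the same window, the split described above suggests the records are being treated as different keys rather than being placed in different time ranges.

```java
public class TumblingWindowSketch {

    /**
     * Start of the tumbling window that contains {@code timestamp}.
     * Sketch of the arithmetic used by Flink's TimeWindow.getWindowStartWithOffset, not the Flink class.
     */
    static long windowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % keeps the sign of the dividend, so shift negative remainders into [0, windowSize).
        if (remainder < 0) {
            remainder += windowSize;
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        long size = 10_000L;          // Time.seconds(10) expressed in milliseconds
        long t1 = 1_727_000_003_000L; // hypothetical arrival time of record 1
        long t2 = t1 + 5;             // hypothetical arrival time of record 2, 5 ms later

        long s1 = windowStart(t1, 0L, size);
        long s2 = windowStart(t2, 0L, size);
        System.out.printf("record 1 -> [%d, %d)%n", s1, s1 + size);
        System.out.printf("record 2 -> [%d, %d)%n", s2, s2 + size);
        System.out.println("same time window: " + (s1 == s2));
    }
}
```

      Both records map to the window [1727000000000, 1727000010000), and the last line prints "same time window: true".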

      When I use the keyBy below, which converts the key to a java.lang.String, the window result is correct:

      .keyBy(rowData -> rowData.getString(0).toString())
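      One plausible mechanism, not confirmed in this report: `StringData` is an interface, so the key type extracted from the lambda may fall back to a generic, reflective serializer, and the `BinaryStringData` returned by `getString(0)` is a view into the row's backing memory rather than a standalone copy. If such a serializer writes the whole backing buffer, two keys with the same UserName but a different Amount serialize to different bytes and are grouped separately wherever keys are compared in serialized form. The pure-Java sketch below models only that idea; `NameView`, `nameViewOf`, the row layout, and the serialization format are all hypothetical and are not Flink's or Kryo's actual classes or formats. Copying the name to a `String` first, as the `.toString()` workaround does, removes any dependence on the rest of the row.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class ViewKeySketch {

    /** Hypothetical stand-in for a binary string view: it references a row's bytes, not a copy. */
    static final class NameView {
        final byte[] rowBytes; // the whole row: name bytes followed by a 4-byte amount
        final int offset;
        final int length;

        NameView(byte[] rowBytes, int offset, int length) {
            this.rowBytes = rowBytes;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            return new String(rowBytes, offset, length, StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical row layout: UTF-8 name, then a big-endian int amount. Returns a view of the name. */
    static NameView nameViewOf(String name, int amount) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        byte[] row = ByteBuffer.allocate(nameBytes.length + Integer.BYTES)
                .put(nameBytes)
                .putInt(amount)
                .array();
        return new NameView(row, 0, nameBytes.length);
    }

    /** Hypothetical reflective serializer that follows the reference and writes the entire row. */
    static ByteBuffer serializeWholeView(NameView view) {
        ByteBuffer out = ByteBuffer.allocate(view.rowBytes.length + 2 * Integer.BYTES);
        out.put(view.rowBytes).putInt(view.offset).putInt(view.length);
        out.flip(); // expose exactly the written bytes to equals/hashCode
        return out;
    }

    /** Serializes only the characters of the name, as a String key would be. */
    static ByteBuffer serializeAsString(NameView view) {
        return ByteBuffer.wrap(view.toString().getBytes(StandardCharsets.UTF_8));
    }

    /** Groups rows by their serialized key bytes and returns the number of distinct groups. */
    static int countGroups(String[] names, int[] amounts, boolean copyToString) {
        Map<ByteBuffer, Integer> groups = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            NameView key = nameViewOf(names[i], amounts[i]);
            ByteBuffer serialized = copyToString ? serializeAsString(key) : serializeWholeView(key);
            groups.merge(serialized, 1, Integer::sum);
        }
        return groups.size();
    }

    public static void main(String[] args) {
        String[] names = {"alice", "alice", "bob"};
        int[] amounts = {10, 20, 10};
        System.out.println("groups with view keys:   " + countGroups(names, amounts, false));
        System.out.println("groups with String keys: " + countGroups(names, amounts, true));
    }
}
```

      With view keys the three sample rows form three groups (the two "alice" rows split on Amount); with String keys they form two.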

       

      Reproduction Code:

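      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.functions.FlatMapFunction;
      import org.apache.flink.api.java.functions.KeySelector;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.data.StringData;
      import org.apache.flink.table.data.binary.BinaryRowData;
      import org.apache.flink.util.Collector;

      // env, kafkaSource and sourceName() are defined elsewhere in the job.
      env.fromSource(
                      kafkaSource,
                      WatermarkStrategy.noWatermarks(),
                      sourceName())
               .flatMap(new FlatMapFunction<RowData, RowData>() {
                  @Override
                  public void flatMap(RowData value, Collector<RowData> out) throws Exception {
                      // Confirms that the records reaching keyBy are BinaryRowData.
                      System.out.println("is instanceof BinaryRowData =" + (value instanceof BinaryRowData));
                      out.collect(value);
                  }
              })
              // Problematic key: getString(0) returns the UserName field as StringData (a BinaryStringData here).
              .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
              .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
              .process(new FlinkTumbleWindowingProcessFunction())
              .print();

      public static class FlinkTumbleWindowingProcessFunction extends ProcessWindowFunction<RowData, RowData, StringData, TimeWindow> {
          int subtask = -1;

          public void open(Configuration parameters) throws Exception {
              super.open(parameters);
              subtask = getRuntimeContext().getIndexOfThisSubtask();
          }

          // Prints the subtask, the key, and the window contents on each firing; nothing is emitted downstream.
          @Override
          public void process(
                  StringData s,
                  ProcessWindowFunction<RowData, RowData, StringData, TimeWindow>.Context context,
                  Iterable<RowData> elements,
                  Collector<RowData> out) throws Exception {
              System.out.printf(
                      "subtask = %d, s = %s, elements = %s%n",
                      subtask,
                      s,
                      elements);
          }
      }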

          People

            Assignee: Unassigned
            Reporter: Eason Ye (easonye)
            Votes: 0
            Watchers: 1

            Dates

              Created:
              Updated: