Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-36188

HBase disable buffer flush lose efficacy

    XMLWordPrintableJSON

Details

    Description

      HBase table

      rowkey col
      1 1

      The user lookup joins the hbase table, adds 1 to the col value, and writes it back to hbase

      @Test
      void testTableSinkDisabledBufferFlush() throws Exception {
              StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
              StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);        
                  tEnv.executeSql(
                      "CREATE TABLE hTableForSink ("
                              + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                              + " family1 ROW<col1 INT>"
                              + ") WITH ("
                              + " 'connector' = 'hbase-2.2',"
                              + " 'sink.buffer-flush.max-size' = '0',"
                              + " 'sink.buffer-flush.max-rows' = '0',"
                              + " 'table-name' = '"
                              + TEST_TABLE_6
                              + "',"
                              + " 'zookeeper.quorum' = '"
                              + getZookeeperQuorum()
                              + "'"
                              + ")");       
                  String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
              tEnv.executeSql(insert).await();        
      
                  tEnv.executeSql(
                      "CREATE VIEW user_click AS "
                              + " SELECT user_id, proctime() AS proc_time"
                              + " FROM ( "
                              + " VALUES(1), (1), (1), (1), (1)"
                              + " ) AS t (user_id);");    
          
                 tEnv.executeSql(
                      "INSERT INTO hTableForSink SELECT "
                              + "    user_id as rowkey,"
                              + "    ROW(CAST(family1.col1 + 1 AS INT))"
                              + " FROM user_click INNER JOIN hTableForSink"
                              + " FOR SYSTEM_TIME AS OF user_click.proc_time"
                              + " ON hTableForSink.rowkey = user_click.user_id;");        
      
                  tEnv.executeSql(
                      "CREATE TABLE hTableForQuery ("
                              + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                              + " family1 ROW<col1 INT>"
                              + ") WITH ("
                              + " 'connector' = 'hbase-2.2',"
                              + " 'table-name' = '"
                              + TEST_TABLE_6
                              + "',"
                              + " 'zookeeper.quorum' = '"
                              + getZookeeperQuorum()
                              + "'"
                              + ")");
              String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";        
                  TableResult firstResult = tEnv.executeSql(query);
              List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect());
              String firstExpected = "+I[1, 6]";
              TestBaseUtils.compareResultAsText(firstResults, firstExpected);
          } 

      test failed

      org.junit.ComparisonFailure: Different elements in arrays: expected 1 elements and received 1
       expected: [+I[1, 6]]
       received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
      Expected :+I[1, 6]
      Actual   :+I[1, 2] 

      Attachments

        Issue Links

          Activity

            People

              MOBIN MOBIN
              MOBIN MOBIN
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: