Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-33566

HBase sql-connector needs overwrite the rowKey

    XMLWordPrintableJSON

Details

    Description

      When I want to write label values of 50+to a rowkey column family in HBase (all values are label values of 0/1), for example:

      {"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"}

      Here are four label values for example:

      {"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1"}
      --source:
      CREATE TABLE kafka_table_(
       `id` STRING,
       `q1` STRING,
       `q2` STRING,
       `q3` STRING,
       `q4` STRING
      )  WITH (
      ...
         'connector'='kafka',
         'format'='json',
      ...
      );
      --sink:
      CREATE TABLE hbase_table_ (
       rowkey  STRING,
       cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
       PRIMARY KEY (rowkey ) NOT ENFORCED
      ) WITH (
        'connector' = 'my-hbase-2.2',
        'table-name' = 'test_table',
        'zookeeper.quorum' = '127.0.0.1'
      );
      --insert:
      insert into hbase_table_ 
      select 
       id AS rowkey ,
       ROW( cast(q1 as INT),cast(q2 as INT),cast(q3 as INT),cast(q4 as INT)) as cf
       from kafka_table_  ;

      hbase:
      hbase(main):016:0> scan 'test_table'
      ROW                                                        COLUMN+CELL                                                                                                                                                                
       1111                                                      column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00                                                                                                              
       1111                                                      column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01                                                                                                              
       1111                                                      column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00                                                                                                              
       1111                                                      column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01 
       
      Upstream data has a fixed value of 50+k-v data, among which very few value values are 1 (the default label value is 0). For example, only 1 or 2 values are 1: q2=1, q4=1, so I want HBase to store the following values:
      hbase(main):016:0> scan 'test_table'
      ROW                                                        COLUMN+CELL                                                                                                                                                                
       1111                                                      column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01                                                                                                             
       1111                                                      column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01 
       
      When I use the "sink. ignore null value" keyword here, It just don't update the null value, and downstream third parties will still read all the values (such as 50+), but there are only 2 values that are truly 1:

      --sink:
      CREATE TABLE hbase_table_ (
       rowkey  STRING,
       cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
       PRIMARY KEY (rowkey ) NOT ENFORCED
      ) WITH (
        'connector' = 'my-hbase-2.2',
        'table-name' = 'test_table',
        'sink.ignore-null-value' = 'true',
        'zookeeper.quorum' = '127.0.0.1'
      ); 
      --insert:
      insert into hbase_table_ 
      select 
       id AS rowkey ,
       ROW(
       case when q1 <> '0' then cast(q1 as INT) else null end,
       case when q2 <> '0' then cast(q2 as INT) else null end,
       case when q3 <> '0' then cast(q3 as INT) else null end,
       case when q4 <> '0' then cast(q4 as INT) else null end 
      ) as cf
       from kafka_table_ ; 

      hbase(main):016:0> scan 'test_table'
      ROW                                                        COLUMN+CELL                                                                                                                                                                
       1111                                                      column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00                                                                                                              
       1111                                                      column=cf:q2, timestamp=0000000000002, value=\x00\x00\x00\x01                                                                                                              
       1111                                                      column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00                                                                                                              
       1111                                                      column=cf:q4, timestamp=0000000000002, value=\x00\x00\x00\x01 
       
      There are no other configurations available, so I hope to have the function of overwriting and writing rowKey, that is, deleting the rowkey before adding new data.

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              jankowilliam JankoWilliam
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated: