Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.18.0
-
flink: 1.18.0
hbase: 2.2.3
flink-connector-hbase-2.2:3.0.0-1.18
Description
When I want to write label values of 50+to a rowkey column family in HBase (all values are label values of 0/1), for example:
{"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"}Here are four label values for example:
{"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1"}--source: CREATE TABLE kafka_table_( `id` STRING, `q1` STRING, `q2` STRING, `q3` STRING, `q4` STRING ) WITH ( ... 'connector'='kafka', 'format'='json', ... ); --sink: CREATE TABLE hbase_table_ ( rowkey STRING, cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>, PRIMARY KEY (rowkey ) NOT ENFORCED ) WITH ( 'connector' = 'my-hbase-2.2', 'table-name' = 'test_table', 'zookeeper.quorum' = '127.0.0.1' ); --insert: insert into hbase_table_ select id AS rowkey , ROW( cast(q1 as INT),cast(q2 as INT),cast(q3 as INT),cast(q4 as INT)) as cf from kafka_table_ ;
hbase:
hbase(main):016:0> scan 'test_table'
ROW COLUMN+CELL
1111 column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00
1111 column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01
1111 column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00
1111 column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01
Upstream data has a fixed value of 50+k-v data, among which very few value values are 1 (the default label value is 0). For example, only 1 or 2 values are 1: q2=1, q4=1, so I want HBase to store the following values:
hbase(main):016:0> scan 'test_table'
ROW COLUMN+CELL
1111 column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01
1111 column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01
When I use the "sink. ignore null value" keyword here, It just don't update the null value, and downstream third parties will still read all the values (such as 50+), but there are only 2 values that are truly 1:
--sink: CREATE TABLE hbase_table_ ( rowkey STRING, cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>, PRIMARY KEY (rowkey ) NOT ENFORCED ) WITH ( 'connector' = 'my-hbase-2.2', 'table-name' = 'test_table', 'sink.ignore-null-value' = 'true', 'zookeeper.quorum' = '127.0.0.1' ); --insert: insert into hbase_table_ select id AS rowkey , ROW( case when q1 <> '0' then cast(q1 as INT) else null end, case when q2 <> '0' then cast(q2 as INT) else null end, case when q3 <> '0' then cast(q3 as INT) else null end, case when q4 <> '0' then cast(q4 as INT) else null end ) as cf from kafka_table_ ;
hbase(main):016:0> scan 'test_table'
ROW COLUMN+CELL
1111 column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00
1111 column=cf:q2, timestamp=0000000000002, value=\x00\x00\x00\x01
1111 column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00
1111 column=cf:q4, timestamp=0000000000002, value=\x00\x00\x00\x01
There are no other configurations available, so I hope to have the function of overwriting and writing rowKey, that is, deleting the rowkey before adding new data.
Attachments
Issue Links
- links to