From 0ff0e563ac93883aa1c2d14e4f7f0f92aad54111 Mon Sep 17 00:00:00 2001 From: Chun-Hao Tang Date: Mon, 18 Sep 2017 14:26:44 +0800 Subject: [PATCH] HBASE-18142 Deletion of a cell deletes the previous versions too --- hbase-shell/src/main/ruby/hbase/table.rb | 461 +++++++++++++-------- hbase-shell/src/main/ruby/shell/commands/delete.rb | 19 +- .../src/main/ruby/shell/commands/deleteall.rb | 28 +- hbase-shell/src/test/ruby/hbase/table_test.rb | 89 ++-- 4 files changed, 364 insertions(+), 233 deletions(-) diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb index 36c6509..7eaf1b3 100644 --- a/hbase-shell/src/main/ruby/hbase/table.rb +++ b/hbase-shell/src/main/ruby/hbase/table.rb @@ -24,7 +24,6 @@ include Java module Hbase class Table include HBaseConstants - @@thread_pool = nil # Add the command 'name' to table s.t. the shell command also called via 'name' @@ -32,7 +31,7 @@ module Hbase # # e.g. name = scan, adds table.scan which calls Scan.scan def self.add_shell_command(name) - self.add_command(name, name, name) + add_command(name, name, name) end # add a named command to the table instance @@ -42,18 +41,18 @@ module Hbase # shell_command - name of the command in the shell # internal_method_name - name of the method in the shell command to forward the call def self.add_command(name, shell_command, internal_method_name) - method = name.to_sym - self.class_eval do + method = name.to_sym + class_eval do define_method method do |*args| - @shell.internal_command(shell_command, internal_method_name, self, *args) - end + @shell.internal_command(shell_command, internal_method_name, self, *args) + end end end # General help for the table # class level so we can call it from anywhere def self.help - return <<-EOF + <<-EOF Help for table-reference commands. You can either create a table via 'create' and then manipulate the table via commands like 'put', 'get', etc. @@ -102,7 +101,7 @@ flush and drop just by typing: Note that after dropping a table, your reference to it becomes useless and further usage is undefined (and not recommended). EOF - end + end #--------------------------------------------------------------------------------------------- @@ -113,13 +112,13 @@ EOF def initialize(table, shell) @table = table - @name = @table.getName().getNameAsString() + @name = @table.getName.getNameAsString @shell = shell - @converters = Hash.new() + @converters = {} end - def close() - @table.close() + def close + @table.close end # Note the below methods are prefixed with '_' to hide them from the average user, as @@ -131,88 +130,134 @@ EOF p = org.apache.hadoop.hbase.client.Put.new(row.to_s.to_java_bytes) family, qualifier = parse_column_name(column) if args.any? 
- attributes = args[ATTRIBUTES] - set_attributes(p, attributes) if attributes - visibility = args[VISIBILITY] - set_cell_visibility(p, visibility) if visibility - ttl = args[TTL] - set_op_ttl(p, ttl) if ttl - end - #Case where attributes are specified without timestamp - if timestamp.kind_of?(Hash) - timestamp.each do |k, v| + attributes = args[ATTRIBUTES] + set_attributes(p, attributes) if attributes + visibility = args[VISIBILITY] + set_cell_visibility(p, visibility) if visibility + ttl = args[TTL] + set_op_ttl(p, ttl) if ttl + end + # Case where attributes are specified without timestamp + if timestamp.is_a?(Hash) + timestamp.each do |k, v| if k == 'ATTRIBUTES' set_attributes(p, v) elsif k == 'VISIBILITY' set_cell_visibility(p, v) - elsif k == "TTL" + elsif k == 'TTL' set_op_ttl(p, v) end end timestamp = nil end if timestamp - p.add(family, qualifier, timestamp, value.to_s.to_java_bytes) + p.addColumn(family, qualifier, timestamp, value.to_s.to_java_bytes) else - p.add(family, qualifier, value.to_s.to_java_bytes) + p.addColumn(family, qualifier, value.to_s.to_java_bytes) end @table.put(p) end #---------------------------------------------------------------------------------------------- + # Create a Delete mutation + def _createdelete_internal(row, column = nil, + timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, + args = {}, all_version = true) + temptimestamp = timestamp + if temptimestamp.is_a?(Hash) + timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP + end + d = org.apache.hadoop.hbase.client.Delete.new(row.to_s.to_java_bytes, timestamp) + if temptimestamp.is_a?(Hash) + temptimestamp.each do |_k, v| + if v.is_a?(String) + set_cell_visibility(d, v) if v + end + end + end + if args.any? + visibility = args[VISIBILITY] + set_cell_visibility(d, visibility) if visibility + end + if column && all_version + family, qualifier = parse_column_name(column) + d.addColumns(family, qualifier, timestamp) + elsif column && !all_version + family, qualifier = parse_column_name(column) + d.addColumn(family, qualifier, timestamp) + end + d + end + + #---------------------------------------------------------------------------------------------- + # Delete rows using prefix + def _deleterows_internal(row, column = nil, + timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, + args = {}, all_version = true) + cache = row['CACHE'] ? 
row['CACHE'] : 100 + prefix = row['ROWPREFIXFILTER'] + + # create scan to get table names using prefix + scan = org.apache.hadoop.hbase.client.Scan.new + scan.setRowPrefixFilter(prefix.to_java_bytes) + # Run the scanner to get all rowkeys + scanner = @table.getScanner(scan) + # Create a list to store all deletes + list = java.util.ArrayList.new + # Iterate results + iter = scanner.iterator + while iter.hasNext + row = iter.next + key = org.apache.hadoop.hbase.util.Bytes.toStringBinary(row.getRow) + d = _createdelete_internal(key, column, timestamp, args, all_version) + list.add(d) + if list.size >= cache + @table.delete(list) + list.clear + end + end + @table.delete(list) + end + + #---------------------------------------------------------------------------------------------- # Delete a cell def _delete_internal(row, column, - timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {}) - _deleteall_internal(row, column, timestamp, args) + timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, + args = {}, all_version = false) + _deleteall_internal(row, column, timestamp, args, all_version) end #---------------------------------------------------------------------------------------------- # Delete a row def _deleteall_internal(row, column = nil, - timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {}) + timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, + args = {}, all_version = true) # delete operation doesn't need read permission. Retaining the read check for # meta table as a part of HBASE-5837. if is_meta_table? - raise ArgumentError, "Row Not Found" if _get_internal(row).nil? - end - temptimestamp = timestamp - if temptimestamp.kind_of?(Hash) - timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP + raise ArgumentError, 'Row Not Found' if _get_internal(row).nil? end - d = org.apache.hadoop.hbase.client.Delete.new(row.to_s.to_java_bytes, timestamp) - if temptimestamp.kind_of?(Hash) - temptimestamp.each do |k, v| - if v.kind_of?(String) - set_cell_visibility(d, v) if v - end - end - end - if args.any? - visibility = args[VISIBILITY] - set_cell_visibility(d, visibility) if visibility - end - if column - family, qualifier = parse_column_name(column) - d.deleteColumns(family, qualifier, timestamp) + if row.is_a?(Hash) + _deleterows_internal(row, column, timestamp, args, all_version) + else + d = _createdelete_internal(row, column, timestamp, args, all_version) + @table.delete(d) end - @table.delete(d) end #---------------------------------------------------------------------------------------------- # Increment a counter atomically - def _incr_internal(row, column, value = nil, args={}) - if value.kind_of?(Hash) - value = 1 - end + def _incr_internal(row, column, value = nil, args = {}) + value = 1 if value.is_a?(Hash) value ||= 1 incr = org.apache.hadoop.hbase.client.Increment.new(row.to_s.to_java_bytes) family, qualifier = parse_column_name(column) if qualifier.nil? - raise ArgumentError, "Failed to provide both column family and column qualifier for incr" + raise ArgumentError, 'Failed to provide both column family and column qualifier for incr' end if args.any? 
- attributes = args[ATTRIBUTES] - visibility = args[VISIBILITY] + attributes = args[ATTRIBUTES] + visibility = args[VISIBILITY] set_attributes(incr, attributes) if attributes set_cell_visibility(incr, visibility) if visibility ttl = args[TTL] @@ -224,20 +269,21 @@ EOF # Fetch cell value cell = result.listCells[0] - org.apache.hadoop.hbase.util.Bytes::toLong(cell.getValue) + org.apache.hadoop.hbase.util.Bytes.toLong(cell.getValueArray, + cell.getValueOffset, cell.getValueLength) end #---------------------------------------------------------------------------------------------- # appends the value atomically - def _append_internal(row, column, value, args={}) + def _append_internal(row, column, value, args = {}) append = org.apache.hadoop.hbase.client.Append.new(row.to_s.to_java_bytes) family, qualifier = parse_column_name(column) if qualifier.nil? - raise ArgumentError, "Failed to provide both column family and column qualifier for append" + raise ArgumentError, 'Failed to provide both column family and column qualifier for append' end if args.any? - attributes = args[ATTRIBUTES] - visibility = args[VISIBILITY] + attributes = args[ATTRIBUTES] + visibility = args[VISIBILITY] set_attributes(append, attributes) if attributes set_cell_visibility(append, visibility) if visibility ttl = args[TTL] @@ -249,17 +295,33 @@ EOF # Fetch cell value cell = result.listCells[0] - org.apache.hadoop.hbase.util.Bytes::toStringBinary(cell.getValue) + org.apache.hadoop.hbase.util.Bytes.toStringBinary(cell.getValueArray, + cell.getValueOffset, cell.getValueLength) end #---------------------------------------------------------------------------------------------- # Count rows in a table - def _count_internal(interval = 1000, caching_rows = 10) + def _count_internal(interval = 1000, scan = nil) + raise(ArgumentError, 'Scan argument should be org.apache.hadoop.hbase.client.Scan') \ + unless scan.nil? || scan.is_a?(org.apache.hadoop.hbase.client.Scan) # We can safely set scanner caching with the first key only filter - scan = org.apache.hadoop.hbase.client.Scan.new - scan.setCacheBlocks(false) - scan.setCaching(caching_rows) - scan.setFilter(org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter.new) + + if scan.nil? + scan = org.apache.hadoop.hbase.client.Scan.new + scan.setCacheBlocks(false) + scan.setCaching(10) + scan.setFilter(org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter.new) + else + scan.setCacheBlocks(false) + filter = scan.getFilter + firstKeyOnlyFilter = org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter.new + if filter.nil? + scan.setFilter(firstKeyOnlyFilter) + else + firstKeyOnlyFilter.setReversed(filter.isReversed) + scan.setFilter(org.apache.hadoop.hbase.filter.FilterList.new(filter, firstKeyOnlyFilter)) + end + end # Run the scanner scanner = @table.getScanner(scan) @@ -270,15 +332,15 @@ EOF while iter.hasNext row = iter.next count += 1 - next unless (block_given? && count % interval == 0) + next unless block_given? 
&& count % interval == 0 # Allow command modules to visualize counting process yield(count, - org.apache.hadoop.hbase.util.Bytes::toStringBinary(row.getRow)) + org.apache.hadoop.hbase.util.Bytes.toStringBinary(row.getRow)) end - scanner.close() + scanner.close # Return the counter - return count + count end #---------------------------------------------------------------------------------------------- @@ -287,19 +349,19 @@ EOF get = org.apache.hadoop.hbase.client.Get.new(row.to_s.to_java_bytes) maxlength = -1 count = 0 - @converters.clear() + @converters.clear # Normalize args - args = args.first if args.first.kind_of?(Hash) - if args.kind_of?(String) || args.kind_of?(Array) - columns = [ args ].flatten.compact + args = args.first if args.first.is_a?(Hash) + if args.is_a?(String) || args.is_a?(Array) + columns = [args].flatten.compact args = { COLUMNS => columns } end # # Parse arguments # - unless args.kind_of?(Hash) + unless args.is_a?(Hash) raise ArgumentError, "Failed parse of #{args.inspect}, #{args.class}" end @@ -310,19 +372,21 @@ EOF authorizations = args[AUTHORIZATIONS] consistency = args.delete(CONSISTENCY) if args[CONSISTENCY] replicaId = args.delete(REGION_REPLICA_ID) if args[REGION_REPLICA_ID] + converter = args.delete(FORMATTER) || nil + converter_class = args.delete(FORMATTER_CLASS) || 'org.apache.hadoop.hbase.util.Bytes' unless args.empty? columns = args[COLUMN] || args[COLUMNS] - if args[VERSIONS] - vers = args[VERSIONS] - else - vers = 1 - end + vers = if args[VERSIONS] + args[VERSIONS] + else + 1 + end if columns # Normalize types, convert string to an array of strings - columns = [ columns ] if columns.is_a?(String) + columns = [columns] if columns.is_a?(String) # At this point it is either an array or some unsupported stuff - unless columns.kind_of?(Array) + unless columns.is_a?(Array) raise ArgumentError, "Failed parse column argument type #{args.inspect}, #{args.class}" end @@ -342,14 +406,14 @@ EOF get.setTimeRange(args[TIMERANGE][0], args[TIMERANGE][1]) if args[TIMERANGE] else if attributes - set_attributes(get, attributes) + set_attributes(get, attributes) elsif authorizations - set_authorizations(get, authorizations) + set_authorizations(get, authorizations) else - # May have passed TIMESTAMP and row only; wants all columns from ts. - unless ts = args[TIMESTAMP] || tr = args[TIMERANGE] - raise ArgumentError, "Failed parse of #{args.inspect}, #{args.class}" - end + # May have passed TIMESTAMP and row only; wants all columns from ts. + unless ts = args[TIMESTAMP] || tr = args[TIMERANGE] + raise ArgumentError, "Failed parse of #{args.inspect}, #{args.class}" + end end get.setMaxVersions(vers) @@ -361,11 +425,12 @@ EOF set_authorizations(get, authorizations) if authorizations end - unless filter.class == String - get.setFilter(filter) - else + if filter.class == String get.setFilter( - org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter.to_java_bytes)) + org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter.to_java_bytes) + ) + else + get.setFilter(filter) end get.setConsistency(org.apache.hadoop.hbase.client.Consistency.valueOf(consistency)) if consistency @@ -381,12 +446,14 @@ EOF # Print out results. Result can be Cell or RowResult. 
res = {} - result.list.each do |kv| - family = String.from_java_bytes(kv.getFamily) - qualifier = org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.getQualifier) + result.listCells.each do |c| + family = convert_bytes_with_position(c.getFamilyArray, + c.getFamilyOffset, c.getFamilyLength, converter_class, converter) + qualifier = convert_bytes_with_position(c.getQualifierArray, + c.getQualifierOffset, c.getQualifierLength, converter_class, converter) column = "#{family}:#{qualifier}" - value = to_string(column, kv, maxlength) + value = to_string(column, c, maxlength, converter_class, converter) if block_given? yield(column, value) @@ -396,7 +463,7 @@ EOF end # If block given, we've yielded all the results, otherwise just return them - return ((block_given?) ? [count, is_stale]: res) + (block_given? ? [count, is_stale] : res) end #---------------------------------------------------------------------------------------------- @@ -413,48 +480,49 @@ EOF return nil if result.isEmpty # Fetch cell value - cell = result.list[0] - org.apache.hadoop.hbase.util.Bytes::toLong(cell.getValue) + cell = result.listCells[0] + org.apache.hadoop.hbase.util.Bytes.toLong(cell.getValueArray, + cell.getValueOffset, cell.getValueLength) end def _hash_to_scan(args) if args.any? - enablemetrics = args["ALL_METRICS"].nil? ? false : args["ALL_METRICS"] - enablemetrics = enablemetrics || !args["METRICS"].nil? - filter = args["FILTER"] - startrow = args["STARTROW"] || '' - stoprow = args["STOPROW"] - rowprefixfilter = args["ROWPREFIXFILTER"] - timestamp = args["TIMESTAMP"] - columns = args["COLUMNS"] || args["COLUMN"] || [] + enablemetrics = args['ALL_METRICS'].nil? ? false : args['ALL_METRICS'] + enablemetrics ||= !args['METRICS'].nil? + filter = args['FILTER'] + startrow = args['STARTROW'] || '' + stoprow = args['STOPROW'] + rowprefixfilter = args['ROWPREFIXFILTER'] + timestamp = args['TIMESTAMP'] + columns = args['COLUMNS'] || args['COLUMN'] || [] # If CACHE_BLOCKS not set, then default 'true'. - cache_blocks = args["CACHE_BLOCKS"].nil? ? true: args["CACHE_BLOCKS"] - cache = args["CACHE"] || 0 - reversed = args["REVERSED"] || false - versions = args["VERSIONS"] || 1 + cache_blocks = args['CACHE_BLOCKS'].nil? ? true : args['CACHE_BLOCKS'] + cache = args['CACHE'] || 0 + reversed = args['REVERSED'] || false + versions = args['VERSIONS'] || 1 timerange = args[TIMERANGE] - raw = args["RAW"] || false + raw = args['RAW'] || false attributes = args[ATTRIBUTES] authorizations = args[AUTHORIZATIONS] consistency = args[CONSISTENCY] # Normalize column names columns = [columns] if columns.class == String - limit = args["LIMIT"] || -1 - unless columns.kind_of?(Array) - raise ArgumentError.new("COLUMNS must be specified as a String or an Array") + limit = args['LIMIT'] || -1 + unless columns.is_a?(Array) + raise ArgumentError, 'COLUMNS must be specified as a String or an Array' end scan = if stoprow - org.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes, stoprow.to_java_bytes) - else - org.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes) - end + org.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes, stoprow.to_java_bytes) + else + org.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes) + end # This will overwrite any startrow/stoprow settings scan.setRowPrefixFilter(rowprefixfilter.to_java_bytes) if rowprefixfilter # Clear converters from last scan. 
- @converters.clear() + @converters.clear columns.each do |c| family, qualifier = parse_column_name(c.to_s) @@ -465,11 +533,12 @@ EOF end end - unless filter.class == String - scan.setFilter(filter) - else + if filter.class == String scan.setFilter( - org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter.to_java_bytes)) + org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter.to_java_bytes) + ) + else + scan.setFilter(filter) end scan.setScanMetricsEnabled(enablemetrics) if enablemetrics @@ -498,32 +567,36 @@ EOF #---------------------------------------------------------------------------------------------- # Scans whole table or a range of keys and returns rows matching specific criteria def _scan_internal(args = {}, scan = nil) - raise(ArgumentError, "Args should be a Hash") unless args.kind_of?(Hash) - raise(ArgumentError, "Scan argument should be org.apache.hadoop.hbase.client.Scan") \ - unless scan == nil || scan.kind_of?(org.apache.hadoop.hbase.client.Scan) - - limit = args["LIMIT"] || -1 - maxlength = args.delete("MAXLENGTH") || -1 + raise(ArgumentError, 'Args should be a Hash') unless args.is_a?(Hash) + raise(ArgumentError, 'Scan argument should be org.apache.hadoop.hbase.client.Scan') \ + unless scan.nil? || scan.is_a?(org.apache.hadoop.hbase.client.Scan) + + limit = args['LIMIT'] || -1 + maxlength = args.delete('MAXLENGTH') || -1 + converter = args.delete(FORMATTER) || nil + converter_class = args.delete(FORMATTER_CLASS) || 'org.apache.hadoop.hbase.util.Bytes' count = 0 res = {} # Start the scanner - scan = scan == nil ? _hash_to_scan(args) : scan + scan = scan.nil? ? _hash_to_scan(args) : scan scanner = @table.getScanner(scan) iter = scanner.iterator # Iterate results while iter.hasNext row = iter.next - key = org.apache.hadoop.hbase.util.Bytes::toStringBinary(row.getRow) + key = convert_bytes(row.getRow, nil, converter) is_stale |= row.isStale - row.list.each do |kv| - family = String.from_java_bytes(kv.getFamily) - qualifier = org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.getQualifier) + row.listCells.each do |c| + family = convert_bytes_with_position(c.getFamilyArray, + c.getFamilyOffset, c.getFamilyLength, converter_class, converter) + qualifier = convert_bytes_with_position(c.getQualifierArray, + c.getQualifierOffset, c.getQualifierLength, converter_class, converter) column = "#{family}:#{qualifier}" - cell = to_string(column, kv, maxlength) + cell = to_string(column, c, maxlength, converter_class, converter) if block_given? yield(key, "column=#{column}, #{cell}") @@ -541,25 +614,26 @@ EOF end end - scanner.close() - return ((block_given?) ? [count, is_stale] : res) + scanner.close + (block_given? ? [count, is_stale] : res) end - # Apply OperationAttributes to puts/scans/gets + # Apply OperationAttributes to puts/scans/gets def set_attributes(oprattr, attributes) - raise(ArgumentError, "Attributes must be a Hash type") unless attributes.kind_of?(Hash) - for k,v in attributes + raise(ArgumentError, 'Attributes must be a Hash type') unless attributes.is_a?(Hash) + for k, v in attributes v = v.to_s unless v.nil? 
oprattr.setAttribute(k.to_s, v.to_java_bytes) end end def set_cell_permissions(op, permissions) - raise(ArgumentError, "Permissions must be a Hash type") unless permissions.kind_of?(Hash) + raise(ArgumentError, 'Permissions must be a Hash type') unless permissions.is_a?(Hash) map = java.util.HashMap.new - permissions.each do |user,perms| + permissions.each do |user, perms| map.put(user.to_s, org.apache.hadoop.hbase.security.access.Permission.new( - perms.to_java_bytes)) + perms.to_java_bytes + )) end op.setACL(map) end @@ -567,15 +641,19 @@ EOF def set_cell_visibility(oprattr, visibility) oprattr.setCellVisibility( org.apache.hadoop.hbase.security.visibility.CellVisibility.new( - visibility.to_s)) + visibility.to_s + ) + ) end def set_authorizations(oprattr, authorizations) - raise(ArgumentError, "Authorizations must be a Array type") unless authorizations.kind_of?(Array) - auths = [ authorizations ].flatten.compact + raise(ArgumentError, 'Authorizations must be a Array type') unless authorizations.is_a?(Array) + auths = [authorizations].flatten.compact oprattr.setAuthorizations( org.apache.hadoop.hbase.security.visibility.Authorizations.new( - auths.to_java(:string))) + auths.to_java(:string) + ) + ) end def set_op_ttl(op, ttl) @@ -596,14 +674,14 @@ EOF end end - #Add the following admin utilities to the table + # Add the following admin utilities to the table add_admin_utils :enable, :disable, :flush, :drop, :describe, :snapshot #---------------------------- - #give the general help for the table + # give the general help for the table # or the named command - def help (command = nil) - #if there is a command, get the per-command help from the shell + def help(command = nil) + # if there is a command, get the per-command help from the shell if command begin return @shell.help_command(command) @@ -612,13 +690,13 @@ EOF return nil end end - return @shell.help('table_help') + @shell.help('table_help') end # Table to string def to_s - cl = self.class() - return "#{cl} - #{@name}" + cl = self.class + "#{cl} - #{@name}" end # Standard ruby call to get the return value for an object @@ -639,48 +717,51 @@ EOF # Checks if current table is one of the 'meta' tables def is_meta_table? - org.apache.hadoop.hbase.TableName::META_TABLE_NAME.equals(@table.getName()) + org.apache.hadoop.hbase.TableName::META_TABLE_NAME.equals(@table.getName) end # Returns family and (when has it) qualifier for a column name def parse_column_name(column) split = org.apache.hadoop.hbase.KeyValue.parseColumn(column.to_java_bytes) set_converter(split) if split.length > 1 - return split[0], (split.length > 1) ? split[1] : nil + [split[0], split.length > 1 ? split[1] : nil] end # Make a String of the passed kv # Intercept cells whose format we know such as the info:regioninfo in hbase:meta - def to_string(column, kv, maxlength = -1) + def to_string(column, kv, maxlength = -1, converter_class = nil, converter = nil) if is_meta_table? 
- if column == 'info:regioninfo' or column == 'info:splitA' or column == 'info:splitB' - hri = org.apache.hadoop.hbase.HRegionInfo.parseFromOrNull(kv.getValue) - return "timestamp=%d, value=%s" % [kv.getTimestamp, hri.toString] + if column == 'info:regioninfo' || column == 'info:splitA' || column == 'info:splitB' + hri = org.apache.hadoop.hbase.HRegionInfo.parseFromOrNull(kv.getValueArray, + kv.getValueOffset, kv.getValueLength) + return format('timestamp=%d, value=%s', kv.getTimestamp, hri.toString) end if column == 'info:serverstartcode' - if kv.getValue.length > 0 - str_val = org.apache.hadoop.hbase.util.Bytes.toLong(kv.getValue) + if kv.getValueLength > 0 + str_val = org.apache.hadoop.hbase.util.Bytes.toLong(kv.getValueArray, + kv.getValueOffset, kv.getValueLength) else - str_val = org.apache.hadoop.hbase.util.Bytes.toStringBinary(kv.getValue) + str_val = org.apache.hadoop.hbase.util.Bytes.toStringBinary(kv.getValueArray, + kv.getValueOffset, kv.getValueLength) end - return "timestamp=%d, value=%s" % [kv.getTimestamp, str_val] + return format('timestamp=%d, value=%s', kv.getTimestamp, str_val) end end if kv.isDelete - val = "timestamp=#{kv.getTimestamp}, type=#{org.apache.hadoop.hbase.KeyValue::Type::codeToType(kv.getType)}" + val = "timestamp=#{kv.getTimestamp}, type=#{org.apache.hadoop.hbase.KeyValue::Type.codeToType(kv.getType)}" else - val = "timestamp=#{kv.getTimestamp}, value=#{convert(column, kv)}" + val = "timestamp=#{kv.getTimestamp}, value=#{convert(column, kv, converter_class, converter)}" end - (maxlength != -1) ? val[0, maxlength] : val + maxlength != -1 ? val[0, maxlength] : val end - def convert(column, kv) - #use org.apache.hadoop.hbase.util.Bytes as the default class - klazz_name = 'org.apache.hadoop.hbase.util.Bytes' - #use org.apache.hadoop.hbase.util.Bytes::toStringBinary as the default convertor - converter = 'toStringBinary' - if @converters.has_key?(column) + def convert(column, kv, converter_class = 'org.apache.hadoop.hbase.util.Bytes', converter = 'toStringBinary') + # use org.apache.hadoop.hbase.util.Bytes as the default class + converter_class = 'org.apache.hadoop.hbase.util.Bytes' unless converter_class + # use org.apache.hadoop.hbase.util.Bytes::toStringBinary as the default convertor + converter = 'toStringBinary' unless converter + if @converters.key?(column) # lookup the CONVERTER for certain column - "cf:qualifier" matches = /c\((.+)\)\.(.+)/.match(@converters[column]) if matches.nil? @@ -692,8 +773,19 @@ EOF converter = matches[2] end end - method = eval(klazz_name).method(converter) - return method.call(kv.getValue) # apply the converter + # apply the converter + convert_bytes(org.apache.hadoop.hbase.CellUtil.cloneValue(kv), klazz_name, converter) + end + + def convert_bytes(bytes, converter_class = nil, converter_method = nil) + convert_bytes_with_position(bytes, 0, bytes.length, converter_class, converter_method) + end + + def convert_bytes_with_position(bytes, offset, len, converter_class, converter_method) + # Avoid nil + converter_class = 'org.apache.hadoop.hbase.util.Bytes' unless converter_class + converter_method = 'toStringBinary' unless converter_method + eval(converter_class).method(converter_method).call(bytes, offset, len) end # if the column spec contains CONVERTER information, to get rid of :CONVERTER info from column pair. 
@@ -710,13 +802,14 @@ EOF #---------------------------------------------------------------------------------------------- # Get the split points for the table - def _get_splits_internal() - locator = @table.getRegionLocator() - splits = locator.getAllRegionLocations(). - map{|i| Bytes.toStringBinary(i.getRegionInfo().getStartKey)}.delete_if{|k| k == ""} - locator.close() - puts("Total number of splits = %s" % [splits.size + 1]) - return splits + def _get_splits_internal + locator = @table.getRegionLocator + splits = locator.getAllRegionLocations + .map { |i| Bytes.toStringBinary(i.getRegionInfo.getStartKey) }.delete_if { |k| k == '' } + locator.close + puts(format('Total number of splits = %s', splits.size + 1)) + puts splits + splits end end end diff --git a/hbase-shell/src/main/ruby/shell/commands/delete.rb b/hbase-shell/src/main/ruby/shell/commands/delete.rb index dcb8341..6995959 100644 --- a/hbase-shell/src/main/ruby/shell/commands/delete.rb +++ b/hbase-shell/src/main/ruby/shell/commands/delete.rb @@ -21,7 +21,7 @@ module Shell module Commands class Delete < Command def help - return <<-EOF + <<-EOF Put a delete cell value at specified table/row/column and optionally timestamp coordinates. Deletes must match the deleted cell's coordinates exactly. When scanning, a delete cell suppresses older @@ -40,20 +40,19 @@ t to table 't1', the corresponding command would be: EOF end - def command(table, row, column, - timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {}) + def command(table, row, column = nil, + timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {}) delete(table(table), row, column, timestamp, args) end - def delete(table, row, column, - timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {}) - format_simple_command do - table._delete_internal(row, column, timestamp, args) - end + def delete(table, row, column = nil, + timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {}) + @start_time = Time.now + table._delete_internal(row, column, timestamp, args, false) end end end end -#Add the method table.delete that calls delete.delete -::Hbase::Table.add_shell_command("delete") +# Add the method table.delete that calls delete.delete +::Hbase::Table.add_shell_command('delete') diff --git a/hbase-shell/src/main/ruby/shell/commands/deleteall.rb b/hbase-shell/src/main/ruby/shell/commands/deleteall.rb index e6118c9..f18fa05 100644 --- a/hbase-shell/src/main/ruby/shell/commands/deleteall.rb +++ b/hbase-shell/src/main/ruby/shell/commands/deleteall.rb @@ -21,9 +21,10 @@ module Shell module Commands class Deleteall < Command def help - return <<-EOF + <<-EOF Delete all cells in a given row; pass a table name, row, and optionally -a column and timestamp. Examples: +a column and timestamp. Deleteall also support deleting a row range using a +row key prefix. Examples: hbase> deleteall 'ns1:t1', 'r1' hbase> deleteall 't1', 'r1' @@ -31,13 +32,21 @@ a column and timestamp. 
Examples: hbase> deleteall 't1', 'r1', 'c1', ts1 hbase> deleteall 't1', 'r1', 'c1', ts1, {VISIBILITY=>'PRIVATE|SECRET'} +ROWPREFIXFILTER can be used to delete row ranges + hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix'} + hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix'}, 'c1' //delete certain column family in the row ranges + hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix'}, 'c1', ts1 + hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix'}, 'c1', ts1, {VISIBILITY=>'PRIVATE|SECRET'} + +CACHE can be used to specify how many deletes batched to be sent to server at one time, default is 100 + hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix', CACHE => 100} + + The same commands also can be run on a table reference. Suppose you had a reference t to table 't1', the corresponding command would be: - hbase> t.deleteall 'r1' - hbase> t.deleteall 'r1', 'c1' - hbase> t.deleteall 'r1', 'c1', ts1 hbase> t.deleteall 'r1', 'c1', ts1, {VISIBILITY=>'PRIVATE|SECRET'} + hbase> t.deleteall {ROWPREFIXFILTER => 'prefix', CACHE => 100}, 'c1', ts1, {VISIBILITY=>'PRIVATE|SECRET'} EOF end @@ -48,13 +57,12 @@ EOF def deleteall(table, row, column = nil, timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {}) - format_simple_command do - table._deleteall_internal(row, column, timestamp, args) - end + @start_time = Time.now + table._deleteall_internal(row, column, timestamp, args, true) end end end end -#Add the method table.deleteall that calls deleteall.deleteall -::Hbase::Table.add_shell_command("deleteall") +# Add the method table.deleteall that calls deleteall.deleteall +::Hbase::Table.add_shell_command('deleteall') diff --git a/hbase-shell/src/test/ruby/hbase/table_test.rb b/hbase-shell/src/test/ruby/hbase/table_test.rb index b6801da..81d0a9a 100644 --- a/hbase-shell/src/test/ruby/hbase/table_test.rb +++ b/hbase-shell/src/test/ruby/hbase/table_test.rb @@ -17,7 +17,7 @@ # limitations under the License. 
# -require 'hbase' +require 'hbase_constants' include HBaseConstants @@ -106,20 +106,18 @@ module Hbase @test_table = table(@test_name) # Insert data to perform delete operations - @test_table.put("101", "x:a", "1") - @test_table.put("101", "x:a", "2", Time.now.to_i) - - @test_table.put("102", "x:a", "1", 1212) - @test_table.put("102", "x:a", "2", 1213) - - @test_table.put(103, "x:a", "3") - @test_table.put(103, "x:a", "4") - + @test_table.put("102", "x:a", "2", 1212) + @test_table.put(103, "x:a", "3", 1214) + @test_table.put("104", "x:a", 5) @test_table.put("104", "x:b", 6) - + @test_table.put(105, "x:a", "3") @test_table.put(105, "x:a", "4") + + @test_table.put("111", "x:a", "5") + @test_table.put("111", "x:b", "6") + @test_table.put("112", "x:a", "5") end def teardown @@ -148,21 +146,16 @@ module Hbase end #------------------------------------------------------------------------------- - - define_test "delete should work without timestamp" do - @test_table.delete("101", "x:a") - res = @test_table._get_internal('101', 'x:a') - assert_nil(res) - end - - define_test "delete should work with timestamp" do - @test_table.delete("102", "x:a", 1214) + define_test "delete should work with string keys" do + @test_table.delete('102', 'x:a', 1212) res = @test_table._get_internal('102', 'x:a') assert_nil(res) end define_test "delete should work with integer keys" do - @test_table.delete(103, "x:a") + res = @test_table._get_internal('103', 'x:a') + assert_not_nil(res) + @test_table.delete(103, 'x:a', 1214) res = @test_table._get_internal('103', 'x:a') assert_nil(res) end @@ -181,6 +174,14 @@ module Hbase assert_nil(res) end + define_test "deletall should work with row prefix" do + @test_table.deleteall({ROWPREFIXFILTER => '11'}) + res1 = @test_table._get_internal('111') + assert_nil(res1) + res2 = @test_table._get_internal('112') + assert_nil(res2) + end + define_test "append should work with value" do @test_table.append("123", 'x:cnt2', '123') assert_equal("123123", @test_table._append_internal("123", 'x:cnt2', '123')) @@ -238,6 +239,31 @@ module Hbase assert(!rows.empty?) end + define_test "count should support STARTROW parameter" do + count = @test_table.count STARTROW => '4' + assert(count == 0) + end + + define_test "count should support STOPROW parameter" do + count = @test_table.count STOPROW => '0' + assert(count == 0) + end + + define_test "count should support COLUMNS parameter" do + @test_table.put(4, "x:c", "31") + begin + count = @test_table.count COLUMNS => [ 'x:c'] + assert(count == 1) + ensure + @test_table.deleteall(4, 'x:c') + end + end + + define_test "count should support FILTER parameter" do + count = @test_table.count FILTER => "ValueFilter(=, 'binary:11')" + assert(count == 1) + end + #------------------------------------------------------------------------------- define_test "get should work w/o columns specification" do @@ -376,8 +402,8 @@ module Hbase assert_not_nil(/value=98/.match(res['x:d'])) ensure # clean up newly added columns for this test only. - @test_table.delete(1, "x:c") - @test_table.delete(1, "x:d") + @test_table.deleteall(1, 'x:c') + @test_table.deleteall(1, 'x:d') end end @@ -393,7 +419,7 @@ module Hbase assert_nil(res) ensure # clean up newly added columns for this test only. - @test_table.delete(1, "x:v") + @test_table.deleteall(1, 'x:v') end end @@ -576,8 +602,8 @@ module Hbase assert_not_nil(/value=98/.match(res['1']['x:d'])) ensure # clean up newly added columns for this test only. 
- @test_table.delete(1, "x:c") - @test_table.delete(1, "x:d") + @test_table.deleteall(1, 'x:c') + @test_table.deleteall(1, 'x:d') end end @@ -595,7 +621,7 @@ module Hbase assert_equal(res, {}, "Result is not empty") ensure # clean up newly added columns for this test only. - @test_table.delete(1, "x:v") + @test_table.deleteall(1, 'x:v') end end @@ -611,10 +637,15 @@ module Hbase assert_nil(res['2']) ensure # clean up newly added columns for this test only. - @test_table.delete(4, "x:a") + @test_table.deleteall(4, 'x:a') end end + define_test "scan hbase meta table" do + res = table("hbase:meta")._scan_internal + assert_not_nil(res) + end + define_test "mutation with TTL should expire" do @test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 } ) begin @@ -624,7 +655,7 @@ module Hbase res = @test_table._get_internal('ttlTest', 'x:a') assert_nil(res) ensure - @test_table.delete('ttlTest', 'x:a') + @test_table.deleteall('ttlTest', 'x:a') end end -- 1.9.2.msysgit.0
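A minimal shell sketch of the semantics this patch introduces (the table 't1', row 'r1', and cell values below are hypothetical, not taken from the patch): the old shell delete called Delete#deleteColumns, so deleting one cell also removed every older version at the same coordinates. With the new all_version flag, delete maps to Delete#addColumn (only the specified version) while deleteall keeps the all-versions behavior via Delete#addColumns:

  hbase> put 't1', 'r1', 'x:a', 'v1', 100
  hbase> put 't1', 'r1', 'x:a', 'v2', 200
  hbase> delete 't1', 'r1', 'x:a', 200     # removes only the cell at ts=200; 'v1' at ts=100 survives
  hbase> deleteall 't1', 'r1', 'x:a'       # removes all versions of 'x:a' in row 'r1'

deleteall also gains a row-prefix form backed by _deleterows_internal, which scans for matching row keys and flushes deletes to the server in batches of CACHE rows (default 100):

  hbase> deleteall 't1', {ROWPREFIXFILTER => 'r', CACHE => 100}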