HBase / HBASE-23062

When reading HBase data through TableInputFormat with a large Scan.setCaching(size) value, some row keys are silently lost without any exception.


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.98.6.1
    • Fix Version/s: 1.2.5
    • Component/s: None
    • Labels:
      None

      Description

      The HBase server cluster runs version 1.2.5. Data is lost when the HBase client version is 0.98.6.1; with client version 1.2.5, no data is lost.

      I ran the experiment in three ways. First, I used Spark to read HBase; second, I used MapReduce. In both cases, increasing the Scan caching size caused some data to be lost. More precisely, with scan.setCaching(500) I received 7622 rows, but with scan.setCaching(50000) I received only 4226 rows. Third, I used a Scan to read HBase directly; there the caching size did not affect the result, and I always received 7622 rows.
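      For context, Scan.setCaching(n) sets how many rows the scanner fetches per RPC. The sketch below works through the arithmetic for the row counts reported above, under a simplifying assumption that is not stated in the report: the whole row range sits in one region and no result-size limit shortens a batch. The class and method names are hypothetical.

```java
// Sketch: how scanner caching translates into client-server round trips.
// Simplifying assumptions (hypothetical): the whole row range lives in one
// region and no server-side result-size limit cuts a batch short.
public class CachingRoundTrips {

    // Number of scanner RPCs needed to fetch `rows` rows, `caching` rows at a time.
    static long roundTrips(long rows, int caching) {
        if (caching <= 0) {
            throw new IllegalArgumentException("caching must be positive");
        }
        return (rows + caching - 1) / caching; // ceiling division
    }

    public static void main(String[] args) {
        long expectedRows = 7622; // rows returned with setCaching(500)
        long receivedRows = 4226; // rows returned with setCaching(50000)

        System.out.println("caching=500   -> " + roundTrips(expectedRows, 500) + " round trips");
        System.out.println("caching=50000 -> " + roundTrips(expectedRows, 50000) + " round trip(s)");
        System.out.println("rows missing  -> " + (expectedRows - receivedRows));
    }
}
```

      Under that assumption, caching 50000 asks for the entire range in a single batch, while caching 500 spreads it over 16 requests; 3396 of the 7622 rows go missing in the large-caching run.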

      The problem is serious because the data is lost silently: no exception is thrown, which makes the cause difficult to find.
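      Because nothing fails, one way to locate the loss is to diff the row keys returned by the direct Scan (which returned all 7622 rows) against those returned by the TableInputFormat job. The helper below is a hypothetical diagnostic, not part of HBase; it assumes the keys have already been collected as strings (for example with Bytes.toStringBinary), and the sample keys are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical diagnostic helper (not part of HBase): given the row keys returned
// by a trusted reader (e.g. the direct Scan) and by a suspect reader (e.g. the
// TableInputFormat job), list the keys the suspect reader silently dropped.
public class MissingRowKeys {

    static List<String> missing(List<String> reference, List<String> observed) {
        SortedSet<String> seen = new TreeSet<>(observed);
        List<String> result = new ArrayList<>();
        for (String key : new TreeSet<>(reference)) { // sorted, de-duplicated
            if (!seen.contains(key)) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample keys for illustration only.
        List<String> direct = List.of("row-001", "row-002", "row-003", "row-004");
        List<String> viaInputFormat = List.of("row-001", "row-004");

        List<String> lost = missing(direct, viaInputFormat);
        if (!lost.isEmpty()) {
            // Report the gap explicitly instead of silently accepting a short result.
            System.out.println("Missing " + lost.size() + " row key(s): " + lost);
        }
    }
}
```

      Listing the missing keys, rather than only the count, shows whether the lost rows form contiguous ranges or are scattered across the scan.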

      My Spark code is as follows:

      Configuration hbaseConfiguration = HBaseConfiguration.create();
      hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
      hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
      hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
      hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
      hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
      hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);
      final Scan hbaseScan = new Scan();
      hbaseScan.addFamily(familyName); // familyName is a byte[] in this snippet
      hbaseScan.setCaching(50000); // If caching is too big, some row keys are lost!
      for (String[] cell : cellNames) {
        String column = cell[0];
        hbaseScan.addColumn(familyName, Bytes.toBytes(column));
      }
      hbaseScan.setStartRow(Bytes.toBytes(startRowkeyStr));
      hbaseScan.setStopRow(Bytes.toBytes(endRowkeyStr));
      try {
        // TableInputFormat reads the scan from the configuration as a Base64-encoded protobuf.
        ClientProtos.Scan scanProto = ProtobufUtil.toScan(hbaseScan);
        hbaseConfiguration.set(TableInputFormat.SCAN, Base64.encodeBytes(scanProto.toByteArray()));
        JavaPairRDD<ImmutableBytesWritable, Result> pairRDD =
            jsc.<ImmutableBytesWritable, Result, TableInputFormat>newAPIHadoopRDD(
                hbaseConfiguration, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        System.out.println("pairRDD.count(): " + pairRDD.count());
      } catch (IOException e) {
        System.out.println("Scan Exception!!!!!! " + e.getMessage());
      }
      

      My MapReduce code is as follows:

      static class HbaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
          // Emit one constant record per cell returned for this row.
          for (Cell cell : value.rawCells()) {
            context.write(new ImmutableBytesWritable(Bytes.toBytes("A")), new Text("max"));
          }
        }
      }
      public static void main(String[] args) throws Exception {
        org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
        hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
        hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
        hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
        hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
        hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);
        Job job = Job.getInstance(hbaseConfiguration);
        job.setJarByClass(App.class);
        List<Scan> list = new ArrayList<Scan>();
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(familyName)); // familyName is a String in this snippet
        scan.setCaching(50000); // If caching is too big, some row keys are lost!
        for (String[] cell : cellNames) {
          String column = cell[0];
          scan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
        }
        scan.setStartRow(Bytes.toBytes(startRowkeyStr));
        scan.setStopRow(Bytes.toBytes(endRowkeyStr));
        // With a List<Scan>, each scan carries its table name in this attribute.
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(hbaseTableName));
        list.add(scan);
        System.out.println("size: " + list.size());
        TableMapReduceUtil.initTableMapperJob(list, HbaseMapper.class, ImmutableBytesWritable.class, Text.class, job);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path("maxTestOutput"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

      The pom.xml used for the MapReduce code is attached (pom.xml).

      The code for the third way is as follows:

      public static void main(String[] args) throws Exception {
        org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
        hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
        hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
        hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
        hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
        hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);
        long res = 0L; // number of rows returned by the scanner
        final Scan hbaseScan = new Scan();
        hbaseScan.addFamily(Bytes.toBytes(familyName));
        hbaseScan.setCaching(50000); // Same caching as above, but here no rows are lost.
        for (String[] cell : cellNames) {
          String column = cell[0];
          hbaseScan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
        }
        hbaseScan.setStartRow(Bytes.toBytes(startRowkeyStr));
        hbaseScan.setStopRow(Bytes.toBytes(endRowkeyStr));
        // try-with-resources closes the scanner, table, and connection even on error.
        try (Connection conn = ConnectionFactory.createConnection(hbaseConfiguration);
             Table table = conn.getTable(TableName.valueOf(hbaseTableName));
             ResultScanner scanner = table.getScanner(hbaseScan)) {
          for (Result r : scanner) {
            res++;
          }
        } catch (IOException e) {
          System.out.println("Scan Exception!!!!!! " + e.getMessage());
        }
        System.out.println("Rows: " + res);
        System.out.println("Successful!");
      }
      

        Attachments

        1. pom.xml (4 kB, ZhanxiongWang)


            People

            • Assignee: Unassigned
            • Reporter: ZhanxiongWang (maxzxwang)
            • Votes: 0
            • Watchers: 1
