Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java	(revision 0)
@@ -0,0 +1,217 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+
+@Category(MediumTests.class)
+public class TestImportExport {
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final String FAMILYA_STRING = "a";
+  private static final String FAMILYB_STRING = "b";
+  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
+  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
+  private static final byte[] QUAL = Bytes.toBytes("q");
+  private static final String OUTPUT_DIR = "outputdir";
+
+  private static MiniHBaseCluster cluster;
+  private static long now = System.currentTimeMillis();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    cluster = UTIL.startMiniCluster();
+    UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniMapReduceCluster();
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    fs.delete(new Path(OUTPUT_DIR), true);
+  }
+
+  /**
+   * Test simple replication case with column mapping
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleCase() throws Exception {
+    String EXPORT_TABLE = "exportSimpleCase";
+    HTable t = UTIL.createTable(Bytes.toBytes(EXPORT_TABLE), FAMILYA);
+    Put p = new Put(ROW1);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    p.add(FAMILYA, QUAL, now+1, QUAL);
+    p.add(FAMILYA, QUAL, now+2, QUAL);
+    t.put(p);
+    p = new Put(ROW2);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    p.add(FAMILYA, QUAL, now+1, QUAL);
+    p.add(FAMILYA, QUAL, now+2, QUAL);
+    t.put(p);
+
+    String[] args = new String[] {
+        EXPORT_TABLE,
+        OUTPUT_DIR,
+        "1000"
+    };
+
+    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    Job job = Export.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+
+    String IMPORT_TABLE = "importTableSimpleCase";
+    t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB);
+    args = new String[] {
+        "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
+        IMPORT_TABLE,
+        OUTPUT_DIR
+    };
+
+    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    job = Import.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+    Get g = new Get(ROW1);
+    g.setMaxVersions();
+    Result r = t.get(g);
+    assertEquals(3, r.size());
+    g = new Get(ROW2);
+    g.setMaxVersions();
+    r = t.get(g);
+    assertEquals(3, r.size());
+  }
+
+  @Test
+  public void testWithDeletes() throws Exception {
+    String EXPORT_TABLE = "exportWithDeletes";
+    HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILYA,
+        HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+        5, /* versions */
+        true /* keep deleted cells */,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_IN_MEMORY,
+        HColumnDescriptor.DEFAULT_BLOCKCACHE,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE,
+        HColumnDescriptor.DEFAULT_TTL,
+        HColumnDescriptor.DEFAULT_BLOOMFILTER,
+        HConstants.REPLICATION_SCOPE_LOCAL));
+    UTIL.getHBaseAdmin().createTable(desc);
+    HTable t = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
+
+    Put p = new Put(ROW1);
+    p.add(FAMILYA, QUAL, now, QUAL);
+    p.add(FAMILYA, QUAL, now+1, QUAL);
+    p.add(FAMILYA, QUAL, now+2, QUAL);
+    p.add(FAMILYA, QUAL, now+3, QUAL);
+    p.add(FAMILYA, QUAL, now+4, QUAL);
+    t.put(p);
+
+    Delete d = new Delete(ROW1, now+3, null);
+    t.delete(d);
+    d = new Delete(ROW1);
+    d.deleteColumns(FAMILYA, QUAL, now+2);
+    t.delete(d);
+
+    String[] args = new String[] {
+        "-D" + Export.RAW_SCAN + "=true",
+        EXPORT_TABLE,
+        OUTPUT_DIR,
+        "1000"
+    };
+
+    GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    Configuration conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    Job job = Export.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+
+    String IMPORT_TABLE = "importWithDeletes";
+    desc = new HTableDescriptor(IMPORT_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILYA,
+        HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+        5, /* versions */
+        true /* keep deleted cells */,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_IN_MEMORY,
+        HColumnDescriptor.DEFAULT_BLOCKCACHE,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE,
+        HColumnDescriptor.DEFAULT_TTL,
+        HColumnDescriptor.DEFAULT_BLOOMFILTER,
+        HConstants.REPLICATION_SCOPE_LOCAL));
+    UTIL.getHBaseAdmin().createTable(desc);
+    t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
+    args = new String[] {
+        IMPORT_TABLE,
+        OUTPUT_DIR
+    };
+
+    opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
+    conf = opts.getConfiguration();
+    args = opts.getRemainingArgs();
+
+    job = Import.createSubmittableJob(conf, args);
+    job.waitForCompletion(false);
+    assertTrue(job.isSuccessful());
+
+    Scan s = new Scan();
+    s.setMaxVersions();
+    s.setRaw(true);
+    ResultScanner scanner = t.getScanner(s);
+    Result r = scanner.next();
+    KeyValue[] res = r.raw();
+    assertTrue(res[0].isDeleteFamily());
+    assertEquals(now+4, res[1].getTimestamp());
+    assertEquals(now+3, res[2].getTimestamp());
+    assertTrue(res[3].isDelete());
+    assertEquals(now+2, res[4].getTimestamp());
+    assertEquals(now+1, res[5].getTimestamp());
+    assertEquals(now, res[6].getTimestamp());
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/client/Delete.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Delete.java	(revision 1212005)
+++ src/main/java/org/apache/hadoop/hbase/client/Delete.java	(working copy)
@@ -119,6 +119,25 @@
   }
 
   /**
+   * Advanced use only. Create a Delete object based on a KeyValue
+   * of type "delete".
+   * @param kv
+   * @throws IOException
+   */
+  public Delete(KeyValue kv) throws IOException {
+    this(kv.getRow(), kv.getTimestamp(), null);
+    if (!kv.isDelete()) {
+      throw new IOException("The recently added KeyValue is not of type "
+          + "delete. Rowkey: " + Bytes.toStringBinary(this.row));
+    }
+    // can't use singletonList, because this might be modified at the server by
+    // coprocessors
+    ArrayList<KeyValue> list = new ArrayList<KeyValue>(1);
+    list.add(kv);
+    familyMap.put(kv.getFamily(), list);
+  }
+
+  /**
    * Delete all versions of all columns of the specified family.
   * <p>
   * Overrides previous calls to deleteColumn and deleteColumns for the
Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Scan.java	(revision 1212005)
+++ src/main/java/org/apache/hadoop/hbase/client/Scan.java	(working copy)
@@ -22,7 +22,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -165,6 +164,9 @@
         addFamily(fam);
       }
     }
+    for (Map.Entry<String, byte[]> attr : scan.getAttributesMap().entrySet()) {
+      setAttribute(attr.getKey(), attr.getValue());
+    }
   }
 
   /**
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java	(revision 1212005)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java	(working copy)
@@ -27,6 +27,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -47,7 +49,7 @@
    * Write table content out to files in hdfs.
    */
   static class Importer
-  extends TableMapper<ImmutableBytesWritable, Put> {
+  extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
 
     /**
@@ -63,15 +65,15 @@
       Context context)
     throws IOException {
       try {
-        context.write(row, resultToPut(row, value));
+        writeResult(row, value, context);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
     }
 
-    private Put resultToPut(ImmutableBytesWritable key, Result result)
-    throws IOException {
-      Put put = new Put(key.get());
+    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
+    throws IOException, InterruptedException {
+      Put put = null;
       for (KeyValue kv : result.raw()) {
         if(cfRenameMap != null) {
           // If there's a rename mapping for this CF, create a new KeyValue
@@ -93,11 +95,20 @@
                 kv.getValueLength()); // value length
           }
         }
-        put.add(kv);
+        if (kv.isDelete()) {
+          context.write(key, new Delete(kv));
+        } else {
+          if (put == null) {
+            put = new Put(key.get());
+          }
+          put.add(kv);
+        }
       }
-      return put;
+      if (put != null) {
+        context.write(key, put);
+      }
     }
-    
+
     @Override
     public void setup(Context context) {
       // Make a map from sourceCfName to destCfName by parsing a config key
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java	(revision 1212005)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java	(working copy)
@@ -48,6 +48,7 @@
 public class Export {
   private static final Log LOG = LogFactory.getLog(Export.class);
   final static String NAME = "export";
+  final static String RAW_SCAN="hbase.mapreduce.include.deleted.rows";
 
   /**
    * Mapper.
@@ -115,6 +116,11 @@
     // Set cache blocks
     s.setCacheBlocks(false);
     // Set Scan Column Family
+    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
+    if (raw) {
+      s.setRaw(raw);
+    }
+
     if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
       s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
     }
@@ -124,8 +130,8 @@
       LOG.info("Setting Scan Filter for Export.");
       s.setFilter(exportFilter);
     }
-    LOG.info("verisons=" + versions + ", starttime=" + startTime +
-      ", endtime=" + endTime);
+    LOG.info("versions=" + versions + ", starttime=" + startTime +
+      ", endtime=" + endTime + ", keepDeletedCells=" + raw);
     return s;
   }
 
@@ -159,6 +165,7 @@
     System.err.println("  Additionally, the following SCAN properties can be specified");
     System.err.println("  to control/limit what is exported..");
    System.err.println("  -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
+    System.err.println("  -D " + RAW_SCAN + "=true");
   }
 
   /**
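
Usage sketch, mirroring what TestImportExport exercises: a minimal programmatic driver for the raw-scan export followed by the delete-aware import. This is an illustration only; the Configuration, table names ("t1", "t1copy"), output path, and version count are hypothetical, and the raw-scan property is spelled out literally because Export.RAW_SCAN is package-private.

    // Hypothetical driver code; assumes a reachable cluster and existing tables.
    Configuration conf = HBaseConfiguration.create();
    // Same property the patch introduces as Export.RAW_SCAN.
    conf.setBoolean("hbase.mapreduce.include.deleted.rows", true);
    Job exportJob = Export.createSubmittableJob(conf,
        new String[] { "t1", "/tmp/export-out", "1000" });
    exportJob.waitForCompletion(true);

    Job importJob = Import.createSubmittableJob(conf,
        new String[] { "t1copy", "/tmp/export-out" });
    importJob.waitForCompletion(true);

For the re-applied delete markers to survive on the destination, the target column family should be created with keep-deleted-cells enabled, as the test does through its HColumnDescriptor.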