diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index 025473b..d522028 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -19,23 +19,28 @@ */ package org.apache.hadoop.hbase.mapreduce; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.Map; +import javax.ws.rs.HEAD; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - /** * Tool used to copy a table to another one which can be on a different setup. 
* It is also configurable with a start and time as well as a specification @@ -50,11 +55,13 @@ public class CopyTable extends Configured implements Tool { static String tableName = null; static String startRow = null; static String stopRow = null; + static boolean useBytesBinary = false; static String newTableName = null; static String peerAddress = null; static String families = null; static boolean allCells = false; - + static Filter filter = null; + public CopyTable(Configuration conf) { super(conf); } @@ -85,15 +92,31 @@ public class CopyTable extends Configured implements Tool { if (versions >= 0) { scan.setMaxVersions(versions); } - + if (startRow != null) { - scan.setStartRow(Bytes.toBytes(startRow)); + byte[] row; + if (useBytesBinary) { + row = Bytes.toBytesBinary(startRow); + } else { + row = Bytes.toBytes(startRow); + } + scan.setStartRow(row); } - + if (stopRow != null) { - scan.setStopRow(Bytes.toBytes(stopRow)); + byte[] row; + if (useBytesBinary) { + row = Bytes.toBytesBinary(stopRow); + } else { + row = Bytes.toBytes(stopRow); + } + scan.setStopRow(row); + } + + if (filter != null) { + scan.setFilter(filter); } - + if(families != null) { String[] fams = families.split(","); Map cfRenameMap = new HashMap(); @@ -138,6 +161,8 @@ public class CopyTable extends Configured implements Tool { System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster"); System.err.println(" startrow the start row"); System.err.println(" stoprow the stop row"); + System.err.println(" bytesBinary Use bytes binary for specifying"); + System.err.println(" startrow and stoprow keys"); System.err.println(" starttime beginning of the time range (unixtime in millis)"); System.err.println(" without endtime means from starttime to forever"); System.err.println(" endtime end of the time range. 
Ignored if no starttime specified."); @@ -149,6 +174,8 @@ public class CopyTable extends Configured implements Tool { System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. "); System.err.println(" To keep the same name, just give \"cfName\""); System.err.println(" all.cells also copy delete markers and deleted cells"); + System.err.println(" filter Filter to use. Specify the file name "); + System.err.println(" that contains the filter serialized"); System.err.println(); System.err.println("Args:"); System.err.println(" tablename Name of the table to copy"); @@ -177,19 +204,24 @@ public class CopyTable extends Configured implements Tool { printUsage(null); return false; } - + final String startRowArgKey = "--startrow="; if (cmd.startsWith(startRowArgKey)) { startRow = cmd.substring(startRowArgKey.length()); continue; } - + final String stopRowArgKey = "--stoprow="; if (cmd.startsWith(stopRowArgKey)) { stopRow = cmd.substring(stopRowArgKey.length()); continue; } - + + if (cmd.equals("--bytesBinary")) { + useBytesBinary = true; + continue; + } + final String startTimeArgKey = "--starttime="; if (cmd.startsWith(startTimeArgKey)) { startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); @@ -231,6 +263,18 @@ public class CopyTable extends Configured implements Tool { continue; } + final String filterArgKey = "--filter="; + if (cmd.startsWith(filterArgKey)) { + String filterFile = cmd.substring(filterArgKey.length()); + filter = deserializeFilter(filterFile); + if (filter == null) { + printUsage("Couldn't parse filter from file: " + filterFile); + return false; + } else { + continue; + } + } + if (i == args.length-1) { tableName = cmd; } else { @@ -256,6 +300,51 @@ public class CopyTable extends Configured implements Tool { } /** + * Create filter from a file containing the filter serialized + * @param filterFile file containing the filter + * @return filter + * @throws Exception + */ + public static Filter deserializeFilter(String 
filterFile) throws Exception { + ObjectInputStream input = null; + try { + input = new ObjectInputStream(new FileInputStream (filterFile)); + Filter filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(input))); + filter.readFields(input); + return filter; + } finally { + if (input != null) { + input.close(); + } + } + } + + @SuppressWarnings("unchecked") + private static Writable createForName(String className) { + try { + Class clazz = (Class) Class.forName( + className); + return WritableFactories.newInstance(clazz, new Configuration()); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Can't find class " + className); + } + } + + public static void serializeFilter(Filter filter, String filterFile) throws + Exception { + ObjectOutputStream output = null; + try { + output = new ObjectOutputStream(new FileOutputStream(filterFile)); + Bytes.writeByteArray(output, Bytes.toBytes(filter.getClass().getName())); + filter.write(output); + } finally { + if (output != null) { + output.close(); + } + } + } + + /** * Main entry point. * * @param args The command line parameters. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.File;

import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Small (no-cluster) tests for the {@link CopyTable} filter
 * serialization helpers.
 */
@Category(SmallTests.class)
public class TestCopyTableSmallTests {

  /**
   * Round-trips a filter through {@code CopyTable.serializeFilter} and
   * {@code CopyTable.deserializeFilter} and verifies its state survives.
   */
  @Test
  public void testSerializeAndDeserializeFilter() throws Exception {
    ColumnCountGetFilter testFilter = new ColumnCountGetFilter(1);
    // Use a temp file so concurrent runs don't collide on a fixed name in
    // the working directory, and clean up even if the assertion fails.
    File file = File.createTempFile("filter-test", ".bin");
    try {
      CopyTable.serializeFilter(testFilter, file.getAbsolutePath());
      ColumnCountGetFilter result = (ColumnCountGetFilter) CopyTable
          .deserializeFilter(file.getAbsolutePath());
      assertEquals(1, result.getLimit());
    } finally {
      file.delete();
    }
  }
}