diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index ed41338..7ba8133 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -451,6 +451,10 @@ public class Import { usage("Wrong number of arguments: " + otherArgs.length); System.exit(-1); } + String inputVersionString = System.getProperty(ResultSerialization.INPUT_FORMT_VER); + if (inputVersionString != null) { + conf.set(ResultSerialization.INPUT_FORMT_VER, inputVersionString); + } Job job = createSubmittableJob(conf, otherArgs); System.exit(job.waitForCompletion(true) ? 0 : 1); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java index b7a7756..ae2945e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java @@ -17,18 +17,32 @@ */ package org.apache.hadoop.hbase.mapreduce; +import java.io.DataInput; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.Serialization; import org.apache.hadoop.io.serializer.Serializer; -public class ResultSerialization implements Serialization { +public class ResultSerialization extends Configured implements Serialization { + private static final Log LOG = LogFactory.getLog(ResultSerialization.class); + public static final String INPUT_FORMT_VER = "hbase.input.version"; + @Override public boolean accept(Class c) { return Result.class.isAssignableFrom(c); @@ -36,6 +50,16 @@ public class ResultSerialization implements Serialization { @Override public Deserializer getDeserializer(Class c) { + // check input format version + Configuration conf = getConf(); + if (conf != null) { + String inputVersion = conf.get(INPUT_FORMT_VER); + if (inputVersion != null && inputVersion.equals("0.94")) { + LOG.info("Load exported file using deserializer for HBase 0.94 format"); + return new Result94Deserializer(); + } + } + return new ResultDeserializer(); } @@ -44,6 +68,52 @@ public class ResultSerialization implements Serialization { return new ResultSerializer(); } + /** + * The following deserializer class is used to load exported file of 0.94 + */ + private static class Result94Deserializer implements Deserializer { + private DataInputStream in; + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public Result deserialize(Result mutation) throws IOException { + int totalBuffer = in.readInt(); + if (totalBuffer == 0) { + return Result.EMPTY_RESULT; + } + byte[] buf = new byte[totalBuffer]; + readChunked(in, buf, 0, totalBuffer); + List kvs = new ArrayList(); + int offset = 0; + while (offset < totalBuffer) { + int keyLength = Bytes.toInt(buf, offset); + offset += Bytes.SIZEOF_INT; + kvs.add(new KeyValue(buf, offset, keyLength)); + offset += keyLength; + } + return Result.create(kvs); + } + + @Override + public void open(InputStream in) throws IOException { + if (!(in instanceof DataInputStream)) { + throw new IOException("Wrong input stream instance passed in"); + } + this.in = (DataInputStream) in; + } + + private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException { + int maxRead = 8192; + + for (; ofs < len; ofs += maxRead) + in.readFully(dest, ofs, Math.min(len - ofs, maxRead)); + } + } + private static class ResultDeserializer implements Deserializer { private InputStream in; @@ -54,8 +124,7 @@ public class ResultSerialization implements Serialization { @Override public Result deserialize(Result mutation) throws IOException { - ClientProtos.Result proto = - ClientProtos.Result.parseDelimitedFrom(in); + ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in); return ProtobufUtil.toResult(proto); } @@ -63,8 +132,8 @@ public class ResultSerialization implements Serialization { public void open(InputStream in) throws IOException { this.in = in; } - } + private static class ResultSerializer implements Serializer { private OutputStream out; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index 21b6655..9f05645 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -21,10 +21,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -66,8 +71,6 @@ import org.junit.experimental.categories.Category; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import static org.mockito.Mockito.*; - /** * Tests the table import and table export MR job functionality */ @@ -189,15 +192,35 @@ public class TestImportExport { } /** - * Test export hbase:meta table - * + * Test import data from 0.94 exported file * @throws Exception */ @Test - public void testMetaExport() throws Exception { - String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString(); - String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" }; - assertTrue(runExport(args)); + public void testImport94Table() throws Exception { + URL url = TestImportExport.class.getResource( + "exportedTableIn94Format"); + Path importPath = new Path(url.getPath()); + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + + "exportedTableIn94Format")); + String IMPORT_TABLE = "importTableExportedFrom94"; + HTable t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), Bytes.toBytes("f1"), 3); + String[] args = new String[] { + "-Dhbase.input.version=0.94" , + IMPORT_TABLE, FQ_OUTPUT_DIR + }; + assertTrue(runImport(args)); + + /* exportedTableIn94Format contains 5 rows + ROW COLUMN+CELL + r1 column=f1:c1, timestamp=1383766761171, value=val1 + r2 column=f1:c1, timestamp=1383766771642, value=val2 + r3 column=f1:c1, timestamp=1383766777615, value=val3 + r4 column=f1:c1, timestamp=1383766785146, value=val4 + r5 column=f1:c1, timestamp=1383766791506, value=val5 + */ + assertEquals(5, UTIL.countRows(t)); + t.close(); } /** diff --git hbase-server/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format hbase-server/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format new file mode 100755 index 0000000..762ddd7 Binary files /dev/null and hbase-server/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format differ