diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index ed41338..7ba8133 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -451,6 +451,10 @@ public class Import { usage("Wrong number of arguments: " + otherArgs.length); System.exit(-1); } + String inputVersionString = System.getProperty(ResultSerialization.INPUT_FORMT_VER); + if (inputVersionString != null) { + conf.set(ResultSerialization.INPUT_FORMT_VER, inputVersionString); + } Job job = createSubmittableJob(conf, otherArgs); System.exit(job.waitForCompletion(true) ? 0 : 1); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java index b7a7756..ae2945e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java @@ -17,18 +17,32 @@ */ package org.apache.hadoop.hbase.mapreduce; +import java.io.DataInput; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.Serialization; import 
org.apache.hadoop.io.serializer.Serializer; -public class ResultSerialization implements Serialization { +public class ResultSerialization extends Configured implements Serialization { + private static final Log LOG = LogFactory.getLog(ResultSerialization.class); + public static final String INPUT_FORMT_VER = "hbase.input.version"; + @Override public boolean accept(Class c) { return Result.class.isAssignableFrom(c); @@ -36,6 +50,16 @@ public class ResultSerialization implements Serialization { @Override public Deserializer getDeserializer(Class c) { + // check input format version + Configuration conf = getConf(); + if (conf != null) { + String inputVersion = conf.get(INPUT_FORMT_VER); + if (inputVersion != null && inputVersion.equals("0.94")) { + LOG.info("Load exported file using deserializer for HBase 0.94 format"); + return new Result94Deserializer(); + } + } + return new ResultDeserializer(); } @@ -44,6 +68,52 @@ public class ResultSerialization implements Serialization { return new ResultSerializer(); } + /** + * Deserializer used to load files exported by HBase 0.94: each record is an int byte count followed by length-prefixed KeyValues. + */ + private static class Result94Deserializer implements Deserializer { + private DataInputStream in; + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public Result deserialize(Result mutation) throws IOException { + int totalBuffer = in.readInt(); + if (totalBuffer == 0) { + return Result.EMPTY_RESULT; + } + byte[] buf = new byte[totalBuffer]; + readChunked(in, buf, 0, totalBuffer); + List kvs = new ArrayList(); + int offset = 0; + while (offset < totalBuffer) { + int keyLength = Bytes.toInt(buf, offset); + offset += Bytes.SIZEOF_INT; + kvs.add(new KeyValue(buf, offset, keyLength)); + offset += keyLength; + } + return Result.create(kvs); + } + + @Override + public void open(InputStream in) throws IOException { + if (!(in instanceof DataInputStream)) { + throw new IOException("Wrong input stream instance passed in"); + } + this.in =
(DataInputStream) in; + } + + private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException { + int maxRead = 8192; + + for (; ofs < len; ofs += maxRead) + in.readFully(dest, ofs, Math.min(len - ofs, maxRead)); + } + } + private static class ResultDeserializer implements Deserializer { private InputStream in; @@ -54,8 +124,7 @@ public class ResultSerialization implements Serialization { @Override public Result deserialize(Result mutation) throws IOException { - ClientProtos.Result proto = - ClientProtos.Result.parseDelimitedFrom(in); + ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in); return ProtobufUtil.toResult(proto); } @@ -63,8 +132,8 @@ public class ResultSerialization implements Serialization { public void open(InputStream in) throws IOException { this.in = in; } - } + private static class ResultSerializer implements Serializer { private OutputStream out;