diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index dd148ba..2ed1c91 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -19,17 +19,14 @@ */ package org.apache.hadoop.hbase.mapreduce; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.TreeSet; - +import com.google.common.base.Preconditions; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.Encoder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; @@ -50,64 +47,93 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.*; /** * Writes HFiles. Passed KeyValues must arrive in order. - * Currently, can only write files to a single column family at a - * time. Multiple column families requires coordinating keys cross family. + * Multiple column families requires coordinating keys cross family. * Writes current time as the sequence id for the file. Sets the major compacted * attribute on created hfiles. + * * @see KeyValueSortReducer */ public class HFileOutputFormat extends FileOutputFormat { - static Log LOG = LogFactory.getLog(HFileOutputFormat.class); - + private static final Log LOG = LogFactory.getLog(HFileOutputFormat.class); + + @Override public RecordWriter getRecordWriter(final TaskAttemptContext context) - throws IOException, InterruptedException { + throws IOException, InterruptedException { // Get the path of the temporary output file final Path outputPath = FileOutputFormat.getOutputPath(context); - final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath(); + final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath(); Configuration conf = context.getConfiguration(); - final FileSystem fs = outputdir.getFileSystem(conf); + final FileSystem fs = outputDir.getFileSystem(conf); // These configs. are from hbase-*.xml - final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456); - final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536); + final long maxsize = conf.getLong("hbase.hregion.max.filesize", HConstants.DEFAULT_MAX_FILE_SIZE); + final int blocksize = conf.getInt("hfile.min.blocksize.size", HFile.DEFAULT_BLOCKSIZE); // Invented config. Add to hbase-*.xml if other than default compression. final String compression = conf.get("hfile.compression", - Compression.Algorithm.NONE.getName()); + Compression.Algorithm.NONE.getName()); return new RecordWriter() { // Map of families to writers and how much has been output on the writer. 
- private final Map writers = - new TreeMap(Bytes.BYTES_COMPARATOR); - private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY; - private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); + private final Map writers = + new TreeMap(Bytes.BYTES_COMPARATOR); + private byte[] startRow; + private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY; + private final byte[] now = Bytes.toBytes(System.currentTimeMillis()); + private boolean rotateRequested = false; + @Override public void write(ImmutableBytesWritable row, KeyValue kv) - throws IOException { - long length = kv.getLength(); - byte [] family = kv.getFamily(); + throws IOException { + + byte[] rowKey = kv.getRow(); + + if (rotateRequested && Bytes.compareTo(previousRow, rowKey) != 0) { + /* This means that any of the HFiles for the column families has reached + * maxsize and thus all need to be closed. + * This can only happen once a row is finished though */ + + writeMetaData(); + startRow = null; + rotateRequested = false; + } + + int length = kv.getLength(); + byte[] family = kv.getFamily(); WriterLength wl = this.writers.get(family); - if (wl == null || ((length + wl.written) >= maxsize) && - Bytes.compareTo(this.previousRow, 0, this.previousRow.length, - kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) { - // Get a new writer. - Path basedir = new Path(outputdir, Bytes.toString(family)); - if (wl == null) { - wl = new WriterLength(); - this.writers.put(family, wl); - if (this.writers.size() > 1) throw new IOException("One family only"); - // If wl == null, first file in family. Ensure family dir exits. - if (!fs.exists(basedir)) fs.mkdirs(basedir); + + // If this is a new column family check if the directory exists + if (wl == null) { + Path basedir = new Path(outputDir, Bytes.toString(family)); + if (!fs.exists(basedir)) { + fs.mkdirs(basedir); } - wl.writer = getNewWriter(wl.writer, basedir); - LOG.info("Writer=" + wl.writer.getPath() + - ((wl.written == 0)? "": ", wrote=" + wl.written)); + } + + if (wl == null || wl.writer == null) { + wl = new WriterLength(); + Path basedir = new Path(outputDir, Bytes.toString(family)); wl.written = 0; + wl.writer = getNewWriter(wl.writer, basedir); + this.writers.put(family, wl); + + if (startRow == null) { + startRow = rowKey; + } + } + + if (length + wl.written >= maxsize && + Bytes.compareTo(previousRow, rowKey) != 0) { + rotateRequested = true; + } + + if (Bytes.compareTo(startRow, rowKey) < 0) { + startRow = rowKey; } kv.updateLatestStamp(this.now); wl.writer.append(kv); @@ -116,6 +142,50 @@ public class HFileOutputFormat extends FileOutputFormat + *
+   * <ul>
+   *   <li>start key</li>
+   *   <li>end key</li>
+   *   <li>An array of strings with the full path to the generated HFiles</li>
+   * </ul>
  • + * + * + * @throws IOException + */ + private void writeMetaData() throws IOException { + Path metaDataPath = StoreFile.getUniqueFile(fs, outputDir); + FSDataOutputStream metaData = fs.create(metaDataPath); + Encoder enc = new BinaryEncoder(metaData); + enc.writeBytes(startRow); + enc.writeBytes(previousRow); + + Collection paths = new ArrayList(); + for (Map.Entry wlEntry : writers.entrySet()) { + WriterLength wl = wlEntry.getValue(); + /* We only open new HFiles once a value is actually written so these + * may be null for some column families */ + if (wl.writer == null) { + continue; + } + close(wl.writer); + + paths.add(wl.writer.getPath().toString()); + LOG.info("Writer=" + wl.writer.getPath() + + (wl.written == 0 ? "" : ", wrote=" + wl.written)); + wl.writer = null; + } + + enc.writeArrayStart(); + enc.setItemCount(paths.size()); + for (String path : paths) { + enc.startItem(); + enc.writeString(path); + } + enc.writeArrayEnd(); + enc.flush(); + metaData.close(); + } + /* Create a new HFile.Writer. Close current if there is one. * @param writer * @param familydir @@ -126,7 +196,7 @@ public class HFileOutputFormat extends FileOutputFormat e: this.writers.entrySet()) { - close(e.getValue().writer); - } + throws IOException { + writeMetaData(); } }; } @@ -164,10 +233,10 @@ public class HFileOutputFormat extends FileOutputFormat getRegionStartKeys(HTable table) - throws IOException { + throws IOException { byte[][] byteKeys = table.getStartKeys(); - ArrayList ret = - new ArrayList(byteKeys.length); + List ret = + new ArrayList(byteKeys.length); for (byte[] byteKey : byteKeys) { ret.add(new ImmutableBytesWritable(byteKey)); } @@ -177,11 +246,12 @@ public class HFileOutputFormat extends FileOutputFormat startKeys) throws IOException { + List startKeys) throws IOException { Preconditions.checkArgument(!startKeys.isEmpty(), "No regions passed"); // We're generating a list of split points, and we don't ever @@ -189,7 +259,7 @@ public class HFileOutputFormat extends FileOutputFormat sorted = - new TreeSet(startKeys); + new TreeSet(startKeys); ImmutableBytesWritable first = sorted.first(); Preconditions.checkArgument( @@ -197,12 +267,12 @@ public class HFileOutputFormat extends FileOutputFormatSets the number of reduce tasks to match the current number of regions *
  *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
  *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
  • - * + * PutSortReducer) + * * The user should be sure to set the map output value class to either KeyValue or Put before * running this function. */ @@ -232,7 +302,7 @@ public class HFileOutputFormat extends FileOutputFormat startKeys = getRegionStartKeys(table); LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count"); job.setNumReduceTasks(startKeys.size()); - + Path partitionsPath = new Path(job.getWorkingDirectory(), "partitions_" + System.currentTimeMillis()); LOG.info("Writing partition information to " + partitionsPath); @@ -266,8 +336,8 @@ public class HFileOutputFormat extends FileOutputFormat paths; + + private RegionInfo(byte[] startKey, byte[] endKey, List paths) { + this.startKey = startKey; + this.endKey = endKey; + this.paths = paths; + } + } + + @Override + public int run(String[] args) throws Exception { + if (args.length != 2) { + usage(); + return -1; + } + + // Check command line parameters + byte[] tableName = HTableDescriptor.isLegalTableName(Bytes.toBytes(args[0])); + Path hfofDir = new Path(args[1]); + if (!hfofDir.getFileSystem(getConf()).exists(hfofDir)) { + throw new FileNotFoundException("HFileOutputFormat dir " + + hfofDir + " not found"); + } + + HBaseAdmin admin = new HBaseAdmin(getConf()); + + doBulkLoad(hfofDir, tableName); + return 0; + } + + private void doBulkLoad(Path hfofDir, byte[] tableName) throws IOException { + List loadRegionInfos = discoverLoadQueue(hfofDir); + + // Generate a list of all start keys and remove the first + byte[][] startKeys = new byte[loadRegionInfos.size()][]; + for (int i = 0; i < loadRegionInfos.size(); i++) { + startKeys[i] = loadRegionInfos.get(i).startKey; + } + Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); + byte[][] splitKeys = Arrays.copyOfRange(startKeys, 1, startKeys.length); + + // Get a list of all column families + FileSystem fs = hfofDir.getFileSystem(getConf()); + FileStatus[] metaDataStatuses = fs.listStatus(hfofDir, new FSUtils.DirFilter(fs)); + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + for (FileStatus metaDataStatus : metaDataStatuses) { + tableDescriptor.addFamily(new HColumnDescriptor(metaDataStatus.getPath().getName())); + } + + // Create the table with all the regions + HBaseAdmin admin = new HBaseAdmin(getConf()); + admin.createTable(tableDescriptor, splitKeys); + HTable table = new HTable(getConf(), tableName); + + // Get all the regions and move the pre-created HFiles in place + Map regionInfos = table.getRegionsInfo(); + Path tableDir = new Path(FSUtils.getRootDir(getConf()), new String(tableName)); + for (int i = 0; i < startKeys.length; i++) { + byte[] searchKey = i == 0 ? 
HConstants.EMPTY_START_ROW : startKeys[i]; + HRegionInfo targetRegion = null; + for (HRegionInfo regionInfo : regionInfos.keySet()) { + if (Arrays.equals(regionInfo.getStartKey(), searchKey)) { + targetRegion = regionInfo; + break; + } + } + + RegionInfo sourceRegion = null; + for (RegionInfo loadRegionInfo : loadRegionInfos) { + if (Arrays.equals(loadRegionInfo.startKey, startKeys[i])) { + sourceRegion = loadRegionInfo; + break; + } + } + + Path regionPath = new Path(tableDir, targetRegion.getEncodedName()); + for (Path sourcePath : sourceRegion.paths) { + Path targetPath = new Path(new Path(regionPath, sourcePath.getParent().getName()), sourcePath.getName()); + fs.rename(sourcePath, targetPath); + } + } + + } + + private List discoverLoadQueue(Path hfofDir) throws IOException { + FileSystem fs = hfofDir.getFileSystem(getConf()); + FileStatus[] metaDataStatuses = fs.listStatus(hfofDir); + BinaryDecoder dec = null; + List regionInfos = new ArrayList(); + for (FileStatus metaDataStatus : metaDataStatuses) { + if (metaDataStatus.isDir()) { + continue; + } + + Path metaDataPath = metaDataStatus.getPath(); + FSDataInputStream in = fs.open(metaDataPath); + dec = DecoderFactory.defaultFactory().createBinaryDecoder(in, dec); + ByteBuffer buf = dec.readBytes(null); //start key + byte[] startKey = new byte[buf.remaining()]; + buf.get(startKey); + + dec.readBytes(buf); //end key + byte[] endKey = new byte[buf.remaining()]; + buf.get(endKey); + + List paths = new ArrayList(); + Utf8 path = null; + for (long i = dec.readArrayStart(); i != 0; i = dec.arrayNext()) { + for (long j = 0; j < i; j++) { + path = dec.readString(path); + paths.add(new Path(path.toString())); // hfile + } + } + + regionInfos.add(new RegionInfo(startKey, endKey, paths)); + } + return regionInfos; + } + + private static void usage() { + System.err.println("usage: " + NAME + + "tablename" + + " /path/to/hfileoutputformat-output "); + } + +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 1183584..e2d56de 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -54,7 +54,7 @@ import org.apache.hadoop.util.ToolRunner; /** * Tool to load the output of HFileOutputFormat into an existing table. - * @see usage() + * @see #usage() */ public class LoadIncrementalHFiles extends Configured implements Tool { @@ -141,7 +141,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @throws TableNotFoundException if table does not yet exist */ public void doBulkLoad(Path hfofDir, HTable table) - throws TableNotFoundException, IOException + throws IOException { HConnection conn = table.getConnection(); diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java index 386eb7b..1524de2 100644 --- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java @@ -62,7 +62,7 @@ import org.mockito.Mockito; * as many splits as "mapred.map.tasks" maps. 
  */
 public class TestHFileOutputFormat {
-  private final static int ROWSPERSPLIT = 1024;
+  private static final int ROWSPERSPLIT = 1024;
 
   private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
   private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
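
Usage note (not part of the patch): to picture how HFileOutputFormat and configureIncrementalLoad() are consumed, a minimal driver would look roughly like the sketch below. The BulkLoadDriver and PassThroughMapper class names, the SequenceFile input assumption, the argument layout, and the use of HBaseConfiguration.create() are illustrative assumptions, not code taken from this change.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {

  // Trivial pass-through mapper; assumes the input is a SequenceFile of
  // (ImmutableBytesWritable, KeyValue) pairs. Purely illustrative.
  public static class PassThroughMapper
      extends Mapper<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(ImmutableBytesWritable row, KeyValue kv, Context context)
        throws IOException, InterruptedException {
      context.write(row, kv);
    }
  }

  public static void main(String[] args) throws Exception {
    // args[0] = input path, args[1] = HFile output dir, args[2] = table name
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "prepare-hfiles");
    job.setJarByClass(BulkLoadDriver.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(PassThroughMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);   // must be KeyValue or Put, per the javadoc
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Inspects the table's region boundaries and sets up the total order
    // partitioner, reducer count, and sort reducer, as described in the
    // configureIncrementalLoad() javadoc in this patch.
    HTable table = new HTable(conf, args[2]);
    HFileOutputFormat.configureIncrementalLoad(job, table);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The HFiles written under the output directory would then be handed to the bulk loader (LoadIncrementalHFiles, or the metadata-driven tool added in this patch) to be moved into the target regions.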