diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
new file mode 100644
index 0000000..844c2ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Writes HFiles. Passed Cells must arrive in order.
+ * Writes current time as the sequence id for the file.
+ * Creates a two-level directory tree: the first level is a directory named after the table and
+ * the second level is a directory named after the column family.
+ * Sets the major compacted attribute on created {@link HFile}s.
+ * Calling write(null, null) will forcibly roll all HFiles being written.
+ *
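+ * <p>
+ * A minimal sketch of the job wiring, mirroring what the accompanying unit test does
+ * (the configuration object, job name and output directory below are illustrative):
+ * <pre>{@code
+ * Job job = Job.getInstance(conf, "multi-table-bulkload");
+ * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ * job.setMapOutputValueClass(KeyValue.class);
+ * job.setOutputFormatClass(MultiHFileOutputFormat.class);
+ * FileOutputFormat.setOutputPath(job, outputDir);
+ * }</pre>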
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MultiHFileOutputFormat
+ extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+ private static final Log LOG = LogFactory.getLog(MultiHFileOutputFormat.class);
+
+ /*
+ * Data structure to hold a Writer and the amount of data written to it.
+ */
+ static class WriterLength {
+ long written = 0;
+ StoreFileWriter writer = null;
+ }
+
+ @Override
+ public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
+ final TaskAttemptContext context) throws IOException, InterruptedException {
+ return createRecordWriter(context);
+ }
+
+ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+ createRecordWriter(final TaskAttemptContext context)
+ throws IOException {
+
+ // Get the path of the output directory
+ final Path outputPath = FileOutputFormat.getOutputPath(context);
+ final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
+ final Configuration conf = context.getConfiguration();
+ final FileSystem fs = outputdir.getFileSystem(conf);
+ // These configs. are from hbase-*.xml
+ final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+ HConstants.DEFAULT_MAX_FILE_SIZE);
+ // Invented config. Add to hbase-*.xml if other than default compression.
+ final String defaultCompressionStr = conf.get("hfile.compression",
+ Algorithm.NONE.getName());
+ final Algorithm defaultCompression = HFileWriterImpl
+ .compressionByName(defaultCompressionStr);
+ final boolean compactionExclude = conf.getBoolean(
+ "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+ // Map of tables to writers
+ final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters =
+ new HashMap<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>>();
+
+ return new RecordWriter<ImmutableBytesWritable, V>() {
+ @Override
+ public void write(ImmutableBytesWritable tableName, V cell)
+ throws IOException, InterruptedException {
+ RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
+ // If this is a new table, make sure its output directory exists
+ if (tableWriter == null) {
+ // use the table name as the directory name
+ final Path tableOutputdir = new Path(outputdir, Bytes.toString(tableName.copyBytes()));
+ fs.mkdirs(tableOutputdir);
+
+ tableWriter = new RecordWriter<ImmutableBytesWritable, V>() {
+ // Map of families to writers and how much has been output on the writer.
+ private final Map<byte[], WriterLength> writers =
+ new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
+ private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+ private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+ private boolean rollRequested = false;
+
+ @Override
+ public void write(ImmutableBytesWritable row, V cell)
+ throws IOException {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+
+ // null input == user explicitly wants to flush
+ if (row == null && kv == null) {
+ rollWriters();
+ return;
+ }
+
+ byte [] rowKey = CellUtil.cloneRow(kv);
+ long length = kv.getLength();
+ byte [] family = CellUtil.cloneFamily(kv);
+ WriterLength wl = this.writers.get(family);
+
+ // If this is a new column family, make sure the family directory exists
+ if (wl == null) {
+ FileSystem tfs = tableOutputdir.getFileSystem(conf);
+ tfs.mkdirs(new Path(tableOutputdir, Bytes.toString(family)));
+ }
+
+ // If any of the HFiles for the column families has reached
+ // maxsize, we need to roll all the writers
+ if (wl != null && wl.written + length >= maxsize) {
+ this.rollRequested = true;
+ }
+
+ // This can only happen once a row is finished though
+ if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+ rollWriters();
+ }
+
+ // create a new writer, if necessary
+ if (wl == null || wl.writer == null) {
+ wl = getNewWriter(family, conf, null);
+ }
+
+ // we now have the proper writer. full steam ahead
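+ // updateLatestStamp only rewrites cells that still carry HConstants.LATEST_TIMESTAMP,
+ // pinning them to the time captured when this writer was created.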
+ kv.updateLatestStamp(this.now);
+ wl.writer.append(kv);
+ wl.written += length;
+
+ // Remember the row so we know when a row transition happens.
+ this.previousRow = rowKey;
+ }
+
+ /* Roll all store file writers: flush and close the current writer for each family.
+ * @throws IOException
+ */
+ private void rollWriters() throws IOException {
+ for (WriterLength wl : this.writers.values()) {
+ if (wl.writer != null) {
+ LOG.info("Writer=" + wl.writer.getPath() +
+ ((wl.written == 0)? "": ", wrote=" + wl.written));
+ closeStoreFileWriter(wl.writer);
+ }
+ wl.writer = null;
+ wl.written = 0;
+ }
+ this.rollRequested = false;
+ }
+
+ /* Create a new StoreFileWriter.
+ * @param family
+ * @return A WriterLength, containing a new StoreFileWriter.
+ * @throws IOException
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
+ justification="Not important")
+ private WriterLength getNewWriter(byte[] family, Configuration conf,
+ InetSocketAddress[] favoredNodes) throws IOException {
+ WriterLength wl = new WriterLength();
+ Path familydir = new Path(tableOutputdir, Bytes.toString(family));
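+ // Per-family compression/bloom/block size overrides are not consulted here; the
+ // defaults below are applied to every column family.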
+ Algorithm compression = defaultCompression;
+ BloomType bloomType = BloomType.NONE;
+ Integer blockSize = HConstants.DEFAULT_BLOCKSIZE;
+
+ Configuration tempConf = new Configuration(conf);
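+ // Set the block cache size to 0 so the writer-side CacheConfig does not cache the
+ // HFile blocks produced for bulk load.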
+ tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+ HFileContextBuilder contextBuilder = new HFileContextBuilder()
+ .withCompression(compression)
+ .withChecksumType(HStore.getChecksumType(conf))
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+ .withBlockSize(blockSize);
+
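+ // Tags can only be persisted with HFile format version 3 or later.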
+ if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+ contextBuilder.withIncludesTags(true);
+ }
+
+ HFileContext hFileContext = contextBuilder.build();
+
+ if (null == favoredNodes) {
+ wl.writer =
+ new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+ .withOutputDir(familydir).withBloomType(bloomType)
+ .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
+ } else {
+ wl.writer =
+ new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+ .withOutputDir(familydir).withBloomType(bloomType)
+ .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+ .withFavoredNodes(favoredNodes).build();
+ }
+
+ this.writers.put(family, wl);
+ return wl;
+ }
+
+ /*
+ * Flush and close StoreFile writer
+ * @param w
+ * @throws IOException
+ */
+ private void closeStoreFileWriter(final StoreFileWriter w) throws IOException {
+ if (w != null) {
+ w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+ Bytes.toBytes(System.currentTimeMillis()));
+ w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+ Bytes.toBytes(context.getTaskAttemptID().toString()));
+ w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+ Bytes.toBytes(true));
+ w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+ Bytes.toBytes(compactionExclude));
+ w.appendTrackedTimestampsToMetadata();
+ w.close();
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext c)
+ throws IOException, InterruptedException {
+ for (WriterLength wl: this.writers.values()) {
+ closeStoreFileWriter(wl.writer);
+ }
+ }
+ };
+ // Put table into map
+ tableWriters.put(tableName, tableWriter);
+ }
+ // Write the (row, cell) pair into the per-table writer
+ tableWriter.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), cell);
+ }
+
+ @Override
+ public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+ for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
+ writer.close(c);
+ }
+ }
+ };
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
new file mode 100644
index 0000000..55e9dc9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
@@ -0,0 +1,206 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class TestMultiHFileOutputFormat {
+ private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class);
+
+ private HBaseTestingUtility util = new HBaseTestingUtility();
+
+ private static int ROWSPERSPLIT = 5;
+
+ private static final int KEYLEN_DEFAULT=10;
+ private static final String KEYLEN_CONF="randomkv.key.length";
+
+ private static final int VALLEN_DEFAULT=10;
+ private static final String VALLEN_CONF="randomkv.val.length";
+
+ private static final byte[][] TABLES
+ = {Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1"))
+ , Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2"))};
+
+ private static final byte[][] FAMILIES
+ = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
+ , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
+
+ private static final byte [] QUALIFIER = Bytes.toBytes("data");
+
+ public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
+ new TestMultiHFileOutputFormat().testWritingDataIntoHFiles();
+ }
+
+ /**
+ * Runs a small MR job that writes HFiles into a three-level directory tree:
+ * testWritingDataIntoHFiles/tableName/columnFamily/HFile.
+ */
+ public void testWritingDataIntoHFiles() throws IOException, ClassNotFoundException, InterruptedException {
+ Configuration conf = util.getConfiguration();
+
+ Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
+ FileSystem fs = testDir.getFileSystem(conf);
+ LOG.info("testWritingDataIntoHFiles dir writing to dir: "+ testDir);
+
+ // Set down this value or we OOME in eclipse.
+ conf.setInt("mapreduce.task.io.sort.mb", 20);
+ // Write a few files by setting max file size.
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+
+ Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
+
+ FileOutputFormat.setOutputPath(job, testDir);
+
+ job.setInputFormatClass(NMapInputFormat.class);
+ job.setMapperClass(Random_TableKV_GeneratingMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ job.setReducerClass(Table_KeyValueSortReducer.class);
+ job.setOutputFormatClass(MultiHFileOutputFormat.class);
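+ // Register HBase serializations so KeyValue (and Mutation/Result) values emitted by the
+ // job can be serialized by the MapReduce framework.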
+ job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+ KeyValueSerialization.class.getName());
+
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.initCredentials(job);
+ fs.deleteOnExit(testDir);
+ LOG.info("\nStarting test testWritingDataIntoHFiles\n");
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("\nWaiting on check MapReduce output");
+ assertTrue(checkMRoutput(fs, testDir, 0));
+ }
+
+ /**
+ * The MR job outputs a three-level directory tree: tableName -> columnFamilyName -> HFile.
+ * This recursive method checks whether the created directory layout is correct;
+ * testDir should therefore be small.
+ */
+ private boolean checkMRoutput(FileSystem fs, Path testDir, int level)
+ throws FileNotFoundException, IOException {
+ if (level >= 3) {
+ return HFile.isHFileFormat(fs, testDir);
+ }
+ FileStatus[] fStats = fs.listStatus(testDir);
+ if (fStats == null || fStats.length <= 0) {
+ return false;
+ }
+
+ for (FileStatus stats : fStats) {
+ // skip the _SUCCESS file created by MapReduce
+ if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+ continue;
+ }
+ if (level < 2 && !stats.isDirectory()) {
+ return false;
+ }
+ if (!checkMRoutput(fs, stats.getPath(), level + 1)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Simple mapper that generates random (table, KeyValue) output from no input data.
+ */
+ static class Random_TableKV_GeneratingMapper
+ extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
+
+ private int keyLength;
+ private int valLength;
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+ keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+ valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+ }
+
+ @Override
+ protected void map(
+ NullWritable n1, NullWritable n2,
+ Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
+ throws java.io.IOException, InterruptedException {
+
+ byte[] keyBytes = new byte[keyLength];
+ byte[] valBytes = new byte[valLength];
+
+ ArrayList<ImmutableBytesWritable> tables = new ArrayList<ImmutableBytesWritable>();
+ for (int i = 0; i < TABLES.length; i++) {
+ tables.add(new ImmutableBytesWritable(TABLES[i]));
+ }
+
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+ assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+ Random random = new Random();
+
+ for (int i = 0; i < ROWSPERSPLIT; i++) {
+ random.nextBytes(keyBytes);
+ // Ensure that unique tasks generate unique keys
+ keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
+ random.nextBytes(valBytes);
+
+ for(ImmutableBytesWritable table : tables) {
+ for (byte[] family : FAMILIES) {
+ Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
+ context.write(table, kv);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Simple Reducer that takes (table, KeyValue) input in which the KeyValues arrive unordered
+ * and emits (table, KeyValue) output with the KeyValues sorted.
+ */
+
+ static class Table_KeyValueSortReducer
+ extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+ protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
+ org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+ throws java.io.IOException, InterruptedException {
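+ // The TreeSet both sorts the KeyValues and drops any that compare as equal (duplicates).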
+ TreeSet<KeyValue> map = new TreeSet<KeyValue>(CellComparator.COMPARATOR);
+ for (KeyValue kv: kvs) {
+ try {
+ map.add(kv.clone());
+ } catch (CloneNotSupportedException e) {
+ throw new java.io.IOException(e);
+ }
+ }
+ context.setStatus("Read " + map.getClass());
+ int index = 0;
+ for (KeyValue kv: map) {
+ context.write(table, kv);
+ if (++index % 100 == 0) context.setStatus("Wrote " + index);
+ }
+ }
+ }
+
+}