diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
new file mode 100644
index 0000000..844c2ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Writes HFiles. Passed Cells must arrive in order.
+ * Writes current time as the sequence id for the file.
+ * Creates a two-level directory tree: the first level uses the table name as the parent
+ * directory, and the second level uses the column family name as the child directory.
+ * Sets the major compacted attribute on created {@link HFile}s.
+ * Calling write(null, null) will forcibly roll all HFiles being written.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MultiHFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+  private static final Log LOG = LogFactory.getLog(MultiHFileOutputFormat.class);
+
+  /*
+   * Data structure to hold a Writer and the amount of data written on it.
+   */
+  static class WriterLength {
+    long written = 0;
+    StoreFileWriter writer = null;
+  }
+
+  @Override
+  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
+      final TaskAttemptContext context) throws IOException, InterruptedException {
+    return createRecordWriter(context);
+  }
+
+  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+      createRecordWriter(final TaskAttemptContext context)
+          throws IOException {
+
+    // Get the path of the output directory
+    final Path outputPath = FileOutputFormat.getOutputPath(context);
+    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
+    final Configuration conf = context.getConfiguration();
+    final FileSystem fs = outputdir.getFileSystem(conf);
+    // These configs. are from hbase-*.xml
+    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+        HConstants.DEFAULT_MAX_FILE_SIZE);
+    // Invented config. Add to hbase-*.xml if other than default compression.
+    final String defaultCompressionStr = conf.get("hfile.compression",
+        Algorithm.NONE.getName());
+    final Algorithm defaultCompression = HFileWriterImpl
+        .compressionByName(defaultCompressionStr);
+    final boolean compactionExclude = conf.getBoolean(
+        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+    // Map of tables to writers
+    final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters =
+        new HashMap<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>>();
+
+    return new RecordWriter<ImmutableBytesWritable, V>() {
+      @Override
+      public void write(ImmutableBytesWritable tableName, V cell)
+          throws IOException, InterruptedException {
+        RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
+        // if there is a new table, verify that the table directory exists
+        if (tableWriter == null) {
+          // using table name as directory name
+          final Path tableOutputdir = new Path(outputdir, Bytes.toString(tableName.copyBytes()));
+          fs.mkdirs(tableOutputdir);
+
+          tableWriter = new RecordWriter<ImmutableBytesWritable, V>() {
+            // Map of families to writers and how much has been output on the writer.
+            private final Map<byte[], WriterLength> writers =
+                new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
+            private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+            private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
+            private boolean rollRequested = false;
+
+            @Override
+            public void write(ImmutableBytesWritable row, V cell)
+                throws IOException {
+              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+
+              // null input == user explicitly wants to flush
+              if (row == null && kv == null) {
+                rollWriters();
+                return;
+              }
+
+              byte[] rowKey = CellUtil.cloneRow(kv);
+              long length = kv.getLength();
+              byte[] family = CellUtil.cloneFamily(kv);
+              WriterLength wl = this.writers.get(family);
+
+              // If this is a new column family, verify that the directory for the column exists
+              if (wl == null) {
+                FileSystem tfs = tableOutputdir.getFileSystem(conf);
+                tfs.mkdirs(new Path(tableOutputdir, Bytes.toString(family)));
+              }
+
+              // If any of the HFiles for the column families has reached
+              // maxsize, we need to roll all the writers
+              if (wl != null && wl.written + length >= maxsize) {
+                this.rollRequested = true;
+              }
+
+              // This can only happen once a row is finished though
+              if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+                rollWriters();
+              }
+
+              // create a new writer, if necessary
+              if (wl == null || wl.writer == null) {
+                wl = getNewWriter(family, conf, null);
+              }
+
+              // we now have the proper writer. full steam ahead
+              kv.updateLatestStamp(this.now);
+              wl.writer.append(kv);
+              wl.written += length;
+
+              // Copy the row so we know when a row transitions.
+              this.previousRow = rowKey;
+            }
+
+            /* Roll the storefile writers: flush and close each storefile writer.
+             * @throws IOException
+             */
+            private void rollWriters() throws IOException {
+              for (WriterLength wl : this.writers.values()) {
+                if (wl.writer != null) {
+                  LOG.info("Writer=" + wl.writer.getPath() +
+                      ((wl.written == 0) ? "" : ", wrote=" + wl.written));
+                  close_sf_writer(wl.writer);
+                }
+                wl.writer = null;
+                wl.written = 0;
+              }
+              this.rollRequested = false;
+            }
+
+            /* Create a new StoreFileWriter.
+             * @param family
+             * @return A WriterLength, containing a new StoreFileWriter.
+             * @throws IOException
+             */
+            @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
+                justification="Not important")
+            private WriterLength getNewWriter(byte[] family, Configuration conf,
+                InetSocketAddress[] favoredNodes) throws IOException {
+              WriterLength wl = new WriterLength();
+              Path familydir = new Path(tableOutputdir, Bytes.toString(family));
+              Algorithm compression = defaultCompression;
+              BloomType bloomType = BloomType.NONE;
+              Integer blockSize = HConstants.DEFAULT_BLOCKSIZE;
+
+              Configuration tempConf = new Configuration(conf);
+              tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+              HFileContextBuilder contextBuilder = new HFileContextBuilder()
+                  .withCompression(compression)
+                  .withChecksumType(HStore.getChecksumType(conf))
+                  .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+                  .withBlockSize(blockSize);
+
+              if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+                contextBuilder.withIncludesTags(true);
+              }
+
+              HFileContext hFileContext = contextBuilder.build();
+
+              if (null == favoredNodes) {
+                wl.writer =
+                    new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+                        .withOutputDir(familydir).withBloomType(bloomType)
+                        .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
+              } else {
+                wl.writer =
+                    new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                        .withOutputDir(familydir).withBloomType(bloomType)
+                        .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+                        .withFavoredNodes(favoredNodes).build();
+              }
+
+              this.writers.put(family, wl);
+              return wl;
+            }
+
+            /*
+             * Flush and close the StoreFile writer.
+             * @param w
+             * @throws IOException
+             */
+            private void close_sf_writer(final StoreFileWriter w) throws IOException {
+              if (w != null) {
+                w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+                    Bytes.toBytes(System.currentTimeMillis()));
+                w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+                    Bytes.toBytes(context.getTaskAttemptID().toString()));
+                w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+                    Bytes.toBytes(true));
+                w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+                    Bytes.toBytes(compactionExclude));
+                w.appendTrackedTimestampsToMetadata();
+                w.close();
+              }
+            }
+
+            @Override
+            public void close(TaskAttemptContext c)
+                throws IOException, InterruptedException {
+              for (WriterLength wl : this.writers.values()) {
+                close_sf_writer(wl.writer);
+              }
+            }
+          };
+          // Put table into map
+          tableWriters.put(tableName, tableWriter);
+        }
+        // Write <Row, Cell> into tableWriter
+        tableWriter.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), cell);
+      }
+
+      @Override
+      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+        for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
+          writer.close(c);
+        }
+      }
+    };
+  }
+}
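For orientation, and not part of the patch: below is a minimal sketch of how a driver might wire a MapReduce job to the new output format, mirroring the configuration used in TestMultiHFileOutputFormat further down. The driver class name and the output-path argument are hypothetical, and the input format, mapper, sorting reducer, and io.serializations setup are omitted here; the test below shows a complete wiring.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.MultiHFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Hypothetical driver class, only to illustrate the job wiring.
    public class MultiTableHFileDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "multi-table-hfile-export");
        job.setJarByClass(MultiTableHFileDriver.class);

        // The (elided) mapper emits <table name, KeyValue> pairs; cells for a given table and
        // family must reach the writer sorted, e.g. via a KeyValue-sorting reducer such as
        // Table_KeyValueSortReducer in the test below.
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        // Single output directory; MultiHFileOutputFormat creates <table>/<family>/ beneath it.
        job.setOutputFormatClass(MultiHFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }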
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
new file mode 100644
index 0000000..55e9dc9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
@@ -0,0 +1,206 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class TestMultiHFileOutputFormat {
+  private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class);
+
+  private HBaseTestingUtility util = new HBaseTestingUtility();
+
+  private static int ROWSPERSPLIT = 5;
+
+  private static final int KEYLEN_DEFAULT = 10;
+  private static final String KEYLEN_CONF = "randomkv.key.length";
+
+  private static final int VALLEN_DEFAULT = 10;
+  private static final String VALLEN_CONF = "randomkv.val.length";
+
+  private static final byte[][] TABLES =
+      { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
+        Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
+
+  private static final byte[][] FAMILIES =
+      { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+        Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("data");
+
+  public static void main(String[] args)
+      throws ClassNotFoundException, IOException, InterruptedException {
+    new TestMultiHFileOutputFormat().testWritingDataIntoHFiles();
+  }
+
+  /**
+   * Run a small MR job.
+   * The job writes HFiles into testWritingDataIntoHFiles/<tableName>/<columnFamily>/.
+   */
+  public void testWritingDataIntoHFiles()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = util.getConfiguration();
+
+    Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
+    FileSystem fs = testDir.getFileSystem(conf);
+    LOG.info("testWritingDataIntoHFiles writing to dir: " + testDir);
+
+    // Set down this value or we OOME in eclipse.
+    conf.setInt("mapreduce.task.io.sort.mb", 20);
+    // Write a few files by setting max file size.
+    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+
+    Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
+
+    FileOutputFormat.setOutputPath(job, testDir);
+
+    job.setInputFormatClass(NMapInputFormat.class);
+    job.setMapperClass(Random_TableKV_GeneratingMapper.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(KeyValue.class);
+    job.setReducerClass(Table_KeyValueSortReducer.class);
+    job.setOutputFormatClass(MultiHFileOutputFormat.class);
+    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+        KeyValueSerialization.class.getName());
+
+    TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.initCredentials(job);
+    fs.deleteOnExit(testDir);
+    LOG.info("\nStarting test testWritingDataIntoHFiles\n");
+    assertTrue(job.waitForCompletion(true));
+    LOG.info("\nChecking MapReduce output\n");
+    assertTrue(checkMRoutput(fs, testDir, 0));
+  }
+
+  /**
+   * MR outputs a three-level directory tree: tableName -> columnFamilyName -> HFile.
+   * This method checks that the created directory layout is correct.
+   * It recurses, so testDir should stay small.
+   */
+  private boolean checkMRoutput(FileSystem fs, Path testDir, int level)
+      throws FileNotFoundException, IOException {
+    if (level >= 3) {
+      return HFile.isHFileFormat(fs, testDir);
+    }
+    FileStatus[] fStats = fs.listStatus(testDir);
+    if (fStats == null || fStats.length <= 0) {
+      return false;
+    }
+
+    for (FileStatus stats : fStats) {
+      // skip the _SUCCESS file created by MapReduce
+      if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+        continue;
+      }
+      if (level < 2 && !stats.isDirectory()) {
+        return false;
+      }
+      boolean flag = checkMRoutput(fs, stats.getPath(), level + 1);
+      if (flag == false) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Simple mapper that makes <TableName, KeyValue> output, with no input data.
+   */
+  static class Random_TableKV_GeneratingMapper
+      extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
+
+    private int keyLength;
+    private int valLength;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      super.setup(context);
+
+      Configuration conf = context.getConfiguration();
+      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+    }
+
+    @Override
+    protected void map(NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
+        throws IOException, InterruptedException {
+
+      byte[] keyBytes = new byte[keyLength];
+      byte[] valBytes = new byte[valLength];
+
+      ArrayList<ImmutableBytesWritable> tables = new ArrayList<ImmutableBytesWritable>();
+      for (int i = 0; i < TABLES.length; i++) {
+        tables.add(new ImmutableBytesWritable(TABLES[i]));
+      }
+
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
+      Random random = new Random();
+
+      for (int i = 0; i < ROWSPERSPLIT; i++) {
+        random.nextBytes(keyBytes);
+        // Ensure that unique tasks generate unique keys
+        keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+        random.nextBytes(valBytes);
+
+        for (ImmutableBytesWritable table : tables) {
+          for (byte[] family : FAMILIES) {
+            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
+            context.write(table, kv);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Simple Reducer that takes <TableName, KeyValue> input, with the KeyValues in no particular order,
+   * and emits <TableName, KeyValue> output, with the KeyValues ordered.
+   */
+  static class Table_KeyValueSortReducer
+      extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+    protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
+        Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+        throws java.io.IOException, InterruptedException {
+      TreeSet<KeyValue> map = new TreeSet<KeyValue>(CellComparator.COMPARATOR);
+      for (KeyValue kv : kvs) {
+        try {
+          map.add(kv.clone());
+        } catch (CloneNotSupportedException e) {
+          throw new java.io.IOException(e);
+        }
+      }
+      context.setStatus("Read " + map.getClass());
+      int index = 0;
+      for (KeyValue kv : map) {
+        context.write(table, kv);
+        if (++index % 100 == 0) context.setStatus("Wrote " + index);
+      }
+    }
+  }
+
+}
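A possible follow-up, again not part of the patch and only a sketch under assumptions: since MultiHFileOutputFormat lays files out as <outputDir>/<tableName>/<familyName>/<hfile>, each per-table subdirectory matches the layout the existing LoadIncrementalHFiles tool expects, so the output could plausibly be bulk loaded one table at a time. The class below is hypothetical and assumes the target tables and their column families already exist.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    // Hypothetical helper: bulk load every per-table subdirectory produced by the MR job.
    public class BulkLoadAllTables {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Path outputDir = new Path(args[0]);  // the MR job's output directory
        FileSystem fs = outputDir.getFileSystem(conf);
        for (FileStatus tableDir : fs.listStatus(outputDir)) {
          if (!tableDir.isDirectory()) {
            continue;  // e.g. the _SUCCESS marker
          }
          // Directory name is the table name, per MultiHFileOutputFormat's layout.
          String tableName = tableDir.getPath().getName();
          // LoadIncrementalHFiles implements Tool: args are <hfile dir> <table name>.
          ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
              new String[] { tableDir.getPath().toString(), tableName });
        }
      }
    }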