diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java index 7c1ebbc..ee34a34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java @@ -1,48 +1,88 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; import java.util.Map; +import java.util.UUID; +import java.util.TreeMap; +import java.util.TreeSet; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Reader; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import com.google.common.annotations.VisibleForTesting; + /** * Create 3 level tree directory, first level is using table name as parent directory and then use * family name as child directory, and all related HFiles for one family are under child directory * -tableName1 * -columnFamilyName1 + * -HFile (region1) * -columnFamilyName2 - * -HFiles + * -HFile1 (region1) + * -HFile2 (region2) + * -HFile3 (region3) * -tableName2 * -columnFamilyName1 - * -HFiles - * -columnFamilyName2 - *

+ * -HFile (region1) + * family directory and its hfiles match the output of HFileOutputFormat2 + * @see org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2 */ + @InterfaceAudience.Public @InterfaceStability.Evolving @VisibleForTesting @@ -80,7 +120,11 @@ public class MultiHFileOutputFormat extends FileOutputFormat(context, tableOutputDir); // Put table into map @@ -99,4 +143,310 @@ public class MultiHFileOutputFormat extends FileOutputFormat tables) throws IOException { + configureIncrementalLoad(job, tables, MultiHFileOutputFormat.class); + } + + public static void configureIncrementalLoad(Job job, List tables, + Class> cls) throws IOException { + + Configuration conf = job.getConfiguration(); + + // file path to store + String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, + HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()); + LOG.info("Writing partition info into dir: " + partitionsPath.toString()); + job.setPartitionerClass(MultiHFilePartitioner.class); + // get split keys for all the tables, and write them into partition file + Map> tableSplitKeys = + MultiHFilePartitioner.getTablesRegionStartKeys(conf, tables); + MultiHFilePartitioner.writeTableSplitKeys(conf, partitionsPath, tableSplitKeys); + MultiHFilePartitioner.setPartitionFile(conf, partitionsPath); + partitionsPath.getFileSystem(conf).makeQualified(partitionsPath); + partitionsPath.getFileSystem(conf).deleteOnExit(partitionsPath); + + // now only support Mapper output + // we can use KeyValueSortReducer directly to sort Mapper output + if (KeyValue.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(KeyValueSortReducer.class); + } else { + LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); + } + int reducerNum = MultiHFilePartitioner.getReducerNumber(tableSplitKeys); + job.setNumReduceTasks(reducerNum); + LOG.info("Configuring " + reducerNum + " reduce partitions " + "to match current region count"); + + // setup output format + job.setOutputFormatClass(cls); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + + job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + + } + + /** + * check if table exist + * @return instance of table, if it exist + */ + private static Table getTable(byte[] tableName, Configuration conf) throws IOException { + Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin(); + TableName table = TableName.valueOf(tableName); + if (admin.tableExists(table)) { + return connection.getTable(table); + } + LOG.warn("Table: '" + table.toString() + "' does not exist"); + return null; + } + + /** + * MultiHFilePartitioner support MultiHFileOutputFormat writing hfile based on regions for one + * table, so that different regions have corresponding hfiles + */ + static class MultiHFilePartitioner extends Partitioner + implements Configurable { + + public static final String DEFAULT_PATH = "_partition_multihfile.lst"; + public static final String PARTITIONER_PATH = "mapreduce.multihfile.partitioner.path"; + private Configuration conf; + // map to receive from file + private Map> table_SplitKeys; + // each pair is map to one unique integer + private TreeMap partitionMap; + + @Override + public void setConf(Configuration conf) { + try { + this.conf = conf; + partitionMap = new TreeMap(); + table_SplitKeys = readTableSplitKeys(conf); + + // initiate partitionMap by table_SplitKeys map + int splitNum = 0; + for (Map.Entry> entry : table_SplitKeys.entrySet()) { + ImmutableBytesWritable table = entry.getKey(); + List list = entry.getValue(); + for (ImmutableBytesWritable splitKey : list) { + partitionMap.put(new TableSplitKeyPair(table, splitKey), splitNum++); + } + } + + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * Set the path to the SequenceFile storing the sorted . It must be the case + * that for R reduces, there are R-1 keys in the SequenceFile. + */ + public static void setPartitionFile(Configuration conf, Path p) { + conf.set(PARTITIONER_PATH, p.toString()); + } + + /** + * Get the path to the SequenceFile storing the sorted . + * @see #setPartitionFile(Configuration, Path) + */ + public static String getPartitionFile(Configuration conf) { + return conf.get(PARTITIONER_PATH, DEFAULT_PATH); + } + + /** + * Get the number of reducer by the map transferred into the partitioner + */ + public static int getReducerNumber( + Map> tableSplitKeys) { + int reducerNum = 0; + for (Map.Entry> entry : tableSplitKeys.entrySet()) { + reducerNum += entry.getValue().size(); + } + return reducerNum; + } + + /** + * Return map of + */ + public static Map> getTablesRegionStartKeys( + Configuration conf, List tables) throws IOException { + TreeMap> ret = + new TreeMap>(); + + Connection conn = ConnectionFactory.createConnection(conf); + LOG.info("Looking up current regions for tables"); + for (TableName tName : tables) { + try (RegionLocator table = conn.getRegionLocator(tName)) { + // if table not exist, use default split keys for this table + byte[][] byteKeys = { HConstants.EMPTY_BYTE_ARRAY }; + if (conn.getAdmin().tableExists(tName)) byteKeys = table.getStartKeys(); + ArrayList tableStartKeys = + new ArrayList(byteKeys.length); + for (byte[] byteKey : byteKeys) { + tableStartKeys.add(new ImmutableBytesWritable(byteKey)); + } + ret.put(new ImmutableBytesWritable(table.getName().toBytes()), tableStartKeys); + } + } + conn.close(); + return ret; + } + + /** + * write into sequence file in order, + * and this format can be parsed by MultiHFilePartitioner + */ + public static void writeTableSplitKeys(Configuration conf, Path partitionsPath, + Map> map) throws IOException { + LOG.info("Writing partition information to " + partitionsPath); + + if (map == null || map.isEmpty()) { + throw new IllegalArgumentException("No regions passed for all tables"); + } + + SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath), + Writer.keyClass(ImmutableBytesWritable.class), + Writer.valueClass(ImmutableBytesWritable.class)); + + try { + for (Map.Entry> entry : map.entrySet()) { + ImmutableBytesWritable table = entry.getKey(); + List list = entry.getValue(); + if (list == null) { + throw new IOException("Split keys for a table can not be null"); + } + + TreeSet sorted = new TreeSet(list); + + ImmutableBytesWritable first = sorted.first(); + if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { + throw new IllegalArgumentException( + "First region of table should have empty start key. Instead has: " + + Bytes.toStringBinary(first.get())); + } + + for (ImmutableBytesWritable startKey : sorted) { + writer.append(table, startKey); + } + } + } finally { + writer.close(); + } + } + + /** + * read partition file into map + */ + private Map> readTableSplitKeys( + Configuration conf) throws IOException { + String parts = getPartitionFile(conf); + LOG.info("Read partition info from file: " + parts); + final Path partFile = new Path(parts); + + SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(partFile)); + // values are already sorted in file, so use list + Map> map = + new TreeMap>(); + // key and value have same type + ImmutableBytesWritable key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); + ImmutableBytesWritable value = + ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); + try { + while (reader.next(key, value)) { + + List list = map.get(key); + if (list == null) { + list = new ArrayList(); + } + list.add(value); + map.put(key, list); + + key = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); + value = ReflectionUtils.newInstance(ImmutableBytesWritable.class, conf); + } + reader = null; + } finally { + IOUtils.cleanup(LOG, reader); + } + return map; + } + + @Override + public int getPartition(ImmutableBytesWritable table, Cell value, int numPartitions) { + byte[] row = CellUtil.cloneRow(value); + ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row); + ImmutableBytesWritable splitRegion = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY); + + //find splitKey by rowKey + if (table_SplitKeys.containsKey(table)) { + List list = table_SplitKeys.get(table); + for (ImmutableBytesWritable splitKey : list) { + if (splitKey.compareTo(rowKey) > 0) break; + splitRegion = splitKey; + } + } + // find the number of the reducer for the input + Integer id = partitionMap.get(new TableSplitKeyPair(table, splitRegion)); + if (id == null) { + try { + throw new IOException("No regions passed for table :" + table.toString()); + } catch (IOException e) { + LOG.warn("Exception found in getPartition(): " + e.toString()); + return -1; + } + } + + return id.intValue() % numPartitions; + } + + /** + * A class store pair, has two main usage + * 1. store tableName and one of its splitKey as a pair + * 2. implement comparable, so that partitioner can find splitKey of its input cell + */ + static class TableSplitKeyPair extends Pair + implements Comparable { + + private static final long serialVersionUID = -6485999667666325594L; + + public TableSplitKeyPair(ImmutableBytesWritable a, ImmutableBytesWritable b) { + super(a, b); + } + + @Override + public int compareTo(TableSplitKeyPair other) { + if (this.getFirst().equals(other.getFirst())) { + return this.getSecond().compareTo(other.getSecond()); + } + return this.getFirst().compareTo(other.getFirst()); + } + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java index 738ae5f..7c4b57e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java @@ -1,12 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.hadoop.hbase.mapreduce; @@ -15,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.TreeSet; @@ -26,6 +34,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -44,181 +55,277 @@ import org.junit.experimental.categories.Category; */ @Category(MediumTests.class) public class TestMultiHFileOutputFormat { - private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class); - - private HBaseTestingUtility util = new HBaseTestingUtility(); - - private static int ROWSPERSPLIT = 10; - - private static final int KEYLEN_DEFAULT = 10; - private static final String KEYLEN_CONF = "randomkv.key.length"; - - private static final int VALLEN_DEFAULT = 10; - private static final String VALLEN_CONF = "randomkv.val.length"; - - private static final byte[][] TABLES = - { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")), - Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) }; - - private static final byte[][] FAMILIES = - { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")), - Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) }; - - private static final byte[] QUALIFIER = Bytes.toBytes("data"); - - public static void main(String[] args) throws Exception { - new TestMultiHFileOutputFormat().testWritingDataIntoHFiles(); + private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class); + + private HBaseTestingUtility util = new HBaseTestingUtility(); + + private static int ROWSPERSPLIT = 10; + + private static final int KEYLEN_DEFAULT = 10; + private static final String KEYLEN_CONF = "randomkv.key.length"; + + private static final int VALLEN_DEFAULT = 10; + private static final String VALLEN_CONF = "randomkv.val.length"; + + private static final byte[][] TABLES = + { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")), + Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) }; + + private static final byte[][] FAMILIES = + { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")), + Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) }; + + private static final byte[] QUALIFIER = Bytes.toBytes("data"); + + /** + * Run small MR job. this MR job will write HFile into + * testWritingDataIntoHFiles/tableNames/columFamilies/ + */ + @Test + public void testWritingDataIntoHFiles() throws Exception { + Configuration conf = util.getConfiguration(); + util.startMiniCluster(); + Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles"); + FileSystem fs = testDir.getFileSystem(conf); + LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir); + + // Set down this value or we OOME in eclipse. + conf.setInt("mapreduce.task.io.sort.mb", 20); + // Write a few files by setting max file size. + conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); + + try { + Job job = Job.getInstance(conf, "testWritingDataIntoHFiles"); + + FileOutputFormat.setOutputPath(job, testDir); + + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(Random_TableKV_GeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + job.setReducerClass(Table_KeyValueSortReducer.class); + job.setOutputFormatClass(MultiHFileOutputFormat.class); + job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + LOG.info("\nStarting test testWritingDataIntoHFiles\n"); + assertTrue(job.waitForCompletion(true)); + LOG.info("\nWaiting on checking MapReduce output\n"); + assertTrue(checkMROutput(fs, testDir, 0)); + } finally { + testDir.getFileSystem(conf).delete(testDir, true); + util.shutdownMiniCluster(); } - - /** - * Run small MR job. this MR job will write HFile into - * testWritingDataIntoHFiles/tableNames/columFamilies/ - */ - @Test - public void testWritingDataIntoHFiles() throws Exception { - Configuration conf = util.getConfiguration(); - util.startMiniCluster(); - Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles"); - FileSystem fs = testDir.getFileSystem(conf); - LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir); - - // Set down this value or we OOME in eclipse. - conf.setInt("mapreduce.task.io.sort.mb", 20); - // Write a few files by setting max file size. - conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); - - try { - Job job = Job.getInstance(conf, "testWritingDataIntoHFiles"); - - FileOutputFormat.setOutputPath(job, testDir); - - job.setInputFormatClass(NMapInputFormat.class); - job.setMapperClass(Random_TableKV_GeneratingMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - job.setReducerClass(Table_KeyValueSortReducer.class); - job.setOutputFormatClass(MultiHFileOutputFormat.class); - job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); - - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.initCredentials(job); - LOG.info("\nStarting test testWritingDataIntoHFiles\n"); - assertTrue(job.waitForCompletion(true)); - LOG.info("\nWaiting on checking MapReduce output\n"); - assertTrue(checkMROutput(fs, testDir, 0)); - } finally { - testDir.getFileSystem(conf).delete(testDir, true); - util.shutdownMiniCluster(); + } + + /** + * check whether create directory and hfiles as format designed in MultiHFilePartitioner + * and also check whether the output file has same related configuration as created table + */ + @Test + public void testMultiHFilePartitioner() throws Exception { + Configuration conf = util.getConfiguration(); + util.startMiniCluster(); + Path testDir = util.getDataTestDirOnTestFS("testMultiHFilePartitioner"); + FileSystem fs = testDir.getFileSystem(conf); + LOG.info("testMultiHFilePartitioner dir writing to : " + testDir); + + // Set down this value or we OOME in eclipse. + conf.setInt("mapreduce.task.io.sort.mb", 20); + // Write a few files by setting max file size. + conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); + + // Create several tables for testing + List tableNames = new ArrayList(); + + for (int i = 0; i < TABLES.length; i++) { + TableName tableName = TableName.valueOf(TABLES[i]); + byte[][] splitKeys = generateRandomSplitKeys(3); + HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + for(int j = 0; j < FAMILIES.length; j++) { + HColumnDescriptor familyDescriptor = new HColumnDescriptor(FAMILIES[j]); + //only set Tables[0] configuration, and specify compression type and DataBlockEncode + if(i == 0) { + familyDescriptor.setCompressionType(Compression.Algorithm.GZ); + familyDescriptor.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); } + tableDescriptor.addFamily(familyDescriptor); + } + util.createTable(tableDescriptor, splitKeys, conf); + tableNames.add(tableName); } - - /** - * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the - * created directory is correct or not A recursion method, the testDir had better be small size - */ - private boolean checkMROutput(FileSystem fs, Path testDir, int level) - throws FileNotFoundException, IOException { - if (level >= 3) { - return HFile.isHFileFormat(fs, testDir); - } - FileStatus[] fStats = fs.listStatus(testDir); - if (fStats == null || fStats.length <= 0) { - LOG.info("Created directory format is not correct"); - return false; - } - - for (FileStatus stats : fStats) { - // skip the _SUCCESS file created by MapReduce - if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) - continue; - if (level < 2 && !stats.isDirectory()) { - LOG.info("Created directory format is not correct"); + // set up for MapReduce job + try { + Job job = Job.getInstance(conf, "testMultiHFilePartitioner"); + FileOutputFormat.setOutputPath(job, testDir); + + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(Random_TableKV_GeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + + MultiHFileOutputFormat.configureIncrementalLoad(job, tableNames); + + LOG.info("Starting test testWritingDataIntoHFiles"); + assertTrue(job.waitForCompletion(true)); + LOG.info("Waiting on checking MapReduce output"); + assertTrue(checkMROutput(fs, testDir, 0)); + assertTrue(checkFileConf(conf, fs, testDir)); + } finally { + for (int i = 0; i < TABLES.length; i++) { + TableName tName = TableName.valueOf(TABLES[i]); + util.deleteTable(tName); + } + fs.delete(testDir, true); + fs.close(); + util.shutdownMiniCluster(); + } + } + + /** + * check the output hfile has same configuration as created test table + * only check TABLES[0] + */ + private boolean checkFileConf(Configuration conf, FileSystem fs, Path testDir) throws IOException { + FileStatus[] fStats = fs.listStatus(testDir); + for (FileStatus stats : fStats) { + if (stats.getPath().getName().equals(new String(TABLES[0]))) { + FileStatus[] cfStats = fs.listStatus(stats.getPath()); + for (FileStatus cfstat : cfStats) { + FileStatus[] hfStats = fs.listStatus(cfstat.getPath()); + for (FileStatus hfstat : hfStats) { + if (HFile.isHFileFormat(fs, hfstat)) { + HFile.Reader hfr = + HFile.createReader(fs, hfstat.getPath(), new CacheConfig(conf), conf); + if (!hfr.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF) || + !hfr.getCompressionAlgorithm().equals(Compression.Algorithm.GZ)) return false; } - boolean flag = checkMROutput(fs, stats.getPath(), level + 1); - if (flag == false) return false; + } } - return true; + } + } + return true; + } + /** + * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile this method to check the + * created directory is correct or not A recursion method, the testDir had better be small size + */ + private boolean checkMROutput(FileSystem fs, Path testDir, int level) + throws FileNotFoundException, IOException { + if (level >= 3) { + return HFile.isHFileFormat(fs, testDir); + } + FileStatus[] fStats = fs.listStatus(testDir); + if (fStats == null || fStats.length <= 0) { + LOG.info("Created directory format is not correct"); + return false; } - /** - * Simple mapper that makes output. With no input data - */ - static class Random_TableKV_GeneratingMapper - extends Mapper { + for (FileStatus stats : fStats) { + // skip the _SUCCESS file created by MapReduce + if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) + continue; + if (level < 2 && !stats.isDirectory()) { + LOG.info("Created directory format is not correct"); + return false; + } + boolean flag = checkMROutput(fs, stats.getPath(), level + 1); + if (flag == false) return false; + } + return true; + } - private int keyLength; - private int valLength; - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); + private byte[][] generateRandomSplitKeys(int numKeys) { + Random random = new Random(); + byte[][] ret = new byte[numKeys][]; + for (int i = 0; i < numKeys; i++) { + ret[i] = PerformanceEvaluation.generateData(random, KEYLEN_DEFAULT); + } + return ret; + } - Configuration conf = context.getConfiguration(); - keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); - valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); - } - @Override - protected void map(NullWritable n1, NullWritable n2, - Mapper.Context context) - throws java.io.IOException, InterruptedException { + /** + * Simple mapper that makes output. With no input data + */ + static class Random_TableKV_GeneratingMapper + extends Mapper { - byte keyBytes[] = new byte[keyLength]; - byte valBytes[] = new byte[valLength]; + private int keyLength; + private int valLength; - ArrayList tables = new ArrayList(); - for (int i = 0; i < TABLES.length; i++) { - tables.add(new ImmutableBytesWritable(TABLES[i])); - } + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); - int taskId = context.getTaskAttemptID().getTaskID().getId(); - assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - Random random = new Random(); - - for (int i = 0; i < ROWSPERSPLIT; i++) { - random.nextBytes(keyBytes); - // Ensure that unique tasks generate unique keys - keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); - random.nextBytes(valBytes); - - for (ImmutableBytesWritable table : tables) { - for (byte[] family : FAMILIES) { - Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); - context.write(table, kv); - } - } - } - } + Configuration conf = context.getConfiguration(); + keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); + valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); } - /** - * Simple Reducer that have input , with KeyValues have no order. and output - * , with KeyValues are ordered - */ - - static class Table_KeyValueSortReducer - extends Reducer { - protected void reduce(ImmutableBytesWritable table, java.lang.Iterable kvs, - org.apache.hadoop.mapreduce.Reducer.Context context) - throws java.io.IOException, InterruptedException { - TreeSet map = new TreeSet(KeyValue.COMPARATOR); - for (KeyValue kv : kvs) { - try { - map.add(kv.clone()); - } catch (CloneNotSupportedException e) { - throw new java.io.IOException(e); - } - } - context.setStatus("Read " + map.getClass()); - int index = 0; - for (KeyValue kv : map) { - context.write(table, kv); - if (++index % 100 == 0) context.setStatus("Wrote " + index); - } + @Override + protected void map(NullWritable n1, NullWritable n2, + Mapper.Context context) + throws java.io.IOException, InterruptedException { + + byte keyBytes[] = new byte[keyLength]; + byte valBytes[] = new byte[valLength]; + + ArrayList tables = new ArrayList(); + for (int i = 0; i < TABLES.length; i++) { + tables.add(new ImmutableBytesWritable(TABLES[i])); + } + + int taskId = context.getTaskAttemptID().getTaskID().getId(); + assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; + Random random = new Random(); + + for (int i = 0; i < ROWSPERSPLIT; i++) { + random.nextBytes(keyBytes); + // Ensure that unique tasks generate unique keys + keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); + random.nextBytes(valBytes); + + for (ImmutableBytesWritable table : tables) { + for (byte[] family : FAMILIES) { + Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); + context.write(table, kv); + } + } + } + } + } + + /** + * Simple Reducer that have input , with KeyValues have no order. and output + * , with KeyValues are ordered + */ + + static class Table_KeyValueSortReducer + extends Reducer { + protected void reduce(ImmutableBytesWritable table, java.lang.Iterable kvs, + org.apache.hadoop.mapreduce.Reducer.Context context) + throws java.io.IOException, InterruptedException { + TreeSet map = new TreeSet(CellComparator.COMPARATOR); + for (KeyValue kv : kvs) { + try { + map.add(kv.clone()); + } catch (CloneNotSupportedException e) { + throw new java.io.IOException(e); } + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (KeyValue kv : map) { + context.write(table, kv); + if (++index % 100 == 0) context.setStatus("Wrote " + index); + } } + } -} +} \ No newline at end of file