diff --git pom.xml pom.xml index 9fb9aa6..dcb4851 100644 --- pom.xml +++ pom.xml @@ -433,6 +433,7 @@ 6.1.24 6.1.14 4.8.1 + 1.8.4 1.2.15 3.3.1 @@ -530,6 +531,12 @@ ${junit.version} test + + org.mockito + mockito-all + ${mockito-all.version} + test + @@ -594,6 +601,12 @@ + com.google.guava + guava + r03 + + + org.apache.hadoop hadoop-core @@ -693,6 +706,10 @@ junit + org.mockito + mockito-all + + org.apache.commons commons-math ${commons-math.version} diff --git src/docs/src/documentation/content/xdocs/bulk-loads.xml src/docs/src/documentation/content/xdocs/bulk-loads.xml new file mode 100644 index 0000000..fc61ebe --- /dev/null +++ src/docs/src/documentation/content/xdocs/bulk-loads.xml @@ -0,0 +1,148 @@ + + + + + + + + +
+ + Bulk Loads in HBase + +
+ +
+ Overview +

+ HBase includes several methods of loading data into tables. + The most straightforward method is to either use the TableOutputFormat + class from a MapReduce job, or use the normal client APIs; however, + these are not always the most efficient methods. +
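For contrast with the bulk load path that the rest of this document describes, the following is a minimal sketch of the normal client-API load mentioned above. It is illustrative only and not part of this patch; the table name, column family and values are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SimpleClientLoad {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Ordinary write path: each Put travels through the region server's
    // write-ahead log and memstore, which is the overhead bulk loading avoids.
    HTable table = new HTable(conf, "mytable");
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("d"), Bytes.toBytes("c1"), Bytes.toBytes("value1"));
    table.put(put);
    table.flushCommits();
  }
}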

+

+ This document describes HBase's bulk load functionality. The bulk load + feature uses a MapReduce job to output table data in HBase's internal + data format, and then directly loads the data files into a running + cluster. +

+
+
+ Bulk Load Architecture +

+ The HBase bulk load process consists of two main steps. +

+ + Preparing data via a MapReduce job +

+ The first step of a bulk load is to generate HBase data files from + a MapReduce job using HFileOutputFormat. This output format writes + out data in HBase's internal storage format so that it can + later be loaded very efficiently into the cluster.

+

+ To function efficiently, HFileOutputFormat must be configured + such that each output HFile fits within a single region. To + do this, jobs use Hadoop's TotalOrderPartitioner class to partition the + map output into disjoint ranges of the key space, corresponding to the + key ranges of the regions in the table.

+

+ HFileOutputFormat includes a convenience function, configureIncrementalLoad(), + which automatically sets up a TotalOrderPartitioner based on the current + region boundaries of a table. +
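To illustrate the convenience function described above, here is a hedged sketch of a driver that prepares a bulk load with HFileOutputFormat.configureIncrementalLoad(). The toy mapper, class name, table schema and argument layout are assumptions made for the example, not part of this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadPrepareDriver {

  /** Toy mapper: treats each input line as both the row key and a cell value. */
  static class LineToPutMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      byte[] row = Bytes.toBytes(line.toString());
      Put put = new Put(row);
      put.add(Bytes.toBytes("d"), Bytes.toBytes("line"), row);
      context.write(new ImmutableBytesWritable(row), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "prepare-bulk-load");
    job.setJarByClass(BulkLoadPrepareDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(LineToPutMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Looks up the table's current region boundaries, writes a partitions
    // file, and configures the TotalOrderPartitioner, the appropriate sort
    // reducer, the reduce-task count and HFileOutputFormat for this job.
    HTable table = new HTable(conf, args[2]);
    HFileOutputFormat.configureIncrementalLoad(job, table);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}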

+
+ + Completing the data load +

+ After the data has been prepared using HFileOutputFormat, it + is loaded into the cluster using a command-line tool. This tool + iterates through the prepared data files, and for each one determines the + region the file belongs to. It then contacts the appropriate region + server, which adopts the HFile, moving it into its storage directory and making + the data available to clients.

+

+ If the region boundaries have changed during the course of bulk load + preparation, or between the preparation and completion steps, the bulk + load command-line utility will automatically split the data files into + pieces corresponding to the new boundaries. This process is not + optimally efficient, so users should take care to minimize the delay between + preparing a bulk load and importing it into the cluster, especially + if other clients are simultaneously loading data through other means.
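The splitting decision described here relies on the containsRange() helper that this patch adds to HRegionInfo. Below is a small sketch of its semantics, mirroring the method's own javadoc; the HRegionInfo constructor used (taking an HTableDescriptor plus start and end keys) is assumed from the surrounding codebase, and the table name and keys are illustrative.

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;

public class ContainsRangeExample {
  public static void main(String[] args) {
    // A region of table "t" spanning row keys [a, g).
    HRegionInfo region = new HRegionInfo(new HTableDescriptor("t"),
        Bytes.toBytes("a"), Bytes.toBytes("g"));

    // An HFile whose first and last row keys both fall inside the region fits as-is...
    System.out.println(region.containsRange(Bytes.toBytes("b"), Bytes.toBytes("c"))); // true
    System.out.println(region.containsRange(Bytes.toBytes("a"), Bytes.toBytes("c"))); // true
    // ...but one that crosses the region's end key must be split first.
    System.out.println(region.containsRange(Bytes.toBytes("b"), Bytes.toBytes("z"))); // false
  }
}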

+
+
+
+ Preparing a bulk load using the <code>importtsv</code> tool +

+ HBase ships with a command line tool called importtsv. This tool + is available by running hadoop jar /path/to/hbase-VERSION.jar importtsv. + Running this tool with no arguments prints brief usage information: +

+
+Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>
+
+Imports the given input directory of TSV data into the specified table.
+
+The column names of the TSV data must be specified using the -Dimporttsv.columns
+option. This option takes the form of comma-separated column names, where each
+column name is either a simple column family, or a columnfamily:qualifier. The special
+column name HBASE_ROW_KEY is used to designate that this column should be used
+as the row key for each imported record. You must specify exactly one column
+to be the row key.
+
+In order to prepare data for a bulk data load, pass the option:
+  -Dimporttsv.bulk.output=/path/for/output
+
+Other options that may be specified with -D include:
+  -Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line
+
+
+
+ Importing the prepared data using the <code>completebulkload</code> tool +

+ After a data import has been prepared using the importtsv tool, the + completebulkload tool is used to import the data into the running cluster. +

+

+ The completebulkload tool simply takes the output path where + importtsv put its results, along with the table name. For example:

+ $ hadoop jar hbase-VERSION.jar completebulkload /user/todd/myoutput mytable +

+ This tool will run quickly, after which the new data will be visible in + the cluster.

+
+
+ Advanced Usage +

+ Although the importtsv tool is useful in many cases, advanced users may + want to generate data programmatically, or import data from other formats. To get + started doing so, dig into ImportTsv.java and check the JavaDoc for + HFileOutputFormat.
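As a starting point for generating data programmatically, here is a hedged sketch of a custom mapper that emits KeyValues directly instead of Puts; the column family, qualifier and row-key derivation are placeholders, not part of this patch. With the map output value class set to KeyValue, configureIncrementalLoad() selects KeyValueSortReducer rather than PutSortReducer.

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CustomKeyValueMapper
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

  private static final byte[] FAMILY = Bytes.toBytes("d");      // illustrative
  private static final byte[] QUALIFIER = Bytes.toBytes("raw"); // illustrative

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // Derive the row key however the application requires; here the whole
    // input line is used, purely for illustration.
    byte[] row = Bytes.toBytes(line.toString());
    KeyValue kv = new KeyValue(row, FAMILY, QUALIFIER, row);
    context.write(new ImmutableBytesWritable(row), kv);
  }
}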

+

+ The import step of the bulk load can also be done programmatically. See the + LoadIncrementalHFiles class for more information.
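A minimal sketch of driving the import step from code, using the LoadIncrementalHFiles API added by this patch; the output path and table name are illustrative placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class ProgrammaticBulkLoad {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Directory previously written by a job configured with
    // HFileOutputFormat.configureIncrementalLoad(), and the target table.
    Path hfofOutput = new Path("/user/todd/myoutput"); // illustrative path
    HTable table = new HTable(conf, "mytable");        // illustrative table

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(hfofOutput, table);
  }
}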

+
+ +
\ No newline at end of file diff --git src/docs/src/documentation/content/xdocs/site.xml src/docs/src/documentation/content/xdocs/site.xml index 0d644f5..6a42647 100644 --- src/docs/src/documentation/content/xdocs/site.xml +++ src/docs/src/documentation/content/xdocs/site.xml @@ -35,6 +35,7 @@ See http://forrest.apache.org/docs/linking.html for more info. + diff --git src/main/java/org/apache/hadoop/hbase/HRegionInfo.java src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index 8e1b177..d6f9611 100644 --- src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -357,11 +357,6 @@ public class HRegionInfo extends VersionedWritable implements WritableComparable return elements; } - /** @return the endKey */ - public byte [] getEndKey(){ - return endKey; - } - /** @return the regionId */ public long getRegionId(){ return regionId; @@ -402,6 +397,32 @@ public class HRegionInfo extends VersionedWritable implements WritableComparable public byte [] getStartKey(){ return startKey; } + + /** @return the endKey */ + public byte [] getEndKey(){ + return endKey; + } + + /** + * Returns true if the given inclusive range of rows is fully contained + * by this region. For example, if the region is foo,a,g and this is + * passed ["b","c"] or ["a","c"] it will return true, but if this is passed + * ["b","z"] it will return false. + * @throws IllegalArgumentException if the range passed is invalid (ie end < start) + */ + public boolean containsRange(byte[] rangeStartKey, byte[] rangeEndKey) { + if (Bytes.compareTo(rangeStartKey, rangeEndKey) > 0) { + throw new IllegalArgumentException( + "Invalid range: " + Bytes.toStringBinary(rangeStartKey) + + " > " + Bytes.toStringBinary(rangeEndKey)); + } + + boolean firstKeyInRange = Bytes.compareTo(rangeStartKey, startKey) >= 0; + boolean lastKeyInRange = + Bytes.compareTo(rangeEndKey, endKey) < 0 || + Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY); + return firstKeyInRange && lastKeyInRange; + } /** @return the tableDesc */ public HTableDescriptor getTableDesc(){ diff --git src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java index c48390a..0cd5213 100644 --- src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java +++ src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java @@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.io.BytesWritable; @@ -258,4 +259,11 @@ implements WritableComparable { } return results; } + + /** + * Returns a copy of the bytes referred to by this writable + */ + public byte[] copyBytes() { + return Arrays.copyOfRange(bytes, offset, offset+length); + } } diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 1a907f9..ce1c240 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -462,7 +462,7 @@ public class HFile { throw new NullPointerException("Key nor value may be null"); } if (checkPrefix && - Bytes.toString(k).toLowerCase().startsWith(FileInfo.RESERVED_PREFIX)) { + Bytes.startsWith(k, FileInfo.RESERVED_PREFIX_BYTES)) { throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX + " are reserved"); } @@ -1069,6 +1069,8 @@ public class HFile { /** * 
@return First key in the file. May be null if file has no entries. + * Note that this is not the first rowkey, but rather the byte form of + * the first KeyValue. */ public byte [] getFirstKey() { if (blockIndex == null) { @@ -1076,6 +1078,17 @@ public class HFile { } return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0]; } + + /** + * @return the first row key, or null if the file is empty. + * TODO move this to StoreFile after Ryan's patch goes in + * to eliminate KeyValue here + */ + public byte[] getFirstRowKey() { + byte[] firstKey = getFirstKey(); + if (firstKey == null) return null; + return KeyValue.createKeyValueFromKey(firstKey).getRow(); + } /** * @return number of KV entries in this HFile @@ -1089,6 +1102,8 @@ public class HFile { /** * @return Last key in the file. May be null if file has no entries. + * Note that this is not the last rowkey, but rather the byte form of + * the last KeyValue. */ public byte [] getLastKey() { if (!isFileInfoLoaded()) { @@ -1098,6 +1113,17 @@ public class HFile { } /** + * @return the last row key, or null if the file is empty. + * TODO move this to StoreFile after Ryan's patch goes in + * to eliminate KeyValue here + */ + public byte[] getLastRowKey() { + byte[] lastKey = getLastKey(); + if (lastKey == null) return null; + return KeyValue.createKeyValueFromKey(lastKey).getRow(); + } + + /** * @return number of K entries in this HFile's filter. Returns KV count if no filter. */ public int getFilterEntries() { @@ -1664,6 +1690,7 @@ public class HFile { */ static class FileInfo extends HbaseMapWritable { static final String RESERVED_PREFIX = "hfile."; + static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX); static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY"); static final byte [] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN"); @@ -1679,6 +1706,15 @@ public class HFile { super(); } } + + /** + * Return true if the given file info key is reserved for internal + * use by HFile. + */ + public static boolean isReservedFileInfoKey(byte[] key) { + return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES); + } + /** * Get names of supported compression algorithms. 
The names are acceptable by @@ -1861,5 +1897,4 @@ public class HFile { e.printStackTrace(); } } - } diff --git src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 4cbe52a..be2e5f8 100644 --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -258,5 +258,10 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion { * @throws IOException e */ public MultiPutResponse multiPut(MultiPut puts) throws IOException; - + + /** + * Bulk load an HFile into an open region + */ + public void bulkLoadHFile(String hfilePath, + byte[] regionName, byte[] familyName) throws IOException; } diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java index 3d40695..5e877eb 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java @@ -36,6 +36,9 @@ public class Driver { "Count rows in HBase table"); pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS."); pgd.addClass(Import.NAME, Import.class, "Import data written by Export."); + pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format."); + pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class, + "Complete a bulk data load."); pgd.addClass(CopyTable.NAME, CopyTable.class, "Export a table from local cluster to peer cluster"); pgd.driver(args); diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index 9c8e53e..2a272af 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -20,24 +20,40 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.mortbay.log.Log; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.common.base.Preconditions; /** * Writes HFiles. Passed KeyValues must arrive in order. 
@@ -48,7 +64,9 @@ import org.mortbay.log.Log; * @see KeyValueSortReducer */ public class HFileOutputFormat extends FileOutputFormat { - public RecordWriter getRecordWriter(TaskAttemptContext context) + static Log LOG = LogFactory.getLog(HFileOutputFormat.class); + + public RecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { // Get the path of the temporary output file final Path outputPath = FileOutputFormat.getOutputPath(context); @@ -86,7 +104,7 @@ public class HFileOutputFormat extends FileOutputFormat sorted = + new TreeSet(startKeys); + + ImmutableBytesWritable first = sorted.first(); + Preconditions.checkArgument( + first.equals(HConstants.EMPTY_BYTE_ARRAY), + "First region of table should have empty start key. Instead has: %s", + Bytes.toStringBinary(first.get())); + sorted.remove(first); + + // Write the actual file + FileSystem fs = partitionsPath.getFileSystem(conf); + SequenceFile.Writer writer = SequenceFile.createWriter(fs, + conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class); + + try { + for (ImmutableBytesWritable startKey : sorted) { + writer.append(startKey, NullWritable.get()); + } + } finally { + writer.close(); + } + } + + /** + * Configure a MapReduce Job to perform an incremental load into the given + * table. This + *
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer
+   *   or PutSortReducer)</li>
+   * </ul>
+ * The user should be sure to set the map output value class to either KeyValue or Put before + * running this function. + */ + public static void configureIncrementalLoad(Job job, HTable table) throws IOException { + Configuration conf = job.getConfiguration(); + job.setPartitionerClass(TotalOrderPartitioner.class); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + job.setOutputFormatClass(HFileOutputFormat.class); + + // Based on the configured map output class, set the correct reducer to properly + // sort the incoming values. + // TODO it would be nice to pick one or the other of these formats. + if (KeyValue.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(KeyValueSortReducer.class); + } else if (Put.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(PutSortReducer.class); + } else { + LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); + } + + LOG.info("Looking up current regions for table " + table); + List startKeys = getRegionStartKeys(table); + LOG.info("Configuring " + startKeys.size() + " reduce partitions " + + "to match current region count"); + job.setNumReduceTasks(startKeys.size()); + + Path partitionsPath = new Path(job.getWorkingDirectory(), + "partitions_" + System.currentTimeMillis()); + LOG.info("Writing partition information to " + partitionsPath); + + FileSystem fs = partitionsPath.getFileSystem(conf); + writePartitions(conf, partitionsPath, startKeys); + partitionsPath.makeQualified(fs); + URI cacheUri; + try { + cacheUri = new URI(partitionsPath.toString() + "#" + + TotalOrderPartitioner.DEFAULT_PATH); + } catch (URISyntaxException e) { + throw new IOException(e); + } + DistributedCache.addCacheFile(cacheUri, conf); + DistributedCache.createSymlink(conf); + + LOG.info("Incremental table output configured."); + } + } diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java new file mode 100644 index 0000000..32ba01d --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -0,0 +1,345 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; + +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; + +/** + * Tool to import data from a TSV file. + * + * This tool is rather simplistic - it doesn't do any quoting or + * escaping, but is useful for many data loads. + * + * @see ImportTsv#usage(String) + */ +public class ImportTsv { + final static String NAME = "importtsv"; + + final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines"; + final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output"; + final static String COLUMNS_CONF_KEY = "importtsv.columns"; + + static class TsvParser { + /** + * Column families and qualifiers mapped to the TSV columns + */ + private byte[][] families; + private byte[][] qualifiers; + + private int rowKeyColumnIndex; + + public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY"; + + /** + * @param columnsSpecification the list of columns to parser out, comma separated. 
+ * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC + */ + public TsvParser(String columnsSpecification) { + ArrayList columnStrings = Lists.newArrayList( + Splitter.on(',').trimResults().split(columnsSpecification)); + + families = new byte[columnStrings.size()][]; + qualifiers = new byte[columnStrings.size()][]; + + for (int i = 0; i < columnStrings.size(); i++) { + String str = columnStrings.get(i); + if (ROWKEY_COLUMN_SPEC.equals(str)) { + rowKeyColumnIndex = i; + continue; + } + String[] parts = str.split(":", 2); + if (parts.length == 1) { + families[i] = str.getBytes(); + qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY; + } else { + families[i] = parts[0].getBytes(); + qualifiers[i] = parts[1].getBytes(); + } + } + } + + public int getRowKeyColumnIndex() { + return rowKeyColumnIndex; + } + public byte[] getFamily(int idx) { + return families[idx]; + } + public byte[] getQualifier(int idx) { + return qualifiers[idx]; + } + + public ParsedLine parse(byte[] lineBytes, int length) + throws BadTsvLineException { + // Enumerate separator offsets + ArrayList tabOffsets = new ArrayList(families.length); + for (int i = 0; i < length; i++) { + if (lineBytes[i] == '\t') { + tabOffsets.add(i); + } + } + tabOffsets.add(length); + if (tabOffsets.size() > families.length) { + throw new BadTsvLineException("Bad line:\n"); + } + + return new ParsedLine(tabOffsets, lineBytes); + } + + class ParsedLine { + private final ArrayList tabOffsets; + private byte[] lineBytes; + + ParsedLine(ArrayList tabOffsets, byte[] lineBytes) { + this.tabOffsets = tabOffsets; + this.lineBytes = lineBytes; + } + + public int getRowKeyOffset() { + return getColumnOffset(rowKeyColumnIndex); + } + public int getRowKeyLength() { + return getColumnLength(rowKeyColumnIndex); + } + public int getColumnOffset(int idx) { + if (idx > 0) + return tabOffsets.get(idx - 1) + 1; + else + return 0; + } + public int getColumnLength(int idx) { + return tabOffsets.get(idx) - getColumnOffset(idx); + } + public int getColumnCount() { + return tabOffsets.size(); + } + public byte[] getLineBytes() { + return lineBytes; + } + } + + public static class BadTsvLineException extends Exception { + public BadTsvLineException(String err) { + super(err); + } + private static final long serialVersionUID = 1L; + } + } + + /** + * Write table content out to files in hdfs. + */ + static class TsvImporter + extends Mapper + { + + /** Timestamp for all inserted rows */ + private long ts; + + /** Should skip bad lines */ + private boolean skipBadLines; + private Counter badLineCount; + + private TsvParser parser; + + @Override + protected void setup(Context context) { + parser = new TsvParser(context.getConfiguration().get( + COLUMNS_CONF_KEY)); + if (parser.getRowKeyColumnIndex() == -1) { + throw new RuntimeException("No row key column specified"); + } + ts = System.currentTimeMillis(); + + skipBadLines = context.getConfiguration().getBoolean( + SKIP_LINES_CONF_KEY, true); + badLineCount = context.getCounter("ImportTsv", "Bad Lines"); + } + + /** + * Convert a line of TSV text into an HBase table row. 
+ */ + @Override + public void map(LongWritable offset, Text value, + Context context) + throws IOException { + byte[] lineBytes = value.getBytes(); + + try { + TsvParser.ParsedLine parsed = parser.parse( + lineBytes, value.getLength()); + ImmutableBytesWritable rowKey = + new ImmutableBytesWritable(lineBytes, + parsed.getRowKeyOffset(), + parsed.getRowKeyLength()); + + Put put = new Put(rowKey.copyBytes()); + for (int i = 0; i < parsed.getColumnCount(); i++) { + if (i == parser.getRowKeyColumnIndex()) continue; + KeyValue kv = new KeyValue( + lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), + parser.getFamily(i), 0, parser.getFamily(i).length, + parser.getQualifier(i), 0, parser.getQualifier(i).length, + ts, + KeyValue.Type.Put, + lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i)); + put.add(kv); + } + context.write(rowKey, put); + } catch (BadTsvLineException badLine) { + if (skipBadLines) { + System.err.println( + "Bad line at offset: " + offset.get() + ":\n" + + badLine.getMessage()); + badLineCount.increment(1); + return; + } else { + throw new IOException(badLine); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * Sets up the actual job. + * + * @param conf The current configuration. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. + */ + public static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { + String tableName = args[0]; + Path inputDir = new Path(args[1]); + Job job = new Job(conf, NAME + "_" + tableName); + job.setJarByClass(TsvImporter.class); + FileInputFormat.setInputPaths(job, inputDir); + job.setInputFormatClass(TextInputFormat.class); + job.setMapperClass(TsvImporter.class); + + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + if (hfileOutPath != null) { + HTable table = new HTable(conf, tableName); + job.setReducerClass(PutSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(Put.class); + HFileOutputFormat.configureIncrementalLoad(job, table); + } else { + // No reducers. Just write straight to table. Call initTableReducerJob + // to set up the TableOutputFormat. + TableMapReduceUtil.initTableReducerJob(tableName, null, job); + job.setNumReduceTasks(0); + } + + TableMapReduceUtil.addDependencyJars(job); + return job; + } + + /* + * @param errorMsg Error message. Can be null. + */ + private static void usage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + String usage = + "Usage: " + NAME + " -Dimporttsv.columns=a,b,c \n" + + "\n" + + "Imports the given input directory of TSV data into the specified table.\n" + + "\n" + + "The column names of the TSV data must be specified using the -Dimporttsv.columns\n" + + "option. This option takes the form of comma-separated column names, where each\n" + + "column name is either a simple column family, or a columnfamily:qualifier. The special\n" + + "column name HBASE_ROW_KEY is used to designate that this column should be used\n" + + "as the row key for each imported record. 
You must specify exactly one column\n" + + "to be the row key.\n" + + "\n" + + "In order to prepare data for a bulk data load, pass the option:\n" + + " -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" + + "\n" + + "Other options that may be specified with -D include:\n" + + " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line"; + System.err.println(usage); + } + + /** + * Main entry point. + * + * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); + if (otherArgs.length < 2) { + usage("Wrong number of arguments: " + otherArgs.length); + System.exit(-1); + } + + // Make sure columns are specified + String columns[] = conf.getStrings(COLUMNS_CONF_KEY); + if (columns == null) { + usage("No columns specified. Please specify with -D" + + COLUMNS_CONF_KEY+"=..."); + System.exit(-1); + } + + // Make sure they specify exactly one column as the row key + int rowkeysFound=0; + for (String col : columns) { + if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++; + } + if (rowkeysFound != 1) { + usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC); + System.exit(-1); + } + + Job job = createSubmittableJob(conf, otherArgs); + System.exit(job.waitForCompletion(true) ? 0 : 1); + } + +} diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java new file mode 100644 index 0000000..4212efa --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -0,0 +1,321 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Deque; +import java.util.LinkedList; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.io.HalfStoreFileReader; +import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.io.Reference.Range; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Tool to load the output of HFileOutputFormat into an existing table. + * @see usage() + */ +public class LoadIncrementalHFiles extends Configured implements Tool { + + static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class); + + public static String NAME = "completebulkload"; + + public LoadIncrementalHFiles(Configuration conf) { + super(conf); + } + + public LoadIncrementalHFiles() { + super(); + } + + + private void usage() { + System.err.println("usage: " + NAME + + " /path/to/hfileoutputformat-output " + + "tablename"); + } + + /** + * Represents an HFile waiting to be loaded. An queue is used + * in this class in order to support the case where a region has + * split during the process of the load. When this happens, + * the HFile is split into two physical parts across the new + * region boundary, and each part is added back into the queue. + * The import process finishes when the queue is empty. + */ + private static class LoadQueueItem { + final byte[] family; + final Path hfilePath; + + public LoadQueueItem(byte[] family, Path hfilePath) { + this.family = family; + this.hfilePath = hfilePath; + } + } + + /** + * Walk the given directory for all HFiles, and return a Queue + * containing all such files. 
+ */ + private Deque discoverLoadQueue(Path hfofDir) + throws IOException { + FileSystem fs = hfofDir.getFileSystem(getConf()); + + if (!fs.exists(hfofDir)) { + throw new FileNotFoundException("HFileOutputFormat dir " + + hfofDir + " not found"); + } + + FileStatus[] familyDirStatuses = fs.listStatus(hfofDir); + if (familyDirStatuses == null) { + throw new FileNotFoundException("No families found in " + hfofDir); + } + + Deque ret = new LinkedList(); + for (FileStatus stat : familyDirStatuses) { + if (!stat.isDir()) { + LOG.warn("Skipping non-directory " + stat.getPath()); + continue; + } + Path familyDir = stat.getPath(); + // Skip _logs, etc + if (familyDir.getName().startsWith("_")) continue; + byte[] family = familyDir.getName().getBytes(); + Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir)); + for (Path hfile : hfiles) { + if (hfile.getName().startsWith("_")) continue; + ret.add(new LoadQueueItem(family, hfile)); + } + } + return ret; + } + + /** + * Perform a bulk load of the given directory into the given + * pre-existing table. + * @param hfofDir the directory that was provided as the output path + * of a job using HFileOutputFormat + * @param table the table to load into + * @throws TableNotFoundException if table does not yet exist + */ + public void doBulkLoad(Path hfofDir, HTable table) + throws TableNotFoundException, IOException + { + HConnection conn = table.getConnection(); + + if (!conn.isTableAvailable(table.getTableName())) { + throw new TableNotFoundException("Table " + + Bytes.toStringBinary(table.getTableName()) + + "is not currently available."); + } + + Deque queue = null; + try { + queue = discoverLoadQueue(hfofDir); + while (!queue.isEmpty()) { + LoadQueueItem item = queue.remove(); + tryLoad(item, conn, table.getTableName(), queue); + } + } finally { + if (queue != null && !queue.isEmpty()) { + StringBuilder err = new StringBuilder(); + err.append("-------------------------------------------------\n"); + err.append("Bulk load aborted with some files not yet loaded:\n"); + err.append("-------------------------------------------------\n"); + for (LoadQueueItem q : queue) { + err.append(" ").append(q.hfilePath).append('\n'); + } + LOG.error(err); + } + } + } + + /** + * Attempt to load the given load queue item into its target region server. + * If the hfile boundary no longer fits into a region, physically splits + * the hfile such that the new bottom half will fit, and adds the two + * resultant hfiles back into the load queue. + */ + private void tryLoad(final LoadQueueItem item, + HConnection conn, final byte[] table, + final Deque queue) + throws IOException { + final Path hfilePath = item.hfilePath; + final FileSystem fs = hfilePath.getFileSystem(getConf()); + HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false); + final byte[] first, last; + try { + hfr.loadFileInfo(); + first = hfr.getFirstRowKey(); + last = hfr.getLastRowKey(); + } finally { + hfr.close(); + } + + LOG.info("Trying to load hfile=" + hfilePath + + " first=" + Bytes.toStringBinary(first) + + " last=" + Bytes.toStringBinary(last)); + if (first == null || last == null) { + assert first == null && last == null; + LOG.info("hfile " + hfilePath + " has no entries, skipping"); + return; + } + + // We use a '_' prefix which is ignored when walking directory trees + // above. 
+ final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp"); + + conn.getRegionServerWithRetries( + new ServerCallable(conn, table, first) { + @Override + public Void call() throws Exception { + LOG.debug("Going to connect to server " + location + + "for row " + Bytes.toStringBinary(row)); + HRegionInfo hri = location.getRegionInfo(); + if (!hri.containsRange(first, last)) { + LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + + "region. Splitting..."); + + HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family); + Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom"); + Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top"); + splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(), + botOut, topOut); + + // Add these back at the *front* of the queue, so there's a lower + // chance that the region will just split again before we get there. + queue.addFirst(new LoadQueueItem(item.family, botOut)); + queue.addFirst(new LoadQueueItem(item.family, topOut)); + LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); + return null; + } + + byte[] regionName = location.getRegionInfo().getRegionName(); + server.bulkLoadHFile(hfilePath.toString(), regionName, item.family); + return null; + } + }); + } + + /** + * Split a storefile into a top and bottom half, maintaining + * the metadata, recreating bloom filters, etc. + */ + static void splitStoreFile( + Configuration conf, Path inFile, + HColumnDescriptor familyDesc, byte[] splitKey, + Path bottomOut, Path topOut) throws IOException + { + // Open reader with no block cache, and not in-memory + Reference topReference = new Reference(splitKey, Range.top); + Reference bottomReference = new Reference(splitKey, Range.bottom); + + copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); + copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); + } + + /** + * Copy half of an HFile into a new HFile. 
+ */ + private static void copyHFileHalf( + Configuration conf, Path inFile, Path outFile, Reference reference, + HColumnDescriptor familyDescriptor) + throws IOException { + FileSystem fs = inFile.getFileSystem(conf); + HalfStoreFileReader halfReader = null; + HFile.Writer halfWriter = null; + try { + halfReader = new HalfStoreFileReader(fs, inFile, null, reference); + Map fileInfo = halfReader.loadFileInfo(); + + int blocksize = familyDescriptor.getBlocksize(); + Algorithm compression = familyDescriptor.getCompression(); + BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); + + halfWriter = new StoreFile.Writer( + fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR, + bloomFilterType, 0); + HFileScanner scanner = halfReader.getScanner(false, false); + scanner.seekTo(); + do { + KeyValue kv = scanner.getKeyValue(); + halfWriter.append(kv); + } while (scanner.next()); + + for (Map.Entry entry : fileInfo.entrySet()) { + if (shouldCopyHFileMetaKey(entry.getKey())) { + halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); + } + } + } finally { + if (halfWriter != null) halfWriter.close(); + if (halfReader != null) halfReader.close(); + } + } + + private static boolean shouldCopyHFileMetaKey(byte[] key) { + return !HFile.isReservedFileInfoKey(key); + } + + + @Override + public int run(String[] args) throws Exception { + if (args.length != 2) { + usage(); + return -1; + } + + Path hfofDir = new Path(args[0]); + HTable table = new HTable(args[1]); + + doBulkLoad(hfofDir, table); + return 0; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new LoadIncrementalHFiles(), args); + } + +} diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java new file mode 100644 index 0000000..5fb3e83 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -0,0 +1,66 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.util.List; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.Reducer; + +/** + * Emits sorted Puts. + * Reads in all Puts from passed Iterator, sorts them, then emits + * Puts in sorted order. If lots of columns per row, it will use lots of + * memory sorting. 
+ * @see HFileOutputFormat + * @see KeyValueSortReducer + */ +public class PutSortReducer extends + Reducer { + + @Override + protected void reduce( + ImmutableBytesWritable row, + java.lang.Iterable puts, + Reducer.Context context) + throws java.io.IOException, InterruptedException + { + TreeSet map = new TreeSet(KeyValue.COMPARATOR); + + for (Put p : puts) { + for (List kvs : p.getFamilyMap().values()) { + for (KeyValue kv : kvs) { + map.add(kv.clone()); + } + } + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (KeyValue kv : map) { + context.write(row, kv); + if (index > 0 && index % 100 == 0) + context.setStatus("Wrote " + index); + } + } +} diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java index af3d588..0cda76e 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Partitioner; @@ -44,15 +45,55 @@ import org.apache.hadoop.mapreduce.Partitioner; */ public class SimpleTotalOrderPartitioner extends Partitioner implements Configurable { - private final Log LOG = LogFactory.getLog(this.getClass()); + private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class); + + @Deprecated public static final String START = "hbase.simpletotalorder.start"; + @Deprecated public static final String END = "hbase.simpletotalorder.end"; + + static final String START_BASE64 = "hbase.simpletotalorder.start.base64"; + static final String END_BASE64 = "hbase.simpletotalorder.end.base64"; + private Configuration c; private byte [] startkey; private byte [] endkey; private byte [][] splits; private int lastReduces = -1; + public static void setStartKey(Configuration conf, byte[] startKey) { + conf.set(START_BASE64, Base64.encodeBytes(startKey)); + } + + public static void setEndKey(Configuration conf, byte[] endKey) { + conf.set(END_BASE64, Base64.encodeBytes(endKey)); + } + + @SuppressWarnings("deprecation") + static byte[] getStartKey(Configuration conf) { + return getKeyFromConf(conf, START_BASE64, START); + } + + @SuppressWarnings("deprecation") + static byte[] getEndKey(Configuration conf) { + return getKeyFromConf(conf, END_BASE64, END); + } + + private static byte[] getKeyFromConf(Configuration conf, + String base64Key, String deprecatedKey) { + String encoded = conf.get(base64Key); + if (encoded != null) { + return Base64.decode(encoded); + } + String oldStyleVal = conf.get(deprecatedKey); + if (oldStyleVal == null) { + return null; + } + LOG.warn("Using deprecated configuration " + deprecatedKey + + " - please use static accessor methods instead."); + return Bytes.toBytes(oldStyleVal); + } + @Override public int getPartition(final ImmutableBytesWritable key, final VALUE value, final int reduces) { @@ -87,10 +128,12 @@ implements Configurable { @Override public void setConf(Configuration conf) { this.c = conf; - String startStr = this.c.get(START); - String endStr = this.c.get(END); - LOG.info("startkey=" + startStr + ", endkey=" + endStr); - this.startkey = Bytes.toBytes(startStr); - 
this.endkey = Bytes.toBytes(endStr); + this.startkey = getStartKey(conf); + this.endkey = getEndKey(conf); + if (startkey == null || endkey == null) { + throw new RuntimeException(this.getClass() + " not configured"); + } + LOG.info("startkey=" + Bytes.toStringBinary(startkey) + + ", endkey=" + Bytes.toStringBinary(endkey)); } } \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index 07d7911..22a8c8d 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -49,6 +49,8 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.zookeeper.ZooKeeper; +import com.google.common.base.Function; + /** * Utility for {@link TableMapper} and {@link TableReducer} */ @@ -245,6 +247,7 @@ public class TableMapReduceUtil { try { addDependencyJars(job.getConfiguration(), ZooKeeper.class, + Function.class, // Guava collections job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getOutputKeyClass(), diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java new file mode 100644 index 0000000..13be81e --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java @@ -0,0 +1,413 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce.hadoopbackport; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Utility for collecting samples and writing a partition file for + * {@link TotalOrderPartitioner}. + * + * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner + * from Hadoop trunk at r910774, with the exception of replacing + * TaskAttemptContextImpl with TaskAttemptContext. + */ +public class InputSampler extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(InputSampler.class); + + static int printUsage() { + System.out.println("sampler -r \n" + + " [-inFormat ]\n" + + " [-keyClass ]\n" + + " [-splitRandom | " + + "// Sample from random splits at random (general)\n" + + " -splitSample | " + + " // Sample from first records in splits (random data)\n"+ + " -splitInterval ]" + + " // Sample from splits at intervals (sorted data)"); + System.out.println("Default sampler: -splitRandom 0.1 10000 10"); + ToolRunner.printGenericCommandUsage(System.out); + return -1; + } + + public InputSampler(Configuration conf) { + setConf(conf); + } + + /** + * Interface to sample using an + * {@link org.apache.hadoop.mapreduce.InputFormat}. + */ + public interface Sampler { + /** + * For a given job, collect and return a subset of the keys from the + * input data. + */ + K[] getSample(InputFormat inf, Job job) + throws IOException, InterruptedException; + } + + /** + * Samples the first n records from s splits. + * Inexpensive way to sample random data. + */ + public static class SplitSampler implements Sampler { + + private final int numSamples; + private final int maxSplitsSampled; + + /** + * Create a SplitSampler sampling all splits. + * Takes the first numSamples / numSplits records from each split. + * @param numSamples Total number of samples to obtain from all selected + * splits. + */ + public SplitSampler(int numSamples) { + this(numSamples, Integer.MAX_VALUE); + } + + /** + * Create a new SplitSampler. + * @param numSamples Total number of samples to obtain from all selected + * splits. + * @param maxSplitsSampled The maximum number of splits to examine. + */ + public SplitSampler(int numSamples, int maxSplitsSampled) { + this.numSamples = numSamples; + this.maxSplitsSampled = maxSplitsSampled; + } + + /** + * From each split sampled, take the first numSamples / numSplits records. 
+ */ + @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type + public K[] getSample(InputFormat inf, Job job) + throws IOException, InterruptedException { + List splits = inf.getSplits(job); + ArrayList samples = new ArrayList(numSamples); + int splitsToSample = Math.min(maxSplitsSampled, splits.size()); + int splitStep = splits.size() / splitsToSample; + int samplesPerSplit = numSamples / splitsToSample; + long records = 0; + for (int i = 0; i < splitsToSample; ++i) { + RecordReader reader = inf.createRecordReader( + splits.get(i * splitStep), + new TaskAttemptContext(job.getConfiguration(), + new TaskAttemptID())); + while (reader.nextKeyValue()) { + samples.add(reader.getCurrentKey()); + ++records; + if ((i+1) * samplesPerSplit <= records) { + break; + } + } + reader.close(); + } + return (K[])samples.toArray(); + } + } + + /** + * Sample from random points in the input. + * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from + * each split. + */ + public static class RandomSampler implements Sampler { + private double freq; + private final int numSamples; + private final int maxSplitsSampled; + + /** + * Create a new RandomSampler sampling all splits. + * This will read every split at the client, which is very expensive. + * @param freq Probability with which a key will be chosen. + * @param numSamples Total number of samples to obtain from all selected + * splits. + */ + public RandomSampler(double freq, int numSamples) { + this(freq, numSamples, Integer.MAX_VALUE); + } + + /** + * Create a new RandomSampler. + * @param freq Probability with which a key will be chosen. + * @param numSamples Total number of samples to obtain from all selected + * splits. + * @param maxSplitsSampled The maximum number of splits to examine. + */ + public RandomSampler(double freq, int numSamples, int maxSplitsSampled) { + this.freq = freq; + this.numSamples = numSamples; + this.maxSplitsSampled = maxSplitsSampled; + } + + /** + * Randomize the split order, then take the specified number of keys from + * each split sampled, where each key is selected with the specified + * probability and possibly replaced by a subsequently selected key when + * the quota of keys from that split is satisfied. 
+ */ + @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type + public K[] getSample(InputFormat inf, Job job) + throws IOException, InterruptedException { + List splits = inf.getSplits(job); + ArrayList samples = new ArrayList(numSamples); + int splitsToSample = Math.min(maxSplitsSampled, splits.size()); + + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + LOG.debug("seed: " + seed); + // shuffle splits + for (int i = 0; i < splits.size(); ++i) { + InputSplit tmp = splits.get(i); + int j = r.nextInt(splits.size()); + splits.set(i, splits.get(j)); + splits.set(j, tmp); + } + // our target rate is in terms of the maximum number of sample splits, + // but we accept the possibility of sampling additional splits to hit + // the target sample keyset + for (int i = 0; i < splitsToSample || + (i < splits.size() && samples.size() < numSamples); ++i) { + RecordReader reader = inf.createRecordReader(splits.get(i), + new TaskAttemptContext(job.getConfiguration(), + new TaskAttemptID())); + while (reader.nextKeyValue()) { + if (r.nextDouble() <= freq) { + if (samples.size() < numSamples) { + samples.add(reader.getCurrentKey()); + } else { + // When exceeding the maximum number of samples, replace a + // random element with this one, then adjust the frequency + // to reflect the possibility of existing elements being + // pushed out + int ind = r.nextInt(numSamples); + if (ind != numSamples) { + samples.set(ind, reader.getCurrentKey()); + } + freq *= (numSamples - 1) / (double) numSamples; + } + } + } + reader.close(); + } + return (K[])samples.toArray(); + } + } + + /** + * Sample from s splits at regular intervals. + * Useful for sorted data. + */ + public static class IntervalSampler implements Sampler { + private final double freq; + private final int maxSplitsSampled; + + /** + * Create a new IntervalSampler sampling all splits. + * @param freq The frequency with which records will be emitted. + */ + public IntervalSampler(double freq) { + this(freq, Integer.MAX_VALUE); + } + + /** + * Create a new IntervalSampler. + * @param freq The frequency with which records will be emitted. + * @param maxSplitsSampled The maximum number of splits to examine. + * @see #getSample + */ + public IntervalSampler(double freq, int maxSplitsSampled) { + this.freq = freq; + this.maxSplitsSampled = maxSplitsSampled; + } + + /** + * For each split sampled, emit when the ratio of the number of records + * retained to the total record count is less than the specified + * frequency. + */ + @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type + public K[] getSample(InputFormat inf, Job job) + throws IOException, InterruptedException { + List splits = inf.getSplits(job); + ArrayList samples = new ArrayList(); + int splitsToSample = Math.min(maxSplitsSampled, splits.size()); + int splitStep = splits.size() / splitsToSample; + long records = 0; + long kept = 0; + for (int i = 0; i < splitsToSample; ++i) { + RecordReader reader = inf.createRecordReader( + splits.get(i * splitStep), + new TaskAttemptContext(job.getConfiguration(), + new TaskAttemptID())); + while (reader.nextKeyValue()) { + ++records; + if ((double) kept / records < freq) { + ++kept; + samples.add(reader.getCurrentKey()); + } + } + reader.close(); + } + return (K[])samples.toArray(); + } + } + + /** + * Write a partition file for the given job, using the Sampler provided. 
+ * Queries the sampler for a sample keyset, sorts by the output key + * comparator, selects the keys for each rank, and writes to the destination + * returned from {@link TotalOrderPartitioner#getPartitionFile}. + */ + @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator + public static void writePartitionFile(Job job, Sampler sampler) + throws IOException, ClassNotFoundException, InterruptedException { + Configuration conf = job.getConfiguration(); + final InputFormat inf = + ReflectionUtils.newInstance(job.getInputFormatClass(), conf); + int numPartitions = job.getNumReduceTasks(); + K[] samples = sampler.getSample(inf, job); + LOG.info("Using " + samples.length + " samples"); + RawComparator comparator = + (RawComparator) job.getSortComparator(); + Arrays.sort(samples, comparator); + Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf)); + FileSystem fs = dst.getFileSystem(conf); + if (fs.exists(dst)) { + fs.delete(dst, false); + } + SequenceFile.Writer writer = SequenceFile.createWriter(fs, + conf, dst, job.getMapOutputKeyClass(), NullWritable.class); + NullWritable nullValue = NullWritable.get(); + float stepSize = samples.length / (float) numPartitions; + int last = -1; + for(int i = 1; i < numPartitions; ++i) { + int k = Math.round(stepSize * i); + while (last >= k && comparator.compare(samples[last], samples[k]) == 0) { + ++k; + } + writer.append(samples[k], nullValue); + last = k; + } + writer.close(); + } + + /** + * Driver for InputSampler from the command line. + * Configures a JobConf instance and calls {@link #writePartitionFile}. + */ + public int run(String[] args) throws Exception { + Job job = new Job(getConf()); + ArrayList otherArgs = new ArrayList(); + Sampler sampler = null; + for(int i=0; i < args.length; ++i) { + try { + if ("-r".equals(args[i])) { + job.setNumReduceTasks(Integer.parseInt(args[++i])); + } else if ("-inFormat".equals(args[i])) { + job.setInputFormatClass( + Class.forName(args[++i]).asSubclass(InputFormat.class)); + } else if ("-keyClass".equals(args[i])) { + job.setMapOutputKeyClass( + Class.forName(args[++i]).asSubclass(WritableComparable.class)); + } else if ("-splitSample".equals(args[i])) { + int numSamples = Integer.parseInt(args[++i]); + int maxSplits = Integer.parseInt(args[++i]); + if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; + sampler = new SplitSampler(numSamples, maxSplits); + } else if ("-splitRandom".equals(args[i])) { + double pcnt = Double.parseDouble(args[++i]); + int numSamples = Integer.parseInt(args[++i]); + int maxSplits = Integer.parseInt(args[++i]); + if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; + sampler = new RandomSampler(pcnt, numSamples, maxSplits); + } else if ("-splitInterval".equals(args[i])) { + double pcnt = Double.parseDouble(args[++i]); + int maxSplits = Integer.parseInt(args[++i]); + if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; + sampler = new IntervalSampler(pcnt, maxSplits); + } else { + otherArgs.add(args[i]); + } + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + args[i]); + return printUsage(); + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i-1]); + return printUsage(); + } + } + if (job.getNumReduceTasks() <= 1) { + System.err.println("Sampler requires more than one reducer"); + return printUsage(); + } + if (otherArgs.size() < 2) { + System.out.println("ERROR: Wrong number of parameters: "); + return printUsage(); + } + if (null == 
sampler) { + sampler = new RandomSampler(0.1, 10000, 10); + } + + Path outf = new Path(otherArgs.remove(otherArgs.size() - 1)); + TotalOrderPartitioner.setPartitionFile(getConf(), outf); + for (String s : otherArgs) { + FileInputFormat.addInputPath(job, new Path(s)); + } + InputSampler.writePartitionFile(job, sampler); + + return 0; + } + + public static void main(String[] args) throws Exception { + InputSampler sampler = new InputSampler(new Configuration()); + int res = ToolRunner.run(sampler, args); + System.exit(res); + } +} diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java new file mode 100644 index 0000000..065e844 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce.hadoopbackport; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Partitioner effecting a total order by reading split points from + * an externally generated source. + * + * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner + * from Hadoop trunk at r910774. + */ +public class TotalOrderPartitioner,V> + extends Partitioner implements Configurable { + + private Node partitions; + public static final String DEFAULT_PATH = "_partition.lst"; + public static final String PARTITIONER_PATH = + "mapreduce.totalorderpartitioner.path"; + public static final String MAX_TRIE_DEPTH = + "mapreduce.totalorderpartitioner.trie.maxdepth"; + public static final String NATURAL_ORDER = + "mapreduce.totalorderpartitioner.naturalorder"; + Configuration conf; + + public TotalOrderPartitioner() { } + + /** + * Read in the partition file and build indexing data structures. + * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and + * total.order.partitioner.natural.order is not false, a trie + * of the first total.order.partitioner.max.trie.depth(2) + 1 bytes + * will be built. 
Otherwise, keys will be located using a binary search of + * the partition keyset using the {@link org.apache.hadoop.io.RawComparator} + * defined for this job. The input file must be sorted with the same + * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys. + */ + @SuppressWarnings("unchecked") // keytype from conf not static + public void setConf(Configuration conf) { + try { + this.conf = conf; + String parts = getPartitionFile(conf); + final Path partFile = new Path(parts); + final FileSystem fs = (DEFAULT_PATH.equals(parts)) + ? FileSystem.getLocal(conf) // assume in DistributedCache + : partFile.getFileSystem(conf); + + Job job = new Job(conf); + Class keyClass = (Class)job.getMapOutputKeyClass(); + K[] splitPoints = readPartitions(fs, partFile, keyClass, conf); + if (splitPoints.length != job.getNumReduceTasks() - 1) { + throw new IOException("Wrong number of partitions in keyset:" + + splitPoints.length); + } + RawComparator comparator = + (RawComparator) job.getSortComparator(); + for (int i = 0; i < splitPoints.length - 1; ++i) { + if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) { + throw new IOException("Split points are out of order"); + } + } + boolean natOrder = + conf.getBoolean(NATURAL_ORDER, true); + if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) { + partitions = buildTrie((BinaryComparable[])splitPoints, 0, + splitPoints.length, new byte[0], + // Now that blocks of identical splitless trie nodes are + // represented reentrantly, and we develop a leaf for any trie + // node with only one split point, the only reason for a depth + // limit is to refute stack overflow or bloat in the pathological + // case where the split points are long and mostly look like bytes + // iii...iixii...iii . Therefore, we make the default depth + // limit large but not huge. + conf.getInt(MAX_TRIE_DEPTH, 200)); + } else { + partitions = new BinarySearchNode(splitPoints, comparator); + } + } catch (IOException e) { + throw new IllegalArgumentException("Can't read partitions file", e); + } + } + + public Configuration getConf() { + return conf; + } + + // by construction, we know if our keytype + @SuppressWarnings("unchecked") // is memcmp-able and uses the trie + public int getPartition(K key, V value, int numPartitions) { + return partitions.findPartition(key); + } + + /** + * Set the path to the SequenceFile storing the sorted partition keyset. + * It must be the case that for R reduces, there are R-1 + * keys in the SequenceFile. + */ + public static void setPartitionFile(Configuration conf, Path p) { + conf.set(PARTITIONER_PATH, p.toString()); + } + + /** + * Get the path to the SequenceFile storing the sorted partition keyset. + * @see #setPartitionFile(Configuration, Path) + */ + public static String getPartitionFile(Configuration conf) { + return conf.get(PARTITIONER_PATH, DEFAULT_PATH); + } + + /** + * Interface to the partitioner to locate a key in the partition keyset. + */ + interface Node { + /** + * Locate partition in keyset K, st [Ki..Ki+1) defines a partition, + * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1. + */ + int findPartition(T key); + } + + /** + * Base class for trie nodes. If the keytype is memcomp-able, this builds + * tries of the first total.order.partitioner.max.trie.depth + * bytes. 
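The partitioner above is driven entirely by the split points stored in the file named by mapreduce.totalorderpartitioner.path: with split points {"b", "d"} and three reducers, keys below "b" go to reducer 0, keys in ["b", "d") to reducer 1, and everything else to reducer 2. The sketch below shows one plausible way to wire the backported sampler and partitioner into a job; it is illustrative rather than part of this patch, the paths, key type and sampling parameters are made up, and it assumes the Sampler and RandomSampler types are nested in the backported InputSampler as they are in stock Hadoop.

    // Illustrative wiring of the backported classes -- not part of this patch.
    // Assumes the input is SequenceFiles keyed by Text, so the sampled input
    // keys have the same type as the map output keys.
    Configuration conf = new Configuration();
    Job job = new Job(conf, "total-order-sort");
    job.setNumReduceTasks(3);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setPartitionerClass(TotalOrderPartitioner.class);
    FileInputFormat.addInputPath(job, new Path("/input"));

    // Sampler (writer) and partitioner (reader) find the split-point
    // SequenceFile through the same configuration key.
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
        new Path("/tmp/_partitions.lst"));

    // Sample roughly 0.1% of the input records, capped at 10000 samples drawn
    // from at most 10 splits, then write numReduceTasks - 1 sorted split points.
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<Text, Text>(0.001, 10000, 10);
    InputSampler.writePartitionFile(job, sampler);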
+ */ + static abstract class TrieNode implements Node { + private final int level; + TrieNode(int level) { + this.level = level; + } + int getLevel() { + return level; + } + } + + /** + * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or + * where disabled by total.order.partitioner.natural.order, + * search the partition keyset with a binary search. + */ + class BinarySearchNode implements Node { + private final K[] splitPoints; + private final RawComparator comparator; + BinarySearchNode(K[] splitPoints, RawComparator comparator) { + this.splitPoints = splitPoints; + this.comparator = comparator; + } + public int findPartition(K key) { + final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1; + return (pos < 0) ? -pos : pos; + } + } + + /** + * An inner trie node that contains 256 children based on the next + * character. + */ + class InnerTrieNode extends TrieNode { + private TrieNode[] child = new TrieNode[256]; + + InnerTrieNode(int level) { + super(level); + } + public int findPartition(BinaryComparable key) { + int level = getLevel(); + if (key.getLength() <= level) { + return child[0].findPartition(key); + } + return child[0xFF & key.getBytes()[level]].findPartition(key); + } + } + + /** + * @param level the tree depth at this node + * @param splitPoints the full split point vector, which holds + * the split point or points this leaf node + * should contain + * @param lower first INcluded element of splitPoints + * @param upper first EXcluded element of splitPoints + * @return a leaf node. They come in three kinds: no split points + * [and the findParttion returns a canned index], one split + * point [and we compare with a single comparand], or more + * than one [and we do a binary search]. The last case is + * rare. + */ + private TrieNode LeafTrieNodeFactory + (int level, BinaryComparable[] splitPoints, int lower, int upper) { + switch (upper - lower) { + case 0: + return new UnsplitTrieNode(level, lower); + + case 1: + return new SinglySplitTrieNode(level, splitPoints, lower); + + default: + return new LeafTrieNode(level, splitPoints, lower, upper); + } + } + + /** + * A leaf trie node that scans for the key between lower..upper. + * + * We don't generate many of these now, since we usually continue trie-ing + * when more than one split point remains at this level. and we make different + * objects for nodes with 0 or 1 split point. + */ + private class LeafTrieNode extends TrieNode { + final int lower; + final int upper; + final BinaryComparable[] splitPoints; + LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) { + super(level); + this.lower = lower; + this.upper = upper; + this.splitPoints = splitPoints; + } + public int findPartition(BinaryComparable key) { + final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1; + return (pos < 0) ? -pos : pos; + } + } + + private class UnsplitTrieNode extends TrieNode { + final int result; + + UnsplitTrieNode(int level, int value) { + super(level); + this.result = value; + } + + public int findPartition(BinaryComparable key) { + return result; + } + } + + private class SinglySplitTrieNode extends TrieNode { + final int lower; + final BinaryComparable mySplitPoint; + + SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) { + super(level); + this.lower = lower; + this.mySplitPoint = splitPoints[lower]; + } + + public int findPartition(BinaryComparable key) { + return lower + (key.compareTo(mySplitPoint) < 0 ? 
0 : 1); + } + } + + + /** + * Read the cut points from the given IFile. + * @param fs The file system + * @param p The path to read + * @param keyClass The map output key class + * @param job The job config + * @throws IOException + */ + // matching key types enforced by passing in + @SuppressWarnings("unchecked") // map output key class + private K[] readPartitions(FileSystem fs, Path p, Class keyClass, + Configuration conf) throws IOException { + SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf); + ArrayList parts = new ArrayList(); + K key = ReflectionUtils.newInstance(keyClass, conf); + NullWritable value = NullWritable.get(); + while (reader.next(key, value)) { + parts.add(key); + key = ReflectionUtils.newInstance(keyClass, conf); + } + reader.close(); + return parts.toArray((K[])Array.newInstance(keyClass, parts.size())); + } + + /** + * + * This object contains a TrieNodeRef if there is such a thing that + * can be repeated. Two adjacent trie node slots that contain no + * split points can be filled with the same trie node, even if they + * are not on the same level. See buildTreeRec, below. + * + */ + private class CarriedTrieNodeRef + { + TrieNode content; + + CarriedTrieNodeRef() { + content = null; + } + } + + + /** + * Given a sorted set of cut points, build a trie that will find the correct + * partition quickly. + * @param splits the list of cut points + * @param lower the lower bound of partitions 0..numPartitions-1 + * @param upper the upper bound of partitions 0..numPartitions-1 + * @param prefix the prefix that we have already checked against + * @param maxDepth the maximum depth we will build a trie for + * @return the trie node that will divide the splits correctly + */ + private TrieNode buildTrie(BinaryComparable[] splits, int lower, + int upper, byte[] prefix, int maxDepth) { + return buildTrieRec + (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef()); + } + + /** + * This is the core of buildTrie. The interface, and stub, above, just adds + * an empty CarriedTrieNodeRef. + * + * We build trie nodes in depth first order, which is also in key space + * order. Every leaf node is referenced as a slot in a parent internal + * node. If two adjacent slots [in the DFO] hold leaf nodes that have + * no split point, then they are not separated by a split point either, + * because there's no place in key space for that split point to exist. + * + * When that happens, the leaf nodes would be semantically identical, and + * we reuse the object. A single CarriedTrieNodeRef "ref" lives for the + * duration of the tree-walk. ref carries a potentially reusable, unsplit + * leaf node for such reuse until a leaf node with a split arises, which + * breaks the chain until we need to make a new unsplit leaf node. + * + * Note that this use of CarriedTrieNodeRef means that for internal nodes, + * for internal nodes if this code is modified in any way we still need + * to make or fill in the subnodes in key space order. + */ + private TrieNode buildTrieRec(BinaryComparable[] splits, int lower, + int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) { + final int depth = prefix.length; + // We generate leaves for a single split point as well as for + // no split points. + if (depth >= maxDepth || lower >= upper - 1) { + // If we have two consecutive requests for an unsplit trie node, we + // can deliver the same one the second time. 
+ if (lower == upper && ref.content != null) { + return ref.content; + } + TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper); + ref.content = lower == upper ? result : null; + return result; + } + InnerTrieNode result = new InnerTrieNode(depth); + byte[] trial = Arrays.copyOf(prefix, prefix.length + 1); + // append an extra byte on to the prefix + int currentBound = lower; + for(int ch = 0; ch < 0xFF; ++ch) { + trial[depth] = (byte) (ch + 1); + lower = currentBound; + while (currentBound < upper) { + if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) { + break; + } + currentBound += 1; + } + trial[depth] = (byte) ch; + result.child[0xFF & ch] + = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref); + } + // pick up the rest + trial[depth] = (byte)0xFF; + result.child[0xFF] + = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref); + + return result; + } +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2413e98..73881f2 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -378,11 +379,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * @return True if this region has references. */ boolean hasReferences() { - for (Map.Entry e: this.stores.entrySet()) { - for (Map.Entry ee: - e.getValue().getStorefiles().entrySet()) { + for (Store store : this.stores.values()) { + for (StoreFile sf : store.getStorefiles()) { // Found a reference, return. 
- if (ee.getValue().isReference()) return true; + if (sf.isReference()) return true; } } return false; @@ -1883,6 +1883,23 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } } + + public void bulkLoadHFile(String hfilePath, byte[] familyName) + throws IOException { + splitsAndClosesLock.readLock().lock(); + try { + Store store = getStore(familyName); + if (store == null) { + throw new DoNotRetryIOException( + "No such column family " + Bytes.toStringBinary(familyName)); + } + store.bulkLoadHFile(hfilePath); + } finally { + splitsAndClosesLock.readLock().unlock(); + } + + } + @Override public boolean equals(Object o) { diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 78f3223..97e9ed1 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1979,6 +1979,14 @@ public class HRegionServer implements HConstants, HRegionInterface, } } + @Override + public void bulkLoadHFile( + String hfilePath, byte[] regionName, byte[] familyName) + throws IOException { + HRegion region = getRegion(regionName); + region.bulkLoadHFile(hfilePath, familyName); + } + Map rowlocks = new ConcurrentHashMap(); @@ -2428,4 +2436,5 @@ public class HRegionServer implements HConstants, HRegionInterface, HRegionServer.class); doMain(args, regionServerClass); } + } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 515b42f..c9664ea 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -47,11 +48,15 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + import java.io.EOFException; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,6 +64,7 @@ import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Set; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -115,13 +121,11 @@ public class Store implements HConstants, HeapSize { private final boolean inMemory; /* - * Sorted Map of readers keyed by maximum edit sequence id (Most recent should - * be last in in list). ConcurrentSkipListMap is lazily consistent so no - * need to lock it down when iterating; iterator view is that of when the - * iterator was taken out. + * List of store files inside this store. This is an immutable list that + * is atomically replaced when its contents change. 
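The rest of the Store changes in this patch maintain that invariant with a copy-on-write idiom under the store's write lock; distilled into a sketch (the new-file variable is illustrative), it looks like this:

    // Copy-on-write update as used below by updateStorefiles() and
    // bulkLoadHFile(): build a fresh list, then swap it in as one reference.
    this.lock.writeLock().lock();
    try {
      List<StoreFile> updated = new ArrayList<StoreFile>(this.storefiles);
      updated.add(newStoreFile);                // hypothetical freshly created file
      this.storefiles = ImmutableList.copyOf(updated);
    } finally {
      this.lock.writeLock().unlock();
    }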
*/ - private final NavigableMap storefiles = - new ConcurrentSkipListMap(); + private ImmutableList storefiles = null; + // All access must be synchronized. private final CopyOnWriteArraySet changedReaderObservers = @@ -222,7 +226,7 @@ public class Store implements HConstants, HeapSize { this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); // loadStoreFiles calculates this.maxSeqId. as side-effect. - this.storefiles.putAll(loadStoreFiles()); + this.storefiles = ImmutableList.copyOf(loadStoreFiles()); this.maxSeqIdBeforeLogRecovery = this.maxSeqId; @@ -395,9 +399,9 @@ public class Store implements HConstants, HeapSize { * Creates a series of StoreFile loaded from the given directory. * @throws IOException */ - private Map loadStoreFiles() + private List loadStoreFiles() throws IOException { - Map results = new HashMap(); + ArrayList results = new ArrayList(); FileStatus files[] = this.fs.listStatus(this.homedir); for (int i = 0; files != null && i < files.length; i++) { // Skip directories. @@ -422,20 +426,15 @@ public class Store implements HConstants, HeapSize { "Verify!", ioe); continue; } - long storeSeqId = curfile.getMaxSequenceId(); - if (storeSeqId > this.maxSeqId) { - this.maxSeqId = storeSeqId; - } long length = curfile.getReader().length(); this.storeSize += length; if (LOG.isDebugEnabled()) { - LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" + - curfile.isReference() + ", sequence id=" + storeSeqId + - ", length=" + length + ", majorCompaction=" + - curfile.isMajorCompaction()); + LOG.debug("loaded " + curfile.toStringDetailed()); } - results.put(Long.valueOf(storeSeqId), curfile); + results.add(curfile); } + maxSeqId = StoreFile.getMaxSequenceIdInList(results); + Collections.sort(results, StoreFile.Comparators.FLUSH_TIME); return results; } @@ -472,10 +471,77 @@ public class Store implements HConstants, HeapSize { /** * @return All store files. 
*/ - NavigableMap getStorefiles() { + List getStorefiles() { return this.storefiles; } + public void bulkLoadHFile(String srcPathStr) throws IOException { + Path srcPath = new Path(srcPathStr); + + HFile.Reader reader = null; + try { + LOG.info("Validating hfile at " + srcPath + " for inclusion in " + + "store " + this + " region " + this.region); + reader = new HFile.Reader(srcPath.getFileSystem(conf), + srcPath, null, false); + reader.loadFileInfo(); + + byte[] firstKey = reader.getFirstRowKey(); + byte[] lastKey = reader.getLastRowKey(); + + LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) + + " last=" + Bytes.toStringBinary(lastKey)); + LOG.debug("Region bounds: first=" + + Bytes.toStringBinary(region.getStartKey()) + + " last=" + Bytes.toStringBinary(region.getEndKey())); + + HRegionInfo hri = region.getRegionInfo(); + if (!hri.containsRange(firstKey, lastKey)) { + throw new WrongRegionException( + "Bulk load file " + srcPathStr + " does not fit inside region " + + this.region); + } + } finally { + if (reader != null) reader.close(); + } + + // Move the file if it's on another filesystem + FileSystem srcFs = srcPath.getFileSystem(conf); + if (!srcFs.equals(fs)) { + LOG.info("File " + srcPath + " on different filesystem than " + + "destination store - moving to this filesystem."); + Path tmpDir = new Path(homedir, "_tmp"); + Path tmpPath = StoreFile.getRandomFilename(fs, tmpDir); + FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf); + LOG.info("Copied to temporary path on dst filesystem: " + tmpPath); + srcPath = tmpPath; + } + + Path dstPath = StoreFile.getRandomFilename(fs, homedir); + LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath); + StoreFile.rename(fs, srcPath, dstPath); + + StoreFile sf = new StoreFile(fs, dstPath, blockcache, + this.conf, this.family.getBloomFilterType(), this.inMemory); + sf.createReader(); + + LOG.info("Moved hfile " + srcPath + " into store directory " + + homedir + " - updating store file list."); + + // Append the new storefile into the list + this.lock.writeLock().lock(); + try { + ArrayList newFiles = new ArrayList(storefiles); + newFiles.add(sf); + this.storefiles = ImmutableList.copyOf(newFiles); + notifyChangedReadersObservers(); + } finally { + this.lock.writeLock().unlock(); + } + LOG.info("Successfully loaded store file " + srcPath + + " into store " + this + " (new location: " + dstPath + ")"); + } + /** * Close all the readers * @@ -484,13 +550,14 @@ public class Store implements HConstants, HeapSize { * * @throws IOException */ - List close() throws IOException { + ImmutableList close() throws IOException { this.lock.writeLock().lock(); try { - ArrayList result = - new ArrayList(storefiles.values()); + ImmutableList result = storefiles; + // Clear so metrics doesn't find them. - this.storefiles.clear(); + storefiles = ImmutableList.of(); + for (StoreFile f: result) { f.closeReader(); } @@ -591,19 +658,19 @@ public class Store implements HConstants, HeapSize { /* * Change storefiles adding into place the Reader produced by this new flush. - * @param logCacheFlushId * @param sf * @param set That was used to make the passed file p. * @throws IOException * @return Whether compaction is required. 
*/ - private boolean updateStorefiles(final long logCacheFlushId, - final StoreFile sf, final SortedSet set) + private boolean updateStorefiles(final StoreFile sf, + final SortedSet set) throws IOException { this.lock.writeLock().lock(); try { - this.storefiles.put(Long.valueOf(logCacheFlushId), sf); - + ArrayList newList = new ArrayList(storefiles); + newList.add(sf); + storefiles = ImmutableList.copyOf(newList); this.memstore.clearSnapshot(set); // Tell listeners of the change in readers. @@ -670,15 +737,14 @@ public class Store implements HConstants, HeapSize { boolean majorcompaction = mc; synchronized (compactLock) { // filesToCompact are sorted oldest to newest. - List filesToCompact = - new ArrayList(this.storefiles.values()); + List filesToCompact = this.storefiles; if (filesToCompact.isEmpty()) { LOG.debug(this.storeNameStr + ": no store files to compact"); return null; } // Max-sequenceID is the last key of the storefiles TreeMap - long maxId = this.storefiles.lastKey().longValue(); + long maxId = StoreFile.getMaxSequenceIdInList(storefiles); // Check to see if we need to do a major compaction on this region. // If so, change doMajorCompaction to true to skip the incremental @@ -819,10 +885,7 @@ public class Store implements HConstants, HeapSize { * @return True if we should run a major compaction. */ boolean isMajorCompaction() throws IOException { - List filesToCompact = null; - // filesToCompact are sorted oldest to newest. - filesToCompact = new ArrayList(this.storefiles.values()); - return isMajorCompaction(filesToCompact); + return isMajorCompaction(storefiles); } /* @@ -990,16 +1053,18 @@ public class Store implements HConstants, HeapSize { // delete old store files until we have sent out notification of // change in case old files are still being accessed by outstanding // scanners. - for (Map.Entry e: this.storefiles.entrySet()) { - if (compactedFiles.contains(e.getValue())) { - this.storefiles.remove(e.getKey()); + ArrayList newStoreFiles = new ArrayList(); + for (StoreFile sf : storefiles) { + if (!compactedFiles.contains(sf)) { + newStoreFiles.add(sf); } } + // If a StoreFile result, move it into place. May be null. if (result != null) { - Long orderVal = Long.valueOf(result.getMaxSequenceId()); - this.storefiles.put(orderVal, result); + newStoreFiles.add(result); } + this.storefiles = ImmutableList.copyOf(newStoreFiles); // WARN ugly hack here, but necessary sadly. // TODO why is this necessary? need a comment here if it's unintuitive! @@ -1020,7 +1085,7 @@ public class Store implements HConstants, HeapSize { } // 4. Compute new store size this.storeSize = 0L; - for (StoreFile hsf : this.storefiles.values()) { + for (StoreFile hsf : this.storefiles) { Reader r = hsf.getReader(); if (r == null) { LOG.warn("StoreFile " + hsf + " has a null Reader"); @@ -1094,10 +1159,9 @@ public class Store implements HConstants, HeapSize { this.memstore.getRowKeyAtOrBefore(state); // Check if match, if we got a candidate on the asked for 'kv' row. // Process each store file. Run through from newest to oldest. - Map m = this.storefiles.descendingMap(); - for (Map.Entry e : m.entrySet()) { + for (StoreFile sf : Iterables.reverse(storefiles)) { // Update the candidate keys from the current map file - rowAtOrBeforeFromStoreFile(e.getValue(), state); + rowAtOrBeforeFromStoreFile(sf, state); } return state.getCandidate(); } finally { @@ -1232,9 +1296,8 @@ public class Store implements HConstants, HeapSize { // Not splitable if we find a reference store file present in the store. 
boolean splitable = true; long maxSize = 0L; - Long mapIndex = Long.valueOf(0L); - for (Map.Entry e: storefiles.entrySet()) { - StoreFile sf = e.getValue(); + StoreFile largestSf = null; + for (StoreFile sf : storefiles) { if (splitable) { splitable = !sf.isReference(); if (!splitable) { @@ -1254,13 +1317,12 @@ public class Store implements HConstants, HeapSize { if (size > maxSize) { // This is the largest one so far maxSize = size; - mapIndex = e.getKey(); + largestSf = sf; } } - StoreFile sf = this.storefiles.get(mapIndex); - HFile.Reader r = sf.getReader(); + HFile.Reader r = largestSf.getReader(); if (r == null) { - LOG.warn("Storefile " + sf + " Reader is null"); + LOG.warn("Storefile " + largestSf + " Reader is null"); return null; } // Get first, last, and mid keys. Midkey is the key that starts block @@ -1333,7 +1395,7 @@ public class Store implements HConstants, HeapSize { */ long getStorefilesSize() { long size = 0; - for (StoreFile s: storefiles.values()) { + for (StoreFile s: storefiles) { Reader r = s.getReader(); if (r == null) { LOG.warn("StoreFile " + s + " has a null Reader"); @@ -1349,7 +1411,7 @@ public class Store implements HConstants, HeapSize { */ long getStorefilesIndexSize() { long size = 0; - for (StoreFile s: storefiles.values()) { + for (StoreFile s: storefiles) { Reader r = s.getReader(); if (r == null) { LOG.warn("StoreFile " + s + " has a null Reader"); @@ -1449,7 +1511,7 @@ public class Store implements HConstants, HeapSize { // Get storefiles for this store List storefileScanners = new ArrayList(); - for (StoreFile sf : this.storefiles.descendingMap().values()) { + for (StoreFile sf : Iterables.reverse(this.storefiles)) { HFile.Reader r = sf.getReader(); if (r == null) { LOG.warn("StoreFile " + sf + " has a null Reader"); @@ -1565,7 +1627,7 @@ public class Store implements HConstants, HeapSize { } // Add new file to store files. Clear snapshot too while we have // the Store write lock. 
- return Store.this.updateStorefiles(cacheFlushId, storeFile, snapshot); + return Store.this.updateStorefiles(storeFile, snapshot); } } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 80bf09a..95a0b24 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; -import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.BlockCache; @@ -42,14 +41,20 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Hash; import org.apache.hadoop.util.StringUtils; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; + import java.io.FileNotFoundException; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; import java.nio.ByteBuffer; -import java.text.DecimalFormat; import java.text.NumberFormat; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.SortedSet; import java.util.Random; @@ -99,15 +104,28 @@ public class StoreFile implements HConstants { /** Constant for major compaction meta */ public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY"); + // If true, this file was product of a major compaction. Its then set // whenever you get a Reader. private AtomicBoolean majorCompaction = null; + /** Meta key set when store file is a result of a bulk load */ + public static final byte[] BULKLOAD_TASK_KEY = + Bytes.toBytes("BULKLOAD_SOURCE_TASK"); + public static final byte[] BULKLOAD_TIME_KEY = + Bytes.toBytes("BULKLOAD_TIMESTAMP"); + + static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META"; static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA"; static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE"); + /** + * Map of the metadata entries in the corresponding HFile + */ + private Map metadataMap; + /* * Regex that will work for straight filenames and for reference names. * If reference, then the regex has more than just one group. Group 1 is @@ -123,6 +141,7 @@ public class StoreFile implements HConstants { private final Configuration conf; private final BloomType bloomType; + /** * Constructor, loads a reader and it's indices, etc. May allocate a * substantial amount of ram depending on the underlying files (10-20MB?). @@ -183,7 +202,8 @@ public class StoreFile implements HConstants { * @return True if the path has format of a HStoreFile reference. */ public static boolean isReference(final Path p) { - return isReference(p, REF_NAME_PARSER.matcher(p.getName())); + return !p.getName().startsWith("_") && + isReference(p, REF_NAME_PARSER.matcher(p.getName())); } /** @@ -244,6 +264,38 @@ public class StoreFile implements HConstants { } return this.sequenceid; } + + /** + * Return the highest sequence ID found across all storefiles in + * the given list. 
Store files that were created by a mapreduce + * bulk load are ignored, as they do not correspond to any edit + * log items. + * @return 0 if no non-bulk-load files are provided + */ + public static long getMaxSequenceIdInList(List sfs) { + long max = 0; + for (StoreFile sf : sfs) { + if (!sf.isBulkLoadResult()) { + max = Math.max(max, sf.getMaxSequenceId()); + } + } + return max; + } + + /** + * @return true if this storefile was created by HFileOutputFormat + * for a bulk load. + */ + boolean isBulkLoadResult() { + return metadataMap.containsKey(BULKLOAD_TIME_KEY); + } + + /** + * Return the timestamp at which this bulk load file was generated. + */ + public long getBulkLoadTimestamp() { + return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY)); + } /** * Returns the block cache or null in case none should be used. @@ -297,9 +349,9 @@ public class StoreFile implements HConstants { this.inMemory); } // Load up indices and fileinfo. - Map map = this.reader.loadFileInfo(); + metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo()); // Read in our metadata. - byte [] b = map.get(MAX_SEQ_ID_KEY); + byte [] b = metadataMap.get(MAX_SEQ_ID_KEY); if (b != null) { // By convention, if halfhfile, top half has a sequence number > bottom // half. Thats why we add one in below. Its done for case the two halves @@ -314,7 +366,7 @@ public class StoreFile implements HConstants { } } - b = map.get(MAJOR_COMPACTION_KEY); + b = metadataMap.get(MAJOR_COMPACTION_KEY); if (b != null) { boolean mc = Bytes.toBoolean(b); if (this.majorCompaction == null) { @@ -371,10 +423,28 @@ public class StoreFile implements HConstants { } @Override - public String toString() { + public String toString() { return this.path.toString() + (isReference()? "-" + this.referencePath + "-" + reference.toString(): ""); } + + /** + * @return a length description of this StoreFile, suitable for debug output + */ + public String toStringDetailed() { + StringBuilder sb = new StringBuilder(); + sb.append(this.path.toString()); + sb.append(", isReference=").append(isReference()); + sb.append(", isBulkLoadResult=").append(isBulkLoadResult()); + if (isBulkLoadResult()) { + sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp()); + } else { + sb.append(", seqid=").append(getMaxSequenceId()); + } + sb.append(", majorCompaction=").append(isMajorCompaction()); + + return sb.toString(); + } /** * Utility to help with rename. @@ -813,4 +883,44 @@ public class StoreFile implements HConstants { } } + + /** + * Useful comparators for comparing StoreFiles. + */ + abstract static class Comparators { + /** + * Comparator that compares based on the flush time of + * the StoreFiles. All bulk loads are placed before all non- + * bulk loads, and then all files are sorted by sequence ID. + * If there are ties, the path name is used as a tie-breaker. 
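A concrete reading of that ordering, with made-up file names: every bulk-loaded file sorts ahead of every flushed file, bulk-loaded files sort among themselves by bulk-load timestamp, and flushed files sort among themselves by sequence id.

    // Worked example of Comparators.FLUSH_TIME (names and numbers illustrative):
    //   bulk_A   bulk load, timestamp 5    -> sort key (5,              -1, "bulk_A")
    //   bulk_B   bulk load, timestamp 9    -> sort key (9,              -1, "bulk_B")
    //   flush_1  flush,     sequence id 3  -> sort key (Long.MAX_VALUE,  3, "flush_1")
    //   flush_2  flush,     sequence id 7  -> sort key (Long.MAX_VALUE,  7, "flush_2")
    // Sorting therefore yields [bulk_A, bulk_B, flush_1, flush_2], which is
    // what Store.loadStoreFiles() now does:
    Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);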
+ */ + static final Comparator FLUSH_TIME = + Ordering.compound(ImmutableList.of( + Ordering.natural().onResultOf(new GetBulkTime()), + Ordering.natural().onResultOf(new GetSeqId()), + Ordering.natural().onResultOf(new GetPathName()) + )); + + private static class GetBulkTime implements Function { + @Override + public Long apply(StoreFile sf) { + if (!sf.isBulkLoadResult()) return Long.MAX_VALUE; + return sf.getBulkLoadTimestamp(); + } + } + private static class GetSeqId implements Function { + @Override + public Long apply(StoreFile sf) { + if (sf.isBulkLoadResult()) return -1L; + return sf.getMaxSequenceId(); + } + } + private static class GetPathName implements Function { + @Override + public String apply(StoreFile sf) { + return sf.getPath().getName(); + } + } + + } } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index c8941f1..1dd85e7 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.client.Scan; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.NavigableSet; /** @@ -126,9 +125,12 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb */ private List getScanners() throws IOException { // First the store file scanners - Map map = this.store.getStorefiles().descendingMap(); + + // TODO this used to get the store files in descending order, + // but now we get them in ascending order, which I think is + // actually more correct, since memstore get put at the end. List sfScanners = StoreFileScanner - .getScannersForStoreFiles(map.values(), cacheBlocks, isGet); + .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet); List scanners = new ArrayList(sfScanners.size()+1); scanners.addAll(sfScanners); @@ -143,9 +145,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb private List getScanners(Scan scan, final NavigableSet columns) throws IOException { // First the store file scanners - Map map = this.store.getStorefiles().descendingMap(); List sfScanners = StoreFileScanner - .getScannersForStoreFiles(map.values(), cacheBlocks, isGet); + .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet); List scanners = new ArrayList(sfScanners.size()+1); diff --git src/main/java/org/apache/hadoop/hbase/util/Bytes.java src/main/java/org/apache/hadoop/hbase/util/Bytes.java index a53dafe..bed859f 100644 --- src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -34,6 +34,7 @@ import java.io.UnsupportedEncodingException; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.Comparator; +import java.util.Iterator; /** * Utility class that handles byte arrays, conversions to/from other types, @@ -890,6 +891,16 @@ public class Bytes { } /** + * Return true if the byte array on the right is a prefix of the byte + * array on the left. + */ + public static boolean startsWith(byte[] bytes, byte[] prefix) { + return bytes != null && prefix != null && + bytes.length >= prefix.length && + compareTo(bytes, 0, prefix.length, prefix, 0, prefix.length) == 0; + } + + /** * @param b bytes to hash * @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the * passed in array. 
This method is what {@link org.apache.hadoop.io.Text} and @@ -1016,6 +1027,22 @@ public class Bytes { * @return Array of dividing values */ public static byte [][] split(final byte [] a, final byte [] b, final int num) { + byte[][] ret = new byte[num+2][]; + int i = 0; + Iterable iter = iterateOnSplits(a, b, num); + if (iter == null) return null; + for (byte[] elem : iter) { + ret[i++] = elem; + } + return ret; + } + + /** + * Iterate over keys within the passed inclusive range. + */ + public static Iterable iterateOnSplits( + final byte[] a, final byte[]b, final int num) + { byte [] aPadded; byte [] bPadded; if (a.length < b.length) { @@ -1035,14 +1062,14 @@ public class Bytes { throw new IllegalArgumentException("num cannot be < 0"); } byte [] prependHeader = {1, 0}; - BigInteger startBI = new BigInteger(add(prependHeader, aPadded)); - BigInteger stopBI = new BigInteger(add(prependHeader, bPadded)); - BigInteger diffBI = stopBI.subtract(startBI); - BigInteger splitsBI = BigInteger.valueOf(num + 1); + final BigInteger startBI = new BigInteger(add(prependHeader, aPadded)); + final BigInteger stopBI = new BigInteger(add(prependHeader, bPadded)); + final BigInteger diffBI = stopBI.subtract(startBI); + final BigInteger splitsBI = BigInteger.valueOf(num + 1); if(diffBI.compareTo(splitsBI) < 0) { return null; } - BigInteger intervalBI; + final BigInteger intervalBI; try { intervalBI = diffBI.divide(splitsBI); } catch(Exception e) { @@ -1050,20 +1077,42 @@ public class Bytes { return null; } - byte [][] result = new byte[num+2][]; - result[0] = a; + final Iterator iterator = new Iterator() { + private int i = -1; + + @Override + public boolean hasNext() { + return i < num+1; + } - for (int i = 1; i <= num; i++) { - BigInteger curBI = startBI.add(intervalBI.multiply(BigInteger.valueOf(i))); - byte [] padded = curBI.toByteArray(); - if (padded[1] == 0) - padded = tail(padded, padded.length - 2); - else - padded = tail(padded, padded.length - 1); - result[i] = padded; - } - result[num+1] = b; - return result; + @Override + public byte[] next() { + i++; + if (i == 0) return a; + if (i == num + 1) return b; + + BigInteger curBI = startBI.add(intervalBI.multiply(BigInteger.valueOf(i))); + byte [] padded = curBI.toByteArray(); + if (padded[1] == 0) + padded = tail(padded, padded.length - 2); + else + padded = tail(padded, padded.length - 1); + return padded; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + + return new Iterable() { + @Override + public Iterator iterator() { + return iterator; + } + }; } /** diff --git src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index ed8709f..f9f0936 100644 --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -21,7 +21,9 @@ package org.apache.hadoop.hbase; import java.io.File; import java.io.IOException; +import java.security.MessageDigest; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -47,8 +49,10 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.zookeeper.ZooKeeper; +import static org.junit.Assert.*; /** * Facility for testing HBase. 
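The Bytes.startsWith and Bytes.iterateOnSplits helpers added above can be exercised as in the following sketch; the key range and split count are made up. iterateOnSplits lazily yields the two endpoints plus num intermediate keys, or null when the range is too narrow to divide.

    // Illustrative use of the new lazy split iterator in Bytes.
    Iterable<byte[]> splits =
        Bytes.iterateOnSplits(Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 8);
    if (splits != null) {                 // null when the range cannot be divided
      for (byte[] key : splits) {         // 10 keys: "aaaaa", 8 intermediates, "zzzzz"
        System.out.println(Bytes.toStringBinary(key));
      }
    }
    // startsWith is a simple prefix test on byte arrays.
    boolean isPrefix = Bytes.startsWith(Bytes.toBytes("aaa-123"), Bytes.toBytes("aaa"));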
Added as tool to abet junit4 testing. Replaces @@ -76,7 +80,7 @@ public class HBaseTestingUtility { public static final String TEST_DIRECTORY_KEY = "test.build.data"; /** - * Default parent directory for test output. + * Default parent direccounttory for test output. */ public static final String DEFAULT_TEST_DIRECTORY = "target/build/data"; @@ -484,6 +488,19 @@ public class HBaseTestingUtility { return count; } + /** + * Return an md5 digest of the entire contents of a table. + */ + public String checksumRows(final HTable table) throws Exception { + Scan scan = new Scan(); + ResultScanner results = table.getScanner(scan); + MessageDigest digest = MessageDigest.getInstance("MD5"); + for (Result res : results) { + digest.update(res.getRow()); + } + results.close(); + return digest.toString(); + } /** * Creates many regions names "aaa" to "zzz". @@ -520,7 +537,13 @@ public class HBaseTestingUtility { Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; - + return createMultiRegions(c, table, columnFamily, KEYS); + } + + public int createMultiRegions(final Configuration c, final HTable table, + final byte[] columnFamily, byte [][] startKeys) + throws IOException { + Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); HTable meta = new HTable(c, HConstants.META_TABLE_NAME); HTableDescriptor htd = table.getTableDescriptor(); if(!htd.hasFamily(columnFamily)) { @@ -531,13 +554,13 @@ public class HBaseTestingUtility { // setup already has the ",,123456789" row with an empty start // and end key. Adding the custom regions below adds those blindly, // including the new start region from empty to "bbb". lg - List rows = getMetaTableRows(); + List rows = getMetaTableRows(htd.getName()); // add custom ones int count = 0; - for (int i = 0; i < KEYS.length; i++) { - int j = (i + 1) % KEYS.length; + for (int i = 0; i < startKeys.length; i++) { + int j = (i + 1) % startKeys.length; HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(), - KEYS[i], KEYS[j]); + startKeys[i], startKeys[j]); Put put = new Put(hri.getRegionName()); put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, Writables.getBytes(hri)); @@ -574,6 +597,29 @@ public class HBaseTestingUtility { s.close(); return rows; } + + /** + * Returns all rows from the .META. table for a given user table + * + * @throws IOException When reading the rows fails. 
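Taken together, the new testing-utility helpers above are intended to be used along these lines in a test; the table name, family and split key are illustrative, and util is assumed to be an HBaseTestingUtility backed by a running mini cluster.

    // Illustrative test fragment built on the helpers added above.
    byte[] tableName = Bytes.toBytes("TestTable");
    byte[] family = Bytes.toBytes("info");
    HTable table = util.createTable(tableName, family);
    // Carve the table into regions at explicit start keys.
    util.createMultiRegions(util.getConfiguration(), table, family,
        new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("m") });
    // ... load some data, disable and re-enable the table ...
    util.waitTableAvailable(tableName, 30000);
    String digest = util.checksumRows(table);   // MD5 digest over the row keys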
+ */ + public List getMetaTableRows(byte[] tableName) throws IOException { + HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME); + List rows = new ArrayList(); + ResultScanner s = t.getScanner(new Scan()); + for (Result result : s) { + HRegionInfo info = Writables.getHRegionInfo( + result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); + HTableDescriptor desc = info.getTableDesc(); + if (Bytes.compareTo(desc.getName(), tableName) == 0) { + LOG.info("getMetaTableRows: row -> " + + Bytes.toStringBinary(result.getRow())); + rows.add(result.getRow()); + } + } + s.close(); + return rows; + } /** * Starts a MiniMRCluster with a default number of @@ -600,6 +646,8 @@ public class HBaseTestingUtility { mrCluster = new MiniMRCluster(servers, FileSystem.get(c).getUri().toString(), 1); LOG.info("Mini mapreduce cluster started"); + c.set("mapred.job.tracker", + mrCluster.createJobConf().get("mapred.job.tracker")); } /** @@ -610,6 +658,8 @@ public class HBaseTestingUtility { if (mrCluster != null) { mrCluster.shutdown(); } + // Restore configuration to point to local jobtracker + conf.set("mapred.job.tracker", "local"); LOG.info("Mini mapreduce cluster stopped"); } @@ -746,4 +796,19 @@ public class HBaseTestingUtility { public FileSystem getTestFileSystem() throws IOException { return FileSystem.get(conf); } + + public void cleanupTestDir() throws IOException { + getTestDir().getFileSystem(conf).delete(getTestDir(), true); + } + + public void waitTableAvailable(byte[] table, long timeoutMillis) + throws InterruptedException, IOException { + HBaseAdmin admin = new HBaseAdmin(conf); + long startWait = System.currentTimeMillis(); + while (!admin.isTableAvailable(table)) { + assertTrue("Timed out waiting for table " + Bytes.toStringBinary(table), + System.currentTimeMillis() - startWait < timeoutMillis); + Thread.sleep(500); + } + } } diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java new file mode 100644 index 0000000..563ee57 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java @@ -0,0 +1,127 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Input format that creates as many map tasks as configured in + * mapred.map.tasks, each provided with a single row of + * NullWritables. This can be useful when trying to write mappers + * which don't have any real input (eg when the mapper is simply + * producing random data as output) + */ +public class NMapInputFormat extends InputFormat { + + @Override + public RecordReader createRecordReader( + InputSplit split, + TaskAttemptContext tac) throws IOException, InterruptedException { + return new SingleRecordReader( + NullWritable.get(), NullWritable.get()); + } + + @Override + public List getSplits(JobContext context) throws IOException, + InterruptedException { + int count = context.getConfiguration().getInt("mapred.map.tasks", 1); + List splits = new ArrayList(count); + for (int i = 0; i < count; i++) { + splits.add(new NullInputSplit()); + } + return splits; + } + + private static class NullInputSplit extends InputSplit implements Writable { + @Override + public long getLength() throws IOException, InterruptedException { + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[] {}; + } + + @Override + public void readFields(DataInput in) throws IOException { + } + + @Override + public void write(DataOutput out) throws IOException { + } + } + + private static class SingleRecordReader + extends RecordReader { + + private final K key; + private final V value; + boolean providedKey = false; + + SingleRecordReader(K key, V value) { + this.key = key; + this.value = value; + } + + @Override + public void close() { + } + + @Override + public K getCurrentKey() { + return key; + } + + @Override + public V getCurrentValue(){ + return value; + } + + @Override + public float getProgress() { + return 0; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext tac) { + } + + @Override + public boolean nextKeyValue() { + if (providedKey) return false; + providedKey = true; + return true; + } + + } +} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java index d04ced2..0cf7b6c 100644 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java @@ -19,28 +19,35 @@ */ package org.apache.hadoop.hbase.mapreduce; -import java.io.DataInput; -import java.io.DataOutput; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.Arrays; import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PerformanceEvaluation; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; /** * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}. @@ -49,144 +56,257 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; * emits keys and values like those of {@link PerformanceEvaluation}. Makes * as many splits as "mapred.map.tasks" maps. */ -public class TestHFileOutputFormat extends HBaseTestCase { +public class TestHFileOutputFormat { private final static int ROWSPERSPLIT = 1024; - /* - * InputFormat that makes keys and values like those used in - * PerformanceEvaluation. Makes as many splits as there are configured - * maps ("mapred.map.tasks"). + private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME; + private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable"); + + private HBaseTestingUtility util = new HBaseTestingUtility(); + + private static Log LOG = LogFactory.getLog(TestHFileOutputFormat.class); + + /** + * Simple mapper that makes KeyValue output. */ - static class PEInputFormat extends InputFormat { - /* Split that holds nothing but split index. 
- */ - static class PEInputSplit extends InputSplit implements Writable { - private int index = -1; - - PEInputSplit() { - super(); - } - - PEInputSplit(final int i) { - this.index = i; - } + static class RandomKVGeneratingMapper + extends Mapper { + + private int keyLength; + private static final int KEYLEN_DEFAULT=10; + private static final String KEYLEN_CONF="randomkv.key.length"; - int getIndex() { - return this.index; - } - - public long getLength() throws IOException, InterruptedException { - return ROWSPERSPLIT; - } - - public String [] getLocations() throws IOException, InterruptedException { - return new String [] {}; - } - - public void readFields(DataInput in) throws IOException { - this.index = in.readInt(); - } - - public void write(DataOutput out) throws IOException { - out.writeInt(this.index); - } - } - - public RecordReader createRecordReader( - InputSplit split, TaskAttemptContext context) throws IOException, + private int valLength; + private static final int VALLEN_DEFAULT=10; + private static final String VALLEN_CONF="randomkv.val.length"; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { - final int startrow = ((PEInputSplit)split).getIndex() * ROWSPERSPLIT; - return new RecordReader() { - // Starts at a particular row - private int counter = startrow; - private ImmutableBytesWritable key; - private ImmutableBytesWritable value; - private final Random random = new Random(System.currentTimeMillis()); - - public void close() throws IOException { - // Nothing to do. - } - - public ImmutableBytesWritable getCurrentKey() - throws IOException, InterruptedException { - return this.key; - } - - public ImmutableBytesWritable getCurrentValue() - throws IOException, InterruptedException { - return this.value; - } - - public float getProgress() throws IOException, InterruptedException { - return ((float)(ROWSPERSPLIT - this.counter) / (float)this.counter); - } + super.setup(context); + + Configuration conf = context.getConfiguration(); + keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); + valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); + } - public void initialize(InputSplit arg0, TaskAttemptContext arg1) - throws IOException, InterruptedException { - // Nothing to do. 
+ protected void map( + NullWritable n1, NullWritable n2, + Mapper.Context context) + throws java.io.IOException ,InterruptedException + { - } + byte keyBytes[] = new byte[keyLength]; + byte valBytes[] = new byte[valLength]; + + Random random = new Random(System.currentTimeMillis()); + for (int i = 0; i < ROWSPERSPLIT; i++) { - public boolean nextKeyValue() throws IOException, InterruptedException { - if (this.counter - startrow > ROWSPERSPLIT) return false; - this.counter++; - this.key = new ImmutableBytesWritable(PerformanceEvaluation.format(this.counter)); - this.value = new ImmutableBytesWritable(PerformanceEvaluation.generateValue(this.random)); - return true; - } - }; - } + random.nextBytes(keyBytes); + random.nextBytes(valBytes); + ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); - public List getSplits(JobContext context) - throws IOException, InterruptedException { - int count = context.getConfiguration().getInt("mapred.map.tasks", 1); - List splits = new ArrayList(count); - for (int i = 0; i < count; i++) { - splits.add(new PEInputSplit(i)); + KeyValue kv = new KeyValue(keyBytes, PerformanceEvaluation.FAMILY_NAME, + PerformanceEvaluation.QUALIFIER_NAME, valBytes); + context.write(key, kv); } - return splits; } } - /** - * Simple mapper that makes KeyValue output. - */ - static class PEtoKVMapper extends Mapper { - protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, - org.apache.hadoop.mapreduce.Mapper.Context context) - throws java.io.IOException ,InterruptedException { - context.write(key, new KeyValue(key.get(), PerformanceEvaluation.FAMILY_NAME, - PerformanceEvaluation.QUALIFIER_NAME, value.get())); - } + @Before + public void cleanupDir() throws IOException { + util.cleanupTestDir(); + } + + + private void setupRandomGeneratorMapper(Job job) { + job.setInputFormatClass(NMapInputFormat.class); + job.setMapperClass(RandomKVGeneratingMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); } /** * Run small MR job. */ + @Test public void testWritingPEData() throws Exception { + Configuration conf = util.getConfiguration(); + Path testDir = HBaseTestingUtility.getTestDir("testWritingPEData"); + FileSystem fs = testDir.getFileSystem(conf); + // Set down this value or we OOME in eclipse. - this.conf.setInt("io.sort.mb", 20); + conf.setInt("io.sort.mb", 20); // Write a few files. - this.conf.setLong("hbase.hregion.max.filesize", 64 * 1024); - Job job = new Job(this.conf, getName()); - job.setInputFormatClass(TestHFileOutputFormat.PEInputFormat.class); - job.setMapperClass(TestHFileOutputFormat.PEtoKVMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); + conf.setLong("hbase.hregion.max.filesize", 64 * 1024); + + Job job = new Job(conf, "testWritingPEData"); + setupRandomGeneratorMapper(job); // This partitioner doesn't work well for number keys but using it anyways // just to demonstrate how to configure it. + byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; + byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; + + Arrays.fill(startKey, (byte)0); + Arrays.fill(endKey, (byte)0xff); + job.setPartitionerClass(SimpleTotalOrderPartitioner.class); // Set start and end rows for partitioner. 
-    job.getConfiguration().set(SimpleTotalOrderPartitioner.START,
-      Bytes.toString(PerformanceEvaluation.format(0)));
-    int rows = this.conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
-    job.getConfiguration().set(SimpleTotalOrderPartitioner.END,
-      Bytes.toString(PerformanceEvaluation.format(rows)));
+    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
+    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
     job.setReducerClass(KeyValueSortReducer.class);
     job.setOutputFormatClass(HFileOutputFormat.class);
-    FileOutputFormat.setOutputPath(job, this.testDir);
+    job.setNumReduceTasks(4);
+
+    FileOutputFormat.setOutputPath(job, testDir);
     assertTrue(job.waitForCompletion(false));
-    FileStatus [] files = this.fs.listStatus(this.testDir);
+    FileStatus [] files = fs.listStatus(testDir);
     assertTrue(files.length > 0);
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testJobConfiguration() throws Exception {
+    Job job = new Job();
+    HTable table = Mockito.mock(HTable.class);
+    byte[][] mockKeys = new byte[][] {
+        HConstants.EMPTY_BYTE_ARRAY,
+        Bytes.toBytes("aaa"),
+        Bytes.toBytes("ggg"),
+        Bytes.toBytes("zzz")
+    };
+    Mockito.doReturn(mockKeys).when(table).getStartKeys();
+
+    HFileOutputFormat.configureIncrementalLoad(job, table);
+    assertEquals(job.getNumReduceTasks(), 4);
+  }
+
+  private byte [][] generateRandomStartKeys(int numKeys) {
+    Random random = new Random();
+    byte[][] ret = new byte[numKeys][];
+    // first region start key is always empty
+    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
+    for (int i = 1; i < numKeys; i++) {
+      ret[i] = PerformanceEvaluation.generateValue(random);
+    }
+    return ret;
+  }
+
+  @Test
+  public void testMRIncrementalLoad() throws Exception {
+    doIncrementalLoadTest(false);
+  }
+
+  @Test
+  public void testMRIncrementalLoadWithSplit() throws Exception {
+    doIncrementalLoadTest(true);
+  }
+
+  private void doIncrementalLoadTest(
+      boolean shouldChangeRegions) throws Exception {
+    Configuration conf = util.getConfiguration();
+    Path testDir = HBaseTestingUtility.getTestDir("testLocalMRIncrementalLoad");
+    byte[][] startKeys = generateRandomStartKeys(5);
+
+    try {
+      util.startMiniCluster();
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
+      int numRegions = util.createMultiRegions(
+          util.getConfiguration(), table, FAMILY_NAME,
+          startKeys);
+      assertEquals("Should make 5 regions",
+          numRegions, 5);
+      assertEquals("Should start with empty table",
+          0, util.countRows(table));
+
+      // Generate the bulk load files
+      util.startMiniMapReduceCluster();
+      runIncrementalPELoad(conf, table, testDir);
+      // This doesn't write into the table, just makes files
+      assertEquals("HFOF should not touch actual table",
+          0, util.countRows(table));
+
+      if (shouldChangeRegions) {
+        LOG.info("Changing regions in table");
+        admin.disableTable(table.getTableName());
+        byte[][] newStartKeys = generateRandomStartKeys(15);
+        util.createMultiRegions(util.getConfiguration(),
+            table, FAMILY_NAME, newStartKeys);
+        admin.enableTable(table.getTableName());
+        while (table.getRegionsInfo().size() != 15 ||
+            !admin.isTableAvailable(table.getTableName())) {
+          Thread.sleep(1000);
+          LOG.info("Waiting for new region assignment to happen");
+        }
+      }
+
+      // Perform the actual load
+      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+
+      // Ensure data shows up
+      int expectedRows = conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
+      assertEquals("LoadIncrementalHFiles should put expected data in table",
+          expectedRows, util.countRows(table));
+      String tableDigestBefore = util.checksumRows(table);
+
+      // Cause regions to reopen
+      admin.disableTable(TABLE_NAME);
+      while (table.getRegionsInfo().size() != 0) {
+        Thread.sleep(1000);
+        LOG.info("Waiting for table to disable");
+      }
+      admin.enableTable(TABLE_NAME);
+      util.waitTableAvailable(TABLE_NAME, 30000);
+
+      assertEquals("Data should remain after reopening of regions",
+          tableDigestBefore, util.checksumRows(table));
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      util.shutdownMiniCluster();
+    }
+  }
+
+
+
+  private void runIncrementalPELoad(
+      Configuration conf, HTable table, Path outDir)
+      throws Exception {
+    Job job = new Job(conf, "testLocalMRIncrementalLoad");
+    setupRandomGeneratorMapper(job);
+    HFileOutputFormat.configureIncrementalLoad(job, table);
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    assertEquals(table.getRegionsInfo().size(),
+        job.getNumReduceTasks());
+
+    assertTrue(job.waitForCompletion(true));
+  }
+
+  public static void main(String args[]) throws Exception {
+    new TestHFileOutputFormat().manualTest(args);
+  }
+
+  public void manualTest(String args[]) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    util = new HBaseTestingUtility(conf);
+    if ("newtable".equals(args[0])) {
+      byte[] tname = args[1].getBytes();
+      HTable table = util.createTable(tname, FAMILY_NAME);
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      admin.disableTable(tname);
+      util.createMultiRegions(conf, table, FAMILY_NAME,
+          generateRandomStartKeys(5));
+      admin.enableTable(tname);
+    } else if ("incremental".equals(args[0])) {
+      byte[] tname = args[1].getBytes();
+      HTable table = new HTable(conf, tname);
+      Path outDir = new Path("incremental-out");
+      runIncrementalPELoad(conf, table, outDir);
+    } else {
+      throw new RuntimeException(
+          "usage: TestHFileOutputFormat newtable | incremental");
+    }
+  }
+}
diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
new file mode 100644
index 0000000..612b00a
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import static org.junit.Assert.*;
+
+public class TestImportTsv {
+  @Test
+  public void testTsvParser() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d");
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
+    assertNull(parser.getFamily(2));
+    assertNull(parser.getQualifier(2));
+
+    byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
+    ParsedLine parsed = parser.parse(line, line.length);
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+    assertEquals(2, parser.getRowKeyColumnIndex());
+  }
+
+  private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
+    ArrayList<String> parsedCols = new ArrayList<String>();
+    for (int i = 0; i < parsed.getColumnCount(); i++) {
+      parsedCols.add(Bytes.toString(
+          parsed.getLineBytes(),
+          parsed.getColumnOffset(i),
+          parsed.getColumnLength(i)));
+    }
+    if (!Iterables.elementsEqual(parsedCols, expected)) {
+      fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
+          "Got:" + Joiner.on(",").join(parsedCols));
+    }
+  }
+
+  private void assertBytesEquals(byte[] a, byte[] b) {
+    assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
+  }
+}
diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
new file mode 100644
index 0000000..d5374d2
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -0,0 +1,188 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test cases for the "load" half of the HFileOutputFormat bulk load
+ * functionality. These tests run faster than the full MR cluster
+ * tests in TestHFileOutputFormat
+ */
+public class TestLoadIncrementalHFiles {
+
+  private static final byte[] TABLE = Bytes.toBytes("mytable");
+  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
+  private static final byte[] FAMILY = Bytes.toBytes("myfam");
+
+  private static final byte[][] SPLIT_KEYS = new byte[][] {
+      Bytes.toBytes("ddd"),
+      Bytes.toBytes("ppp")
+  };
+
+  public static int BLOCKSIZE = 64*1024;
+  public static String COMPRESSION =
+      Compression.Algorithm.NONE.getName();
+
+  private HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Test case that creates some regions and loads
+   * HFiles that fit snugly inside those regions
+   */
+  @Test
+  public void testSimpleLoad() throws Exception {
+    runTest("testSimpleLoad",
+        new byte[][][] {
+            new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+            new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+        });
+  }
+
+  /**
+   * Test case that creates some regions and loads
+   * HFiles that cross the boundaries of those regions
+   */
+  @Test
+  public void testRegionCrossingLoad() throws Exception {
+    runTest("testRegionCrossingLoad",
+        new byte[][][] {
+            new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+            new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
+        });
+  }
+
+  private void runTest(String testName, byte[][][] hfileRanges)
+      throws Exception {
+    Path dir = HBaseTestingUtility.getTestDir(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      createHFile(fs, new Path(familyDir, "hfile_" + hfileIdx++),
+          FAMILY, QUALIFIER, from, to, 1000);
+    }
+    int expectedRows = hfileIdx * 1000;
+
+
+    util.startMiniCluster();
+    try {
+      HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+      HTableDescriptor htd = new HTableDescriptor(TABLE);
+      htd.addFamily(new HColumnDescriptor(FAMILY));
+      admin.createTable(htd, SPLIT_KEYS);
+
+      HTable table = new HTable(TABLE);
+      util.waitTableAvailable(TABLE, 30000);
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+          util.getConfiguration());
+      loader.doBulkLoad(dir, table);
+
+      assertEquals(expectedRows, util.countRows(table));
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testSplitStoreFile() throws IOException {
+    Path dir = HBaseTestingUtility.getTestDir("testSplitHFile");
+    FileSystem fs = util.getTestFileSystem();
+    Path testIn = new Path(dir, "testhfile");
+    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
+    createHFile(fs, testIn, FAMILY, QUALIFIER,
+        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+    Path bottomOut = new Path(dir, "bottom.out");
+    Path topOut = new Path(dir, "top.out");
+
+    LoadIncrementalHFiles.splitStoreFile(
+        util.getConfiguration(), testIn,
+        familyDesc, Bytes.toBytes("ggg"),
+        bottomOut,
+        topOut);
+
+    int rowCount = verifyHFile(bottomOut);
+    rowCount += verifyHFile(topOut);
+    assertEquals(1000, rowCount);
+  }
+
+  private int verifyHFile(Path p) throws IOException {
+    Configuration conf = util.getConfiguration();
+    HFile.Reader reader = new HFile.Reader(
+        p.getFileSystem(conf), p, null, false);
+    reader.loadFileInfo();
+    HFileScanner scanner = reader.getScanner(false, false);
+    scanner.seekTo();
+    int count = 0;
+    do {
+      count++;
+    } while (scanner.next());
+    assertTrue(count > 0);
+    return count;
+  }
+
+
+  /**
+   * Create an HFile with the given number of rows between a given
+   * start key and end key.
+   * TODO put me in an HFileTestUtil or something?
+   */
+  static void createHFile(
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier,
+      byte[] startKey, byte[] endKey, int numRows) throws IOException
+  {
+    HFile.Writer writer = new HFile.Writer(fs, path, BLOCKSIZE, COMPRESSION,
+        KeyValue.KEY_COMPARATOR);
+    try {
+      // subtract 2 since iterateOnSplits doesn't include boundary keys
+      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
+        KeyValue kv = new KeyValue(key, family, qualifier, key);
+        writer.append(kv);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+}
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 93a720c..9e5ca46 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -207,8 +207,7 @@ public class TestCompaction extends HBaseTestCase {
     // they were deleted.
     int count = 0;
     boolean containsStartRow = false;
-    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles().
-        values()) {
+    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
       HFileScanner scanner = f.getReader().getScanner(false, false);
       scanner.seekTo();
       do {
@@ -239,7 +238,7 @@
   private int count() throws IOException {
     int count = 0;
     for (StoreFile f: this.r.stores.
-        get(COLUMN_FAMILY_TEXT).getStorefiles().values()) {
+        get(COLUMN_FAMILY_TEXT).getStorefiles()) {
       HFileScanner scanner = f.getReader().getScanner(false, false);
       if (!scanner.seekTo()) {
         continue;
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionInfo.java src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionInfo.java
index aa5c620..516139b 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionInfo.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionInfo.java
@@ -19,12 +19,16 @@
  */
 package org.apache.hadoop.hbase.regionserver;

-import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.MD5Hash;

-public class TestHRegionInfo extends HBaseTestCase {
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestHRegionInfo {
+  @Test
   public void testCreateHRegionInfoName() throws Exception {
     String tableName = "tablename";
     final byte [] tn = Bytes.toBytes(tableName);
@@ -47,4 +51,32 @@
       + id + "." + md5HashInHex + ".",
       nameStr);
   }
+
+  @Test
+  public void testContainsRange() {
+    HTableDescriptor tableDesc = new HTableDescriptor("testtable");
+    HRegionInfo hri = new HRegionInfo(
+        tableDesc, Bytes.toBytes("a"), Bytes.toBytes("g"));
+    // Single row range at start of region
+    assertTrue(hri.containsRange(Bytes.toBytes("a"), Bytes.toBytes("a")));
+    // Fully contained range
+    assertTrue(hri.containsRange(Bytes.toBytes("b"), Bytes.toBytes("c")));
+    // Range overlapping start of region
+    assertTrue(hri.containsRange(Bytes.toBytes("a"), Bytes.toBytes("c")));
+    // Fully contained single-row range
+    assertTrue(hri.containsRange(Bytes.toBytes("c"), Bytes.toBytes("c")));
+    // Range that overlaps end key and hence doesn't fit
+    assertFalse(hri.containsRange(Bytes.toBytes("a"), Bytes.toBytes("g")));
+    // Single row range on end key
+    assertFalse(hri.containsRange(Bytes.toBytes("g"), Bytes.toBytes("g")));
+    // Single row range entirely outside
+    assertFalse(hri.containsRange(Bytes.toBytes("z"), Bytes.toBytes("z")));
+
+    // Degenerate range
+    try {
+      hri.containsRange(Bytes.toBytes("z"), Bytes.toBytes("a"));
+      fail("Invalid range did not throw IAE");
+    } catch (IllegalArgumentException iae) {
+    }
+  }
 }
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 4595e6e..43a8a28 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -130,7 +130,7 @@ public class TestStore extends TestCase {
     flush(1);
     // Now put in place an empty store file. Its a little tricky. Have to
     // do manually with hacked in sequence id.
-    StoreFile f = this.store.getStorefiles().firstEntry().getValue();
+    StoreFile f = this.store.getStorefiles().get(0);
     Path storedir = f.getPath().getParent();
     long seqid = f.getMaxSequenceId();
     HBaseConfiguration c = new HBaseConfiguration();
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index 0d8651a..e65ae34 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.hbase.regionserver;

 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.TreeSet;

 import org.apache.commons.logging.Log;
@@ -36,6 +40,12 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.mockito.Mockito;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;

 /**
  * Test HStoreFile
@@ -441,4 +451,52 @@ public class TestStoreFile extends HBaseTestCase {
   }

+  public void testFlushTimeComparator() {
+    assertOrdering(StoreFile.Comparators.FLUSH_TIME,
+        mockStoreFile(true, 1000, -1, "/foo/123"),
+        mockStoreFile(true, 1000, -1, "/foo/126"),
+        mockStoreFile(true, 2000, -1, "/foo/126"),
+        mockStoreFile(false, -1, 1, "/foo/1"),
+        mockStoreFile(false, -1, 3, "/foo/2"),
+        mockStoreFile(false, -1, 5, "/foo/2"),
+        mockStoreFile(false, -1, 5, "/foo/3"));
+  }
+
+  /**
+   * Assert that the given comparator orders the given storefiles in the
+   * same way that they're passed.
+   */
+  private void assertOrdering(Comparator<StoreFile> comparator, StoreFile ... sfs) {
+    ArrayList<StoreFile> sorted = Lists.newArrayList(sfs);
+    Collections.shuffle(sorted);
+    Collections.sort(sorted, comparator);
+    LOG.debug("sfs: " + Joiner.on(",").join(sfs));
+    LOG.debug("sorted: " + Joiner.on(",").join(sorted));
+    assertTrue(Iterables.elementsEqual(Arrays.asList(sfs), sorted));
+  }
+
+  /**
+   * Create a mock StoreFile with the given attributes.
+   */
+  private StoreFile mockStoreFile(boolean bulkLoad, long bulkTimestamp,
+      long seqId, String path) {
+    StoreFile mock = Mockito.mock(StoreFile.class);
+    Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
+    Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
+    if (bulkLoad) {
+      // Bulk load files will throw if you ask for their sequence ID
+      Mockito.doThrow(new IllegalAccessError("bulk load"))
+          .when(mock).getMaxSequenceId();
+    } else {
+      Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
+    }
+    Mockito.doReturn(new Path(path)).when(mock).getPath();
+    String name = "mock storefile, bulkLoad=" + bulkLoad +
+        " bulkTimestamp=" + bulkTimestamp +
+        " seqId=" + seqId +
+        " path=" + path;
+    Mockito.doReturn(name).when(mock).toString();
+    return mock;
+  }
+
 }
diff --git src/test/java/org/apache/hadoop/hbase/util/TestBytes.java src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
index c7361cb..418fc1e 100644
--- src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
+++ src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
@@ -150,6 +150,14 @@ public class TestBytes extends TestCase {
     assertEquals(5, Bytes.binarySearch(arr, key3, 1, 1,
         Bytes.BYTES_RAWCOMPARATOR));
   }
+
+  public void testStartsWith() {
+    assertTrue(Bytes.startsWith(Bytes.toBytes("hello"), Bytes.toBytes("h")));
+    assertTrue(Bytes.startsWith(Bytes.toBytes("hello"), Bytes.toBytes("")));
+    assertTrue(Bytes.startsWith(Bytes.toBytes("hello"), Bytes.toBytes("hello")));
+    assertFalse(Bytes.startsWith(Bytes.toBytes("hello"), Bytes.toBytes("helloworld")));
+    assertFalse(Bytes.startsWith(Bytes.toBytes(""), Bytes.toBytes("hello")));
+  }

   public void testIncrementBytes() throws IOException {