diff --git pom.xml pom.xml index 53bbd64..860f17e 100644 --- pom.xml +++ pom.xml @@ -402,6 +402,7 @@ 1.1.1 6.1.14 4.8.1 + 1.8.4 1.2.15 3.3.1 @@ -498,6 +499,12 @@ ${junit.version} test + + org.mockito + mockito-all + ${mockito-all.version} + test + @@ -661,6 +668,10 @@ junit + org.mockito + mockito-all + + org.apache.commons commons-math ${commons-math.version} diff --git src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java index 0a9ec4b..613aa21 100644 --- src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java +++ src/main/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java @@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.io.BytesWritable; @@ -253,4 +254,11 @@ implements WritableComparable { } return results; } + + /** + * Returns a copy of the bytes referred to by this writable + */ + public byte[] copyBytes() { + return Arrays.copyOfRange(bytes, offset, length); + } } diff --git src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 4cbe52a..91e44dd 100644 --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -258,5 +258,11 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion { * @throws IOException e */ public MultiPutResponse multiPut(MultiPut puts) throws IOException; - + + + /** + * Bulk load an HFile into an open region + */ + public void bulkLoadHFile(String hfilePath, + byte[] regionName, byte[] familyName) throws IOException; } diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java index 3d40695..c6ec4ae 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java @@ -36,6 +36,7 @@ public class Driver { "Count rows in HBase table"); pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS."); pgd.addClass(Import.NAME, Import.class, "Import data written by Export."); + pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format."); pgd.addClass(CopyTable.NAME, CopyTable.class, "Export a table from local cluster to peer cluster"); pgd.driver(args); diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index 9c8e53e..028156b 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -20,24 +20,40 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import 
org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner; +import org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler.Sampler; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.mortbay.log.Log; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * Writes HFiles. Passed KeyValues must arrive in order. @@ -48,6 +64,8 @@ import org.mortbay.log.Log; * @see KeyValueSortReducer */ public class HFileOutputFormat extends FileOutputFormat { + static Log LOG = LogFactory.getLog(HFileOutputFormat.class); + public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { // Get the path of the temporary output file @@ -86,7 +104,7 @@ public class HFileOutputFormat extends FileOutputFormat startKeys = getRegionStartKeys(table); + LOG.info("Configuring " + startKeys.size() + " reduce partitions " + + "to match current region count"); + job.setNumReduceTasks(startKeys.size()); + + Path partitionsPath = new Path(job.getWorkingDirectory(), + "partitions_" + System.currentTimeMillis()); + LOG.info("Writing partition information to " + partitionsPath); + + FileSystem fs = partitionsPath.getFileSystem(conf); + writePartitions(conf, partitionsPath, startKeys); + partitionsPath.makeQualified(fs); + URI cacheUri; + try { + cacheUri = new URI(partitionsPath.toString() + "#" + + TotalOrderPartitioner.DEFAULT_PATH); + } catch (URISyntaxException e) { + throw new IOException(e); + } + DistributedCache.addCacheFile(cacheUri, conf); + DistributedCache.createSymlink(conf); + + LOG.info("Incremental table output configured."); + } + } diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java new file mode 100644 index 0000000..aad6070 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -0,0 +1,192 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.LinkedList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; + +public class ImportTsv { + final static String NAME = "importtsv"; + + /** + * Write table content out to files in hdfs. + */ + static class TsvImporter + extends Mapper + { + private byte[][] families; + private byte[][] qualifiers; + private long ts; + + @Override + protected void setup(Context context) { + String columnStrings[] = context.getConfiguration().getStrings("columns"); + families = new byte[columnStrings.length][]; + qualifiers = new byte[columnStrings.length][]; + + for (int i = 0; i < columnStrings.length; i++) { + String str = columnStrings[i]; + String[] parts = str.split(":", 2); + if (parts.length == 1) { + families[i] = columnStrings[i].getBytes(); + qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY; + } else { + families[i] = parts[0].getBytes(); + qualifiers[i] = parts[1].getBytes(); + } + } + + ts = System.currentTimeMillis(); + } + /** + * @param row The current table row key. + * @param value The columns. + * @param context The current context. + * @throws IOException When something is broken with the data. + * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, + * org.apache.hadoop.mapreduce.Mapper.Context) + */ + @Override + public void map(LongWritable offset, Text value, + Context context) + throws IOException { + try { + byte[] lineBytes = value.getBytes(); + LinkedList tabOffsets = new LinkedList(); + for (int i = 0; i < value.getLength(); i++) { + if (lineBytes[i] == '\t') { + tabOffsets.add(i); + } + } + tabOffsets.add(lineBytes.length); + + int rowkeySep = tabOffsets.remove(); + ImmutableBytesWritable rowKey = + new ImmutableBytesWritable(lineBytes, 0, rowkeySep); + + int lastOff = rowkeySep; + int colIdx = 0; + Put put = new Put(rowKey.copyBytes()); + for (int tabOff : tabOffsets) { + KeyValue kv = new KeyValue( + lineBytes, 0, rowkeySep, // row key + families[colIdx], 0, families[colIdx].length, + qualifiers[colIdx], 0, qualifiers[colIdx].length, + ts, + KeyValue.Type.Put, + lineBytes, lastOff + 1, tabOff - lastOff - 1); + put.add(kv); + lastOff = tabOff; + colIdx++; + } + context.write(rowKey, put); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * Sets up the actual job. + * + * @param conf The current configuration. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. 
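+   *
+   * When the boolean "use.hfile" configuration property is set, the job is
+   * wired for bulk loading: PutSortReducer plus
+   * HFileOutputFormat.configureIncrementalLoad, writing HFiles under
+   * "hfof.&lt;tablename&gt;". Otherwise it writes straight to the table
+   * through TableOutputFormat with zero reduce tasks.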
+ */ + public static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { + String tableName = args[0]; + Path inputDir = new Path(args[1]); + Job job = new Job(conf, NAME + "_" + tableName); + job.setJarByClass(TsvImporter.class); + FileInputFormat.setInputPaths(job, inputDir); + job.setInputFormatClass(TextInputFormat.class); + job.setMapperClass(TsvImporter.class); + + if (conf.getBoolean("use.hfile", false)) { + HTable table = new HTable(conf, tableName); + job.setReducerClass(PutSortReducer.class); + Path outputDir = new Path("hfof." + tableName); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(Put.class); + HFileOutputFormat.configureIncrementalLoad(job, table); + } else { + // No reducers. Just write straight to table. Call initTableReducerJob + // because it sets up the TableOutputFormat. + TableMapReduceUtil.initTableReducerJob(tableName, null, job); + job.setNumReduceTasks(0); + } + + TableMapReduceUtil.addDependencyJars(job); + return job; + } + + /* + * @param errorMsg Error message. Can be null. + */ + private static void usage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + System.err.println( + "Usage: ImportTsv -Dcolumns=a,b,c "); + } + + /** + * Main entry point. + * + * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); + if (otherArgs.length < 2) { + usage("Wrong number of arguments: " + otherArgs.length); + System.exit(-1); + } + String columns[] = conf.getStrings("columns"); + if (columns == null) { + usage("No columns specified. Please specify with -Dcolumns=..."); + } + Job job = createSubmittableJob(conf, otherArgs); + System.exit(job.waitForCompletion(true) ? 0 : 1); + } + +} diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java new file mode 100644 index 0000000..68aaf71 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -0,0 +1,148 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class LoadIncrementalHFiles extends Configured implements Tool { + + static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class); + + private void usage() { + System.err.println("usage: LoadIncrementalFiles " + + "/path/to/hfileoutputformat-output " + + "tablename"); + } + + + private int doBulkLoad(Path hfofDir, HTable table) + throws TableNotFoundException, IOException + { + FileSystem fs = hfofDir.getFileSystem(getConf()); + HConnection conn = table.getConnection(); + + if (!conn.isTableAvailable(table.getTableName())) { + throw new TableNotFoundException("Table " + + Bytes.toStringBinary(table.getTableName()) + + "is not currently available."); + } + if (!fs.exists(hfofDir)) { + throw new FileNotFoundException("HFileOutputFormat dir " + + hfofDir + " not found"); + } + + Path[] familyDirs = FileUtil.stat2Paths(fs.listStatus(hfofDir)); + if (familyDirs == null) { + throw new FileNotFoundException("No families found in " + hfofDir); + } + + // TODO maybe we want to do multifamily load atomically + for (Path familyDir : familyDirs) { + // Skip _logs, etc + if (familyDir.getName().startsWith("_")) continue; + doBulkLoadFamily(familyDir, conn, table.getTableName()); + } + + return 0; + } + + private void doBulkLoadFamily(Path familyDir, HConnection conn, byte[] table) + throws IOException { + FileSystem fs = familyDir.getFileSystem(getConf()); + Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir)); + if (hfiles == null) { + LOG.info("No hfiles in " + familyDir); + } + byte[] family = familyDir.getName().getBytes(); + for (Path hfile : hfiles) { + doLoadHFile(hfile, conn, table, family); + } + } + + + private void doLoadHFile( + final Path hfilePath, HConnection conn, byte[] table, final byte[] family) + throws IOException { + FileSystem fs = hfilePath.getFileSystem(getConf()); + HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false); + hfr.loadFileInfo(); + final byte[] first = hfr.getFirstKey(); + final byte[] last = hfr.getLastKey(); + + LOG.info("hfile=" + hfilePath + + " first=" + Bytes.toStringBinary(first) + + " last=" + Bytes.toStringBinary(last)); + if (first == null || last == null) { + assert first == null && last == null; + LOG.info("hfile " + hfilePath + " has no entries, skipping"); + return; + } + + conn.getRegionServerWithRetries( + new ServerCallable(conn, table, first) { + + @Override + public Void call() throws Exception { + LOG.info("Going to connect to server " + server + + "for row " + row); + // TODO check range against region! 
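+          // The ServerCallable was constructed with the HFile's first key as
+          // its row, so "location"/"server" already point at the region that
+          // should contain this file. Hand the region server the path and let
+          // it move the file into the store directory (see Store.bulkLoadHFile
+          // later in this patch).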
+ byte[] regionName = location.getRegionInfo().getRegionName(); + server.bulkLoadHFile(hfilePath.toString(), regionName, family); + return null; + } + }); + } + + + @Override + public int run(String[] args) throws Exception { + if (args.length != 2) { + usage(); + return -1; + } + + Path hfofDir = new Path(args[0]); + HTable table = new HTable(args[1]); + + return doBulkLoad(hfofDir, table); + + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new LoadIncrementalHFiles(), args); + } + +} diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java new file mode 100644 index 0000000..e32dc82 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -0,0 +1,58 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.util.List; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.Reducer; + +public class PutSortReducer extends + Reducer { + + @Override + protected void reduce( + ImmutableBytesWritable row, + java.lang.Iterable puts, + Reducer.Context context) + throws java.io.IOException, InterruptedException + { + TreeSet map = new TreeSet(KeyValue.COMPARATOR); + + for (Put p : puts) { + for (List kvs : p.getFamilyMap().values()) { + for (KeyValue kv : kvs) { + map.add(kv.clone()); + } + } + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (KeyValue kv : map) { + context.write(row, kv); + if (index > 0 && index % 100 == 0) + context.setStatus("Wrote " + index); + } + } +} diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index b332280..07d7911 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -24,7 +24,18 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.URL; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import 
org.apache.hadoop.hbase.client.HTable; @@ -34,14 +45,17 @@ import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.ZooKeeper; /** * Utility for {@link TableMapper} and {@link TableReducer} */ @SuppressWarnings("unchecked") public class TableMapReduceUtil { - + static Log LOG = LogFactory.getLog(TableMapReduceUtil.class); + /** * Use this before submitting a TableMap job. It will appropriately set up * the job. @@ -222,4 +236,105 @@ public class TableMapReduceUtil { job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize); } + /** + * Add the HBase dependency jars as well as jars for any of the configured + * job classes to the job configuration, so that JobClient will ship them + * to the cluster and add them to the DistributedCache. + */ + public static void addDependencyJars(Job job) throws IOException { + try { + addDependencyJars(job.getConfiguration(), + ZooKeeper.class, + job.getMapOutputKeyClass(), + job.getMapOutputValueClass(), + job.getOutputKeyClass(), + job.getOutputValueClass(), + job.getOutputFormatClass(), + job.getPartitionerClass(), + job.getCombinerClass()); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + /** + * Add the jars containing the given classes to the job's configuration + * such that JobClient will ship them to the cluster and add them to + * the DistributedCache. + */ + public static void addDependencyJars(Configuration conf, + Class... classes) throws IOException { + + FileSystem localFs = FileSystem.getLocal(conf); + + Set jars = new HashSet(); + for (Class clazz : classes) { + if (clazz == null) continue; + + String pathStr = findContainingJar(clazz); + if (pathStr == null) { + LOG.warn("Could not find jar for class " + clazz + + " in order to ship it to the cluster."); + continue; + } + Path path = new Path(pathStr); + if (!localFs.exists(path)) { + LOG.warn("Could not validate jar file " + path + " for class " + + clazz); + continue; + } + jars.add(path.makeQualified(localFs).toString()); + } + if (jars.isEmpty()) return; + + String tmpJars = conf.get("tmpjars"); + if (tmpJars == null) { + tmpJars = StringUtils.arrayToString(jars.toArray(new String[0])); + } else { + tmpJars += "," + StringUtils.arrayToString(jars.toArray(new String[0])); + } + conf.set("tmpjars", tmpJars); + } + + /** + * Find a jar that contains a class of the same name, if any. + * It will return a jar file, even if that is not the first thing + * on the class path that has a class with the same name. + * + * This is shamelessly copied from JobConf + * + * @param my_class the class to find. + * @return a jar file that contains the class, or null. + * @throws IOException + */ + private static String findContainingJar(Class my_class) { + ClassLoader loader = my_class.getClassLoader(); + String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; + try { + for(Enumeration itr = loader.getResources(class_file); + itr.hasMoreElements();) { + URL url = (URL) itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + // URLDecoder is a misnamed class, since it actually decodes + // x-www-form-urlencoded MIME type rather than actual + // URL encoding (which the file path has). 
Therefore it would + // decode +s to ' 's which is incorrect (spaces are actually + // either unencoded or encoded as "%20"). Replace +s first, so + // that they are kept sacred during the decoding process. + toReturn = toReturn.replaceAll("\\+", "%2B"); + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + } + + } \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java new file mode 100644 index 0000000..13be81e --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java @@ -0,0 +1,413 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce.hadoopbackport; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Utility for collecting samples and writing a partition file for + * {@link TotalOrderPartitioner}. + * + * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner + * from Hadoop trunk at r910774, with the exception of replacing + * TaskAttemptContextImpl with TaskAttemptContext. 
+ */ +public class InputSampler extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(InputSampler.class); + + static int printUsage() { + System.out.println("sampler -r \n" + + " [-inFormat ]\n" + + " [-keyClass ]\n" + + " [-splitRandom | " + + "// Sample from random splits at random (general)\n" + + " -splitSample | " + + " // Sample from first records in splits (random data)\n"+ + " -splitInterval ]" + + " // Sample from splits at intervals (sorted data)"); + System.out.println("Default sampler: -splitRandom 0.1 10000 10"); + ToolRunner.printGenericCommandUsage(System.out); + return -1; + } + + public InputSampler(Configuration conf) { + setConf(conf); + } + + /** + * Interface to sample using an + * {@link org.apache.hadoop.mapreduce.InputFormat}. + */ + public interface Sampler { + /** + * For a given job, collect and return a subset of the keys from the + * input data. + */ + K[] getSample(InputFormat inf, Job job) + throws IOException, InterruptedException; + } + + /** + * Samples the first n records from s splits. + * Inexpensive way to sample random data. + */ + public static class SplitSampler implements Sampler { + + private final int numSamples; + private final int maxSplitsSampled; + + /** + * Create a SplitSampler sampling all splits. + * Takes the first numSamples / numSplits records from each split. + * @param numSamples Total number of samples to obtain from all selected + * splits. + */ + public SplitSampler(int numSamples) { + this(numSamples, Integer.MAX_VALUE); + } + + /** + * Create a new SplitSampler. + * @param numSamples Total number of samples to obtain from all selected + * splits. + * @param maxSplitsSampled The maximum number of splits to examine. + */ + public SplitSampler(int numSamples, int maxSplitsSampled) { + this.numSamples = numSamples; + this.maxSplitsSampled = maxSplitsSampled; + } + + /** + * From each split sampled, take the first numSamples / numSplits records. + */ + @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type + public K[] getSample(InputFormat inf, Job job) + throws IOException, InterruptedException { + List splits = inf.getSplits(job); + ArrayList samples = new ArrayList(numSamples); + int splitsToSample = Math.min(maxSplitsSampled, splits.size()); + int splitStep = splits.size() / splitsToSample; + int samplesPerSplit = numSamples / splitsToSample; + long records = 0; + for (int i = 0; i < splitsToSample; ++i) { + RecordReader reader = inf.createRecordReader( + splits.get(i * splitStep), + new TaskAttemptContext(job.getConfiguration(), + new TaskAttemptID())); + while (reader.nextKeyValue()) { + samples.add(reader.getCurrentKey()); + ++records; + if ((i+1) * samplesPerSplit <= records) { + break; + } + } + reader.close(); + } + return (K[])samples.toArray(); + } + } + + /** + * Sample from random points in the input. + * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from + * each split. + */ + public static class RandomSampler implements Sampler { + private double freq; + private final int numSamples; + private final int maxSplitsSampled; + + /** + * Create a new RandomSampler sampling all splits. + * This will read every split at the client, which is very expensive. + * @param freq Probability with which a key will be chosen. + * @param numSamples Total number of samples to obtain from all selected + * splits. + */ + public RandomSampler(double freq, int numSamples) { + this(freq, numSamples, Integer.MAX_VALUE); + } + + /** + * Create a new RandomSampler. 
+ * @param freq Probability with which a key will be chosen. + * @param numSamples Total number of samples to obtain from all selected + * splits. + * @param maxSplitsSampled The maximum number of splits to examine. + */ + public RandomSampler(double freq, int numSamples, int maxSplitsSampled) { + this.freq = freq; + this.numSamples = numSamples; + this.maxSplitsSampled = maxSplitsSampled; + } + + /** + * Randomize the split order, then take the specified number of keys from + * each split sampled, where each key is selected with the specified + * probability and possibly replaced by a subsequently selected key when + * the quota of keys from that split is satisfied. + */ + @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type + public K[] getSample(InputFormat inf, Job job) + throws IOException, InterruptedException { + List splits = inf.getSplits(job); + ArrayList samples = new ArrayList(numSamples); + int splitsToSample = Math.min(maxSplitsSampled, splits.size()); + + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + LOG.debug("seed: " + seed); + // shuffle splits + for (int i = 0; i < splits.size(); ++i) { + InputSplit tmp = splits.get(i); + int j = r.nextInt(splits.size()); + splits.set(i, splits.get(j)); + splits.set(j, tmp); + } + // our target rate is in terms of the maximum number of sample splits, + // but we accept the possibility of sampling additional splits to hit + // the target sample keyset + for (int i = 0; i < splitsToSample || + (i < splits.size() && samples.size() < numSamples); ++i) { + RecordReader reader = inf.createRecordReader(splits.get(i), + new TaskAttemptContext(job.getConfiguration(), + new TaskAttemptID())); + while (reader.nextKeyValue()) { + if (r.nextDouble() <= freq) { + if (samples.size() < numSamples) { + samples.add(reader.getCurrentKey()); + } else { + // When exceeding the maximum number of samples, replace a + // random element with this one, then adjust the frequency + // to reflect the possibility of existing elements being + // pushed out + int ind = r.nextInt(numSamples); + if (ind != numSamples) { + samples.set(ind, reader.getCurrentKey()); + } + freq *= (numSamples - 1) / (double) numSamples; + } + } + } + reader.close(); + } + return (K[])samples.toArray(); + } + } + + /** + * Sample from s splits at regular intervals. + * Useful for sorted data. + */ + public static class IntervalSampler implements Sampler { + private final double freq; + private final int maxSplitsSampled; + + /** + * Create a new IntervalSampler sampling all splits. + * @param freq The frequency with which records will be emitted. + */ + public IntervalSampler(double freq) { + this(freq, Integer.MAX_VALUE); + } + + /** + * Create a new IntervalSampler. + * @param freq The frequency with which records will be emitted. + * @param maxSplitsSampled The maximum number of splits to examine. + * @see #getSample + */ + public IntervalSampler(double freq, int maxSplitsSampled) { + this.freq = freq; + this.maxSplitsSampled = maxSplitsSampled; + } + + /** + * For each split sampled, emit when the ratio of the number of records + * retained to the total record count is less than the specified + * frequency. 
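+     * (For example, with freq = 0.01 roughly every hundredth record of each
+     * sampled split is kept, in split order.)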
+ */ + @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type + public K[] getSample(InputFormat inf, Job job) + throws IOException, InterruptedException { + List splits = inf.getSplits(job); + ArrayList samples = new ArrayList(); + int splitsToSample = Math.min(maxSplitsSampled, splits.size()); + int splitStep = splits.size() / splitsToSample; + long records = 0; + long kept = 0; + for (int i = 0; i < splitsToSample; ++i) { + RecordReader reader = inf.createRecordReader( + splits.get(i * splitStep), + new TaskAttemptContext(job.getConfiguration(), + new TaskAttemptID())); + while (reader.nextKeyValue()) { + ++records; + if ((double) kept / records < freq) { + ++kept; + samples.add(reader.getCurrentKey()); + } + } + reader.close(); + } + return (K[])samples.toArray(); + } + } + + /** + * Write a partition file for the given job, using the Sampler provided. + * Queries the sampler for a sample keyset, sorts by the output key + * comparator, selects the keys for each rank, and writes to the destination + * returned from {@link TotalOrderPartitioner#getPartitionFile}. + */ + @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator + public static void writePartitionFile(Job job, Sampler sampler) + throws IOException, ClassNotFoundException, InterruptedException { + Configuration conf = job.getConfiguration(); + final InputFormat inf = + ReflectionUtils.newInstance(job.getInputFormatClass(), conf); + int numPartitions = job.getNumReduceTasks(); + K[] samples = sampler.getSample(inf, job); + LOG.info("Using " + samples.length + " samples"); + RawComparator comparator = + (RawComparator) job.getSortComparator(); + Arrays.sort(samples, comparator); + Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf)); + FileSystem fs = dst.getFileSystem(conf); + if (fs.exists(dst)) { + fs.delete(dst, false); + } + SequenceFile.Writer writer = SequenceFile.createWriter(fs, + conf, dst, job.getMapOutputKeyClass(), NullWritable.class); + NullWritable nullValue = NullWritable.get(); + float stepSize = samples.length / (float) numPartitions; + int last = -1; + for(int i = 1; i < numPartitions; ++i) { + int k = Math.round(stepSize * i); + while (last >= k && comparator.compare(samples[last], samples[k]) == 0) { + ++k; + } + writer.append(samples[k], nullValue); + last = k; + } + writer.close(); + } + + /** + * Driver for InputSampler from the command line. + * Configures a JobConf instance and calls {@link #writePartitionFile}. 
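+   *
+   * Example invocation (reducer count, sampler settings and paths are
+   * illustrative only):
+   *   sampler -r 10 -splitRandom 0.1 10000 10 in-dir partitions.lst
+   * Every argument before the last is added as an input path; the final
+   * argument names the partition file handed to TotalOrderPartitioner.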
+ */ + public int run(String[] args) throws Exception { + Job job = new Job(getConf()); + ArrayList otherArgs = new ArrayList(); + Sampler sampler = null; + for(int i=0; i < args.length; ++i) { + try { + if ("-r".equals(args[i])) { + job.setNumReduceTasks(Integer.parseInt(args[++i])); + } else if ("-inFormat".equals(args[i])) { + job.setInputFormatClass( + Class.forName(args[++i]).asSubclass(InputFormat.class)); + } else if ("-keyClass".equals(args[i])) { + job.setMapOutputKeyClass( + Class.forName(args[++i]).asSubclass(WritableComparable.class)); + } else if ("-splitSample".equals(args[i])) { + int numSamples = Integer.parseInt(args[++i]); + int maxSplits = Integer.parseInt(args[++i]); + if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; + sampler = new SplitSampler(numSamples, maxSplits); + } else if ("-splitRandom".equals(args[i])) { + double pcnt = Double.parseDouble(args[++i]); + int numSamples = Integer.parseInt(args[++i]); + int maxSplits = Integer.parseInt(args[++i]); + if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; + sampler = new RandomSampler(pcnt, numSamples, maxSplits); + } else if ("-splitInterval".equals(args[i])) { + double pcnt = Double.parseDouble(args[++i]); + int maxSplits = Integer.parseInt(args[++i]); + if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; + sampler = new IntervalSampler(pcnt, maxSplits); + } else { + otherArgs.add(args[i]); + } + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + args[i]); + return printUsage(); + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i-1]); + return printUsage(); + } + } + if (job.getNumReduceTasks() <= 1) { + System.err.println("Sampler requires more than one reducer"); + return printUsage(); + } + if (otherArgs.size() < 2) { + System.out.println("ERROR: Wrong number of parameters: "); + return printUsage(); + } + if (null == sampler) { + sampler = new RandomSampler(0.1, 10000, 10); + } + + Path outf = new Path(otherArgs.remove(otherArgs.size() - 1)); + TotalOrderPartitioner.setPartitionFile(getConf(), outf); + for (String s : otherArgs) { + FileInputFormat.addInputPath(job, new Path(s)); + } + InputSampler.writePartitionFile(job, sampler); + + return 0; + } + + public static void main(String[] args) throws Exception { + InputSampler sampler = new InputSampler(new Configuration()); + int res = ToolRunner.run(sampler, args); + System.exit(res); + } +} diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java new file mode 100644 index 0000000..065e844 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce.hadoopbackport; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Partitioner effecting a total order by reading split points from + * an externally generated source. + * + * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner + * from Hadoop trunk at r910774. + */ +public class TotalOrderPartitioner,V> + extends Partitioner implements Configurable { + + private Node partitions; + public static final String DEFAULT_PATH = "_partition.lst"; + public static final String PARTITIONER_PATH = + "mapreduce.totalorderpartitioner.path"; + public static final String MAX_TRIE_DEPTH = + "mapreduce.totalorderpartitioner.trie.maxdepth"; + public static final String NATURAL_ORDER = + "mapreduce.totalorderpartitioner.naturalorder"; + Configuration conf; + + public TotalOrderPartitioner() { } + + /** + * Read in the partition file and build indexing data structures. + * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and + * total.order.partitioner.natural.order is not false, a trie + * of the first total.order.partitioner.max.trie.depth(2) + 1 bytes + * will be built. Otherwise, keys will be located using a binary search of + * the partition keyset using the {@link org.apache.hadoop.io.RawComparator} + * defined for this job. The input file must be sorted with the same + * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys. + */ + @SuppressWarnings("unchecked") // keytype from conf not static + public void setConf(Configuration conf) { + try { + this.conf = conf; + String parts = getPartitionFile(conf); + final Path partFile = new Path(parts); + final FileSystem fs = (DEFAULT_PATH.equals(parts)) + ? 
FileSystem.getLocal(conf) // assume in DistributedCache + : partFile.getFileSystem(conf); + + Job job = new Job(conf); + Class keyClass = (Class)job.getMapOutputKeyClass(); + K[] splitPoints = readPartitions(fs, partFile, keyClass, conf); + if (splitPoints.length != job.getNumReduceTasks() - 1) { + throw new IOException("Wrong number of partitions in keyset:" + + splitPoints.length); + } + RawComparator comparator = + (RawComparator) job.getSortComparator(); + for (int i = 0; i < splitPoints.length - 1; ++i) { + if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) { + throw new IOException("Split points are out of order"); + } + } + boolean natOrder = + conf.getBoolean(NATURAL_ORDER, true); + if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) { + partitions = buildTrie((BinaryComparable[])splitPoints, 0, + splitPoints.length, new byte[0], + // Now that blocks of identical splitless trie nodes are + // represented reentrantly, and we develop a leaf for any trie + // node with only one split point, the only reason for a depth + // limit is to refute stack overflow or bloat in the pathological + // case where the split points are long and mostly look like bytes + // iii...iixii...iii . Therefore, we make the default depth + // limit large but not huge. + conf.getInt(MAX_TRIE_DEPTH, 200)); + } else { + partitions = new BinarySearchNode(splitPoints, comparator); + } + } catch (IOException e) { + throw new IllegalArgumentException("Can't read partitions file", e); + } + } + + public Configuration getConf() { + return conf; + } + + // by construction, we know if our keytype + @SuppressWarnings("unchecked") // is memcmp-able and uses the trie + public int getPartition(K key, V value, int numPartitions) { + return partitions.findPartition(key); + } + + /** + * Set the path to the SequenceFile storing the sorted partition keyset. + * It must be the case that for R reduces, there are R-1 + * keys in the SequenceFile. + */ + public static void setPartitionFile(Configuration conf, Path p) { + conf.set(PARTITIONER_PATH, p.toString()); + } + + /** + * Get the path to the SequenceFile storing the sorted partition keyset. + * @see #setPartitionFile(Configuration, Path) + */ + public static String getPartitionFile(Configuration conf) { + return conf.get(PARTITIONER_PATH, DEFAULT_PATH); + } + + /** + * Interface to the partitioner to locate a key in the partition keyset. + */ + interface Node { + /** + * Locate partition in keyset K, st [Ki..Ki+1) defines a partition, + * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1. + */ + int findPartition(T key); + } + + /** + * Base class for trie nodes. If the keytype is memcomp-able, this builds + * tries of the first total.order.partitioner.max.trie.depth + * bytes. + */ + static abstract class TrieNode implements Node { + private final int level; + TrieNode(int level) { + this.level = level; + } + int getLevel() { + return level; + } + } + + /** + * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or + * where disabled by total.order.partitioner.natural.order, + * search the partition keyset with a binary search. + */ + class BinarySearchNode implements Node { + private final K[] splitPoints; + private final RawComparator comparator; + BinarySearchNode(K[] splitPoints, RawComparator comparator) { + this.splitPoints = splitPoints; + this.comparator = comparator; + } + public int findPartition(K key) { + final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1; + return (pos < 0) ? 
-pos : pos; + } + } + + /** + * An inner trie node that contains 256 children based on the next + * character. + */ + class InnerTrieNode extends TrieNode { + private TrieNode[] child = new TrieNode[256]; + + InnerTrieNode(int level) { + super(level); + } + public int findPartition(BinaryComparable key) { + int level = getLevel(); + if (key.getLength() <= level) { + return child[0].findPartition(key); + } + return child[0xFF & key.getBytes()[level]].findPartition(key); + } + } + + /** + * @param level the tree depth at this node + * @param splitPoints the full split point vector, which holds + * the split point or points this leaf node + * should contain + * @param lower first INcluded element of splitPoints + * @param upper first EXcluded element of splitPoints + * @return a leaf node. They come in three kinds: no split points + * [and the findParttion returns a canned index], one split + * point [and we compare with a single comparand], or more + * than one [and we do a binary search]. The last case is + * rare. + */ + private TrieNode LeafTrieNodeFactory + (int level, BinaryComparable[] splitPoints, int lower, int upper) { + switch (upper - lower) { + case 0: + return new UnsplitTrieNode(level, lower); + + case 1: + return new SinglySplitTrieNode(level, splitPoints, lower); + + default: + return new LeafTrieNode(level, splitPoints, lower, upper); + } + } + + /** + * A leaf trie node that scans for the key between lower..upper. + * + * We don't generate many of these now, since we usually continue trie-ing + * when more than one split point remains at this level. and we make different + * objects for nodes with 0 or 1 split point. + */ + private class LeafTrieNode extends TrieNode { + final int lower; + final int upper; + final BinaryComparable[] splitPoints; + LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) { + super(level); + this.lower = lower; + this.upper = upper; + this.splitPoints = splitPoints; + } + public int findPartition(BinaryComparable key) { + final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1; + return (pos < 0) ? -pos : pos; + } + } + + private class UnsplitTrieNode extends TrieNode { + final int result; + + UnsplitTrieNode(int level, int value) { + super(level); + this.result = value; + } + + public int findPartition(BinaryComparable key) { + return result; + } + } + + private class SinglySplitTrieNode extends TrieNode { + final int lower; + final BinaryComparable mySplitPoint; + + SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) { + super(level); + this.lower = lower; + this.mySplitPoint = splitPoints[lower]; + } + + public int findPartition(BinaryComparable key) { + return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1); + } + } + + + /** + * Read the cut points from the given IFile. 
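+   * (Despite the name, the file is read as a SequenceFile of keys with
+   * NullWritable values, as written by InputSampler#writePartitionFile.)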
+ * @param fs The file system + * @param p The path to read + * @param keyClass The map output key class + * @param job The job config + * @throws IOException + */ + // matching key types enforced by passing in + @SuppressWarnings("unchecked") // map output key class + private K[] readPartitions(FileSystem fs, Path p, Class keyClass, + Configuration conf) throws IOException { + SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf); + ArrayList parts = new ArrayList(); + K key = ReflectionUtils.newInstance(keyClass, conf); + NullWritable value = NullWritable.get(); + while (reader.next(key, value)) { + parts.add(key); + key = ReflectionUtils.newInstance(keyClass, conf); + } + reader.close(); + return parts.toArray((K[])Array.newInstance(keyClass, parts.size())); + } + + /** + * + * This object contains a TrieNodeRef if there is such a thing that + * can be repeated. Two adjacent trie node slots that contain no + * split points can be filled with the same trie node, even if they + * are not on the same level. See buildTreeRec, below. + * + */ + private class CarriedTrieNodeRef + { + TrieNode content; + + CarriedTrieNodeRef() { + content = null; + } + } + + + /** + * Given a sorted set of cut points, build a trie that will find the correct + * partition quickly. + * @param splits the list of cut points + * @param lower the lower bound of partitions 0..numPartitions-1 + * @param upper the upper bound of partitions 0..numPartitions-1 + * @param prefix the prefix that we have already checked against + * @param maxDepth the maximum depth we will build a trie for + * @return the trie node that will divide the splits correctly + */ + private TrieNode buildTrie(BinaryComparable[] splits, int lower, + int upper, byte[] prefix, int maxDepth) { + return buildTrieRec + (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef()); + } + + /** + * This is the core of buildTrie. The interface, and stub, above, just adds + * an empty CarriedTrieNodeRef. + * + * We build trie nodes in depth first order, which is also in key space + * order. Every leaf node is referenced as a slot in a parent internal + * node. If two adjacent slots [in the DFO] hold leaf nodes that have + * no split point, then they are not separated by a split point either, + * because there's no place in key space for that split point to exist. + * + * When that happens, the leaf nodes would be semantically identical, and + * we reuse the object. A single CarriedTrieNodeRef "ref" lives for the + * duration of the tree-walk. ref carries a potentially reusable, unsplit + * leaf node for such reuse until a leaf node with a split arises, which + * breaks the chain until we need to make a new unsplit leaf node. + * + * Note that this use of CarriedTrieNodeRef means that for internal nodes, + * for internal nodes if this code is modified in any way we still need + * to make or fill in the subnodes in key space order. + */ + private TrieNode buildTrieRec(BinaryComparable[] splits, int lower, + int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) { + final int depth = prefix.length; + // We generate leaves for a single split point as well as for + // no split points. + if (depth >= maxDepth || lower >= upper - 1) { + // If we have two consecutive requests for an unsplit trie node, we + // can deliver the same one the second time. + if (lower == upper && ref.content != null) { + return ref.content; + } + TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper); + ref.content = lower == upper ? 
result : null;
+      return result;
+    }
+    InnerTrieNode result = new InnerTrieNode(depth);
+    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
+    // append an extra byte on to the prefix
+    int currentBound = lower;
+    for(int ch = 0; ch < 0xFF; ++ch) {
+      trial[depth] = (byte) (ch + 1);
+      lower = currentBound;
+      while (currentBound < upper) {
+        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
+          break;
+        }
+        currentBound += 1;
+      }
+      trial[depth] = (byte) ch;
+      result.child[0xFF & ch]
+        = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
+    }
+    // pick up the rest
+    trial[depth] = (byte)0xFF;
+    result.child[0xFF]
+      = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
+
+    return result;
+  }
+}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 038a335..fdd88ad 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -1890,6 +1891,24 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       }
     }
   }
+
+  public void bulkLoadHFile(String hfilePath, byte[] familyName)
+  throws IOException {
+    // TODO - wtf, why is there this lock AND the splitsLock?
+    splitsAndClosesLock.readLock().lock();
+    try {
+      Store store = getStore(familyName);
+      if (store == null) {
+        throw new DoNotRetryIOException(
+            "No such column family " + Bytes.toStringBinary(familyName));
+      }
+      store.bulkLoadHFile(hfilePath);
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+    }
+
+  }
+
   @Override
   public boolean equals(Object o) {
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a361498..9b233fa 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2003,6 +2003,14 @@ public class HRegionServer implements HConstants, HRegionInterface,
     }
   }
+  @Override
+  public void bulkLoadHFile(
+      String hfilePath, byte[] regionName, byte[] familyName)
+  throws IOException {
+    HRegion region = getRegion(regionName);
+    region.bulkLoadHFile(hfilePath, familyName);
+  }
+
   Map rowlocks = new ConcurrentHashMap();
@@ -2452,4 +2460,5 @@ public class HRegionServer implements HConstants, HRegionInterface,
       HRegionServer.class);
     doMain(args, regionServerClass);
   }
+
 }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 6c3153b..34f39b7 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -476,6 +477,73 @@ public class Store implements HConstants, HeapSize {
     return this.storefiles;
   }
+  public void bulkLoadHFile(String srcPathStr) throws IOException {
+    Path srcPath = new Path(srcPathStr).makeQualified(fs);
+
+    HFile.Reader reader = null;
+    try {
+      LOG.info("Validating hfile at " + srcPath + " for inclusion in " +
+          "store " + this);
+      reader = new HFile.Reader(srcPath.getFileSystem(conf),
+          srcPath, null, false);
+      reader.loadFileInfo();
+
+      byte[] firstKey = reader.getFirstKey();
+      byte[] lastKey = reader.getLastKey();
+
+      HRegionInfo rinfo = region.getRegionInfo();
+      if (!HRegion.rowIsInRange(rinfo, firstKey)) {
+        throw new WrongRegionException(
+            "Region " + region + " starts with key " +
+            Bytes.toStringBinary(region.getStartKey()) + " but file at " +
+            srcPath + " starts with lower key " +
+            Bytes.toStringBinary(firstKey));
+      }
+      if (!HRegion.rowIsInRange(rinfo, lastKey)) {
+        throw new WrongRegionException(
+            "Region " + region + " ends with key " +
+            Bytes.toStringBinary(region.getEndKey()) + " but file at " +
+            srcPath + " ends with higher key " +
+            Bytes.toStringBinary(lastKey));
+      }
+    } finally {
+      if (reader != null) reader.close();
+    }
+
+    // Move the file if it's on another filesystem
+    // TODO move this to a util somewhere
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    if (!srcFs.equals(fs)) {
+      LOG.info("File " + srcPath + " on different filesystem than " +
+          "destination store - moving to this filesystem.");
+      Path tmpDir = new Path(homedir, "_tmp");
+      Path tmpPath = new Path(tmpDir, srcPath.getName());
+      FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
+      srcPath = tmpPath;
+    }
+
+    Path dstPath = StoreFile.getRandomFilename(fs, homedir);
+    LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
+    StoreFile.rename(fs, srcPath, dstPath);
+
+    StoreFile sf = new StoreFile(fs, dstPath, blockcache,
+        this.conf, this.family.getBloomFilterType(), this.inMemory);
+    sf.createReader();
+
+    LOG.info("Moved hfile " + srcPath + " into store directory " +
+        homedir + " - updating store file list.");
+    this.lock.writeLock().lock();
+    try {
+      // TODO millis below is totally bogus
+      this.storefiles.put(System.currentTimeMillis(), sf);
+      notifyChangedReadersObservers();
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+    LOG.info("Successfully loaded store file " + srcPath +
+        " into store " + this + " (new location: " + dstPath + ")");
+  }
+
   /**
    * Close all the readers
    *
diff --git src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 3feaedd..5ba12c6 100644
--- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -447,7 +447,12 @@ public class HBaseTestingUtility {
       Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
       Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
     };
-
+    return createMultiRegions(c, table, columnFamily, KEYS);
+  }
+
+  public int createMultiRegions(final Configuration c, final HTable table,
+      final byte[] columnFamily, byte [][] startKeys)
+  throws IOException {
     HTable meta = new HTable(c, HConstants.META_TABLE_NAME);
     HTableDescriptor htd = table.getTableDescriptor();
     if(!htd.hasFamily(columnFamily)) {
@@ -458,13 +463,13 @@ public class HBaseTestingUtility {
     // setup already has the ",,123456789" row with an empty start
     // and end key. Adding the custom regions below adds those blindly,
     // including the new start region from empty to "bbb". lg
-    List rows = getMetaTableRows();
+    List rows = getMetaTableRows(htd.getName());
     // add custom ones
     int count = 0;
-    for (int i = 0; i < KEYS.length; i++) {
-      int j = (i + 1) % KEYS.length;
+    for (int i = 0; i < startKeys.length; i++) {
+      int j = (i + 1) % startKeys.length;
       HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(),
-        KEYS[i], KEYS[j]);
+        startKeys[i], startKeys[j]);
       Put put = new Put(hri.getRegionName());
       put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
         Writables.getBytes(hri));
@@ -501,6 +506,29 @@ public class HBaseTestingUtility {
     s.close();
     return rows;
   }
+
+  /**
+   * Returns all rows from the .META. table for a given user table
+   *
+   * @throws IOException When reading the rows fails.
+   */
+  public List getMetaTableRows(byte[] tableName) throws IOException {
+    HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME);
+    List rows = new ArrayList();
+    ResultScanner s = t.getScanner(new Scan());
+    for (Result result : s) {
+      HRegionInfo info = Writables.getHRegionInfo(
+        result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+      HTableDescriptor desc = info.getTableDesc();
+      if (Bytes.compareTo(desc.getName(), tableName) == 0) {
+        LOG.info("getMetaTableRows: row -> " +
+            Bytes.toStringBinary(result.getRow()));
+        rows.add(result.getRow());
+      }
+    }
+    s.close();
+    return rows;
+  }
 
   /**
    * Removes all rows from the .META. in preparation to add custom ones.
@@ -546,6 +574,8 @@ public class HBaseTestingUtility {
     mrCluster = new MiniMRCluster(servers,
       FileSystem.get(c).getUri().toString(), 1);
     LOG.info("Mini mapreduce cluster started");
+    c.set("mapred.job.tracker",
+        mrCluster.createJobConf().get("mapred.job.tracker"));
   }
 
   /**
@@ -556,6 +586,7 @@ public class HBaseTestingUtility {
     if (mrCluster != null) {
       mrCluster.shutdown();
     }
+    conf.set("mapred.job.tracker", "local");
     LOG.info("Mini mapreduce cluster stopped");
   }
 
@@ -688,4 +719,8 @@ public class HBaseTestingUtility {
   public MiniDFSCluster getDFSCluster() {
     return dfsCluster;
   }
+
+  public void cleanupTestDir() throws IOException {
+    getTestDir().getFileSystem(conf).delete(getTestDir(), true);
+  }
 }
diff --git src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index d76c75e..875ebf7 100644
--- src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Hash;
@@ -567,6 +568,7 @@ public class PerformanceEvaluation implements HConstants {
     job.setOutputFormatClass(TextOutputFormat.class);
     TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs"));
+    TableMapReduceUtil.addDependencyJars(job);
     job.waitForCompletion(true);
   }
diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index d04ced2..f2710e8 100644
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -20,16 +20,25 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.DataInput;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -41,6 +50,13 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.Mockito;
+
+
+import static org.junit.Assert.*;
 
 /**
  * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
@@ -49,9 +65,14 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  * emits keys and values like those of {@link PerformanceEvaluation}. Makes
  * as many splits as "mapred.map.tasks" maps.
  */
-public class TestHFileOutputFormat extends HBaseTestCase {
+public class TestHFileOutputFormat {
   private final static int ROWSPERSPLIT = 1024;
+  private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
+  private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
+
+  private HBaseTestingUtility util = new HBaseTestingUtility();
+
   /*
    * InputFormat that makes keys and values like those used in
    * PerformanceEvaluation. Makes as many splits as there are configured
@@ -160,33 +181,140 @@ public class TestHFileOutputFormat extends HBaseTestCase {
     }
   }
 
+  @Before
+  public void cleanupDir() throws IOException {
+    util.cleanupTestDir();
+  }
+
+
+  private void setupPEInput(Job job) {
+    job.setInputFormatClass(TestHFileOutputFormat.PEInputFormat.class);
+    job.setMapperClass(TestHFileOutputFormat.PEtoKVMapper.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(KeyValue.class);
+  }
+
   /**
    * Run small MR job.
    */
+  //@Test
   public void testWritingPEData() throws Exception {
+    Configuration conf = util.getConfiguration();
+    Path testDir = util.getTestDir("testWritingPEData");
+    FileSystem fs = testDir.getFileSystem(conf);
+
     // Set down this value or we OOME in eclipse.
-    this.conf.setInt("io.sort.mb", 20);
+    conf.setInt("io.sort.mb", 20);
     // Write a few files.
-    this.conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
-    Job job = new Job(this.conf, getName());
-    job.setInputFormatClass(TestHFileOutputFormat.PEInputFormat.class);
-    job.setMapperClass(TestHFileOutputFormat.PEtoKVMapper.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-    job.setMapOutputValueClass(KeyValue.class);
+    conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
+
+    Job job = new Job(conf, "testWritingPEData");
+    setupPEInput(job);
     // This partitioner doesn't work well for number keys but using it anyways
     // just to demonstrate how to configure it.
     job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
     // Set start and end rows for partitioner.
     job.getConfiguration().set(SimpleTotalOrderPartitioner.START,
       Bytes.toString(PerformanceEvaluation.format(0)));
-    int rows = this.conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
+    int rows = conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
     job.getConfiguration().set(SimpleTotalOrderPartitioner.END,
       Bytes.toString(PerformanceEvaluation.format(rows)));
     job.setReducerClass(KeyValueSortReducer.class);
     job.setOutputFormatClass(HFileOutputFormat.class);
-    FileOutputFormat.setOutputPath(job, this.testDir);
+    job.setNumReduceTasks(4);
+
+    FileOutputFormat.setOutputPath(job, testDir);
     assertTrue(job.waitForCompletion(false));
-    FileStatus [] files = this.fs.listStatus(this.testDir);
+    FileStatus [] files = fs.listStatus(testDir);
     assertTrue(files.length > 0);
   }
+
+  @Test
+  public void testJobConfiguration() throws Exception {
+    Job job = new Job();
+    HTable table = Mockito.mock(HTable.class);
+    byte[][] mockKeys = new byte[][] {
+      HConstants.EMPTY_BYTE_ARRAY,
+      Bytes.toBytes("aaa"),
+      Bytes.toBytes("ggg"),
+      Bytes.toBytes("zzz")
+    };
+    Mockito.doReturn(mockKeys).when(table).getStartKeys();
+
+    HFileOutputFormat.configureIncrementalLoad(job, table);
+    assertEquals(job.getNumReduceTasks(), 3);
+  }
+
+  private byte [][] generateRandomStartKeys(int numKeys) {
+    Random random = new Random();
+    byte[][] ret = new byte[numKeys][];
+    // first region start key is always empty
+    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
+    for (int i = 1; i < numKeys; i++) {
+      ret[i] = PerformanceEvaluation.generateValue(random);
+    }
+    return ret;
+  }
+
+  @Test
+  public void testLocalMRIncrementalLoad() throws Exception {
+    Configuration conf = util.getConfiguration();
+    Path testDir = util.getTestDir("testLocalMRIncrementalLoad");
+    byte[][] startKeys = generateRandomStartKeys(5);
+
+    try {
+      util.startMiniCluster();
+      HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
+      int numRegions = util.createMultiRegions(
+        util.getConfiguration(), table, FAMILY_NAME,
+        startKeys);
+      assertEquals(numRegions, 5);
+
+      util.startMiniMapReduceCluster();
+      runIncrementalPELoad(conf, table, testDir);
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      util.shutdownMiniCluster();
+    }
+  }
+
+  private void runIncrementalPELoad(
+      Configuration conf, HTable table, Path outDir)
+  throws Exception {
+    Job job = new Job(conf, "testLocalMRIncrementalLoad");
+    setupPEInput(job);
+    HFileOutputFormat.configureIncrementalLoad(job, table);
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    assertEquals(table.getRegionsInfo().size(),
+      job.getNumReduceTasks());
+
+    assertTrue(job.waitForCompletion(true));
+  }
+
+  public static void main(String args[]) throws Exception {
+    new TestHFileOutputFormat().manualTest(args);
+  }
+
+  public void manualTest(String args[]) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    util = new HBaseTestingUtility(conf);
+    if ("newtable".equals(args[0])) {
+      byte[] tname = args[1].getBytes();
+      HTable table = util.createTable(tname, FAMILY_NAME);
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      admin.disableTable(tname);
+      util.createMultiRegions(conf, table, FAMILY_NAME,
+        generateRandomStartKeys(5));
+      admin.enableTable(tname);
+    } else if ("incremental".equals(args[0])) {
+      byte[] tname = args[1].getBytes();
+      HTable table = new HTable(conf, tname);
+      Path outDir = new Path("incremental-out");
+      runIncrementalPELoad(conf, table, outDir);
+    } else {
+      throw new RuntimeException(
+        "usage: TestHFileOutputFormat newtable | incremental");
+    }
+  }
 }
\ No newline at end of file
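
Note (illustration, not part of the patch): the new tests drive HFileOutputFormat.configureIncrementalLoad only through the PerformanceEvaluation input (setupPEInput/runIncrementalPELoad above). The sketch below shows how an ordinary user job might sit on the same API: it maps tab-separated "row<TAB>value" text lines to KeyValues, mirrors the reducer/output-format wiring of testWritingPEData, and relies on configureIncrementalLoad to align the reduce count and total-order partitions with the table's current regions, as the tests do. The class, family and qualifier names here are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ExampleHFileLoadJob {

  /** Maps "row<TAB>value" lines to KeyValues in a made-up family "f", qualifier "q". */
  static class LineToKVMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split("\t", 2);
      byte[] row = Bytes.toBytes(fields[0]);
      KeyValue kv = new KeyValue(row, Bytes.toBytes("f"),
          Bytes.toBytes("q"), Bytes.toBytes(fields[1]));
      context.write(new ImmutableBytesWritable(row), kv);
    }
  }

  public static void main(String[] args) throws Exception {
    // args: <input dir> <table name> <output dir>
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "example-hfile-load");
    job.setJarByClass(ExampleHFileLoadJob.class);

    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    job.setMapperClass(LineToKVMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Sort KeyValues per row and write them as HFiles.
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    // Aligns reduce partitions with the table's current regions: sets the
    // number of reduce tasks and the TotalOrderPartitioner split points.
    HTable table = new HTable(conf, args[1]);
    HFileOutputFormat.configureIncrementalLoad(job, table);

    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The job only writes HFiles under the output directory; handing them to the serving region servers is a separate step, sketched next.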
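
Note (illustration, not part of the patch): once the HFiles exist, each file still has to be adopted by the region server that serves its key range, through the bulkLoadHFile RPC added to HRegionInterface above. The helper below sketches that hand-off under the assumption that the client-side HConnection lookups (getRegionLocation, getHRegionConnection) behave as in the surrounding codebase; the class and method names are hypothetical. A real loader would also need to handle files that straddle a region boundary, since Store.bulkLoadHFile rejects those with WrongRegionException.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;

public class BulkLoadClientSketch {

  /**
   * Asks the region server hosting the region that covers rowInFile to
   * adopt the given HFile into the given column family.
   */
  public static void loadHFileIntoRegion(Configuration conf,
      byte[] tableName, byte[] family, byte[] rowInFile, Path hfilePath)
      throws IOException {
    HConnection conn = HConnectionManager.getConnection(conf);
    // Locate the region serving the file's rows (reload=true to bypass a
    // possibly stale location cache).
    HRegionLocation loc = conn.getRegionLocation(tableName, rowInFile, true);
    HRegionInterface server = conn.getHRegionConnection(loc.getServerAddress());
    // The server side (HRegion/Store.bulkLoadHFile above) re-validates that
    // the file's first and last keys fall inside the region before adopting it.
    server.bulkLoadHFile(hfilePath.toString(),
        loc.getRegionInfo().getRegionName(), family);
  }
}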