Index: src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
===================================================================
--- src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java	(revision 0)
+++ src/test/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java	(revision 0)
@@ -0,0 +1,182 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
+ * Sets up and runs a mapreduce job that writes hfile output.
+ * Creates a few inner classes to implement splits and an inputformat that
+ * emits keys and values like those of {@link PerformanceEvaluation}.  Creates
+ * as many splits as there are configured map tasks ("mapred.map.tasks").
+ */
+public class TestHFileOutputFormat extends HBaseTestCase {
+  private final static int ROWSPERSPLIT = 1024;
+
+  /*
+   * InputFormat that makes keys and values like those used in
+   * PerformanceEvaluation.  Makes as many splits as there are configured
+   * maps ("mapred.map.tasks").
+   */
+  static class PEInputFormat
+  extends InputFormat<ImmutableBytesWritable, ImmutableBytesWritable> {
+    /*
+     * Split that holds nothing but its split index.
+     */
+    static class PEInputSplit extends InputSplit implements Writable {
+      private int index = -1;
+
+      PEInputSplit() {
+        super();
+      }
+
+      PEInputSplit(final int i) {
+        this.index = i;
+      }
+
+      int getIndex() {
+        return this.index;
+      }
+
+      public long getLength() throws IOException, InterruptedException {
+        return ROWSPERSPLIT;
+      }
+
+      public String [] getLocations() throws IOException, InterruptedException {
+        return new String [] {};
+      }
+
+      public void readFields(DataInput in) throws IOException {
+        this.index = in.readInt();
+      }
+
+      public void write(DataOutput out) throws IOException {
+        out.writeInt(this.index);
+      }
+    }
+
+    public RecordReader<ImmutableBytesWritable, ImmutableBytesWritable>
+    createRecordReader(InputSplit split, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+      final int startrow = ((PEInputSplit)split).getIndex() * ROWSPERSPLIT;
+      return new RecordReader<ImmutableBytesWritable, ImmutableBytesWritable>() {
+        // Starts at a particular row
+        private int counter = startrow;
+        private ImmutableBytesWritable key;
+        private ImmutableBytesWritable value;
+        private final Random random = new Random(System.currentTimeMillis());
+
+        public void close() throws IOException {
+          // Nothing to do.
+        }
+
+        public ImmutableBytesWritable getCurrentKey()
+        throws IOException, InterruptedException {
+          return this.key;
+        }
+
+        public ImmutableBytesWritable getCurrentValue()
+        throws IOException, InterruptedException {
+          return this.value;
+        }
+
+        public float getProgress() throws IOException, InterruptedException {
+          // Fraction of this split's rows emitted so far.
+          return (this.counter - startrow) / (float)ROWSPERSPLIT;
+        }
+
+        public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+        throws IOException, InterruptedException {
+          // Nothing to do.
+        }
+
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+          if (this.counter - startrow >= ROWSPERSPLIT) return false;
+          this.counter++;
+          this.key = new ImmutableBytesWritable(PerformanceEvaluation.format(this.counter));
+          this.value = new ImmutableBytesWritable(PerformanceEvaluation.generateValue(this.random));
+          return true;
+        }
+      };
+    }
+
+    public List<InputSplit> getSplits(JobContext context)
+    throws IOException, InterruptedException {
+      int count = context.getConfiguration().getInt("mapred.map.tasks", 1);
+      List<InputSplit> splits = new ArrayList<InputSplit>(count);
+      for (int i = 0; i < count; i++) {
+        splits.add(new PEInputSplit(i));
+      }
+      return splits;
+    }
+  }
+
+  /**
+   * Simple mapper that makes KeyValue output.
+   */
+  static class PEtoKVMapper
+  extends Mapper<ImmutableBytesWritable, ImmutableBytesWritable,
+      ImmutableBytesWritable, KeyValue> {
+    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value,
+      org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable, ImmutableBytesWritable,
+        ImmutableBytesWritable, KeyValue>.Context context)
+    throws java.io.IOException, InterruptedException {
+      context.write(key, new KeyValue(key.get(), PerformanceEvaluation.FAMILY_NAME,
+        PerformanceEvaluation.QUALIFIER_NAME, value.get()));
+    }
+  }
+
+  /**
+   * Run small MR job.
+   */
+  public void testWritingPEData() throws Exception {
+    // Set down this value or we OOME in eclipse.
+    this.conf.setInt("io.sort.mb", 20);
+    // Write a few files by keeping the max hfile size small.
+    this.conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
+    Job job = new Job(this.conf, getName());
+    job.setInputFormatClass(TestHFileOutputFormat.PEInputFormat.class);
+    job.setMapperClass(TestHFileOutputFormat.PEtoKVMapper.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(KeyValue.class);
+    job.setReducerClass(KeyValueSortReducer.class);
+    job.setOutputFormatClass(HFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, this.testDir);
+    assertTrue(job.waitForCompletion(false));
+    FileStatus [] files = this.fs.listStatus(this.testDir);
+    assertTrue(files.length > 0);
+  }
+}
\ No newline at end of file
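For eyeballing what the job actually wrote, a small helper like the following could be added to the test. This is a sketch only: the method name is invented here, and the reader calls (the four-argument HFile.Reader constructor, loadFileInfo, getScanner) are assumed from their use in loadtable.rb later in this patch; it needs imports of org.apache.hadoop.hbase.io.hfile.HFile, org.apache.hadoop.hbase.io.hfile.HFileScanner, org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path.

    // Sketch: count the entries in one hfile produced by the job.
    private static int countEntries(final FileSystem fs, final Path hfilePath)
    throws IOException {
      HFile.Reader reader = new HFile.Reader(fs, hfilePath, null, false);
      try {
        reader.loadFileInfo();
        HFileScanner scanner = reader.getScanner();
        int count = 0;
        if (scanner.seekTo()) {
          do {
            count++;
          } while (scanner.next());
        }
        return count;
      } finally {
        reader.close();
      }
    }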
Index: src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
===================================================================
--- src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java	(revision 799799)
+++ src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java	(working copy)
@@ -88,8 +88,8 @@
   private static final int ONE_GB = 1024 * 1024 * 1000;
   private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
 
-  static final byte [] FAMILY_NAME = Bytes.toBytes("info");
-  static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
+  public static final byte [] FAMILY_NAME = Bytes.toBytes("info");
+  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
 
   protected static final HTableDescriptor TABLE_DESCRIPTOR;
   static {
@@ -599,7 +599,7 @@
    * consumes about 30% of CPU time.
    * @return Generated random value to insert into a table cell.
    */
-  static byte[] generateValue(final Random r) {
+  public static byte[] generateValue(final Random r) {
     byte [] b = new byte [ROW_LENGTH];
     r.nextBytes(b);
     return b;
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 799799)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -355,7 +355,7 @@
     EventType type = event.getType();
     KeeperState state = event.getState();
     LOG.info("Got ZooKeeper event, state: " + state + ", type: " +
-      type + ", path: " + event.getPath());
+        type + ", path: " + event.getPath());
 
     // Ignore events if we're shutting down.
     if (stopRequested.get()) {
@@ -365,7 +365,13 @@
 
     if (state == KeeperState.Expired) {
       LOG.error("ZooKeeper session expired");
-      restart();
+      boolean restart =
+        this.conf.getBoolean("hbase.regionserver.restart.on.zk.expire", false);
+      if (restart) {
+        restart();
+      } else {
+        abort();
+      }
     } else if (type == EventType.NodeDeleted) {
       watchMasterAddress();
     } else if (type == EventType.NodeCreated) {
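With this change the default on ZooKeeper session expiry becomes abort; the old self-restart behavior stays available behind the new "hbase.regionserver.restart.on.zk.expire" flag. A minimal sketch of flipping it programmatically, for example in a test's cluster setup (a fragment only; the flag name comes from the hunk above):

    HBaseConfiguration conf = new HBaseConfiguration();
    // Restore the pre-patch behavior: restart instead of abort on ZK expiry.
    conf.setBoolean("hbase.regionserver.restart.on.zk.expire", true);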
Index: src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(revision 799799)
+++ src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(working copy)
@@ -397,15 +397,16 @@
 
   /**
    * @param fs
-   * @param p
+   * @param dir Directory to create file in.
    * @return random filename inside passed dir
    */
-  static Path getUniqueFile(final FileSystem fs, final Path p)
+  public static Path getUniqueFile(final FileSystem fs, final Path dir)
   throws IOException {
-    if (!fs.getFileStatus(p).isDir()) {
-      throw new IOException("Expecting a directory");
+    if (!fs.getFileStatus(dir).isDir()) {
+      throw new IOException("Expecting " + dir.toString() +
+        " to be a directory");
     }
-    return fs.getFileStatus(p).isDir()? getRandomFilename(fs, p): p;
+    return fs.getFileStatus(dir).isDir()? getRandomFilename(fs, dir): dir;
   }
 
   /**
Index: src/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java	(revision 0)
@@ -0,0 +1,49 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Emits sorted KeyValues.
+ * Reads in all KeyValues from the passed Iterable, sorts them, then emits
+ * KeyValues in sorted order.  If there are lots of columns per row, it will
+ * use lots of memory sorting.
+ */
+public class KeyValueSortReducer
+extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
+    org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue,
+      ImmutableBytesWritable, KeyValue>.Context context)
+  throws java.io.IOException, InterruptedException {
+    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+    for (KeyValue kv: kvs) {
+      // The framework reuses the passed KeyValue instance, so copy its
+      // backing bytes before adding it to the set.
+      byte [] b = new byte [kv.getLength()];
+      System.arraycopy(kv.getBuffer(), kv.getOffset(), b, 0, kv.getLength());
+      map.add(new KeyValue(b));
+    }
+    context.setStatus("Read " + map.size() + " KeyValues");
+    int index = 0;
+    for (KeyValue kv: map) {
+      context.write(row, kv);
+      if (++index % 100 == 0) context.setStatus("Wrote " + index);
+    }
+  }
+}
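KeyValueSortReducer exists because a mapper is free to emit a row's KeyValues in any order; the reducer imposes KeyValue.COMPARATOR order before HFileOutputFormat sees them. An illustrative mapper emitting two columns per row (the class, family, and qualifier names are invented; only the Mapper, KeyValue, and Bytes APIs come from this patch):

    static class TwoColumnMapper
    extends Mapper<ImmutableBytesWritable, ImmutableBytesWritable,
        ImmutableBytesWritable, KeyValue> {
      private static final byte [] FAMILY = Bytes.toBytes("info");

      protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value,
        Context context)
      throws IOException, InterruptedException {
        // Emit order does not matter; KeyValueSortReducer sorts per row.
        context.write(key, new KeyValue(key.get(), FAMILY, Bytes.toBytes("b"), value.get()));
        context.write(key, new KeyValue(key.get(), FAMILY, Bytes.toBytes("a"), value.get()));
      }
    }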
Index: src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
===================================================================
--- src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java	(revision 0)
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Writes HFiles.
+ * Passed KeyValues must arrive in order.  Can only write a single column
+ * family at a time; multiple column families would require coordinating keys
+ * across families.  Writes current time as the sequence id for the file.
+ * Sets the major compacted attribute on created hfiles.
+ * @see KeyValueSortReducer
+ */
+public class HFileOutputFormat
+extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
+  static final Log LOG = LogFactory.getLog(HFileOutputFormat.class);
+
+  public RecordWriter<ImmutableBytesWritable, KeyValue>
+  getRecordWriter(TaskAttemptContext context)
+  throws IOException, InterruptedException {
+    // Get the path of the temporary output file
+    final Path outputdir = FileOutputFormat.getOutputPath(context);
+    Configuration conf = context.getConfiguration();
+    final FileSystem fs = outputdir.getFileSystem(conf);
+    // These configs. are from hbase-*.xml
+    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
+    final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
+    // Invented config.  Add to hbase-*.xml if other than default compression.
+    final String compression = conf.get("hfile.compression",
+      Compression.Algorithm.NONE.getName());
+
+    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
+      // Map of families to writers and how much has been output on the writer.
+      private final Map<byte [], WriterLength> writers =
+        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
+
+      public void write(ImmutableBytesWritable row, KeyValue kv)
+      throws IOException {
+        long length = kv.getLength();
+        byte [] family = kv.getFamily();
+        WriterLength wl = this.writers.get(family);
+        if (wl == null || (length + wl.written) >= maxsize) {
+          // Get a new writer.
+          Path basedir = new Path(outputdir, Bytes.toString(family));
+          if (wl == null) {
+            // First file in this family.  Ensure the family dir exists.
+            wl = new WriterLength();
+            this.writers.put(family, wl);
+            if (this.writers.size() > 1) throw new IOException("One family only");
+            if (!fs.exists(basedir)) fs.mkdirs(basedir);
+          }
+          wl.writer = getNewWriter(wl.writer, basedir);
+          LOG.info("Writer=" + wl.writer.getPath() +
+            ((wl.written == 0)? "": ", wrote=" + wl.written));
+          wl.written = 0;
+        }
+        wl.writer.append(kv);
+        wl.written += length;
+      }
+
+      /* Create a new HFile.Writer.  Close current if there is one.
+       * @param writer
+       * @param familydir
+       * @return A new HFile.Writer.
+       * @throws IOException
+       */
+      private HFile.Writer getNewWriter(final HFile.Writer writer,
+        final Path familydir)
+      throws IOException {
+        close(writer);
+        return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
+          blocksize, compression, KeyValue.KEY_COMPARATOR);
+      }
+
+      private void close(final HFile.Writer w) throws IOException {
+        if (w != null) {
+          StoreFile.appendMetadata(w, System.currentTimeMillis(), true);
+          w.close();
+        }
+      }
+
+      public void close(TaskAttemptContext c)
+      throws IOException, InterruptedException {
+        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
+          close(e.getValue().writer);
+        }
+      }
+    };
+  }
+
+  /*
+   * Data structure to hold a Writer and amount of data written on it.
+   */
+  static class WriterLength {
+    long written = 0;
+    HFile.Writer writer = null;
+  }
+}
\ No newline at end of file
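Because "hfile.compression" is a configuration invented by this patch, jobs get uncompressed hfiles unless they set it. A sketch of opting in (assumes the codec is available on the cluster; legal values are the Compression.Algorithm names such as "none", "gz", "lzo"):

    Job job = new Job(conf, "hfile output");
    job.getConfiguration().set("hfile.compression", "gz");
    job.setOutputFormatClass(HFileOutputFormat.class);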
Index: bin/loadtable.rb
===================================================================
--- bin/loadtable.rb	(revision 0)
+++ bin/loadtable.rb	(revision 0)
@@ -0,0 +1,129 @@
+# Script that takes over from org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.
+# Pass it the output directory of HFileOutputFormat.  It will read the passed
+# files, move them into place, and update the catalog table appropriately.
+# Warning: it will overwrite anything that exists already for the passed table.
+# It expects hbase to be up and running so it can insert table info.
+#
+# To see usage for this script, run:
+#
+#  ${HBASE_HOME}/bin/hbase org.jruby.Main loadtable.rb
+#
+include Java
+import java.util.TreeMap
+import org.apache.hadoop.hbase.client.HTable
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.util.FSUtils
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.util.Writables
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HRegionInfo
+import org.apache.hadoop.hbase.HTableDescriptor
+import org.apache.hadoop.hbase.HColumnDescriptor
+import org.apache.hadoop.hbase.io.hfile.HFile
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.FileSystem
+import org.apache.commons.logging.Log
+import org.apache.commons.logging.LogFactory
+
+# Name of this script
+NAME = "loadtable"
+
+# Print usage for this script
+def usage
+  puts 'Usage: %s.rb TABLENAME HFILEOUTPUTFORMAT_OUTPUT_DIR' % NAME
+  exit!
+end
+
+# Raise unless passed 'dir' exists and is a directory
+def isDirExists(fs, dir)
+  raise IOError.new("Does not exist: " + dir.toString()) unless fs.exists(dir)
+  raise IOError.new("Not a directory: " + dir.toString()) unless fs.isDirectory(dir)
+end
+
+# Check arguments
+if ARGV.size != 2
+  usage
+end
+
+# Check a good table name was passed.
+tableName = HTableDescriptor.isLegalTableName(ARGV[0].to_java_bytes)
+outputdir = Path.new(ARGV[1])
+
+# Get configuration to use.
+c = HBaseConfiguration.new()
+# Get a logger instance.
+LOG = LogFactory.getLog(NAME)
+
+# Set hadoop filesystem configuration using the hbase.rootdir.
+# Otherwise, we'll always use localhost though the hbase.rootdir
+# might be pointing at hdfs location.
+c.set("fs.default.name", c.get(HConstants::HBASE_DIR))
+fs = FileSystem.get(c)
+
+# If hfiles directory does not exist, exit.
+isDirExists(fs, outputdir)
+# Create table dir if it doesn't exist.
+rootdir = FSUtils.getRootDir(c)
+tableDir = Path.new(rootdir, Path.new(Bytes.toString(tableName)))
+fs.mkdirs(tableDir) unless fs.exists(tableDir)
+
+# Start.  Per hfile, move it, and insert an entry in catalog table.
+families = fs.listStatus(outputdir)
+raise IOError.new("Can do one family only") if families.length > 1
+# Read meta on all files.  Put in map keyed by end key.
+map = TreeMap.new(Bytes::ByteArrayComparator.new())
+family = families[0]
+hfiles = fs.listStatus(family.getPath())
+LOG.info("Found " + hfiles.length.to_s + " hfiles")
+count = 0
+for hfile in hfiles
+  reader = HFile::Reader.new(fs, hfile.getPath(), nil, false)
+  begin
+    fileinfo = reader.loadFileInfo()
+    lastkey = reader.getLastKey()
+    # Last key is row/column/ts.  We just want the row part.
+    rowlen = Bytes.toShort(lastkey)
+    LOG.info(count.to_s + " read lastrow of " +
+      Bytes.toString(lastkey[2, rowlen]) + " from " + hfile.getPath().toString())
+    map.put(lastkey[2, rowlen], [hfile, fileinfo])
+    count = count + 1
+  ensure
+    reader.close()
+  end
+end
+# Now I have sorted list of fileinfo+paths.  Start insert.
+# Get a client on catalog table.
+meta = HTable.new(c, HConstants::META_TABLE_NAME)
+# I can't find out from the hfile how it was compressed.
+# Using all defaults.  Change manually after loading if
+# something else is wanted in column or table attributes.
+familyName = family.getPath().getName()
+# Make sure the family subdir exists under the table dir; hfiles move here.
+familyDir = Path.new(tableDir, familyName)
+fs.mkdirs(familyDir) unless fs.exists(familyDir)
+hcd = HColumnDescriptor.new(familyName)
+htd = HTableDescriptor.new(tableName)
+htd.addFamily(hcd)
+previouslastkey = HConstants::EMPTY_START_ROW
+count = 0
+for i in map.keySet()
+  tuple = map.get(i)
+  startkey = previouslastkey
+  count = 1 + count
+  lastkey = i
+  if count == map.size()
+    # Then we are at the last key.  Set it to the special empty indicator.
+    lastkey = HConstants::EMPTY_START_ROW
+  end
+  previouslastkey = lastkey
+  hri = HRegionInfo.new(htd, startkey, lastkey)
+  LOG.info(hri.toString())
+  hfile = tuple[0].getPath()
+  tgt = Path.new(familyDir, hfile.getName())
+  fs.rename(hfile, tgt)
+  LOG.info("Moved " + hfile.toString() + " to " + tgt.toString())
+  p = Put.new(hri.getRegionName())
+  p.add(HConstants::CATALOG_FAMILY, HConstants::REGIONINFO_QUALIFIER, Writables.getBytes(hri))
+  meta.put(p)
+  LOG.info("Inserted " + hri.toString())
+end
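After the script finishes, a quick scan confirms the regions and rows are visible to clients. A sketch using the 0.20-era client API (the class name and argument handling are illustrative only):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;

    public class VerifyLoad {
      public static void main(String [] args) throws Exception {
        // args[0] is the table name passed earlier to loadtable.rb.
        HTable table = new HTable(new HBaseConfiguration(), args[0]);
        ResultScanner scanner = table.getScanner(new Scan());
        try {
          int rows = 0;
          for (Result r: scanner) {
            rows++;
          }
          System.out.println("Rows found: " + rows);
        } finally {
          scanner.close();
        }
      }
    }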