Index: src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java	(revision 0)
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to replay the edits of a WAL (HLog) as a map/reduce job,
+ * either applying them directly to a live table or generating
+ * HFiles for bulk load.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class WALPlayer extends Configured implements Tool {
+  final static String NAME = "WALPlayer";
+  final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
+  final static String HLOG_INPUT_KEY = "hlog.input.dir";
+  final static String TABLE_KEY = "hlog.table.name";
+
+  /**
+   * A mapper that just writes out KeyValues.
+   * This one can be used together with {@link KeyValueSortReducer}
+   */
+  static class HLogKeyValueMapper
+      extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
+    private byte[] table;
+
+    @Override
+    public void map(HLogKey key, WALEdit value, Context context)
+        throws IOException {
+      try {
+        // skip all other tables
+        if (Bytes.equals(table, key.getTablename())) {
+          for (KeyValue kv : value.getKeyValues()) {
+            if (HLog.isMetaFamily(kv.getFamily())) continue;
+            context.write(new ImmutableBytesWritable(kv.getRow()), kv);
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) {
+      table = Bytes.toBytes(context.getConfiguration().get(TABLE_KEY));
+    }
+  }
+
+  /**
+   * A mapper that writes out {@link Mutation} to be directly applied to
+   * a running HBase instance.
+   */
+  static class HLogMapper
+      extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
+    private byte[] table;
+
+    @Override
+    public void map(HLogKey key, WALEdit value, Context context)
+        throws IOException {
+      try {
+        // skip all other tables
+        if (Bytes.equals(table, key.getTablename())) {
+          Put put = null;
+          Delete del = null;
+          KeyValue lastKV = null;
+          for (KeyValue kv : value.getKeyValues()) {
+            if (HLog.isMetaFamily(kv.getFamily())) continue;
+            // A WALEdit may contain multiple operations (HBASE-3584) and/or
+            // multiple rows (HBASE-5229).
+            // Aggregate as much as possible into a single Put/Delete
+            // operation before writing to the context.
+            if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+              // row or type changed, write out aggregate KVs.
+              if (put != null) context.write(new ImmutableBytesWritable(put.getRow()), put);
+              if (del != null) context.write(new ImmutableBytesWritable(del.getRow()), del);
+
+              if (kv.isDelete()) {
+                del = new Delete(kv.getRow());
+              } else {
+                put = new Put(kv.getRow());
+              }
+            }
+            if (kv.isDelete()) {
+              del.addDeleteMarker(kv);
+            } else {
+              put.add(kv);
+            }
+            lastKV = kv;
+          }
+          // write residual KVs
+          if (put != null) context.write(new ImmutableBytesWritable(put.getRow()), put);
+          if (del != null) context.write(new ImmutableBytesWritable(del.getRow()), del);
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) {
+      table = Bytes.toBytes(context.getConfiguration().get(TABLE_KEY));
+    }
+  }
+
+  /**
+   * @param conf The {@link Configuration} to use.
+   */
+  public WALPlayer(Configuration conf) {
+    super(conf);
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public Job createSubmittableJob(String[] args)
+      throws IOException {
+    Configuration conf = getConf();
+    String tableName = args[0];
+    String targetTableName = args[1];
+    Path inputDir = new Path(args[2]);
+    conf.set(TABLE_KEY, tableName);
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(WALPlayer.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    job.setInputFormatClass(HLogInputFormat.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    if (hfileOutPath != null) {
+      // write sorted KeyValues into HFiles for later bulk loading
+      HTable table = new HTable(conf, targetTableName);
+      job.setMapperClass(HLogKeyValueMapper.class);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputValueClass(KeyValue.class);
+      HFileOutputFormat.configureIncrementalLoad(job, table);
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+          com.google.common.base.Preconditions.class);
+    } else {
+      // apply Puts/Deletes directly to the target table
+      job.setMapperClass(HLogMapper.class);
+      // No reducers.
+      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job);
+      job.setNumReduceTasks(0);
+    }
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: "+NAME+" [options] <tablename> <target tablename> <WAL inputdir>");
+    System.err.println("Read all WAL entries for table <tablename> and apply them to table <target tablename>.");
+    System.err.println("By default "+NAME+" will load data directly into HBase. To instead generate");
+    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("Other options:");
+    System.err.println("  -Dhlog.start.time=ms (only apply edits after this time)");
+    System.err.println("  -Dhlog.end.time=ms (only apply edits before this time)");
+    System.err.println("For performance also consider the following options:\n"
+        + "  -Dmapred.map.tasks.speculative.execution=false\n"
+        + "  -Dmapred.reduce.tasks.speculative.execution=false");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+    if (otherArgs.length < 3) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(otherArgs);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+}
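
For context, a minimal driver sketch (not part of the patch) of how the two modes above might be invoked programmatically. The argument order (<tablename> <target tablename> <WAL inputdir>) and the "hlog.bulk.output" key are taken from createSubmittableJob() and usage() above; the table names and paths are made-up placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.util.ToolRunner;

    public class WALPlayerDriverSketch {
      public static void main(String[] args) throws Exception {
        // Mode 1: replay WAL edits of table "t1" directly into table "t1_copy".
        Configuration direct = HBaseConfiguration.create();
        int rc1 = ToolRunner.run(new WALPlayer(direct),
            new String[] { "t1", "t1_copy", "/hbase/.logs" });

        // Mode 2: generate HFiles for a later bulk load instead of writing to a live table.
        Configuration bulk = HBaseConfiguration.create();
        bulk.set("hlog.bulk.output", "/tmp/walplayer-out"); // WALPlayer.BULK_OUTPUT_CONF_KEY
        int rc2 = ToolRunner.run(new WALPlayer(bulk),
            new String[] { "t1", "t1_copy", "/hbase/.logs" });

        System.exit(rc1 != 0 ? rc1 : rc2);
      }
    }

Leaving hlog.bulk.output unset selects the direct-apply path (HLogMapper writing Mutations via initTableReducerJob); setting it switches the job to HLogKeyValueMapper plus KeyValueSortReducer and HFileOutputFormat.
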
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java	(revision 0)
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Simple {@link InputFormat} for {@link HLog} files.
+ */
+@InterfaceAudience.Public
+public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
+  private static final Log LOG = LogFactory.getLog(HLogInputFormat.class);
+
+  /**
+   * {@link InputSplit} for {@link HLog} files. Each split represents
+   * exactly one log file.
+   */
+  static class HLogSplit extends InputSplit implements Writable {
+    private String logFileName;
+    private long fileSize;
+    private long startTime;
+    private long endTime;
+
+    /** for serialization */
+    public HLogSplit() {}
+
+    /**
+     * Represent an HLogSplit, i.e. a single HLog file.
+     * Start- and EndTime are managed by the split, so that HLog files can be
+     * filtered before WALEdits are passed to the mapper(s).
+     * @param logFileName the path of the HLog file
+     * @param fileSize the length of the file in bytes
+     * @param startTime the earliest write time to include (ms)
+     * @param endTime the latest write time to include (ms)
+     */
+    public HLogSplit(String logFileName, long fileSize, long startTime, long endTime) {
+      this.logFileName = logFileName;
+      this.fileSize = fileSize;
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return fileSize;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      // TODO: Find the data node with the most blocks for this HLog?
+      return new String[] {};
+    }
+
+    public String getLogFileName() {
+      return logFileName;
+    }
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public long getEndTime() {
+      return endTime;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      logFileName = in.readUTF();
+      fileSize = in.readLong();
+      startTime = in.readLong();
+      endTime = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(logFileName);
+      out.writeLong(fileSize);
+      out.writeLong(startTime);
+      out.writeLong(endTime);
+    }
+
+    @Override
+    public String toString() {
+      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
+    }
+  }
+
+  /**
+   * {@link RecordReader} for an {@link HLog} file.
+   */
+  static class HLogRecordReader extends RecordReader<HLogKey, WALEdit> {
+    private HLog.Reader reader = null;
+    private HLog.Entry currentEntry = new HLog.Entry();
+    private long startTime;
+    private long endTime;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      HLogSplit hsplit = (HLogSplit)split;
+      Path logFile = new Path(hsplit.getLogFileName());
+      Configuration conf = context.getConfiguration();
+      LOG.info("Opening reader for " + split);
+      try {
+        this.reader = HLog.getReader(logFile.getFileSystem(conf), logFile, conf);
+      } catch (EOFException x) {
+        LOG.info("Ignoring corrupted HLog file: " + logFile
+            + " (This is normal when a RegionServer crashed.)");
+      }
+      this.startTime = hsplit.getStartTime();
+      this.endTime = hsplit.getEndTime();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (reader == null) return false;
+
+      HLog.Entry temp;
+      long i = -1;
+      do {
+        // skip older entries
+        try {
+          temp = reader.next(currentEntry);
+          i++;
+        } catch (EOFException x) {
+          LOG.info("Corrupted entry detected. Ignoring the rest of the file."
+ + " (This is normal when a RegionServer crashed.)"); + return false; + } + } + while(temp != null && temp.getKey().getWriteTime() < startTime); + + if (temp == null) { + if (i>0) LOG.info("Skipped "+i+" entries."); + LOG.info("Reached end of file."); + return false; + } else if (i>0) { + LOG.info("Skipped "+i+" entries, until ts: "+temp.getKey().getWriteTime()+"."); + } + boolean res = temp.getKey().getWriteTime() <= endTime; + if (!res) { + LOG.info("Reached ts: "+temp.getKey().getWriteTime()+" ignoring the rest of the file."); + } + return res; + } + + @Override + public HLogKey getCurrentKey() throws IOException, InterruptedException { + return currentEntry.getKey(); + } + + @Override + public WALEdit getCurrentValue() throws IOException, InterruptedException { + return currentEntry.getEdit(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + // N/A depends on total number of entries, which is unknown + return 0; + } + + @Override + public void close() throws IOException { + LOG.info("Closing reader"); + if (reader != null) this.reader.close(); + } + + } + @Override + public List getSplits(JobContext context) throws IOException, + InterruptedException { + Configuration conf = context.getConfiguration(); + Path inputDir = new Path(conf.get("mapred.input.dir")); + + long startTime = conf.getLong("hlog.start.time", Long.MIN_VALUE); + long endTime = conf.getLong("hlog.end.time", Long.MAX_VALUE); + + FileSystem fs = inputDir.getFileSystem(conf); + List files = getFiles(fs, inputDir, startTime, endTime); + + List splits = new ArrayList(files.size()); + for (FileStatus file : files) { + splits.add(new HLogSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); + } + return splits; + } + + private List getFiles(FileSystem fs, Path dir, long startTime, long endTime) + throws IOException { + List result = new ArrayList(); + LOG.debug("Scanning "+dir.toString()+" for HLog files"); + + FileStatus[] files = fs.listStatus(dir); + for (FileStatus file : files) { + if (file.isDir()) { + // recurse into sub directories + result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); + } else { + String name = file.getPath().toString(); + int idx = name.lastIndexOf('.'); + if (idx > 0) { + try { + long fileStartTime = Long.parseLong(name.substring(idx+1)); + if (fileStartTime <= endTime) { + LOG.info("Found: "+name); + result.add(file); + } + } catch (NumberFormatException x) { + idx = 0; + } + } + if (idx == 0) { + LOG.warn("File "+name+" does not appear to be an HLog file. 
+          LOG.warn("File " + name + " does not appear to be an HLog file. Skipping...");
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<HLogKey, WALEdit> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new HLogRecordReader();
+  }
+}
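
For context, a minimal job-setup sketch (not part of the patch) showing how HLogInputFormat and the time-window keys read in getSplits() above might be wired into an unrelated map-only job. The CountMapper class, paths, and timestamps are hypothetical placeholders; the "hlog.start.time", "hlog.end.time", and "mapred.input.dir" keys are the ones used above.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HLogInputFormat;
    import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class HLogScanSketch {
      /** Emits (table name, number of KVs) per WALEdit; purely illustrative. */
      static class CountMapper
          extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, LongWritable> {
        @Override
        public void map(HLogKey key, WALEdit value, Context context)
            throws IOException, InterruptedException {
          context.write(new ImmutableBytesWritable(key.getTablename()),
              new LongWritable(value.getKeyValues().size()));
        }
      }

      public static Job createJob(String logDir, String outDir,
          long startMs, long endMs) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Only entries whose write time falls inside this window reach the mapper.
        conf.setLong("hlog.start.time", startMs);
        conf.setLong("hlog.end.time", endMs);
        Job job = new Job(conf, "HLogScanSketch");
        job.setJarByClass(HLogScanSketch.class);
        job.setInputFormatClass(HLogInputFormat.class);
        // getSplits() reads "mapred.input.dir"; FileInputFormat populates it.
        FileInputFormat.setInputPaths(job, new Path(logDir));
        job.setMapperClass(CountMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(outDir));
        job.setNumReduceTasks(0);
        return job;
      }
    }
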
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java	(revision 0)
@@ -0,0 +1,154 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * JUnit tests for the HLogRecordReader
+ */
+@Category(MediumTests.class)
+public class TestHLogRecordReader {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path hbaseDir;
+  private static Path dir;
+
+  private static String getName() {
+    return "TestHLogRecordReader";
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.replication", 1);
+    TEST_UTIL.startMiniDFSCluster(1);
+
+    conf = TEST_UTIL.getConfiguration();
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+
+    hbaseDir = TEST_UTIL.createRootDir();
+    dir = new Path(hbaseDir, getName());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testHLogRecordReader() throws Exception {
+    final byte [] tableName = Bytes.toBytes(getName());
+    final byte [] rowName = tableName;
+    Path logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    long firstTs = System.currentTimeMillis();
+    HLog log = new HLog(fs, logDir, oldLogDir, conf);
+    HRegionInfo info = new HRegionInfo(tableName,
+        Bytes.toBytes(""), Bytes.toBytes(""), false);
+    byte [] family = Bytes.toBytes("column");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(family));
+
+    byte [] value = Bytes.toBytes("value");
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
+        System.currentTimeMillis(), value));
+    log.append(info, tableName, edit,
+        System.currentTimeMillis(), htd);
+
+    Thread.sleep(1); // make sure 2nd log gets a later timestamp
+    long secondTs = System.currentTimeMillis();
+    log.rollWriter();
+
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
+        System.currentTimeMillis(), value));
+    log.append(info, tableName, edit,
+        System.currentTimeMillis(), htd);
+    log.close();
+    long thirdTs = System.currentTimeMillis();
+
+    // should have two log files now
+    HLogInputFormat input = new HLogInputFormat();
+    Configuration jobConf = new Configuration(conf);
+    jobConf.set("mapred.input.dir", logDir.toString());
+
+    // make sure both logs are found
+    List<InputSplit> splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    assertEquals(2, splits.size());
+
+    // should return exactly one KV
+    HLogRecordReader reader = new HLogRecordReader();
+    reader.initialize(splits.get(0), new TaskAttemptContext(conf, new TaskAttemptID()));
+    assertTrue(reader.nextKeyValue());
+    assertTrue(Bytes.equals(Bytes.toBytes("1"),
+        reader.getCurrentValue().getKeyValues().get(0).getQualifier()));
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+
+    // same for the 2nd split
+    reader = new HLogRecordReader();
+    reader.initialize(splits.get(1), new TaskAttemptContext(conf, new TaskAttemptID()));
+    assertTrue(reader.nextKeyValue());
+    assertTrue(Bytes.equals(Bytes.toBytes("2"),
+        reader.getCurrentValue().getKeyValues().get(0).getQualifier()));
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+
+    // with an end time before the roll, the 2nd log file can be ignored completely
+    jobConf.setLong("hlog.end.time", secondTs - 1);
+    splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    assertEquals(1, splits.size());
+    reader = new HLogRecordReader();
+    reader.initialize(splits.get(0), new TaskAttemptContext(conf, new TaskAttemptID()));
+    assertTrue(reader.nextKeyValue());
+    assertTrue(Bytes.equals(Bytes.toBytes("1"),
+        reader.getCurrentValue().getKeyValues().get(0).getQualifier()));
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+
+    // now set a start time
+    jobConf.setLong("hlog.end.time", Long.MAX_VALUE);
+    jobConf.setLong("hlog.start.time", thirdTs);
+    splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    // both logs need to be considered
+    assertEquals(2, splits.size());
+    // but both readers skip all edits
+    reader = new HLogRecordReader();
+    reader.initialize(splits.get(0), new TaskAttemptContext(conf, new TaskAttemptID()));
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+    reader = new HLogRecordReader();
+    reader.initialize(splits.get(1), new TaskAttemptContext(conf, new TaskAttemptID()));
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+      new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}