Index: src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java	(revision 0)
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Simple {@link InputFormat} for {@link HLog} files.
+ */
+@InterfaceAudience.Public
+public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
+  /**
+   * {@link InputSplit} for {@link HLog} files. Each split represents
+   * exactly one log file.
+   */
+  static class HLogSplit extends InputSplit implements Writable {
+    private String logFileName;
+    private long fileSize;
+    private long startTime;
+    private long endTime;
+
+    /** for serialization */
+    public HLogSplit() {}
+
+    public HLogSplit(String logFileName, long fileSize, long startTime, long endTime) {
+      this.logFileName = logFileName;
+      this.fileSize = fileSize;
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return fileSize;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      // TODO: Find the data node with the most blocks for this HLog?
+      return new String[] {};
+    }
+
+    public String getLogFileName() {
+      return logFileName;
+    }
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public long getEndTime() {
+      return endTime;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      logFileName = in.readUTF();
+      fileSize = in.readLong();
+      startTime = in.readLong();
+      endTime = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(logFileName);
+      out.writeLong(fileSize);
+      out.writeLong(startTime);
+      out.writeLong(endTime);
+    }
+  }
+
+  /**
+   * {@link RecordReader} for an {@link HLog} file.
+   */
+  static class HLogRecordReader extends RecordReader<HLogKey, WALEdit> {
+    private HLog.Reader reader = null;
+    private HLog.Entry currentEntry = new HLog.Entry();
+    private long startTime;
+    private long endTime;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      HLogSplit hsplit = (HLogSplit)split;
+      Path logFile = new Path(hsplit.getLogFileName());
+      Configuration conf = context.getConfiguration();
+      try {
+        this.reader = HLog.getReader(logFile.getFileSystem(conf), logFile, conf);
+      } catch (EOFException x) {
+        // possible if the RS crashed; leave the reader null
+      }
+      this.startTime = hsplit.getStartTime();
+      this.endTime = hsplit.getEndTime();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (reader == null) return false;
+
+      HLog.Entry temp;
+      do {
+        // skip entries older than the requested start time
+        try {
+          temp = reader.next(currentEntry);
+        } catch (EOFException x) {
+          // expected if the RS crashed
+          temp = null;
+        }
+      } while (temp != null && temp.getKey().getWriteTime() < startTime);
+      // if we are past the end time, we're done
+      return temp != null && temp.getKey().getWriteTime() <= endTime;
+    }
+
+    @Override
+    public HLogKey getCurrentKey() throws IOException, InterruptedException {
+      return currentEntry.getKey();
+    }
+
+    @Override
+    public WALEdit getCurrentValue() throws IOException, InterruptedException {
+      return currentEntry.getEdit();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // N/A: progress depends on the total number of entries, which is unknown
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (reader != null) this.reader.close();
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException,
+      InterruptedException {
+    Configuration conf = context.getConfiguration();
+    Path inputDir = new Path(conf.get("mapred.input.dir"));
+    long startTime = conf.getLong("hlog.start.time", Long.MIN_VALUE);
+    long endTime = conf.getLong("hlog.end.time", Long.MAX_VALUE);
+    FileSystem fs = inputDir.getFileSystem(conf);
+    List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
+    List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
+    for (FileStatus file : files) {
+      splits.add(new HLogSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
+    }
+    return splits;
+  }
+
+  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
+      throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    FileStatus[] files = fs.listStatus(dir);
+    for (FileStatus file : files) {
+      if (file.isDir()) {
+        // recurse into sub directories
+        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
+      } else {
+        String name = file.getPath().toString();
+        // let it blow up if there is a parse exception; it means these weren't HLog files
+        long fileStartTime = Long.parseLong(name.substring(name.lastIndexOf('.') + 1));
+        if (fileStartTime <= endTime) {
+          result.add(file);
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<HLogKey, WALEdit> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new HLogRecordReader();
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java	(revision 0)
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * A tool to replay WAL files as a MapReduce job.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class WALPlayer {
+  final static String NAME = "WALPlayer";
+  final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
+  final static String HLOG_INPUT_KEY = "hlog.input.dir";
+  final static String TABLE_KEY = "hlog.table.name";
+
+  /**
+   * A mapper that just writes out KeyValues.
+   */
+  static class HLogKeyValueMapper
+      extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
+    private byte[] table;
+
+    @Override
+    public void map(HLogKey key, WALEdit value,
+        Context context)
+        throws IOException {
+      try {
+        // skip all other tables
+        if (Bytes.equals(table, key.getTablename())) {
+          for (KeyValue kv : value.getKeyValues()) {
+            if (HLog.isMetaFamily(kv.getFamily())) continue;
+            context.write(new ImmutableBytesWritable(kv.getRow()), kv);
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) {
+      table = Bytes.toBytes(context.getConfiguration().get(TABLE_KEY));
+    }
+  }
+
+  /**
+   * A mapper that writes out {@link Mutation}s to be applied directly to
+   * the target table.
+   */
+  static class HLogMapper
+      extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
+    private byte[] table;
+
+    @Override
+    public void map(HLogKey key, WALEdit value,
+        Context context)
+        throws IOException {
+      try {
+        // skip all other tables
+        if (Bytes.equals(table, key.getTablename())) {
+          Put put = null;
+          Delete del = null;
+          KeyValue lastKV = null;
+          for (KeyValue kv : value.getKeyValues()) {
+            if (HLog.isMetaFamily(kv.getFamily())) continue;
+            // A WALEdit may contain multiple operations (HBASE-3584) and/or
+            // multiple rows (HBASE-5229).
+            // Aggregate as much as possible into a single Put/Delete
+            // operation before writing to the context.
+            if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+              // row or type changed, write out aggregate KVs.
+              if (put != null) context.write(new ImmutableBytesWritable(put.getRow()), put);
+              if (del != null) context.write(new ImmutableBytesWritable(del.getRow()), del);
+
+              if (kv.isDelete()) {
+                del = new Delete(kv.getRow());
+              } else {
+                put = new Put(kv.getRow());
+              }
+            }
+            if (kv.isDelete()) {
+              del.addDeleteMarker(kv);
+            } else {
+              put.add(kv);
+            }
+            lastKV = kv;
+          }
+          // write residual KVs
+          if (put != null) context.write(new ImmutableBytesWritable(put.getRow()), put);
+          if (del != null) context.write(new ImmutableBytesWritable(del.getRow()), del);
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) {
+      table = Bytes.toBytes(context.getConfiguration().get(TABLE_KEY));
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf The current configuration.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException {
+    String tableName = args[0];
+    String targetTableName = args[1];
+    Path inputDir = new Path(args[2]);
+    conf.set(TABLE_KEY, tableName);
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(WALPlayer.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    job.setInputFormatClass(HLogInputFormat.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    if (hfileOutPath != null) {
+      // write HFiles for a later bulk load instead of applying edits directly
+      HTable table = new HTable(conf, targetTableName);
+      job.setMapperClass(HLogKeyValueMapper.class);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputValueClass(KeyValue.class);
+      HFileOutputFormat.configureIncrementalLoad(job, table);
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+          com.google.common.base.Preconditions.class);
+    } else {
+      job.setMapperClass(HLogMapper.class);
+      // No reducers. Write straight to the target table via TableOutputFormat.
+      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job);
+      job.setNumReduceTasks(0);
+    }
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: " + NAME + " [options] <table.name> <target.table.name> <input.dir>");
+    System.err.println("Read all WAL entries for <table.name> and apply them to <target.table.name>.");
+    System.err.println("By default " + NAME + " will load data directly into HBase. To instead generate");
+    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("Other options:");
+    System.err.println("  -Dhlog.start.time=ms (only apply edits after this time)");
+    System.err.println("  -Dhlog.end.time=ms   (only apply edits before this time)");
+    System.err.println("For performance also consider the following options:\n"
+        + "  -Dmapred.map.tasks.speculative.execution=false\n"
+        + "  -Dmapred.reduce.tasks.speculative.execution=false");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length < 3) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}
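
Note for reviewers: below is a minimal, hypothetical sketch of how the new tool could be driven from Java once the patch is applied. It only uses what the patch itself defines (WALPlayer.createSubmittableJob and the configuration keys hlog.start.time, hlog.end.time, and hlog.bulk.output); the class name WALPlayerExample, the table names, the timestamps, and the paths are illustrative placeholders, not values taken from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.mapreduce.Job;

public class WALPlayerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Optional: only replay edits whose write time falls inside this window
    // (milliseconds since the epoch); these keys are read by HLogInputFormat.
    conf.setLong("hlog.start.time", 1330000000000L);
    conf.setLong("hlog.end.time", 1330100000000L);

    // Optional: emit HFiles for a later bulk load instead of writing the
    // edits directly to the target table (BULK_OUTPUT_CONF_KEY in the patch).
    conf.set("hlog.bulk.output", "/tmp/walplayer-out");

    // Argument order per createSubmittableJob: source table name (as recorded
    // in the WAL), target table name, directory containing the HLog files.
    Job job = WALPlayer.createSubmittableJob(conf,
        new String[] { "usertable", "usertable_copy", "/hbase/.oldlogs" });
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The same run can of course be launched from the command line via the usage() text above; the snippet is only meant to make the expected arguments and configuration keys explicit.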