diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java deleted file mode 100644 index 0e26512..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ /dev/null @@ -1,813 +0,0 @@ -/** - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.concurrent.Callable; - -import junit.framework.Assert; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.LargeTests; -import org.apache.hadoop.hbase.PerformanceEvaluation; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.Compression; -import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; - -import com.google.common.collect.Lists; - -/** - * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}. - * Sets up and runs a mapreduce job that writes hfile output. - * Creates a few inner classes to implement splits and an inputformat that - * emits keys and values like those of {@link PerformanceEvaluation}. - */ -@Category(LargeTests.class) -public class TestHFileOutputFormat { - private final static int ROWSPERSPLIT = 1024; - - private static final byte[][] FAMILIES - = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")) - , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))}; - private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable"); - - private HBaseTestingUtility util = new HBaseTestingUtility(); - - private static Log LOG = LogFactory.getLog(TestHFileOutputFormat.class); - - /** - * Simple mapper that makes KeyValue output. - */ - static class RandomKVGeneratingMapper - extends Mapper { - - private int keyLength; - private static final int KEYLEN_DEFAULT=10; - private static final String KEYLEN_CONF="randomkv.key.length"; - - private int valLength; - private static final int VALLEN_DEFAULT=10; - private static final String VALLEN_CONF="randomkv.val.length"; - - @Override - protected void setup(Context context) throws IOException, - InterruptedException { - super.setup(context); - - Configuration conf = context.getConfiguration(); - keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); - valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); - } - - protected void map( - NullWritable n1, NullWritable n2, - Mapper.Context context) - throws java.io.IOException ,InterruptedException - { - - byte keyBytes[] = new byte[keyLength]; - byte valBytes[] = new byte[valLength]; - - int taskId = context.getTaskAttemptID().getTaskID().getId(); - assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - - Random random = new Random(); - for (int i = 0; i < ROWSPERSPLIT; i++) { - - random.nextBytes(keyBytes); - // Ensure that unique tasks generate unique keys - keyBytes[keyLength - 1] = (byte)(taskId & 0xFF); - random.nextBytes(valBytes); - ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); - - for (byte[] family : TestHFileOutputFormat.FAMILIES) { - KeyValue kv = new KeyValue(keyBytes, family, - PerformanceEvaluation.QUALIFIER_NAME, valBytes); - context.write(key, kv); - } - } - } - } - - private void setupRandomGeneratorMapper(Job job) { - job.setInputFormatClass(NMapInputFormat.class); - job.setMapperClass(RandomKVGeneratingMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - } - - /** - * Test that {@link HFileOutputFormat} RecordWriter amends timestamps if - * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}. - * @see HBASE-2615 - */ - @Test - public void test_LATEST_TIMESTAMP_isReplaced() - throws Exception { - Configuration conf = new Configuration(this.util.getConfiguration()); - RecordWriter writer = null; - TaskAttemptContext context = null; - Path dir = - util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); - try { - Job job = new Job(conf); - FileOutputFormat.setOutputPath(job, dir); - context = getTestTaskAttemptContext(job); - HFileOutputFormat hof = new HFileOutputFormat(); - writer = hof.getRecordWriter(context); - final byte [] b = Bytes.toBytes("b"); - - // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be - // changed by call to write. Check all in kv is same but ts. - KeyValue kv = new KeyValue(b, b, b); - KeyValue original = kv.clone(); - writer.write(new ImmutableBytesWritable(), kv); - assertFalse(original.equals(kv)); - assertTrue(Bytes.equals(original.getRow(), kv.getRow())); - assertTrue(original.matchingColumn(kv.getFamily(), kv.getQualifier())); - assertNotSame(original.getTimestamp(), kv.getTimestamp()); - assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); - - // Test 2. Now test passing a kv that has explicit ts. It should not be - // changed by call to record write. - kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); - original = kv.clone(); - writer.write(new ImmutableBytesWritable(), kv); - assertTrue(original.equals(kv)); - } finally { - if (writer != null && context != null) writer.close(context); - dir.getFileSystem(conf).delete(dir, true); - } - } - - /** - * @return True if the available mapreduce is post-0.20. - */ - private static boolean isPost020MapReduce() { - // Here is a coarse test for post 0.20 hadoop; TAC became an interface. - return TaskAttemptContext.class.isInterface(); - } - - private TaskAttemptContext getTestTaskAttemptContext(final Job job) - throws IOException, Exception { - TaskAttemptContext context; - if (isPost020MapReduce()) { - TaskAttemptID id = - TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0"); - Class clazz = - Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); - Constructor c = clazz. - getConstructor(Configuration.class, TaskAttemptID.class); - context = (TaskAttemptContext)c.newInstance(job.getConfiguration(), id); - } else { - context = org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler. - getTaskAttemptContext(job); - } - return context; - } - - /* - * Test that {@link HFileOutputFormat} creates an HFile with TIMERANGE - * metadata used by time-restricted scans. - */ - @Test - public void test_TIMERANGE() throws Exception { - Configuration conf = new Configuration(this.util.getConfiguration()); - RecordWriter writer = null; - TaskAttemptContext context = null; - Path dir = - util.getDataTestDir("test_TIMERANGE_present"); - LOG.info("Timerange dir writing to dir: "+ dir); - try { - // build a record writer using HFileOutputFormat - Job job = new Job(conf); - FileOutputFormat.setOutputPath(job, dir); - context = getTestTaskAttemptContext(job); - HFileOutputFormat hof = new HFileOutputFormat(); - writer = hof.getRecordWriter(context); - - // Pass two key values with explicit times stamps - final byte [] b = Bytes.toBytes("b"); - - // value 1 with timestamp 2000 - KeyValue kv = new KeyValue(b, b, b, 2000, b); - KeyValue original = kv.clone(); - writer.write(new ImmutableBytesWritable(), kv); - assertEquals(original,kv); - - // value 2 with timestamp 1000 - kv = new KeyValue(b, b, b, 1000, b); - original = kv.clone(); - writer.write(new ImmutableBytesWritable(), kv); - assertEquals(original, kv); - - // verify that the file has the proper FileInfo. - writer.close(context); - - // the generated file lives 1 directory down from the attempt directory - // and is the only file, e.g. - // _attempt__0000_r_000000_0/b/1979617994050536795 - FileSystem fs = FileSystem.get(conf); - Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); - FileStatus[] sub1 = fs.listStatus(attemptDirectory); - FileStatus[] file = fs.listStatus(sub1[0].getPath()); - - // open as HFile Reader and pull out TIMERANGE FileInfo. - HFile.Reader rd = HFile.createReader(fs, file[0].getPath(), - new CacheConfig(conf)); - Map finfo = rd.loadFileInfo(); - byte[] range = finfo.get("TIMERANGE".getBytes()); - assertNotNull(range); - - // unmarshall and check values. - TimeRangeTracker timeRangeTracker = new TimeRangeTracker(); - Writables.copyWritable(range, timeRangeTracker); - LOG.info(timeRangeTracker.getMinimumTimestamp() + - "...." + timeRangeTracker.getMaximumTimestamp()); - assertEquals(1000, timeRangeTracker.getMinimumTimestamp()); - assertEquals(2000, timeRangeTracker.getMaximumTimestamp()); - rd.close(); - } finally { - if (writer != null && context != null) writer.close(context); - dir.getFileSystem(conf).delete(dir, true); - } - } - - /** - * Run small MR job. - */ - @Test - public void testWritingPEData() throws Exception { - Configuration conf = util.getConfiguration(); - Path testDir = util.getDataTestDir("testWritingPEData"); - FileSystem fs = testDir.getFileSystem(conf); - - // Set down this value or we OOME in eclipse. - conf.setInt("io.sort.mb", 20); - // Write a few files. - conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024); - - Job job = new Job(conf, "testWritingPEData"); - setupRandomGeneratorMapper(job); - // This partitioner doesn't work well for number keys but using it anyways - // just to demonstrate how to configure it. - byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; - byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; - - Arrays.fill(startKey, (byte)0); - Arrays.fill(endKey, (byte)0xff); - - job.setPartitionerClass(SimpleTotalOrderPartitioner.class); - // Set start and end rows for partitioner. - SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); - SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); - job.setReducerClass(KeyValueSortReducer.class); - job.setOutputFormatClass(HFileOutputFormat.class); - job.setNumReduceTasks(4); - - FileOutputFormat.setOutputPath(job, testDir); - assertTrue(job.waitForCompletion(false)); - FileStatus [] files = fs.listStatus(testDir); - assertTrue(files.length > 0); - } - - @Test - public void testJobConfiguration() throws Exception { - Job job = new Job(util.getConfiguration()); - job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); - HTable table = Mockito.mock(HTable.class); - setupMockStartKeys(table); - HFileOutputFormat.configureIncrementalLoad(job, table); - assertEquals(job.getNumReduceTasks(), 4); - } - - private byte [][] generateRandomStartKeys(int numKeys) { - Random random = new Random(); - byte[][] ret = new byte[numKeys][]; - // first region start key is always empty - ret[0] = HConstants.EMPTY_BYTE_ARRAY; - for (int i = 1; i < numKeys; i++) { - ret[i] = PerformanceEvaluation.generateValue(random); - } - return ret; - } - - @Test - public void testMRIncrementalLoad() throws Exception { - LOG.info("\nStarting test testMRIncrementalLoad\n"); - doIncrementalLoadTest(false); - } - - @Test - public void testMRIncrementalLoadWithSplit() throws Exception { - LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); - doIncrementalLoadTest(true); - } - - private void doIncrementalLoadTest( - boolean shouldChangeRegions) throws Exception { - util = new HBaseTestingUtility(); - Configuration conf = util.getConfiguration(); - Path testDir = util.getDataTestDir("testLocalMRIncrementalLoad"); - byte[][] startKeys = generateRandomStartKeys(5); - - try { - util.startMiniCluster(); - HBaseAdmin admin = new HBaseAdmin(conf); - HTable table = util.createTable(TABLE_NAME, FAMILIES); - assertEquals("Should start with empty table", - 0, util.countRows(table)); - int numRegions = util.createMultiRegions( - util.getConfiguration(), table, FAMILIES[0], startKeys); - assertEquals("Should make 5 regions", numRegions, 5); - - // Generate the bulk load files - util.startMiniMapReduceCluster(); - runIncrementalPELoad(conf, table, testDir); - // This doesn't write into the table, just makes files - assertEquals("HFOF should not touch actual table", - 0, util.countRows(table)); - - - // Make sure that a directory was created for every CF - int dir = 0; - for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) { - for (byte[] family : FAMILIES) { - if (Bytes.toString(family).equals(f.getPath().getName())) { - ++dir; - } - } - } - assertEquals("Column family not found in FS.", FAMILIES.length, dir); - - // handle the split case - if (shouldChangeRegions) { - LOG.info("Changing regions in table"); - admin.disableTable(table.getTableName()); - while(util.getMiniHBaseCluster().getMaster().getAssignmentManager(). - isRegionsInTransition()) { - Threads.sleep(200); - LOG.info("Waiting on table to finish disabling"); - } - byte[][] newStartKeys = generateRandomStartKeys(15); - util.createMultiRegions( - util.getConfiguration(), table, FAMILIES[0], newStartKeys); - admin.enableTable(table.getTableName()); - while (table.getRegionsInfo().size() != 15 || - !admin.isTableAvailable(table.getTableName())) { - Thread.sleep(200); - LOG.info("Waiting for new region assignment to happen"); - } - } - - // Perform the actual load - new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); - - // Ensure data shows up - int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; - assertEquals("LoadIncrementalHFiles should put expected data in table", - expectedRows, util.countRows(table)); - Scan scan = new Scan(); - ResultScanner results = table.getScanner(scan); - for (Result res : results) { - assertEquals(FAMILIES.length, res.raw().length); - KeyValue first = res.raw()[0]; - for (KeyValue kv : res.raw()) { - assertTrue(KeyValue.COMPARATOR.matchingRows(first, kv)); - assertTrue(Bytes.equals(first.getValue(), kv.getValue())); - } - } - results.close(); - String tableDigestBefore = util.checksumRows(table); - - // Cause regions to reopen - admin.disableTable(TABLE_NAME); - while (!admin.isTableDisabled(TABLE_NAME)) { - Thread.sleep(200); - LOG.info("Waiting for table to disable"); - } - admin.enableTable(TABLE_NAME); - util.waitTableAvailable(TABLE_NAME, 30000); - assertEquals("Data should remain after reopening of regions", - tableDigestBefore, util.checksumRows(table)); - } finally { - util.shutdownMiniMapReduceCluster(); - util.shutdownMiniCluster(); - } - } - - private void runIncrementalPELoad( - Configuration conf, HTable table, Path outDir) - throws Exception { - Job job = new Job(conf, "testLocalMRIncrementalLoad"); - job.setWorkingDirectory(util.getDataTestDir("runIncrementalPELoad")); - setupRandomGeneratorMapper(job); - HFileOutputFormat.configureIncrementalLoad(job, table); - FileOutputFormat.setOutputPath(job, outDir); - - Assert.assertFalse( util.getTestFileSystem().exists(outDir)) ; - - assertEquals(table.getRegionsInfo().size(), - job.getNumReduceTasks()); - - assertTrue(job.waitForCompletion(true)); - } - - /** - * Test for - * {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. Tests - * that the compression map is correctly deserialized from configuration - * - * @throws IOException - */ - @Test - public void testCreateFamilyCompressionMap() throws IOException { - for (int numCfs = 0; numCfs <= 3; numCfs++) { - Configuration conf = new Configuration(this.util.getConfiguration()); - Map familyToCompression = getMockColumnFamilies(numCfs); - HTable table = Mockito.mock(HTable.class); - setupMockColumnFamilies(table, familyToCompression); - HFileOutputFormat.configureCompression(table, conf); - - // read back family specific compression setting from the configuration - Map retrievedFamilyToCompressionMap = HFileOutputFormat.createFamilyCompressionMap(conf); - - // test that we have a value for all column families that matches with the - // used mock values - for (Entry entry : familyToCompression.entrySet()) { - assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue() - .getName(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes())); - } - } - } - - private void setupMockColumnFamilies(HTable table, - Map familyToCompression) throws IOException - { - HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME); - for (Entry entry : familyToCompression.entrySet()) { - mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()) - .setMaxVersions(1) - .setCompressionType(entry.getValue()) - .setBlockCacheEnabled(false) - .setTimeToLive(0)); - } - Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor(); - } - - private void setupMockStartKeys(HTable table) throws IOException { - byte[][] mockKeys = new byte[][] { - HConstants.EMPTY_BYTE_ARRAY, - Bytes.toBytes("aaa"), - Bytes.toBytes("ggg"), - Bytes.toBytes("zzz") - }; - Mockito.doReturn(mockKeys).when(table).getStartKeys(); - } - - /** - * @return a map from column family names to compression algorithms for - * testing column family compression. Column family names have special characters - */ - private Map getMockColumnFamilies(int numCfs) { - Map familyToCompression = new HashMap(); - // use column family names having special characters - if (numCfs-- > 0) { - familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); - } - if (numCfs-- > 0) { - familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); - } - if (numCfs-- > 0) { - familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); - } - if (numCfs-- > 0) { - familyToCompression.put("Family3", Compression.Algorithm.NONE); - } - return familyToCompression; - } - - /** - * Test that {@link HFileOutputFormat} RecordWriter uses compression settings - * from the column family descriptor - */ - @Test - public void testColumnFamilyCompression() throws Exception { - Configuration conf = new Configuration(this.util.getConfiguration()); - RecordWriter writer = null; - TaskAttemptContext context = null; - Path dir = - util.getDataTestDir("testColumnFamilyCompression"); - - HTable table = Mockito.mock(HTable.class); - - Map configuredCompression = - new HashMap(); - Compression.Algorithm[] supportedAlgos = getSupportedCompressionAlgorithms(); - - int familyIndex = 0; - for (byte[] family : FAMILIES) { - configuredCompression.put(Bytes.toString(family), - supportedAlgos[familyIndex++ % supportedAlgos.length]); - } - setupMockColumnFamilies(table, configuredCompression); - - // set up the table to return some mock keys - setupMockStartKeys(table); - - try { - // partial map red setup to get an operational writer for testing - // We turn off the sequence file compression, because DefaultCodec - // pollutes the GZip codec pool with an incompatible compressor. - conf.set("io.seqfile.compression.type", "NONE"); - Job job = new Job(conf, "testLocalMRIncrementalLoad"); - job.setWorkingDirectory(util.getDataTestDir("testColumnFamilyCompression")); - setupRandomGeneratorMapper(job); - HFileOutputFormat.configureIncrementalLoad(job, table); - FileOutputFormat.setOutputPath(job, dir); - context = getTestTaskAttemptContext(job); - HFileOutputFormat hof = new HFileOutputFormat(); - writer = hof.getRecordWriter(context); - - // write out random rows - writeRandomKeyValues(writer, context, ROWSPERSPLIT); - writer.close(context); - - // Make sure that a directory was created for every CF - FileSystem fileSystem = dir.getFileSystem(conf); - - // commit so that the filesystem has one directory per column family - hof.getOutputCommitter(context).commitTask(context); - hof.getOutputCommitter(context).commitJob(context); - for (byte[] family : FAMILIES) { - String familyStr = new String(family); - boolean found = false; - for (FileStatus f : fileSystem.listStatus(dir)) { - - if (Bytes.toString(family).equals(f.getPath().getName())) { - // we found a matching directory - found = true; - - // verify that the compression on this file matches the configured - // compression - Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath(); - Reader reader = HFile.createReader(fileSystem, dataFilePath, - new CacheConfig(conf)); - reader.loadFileInfo(); - assertEquals("Incorrect compression used for column family " + familyStr - + "(reader: " + reader + ")", - configuredCompression.get(familyStr), reader.getCompressionAlgorithm()); - break; - } - } - - if (!found) { - fail("HFile for column family " + familyStr + " not found"); - } - } - - } finally { - dir.getFileSystem(conf).delete(dir, true); - } - } - - - /** - * @return - */ - private Compression.Algorithm[] getSupportedCompressionAlgorithms() { - String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); - List supportedAlgos = Lists.newArrayList(); - - for (String algoName : allAlgos) { - try { - Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); - algo.getCompressor(); - supportedAlgos.add(algo); - }catch (Exception e) { - // this algo is not available - } - } - - return supportedAlgos.toArray(new Compression.Algorithm[0]); - } - - - /** - * Write random values to the writer assuming a table created using - * {@link #FAMILIES} as column family descriptors - */ - private void writeRandomKeyValues(RecordWriter writer, TaskAttemptContext context, - int numRows) - throws IOException, InterruptedException { - byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; - int valLength = 10; - byte valBytes[] = new byte[valLength]; - - int taskId = context.getTaskAttemptID().getTaskID().getId(); - assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; - - Random random = new Random(); - for (int i = 0; i < numRows; i++) { - - Bytes.putInt(keyBytes, 0, i); - random.nextBytes(valBytes); - ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); - - for (byte[] family : TestHFileOutputFormat.FAMILIES) { - KeyValue kv = new KeyValue(keyBytes, family, - PerformanceEvaluation.QUALIFIER_NAME, valBytes); - writer.write(key, kv); - } - } - } - - @Test - public void testExcludeMinorCompaction() throws Exception { - Configuration conf = util.getConfiguration(); - conf.setInt("hbase.hstore.compaction.min", 2); - Path testDir = util.getDataTestDir("testExcludeMinorCompaction"); - byte[][] startKeys = generateRandomStartKeys(5); - - try { - util.startMiniCluster(); - final FileSystem fs = util.getDFSCluster().getFileSystem(); - HBaseAdmin admin = new HBaseAdmin(conf); - HTable table = util.createTable(TABLE_NAME, FAMILIES); - assertEquals("Should start with empty table", 0, util.countRows(table)); - - // deep inspection: get the StoreFile dir - final Path storePath = Store.getStoreHomedir( - HTableDescriptor.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME), - admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(), - FAMILIES[0]); - assertEquals(0, fs.listStatus(storePath).length); - - // put some data in it and flush to create a storefile - Put p = new Put(Bytes.toBytes("test")); - p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); - table.put(p); - admin.flush(TABLE_NAME); - assertEquals(1, util.countRows(table)); - quickPoll(new Callable() { - public Boolean call() throws Exception { - return fs.listStatus(storePath).length == 1; - } - }, 5000); - - // Generate a bulk load file with more rows - conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", - true); - util.startMiniMapReduceCluster(); - runIncrementalPELoad(conf, table, testDir); - - // Perform the actual load - new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table); - - // Ensure data shows up - int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; - assertEquals("LoadIncrementalHFiles should put expected data in table", - expectedRows + 1, util.countRows(table)); - - // should have a second StoreFile now - assertEquals(2, fs.listStatus(storePath).length); - - // minor compactions shouldn't get rid of the file - admin.compact(TABLE_NAME); - try { - quickPoll(new Callable() { - public Boolean call() throws Exception { - return fs.listStatus(storePath).length == 1; - } - }, 5000); - throw new IOException("SF# = " + fs.listStatus(storePath).length); - } catch (AssertionError ae) { - // this is expected behavior - } - - // a major compaction should work though - admin.majorCompact(TABLE_NAME); - quickPoll(new Callable() { - public Boolean call() throws Exception { - return fs.listStatus(storePath).length == 1; - } - }, 5000); - - } finally { - util.shutdownMiniMapReduceCluster(); - util.shutdownMiniCluster(); - } - } - - private void quickPoll(Callable c, int waitMs) throws Exception { - int sleepMs = 10; - int retries = (int) Math.ceil(((double) waitMs) / sleepMs); - while (retries-- > 0) { - if (c.call().booleanValue()) { - return; - } - Thread.sleep(sleepMs); - } - fail(); - } - - public static void main(String args[]) throws Exception { - new TestHFileOutputFormat().manualTest(args); - } - - public void manualTest(String args[]) throws Exception { - Configuration conf = HBaseConfiguration.create(); - util = new HBaseTestingUtility(conf); - if ("newtable".equals(args[0])) { - byte[] tname = args[1].getBytes(); - HTable table = util.createTable(tname, FAMILIES); - HBaseAdmin admin = new HBaseAdmin(conf); - admin.disableTable(tname); - byte[][] startKeys = generateRandomStartKeys(5); - util.createMultiRegions(conf, table, FAMILIES[0], startKeys); - admin.enableTable(tname); - } else if ("incremental".equals(args[0])) { - byte[] tname = args[1].getBytes(); - HTable table = new HTable(conf, tname); - Path outDir = new Path("incremental-out"); - runIncrementalPELoad(conf, table, outDir); - } else { - throw new RuntimeException( - "usage: TestHFileOutputFormat newtable | incremental"); - } - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java deleted file mode 100644 index 0b3ba83..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; - -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * JUnit tests for the HLogRecordReader - */ -@Category(MediumTests.class) -public class TestHLogRecordReader { - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static Configuration conf; - private static FileSystem fs; - private static Path hbaseDir; - private static final byte [] tableName = Bytes.toBytes(getName()); - private static final byte [] rowName = tableName; - private static final HRegionInfo info = new HRegionInfo(tableName, - Bytes.toBytes(""), Bytes.toBytes(""), false); - private static final byte [] family = Bytes.toBytes("column"); - private static final byte [] value = Bytes.toBytes("value"); - private static HTableDescriptor htd; - private static Path logDir; - private static Path oldLogDir; - - private static String getName() { - return "TestHLogRecordReader"; - } - - @Before - public void setUp() throws Exception { - FileStatus[] entries = fs.listStatus(hbaseDir); - for (FileStatus dir : entries) { - fs.delete(dir.getPath(), true); - } - - } - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // Make block sizes small. - conf = TEST_UTIL.getConfiguration(); - conf.setInt("dfs.blocksize", 1024 * 1024); - conf.setInt("dfs.replication", 1); - TEST_UTIL.startMiniDFSCluster(1); - - conf = TEST_UTIL.getConfiguration(); - fs = TEST_UTIL.getDFSCluster().getFileSystem(); - - hbaseDir = TEST_UTIL.createRootDir(); - logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); - oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); - htd = new HTableDescriptor(tableName); - htd.addFamily(new HColumnDescriptor(family)); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - /** - * Test partial reads from the log based on passed time range - * @throws Exception - */ - @Test - public void testPartialRead() throws Exception { - HLog log = new HLog(fs, logDir, oldLogDir, conf); - long ts = System.currentTimeMillis(); - WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), - ts, value)); - log.append(info, tableName, edit, - ts, htd); - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), - ts+1, value)); - log.append(info, tableName, edit, - ts+1, htd); - log.rollWriter(); - - Thread.sleep(1); - long ts1 = System.currentTimeMillis(); - - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), - ts1+1, value)); - log.append(info, tableName, edit, - ts1+1, htd); - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), - ts1+2, value)); - log.append(info, tableName, edit, - ts1+2, htd); - log.close(); - - HLogInputFormat input = new HLogInputFormat(); - Configuration jobConf = new Configuration(conf); - jobConf.set("mapred.input.dir", logDir.toString()); - jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts); - - // only 1st file is considered, and only its 1st entry is used - List splits = input.getSplits(new JobContext(jobConf, new JobID())); - assertEquals(1, splits.size()); - testSplit(splits.get(0), Bytes.toBytes("1")); - - jobConf.setLong(HLogInputFormat.START_TIME_KEY, ts+1); - jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts1+1); - splits = input.getSplits(new JobContext(jobConf, new JobID())); - // both files need to be considered - assertEquals(2, splits.size()); - // only the 2nd entry from the 1st file is used - testSplit(splits.get(0), Bytes.toBytes("2")); - // only the 1nd entry from the 2nd file is used - testSplit(splits.get(1), Bytes.toBytes("3")); - } - - /** - * Test basic functionality - * @throws Exception - */ - @Test - public void testHLogRecordReader() throws Exception { - HLog log = new HLog(fs, logDir, oldLogDir, conf); - byte [] value = Bytes.toBytes("value"); - WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), - System.currentTimeMillis(), value)); - log.append(info, tableName, edit, - System.currentTimeMillis(), htd); - - Thread.sleep(1); // make sure 2nd log gets a later timestamp - long secondTs = System.currentTimeMillis(); - log.rollWriter(); - - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), - System.currentTimeMillis(), value)); - log.append(info, tableName, edit, - System.currentTimeMillis(), htd); - log.close(); - long thirdTs = System.currentTimeMillis(); - - // should have 2 log files now - HLogInputFormat input = new HLogInputFormat(); - Configuration jobConf = new Configuration(conf); - jobConf.set("mapred.input.dir", logDir.toString()); - - // make sure both logs are found - List splits = input.getSplits(new JobContext(jobConf, new JobID())); - assertEquals(2, splits.size()); - - // should return exactly one KV - testSplit(splits.get(0), Bytes.toBytes("1")); - // same for the 2nd split - testSplit(splits.get(1), Bytes.toBytes("2")); - - // now test basic time ranges: - - // set an endtime, the 2nd log file can be ignored completely. - jobConf.setLong(HLogInputFormat.END_TIME_KEY, secondTs-1); - splits = input.getSplits(new JobContext(jobConf, new JobID())); - assertEquals(1, splits.size()); - testSplit(splits.get(0), Bytes.toBytes("1")); - - // now set a start time - jobConf.setLong(HLogInputFormat.END_TIME_KEY, Long.MAX_VALUE); - jobConf.setLong(HLogInputFormat.START_TIME_KEY, thirdTs); - splits = input.getSplits(new JobContext(jobConf, new JobID())); - // both logs need to be considered - assertEquals(2, splits.size()); - // but both readers skip all edits - testSplit(splits.get(0)); - testSplit(splits.get(1)); - } - - /** - * Create a new reader from the split, and match the edits against the passed columns. - */ - private void testSplit(InputSplit split, byte[]... columns) throws Exception { - HLogRecordReader reader = new HLogRecordReader(); - reader.initialize(split, new TaskAttemptContext(conf, new TaskAttemptID())); - - for (byte[] column : columns) { - assertTrue(reader.nextKeyValue()); - assertTrue(Bytes - .equals(column, reader.getCurrentValue().getKeyValues().get(0).getQualifier())); - } - assertFalse(reader.nextKeyValue()); - reader.close(); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java deleted file mode 100644 index a3faeb3..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.GenericOptionsParser; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; - -@Category(MediumTests.class) -public class TestImportExport { - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static final byte[] ROW1 = Bytes.toBytes("row1"); - private static final byte[] ROW2 = Bytes.toBytes("row2"); - private static final String FAMILYA_STRING = "a"; - private static final String FAMILYB_STRING = "b"; - private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING); - private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING); - private static final byte[] QUAL = Bytes.toBytes("q"); - private static final String OUTPUT_DIR = "outputdir"; - - private static MiniHBaseCluster cluster; - private static long now = System.currentTimeMillis(); - - @BeforeClass - public static void beforeClass() throws Exception { - cluster = UTIL.startMiniCluster(); - UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniMapReduceCluster(); - UTIL.shutdownMiniCluster(); - } - - @Before - @After - public void cleanup() throws Exception { - FileSystem fs = FileSystem.get(UTIL.getConfiguration()); - fs.delete(new Path(OUTPUT_DIR), true); - } - - /** - * Test simple replication case with column mapping - * @throws Exception - */ - @Test - public void testSimpleCase() throws Exception { - String EXPORT_TABLE = "exportSimpleCase"; - HTable t = UTIL.createTable(Bytes.toBytes(EXPORT_TABLE), FAMILYA); - Put p = new Put(ROW1); - p.add(FAMILYA, QUAL, now, QUAL); - p.add(FAMILYA, QUAL, now+1, QUAL); - p.add(FAMILYA, QUAL, now+2, QUAL); - t.put(p); - p = new Put(ROW2); - p.add(FAMILYA, QUAL, now, QUAL); - p.add(FAMILYA, QUAL, now+1, QUAL); - p.add(FAMILYA, QUAL, now+2, QUAL); - t.put(p); - - String[] args = new String[] { - EXPORT_TABLE, - OUTPUT_DIR, - "1000" - }; - - GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args); - Configuration conf = opts.getConfiguration(); - args = opts.getRemainingArgs(); - - Job job = Export.createSubmittableJob(conf, args); - job.waitForCompletion(false); - assertTrue(job.isSuccessful()); - - - String IMPORT_TABLE = "importTableSimpleCase"; - t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB); - args = new String[] { - "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING, - IMPORT_TABLE, - OUTPUT_DIR - }; - - opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args); - conf = opts.getConfiguration(); - args = opts.getRemainingArgs(); - - job = Import.createSubmittableJob(conf, args); - job.waitForCompletion(false); - assertTrue(job.isSuccessful()); - - Get g = new Get(ROW1); - g.setMaxVersions(); - Result r = t.get(g); - assertEquals(3, r.size()); - g = new Get(ROW2); - g.setMaxVersions(); - r = t.get(g); - assertEquals(3, r.size()); - } - - @Test - public void testWithDeletes() throws Exception { - String EXPORT_TABLE = "exportWithDeletes"; - HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(true) - ); - UTIL.getHBaseAdmin().createTable(desc); - HTable t = new HTable(UTIL.getConfiguration(), EXPORT_TABLE); - - Put p = new Put(ROW1); - p.add(FAMILYA, QUAL, now, QUAL); - p.add(FAMILYA, QUAL, now+1, QUAL); - p.add(FAMILYA, QUAL, now+2, QUAL); - p.add(FAMILYA, QUAL, now+3, QUAL); - p.add(FAMILYA, QUAL, now+4, QUAL); - t.put(p); - - Delete d = new Delete(ROW1, now+3, null); - t.delete(d); - d = new Delete(ROW1); - d.deleteColumns(FAMILYA, QUAL, now+2); - t.delete(d); - - String[] args = new String[] { - "-D" + Export.RAW_SCAN + "=true", - EXPORT_TABLE, - OUTPUT_DIR, - "1000" - }; - - GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args); - Configuration conf = opts.getConfiguration(); - args = opts.getRemainingArgs(); - - Job job = Export.createSubmittableJob(conf, args); - job.waitForCompletion(false); - assertTrue(job.isSuccessful()); - - - String IMPORT_TABLE = "importWithDeletes"; - desc = new HTableDescriptor(IMPORT_TABLE); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(true) - ); - UTIL.getHBaseAdmin().createTable(desc); - t.close(); - t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE); - args = new String[] { - IMPORT_TABLE, - OUTPUT_DIR - }; - - opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args); - conf = opts.getConfiguration(); - args = opts.getRemainingArgs(); - - job = Import.createSubmittableJob(conf, args); - job.waitForCompletion(false); - assertTrue(job.isSuccessful()); - - Scan s = new Scan(); - s.setMaxVersions(); - s.setRaw(true); - ResultScanner scanner = t.getScanner(s); - Result r = scanner.next(); - KeyValue[] res = r.raw(); - assertTrue(res[0].isDeleteFamily()); - assertEquals(now+4, res[1].getTimestamp()); - assertEquals(now+3, res[2].getTimestamp()); - assertTrue(res[3].isDelete()); - assertEquals(now+2, res[4].getTimestamp()); - assertEquals(now+1, res[5].getTimestamp()); - assertEquals(now, res[6].getTimestamp()); - t.close(); - } -} diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java deleted file mode 100644 index ff1ede3..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ /dev/null @@ -1,283 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.ArrayList; - -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.GenericOptionsParser; - -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser; -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Result; - -import org.junit.Test; - -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.*; - -@Category(MediumTests.class) -public class TestImportTsv { - - @Test - public void testTsvParserSpecParsing() { - TsvParser parser; - - parser = new TsvParser("HBASE_ROW_KEY", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertEquals(0, parser.getRowKeyColumnIndex()); - - parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertEquals(0, parser.getRowKeyColumnIndex()); - - parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2)); - assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2)); - assertEquals(0, parser.getRowKeyColumnIndex()); - } - - @Test - public void testTsvParser() throws BadTsvLineException { - TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t"); - assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0)); - assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1)); - assertNull(parser.getFamily(2)); - assertNull(parser.getQualifier(2)); - assertEquals(2, parser.getRowKeyColumnIndex()); - - byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d"); - ParsedLine parsed = parser.parse(line, line.length); - checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); - } - - private void checkParsing(ParsedLine parsed, Iterable expected) { - ArrayList parsedCols = new ArrayList(); - for (int i = 0; i < parsed.getColumnCount(); i++) { - parsedCols.add(Bytes.toString( - parsed.getLineBytes(), - parsed.getColumnOffset(i), - parsed.getColumnLength(i))); - } - if (!Iterables.elementsEqual(parsedCols, expected)) { - fail("Expected: " + Joiner.on(",").join(expected) + "\n" + - "Got:" + Joiner.on(",").join(parsedCols)); - } - } - - private void assertBytesEquals(byte[] a, byte[] b) { - assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b)); - } - - /** - * Test cases that throw BadTsvLineException - */ - @Test(expected=BadTsvLineException.class) - public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); - byte[] line = Bytes.toBytes("val_a\tval_b\tval_c"); - ParsedLine parsed = parser.parse(line, line.length); - } - - @Test(expected=BadTsvLineException.class) - public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); - byte[] line = Bytes.toBytes(""); - ParsedLine parsed = parser.parse(line, line.length); - } - - @Test(expected=BadTsvLineException.class) - public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); - byte[] line = Bytes.toBytes("key_only"); - ParsedLine parsed = parser.parse(line, line.length); - } - - @Test(expected=BadTsvLineException.class) - public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException { - TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t"); - byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key"); - ParsedLine parsed = parser.parse(line, line.length); - } - - @Test - public void testMROnTable() - throws Exception { - String TABLE_NAME = "TestTable"; - String FAMILY = "FAM"; - String INPUT_FILE = "InputFile.esv"; - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - TABLE_NAME, - INPUT_FILE - }; - - doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1); - } - - @Test - public void testMROnTableWithCustomMapper() - throws Exception { - String TABLE_NAME = "TestTable"; - String FAMILY = "FAM"; - String INPUT_FILE = "InputFile2.esv"; - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper", - TABLE_NAME, - INPUT_FILE - }; - - doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3); - } - - private void doMROnTableTest(String inputFile, String family, String tableName, - String[] args, int valueMultiplier) throws Exception { - - // Cluster - HBaseTestingUtility htu1 = new HBaseTestingUtility(); - - MiniHBaseCluster cluster = htu1.startMiniCluster(); - htu1.startMiniMapReduceCluster(); - - GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args); - Configuration conf = opts.getConfiguration(); - args = opts.getRemainingArgs(); - - try { - - FileSystem fs = FileSystem.get(conf); - FSDataOutputStream op = fs.create(new Path(inputFile), true); - String line = "KEY\u001bVALUE1\u001bVALUE2\n"; - op.write(line.getBytes(HConstants.UTF8_ENCODING)); - op.close(); - - final byte[] FAM = Bytes.toBytes(family); - final byte[] TAB = Bytes.toBytes(tableName); - final byte[] QA = Bytes.toBytes("A"); - final byte[] QB = Bytes.toBytes("B"); - if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) { - HTableDescriptor desc = new HTableDescriptor(TAB); - desc.addFamily(new HColumnDescriptor(FAM)); - new HBaseAdmin(conf).createTable(desc); - } else { // set the hbaseAdmin as we are not going through main() - ImportTsv.createHbaseAdmin(conf); - } - Job job = ImportTsv.createSubmittableJob(conf, args); - job.waitForCompletion(false); - assertTrue(job.isSuccessful()); - - HTable table = new HTable(new Configuration(conf), TAB); - boolean verified = false; - long pause = conf.getLong("hbase.client.pause", 5 * 1000); - int numRetries = conf.getInt("hbase.client.retries.number", 5); - for (int i = 0; i < numRetries; i++) { - try { - Scan scan = new Scan(); - // Scan entire family. - scan.addFamily(FAM); - ResultScanner resScanner = table.getScanner(scan); - for (Result res : resScanner) { - assertTrue(res.size() == 2); - List kvs = res.list(); - assertEquals(toU8Str(kvs.get(0).getRow()), - toU8Str(Bytes.toBytes("KEY"))); - assertEquals(toU8Str(kvs.get(1).getRow()), - toU8Str(Bytes.toBytes("KEY"))); - assertEquals(toU8Str(kvs.get(0).getValue()), - toU8Str(Bytes.toBytes("VALUE" + valueMultiplier))); - assertEquals(toU8Str(kvs.get(1).getValue()), - toU8Str(Bytes.toBytes("VALUE" + 2*valueMultiplier))); - // Only one result set is expected, so let it loop. - } - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - assertTrue(verified); - } finally { - htu1.shutdownMiniMapReduceCluster(); - htu1.shutdownMiniCluster(); - } - } - - @Test - public void testBulkOutputWithoutAnExistingTable() throws Exception { - String TABLE_NAME = "TestTable"; - String FAMILY = "FAM"; - String INPUT_FILE = "InputFile2.esv"; - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME, - INPUT_FILE }; - doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3); - } - - public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException { - return new String(bytes, HConstants.UTF8_ENCODING); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java deleted file mode 100644 index d0f9ef7..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.TreeMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.Compression; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.*; -import org.junit.experimental.categories.Category; - -/** - * Test cases for the "load" half of the HFileOutputFormat bulk load - * functionality. These tests run faster than the full MR cluster - * tests in TestHFileOutputFormat - */ -@Category(LargeTests.class) -public class TestLoadIncrementalHFiles { - private static final byte[] QUALIFIER = Bytes.toBytes("myqual"); - private static final byte[] FAMILY = Bytes.toBytes("myfam"); - - private static final byte[][] SPLIT_KEYS = new byte[][] { - Bytes.toBytes("ddd"), - Bytes.toBytes("ppp") - }; - - public static int BLOCKSIZE = 64*1024; - public static String COMPRESSION = - Compression.Algorithm.NONE.getName(); - - private static HBaseTestingUtility util = new HBaseTestingUtility(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - util.startMiniCluster(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - util.shutdownMiniCluster(); - } - - /** - * Test case that creates some regions and loads - * HFiles that fit snugly inside those regions - */ - @Test - public void testSimpleLoad() throws Exception { - runTest("testSimpleLoad", BloomType.NONE, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, - new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, - }); - } - - /** - * Test case that creates some regions and loads - * HFiles that cross the boundaries of those regions - */ - @Test - public void testRegionCrossingLoad() throws Exception { - runTest("testRegionCrossingLoad", BloomType.NONE, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, - new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, - }); - } - - /** - * Test loading into a column family that has a ROW bloom filter. - */ - @Test - public void testRegionCrossingRowBloom() throws Exception { - runTest("testRegionCrossingLoadRowBloom", BloomType.ROW, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, - new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, - }); - } - - /** - * Test loading into a column family that has a ROWCOL bloom filter. - */ - @Test - public void testRegionCrossingRowColBloom() throws Exception { - runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, - new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, - }); - } - - private void runTest(String testName, BloomType bloomType, - byte[][][] hfileRanges) throws Exception { - Path dir = util.getDataTestDir(testName); - FileSystem fs = util.getTestFileSystem(); - dir = dir.makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(FAMILY)); - - int hfileIdx = 0; - for (byte[][] range : hfileRanges) { - byte[] from = range[0]; - byte[] to = range[1]; - createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" - + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000); - } - int expectedRows = hfileIdx * 1000; - - final byte[] TABLE = Bytes.toBytes("mytable_"+testName); - - HBaseAdmin admin = new HBaseAdmin(util.getConfiguration()); - HTableDescriptor htd = new HTableDescriptor(TABLE); - HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY); - familyDesc.setBloomFilterType(bloomType); - htd.addFamily(familyDesc); - admin.createTable(htd, SPLIT_KEYS); - - HTable table = new HTable(util.getConfiguration(), TABLE); - util.waitTableAvailable(TABLE, 30000); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles( - util.getConfiguration()); - loader.doBulkLoad(dir, table); - - assertEquals(expectedRows, util.countRows(table)); - } - - @Test - public void testSplitStoreFile() throws IOException { - Path dir = util.getDataTestDir("testSplitHFile"); - FileSystem fs = util.getTestFileSystem(); - Path testIn = new Path(dir, "testhfile"); - HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY); - createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, - Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); - - Path bottomOut = new Path(dir, "bottom.out"); - Path topOut = new Path(dir, "top.out"); - - LoadIncrementalHFiles.splitStoreFile( - util.getConfiguration(), testIn, - familyDesc, Bytes.toBytes("ggg"), - bottomOut, - topOut); - - int rowCount = verifyHFile(bottomOut); - rowCount += verifyHFile(topOut); - assertEquals(1000, rowCount); - } - - private int verifyHFile(Path p) throws IOException { - Configuration conf = util.getConfiguration(); - HFile.Reader reader = HFile.createReader( - p.getFileSystem(conf), p, new CacheConfig(conf)); - reader.loadFileInfo(); - HFileScanner scanner = reader.getScanner(false, false); - scanner.seekTo(); - int count = 0; - do { - count++; - } while (scanner.next()); - assertTrue(count > 0); - reader.close(); - return count; - } - - - /** - * Create an HFile with the given number of rows between a given - * start key and end key. - * TODO put me in an HFileTestUtil or something? - */ - static void createHFile( - Configuration conf, - FileSystem fs, Path path, - byte[] family, byte[] qualifier, - byte[] startKey, byte[] endKey, int numRows) throws IOException - { - HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)) - .withPath(fs, path) - .withBlockSize(BLOCKSIZE) - .withCompression(COMPRESSION) - .withComparator(KeyValue.KEY_COMPARATOR) - .create(); - long now = System.currentTimeMillis(); - try { - // subtract 2 since iterateOnSplits doesn't include boundary keys - for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) { - KeyValue kv = new KeyValue(key, family, qualifier, now, key); - writer.append(kv); - } - } finally { - writer.close(); - } - } - - private void addStartEndKeysForTest(TreeMap map, byte[] first, byte[] last) { - Integer value = map.containsKey(first)?(Integer)map.get(first):0; - map.put(first, value+1); - - value = map.containsKey(last)?(Integer)map.get(last):0; - map.put(last, value-1); - } - - @Test - public void testInferBoundaries() { - TreeMap map = new TreeMap(Bytes.BYTES_COMPARATOR); - - /* Toy example - * c---------i o------p s---------t v------x - * a------e g-----k m-------------q r----s u----w - * - * Should be inferred as: - * a-----------------k m-------------q r--------------t u---------x - * - * The output should be (m,r,u) - */ - - String first; - String last; - - first = "a"; last = "e"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "r"; last = "s"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "o"; last = "p"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "g"; last = "k"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "v"; last = "x"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "c"; last = "i"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "m"; last = "q"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "s"; last = "t"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "u"; last = "w"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map); - byte[][] compare = new byte[3][]; - compare[0] = "m".getBytes(); - compare[1] = "r".getBytes(); - compare[2] = "u".getBytes(); - - assertEquals(keysArray.length, 3); - - for (int row = 0; row nm : r.getNoVersionMap().values()) { - for (byte[] val : nm.values()) { - assertTrue(Bytes.equals(val, value(value))); - } - } - } - assertEquals(count, i); - } catch (IOException e) { - fail("Failed due to exception"); - } - } - - /** - * Test that shows that exception thrown from the RS side will result in an - * exception on the LIHFile client. - */ - @Test(expected=IOException.class) - public void testBulkLoadPhaseFailure() throws Exception { - String table = "bulkLoadPhaseFailure"; - setupTable(table, 10); - - final AtomicInteger attmptedCalls = new AtomicInteger(); - final AtomicInteger failedCalls = new AtomicInteger(); - LoadIncrementalHFiles lih = new LoadIncrementalHFiles( - util.getConfiguration()) { - - protected List tryAtomicRegionLoad(final HConnection conn, - byte[] tableName, final byte[] first, Collection lqis) - throws IOException { - int i = attmptedCalls.incrementAndGet(); - if (i == 1) { - HConnection errConn = null; - try { - errConn = getMockedConnection(util.getConfiguration()); - } catch (Exception e) { - LOG.fatal("mocking cruft, should never happen", e); - throw new RuntimeException("mocking cruft, should never happen"); - } - failedCalls.incrementAndGet(); - return super.tryAtomicRegionLoad(errConn, tableName, first, lqis); - } - - return super.tryAtomicRegionLoad(conn, tableName, first, lqis); - } - }; - - // create HFiles for different column families - Path dir = buildBulkFiles(table, 1); - HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table)); - lih.doBulkLoad(dir, t); - - fail("doBulkLoad should have thrown an exception"); - } - - private HConnection getMockedConnection(final Configuration conf) - throws IOException { - HConnection c = Mockito.mock(HConnection.class); - Mockito.when(c.getConfiguration()).thenReturn(conf); - Mockito.doNothing().when(c).close(); - // Make it so we return a particular location when asked. - final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, - "example.org", 1234); - Mockito.when(c.getRegionLocation((byte[]) Mockito.any(), - (byte[]) Mockito.any(), Mockito.anyBoolean())). - thenReturn(loc); - Mockito.when(c.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())). - thenReturn(loc); - HRegionInterface hri = Mockito.mock(HRegionInterface.class); - Mockito.when(hri.bulkLoadHFiles(Mockito.anyList(), (byte [])Mockito.any())). - thenThrow(new IOException("injecting bulk load error")); - Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())). - thenReturn(hri); - return c; - } - - /** - * This test exercises the path where there is a split after initial - * validation but before the atomic bulk load call. We cannot use presplitting - * to test this path, so we actually inject a split just before the atomic - * region load. - */ - @Test - public void testSplitWhileBulkLoadPhase() throws Exception { - final String table = "splitWhileBulkloadPhase"; - setupTable(table, 10); - populateTable(table,1); - assertExpectedTable(table, ROWCOUNT, 1); - - // Now let's cause trouble. This will occur after checks and cause bulk - // files to fail when attempt to atomically import. This is recoverable. - final AtomicInteger attemptedCalls = new AtomicInteger(); - LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles( - util.getConfiguration()) { - - protected void bulkLoadPhase(final HTable htable, final HConnection conn, - ExecutorService pool, Deque queue, - final Multimap regionGroups) throws IOException { - int i = attemptedCalls.incrementAndGet(); - if (i == 1) { - // On first attempt force a split. - forceSplit(table); - } - - super.bulkLoadPhase(htable, conn, pool, queue, regionGroups); - } - }; - - // create HFiles for different column families - HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table)); - Path bulk = buildBulkFiles(table, 2); - lih2.doBulkLoad(bulk, t); - - // check that data was loaded - // The three expected attempts are 1) failure because need to split, 2) - // load of split top 3) load of split bottom - assertEquals(attemptedCalls.get(), 3); - assertExpectedTable(table, ROWCOUNT, 2); - } - - /** - * This test splits a table and attempts to bulk load. The bulk import files - * should be split before atomically importing. - */ - @Test - public void testGroupOrSplitPresplit() throws Exception { - final String table = "groupOrSplitPresplit"; - setupTable(table, 10); - populateTable(table, 1); - assertExpectedTable(table, ROWCOUNT, 1); - forceSplit(table); - - final AtomicInteger countedLqis= new AtomicInteger(); - LoadIncrementalHFiles lih = new LoadIncrementalHFiles( - util.getConfiguration()) { - protected List groupOrSplit( - Multimap regionGroups, - final LoadQueueItem item, final HTable htable, - final Pair startEndKeys) throws IOException { - List lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys); - if (lqis != null) { - countedLqis.addAndGet(lqis.size()); - } - return lqis; - } - }; - - // create HFiles for different column families - Path bulk = buildBulkFiles(table, 2); - HTable ht = new HTable(util.getConfiguration(), Bytes.toBytes(table)); - lih.doBulkLoad(bulk, ht); - - assertExpectedTable(table, ROWCOUNT, 2); - assertEquals(20, countedLqis.get()); - } - - /** - * This simulates an remote exception which should cause LIHF to exit with an - * exception. - */ - @Test(expected = IOException.class) - public void testGroupOrSplitFailure() throws Exception { - String table = "groupOrSplitFailure"; - setupTable(table, 10); - - LoadIncrementalHFiles lih = new LoadIncrementalHFiles( - util.getConfiguration()) { - int i = 0; - - protected List groupOrSplit( - Multimap regionGroups, - final LoadQueueItem item, final HTable table, - final Pair startEndKeys) throws IOException { - i++; - - if (i == 5) { - throw new IOException("failure"); - } - return super.groupOrSplit(regionGroups, item, table, startEndKeys); - } - }; - - // create HFiles for different column families - Path dir = buildBulkFiles(table,1); - HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table)); - lih.doBulkLoad(dir, t); - - fail("doBulkLoad should have thrown an exception"); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java deleted file mode 100644 index f77b7eb..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.NavigableMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.fail; -import static org.junit.Assert.assertTrue; - -/** - * Test Map/Reduce job over HBase tables. The map/reduce process we're testing - * on our tables is simple - take every row in the table, reverse the value of - * a particular cell, and write it back to the table. - */ -@Category(LargeTests.class) -public class TestMultithreadedTableMapper { - private static final Log LOG = LogFactory.getLog(TestMultithreadedTableMapper.class); - private static final HBaseTestingUtility UTIL = - new HBaseTestingUtility(); - static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest"); - static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); - static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); - static final int NUMBER_OF_THREADS = 10; - - @BeforeClass - public static void beforeClass() throws Exception { - UTIL.startMiniCluster(); - HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY}); - UTIL.createMultiRegions(table, INPUT_FAMILY); - UTIL.loadTable(table, INPUT_FAMILY); - UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniMapReduceCluster(); - UTIL.shutdownMiniCluster(); - } - - /** - * Pass the given key and processed record reduce - */ - public static class ProcessContentsMapper - extends TableMapper { - - /** - * Pass the key, and reversed value to reduce - * - * @param key - * @param value - * @param context - * @throws IOException - */ - public void map(ImmutableBytesWritable key, Result value, - Context context) - throws IOException, InterruptedException { - if (value.size() != 1) { - throw new IOException("There should only be one input column"); - } - Map>> - cf = value.getMap(); - if(!cf.containsKey(INPUT_FAMILY)) { - throw new IOException("Wrong input columns. Missing: '" + - Bytes.toString(INPUT_FAMILY) + "'."); - } - // Get the original value and reverse it - String originalValue = new String(value.getValue(INPUT_FAMILY, null), - HConstants.UTF8_ENCODING); - StringBuilder newValue = new StringBuilder(originalValue); - newValue.reverse(); - // Now set the value to be collected - Put outval = new Put(key.get()); - outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); - context.write(key, outval); - } - } - - /** - * Test multithreadedTableMappper map/reduce against a multi-region table - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testMultithreadedTableMapper() - throws IOException, InterruptedException, ClassNotFoundException { - runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()), - MULTI_REGION_TABLE_NAME)); - } - - private void runTestOnTable(HTable table) - throws IOException, InterruptedException, ClassNotFoundException { - Job job = null; - try { - LOG.info("Before map/reduce startup"); - job = new Job(table.getConfiguration(), "process column contents"); - job.setNumReduceTasks(1); - Scan scan = new Scan(); - scan.addFamily(INPUT_FAMILY); - TableMapReduceUtil.initTableMapperJob( - Bytes.toString(table.getTableName()), scan, - MultithreadedTableMapper.class, ImmutableBytesWritable.class, - Put.class, job); - MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class); - MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS); - TableMapReduceUtil.initTableReducerJob( - Bytes.toString(table.getTableName()), - IdentityTableReducer.class, job); - FileOutputFormat.setOutputPath(job, new Path("test")); - LOG.info("Started " + Bytes.toString(table.getTableName())); - assertTrue(job.waitForCompletion(true)); - LOG.info("After map/reduce completion"); - // verify map-reduce results - verify(Bytes.toString(table.getTableName())); - } finally { - table.close(); - if (job != null) { - FileUtil.fullyDelete( - new File(job.getConfiguration().get("hadoop.tmp.dir"))); - } - } - } - - private void verify(String tableName) throws IOException { - HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName); - boolean verified = false; - long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); - int numRetries = UTIL.getConfiguration().getInt("hbase.client.retries.number", 5); - for (int i = 0; i < numRetries; i++) { - try { - LOG.info("Verification attempt #" + i); - verifyAttempt(table); - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - LOG.debug("Verification attempt failed: " + e.getMessage()); - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - assertTrue(verified); - table.close(); - } - - /** - * Looks at every value of the mapreduce output and verifies that indeed - * the values have been reversed. - * - * @param table Table to scan. - * @throws IOException - * @throws NullPointerException if we failed to find a cell value - */ - private void verifyAttempt(final HTable table) - throws IOException, NullPointerException { - Scan scan = new Scan(); - scan.addFamily(INPUT_FAMILY); - scan.addFamily(OUTPUT_FAMILY); - ResultScanner scanner = table.getScanner(scan); - try { - Iterator itr = scanner.iterator(); - assertTrue(itr.hasNext()); - while(itr.hasNext()) { - Result r = itr.next(); - if (LOG.isDebugEnabled()) { - if (r.size() > 2 ) { - throw new IOException("Too many results, expected 2 got " + - r.size()); - } - } - byte[] firstValue = null; - byte[] secondValue = null; - int count = 0; - for(KeyValue kv : r.list()) { - if (count == 0) { - firstValue = kv.getValue(); - }else if (count == 1) { - secondValue = kv.getValue(); - }else if (count == 2) { - break; - } - count++; - } - String first = ""; - if (firstValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": first value is null"); - } - first = new String(firstValue, HConstants.UTF8_ENCODING); - String second = ""; - if (secondValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": second value is null"); - } - byte[] secondReversed = new byte[secondValue.length]; - for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { - secondReversed[i] = secondValue[j]; - } - second = new String(secondReversed, HConstants.UTF8_ENCODING); - if (first.compareTo(second) != 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("second key is not the reverse of first. row=" + - Bytes.toStringBinary(r.getRow()) + ", first value=" + first + - ", second value=" + second); - } - fail(); - } - } - } finally { - scanner.close(); - } - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java deleted file mode 100644 index 7ec759a..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.experimental.categories.Category; - -/** - * Test of simple partitioner. - */ -@Category(SmallTests.class) -public class TestSimpleTotalOrderPartitioner extends HBaseTestCase { - public void testSplit() throws Exception { - String start = "a"; - String end = "{"; - SimpleTotalOrderPartitioner p = - new SimpleTotalOrderPartitioner(); - this.conf.set(SimpleTotalOrderPartitioner.START, start); - this.conf.set(SimpleTotalOrderPartitioner.END, end); - p.setConf(this.conf); - ImmutableBytesWritable c = new ImmutableBytesWritable(Bytes.toBytes("c")); - // If one reduce, partition should be 0. - int partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 1); - assertEquals(0, partition); - // If two reduces, partition should be 0. - partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 2); - assertEquals(0, partition); - // Divide in 3. - partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 3); - assertEquals(0, partition); - ImmutableBytesWritable q = new ImmutableBytesWritable(Bytes.toBytes("q")); - partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 2); - assertEquals(1, partition); - partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 3); - assertEquals(2, partition); - // What about end and start keys. - ImmutableBytesWritable startBytes = - new ImmutableBytesWritable(Bytes.toBytes(start)); - partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 2); - assertEquals(0, partition); - partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 3); - assertEquals(0, partition); - ImmutableBytesWritable endBytes = - new ImmutableBytesWritable(Bytes.toBytes("z")); - partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 2); - assertEquals(1, partition); - partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 3); - assertEquals(2, partition); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java deleted file mode 100644 index afead7d..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java +++ /dev/null @@ -1,395 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.NavigableMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.LargeTests; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Tests various scan start and stop row scenarios. This is set in a scan and - * tested in a MapReduce job to see if that is handed over and done properly - * too. - */ -@Category(LargeTests.class) -public class TestTableInputFormatScan { - - static final Log LOG = LogFactory.getLog(TestTableInputFormatScan.class); - static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - static final byte[] TABLE_NAME = Bytes.toBytes("scantest"); - static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); - static final String KEY_STARTROW = "startRow"; - static final String KEY_LASTROW = "stpRow"; - - private static HTable table = null; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // switch TIF to log at DEBUG level - TEST_UTIL.enableDebug(TableInputFormat.class); - TEST_UTIL.enableDebug(TableInputFormatBase.class); - // start mini hbase cluster - TEST_UTIL.startMiniCluster(3); - // create and fill table - table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY); - TEST_UTIL.createMultiRegions(table, INPUT_FAMILY); - TEST_UTIL.loadTable(table, INPUT_FAMILY); - // start MR cluster - TEST_UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniMapReduceCluster(); - TEST_UTIL.shutdownMiniCluster(); - } - - /** - * Pass the key and value to reduce. - */ - public static class ScanMapper - extends TableMapper { - - /** - * Pass the key and value to reduce. - * - * @param key The key, here "aaa", "aab" etc. - * @param value The value is the same as the key. - * @param context The task context. - * @throws IOException When reading the rows fails. - */ - @Override - public void map(ImmutableBytesWritable key, Result value, - Context context) - throws IOException, InterruptedException { - if (value.size() != 1) { - throw new IOException("There should only be one input column"); - } - Map>> - cf = value.getMap(); - if(!cf.containsKey(INPUT_FAMILY)) { - throw new IOException("Wrong input columns. Missing: '" + - Bytes.toString(INPUT_FAMILY) + "'."); - } - String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); - LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + - ", value -> " + val); - context.write(key, key); - } - - } - - /** - * Checks the last and first key seen against the scanner boundaries. - */ - public static class ScanReducer - extends Reducer { - - private String first = null; - private String last = null; - - protected void reduce(ImmutableBytesWritable key, - Iterable values, Context context) - throws IOException ,InterruptedException { - int count = 0; - for (ImmutableBytesWritable value : values) { - String val = Bytes.toStringBinary(value.get()); - LOG.info("reduce: key[" + count + "] -> " + - Bytes.toStringBinary(key.get()) + ", value -> " + val); - if (first == null) first = val; - last = val; - count++; - } - } - - protected void cleanup(Context context) - throws IOException, InterruptedException { - Configuration c = context.getConfiguration(); - String startRow = c.get(KEY_STARTROW); - String lastRow = c.get(KEY_LASTROW); - LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); - LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); - if (startRow != null && startRow.length() > 0) { - assertEquals(startRow, first); - } - if (lastRow != null && lastRow.length() > 0) { - assertEquals(lastRow, last); - } - } - - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, null, null); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToAPP() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, "app", "apo"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToBBA() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, "bba", "baz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToBBB() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, "bbb", "bba"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanEmptyToOPP() - throws IOException, InterruptedException, ClassNotFoundException { - testScan(null, "opp", "opo"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanOBBToOPP() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("obb", "opp", "opo"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanOBBToQPP() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("obb", "qpp", "qpo"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanOPPToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("opp", null, "zzz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanYYXToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("yyx", null, "zzz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanYYYToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("yyy", null, "zzz"); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testScanYZYToEmpty() - throws IOException, InterruptedException, ClassNotFoundException { - testScan("yzy", null, "zzz"); - } - - @Test - public void testScanFromConfiguration() - throws IOException, InterruptedException, ClassNotFoundException { - testScanFromConfiguration("bba", "bbd", "bbc"); - } - - /** - * Tests an MR Scan initialized from properties set in the Configuration. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - private void testScanFromConfiguration(String start, String stop, String last) - throws IOException, InterruptedException, ClassNotFoundException { - String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") + - "To" + (stop != null ? stop.toUpperCase() : "Empty"); - Configuration c = new Configuration(TEST_UTIL.getConfiguration()); - c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME)); - c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY)); - c.set(KEY_STARTROW, start != null ? start : ""); - c.set(KEY_LASTROW, last != null ? last : ""); - - if (start != null) { - c.set(TableInputFormat.SCAN_ROW_START, start); - } - - if (stop != null) { - c.set(TableInputFormat.SCAN_ROW_STOP, stop); - } - - Job job = new Job(c, jobName); - job.setMapperClass(ScanMapper.class); - job.setReducerClass(ScanReducer.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(ImmutableBytesWritable.class); - job.setInputFormatClass(TableInputFormat.class); - job.setNumReduceTasks(1); - FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); - job.waitForCompletion(true); - assertTrue(job.isComplete()); - } - - /** - * Tests a MR scan using specific start and stop rows. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - private void testScan(String start, String stop, String last) - throws IOException, InterruptedException, ClassNotFoundException { - String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") + - "To" + (stop != null ? stop.toUpperCase() : "Empty"); - LOG.info("Before map/reduce startup - job " + jobName); - Configuration c = new Configuration(TEST_UTIL.getConfiguration()); - Scan scan = new Scan(); - scan.addFamily(INPUT_FAMILY); - if (start != null) { - scan.setStartRow(Bytes.toBytes(start)); - } - c.set(KEY_STARTROW, start != null ? start : ""); - if (stop != null) { - scan.setStopRow(Bytes.toBytes(stop)); - } - c.set(KEY_LASTROW, last != null ? last : ""); - LOG.info("scan before: " + scan); - Job job = new Job(c, jobName); - TableMapReduceUtil.initTableMapperJob( - Bytes.toString(TABLE_NAME), scan, ScanMapper.class, - ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); - job.setReducerClass(ScanReducer.class); - job.setNumReduceTasks(1); // one to get final "first" and "last" key - FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); - LOG.info("Started " + job.getJobName()); - job.waitForCompletion(true); - assertTrue(job.isComplete()); - LOG.info("After map/reduce completion - job " + jobName); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java deleted file mode 100644 index b351444..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.NavigableMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.fail; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; - -/** - * Test Map/Reduce job over HBase tables. The map/reduce process we're testing - * on our tables is simple - take every row in the table, reverse the value of - * a particular cell, and write it back to the table. - */ -@Category(LargeTests.class) -public class TestTableMapReduce { - private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class); - private static final HBaseTestingUtility UTIL = - new HBaseTestingUtility(); - static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest"); - static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); - static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); - - @BeforeClass - public static void beforeClass() throws Exception { - UTIL.startMiniCluster(); - HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY}); - UTIL.createMultiRegions(table, INPUT_FAMILY); - UTIL.loadTable(table, INPUT_FAMILY); - UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniMapReduceCluster(); - UTIL.shutdownMiniCluster(); - } - - /** - * Pass the given key and processed record reduce - */ - public static class ProcessContentsMapper - extends TableMapper { - - /** - * Pass the key, and reversed value to reduce - * - * @param key - * @param value - * @param context - * @throws IOException - */ - public void map(ImmutableBytesWritable key, Result value, - Context context) - throws IOException, InterruptedException { - if (value.size() != 1) { - throw new IOException("There should only be one input column"); - } - Map>> - cf = value.getMap(); - if(!cf.containsKey(INPUT_FAMILY)) { - throw new IOException("Wrong input columns. Missing: '" + - Bytes.toString(INPUT_FAMILY) + "'."); - } - - // Get the original value and reverse it - String originalValue = new String(value.getValue(INPUT_FAMILY, null), - HConstants.UTF8_ENCODING); - StringBuilder newValue = new StringBuilder(originalValue); - newValue.reverse(); - // Now set the value to be collected - Put outval = new Put(key.get()); - outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); - context.write(key, outval); - } - } - - /** - * Test a map/reduce against a multi-region table - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testMultiRegionTable() - throws IOException, InterruptedException, ClassNotFoundException { - runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()), - MULTI_REGION_TABLE_NAME)); - } - - private void runTestOnTable(HTable table) - throws IOException, InterruptedException, ClassNotFoundException { - Job job = null; - try { - LOG.info("Before map/reduce startup"); - job = new Job(table.getConfiguration(), "process column contents"); - job.setNumReduceTasks(1); - Scan scan = new Scan(); - scan.addFamily(INPUT_FAMILY); - TableMapReduceUtil.initTableMapperJob( - Bytes.toString(table.getTableName()), scan, - ProcessContentsMapper.class, ImmutableBytesWritable.class, - Put.class, job); - TableMapReduceUtil.initTableReducerJob( - Bytes.toString(table.getTableName()), - IdentityTableReducer.class, job); - FileOutputFormat.setOutputPath(job, new Path("test")); - LOG.info("Started " + Bytes.toString(table.getTableName())); - assertTrue(job.waitForCompletion(true)); - LOG.info("After map/reduce completion"); - - // verify map-reduce results - verify(Bytes.toString(table.getTableName())); - } finally { - table.close(); - if (job != null) { - FileUtil.fullyDelete( - new File(job.getConfiguration().get("hadoop.tmp.dir"))); - } - } - } - - private void verify(String tableName) throws IOException { - HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName); - boolean verified = false; - long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); - int numRetries = UTIL.getConfiguration().getInt("hbase.client.retries.number", 5); - for (int i = 0; i < numRetries; i++) { - try { - LOG.info("Verification attempt #" + i); - verifyAttempt(table); - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - LOG.debug("Verification attempt failed: " + e.getMessage()); - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - assertTrue(verified); - table.close(); - } - - /** - * Looks at every value of the mapreduce output and verifies that indeed - * the values have been reversed. - * - * @param table Table to scan. - * @throws IOException - * @throws NullPointerException if we failed to find a cell value - */ - private void verifyAttempt(final HTable table) throws IOException, NullPointerException { - Scan scan = new Scan(); - scan.addFamily(INPUT_FAMILY); - scan.addFamily(OUTPUT_FAMILY); - ResultScanner scanner = table.getScanner(scan); - try { - Iterator itr = scanner.iterator(); - assertTrue(itr.hasNext()); - while(itr.hasNext()) { - Result r = itr.next(); - if (LOG.isDebugEnabled()) { - if (r.size() > 2 ) { - throw new IOException("Too many results, expected 2 got " + - r.size()); - } - } - byte[] firstValue = null; - byte[] secondValue = null; - int count = 0; - for(KeyValue kv : r.list()) { - if (count == 0) { - firstValue = kv.getValue(); - } - if (count == 1) { - secondValue = kv.getValue(); - } - count++; - if (count == 2) { - break; - } - } - - String first = ""; - if (firstValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": first value is null"); - } - first = new String(firstValue, HConstants.UTF8_ENCODING); - - String second = ""; - if (secondValue == null) { - throw new NullPointerException(Bytes.toString(r.getRow()) + - ": second value is null"); - } - byte[] secondReversed = new byte[secondValue.length]; - for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { - secondReversed[i] = secondValue[j]; - } - second = new String(secondReversed, HConstants.UTF8_ENCODING); - - if (first.compareTo(second) != 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("second key is not the reverse of first. row=" + - Bytes.toStringBinary(r.getRow()) + ", first value=" + first + - ", second value=" + second); - } - fail(); - } - } - } finally { - scanner.close(); - } - } - - /** - * Test that we add tmpjars correctly including the ZK jar. - */ - public void testAddDependencyJars() throws Exception { - Job job = new Job(); - TableMapReduceUtil.addDependencyJars(job); - String tmpjars = job.getConfiguration().get("tmpjars"); - - System.err.println("tmpjars: " + tmpjars); - assertTrue(tmpjars.contains("zookeeper")); - assertFalse(tmpjars.contains("guava")); - - System.err.println("appending guava jar"); - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - com.google.common.base.Function.class); - tmpjars = job.getConfiguration().get("tmpjars"); - assertTrue(tmpjars.contains("guava")); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java deleted file mode 100644 index ded4dd6..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.SmallTests; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.util.HashSet; - -import static junit.framework.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@Category(SmallTests.class) -public class TestTableSplit { - @Test - public void testHashCode() { - TableSplit split1 = new TableSplit("table".getBytes(), "row-start".getBytes(), "row-end".getBytes(), "location"); - TableSplit split2 = new TableSplit("table".getBytes(), "row-start".getBytes(), "row-end".getBytes(), "location"); - assertEquals (split1, split2); - assertTrue (split1.hashCode() == split2.hashCode()); - HashSet set = new HashSet(2); - set.add(split1); - set.add(split2); - assertTrue(set.size() == 1); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java deleted file mode 100644 index e051f6e..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(LargeTests.class) -public class TestTimeRangeMapRed { - private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class); - private static final HBaseTestingUtility UTIL = - new HBaseTestingUtility(); - private HBaseAdmin admin; - - private static final byte [] KEY = Bytes.toBytes("row1"); - private static final NavigableMap TIMESTAMP = - new TreeMap(); - static { - TIMESTAMP.put((long)1245620000, false); - TIMESTAMP.put((long)1245620005, true); // include - TIMESTAMP.put((long)1245620010, true); // include - TIMESTAMP.put((long)1245620055, true); // include - TIMESTAMP.put((long)1245620100, true); // include - TIMESTAMP.put((long)1245620150, false); - TIMESTAMP.put((long)1245620250, false); - } - static final long MINSTAMP = 1245620005; - static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded. so increment it. - - static final byte[] TABLE_NAME = Bytes.toBytes("table123"); - static final byte[] FAMILY_NAME = Bytes.toBytes("text"); - static final byte[] COLUMN_NAME = Bytes.toBytes("input"); - - @BeforeClass - public static void beforeClass() throws Exception { - UTIL.startMiniCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniCluster(); - } - - @Before - public void before() throws MasterNotRunningException, ZooKeeperConnectionException { - this.admin = new HBaseAdmin(UTIL.getConfiguration()); - } - - @After - public void after() throws IOException { - this.admin.close(); - } - - private static class ProcessTimeRangeMapper - extends TableMapper - implements Configurable { - - private Configuration conf = null; - private HTable table = null; - - @Override - public void map(ImmutableBytesWritable key, Result result, - Context context) - throws IOException { - List tsList = new ArrayList(); - for (KeyValue kv : result.list()) { - tsList.add(kv.getTimestamp()); - } - - for (Long ts : tsList) { - Put put = new Put(key.get()); - put.setWriteToWAL(false); - put.add(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true)); - table.put(put); - } - table.flushCommits(); - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration configuration) { - this.conf = configuration; - try { - table = new HTable(HBaseConfiguration.create(conf), TABLE_NAME); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - @Test - public void testTimeRangeMapRed() - throws IOException, InterruptedException, ClassNotFoundException { - final HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); - final HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME); - col.setMaxVersions(Integer.MAX_VALUE); - desc.addFamily(col); - admin.createTable(desc); - HTable table = new HTable(UTIL.getConfiguration(), desc.getName()); - prepareTest(table); - runTestOnTable(); - verify(table); - } - - private void prepareTest(final HTable table) throws IOException { - for (Map.Entry entry : TIMESTAMP.entrySet()) { - Put put = new Put(KEY); - put.setWriteToWAL(false); - put.add(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false)); - table.put(put); - } - table.flushCommits(); - } - - private void runTestOnTable() - throws IOException, InterruptedException, ClassNotFoundException { - UTIL.startMiniMapReduceCluster(); - Job job = null; - try { - job = new Job(UTIL.getConfiguration(), "test123"); - job.setOutputFormatClass(NullOutputFormat.class); - job.setNumReduceTasks(0); - Scan scan = new Scan(); - scan.addColumn(FAMILY_NAME, COLUMN_NAME); - scan.setTimeRange(MINSTAMP, MAXSTAMP); - scan.setMaxVersions(); - TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), - scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job); - job.waitForCompletion(true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - UTIL.shutdownMiniMapReduceCluster(); - if (job != null) { - FileUtil.fullyDelete( - new File(job.getConfiguration().get("hadoop.tmp.dir"))); - } - } - } - - private void verify(final HTable table) throws IOException { - Scan scan = new Scan(); - scan.addColumn(FAMILY_NAME, COLUMN_NAME); - scan.setMaxVersions(1); - ResultScanner scanner = table.getScanner(scan); - for (Result r: scanner) { - for (KeyValue kv : r.list()) { - log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(kv.getFamily()) - + "\t" + Bytes.toString(kv.getQualifier()) - + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(kv.getValue())); - org.junit.Assert.assertEquals(TIMESTAMP.get(kv.getTimestamp()), - (Boolean)Bytes.toBoolean(kv.getValue())); - } - } - scanner.close(); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} - diff --git src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java deleted file mode 100644 index 93653af..0000000 --- src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNull; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.LargeTests; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Basic test for the WALPlayer M/R tool - */ -@Category(LargeTests.class) -public class TestWALPlayer { - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static MiniHBaseCluster cluster; - - @BeforeClass - public static void beforeClass() throws Exception { - cluster = TEST_UTIL.startMiniCluster(); - TEST_UTIL.startMiniMapReduceCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - TEST_UTIL.shutdownMiniMapReduceCluster(); - TEST_UTIL.shutdownMiniCluster(); - } - - /** - * Simple end-to-end test - * @throws Exception - */ - @Test - public void testWALPlayer() throws Exception { - final byte[] TABLENAME1 = Bytes.toBytes("testWALPlayer1"); - final byte[] TABLENAME2 = Bytes.toBytes("testWALPlayer2"); - final byte[] FAMILY = Bytes.toBytes("family"); - final byte[] COLUMN1 = Bytes.toBytes("c1"); - final byte[] COLUMN2 = Bytes.toBytes("c2"); - final byte[] ROW = Bytes.toBytes("row"); - HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY); - HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY); - - // put a row into the first table - Put p = new Put(ROW); - p.add(FAMILY, COLUMN1, COLUMN1); - p.add(FAMILY, COLUMN2, COLUMN2); - t1.put(p); - // delete one column - Delete d = new Delete(ROW); - d.deleteColumns(FAMILY, COLUMN1); - t1.delete(d); - - // replay the WAL, map table 1 to table 2 - HLog log = cluster.getRegionServer(0).getWAL(); - log.rollWriter(); - String walInputDir = new Path(cluster.getMaster().getMasterFileSystem() - .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString(); - - WALPlayer player = new WALPlayer(TEST_UTIL.getConfiguration()); - assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1), - Bytes.toString(TABLENAME2) })); - - // verify the WAL was player into table 2 - Get g = new Get(ROW); - Result r = t2.get(g); - assertEquals(1, r.size()); - assertTrue(Bytes.equals(COLUMN2, r.raw()[0].getQualifier())); - } -}