### Eclipse Workspace Patch 1.0 #P 0.94.0-ali-1.0 Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStoreOnCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStoreOnCluster.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStoreOnCluster.java (revision 0) @@ -0,0 +1,156 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.assistant.RowValueSwapAssistant; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestAssistantStoreOnCluster { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static int SLAVES = 3; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRowValueSwapAssistant() throws IOException, + InterruptedException { + byte[] tableName = Bytes.toBytes("testRowValueSwapAssistant"); + byte[] normalFamilyName = Bytes.toBytes("family"); + byte[] assistantFamilyName = Bytes.toBytes("assistantFamily"); + byte[] qualifier = Bytes.toBytes("qualifier"); + + HColumnDescriptor normalFamily = new HColumnDescriptor(normalFamilyName); + HColumnDescriptor assistantFamily = new HColumnDescriptor( + assistantFamilyName).setAssistant(RowValueSwapAssistant + .getAssistantConfString(null, null)); + + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(normalFamily); + desc.addFamily(assistantFamily); + TEST_UTIL.getHBaseAdmin().createTable(desc); + + // Load data to table + Configuration c = TEST_UTIL.getConfiguration(); + HTable htable = new HTable(c, tableName); + int rowCount = 100; + int valueCount = 10; + for (int i = 0; i < rowCount; i++) { + Put put = new Put(Bytes.toBytes("testrow" + String.format("%04d", i))); + put.add(normalFamilyName, qualifier, + Bytes.toBytes("testvalue" + String.format("%04d", i % valueCount))); + htable.put(put); + if (i == rowCount / 2) { + // make a flush + TEST_UTIL.getHBaseAdmin().flush(tableName); + } + } + + // verify count + Scan scan = new Scan(); + ResultScanner scanner = htable.getScanner(scan); + verifyCount(scanner, rowCount, rowCount); + + // verify count with assistant scan + scan = new Scan().setAssistantScan(new Scan()); + scanner = htable.getScanner(scan); + verifyCount(scanner, rowCount, rowCount); + + //split region + byte[] splitPoint = Bytes.toBytes("testrow" + + String.format("%04d", rowCount / 2)); + TEST_UTIL.getHBaseAdmin().split(tableName, splitPoint); + + // wait unit region split is done + long timeout = System.currentTimeMillis() + (15 * 1000); + while ((System.currentTimeMillis() < timeout) + && (htable.getRegionLocations().size() < 2)) { + Thread.sleep(250); + } + + scan = new Scan().setStopRow(splitPoint).setAssistantScan(new Scan()); + scanner = htable.getScanner(scan); + verifyCount(scanner, rowCount / 2, rowCount / 2); + + // scan the table with condition, using the assistant scan + Scan assistantScan = new Scan(); + byte[] assistantStart = Bytes.toBytes("testvalue" + String.format("%04d", 0)); + byte[] assistantStop = Bytes.toBytes("testvalue" + String.format("%04d", valueCount / 2)); + assistantScan.setStartRow(assistantStart); + assistantScan.setStopRow(assistantStop); + scan = new Scan().setAssistantScan(assistantScan); + scanner = htable.getScanner(scan); + int scannedRowCount = 0; + Result result = null; + while ((result = scanner.next()) != null) { + scannedRowCount++; + assertTrue(Bytes.compareTo(result.getRow(), assistantStart) >= 0); + assertTrue(Bytes.compareTo(result.getRow(), assistantStop) < 0); + System.out.println(result); + } + scanner.close(); + assertEquals(rowCount / 2, scannedRowCount); + + } + + private void verifyCount(ResultScanner scanner, int expectedKVCount, + int expectedRowCount) throws IOException { + int rowCount = 0; + int kvCount = 0; + try { + Result result = null; + while ((result = scanner.next()) != null) { + rowCount++; + kvCount += result.size(); + System.out.println(result); + } + } finally { + scanner.close(); + } + assertEquals(expectedKVCount, kvCount); + assertEquals(expectedRowCount, rowCount); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 239702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.io.ValueSplitHalfStoreFileReader; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -178,6 +179,8 @@ // Set when we obtain a Reader. private long maxMemstoreTS = -1; + private final boolean isAssistanFile; + public long getMaxMemstoreTS() { return maxMemstoreTS; } @@ -246,33 +249,38 @@ // the last modification time stamp private long modificationTimeStamp = 0L; + StoreFile(final FileSystem fs, final Path p, final Configuration conf, + final CacheConfig cacheConf, final BloomType cfBloomType, + final HFileDataBlockEncoder dataBlockEncoder) throws IOException { + this(fs, p, conf, cacheConf, cfBloomType, dataBlockEncoder, false); + } + /** * Constructor, loads a reader and it's indices, etc. May allocate a * substantial amount of ram depending on the underlying files (10-20MB?). - * - * @param fs The current file system to use. - * @param p The path of the file. - * @param blockcache true if the block cache is enabled. - * @param conf The current configuration. - * @param cacheConf The cache configuration and block cache reference. + * + * @param fs The current file system to use. + * @param p The path of the file. + * @param blockcache true if the block cache is enabled. + * @param conf The current configuration. + * @param cacheConf The cache configuration and block cache reference. * @param cfBloomType The bloom type to use for this store file as specified - * by column family configuration. This may or may not be the same - * as the Bloom filter type actually present in the HFile, because + * by column family configuration. This may or may not be the same as + * the Bloom filter type actually present in the HFile, because * column family configuration might change. If this is * {@link BloomType#NONE}, the existing Bloom filter is ignored. * @param dataBlockEncoder data block encoding algorithm. + * @param isAssistanFile true if this file belongs to the assistant store * @throws IOException When opening the reader fails. */ - StoreFile(final FileSystem fs, - final Path p, - final Configuration conf, - final CacheConfig cacheConf, - final BloomType cfBloomType, - final HFileDataBlockEncoder dataBlockEncoder) + StoreFile(final FileSystem fs, final Path p, final Configuration conf, + final CacheConfig cacheConf, final BloomType cfBloomType, + final HFileDataBlockEncoder dataBlockEncoder, final boolean isAssistanFile) throws IOException { this.fs = fs; this.path = p; this.cacheConf = cacheConf; + this.isAssistanFile = isAssistanFile; this.dataBlockEncoder = dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE : dataBlockEncoder; @@ -621,12 +629,23 @@ } if (isReference()) { if (this.link != null) { - this.reader = new HalfStoreFileReader(this.fs, this.referencePath, this.link, - this.cacheConf, this.reference, dataBlockEncoder.getEncodingInCache()); + if (isAssistanFile) { + this.reader = new ValueSplitHalfStoreFileReader(this.fs, this.referencePath, this.link, + this.cacheConf, this.reference, dataBlockEncoder.getEncodingInCache()); + } else { + this.reader = new HalfStoreFileReader(this.fs, this.referencePath, this.link, + this.cacheConf, this.reference, dataBlockEncoder.getEncodingInCache()); + } } else { - this.reader = new HalfStoreFileReader(this.fs, this.referencePath, - this.cacheConf, this.reference, - dataBlockEncoder.getEncodingInCache()); + if (isAssistanFile) { + this.reader = new ValueSplitHalfStoreFileReader(this.fs, + this.referencePath, this.cacheConf, this.reference, + dataBlockEncoder.getEncodingInCache()); + } else { + this.reader = new HalfStoreFileReader(this.fs, this.referencePath, + this.cacheConf, this.reference, + dataBlockEncoder.getEncodingInCache()); + } } } else if (isLink()) { long size = link.getFileStatus(fs).getLen(); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 239709) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -40,6 +40,7 @@ import java.util.NavigableSet; import java.util.Random; import java.util.Set; +import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; @@ -141,6 +142,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MutableClassToInstanceMap; +import com.google.common.collect.SortedMaps; /** * HRegion stores data for a certain region of a table. It stores all columns @@ -204,6 +206,8 @@ protected final Map stores = new ConcurrentSkipListMap(Bytes.BYTES_RAWCOMPARATOR); + private boolean hasAssistantStore = false; + // Registered region protocol handlers private ClassToInstanceMap protocolHandlers = MutableClassToInstanceMap.create(); @@ -566,6 +570,9 @@ Store store = future.get(); this.stores.put(store.getColumnFamilyName().getBytes(), store); + if (store.isAssistant()) { + this.hasAssistantStore = true; + } long storeSeqId = store.getMaxSequenceId(); maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), storeSeqId); @@ -654,6 +661,13 @@ } /** + * @return true if this region has assistant store + */ + public boolean hasAssistantStore() { + return hasAssistantStore; + } + + /** * This function will return the HDFS blocks distribution based on the data * captured when HFile is created * @return The HDFS blocks distribution for the region. @@ -1731,28 +1745,43 @@ return getScanner(scan, null); } - void prepareScanner(Scan scan) throws IOException { + void prepareScanner(Scan scan, boolean isAssistant) throws IOException { if(!scan.hasFamilies()) { - // Adding all families to scanner - for(byte[] family: this.htableDescriptor.getFamiliesKeys()){ - scan.addFamily(family); + // Adding all non-assistant families to scanner + + for (HColumnDescriptor family : this.htableDescriptor.getColumnFamilies()) { + if (isAssistant && family.isAssistanFamily()) { + scan.addFamily(family.getName()); + } else if (!isAssistant && !family.isAssistanFamily()) { + scan.addFamily(family.getName()); + } } } } + void prepareScanner(Scan scan) throws IOException { + prepareScanner(scan, false); + } + protected RegionScanner getScanner(Scan scan, List additionalScanners) throws IOException { startRegionOperation(); this.readRequestsCount.increment(); try { + Scan executedScan = scan; + if (scan.getAssistantScan() != null) { + executedScan = scan.getAssistantScan(); + prepareScanner(executedScan, true); + } else { + prepareScanner(executedScan, false); + } // Verify families are all valid - prepareScanner(scan); - if(scan.hasFamilies()) { + if (executedScan.hasFamilies()) { for(byte [] family : scan.getFamilyMap().keySet()) { checkFamily(family); } } - return instantiateRegionScanner(scan, additionalScanners); + return instantiateRegionScanner(executedScan, additionalScanners); } finally { closeRegionOperation(); } @@ -1801,6 +1830,10 @@ public void delete(Delete delete, Integer lockid, boolean writeToWAL) throws IOException { checkReadOnly(); + if(this.hasAssistantStore()){ + mutateRow(delete, lockid); + return; + } checkResources(); Integer lid = null; startRegionOperation(); @@ -1989,6 +2022,10 @@ throws IOException { checkReadOnly(); + if (this.hasAssistantStore()) { + mutateRow(put, lockid); + return; + } // Do a rough check that we have resources to accept a write. The check is // 'rough' in that between the resource check and the call to obtain a // read lock, resources may run out. For now, the thought is that this @@ -3150,7 +3187,10 @@ protected Store instantiateHStore(Path tableDir, HColumnDescriptor c) throws IOException { - return new Store(tableDir, this, c, this.fs, this.conf); + return c.getAssistant() == null ? new Store(tableDir, this, c, + this.fs, this.conf) : new AssistantStore(tableDir, this, c, this.fs, + this.conf, c.getAssistant()); + } /** @@ -4788,8 +4828,20 @@ return results; } + public void mutateRow(Mutation mutation, Integer lockid) throws IOException { + SortedMap rowsToLockId = new TreeMap( + Bytes.BYTES_COMPARATOR); + rowsToLockId.put(mutation.getRow(), lockid); + List mutations = new ArrayList(); + mutations.add(mutation); + mutateRowsWithLocks(mutations, rowsToLockId); + } + public void mutateRow(RowMutations rm) throws IOException { - mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); + SortedMap rowsToLockId = new TreeMap( + Bytes.BYTES_COMPARATOR); + rowsToLockId.put(rm.getRow(), null); + mutateRowsWithLocks(rm.getMutations(), rowsToLockId); } /** @@ -4803,9 +4855,9 @@ * @throws IOException */ public void mutateRowsWithLocks(Collection mutations, - Collection rowsToLock) throws IOException { + SortedMap rowsToLockId) throws IOException { boolean flush = false; - + checkResources(); startRegionOperation(); List acquiredLocks = null; try { @@ -4832,20 +4884,34 @@ } } + // generate the assistant store data if exist + if (this.hasAssistantStore()) { + mutations = new ArrayList(mutations); + for (Store store : this.stores.values()) { + if (store.isAssistant()) { + mutations.addAll(store.generateAssistantData(mutations)); + } + } + } + + long txid = 0; boolean walSyncSuccessful = false; boolean locked = false; // 2. acquire the row lock(s) - acquiredLocks = new ArrayList(rowsToLock.size()); - for (byte[] row : rowsToLock) { + acquiredLocks = new ArrayList(rowsToLockId.size()); + for (Map.Entry rowToLockId : rowsToLockId.entrySet()) { // attempt to lock all involved rows, fail if one lock times out - Integer lid = getLock(null, row, true); + Integer lid = getLock(rowToLockId.getValue(), rowToLockId.getKey(), + true); if (lid == null) { throw new IOException("Failed to acquire lock on " - + Bytes.toStringBinary(row)); + + Bytes.toStringBinary(rowToLockId.getKey())); } - acquiredLocks.add(lid); + if (rowToLockId.getValue() == null) { + acquiredLocks.add(lid); + } } // 3. acquire the region lock Index: src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (revision 239702) +++ src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (working copy) @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.regex.Matcher; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; @@ -86,6 +87,8 @@ public static final String MIN_VERSIONS = "MIN_VERSIONS"; public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS"; + public static final String ASSISTANT = "ASSISTANT"; + /** * Default compression type. */ @@ -869,6 +872,37 @@ } /** + * Make this column family as an assistant one with the given configuration + * @param assistantConf + * @return this (for chained invocation) + */ + public HColumnDescriptor setAssistant(String assistantConf) { + Matcher matcher = HConstants.ASSISTANT_CONF_VALUE_PATTERN + .matcher(assistantConf); + if (!matcher.matches()) { + throw new RuntimeException("Failed setting assistant configuration to " + + assistantConf + ", but it does not match pattern"); + } + return setValue(ASSISTANT, assistantConf); + } + + /** + * @return assistant configuration of this column family, null if not an + * assistant one + */ + public String getAssistant() { + String n = getValue(ASSISTANT); + return n; + } + + /** + * @return true if it is an assistant family + */ + public boolean isAssistanFamily() { + return getAssistant() != null; + } + + /** * @see java.lang.Object#toString() */ @Override Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStore.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStore.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStore.java (revision 0) @@ -0,0 +1,231 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.ValueFilter; +import org.apache.hadoop.hbase.regionserver.assistant.RowValueSwapAssistant; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestAssistantStore { + static final Log LOG = LogFactory.getLog(TestAssistantStore.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final String DIR = TEST_UTIL.getDataTestDir("TestAssistantStore") + .toString(); + private HRegion region = null; + private Configuration conf = TEST_UTIL.getConfiguration(); + + private static byte[] ROW = Bytes.toBytes("testRow"); + private static final int ROWSIZE = 200; + private static byte[][] ROWS = makeN(ROW, ROWSIZE); + private static byte[] QUAL = Bytes.toBytes("testQual"); + private static final int QUALSIZE = 3; + private static byte[][] QUALS = makeN(QUAL, QUALSIZE); + private static byte[] VALUE = Bytes.toBytes("testValue"); + private static final int VALUESIZE = 10; + private static byte[][] VALUES = makeN(VALUE, VALUESIZE); + + @Test + public void testRowValueSwapAssistant() throws IOException { + String method = "testRowValueSwapAssistant"; + byte[] regionStartKey=ROW; + byte[] regionEndKey = Bytes.add(ROW, new byte[] { (byte) 0xff }); + byte[] tableName = Bytes.toBytes(method); + byte[] normalFamilyName = Bytes.toBytes("family"); + byte[] assistantFamilyName = Bytes.toBytes("assistantFamily"); + HColumnDescriptor normalFamily = new HColumnDescriptor(normalFamilyName); + HColumnDescriptor assistantFamily = new HColumnDescriptor( + assistantFamilyName).setAssistant(RowValueSwapAssistant + .getAssistantConfString(null, null)); + this.region = initHRegion(tableName, regionStartKey, regionEndKey, method, + conf, normalFamily, assistantFamily); + Store normalStore = this.region.getStore(normalFamilyName); + Store assistantStore =this.region.getStore(assistantFamilyName); + assertFalse(normalStore.isAssistant()); + assertTrue(assistantStore.isAssistant()); + + // No data now + assertNull(normalStore.getSplitPoint()); + assertNull(assistantStore.getSplitPoint()); + + // Load data to region + loadDataToRegion(region, normalFamilyName); + + // Check the data in normal store + Scan scan = new Scan(); + // Open the scanner, check the count + RegionScanner scanner = region.getScanner(scan); + verifyCount(scanner, ROWSIZE * QUALSIZE, ROWSIZE); + // Check the data in normal store + + // Check the data in assistant store + scan = new Scan(); + scan.addFamily(assistantFamilyName); + scanner = region.getScanner(scan); + verifyCount(scanner, ROWSIZE * QUALSIZE, ROWSIZE); + + // Check the data with filter + byte[] foundValue = VALUES[0]; + Filter filter = new ValueFilter(CompareOp.EQUAL, new BinaryComparator( + foundValue )); + int expectRowCount = ROWSIZE / VALUESIZE + + ((ROWSIZE % VALUESIZE) == 0 ? 0 : 1); + scan = new Scan().setFilter(filter); + scanner = region.getScanner(scan); + verifyCount(scanner, expectRowCount * QUALSIZE, expectRowCount); + + // Using assistant store to scan the equal data without filter + scan = new Scan().setAssistantScan(new Scan().setStartRow(foundValue) + .setStopRow(Bytes.add(foundValue, new byte[] { (byte) 0xff }))); + scanner = region.getScanner(scan); + verifyCount(scanner, expectRowCount * QUALSIZE, expectRowCount); + + // Delete some rows + int deleteCount = 10; + assertTrue(deleteCount < ROWSIZE); + + for (int i = 0; i < deleteCount; i++) { + Delete delete = new Delete(ROWS[i]); + region.delete(delete, null, true); + } + + // check after deleting + scan = new Scan(); + scanner = region.getScanner(scan); + verifyCount(scanner, (ROWSIZE - deleteCount) * QUALSIZE, + (ROWSIZE - deleteCount)); + + // Check the data in assistant store + scan = new Scan(); + scan.addFamily(assistantFamilyName); + scanner = region.getScanner(scan); + verifyCount(scanner, (ROWSIZE - deleteCount) * QUALSIZE, + (ROWSIZE - deleteCount)); + + } + + private void verifyCount(InternalScanner scanner, int expectedKVCount, + int expectedRowCount) throws IOException { + List kvList = new ArrayList(); + int rowCount = 0; + int kvCount = 0; + try { + while (scanner.next(kvList)) { + if (kvList.isEmpty()) continue; + rowCount++; + kvCount += kvList.size(); + kvList.clear(); + } + } finally { + scanner.close(); + } + if (!kvList.isEmpty()) { + rowCount++; + kvCount += kvList.size(); + kvList.clear(); + } + assertEquals(expectedKVCount, kvCount); + assertEquals(expectedRowCount, rowCount); + } + + private static void loadDataToRegion(HRegion region, byte[] family) + throws IOException { + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + for (int j = 0; j < QUALSIZE; j++) { + put.add(family, QUALS[j], VALUES[i % VALUESIZE]); + } + region.put(put); + if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) { + region.flushcache(); + } + } + } + + public static HRegion initHRegion(byte[] tableName, String callingMethod, + Configuration conf, HColumnDescriptor... families) throws IOException { + return initHRegion(tableName, null, null, callingMethod, conf, families); + } + + /** + * @param tableName + * @param startKey + * @param stopKey + * @param callingMethod + * @param conf + * @param families + * @throws IOException + * @return A region on which you must call + * {@link HRegion#closeHRegion(HRegion)} when done. + */ + private static HRegion initHRegion(byte[] tableName, byte[] startKey, + byte[] stopKey, String callingMethod, Configuration conf, + HColumnDescriptor... families) throws IOException { + HTableDescriptor htd = new HTableDescriptor(tableName); + for (HColumnDescriptor family : families) { + htd.addFamily(family); + } + HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false); + Path path = new Path(DIR + callingMethod); + FileSystem fs = FileSystem.get(conf); + if (fs.exists(path)) { + if (!fs.delete(path, true)) { + throw new IOException("Failed delete of " + path); + } + } + return HRegion.createHRegion(info, path, conf, htd); + } + + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i))); + } + return ret; + } + +} Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 239702) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; @@ -89,7 +90,8 @@ // version 4: support reversed scan & set small // version 5: support encoding result - private static final byte SCAN_VERSION = (byte) 5; + // version 6: support scan using assistant store data + private static final byte SCAN_VERSION = (byte) 6; private byte [] startRow = HConstants.EMPTY_START_ROW; private byte [] stopRow = HConstants.EMPTY_END_ROW; private int maxVersions = 1; @@ -131,6 +133,8 @@ */ private boolean small = false; + private Scan assistantScan = null; + /** * Create a Scan operation across all rows. */ @@ -547,6 +551,22 @@ } /** + * Set an assistant scan which will use the assistant store data. + * + * It could be only set if the scanned table has assistant store. + * @param assistantscan + * @return this + */ + public Scan setAssistantScan(Scan assistantscan) { + this.assistantScan = assistantscan; + return this; + } + + public Scan getAssistantScan() { + return this.assistantScan; + } + + /** * Compile the table and column family (i.e. schema) information * into a String. Useful for parsing and aggregation by debugging, * logging, and administration tools. @@ -691,6 +711,13 @@ } else { this.supportEncodingResult = false; } + + if (version >= 6) { + if (in.readBoolean()) { + this.assistantScan = new Scan(); + this.assistantScan.readFields(in); + } + } } public void write(final DataOutput out) @@ -728,6 +755,10 @@ out.writeBoolean(this.reversed); out.writeBoolean(this.small); out.writeBoolean(this.supportEncodingResult); + out.writeBoolean(this.assistantScan != null); + if (this.assistantScan != null) { + this.assistantScan.write(out); + } } /** Index: src/test/java/org/apache/hadoop/hbase/io/TestValueSplitHalfStoreFileReader.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/TestValueSplitHalfStoreFileReader.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/io/TestValueSplitHalfStoreFileReader.java (revision 0) @@ -0,0 +1,266 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.Reference.Range; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestValueSplitHalfStoreFileReader { + @Test + public void testHalfScanAndReseek() throws IOException { + HBaseTestingUtility test_util = new HBaseTestingUtility(); + String root_dir = test_util.getDataTestDir("TestValueSplitHalfStoreFile") + .toString(); + Path p = new Path(root_dir, "test"); + + Configuration conf = test_util.getConfiguration(); + FileSystem fs = FileSystem.get(conf); + CacheConfig cacheConf = new CacheConfig(conf); + + HFile.Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(fs, p) + .withBlockSize(1024).withComparator(KeyValue.KEY_COMPARATOR).create(); + + // write some things. + List items = genSomeKeys(); + for (KeyValue kv : items) { + w.append(kv); + } + w.close(); + + byte[] midValue = _b(String.format("value_%04d", VALUENUM / 2)); + + Reference bottom = new Reference(midValue, Reference.Range.bottom); + doTestOfScanAndReseek(p, fs, bottom, cacheConf, midValue); + + Reference top = new Reference(midValue, Reference.Range.top); + doTestOfScanAndReseek(p, fs, top, cacheConf, midValue); + } + + @Test + public void testHalfScanner() throws IOException { + HBaseTestingUtility test_util = new HBaseTestingUtility(); + String root_dir = test_util.getDataTestDir( + "TestValueSplitHalfStoreFileScanBefore").toString(); + Path p = new Path(root_dir, "test"); + Configuration conf = test_util.getConfiguration(); + FileSystem fs = FileSystem.get(conf); + CacheConfig cacheConf = new CacheConfig(conf); + + HFile.Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(fs, p) + .withBlockSize(1024).withComparator(KeyValue.KEY_COMPARATOR).create(); + + // write some things. + List items = genSomeKeys(); + for (KeyValue kv : items) { + w.append(kv); + } + w.close(); + + byte[] midValue = _b(String.format("value_%04d", VALUENUM / 2)); + + Reference bottom = new Reference(midValue, Reference.Range.bottom); + Reference top = new Reference(midValue, Reference.Range.top); + + for (int i = 1; i * VALUENUM / 2 < items.size(); i++) { + // Check the every edge KeyValue + int edgeNum = i * VALUENUM / 2; + KeyValue edgeKeyValue = items.get(edgeNum); + System.out.println("edgeKeyValue:" + edgeKeyValue + ",value:" + + Bytes.toString(edgeKeyValue.getValue())); + + + KeyValue foundKeyValue = doTestOfSeekBefore(p, fs, bottom, edgeKeyValue, + cacheConf); + if (i % 2 == 1) { + assertEquals(items.get(edgeNum - 1), foundKeyValue); + } else { + assertEquals(items.get(edgeNum - 1 - VALUENUM / 2), foundKeyValue); + } + + foundKeyValue = doTestOfSeekBefore(p, fs, top, edgeKeyValue, cacheConf); + if (i % 2 == 1) { + assertEquals( + edgeNum - 1 - VALUENUM / 2 < 0 ? null : items.get(edgeNum - 1 + - VALUENUM / 2), foundKeyValue); + } else { + assertEquals(items.get(edgeNum - 1), foundKeyValue); + } + } + + // Try and seek before the first thing. + KeyValue foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(0), + cacheConf); + assertNull(foundKeyValue); + + // Try and seek before the second thing in the top and bottom. + foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(1), cacheConf); + assertNull(foundKeyValue); + + foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(1), cacheConf); + assertEquals(items.get(0), foundKeyValue); + } + + @Test + public void testFirstLastKey() throws IOException { + HBaseTestingUtility test_util = new HBaseTestingUtility(); + String root_dir = test_util.getDataTestDir("TestValueSplitHalfStoreFile") + .toString(); + Path p = new Path(root_dir, "test"); + + Configuration conf = test_util.getConfiguration(); + FileSystem fs = FileSystem.get(conf); + CacheConfig cacheConf = new CacheConfig(conf); + + HFile.Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(fs, p) + .withBlockSize(1024).withComparator(KeyValue.KEY_COMPARATOR).create(); + + // write some things. + List items = genSomeKeys(); + for (KeyValue kv : items) { + w.append(kv); + } + w.close(); + + byte[] midValue = _b(String.format("value_%04d", VALUENUM / 2)); + + Reference bottom = new Reference(midValue, Reference.Range.bottom); + Reference top = new Reference(midValue, Reference.Range.top); + + // check bottom half + ValueSplitHalfStoreFileReader valueSplitHalfreader = new ValueSplitHalfStoreFileReader( + fs, p, cacheConf, bottom, DataBlockEncoding.NONE); + valueSplitHalfreader.loadFileInfo(); + KeyValue theFirstKeyValue = null; + KeyValue theLastKeyValue = null; + for (KeyValue kv : items) { + if (Bytes.compareTo(kv.getValue(), midValue) < 0) { + if (theFirstKeyValue == null) { + theFirstKeyValue = kv; + } + theLastKeyValue = kv; + } + } + Bytes.equals(theFirstKeyValue.getKey(), valueSplitHalfreader.getFirstKey()); + Bytes.equals(theLastKeyValue.getKey(), valueSplitHalfreader.getLastKey()); + + // check bottom half + valueSplitHalfreader = new ValueSplitHalfStoreFileReader(fs, p, cacheConf, + top, DataBlockEncoding.NONE); + valueSplitHalfreader.loadFileInfo(); + theFirstKeyValue = null; + theLastKeyValue = null; + for (KeyValue kv : items) { + if (Bytes.compareTo(kv.getValue(), midValue) >= 0) { + if (theFirstKeyValue == null) { + theFirstKeyValue = kv; + } + theLastKeyValue = kv; + } + } + Bytes.equals(theFirstKeyValue.getKey(), valueSplitHalfreader.getFirstKey()); + Bytes.equals(theLastKeyValue.getKey(), valueSplitHalfreader.getLastKey()); + + } + + private KeyValue doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, + KeyValue seekBefore, CacheConfig cacheConfig) throws IOException { + final ValueSplitHalfStoreFileReader valueSplitHalfreader = new ValueSplitHalfStoreFileReader(fs, p, + cacheConfig, bottom, DataBlockEncoding.NONE); + valueSplitHalfreader.loadFileInfo(); + final HFileScanner scanner = valueSplitHalfreader.getScanner(false, false); + if (scanner.seekBefore(seekBefore.getKey())) { + return scanner.getKeyValue(); + } + return null; + } + + private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, + CacheConfig cacheConf, byte[] midValue) throws IOException { + final ValueSplitHalfStoreFileReader valueSplitHalfreader = new ValueSplitHalfStoreFileReader( + fs, p, cacheConf, bottom, DataBlockEncoding.NONE); + valueSplitHalfreader.loadFileInfo(); + final HFileScanner scanner = valueSplitHalfreader.getScanner(false, false); + + scanner.seekTo(); + KeyValue curr; + do { + curr = scanner.getKeyValue(); + assertEquals(Bytes.compareTo(curr.getValue(), midValue) >= 0, + bottom.getFileRegion() == Range.top); + KeyValue reseekKv = getLastOnCol(curr); + int ret = scanner.reseekTo(reseekKv.getKey()); + assertTrue("reseek to returned: " + ret, ret > 0); + assertEquals( + Bytes.compareTo(scanner.getKeyValue().getValue(), midValue) >= 0, + bottom.getFileRegion() == Range.top); + } while (scanner.next()); + + int ret = scanner.reseekTo(getLastOnCol(curr).getKey()); + // System.out.println("Last reseek: " + ret); + assertTrue(ret > 0); + + valueSplitHalfreader.close(true); + } + + private KeyValue getLastOnCol(KeyValue curr) { + return KeyValue.createLastOnRow(curr.getBuffer(), curr.getRowOffset(), + curr.getRowLength(), curr.getBuffer(), curr.getFamilyOffset(), + curr.getFamilyLength(), curr.getBuffer(), curr.getQualifierOffset(), + curr.getQualifierLength()); + } + + private static final int SIZE = 1000; + private static final int VALUENUM = 100; + + private static byte[] _b(String s) { + return Bytes.toBytes(s); + } + + private List genSomeKeys() { + List ret = new ArrayList(SIZE); + for (int i = 0; i < SIZE; i++) { + KeyValue kv = new KeyValue(_b(String.format("row_%04d", i)), + _b("family"), _b("qualifier"), 1000, // timestamp + _b(String.format("value_%04d", i % VALUENUM))); + ret.add(kv); + } + return ret; + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 239702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -97,6 +97,7 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.MultiAction; import org.apache.hadoop.hbase.client.MultiResponse; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Row; @@ -3910,40 +3911,58 @@ isMetaTable = true; } - List> putsWithLocks = - Lists.newArrayListWithCapacity(puts.size()); for (Action a : puts) { Put p = (Put) a.getAction(); dataLen = dataLen + (int) p.getWritableSize(); - Integer lock; - try { - lock = getLockFromId(p.getLockId()); - } catch (UnknownRowLockException ex) { - response.add(regionName, a.getOriginalIndex(), ex); - continue; - } - putsWithLocks.add(new Pair(p, lock)); } + if (!region.hasAssistantStore()) { + List> putsWithLocks = Lists + .newArrayListWithCapacity(puts.size()); + for (Action a : puts) { + Put p = (Put) a.getAction(); + Integer lock; + try { + lock = getLockFromId(p.getLockId()); + } catch (UnknownRowLockException ex) { + response.add(regionName, a.getOriginalIndex(), ex); + continue; + } + putsWithLocks.add(new Pair(p, lock)); + } - OperationStatus[] codes = - region.put(putsWithLocks.toArray(new Pair[]{})); + OperationStatus[] codes = region.put(putsWithLocks + .toArray(new Pair[] {})); - for (i = 0; i < codes.length; i++) { - OperationStatus code = codes[i]; + for (i = 0; i < codes.length; i++) { + OperationStatus code = codes[i]; - Action theAction = puts.get(i); - Object result = null; + Action theAction = puts.get(i); + Object result = null; - if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) { - result = new Result(); - } else if (code.getOperationStatusCode() - == OperationStatusCode.SANITY_CHECK_FAILURE) { - result = new DoNotRetryIOException(code.getExceptionMsg()); + if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) { + result = new Result(); + } else if (code.getOperationStatusCode() == OperationStatusCode.SANITY_CHECK_FAILURE) { + result = new DoNotRetryIOException(code.getExceptionMsg()); + } + // FAILURE && NOT_RUN becomes null, aka: need to run again. + response.add(regionName, theAction.getOriginalIndex(), result); } - // FAILURE && NOT_RUN becomes null, aka: need to run again. - - response.add(regionName, theAction.getOriginalIndex(), result); + } else { + // map of rows to lockid, sorted to avoid deadlocks + SortedMap rowsToLockId = new TreeMap( + Bytes.BYTES_COMPARATOR); + List mutations = new ArrayList(); + for (Action a : puts) { + Put p = (Put) a.getAction(); + Integer lock = getLockFromId(p.getLockId()); + rowsToLockId.put(a.getAction().getRow(), lock); + mutations.add(p); + } + region.mutateRowsWithLocks(mutations, rowsToLockId); + for (Action a : puts) { + response.add(regionName, a.getOriginalIndex(), new Result()); + } } long took = System.currentTimeMillis() - start; Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (revision 239702) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (working copy) @@ -22,7 +22,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -30,14 +32,21 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; @@ -347,7 +356,10 @@ AtomicLong timeStamps = new AtomicLong(0); AtomicInteger failures = new AtomicInteger(0); - final List rowsToLock = Arrays.asList(row, row2); + final TreeMap rowsToLockId = new TreeMap( + Bytes.BYTES_COMPARATOR); + rowsToLockId.put(row, null); + rowsToLockId.put(row2, null); // create all threads for (int i = 0; i < numThreads; i++) { all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { @@ -380,7 +392,7 @@ p.add(fam1, qual1, value2); mrm.add(p); } - region.mutateRowsWithLocks(mrm, rowsToLock); + region.mutateRowsWithLocks(mrm, rowsToLockId); op ^= true; // check: should always see exactly one column Scan s = new Scan(row); Index: src/main/java/org/apache/hadoop/hbase/io/ValueSplitHalfStoreFileReader.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/ValueSplitHalfStoreFileReader.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/ValueSplitHalfStoreFileReader.java (revision 0) @@ -0,0 +1,272 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This class is similar to + * {@link org.apache.hadoop.hbase.io.HalfStoreFileReader}, the difference is + * that the half store file is decided by the value rather than key. + * + * For example, suppose the store file includes the following KeyValues: + * r1/c1:q1/v1 + * r2/c1:q1/v3 + * r3/c1:q1/v1 + * r4/c1:q1/v1 + * r5/c1:q1/v3 + * + * With the split key 'v2', + * the 'bottom' half file includes the following KeyValues: + * r1/c1:q1/v1 + * r3/c1:q1/v1 + * r4/c1:q1/v1 + * + * the 'top' half file includes the following KeyValues: + * r2/c1:q1/v3 + * r5/c1:q1/v3 + * + */ +public class ValueSplitHalfStoreFileReader extends HalfStoreFileReader { + private final Log LOG = LogFactory.getLog(ValueSplitHalfStoreFileReader.class); + + protected final byte[] splitvalue; + private byte[] lastKey = null; + private boolean lastKeySeeked = false; + + /** + * + * @param fs + * @param p + * @param cacheConf + * @param r + * @param preferredEncodingInCache + * @throws IOException + */ + public ValueSplitHalfStoreFileReader(final FileSystem fs, final Path p, + final CacheConfig cacheConf, final Reference r, + DataBlockEncoding preferredEncodingInCache) throws IOException { + super(fs, p, cacheConf, r, preferredEncodingInCache); + splitvalue = KeyValue.createKeyValueFromKey(splitkey).getRow(); + } + + /** + * Creates a assistant half file reader for a hfile referred to by an + * hfilelink. + * @param fs + * @param p + * @param link + * @param cacheConf + * @param r + * @param preferredEncodingInCache + * @throws IOException + */ + public ValueSplitHalfStoreFileReader(FileSystem fs, Path p, HFileLink link, + CacheConfig cacheConf, Reference r, + DataBlockEncoding preferredEncodingInCache) throws IOException { + super(fs, p, link, cacheConf, r, preferredEncodingInCache); + splitvalue = KeyValue.createKeyValueFromKey(splitkey).getRow(); + } + + @Override + public byte[] getLastKey() { + if (!lastKeySeeked) { + KeyValue lastKV = KeyValue.createLastOnRow(getHFileReader() + .getLastRowKey()); + // Get a scanner that caches the block and that uses pread. + HFileScanner scanner = getScanner(true, true); + try { + if (scanner.seekBefore(lastKV.getBuffer(), lastKV.getKeyOffset(), + lastKV.getKeyLength())) { + this.lastKey = Bytes.toBytes(scanner.getKey()); + } + } catch (IOException e) { + LOG.warn("Failed seekBefore " + Bytes.toStringBinary(lastKV.getKey()), + e); + } + } + return this.lastKey; + } + + @Override + public HFileScanner getScanner(final boolean cacheBlocks, + final boolean pread, final boolean isCompaction) { + final HFileScanner s = getHFileReader().getScanner(cacheBlocks, pread, + isCompaction); + return new HFileScanner() { + final HFileScanner delegate = s; + + @Override + public ByteBuffer getKey() { + return delegate.getKey(); + } + + @Override + public String getKeyString() { + return delegate.getKeyString(); + } + + @Override + public ByteBuffer getValue() { + return delegate.getValue(); + } + + @Override + public String getValueString() { + return delegate.getValueString(); + } + + @Override + public KeyValue getKeyValue() { + return delegate.getKeyValue(); + } + + @Override + public boolean next() throws IOException { + while (delegate.next()) { + if (isCurrentKVValid()) { + return true; + } + } + return false; + } + + @Override + public boolean seekBefore(byte[] key) throws IOException { + return seekBefore(key, 0, key.length); + } + + @Override + public boolean seekBefore(byte[] key, int offset, int length) + throws IOException { + byte[] seekKey = key; + int seekKeyOffset = offset; + int seekKeyLength = length; + while (delegate.seekBefore(seekKey, seekKeyOffset, seekKeyLength)) { + if (isCurrentKVValid()) { + return true; + } + ByteBuffer curKey = getKey(); + if (curKey == null) return false; + seekKey = curKey.array(); + seekKeyOffset = curKey.arrayOffset(); + seekKeyLength = curKey.limit(); + } + return false; + } + + private boolean isCurrentKVValid() { + ByteBuffer value = getValue(); + if (!top) { + // Current value < split key, it belongs to bottom, return true + if (Bytes.compareTo(value.array(), value.arrayOffset(), + value.limit(), splitvalue, 0, splitvalue.length) < 0) { + return true; + } + } else { + if (Bytes.compareTo(value.array(), value.arrayOffset(), + value.limit(), splitvalue, 0, splitvalue.length) >= 0) { + return true; + } + } + return false; + } + + @Override + public boolean seekTo() throws IOException { + boolean b = delegate.seekTo(); + if (!b) { + return b; + } + + if (isCurrentKVValid()) { + return true; + } + + return next(); + } + + @Override + public int seekTo(byte[] key) throws IOException { + return seekTo(key, 0, key.length); + } + + public int seekTo(byte[] key, int offset, int length) throws IOException { + int b = delegate.seekTo(key, offset, length); + if (b < 0) { + return b; + } else { + if (isCurrentKVValid()) { + return b; + } else { + boolean existBefore = seekBefore(key, offset, length); + if (existBefore) { + return 1; + } + return -1; + } + } + } + + @Override + public int reseekTo(byte[] key) throws IOException { + return reseekTo(key, 0, key.length); + } + + @Override + public int reseekTo(byte[] key, int offset, int length) + throws IOException { + int b = delegate.reseekTo(key, offset, length); + if (b < 0) { + return b; + } else { + if (isCurrentKVValid()) { + return b; + } else { + boolean existBefore = seekBefore(key, offset, length); + if (existBefore) { + return 1; + } + return -1; + } + } + } + + public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() { + return this.delegate.getReader(); + } + + public boolean isSeeked() { + return this.delegate.isSeeked(); + } + }; + } + +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/AssistantStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/AssistantStore.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/AssistantStore.java (revision 0) @@ -0,0 +1,151 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.regex.Matcher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.assistant.Assistant; + +/** + * An assistant store is used to store data with another organization which is + * defined as configuration. It means each region could have several + * organizations of data using assistant stores. We could use it to speed up + * some read(scan) with filter. + * + * If using the assistant store in the region, each original KeyValue would has + * zero or some corresponding KeyValue(s) in assistant store, the corresponding + * KeyValue's row is redefined, but its value is the same as original KeyValue's + * row. + * + * For example, if a region includes the following KeyValues: + * r1/c1:q1/v1 + * r2/c1:q1/v2 + * r3/c1:q1/v1 + * r4/c1:q1/v3 + * r5/c1:q1/v1 + * + * we could use an assistant store for it, include the following kvs: + * v1/c1:q1/r1 + * v1/c1:q1/r3 + * v1/c1:q1/r5 + * v2/c1:q1/r2 + * v3/c1:q1/r4 + * + * + * when scanning with the ColumnValueFilter, the assistant store could speed + * up this read. + */ +public class AssistantStore extends Store { + static final Log LOG = LogFactory.getLog(AssistantStore.class); + + private final Assistant assistant; + + /** + * Constructor + * @param basedir basedir qualified path under which the region + * directory;generally the table subdirectory lives; + * @param region + * @param family HColumnDescriptor for this column + * @param fs file system object + * @param conf configuration object. Can be null. + * @param assistantConf + * @throws IOException + */ + protected AssistantStore(Path basedir, HRegion region, + HColumnDescriptor family, FileSystem fs, Configuration conf, + String assistantConf) throws IOException { + super(basedir, region, family, fs, conf); + Matcher matcher = HConstants.ASSISTANT_CONF_VALUE_PATTERN + .matcher(assistantConf); + if (matcher.matches()) { + this.assistant = getAssistantByName(matcher.group(1), matcher.group(2)); + } else { + throw new RuntimeException("Assistant configuration " + assistantConf + + " does not match pattern"); + } + + } + + private Assistant getAssistantByName(String assistantClassName, String argStr) { + Class assistantClass = null; + try { + assistantClass = getClass(assistantClassName, Assistant.class); + Constructor c = assistantClass.getConstructor(); + return c.newInstance().initialize(argStr); + } catch (Exception e) { + String msg = "Failed construction of assistant for" + assistantClassName + + " with argument = " + argStr; + LOG.warn(msg, e); + throw new RuntimeException(msg, e); + } + } + + private Class getClass(String name, Class xface) { + try { + Class theClass = Class.forName(name); + if (theClass != null && !xface.isAssignableFrom(theClass)) + throw new RuntimeException(theClass + " not " + xface.getName()); + else if (theClass != null) + return theClass.asSubclass(xface); + else + return null; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + @Override + public byte[] getSplitPoint() { + // Returns null to indicate this store can't decide the region's split point + return null; + } + + @Override + public boolean isAssistant() { + return true; + } + + /** + * Generate the assistant data with the given mutations + * @param mutations + * @return a collection of generated mutations + * @throws IOException + */ + @Override + public Collection generateAssistantData( + Collection mutations) throws IOException { + Collection generatedMutations = assistant.assist( + this.getHRegion(), mutations, this.getFamily().getName()); + // LOG.debug("generatedMutations:" + generatedMutations); + return generatedMutations; + } + +} Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 239702) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -639,6 +639,9 @@ "(" + CP_HTD_ATTR_VALUE_PARAM_KEY_PATTERN + ")=(" + CP_HTD_ATTR_VALUE_PARAM_VALUE_PATTERN + "),?"); + public static final Pattern ASSISTANT_CONF_VALUE_PATTERN = Pattern + .compile("^([^\\|]+)\\|(.*)$"); + /** The delay when re-trying a socket operation in a loop (HBASE-4712) */ public static final int SOCKET_RETRY_WAIT_MS = 200; Index: src/main/java/org/apache/hadoop/hbase/regionserver/assistant/Assistant.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/assistant/Assistant.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/assistant/Assistant.java (revision 0) @@ -0,0 +1,117 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver.assistant; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Abstract class for objects that generate assistant data when region has + * assistant store(s). + * + * Assistant instances are created one per assistant store. + * + * When implementing your own assistants, please implement the methods + * {@link #assist(HRegion, Mutation)} and {@link #getName()}. + * + * NOTE: For each generated assistant mutation, its KeyValues' value must be the + * same as the original corresponding mutation's row. Thus, when region split, + * we could make sure the daughter region has its correct assistant data. + */ +public abstract class Assistant { + + /** + * Help the given region to get some assistant data as the given mutations. + * + * We will check the generated mutations to make sure the values of generated + * KeyValues are same as the original mutation's row + * @param store + * @param mutations + * @param assistanStoreName name of assistant store + * @return a collection of mutations + * @throws IOException + */ + public final Collection assist(final HRegion region, + final Collection mutations, final byte[] assistanStoreName) + throws IOException { + List generatedMutations = new ArrayList(); + for (Mutation mutation : mutations) { + Collection muts = assist(region, mutation, assistanStoreName); + checkValue(mutation.getRow(), muts); + generatedMutations.addAll(muts); + } + return generatedMutations; + } + + /** + * Check whether each KeyValue's value in mutations is the same as the given + * expected value + * @param expectedValue + * @param toCheckMutations + */ + private void checkValue(byte[] expectedValue, Collection toCheckMutations) { + for (Mutation toCheckMutation : toCheckMutations) { + for (List kvList : toCheckMutation.getFamilyMap().values()) { + for (KeyValue kv : kvList) { + if (!Bytes.equals(expectedValue, 0, expectedValue.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())) { + throw new IllegalArgumentException("Assistant class " + getName() + + " is not legal"); + } + } + } + } + } + + /** + * Help the given region to get some assistant data as the given mutation + * + * This method would be implemented in kinds of Assistants as use cases + * + * NOTE: The generated mutations' KeyValue must has the same value as given + * mutation's row + * @param region + * @param mutation + * @param assistanStoreName name of assistant store + * @return a collection of mutations + * @throws IOException + */ + protected abstract Collection assist(final HRegion region, + final Mutation mutation, final byte[] assistanStoreName) + throws IOException; + + /** + * @return the name of this Assistant + */ + public abstract String getName(); + + /** + * Initialize the instance as the given string + * @return the own Assistant objec + */ + public abstract Assistant initialize(String argumentStr); + +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/assistant/RowValueSwapAssistant.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/assistant/RowValueSwapAssistant.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/assistant/RowValueSwapAssistant.java (revision 0) @@ -0,0 +1,218 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver.assistant; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This assistant will swap the row and value, it could be consider as a simple + * basic index assistant + */ +public class RowValueSwapAssistant extends Assistant { + private static final Pattern ARGUMENT_PATTERN = Pattern + .compile("^FAMILY=(.*),QUALIFIER=(.*)$"); + // What family and qualifier we want to swap row, value + private byte[] family; + private byte[] qualifier; + + /** default constructor */ + public RowValueSwapAssistant() { + } + + @Override + protected Collection assist(final HRegion region, + final Mutation mutation, final byte[] assistanStoreName) + throws IOException { + List generatedMutations = new ArrayList(); + Map> familyMap = mutation.getFamilyMap(); + if (mutation instanceof Put) { + for (Map.Entry> entry : familyMap.entrySet()) { + for (KeyValue kv : entry.getValue()) { + if (ignoreKV(kv)) continue; + Put p = new Put(Bytes.add(kv.getValue(), kv.getRow())); + p.add(assistanStoreName, kv.getQualifier(), kv.getRow()); + generatedMutations.add(p); + } + } + } else if (mutation instanceof Delete) { + if(familyMap.isEmpty()){ + for (byte[] tableFamily : region.getTableDesc().getFamiliesKeys()) { + ((Delete) mutation).deleteFamily(tableFamily, mutation.getTimeStamp()); + } + } + for (Map.Entry> entry : familyMap.entrySet()) { + Iterator kvIterator = entry.getValue().iterator(); + while (kvIterator.hasNext()) { + KeyValue kv = kvIterator.next(); + if(!kv.isDelete()) continue; + // Create the Get for the corresponding delete + Get get = new Get(kv.getRow()); + boolean isLatestTimestamp = kv.isLatestTimestamp(); + if (kv.getType() == KeyValue.Type.Delete.getCode()) { + if (ignoreKV(kv)) continue; + get.addColumn(kv.getFamily(), kv.getQualifier()); + if (!isLatestTimestamp) { + get.setTimeStamp(kv.getTimestamp()); + } + } else if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) { + if (ignoreKV(kv)) continue; + get.addColumn(kv.getFamily(), kv.getQualifier()).setMaxVersions(); + if (!isLatestTimestamp) { + get.setTimeRange(0, kv.getTimestamp() + 1); + } + } else if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) { + if (ignoreFamily(kv.getFamilyArray(), kv.getFamilyOffset(), + kv.getFamilyLength())) { + continue; + } + get.addFamily(kv.getFamily()).setMaxVersions(); + if (!isLatestTimestamp) { + get.setTimeRange(0, kv.getTimestamp() + 1); + } + } + + Result result = region.get(get, null); + if (result.isEmpty()) { + kvIterator.remove(); + continue; + } + + // Create the delete on assistant store according to the result + + boolean updateTs = true; + for (KeyValue existedKV : result.raw()) { + long tsOfExisted = existedKV.getTimestamp(); + // specify the time stamp for delete to prevent new putting + byte[] tsBytes = Bytes.toBytes(tsOfExisted); + if (updateTs) { + kv.updateLatestStamp(tsBytes); + updateTs = false; + } + if (ignoreQualifier(existedKV.getQualifierArray(), + existedKV.getQualifierOffset(), existedKV.getQualifierLength())) { + continue; + } + + byte[] assistantRow = Bytes.add(existedKV.getValue(), + existedKV.getRow()); + Delete assistantDelete = new Delete(assistantRow); + assistantDelete.addDeleteMarker(new KeyValue(assistantRow, + assistanStoreName, existedKV.getQualifier(), tsOfExisted, + KeyValue.Type.Delete, existedKV.getRow())); + generatedMutations.add(assistantDelete); + } + } + } + } + return generatedMutations; + } + + @Override + public String getName() { + return this.getClass().getName(); + } + + /** + * Check whether the given KeyValue's family and qualifier is wanted + * @param kv + * @return true if ignore this kv. + */ + private boolean ignoreKV(KeyValue kv) { + if (ignoreFamily(kv.getFamilyArray(), kv.getFamilyOffset(), + kv.getFamilyLength())) { + return true; + } + if (ignoreFamily(kv.getQualifierArray(), kv.getQualifierOffset(), + kv.getQualifierLength())) { + return true; + } + return false; + } + + /** + * Check whether the given family is wanted + * @param checkFamily + * @return true if ignore the given family. + */ + private boolean ignoreFamily(byte[] checkFamily, int offset, int len) { + if (this.family != null + && !Bytes.equals(this.family, 0, this.family.length, checkFamily, + offset, len)) { + return true; + } + return false; + } + + /** + * Check whether the given family is wanted + * @param checkFamily + * @return true if ignore the given family. + */ + private boolean ignoreQualifier(byte[] checkQualifier, int offset, int len) { + if (this.qualifier != null && !Bytes.equals(this.qualifier, 0, this.qualifier.length, + checkQualifier, offset, len)) { + return true; + } + return false; + } + + @Override + public Assistant initialize(String argumentStr) { + Matcher matcher = ARGUMENT_PATTERN.matcher(argumentStr); + if (!matcher.matches()) { + throw new IllegalArgumentException("Argument:" + argumentStr + + " is not suited for " + getName()); + } + String familyStr = matcher.group(1); + String qualStr = matcher.group(2); + this.family = familyStr.isEmpty() ? null : Bytes.toBytesBinary(familyStr); + this.qualifier = qualStr.isEmpty() ? null : Bytes.toBytesBinary(qualStr); + return this; + } + + /** + * Get the configuration string of RowValueSwapAssistant when setting + * assistant in HColumnDescriptor + * @param family + * @param qualifier + * @return the configuration string + */ + public static String getAssistantConfString(String family, String qualifier) { + return RowValueSwapAssistant.class.getName() + "|" + "FAMILY=" + + ((family == null || family.isEmpty()) ? "" : family) + ",QUALIFIER=" + + ((qualifier == null || qualifier.isEmpty()) ? "" : qualifier); + } + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java (revision 239702) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java (working copy) @@ -19,8 +19,8 @@ import java.io.IOException; import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.SortedMap; +import java.util.TreeMap; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; @@ -42,8 +42,9 @@ // get the coprocessor environment RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) getEnvironment(); - // set of rows to lock, sorted to avoid deadlocks - SortedSet rowsToLock = new TreeSet(Bytes.BYTES_COMPARATOR); + // map of rows to lock, sorted to avoid deadlocks + SortedMap rowsToLockId = new TreeMap( + Bytes.BYTES_COMPARATOR); HRegionInfo regionInfo = env.getRegion().getRegionInfo(); for (Mutation m : mutations) { @@ -51,7 +52,7 @@ if (!HRegion.rowIsInRange(regionInfo, m.getRow())) { String msg = "Requested row out of range '" + Bytes.toStringBinary(m.getRow()) + "'"; - if (rowsToLock.isEmpty()) { + if (rowsToLockId.isEmpty()) { // if this is the first row, region might have moved, // allow client to retry throw new WrongRegionException(msg); @@ -60,9 +61,9 @@ throw new DoNotRetryIOException(msg); } } - rowsToLock.add(m.getRow()); + rowsToLockId.put(m.getRow(), null); } // call utility method on region - env.getRegion().mutateRowsWithLocks(mutations, rowsToLock); + env.getRegion().mutateRowsWithLocks(mutations, rowsToLockId); } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 241368) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.backup.HFileArchiver; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.HFileLink; @@ -426,7 +427,7 @@ completionService.submit(new Callable() { public StoreFile call() throws IOException { StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf, - family.getBloomFilterType(), dataBlockEncoder); + family.getBloomFilterType(), dataBlockEncoder, isAssistant()); passSchemaMetricsTo(storeFile); storeFile.createReader(); return storeFile; @@ -608,7 +609,7 @@ StoreFile.rename(fs, srcPath, dstPath); StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); + this.family.getBloomFilterType(), this.dataBlockEncoder, isAssistant()); passSchemaMetricsTo(sf); StoreFile.Reader r = sf.createReader(); @@ -864,7 +865,7 @@ status.setStatus("Flushing " + this + ": reopening flushed file"); StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); + this.family.getBloomFilterType(), this.dataBlockEncoder, isAssistant()); passSchemaMetricsTo(sf); StoreFile.Reader r = sf.createReader(); @@ -1847,7 +1848,7 @@ try { storeFile = new StoreFile(this.fs, path, this.conf, this.cacheConf, this.family.getBloomFilterType(), - NoOpDataBlockEncoder.INSTANCE); + NoOpDataBlockEncoder.INSTANCE, isAssistant()); passSchemaMetricsTo(storeFile); storeFile.createReader(); } catch (IOException e) { @@ -1899,7 +1900,7 @@ " to " + destPath); } result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); + this.family.getBloomFilterType(), this.dataBlockEncoder, isAssistant()); passSchemaMetricsTo(result); result.createReader(); } @@ -2538,6 +2539,15 @@ return comparator; } + public boolean isAssistant() { + return false; + } + + public Collection generateAssistantData( + Collection mutations) throws IOException { + return null; + } + /** * Immutable information for scans over a store. */