### Eclipse Workspace Patch 1.0
#P 0.94.0-ali-1.0
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStoreOnCluster.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStoreOnCluster.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStoreOnCluster.java (revision 0)
@@ -0,0 +1,156 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.assistant.RowValueSwapAssistant;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestAssistantStoreOnCluster {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static int SLAVES = 3;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(SLAVES);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testRowValueSwapAssistant() throws IOException,
+ InterruptedException {
+ byte[] tableName = Bytes.toBytes("testRowValueSwapAssistant");
+ byte[] normalFamilyName = Bytes.toBytes("family");
+ byte[] assistantFamilyName = Bytes.toBytes("assistantFamily");
+ byte[] qualifier = Bytes.toBytes("qualifier");
+
+ HColumnDescriptor normalFamily = new HColumnDescriptor(normalFamilyName);
+ HColumnDescriptor assistantFamily = new HColumnDescriptor(
+ assistantFamilyName).setAssistant(RowValueSwapAssistant
+ .getAssistantConfString(null, null));
+
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(normalFamily);
+ desc.addFamily(assistantFamily);
+ TEST_UTIL.getHBaseAdmin().createTable(desc);
+
+ // Load data to table
+ Configuration c = TEST_UTIL.getConfiguration();
+ HTable htable = new HTable(c, tableName);
+ int rowCount = 100;
+ int valueCount = 10;
+ for (int i = 0; i < rowCount; i++) {
+ Put put = new Put(Bytes.toBytes("testrow" + String.format("%04d", i)));
+ put.add(normalFamilyName, qualifier,
+ Bytes.toBytes("testvalue" + String.format("%04d", i % valueCount)));
+ htable.put(put);
+ if (i == rowCount / 2) {
+ // make a flush
+ TEST_UTIL.getHBaseAdmin().flush(tableName);
+ }
+ }
+
+ // verify count
+ Scan scan = new Scan();
+ ResultScanner scanner = htable.getScanner(scan);
+ verifyCount(scanner, rowCount, rowCount);
+
+ // verify count with assistant scan
+ scan = new Scan().setAssistantScan(new Scan());
+ scanner = htable.getScanner(scan);
+ verifyCount(scanner, rowCount, rowCount);
+
+ //split region
+ byte[] splitPoint = Bytes.toBytes("testrow"
+ + String.format("%04d", rowCount / 2));
+ TEST_UTIL.getHBaseAdmin().split(tableName, splitPoint);
+
+ // wait until region split is done
+ long timeout = System.currentTimeMillis() + (15 * 1000);
+ while ((System.currentTimeMillis() < timeout)
+ && (htable.getRegionLocations().size() < 2)) {
+ Thread.sleep(250);
+ }
+
+ scan = new Scan().setStopRow(splitPoint).setAssistantScan(new Scan());
+ scanner = htable.getScanner(scan);
+ verifyCount(scanner, rowCount / 2, rowCount / 2);
+
+ // scan the table with condition, using the assistant scan
+ Scan assistantScan = new Scan();
+ byte[] assistantStart = Bytes.toBytes("testvalue" + String.format("%04d", 0));
+ byte[] assistantStop = Bytes.toBytes("testvalue" + String.format("%04d", valueCount / 2));
+ assistantScan.setStartRow(assistantStart);
+ assistantScan.setStopRow(assistantStop);
+ scan = new Scan().setAssistantScan(assistantScan);
+ scanner = htable.getScanner(scan);
+ int scannedRowCount = 0;
+ Result result = null;
+ while ((result = scanner.next()) != null) {
+ scannedRowCount++;
+ assertTrue(Bytes.compareTo(result.getRow(), assistantStart) >= 0);
+ assertTrue(Bytes.compareTo(result.getRow(), assistantStop) < 0);
+ System.out.println(result);
+ }
+ scanner.close();
+ assertEquals(rowCount / 2, scannedRowCount);
+
+ }
+
+ private void verifyCount(ResultScanner scanner, int expectedKVCount,
+ int expectedRowCount) throws IOException {
+ int rowCount = 0;
+ int kvCount = 0;
+ try {
+ Result result = null;
+ while ((result = scanner.next()) != null) {
+ rowCount++;
+ kvCount += result.size();
+ System.out.println(result);
+ }
+ } finally {
+ scanner.close();
+ }
+ assertEquals(expectedKVCount, kvCount);
+ assertEquals(expectedRowCount, rowCount);
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 239702)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy)
@@ -50,6 +50,7 @@
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.ValueSplitHalfStoreFileReader;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -178,6 +179,8 @@
// Set when we obtain a Reader.
private long maxMemstoreTS = -1;
+ private final boolean isAssistanFile;
+
public long getMaxMemstoreTS() {
return maxMemstoreTS;
}
@@ -246,33 +249,38 @@
// the last modification time stamp
private long modificationTimeStamp = 0L;
+ StoreFile(final FileSystem fs, final Path p, final Configuration conf,
+ final CacheConfig cacheConf, final BloomType cfBloomType,
+ final HFileDataBlockEncoder dataBlockEncoder) throws IOException {
+ this(fs, p, conf, cacheConf, cfBloomType, dataBlockEncoder, false);
+ }
+
/**
* Constructor, loads a reader and it's indices, etc. May allocate a
* substantial amount of ram depending on the underlying files (10-20MB?).
- *
- * @param fs The current file system to use.
- * @param p The path of the file.
- * @param blockcache true if the block cache is enabled.
- * @param conf The current configuration.
- * @param cacheConf The cache configuration and block cache reference.
+ *
+ * @param fs The current file system to use.
+ * @param p The path of the file.
+ * @param blockcache true if the block cache is enabled.
+ * @param conf The current configuration.
+ * @param cacheConf The cache configuration and block cache reference.
* @param cfBloomType The bloom type to use for this store file as specified
- * by column family configuration. This may or may not be the same
- * as the Bloom filter type actually present in the HFile, because
+ * by column family configuration. This may or may not be the same as
+ * the Bloom filter type actually present in the HFile, because
* column family configuration might change. If this is
* {@link BloomType#NONE}, the existing Bloom filter is ignored.
* @param dataBlockEncoder data block encoding algorithm.
+ * @param isAssistanFile true if this file belongs to the assistant store
* @throws IOException When opening the reader fails.
*/
- StoreFile(final FileSystem fs,
- final Path p,
- final Configuration conf,
- final CacheConfig cacheConf,
- final BloomType cfBloomType,
- final HFileDataBlockEncoder dataBlockEncoder)
+ StoreFile(final FileSystem fs, final Path p, final Configuration conf,
+ final CacheConfig cacheConf, final BloomType cfBloomType,
+ final HFileDataBlockEncoder dataBlockEncoder, final boolean isAssistanFile)
throws IOException {
this.fs = fs;
this.path = p;
this.cacheConf = cacheConf;
+ this.isAssistanFile = isAssistanFile;
this.dataBlockEncoder =
dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE
: dataBlockEncoder;
@@ -621,12 +629,23 @@
}
if (isReference()) {
if (this.link != null) {
- this.reader = new HalfStoreFileReader(this.fs, this.referencePath, this.link,
- this.cacheConf, this.reference, dataBlockEncoder.getEncodingInCache());
+ if (isAssistanFile) {
+ this.reader = new ValueSplitHalfStoreFileReader(this.fs, this.referencePath, this.link,
+ this.cacheConf, this.reference, dataBlockEncoder.getEncodingInCache());
+ } else {
+ this.reader = new HalfStoreFileReader(this.fs, this.referencePath, this.link,
+ this.cacheConf, this.reference, dataBlockEncoder.getEncodingInCache());
+ }
} else {
- this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
- this.cacheConf, this.reference,
- dataBlockEncoder.getEncodingInCache());
+ if (isAssistanFile) {
+ this.reader = new ValueSplitHalfStoreFileReader(this.fs,
+ this.referencePath, this.cacheConf, this.reference,
+ dataBlockEncoder.getEncodingInCache());
+ } else {
+ this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
+ this.cacheConf, this.reference,
+ dataBlockEncoder.getEncodingInCache());
+ }
}
} else if (isLink()) {
long size = link.getFileStatus(fs).getLen();
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 239709)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -40,6 +40,7 @@
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
+import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -141,6 +142,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MutableClassToInstanceMap;
+import com.google.common.collect.SortedMaps;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@@ -204,6 +206,8 @@
protected final Map<byte[], Store> stores =
new ConcurrentSkipListMap<byte[], Store>(Bytes.BYTES_RAWCOMPARATOR);
+ private boolean hasAssistantStore = false;
+
// Registered region protocol handlers
private ClassToInstanceMap<CoprocessorProtocol>
protocolHandlers = MutableClassToInstanceMap.create();
@@ -566,6 +570,9 @@
Store store = future.get();
this.stores.put(store.getColumnFamilyName().getBytes(), store);
+ if (store.isAssistant()) {
+ this.hasAssistantStore = true;
+ }
long storeSeqId = store.getMaxSequenceId();
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
storeSeqId);
@@ -654,6 +661,13 @@
}
/**
+ * @return true if this region has assistant store
+ */
+ public boolean hasAssistantStore() {
+ return hasAssistantStore;
+ }
+
+ /**
* This function will return the HDFS blocks distribution based on the data
* captured when HFile is created
* @return The HDFS blocks distribution for the region.
@@ -1731,28 +1745,43 @@
return getScanner(scan, null);
}
- void prepareScanner(Scan scan) throws IOException {
+ void prepareScanner(Scan scan, boolean isAssistant) throws IOException {
if(!scan.hasFamilies()) {
- // Adding all families to scanner
- for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
- scan.addFamily(family);
+ // Add all assistant or all non-assistant families to the scanner,
+ // depending on whether this is an assistant scan
+ for (HColumnDescriptor family : this.htableDescriptor.getColumnFamilies()) {
+ if (isAssistant && family.isAssistanFamily()) {
+ scan.addFamily(family.getName());
+ } else if (!isAssistant && !family.isAssistanFamily()) {
+ scan.addFamily(family.getName());
+ }
}
}
}
+ void prepareScanner(Scan scan) throws IOException {
+ prepareScanner(scan, false);
+ }
+
protected RegionScanner getScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
startRegionOperation();
this.readRequestsCount.increment();
try {
+ Scan executedScan = scan;
+ if (scan.getAssistantScan() != null) {
+ executedScan = scan.getAssistantScan();
+ prepareScanner(executedScan, true);
+ } else {
+ prepareScanner(executedScan, false);
+ }
// Verify families are all valid
- prepareScanner(scan);
- if(scan.hasFamilies()) {
- for(byte [] family : scan.getFamilyMap().keySet()) {
+ if (executedScan.hasFamilies()) {
+ for (byte[] family : executedScan.getFamilyMap().keySet()) {
checkFamily(family);
}
}
- return instantiateRegionScanner(scan, additionalScanners);
+ return instantiateRegionScanner(executedScan, additionalScanners);
} finally {
closeRegionOperation();
}
@@ -1801,6 +1830,10 @@
public void delete(Delete delete, Integer lockid, boolean writeToWAL)
throws IOException {
checkReadOnly();
+ if (this.hasAssistantStore()) {
+ mutateRow(delete, lockid);
+ return;
+ }
checkResources();
Integer lid = null;
startRegionOperation();
@@ -1989,6 +2022,10 @@
throws IOException {
checkReadOnly();
+ if (this.hasAssistantStore()) {
+ mutateRow(put, lockid);
+ return;
+ }
// Do a rough check that we have resources to accept a write. The check is
// 'rough' in that between the resource check and the call to obtain a
// read lock, resources may run out. For now, the thought is that this
@@ -3150,7 +3187,10 @@
protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
throws IOException {
- return new Store(tableDir, this, c, this.fs, this.conf);
+ return c.getAssistant() == null ? new Store(tableDir, this, c,
+ this.fs, this.conf) : new AssistantStore(tableDir, this, c, this.fs,
+ this.conf, c.getAssistant());
+
}
/**
@@ -4788,8 +4828,20 @@
return results;
}
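+ /**
+ * Apply a single mutation by routing it through
+ * {@link #mutateRowsWithLocks(Collection, SortedMap)} so that, when this
+ * region has an assistant store, the derived assistant data is generated
+ * and written together with the original mutation.
+ * @param mutation the mutation to apply
+ * @param lockid an already-held row lock id, or null to acquire the row lock
+ * @throws IOException
+ */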
+ public void mutateRow(Mutation mutation, Integer lockid) throws IOException {
+ SortedMap<byte[], Integer> rowsToLockId = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ rowsToLockId.put(mutation.getRow(), lockid);
+ List<Mutation> mutations = new ArrayList<Mutation>();
+ mutations.add(mutation);
+ mutateRowsWithLocks(mutations, rowsToLockId);
+ }
+
public void mutateRow(RowMutations rm) throws IOException {
- mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
+ SortedMap<byte[], Integer> rowsToLockId = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ rowsToLockId.put(rm.getRow(), null);
+ mutateRowsWithLocks(rm.getMutations(), rowsToLockId);
}
/**
@@ -4803,9 +4855,9 @@
* @throws IOException
*/
public void mutateRowsWithLocks(Collection<Mutation> mutations,
- Collection<byte[]> rowsToLock) throws IOException {
+ SortedMap<byte[], Integer> rowsToLockId) throws IOException {
boolean flush = false;
-
+ checkResources();
startRegionOperation();
List acquiredLocks = null;
try {
@@ -4832,20 +4884,34 @@
}
}
+ // generate the assistant store data if assistant stores exist
+ if (this.hasAssistantStore()) {
+ mutations = new ArrayList<Mutation>(mutations);
+ for (Store store : this.stores.values()) {
+ if (store.isAssistant()) {
+ mutations.addAll(store.generateAssistantData(mutations));
+ }
+ }
+ }
+
+
long txid = 0;
boolean walSyncSuccessful = false;
boolean locked = false;
// 2. acquire the row lock(s)
- acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
- for (byte[] row : rowsToLock) {
+ acquiredLocks = new ArrayList<Integer>(rowsToLockId.size());
+ for (Map.Entry<byte[], Integer> rowToLockId : rowsToLockId.entrySet()) {
// attempt to lock all involved rows, fail if one lock times out
- Integer lid = getLock(null, row, true);
+ Integer lid = getLock(rowToLockId.getValue(), rowToLockId.getKey(),
+ true);
if (lid == null) {
throw new IOException("Failed to acquire lock on "
- + Bytes.toStringBinary(row));
+ + Bytes.toStringBinary(rowToLockId.getKey()));
}
- acquiredLocks.add(lid);
+ if (rowToLockId.getValue() == null) {
+ acquiredLocks.add(lid);
+ }
}
// 3. acquire the region lock
Index: src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (revision 239702)
+++ src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (working copy)
@@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.regex.Matcher;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -86,6 +87,8 @@
public static final String MIN_VERSIONS = "MIN_VERSIONS";
public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS";
+ public static final String ASSISTANT = "ASSISTANT";
+
/**
* Default compression type.
*/
@@ -869,6 +872,37 @@
}
/**
+ * Mark this column family as an assistant one with the given configuration.
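+ *
+ * For example, using RowValueSwapAssistant as in the unit tests of this
+ * feature (a sketch; the family name is arbitrary):
+ * <pre>
+ * HColumnDescriptor assistantFamily = new HColumnDescriptor("assistantFamily")
+ *     .setAssistant(RowValueSwapAssistant.getAssistantConfString(null, null));
+ * </pre>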
+ * @param assistantConf
+ * @return this (for chained invocation)
+ */
+ public HColumnDescriptor setAssistant(String assistantConf) {
+ Matcher matcher = HConstants.ASSISTANT_CONF_VALUE_PATTERN
+ .matcher(assistantConf);
+ if (!matcher.matches()) {
+ throw new RuntimeException("Failed setting assistant configuration to "
+ + assistantConf + ", but it does not match pattern");
+ }
+ return setValue(ASSISTANT, assistantConf);
+ }
+
+ /**
+ * @return assistant configuration of this column family, null if not an
+ * assistant one
+ */
+ public String getAssistant() {
+ return getValue(ASSISTANT);
+ }
+
+ /**
+ * @return true if it is an assistant family
+ */
+ public boolean isAssistanFamily() {
+ return getAssistant() != null;
+ }
+
+ /**
* @see java.lang.Object#toString()
*/
@Override
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStore.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStore.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAssistantStore.java (revision 0)
@@ -0,0 +1,231 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.regionserver.assistant.RowValueSwapAssistant;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestAssistantStore {
+ static final Log LOG = LogFactory.getLog(TestAssistantStore.class);
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final String DIR = TEST_UTIL.getDataTestDir("TestAssistantStore")
+ .toString();
+ private HRegion region = null;
+ private Configuration conf = TEST_UTIL.getConfiguration();
+
+ private static byte[] ROW = Bytes.toBytes("testRow");
+ private static final int ROWSIZE = 200;
+ private static byte[][] ROWS = makeN(ROW, ROWSIZE);
+ private static byte[] QUAL = Bytes.toBytes("testQual");
+ private static final int QUALSIZE = 3;
+ private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
+ private static byte[] VALUE = Bytes.toBytes("testValue");
+ private static final int VALUESIZE = 10;
+ private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
+
+ @Test
+ public void testRowValueSwapAssistant() throws IOException {
+ String method = "testRowValueSwapAssistant";
+ byte[] regionStartKey = ROW;
+ byte[] regionEndKey = Bytes.add(ROW, new byte[] { (byte) 0xff });
+ byte[] tableName = Bytes.toBytes(method);
+ byte[] normalFamilyName = Bytes.toBytes("family");
+ byte[] assistantFamilyName = Bytes.toBytes("assistantFamily");
+ HColumnDescriptor normalFamily = new HColumnDescriptor(normalFamilyName);
+ HColumnDescriptor assistantFamily = new HColumnDescriptor(
+ assistantFamilyName).setAssistant(RowValueSwapAssistant
+ .getAssistantConfString(null, null));
+ this.region = initHRegion(tableName, regionStartKey, regionEndKey, method,
+ conf, normalFamily, assistantFamily);
+ Store normalStore = this.region.getStore(normalFamilyName);
+ Store assistantStore = this.region.getStore(assistantFamilyName);
+ assertFalse(normalStore.isAssistant());
+ assertTrue(assistantStore.isAssistant());
+
+ // No data now
+ assertNull(normalStore.getSplitPoint());
+ assertNull(assistantStore.getSplitPoint());
+
+ // Load data to region
+ loadDataToRegion(region, normalFamilyName);
+
+ // Check the data in normal store
+ Scan scan = new Scan();
+ // Open the scanner, check the count
+ RegionScanner scanner = region.getScanner(scan);
+ verifyCount(scanner, ROWSIZE * QUALSIZE, ROWSIZE);
+
+ // Check the data in assistant store
+ scan = new Scan();
+ scan.addFamily(assistantFamilyName);
+ scanner = region.getScanner(scan);
+ verifyCount(scanner, ROWSIZE * QUALSIZE, ROWSIZE);
+
+ // Check the data with filter
+ byte[] foundValue = VALUES[0];
+ Filter filter = new ValueFilter(CompareOp.EQUAL, new BinaryComparator(
+ foundValue ));
+ int expectRowCount = ROWSIZE / VALUESIZE
+ + ((ROWSIZE % VALUESIZE) == 0 ? 0 : 1);
+ scan = new Scan().setFilter(filter);
+ scanner = region.getScanner(scan);
+ verifyCount(scanner, expectRowCount * QUALSIZE, expectRowCount);
+
+ // Using assistant store to scan the equal data without filter
+ scan = new Scan().setAssistantScan(new Scan().setStartRow(foundValue)
+ .setStopRow(Bytes.add(foundValue, new byte[] { (byte) 0xff })));
+ scanner = region.getScanner(scan);
+ verifyCount(scanner, expectRowCount * QUALSIZE, expectRowCount);
+
+ // Delete some rows
+ int deleteCount = 10;
+ assertTrue(deleteCount < ROWSIZE);
+
+ for (int i = 0; i < deleteCount; i++) {
+ Delete delete = new Delete(ROWS[i]);
+ region.delete(delete, null, true);
+ }
+
+ // check after deleting
+ scan = new Scan();
+ scanner = region.getScanner(scan);
+ verifyCount(scanner, (ROWSIZE - deleteCount) * QUALSIZE,
+ (ROWSIZE - deleteCount));
+
+ // Check the data in assistant store
+ scan = new Scan();
+ scan.addFamily(assistantFamilyName);
+ scanner = region.getScanner(scan);
+ verifyCount(scanner, (ROWSIZE - deleteCount) * QUALSIZE,
+ (ROWSIZE - deleteCount));
+
+ }
+
+ private void verifyCount(InternalScanner scanner, int expectedKVCount,
+ int expectedRowCount) throws IOException {
+ List<KeyValue> kvList = new ArrayList<KeyValue>();
+ int rowCount = 0;
+ int kvCount = 0;
+ try {
+ while (scanner.next(kvList)) {
+ if (kvList.isEmpty()) continue;
+ rowCount++;
+ kvCount += kvList.size();
+ kvList.clear();
+ }
+ } finally {
+ scanner.close();
+ }
+ if (!kvList.isEmpty()) {
+ rowCount++;
+ kvCount += kvList.size();
+ kvList.clear();
+ }
+ assertEquals(expectedKVCount, kvCount);
+ assertEquals(expectedRowCount, rowCount);
+ }
+
+ private static void loadDataToRegion(HRegion region, byte[] family)
+ throws IOException {
+ for (int i = 0; i < ROWSIZE; i++) {
+ Put put = new Put(ROWS[i]);
+ for (int j = 0; j < QUALSIZE; j++) {
+ put.add(family, QUALS[j], VALUES[i % VALUESIZE]);
+ }
+ region.put(put);
+ if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
+ region.flushcache();
+ }
+ }
+ }
+
+ public static HRegion initHRegion(byte[] tableName, String callingMethod,
+ Configuration conf, HColumnDescriptor... families) throws IOException {
+ return initHRegion(tableName, null, null, callingMethod, conf, families);
+ }
+
+ /**
+ * @param tableName
+ * @param startKey
+ * @param stopKey
+ * @param callingMethod
+ * @param conf
+ * @param families
+ * @throws IOException
+ * @return A region on which you must call
+ * {@link HRegion#closeHRegion(HRegion)} when done.
+ */
+ private static HRegion initHRegion(byte[] tableName, byte[] startKey,
+ byte[] stopKey, String callingMethod, Configuration conf,
+ HColumnDescriptor... families) throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ for (HColumnDescriptor family : families) {
+ htd.addFamily(family);
+ }
+ HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false);
+ Path path = new Path(DIR + callingMethod);
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(path)) {
+ if (!fs.delete(path, true)) {
+ throw new IOException("Failed delete of " + path);
+ }
+ }
+ return HRegion.createHRegion(info, path, conf, htd);
+ }
+
+ private static byte[][] makeN(byte[] base, int n) {
+ byte[][] ret = new byte[n][];
+ for (int i = 0; i < n; i++) {
+ ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
+ }
+ return ret;
+ }
+
+}
Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 239702)
+++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy)
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@@ -89,7 +90,8 @@
// version 4: support reversed scan & set small
// version 5: support encoding result
- private static final byte SCAN_VERSION = (byte) 5;
+ // version 6: support scan using assistant store data
+ private static final byte SCAN_VERSION = (byte) 6;
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
private int maxVersions = 1;
@@ -131,6 +133,8 @@
*/
private boolean small = false;
+ private Scan assistantScan = null;
+
/**
* Create a Scan operation across all rows.
*/
@@ -547,6 +551,22 @@
}
/**
+ * Set an assistant scan which will use the assistant store data.
+ *
+ * It can only be set if the scanned table has an assistant store.
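+ *
+ * For example, a sketch of scanning a table through its assistant store for
+ * a range of values, mirroring the usage in the assistant store tests (the
+ * start/stop values and {@code table} below are placeholders):
+ * <pre>
+ * Scan assistantScan = new Scan();
+ * assistantScan.setStartRow(Bytes.toBytes("valueA"));
+ * assistantScan.setStopRow(Bytes.toBytes("valueB"));
+ * Scan scan = new Scan().setAssistantScan(assistantScan);
+ * ResultScanner scanner = table.getScanner(scan);
+ * </pre>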
+ * @param assistantscan the scan to run against the assistant store
+ * @return this
+ */
+ public Scan setAssistantScan(Scan assistantscan) {
+ this.assistantScan = assistantscan;
+ return this;
+ }
+
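+ /**
+ * @return the assistant scan set on this Scan, or null if none was set
+ */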
+ public Scan getAssistantScan() {
+ return this.assistantScan;
+ }
+
+ /**
* Compile the table and column family (i.e. schema) information
* into a String. Useful for parsing and aggregation by debugging,
* logging, and administration tools.
@@ -691,6 +711,13 @@
} else {
this.supportEncodingResult = false;
}
+
+ if (version >= 6) {
+ if (in.readBoolean()) {
+ this.assistantScan = new Scan();
+ this.assistantScan.readFields(in);
+ }
+ }
}
public void write(final DataOutput out)
@@ -728,6 +755,10 @@
out.writeBoolean(this.reversed);
out.writeBoolean(this.small);
out.writeBoolean(this.supportEncodingResult);
+ out.writeBoolean(this.assistantScan != null);
+ if (this.assistantScan != null) {
+ this.assistantScan.write(out);
+ }
}
/**
Index: src/test/java/org/apache/hadoop/hbase/io/TestValueSplitHalfStoreFileReader.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/io/TestValueSplitHalfStoreFileReader.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/io/TestValueSplitHalfStoreFileReader.java (revision 0)
@@ -0,0 +1,266 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestValueSplitHalfStoreFileReader {
+ @Test
+ public void testHalfScanAndReseek() throws IOException {
+ HBaseTestingUtility test_util = new HBaseTestingUtility();
+ String root_dir = test_util.getDataTestDir("TestValueSplitHalfStoreFile")
+ .toString();
+ Path p = new Path(root_dir, "test");
+
+ Configuration conf = test_util.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ CacheConfig cacheConf = new CacheConfig(conf);
+
+ HFile.Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(fs, p)
+ .withBlockSize(1024).withComparator(KeyValue.KEY_COMPARATOR).create();
+
+ // write some things.
+ List<KeyValue> items = genSomeKeys();
+ for (KeyValue kv : items) {
+ w.append(kv);
+ }
+ w.close();
+
+ byte[] midValue = _b(String.format("value_%04d", VALUENUM / 2));
+
+ Reference bottom = new Reference(midValue, Reference.Range.bottom);
+ doTestOfScanAndReseek(p, fs, bottom, cacheConf, midValue);
+
+ Reference top = new Reference(midValue, Reference.Range.top);
+ doTestOfScanAndReseek(p, fs, top, cacheConf, midValue);
+ }
+
+ @Test
+ public void testHalfScanner() throws IOException {
+ HBaseTestingUtility test_util = new HBaseTestingUtility();
+ String root_dir = test_util.getDataTestDir(
+ "TestValueSplitHalfStoreFileScanBefore").toString();
+ Path p = new Path(root_dir, "test");
+ Configuration conf = test_util.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ CacheConfig cacheConf = new CacheConfig(conf);
+
+ HFile.Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(fs, p)
+ .withBlockSize(1024).withComparator(KeyValue.KEY_COMPARATOR).create();
+
+ // write some things.
+ List<KeyValue> items = genSomeKeys();
+ for (KeyValue kv : items) {
+ w.append(kv);
+ }
+ w.close();
+
+ byte[] midValue = _b(String.format("value_%04d", VALUENUM / 2));
+
+ Reference bottom = new Reference(midValue, Reference.Range.bottom);
+ Reference top = new Reference(midValue, Reference.Range.top);
+
+ for (int i = 1; i * VALUENUM / 2 < items.size(); i++) {
+ // Check every edge KeyValue
+ int edgeNum = i * VALUENUM / 2;
+ KeyValue edgeKeyValue = items.get(edgeNum);
+ System.out.println("edgeKeyValue:" + edgeKeyValue + ",value:"
+ + Bytes.toString(edgeKeyValue.getValue()));
+
+
+ KeyValue foundKeyValue = doTestOfSeekBefore(p, fs, bottom, edgeKeyValue,
+ cacheConf);
+ if (i % 2 == 1) {
+ assertEquals(items.get(edgeNum - 1), foundKeyValue);
+ } else {
+ assertEquals(items.get(edgeNum - 1 - VALUENUM / 2), foundKeyValue);
+ }
+
+ foundKeyValue = doTestOfSeekBefore(p, fs, top, edgeKeyValue, cacheConf);
+ if (i % 2 == 1) {
+ assertEquals(
+ edgeNum - 1 - VALUENUM / 2 < 0 ? null : items.get(edgeNum - 1
+ - VALUENUM / 2), foundKeyValue);
+ } else {
+ assertEquals(items.get(edgeNum - 1), foundKeyValue);
+ }
+ }
+
+ // Try and seek before the first thing.
+ KeyValue foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(0),
+ cacheConf);
+ assertNull(foundKeyValue);
+
+ // Try and seek before the second thing in the top and bottom.
+ foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(1), cacheConf);
+ assertNull(foundKeyValue);
+
+ foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(1), cacheConf);
+ assertEquals(items.get(0), foundKeyValue);
+ }
+
+ @Test
+ public void testFirstLastKey() throws IOException {
+ HBaseTestingUtility test_util = new HBaseTestingUtility();
+ String root_dir = test_util.getDataTestDir("TestValueSplitHalfStoreFile")
+ .toString();
+ Path p = new Path(root_dir, "test");
+
+ Configuration conf = test_util.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ CacheConfig cacheConf = new CacheConfig(conf);
+
+ HFile.Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(fs, p)
+ .withBlockSize(1024).withComparator(KeyValue.KEY_COMPARATOR).create();
+
+ // write some things.
+ List<KeyValue> items = genSomeKeys();
+ for (KeyValue kv : items) {
+ w.append(kv);
+ }
+ w.close();
+
+ byte[] midValue = _b(String.format("value_%04d", VALUENUM / 2));
+
+ Reference bottom = new Reference(midValue, Reference.Range.bottom);
+ Reference top = new Reference(midValue, Reference.Range.top);
+
+ // check bottom half
+ ValueSplitHalfStoreFileReader valueSplitHalfreader = new ValueSplitHalfStoreFileReader(
+ fs, p, cacheConf, bottom, DataBlockEncoding.NONE);
+ valueSplitHalfreader.loadFileInfo();
+ KeyValue theFirstKeyValue = null;
+ KeyValue theLastKeyValue = null;
+ for (KeyValue kv : items) {
+ if (Bytes.compareTo(kv.getValue(), midValue) < 0) {
+ if (theFirstKeyValue == null) {
+ theFirstKeyValue = kv;
+ }
+ theLastKeyValue = kv;
+ }
+ }
+ assertTrue(Bytes.equals(theFirstKeyValue.getKey(), valueSplitHalfreader.getFirstKey()));
+ assertTrue(Bytes.equals(theLastKeyValue.getKey(), valueSplitHalfreader.getLastKey()));
+
+ // check top half
+ valueSplitHalfreader = new ValueSplitHalfStoreFileReader(fs, p, cacheConf,
+ top, DataBlockEncoding.NONE);
+ valueSplitHalfreader.loadFileInfo();
+ theFirstKeyValue = null;
+ theLastKeyValue = null;
+ for (KeyValue kv : items) {
+ if (Bytes.compareTo(kv.getValue(), midValue) >= 0) {
+ if (theFirstKeyValue == null) {
+ theFirstKeyValue = kv;
+ }
+ theLastKeyValue = kv;
+ }
+ }
+ assertTrue(Bytes.equals(theFirstKeyValue.getKey(), valueSplitHalfreader.getFirstKey()));
+ assertTrue(Bytes.equals(theLastKeyValue.getKey(), valueSplitHalfreader.getLastKey()));
+
+ }
+
+ private KeyValue doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom,
+ KeyValue seekBefore, CacheConfig cacheConfig) throws IOException {
+ final ValueSplitHalfStoreFileReader valueSplitHalfreader = new ValueSplitHalfStoreFileReader(fs, p,
+ cacheConfig, bottom, DataBlockEncoding.NONE);
+ valueSplitHalfreader.loadFileInfo();
+ final HFileScanner scanner = valueSplitHalfreader.getScanner(false, false);
+ if (scanner.seekBefore(seekBefore.getKey())) {
+ return scanner.getKeyValue();
+ }
+ return null;
+ }
+
+ private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom,
+ CacheConfig cacheConf, byte[] midValue) throws IOException {
+ final ValueSplitHalfStoreFileReader valueSplitHalfreader = new ValueSplitHalfStoreFileReader(
+ fs, p, cacheConf, bottom, DataBlockEncoding.NONE);
+ valueSplitHalfreader.loadFileInfo();
+ final HFileScanner scanner = valueSplitHalfreader.getScanner(false, false);
+
+ scanner.seekTo();
+ KeyValue curr;
+ do {
+ curr = scanner.getKeyValue();
+ assertEquals(Bytes.compareTo(curr.getValue(), midValue) >= 0,
+ bottom.getFileRegion() == Range.top);
+ KeyValue reseekKv = getLastOnCol(curr);
+ int ret = scanner.reseekTo(reseekKv.getKey());
+ assertTrue("reseek to returned: " + ret, ret > 0);
+ assertEquals(
+ Bytes.compareTo(scanner.getKeyValue().getValue(), midValue) >= 0,
+ bottom.getFileRegion() == Range.top);
+ } while (scanner.next());
+
+ int ret = scanner.reseekTo(getLastOnCol(curr).getKey());
+ // System.out.println("Last reseek: " + ret);
+ assertTrue(ret > 0);
+
+ valueSplitHalfreader.close(true);
+ }
+
+ private KeyValue getLastOnCol(KeyValue curr) {
+ return KeyValue.createLastOnRow(curr.getBuffer(), curr.getRowOffset(),
+ curr.getRowLength(), curr.getBuffer(), curr.getFamilyOffset(),
+ curr.getFamilyLength(), curr.getBuffer(), curr.getQualifierOffset(),
+ curr.getQualifierLength());
+ }
+
+ private static final int SIZE = 1000;
+ private static final int VALUENUM = 100;
+
+ private static byte[] _b(String s) {
+ return Bytes.toBytes(s);
+ }
+
+ private List<KeyValue> genSomeKeys() {
+ List<KeyValue> ret = new ArrayList<KeyValue>(SIZE);
+ for (int i = 0; i < SIZE; i++) {
+ KeyValue kv = new KeyValue(_b(String.format("row_%04d", i)),
+ _b("family"), _b("qualifier"), 1000, // timestamp
+ _b(String.format("value_%04d", i % VALUENUM)));
+ ret.add(kv);
+ }
+ return ret;
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 239702)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -97,6 +97,7 @@
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiResponse;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
@@ -3910,40 +3911,58 @@
isMetaTable = true;
}
- List<Pair<Put, Integer>> putsWithLocks =
- Lists.newArrayListWithCapacity(puts.size());
for (Action<R> a : puts) {
Put p = (Put) a.getAction();
dataLen = dataLen + (int) p.getWritableSize();
- Integer lock;
- try {
- lock = getLockFromId(p.getLockId());
- } catch (UnknownRowLockException ex) {
- response.add(regionName, a.getOriginalIndex(), ex);
- continue;
- }
- putsWithLocks.add(new Pair<Put, Integer>(p, lock));
}
+ if (!region.hasAssistantStore()) {
+ List<Pair<Put, Integer>> putsWithLocks = Lists
+ .newArrayListWithCapacity(puts.size());
+ for (Action<R> a : puts) {
+ Put p = (Put) a.getAction();
+ Integer lock;
+ try {
+ lock = getLockFromId(p.getLockId());
+ } catch (UnknownRowLockException ex) {
+ response.add(regionName, a.getOriginalIndex(), ex);
+ continue;
+ }
+ putsWithLocks.add(new Pair<Put, Integer>(p, lock));
+ }
- OperationStatus[] codes =
- region.put(putsWithLocks.toArray(new Pair[]{}));
+ OperationStatus[] codes = region.put(putsWithLocks
+ .toArray(new Pair[] {}));
- for (i = 0; i < codes.length; i++) {
- OperationStatus code = codes[i];
+ for (i = 0; i < codes.length; i++) {
+ OperationStatus code = codes[i];
- Action<R> theAction = puts.get(i);
- Object result = null;
+ Action<R> theAction = puts.get(i);
+ Object result = null;
- if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
- result = new Result();
- } else if (code.getOperationStatusCode()
- == OperationStatusCode.SANITY_CHECK_FAILURE) {
- result = new DoNotRetryIOException(code.getExceptionMsg());
+ if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
+ result = new Result();
+ } else if (code.getOperationStatusCode() == OperationStatusCode.SANITY_CHECK_FAILURE) {
+ result = new DoNotRetryIOException(code.getExceptionMsg());
+ }
+ // FAILURE && NOT_RUN becomes null, aka: need to run again.
+ response.add(regionName, theAction.getOriginalIndex(), result);
}
- // FAILURE && NOT_RUN becomes null, aka: need to run again.
-
- response.add(regionName, theAction.getOriginalIndex(), result);
+ } else {
+ // map of rows to lockid, sorted to avoid deadlocks
+ SortedMap<byte[], Integer> rowsToLockId = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ List<Mutation> mutations = new ArrayList<Mutation>();
+ for (Action<R> a : puts) {
+ Put p = (Put) a.getAction();
+ Integer lock = getLockFromId(p.getLockId());
+ rowsToLockId.put(a.getAction().getRow(), lock);
+ mutations.add(p);
+ }
+ region.mutateRowsWithLocks(mutations, rowsToLockId);
+ for (Action<R> a : puts) {
+ response.add(regionName, a.getOriginalIndex(), new Result());
+ }
}
long took = System.currentTimeMillis() - start;
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (revision 239702)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (working copy)
@@ -22,7 +22,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -30,14 +32,21 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -347,7 +356,10 @@
AtomicLong timeStamps = new AtomicLong(0);
AtomicInteger failures = new AtomicInteger(0);
- final List<byte[]> rowsToLock = Arrays.asList(row, row2);
+ final TreeMap<byte[], Integer> rowsToLockId = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ rowsToLockId.put(row, null);
+ rowsToLockId.put(row2, null);
// create all threads
for (int i = 0; i < numThreads; i++) {
all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
@@ -380,7 +392,7 @@
p.add(fam1, qual1, value2);
mrm.add(p);
}
- region.mutateRowsWithLocks(mrm, rowsToLock);
+ region.mutateRowsWithLocks(mrm, rowsToLockId);
op ^= true;
// check: should always see exactly one column
Scan s = new Scan(row);
Index: src/main/java/org/apache/hadoop/hbase/io/ValueSplitHalfStoreFileReader.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/ValueSplitHalfStoreFileReader.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/io/ValueSplitHalfStoreFileReader.java (revision 0)
@@ -0,0 +1,272 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is similar to
+ * {@link org.apache.hadoop.hbase.io.HalfStoreFileReader}, the difference is
+ * that the half store file is decided by the value rather than key.
+ *
+ * For example, suppose the store file includes the following KeyValues:
+ * r1/c1:q1/v1
+ * r2/c1:q1/v3
+ * r3/c1:q1/v1
+ * r4/c1:q1/v1
+ * r5/c1:q1/v3
+ *
+ * With the split key 'v2',
+ * the 'bottom' half file includes the following KeyValues:
+ * r1/c1:q1/v1
+ * r3/c1:q1/v1
+ * r4/c1:q1/v1
+ *
+ * the 'top' half file includes the following KeyValues:
+ * r2/c1:q1/v3
+ * r5/c1:q1/v3
+ *
+ */
+public class ValueSplitHalfStoreFileReader extends HalfStoreFileReader {
+ private final Log LOG = LogFactory.getLog(ValueSplitHalfStoreFileReader.class);
+
+ protected final byte[] splitvalue;
+ private byte[] lastKey = null;
+ private boolean lastKeySeeked = false;
+
+ /**
+ *
+ * @param fs
+ * @param p
+ * @param cacheConf
+ * @param r
+ * @param preferredEncodingInCache
+ * @throws IOException
+ */
+ public ValueSplitHalfStoreFileReader(final FileSystem fs, final Path p,
+ final CacheConfig cacheConf, final Reference r,
+ DataBlockEncoding preferredEncodingInCache) throws IOException {
+ super(fs, p, cacheConf, r, preferredEncodingInCache);
+ splitvalue = KeyValue.createKeyValueFromKey(splitkey).getRow();
+ }
+
+ /**
+ * Creates an assistant half store file reader for an hfile referred to by
+ * an HFileLink.
+ * @param fs
+ * @param p
+ * @param link
+ * @param cacheConf
+ * @param r
+ * @param preferredEncodingInCache
+ * @throws IOException
+ */
+ public ValueSplitHalfStoreFileReader(FileSystem fs, Path p, HFileLink link,
+ CacheConfig cacheConf, Reference r,
+ DataBlockEncoding preferredEncodingInCache) throws IOException {
+ super(fs, p, link, cacheConf, r, preferredEncodingInCache);
+ splitvalue = KeyValue.createKeyValueFromKey(splitkey).getRow();
+ }
+
+ @Override
+ public byte[] getLastKey() {
+ if (!lastKeySeeked) {
+ KeyValue lastKV = KeyValue.createLastOnRow(getHFileReader()
+ .getLastRowKey());
+ // Get a scanner that caches the block and that uses pread.
+ HFileScanner scanner = getScanner(true, true);
+ try {
+ if (scanner.seekBefore(lastKV.getBuffer(), lastKV.getKeyOffset(),
+ lastKV.getKeyLength())) {
+ this.lastKey = Bytes.toBytes(scanner.getKey());
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed seekBefore " + Bytes.toStringBinary(lastKV.getKey()),
+ e);
+ }
+ lastKeySeeked = true;
+ }
+ return this.lastKey;
+ }
+
+ @Override
+ public HFileScanner getScanner(final boolean cacheBlocks,
+ final boolean pread, final boolean isCompaction) {
+ final HFileScanner s = getHFileReader().getScanner(cacheBlocks, pread,
+ isCompaction);
+ return new HFileScanner() {
+ final HFileScanner delegate = s;
+
+ @Override
+ public ByteBuffer getKey() {
+ return delegate.getKey();
+ }
+
+ @Override
+ public String getKeyString() {
+ return delegate.getKeyString();
+ }
+
+ @Override
+ public ByteBuffer getValue() {
+ return delegate.getValue();
+ }
+
+ @Override
+ public String getValueString() {
+ return delegate.getValueString();
+ }
+
+ @Override
+ public KeyValue getKeyValue() {
+ return delegate.getKeyValue();
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ while (delegate.next()) {
+ if (isCurrentKVValid()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean seekBefore(byte[] key) throws IOException {
+ return seekBefore(key, 0, key.length);
+ }
+
+ @Override
+ public boolean seekBefore(byte[] key, int offset, int length)
+ throws IOException {
+ byte[] seekKey = key;
+ int seekKeyOffset = offset;
+ int seekKeyLength = length;
+ while (delegate.seekBefore(seekKey, seekKeyOffset, seekKeyLength)) {
+ if (isCurrentKVValid()) {
+ return true;
+ }
+ ByteBuffer curKey = getKey();
+ if (curKey == null) return false;
+ seekKey = curKey.array();
+ seekKeyOffset = curKey.arrayOffset();
+ seekKeyLength = curKey.limit();
+ }
+ return false;
+ }
+
+ private boolean isCurrentKVValid() {
+ ByteBuffer value = getValue();
+ if (!top) {
+ // Current value < split key, it belongs to bottom, return true
+ if (Bytes.compareTo(value.array(), value.arrayOffset(),
+ value.limit(), splitvalue, 0, splitvalue.length) < 0) {
+ return true;
+ }
+ } else {
+ if (Bytes.compareTo(value.array(), value.arrayOffset(),
+ value.limit(), splitvalue, 0, splitvalue.length) >= 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean seekTo() throws IOException {
+ boolean b = delegate.seekTo();
+ if (!b) {
+ return b;
+ }
+
+ if (isCurrentKVValid()) {
+ return true;
+ }
+
+ return next();
+ }
+
+ @Override
+ public int seekTo(byte[] key) throws IOException {
+ return seekTo(key, 0, key.length);
+ }
+
+ public int seekTo(byte[] key, int offset, int length) throws IOException {
+ int b = delegate.seekTo(key, offset, length);
+ if (b < 0) {
+ return b;
+ } else {
+ if (isCurrentKVValid()) {
+ return b;
+ } else {
+ boolean existBefore = seekBefore(key, offset, length);
+ if (existBefore) {
+ return 1;
+ }
+ return -1;
+ }
+ }
+ }
+
+ @Override
+ public int reseekTo(byte[] key) throws IOException {
+ return reseekTo(key, 0, key.length);
+ }
+
+ @Override
+ public int reseekTo(byte[] key, int offset, int length)
+ throws IOException {
+ int b = delegate.reseekTo(key, offset, length);
+ if (b < 0) {
+ return b;
+ } else {
+ if (isCurrentKVValid()) {
+ return b;
+ } else {
+ boolean existBefore = seekBefore(key, offset, length);
+ if (existBefore) {
+ return 1;
+ }
+ return -1;
+ }
+ }
+ }
+
+ public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
+ return this.delegate.getReader();
+ }
+
+ public boolean isSeeked() {
+ return this.delegate.isSeeked();
+ }
+ };
+ }
+
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/AssistantStore.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/AssistantStore.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/AssistantStore.java (revision 0)
@@ -0,0 +1,151 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.regex.Matcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.assistant.Assistant;
+
+/**
+ * An assistant store is used to store data in another organization, which is
+ * defined by configuration. This means each region can keep several
+ * organizations of its data by using assistant stores. We can use it to
+ * speed up some reads (scans) with filters.
+ *
+ * If a region uses assistant stores, each original KeyValue has zero or more
+ * corresponding KeyValue(s) in an assistant store; a corresponding
+ * KeyValue's row is redefined, but its value is the same as the original
+ * KeyValue's row.
+ *
+ * For example, if a region includes the following KeyValues:
+ * r1/c1:q1/v1
+ * r2/c1:q1/v2
+ * r3/c1:q1/v1
+ * r4/c1:q1/v3
+ * r5/c1:q1/v1
+ *
+ * we could use an assistant store for it, include the following kvs:
+ * v1/c1:q1/r1
+ * v1/c1:q1/r3
+ * v1/c1:q1/r5
+ * v2/c1:q1/r2
+ * v3/c1:q1/r4
+ *
+ *
+ * When scanning with a value-based filter such as ValueFilter, the assistant
+ * store could speed up the read.
+ */
+public class AssistantStore extends Store {
+ static final Log LOG = LogFactory.getLog(AssistantStore.class);
+
+ private final Assistant assistant;
+
+ /**
+ * Constructor
+ * @param basedir qualified path under which the region directory lives;
+ * generally the table subdirectory
+ * @param region
+ * @param family HColumnDescriptor for this column
+ * @param fs file system object
+ * @param conf configuration object. Can be null.
+ * @param assistantConf the assistant configuration string, in the
+ * "AssistantClassName|arguments" form
+ * @throws IOException
+ */
+ protected AssistantStore(Path basedir, HRegion region,
+ HColumnDescriptor family, FileSystem fs, Configuration conf,
+ String assistantConf) throws IOException {
+ super(basedir, region, family, fs, conf);
+ Matcher matcher = HConstants.ASSISTANT_CONF_VALUE_PATTERN
+ .matcher(assistantConf);
+ if (matcher.matches()) {
+ this.assistant = getAssistantByName(matcher.group(1), matcher.group(2));
+ } else {
+ throw new RuntimeException("Assistant configuration " + assistantConf
+ + " does not match pattern");
+ }
+
+ }
+
+ private Assistant getAssistantByName(String assistantClassName, String argStr) {
+ Class<? extends Assistant> assistantClass = null;
+ try {
+ assistantClass = getClass(assistantClassName, Assistant.class);
+ Constructor<? extends Assistant> c = assistantClass.getConstructor();
+ return c.newInstance().initialize(argStr);
+ } catch (Exception e) {
+ String msg = "Failed construction of assistant for" + assistantClassName
+ + " with argument = " + argStr;
+ LOG.warn(msg, e);
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ private <U> Class<? extends U> getClass(String name, Class<U> xface) {
+ try {
+ Class<?> theClass = Class.forName(name);
+ if (theClass != null && !xface.isAssignableFrom(theClass))
+ throw new RuntimeException(theClass + " not " + xface.getName());
+ else if (theClass != null)
+ return theClass.asSubclass(xface);
+ else
+ return null;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ @Override
+ public byte[] getSplitPoint() {
+ // Returns null to indicate this store can't decide the region's split point
+ return null;
+ }
+
+ @Override
+ public boolean isAssistant() {
+ return true;
+ }
+
+ /**
+ * Generate the assistant data with the given mutations
+ * @param mutations
+ * @return a collection of generated mutations
+ * @throws IOException
+ */
+ @Override
+ public Collection<Mutation> generateAssistantData(
+ Collection<Mutation> mutations) throws IOException {
+ Collection<Mutation> generatedMutations = assistant.assist(
+ this.getHRegion(), mutations, this.getFamily().getName());
+ // LOG.debug("generatedMutations:" + generatedMutations);
+ return generatedMutations;
+ }
+
+}
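A minimal sketch of the row/value swap described in the class comment above, using the same construction RowValueSwapAssistant applies to a Put; the row, qualifier and value bytes are examples, and assistantStoreName stands for the assistant store's family name:

  // Original user cell: row "r1", qualifier "q1", value "v1".
  byte[] row = Bytes.toBytes("r1");
  byte[] qualifier = Bytes.toBytes("q1");
  byte[] value = Bytes.toBytes("v1");
  byte[] assistantStoreName = Bytes.toBytes("assistCF"); // hypothetical family name

  // Assistant row = value + original row, so rows sharing a value stay distinct;
  // the assistant cell's value is the original row, as the design requires.
  Put assistantPut = new Put(Bytes.add(value, row));
  assistantPut.add(assistantStoreName, qualifier, row);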
Index: src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 239702)
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -639,6 +639,9 @@
"(" + CP_HTD_ATTR_VALUE_PARAM_KEY_PATTERN + ")=(" +
CP_HTD_ATTR_VALUE_PARAM_VALUE_PATTERN + "),?");
+ public static final Pattern ASSISTANT_CONF_VALUE_PATTERN = Pattern
+ .compile("^([^\\|]+)\\|(.*)$");
+
/** The delay when re-trying a socket operation in a loop (HBASE-4712) */
public static final int SOCKET_RETRY_WAIT_MS = 200;
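For illustration, the assistant configuration value has the form "AssistantClassName|arguments"; AssistantStore's constructor splits it with this pattern and loads the named class by reflection. A small sketch, where the FAMILY/QUALIFIER values are only examples:

  String conf = "org.apache.hadoop.hbase.regionserver.assistant.RowValueSwapAssistant"
      + "|FAMILY=cf,QUALIFIER=q";
  Matcher m = HConstants.ASSISTANT_CONF_VALUE_PATTERN.matcher(conf);
  if (m.matches()) {
    String className = m.group(1); // the Assistant implementation to instantiate
    String args = m.group(2);      // handed to Assistant.initialize(String)
  }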
Index: src/main/java/org/apache/hadoop/hbase/regionserver/assistant/Assistant.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/assistant/Assistant.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/assistant/Assistant.java (revision 0)
@@ -0,0 +1,117 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.assistant;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Abstract class for objects that generate assistant data when a region has
+ * assistant store(s).
+ *
+ * One Assistant instance is created per assistant store.
+ *
+ * When implementing your own assistant, implement the methods
+ * {@link #assist(HRegion, Mutation, byte[])}, {@link #getName()} and
+ * {@link #initialize(String)}. (A minimal implementation sketch follows this
+ * file's diff.)
+ *
+ * NOTE: For each generated assistant mutation, its KeyValues' value must be the
+ * same as the corresponding original mutation's row. This guarantees that when
+ * a region splits, each daughter region ends up with its correct assistant data.
+ */
+public abstract class Assistant {
+
+ /**
+ * Help the given region generate assistant data for the given mutations.
+ *
+ * The generated mutations are checked to make sure the values of the generated
+ * KeyValues are the same as the original mutation's row.
+ * @param region the region to assist
+ * @param mutations the original mutations
+ * @param assistanStoreName name of the assistant store
+ * @return a collection of generated mutations
+ * @throws IOException
+ */
+ public final Collection<Mutation> assist(final HRegion region,
+ final Collection<Mutation> mutations, final byte[] assistanStoreName)
+ throws IOException {
+ List<Mutation> generatedMutations = new ArrayList<Mutation>();
+ for (Mutation mutation : mutations) {
+ Collection<Mutation> muts = assist(region, mutation, assistanStoreName);
+ checkValue(mutation.getRow(), muts);
+ generatedMutations.addAll(muts);
+ }
+ return generatedMutations;
+ }
+
+ /**
+ * Check whether each KeyValue's value in mutations is the same as the given
+ * expected value
+ * @param expectedValue the original mutation's row
+ * @param toCheckMutations the generated mutations to check
+ */
+ private void checkValue(byte[] expectedValue, Collection<Mutation> toCheckMutations) {
+ for (Mutation toCheckMutation : toCheckMutations) {
+ for (List<KeyValue> kvList : toCheckMutation.getFamilyMap().values()) {
+ for (KeyValue kv : kvList) {
+ if (!Bytes.equals(expectedValue, 0, expectedValue.length,
+ kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())) {
+ throw new IllegalArgumentException("Assistant class " + getName()
+ + " is not legal: a generated KeyValue's value differs from the"
+ + " original mutation's row");
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Help the given region generate assistant data for the given mutation.
+ *
+ * Concrete Assistants implement this method for their particular use cases.
+ *
+ * NOTE: Every generated KeyValue must have the same value as the given
+ * mutation's row.
+ * @param region the region to assist
+ * @param mutation the original mutation
+ * @param assistanStoreName name of the assistant store
+ * @return a collection of generated mutations
+ * @throws IOException
+ */
+ protected abstract Collection<Mutation> assist(final HRegion region,
+ final Mutation mutation, final byte[] assistanStoreName)
+ throws IOException;
+
+ /**
+ * @return the name of this Assistant
+ */
+ public abstract String getName();
+
+ /**
+ * Initialize the instance with the given argument string
+ * @param argumentStr the argument part of the assistant configuration
+ * @return this Assistant object
+ */
+ public abstract Assistant initialize(String argumentStr);
+
+}
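As a concrete illustration of the contract above, a hypothetical minimal Assistant might look like the sketch below; the class name and its qualifier-based indexing scheme are invented for illustration, and delete handling is omitted:

  package org.apache.hadoop.hbase.regionserver.assistant;

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;

  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.client.Mutation;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.regionserver.HRegion;
  import org.apache.hadoop.hbase.util.Bytes;

  /** Hypothetical assistant: index cells by qualifier instead of by value. */
  public class QualifierIndexAssistant extends Assistant {

    @Override
    protected Collection<Mutation> assist(HRegion region, Mutation mutation,
        byte[] assistanStoreName) throws IOException {
      List<Mutation> generated = new ArrayList<Mutation>();
      if (!(mutation instanceof Put)) {
        return generated; // this sketch handles Puts only
      }
      for (List<KeyValue> kvs : mutation.getFamilyMap().values()) {
        for (KeyValue kv : kvs) {
          // Assistant row = qualifier + original row; the value is the original
          // row, which is exactly what the contract above requires.
          Put p = new Put(Bytes.add(kv.getQualifier(), kv.getRow()));
          p.add(assistanStoreName, kv.getQualifier(), kv.getRow());
          generated.add(p);
        }
      }
      return generated;
    }

    @Override
    public String getName() {
      return getClass().getName();
    }

    @Override
    public Assistant initialize(String argumentStr) {
      return this; // no arguments needed for this sketch
    }
  }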
Index: src/main/java/org/apache/hadoop/hbase/regionserver/assistant/RowValueSwapAssistant.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/assistant/RowValueSwapAssistant.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/assistant/RowValueSwapAssistant.java (revision 0)
@@ -0,0 +1,218 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.assistant;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This assistant swaps the row and the value; it can be considered a simple,
+ * basic index assistant. (A configuration sketch follows this file's diff.)
+ */
+public class RowValueSwapAssistant extends Assistant {
+ private static final Pattern ARGUMENT_PATTERN = Pattern
+ .compile("^FAMILY=(.*),QUALIFIER=(.*)$");
+ // The family and qualifier whose row and value we want to swap
+ private byte[] family;
+ private byte[] qualifier;
+
+ /** default constructor */
+ public RowValueSwapAssistant() {
+ }
+
+ @Override
+ protected Collection<Mutation> assist(final HRegion region,
+ final Mutation mutation, final byte[] assistanStoreName)
+ throws IOException {
+ List<Mutation> generatedMutations = new ArrayList<Mutation>();
+ Map<byte[], List<KeyValue>> familyMap = mutation.getFamilyMap();
+ if (mutation instanceof Put) {
+ for (Map.Entry<byte[], List<KeyValue>> entry : familyMap.entrySet()) {
+ for (KeyValue kv : entry.getValue()) {
+ if (ignoreKV(kv)) continue;
+ Put p = new Put(Bytes.add(kv.getValue(), kv.getRow()));
+ p.add(assistanStoreName, kv.getQualifier(), kv.getRow());
+ generatedMutations.add(p);
+ }
+ }
+ } else if (mutation instanceof Delete) {
+ if(familyMap.isEmpty()){
+ for (byte[] tableFamily : region.getTableDesc().getFamiliesKeys()) {
+ ((Delete) mutation).deleteFamily(tableFamily, mutation.getTimeStamp());
+ }
+ }
+ for (Map.Entry<byte[], List<KeyValue>> entry : familyMap.entrySet()) {
+ Iterator<KeyValue> kvIterator = entry.getValue().iterator();
+ while (kvIterator.hasNext()) {
+ KeyValue kv = kvIterator.next();
+ if(!kv.isDelete()) continue;
+ // Create the Get for the corresponding delete
+ Get get = new Get(kv.getRow());
+ boolean isLatestTimestamp = kv.isLatestTimestamp();
+ if (kv.getType() == KeyValue.Type.Delete.getCode()) {
+ if (ignoreKV(kv)) continue;
+ get.addColumn(kv.getFamily(), kv.getQualifier());
+ if (!isLatestTimestamp) {
+ get.setTimeStamp(kv.getTimestamp());
+ }
+ } else if (kv.getType() == KeyValue.Type.DeleteColumn.getCode()) {
+ if (ignoreKV(kv)) continue;
+ get.addColumn(kv.getFamily(), kv.getQualifier()).setMaxVersions();
+ if (!isLatestTimestamp) {
+ get.setTimeRange(0, kv.getTimestamp() + 1);
+ }
+ } else if (kv.getType() == KeyValue.Type.DeleteFamily.getCode()) {
+ if (ignoreFamily(kv.getFamilyArray(), kv.getFamilyOffset(),
+ kv.getFamilyLength())) {
+ continue;
+ }
+ get.addFamily(kv.getFamily()).setMaxVersions();
+ if (!isLatestTimestamp) {
+ get.setTimeRange(0, kv.getTimestamp() + 1);
+ }
+ }
+
+ Result result = region.get(get, null);
+ if (result.isEmpty()) {
+ kvIterator.remove();
+ continue;
+ }
+
+ // Create the delete on assistant store according to the result
+
+ boolean updateTs = true;
+ for (KeyValue existedKV : result.raw()) {
+ long tsOfExisted = existedKV.getTimestamp();
+ // specify the time stamp for delete to prevent new putting
+ byte[] tsBytes = Bytes.toBytes(tsOfExisted);
+ if (updateTs) {
+ kv.updateLatestStamp(tsBytes);
+ updateTs = false;
+ }
+ if (ignoreQualifier(existedKV.getQualifierArray(),
+ existedKV.getQualifierOffset(), existedKV.getQualifierLength())) {
+ continue;
+ }
+
+ byte[] assistantRow = Bytes.add(existedKV.getValue(),
+ existedKV.getRow());
+ Delete assistantDelete = new Delete(assistantRow);
+ assistantDelete.addDeleteMarker(new KeyValue(assistantRow,
+ assistanStoreName, existedKV.getQualifier(), tsOfExisted,
+ KeyValue.Type.Delete, existedKV.getRow()));
+ generatedMutations.add(assistantDelete);
+ }
+ }
+ }
+ }
+ return generatedMutations;
+ }
+
+ @Override
+ public String getName() {
+ return this.getClass().getName();
+ }
+
+ /**
+ * Check whether the given KeyValue's family and qualifier are wanted
+ * @param kv
+ * @return true if this kv should be ignored.
+ */
+ private boolean ignoreKV(KeyValue kv) {
+ if (ignoreFamily(kv.getFamilyArray(), kv.getFamilyOffset(),
+ kv.getFamilyLength())) {
+ return true;
+ }
+ if (ignoreQualifier(kv.getQualifierArray(), kv.getQualifierOffset(),
+ kv.getQualifierLength())) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Check whether the given family is wanted
+ * @param checkFamily
+ * @return true if the given family should be ignored.
+ */
+ private boolean ignoreFamily(byte[] checkFamily, int offset, int len) {
+ if (this.family != null
+ && !Bytes.equals(this.family, 0, this.family.length, checkFamily,
+ offset, len)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Check whether the given qualifier is wanted
+ * @param checkQualifier
+ * @return true if the given qualifier should be ignored.
+ */
+ private boolean ignoreQualifier(byte[] checkQualifier, int offset, int len) {
+ if (this.qualifier != null && !Bytes.equals(this.qualifier, 0, this.qualifier.length,
+ checkQualifier, offset, len)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Assistant initialize(String argumentStr) {
+ Matcher matcher = ARGUMENT_PATTERN.matcher(argumentStr);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Argument:" + argumentStr
+ + " is not suited for " + getName());
+ }
+ String familyStr = matcher.group(1);
+ String qualStr = matcher.group(2);
+ this.family = familyStr.isEmpty() ? null : Bytes.toBytesBinary(familyStr);
+ this.qualifier = qualStr.isEmpty() ? null : Bytes.toBytesBinary(qualStr);
+ return this;
+ }
+
+ /**
+ * Get the configuration string of RowValueSwapAssistant for setting the
+ * assistant on an HColumnDescriptor
+ * @param family the family to index; null or empty means all families
+ * @param qualifier the qualifier to index; null or empty means all qualifiers
+ * @return the configuration string
+ */
+ public static String getAssistantConfString(String family, String qualifier) {
+ return RowValueSwapAssistant.class.getName() + "|" + "FAMILY="
+ + ((family == null || family.isEmpty()) ? "" : family) + ",QUALIFIER="
+ + ((qualifier == null || qualifier.isEmpty()) ? "" : qualifier);
+ }
+
+}
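A small configuration sketch; the family and qualifier names are only examples. The string built by getAssistantConfString() is the column family's assistant setting, and the part after '|' (extracted by ASSISTANT_CONF_VALUE_PATTERN in AssistantStore) is what initialize() parses back:

  String conf = RowValueSwapAssistant.getAssistantConfString("cf", "q");
  // conf == "org.apache.hadoop.hbase.regionserver.assistant.RowValueSwapAssistant"
  //         + "|FAMILY=cf,QUALIFIER=q"
  String args = conf.substring(conf.indexOf('|') + 1); // "FAMILY=cf,QUALIFIER=q"
  Assistant assistant = new RowValueSwapAssistant().initialize(args);
  // Passing a null or empty family/qualifier leaves the corresponding field null,
  // meaning "match any family" / "match any qualifier".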
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java (revision 239702)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java (working copy)
@@ -19,8 +19,8 @@
import java.io.IOException;
import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.SortedMap;
+import java.util.TreeMap;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -42,8 +42,9 @@
// get the coprocessor environment
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) getEnvironment();
- // set of rows to lock, sorted to avoid deadlocks
- SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ // map of rows to lock, sorted to avoid deadlocks
+ SortedMap<byte[], Integer> rowsToLockId = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
HRegionInfo regionInfo = env.getRegion().getRegionInfo();
for (Mutation m : mutations) {
@@ -51,7 +52,7 @@
if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
String msg = "Requested row out of range '"
+ Bytes.toStringBinary(m.getRow()) + "'";
- if (rowsToLock.isEmpty()) {
+ if (rowsToLockId.isEmpty()) {
// if this is the first row, region might have moved,
// allow client to retry
throw new WrongRegionException(msg);
@@ -60,9 +61,9 @@
throw new DoNotRetryIOException(msg);
}
}
- rowsToLock.add(m.getRow());
+ rowsToLockId.put(m.getRow(), null);
}
// call utility method on region
- env.getRegion().mutateRowsWithLocks(mutations, rowsToLock);
+ env.getRegion().mutateRowsWithLocks(mutations, rowsToLockId);
}
}
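To illustrate why a map sorted by row is used here, a brief sketch; the Integer lock-id value type is an assumption and the rows are examples:

  // Regardless of insertion order, iteration follows Bytes.BYTES_COMPARATOR,
  // so every caller acquires row locks in the same global order and cannot deadlock.
  SortedMap<byte[], Integer> rowsToLockId =
      new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
  rowsToLockId.put(Bytes.toBytes("row-b"), null);
  rowsToLockId.put(Bytes.toBytes("row-a"), null);
  for (byte[] row : rowsToLockId.keySet()) {
    // locks would be taken here in ascending order: "row-a", then "row-b"
  }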
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 241368)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy)
@@ -53,6 +53,7 @@
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
@@ -426,7 +427,7 @@
completionService.submit(new Callable<StoreFile>() {
public StoreFile call() throws IOException {
StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
- family.getBloomFilterType(), dataBlockEncoder);
+ family.getBloomFilterType(), dataBlockEncoder, isAssistant());
passSchemaMetricsTo(storeFile);
storeFile.createReader();
return storeFile;
@@ -608,7 +609,7 @@
StoreFile.rename(fs, srcPath, dstPath);
StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
+ this.family.getBloomFilterType(), this.dataBlockEncoder, isAssistant());
passSchemaMetricsTo(sf);
StoreFile.Reader r = sf.createReader();
@@ -864,7 +865,7 @@
status.setStatus("Flushing " + this + ": reopening flushed file");
StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
+ this.family.getBloomFilterType(), this.dataBlockEncoder, isAssistant());
passSchemaMetricsTo(sf);
StoreFile.Reader r = sf.createReader();
@@ -1847,7 +1848,7 @@
try {
storeFile = new StoreFile(this.fs, path, this.conf,
this.cacheConf, this.family.getBloomFilterType(),
- NoOpDataBlockEncoder.INSTANCE);
+ NoOpDataBlockEncoder.INSTANCE, isAssistant());
passSchemaMetricsTo(storeFile);
storeFile.createReader();
} catch (IOException e) {
@@ -1899,7 +1900,7 @@
" to " + destPath);
}
result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
+ this.family.getBloomFilterType(), this.dataBlockEncoder, isAssistant());
passSchemaMetricsTo(result);
result.createReader();
}
@@ -2538,6 +2539,15 @@
return comparator;
}
+ public boolean isAssistant() {
+ return false;
+ }
+
+ public Collection<Mutation> generateAssistantData(
+ Collection<Mutation> mutations) throws IOException {
+ return null;
+ }
+
/**
* Immutable information for scans over a store.
*/