diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 7c145e0..eea848f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4820,110 +4820,6 @@ public class HRegion implements HeapSize { // , Writable{
     return new Result(allKVs);
   }
 
-  /**
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param amount
-   * @param writeToWAL
-   * @return The new value.
-   * @throws IOException
-   * @deprecated use {@link #increment(Increment, Integer, boolean)}
-   */
-  @Deprecated
-  public long incrementColumnValue(byte [] row, byte [] family,
-      byte [] qualifier, long amount, boolean writeToWAL)
-  throws IOException {
-    // to be used for metrics
-    long before = EnvironmentEdgeManager.currentTimeMillis();
-
-    checkRow(row, "increment");
-    boolean flush = false;
-    boolean wrongLength = false;
-    long txid = 0;
-    // Lock row
-    long result = amount;
-    startRegionOperation();
-    this.writeRequestsCount.increment();
-    try {
-      Integer lid = obtainRowLock(row);
-      this.updatesLock.readLock().lock();
-      try {
-        Store store = stores.get(family);
-
-        // Get the old value:
-        Get get = new Get(row);
-        get.addColumn(family, qualifier);
-
-        // we don't want to invoke coprocessor in this case; ICV is wrapped
-        // in HRegionServer, so we leave getLastIncrement alone
-        List<KeyValue> results = get(get, false);
-
-        if (!results.isEmpty()) {
-          KeyValue kv = results.get(0);
-          if (kv.getValueLength() == Bytes.SIZEOF_LONG) {
-            byte [] buffer = kv.getBuffer();
-            int valueOffset = kv.getValueOffset();
-            result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
-          } else {
-            wrongLength = true;
-          }
-        }
-        if (!wrongLength) {
-          // build the KeyValue now:
-          KeyValue newKv = new KeyValue(row, family,
-              qualifier, EnvironmentEdgeManager.currentTimeMillis(),
-              Bytes.toBytes(result));
-
-          // now log it:
-          if (writeToWAL) {
-            long now = EnvironmentEdgeManager.currentTimeMillis();
-            WALEdit walEdit = new WALEdit();
-            walEdit.add(newKv);
-            // Using default cluster id, as this can only happen in the
-            // originating cluster. A slave cluster receives the final value (not
-            // the delta) as a Put.
-            txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
-                walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
-                this.htableDescriptor);
-          }
-
-          // Now request the ICV to the store, this will set the timestamp
-          // appropriately depending on if there is a value in memcache or not.
-          // returns the change in the size of the memstore from operation
-          long size = store.updateColumnValue(row, family, qualifier, result);
-
-          size = this.addAndGetGlobalMemstoreSize(size);
-          flush = isFlushSize(size);
-        }
-      } finally {
-        this.updatesLock.readLock().unlock();
-        releaseRowLock(lid);
-      }
-      if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
-      }
-    } finally {
-      closeRegionOperation();
-    }
-
-    // do after lock
-    long after = EnvironmentEdgeManager.currentTimeMillis();
-    this.opMetrics.updateIncrementColumnValueMetrics(family, after - before);
-
-    if (flush) {
-      // Request a cache flush. Do it outside update lock.
-      requestFlush();
-    }
-    if (wrongLength) {
-      throw new DoNotRetryIOException(
-          "Attempted to increment field that isn't 64 bits wide");
-    }
-    return result;
-  }
-
   //
   // New HBASE-880 Helpers
   //
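For code still calling the deleted method, the @deprecated tag above names the replacement, and the tests removed further down show its calling convention. A minimal migration sketch; `row`, `family`, `qualifier`, `amount`, and `writeToWAL` are placeholders mirroring the old parameter list, not names from this patch:

    Increment increment = new Increment(row);
    increment.addColumn(family, qualifier, amount);
    // Passing null for the lock id lets the region take its own row lock,
    // as the deleted method did internally via obtainRowLock().
    Result result = region.increment(increment, null, writeToWAL);
    long newValue = Bytes.toLong(result.getValue(family, qualifier));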
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 0982210..55f8bf6 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -31,14 +31,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.junit.experimental.categories.Category;
@@ -110,32 +103,6 @@ public class TestAtomicOperation extends HBaseTestCase {
   }
 
   /**
-   * Test one increment command.
-   */
-  public void testIncrementColumnValue() throws IOException {
-    LOG.info("Starting test testIncrementColumnValue");
-    initHRegion(tableName, getName(), fam1);
-
-    long value = 1L;
-    long amount = 3L;
-
-    Put put = new Put(row);
-    put.add(fam1, qual1, Bytes.toBytes(value));
-    region.put(put);
-
-    long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
-
-    assertEquals(value+amount, result);
-
-    HStore store = (HStore) region.getStore(fam1);
-    // ICV removes any extra values floating around in there.
-    assertEquals(1, store.memstore.kvset.size());
-    assertTrue(store.memstore.snapshot.isEmpty());
-
-    assertICV(row, fam1, qual1, value+amount);
-  }
-
-  /**
    * Test multi-threaded increments.
    */
   public void testIncrementMultiThreads() throws IOException {
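If single-increment coverage is still wanted after deleting testIncrementColumnValue, a sketch like the following would exercise the same scenario through the non-deprecated API (hypothetical test name; it reuses this class's row/fam1/qual1 fixtures and drops the deleted test's memstore-internals assertions):

    public void testIncrementViaIncrementApi() throws IOException {
      initHRegion(tableName, getName(), fam1);
      long value = 1L;
      long amount = 3L;

      Put put = new Put(row);
      put.add(fam1, qual1, Bytes.toBytes(value));
      region.put(put);

      // Same scenario as the deleted test, driven through region.increment().
      Increment inc = new Increment(row);
      inc.addColumn(fam1, qual1, amount);
      Result result = region.increment(inc, null, true);
      assertEquals(value + amount, Bytes.toLong(result.getValue(fam1, qual1)));
    }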
@@ -223,7 +190,7 @@ public class TestAtomicOperation extends HBaseTestCase {
 
     private int count;
 
-    public Incrementer(HRegion region, 
+    public Incrementer(HRegion region,
         int threadNumber, int amount, int numIncrements) {
       this.region = region;
       this.threadNumber = threadNumber;
@@ -237,7 +204,9 @@ public class TestAtomicOperation extends HBaseTestCase {
     public void run() {
       for (int i=0; i= 0);
-      }
-    } finally {
-      HRegion.closeHRegion(this.region);
-      this.region = null;
-    }
-  }
-
-  public void testIncrementColumnValue_UpdatingInPlace_Negative()
-    throws IOException {
-    this.region = initHRegion(tableName, getName(), fam1);
-    try {
-      long value = 3L;
-      long amount = -1L;
-
-      Put put = new Put(row);
-      put.add(fam1, qual1, Bytes.toBytes(value));
-      region.put(put);
-
-      long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
-      assertEquals(value+amount, result);
-
-      assertICV(row, fam1, qual1, value+amount);
-    } finally {
-      HRegion.closeHRegion(this.region);
-      this.region = null;
-    }
-  }
-
-  public void testIncrementColumnValue_AddingNew()
-    throws IOException {
-    this.region = initHRegion(tableName, getName(), fam1);
-    try {
-      long value = 1L;
-      long amount = 3L;
-
-      Put put = new Put(row);
-      put.add(fam1, qual1, Bytes.toBytes(value));
-      put.add(fam1, qual2, Bytes.toBytes(value));
-      region.put(put);
-
-      long result = region.incrementColumnValue(row, fam1, qual3, amount, true);
-      assertEquals(amount, result);
-
-      Get get = new Get(row);
-      get.addColumn(fam1, qual3);
-      Result rr = region.get(get, null);
-      assertEquals(1, rr.size());
-
-      // ensure none of the other cols were incremented.
-      assertICV(row, fam1, qual1, value);
-      assertICV(row, fam1, qual2, value);
-      assertICV(row, fam1, qual3, amount);
-    } finally {
-      HRegion.closeHRegion(this.region);
-      this.region = null;
-    }
-  }
-
-  public void testIncrementColumnValue_UpdatingFromSF() throws IOException {
-    this.region = initHRegion(tableName, getName(), fam1);
-    try {
-      long value = 1L;
-      long amount = 3L;
-
-      Put put = new Put(row);
-      put.add(fam1, qual1, Bytes.toBytes(value));
-      put.add(fam1, qual2, Bytes.toBytes(value));
-      region.put(put);
-
-      // flush to disk.
-      region.flushcache();
-
-      HStore store = (HStore) region.getStore(fam1);
-      assertEquals(0, store.memstore.kvset.size());
-
-      long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
-      assertEquals(value+amount, r);
-
-      assertICV(row, fam1, qual1, value+amount);
-    } finally {
-      HRegion.closeHRegion(this.region);
-      this.region = null;
-    }
-  }
-
-  public void testIncrementColumnValue_AddingNewAfterSFCheck()
-    throws IOException {
-    this.region = initHRegion(tableName, getName(), fam1);
-    try {
-      long value = 1L;
-      long amount = 3L;
-
-      Put put = new Put(row);
-      put.add(fam1, qual1, Bytes.toBytes(value));
-      put.add(fam1, qual2, Bytes.toBytes(value));
-      region.put(put);
-      region.flushcache();
-
-      HStore store = (HStore) region.getStore(fam1);
-      assertEquals(0, store.memstore.kvset.size());
-
-      long r = region.incrementColumnValue(row, fam1, qual3, amount, true);
-      assertEquals(amount, r);
-
-      assertICV(row, fam1, qual3, amount);
-
-      region.flushcache();
-
-      // ensure that this gets to disk.
-      assertICV(row, fam1, qual3, amount);
-    } finally {
-      HRegion.closeHRegion(this.region);
-      this.region = null;
-    }
-  }
-
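The _UpdatingFromSF and _AddingNewAfterSFCheck deletions drop coverage for increments whose base value must be read back from a flushed store file rather than the memstore. If that coverage still matters, a rough equivalent under the Increment API might look like this (a sketch with a hypothetical test name, reusing the same fixtures and helpers as the deleted tests):

    public void testIncrementUpdatingFromStoreFile() throws IOException {
      this.region = initHRegion(tableName, getName(), fam1);
      try {
        Put put = new Put(row);
        put.add(fam1, qual1, Bytes.toBytes(1L));
        region.put(put);
        // Flush so the increment has to fetch its base value from disk.
        region.flushcache();

        Increment inc = new Increment(row);
        inc.addColumn(fam1, qual1, 3L);
        region.increment(inc, null, true);
        assertICV(row, fam1, qual1, 4L);
      } finally {
        HRegion.closeHRegion(this.region);
        this.region = null;
      }
    }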
-  /**
-   * Added for HBASE-3235.
-   *
-   * When the initial put and an ICV update were arriving with the same timestamp,
-   * the initial Put KV was being skipped during the {@link MemStore#upsert(KeyValue)}
-   * iteration for matching KVs, causing the update-in-place to not happen
-   * and the ICV put to effectively disappear.
-   * @throws IOException
-   */
-  public void testIncrementColumnValue_UpdatingInPlace_TimestampClobber() throws IOException {
-    this.region = initHRegion(tableName, getName(), fam1);
-    try {
-      long value = 1L;
-      long amount = 3L;
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      ManualEnvironmentEdge mock = new ManualEnvironmentEdge();
-      mock.setValue(now);
-      EnvironmentEdgeManagerTestHelper.injectEdge(mock);
-
-      // verify we catch an ICV on a put with the same timestamp
-      Put put = new Put(row);
-      put.add(fam1, qual1, now, Bytes.toBytes(value));
-      region.put(put);
-
-      long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
-
-      assertEquals(value+amount, result);
-
-      HStore store = (HStore) region.getStore(fam1);
-      // ICV should update the existing Put with the same timestamp
-      assertEquals(1, store.memstore.kvset.size());
-      assertTrue(store.memstore.snapshot.isEmpty());
-
-      assertICV(row, fam1, qual1, value+amount);
-
-      // verify we catch an ICV even when the put ts > now
-      put = new Put(row);
-      put.add(fam1, qual2, now+1, Bytes.toBytes(value));
-      region.put(put);
-
-      result = region.incrementColumnValue(row, fam1, qual2, amount, true);
-
-      assertEquals(value+amount, result);
-
-      store = (HStore) region.getStore(fam1);
-      // ICV should update the existing Put with the same timestamp
-      assertEquals(2, store.memstore.kvset.size());
-      assertTrue(store.memstore.snapshot.isEmpty());
-
-      assertICV(row, fam1, qual2, value+amount);
-      EnvironmentEdgeManagerTestHelper.reset();
-    } finally {
-      HRegion.closeHRegion(this.region);
-      this.region = null;
-    }
-  }
-
-  public void testIncrementColumnValue_WrongInitialSize() throws IOException {
-    this.region = initHRegion(tableName, getName(), fam1);
-    try {
-      byte[] row1 = Bytes.add(Bytes.toBytes("1234"), Bytes.toBytes(0L));
-      int row1Field1 = 0;
-      int row1Field2 = 1;
-      Put put1 = new Put(row1);
-      put1.add(fam1, qual1, Bytes.toBytes(row1Field1));
-      put1.add(fam1, qual2, Bytes.toBytes(row1Field2));
-      region.put(put1);
-
-      long result;
-      try {
-        result = region.incrementColumnValue(row1, fam1, qual1, 1, true);
-        fail("Expected to fail here");
-      } catch (Exception exception) {
-        // Expected.
-      }
-
-      assertICV(row1, fam1, qual1, row1Field1);
-      assertICV(row1, fam1, qual2, row1Field2);
-    } finally {
-      HRegion.closeHRegion(this.region);
-      this.region = null;
-    }
-  }
-
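Together with testIncrement_WrongInitialSize below, this deleted test pinned the width contract visible in the code removed above: a counter cell must be exactly Bytes.SIZEOF_LONG (8 bytes) wide, otherwise the increment fails with DoNotRetryIOException ("Attempted to increment field that isn't 64 bits wide"). How a column is first seeded therefore decides whether it can ever be incremented; a small illustration using the same fixtures:

    Put put = new Put(row);
    put.add(fam1, qual1, Bytes.toBytes(1L)); // long -> 8 bytes: incrementable
    put.add(fam1, qual2, Bytes.toBytes(1));  // int  -> 4 bytes: a later increment
                                             // of qual2 throws DoNotRetryIOException
    region.put(put);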
-  public void testIncrement_WrongInitialSize() throws IOException {
-    this.region = initHRegion(tableName, getName(), fam1);
-    try {
-      byte[] row1 = Bytes.add(Bytes.toBytes("1234"), Bytes.toBytes(0L));
-      long row1Field1 = 0;
-      int row1Field2 = 1;
-      Put put1 = new Put(row1);
-      put1.add(fam1, qual1, Bytes.toBytes(row1Field1));
-      put1.add(fam1, qual2, Bytes.toBytes(row1Field2));
-      region.put(put1);
-      Increment increment = new Increment(row1);
-      increment.addColumn(fam1, qual1, 1);
-
-      //here we should be successful as normal
-      region.increment(increment, null, true);
-      assertICV(row1, fam1, qual1, row1Field1 + 1);
-
-      //failed to increment
-      increment = new Increment(row1);
-      increment.addColumn(fam1, qual2, 1);
-      try {
-        region.increment(increment, null, true);
-        fail("Expected to fail here");
-      } catch (Exception exception) {
-        // Expected.
-      }
-      assertICV(row1, fam1, qual2, row1Field2);
-    } finally {
-      HRegion.closeHRegion(this.region);
-      this.region = null;
-    }
-  }
 
   private void assertICV(byte [] row, byte [] familiy, byte[] qualifier,