diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 412abf6..ba4beef 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1763,6 +1763,90 @@ public class HRegion implements HeapSize { // , Writable{
     }
   }
 
+  /**
+   * Rewrites the given Append in place so that each of its KeyValues becomes a Put
+   * holding the full new cell value (current value, if any, followed by the appended
+   * bytes) stamped with {@code now}. The per-family logic is borrowed from append().
+   *
+   * @param append The append to rewrite; its family map is replaced by this method.
+   * @param now The timestamp given to every rewritten KeyValue.
+   * @param advanceMemstoreRead Whether to first wait for all prior MVCC transactions
+   *          to finish, so that the current values read here are the latest ones
+   *          (callers are expected to hold the row lock).
+   * @return true if this call waited for prior MVCC transactions, false otherwise.
+   * @throws IOException if reading the current values fails.
+   */
+  boolean prepareAppend(Append append, long now, boolean advanceMemstoreRead)
+      throws IOException {
+    boolean advancedMemstoreRead = false;
+    if (advanceMemstoreRead) {
+      // wait for all prior MVCC transactions to finish - while we hold the row lock
+      // (so that we are guaranteed to see the latest state)
+      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+      advancedMemstoreRead = true;
+    }
+    byte[] row = append.getRow();
+    // Process each family. The code below has been borrowed from append().
+    for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap().entrySet()) {
+      // Get previous values for all columns in this family
+      Get get = new Get(row);
+      for (KeyValue kv : family.getValue()) {
+        get.addColumn(family.getKey(), kv.getQualifier());
+      }
+      List<KeyValue> results = get(get, false);
+      List<KeyValue> newKvs = new ArrayList<KeyValue>(family.getValue().size());
+
+      // Iterate the input columns and update existing values if they were
+      // found, otherwise add new column initialized to the append value.
+      // Results are matched positionally (idx), which assumes the Append's
+      // KeyValues are in the same qualifier order as the Get results.
+      // Avoid as much copying as possible: every byte is copied at most once.
+      int idx = 0;
+      for (KeyValue kv : family.getValue()) {
+        KeyValue newKV;
+        if (idx < results.size()
+            && results.get(idx).matchingQualifier(kv.getBuffer(),
+                kv.getQualifierOffset(), kv.getQualifierLength())) {
+          KeyValue oldKv = results.get(idx);
+          // allocate an empty kv once, sized for old value + appended bytes
+          newKV = new KeyValue(row.length, kv.getFamilyLength(),
+              kv.getQualifierLength(), now, KeyValue.Type.Put,
+              oldKv.getValueLength() + kv.getValueLength());
+          // copy in the value: old bytes first, then the appended bytes
+          System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
+              newKV.getBuffer(), newKV.getValueOffset(),
+              oldKv.getValueLength());
+          System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+              newKV.getBuffer(),
+              newKV.getValueOffset() + oldKv.getValueLength(),
+              kv.getValueLength());
+          idx++;
+        } else {
+          // No current value: build a fresh kv (instead of reusing the client's)
+          // so that it carries the 'now' timestamp like the appended-to cells.
+          newKV = new KeyValue(row.length, kv.getFamilyLength(),
+              kv.getQualifierLength(), now, KeyValue.Type.Put,
+              kv.getValueLength());
+          System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+              newKV.getBuffer(), newKV.getValueOffset(),
+              kv.getValueLength());
+        }
+        // The sized KeyValue constructor used above only allocates an empty
+        // structure, so the row, family and qualifier must be copied in too.
+        System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
+            newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
+        System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
+            newKV.getBuffer(), newKV.getFamilyOffset(), kv.getFamilyLength());
+        System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
+            newKV.getBuffer(), newKV.getQualifierOffset(), kv.getQualifierLength());
+        newKvs.add(newKV);
+      }
+      // Override the append's map.
+      family.setValue(newKvs);
+    }
+    return advancedMemstoreRead;
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
@@ -4351,7 +4435,11 @@ public class HRegion implements HeapSize { // , Writable{
     boolean walSyncSuccessful = false;
     List<Integer> acquiredLocks = null;
     long addedSize = 0;
-    List<KeyValue> mutations = new ArrayList<KeyValue>();
+    // Keyed by Mutation and kept in insertion order. A TreeMap would collapse any
+    // two mutations whose compareTo() returns 0 (NOTE(review): LinkedHashMap relies
+    // on Mutation's equals/hashCode staying identity-based -- confirm).
+    Map<Mutation, List<KeyValue>> mutations =
+        new java.util.LinkedHashMap<Mutation, List<KeyValue>>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
     try {
       // 2. Acquire the row lock(s)
@@ -4380,11 +4468,24 @@ public class HRegion implements HeapSize { // , Writable{
       // 5. Get a mvcc write number
       writeEntry = mvcc.beginMemstoreInsert();
       // 6. Apply to memstore
-      for (KeyValue kv : mutations) {
-        kv.setMemstoreTS(writeEntry.getWriteNumber());
-        byte[] family = kv.getFamily();
-        checkFamily(family);
-        addedSize += stores.get(family).add(kv);
+      // Smallest read point, passed to upsert; computed once for the whole batch.
+      long readPoint = getSmallestReadPoint();
+      for (Map.Entry<Mutation, List<KeyValue>> entry : mutations.entrySet()) {
+        // Appends are upserted instead of added when the family keeps one version.
+        boolean doUpsert = entry.getKey() instanceof Append;
+        for (KeyValue kv : entry.getValue()) {
+          byte[] family = kv.getFamily();
+          // Check the family for every kind of mutation, before looking up its
+          // store, so an unknown family is reported by checkFamily().
+          checkFamily(family);
+          Store store = stores.get(family);
+          kv.setMemstoreTS(writeEntry.getWriteNumber());
+          if (doUpsert && store.getFamily().getMaxVersions() == 1) {
+            addedSize += store.upsert(Arrays.asList(kv), readPoint);
+          } else {
+            addedSize += store.add(kv);
+          }
+        }
       }
 
       long txid = 0;
@@ -4418,8 +4519,10 @@ public class HRegion implements HeapSize { // , Writable{
         LOG.warn("Wal sync failed. Roll back " + mutations.size() +
-            " memstore keyvalues for row(s):" +
+            " mutations' memstore keyvalues for row(s):" +
             processor.getRowsToLock().iterator().next() + "...");
-        for (KeyValue kv : mutations) {
-          stores.get(kv.getFamily()).rollback(kv);
+        for (List<KeyValue> kvs : mutations.values()) {
+          for (KeyValue kv : kvs) {
+            stores.get(kv.getFamily()).rollback(kv);
+          }
         }
       }
       // 11. Roll mvcc forward
@@ -4456,7 +4559,7 @@ public class HRegion implements HeapSize { // , Writable{
   private void doProcessRowWithTimeout(final RowProcessor<?> processor,
                                        final long now,
                                        final HRegion region,
-                                       final List<KeyValue> mutations,
+                                       final Map<Mutation, List<KeyValue>> mutations,
                                        final WALEdit walEdit,
                                        final long timeout) throws IOException {
     // Short circuit the no time bound case.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
index c962bef..36ee907 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
@@ -18,12 +18,14 @@ package org.apache.hadoop.hbase.regionserver;
 
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -56,9 +58,10 @@ class MultiRowMutationProcessor extends BaseRowProcessor {
   @Override
   public void process(long now,
                       HRegion region,
-                      List<KeyValue> mutationKvs,
+                      Map<Mutation, List<KeyValue>> mutationKvs,
                       WALEdit walEdit) throws IOException {
     byte[] byteNow = Bytes.toBytes(now);
+    boolean advancedMemstoreWrite = false;
     // Check mutations and apply edits to a single WALEdit
     for (Mutation m : mutations) {
       if (m instanceof Put) {
@@ -70,15 +73,27 @@ class MultiRowMutationProcessor extends BaseRowProcessor {
         Delete d = (Delete) m;
         region.prepareDelete(d);
         region.prepareDeleteTimestamps(d.getFamilyMap(), byteNow);
+      } else if (m instanceof Append) {
+        region.checkFamilies(m.getFamilyMap().keySet());
+        // Only the first Append needs to wait for prior MVCC transactions; once
+        // that wait has happened the flag stays set for the remaining Appends.
+        if (region.prepareAppend((Append) m, now, !advancedMemstoreWrite)) {
+          advancedMemstoreWrite = true;
+        }
       } else {
         throw new DoNotRetryIOException(
-            "Action must be Put or Delete. But was: "
+            "Action must be Put or Delete or Append. But was: "
             + m.getClass().getName());
       }
+      List<KeyValue> kvsForMutation = mutationKvs.get(m);
+      if (kvsForMutation == null) {
+        kvsForMutation = new ArrayList<KeyValue>();
+        mutationKvs.put(m, kvsForMutation);
+      }
       for (List<KeyValue> edits : m.getFamilyMap().values()) {
         boolean writeToWAL = m.getWriteToWAL();
         for (KeyValue kv : edits) {
-          mutationKvs.add(kv);
+          kvsForMutation.add(kv);
           if (writeToWAL) {
             walEdit.add(kv);
           }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
index 4be0cd3..8b899e3 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
 @InterfaceAudience.Public
@@ -78,7 +80,7 @@ public interface RowProcessor {
    */
   void process(long now,
                HRegion region,
-               List<KeyValue> mutations,
+               Map<Mutation, List<KeyValue>> mutations,
                WALEdit walEdit) throws IOException;
 
   /**
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
index 426a586..7c70126 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
@@ -30,6 +30,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -39,9 +40,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
@@ -299,7 +302,7 @@ public class TestRowProcessorEndpoint {
 
     @Override
     public void process(long now, HRegion region,
-        List<KeyValue> mutations, WALEdit walEdit) throws IOException {
+        Map<Mutation, List<KeyValue>> mutations, WALEdit walEdit) throws IOException {
       // Scan current counter
       List<KeyValue> kvs = new ArrayList<KeyValue>();
       Scan scan = new Scan(row, row);
@@ -318,7 +321,7 @@ public class TestRowProcessorEndpoint {
 
       KeyValue kv =
           new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
-      mutations.add(kv);
+      mutations.put(new Put(row), Arrays.asList(kv));
       walEdit.add(kv);
 
       // We can also inject some meta data to the walEdit
@@ -377,7 +380,7 @@ public class TestRowProcessorEndpoint {
 
     @Override
     public void process(long now, HRegion region,
-        List<KeyValue> mutations, WALEdit walEdit) throws IOException {
+        Map<Mutation, List<KeyValue>> mutations, WALEdit walEdit) throws IOException {
       List<KeyValue> kvs = new ArrayList<KeyValue>();
       { // First scan to get friends of the person
         Scan scan = new Scan(row, row);
@@ -457,7 +460,7 @@ public class TestRowProcessorEndpoint {
 
     @Override
     public void process(long now, HRegion region,
-        List<KeyValue> mutations, WALEdit walEdit) throws IOException {
+        Map<Mutation, List<KeyValue>> mutations, WALEdit walEdit) throws IOException {
 
       // Override the time to avoid race-condition in the unit test caused by
       // inacurate timer on some machines
@@ -493,9 +496,9 @@ public class TestRowProcessorEndpoint {
           KeyValue kvAdd =
             new KeyValue(rows[1 - i],
             kv.getFamily(), kv.getQualifier(), now, kv.getValue());
-          mutations.add(kvDelete);
+          mutations.put(new Delete(rows[i]), Arrays.asList(kvDelete));
           walEdit.add(kvDelete);
-          mutations.add(kvAdd);
+          mutations.put(new Put(rows[1 - i]), Arrays.asList(kvAdd));
           walEdit.add(kvAdd);
         }
       }
@@ -540,7 +543,7 @@ public class TestRowProcessorEndpoint {
 
     @Override
     public void process(long now, HRegion region,
-        List<KeyValue> mutations, WALEdit walEdit) throws IOException {
+        Map<Mutation, List<KeyValue>> mutations, WALEdit walEdit) throws IOException {
       try {
         // Sleep for a long time so it timeout
         Thread.sleep(100 * 1000L);