Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1296937) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -139,17 +139,17 @@ } @Override - public void prePut(final ObserverContext e, + public void prePut(final ObserverContext e, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException { } @Override - public void postPut(final ObserverContext e, + public void postPut(final ObserverContext e, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException { } @Override - public void preDelete(final ObserverContext e, + public void preDelete(final ObserverContext e, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException { } @@ -274,4 +274,17 @@ public void postWALRestore(ObserverContext env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { } + + @Override + public void preRollBackMemstore( + ObserverContext ctx, List putList) + throws IOException { + } + + @Override + public void postRollBackMemstore( + ObserverContext ctx, List putList) + throws IOException { + } + } Index: src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1296937) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -276,12 +276,13 @@ * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - void prePut(final ObserverContext c, + void prePut(final ObserverContext c, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException; /** - * Called after the client stores a value. + * Called after the client stores a value and if the put + * is successful. *

* Call CoprocessorEnvironment#complete to skip any subsequent chained * coprocessors @@ -291,7 +292,7 @@ * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - void postPut(final ObserverContext c, + void postPut(final ObserverContext c, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException; @@ -308,7 +309,7 @@ * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - void preDelete(final ObserverContext c, + void preDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException; @@ -341,7 +342,7 @@ * @param compareOp the comparison operation * @param comparator the comparator * @param put data to put if check succeeds - * @param result + * @param result * @return the return value to return to client if bypassing default * processing * @throws IOException if an error occurred on the coprocessor @@ -388,7 +389,7 @@ * @param compareOp the comparison operation * @param comparator the comparator * @param delete delete to commit if check succeeds - * @param result + * @param result * @return the value to return to client if bypassing default processing * @throws IOException if an error occurred on the coprocessor */ @@ -649,4 +650,28 @@ */ void postWALRestore(final ObserverContext ctx, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; + + /** + * Called before rolling back the memstore incase the current set of puts + * failed + * + * @param ctx the environment provided by the region server + * @param putList list of puts + * @throws IOException + */ + void preRollBackMemstore( + final ObserverContext ctx, + final List putList) throws IOException; + + /** + * Called after rolling back the memstore incase the current set of puts + * failed + * + * @param ctx the environment provided by the region server + * @param putList list of puts + * @throws IOException + */ + void postRollBackMemstore( + final ObserverContext ctx, + final List putList) throws IOException; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1305831) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -2198,6 +2198,7 @@ this.log.sync(txid); } walSyncSuccessful = true; + // ------------------------------------------------------------------ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ @@ -2228,7 +2229,33 @@ // if the wal sync was unsuccessful, remove keys from memstore if (!walSyncSuccessful) { + if (coprocessorHost != null) { + List putList = new ArrayList(); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // only for successful puts + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { + continue; + } + Put p = batchOp.operations[i].getFirst(); + putList.add(p); + } + coprocessorHost.preRollBackMemStore(putList); + } rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); + + // call post roll back hook also. + if (coprocessorHost != null) { + List putList = new ArrayList(); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // only for successful puts + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { + continue; + } + Put p = batchOp.operations[i].getFirst(); + putList.add(p); + } + coprocessorHost.postRollBackMemStore(putList); + } } if (w != null) mvcc.completeMemstoreInsert(w); Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1229039) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; import java.io.IOException; @@ -666,6 +667,50 @@ return bypass; } + public boolean preRollBackMemStore(final List putList) + throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()) + .preRollBackMemstore(ctx, putList); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + + public boolean postRollBackMemStore(final List putList) + throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postRollBackMemstore(ctx, + putList); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + /** * @param put The Put object * @param edit The WALEdit object. Index: src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (revision 1210745) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (working copy) @@ -29,6 +29,8 @@ import java.util.Arrays; import com.google.common.collect.ImmutableList; +import com.sun.jersey.api.container.filter.PostReplaceFilter; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.KeyValue; @@ -85,6 +87,8 @@ boolean hadPostScannerClose = false; boolean hadPreScannerOpen = false; boolean hadPostScannerOpen = false; + boolean hadPreRollBackMemStore = false; + boolean hadPostRollBackMemStore = false; @Override public void preOpen(ObserverContext c) { @@ -258,7 +262,7 @@ } @Override - public void prePut(final ObserverContext c, + public void prePut(final ObserverContext c, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException { Map> familyMap = put.getFamilyMap(); @@ -318,7 +322,7 @@ } @Override - public void preDelete(final ObserverContext c, + public void preDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException { Map> familyMap = delete.getFamilyMap(); @@ -332,7 +336,7 @@ } @Override - public void postDelete(final ObserverContext c, + public void postDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException { Map> familyMap = delete.getFamilyMap(); @@ -384,6 +388,20 @@ return result; } + @Override + public void preRollBackMemstore( + ObserverContext ctx, List putList) + throws IOException { + hadPreRollBackMemStore = true; + } + + @Override + public void postRollBackMemstore( + ObserverContext ctx, List putList) + throws IOException { + hadPostRollBackMemStore = true; + } + public boolean hadPreGet() { return hadPreGet; } @@ -430,4 +448,12 @@ public boolean hadDeleted() { return hadPreDeleted && hadPostDeleted; } + + public boolean hadPreRollBackMemStore() { + return hadPreRollBackMemStore; + } + + public boolean hadPostRollBackMemStore() { + return hadPostRollBackMemStore; + } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1298701) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -20,13 +20,18 @@ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -44,6 +49,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; +import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -58,6 +64,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -79,6 +87,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; @@ -498,6 +507,7 @@ } OperationStatus[] codes = this.region.put(puts); + assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { assertEquals(OperationStatusCode.SUCCESS, codes[i] @@ -577,6 +587,63 @@ region.releaseRowLock(lockedRow); } + @Test + public void testBatchPutFail() throws IOException { + byte[] b = Bytes.toBytes(getName()); + byte[] cf = Bytes.toBytes(COLUMN_FAMILY); + byte[] qual = Bytes.toBytes("qual"); + byte[] val = Bytes.toBytes("val"); + Configuration conf = HBaseConfiguration.create(); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver"); + initHRegion(b, getName(), conf, false, cf); + region.setCoprocessorHost(new RegionCoprocessorHost(region, null, conf)); + verifyMethodResult(SimpleRegionObserver.class, new String[] { "hadPreGet", + "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete", "hadPreBatchPut", + "hadPostBatchPut", "hadPostCompleteBatchPut", "hadPreRollBackMemStore", + "hadPostRollBackMemStore" }, null, new Boolean[] { false, false, false, + false, false, false, false, false, false, false }); + + LOG.info("First a batch put with all valid puts"); + final Put[] puts = new Put[10]; + for (int i = 0; i < 10; i++) { + puts[i] = new Put(Bytes.toBytes("row_" + i)); + puts[i].add(cf, qual, val); + } + try { + this.region.put(puts); + fail("IOException should be thrown."); + } catch (IOException e) { + + } + verifyMethodResult(SimpleRegionObserver.class, new String[] { "hadPreGet", + "hadPostGet", "hadPrePut", "hadPostPut", "hadDelete", "hadPreBatchPut", + "hadPostBatchPut", "hadPostCompleteBatchPut", "hadPreRollBackMemStore", + "hadPostRollBackMemStore" }, null, new Boolean[] { false, false, true, + false, false, true, false, true, true, true }); + } + + private void verifyMethodResult(Class c, String methodName[], + byte[] tableName, Object value[]) throws IOException { + try { + + RegionCoprocessorHost cph = region.getCoprocessorHost(); + + Coprocessor cp = cph.findCoprocessor(c.getName()); + assertNotNull(cp); + for (int i = 0; i < methodName.length; ++i) { + Method m = c.getMethod(methodName[i]); + Object o = m.invoke(cp); + assertTrue("Result of " + c.getName() + "." + methodName[i] + + " is expected to be " + value[i].toString() + ", while we get " + + o.toString(), o.equals(value[i])); + } + } catch (Exception e) { + throw new IOException(e.toString()); + } + } + + ////////////////////////////////////////////////////////////////////////////// // checkAndMutate tests ////////////////////////////////////////////////////////////////////////////// @@ -602,7 +669,7 @@ boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal), put, lockId, true); assertTrue(res); - + //Putting data in key put = new Put(row1); put.add(fam1, qf1, val1); @@ -3326,21 +3393,40 @@ return conf; } - private void initHRegion (byte [] tableName, String callingMethod, + private void initHRegion(byte[] tableName, String callingMethod, + byte[]... families) throws IOException { + initHRegion(tableName, callingMethod, HBaseConfiguration.create(), true, + families); + } + + private void initHRegion (byte [] tableName, String callingMethod, boolean walAppendSuccess, byte[] ... families) throws IOException { - initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families); + initHRegion(tableName, callingMethod, HBaseConfiguration.create(), walAppendSuccess, families); } + private void initHRegion (byte [] tableName, String callingMethod, Configuration conf, byte [] ... families) throws IOException{ - initHRegion(tableName, null, null, callingMethod, conf, families); + initHRegion(tableName, null, null, callingMethod, conf, true, families); } + private void initHRegion (byte [] tableName, String callingMethod, + Configuration conf, boolean walAppendSuccess, byte [] ... families) + throws IOException{ + initHRegion(tableName, null, null, callingMethod, conf, walAppendSuccess, families); + } + private void initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, String callingMethod, Configuration conf, byte[]... families) throws IOException { + initHRegion(tableName, startKey, stopKey, callingMethod, conf, true, families); + } + + private void initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, + String callingMethod, Configuration conf, boolean walAppendSuccess, byte[]... families) + throws IOException { HTableDescriptor htd = new HTableDescriptor(tableName); for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); @@ -3352,7 +3438,24 @@ throw new IOException("Failed delete of " + path); } } - region = HRegion.createHRegion(info, path, conf, htd); + Path tableDir = HTableDescriptor.getTableDir(path, info.getTableName()); + Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName()); + FileSystem fs = FileSystem.get(conf); + fs.mkdirs(regionDir); + HLog hlog = null; + if(walAppendSuccess == false){ + hlog = new HLog(fs, new Path(regionDir, + HConstants.HREGION_LOGDIR_NAME), new Path(regionDir, + HConstants.HREGION_OLDLOGDIR_NAME), conf){ + @Override + public long appendNoSync(HRegionInfo info, byte[] tableName, + WALEdit edits, UUID clusterId, long now, HTableDescriptor htd) + throws IOException { + throw new IOException(); + } + }; + } + region = HRegion.createHRegion(info, path, conf, htd, hlog); } /**