Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1381982)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -1770,8 +1770,7 @@
 
       try {
         // All edits for the given row (across all column families) must happen atomically.
-        prepareDelete(delete);
-        internalDelete(delete, delete.getClusterId(), writeToWAL);
+        doBatchMutate(delete, lid);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -1788,11 +1787,11 @@
    */
   void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
       boolean writeToWAL) throws IOException {
-    Delete delete = new Delete();
+    Delete delete = new Delete(new byte[0]);
     delete.setFamilyMap(familyMap);
     delete.setClusterId(clusterId);
     delete.setWriteToWAL(writeToWAL);
-    internalDelete(delete, clusterId, writeToWAL);
+    doBatchMutate(delete, null);
   }
 
   /**
@@ -1850,65 +1849,6 @@
   }
 
   /**
-   * @param delete The Delete command
-   * @param clusterId UUID of the originating cluster (for replication).
-   * @param writeToWAL
-   * @throws IOException
-   */
-  private void internalDelete(Delete delete, UUID clusterId,
-      boolean writeToWAL) throws IOException {
-    Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
-    WALEdit walEdit = new WALEdit();
-    /* Run coprocessor pre hook outside of locks to avoid deadlock */
-    if (coprocessorHost != null) {
-      if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
-        return;
-      }
-    }
-
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    byte [] byteNow = Bytes.toBytes(now);
-    boolean flush = false;
-
-    updatesLock.readLock().lock();
-    try {
-      prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
-
-      if (writeToWAL) {
-        // write/sync to WAL should happen before we touch memstore.
-        //
-        // If order is reversed, i.e. we write to memstore first, and
-        // for some reason fail to write/sync to commit log, the memstore
-        // will contain uncommitted transactions.
-        //
-        // bunch up all edits across all column families into a
-        // single WALEdit.
-        addFamilyMapToWALEdit(familyMap, walEdit);
-        this.log.append(regionInfo, this.htableDescriptor.getName(),
-            walEdit, clusterId, now, this.htableDescriptor);
-      }
-
-      // Now make changes to the memstore.
-      long addedSize = applyFamilyMapToMemstore(familyMap, null);
-      flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
-
-    } finally {
-      this.updatesLock.readLock().unlock();
-    }
-    // do after lock
-    if (coprocessorHost != null) {
-      coprocessorHost.postDelete(delete, walEdit, writeToWAL);
-    }
-    final long after = EnvironmentEdgeManager.currentTimeMillis();
-    this.opMetrics.updateDeleteMetrics(familyMap.keySet(), after-now);
-
-    if (flush) {
-      // Request a cache flush. Do it outside update lock.
-      requestFlush();
-    }
-  }
-
-  /**
    * @param put
    * @throws IOException
    */
@@ -1965,7 +1905,7 @@
 
       try {
         // All edits for the given row (across all column families) must happen atomically.
-        internalPut(put, put.getClusterId(), writeToWAL);
+        doBatchMutate(put, lid);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -2418,7 +2358,6 @@
     }
 
     startRegionOperation();
-    this.writeRequestsCount.increment();
     try {
       RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
       Get get = new Get(row, lock);
@@ -2474,13 +2413,7 @@
         // Using default cluster id, as this can only happen in the
         // originating cluster. A slave cluster receives the result as a Put
         // or Delete
-        if (isPut) {
-          internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
-        } else {
-          Delete d = (Delete)w;
-          prepareDelete(d);
-          internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
-        }
+        doBatchMutate((Mutation)w, lid);
         return true;
       }
       return false;
@@ -2492,6 +2425,15 @@
     }
   }
 
+  private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
+      DoNotRetryIOException {
+    Pair<Mutation, Integer> putsAndLocks[] = new Pair[1];
+    putsAndLocks[0] = new Pair<Mutation, Integer>(mutation, lid);
+    OperationStatus[] batchMutate = this.batchMutate(putsAndLocks);
+    if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
+      throw new DoNotRetryIOException(batchMutate[0].getExceptionMsg());
+    }
+  }
 
   /**
    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
@@ -2564,7 +2506,7 @@
    * @praram now
    * @throws IOException
    */
-  private void put(byte [] family, List<KeyValue> edits)
+  private void put(byte [] family, List<KeyValue> edits, Integer lid)
   throws IOException {
     Map<byte[], List<KeyValue>> familyMap;
     familyMap = new HashMap<byte[], List<KeyValue>>();
@@ -2574,71 +2516,10 @@
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
     p.setWriteToWAL(true);
-    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
+    doBatchMutate(p, lid);
   }
-
+  
   /**
-   * Add updates first to the hlog (if writeToWal) and then add values to memstore.
-   * Warning: Assumption is caller has lock on passed in row.
-   * @param put The Put command
-   * @param clusterId UUID of the originating cluster (for replication).
-   * @param writeToWAL if true, then we should write to the log
-   * @throws IOException
-   */
-  private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException {
-    Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
-    WALEdit walEdit = new WALEdit();
-    /* run pre put hook outside of lock to avoid deadlock */
-    if (coprocessorHost != null) {
-      if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
-        return;
-      }
-    }
-
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    byte[] byteNow = Bytes.toBytes(now);
-    boolean flush = false;
-
-    this.updatesLock.readLock().lock();
-    try {
-      checkFamilies(familyMap.keySet());
-      checkTimestamps(familyMap, now);
-      updateKVTimestamps(familyMap.values(), byteNow);
-      // write/sync to WAL should happen before we touch memstore.
-      //
-      // If order is reversed, i.e. we write to memstore first, and
-      // for some reason fail to write/sync to commit log, the memstore
-      // will contain uncommitted transactions.
-      if (writeToWAL) {
-        addFamilyMapToWALEdit(familyMap, walEdit);
-        this.log.append(regionInfo, this.htableDescriptor.getName(),
-            walEdit, clusterId, now, this.htableDescriptor);
-      } else {
-        recordPutWithoutWal(familyMap);
-      }
-
-      long addedSize = applyFamilyMapToMemstore(familyMap, null);
-      flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
-    } finally {
-      this.updatesLock.readLock().unlock();
-    }
-
-    if (coprocessorHost != null) {
-      coprocessorHost.postPut(put, walEdit, writeToWAL);
-    }
-
-    // do after lock
-    final long after = EnvironmentEdgeManager.currentTimeMillis();
-    this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-
-
-    if (flush) {
-      // Request a cache flush. Do it outside update lock.
-      requestFlush();
-    }
-  }
-
-  /**
    * Atomically apply the given map of family->edits to the memstore.
    * This handles the consistency control on its own, but the caller
    * should already have locked updatesLock.readLock(). This also does
@@ -3989,7 +3870,7 @@
       edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
          HConstants.META_VERSION_QUALIFIER, now,
          Bytes.toBytes(HConstants.META_VERSION)));
-      meta.put(HConstants.CATALOG_FAMILY, edits);
+      meta.put(HConstants.CATALOG_FAMILY, edits, lid);
     } finally {
       meta.releaseRowLock(lid);
     }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (revision 1381982)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (working copy)
@@ -188,10 +188,10 @@
     assertTimeVaryingMetricCount(1, TABLE_NAME, cf, regionName, "append_");
 
     // One delete where the cf is known
-    assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "delete_");
+    assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "multidelete_");
 
     // two deletes in the region.
-    assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "delete_");
+    assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "multidelete_");
 
     // Three gets. one for gets. One for append. One for increment.
     assertTimeVaryingMetricCount(3, TABLE_NAME, cf, regionName, "get_");
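
Note for reviewers: the patch funnels every single-row Put/Delete through batchMutate() via the new doBatchMutate() helper, which wraps the mutation as a one-element batch and converts a SANITY_CHECK_FAILURE status into a DoNotRetryIOException. The standalone Java sketch below (not part of the patch) illustrates that pattern under simplified assumptions; Mutation, OperationStatus, and batchMutate here are local stand-ins, not the real HBase classes.

// Simplified, self-contained sketch of the doBatchMutate() pattern: a single mutation is
// wrapped as a one-element batch, the batch path performs the sanity checks, and a failed
// sanity check is surfaced to the caller as an exception. All names are illustrative stand-ins.
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class BatchFunnelSketch {

  enum StatusCode { SUCCESS, SANITY_CHECK_FAILURE }

  static final class OperationStatus {
    final StatusCode code;
    final String exceptionMsg;
    OperationStatus(StatusCode code, String exceptionMsg) {
      this.code = code;
      this.exceptionMsg = exceptionMsg;
    }
  }

  /** Stand-in for a Put/Delete: just a row key and a validity flag for the demo. */
  static final class Mutation {
    final String row;
    final boolean valid;
    Mutation(String row, boolean valid) { this.row = row; this.valid = valid; }
  }

  /** Stand-in for the region's batchMutate(): checks each mutation, reports a status per slot. */
  static OperationStatus[] batchMutate(Map.Entry<Mutation, Integer>[] mutationsAndLocks) {
    OperationStatus[] statuses = new OperationStatus[mutationsAndLocks.length];
    for (int i = 0; i < mutationsAndLocks.length; i++) {
      Mutation m = mutationsAndLocks[i].getKey();
      statuses[i] = m.valid
          ? new OperationStatus(StatusCode.SUCCESS, null)
          : new OperationStatus(StatusCode.SANITY_CHECK_FAILURE, "bad mutation for row " + m.row);
    }
    return statuses;
  }

  /** Mirrors the shape of the new helper: one mutation in, batch path underneath, exception out. */
  static void doBatchMutate(Mutation mutation, Integer lockId) throws IOException {
    @SuppressWarnings("unchecked")
    Map.Entry<Mutation, Integer>[] mutationsAndLocks = new Map.Entry[] {
        new SimpleEntry<Mutation, Integer>(mutation, lockId) };
    OperationStatus[] result = batchMutate(mutationsAndLocks);
    if (result[0].code == StatusCode.SANITY_CHECK_FAILURE) {
      // In the patch this is DoNotRetryIOException; plain IOException keeps the sketch dependency-free.
      throw new IOException(result[0].exceptionMsg);
    }
  }

  public static void main(String[] args) throws IOException {
    doBatchMutate(new Mutation("row1", true), null);    // succeeds via the batch path
    try {
      doBatchMutate(new Mutation("row2", false), null); // sanity check fails -> exception
    } catch (IOException expected) {
      System.out.println("rejected as expected: " + expected.getMessage());
    }
  }
}

Routing single-row operations through the batch path keeps WAL ordering, memstore accounting, coprocessor hooks, and sanity checks in one code path instead of the near-duplicate internalPut/internalDelete implementations removed above.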