Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1387989) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1783,8 +1783,7 @@ try { // All edits for the given row (across all column families) must happen atomically. - prepareDelete(delete); - internalDelete(delete, delete.getClusterId(), writeToWAL); + doBatchMutate(delete, lid); } finally { if(lockid == null) releaseRowLock(lid); } @@ -1801,11 +1800,11 @@ */ void delete(Map> familyMap, UUID clusterId, boolean writeToWAL) throws IOException { - Delete delete = new Delete(); + Delete delete = new Delete(HConstants.EMPTY_BYTE_ARRAY); delete.setFamilyMap(familyMap); delete.setClusterId(clusterId); delete.setWriteToWAL(writeToWAL); - internalDelete(delete, clusterId, writeToWAL); + doBatchMutate(delete, null); } /** @@ -1863,65 +1862,6 @@ } /** - * @param delete The Delete command - * @param clusterId UUID of the originating cluster (for replication). - * @param writeToWAL - * @throws IOException - */ - private void internalDelete(Delete delete, UUID clusterId, - boolean writeToWAL) throws IOException { - Map> familyMap = delete.getFamilyMap(); - WALEdit walEdit = new WALEdit(); - /* Run coprocessor pre hook outside of locks to avoid deadlock */ - if (coprocessorHost != null) { - if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) { - return; - } - } - - long now = EnvironmentEdgeManager.currentTimeMillis(); - byte [] byteNow = Bytes.toBytes(now); - boolean flush = false; - - updatesLock.readLock().lock(); - try { - prepareDeleteTimestamps(delete.getFamilyMap(), byteNow); - - if (writeToWAL) { - // write/sync to WAL should happen before we touch memstore. - // - // If order is reversed, i.e. we write to memstore first, and - // for some reason fail to write/sync to commit log, the memstore - // will contain uncommitted transactions. - // - // bunch up all edits across all column families into a - // single WALEdit. - addFamilyMapToWALEdit(familyMap, walEdit); - this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdit, clusterId, now, this.htableDescriptor); - } - - // Now make changes to the memstore. - long addedSize = applyFamilyMapToMemstore(familyMap, null); - flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); - - } finally { - this.updatesLock.readLock().unlock(); - } - // do after lock - if (coprocessorHost != null) { - coprocessorHost.postDelete(delete, walEdit, writeToWAL); - } - final long after = EnvironmentEdgeManager.currentTimeMillis(); - this.opMetrics.updateDeleteMetrics(familyMap.keySet(), after-now); - - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } - } - - /** * @param put * @throws IOException */ @@ -1978,7 +1918,7 @@ try { // All edits for the given row (across all column families) must happen atomically. - internalPut(put, put.getClusterId(), writeToWAL); + doBatchMutate(put, lid); } finally { if(lockid == null) releaseRowLock(lid); } @@ -2444,7 +2384,6 @@ } startRegionOperation(); - this.writeRequestsCount.increment(); try { RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock(); Get get = new Get(row, lock); @@ -2496,17 +2435,7 @@ if (matches) { // All edits for the given row (across all column families) must // happen atomically. - // - // Using default cluster id, as this can only happen in the - // originating cluster. A slave cluster receives the result as a Put - // or Delete - if (isPut) { - internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL); - } else { - Delete d = (Delete)w; - prepareDelete(d); - internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL); - } + doBatchMutate((Mutation)w, lid); this.checkAndMutateChecksPassed.increment(); return true; } @@ -2520,6 +2449,18 @@ } } + @SuppressWarnings("unchecked") + private void doBatchMutate(Mutation mutation, Integer lid) throws IOException, + DoNotRetryIOException { + Pair[] mutateWithLocks = new Pair[] { new Pair(mutation, + lid) }; + OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks); + if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { + throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); + } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { + throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg()); + } + } /** * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} @@ -2592,7 +2533,7 @@ * @praram now * @throws IOException */ - private void put(byte [] family, List edits) + private void put(byte [] family, List edits, Integer lid) throws IOException { Map> familyMap; familyMap = new HashMap>(); @@ -2602,71 +2543,10 @@ p.setFamilyMap(familyMap); p.setClusterId(HConstants.DEFAULT_CLUSTER_ID); p.setWriteToWAL(true); - this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true); + doBatchMutate(p, lid); } - + /** - * Add updates first to the hlog (if writeToWal) and then add values to memstore. - * Warning: Assumption is caller has lock on passed in row. - * @param put The Put command - * @param clusterId UUID of the originating cluster (for replication). - * @param writeToWAL if true, then we should write to the log - * @throws IOException - */ - private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException { - Map> familyMap = put.getFamilyMap(); - WALEdit walEdit = new WALEdit(); - /* run pre put hook outside of lock to avoid deadlock */ - if (coprocessorHost != null) { - if (coprocessorHost.prePut(put, walEdit, writeToWAL)) { - return; - } - } - - long now = EnvironmentEdgeManager.currentTimeMillis(); - byte[] byteNow = Bytes.toBytes(now); - boolean flush = false; - - this.updatesLock.readLock().lock(); - try { - checkFamilies(familyMap.keySet()); - checkTimestamps(familyMap, now); - updateKVTimestamps(familyMap.values(), byteNow); - // write/sync to WAL should happen before we touch memstore. - // - // If order is reversed, i.e. we write to memstore first, and - // for some reason fail to write/sync to commit log, the memstore - // will contain uncommitted transactions. - if (writeToWAL) { - addFamilyMapToWALEdit(familyMap, walEdit); - this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdit, clusterId, now, this.htableDescriptor); - } else { - recordPutWithoutWal(familyMap); - } - - long addedSize = applyFamilyMapToMemstore(familyMap, null); - flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); - } finally { - this.updatesLock.readLock().unlock(); - } - - if (coprocessorHost != null) { - coprocessorHost.postPut(put, walEdit, writeToWAL); - } - - // do after lock - final long after = EnvironmentEdgeManager.currentTimeMillis(); - this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now); - - - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } - } - - /** * Atomically apply the given map of family->edits to the memstore. * This handles the consistency control on its own, but the caller * should already have locked updatesLock.readLock(). This also does @@ -4019,7 +3899,7 @@ edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY, HConstants.META_VERSION_QUALIFIER, now, Bytes.toBytes(HConstants.META_VERSION))); - meta.put(HConstants.CATALOG_FAMILY, edits); + meta.put(HConstants.CATALOG_FAMILY, edits, lid); } finally { meta.releaseRowLock(lid); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (revision 1387989) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (working copy) @@ -189,10 +189,10 @@ assertTimeVaryingMetricCount(1, TABLE_NAME, cf, regionName, "append_"); // One delete where the cf is known - assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "delete_"); + assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "multidelete_"); // two deletes in the region. - assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "delete_"); + assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "multidelete_"); // Three gets. one for gets. One for append. One for increment. assertTimeVaryingMetricCount(3, TABLE_NAME, cf, regionName, "get_");