Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1381122) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1764,7 +1764,12 @@ try { // All edits for the given row (across all column families) must happen atomically. prepareDelete(delete); - internalDelete(delete, delete.getClusterId(), writeToWAL); + Pair putsAndLocks[] = new Pair[1]; + putsAndLocks[0] = new Pair(delete, lid); + OperationStatus[] batchMutate = this.batchMutate(putsAndLocks); + if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { + throw new DoNotRetryIOException(batchMutate[0].getExceptionMsg()); + } } finally { if(lockid == null) releaseRowLock(lid); } @@ -1781,11 +1786,16 @@ */ void delete(Map> familyMap, UUID clusterId, boolean writeToWAL) throws IOException { - Delete delete = new Delete(); + Delete delete = new Delete(new byte[0]); delete.setFamilyMap(familyMap); delete.setClusterId(clusterId); delete.setWriteToWAL(writeToWAL); - internalDelete(delete, clusterId, writeToWAL); + Pair putsAndLocks[] = new Pair[1]; + putsAndLocks[0] = new Pair(delete, null); + OperationStatus[] batchMutate = this.batchMutate(putsAndLocks); + if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { + throw new DoNotRetryIOException(batchMutate[0].getExceptionMsg()); + } } /** @@ -1843,65 +1853,6 @@ } /** - * @param delete The Delete command - * @param clusterId UUID of the originating cluster (for replication). - * @param writeToWAL - * @throws IOException - */ - private void internalDelete(Delete delete, UUID clusterId, - boolean writeToWAL) throws IOException { - Map> familyMap = delete.getFamilyMap(); - WALEdit walEdit = new WALEdit(); - /* Run coprocessor pre hook outside of locks to avoid deadlock */ - if (coprocessorHost != null) { - if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) { - return; - } - } - - long now = EnvironmentEdgeManager.currentTimeMillis(); - byte [] byteNow = Bytes.toBytes(now); - boolean flush = false; - - updatesLock.readLock().lock(); - try { - prepareDeleteTimestamps(delete.getFamilyMap(), byteNow); - - if (writeToWAL) { - // write/sync to WAL should happen before we touch memstore. - // - // If order is reversed, i.e. we write to memstore first, and - // for some reason fail to write/sync to commit log, the memstore - // will contain uncommitted transactions. - // - // bunch up all edits across all column families into a - // single WALEdit. - addFamilyMapToWALEdit(familyMap, walEdit); - this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdit, clusterId, now, this.htableDescriptor); - } - - // Now make changes to the memstore. - long addedSize = applyFamilyMapToMemstore(familyMap, null); - flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); - - } finally { - this.updatesLock.readLock().unlock(); - } - // do after lock - if (coprocessorHost != null) { - coprocessorHost.postDelete(delete, walEdit, writeToWAL); - } - final long after = EnvironmentEdgeManager.currentTimeMillis(); - this.opMetrics.updateDeleteMetrics(familyMap.keySet(), after-now); - - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } - } - - /** * @param put * @throws IOException */ @@ -1958,7 +1909,13 @@ try { // All edits for the given row (across all column families) must happen atomically. - internalPut(put, put.getClusterId(), writeToWAL); + Pair putsAndLocks[] = new Pair[1]; + putsAndLocks[0] = new Pair(put, lid); + OperationStatus[] batchMutate = this.batchMutate(putsAndLocks); + if (batchMutate[0].getOperationStatusCode().equals( + OperationStatusCode.SANITY_CHECK_FAILURE)) { + throw new DoNotRetryIOException(batchMutate[0].getExceptionMsg()); + } } finally { if(lockid == null) releaseRowLock(lid); } @@ -2467,13 +2424,17 @@ // Using default cluster id, as this can only happen in the // originating cluster. A slave cluster receives the result as a Put // or Delete + Pair[] batchOp = new Pair[1]; if (isPut) { - internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL); + batchOp[0] = new Pair(((Put) w), lid); } else { - Delete d = (Delete)w; - prepareDelete(d); - internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL); + batchOp[0] = new Pair(((Delete) w), lid); } + OperationStatus[] batchMutate = this.batchMutate(batchOp); + if (batchMutate[0].getOperationStatusCode().equals( + OperationStatusCode.SANITY_CHECK_FAILURE)) { + throw new DoNotRetryIOException(batchMutate[0].getExceptionMsg()); + } return true; } return false; @@ -2557,7 +2518,7 @@ * @praram now * @throws IOException */ - private void put(byte [] family, List edits) + private void put(byte [] family, List edits, Integer lid) throws IOException { Map> familyMap; familyMap = new HashMap>(); @@ -2567,70 +2528,14 @@ p.setFamilyMap(familyMap); p.setClusterId(HConstants.DEFAULT_CLUSTER_ID); p.setWriteToWAL(true); - this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true); - } - - /** - * Add updates first to the hlog (if writeToWal) and then add values to memstore. - * Warning: Assumption is caller has lock on passed in row. - * @param put The Put command - * @param clusterId UUID of the originating cluster (for replication). - * @param writeToWAL if true, then we should write to the log - * @throws IOException - */ - private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException { - Map> familyMap = put.getFamilyMap(); - WALEdit walEdit = new WALEdit(); - /* run pre put hook outside of lock to avoid deadlock */ - if (coprocessorHost != null) { - if (coprocessorHost.prePut(put, walEdit, writeToWAL)) { - return; - } + Pair putsAndLocks[] = new Pair[1]; + putsAndLocks[0] = new Pair(p, lid); + OperationStatus[] batchMutate = this.batchMutate(putsAndLocks); + if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { + throw new DoNotRetryIOException(batchMutate[0].getExceptionMsg()); } - - long now = EnvironmentEdgeManager.currentTimeMillis(); - byte[] byteNow = Bytes.toBytes(now); - boolean flush = false; - - this.updatesLock.readLock().lock(); - try { - checkFamilies(familyMap.keySet()); - checkTimestamps(familyMap, now); - updateKVTimestamps(familyMap.values(), byteNow); - // write/sync to WAL should happen before we touch memstore. - // - // If order is reversed, i.e. we write to memstore first, and - // for some reason fail to write/sync to commit log, the memstore - // will contain uncommitted transactions. - if (writeToWAL) { - addFamilyMapToWALEdit(familyMap, walEdit); - this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdit, clusterId, now, this.htableDescriptor); - } else { - recordPutWithoutWal(familyMap); - } - - long addedSize = applyFamilyMapToMemstore(familyMap, null); - flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); - } finally { - this.updatesLock.readLock().unlock(); - } - - if (coprocessorHost != null) { - coprocessorHost.postPut(put, walEdit, writeToWAL); - } - - // do after lock - final long after = EnvironmentEdgeManager.currentTimeMillis(); - this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now); - - - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } } - + /** * Atomically apply the given map of family->edits to the memstore. * This handles the consistency control on its own, but the caller @@ -3982,7 +3887,7 @@ edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY, HConstants.META_VERSION_QUALIFIER, now, Bytes.toBytes(HConstants.META_VERSION))); - meta.put(HConstants.CATALOG_FAMILY, edits); + meta.put(HConstants.CATALOG_FAMILY, edits, lid); } finally { meta.releaseRowLock(lid); }