diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c139296..33be8e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -199,6 +199,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = "hbase.hregion.scan.loadColumnFamiliesOnDemand"; + + // in milliseconds + private static final String MAX_WAIT_FOR_SEQ_ID_KEY = + "hbase.hregion.max.wait.for.seq.id"; + + private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 60000; /** * This is the global default value for durability. All tables/mutations not @@ -330,6 +336,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ private boolean isLoadingCfsOnDemandDefault = false; + private int maxWaitForSeqId; private final AtomicInteger majorInProgress = new AtomicInteger(0); private final AtomicInteger minorInProgress = new AtomicInteger(0); @@ -662,6 +669,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); + maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID); this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true); this.htableDescriptor = htd; this.rsServices = rsServices; @@ -2414,7 +2422,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @VisibleForTesting protected long getNextSequenceId(final WAL wal) throws IOException { WALKey key = this.appendEmptyEdit(wal, null); - return key.getSequenceId(); + return key.getSequenceId(maxWaitForSeqId); } ////////////////////////////////////////////////////////////////////////////// @@ 
-7232,7 +7240,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index e8056e4..69c2aec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -30,6 +30,7 @@ import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.util.ByteStringer; @@ -301,8 +302,24 @@ public class WALKey implements SequenceId, Comparable { */ @Override public long getSequenceId() throws IOException { + return getSequenceId(-1); + } + + /** + * Wait for the sequence number to be assigned and return the assigned value + * @param maxWaitForSeqId max milliseconds to wait for the seq number; negative waits indefinitely + * @return long the new assigned sequence number + * @throws IOException if the seq number is not assigned within maxWaitForSeqId milliseconds + */ + public long getSequenceId(int maxWaitForSeqId) throws IOException { try { - this.seqNumAssignedLatch.await(); + if (maxWaitForSeqId < 0) { + this.seqNumAssignedLatch.await(); + } else { + if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) { + throw new IOException("Timed out waiting for seq number to be assigned"); + } + } } catch (InterruptedException ie) { LOG.warn("Thread interrupted waiting for next log sequence number"); InterruptedIOException iie = new InterruptedIOException();