diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java
index cbf63fc..d05969e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java
@@ -23,6 +23,8 @@ import com.google.protobuf.Message;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
+
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
@@ -102,7 +104,7 @@ public class ServerRpcController implements RpcController {
    */
  public void setFailedOn(IOException ioe) {
    serviceException = ioe;
-    setFailed(StringUtils.stringifyException(ioe));
+    setFailed(Strings.stringifyException(ioe));
  }

  /**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index ca6b67e..e5074d7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -23,12 +23,12 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
@@ -40,11 +40,11 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
@@ -52,8 +52,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.security.access.UserPermission;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Strings;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
@@ -165,7 +165,7 @@ public final class ResponseConverter {
     NameBytesPair.Builder parameterBuilder = NameBytesPair.newBuilder();
     parameterBuilder.setName(t.getClass().getName());
     parameterBuilder.setValue(
-      ByteString.copyFromUtf8(StringUtils.stringifyException(t)));
+      ByteString.copyFromUtf8(Strings.stringifyException(t)));
     return parameterBuilder.build();
   }
 
@@ -324,7 +324,7 @@ public final class ResponseConverter {
     if (controller instanceof ServerRpcController) {
       ((ServerRpcController)controller).setFailedOn(ioe);
     } else {
-      controller.setFailed(StringUtils.stringifyException(ioe));
+      controller.setFailed(Strings.stringifyException(ioe));
     }
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java
index 2319b7a..bde5521 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -113,7 +114,7 @@ public class QuotaRetriever implements Closeable, Iterable<QuotaSettings> {
       try {
         cache = QuotaRetriever.this.next();
       } catch (IOException e) {
-        LOG.warn(StringUtils.stringifyException(e));
+        LOG.warn(Strings.stringifyException(e));
       }
     }
 
@@ -128,7 +129,7 @@ public class QuotaRetriever implements Closeable, Iterable<QuotaSettings> {
       try {
         cache = QuotaRetriever.this.next();
       } catch (IOException e) {
-        LOG.warn(StringUtils.stringifyException(e));
+        LOG.warn(Strings.stringifyException(e));
       }
       return result;
     }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
index b4b4a13..cff84df 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
@@ -18,6 +18,9 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
@@ -111,4 +114,21 @@ public class Strings {
     }
     return sb.toString();
   }
-}
+
+  /**
+   * Make a string representation of the exception. Copied from Hadoop StringUtils. The only
+   * difference is that we presize the StringWriter here; the StringWriter default is 16
+   * characters, so a decent-sized exception means many resizings, and that shows up when writing.
+   * For example, stringifying this exception makes for a String of 156 bytes:
+   * new NotServingRegionException("onerasdfsf,sadsfsdfdsa,adsfsdfsdasdsdf,asdsadasdaadads").
+   * @param e The exception to stringify
+   * @return A string with exception name and call stack.
+   */
+  public static String stringifyException(final Throwable e) {
+    StringWriter stm = new StringWriter(180);
+    PrintWriter wrt = new PrintWriter(stm);
+    e.printStackTrace(wrt);
+    wrt.close();
+    return stm.toString();
+  }
+}
\ No newline at end of file
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 85cadd7..51b3005 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Thread Utility
@@ -45,7 +44,7 @@ public class Threads {
     @Override
     public void uncaughtException(Thread t, Throwable e) {
       LOG.warn("Thread:" + t + " exited with Exception:"
-          + StringUtils.stringifyException(e));
+          + Strings.stringifyException(e));
     }
   };
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index b89a5d2..32229e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.htrace.Trace;
@@ -110,7 +111,7 @@ public class CallRunner {
     } catch (Throwable e) {
       RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
       errorThrowable = e;
-      error = StringUtils.stringifyException(e);
+      error = Strings.stringifyException(e);
     } finally {
       if (traceScope != null) {
         traceScope.close();
@@ -147,7 +148,7 @@ public class CallRunner {
         cce.getMessage());
     } catch (Exception e) {
       RpcServer.LOG.warn(Thread.currentThread().getName()
-          + ": caught: " + StringUtils.stringifyException(e));
+          + ": caught: " + Strings.stringifyException(e));
     } finally {
       // regardless if successful or not we need to reset the callQueueSize
       this.rpcServer.addCallSize(call.getSize() * -1);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index e3654cb..6f36911 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -452,7 +453,7 @@ public class RpcServer implements RpcServerInterface {
 
     @Override
     public synchronized void endDelayThrowing(Throwable t) throws IOException {
-      this.setResponse(null, null, t, StringUtils.stringifyException(t));
+      this.setResponse(null, null, t, Strings.stringifyException(t));
       this.delayResponse = false;
       this.sendResponseIfReady();
     }
@@ -947,7 +948,7 @@ public class RpcServer implements RpcServerInterface {
           }
         } catch (Exception e) {
           LOG.warn(getName() + ": exception in Responder " +
-              StringUtils.stringifyException(e), e);
+              Strings.stringifyException(e), e);
         }
       }
       LOG.info(getName() + ": stopped");
@@ -2036,6 +2037,7 @@ public class RpcServer implements RpcServerInterface {
     if (tooSlow || tooLarge) {
       // when tagging, we let TooLarge trump TooSmall to keep output simple
       // note that large responses will often also be slow.
+      // This logging is expensive. Does jackson serialization.
       logResponse(new Object[]{param},
           md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
           (tooLarge ? "TooLarge" : "TooSlow"),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2fd03de..a366173 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -144,6 +144,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.StringUtils;
@@ -1846,7 +1847,7 @@ public class HRegion implements HeapSize { // , Writable{
           desc, sequenceId, false);
       } catch (Throwable t) {
         LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
-            StringUtils.stringifyException(t));
+            Strings.stringifyException(t));
         // ignore this since we will be aborting the RS with DSE.
       }
     }
@@ -1868,7 +1869,7 @@ public class HRegion implements HeapSize { // , Writable{
         wal.sync(); // ensure that flush marker is sync'ed
       } catch (IOException ioe) {
         LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
-            + StringUtils.stringifyException(ioe));
+            + Strings.stringifyException(ioe));
       }
     }
 
@@ -1944,7 +1945,7 @@ public class HRegion implements HeapSize { // , Writable{
           desc, sequenceId, false);
       } catch (Throwable ex) {
         LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
-            StringUtils.stringifyException(ex));
+            Strings.stringifyException(ex));
         // ignore this since we will be aborting the RS with DSE.
       }
       wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
@@ -1952,7 +1953,7 @@ public class HRegion implements HeapSize { // , Writable{
       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
           Bytes.toStringBinary(getRegionName()));
       dse.initCause(t);
-      status.abort("Flush failed: " + StringUtils.stringifyException(t));
+      status.abort("Flush failed: " + Strings.stringifyException(t));
       throw dse;
     }
 
@@ -3580,7 +3581,7 @@ public class HRegion implements HeapSize { // , Writable{
         LOG.warn(msg, ioe);
         status.setStatus(msg);
       } else {
-        status.abort(StringUtils.stringifyException(ioe));
+        status.abort(Strings.stringifyException(ioe));
         // other IO errors may be transient (bad network connection,
         // checksum exception on one datanode, etc). throw & retry
         throw ioe;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 1bc478f..617cf05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
 import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
@@ -1538,6 +1539,7 @@ public class HRegionServer extends HasThread implements
       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
+    // Why is this executor running up here in the regionserver? Should it be local to the region?
     if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
       this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
@@ -1857,7 +1859,7 @@ public class HRegionServer extends HasThread implements
     // Do our best to report our abort to the master, but this may not work
     try {
       if (cause != null) {
-        msg += "\nCause:\n" + StringUtils.stringifyException(cause);
+        msg += "\nCause:\n" + Strings.stringifyException(cause);
       }
       // Report to the master but only if we have already registered with the master.
       if (rssStub != null && this.serverName != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 4813e10..c2b3fd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -197,6 +197,11 @@ public class HStore implements Store {
   private volatile long majorCompactedCellsSize = 0;
 
   /**
+   * Cache this setting rather than do a lookup each time.
+   */
+  private final long maxRowSize;
+
+  /**
    * Constructor
    * @param region
    * @param family HColumnDescriptor for this column
@@ -335,6 +340,18 @@ public class HStore implements Store {
       cryptoContext.setCipher(cipher);
       cryptoContext.setKey(key);
     }
+
+    // Get this configuration once and feed it to created scanners. Getting it every time we create
+    // a store scanner is too expensive.
+    this.maxRowSize = conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY,
+        HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
+  }
+
+  /**
+   * @return The configured maximum row size to return when scanning
+   */
+  public long getMaxRowSize() {
+    return this.maxRowSize;
   }
 
   /**
@@ -1887,9 +1904,9 @@ public class HStore implements Store {
         scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
       }
       if (scanner == null) {
-        scanner = scan.isReversed() ? new ReversedStoreScanner(this,
-            getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
-            getScanInfo(), scan, targetCols, readPt);
+        scanner = scan.isReversed()?
+            new ReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt):
+            new StoreScanner(this, getScanInfo(), scan, targetCols, readPt);
       }
       return scanner;
     } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 60b2411..d75334e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -61,6 +61,11 @@ public interface Store extends HeapSize, StoreConfigInformation {
   Collection<StoreFile> getStorefiles();
 
   /**
+   * @return Cached max row size to return when scanning
+   */
+  long getMaxRowSize();
+
+  /**
    * Close all the readers We don't need to worry about subsequent requests because the HRegion
    * holds a write lock that will prevent any more reads or writes.
    * @return the {@link StoreFile StoreFiles} that were previously being used.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 0a4e1ed..a6c5fcf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
  * into List<Cell> for a single row.
@@ -77,7 +79,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final NavigableSet<byte[]> columns;
   protected final long oldestUnexpiredTS;
   protected final int minVersions;
-  protected final long maxRowSize;
 
   /**
    * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
@@ -104,6 +105,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   private final long readPt;
 
+  /**
+   * Cache of the max row size.
+   */
+  private final long maxRowSize;
+
   // used by the injection framework to test race between StoreScanner construction and compaction
   enum StoreScannerCompactionRace {
     BEFORE_SEEK,
@@ -124,14 +130,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     this.columns = columns;
     oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
     this.minVersions = minVersions;
-
-    if (store != null && ((HStore)store).getHRegion() != null
-        && ((HStore)store).getHRegion().getBaseConf() != null) {
-      this.maxRowSize = ((HStore) store).getHRegion().getBaseConf().getLong(
-        HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
-    } else {
-      this.maxRowSize = HConstants.TABLE_MAX_ROWSIZE_DEFAULT;
-    }
+    // Store can be null when running tests.
+    this.maxRowSize = store == null? HConstants.TABLE_MAX_ROWSIZE_DEFAULT: store.getMaxRowSize();
 
     // We look up row-column Bloom filters for multi-column queries as part of
     // the seek operation. However, we also look the row-column Bloom filter
@@ -142,6 +142,8 @@
     // The parallel-seeking is on :
     // 1) the config value is *true*
     // 2) store has more than one store file
+    // TODO: We are dependent on a regionserver being up and running for this feature to cut in.
+    // Fix it.
     if (store != null && ((HStore)store).getHRegion() != null
         && store.getStorefilesCount() > 1) {
       RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
@@ -203,8 +205,7 @@
    * @param store who we scan
    * @param scan the spec
    * @param scanners ancillary scanners
-   * @param smallestReadPoint the readPoint that we should use for tracking
-   *          versions
+   * @param smallestReadPoint the readPoint that we should use for tracking versions
    */
   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
       List<? extends KeyValueScanner> scanners, ScanType scanType,
@@ -225,7 +226,8 @@
    */
   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
-      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow)
+      throws IOException {
     this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
       earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
   }
@@ -254,6 +256,7 @@
   }
 
   /** Constructor for testing. */
+  @VisibleForTesting
   StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
       final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners)
       throws IOException {
@@ -264,6 +267,7 @@
   }
 
   // Constructor for testing.
+  @VisibleForTesting
   StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
       final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners,
       long earliestPutTs)
@@ -325,8 +329,8 @@
     if (!isParallelSeek) {
       long totalScannersSoughtBytes = 0;
       for (KeyValueScanner scanner : scanners) {
-        if (totalScannersSoughtBytes >= maxRowSize) {
-          throw new RowTooBigException("Max row size allowed: " + maxRowSize
+        if (totalScannersSoughtBytes >= this.maxRowSize) {
+          throw new RowTooBigException("Max row size allowed: " + this.store.getMaxRowSize()
             + ", but row is bigger than that");
         }
         scanner.seek(seekKey);
@@ -516,8 +520,8 @@
           outResult.add(cell);
           count++;
           totalBytesRead += CellUtil.estimatedSizeOf(cell);
-          if (totalBytesRead > maxRowSize) {
-            throw new RowTooBigException("Max row size allowed: " + maxRowSize
+          if (totalBytesRead > this.maxRowSize) {
+            throw new RowTooBigException("Max row size allowed: " + this.maxRowSize
               + ", but the row is bigger than that.");
           }
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
index 6f25129..d9fd1cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
@@ -40,30 +40,48 @@ import org.apache.hadoop.io.Writable;
  */
 @InterfaceAudience.Private
 public class TimeRangeTracker implements Writable {
-
-  long minimumTimestamp = -1;
+  // TODO: If lots of threads are writing to the one Store, we contend here; every cell add wants
+  // to call includeTimestamp(long) below.
+  long minimumTimestamp = Long.MAX_VALUE;
   long maximumTimestamp = -1;
 
   /**
    * Default constructor.
    * Initializes TimeRange to be null
    */
-  public TimeRangeTracker() {
-
-  }
+  public TimeRangeTracker() {}
 
   /**
    * Copy Constructor
    * @param trt source TimeRangeTracker
    */
   public TimeRangeTracker(final TimeRangeTracker trt) {
-    this.minimumTimestamp = trt.getMinimumTimestamp();
-    this.maximumTimestamp = trt.getMaximumTimestamp();
+    set(trt.getMinimumTimestamp(), trt.getMaximumTimestamp());
   }
 
   public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
-    this.minimumTimestamp = minimumTimestamp;
-    this.maximumTimestamp = maximumTimestamp;
+    set(minimumTimestamp, maximumTimestamp);
+  }
+
+  private void set(final long min, final long max) {
+    setMin(min);
+    setMax(max);
+  }
+
+  private synchronized boolean setMin(final long min) {
+    if (this.minimumTimestamp > min) {
+      this.minimumTimestamp = min;
+      return true;
+    }
+    return false;
+  }
+
+  private synchronized boolean setMax(final long max) {
+    if (this.maximumTimestamp < max) {
+      this.maximumTimestamp = max;
+      return true;
+    }
+    return false;
   }
 
   /**
@@ -98,18 +116,8 @@ public class TimeRangeTracker implements Writable {
    * If required, update the current TimestampRange to include timestamp
    * @param timestamp the timestamp value to include
    */
-  private synchronized void includeTimestamp(final long timestamp) {
-    if (maximumTimestamp == -1) {
-      minimumTimestamp = timestamp;
-      maximumTimestamp = timestamp;
-    }
-    else if (minimumTimestamp > timestamp) {
-      minimumTimestamp = timestamp;
-    }
-    else if (maximumTimestamp < timestamp) {
-      maximumTimestamp = timestamp;
-    }
-    return;
+  private void includeTimestamp(final long timestamp) {
+    if (!setMin(timestamp)) setMax(timestamp);
   }
 
   /**
@@ -118,8 +126,7 @@ public class TimeRangeTracker implements Writable {
    * @return True if there is overlap, false otherwise
    */
   public synchronized boolean includesTimeRange(final TimeRange tr) {
-    return (this.minimumTimestamp < tr.getMax() &&
-      this.maximumTimestamp >= tr.getMin());
+    return (this.minimumTimestamp < tr.getMax() && this.maximumTimestamp >= tr.getMin());
   }
 
   /**
@@ -150,4 +157,32 @@
   public synchronized String toString() {
     return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
   }
-}
+
+  /**
+   * Bit of code to test concurrent access on this class.
+   * @param args
+   * @throws InterruptedException
+   */
+  public static void main(String[] args) throws InterruptedException {
+    long start = System.currentTimeMillis();
+    final TimeRangeTracker trr = new TimeRangeTracker();
+    final int threadCount = 5;
+    final int calls = 1024 * 1024 * 16;
+    Thread [] threads = new Thread[threadCount];
+    for (int i = 0; i < threads.length; i++) {
+      Thread t = new Thread("" + i) {
+        @Override
+        public void run() {
+          for (int i = 0; i < calls; i++) trr.includeTimestamp(i);
+        }
+      };
+      t.start();
+      threads[i] = t;
+    }
+    for (int i = 0; i < threads.length; i++) {
+      threads[i].join();
+    }
+    System.out.println(trr.getMinimumTimestamp() + " " + trr.getMaximumTimestamp() + " " +
+      (System.currentTimeMillis() - start));
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2ab036d..b652ed2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.util.Threads;
 
 import com.google.common.collect.Lists;
@@ -711,7 +712,7 @@ public class ReplicationSource extends Thread
           break;
         } catch (Exception ex) {
           LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
-            org.apache.hadoop.util.StringUtils.stringifyException(ex));
+            Strings.stringifyException(ex));
           if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
             sleepMultiplier++;
           }