diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala index 678e769..5f9415a 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala @@ -34,6 +34,8 @@ private[spark] object HBaseConnectionCache extends Logging { // A hashmap of Spark-HBase connections. Key is HBaseConnectionKey. val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]() + val cacheStat = HBaseConnectionCacheStat(0, 0, 0) + // in milliseconds private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay private var timeout = DEFAULT_TIME_OUT @@ -58,6 +60,13 @@ private[spark] object HBaseConnectionCache extends Logging { housekeepingThread.setDaemon(true) housekeepingThread.start() + def getStat: HBaseConnectionCacheStat = { + connectionMap.synchronized { + cacheStat.numActiveConnections = connectionMap.size + cacheStat.copy() + } + } + def close(): Unit = { try { connectionMap.synchronized { @@ -100,7 +109,9 @@ private[spark] object HBaseConnectionCache extends Logging { connectionMap.synchronized { if (closed) return null - val sc = connectionMap.getOrElseUpdate(key, new SmartConnection(conn)) + cacheStat.numTotalRequests += 1 + val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1 + new SmartConnection(conn)}) sc.refCount += 1 sc } @@ -136,13 +147,13 @@ private[hbase] case class SmartConnection ( } /** - * Denotes a unique key to an HBase Connection instance. - * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'. - * - * In essence, this class captures the properties in Configuration - * that may be used in the process of establishing a connection. - * - */ + * Denotes a unique key to an HBase Connection instance. + * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'. + * + * In essence, this class captures the properties in Configuration + * that may be used in the process of establishing a connection. + * + */ class HBaseConnectionKey(c: Configuration) extends Logging { val conf: Configuration = c val CONNECTION_PROPERTIES: Array[String] = Array[String]( @@ -240,4 +251,15 @@ class HBaseConnectionKey(c: Configuration) extends Logging { } } +/** + * To log the state of [[HBaseConnectionCache]] + * + * @param numTotalRequests number of total connection requests to the cache + * @param numActualConnectionsCreated number of actual HBase connections the cache ever created + * @param numActiveConnections number of current alive HBase connections the cache is holding + */ +case class HBaseConnectionCacheStat(var numTotalRequests: Long, + var numActualConnectionsCreated: Long, + var numActiveConnections: Long) + diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala index c9edcc4..6ebf044 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala @@ -76,11 +76,18 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { testWithPressureWithClose() } - def testBasic() { - HBaseConnectionCache.setTimeout(1 * 1000) + def cleanEnv() { HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.clear() + HBaseConnectionCache.cacheStat.numActiveConnections = 0 + HBaseConnectionCache.cacheStat.numActualConnectionsCreated = 0 + HBaseConnectionCache.cacheStat.numTotalRequests = 0 } + } + + def testBasic() { + cleanEnv() + HBaseConnectionCache.setTimeout(1 * 1000) val connKeyMocker1 = new HBaseConnectionKeyMocker(1) val connKeyMocker1a = new HBaseConnectionKeyMocker(1) @@ -88,11 +95,20 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { val c1 = HBaseConnectionCache .getConnection(connKeyMocker1, new ConnectionMocker) + + assert(HBaseConnectionCache.connectionMap.size === 1) + assert(HBaseConnectionCache.getStat.numTotalRequests === 1) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1) + assert(HBaseConnectionCache.getStat.numActiveConnections === 1) + val c1a = HBaseConnectionCache .getConnection(connKeyMocker1a, new ConnectionMocker) HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 1) + assert(HBaseConnectionCache.getStat.numTotalRequests === 2) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1) + assert(HBaseConnectionCache.getStat.numActiveConnections === 1) } val c2 = HBaseConnectionCache @@ -100,16 +116,21 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 2) + assert(HBaseConnectionCache.getStat.numTotalRequests === 3) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 2) } c1.close() HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 2) } c1a.close() HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 2) } Thread.sleep(3 * 1000) // Leave housekeeping thread enough time @@ -117,12 +138,15 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { assert(HBaseConnectionCache.connectionMap.size === 1) assert(HBaseConnectionCache.connectionMap.iterator.next()._1 .asInstanceOf[HBaseConnectionKeyMocker].confId === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 1) } c2.close() } def testWithPressureWithoutClose() { + cleanEnv() + class TestThread extends Runnable { override def run() { for (i <- 0 to 999) { @@ -147,6 +171,10 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { Thread.sleep(1000) HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 10) + assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 10) + var totalRc : Int = 0 HBaseConnectionCache.connectionMap.foreach { x => totalRc += x._2.refCount @@ -161,9 +189,13 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { } Thread.sleep(1000) assert(HBaseConnectionCache.connectionMap.size === 0) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 0) } def testWithPressureWithClose() { + cleanEnv() + class TestThread extends Runnable { override def run() { for (i <- 0 to 999) { @@ -189,11 +221,16 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 10) + assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 10) } Thread.sleep(6 * 1000) HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 0) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 0) } } }