diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 0c783e144d..94902de03a 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4400,6 +4400,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "logger used for llap-daemons."), LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", true, "Whether LLapOutputFormatService should output arrow batches"), + LLAP_COLLECT_LOCK_METRICS("hive.llap.lockmetrics.collect", false, + "Whether lock metrics (wait times, counts) are collected for LLAP " + + "related locks"), HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms", new TimeValidator(TimeUnit.MILLISECONDS), diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/ReadWriteLockMetrics.java b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/ReadWriteLockMetrics.java new file mode 100644 index 0000000000..7d52a15c35 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/ReadWriteLockMetrics.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License a + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.metrics; + +import avro.shaded.com.google.common.annotations.VisibleForTesting; + +import com.google.common.base.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hive.conf.HiveConf; + +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * Wrapper around a read/write lock to collect the lock wait times. + * Instances of this wrapper class can be used to collect/accumulate the wai + * times around R/W locks. This is helpful if the source of a performance issue + * might be related to lock contention and you need to identify the actual + * locks. Instances of this class can be wrapped around any ReadWriteLock + * implementation. 
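The new flag is read through HiveConf like any other LLAP option. A minimal, hedged sketch (not part of the patch; the class name is illustrative) of enabling lock-metrics collection programmatically, equivalent to setting hive.llap.lockmetrics.collect=true in hive-site.xml:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

// Sketch only: build a Configuration with lock-metrics collection enabled.
public class EnableLockMetricsExample {
  public static Configuration withLockMetrics() {
    Configuration conf = new HiveConf();
    // Same effect as hive.llap.lockmetrics.collect=true in hive-site.xml.
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS, true);
    return conf;
  }
}
```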
+ */ +public class ReadWriteLockMetrics implements ReadWriteLock { + private LockWrapper readLock; ///< wrapper around original read lock + private LockWrapper writeLock; ///< wrapper around original write lock + + /** + * Helper class to compare two LockMetricSource instances. + * This Comparator class can be used to sort a list of + * LockMetricSource instances in descending order by their total lock + * wait time. + */ + public static class MetricsComparator implements Comparator, Serializable { + private static final long serialVersionUID = -1; + + @Override + public int compare(MetricsSource o1, MetricsSource o2) { + if (o1 != null && o2 != null + && o1 instanceof LockMetricSource && o2 instanceof LockMetricSource) { + LockMetricSource lms1 = (LockMetricSource)o1; + LockMetricSource lms2 = (LockMetricSource)o2; + + long totalMs1 = (lms1.readLockWaitTimeTotal.value() / 1000000L) + + (lms1.writeLockWaitTimeTotal.value() / 1000000L); + long totalMs2 = (lms2.readLockWaitTimeTotal.value() / 1000000L) + + (lms2.writeLockWaitTimeTotal.value() / 1000000L); + + // sort descending by total lock time + if (totalMs1 < totalMs2) { + return 1; + } + + if (totalMs1 > totalMs2) { + return -1; + } + + // sort by label (ascending) if lock time is the same + return lms1.lockLabel.compareTo(lms2.lockLabel); + } + + return 0; + } + } + + /** + * Wraps a ReadWriteLock into a monitored lock if required by + * configuration. This helper is checking the + * hive.llap.lockmetrics.collect configuration option and wraps the + * passed in ReadWriteLock into a monitoring container if the + * option is set to true. Otherwise, the original (passed in) + * lock instance is returned unmodified. + * + * @param conf Configuration instance to check for LLAP conf options + * @param lock The ReadWriteLock to wrap for monitoring + * @param metrics The target container for locking metrics + * @see #createLockMetricsSource + */ + public static ReadWriteLock wrap(Configuration conf, ReadWriteLock lock, + MetricsSource metrics) { + Preconditions.checkNotNull(lock, "Caller has to provide valid input lock"); + boolean needsWrap = false; + + if (null != conf) { + needsWrap = + HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS); + } + + if (false == needsWrap) { + return lock; + } + + Preconditions.checkNotNull(metrics, + "Caller has to procide group specific metrics source"); + return new ReadWriteLockMetrics(lock, metrics); + } + + /** + * Factory method for new metric collections. + * You can create and use a single MetricsSource collection for + * multiple R/W locks. This makes sense if several locks belong to a single + * group and you're then interested in the accumulated values for the whole + * group, rather than the single lock instance. The passed in label is + * supposed to identify the group uniquely. + * + * @param label The group identifier for lock statistics + */ + public static MetricsSource createLockMetricsSource(String label) { + Preconditions.checkNotNull(label); + Preconditions.checkArgument(!label.contains("\""), + "Label can't contain quote (\")"); + return new LockMetricSource(label); + } + + /** + * Returns a list with all created MetricsSource instances for + * the R/W lock metrics. The returned list contains the instances that were + * previously created via the createLockMetricsSource function. 
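A minimal usage sketch of the two static entry points above, wrap() and createLockMetricsSource(); the group label "ExampleGroup" and the guarded work are illustrative, and the same pattern appears in the test helper later in this patch:

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
import org.apache.hadoop.metrics2.MetricsSource;

// Sketch only: one shared MetricsSource per lock group, then wrap each lock.
public class LockWrapUsageExample {
  private static final MetricsSource GROUP_METRICS =
      ReadWriteLockMetrics.createLockMetricsSource("ExampleGroup");

  public static void main(String[] args) {
    Configuration conf = new HiveConf();
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS, true);

    // Returns the wrapped lock when collection is enabled, otherwise the original lock.
    ReadWriteLock lock =
        ReadWriteLockMetrics.wrap(conf, new ReentrantReadWriteLock(), GROUP_METRICS);

    lock.readLock().lock();
    try {
      // guarded read-side work
    } finally {
      lock.readLock().unlock();
    }
  }
}
```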
+ * + * @return A list of all R/W lock based metrics sources + */ + public static List getAllMetricsSources() { + ArrayList ret = null; + + synchronized (LockMetricSource.allInstances) { + ret = new ArrayList<>(LockMetricSource.allInstances); + } + + return ret; + } + + /// Enumeration of metric info names and descriptions + @VisibleForTesting + public enum LockMetricInfo implements MetricsInfo { + ReadLockWaitTimeTotal("The total wait time for read locks in nanoseconds"), + ReadLockWaitTimeMax("The maximum wait time for a read lock in nanoseconds"), + ReadLockCount("Total amount of read lock requests"), + WriteLockWaitTimeTotal( + "The total wait time for write locks in nanoseconds"), + WriteLockWaitTimeMax( + "The maximum wait time for a write lock in nanoseconds"), + WriteLockCount("Total amount of write lock requests"); + + private final String description; ///< metric description + + /** + * Creates a new MetricsInfo with the given description. + * + * @param desc The description of the info + */ + private LockMetricInfo(String desc) { + description = desc; + } + + @Override + public String description() { + return this.description; + } + } + + /** + * Source of the accumulated lock times and counts. + * Instances of this MetricSource can be created via the static + * factory method createLockMetricsSource and shared across + * multiple instances of the outer ReadWriteLockMetric class. + */ + @Metrics(about = "Lock Metrics", context = "locking") + private static class LockMetricSource implements MetricsSource { + private static final ArrayList allInstances = new ArrayList<>(); + + private final String lockLabel; ///< identifier for the group of locks + + /// accumulated wait time for read locks + @Metric + MutableCounterLong readLockWaitTimeTotal; + + /// highest wait time for read locks + @Metric + MutableCounterLong readLockWaitTimeMax; + + /// total number of read lock calls + @Metric + MutableCounterLong readLockCounts; + + /// accumulated wait time for write locks + @Metric + MutableCounterLong writeLockWaitTimeTotal; + + /// highest wait time for write locks + @Metric + MutableCounterLong writeLockWaitTimeMax; + + /// total number of write lock calls + @Metric + MutableCounterLong writeLockCounts; + + /** + * Creates a new metrics collection instance. + * Several locks can share a single MetricsSource instances + * where all of them increment the metrics counts together. This can be + * interesting to have a single instance for a group of related locks. The + * group should then be identified by the label. 
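For illustration, a short sketch that dumps every registered lock-metrics group, busiest first, using getAllMetricsSources() and the MetricsComparator defined above; it assumes the original generic signature (List of MetricsSource) and mirrors what the /locking servlet later in this patch does:

```java
import java.util.List;

import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
import org.apache.hadoop.metrics2.MetricsSource;

// Sketch only: print all lock groups, sorted descending by total wait time.
public class DumpLockStatsExample {
  public static void dump() {
    List<MetricsSource> sources = ReadWriteLockMetrics.getAllMetricsSources();
    sources.sort(new ReadWriteLockMetrics.MetricsComparator());
    for (MetricsSource source : sources) {
      System.out.println(source); // LockMetricSource.toString() emits a JSON fragment
    }
  }
}
```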
+ * + * @param label The identifier of the metrics collection + */ + private LockMetricSource(String label) { + lockLabel = label; + readLockWaitTimeTotal + = new MutableCounterLong(LockMetricInfo.ReadLockWaitTimeTotal, 0); + readLockWaitTimeMax + = new MutableCounterLong(LockMetricInfo.ReadLockWaitTimeMax, 0); + readLockCounts + = new MutableCounterLong(LockMetricInfo.ReadLockCount, 0); + writeLockWaitTimeTotal + = new MutableCounterLong(LockMetricInfo.WriteLockWaitTimeTotal, 0); + writeLockWaitTimeMax + = new MutableCounterLong(LockMetricInfo.WriteLockWaitTimeMax, 0); + writeLockCounts + = new MutableCounterLong(LockMetricInfo.WriteLockCount, 0); + + synchronized (allInstances) { + allInstances.add(this); + } + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + collector.addRecord(this.lockLabel) + .setContext("Locking") + .addCounter(LockMetricInfo.ReadLockWaitTimeTotal, + readLockWaitTimeTotal.value()) + .addCounter(LockMetricInfo.ReadLockWaitTimeMax, + readLockWaitTimeMax.value()) + .addCounter(LockMetricInfo.ReadLockCount, + readLockCounts.value()) + .addCounter(LockMetricInfo.WriteLockWaitTimeTotal, + writeLockWaitTimeTotal.value()) + .addCounter(LockMetricInfo.WriteLockWaitTimeMax, + writeLockWaitTimeMax.value()) + .addCounter(LockMetricInfo.WriteLockCount, + writeLockCounts.value()); + } + + @Override + public String toString() { + long avgRead = 0L; + long avgWrite = 0L; + long totalMillis = 0L; + + if (0 < readLockCounts.value()) { + avgRead = readLockWaitTimeTotal.value() / readLockCounts.value(); + } + + if (0 < writeLockCounts.value()) { + avgWrite = writeLockWaitTimeTotal.value() / writeLockCounts.value(); + } + + totalMillis = (readLockWaitTimeTotal.value() / 1000000L) + + (writeLockWaitTimeTotal.value() / 1000000L); + + StringBuffer sb = new StringBuffer(); + sb.append("{ \"type\" : \"R/W Lock Stats\", \"label\" : \""); + sb.append(lockLabel); + sb.append("\", \"totalLockWaitTimeMillis\" : "); + sb.append(totalMillis); + sb.append(", \"readLock\" : { \"count\" : "); + sb.append(readLockCounts.value()); + sb.append(", \"avgWaitTimeNanos\" : "); + sb.append(avgRead); + sb.append(", \"maxWaitTimeNanos\" : "); + sb.append(readLockWaitTimeMax.value()); + sb.append(" }, \"writeLock\" : { \"count\" : "); + sb.append(writeLockCounts.value()); + sb.append(", \"avgWaitTimeNanos\" : "); + sb.append(avgWrite); + sb.append(", \"maxWaitTimeNanos\" : "); + sb.append(writeLockWaitTimeMax.value()); + sb.append(" } }"); + + return sb.toString(); + } + } + + /** + * Inner helper class to wrap the original lock with a monitored one. + * This inner class is delegating all actual locking operations to the wrapped + * lock, while itself is only responsible to measure the time that it took to + * acquire a specific lock. + */ + private static class LockWrapper implements Lock { + /// the lock to delegate the work to + private final Lock wrappedLock; + /// total lock wait time in nanos + private final MutableCounterLong lockWaitTotal; + /// highest lock wait time (max) + private final MutableCounterLong lockWaitMax; + /// number of lock counts + private final MutableCounterLong lockWaitCount; + + /** + * Creates a new wrapper around an existing lock. 
+ * + * @param original The original lock to wrap by this monitoring lock + * @param total The (atomic) counter to increment for total lock wait time + * @param max The (atomic) counter to adjust to the maximum wait time + * @param cnt The (atomic) counter to increment with each lock call + */ + LockWrapper(Lock original, MutableCounterLong total, + MutableCounterLong max, MutableCounterLong cnt) { + wrappedLock = original; + this.lockWaitTotal = total; + this.lockWaitMax = max; + this.lockWaitCount = cnt; + } + + @Override + public void lock() { + long start = System.nanoTime(); + wrappedLock.lock(); + incrementBy(System.nanoTime() - start); + } + + @Override + public void lockInterruptibly() throws InterruptedException { + long start = System.nanoTime(); + wrappedLock.lockInterruptibly(); + incrementBy(System.nanoTime() - start); + } + + @Override + public boolean tryLock() { + return wrappedLock.tryLock(); + } + + @Override + public boolean tryLock(long time, TimeUnit unit) + throws InterruptedException { + long start = System.nanoTime(); + boolean ret = wrappedLock.tryLock(time, unit); + incrementBy(System.nanoTime() - start); + return ret; + } + + @Override + public void unlock() { + wrappedLock.unlock(); + } + + @Override + public Condition newCondition() { + return wrappedLock.newCondition(); + } + + /** + * Helper to increment the monitoring counters. + * Called from the lock implementations to increment the total/max/coun + * values of the monitoring counters. + * + * @param waitTime The actual wait time (in nanos) for the lock operation + */ + private void incrementBy(long waitTime) { + this.lockWaitTotal.incr(waitTime); + this.lockWaitCount.incr(); + + if (waitTime > this.lockWaitMax.value()) { + this.lockWaitMax.incr(waitTime - this.lockWaitMax.value()); + } + } + } + + /** + * Creates a new monitoring wrapper around a R/W lock. + * The so created wrapper instance can be used instead of the original R/W + * lock, which then automatically updates the monitoring values in the + * MetricsSource. This allows easy "slide in" of lock monitoring where + * originally only a standard R/W lock was used. + * + * @param lock The original R/W lock to wrap for monitoring + * @param metrics The target for lock monitoring + */ + private ReadWriteLockMetrics(ReadWriteLock lock, MetricsSource metrics) { + Preconditions.checkNotNull(lock); + Preconditions.checkArgument(metrics instanceof LockMetricSource, + "Invalid MetricsSource"); + + LockMetricSource lms = (LockMetricSource)metrics; + readLock = new LockWrapper(lock.readLock(), lms.readLockWaitTimeTotal, + lms.readLockWaitTimeMax, lms.readLockCounts); + writeLock = new LockWrapper(lock.writeLock(), lms.writeLockWaitTimeTotal, + lms.writeLockWaitTimeMax, lms.writeLockCounts); + } + + @Override + public Lock readLock() { + return readLock; + } + + @Override + public Lock writeLock() { + return writeLock; + } +} diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/metrics/TestReadWriteLockMetrics.java b/llap-common/src/test/org/apache/hadoop/hive/llap/metrics/TestReadWriteLockMetrics.java new file mode 100644 index 0000000000..9784376643 --- /dev/null +++ b/llap-common/src/test/org/apache/hadoop/hive/llap/metrics/TestReadWriteLockMetrics.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.metrics; + +import static java.lang.Math.max; +import static java.lang.System.nanoTime; +import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.ReadLockCount; +import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.ReadLockWaitTimeMax; +import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.ReadLockWaitTimeTotal; +import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.WriteLockCount; +import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.WriteLockWaitTimeMax; +import static org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics.LockMetricInfo.WriteLockWaitTimeTotal; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsTag; +import org.junit.Test; + +/** + * JUnit test suite for the ReadWriteLockMetrics class. + * The test uses a background thread and has some hard-coded thread execution + * times. It should normally not take more than 2 threads and 400ms execution + * time. + */ +public class TestReadWriteLockMetrics { + /** + * Thread which acquires the lock in a loop, holding it for 5ms per iteration. + */ + private static class LockHolder extends Thread { + public static final long LOCK_HOLD_TIME = 5; ///< lock hold time in ms + + private final Lock targetLock; ///< the lock to hold + private long lockCount; ///< loop count + private long lockWaitSum; ///< total lock wait time + private long lockWaitMax; ///< highest lock wait time + private long endTime; ///< runtime for the thread + + /** + * Creates a new lock-holding thread. + * The thread starts immediately upon creation. + * + * @param l The lock to lock/unlock in loop + * @param ttl The expected thread run time in ms + */ + public LockHolder(Lock l, long ttl) { + targetLock = l; + + lockCount = 0; + lockWaitSum = 0; + lockWaitMax = 0; + endTime = ttl; + + setName(getClass().getSimpleName()); + setDaemon(true); + start(); + } + + /** + * Returns the number of counted locks. 
+ * @return The total lock loop execution coun + */ + public long getLockCount() { + return lockCount; + } + + /** + * Returns the accumulated nano seconds for locks. + * @return The aggregated time, the thread was waiting on locks (in nanos) + */ + public long getLockSum() { + return lockWaitSum; + } + + /** + * Returns the highest lock time in nano seconds. + * @return The highest (single) lock wait time (in nanos) + */ + public long getLockMax() { + return lockWaitMax; + } + + @Override + public void run() { + endTime = nanoTime() + toNano(endTime); // ttl was in ms + + // loop for specified amount of time + while (nanoTime() <= endTime && !isInterrupted()) { + try { + long start = nanoTime(); + targetLock.lock(); + ++lockCount; + long diff = nanoTime() - start; + lockWaitSum += diff; + lockWaitMax = max(diff, lockWaitMax); + + while (nanoTime() <= (start + toNano(LOCK_HOLD_TIME))) { + // spin for LOCK_HOLD_TIME ms (under lock) + } + } finally { + targetLock.unlock(); + } + } + } + } + + /** + * Mock metrics collector for this test only. + * This MetricsCollector implementation is used to get the actual + * MetricsSource data, collected by the + * ReadWriteLockMetrics. + */ + private static class MockMetricsCollector implements MetricsCollector { + private ArrayList records = new ArrayList<>(); + + /** + * Single metrics record mock implementation. + */ + public static class MockRecord { + private final String recordLabel; ///< record tag/label + private final HashMap metrics; ///< metrics within record + private String context; ///< collector context ID + + /** + * @param label metrics record label. + */ + public MockRecord(String label) { + recordLabel = label; + metrics = new HashMap<>(); + } + + /** + * @return The record's tag/label. + */ + public String getLabel() { + return recordLabel; + } + + /** + * @return The context of the collector. + */ + public String getContext() { + return context; + } + + /** + * @return Map of identifier/metric value pairs. + */ + public Map getMetrics() { + return metrics; + } + } + + /** + * Record builder mock implementation. + */ + private class MockMetricsRecordBuilder extends MetricsRecordBuilder { + private MockRecord target = null; ///< the record that is populated + + /** + * Used by outer class to provide a new MetricsRecordBuilder + * for a single metrics record. + * + * @param t The record to build. 
+ */ + public MockMetricsRecordBuilder(MockRecord t) { + target = t; + } + + @Override + public MetricsRecordBuilder add(MetricsTag arg0) { + throw new AssertionError("Not implemented for test"); + } + + @Override + public MetricsRecordBuilder add(AbstractMetric arg0) { + throw new AssertionError("Not implemented for test"); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo arg0, int arg1) { + target.getMetrics().put(arg0, arg1); + return this; + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo arg0, long arg1) { + target.getMetrics().put(arg0, arg1); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo arg0, int arg1) { + throw new AssertionError("Not implemented for test"); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo arg0, long arg1) { + throw new AssertionError("Not implemented for test"); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo arg0, float arg1) { + throw new AssertionError("Not implemented for test"); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo arg0, double arg1) { + throw new AssertionError("Not implemented for test"); + } + + @Override + public MetricsCollector parent() { + return MockMetricsCollector.this; + } + + @Override + public MetricsRecordBuilder setContext(String arg0) { + target.context = arg0; + return this; + } + + @Override + public MetricsRecordBuilder tag(MetricsInfo arg0, String arg1) { + throw new AssertionError("Not implemented for test"); + } + } + + @Override + public MetricsRecordBuilder addRecord(String arg0) { + MockRecord tr = new MockRecord(arg0); + records.add(tr); + return new MockMetricsRecordBuilder(tr); + } + + @Override + public MetricsRecordBuilder addRecord(MetricsInfo arg0) { + MockRecord tr = new MockRecord(arg0.name()); + records.add(tr); + return new MockMetricsRecordBuilder(tr); + } + + /** + * @return A list of all built metrics records. + */ + public List getRecords() { + return records; + } + } + + /** + * Helper to verify the actual value by comparing it with a +/- tolerance of + * 10% with the expected value. + * + * @param txt Assertion message + * @param expected The expected value (tolerance will be applied) + * @param actual Actual test outcome + */ + private void assertWithTolerance(String txt, long expected, long actual) { + long lowExpected = expected - (expected / 10L); + long highExpected = expected + (expected / 10L); + + StringBuffer msg = new StringBuffer(txt); + msg.append(" (expected "); + msg.append(lowExpected); + msg.append(" <= x <= "); + msg.append(highExpected); + msg.append(" but actual = "); + msg.append(actual); + msg.append(")"); + + assertTrue(msg.toString(), actual >= lowExpected && actual <= highExpected); + } + + /** + * Helper to convert milliseconds to nanoseconds. + * + * @param ms Millisecond inpu + * @return Value in nanoseconds + */ + private static long toNano(long ms) { + return ms * 1000000; + } + + /** + * Helper to produce ReadWriteLockMetrics instances. + * The wrapping of lock instances is configuration dependent. This helper ensures that the + * configuration creates wrapped lock instances. 
+ * + * @param lock The lock to wrap + * @param ms The metrics source, storing the lock measurements + * @return The wrapped lock + */ + private ReadWriteLockMetrics create(ReadWriteLock lock, MetricsSource ms) { + Configuration dummyConf = new Configuration(); + + HiveConf.setBoolVar(dummyConf, + HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS, true); + return (ReadWriteLockMetrics)ReadWriteLockMetrics.wrap(dummyConf, lock, ms); + } + + /** + * Runs a simple test where a thread is running in a loop, getting read locks w/o having to + * deal with any contention. The test shows that the locks are received rather quick and tha + * all metrics for write locks remain zero. + */ + @Test + public void testWithoutContention() throws Exception { + final long execTime = 100; + + MetricsSource ms = ReadWriteLockMetrics.createLockMetricsSource("test1"); + ReadWriteLock rwl = create(new ReentrantReadWriteLock(), ms); + LockHolder lhR = new LockHolder(rwl.readLock(), execTime); + + // wait for the thread to do its locks and waits (for 100ms) + lhR.join(); + + // get the reported metrics + MockMetricsCollector tmc = new MockMetricsCollector(); + ms.getMetrics(tmc, true); + + List result = tmc.getRecords(); + assertEquals("Unexpected amount of metrics", 1, result.size()); + MockMetricsCollector.MockRecord rec = result.get(0); + + // verify label and context (context is hard coded) + assertEquals("Invalid record label", "test1", rec.getLabel()); + assertEquals("Invalid record context", "Locking", rec.getContext()); + + // we expect around exectome / thread loop time executions + assertWithTolerance("Unexpected count of lock executions (reader)", + execTime / LockHolder.LOCK_HOLD_TIME, lhR.getLockCount()); + assertEquals("Counting the locks failed", + lhR.getLockCount(), rec.getMetrics().get(ReadLockCount)); + + // sanity check in read lock metrics + assertNotEquals("Local thread should have lock time", lhR.getLockSum(), 0); + assertNotEquals("Accounted lock time zero", + rec.getMetrics().get(ReadLockWaitTimeTotal), 0); + assertTrue("Local measurement larger (overhead)", + rec.getMetrics().get(ReadLockWaitTimeTotal).longValue() + < lhR.getLockSum()); + + assertNotEquals("Local thread should have max lock time", + lhR.getLockMax(), 0); + assertNotEquals("Accounted lock max time zero", + rec.getMetrics().get(ReadLockWaitTimeMax), 0); + + assertTrue("Local max larger (overhead)", + rec.getMetrics().get(ReadLockWaitTimeMax).longValue() + < lhR.getLockMax()); + + assertTrue("Max greater or equal to avergae lock time", + (rec.getMetrics().get(ReadLockWaitTimeTotal).longValue() + / rec.getMetrics().get(ReadLockCount).longValue()) + <= rec.getMetrics().get(ReadLockWaitTimeMax).longValue()); + + assertTrue("Lock time less than 1% (no contention)", + rec.getMetrics().get(ReadLockWaitTimeTotal).longValue() + < toNano(execTime / 100L)); + + // sanity check on write lock metrics (should be all zero) + assertEquals("No writer lock activity expected (total)", + rec.getMetrics().get(WriteLockWaitTimeTotal), 0L); + assertEquals("No writer lock activity expected (max)", + rec.getMetrics().get(WriteLockWaitTimeMax), 0L); + assertEquals("No writer lock activity expected (count)", + rec.getMetrics().get(WriteLockCount), 0L); + } + + /** + * Test where read/write lock contention is tested. + * This test has a background thread that tries to get read locks within a + * loop while the main thread holds a write lock for half of the tex + * execution time. 
The test verifies that the reported metric for read lock + * wait time reflects that the thread was blocked until the write lock was + * released. It also performs basic sanity checks on the read and write lock + * metrics. + */ + @Test + public void testWithContention() throws Exception { + final long execTime = 200; + + MetricsSource ms = ReadWriteLockMetrics.createLockMetricsSource("test1"); + ReadWriteLock rwl = create(new ReentrantReadWriteLock(), ms); + LockHolder lhR = new LockHolder(rwl.readLock(), execTime); + + // get a write lock for half of the execution time + try { + long endOfLock = nanoTime() + toNano(execTime / 2); + rwl.writeLock().lock(); + + while (nanoTime() < endOfLock) { + // spin until end time is reached + } + } finally { + rwl.writeLock().unlock(); + } + + // wait for the thread to do its locks and waits (for 100ms) + lhR.join(); + + MockMetricsCollector tmc = new MockMetricsCollector(); + ms.getMetrics(tmc, true); + + List result = tmc.getRecords(); + assertEquals("Unexpected amount of metrics", 1, result.size()); + MockMetricsCollector.MockRecord rec = result.get(0); + + // sanity checks for read lock values + assertEquals("Verifying the loop count (read lock)", + lhR.getLockCount(), + rec.getMetrics().get(ReadLockCount).longValue()); + + assertWithTolerance("Only half of possible read locks expected", + (execTime / LockHolder.LOCK_HOLD_TIME) / 2, + rec.getMetrics().get(ReadLockCount).longValue()); + + assertWithTolerance("Max read lock wait time close to write lock hold", + toNano(execTime / 2), + rec.getMetrics().get(ReadLockWaitTimeMax).longValue()); + + assertTrue("Total read lock wait time larger than max", + rec.getMetrics().get(ReadLockWaitTimeMax).longValue() + < rec.getMetrics().get(ReadLockWaitTimeTotal).longValue()); + + // sanity check for write locks + assertEquals("Write lock count supposed to be one", + 1, rec.getMetrics().get(WriteLockCount).longValue()); + + assertTrue("Write lock wait time non zero", + 0L < rec.getMetrics().get(WriteLockWaitTimeTotal).longValue()); + assertEquals("With one lock, total should me max", + rec.getMetrics().get(WriteLockWaitTimeTotal), + rec.getMetrics().get(WriteLockWaitTimeMax)); + } + + /** + * Testing the wrap function for different configuration + * combinations. 
+ */ + @Test + public void testWrap() throws Exception { + Configuration testConf = new Configuration(); + MetricsSource ms = ReadWriteLockMetrics.createLockMetricsSource("testConf"); + + // default = passthrough + ReadWriteLock rwlDef = + ReadWriteLockMetrics.wrap(testConf, new ReentrantReadWriteLock(), ms); + assertTrue("Basic ReentrantReadWriteLock expected", + rwlDef instanceof ReentrantReadWriteLock); + assertFalse("Basic ReentrantReadWriteLock expected", + rwlDef instanceof ReadWriteLockMetrics); + + // false = pass through + HiveConf.setBoolVar(testConf, + HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS, false); + ReadWriteLock rwlBasic = + ReadWriteLockMetrics.wrap(testConf, new ReentrantReadWriteLock(), ms); + assertTrue("Basic ReentrantReadWriteLock expected", + rwlBasic instanceof ReentrantReadWriteLock); + assertFalse("Basic ReentrantReadWriteLock expected", + rwlBasic instanceof ReadWriteLockMetrics); + + // true = wrap + HiveConf.setBoolVar(testConf, + HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS, true); + ReadWriteLock rwlWrap = + ReadWriteLockMetrics.wrap(testConf, new ReentrantReadWriteLock(), ms); + assertTrue("Wrapped lock expected", + rwlWrap instanceof ReadWriteLockMetrics); + + // null = passthrough + ReadWriteLock rwlNoConf = + ReadWriteLockMetrics.wrap(null, new ReentrantReadWriteLock(), null); + assertTrue("Basic ReentrantReadWriteLock expected", + rwlNoConf instanceof ReentrantReadWriteLock); + assertFalse("Basic ReentrantReadWriteLock expected", + rwlNoConf instanceof ReadWriteLockMetrics); + } +} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java index c41b34ab83..2a39d2d328 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java @@ -15,8 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.hadoop.hive.llap.cache; +import com.google.common.base.Function; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +32,8 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.io.Allocator; import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; @@ -37,14 +42,17 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hive.common.util.Ref; import org.apache.orc.OrcProto; import org.apache.orc.OrcProto.ColumnEncoding; -import com.google.common.base.Function; - -public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDump { +public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDump, Configurable { private static final int DEFAULT_CLEANUP_INTERVAL = 600; + private Configuration conf; private final Allocator allocator; private final AtomicInteger newEvictions = new AtomicInteger(0); private Thread cleanupThread = null; @@ -53,6 +61,18 @@ private final long cleanupInterval; private final LlapDaemonCacheMetrics metrics; + /// Shared singleton MetricsSource instance for all FileData locks + private static final MetricsSource LOCK_METRICS; + + static { + // create and register the MetricsSource for lock metrics + MetricsSystem ms = LlapMetricsSystem.instance(); + ms.register("FileDataLockMetrics", + "Lock metrics for R/W locks around FileData instances", + LOCK_METRICS = + ReadWriteLockMetrics.createLockMetricsSource("FileData")); + } + public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer { public boolean isCached = false; private String tag; @@ -90,14 +110,18 @@ public int compare(StripeData o1, StripeData o2) { * TODO: make more granular? We only care that each one reader sees consistent boundaries. * So, we could shallow-copy the stripes list, then have individual locks inside each. 
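With the shared "FileData" metrics source registered above, every FileData entry now needs the daemon Configuration so its internal R/W lock can be wrapped (the constructor change follows just below). A hedged, illustrative fragment of a call site; the key and column count are placeholders, and it assumes FileData stays the public nested class imported as shown:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.FileData;

// Illustrative only: creating a FileData entry with the Configuration that
// controls whether its R/W lock feeds the shared "FileData" metrics source.
public class FileDataCreationExample {
  static FileData newEntry(Configuration conf, Object fileKey, int colCount) {
    // old signature: new FileData(fileKey, colCount)
    return new FileData(conf, fileKey, colCount);
  }
}
```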
*/ - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final ReadWriteLock rwLock; private final Object fileKey; private final int colCount; private ArrayList stripes; - public FileData(Object fileKey, int colCount) { + public FileData(Configuration conf, Object fileKey, int colCount) { this.fileKey = fileKey; this.colCount = colCount; + + rwLock = ReadWriteLockMetrics.wrap(conf, + new ReentrantReadWriteLock(), + LOCK_METRICS); } public void toString(StringBuilder sb) { @@ -298,7 +322,7 @@ public FileData getFileData(Object fileKey, long start, long end, boolean[] incl throw new IOException("Includes " + DebugUtils.toString(includes) + " for " + cached.colCount + " columns"); } - FileData result = new FileData(cached.fileKey, cached.colCount); + FileData result = new FileData(conf, cached.fileKey, cached.colCount); if (gotAllData != null) { gotAllData.value = true; } @@ -726,6 +750,16 @@ public Allocator getAllocator() { return allocator; } + @Override + public void setConf(Configuration newConf) { + this.conf = newConf; + } + + @Override + public Configuration getConf() { + return conf; + } + @Override public void debugDumpShort(StringBuilder sb) { sb.append("\nSerDe cache state "); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 932d0adeba..0d5713a5f9 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -24,10 +24,11 @@ import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker; import org.apache.hadoop.hive.llap.log.LogHelpers; +import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; +import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.StringUtils; import org.apache.logging.slf4j.Log4jMarker; import org.apache.tez.common.CallableWithNdc; @@ -41,6 +42,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.slf4j.Logger; @@ -66,12 +69,26 @@ public class QueryTracker extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class); - private static final Marker QUERY_COMPLETE_MARKER = new Log4jMarker(new Log4jQueryCompleteMarker()); + private static final Marker QUERY_COMPLETE_MARKER = + new Log4jMarker(new Log4jQueryCompleteMarker()); + + /// Shared singleton MetricsSource instance for all DAG locks + private static final MetricsSource LOCK_METRICS; + + static { + // create and register the MetricsSource for lock metrics + MetricsSystem ms = LlapMetricsSystem.instance(); + LOCK_METRICS = ReadWriteLockMetrics.createLockMetricsSource("QueryTracker"); + + ms.register("QueryTrackerDAGLockMetrics", + "Lock metrics for QueryTracher DAG instances", LOCK_METRICS); + } private final ScheduledExecutorService executorService; private final ConcurrentHashMap 
queryInfoMap = new ConcurrentHashMap<>(); + private final Configuration conf; private final String[] localDirsBase; private final FileSystem localFs; private final String clusterId; @@ -91,7 +108,7 @@ private final Lock lock = new ReentrantLock(); - private final ConcurrentMap dagSpecificLocks = new ConcurrentHashMap<>(); + private final ConcurrentMap dagSpecificLocks = new ConcurrentHashMap<>(); // Tracks various maps for dagCompletions. This is setup here since stateChange messages // may be processed by a thread which ends up executing before a task. @@ -109,6 +126,8 @@ public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId super("QueryTracker"); this.localDirsBase = localDirsBase; this.clusterId = clusterId; + this.conf = conf; + try { localFs = FileSystem.getLocal(conf); } catch (IOException e) { @@ -380,16 +399,18 @@ void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceNam } } - private ReentrantReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) { + private ReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) { return dagSpecificLocks.get(queryIdentifier); } - private ReentrantReadWriteLock getDagLock(QueryIdentifier queryIdentifier) { + private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) { lock.lock(); try { - ReentrantReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier); + ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier); if (dagLock == null) { - dagLock = new ReentrantReadWriteLock(); + dagLock = ReadWriteLockMetrics.wrap(conf, + new ReentrantReadWriteLock(), + LOCK_METRICS); dagSpecificLocks.put(queryIdentifier, dagLock); } return dagLock; @@ -477,7 +498,7 @@ public ExternalQueryCleanerCallable(String queryIdString, String dagIdString, @Override protected Void callInternal() { LOG.info("External cleanup callable for {}", queryIdentifier); - ReentrantReadWriteLock dagLock = getDagLockNoCreate(queryIdentifier); + ReadWriteLock dagLock = getDagLockNoCreate(queryIdentifier); if (dagLock == null) { if (LOG.isTraceEnabled()) { LOG.trace("null dagLock. No cleanup required at the moment for {}", queryIdString); @@ -528,7 +549,7 @@ public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOExcept private void handleFragmentCompleteExternalQuery(QueryInfo queryInfo) { if (queryInfo.isExternalQuery()) { - ReentrantReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier()); + ReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier()); if (dagLock == null) { LOG.warn("Ignoring fragment completion for unknown query: {}", queryInfo.getQueryIdentifier()); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapLockingServlet.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapLockingServlet.java new file mode 100644 index 0000000000..86d9552fa8 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapLockingServlet.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.services.impl; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import java.io.PrintWriter; +import java.util.List; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hive.http.HttpServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Servlet to produce the JSON output for the locking endpoint. + * The servlet produces a JSON document that lists all locking statistics + * available through the ReadWriteLockMetrics instrumentation. + */ +public class LlapLockingServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(LlapLockingServlet.class); + private static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods"; + private static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin"; + private static Configuration conf = null; + + /** + * Configuration setter, used to determine the lock statistics collection setting. 
+ * + * @param c The configuration to use + */ + public static void setConf(Configuration c) { + conf = c; + } + + @Override + public void init() throws ServletException { + LOG.info("LlapLockingServlet initialized"); + } + + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) { + try { + if (!HttpServer.isInstrumentationAccessAllowed(getServletContext(), + request, response)) { + response.setStatus(HttpServletResponse.SC_FORBIDDEN); + } else { + String collString = "\"disabled\""; + boolean statsEnabled = false; + + // populate header + response.setContentType("application/json; charset=utf8"); + response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET"); + response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, "*"); + response.setHeader("Cache-Control", + "no-transform,public,max-age=60,s-maxage=60"); + + if (null != conf && HiveConf.getBoolVar(conf, + HiveConf.ConfVars.LLAP_COLLECT_LOCK_METRICS)) { + collString = "\"enabled\""; + statsEnabled = true; + } + + StringBuffer result = new StringBuffer(); + List sourceList + = ReadWriteLockMetrics.getAllMetricsSources(); + if (null == sourceList) { + // should actually never happen + result.append("{\"error\":\"R/W statistics not found\"}"); + } else { + sourceList.sort(new ReadWriteLockMetrics.MetricsComparator()); + boolean first = true; + + result.append("{\"statsCollection\":"); + result.append(collString); + result.append(",\"lockStats\":["); + + // dump an object per lock label + if (statsEnabled) { + for (MetricsSource ms : sourceList) { + if (!first) { + result.append(","); + } + + first = false; + result.append(ms); + } + } + + result.append("]}"); + } + + // send string through JSON parser/builder to pretty print it. + JsonParser parser = new JsonParser(); + JsonObject json = parser.parse(result.toString()).getAsJsonObject(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + try (PrintWriter w = response.getWriter()) { + w.println(gson.toJson(json)); + } + } + } + catch (Exception e) { + LOG.error("Exception while processing locking stats request", e); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } +} \ No newline at end of file diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index b944fad45d..3c124f9b63 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
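The servlet above is registered under the /locking path by LlapWebServices (see below). A hedged client-side sketch of fetching the statistics; host and port are placeholders for the LLAP daemon web UI address, and the response is a pretty-printed JSON object with a "statsCollection" flag and a "lockStats" array holding one entry per lock group:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Sketch only: GET the lock statistics from a running LLAP daemon.
public class LockingEndpointExample {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://llap-daemon.example.com:15002/locking"); // placeholder address
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    } finally {
      conn.disconnect();
    }
  }
}
```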
*/ + package org.apache.hadoop.hive.llap.daemon.services.impl; import java.io.IOException; @@ -22,7 +23,6 @@ import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; -import javax.management.MalformedObjectNameException; import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -36,7 +36,6 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; -import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl; import org.apache.hadoop.hive.registry.ServiceInstanceSet; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; @@ -58,8 +57,8 @@ static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods"; static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin"; - static final String REGISTRY_ATTRIBUTE="llap.registry"; - static final String PARENT_ATTRIBUTE="llap.parent"; + static final String REGISTRY_ATTRIBUTE = "llap.registry"; + static final String PARENT_ATTRIBUTE = "llap.parent"; private int port; private HttpServer http; @@ -100,12 +99,16 @@ public void serviceInit(Configuration conf) { builder.setContextAttribute(REGISTRY_ATTRIBUTE, registry); builder.setContextAttribute(PARENT_ATTRIBUTE, parent); + // make conf available to the locking stats servle + LlapLockingServlet.setConf(conf); + try { this.http = builder.build(); this.http.addServlet("status", "/status", LlapStatusServlet.class); this.http.addServlet("peers", "/peers", LlapPeerRegistryServlet.class); this.http.addServlet("iomem", "/iomem", LlapIoMemoryServlet.class); this.http.addServlet("system", "/system", SystemConfigurationServlet.class); + this.http.addServlet("locking", "/locking", LlapLockingServlet.class); } catch (IOException e) { LOG.warn("LLAP web service failed to come up", e); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 2fffeb876e..c63ee5f79b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -167,6 +167,7 @@ private LlapIoImpl(Configuration conf) throws IOException { SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl( cacheMetrics, cachePolicyWrapper, allocator); serdeCache = serdeCacheImpl; + serdeCacheImpl.setConf(conf); } boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index a5671e9682..462b25fa23 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -777,7 +777,7 @@ public void cacheFileData(StripeData sd) { throw new AssertionError("Caching data without an encoding at " + i + ": " + sd); } } - FileData fd = new FileData(fileKey, encodings.length); + FileData fd = new FileData(daemonConf, fileKey, encodings.length); fd.addStripe(sd); cache.putFileData(fd, Priority.NORMAL, counters, cacheTag); } else { diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 560cbaa9bd..cdf767f1db 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -17,6 +17,8 @@ import com.google.common.io.ByteArrayDataOutput; import org.apache.hadoop.io.Text; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; @@ -59,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -70,6 +73,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto; import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; @@ -175,6 +179,19 @@ public void setError(Void v, Throwable t) { } } + /// Shared singleton MetricsSource instance for all FileData locks + private static final MetricsSource LOCK_METRICS; + + static { + // create and register the MetricsSource for lock metrics + MetricsSystem ms = LlapMetricsSystem.instance(); + LOCK_METRICS = + ReadWriteLockMetrics.createLockMetricsSource("TaskScheduler"); + + ms.register("LLAPTaskSchedulerLockMetrics", + "Lock metrics for R/W locks LLAP task scheduler", LOCK_METRICS); + } + // TODO: this is an ugly hack; see the same in LlapTaskCommunicator for discussion. // This only lives for the duration of the service init. static LlapTaskSchedulerService instance = null; @@ -225,9 +242,9 @@ public void setError(Void v, Throwable t) { @VisibleForTesting final DelayedTaskSchedulerCallable delayedTaskSchedulerCallable; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); - private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + private final ReadWriteLock lock; + private final Lock readLock; + private final Lock writeLock; private final Lock scheduleLock = new ReentrantLock(); private final Condition scheduleCondition = scheduleLock.newCondition(); @@ -321,6 +338,12 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock "Failed to parse user payload for " + LlapTaskSchedulerService.class.getSimpleName(), e); } + lock = ReadWriteLockMetrics.wrap(conf, + new ReentrantReadWriteLock(), + LOCK_METRICS); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + if (conf.getBoolean(LLAP_PLUGIN_ENDPOINT_ENABLED, false)) { JobTokenSecretManager sm = null; if (UserGroupInformation.isSecurityEnabled()) {
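Taken together, SerDeLowLevelCacheImpl, QueryTracker and LlapTaskSchedulerService all follow the same three-step pattern. A condensed, hedged sketch of how another LLAP component could adopt it; "ExampleComponent" and the registration strings are illustrative:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
import org.apache.hadoop.hive.llap.metrics.ReadWriteLockMetrics;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;

// Condensed sketch of the pattern used by the components touched in this patch.
public class ExampleComponent {
  // 1) one shared MetricsSource per lock group, registered once
  private static final MetricsSource LOCK_METRICS;
  static {
    MetricsSystem ms = LlapMetricsSystem.instance();
    LOCK_METRICS = ReadWriteLockMetrics.createLockMetricsSource("ExampleComponent");
    ms.register("ExampleComponentLockMetrics",
        "Lock metrics for R/W locks in ExampleComponent", LOCK_METRICS);
  }

  // 2) wrap the component's lock(s); hive.llap.lockmetrics.collect decides whether
  //    the wrapper or the plain ReentrantReadWriteLock is returned
  private final ReadWriteLock lock;
  private final Lock readLock;
  private final Lock writeLock;

  public ExampleComponent(Configuration conf) {
    lock = ReadWriteLockMetrics.wrap(conf, new ReentrantReadWriteLock(), LOCK_METRICS);
    readLock = lock.readLock();
    writeLock = lock.writeLock();
  }

  // 3) use readLock/writeLock exactly as before; wait times then show up under /locking
}
```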