diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java
index 5b12370..211ffc7 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java
@@ -144,6 +144,11 @@ public void stop() {
     scheduledTask = null;
   }
 
+  @VisibleForTesting
+  public QuantileEstimator getEstimator() {
+    return estimator;
+  }
+
   public synchronized void setEstimator(QuantileEstimator quantileEstimator) {
     this.estimator = quantileEstimator;
   }
diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskValidatorFactory.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskValidatorFactory.java
index 29ab2ad..7d04db2 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskValidatorFactory.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskValidatorFactory.java
@@ -62,7 +62,8 @@ private DiskValidatorFactory() {
 
   /**
    * Returns {@link DiskValidator} instance corresponding to its name.
-   * The diskValidator parameter can be "basic" for {@link BasicDiskValidator}.
+   * The diskValidator parameter can be "basic" for {@link BasicDiskValidator}
+   * or "read-write" for {@link ReadWriteDiskValidator}.
    * @param diskValidator canonical class name, for example, "basic"
    * @throws DiskErrorException if the class cannot be located
    */
@@ -74,6 +75,8 @@ public static DiskValidator getInstance(String diskValidator)
 
     if (diskValidator.equalsIgnoreCase(BasicDiskValidator.NAME)) {
       clazz = BasicDiskValidator.class;
+    } else if (diskValidator.equalsIgnoreCase(ReadWriteDiskValidator.NAME)) {
+      clazz = ReadWriteDiskValidator.class;
     } else {
       try {
         clazz = Class.forName(diskValidator);
diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidator.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidator.java
new file mode 100644
index 0000000..84475e5
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class ReadWriteDiskValidator implements DiskValidator {
+
+  public static final String NAME = "read-write";
+
+  @Override
+  public void checkStatus(File dir) throws DiskErrorException {
+    ReadWriteDiskValidatorMetrics metric =
+        ReadWriteDiskValidatorMetrics.getMetric(dir.toString());
+    try {
+      // check the directory presence and permission.
+      DiskChecker.checkDir(dir);
+
+      // create a tmp file under the dir
+      Path tmpFile = Files.createTempFile(dir.toPath(), "test", "tmp");
+
+      // write 16 random bytes into the tmp file
+      byte[] inputBytes = new byte[16];
+      new Random().nextBytes(inputBytes);
+      long startTime = System.nanoTime();
+      Files.write(tmpFile, inputBytes);
+      long writeLatency = TimeUnit.MICROSECONDS.convert(
+          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+      metric.addWriteFileLatency(writeLatency);
+
+      // read the bytes back
+      startTime = System.nanoTime();
+      byte[] outputBytes = Files.readAllBytes(tmpFile);
+      long readLatency = TimeUnit.MICROSECONDS.convert(
+          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+      metric.addReadFileLatency(readLatency);
+
+      // verify the bytes read back match the bytes written
+      if (!Arrays.equals(inputBytes, outputBytes)) {
+        metric.diskCheckFailed();
+        throw new DiskErrorException("Data in file has been corrupted.");
+      }
+
+      // delete the file
+      Files.delete(tmpFile);
+    } catch (IOException e) {
+      metric.diskCheckFailed();
+      throw new DiskErrorException("Disk check failed!", e);
+    }
+  }
+}
diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidatorMetrics.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidatorMetrics.java
new file mode 100644
index 0000000..7c07355
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReadWriteDiskValidatorMetrics.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * The metrics for a directory, generated by {@link ReadWriteDiskValidator}.
+ */
+@InterfaceAudience.Private
+public class ReadWriteDiskValidatorMetrics {
+  @Metric("# of disk failures") MutableCounterInt failureCount;
+  @Metric("Time of last failure") MutableGaugeLong lastFailureTime;
+
+  final MetricsRegistry registry = new MetricsRegistry("ReadWriteDiskValidatorMetrics");
+
+  static final MetricsInfo RECORD_INFO = info("ReadWriteDiskValidatorMetrics",
+      "Metrics for the DiskValidator");
+
+  private String dirName;
+
+  private static final int[] QUANTILE_INTERVALS = new int[] {
+      60 * 60, // 1 hour
+      24 * 60 * 60, // 1 day
+      10 * 24 * 60 * 60 // 10 days
+  };
+  final MutableQuantiles[] fileReadQuantiles;
+  final MutableQuantiles[] fileWriteQuantiles;
+
+  public ReadWriteDiskValidatorMetrics(String dir) {
+    this.dirName = dir;
+
+    fileReadQuantiles = new MutableQuantiles[QUANTILE_INTERVALS.length];
+    for (int i = 0; i < fileReadQuantiles.length; i++) {
+      int interval = QUANTILE_INTERVALS[i];
+      fileReadQuantiles[i] = registry.newQuantiles(
+          "readLatency" + interval + "s",
+          "File read latency", "Ops", "latencyMicros", interval);
+    }
+
+    fileWriteQuantiles = new MutableQuantiles[QUANTILE_INTERVALS.length];
+    for (int i = 0; i < fileWriteQuantiles.length; i++) {
+      int interval = QUANTILE_INTERVALS[i];
+      fileWriteQuantiles[i] = registry.newQuantiles(
+          "writeLatency" + interval + "s",
+          "File write latency", "Ops", "latencyMicros", interval);
+    }
+  }
+
+  /**
+   * Simple metrics cache to help prevent re-registrations and help to access
+   * metrics.
+   */
+  protected final static Map<String, ReadWriteDiskValidatorMetrics> dirMetrics
+      = new HashMap<>();
+
+  /**
+   * Get a metric by given directory name.
+   *
+   * @param dirName directory name
+   * @return the metric
+   */
+  public synchronized static ReadWriteDiskValidatorMetrics getMetric(
+      String dirName) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+
+    ReadWriteDiskValidatorMetrics metrics = dirMetrics.get(dirName);
+    if (metrics == null) {
+      metrics = new ReadWriteDiskValidatorMetrics(dirName);
+
+      // Register with the MetricsSystem
+      if (ms != null) {
+        metrics = ms.register(sourceName(dirName),
+            "Metrics for directory: " + dirName, metrics);
+      }
+      dirMetrics.put(dirName, metrics);
+    }
+
+    return metrics;
+  }
+
+  /**
+   * Add the file write latency to {@link MutableQuantiles} metrics.
+   *
+   * @param writeLatency file write latency in microseconds
+   */
+  public void addWriteFileLatency(long writeLatency) {
+    if (fileWriteQuantiles != null) {
+      for (MutableQuantiles q : fileWriteQuantiles) {
+        q.add(writeLatency);
+      }
+    }
+  }
+
+  /**
+   * Add the file read latency to {@link MutableQuantiles} metrics.
+   *
+   * @param readLatency file read latency in microseconds
+   */
+  public void addReadFileLatency(long readLatency) {
+    if (fileReadQuantiles != null) {
+      for (MutableQuantiles q : fileReadQuantiles) {
+        q.add(readLatency);
+      }
+    }
+  }
+
+  /**
+   * Get a source name by given directory name.
+   *
+   * @param dirName directory name
+   * @return the source name
+   */
+  protected static String sourceName(String dirName) {
+    StringBuilder sb = new StringBuilder(RECORD_INFO.name());
+    sb.append(",dir=").append(dirName);
+    return sb.toString();
+  }
+
+  /**
+   * Increase the failure count and update the last failure timestamp.
+   */
+  public void diskCheckFailed() {
+    failureCount.incr();
+    lastFailureTime.set(System.nanoTime());
+  }
+}
diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java
index 3c0999e..707e868 100644
--- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java
+++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java
@@ -49,6 +49,14 @@ public static void assertMetric(MetricsRecord record,
     assertEquals(expectedValue, resourceLimitMetric.value());
   }
 
+  public static void assertMetricNotNull(MetricsRecord record,
+      String metricName) {
+    AbstractMetric metric = getFirstMetricByName(
+        record, metricName);
+    assertNotNull("Metric " + metricName + " doesn't exist",
+        metric);
+  }
+
   private static MetricsTag getFirstTagByName(MetricsRecord record, String name) {
     return Iterables.getFirst(Iterables.filter(record.tags(),
         new MetricsTagPredicate(name)), null);
diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestReadWriteDiskValidator.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestReadWriteDiskValidator.java
new file mode 100644
index 0000000..d6baae9
--- /dev/null
+++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestReadWriteDiskValidator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.metrics2.impl.MetricsRecords;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+/**
+ * The class to test {@link ReadWriteDiskValidator} and
+ * {@link ReadWriteDiskValidatorMetrics}.
+ */
+public class TestReadWriteDiskValidator {
+
+  private MetricsSystem ms;
+
+  @Before
+  public void setUp() {
+    ms = DefaultMetricsSystem.instance();
+  }
+
+  @Test
+  public void testReadWriteDiskValidator()
+      throws DiskErrorException, InterruptedException {
+    File testDir = new File(System.getProperty("test.build.data"));
+    ReadWriteDiskValidator readWriteDiskValidator =
+        (ReadWriteDiskValidator) DiskValidatorFactory.getInstance(
+            ReadWriteDiskValidator.NAME);
+
+    for (int i = 0; i < 100; i++) {
+      readWriteDiskValidator.checkStatus(testDir);
+    }
+
+    ReadWriteDiskValidatorMetrics metric =
+        ReadWriteDiskValidatorMetrics.getMetric(testDir.toString());
+    Assert.assertEquals("The estimator count of the file read"
+        + " MutableQuantiles metric is not right",
+        100, metric.fileReadQuantiles[0].getEstimator().getCount());
+
+    Assert.assertEquals("The estimator count of the file write"
+        + " MutableQuantiles metric is not right",
+        100, metric.fileWriteQuantiles[0].getEstimator().getCount());
+
+    MetricsSource source = ms.getSource(
+        ReadWriteDiskValidatorMetrics.sourceName(testDir.toString()));
+    MetricsCollectorImpl collector = new MetricsCollectorImpl();
+    source.getMetrics(collector, true);
+
+    MetricsRecords.assertMetric(collector.getRecords().get(0),
+        "FailureCount", 0);
+    MetricsRecords.assertMetric(collector.getRecords().get(0),
+        "LastFailureTime", 0L);
+
+    // None of the MutableQuantiles have rolled over yet because the minimum
+    // interval is 1 hour, so we only check that these metrics exist.
+    MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
+        "WriteLatency3600sNumOps");
+    MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
+        "WriteLatency3600s50thPercentileLatencyMicros");
+    MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
+        "WriteLatency86400sNumOps");
+    MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
+        "WriteLatency864000sNumOps");
+
+    MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
+        "ReadLatency3600sNumOps");
+    MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
+        "ReadLatency3600s50thPercentileLatencyMicros");
+    MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
+        "ReadLatency86400sNumOps");
+    MetricsRecords.assertMetricNotNull(collector.getRecords().get(0),
+        "ReadLatency864000sNumOps");
+  }
+}
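
For reference, a minimal usage sketch of the new validator, based only on the APIs added in this patch. The directory path "/data/disk1" and the surrounding snippet are illustrative assumptions, not part of the change:

    // Look up the new validator by its name and check a local directory.
    DiskValidator validator =
        DiskValidatorFactory.getInstance(ReadWriteDiskValidator.NAME);
    try {
      // Writes 16 random bytes to a temp file in the directory, reads them
      // back, verifies the content, and records read/write latencies in the
      // per-directory ReadWriteDiskValidatorMetrics source.
      validator.checkStatus(new File("/data/disk1"));
    } catch (DiskChecker.DiskErrorException e) {
      // The check failed; the failure count and last failure time for this
      // directory have already been updated in its metrics source.
    }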