diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServerPerformance.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServerPerformance.java
index e69de29..3316872 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServerPerformance.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServerPerformance.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+
+public class TimelineServerPerformance extends Configured implements Tool {
+
+  static int mapperType = 1;
+
+  protected static int printUsage() {
+    System.err.println(
+        "Usage: [-m <maps>] number of mappers\n" +
+        "       [-s <(KBs)test>] number of KB per put\n" +
+        "       [-t <times>] number of payload-sending iterations per mapper\n" +
+        "       [-mtype <mapper type>]\n" +
+        "          1. simple write mapper\n" +
+        "       [-conf <configuration file>]\n");
+    GenericOptionsParser.printGenericCommandUsage(System.err);
+    return -1;
+  }
+
+  /**
+   * Configure a job given argv.
+   */
+  public static boolean parseArgs(String[] args, Job job) throws IOException {
+    if (args.length < 1) {
+      return 0 == printUsage();
+    }
+    for (int i = 0; i < args.length; ++i) {
+      if (args.length == i + 1) {
+        System.out.println("ERROR: Required parameter missing from " +
+            args[i]);
+        return 0 == printUsage();
+      }
+      try {
+        if ("-m".equals(args[i])) {
+          if (Integer.parseInt(args[++i]) > 0) {
+            job.getConfiguration()
+                .setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i]));
+          } else {
+            job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, 1);
+          }
+        } else if ("-s".equals(args[i])) {
+          if (Integer.parseInt(args[++i]) > 0) {
+            job.getConfiguration()
+                .setInt("kbs sent", Integer.parseInt(args[i]));
+          } else {
+            job.getConfiguration().setInt("kbs sent", 1);
+          }
+        } else if ("-t".equals(args[i])) {
+          if (Integer.parseInt(args[++i]) > 0) {
+            job.getConfiguration()
+                .setInt("testtimes", Integer.parseInt(args[i]));
+          } else {
+            job.getConfiguration().setInt("testtimes", 100);
+          }
+        } else if ("-mtype".equals(args[i])) {
+          mapperType = Integer.parseInt(args[++i]);
+          switch (mapperType) {
+          default:
+            job.setMapperClass(PerfTestMapper.class);
+          }
+        } else if ("-conf".equals(args[i])) {
+          job.getConfiguration().addResource(new Path(args[++i]));
+        } else {
+          System.out.println("Unexpected argument: " + args[i]);
+          return 0 == printUsage();
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return 0 == printUsage();
+      } catch (Exception e) {
+        throw (IOException) new IOException().initCause(e);
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Timeline server performance counters.
+   */
+  static enum PerfCounters {
+    TIMELINESERVER_WRITE_TIME,
+    TIMELINESERVER_WRITE_COUNTER,
+    TIMELINESERVER_WRITE_FAILURES,
+    TIMELINESERVER_WRITE_KBS,
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    Job job = Job.getInstance(getConf());
+    job.setJarByClass(TimelineServerPerformance.class);
+    job.setMapperClass(PerfTestMapper.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    if (!parseArgs(args, job)) {
+      return -1;
+    }
+
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+    Date endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+
+    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
+    long writetime =
+        counters.findCounter(PerfCounters.TIMELINESERVER_WRITE_TIME).getValue();
+    long writecounts =
+        counters.findCounter(PerfCounters.TIMELINESERVER_WRITE_COUNTER).getValue();
+    long writesize =
+        counters.findCounter(PerfCounters.TIMELINESERVER_WRITE_KBS).getValue();
+    // ops/s and KB/s over the aggregate wall-clock time spent in puts
+    double transacrate = writecounts * 1000 / (double) writetime;
+    double iorate = writesize * 1000 / (double) writetime;
+
+    System.out.println("TRANSACTION RATE: " + transacrate + " ops/s");
+    System.out.println("IO RATE: " + iorate + " KB/s");
+
+    return ret;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(),
+        new TimelineServerPerformance(), args);
+    System.exit(res);
+  }
+
+  public static class PerfTestMapper extends
+      org.apache.hadoop.mapreduce.Mapper<IntWritable, IntWritable,
+          Writable, Writable> {
+    @Override
+    public void map(IntWritable key, IntWritable val, Context context)
+        throws IOException {
+      /*
+       * Create a timeline entity. To ensure that the compression really gets
+       * exercised, the payload is a random assortment of characters rather
+       * than a run of identical 'a' characters.
+       */
+      final char[] alphaNums = new char[] { 'a', 'b', 'c', 'd', 'e', 'f',
+          'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
+          's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D',
+          'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
+          'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2',
+          '3', '4', '5', '6', '7', '8', '9', '0', ' ' };
+
+      Configuration conf = context.getConfiguration();
+      final TimelineClient tlc = new TimelineClientImpl();
+      tlc.init(conf);
+      // Standard Hadoop service lifecycle: init, then start before use.
+      tlc.start();
+
+      final int kbs = conf.getInt("kbs sent", 1);
+      final int testtimes = conf.getInt("testtimes", 100);
+      long totalTime = 0;
+      final Random rand = new Random();
+      final TaskAttemptID taskAttemptId = context.getTaskAttemptID();
+      final char[] payLoad = new char[kbs * 1024];
+
+      for (int i = 0; i < testtimes; i++) {
+        // Generate a fixed-length random payload
+        for (int xx = 0; xx < kbs * 1024; xx++) {
+          int alphaNumIdx = rand.nextInt(alphaNums.length);
+          payLoad[xx] = alphaNums[alphaNumIdx];
+        }
+        // Build a fresh entity every iteration so that events do not
+        // accumulate on a single entity across puts.
+        TimelineEntity entity = new TimelineEntity();
+        String entId = taskAttemptId + "_" + Integer.toString(i);
+        entity.setEntityId(entId);
+        entity.setEntityType("TEZ_DAG_ID");
+        entity.addOtherInfo("PERF_TEST", payLoad);
+        TimelineEvent event = new TimelineEvent();
+        event.setEventType("TestAppEvent");
+        event.setTimestamp(System.currentTimeMillis());
+        entity.addEvent(event);
+
+        long startWrite = System.currentTimeMillis();
+        try {
+          TimelinePutResponse response = tlc.putEntities(entity);
+          // A put can also fail without an exception; count rejected puts.
+          if (response != null && !response.getErrors().isEmpty()) {
+            context.getCounter(PerfCounters.TIMELINESERVER_WRITE_FAILURES)
+                .increment(1);
+          }
+        } catch (Exception e) {
+          context.getCounter(PerfCounters.TIMELINESERVER_WRITE_FAILURES)
+              .increment(1);
+          e.printStackTrace();
+        }
+        long endWrite = System.currentTimeMillis();
+        totalTime += (endWrite - startWrite);
+      }
+      tlc.stop();
+
+      context.getCounter(PerfCounters.TIMELINESERVER_WRITE_TIME)
+          .increment(totalTime);
+      context.getCounter(PerfCounters.TIMELINESERVER_WRITE_COUNTER)
+          .increment(testtimes);
+      context.getCounter(PerfCounters.TIMELINESERVER_WRITE_KBS)
+          .increment((long) kbs * testtimes);
+    }
+  }
+}
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
index f2cd53c..55f32f3 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
@@ -28,12 +28,12 @@
 import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
 import org.apache.hadoop.mapred.TestTextInputFormat;
 import org.apache.hadoop.mapred.ThreadedMapBenchmark;
+import org.apache.hadoop.mapred.TimelineServerPerformance;
 import org.apache.hadoop.mapreduce.FailJob;
 import org.apache.hadoop.mapreduce.LargeSorter;
 import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
 import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ProgramDriver;
-
 import org.apache.hadoop.hdfs.NNBench;
 import org.apache.hadoop.fs.TestFileSystem;
 import org.apache.hadoop.fs.TestDFSIO;
@@ -85,6 +85,8 @@ public MapredTestDriver(ProgramDriver pgd) {
     pgd.addClass("fail", FailJob.class, "a job that always fails");
     pgd.addClass("sleep",
         SleepJob.class, "A job that sleeps at each map and reduce task.");
+    pgd.addClass("testTimelinePerf", TimelineServerPerformance.class,
+        "A job that launches mappers to test timeline server performance.");
     pgd.addClass("nnbench", NNBench.class,
         "A benchmark that stresses the namenode.");
     pgd.addClass("testfilesystem", TestFileSystem.class,
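
For reference, here is a minimal sketch of how the new benchmark might be
driven programmatically, mirroring the patch's own main() entry point. The
flag values and the TimelinePerfRunner wrapper class are illustrative
assumptions, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.TimelineServerPerformance;
    import org.apache.hadoop.util.ToolRunner;

    public class TimelinePerfRunner {
      public static void main(String[] args) throws Exception {
        // 4 mappers, 3 KB per put, 200 put iterations per mapper
        // (illustrative values, not defaults taken from the patch)
        String[] benchArgs = {"-m", "4", "-s", "3", "-t", "200"};
        int res = ToolRunner.run(new Configuration(),
            new TimelineServerPerformance(), benchArgs);
        System.exit(res);
      }
    }

Through the "testTimelinePerf" entry registered in MapredTestDriver above,
the same run would presumably be launched from the jobclient test jar, e.g.
"hadoop jar hadoop-mapreduce-client-jobclient-tests.jar testTimelinePerf -m 4 -s 3 -t 200".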