diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml index 2ac274d..bbb7b0a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml @@ -102,6 +102,10 @@ hadoop-yarn-api + org.apache.hadoop + hadoop-mapreduce-client-core + + com.google.guava guava diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TimelineServerPerf.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TimelineServerPerf.java new file mode 100644 index 0000000..f17051a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TimelineServerPerf.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.server; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Date; + +public class TimelineServerPerf extends Configured implements Tool { + + static int mapperType = 1; + protected static int printUsage() { + System.err.println( + "Usage: [-m ] number of Mappers\n" + + " [-mtype ] \n" + + " 1. simple write Mapper\n" + + " [-conf \n"); + GenericOptionsParser.printGenericCommandUsage(System.err); + return -1; + } + + + /** + * Configure a job given argv. + */ + public static boolean parseArgs(String[] args, Job job) throws IOException { + if (args.length < 1) { + return 0 == printUsage(); + } + for(int i=0; i < args.length; ++i) { + if (args.length == i + 1) { + System.out.println("ERROR: Required parameter missing from " + + args[i]); + return 0 == printUsage(); + } + try { + if ("-m".equals(args[i])) { + if (Integer.parseInt(args[++i]) > 0) { + job.getConfiguration() + .setInt(MRJobConfig.NUM_MAPS, (Integer.parseInt(args[++i]))); + } else { + job.getConfiguration() + .setInt(MRJobConfig.NUM_MAPS, 1); + } + } else if ("-mtype".equals(args[i])) { + mapperType = Integer.parseInt(args[++i]); + switch(mapperType) { + default: job.setMapperClass( + SimpleWriteMapper.class); + } + } else if ("-conf".equals(args[i])) { + job.getConfiguration().addResource(new Path(args[++i])); + } else { + System.out.println("Unexpected argument: " + args[i]); + return 0 == printUsage(); + } + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + args[i]); + return 0 == printUsage(); + } catch (Exception e) { + throw (IOException)new IOException().initCause(e); + } + } + return true; + } + + @Override + public int run(String[] args) throws Exception { + Job job = Job.getInstance(getConf()); + job.setJarByClass(TimelineServerPerf.class); + job.setMapperClass(SimpleWriteMapper.class); + if (!parseArgs(args, job)) { + return -1; + } + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date endTime = new Date(); + System.out.println("Job ended: " + endTime); + System.out.println("The job took " + + (endTime.getTime() - startTime.getTime()) /1000 + + " seconds."); + + return ret; + } + + public static void main(String[] args) throws Exception { + int res = + ToolRunner.run(new Configuration(), new TimelineServerPerf(), args); + System.exit(res); + } + + /** + * TimelineServer Performance counters + */ + static enum Counters { + TIMELINESERVER_WRITE_TIME, + TIMELINESERVER_WRITE_COUNTER, + TIMELINESERVER_WRITE_PER_SEC + } + + //different types of perf evaluation mappers + // simple write mapper + public static class SimpleWriteMapper + extends org.apache.hadoop.mapreduce.Mapper { + + public void map(K key, V val, + Context context) throws IOException { + //create timeline server entity + final TimelineClient tlc = new TimelineClientImpl(); + tlc.init(context.getConfiguration()); + final TimelineEntity entity = new TimelineEntity(); + entity.setEntityId("application_"+System.currentTimeMillis() + "_0001"); + entity.setEntityType("TestEntityType"); + entity.setDomainId("TestDomainId"); + entity.addPrimaryFilter("user", UserGroupInformation + .getCurrentUser().getShortUserName()); + TimelineEvent event = new TimelineEvent(); + event.setEventType("TestAppEvent"); + event.setTimestamp(System.currentTimeMillis()); + entity.addEvent(event); + long startWrite = System.currentTimeMillis(); + + try { + UserGroupInformation + .getCurrentUser() + .doAs(new PrivilegedExceptionAction() { + @Override + public TimelinePutResponse run() throws Exception { + return tlc.putEntities(entity); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + context.getCounter(Counters.TIMELINESERVER_WRITE_TIME) + .increment(System.currentTimeMillis()-startWrite); + context.getCounter(Counters.TIMELINESERVER_WRITE_COUNTER).increment(1); + context.getCounter(Counters.TIMELINESERVER_WRITE_PER_SEC) + .setValue((context.getCounter(Counters.TIMELINESERVER_WRITE_COUNTER) + .getValue()*1000/context.getCounter(Counters.TIMELINESERVER_WRITE_TIME) + .getValue())); + } + } +}