commit 145d4c63b0492fbe005c523cfa82b363b5e7dbf9 Author: Aaron Gresch Date: Wed Aug 23 11:24:24 2017 -0500 YARN-6736 allow writing to both ats v1 & v2 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 876ae94c7a2..6a5c0b48ebb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -19,7 +19,9 @@ package org.apache.hadoop.yarn.conf; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -123,27 +125,27 @@ private static void addDeprecatedKeys() { /** Factory to create client IPC classes.*/ public static final String IPC_CLIENT_FACTORY_CLASS = IPC_PREFIX + "client.factory.class"; - public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS = + public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl"; /** Factory to create server IPC classes.*/ - public static final String IPC_SERVER_FACTORY_CLASS = + public static final String IPC_SERVER_FACTORY_CLASS = IPC_PREFIX + "server.factory.class"; - public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS = + public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl"; /** Factory to create serializeable records.*/ - public static final String IPC_RECORD_FACTORY_CLASS = + public static final String IPC_RECORD_FACTORY_CLASS = IPC_PREFIX + "record.factory.class"; - public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS = + public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl"; /** RPC class implementation*/ public static final String IPC_RPC_IMPL = IPC_PREFIX + "rpc.class"; - public static final String DEFAULT_IPC_RPC_IMPL = + public static final String DEFAULT_IPC_RPC_IMPL = "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC"; - + //////////////////////////////// // Resource Manager Configs //////////////////////////////// @@ -158,7 +160,7 @@ private static void addDeprecatedKeys() { public static final long DEFAULT_RM_EPOCH = 0L; /** The address of the applications manager interface in the RM.*/ - public static final String RM_ADDRESS = + public static final String RM_ADDRESS = RM_PREFIX + "address"; public static final int DEFAULT_RM_PORT = 8032; public static final String DEFAULT_RM_ADDRESS = @@ -193,9 +195,9 @@ private static void addDeprecatedKeys() { /** The Kerberos principal for the resource manager.*/ public static final String RM_PRINCIPAL = RM_PREFIX + "principal"; - + /** The address of the scheduler interface.*/ - public static final String RM_SCHEDULER_ADDRESS = + public static final String RM_SCHEDULER_ADDRESS = RM_PREFIX + "scheduler.address"; public static final int DEFAULT_RM_SCHEDULER_PORT = 8030; public static final String DEFAULT_RM_SCHEDULER_ADDRESS = "0.0.0.0:" + @@ -223,12 +225,12 @@ private static void addDeprecatedKeys() { public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50; /** If the port should be included or not in the node name. The node name - * is used by the scheduler for resource requests allocation location + * is used by the scheduler for resource requests allocation location * matching. Typically this is just the hostname, using the port is needed * when using minicluster and specific NM are required.*/ public static final String RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME = YARN_PREFIX + "scheduler.include-port-in-node-name"; - public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = + public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = false; /** Configured scheduler queue placement rules. */ @@ -275,19 +277,19 @@ private static void addDeprecatedKeys() { RM_PREFIX + "scheduler.monitor.policies"; /** The address of the RM web application.*/ - public static final String RM_WEBAPP_ADDRESS = + public static final String RM_WEBAPP_ADDRESS = RM_PREFIX + "webapp.address"; public static final int DEFAULT_RM_WEBAPP_PORT = 8088; public static final String DEFAULT_RM_WEBAPP_ADDRESS = "0.0.0.0:" + DEFAULT_RM_WEBAPP_PORT; - + /** The https address of the RM web application.*/ public static final String RM_WEBAPP_HTTPS_ADDRESS = RM_PREFIX + "webapp.https.address"; public static final boolean YARN_SSL_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false; public static final String YARN_SSL_SERVER_RESOURCE_DEFAULT = "ssl-server.xml"; - + public static final int DEFAULT_RM_WEBAPP_HTTPS_PORT = 8090; public static final String DEFAULT_RM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" + DEFAULT_RM_WEBAPP_HTTPS_PORT; @@ -309,17 +311,17 @@ private static void addDeprecatedKeys() { "0.0.0.0:" + DEFAULT_RM_RESOURCE_TRACKER_PORT; /** The expiry interval for application master reporting.*/ - public static final String RM_AM_EXPIRY_INTERVAL_MS = + public static final String RM_AM_EXPIRY_INTERVAL_MS = YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms"; public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000; /** How long to wait until a node manager is considered dead.*/ - public static final String RM_NM_EXPIRY_INTERVAL_MS = + public static final String RM_NM_EXPIRY_INTERVAL_MS = YARN_PREFIX + "nm.liveness-monitor.expiry-interval-ms"; public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000; /** Are acls enabled.*/ - public static final String YARN_ACL_ENABLE = + public static final String YARN_ACL_ENABLE = YARN_PREFIX + "acl.enable"; public static final boolean DEFAULT_YARN_ACL_ENABLE = false; @@ -333,10 +335,10 @@ public static boolean isAclEnabled(Configuration conf) { } /** ACL of who can be admin of YARN cluster.*/ - public static final String YARN_ADMIN_ACL = + public static final String YARN_ADMIN_ACL = YARN_PREFIX + "admin.acl"; public static final String DEFAULT_YARN_ADMIN_ACL = "*"; - + /** ACL used in case none is found. Allows nothing. */ public static final String DEFAULT_YARN_APP_ACL = " "; @@ -422,17 +424,17 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION = false; /** The address of the RM admin interface.*/ - public static final String RM_ADMIN_ADDRESS = + public static final String RM_ADMIN_ADDRESS = RM_PREFIX + "admin.address"; public static final int DEFAULT_RM_ADMIN_PORT = 8033; public static final String DEFAULT_RM_ADMIN_ADDRESS = "0.0.0.0:" + DEFAULT_RM_ADMIN_PORT; - + /**Number of threads used to handle RM admin interface.*/ public static final String RM_ADMIN_CLIENT_THREAD_COUNT = RM_PREFIX + "admin.client.thread-count"; public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1; - + /** * The maximum number of application attempts. * It's a global setting for all application masters. @@ -440,15 +442,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_AM_MAX_ATTEMPTS = RM_PREFIX + "am.max-attempts"; public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2; - + /** The keytab for the resource manager.*/ - public static final String RM_KEYTAB = + public static final String RM_KEYTAB = RM_PREFIX + "keytab"; /**The kerberos principal to be used for spnego filter for RM.*/ public static final String RM_WEBAPP_SPNEGO_USER_NAME_KEY = RM_PREFIX + "webapp.spnego-principal"; - + /**The kerberos keytab to be used for spnego filter for RM.*/ public static final String RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY = RM_PREFIX + "webapp.spnego-keytab-file"; @@ -470,30 +472,30 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_RM_WEBAPP_ENABLE_CORS_FILTER = false; /** How long to wait until a container is considered dead.*/ - public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = + public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = RM_PREFIX + "rm.container-allocation.expiry-interval-ms"; public static final int DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 600000; - + /** Path to file with nodes to include.*/ - public static final String RM_NODES_INCLUDE_FILE_PATH = + public static final String RM_NODES_INCLUDE_FILE_PATH = RM_PREFIX + "nodes.include-path"; public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = ""; - + /** Path to file with nodes to exclude.*/ - public static final String RM_NODES_EXCLUDE_FILE_PATH = + public static final String RM_NODES_EXCLUDE_FILE_PATH = RM_PREFIX + "nodes.exclude-path"; public static final String DEFAULT_RM_NODES_EXCLUDE_FILE_PATH = ""; - + /** Number of threads to handle resource tracker calls.*/ public static final String RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = RM_PREFIX + "resource-tracker.client.thread-count"; public static final int DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 50; - + /** The class to use as the resource scheduler.*/ - public static final String RM_SCHEDULER = + public static final String RM_SCHEDULER = RM_PREFIX + "scheduler.class"; - - public static final String DEFAULT_RM_SCHEDULER = + + public static final String DEFAULT_RM_SCHEDULER = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"; /** RM set next Heartbeat interval for NM */ @@ -737,7 +739,7 @@ public static boolean isAclEnabled(Configuration conf) { //////////////////////////////// /** The class to use as the persistent store.*/ public static final String RM_STORE = RM_PREFIX + "store.class"; - + /** URI for FileSystemRMStateStore */ public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + "fs.state-store.uri"; @@ -795,7 +797,7 @@ public static boolean isAclEnabled(Configuration conf) { /** Default application type length */ public static final int APPLICATION_TYPE_LENGTH = 20; - + /** Default queue name */ public static final String DEFAULT_QUEUE_NAME = "default"; @@ -808,7 +810,7 @@ public static boolean isAclEnabled(Configuration conf) { /** * Default sizes of the runtime metric buckets in minutes. */ - public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS = + public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS = "60,300,1440"; public static final String RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX @@ -825,7 +827,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX + "nm-tokens.master-key-rolling-interval-secs"; - + public static final long DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = 24 * 60 * 60; @@ -870,7 +872,7 @@ public static boolean isAclEnabled(Configuration conf) { //////////////////////////////// // Node Manager Configs //////////////////////////////// - + /** Prefix for all node manager configs.*/ public static final String NM_PREFIX = "yarn.nodemanager."; @@ -898,57 +900,57 @@ public static boolean isAclEnabled(Configuration conf) { ApplicationConstants.Environment.HADOOP_CONF_DIR.key(), ApplicationConstants.Environment.CLASSPATH_PREPEND_DISTCACHE.key(), ApplicationConstants.Environment.HADOOP_YARN_HOME.key())); - + /** address of node manager IPC.*/ public static final String NM_ADDRESS = NM_PREFIX + "address"; public static final int DEFAULT_NM_PORT = 0; public static final String DEFAULT_NM_ADDRESS = "0.0.0.0:" + DEFAULT_NM_PORT; - + /** The actual bind address or the NM.*/ public static final String NM_BIND_HOST = NM_PREFIX + "bind-host"; /** who will execute(launch) the containers.*/ - public static final String NM_CONTAINER_EXECUTOR = + public static final String NM_CONTAINER_EXECUTOR = NM_PREFIX + "container-executor.class"; - /** + /** * Adjustment to make to the container os scheduling priority. * The valid values for this could vary depending on the platform. - * On Linux, higher values mean run the containers at a less - * favorable priority than the NM. + * On Linux, higher values mean run the containers at a less + * favorable priority than the NM. * The value specified is an int. */ - public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = + public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = NM_PREFIX + "container-executor.os.sched.priority.adjustment"; public static final int DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = 0; - + /** Number of threads container manager uses.*/ public static final String NM_CONTAINER_MGR_THREAD_COUNT = NM_PREFIX + "container-manager.thread-count"; public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20; - + /** Number of threads container manager uses.*/ public static final String NM_COLLECTOR_SERVICE_THREAD_COUNT = NM_PREFIX + "collector-service.thread-count"; public static final int DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT = 5; /** Number of threads used in cleanup.*/ - public static final String NM_DELETE_THREAD_COUNT = + public static final String NM_DELETE_THREAD_COUNT = NM_PREFIX + "delete.thread-count"; public static final int DEFAULT_NM_DELETE_THREAD_COUNT = 4; - + /** Keytab for NM.*/ public static final String NM_KEYTAB = NM_PREFIX + "keytab"; - + /**List of directories to store localized files in.*/ public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs"; public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir"; /** * Number of files in each localized directories - * Avoid tuning this too low. + * Avoid tuning this too low. */ public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = NM_PREFIX + "local-cache.max-files-per-directory"; @@ -960,7 +962,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_NM_LOCALIZER_PORT = 8040; public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT; - + /** Address where the collector service IPC is.*/ public static final String NM_COLLECTOR_SERVICE_ADDRESS = NM_PREFIX + "collector-service.address"; @@ -971,9 +973,9 @@ public static boolean isAclEnabled(Configuration conf) { /** Interval in between cache cleanups.*/ public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = NM_PREFIX + "localizer.cache.cleanup.interval-ms"; - public static final long DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = + public static final long DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = 10 * 60 * 1000; - + /** * Target size of localizer cache in MB, per nodemanager. It is a target * retention size that only includes resources with PUBLIC and PRIVATE @@ -982,14 +984,14 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_LOCALIZER_CACHE_TARGET_SIZE_MB = NM_PREFIX + "localizer.cache.target-size-mb"; public static final long DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB = 10 * 1024; - + /** Number of threads to handle localization requests.*/ public static final String NM_LOCALIZER_CLIENT_THREAD_COUNT = NM_PREFIX + "localizer.client.thread-count"; public static final int DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT = 5; - + /** Number of threads to use for localization fetching.*/ - public static final String NM_LOCALIZER_FETCH_THREAD_COUNT = + public static final String NM_LOCALIZER_FETCH_THREAD_COUNT = NM_PREFIX + "localizer.fetch.thread-count"; public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4; @@ -1035,7 +1037,7 @@ public static boolean isAclEnabled(Configuration conf) { RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = 30000l; - + /** Delegation Token renewer thread count */ public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = RM_PREFIX + "delegation-token-renewer.thread-count"; @@ -1064,15 +1066,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; - - /** + + /** * How long to wait before deleting aggregated logs, -1 disables. * Be careful set this too small and you will spam the name node. */ public static final String LOG_AGGREGATION_RETAIN_SECONDS = YARN_PREFIX + "log-aggregation.retain-seconds"; public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1; - + /** * How long to wait between aggregated log retention checks. If set to * a value {@literal <=} 0 then the value is computed as one-tenth of the @@ -1113,12 +1115,12 @@ public static boolean isAclEnabled(Configuration conf) { * Number of threads used in log cleanup. Only applicable if Log aggregation * is disabled */ - public static final String NM_LOG_DELETION_THREADS_COUNT = + public static final String NM_LOG_DELETION_THREADS_COUNT = NM_PREFIX + "log.deletion-threads-count"; public static final int DEFAULT_NM_LOG_DELETE_THREAD_COUNT = 4; /** Where to aggregate logs to.*/ - public static final String NM_REMOTE_APP_LOG_DIR = + public static final String NM_REMOTE_APP_LOG_DIR = NM_PREFIX + "remote-app-log-dir"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs"; @@ -1126,7 +1128,7 @@ public static boolean isAclEnabled(Configuration conf) { * The remote log dir will be created at * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId} */ - public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = + public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = NM_PREFIX + "remote-app-log-dir-suffix"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs"; @@ -1136,7 +1138,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String YARN_LOG_SERVER_WEBSERVICE_URL = YARN_PREFIX + "log.server.web-service.url"; - public static final String YARN_TRACKING_URL_GENERATOR = + public static final String YARN_TRACKING_URL_GENERATOR = YARN_PREFIX + "tracking.url.generator"; /** Amount of memory in MB that can be allocated for containers.*/ @@ -1296,13 +1298,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_NM_WEBAPP_PORT = 8042; public static final String DEFAULT_NM_WEBAPP_ADDRESS = "0.0.0.0:" + DEFAULT_NM_WEBAPP_PORT; - + /** NM Webapp https address.**/ public static final String NM_WEBAPP_HTTPS_ADDRESS = NM_PREFIX + "webapp.https.address"; public static final int DEFAULT_NM_WEBAPP_HTTPS_PORT = 8044; public static final String DEFAULT_NM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" - + DEFAULT_NM_WEBAPP_HTTPS_PORT; + + DEFAULT_NM_WEBAPP_HTTPS_PORT; /** Enable/disable CORS filter. */ public static final String NM_WEBAPP_ENABLE_CORS_FILTER = @@ -1426,22 +1428,22 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB = 0; /** Frequency of running node health script.*/ - public static final String NM_HEALTH_CHECK_INTERVAL_MS = + public static final String NM_HEALTH_CHECK_INTERVAL_MS = NM_PREFIX + "health-checker.interval-ms"; public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000; - /** Health check script time out period.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = + /** Health check script time out period.*/ + public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = NM_PREFIX + "health-checker.script.timeout-ms"; - public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = + public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = 2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS; - + /** The health check script to run.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_PATH = + public static final String NM_HEALTH_CHECK_SCRIPT_PATH = NM_PREFIX + "health-checker.script.path"; - + /** The arguments to pass to the health check script.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_OPTS = + public static final String NM_HEALTH_CHECK_SCRIPT_OPTS = NM_PREFIX + "health-checker.script.opts"; /** The JVM options used on forking ContainerLocalizer process @@ -1547,10 +1549,10 @@ public static boolean isAclEnabled(Configuration conf) { /** The path to the Linux container executor.*/ public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH = NM_PREFIX + "linux-container-executor.path"; - - /** + + /** * The UNIX group that the linux-container-executor should run as. - * This is intended to be set as part of container-executor.cfg. + * This is intended to be set as part of container-executor.cfg. */ public static final String NM_LINUX_CONTAINER_GROUP = NM_PREFIX + "linux-container-executor.group"; @@ -1574,30 +1576,30 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_NM_NONSECURE_MODE_LOCAL_USER = "nobody"; /** - * The allowed pattern for UNIX user names enforced by - * Linux-container-executor when used in nonsecure mode (use case for this + * The allowed pattern for UNIX user names enforced by + * Linux-container-executor when used in nonsecure mode (use case for this * is using cgroups). The default value is taken from /usr/sbin/adduser */ public static final String NM_NONSECURE_MODE_USER_PATTERN_KEY = NM_PREFIX + "linux-container-executor.nonsecure-mode.user-pattern"; - public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN = + public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN = "^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$"; /** The type of resource enforcement to use with the * linux container executor. */ - public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER = + public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER = NM_PREFIX + "linux-container-executor.resources-handler.class"; - + /** The path the linux container executor should use for cgroups */ public static final String NM_LINUX_CONTAINER_CGROUPS_HIERARCHY = NM_PREFIX + "linux-container-executor.cgroups.hierarchy"; - + /** Whether the linux container executor should mount cgroups if not found */ public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT = NM_PREFIX + "linux-container-executor.cgroups.mount"; - + /** Where the linux container executor should mount cgroups if not found */ public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH = NM_PREFIX + "linux-container-executor.cgroups.mount-path"; @@ -1621,7 +1623,7 @@ public static boolean isAclEnabled(Configuration conf) { /** * Interval of time the linux container executor should try cleaning up - * cgroups entry when cleaning up a container. This is required due to what + * cgroups entry when cleaning up a container. This is required due to what * it seems a race condition because the SIGTERM/SIGKILL is asynch. */ public static final String NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT = @@ -1651,24 +1653,24 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "windows-container.cpu-limit.enabled"; public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = false; - /** + /** /* The Windows group that the windows-secure-container-executor should run as. */ public static final String NM_WINDOWS_SECURE_CONTAINER_GROUP = NM_PREFIX + "windows-secure-container-executor.group"; /** T-file compression types used to compress aggregated logs.*/ - public static final String NM_LOG_AGG_COMPRESSION_TYPE = + public static final String NM_LOG_AGG_COMPRESSION_TYPE = NM_PREFIX + "log-aggregation.compression-type"; public static final String DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE = "none"; - + /** The kerberos principal for the node manager.*/ public static final String NM_PRINCIPAL = NM_PREFIX + "principal"; - - public static final String NM_AUX_SERVICES = + + public static final String NM_AUX_SERVICES = NM_PREFIX + "aux-services"; - + public static final String NM_AUX_SERVICE_FMT = NM_PREFIX + "aux-services.%s.class"; @@ -1695,11 +1697,11 @@ public static boolean isAclEnabled(Configuration conf) { /**The kerberos principal to be used for spnego filter for NM.*/ public static final String NM_WEBAPP_SPNEGO_USER_NAME_KEY = NM_PREFIX + "webapp.spnego-principal"; - + /**The kerberos keytab to be used for spnego filter for NM.*/ public static final String NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY = NM_PREFIX + "webapp.spnego-keytab-file"; - + public static final String DEFAULT_NM_USER_HOME_DIR= "/home/"; public static final String NM_RECOVERY_PREFIX = NM_PREFIX + "recovery."; @@ -1730,41 +1732,41 @@ public static boolean isAclEnabled(Configuration conf) { // Web Proxy Configs //////////////////////////////// public static final String PROXY_PREFIX = "yarn.web-proxy."; - + /** The kerberos principal for the proxy.*/ public static final String PROXY_PRINCIPAL = PROXY_PREFIX + "principal"; - + /** Keytab for Proxy.*/ public static final String PROXY_KEYTAB = PROXY_PREFIX + "keytab"; - + /** The address for the web proxy.*/ public static final String PROXY_ADDRESS = PROXY_PREFIX + "address"; public static final int DEFAULT_PROXY_PORT = 9099; public static final String DEFAULT_PROXY_ADDRESS = "0.0.0.0:" + DEFAULT_PROXY_PORT; - + /** * YARN Service Level Authorization */ - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL = "security.resourcetracker.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONCLIENT_PROTOCOL = "security.applicationclient.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCEMANAGER_ADMINISTRATION_PROTOCOL = "security.resourcemanager-administration.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONMASTER_PROTOCOL = "security.applicationmaster.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL = "security.containermanagement.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = "security.resourcelocalizer.protocol.acl"; @@ -1945,6 +1947,9 @@ public static boolean isAclEnabled(Configuration conf) { + "version"; public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f; + public static final String TIMELINE_SERVICE_VERSIONS = TIMELINE_SERVICE_PREFIX + + "versions"; + /** * Comma separated list of names for UIs hosted in the timeline server * (For pluggable UIs). @@ -2106,7 +2111,7 @@ public static boolean isAclEnabled(Configuration conf) { TIMELINE_SERVICE_PREFIX + "reader.class"; public static final String DEFAULT_TIMELINE_SERVICE_READER_CLASS = - "org.apache.hadoop.yarn.server.timelineservice.storage" + + "org.apache.hadoop.yarn.server.timelineservice.storage" + ".HBaseTimelineReaderImpl"; /** @@ -2287,7 +2292,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT = TIMELINE_SERVICE_PREFIX + "handler-thread-count"; public static final int DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT = 10; - + /** The address of the timeline service web application.*/ public static final String TIMELINE_SERVICE_WEBAPP_ADDRESS = @@ -2305,6 +2310,14 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" + DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_PORT; + /** The address of the timeline service v2 web application.*/ + public static final String TIMELINE_SERVICE_V2_WEBAPP_ADDRESS = + TIMELINE_SERVICE_PREFIX + "v2.webapp.address"; + + /** The https address of the timeline service v2 web application.*/ + public static final String TIMELINE_SERVICE_V2_WEBAPP_HTTPS_ADDRESS = + TIMELINE_SERVICE_PREFIX + "v2.webapp.https.address"; + /** * Defines the max number of applications could be fetched using * REST API or application history protocol and shown in timeline @@ -2477,7 +2490,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME = 7*24*60*60*1000; // 7 days - // Timeline service v2 offlien aggregation related keys + // Timeline service v2 offline aggregation related keys public static final String TIMELINE_OFFLINE_AGGREGATION_PREFIX = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "aggregation.offline."; public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR @@ -2508,7 +2521,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String SHARED_CACHE_NESTED_LEVEL = SHARED_CACHE_PREFIX + "nested-level"; public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3; - + // Shared Cache Manager Configs public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store."; @@ -2542,7 +2555,7 @@ public static boolean isAclEnabled(Configuration conf) { "0.0.0.0:" + DEFAULT_SCM_WEBAPP_PORT; // In-memory SCM store configuration - + public static final String IN_MEMORY_STORE_PREFIX = SCM_STORE_PREFIX + "in-memory."; @@ -2563,7 +2576,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String IN_MEMORY_INITIAL_DELAY_MINS = IN_MEMORY_STORE_PREFIX + "initial-delay-mins"; public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10; - + /** * The frequency at which the in-memory store checks to remove dead initial * applications. Specified in minutes. @@ -2911,13 +2924,13 @@ public static boolean isAclEnabled(Configuration conf) { * Node-labels configurations */ public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels."; - + /** Node label store implementation class */ public static final String FS_NODE_LABELS_STORE_IMPL_CLASS = NODE_LABELS_PREFIX + "fs-store.impl.class"; public static final String DEFAULT_FS_NODE_LABELS_STORE_IMPL_CLASS = "org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore"; - + /** URI for NodeLabelManager */ public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX + "fs-store.root-dir"; @@ -2929,10 +2942,10 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX + "enabled"; public static final boolean DEFAULT_NODE_LABELS_ENABLED = false; - + public static final String NODELABEL_CONFIGURATION_TYPE = NODE_LABELS_PREFIX + "configuration-type"; - + public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE = "centralized"; @@ -2941,7 +2954,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE = "distributed"; - + public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE = CENTRALIZED_NODELABEL_CONFIGURATION_TYPE; @@ -3104,7 +3117,7 @@ public static boolean areNodeLabelsEnabled( public YarnConfiguration() { super(); } - + public YarnConfiguration(Configuration conf) { super(conf); if (! (conf instanceof YarnConfiguration)) { @@ -3221,8 +3234,8 @@ public static boolean timelineServiceEnabled(Configuration conf) { } /** - * Returns the timeline service version. It does not check whether the - * timeline service itself is enabled. + * Returns the default timeline service version to be used. It does not check + * whether the timeline service itself is enabled. * * @param conf the configuration * @return the timeline service version as a float. @@ -3233,6 +3246,28 @@ public static float getTimelineServiceVersion(Configuration conf) { } /** + * Returns all the active timeline service versions. It does not check + * whether the timeline service itself is enabled. + * + * @param conf the configuration + * @return the timeline service versions as a collection of floats. + */ + private static Collection getTimelineServiceVersions( + Configuration conf) { + String versions = conf.get(TIMELINE_SERVICE_VERSIONS); + if (versions == null) { + versions = Float.toString(getTimelineServiceVersion(conf)); + } + List stringList = Arrays.asList(versions.split(",")); + List floatList = new ArrayList(); + for (String s : stringList) { + Float f = Float.parseFloat(s); + floatList.add(f); + } + return floatList; + } + + /** * Returns whether the timeline service v.2 is enabled via configuration. * * @param conf the configuration @@ -3240,8 +3275,38 @@ public static float getTimelineServiceVersion(Configuration conf) { * version greater than equal to 2 but smaller than 3. */ public static boolean timelineServiceV2Enabled(Configuration conf) { - return timelineServiceEnabled(conf) && - (int)getTimelineServiceVersion(conf) == 2; + boolean enabled = false; + if (timelineServiceEnabled(conf)) { + Collection versions = getTimelineServiceVersions(conf); + for (Float version : versions) { + if (version.intValue() == 2) { + enabled = true; + break; + } + } + } + return enabled; + } + + /** + * Returns whether the timeline service v.1 is enabled via configuration. + * + * @param conf the configuration + * @return whether the timeline service v.1 is enabled. V.1 refers to a + * version greater than equal to 1 but smaller than 2. + */ + public static boolean timelineServiceV1Enabled(Configuration conf) { + boolean enabled = false; + if (timelineServiceEnabled(conf)) { + Collection versions = getTimelineServiceVersions(conf); + for (Float version : versions) { + if (version.intValue() == 1) { + enabled = true; + break; + } + } + } + return enabled; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index a02af709116..a2887099209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -115,12 +115,12 @@ /** * An ApplicationMaster for executing shell commands on a set of launched * containers using the YARN framework. - * + * *

* This class is meant to act as an example on how to write yarn-based * application masters. *

- * + * *

* The ApplicationMaster is started on a container by the * ResourceManager's launcher. The first thing that the @@ -132,14 +132,14 @@ * status/job history if needed. However, in the distributedshell, trackingurl * and appMasterHost:appMasterRpcPort are not supported. *

- * + * *

* The ApplicationMaster needs to send a heartbeat to the * ResourceManager at regular intervals to inform the * ResourceManager that it is up and alive. The * {@link ApplicationMasterProtocol#allocate} to the ResourceManager from the * ApplicationMaster acts as a heartbeat. - * + * *

* For the actual handling of the job, the ApplicationMaster has to * request the ResourceManager via {@link AllocateRequest} for the @@ -150,7 +150,7 @@ * ApplicationMaster of the set of newly allocated containers, * completed containers as well as current state of available resources. *

- * + * *

* For each allocated container, the ApplicationMaster can then set * up the necessary launch context via {@link ContainerLaunchContext} to specify @@ -159,7 +159,7 @@ * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to * launch and execute the defined commands on the given allocated container. *

- * + * *

* The ApplicationMaster can monitor the launched container by * either querying the ResourceManager using @@ -184,7 +184,7 @@ public enum DSEvent { DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END } - + @VisibleForTesting @Private public enum DSEntity { @@ -300,7 +300,8 @@ TimelineClient timelineClient; // Timeline v2 Client - private TimelineV2Client timelineV2Client; + @VisibleForTesting + TimelineV2Client timelineV2Client; static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; static final String APPID_TIMELINE_FILTER_NAME = "appId"; @@ -575,16 +576,13 @@ public boolean init(String[] args) throws ParseException, IOException { containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( "container_retry_interval", "0")); - if (YarnConfiguration.timelineServiceEnabled(conf)) { - timelineServiceV2Enabled = - ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2); - timelineServiceV1Enabled = !timelineServiceV2Enabled; - } else { + if (!YarnConfiguration.timelineServiceEnabled(conf)) { timelineClient = null; timelineV2Client = null; LOG.warn("Timeline service is not enabled"); } + return true; } @@ -649,14 +647,16 @@ public void run() throws YarnException, IOException, InterruptedException { amRMClient.registerTimelineV2Client(timelineV2Client); } + if (timelineServiceV1Enabled) { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + } if (timelineServiceV2Enabled) { publishApplicationAttemptEventOnTimelineServiceV2( DSEvent.DS_APP_ATTEMPT_START); - } else if (timelineServiceV1Enabled) { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); } + // Setup local RPC Server to accept status requests directly from clients // TODO need to setup a protocol for client to be able to communicate to // the RPC server @@ -673,7 +673,7 @@ public void run() throws YarnException, IOException, InterruptedException { // resource manager long maxMem = response.getMaximumResourceCapability().getMemorySize(); LOG.info("Max mem capability of resources in this cluster " + maxMem); - + int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); LOG.info("Max vcores capability of resources in this cluster " + maxVCores); @@ -725,18 +725,23 @@ void startTimelineClient(final Configuration conf) @Override public Void run() throws Exception { if (YarnConfiguration.timelineServiceEnabled(conf)) { + timelineServiceV1Enabled = + YarnConfiguration.timelineServiceV1Enabled(conf); + timelineServiceV2Enabled = + YarnConfiguration.timelineServiceV2Enabled(conf); // Creating the Timeline Client + if (timelineServiceV1Enabled) { + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); + LOG.info("Timeline service V1 client is enabled"); + } if (timelineServiceV2Enabled) { timelineV2Client = TimelineV2Client.createTimelineClient( appAttemptID.getApplicationId()); timelineV2Client.init(conf); timelineV2Client.start(); LOG.info("Timeline service V2 client is enabled"); - } else { - timelineClient = TimelineClient.createTimelineClient(); - timelineClient.init(conf); - timelineClient.start(); - LOG.info("Timeline service V1 client is enabled"); } } else { timelineClient = null; @@ -766,12 +771,13 @@ protected boolean finish() { } catch (InterruptedException ex) {} } + if (timelineServiceV1Enabled) { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); + } if (timelineServiceV2Enabled) { publishApplicationAttemptEventOnTimelineServiceV2( DSEvent.DS_APP_ATTEMPT_END); - } else if (timelineServiceV1Enabled) { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); } // Join all launched threads @@ -816,13 +822,14 @@ protected boolean finish() { } catch (IOException e) { LOG.error("Failed to unregister application", e); } - + amRMClient.stop(); // Stop Timeline Client if(timelineServiceV1Enabled) { timelineClient.stop(); - } else if (timelineServiceV2Enabled) { + } + if (timelineServiceV2Enabled) { timelineV2Client.stop(); } @@ -878,6 +885,10 @@ public void onContainersCompleted(List completedContainers) { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } + if (timelineServiceV1Enabled) { + publishContainerEndEvent(timelineClient, containerStatus, domainId, + appSubmitterUgi); + } if (timelineServiceV2Enabled) { Long containerStartTime = containerStartTimes.get(containerStatus.getContainerId()); @@ -888,9 +899,6 @@ public void onContainersCompleted(List completedContainers) { } publishContainerEndEventOnTimelineServiceV2(containerStatus, containerStartTime); - } else if (timelineServiceV1Enabled) { - publishContainerEndEvent(timelineClient, containerStatus, domainId, - appSubmitterUgi); } } @@ -904,7 +912,7 @@ public void onContainersCompleted(List completedContainers) { amRMClient.addContainerRequest(containerAsk); } } - + if (numCompletedContainers.get() == numTotalContainers) { done = true; } @@ -1014,15 +1022,16 @@ public void onContainerStarted(ContainerId containerId, applicationMaster.nmClientAsync.getContainerStatusAsync( containerId, container.getNodeId()); } + if (applicationMaster.timelineServiceV1Enabled) { + applicationMaster.publishContainerStartEvent( + applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); + } if (applicationMaster.timelineServiceV2Enabled) { long startTime = SystemClock.getInstance().getTime(); applicationMaster.getContainerStartTimes().put(containerId, startTime); applicationMaster.publishContainerStartEventOnTimelineServiceV2( container, startTime); - } else if (applicationMaster.timelineServiceV1Enabled) { - applicationMaster.publishContainerStartEvent( - applicationMaster.timelineClient, container, - applicationMaster.domainId, applicationMaster.appSubmitterUgi); } } @@ -1081,9 +1090,9 @@ public LaunchContainerRunnable(Container lcontainer, @Override /** - * Connects to CM, sets up container launch context - * for shell command and eventually dispatches the container - * start request to the CM. + * Connects to CM, sets up container launch context + * for shell command and eventually dispatches the container + * start request to the CM. */ public void run() { LOG.info("Setting up container launch container for containerid=" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index 2789d047fb3..403322acb37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -33,6 +35,8 @@ import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; @@ -167,14 +171,83 @@ private Container generateContainer(ContainerId cid) { } @Test - public void testTimelineClientInDSAppMaster() throws Exception { + public void testTimelineClientInDSAppMasterV1() throws Exception { + runTimelineClientInDSAppMaster(true, false); + } + + @Test + public void testTimelineClientInDSAppMasterV2() throws Exception { + runTimelineClientInDSAppMaster(false, true); + } + + @Test + public void testTimelineClientInDSAppMasterV1V2() throws Exception { + runTimelineClientInDSAppMaster(true, true); + } + + @Test + public void testTimelineClientInDSAppMasterDisabled() throws Exception { + runTimelineClientInDSAppMaster(false, false); + } + + private void runTimelineClientInDSAppMaster(boolean v1Enabled, + boolean v2Enabled) throws Exception { + ApplicationMaster appMaster = createAppMasterWithStartedTimelineService( + v1Enabled, v2Enabled); + validateAppMasterTimelineService(v1Enabled, v2Enabled, appMaster); + } + + private void validateAppMasterTimelineService(boolean v1Enabled, + boolean v2Enabled, ApplicationMaster appMaster) { + if (v1Enabled) { + Assert.assertEquals(appMaster.appSubmitterUgi, + ((TimelineClientImpl)appMaster.timelineClient).getUgi()); + } else { + Assert.assertNull(appMaster.timelineClient); + } + if (v2Enabled) { + Assert.assertNotNull(appMaster.timelineV2Client); + } else { + Assert.assertNull(appMaster.timelineV2Client); + } + } + + private ApplicationMaster createAppMasterWithStartedTimelineService( + boolean v1Enabled, boolean v2Enabled) throws Exception { ApplicationMaster appMaster = new ApplicationMaster(); appMaster.appSubmitterUgi = UserGroupInformation.createUserForTesting("foo", new String[]{"bar"}); - Configuration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + Configuration conf = this.getTimelineServiceConf(v1Enabled, v2Enabled); + ApplicationId appId = ApplicationId.newInstance(1L, 1); + appMaster.appAttemptID = ApplicationAttemptId.newInstance(appId, 1); appMaster.startTimelineClient(conf); - Assert.assertEquals(appMaster.appSubmitterUgi, - ((TimelineClientImpl)appMaster.timelineClient).getUgi()); + return appMaster; + } + + private Configuration getTimelineServiceConf(boolean v1Enabled, + boolean v2Enabled) { + Configuration conf = new YarnConfiguration(new Configuration(false)); + Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf)); + + if (v1Enabled || v2Enabled) { + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + } + + if (v1Enabled) { + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + } + + if (v2Enabled) { + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + } + + if (v1Enabled && v2Enabled) { + conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0"); + conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f"); + } + + return conf; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 19cb10b1d11..e16b8dfcadb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -167,13 +167,7 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); } - float timelineServiceVersion = - conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && ((Float.compare(timelineServiceVersion, 1.0f) == 0) - || (Float.compare(timelineServiceVersion, 1.5f) == 0))) { + if (YarnConfiguration.timelineServiceV1Enabled(conf)) { timelineV1ServiceEnabled = true; timelineDTRenewer = getTimelineDelegationTokenRenewer(conf); timelineService = TimelineUtils.buildTimelineTokenService(conf); @@ -271,11 +265,11 @@ public YarnClientApplication createApplication() int pollCount = 0; long startTime = System.currentTimeMillis(); - EnumSet waitingStates = + EnumSet waitingStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED); - EnumSet failToSubmitStates = + EnumSet failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED); while (true) { @@ -284,7 +278,7 @@ public YarnClientApplication createApplication() YarnApplicationState state = appReport.getYarnApplicationState(); if (!waitingStates.contains(state)) { if(failToSubmitStates.contains(state)) { - throw new YarnException("Failed to submit " + applicationId + + throw new YarnException("Failed to submit " + applicationId + " to YARN : " + appReport.getDiagnostics()); } LOG.info("Submitted application " + applicationId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index f49618b10b2..cce0d0b0fda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -99,10 +99,7 @@ protected void serviceInit(Configuration conf) throws Exception { timelineServiceVersion = conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); - LOG.info("Timeline service address: " + getTimelineServiceAddress()); - if (!YarnConfiguration.timelineServiceEnabled(conf) - || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0) - || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) { + if (!YarnConfiguration.timelineServiceV1Enabled(conf)) { throw new IOException("Timeline V1 client is not properly configured. " + "Either timeline service is not enabled or version is not set to" + " 1.x"); @@ -128,6 +125,7 @@ protected void serviceInit(Configuration conf) throws Exception { conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); } + LOG.info("Timeline service address: " + getTimelineServiceAddress()); super.serviceInit(conf); } @@ -317,7 +315,7 @@ public static void main(String[] argv) throws Exception { /** * Put timeline data in a JSON file via command line. - * + * * @param path * path to the timeline data JSON file * @param type diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java index 220d6af9ad6..02c9519d9be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java @@ -94,8 +94,7 @@ public ApplicationId getContextAppId() { } protected void serviceInit(Configuration conf) throws Exception { - if (!YarnConfiguration.timelineServiceEnabled(conf) - || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) { + if (!YarnConfiguration.timelineServiceV2Enabled(conf)) { throw new IOException("Timeline V2 client is not properly configured. " + "Either timeline service is not enabled or version is not set to" + " 2"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java index 3b12f3c0304..1013df203b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java @@ -38,7 +38,7 @@ /** * The helper class for the timeline module. - * + * */ @Public @Evolving @@ -60,7 +60,7 @@ /** * Serialize a POJO object into a JSON string not in a pretty format - * + * * @param o * an object to serialize * @return a JSON string @@ -75,7 +75,7 @@ public static String dumpTimelineRecordtoJSON(Object o) /** * Serialize a POJO object into a JSON string - * + * * @param o * an object to serialize * @param pretty @@ -106,8 +106,8 @@ public static boolean timelineServiceEnabled(Configuration conf) { } /** - * Returns the timeline service version. It does not check whether the - * timeline service itself is enabled. + * Returns the default timeline service version. It does not check whether + * the timeline service itself is enabled. * * @param conf the configuration * @return the timeline service version as a float. @@ -118,7 +118,8 @@ public static float getTimelineServiceVersion(Configuration conf) { } /** - * Returns whether the timeline service v.1.5 is enabled via configuration. + * Returns whether the timeline service v.1.5 is enabled by default via + * configuration. * * @param conf the configuration * @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java index fb0a9bf533f..47219e680e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java @@ -78,7 +78,7 @@ public static void setRMWebAppHostnameAndPort(Configuration conf, conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress); } } - + public static void setNMWebAppHostNameAndPort(Configuration conf, String hostName, int port) { if (YarnConfiguration.useHttps(conf)) { @@ -172,7 +172,7 @@ public static String getRouterWebAppURLWithoutScheme(Configuration conf) { } return addrs; } - + public static String getProxyHostAndPort(Configuration conf) { String addr = conf.get(YarnConfiguration.PROXY_ADDRESS); if(addr == null || addr.isEmpty()) { @@ -191,7 +191,7 @@ public static String getResolvedRMWebAppURLWithScheme(Configuration conf) { return getHttpSchemePrefix(conf) + getResolvedRMWebAppURLWithoutScheme(conf); } - + public static String getResolvedRemoteRMWebAppURLWithoutScheme( Configuration conf) { return getResolvedRemoteRMWebAppURLWithoutScheme(conf, @@ -202,7 +202,7 @@ public static String getResolvedRMWebAppURLWithoutScheme(Configuration conf) { return getResolvedRMWebAppURLWithoutScheme(conf, YarnConfiguration.useHttps(conf) ? Policy.HTTPS_ONLY : Policy.HTTP_ONLY); } - + public static String getResolvedRMWebAppURLWithoutScheme(Configuration conf, Policy httpPolicy) { InetSocketAddress address = null; @@ -215,7 +215,7 @@ public static String getResolvedRMWebAppURLWithoutScheme(Configuration conf, address = conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_RM_WEBAPP_PORT); + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT); } return getResolvedAddress(address); } @@ -271,7 +271,7 @@ public static String getResolvedAddress(InetSocketAddress address) { sb.append(":").append(address.getPort()); return sb.toString(); } - + /** * Get the URL to use for binding where bind hostname can be specified * to override the hostname in the webAppURLWithoutScheme. Port specified in the @@ -314,16 +314,36 @@ public static String getNMWebAppURLWithoutScheme(Configuration conf) { } public static String getAHSWebAppURLWithoutScheme(Configuration conf) { - return getTimelineReaderWebAppURL(conf); + if (YarnConfiguration.useHttps(conf)) { + return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); + } else { + return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); + } } public static String getTimelineReaderWebAppURL(Configuration conf) { if (YarnConfiguration.useHttps(conf)) { - return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); + // use v2 conf setting if it exists, otherwise fall back to v1 + String v2Address = conf.get( + YarnConfiguration.TIMELINE_SERVICE_V2_WEBAPP_HTTPS_ADDRESS); + if (v2Address != null) { + return v2Address; + } else { + return conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); + } } else { - return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); + String v2Address = conf.get( + YarnConfiguration.TIMELINE_SERVICE_V2_WEBAPP_ADDRESS); + if (v2Address != null) { + return v2Address; + } else { + return conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); + } } } @@ -342,7 +362,7 @@ public static String getURLWithScheme(String schemePrefix, String url) { return schemePrefix + url; } } - + public static String getRunningLogURL( String nodeHttpAddress, String containerId, String user) { if (nodeHttpAddress == null || nodeHttpAddress.isEmpty() || @@ -370,7 +390,7 @@ public static String getAggregatedLogURL(String serverHttpAddress, /** * Choose which scheme (HTTP or HTTPS) to use when generating a URL based on * the configuration. - * + * * @return the scheme (HTTP / HTTPS) */ public static String getHttpSchemePrefix(Configuration conf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 5333f254328..bf1f528fb48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; @@ -253,7 +254,7 @@ protected void serviceInit(Configuration conf) throws Exception { loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); validateConfigs(this.conf); - + // Set HA configuration should be done before login this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); if (this.rmContext.isHAEnabled()) { @@ -297,7 +298,7 @@ protected void serviceInit(Configuration conf) throws Exception { } rmContext.setYarnConfiguration(conf); - + createAndInitActiveServices(false); webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, @@ -320,7 +321,6 @@ protected void serviceInit(Configuration conf) throws Exception { SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher(); - addIfService(systemMetricsPublisher); rmContext.setSystemMetricsPublisher(systemMetricsPublisher); super.serviceInit(this.conf); @@ -474,12 +474,12 @@ private NMLivelinessMonitor createNMLivelinessMonitor() { protected AMLivelinessMonitor createAMLivelinessMonitor() { return new AMLivelinessMonitor(this.rmDispatcher); } - + protected RMNodeLabelsManager createNodeLabelManager() throws InstantiationException, IllegalAccessException { return new RMNodeLabelsManager(); } - + protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer(); } @@ -502,26 +502,33 @@ private FederationStateStoreService createFederationStateStoreService() { } protected SystemMetricsPublisher createSystemMetricsPublisher() { - SystemMetricsPublisher publisher; - if (YarnConfiguration.timelineServiceEnabled(conf) && - YarnConfiguration.systemMetricsPublisherEnabled(conf)) { - if (YarnConfiguration.timelineServiceV2Enabled(conf)) { - // we're dealing with the v.2.x publisher - LOG.info("system metrics publisher with the timeline service V2 is " + - "configured"); - publisher = new TimelineServiceV2Publisher( - rmContext.getRMTimelineCollectorManager()); - } else { - // we're dealing with the v.1.x publisher - LOG.info("system metrics publisher with the timeline service V1 is " + - "configured"); - publisher = new TimelineServiceV1Publisher(); - } - } else { + List publishers = + new ArrayList(); + if (YarnConfiguration.timelineServiceV1Enabled(conf)) { + SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher(); + publishers.add(publisherV1); + } + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + // we're dealing with the v.2.x publisher + LOG.info("system metrics publisher with the timeline service V2 is " + + "configured"); + SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher( + rmContext.getRMTimelineCollectorManager()); + publishers.add(publisherV2); + } + if (publishers.isEmpty()) { LOG.info("TimelineServicePublisher is not configured"); - publisher = new NoOpSystemMetricPublisher(); + SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher(); + publishers.add(noopPublisher); + } + + for (SystemMetricsPublisher publisher : publishers) { + addIfService(publisher); } - return publisher; + + SystemMetricsPublisher combinedPublisher = + new CombinedSystemMetricsPublisher(publishers); + return combinedPublisher; } // sanity check for configurations @@ -588,7 +595,7 @@ protected void serviceInit(Configuration configuration) throws Exception { AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); addService(amFinishingMonitor); rmContext.setAMFinishingMonitor(amFinishingMonitor); - + RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor(); addService(rmAppLifetimeMonitor); rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor); @@ -1056,7 +1063,7 @@ protected void startWepApp() { RMWebAppUtil.setupSecurityAndFilters(conf, getClientRMService().rmDTSecretManager); - Builder builder = + Builder builder = WebApps .$for("cluster", ApplicationMasterService.class, masterService, "ws") @@ -1234,7 +1241,7 @@ protected void serviceStart() throws Exception { } super.serviceStart(); } - + protected void doSecureLogin() throws IOException { InetSocketAddress socAddr = getBindAddress(conf); SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB, @@ -1264,7 +1271,7 @@ protected void serviceStop() throws Exception { transitionToStandby(false); rmContext.setHAServiceState(HAServiceState.STOPPING); } - + protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, @@ -1472,7 +1479,7 @@ private void setSchedulerRecoveryStartAndWaitTime(RMState state, /** * Retrieve RM bind address from configuration - * + * * @param conf * @return InetSocketAddress */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java new file mode 100644 index 00000000000..96467477986 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.Collection; + +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + * A metrics publisher that can publish for a collection of publishers. + */ +public class CombinedSystemMetricsPublisher implements SystemMetricsPublisher { + private Collection publishers; + + public CombinedSystemMetricsPublisher(Collection + publishers) { + this.publishers = publishers; + } + + @Override + public void appCreated(RMApp app, long createdTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appCreated(app, createdTime); + } + } + + @Override + public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appACLsUpdated(app, appViewACLs, updatedTime); + } + } + + @Override + public void appUpdated(RMApp app, long updatedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appUpdated(app, updatedTime); + } + } + + @Override + public void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appStateUpdated(app, appState, updatedTime); + } + } + + @Override + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appFinished(app, state, finishedTime); + } + } + + @Override + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appAttemptRegistered(appAttempt, registeredTime); + } + } + + @Override + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appAttemptFinished(appAttempt, appAttemtpState, app, + finishedTime); + } + } + + @Override + public void containerCreated(RMContainer container, long createdTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.containerCreated(container, createdTime); + } + } + + @Override + public void containerFinished(RMContainer container, long finishedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.containerFinished(container, finishedTime); + } + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java new file mode 100644 index 00000000000..f824fa1b93a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests that the RM creates timeline services (v1/v2) as specified by the + * configuration. + */ +public class TestRMTimelineService { + private static MockRM rm; + + private void setup(boolean v1Enabled, boolean v2Enabled) { + Configuration conf = new YarnConfiguration(new Configuration(false)); + Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf)); + + if (v1Enabled || v2Enabled) { + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + } + + if (v1Enabled) { + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + } + + if (v2Enabled) { + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + } + + if (v1Enabled && v2Enabled) { + conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0"); + conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f"); + } + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + rm = new MockRM(conf, memStore); + rm.start(); + } + + // validate RM services exist or not as we specified + private void validate(boolean v1Enabled, boolean v2Enabled) { + boolean v1PublisherServiceFound = false; + boolean v2PublisherServiceFound = false; + List services = rm.getServices(); + for (Service service : services) { + if (service instanceof TimelineServiceV1Publisher) { + v1PublisherServiceFound = true; + } else if (service instanceof TimelineServiceV2Publisher) { + v2PublisherServiceFound = true; + } + } + + Assert.assertEquals(v1Enabled, v1PublisherServiceFound); + Assert.assertEquals(v2Enabled, v2PublisherServiceFound); + } + + private void cleanup() throws Exception { + rm.close(); + rm.stop(); + } + + // runs test to validate RM creates a timeline service publisher if and + // only if the service is enabled for v1 and v2 (independently). + private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception { + setup(v1Enabled, v2Enabled); + validate(v1Enabled, v2Enabled); + cleanup(); + } + + @Test + public void testTimelineServiceV1V2Enabled() throws Exception { + runTest(true, true); + } + + @Test + public void testTimelineServiceV1Enabled() throws Exception { + runTest(true, false); + } + + @Test + public void testTimelineServiceV2Enabled() throws Exception { + runTest(false, true); + } + + @Test + public void testTimelineServiceDisabled() throws Exception { + runTest(false, false); + } +} + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java new file mode 100644 index 00000000000..9c857a8f748 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java @@ -0,0 +1,480 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; +import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests that a CombinedSystemMetricsPublisher publishes metrics for timeline + * services (v1/v2) as specified by the configuration. + */ +public class TestCombinedSystemMetricsPublisher { + /** + * The folder where the FileSystemTimelineWriterImpl writes the entities. + */ + private static File testRootDir = new File("target", + TestCombinedSystemMetricsPublisher.class.getName() + "-localDir") + .getAbsoluteFile(); + + private static ApplicationHistoryServer timelineServer; + private static CombinedSystemMetricsPublisher metricsPublisher; + private static TimelineStore store; + private static ConcurrentMap rmAppsMapInContext; + private static RMTimelineCollectorManager rmTimelineCollectorManager; + private static DrainDispatcher dispatcher; + private static YarnConfiguration conf; + private static TimelineServiceV1Publisher publisherV1; + private static TimelineServiceV2Publisher publisherV2; + private static ApplicationAttemptId appAttemptId; + private static RMApp app; + + private void testSetup(boolean enableV1, boolean enableV2) throws Exception { + + if (testRootDir.exists()) { + //cleanup before hand + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + + conf = getConf(enableV1, enableV2); + + RMContext rmContext = mock(RMContext.class); + rmAppsMapInContext = new ConcurrentHashMap(); + when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext); + ResourceManager rm = mock(ResourceManager.class); + when(rm.getRMContext()).thenReturn(rmContext); + + if (enableV2) { + dispatcher = new DrainDispatcher(); + rmTimelineCollectorManager = new RMTimelineCollectorManager(rm); + when(rmContext.getRMTimelineCollectorManager()).thenReturn( + rmTimelineCollectorManager); + + rmTimelineCollectorManager.init(conf); + rmTimelineCollectorManager.start(); + } else { + dispatcher = null; + rmTimelineCollectorManager = null; + } + + timelineServer = new ApplicationHistoryServer(); + timelineServer.init(conf); + timelineServer.start(); + store = timelineServer.getTimelineStore(); + + if (enableV2) { + dispatcher.init(conf); + dispatcher.start(); + } + + List publishers = + new ArrayList(); + + if (YarnConfiguration.timelineServiceV1Enabled(conf)) { + Assert.assertTrue(enableV1); + publisherV1 = new TimelineServiceV1Publisher(); + publishers.add(publisherV1); + publisherV1.init(conf); + publisherV1.start(); + } else { + Assert.assertFalse(enableV1); + publisherV1 = null; + } + + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + Assert.assertTrue(enableV2); + publisherV2 = new TimelineServiceV2Publisher( + rmTimelineCollectorManager) { + @Override + protected Dispatcher getDispatcher() { + return dispatcher; + } + }; + publishers.add(publisherV2); + publisherV2.init(conf); + publisherV2.start(); + } else { + Assert.assertFalse(enableV2); + publisherV2 = null; + } + + if (publishers.isEmpty()) { + NoOpSystemMetricPublisher noopPublisher = + new NoOpSystemMetricPublisher(); + publishers.add(noopPublisher); + } + + metricsPublisher = new CombinedSystemMetricsPublisher(publishers); + } + + private void testCleanup() throws Exception { + if (publisherV1 != null) { + publisherV1.stop(); + } + if (publisherV2 != null) { + publisherV2.stop(); + } + if (timelineServer != null) { + timelineServer.stop(); + } + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + if (rmTimelineCollectorManager != null) { + rmTimelineCollectorManager.stop(); + } + } + + private static YarnConfiguration getConf(boolean v1Enabled, + boolean v2Enabled) { + YarnConfiguration yarnConf = new YarnConfiguration(); + + if (v1Enabled || v2Enabled) { + yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + } else { + yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); + } + + if (v1Enabled) { + yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0"); + yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, + MemoryTimelineStore.class, TimelineStore.class); + yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, + MemoryTimelineStateStore.class, TimelineStateStore.class); + } + + if (v2Enabled) { + yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "2.0"); + yarnConf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, + true); + yarnConf.setBoolean( + YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, true); + yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + + try { + yarnConf.set( + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, + testRootDir.getCanonicalPath()); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail("Exception while setting the " + + "TIMELINE_SERVICE_STORAGE_DIR_ROOT "); + } + } + + if (v1Enabled && v2Enabled) { + yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0"); + yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f"); + } + + yarnConf.setInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2); + + return yarnConf; + } + + // runs test to validate timeline events are published if and only if the + // service is enabled for v1 and v2 (independently). + private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception { + testSetup(v1Enabled, v2Enabled); + publishEvents(v1Enabled, v2Enabled); + validateV1(v1Enabled); + validateV2(v2Enabled); + testCleanup(); + } + + @Test(timeout = 10000) + public void testTimelineServiceEventPublishingV1V2Enabled() + throws Exception { + runTest(true, true); + } + + @Test(timeout = 10000) + public void testTimelineServiceEventPublishingV1Enabled() throws Exception { + runTest(true, false); + } + + @Test(timeout = 10000) + public void testTimelineServiceEventPublishingV2Enabled() throws Exception { + runTest(false, true); + } + + @Test(timeout = 10000) + public void testTimelineServiceEventPublishingNoService() throws Exception { + runTest(false, false); + } + + private void publishEvents(boolean v1Enabled, boolean v2Enabled) { + long timestamp = (v1Enabled) ? 1 : 2; + int id = (v2Enabled) ? 3 : 4; + ApplicationId appId = ApplicationId.newInstance(timestamp, id); + + app = createRMApp(appId); + rmAppsMapInContext.putIfAbsent(appId, app); + + if (v2Enabled) { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + rmTimelineCollectorManager.putIfAbsent(appId, collector); + } + appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + RMAppAttempt appAttempt = createRMAppAttempt(true); + + metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L); + metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, + app, Integer.MAX_VALUE + 2L); + if (v2Enabled) { + dispatcher.await(); + } + } + + private void validateV1(boolean v1Enabled) throws Exception { + TimelineEntity entity = null; + + if (!v1Enabled) { + Thread.sleep(1000); + entity = + store.getEntity(appAttemptId.toString(), + AppAttemptMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + Assert.assertNull(entity); + return; + } + + do { + entity = + store.getEntity(appAttemptId.toString(), + AppAttemptMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + Thread.sleep(100); + // ensure two events are both published before leaving the loop + } while (entity == null || entity.getEvents().size() < 2); + + boolean hasRegisteredEvent = false; + boolean hasFinishedEvent = false; + for (org.apache.hadoop.yarn.api.records.timeline.TimelineEvent event : + entity.getEvents()) { + if (event.getEventType().equals( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) { + hasRegisteredEvent = true; + } else if (event.getEventType().equals( + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) { + hasFinishedEvent = true; + Assert.assertEquals( + FinalApplicationStatus.UNDEFINED.toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.FINAL_STATUS_INFO)); + Assert.assertEquals( + YarnApplicationAttemptState.FINISHED.toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.STATE_INFO)); + } + Assert + .assertEquals(appAttemptId.toString(), entity.getEntityId()); + } + Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent); + } + + private void validateV2(boolean v2Enabled) throws Exception { + String outputDirApp = + getTimelineEntityDir() + "/" + + TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/"; + + File entityFolder = new File(outputDirApp); + Assert.assertEquals(v2Enabled, entityFolder.isDirectory()); + + if (v2Enabled) { + String timelineServiceFileName = appAttemptId.toString() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File entityFile = new File(outputDirApp, timelineServiceFileName); + Assert.assertTrue(entityFile.exists()); + long idPrefix = TimelineServiceHelper + .invertLong(appAttemptId.getAttemptId()); + verifyEntity(entityFile, 2, + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 0, idPrefix); + } + } + + private void verifyEntity(File entityFile, long expectedEvents, + String eventForCreatedTime, long expectedMetrics, long idPrefix) + throws IOException { + + BufferedReader reader = null; + String strLine; + long count = 0; + long metricsCount = 0; + try { + reader = new BufferedReader(new FileReader(entityFile)); + while ((strLine = reader.readLine()) != null) { + if (strLine.trim().length() > 0) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + entity = FileSystemTimelineReaderImpl + .getTimelineRecordFromJSON(strLine.trim(), + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class); + metricsCount = entity.getMetrics().size(); + assertEquals(idPrefix, entity.getIdPrefix()); + for (TimelineEvent event : entity.getEvents()) { + if (event.getId().equals(eventForCreatedTime)) { + assertTrue(entity.getCreatedTime() > 0); + break; + } + } + count++; + } + } + } finally { + reader.close(); + } + assertEquals("Expected " + expectedEvents + " events to be published", + expectedEvents, count); + assertEquals("Expected " + expectedMetrics + " metrics is incorrect", + expectedMetrics, metricsCount); + } + + private String getTimelineEntityDir() { + String outputDirApp = + testRootDir.getAbsolutePath() + "/" + + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/" + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + + app.getUser() + "/" + + app.getName() + "/" + + TimelineUtils.DEFAULT_FLOW_VERSION + "/" + + app.getStartTime() + "/" + + app.getApplicationId(); + return outputDirApp; + } + + private static RMAppAttempt createRMAppAttempt(boolean unmanagedAMAttempt) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId); + when(appAttempt.getHost()).thenReturn("test host"); + when(appAttempt.getRpcPort()).thenReturn(-100); + if (!unmanagedAMAttempt) { + Container container = mock(Container.class); + when(container.getId()) + .thenReturn(ContainerId.newContainerId(appAttemptId, 1)); + when(appAttempt.getMasterContainer()).thenReturn(container); + } + when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info"); + when(appAttempt.getTrackingUrl()).thenReturn("test tracking url"); + when(appAttempt.getOriginalTrackingUrl()).thenReturn( + "test original tracking url"); + return appAttempt; + } + + private static RMApp createRMApp(ApplicationId appId) { + RMApp rmApp = mock(RMAppImpl.class); + when(rmApp.getApplicationId()).thenReturn(appId); + when(rmApp.getName()).thenReturn("test app"); + when(rmApp.getApplicationType()).thenReturn("test app type"); + when(rmApp.getUser()).thenReturn("testUser"); + when(rmApp.getQueue()).thenReturn("test queue"); + when(rmApp.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(rmApp.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(rmApp.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L); + when(rmApp.getDiagnostics()).thenReturn( + new StringBuilder("test diagnostics info")); + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn( + ApplicationAttemptId.newInstance(appId, 1)); + when(rmApp.getCurrentAppAttempt()).thenReturn(appAttempt); + when(rmApp.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + when(rmApp.getRMAppMetrics()).thenReturn( + new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, Integer.MAX_VALUE, + Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE)); + when(rmApp.getApplicationTags()).thenReturn( + Collections. emptySet()); + ApplicationSubmissionContext appSubmissionContext = + mock(ApplicationSubmissionContext.class); + when(appSubmissionContext.getPriority()) + .thenReturn(Priority.newInstance(0)); + + ContainerLaunchContext containerLaunchContext = + mock(ContainerLaunchContext.class); + when(containerLaunchContext.getCommands()) + .thenReturn(Collections.singletonList("java -Xmx1024m")); + when(appSubmissionContext.getAMContainerSpec()) + .thenReturn(containerLaunchContext); + when(rmApp.getApplicationPriority()).thenReturn(Priority.newInstance(10)); + when(rmApp.getApplicationSubmissionContext()) + .thenReturn(appSubmissionContext); + return rmApp; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index de282fd0631..d444b64ddfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -246,6 +246,15 @@ public void serviceInit(Configuration conf) throws Exception { useFixedPorts = conf.getBoolean( YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS); + + if (!useFixedPorts) { + String hostname = MiniYARNCluster.getHostname(); + conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0"); + + conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + hostname + ":" + ServerSocketUtil.getPort(9188, 10)); + } + useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC); failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, @@ -799,12 +808,6 @@ protected synchronized void serviceInit(Configuration conf) } conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, MemoryTimelineStateStore.class, TimelineStateStore.class); - if (!useFixedPorts) { - String hostname = MiniYARNCluster.getHostname(); - conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - hostname + ":" + ServerSocketUtil.getPort(9188, 10)); - } appHistoryServer.init(conf); super.serviceInit(conf); }