diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml index bee9618..67a65fa 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml @@ -98,6 +98,12 @@ test-jar + org.apache.hadoop + hadoop-yarn-server-timelineservice + test + test-jar + + org.hsqldb hsqldb test diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java index fde9e64..00a8451 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.Assert; import org.junit.Test; @@ -64,7 +65,7 @@ private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector"; private static final Log LOG = LogFactory.getLog(TestMRTimelineEventHandling.class); - + @Test public void testTimelineServiceStartInMiniCluster() throws Exception { Configuration conf = new YarnConfiguration(); @@ -79,6 +80,8 @@ public void testTimelineServiceStartInMiniCluster() throws Exception { try { cluster = new MiniMRYarnCluster( TestMRTimelineEventHandling.class.getSimpleName(), 1); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); cluster.init(conf); cluster.start(); @@ -167,7 +170,7 @@ public void testMRTimelineEventHandling() throws Exception { } } } - + @Test public void testMRNewTimelineServiceEventHandling() throws Exception { LOG.info("testMRNewTimelineServiceEventHandling start."); @@ -181,7 +184,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception { conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME + ".class", PerNodeTimelineCollectorsAuxService.class.getName()); - + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); MiniMRYarnCluster cluster = null; @@ -212,9 +215,9 @@ public void testMRNewTimelineServiceEventHandling() throws Exception { YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(new Configuration(cluster.getConfig())); yarnClient.start(); - EnumSet appStates = + EnumSet appStates = EnumSet.allOf(YarnApplicationState.class); - + ApplicationId firstAppId = null; List apps = yarnClient.getApplications(appStates); Assert.assertEquals(apps.size(), 1); @@ -227,7 +230,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception { job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir); Assert.assertEquals(JobStatus.FAILED, job.getJobStatus().getState().getValue()); - + apps = yarnClient.getApplications(appStates); Assert.assertEquals(apps.size(), 2); @@ -247,10 +250,10 @@ public void testMRNewTimelineServiceEventHandling() throws Exception { if(testRootFolder.isDirectory()) { FileUtils.deleteDirectory(testRootFolder); } - + } } - + private void checkNewTimelineEvent(ApplicationId appId, ApplicationReport appReport) throws IOException { String tmpRoot = @@ -258,7 +261,7 @@ private void checkNewTimelineEvent(ApplicationId appId, + "/entities/"; File tmpRootFolder = new File(tmpRoot); - + Assert.assertTrue(tmpRootFolder.isDirectory()); String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + UserGroupInformation.getCurrentUser().getShortUserName() + @@ -316,7 +319,7 @@ private void checkNewTimelineEvent(ApplicationId appId, Assert.assertTrue("Task output directory: " + outputDirTask + " does not exist.", taskFolder.isDirectory()); - + String taskEventFileName = appId.toString().replaceAll("application", "task") + "_m_000000" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; @@ -327,15 +330,15 @@ private void checkNewTimelineEvent(ApplicationId appId, taskEventFile.exists()); verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(), true, false, null); - + // check for task attempt event file String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/"; File taskAttemptFolder = new File(outputDirTaskAttempt); - Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt + + Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt + " does not exist.", taskAttemptFolder.isDirectory()); - + String taskAttemptEventFileName = appId.toString().replaceAll( - "application", "attempt") + "_m_000000_0" + + "application", "attempt") + "_m_000000_0" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; String taskAttemptEventFilePath = outputDirTaskAttempt + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3837a2b..6ca021c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -101,7 +101,7 @@ private static void addDeprecatedKeys() { YarnConfiguration.NM_PREFIX + "log-container-debug-info.enabled"; public static final boolean DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO = false; - + //////////////////////////////// // IPC Configs //////////////////////////////// @@ -110,27 +110,27 @@ private static void addDeprecatedKeys() { /** Factory to create client IPC classes.*/ public static final String IPC_CLIENT_FACTORY_CLASS = IPC_PREFIX + "client.factory.class"; - public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS = + public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl"; /** Factory to create server IPC classes.*/ - public static final String IPC_SERVER_FACTORY_CLASS = + public static final String IPC_SERVER_FACTORY_CLASS = IPC_PREFIX + "server.factory.class"; - public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS = + public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl"; /** Factory to create serializeable records.*/ - public static final String IPC_RECORD_FACTORY_CLASS = + public static final String IPC_RECORD_FACTORY_CLASS = IPC_PREFIX + "record.factory.class"; - public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS = + public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl"; /** RPC class implementation*/ public static final String IPC_RPC_IMPL = IPC_PREFIX + "rpc.class"; - public static final String DEFAULT_IPC_RPC_IMPL = + public static final String DEFAULT_IPC_RPC_IMPL = "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC"; - + //////////////////////////////// // Resource Manager Configs //////////////////////////////// @@ -142,7 +142,7 @@ private static void addDeprecatedKeys() { public static final String RM_HOSTNAME = RM_PREFIX + "hostname"; /** The address of the applications manager interface in the RM.*/ - public static final String RM_ADDRESS = + public static final String RM_ADDRESS = RM_PREFIX + "address"; public static final int DEFAULT_RM_PORT = 8032; public static final String DEFAULT_RM_ADDRESS = @@ -170,9 +170,9 @@ private static void addDeprecatedKeys() { /** The Kerberos principal for the resource manager.*/ public static final String RM_PRINCIPAL = RM_PREFIX + "principal"; - + /** The address of the scheduler interface.*/ - public static final String RM_SCHEDULER_ADDRESS = + public static final String RM_SCHEDULER_ADDRESS = RM_PREFIX + "scheduler.address"; public static final int DEFAULT_RM_SCHEDULER_PORT = 8030; public static final String DEFAULT_RM_SCHEDULER_ADDRESS = "0.0.0.0:" + @@ -200,12 +200,12 @@ private static void addDeprecatedKeys() { public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50; /** If the port should be included or not in the node name. The node name - * is used by the scheduler for resource requests allocation location + * is used by the scheduler for resource requests allocation location * matching. Typically this is just the hostname, using the port is needed * when using minicluster and specific NM are required.*/ public static final String RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME = YARN_PREFIX + "scheduler.include-port-in-node-name"; - public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = + public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = false; /** Enable Resource Manager webapp ui actions */ @@ -246,23 +246,23 @@ private static void addDeprecatedKeys() { RM_PREFIX + "scheduler.monitor.policies"; /** The address of the RM web application.*/ - public static final String RM_WEBAPP_ADDRESS = + public static final String RM_WEBAPP_ADDRESS = RM_PREFIX + "webapp.address"; public static final int DEFAULT_RM_WEBAPP_PORT = 8088; public static final String DEFAULT_RM_WEBAPP_ADDRESS = "0.0.0.0:" + DEFAULT_RM_WEBAPP_PORT; - + /** The https address of the RM web application.*/ public static final String RM_WEBAPP_HTTPS_ADDRESS = RM_PREFIX + "webapp.https.address"; public static final boolean YARN_SSL_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false; public static final String YARN_SSL_SERVER_RESOURCE_DEFAULT = "ssl-server.xml"; - + public static final int DEFAULT_RM_WEBAPP_HTTPS_PORT = 8090; public static final String DEFAULT_RM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" + DEFAULT_RM_WEBAPP_HTTPS_PORT; - + public static final String RM_RESOURCE_TRACKER_ADDRESS = RM_PREFIX + "resource-tracker.address"; public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8031; @@ -270,17 +270,17 @@ private static void addDeprecatedKeys() { "0.0.0.0:" + DEFAULT_RM_RESOURCE_TRACKER_PORT; /** The expiry interval for application master reporting.*/ - public static final String RM_AM_EXPIRY_INTERVAL_MS = + public static final String RM_AM_EXPIRY_INTERVAL_MS = YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms"; public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000; /** How long to wait until a node manager is considered dead.*/ - public static final String RM_NM_EXPIRY_INTERVAL_MS = + public static final String RM_NM_EXPIRY_INTERVAL_MS = YARN_PREFIX + "nm.liveness-monitor.expiry-interval-ms"; public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000; /** Are acls enabled.*/ - public static final String YARN_ACL_ENABLE = + public static final String YARN_ACL_ENABLE = YARN_PREFIX + "acl.enable"; public static final boolean DEFAULT_YARN_ACL_ENABLE = false; @@ -294,10 +294,10 @@ public static boolean isAclEnabled(Configuration conf) { } /** ACL of who can be admin of YARN cluster.*/ - public static final String YARN_ADMIN_ACL = + public static final String YARN_ADMIN_ACL = YARN_PREFIX + "admin.acl"; public static final String DEFAULT_YARN_ADMIN_ACL = "*"; - + /** ACL used in case none is found. Allows nothing. */ public static final String DEFAULT_YARN_APP_ACL = " "; @@ -373,17 +373,17 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION = false; /** The address of the RM admin interface.*/ - public static final String RM_ADMIN_ADDRESS = + public static final String RM_ADMIN_ADDRESS = RM_PREFIX + "admin.address"; public static final int DEFAULT_RM_ADMIN_PORT = 8033; public static final String DEFAULT_RM_ADMIN_ADDRESS = "0.0.0.0:" + DEFAULT_RM_ADMIN_PORT; - + /**Number of threads used to handle RM admin interface.*/ public static final String RM_ADMIN_CLIENT_THREAD_COUNT = RM_PREFIX + "admin.client.thread-count"; public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1; - + /** * The maximum number of application attempts. * It's a global setting for all application masters. @@ -391,15 +391,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_AM_MAX_ATTEMPTS = RM_PREFIX + "am.max-attempts"; public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2; - + /** The keytab for the resource manager.*/ - public static final String RM_KEYTAB = + public static final String RM_KEYTAB = RM_PREFIX + "keytab"; /**The kerberos principal to be used for spnego filter for RM.*/ public static final String RM_WEBAPP_SPNEGO_USER_NAME_KEY = RM_PREFIX + "webapp.spnego-principal"; - + /**The kerberos keytab to be used for spnego filter for RM.*/ public static final String RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY = RM_PREFIX + "webapp.spnego-keytab-file"; @@ -421,30 +421,30 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_RM_WEBAPP_ENABLE_CORS_FILTER = false; /** How long to wait until a container is considered dead.*/ - public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = + public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = RM_PREFIX + "rm.container-allocation.expiry-interval-ms"; public static final int DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 600000; - + /** Path to file with nodes to include.*/ - public static final String RM_NODES_INCLUDE_FILE_PATH = + public static final String RM_NODES_INCLUDE_FILE_PATH = RM_PREFIX + "nodes.include-path"; public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = ""; - + /** Path to file with nodes to exclude.*/ - public static final String RM_NODES_EXCLUDE_FILE_PATH = + public static final String RM_NODES_EXCLUDE_FILE_PATH = RM_PREFIX + "nodes.exclude-path"; public static final String DEFAULT_RM_NODES_EXCLUDE_FILE_PATH = ""; - + /** Number of threads to handle resource tracker calls.*/ public static final String RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = RM_PREFIX + "resource-tracker.client.thread-count"; public static final int DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 50; - + /** The class to use as the resource scheduler.*/ - public static final String RM_SCHEDULER = + public static final String RM_SCHEDULER = RM_PREFIX + "scheduler.class"; - - public static final String DEFAULT_RM_SCHEDULER = + + public static final String DEFAULT_RM_SCHEDULER = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"; /** RM set next Heartbeat interval for NM */ @@ -504,7 +504,7 @@ public static boolean isAclEnabled(Configuration conf) { RM_PREFIX + "delegation.token.max-lifetime"; public static final long RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000; // 7 days - + public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; @@ -648,7 +648,7 @@ public static boolean isAclEnabled(Configuration conf) { //////////////////////////////// /** The class to use as the persistent store.*/ public static final String RM_STORE = RM_PREFIX + "store.class"; - + /** URI for FileSystemRMStateStore */ public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + "fs.state-store.uri"; @@ -676,7 +676,7 @@ public static boolean isAclEnabled(Configuration conf) { + "leveldb-state-store.compaction-interval-secs"; public static final long DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS = 3600; - /** The maximum number of completed applications RM keeps. */ + /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "max-completed-applications"; public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000; @@ -698,7 +698,7 @@ public static boolean isAclEnabled(Configuration conf) { /** Default application type length */ public static final int APPLICATION_TYPE_LENGTH = 20; - + /** Default queue name */ public static final String DEFAULT_QUEUE_NAME = "default"; @@ -711,7 +711,7 @@ public static boolean isAclEnabled(Configuration conf) { /** * Default sizes of the runtime metric buckets in minutes. */ - public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS = + public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS = "60,300,1440"; public static final String RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX @@ -728,7 +728,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX + "nm-tokens.master-key-rolling-interval-secs"; - + public static final long DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = 24 * 60 * 60; @@ -746,7 +746,7 @@ public static boolean isAclEnabled(Configuration conf) { //////////////////////////////// // Node Manager Configs //////////////////////////////// - + /** Prefix for all node manager configs.*/ public static final String NM_PREFIX = "yarn.nodemanager."; @@ -768,57 +768,57 @@ public static boolean isAclEnabled(Configuration conf) { ApplicationConstants.Environment.HADOOP_CONF_DIR.key(), ApplicationConstants.Environment.CLASSPATH_PREPEND_DISTCACHE.key(), ApplicationConstants.Environment.HADOOP_YARN_HOME.key())); - + /** address of node manager IPC.*/ public static final String NM_ADDRESS = NM_PREFIX + "address"; public static final int DEFAULT_NM_PORT = 0; public static final String DEFAULT_NM_ADDRESS = "0.0.0.0:" + DEFAULT_NM_PORT; - + /** The actual bind address or the NM.*/ public static final String NM_BIND_HOST = NM_PREFIX + "bind-host"; /** who will execute(launch) the containers.*/ - public static final String NM_CONTAINER_EXECUTOR = + public static final String NM_CONTAINER_EXECUTOR = NM_PREFIX + "container-executor.class"; - /** + /** * Adjustment to make to the container os scheduling priority. * The valid values for this could vary depending on the platform. - * On Linux, higher values mean run the containers at a less - * favorable priority than the NM. + * On Linux, higher values mean run the containers at a less + * favorable priority than the NM. * The value specified is an int. */ - public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = + public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = NM_PREFIX + "container-executor.os.sched.priority.adjustment"; public static final int DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = 0; - + /** Number of threads container manager uses.*/ public static final String NM_CONTAINER_MGR_THREAD_COUNT = NM_PREFIX + "container-manager.thread-count"; public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20; - + /** Number of threads container manager uses.*/ public static final String NM_COLLECTOR_SERVICE_THREAD_COUNT = NM_PREFIX + "collector-service.thread-count"; public static final int DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT = 5; /** Number of threads used in cleanup.*/ - public static final String NM_DELETE_THREAD_COUNT = + public static final String NM_DELETE_THREAD_COUNT = NM_PREFIX + "delete.thread-count"; public static final int DEFAULT_NM_DELETE_THREAD_COUNT = 4; - + /** Keytab for NM.*/ public static final String NM_KEYTAB = NM_PREFIX + "keytab"; - + /**List of directories to store localized files in.*/ public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs"; public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir"; /** * Number of files in each localized directories - * Avoid tuning this too low. + * Avoid tuning this too low. */ public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = NM_PREFIX + "local-cache.max-files-per-directory"; @@ -830,7 +830,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_NM_LOCALIZER_PORT = 8040; public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT; - + /** Address where the collector service IPC is.*/ public static final String NM_COLLECTOR_SERVICE_ADDRESS = NM_PREFIX + "collector-service.address"; @@ -841,9 +841,9 @@ public static boolean isAclEnabled(Configuration conf) { /** Interval in between cache cleanups.*/ public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = NM_PREFIX + "localizer.cache.cleanup.interval-ms"; - public static final long DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = + public static final long DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = 10 * 60 * 1000; - + /** * Target size of localizer cache in MB, per nodemanager. It is a target * retention size that only includes resources with PUBLIC and PRIVATE @@ -852,14 +852,14 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_LOCALIZER_CACHE_TARGET_SIZE_MB = NM_PREFIX + "localizer.cache.target-size-mb"; public static final long DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB = 10 * 1024; - + /** Number of threads to handle localization requests.*/ public static final String NM_LOCALIZER_CLIENT_THREAD_COUNT = NM_PREFIX + "localizer.client.thread-count"; public static final int DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT = 5; - + /** Number of threads to use for localization fetching.*/ - public static final String NM_LOCALIZER_FETCH_THREAD_COUNT = + public static final String NM_LOCALIZER_FETCH_THREAD_COUNT = NM_PREFIX + "localizer.fetch.thread-count"; public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4; @@ -896,7 +896,7 @@ public static boolean isAclEnabled(Configuration conf) { RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = 30000l; - + /** Delegation Token renewer thread count */ public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = RM_PREFIX + "delegation-token-renewer.thread-count"; @@ -925,15 +925,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX + "log-aggregation-enable"; public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false; - - /** + + /** * How long to wait before deleting aggregated logs, -1 disables. * Be careful set this too small and you will spam the name node. */ public static final String LOG_AGGREGATION_RETAIN_SECONDS = YARN_PREFIX + "log-aggregation.retain-seconds"; public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1; - + /** * How long to wait between aggregated log retention checks. If set to * a value {@literal <=} 0 then the value is computed as one-tenth of the @@ -974,12 +974,12 @@ public static boolean isAclEnabled(Configuration conf) { * Number of threads used in log cleanup. Only applicable if Log aggregation * is disabled */ - public static final String NM_LOG_DELETION_THREADS_COUNT = + public static final String NM_LOG_DELETION_THREADS_COUNT = NM_PREFIX + "log.deletion-threads-count"; public static final int DEFAULT_NM_LOG_DELETE_THREAD_COUNT = 4; /** Where to aggregate logs to.*/ - public static final String NM_REMOTE_APP_LOG_DIR = + public static final String NM_REMOTE_APP_LOG_DIR = NM_PREFIX + "remote-app-log-dir"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs"; @@ -987,14 +987,14 @@ public static boolean isAclEnabled(Configuration conf) { * The remote log dir will be created at * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId} */ - public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = + public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = NM_PREFIX + "remote-app-log-dir-suffix"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs"; public static final String YARN_LOG_SERVER_URL = YARN_PREFIX + "log.server.url"; - - public static final String YARN_TRACKING_URL_GENERATOR = + + public static final String YARN_TRACKING_URL_GENERATOR = YARN_PREFIX + "tracking.url.generator"; /** Amount of memory in MB that can be allocated for containers.*/ @@ -1019,7 +1019,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_VMEM_PMEM_RATIO = NM_PREFIX + "vmem-pmem-ratio"; public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f; - + /** Number of Virtual CPU Cores which can be allocated for containers.*/ public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores"; public static final int DEFAULT_NM_VCORES = 8; @@ -1154,13 +1154,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_NM_WEBAPP_PORT = 8042; public static final String DEFAULT_NM_WEBAPP_ADDRESS = "0.0.0.0:" + DEFAULT_NM_WEBAPP_PORT; - + /** NM Webapp https address.**/ public static final String NM_WEBAPP_HTTPS_ADDRESS = NM_PREFIX + "webapp.https.address"; public static final int DEFAULT_NM_WEBAPP_HTTPS_PORT = 8044; public static final String DEFAULT_NM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" - + DEFAULT_NM_WEBAPP_HTTPS_PORT; + + DEFAULT_NM_WEBAPP_HTTPS_PORT; /** Enable/disable CORS filter. */ public static final String NM_WEBAPP_ENABLE_CORS_FILTER = @@ -1280,22 +1280,22 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB = 0; /** Frequency of running node health script.*/ - public static final String NM_HEALTH_CHECK_INTERVAL_MS = + public static final String NM_HEALTH_CHECK_INTERVAL_MS = NM_PREFIX + "health-checker.interval-ms"; public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000; - /** Health check script time out period.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = + /** Health check script time out period.*/ + public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = NM_PREFIX + "health-checker.script.timeout-ms"; - public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = + public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = 2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS; - + /** The health check script to run.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_PATH = + public static final String NM_HEALTH_CHECK_SCRIPT_PATH = NM_PREFIX + "health-checker.script.path"; - + /** The arguments to pass to the health check script.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_OPTS = + public static final String NM_HEALTH_CHECK_SCRIPT_OPTS = NM_PREFIX + "health-checker.script.opts"; /** The JVM options used on forking ContainerLocalizer process @@ -1365,10 +1365,10 @@ public static boolean isAclEnabled(Configuration conf) { /** The path to the Linux container executor.*/ public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH = NM_PREFIX + "linux-container-executor.path"; - - /** + + /** * The UNIX group that the linux-container-executor should run as. - * This is intended to be set as part of container-executor.cfg. + * This is intended to be set as part of container-executor.cfg. */ public static final String NM_LINUX_CONTAINER_GROUP = NM_PREFIX + "linux-container-executor.group"; @@ -1392,30 +1392,30 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_NM_NONSECURE_MODE_LOCAL_USER = "nobody"; /** - * The allowed pattern for UNIX user names enforced by - * Linux-container-executor when used in nonsecure mode (use case for this + * The allowed pattern for UNIX user names enforced by + * Linux-container-executor when used in nonsecure mode (use case for this * is using cgroups). The default value is taken from /usr/sbin/adduser */ public static final String NM_NONSECURE_MODE_USER_PATTERN_KEY = NM_PREFIX + "linux-container-executor.nonsecure-mode.user-pattern"; - public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN = + public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN = "^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$"; /** The type of resource enforcement to use with the * linux container executor. */ - public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER = + public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER = NM_PREFIX + "linux-container-executor.resources-handler.class"; - + /** The path the linux container executor should use for cgroups */ public static final String NM_LINUX_CONTAINER_CGROUPS_HIERARCHY = NM_PREFIX + "linux-container-executor.cgroups.hierarchy"; - + /** Whether the linux container executor should mount cgroups if not found */ public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT = NM_PREFIX + "linux-container-executor.cgroups.mount"; - + /** Where the linux container executor should mount cgroups if not found */ public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH = NM_PREFIX + "linux-container-executor.cgroups.mount-path"; @@ -1433,7 +1433,7 @@ public static boolean isAclEnabled(Configuration conf) { /** * Interval of time the linux container executor should try cleaning up - * cgroups entry when cleaning up a container. This is required due to what + * cgroups entry when cleaning up a container. This is required due to what * it seems a race condition because the SIGTERM/SIGKILL is asynch. */ public static final String NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT = @@ -1463,24 +1463,24 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "windows-container.cpu-limit.enabled"; public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = false; - /** + /** /* The Windows group that the windows-secure-container-executor should run as. */ public static final String NM_WINDOWS_SECURE_CONTAINER_GROUP = NM_PREFIX + "windows-secure-container-executor.group"; /** T-file compression types used to compress aggregated logs.*/ - public static final String NM_LOG_AGG_COMPRESSION_TYPE = + public static final String NM_LOG_AGG_COMPRESSION_TYPE = NM_PREFIX + "log-aggregation.compression-type"; public static final String DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE = "none"; - + /** The kerberos principal for the node manager.*/ public static final String NM_PRINCIPAL = NM_PREFIX + "principal"; - - public static final String NM_AUX_SERVICES = + + public static final String NM_AUX_SERVICES = NM_PREFIX + "aux-services"; - + public static final String NM_AUX_SERVICE_FMT = NM_PREFIX + "aux-services.%s.class"; @@ -1501,11 +1501,11 @@ public static boolean isAclEnabled(Configuration conf) { /**The kerberos principal to be used for spnego filter for NM.*/ public static final String NM_WEBAPP_SPNEGO_USER_NAME_KEY = NM_PREFIX + "webapp.spnego-principal"; - + /**The kerberos keytab to be used for spnego filter for NM.*/ public static final String NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY = NM_PREFIX + "webapp.spnego-keytab-file"; - + public static final String DEFAULT_NM_USER_HOME_DIR= "/home/"; public static final String NM_RECOVERY_PREFIX = NM_PREFIX + "recovery."; @@ -1536,41 +1536,41 @@ public static boolean isAclEnabled(Configuration conf) { // Web Proxy Configs //////////////////////////////// public static final String PROXY_PREFIX = "yarn.web-proxy."; - + /** The kerberos principal for the proxy.*/ public static final String PROXY_PRINCIPAL = PROXY_PREFIX + "principal"; - + /** Keytab for Proxy.*/ public static final String PROXY_KEYTAB = PROXY_PREFIX + "keytab"; - + /** The address for the web proxy.*/ public static final String PROXY_ADDRESS = PROXY_PREFIX + "address"; public static final int DEFAULT_PROXY_PORT = 9099; public static final String DEFAULT_PROXY_ADDRESS = "0.0.0.0:" + DEFAULT_PROXY_PORT; - + /** * YARN Service Level Authorization */ - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL = "security.resourcetracker.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONCLIENT_PROTOCOL = "security.applicationclient.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCEMANAGER_ADMINISTRATION_PROTOCOL = "security.resourcemanager-administration.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONMASTER_PROTOCOL = "security.applicationmaster.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL = "security.containermanagement.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = "security.resourcelocalizer.protocol.acl"; @@ -1907,11 +1907,13 @@ public static boolean isAclEnabled(Configuration conf) { */ public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD = TIMELINE_SERVICE_PREFIX - + "coprocessor.app-final-value-retention-milliseconds"; + + "hbase.coprocessor.app-final-value-retention-milliseconds"; /** - * The setting that controls how long the final value of a metric - * of a completed app is retained before merging into the flow sum. + * The setting that controls how long the final value of a metric of a + * completed app is retained before merging into the flow sum. Up to this time + * after an application is completed out-of-order values that arrive can be + * recognized and discarded at the cost of increased storage. */ public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 3 * 24 * 60 * 60 * 1000L; @@ -1991,7 +1993,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT = TIMELINE_SERVICE_PREFIX + "handler-thread-count"; public static final int DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT = 10; - + /** The address of the timeline service web application.*/ public static final String TIMELINE_SERVICE_WEBAPP_ADDRESS = @@ -2212,7 +2214,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String SHARED_CACHE_NESTED_LEVEL = SHARED_CACHE_PREFIX + "nested-level"; public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3; - + // Shared Cache Manager Configs public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store."; @@ -2246,7 +2248,7 @@ public static boolean isAclEnabled(Configuration conf) { "0.0.0.0:" + DEFAULT_SCM_WEBAPP_PORT; // In-memory SCM store configuration - + public static final String IN_MEMORY_STORE_PREFIX = SCM_STORE_PREFIX + "in-memory."; @@ -2267,7 +2269,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String IN_MEMORY_INITIAL_DELAY_MINS = IN_MEMORY_STORE_PREFIX + "initial-delay-mins"; public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10; - + /** * The frequency at which the in-memory store checks to remove dead initial * applications. Specified in minutes. @@ -2449,13 +2451,13 @@ public static boolean isAclEnabled(Configuration conf) { * Node-labels configurations */ public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels."; - + /** Node label store implementation class */ public static final String FS_NODE_LABELS_STORE_IMPL_CLASS = NODE_LABELS_PREFIX + "fs-store.impl.class"; public static final String DEFAULT_FS_NODE_LABELS_STORE_IMPL_CLASS = "org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore"; - + /** URI for NodeLabelManager */ public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX + "fs-store.root-dir"; @@ -2463,7 +2465,7 @@ public static boolean isAclEnabled(Configuration conf) { NODE_LABELS_PREFIX + "fs-store.retry-policy-spec"; public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = "2000, 500"; - + /** * Flag to indicate if the node labels feature enabled, by default it's * disabled @@ -2471,10 +2473,10 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX + "enabled"; public static final boolean DEFAULT_NODE_LABELS_ENABLED = false; - + public static final String NODELABEL_CONFIGURATION_TYPE = NODE_LABELS_PREFIX + "configuration-type"; - + public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE = "centralized"; @@ -2483,7 +2485,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE = "distributed"; - + public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE = CENTRALIZED_NODELABEL_CONFIGURATION_TYPE; @@ -2617,7 +2619,7 @@ public static boolean areNodeLabelsEnabled( public YarnConfiguration() { super(); } - + public YarnConfiguration(Configuration conf) { super(conf); if (! (conf instanceof YarnConfiguration)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml index 90378ee..30f1beb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -77,6 +77,7 @@ org.apache.hadoop hadoop-yarn-server-timelineservice + test-jar test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 22c16e3..cf2d5b4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -64,29 +64,26 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter; +import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.client.api.impl.TimelineWriter; -import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient; -import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.client.api.YarnClient; - import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; import org.apache.hadoop.yarn.server.timeline.TimelineVersion; import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -152,7 +149,7 @@ private void setupInternal(int numNodeManager, float timelineVersion) // Enable ContainersMonitorImpl conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, LinuxResourceCalculatorPlugin.class.getName()); - conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, + conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, ProcfsBasedProcessTree.class.getName()); conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); @@ -185,16 +182,19 @@ private void setupInternal(int numNodeManager, float timelineVersion) conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME); conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME + ".class", PerNodeTimelineCollectorsAuxService.class.getName()); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, + org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter.class); } else { Assert.fail("Wrong timeline version number: " + timelineVersion); } - + if (yarnCluster == null) { yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, numNodeManager, 1, 1); yarnCluster.init(conf); - + yarnCluster.start(); conf.set( @@ -256,7 +256,7 @@ public void tearDown() throws IOException { .get("yarn.timeline-service.leveldb-timeline-store.path")), true); } - + @Test public void testDSShellWithDomain() throws Exception { testDSShell(true); @@ -1065,7 +1065,7 @@ public void testDSShellWithInvalidArgs() throws Exception { Assert.assertTrue("The throw exception is not expected", e.getMessage().contains("Invalid no. of containers")); } - + LOG.info("Initializing DS Client with invalid no. of vcores"); try { String[] args = { @@ -1178,7 +1178,7 @@ protected TimelineWriter createTimelineWriter(Configuration conf, protected void waitForNMsToRegister() throws Exception { int sec = 60; while (sec >= 0) { - if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() + if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() >= NUM_NMS) { break; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 4899dcb..b8a1a79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2128,7 +2128,6 @@ 604800 - yarn.timeline-service.entity-group-fs-store.leveldb-cache-read-cache-size @@ -2166,6 +2165,23 @@ 300 + + + yarn.timeline-service.writer.class + + Storage implementation ATS v2 will use for the TimelineWriter service. + + org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl + + + + yarn.timeline-service.reader.class + + Storage implementation ATS v2 will use for the TimelineReader service. + + org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl + + yarn.timeline-service.client.internal-timers-ttl-secs @@ -2199,10 +2215,15 @@ - The setting that controls how long the final value - of a metric of a completed app is retained before merging into - the flow sum. - yarn.timeline-service.coprocessor.app-final-value-retention-milliseconds + + The setting that controls how long the final value + of a metric of a completed app is retained before merging into + the flow sum. Up to this time after an application is completed + out-of-order values that arrive can be recognized and discarded at the + cost of increased storage. + + yarn.timeline-service.coprocessor.app-final-value-retention-milliseconds + 259200000 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index f25fb48..2a39d46 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -158,6 +158,12 @@ log4j log4j + + org.apache.hadoop + hadoop-yarn-server-timelineservice + test + test-jar + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index 13c67f8..0da395a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.AfterClass; import org.junit.Assert; @@ -104,6 +105,8 @@ public static void setup() throws Exception { rmTimelineCollectorManager); Configuration conf = getTimelineV2Conf(); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); rmTimelineCollectorManager.init(conf); rmTimelineCollectorManager.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index a734340..3a5c797 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -66,6 +68,8 @@ public static void setupClass() throws Exception { // enable timeline service v.2 conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0], collectorManager, conf); @@ -159,7 +163,9 @@ protected CollectorNodemanagerProtocol getNMCollectorService() { mock(CollectorNodemanagerProtocol.class); try { GetTimelineCollectorContextResponse response = - GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L); + GetTimelineCollectorContextResponse.newInstance( + UserGroupInformation.getCurrentUser().getShortUserName(), + "test_flow_name", "test_flow_version", 1L); when(protocol.getTimelineCollectorContext(any( GetTimelineCollectorContextRequest.class))).thenReturn(response); } catch (YarnException | IOException e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index a8f88e5..9758320 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import com.google.common.annotations.VisibleForTesting; @@ -61,7 +61,7 @@ public void serviceInit(Configuration conf) throws Exception { writer = ReflectionUtils.newInstance(conf.getClass( YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, - FileSystemTimelineWriterImpl.class, + HBaseTimelineWriterImpl.class, TimelineWriter.class), conf); writer.init(conf); // create a single dedicated thread for flushing the writer on a periodic diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java index 97725e6..110d1dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java @@ -41,7 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; @@ -81,7 +81,7 @@ protected void serviceInit(Configuration conf) throws Exception { private TimelineReader createTimelineReaderStore(Configuration conf) { TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass( YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, - FileSystemTimelineReaderImpl.class, TimelineReader.class), conf); + HBaseTimelineReaderImpl.class, TimelineReader.class), conf); LOG.info("Using store " + readerStore.getClass().getName()); readerStore.init(conf); return readerStore; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java index 0d69fbc..854e046 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,7 +57,10 @@ @Before public void setup() throws Exception { collectorManager = createCollectorManager(); - collectorManager.init(new YarnConfiguration()); + Configuration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + collectorManager.init(conf); collectorManager.start(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java index f2775d5..7c2a471 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java @@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.After; import org.junit.Test; @@ -60,6 +62,8 @@ public TestPerNodeTimelineCollectorsAuxService() { // enable timeline service v.2 conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); } @After diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java index b42488c..fb95493 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderServer.java @@ -23,7 +23,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; import org.junit.Test; public class TestTimelineReaderServer { @@ -37,6 +38,8 @@ public void testStartStopServer() throws Exception { config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:0"); + config.setClass(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, + FileSystemTimelineReaderImpl.class, TimelineReader.class); try { server.init(config); assertEquals(STATE.INITED, server.getServiceState()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java index 0bddf1b..510b6e3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java @@ -36,8 +36,10 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -74,9 +76,11 @@ public void init() throws Exception { Configuration config = new YarnConfiguration(); config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); - config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:0"); config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + config.setClass(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, + FileSystemTimelineReaderImpl.class, TimelineReader.class); server = new TimelineReaderServer(); server.init(config); server.start(); @@ -101,13 +105,12 @@ private static TimelineEntity newEntity(String type, String id) { } private static void verifyHttpResponse(Client client, URI uri, - Status status) { + Status expectedStatus) { ClientResponse resp = client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); assertNotNull(resp); - assertTrue("Response from server should have been " + status, - resp.getClientResponseStatus().equals(status)); + assertEquals(resp.getClientResponseStatus(), expectedStatus); } private static Client createClient() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java similarity index 99% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index 00aa686..64b5d1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -87,6 +87,7 @@ private final CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN"); + @VisibleForTesting public FileSystemTimelineReaderImpl() { super(FileSystemTimelineReaderImpl.class.getName()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java similarity index 98% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 74a03ac..0b50dd2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import com.google.common.annotations.VisibleForTesting; + /** * This implements a local file based backend for storing application timeline * information. @@ -60,6 +62,7 @@ /** Default extension for output files. */ public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; + @VisibleForTesting FileSystemTimelineWriterImpl() { super((FileSystemTimelineWriterImpl.class.getName())); }