diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index e32f718..c7807b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -337,13 +337,16 @@ private static void loadData() throws Exception {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
-      hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te);
-      hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1);
-      hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4);
+      hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te,
+          user);
+      hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1,
+          user);
+      hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4,
+          user);
       hbi.write(cluster, user, flow2,
-          flowVersion2, runid2, entity3.getId(), te3);
+          flowVersion2, runid2, entity3.getId(), te3, user);
       hbi.write(cluster, user, flow, flowVersion, runid,
-          "application_1111111111_1111", userEntities);
+          "application_1111111111_1111", userEntities, user);
       writeApplicationEntities(hbi, ts);
       hbi.flush();
     } finally {
@@ -376,7 +379,7 @@ static void writeApplicationEntities(HBaseTimelineWriterImpl hbi,
       appEntity.addEvent(finished);
       te.addEntity(appEntity);
       hbi.write("cluster1", "user1", "flow1", "CF7022C10F1354", i,
-          appEntity.getId(), te);
+          appEntity.getId(), te, "user1");
     }
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java
index 926d8bb..c95d319 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java
@@ -161,11 +161,11 @@ public static void loadApps(HBaseTestingUtility util, long ts)
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
       String appName = "application_1111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);
       appName = "application_1111111111_3333";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user);
       appName = "application_1111111111_4444";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te2);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te2, user);
       hbi.stop();
     } finally {
       if (hbi != null) {
@@ -438,10 +438,12 @@ public static void loadEntities(HBaseTestingUtility util, long ts)
       String flow = "some_flow_name";
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
-      hbi.write(cluster, user, flow, flowVersion, runid, appName1, te);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName2, te);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName1, te, user);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName2, te, user);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1,
+          user);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2,
+          user);
       hbi.stop();
     } finally {
       if (hbi != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java
index d6b0370..66aa3cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java
@@ -161,7 +161,7 @@ public void testWriteNullApplicationToHBase() throws Exception {
     String flow = null;
     String flowVersion = "AB7822C10F1111";
     long runid = 1002345678919L;
-    hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
+    hbi.write(cluster, user, flow, flowVersion, runid, appId, te, user);
     hbi.stop();

     // retrieve the row
@@ -279,7 +279,7 @@ public void testWriteApplicationToHBase() throws Exception {
     String flow = "s!ome_f\tlow _n am!e";
     String flowVersion = "AB7822C10F1111";
     long runid = 1002345678919L;
-    hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
+    hbi.write(cluster, user, flow, flowVersion, runid, appId, te, user);

     // Write entity again, this time without created time.
     entity = new ApplicationEntity();
@@ -291,7 +291,7 @@ public void testWriteApplicationToHBase() throws Exception {
     entity.addInfo(infoMap1);
     te = new TimelineEntities();
     te.addEntity(entity);
-    hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
+    hbi.write(cluster, user, flow, flowVersion, runid, appId, te, user);
     hbi.stop();

     infoMap.putAll(infoMap1);
@@ -512,7 +512,8 @@ public void testEvents() throws IOException {
     String flowVersion = "1111F01C2287BA";
     long runid = 1009876543218L;
     String appName = "application_123465899910_1001";
-    hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+    hbi.write(cluster, user, flow, flowVersion, runid, appName, entities,
+        user);
     hbi.stop();

     // retrieve the row
@@ -628,13 +629,14 @@ public void testNonIntegralMetricValues() throws IOException {
     hbi.start();
     // Writing application entity.
try { - hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp); + hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp, "u1"); Assert.fail("Expected an exception as metric values are non integral"); } catch (IOException e) {} // Writing generic entity. try { - hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity); + hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity, + "u1"); Assert.fail("Expected an exception as metric values are non integral"); } catch (IOException e) {} hbi.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java index 7ac5b36..31dbefd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java @@ -71,6 +71,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -196,12 +201,14 @@ public void testWriteEntityToHBase() throws Exception { hbi.start(); String cluster = "cluster_test_write_entity"; String user = "user1"; + String subAppUser = "subAppUser1"; String flow = "some_flow_name"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; String appName = ApplicationId.newInstance(System.currentTimeMillis() + 9000000L, 1).toString(); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, + subAppUser); hbi.stop(); // scan the table and see that entity exists @@ -352,6 +359,11 @@ public void testWriteEntityToHBase() throws Exception { assertEquals(metricValues.get(ts - 20000), metric.getValues().get(ts - 20000)); } + + // verify for sub application table entities. 
+ verifySubApplicationTableEntities(cluster, user, flow, flowVersion, runid, + appName, subAppUser, c1, entity, id, type, infoMap, isRelatedTo, + relatesTo, conf, metricValues, metrics, cTime, m1); } finally { if (hbi != null) { hbi.stop(); @@ -360,6 +372,98 @@ public void testWriteEntityToHBase() throws Exception { } } + private void verifySubApplicationTableEntities(String cluster, String user, + String flow, String flowVersion, Long runid, String appName, + String subAppUser, Configuration c1, TimelineEntity entity, String id, + String type, Map infoMap, + Map> isRelatedTo, Map> relatesTo, + Map conf, Map metricValues, + Set metrics, Long cTime, TimelineMetric m1) + throws IOException { + Scan s = new Scan(); + // read from SubApplicationTable + byte[] startRow = new SubApplicationRowKeyPrefix(cluster, subAppUser, null, + null, null, null).getRowKeyPrefix(); + s.setStartRow(startRow); + s.setMaxVersions(Integer.MAX_VALUE); + Connection conn = ConnectionFactory.createConnection(c1); + ResultScanner scanner = + new SubApplicationTable().getResultScanner(c1, conn, s); + + int rowCount = 0; + int colCount = 0; + KeyConverter stringKeyConverter = new StringKeyConverter(); + for (Result result : scanner) { + if (result != null && !result.isEmpty()) { + rowCount++; + colCount += result.size(); + byte[] row1 = result.getRow(); + assertTrue(verifyRowKeyForSubApplication(row1, subAppUser, cluster, + user, entity)); + + // check info column family + String id1 = SubApplicationColumn.ID.readResult(result).toString(); + assertEquals(id, id1); + + String type1 = SubApplicationColumn.TYPE.readResult(result).toString(); + assertEquals(type, type1); + + Long cTime1 = + (Long) SubApplicationColumn.CREATED_TIME.readResult(result); + assertEquals(cTime1, cTime); + + Map infoColumns = SubApplicationColumnPrefix.INFO + .readResults(result, new StringKeyConverter()); + assertEquals(infoMap, infoColumns); + + // Remember isRelatedTo is of type Map> + for (Map.Entry> isRelatedToEntry : isRelatedTo + .entrySet()) { + Object isRelatedToValue = SubApplicationColumnPrefix.IS_RELATED_TO + .readResult(result, isRelatedToEntry.getKey()); + String compoundValue = isRelatedToValue.toString(); + // id7?id9?id6 + Set isRelatedToValues = + new HashSet(Separator.VALUES.splitEncoded(compoundValue)); + assertEquals(isRelatedTo.get(isRelatedToEntry.getKey()).size(), + isRelatedToValues.size()); + for (String v : isRelatedToEntry.getValue()) { + assertTrue(isRelatedToValues.contains(v)); + } + } + + // RelatesTo + for (Map.Entry> relatesToEntry : relatesTo + .entrySet()) { + String compoundValue = SubApplicationColumnPrefix.RELATES_TO + .readResult(result, relatesToEntry.getKey()).toString(); + // id3?id4?id5 + Set relatesToValues = + new HashSet(Separator.VALUES.splitEncoded(compoundValue)); + assertEquals(relatesTo.get(relatesToEntry.getKey()).size(), + relatesToValues.size()); + for (String v : relatesToEntry.getValue()) { + assertTrue(relatesToValues.contains(v)); + } + } + + // Configuration + Map configColumns = SubApplicationColumnPrefix.CONFIG + .readResults(result, stringKeyConverter); + assertEquals(conf, configColumns); + + NavigableMap> metricsResult = + SubApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result, + stringKeyConverter); + + NavigableMap metricMap = metricsResult.get(m1.getId()); + matchMetrics(metricValues, metricMap); + } + } + assertEquals(1, rowCount); + assertEquals(16, colCount); + } + private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, String flow, Long runid, 
       String flow, Long runid, String appName, TimelineEntity te) {
@@ -407,7 +511,8 @@ public void testEventsWithEmptyInfo() throws IOException {
     byte[] startRow =
         new EntityRowKeyPrefix(cluster, user, flow, runid, appName)
             .getRowKeyPrefix();
-    hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+    hbi.write(cluster, user, flow, flowVersion, runid, appName, entities,
+        user);
     hbi.stop();
     // scan the table and see that entity exists
     Scan s = new Scan();
@@ -510,7 +615,8 @@ public void testEventsEscapeTs() throws IOException {
     String flowVersion = "1111F01C2287BA";
     long runid = 1009876543218L;
     String appName = "application_123465899910_2001";
-    hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+    hbi.write(cluster, user, flow, flowVersion, runid, appName, entities,
+        user);
     hbi.stop();

     // read the timeline entity using the reader this time
@@ -1758,4 +1864,15 @@ public void testListTypesInApp() throws Exception {
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();
   }
+
+  private boolean verifyRowKeyForSubApplication(byte[] rowKey, String suAppUser,
+      String cluster, String user, TimelineEntity te) {
+    SubApplicationRowKey key = SubApplicationRowKey.parseRowKey(rowKey);
+    assertEquals(suAppUser, key.getSubAppUserId());
+    assertEquals(cluster, key.getClusterId());
+    assertEquals(te.getType(), key.getEntityType());
+    assertEquals(te.getId(), key.getEntityId());
+    assertEquals(user, key.getUserId());
+    return true;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 0923105..bde393e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -117,13 +117,13 @@ public void testWriteFlowRunMinMax() throws Exception {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);

       // write another entity with the right min start time
       te = new TimelineEntities();
       te.addEntity(entityMinStartTime);
       appName = "application_100000000000_3333";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);

       // write another entity for max end time
       TimelineEntity entityMaxEndTime = TestFlowDataGenerator
@@ -131,7 +131,7 @@ public void testWriteFlowRunMinMax() throws Exception {
       te = new TimelineEntities();
       te.addEntity(entityMaxEndTime);
       appName = "application_100000000000_4444";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);

       // write another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
@@ -139,7 +139,7 @@ public void testWriteFlowRunMinMax() throws Exception {
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);

       // flush everything to hbase
       hbi.flush();
@@ -227,7 +227,7 @@ public void testWriteFlowActivityOneFlow() throws Exception {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
       String appName = "application_1111999999_1234";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -341,19 +341,19 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
       String appName = "application_11888888888_1111";
-      hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
+      hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te, user);

       // write an application to this flow but with a different runid/version
       te = new TimelineEntities();
       te.addEntity(entityApp1);
       appName = "application_11888888888_2222";
-      hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
+      hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te, user);

       // write an application to this flow but with a different runid/version
       te = new TimelineEntities();
       te.addEntity(entityApp1);
       appName = "application_11888888888_3333";
-      hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
+      hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te, user);

       hbi.flush();
     } finally {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index acfdc4d..e5318ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -181,13 +181,13 @@ public void testWriteFlowRunMinMax() throws Exception {
     try {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);

       // write another entity with the right min start time
       te = new TimelineEntities();
       te.addEntity(entityMinStartTime);
       appName = "application_100000000000_3333";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);

       // write another entity for max end time
       TimelineEntity entityMaxEndTime = TestFlowDataGenerator
@@ -195,7 +195,7 @@ public void testWriteFlowRunMinMax() throws Exception {
       te = new TimelineEntities();
       te.addEntity(entityMaxEndTime);
       appName = "application_100000000000_4444";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);

       // write another entity with greater start time
       TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
@@ -203,7 +203,7 @@ public void testWriteFlowRunMinMax() throws Exception {
       te = new TimelineEntities();
       te.addEntity(entityGreaterStartTime);
       appName = "application_1000000000000000_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);

       // flush everything to hbase
       hbi.flush();
@@ -288,14 +288,14 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
       String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);
       // write another application with same metric to this flow
       te = new TimelineEntities();
       TimelineEntity entityApp2 = TestFlowDataGenerator
           .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -557,14 +557,16 @@ public void testWriteFlowRunMetricsPrefix() throws Exception {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
       String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te,
+          user);
       // write another application with same metric to this flow
       te = new TimelineEntities();
       TimelineEntity entityApp2 = TestFlowDataGenerator
           .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te,
+          user);
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -644,14 +646,14 @@ public void testWriteFlowRunsMetricFields() throws Exception {
       hbi = new HBaseTimelineWriterImpl();
       hbi.init(c1);
       String appName = "application_11111111111111_1111";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);
       // write another application with same metric to this flow
       te = new TimelineEntities();
       TimelineEntity entityApp2 = TestFlowDataGenerator
           .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user);
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -746,7 +748,7 @@ public void testWriteFlowRunFlush() throws Exception {
        te1.addEntity(entityApp1);
        entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
        te1.addEntity(entityApp2);
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user);
        Thread.sleep(1);

        appName = "application_1001199480000_7" + appIdSuffix;
@@ -758,7 +760,7 @@ public void testWriteFlowRunFlush() throws Exception {
        entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
        te1.addEntity(entityApp2);

-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user);
        if (i % 1000 == 0) {
          hbi.flush();
          checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
@@ -827,7 +829,7 @@ public void testFilterFlowRunsByCreatedTime() throws Exception {
      hbi = new HBaseTimelineWriterImpl();
      hbi.init(c1);
      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
-          "application_11111111111111_1111", te);
+          "application_11111111111111_1111", te, user);
      // write another application with same metric to this flow
      te = new TimelineEntities();
      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
@@ -835,7 +837,7 @@ public void testFilterFlowRunsByCreatedTime() throws Exception {
      entityApp2.setCreatedTime(1425016502000L);
      te.addEntity(entityApp2);
      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
-          "application_11111111111111_2222", te);
+          "application_11111111111111_2222", te, user);
      hbi.flush();
    } finally {
      if (hbi != null) {
@@ -912,14 +914,14 @@ public void testMetricFilters() throws Exception {
      hbi = new HBaseTimelineWriterImpl();
      hbi.init(c1);
      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L,
-          "application_11111111111111_1111", te);
+          "application_11111111111111_1111", te, user);
      // write another application with same metric to this flow
      te = new TimelineEntities();
      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(
          System.currentTimeMillis());
      te.addEntity(entityApp2);
      hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L,
-          "application_11111111111111_2222", te);
+          "application_11111111111111_2222", te, user);
      hbi.flush();
    } finally {
      if (hbi != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index fa9d029..faf9cfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -292,14 +292,14 @@ public void testWriteFlowRunCompaction() throws Exception {
        te1 = new TimelineEntities();
        entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1);
        te1.addEntity(entityApp1);
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user);

        appName = "application_2048000000000_7" + appIdSuffix;
        insertTs++;
        te1 = new TimelineEntities();
        entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs);
        te1.addEntity(entityApp1);
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user);
      }
    } finally {
      String appName = "application_10240000000000_" + appIdSuffix;
@@ -308,7 +308,7 @@ public void testWriteFlowRunCompaction() throws Exception {
          insertTs + 1, c1);
      te1.addEntity(entityApp1);
      if (hbi != null) {
-        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, null);
        hbi.flush();
        hbi.close();
      }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index dfd63bf..f2f3835 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -65,6 +65,10 @@
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;

 /**
  * This implements an HBase based backend for storing the timeline entity
@@ -85,6 +89,7 @@
   private TypedBufferedMutator<ApplicationTable> applicationTable;
   private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
   private TypedBufferedMutator<FlowRunTable> flowRunTable;
+  private TypedBufferedMutator<SubApplicationTable> subApplicationTable;

   /**
    * Used to convert strings key components to and from storage format.
@@ -97,6 +102,10 @@
    */
   private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();

+  private enum Tables {
+    APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE
+  };
+
   public HBaseTimelineWriterImpl() {
     super(HBaseTimelineWriterImpl.class.getName());
   }
@@ -116,6 +125,8 @@ protected void serviceInit(Configuration conf) throws Exception {
     flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
     flowActivityTable =
         new FlowActivityTable().getTableMutator(hbaseConf, conn);
+    subApplicationTable =
+        new SubApplicationTable().getTableMutator(hbaseConf, conn);
   }

   /**
@@ -124,7 +135,7 @@ protected void serviceInit(Configuration conf) throws Exception {
   @Override
   public TimelineWriteResponse write(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities data) throws IOException {
+      TimelineEntities data, String subApplicationUser) throws IOException {
     TimelineWriteResponse putStatus = new TimelineWriteResponse();

     // defensive coding to avoid NPE during row key construction
@@ -152,18 +163,22 @@ public TimelineWriteResponse write(String clusterId, String userId,
             new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
                 appId);
         rowKey = applicationRowKey.getRowKey();
+        store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE);
       } else {
         EntityRowKey entityRowKey =
             new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
                 te.getType(), te.getIdPrefix(), te.getId());
         rowKey = entityRowKey.getRowKey();
+        store(rowKey, te, flowVersion, Tables.ENTITY_TABLE);
       }

-      storeInfo(rowKey, te, flowVersion, isApplication);
-      storeEvents(rowKey, te.getEvents(), isApplication);
-      storeConfig(rowKey, te.getConfigs(), isApplication);
-      storeMetrics(rowKey, te.getMetrics(), isApplication);
-      storeRelations(rowKey, te, isApplication);
+      if (!isApplication && !userId.equals(subApplicationUser)) {
+        SubApplicationRowKey subApplicationRowKey =
+            new SubApplicationRowKey(subApplicationUser, clusterId,
+                te.getType(), te.getIdPrefix(), te.getId(), userId);
+        rowKey = subApplicationRowKey.getRowKey();
+        store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE);
+      }

       if (isApplication) {
         TimelineEvent event =
@@ -304,72 +319,108 @@ private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
     }
   }

-  private void storeRelations(byte[] rowKey, TimelineEntity te,
-      boolean isApplication) throws IOException {
-    if (isApplication) {
-      storeRelations(rowKey, te.getIsRelatedToEntities(),
-          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
-      storeRelations(rowKey, te.getRelatesToEntities(),
-          ApplicationColumnPrefix.RELATES_TO, applicationTable);
-    } else {
-      storeRelations(rowKey, te.getIsRelatedToEntities(),
-          EntityColumnPrefix.IS_RELATED_TO, entityTable);
-      storeRelations(rowKey, te.getRelatesToEntities(),
-          EntityColumnPrefix.RELATES_TO, entityTable);
-    }
-  }
-
   /**
    * Stores the Relations from the {@linkplain TimelineEntity} object.
    */
   private <T> void storeRelations(byte[] rowKey,
-      Map<String, Set<String>> connectedEntities,
-      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
-      throws IOException {
-    for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
-        .entrySet()) {
-      // id3?id4?id5
-      String compoundValue =
-          Separator.VALUES.joinEncoded(connectedEntity.getValue());
-      columnPrefix.store(rowKey, table,
-          stringKeyConverter.encode(connectedEntity.getKey()), null,
-          compoundValue);
+      Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix,
+      TypedBufferedMutator<T> table) throws IOException {
+    if (connectedEntities != null) {
+      for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+          .entrySet()) {
+        // id3?id4?id5
+        String compoundValue =
+            Separator.VALUES.joinEncoded(connectedEntity.getValue());
+        columnPrefix.store(rowKey, table,
+            stringKeyConverter.encode(connectedEntity.getKey()), null,
+            compoundValue);
+      }
     }
   }

   /**
    * Stores information from the {@linkplain TimelineEntity} object.
    */
-  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
-      boolean isApplication) throws IOException {
-
-    if (isApplication) {
+  private void store(byte[] rowKey, TimelineEntity te,
+      String flowVersion,
+      Tables table) throws IOException {
+    switch (table) {
+    case APPLICATION_TABLE:
       ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
       ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
           te.getCreatedTime());
       ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
           flowVersion);
-      Map<String, Object> info = te.getInfo();
-      if (info != null) {
-        for (Map.Entry<String, Object> entry : info.entrySet()) {
-          ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
-              stringKeyConverter.encode(entry.getKey()), null,
-              entry.getValue());
-        }
-      }
-    } else {
+      storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO,
+          applicationTable);
+      storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC,
+          applicationTable);
+      storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT,
+          applicationTable);
+      storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG,
+          applicationTable);
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          ApplicationColumnPrefix.RELATES_TO, applicationTable);
+      break;
+    case ENTITY_TABLE:
       EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
       EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
       EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
           te.getCreatedTime());
       EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
-      Map<String, Object> info = te.getInfo();
-      if (info != null) {
-        for (Map.Entry<String, Object> entry : info.entrySet()) {
-          EntityColumnPrefix.INFO.store(rowKey, entityTable,
-              stringKeyConverter.encode(entry.getKey()), null,
-              entry.getValue());
-        }
-      }
+      storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO,
+          entityTable);
+      storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC,
+          entityTable);
+      storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT,
+          entityTable);
+      storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG,
+          entityTable);
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO, entityTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO, entityTable);
+      break;
+    case SUBAPPLICATION_TABLE:
+      SubApplicationColumn.ID.store(rowKey, subApplicationTable, null,
+          te.getId());
+      SubApplicationColumn.TYPE.store(rowKey, subApplicationTable, null,
+          te.getType());
+      SubApplicationColumn.CREATED_TIME.store(rowKey, subApplicationTable, null,
+          te.getCreatedTime());
+      SubApplicationColumn.FLOW_VERSION.store(rowKey, subApplicationTable, null,
+          flowVersion);
+      storeInfo(rowKey, te.getInfo(), flowVersion,
+          SubApplicationColumnPrefix.INFO, subApplicationTable);
+      storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC,
+          subApplicationTable);
+      storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT,
+          subApplicationTable);
+      storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG,
+          subApplicationTable);
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          SubApplicationColumnPrefix.RELATES_TO, subApplicationTable);
+      break;
+    default:
+      LOG.info("Invalid table name provided.");
+      break;
+    }
+  }
+
+  /**
+   * Stores the info entries from the {@linkplain TimelineEntity} object.
+   */
+  private <T> void storeInfo(byte[] rowKey, Map<String, Object> info,
+      String flowVersion, ColumnPrefix<T> columnPrefix,
+      TypedBufferedMutator<T> table) throws IOException {
+    if (info != null) {
+      for (Map.Entry<String, Object> entry : info.entrySet()) {
+        columnPrefix.store(rowKey, table,
+            stringKeyConverter.encode(entry.getKey()), null, entry.getValue());
       }
     }
   }

@@ -377,19 +428,13 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
   /**
    * stores the config information from {@linkplain TimelineEntity}.
    */
-  private void storeConfig(byte[] rowKey, Map<String, String> config,
-      boolean isApplication) throws IOException {
-    if (config == null) {
-      return;
-    }
-    for (Map.Entry<String, String> entry : config.entrySet()) {
-      byte[] configKey = stringKeyConverter.encode(entry.getKey());
-      if (isApplication) {
-        ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
-            configKey, null, entry.getValue());
-      } else {
-        EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey,
-            null, entry.getValue());
+  private <T> void storeConfig(byte[] rowKey, Map<String, String> config,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
+    if (config != null) {
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        byte[] configKey = stringKeyConverter.encode(entry.getKey());
+        columnPrefix.store(rowKey, table, configKey, null, entry.getValue());
       }
     }
   }
@@ -398,8 +443,9 @@ private void storeConfig(byte[] rowKey, Map<String, String> config,
    * stores the {@linkplain TimelineMetric} information from the
    * {@linkplain TimelineEntity} object.
    */
-  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
-      boolean isApplication) throws IOException {
+  private <T> void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
         byte[] metricColumnQualifier =
@@ -407,13 +453,8 @@ private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
         Map<Long, Number> timeseries = metric.getValues();
         for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
           Long timestamp = timeseriesEntry.getKey();
-          if (isApplication) {
-            ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
-                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
-          } else {
-            EntityColumnPrefix.METRIC.store(rowKey, entityTable,
-                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
-          }
+          columnPrefix.store(rowKey, table, metricColumnQualifier, timestamp,
+              timeseriesEntry.getValue());
         }
       }
     }
@@ -422,8 +463,9 @@ private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
   /**
    * Stores the events from the {@linkplain TimelineEvent} object.
    */
-  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
-      boolean isApplication) throws IOException {
+  private <T> void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
     if (events != null) {
       for (TimelineEvent event : events) {
         if (event != null) {
@@ -441,26 +483,16 @@ private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
             byte[] columnQualifierBytes =
                 new EventColumnName(eventId, eventTimestamp, null)
                     .getColumnQualifier();
-            if (isApplication) {
-              ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
-                  columnQualifierBytes, null, Separator.EMPTY_BYTES);
-            } else {
-              EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                  columnQualifierBytes, null, Separator.EMPTY_BYTES);
-            }
+            columnPrefix.store(rowKey, table, columnQualifierBytes, null,
+                Separator.EMPTY_BYTES);
           } else {
             for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
               // eventId=infoKey
               byte[] columnQualifierBytes =
                   new EventColumnName(eventId, eventTimestamp, info.getKey())
                       .getColumnQualifier();
-              if (isApplication) {
-                ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
-                    columnQualifierBytes, null, info.getValue());
-              } else {
-                EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                    columnQualifierBytes, null, info.getValue());
-              }
+              columnPrefix.store(rowKey, table, columnQualifierBytes, null,
+                  info.getValue());
             } // for info: eventInfo
           }
         }
@@ -500,6 +532,7 @@ public void flush() throws IOException {
     applicationTable.flush();
     flowRunTable.flush();
     flowActivityTable.flush();
+    subApplicationTable.flush();
   }

   /**
@@ -532,11 +565,13 @@ protected void serviceStop() throws Exception {
       // The close API performs flushing and releases any resources held
       flowActivityTable.close();
     }
+    if (subApplicationTable != null) {
+      subApplicationTable.close();
+    }
     if (conn != null) {
       LOG.info("closing the hbase Connection");
       conn.close();
     }
     super.serviceStop();
   }
-
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java
index e42c6cd..0c04959 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java
@@ -56,26 +56,6 @@ public SubApplicationRowKeyPrefix(String subAppUserId, String clusterId,
         userId);
   }

-  /**
-   * Creates a prefix which generates the following rowKeyPrefixes for the sub
-   * application table:
-   * {@code subAppUserId!clusterId!entityType!entityPrefix!entityId!userId}.
-   *
-   * subAppUserId is usually the doAsUser.
-   * userId is the yarn user that the AM runs as.
-   *
-   * @param clusterId
-   *          identifying the cluster
-   * @param subAppUserId
-   *          identifying the sub app user
-   * @param userId
-   *          identifying the user who runs the AM
-   */
-  public SubApplicationRowKeyPrefix(String clusterId, String subAppUserId,
-      String userId) {
-    this(subAppUserId, clusterId, null, null, null, userId);
-  }
-
   /*
    * (non-Javadoc)
    *
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 5416b26..f3271aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -138,7 +138,7 @@ public TimelineWriteResponse putEntities(TimelineEntities entities,
     // flush the writer buffer concurrently and swallow any exception
     // caused by the timeline entities that are being put here.
     synchronized (writer) {
-      response = writeTimelineEntities(entities);
+      response = writeTimelineEntities(entities, callerUgi);
       flushBufferedTimelineEntities();
     }

@@ -146,7 +146,8 @@ public TimelineWriteResponse putEntities(TimelineEntities entities,
   }

   private TimelineWriteResponse writeTimelineEntities(
-      TimelineEntities entities) throws IOException {
+      TimelineEntities entities, UserGroupInformation callerUgi)
+      throws IOException {
     // Update application metrics for aggregation
     updateAggregateStatus(entities, aggregationGroups,
         getEntityTypesSkipAggregation());
@@ -154,7 +155,8 @@ private TimelineWriteResponse writeTimelineEntities(
     final TimelineCollectorContext context = getTimelineEntityContext();
     return writer.write(context.getClusterId(), context.getUserId(),
         context.getFlowName(), context.getFlowVersion(),
-        context.getFlowRunId(), context.getAppId(), entities);
+        context.getFlowRunId(), context.getAppId(), entities,
+        callerUgi.getShortUserName());
   }

   /**
@@ -186,7 +188,7 @@ public void putEntitiesAsync(TimelineEntities entities,
           callerUgi + ")");
     }

-    writeTimelineEntities(entities);
+    writeTimelineEntities(entities, callerUgi);
   }

   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 1f527f2..97eb75f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -70,7 +70,7 @@
   @Override
   public TimelineWriteResponse write(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities entities) throws IOException {
+      TimelineEntities entities, String subApplicationUser) throws IOException {
     TimelineWriteResponse response = new TimelineWriteResponse();
     for (TimelineEntity entity : entities.getEntities()) {
       write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 663a18a..7c1de9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -46,13 +46,15 @@
    * @param appId context app ID.
    * @param data
    *          a {@link TimelineEntities} object.
+   * @param subApplicationUser
+   *          the doAs user taken from the caller UGI.
    * @return a {@link TimelineWriteResponse} object.
    * @throws IOException if there is any exception encountered while storing
    *           or writing entities to the backend storage.
    */
   TimelineWriteResponse write(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities data) throws IOException;
+      TimelineEntities data, String subApplicationUser) throws IOException;

   /**
    * Aggregates the entity information to the timeline store based on which
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
index 0f17553..0734677 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -158,7 +158,7 @@ public void testPutEntity() throws IOException {

     verify(writer, times(1)).write(
         anyString(), anyString(), anyString(), anyString(), anyLong(),
-        anyString(), any(TimelineEntities.class));
+        anyString(), any(TimelineEntities.class), anyString());
     verify(writer, times(1)).flush();
   }

@@ -177,7 +177,7 @@ public void testPutEntityAsync() throws IOException {

     verify(writer, times(1)).write(
         anyString(), anyString(), anyString(), anyString(), anyLong(),
-        anyString(), any(TimelineEntities.class));
+        anyString(), any(TimelineEntities.class), anyString());
     verify(writer, never()).flush();
   }

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 4f12c57..5ce80ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -90,7 +90,7 @@ public void testWriteEntityToFile() throws Exception {
     fsi.init(conf);
     fsi.start();
     fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L,
-        "app_id", te);
+        "app_id", te, "user_id");
     String fileName = fsi.getOutputRoot() + File.separator + "entities" +
         File.separator + "cluster_id" + File.separator + "user_id" +
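Taken together, the interface change means every TimelineWriter caller must now pass the sub-application (doAs) user along with the entity batch; TimelineCollector derives it from the caller UGI via getShortUserName(). A minimal sketch of a standalone caller driving the new signature against the FileSystemTimelineWriterImpl touched by this patch — the cluster/flow/app values, the entity contents, and the use of the current UGI as the doAs user are illustrative assumptions, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;

    public class SubApplicationWriteExample {
      public static void main(String[] args) throws Exception {
        FileSystemTimelineWriterImpl writer = new FileSystemTimelineWriterImpl();
        writer.init(new Configuration());
        writer.start();
        try {
          TimelineEntity entity = new TimelineEntity();
          entity.setId("entity_1");
          entity.setType("TEST_ENTITY_TYPE");
          entity.setCreatedTime(System.currentTimeMillis());
          TimelineEntities entities = new TimelineEntities();
          entities.addEntity(entity);
          // TimelineCollector obtains the doAs user from the caller UGI;
          // a standalone caller can do the same (assumption for this sketch).
          String subApplicationUser =
              UserGroupInformation.getCurrentUser().getShortUserName();
          // New signature: the sub-application user is the trailing argument.
          writer.write("cluster_1", "am_user", "flow_1", "v1", 1L, "app_1",
              entities, subApplicationUser);
          writer.flush();
        } finally {
          writer.stop();
        }
      }
    }

With the HBase writer, passing a subApplicationUser different from the AM user is what triggers the extra write to the new sub application table; when the two users match (or the entity is an application entity), only the entity/application tables are written, as the guard in HBaseTimelineWriterImpl.write above shows.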