diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index e32f718..c7807b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -337,13 +337,16 @@ private static void loadData() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); - hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te); - hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1); - hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4); + hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te, + user); + hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1, + user); + hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4, + user); hbi.write(cluster, user, flow2, - flowVersion2, runid2, entity3.getId(), te3); + flowVersion2, runid2, entity3.getId(), te3, user); hbi.write(cluster, user, flow, flowVersion, runid, - "application_1111111111_1111", userEntities); + "application_1111111111_1111", userEntities, user); writeApplicationEntities(hbi, ts); hbi.flush(); } finally { @@ -376,7 +379,7 @@ static void writeApplicationEntities(HBaseTimelineWriterImpl hbi, appEntity.addEvent(finished); te.addEntity(appEntity); hbi.write("cluster1", "user1", "flow1", "CF7022C10F1354", i, - appEntity.getId(), te); + appEntity.getId(), te, "user1"); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java index 926d8bb..c95d319 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/DataGeneratorForTest.java @@ -161,11 +161,11 @@ public static void loadApps(HBaseTestingUtility util, long ts) String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; String appName = "application_1111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); appName = "application_1111111111_3333"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user); appName = "application_1111111111_4444"; - hbi.write(cluster, user, flow, 
flowVersion, runid, appName, te2); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te2, user); hbi.stop(); } finally { if (hbi != null) { @@ -438,10 +438,12 @@ public static void loadEntities(HBaseTestingUtility util, long ts) String flow = "some_flow_name"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; - hbi.write(cluster, user, flow, flowVersion, runid, appName1, te); - hbi.write(cluster, user, flow, flowVersion, runid, appName2, te); - hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1); - hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2); + hbi.write(cluster, user, flow, flowVersion, runid, appName1, te, user); + hbi.write(cluster, user, flow, flowVersion, runid, appName2, te, user); + hbi.write(cluster, user, flow, flowVersion, runid, appName1, appTe1, + user); + hbi.write(cluster, user, flow, flowVersion, runid, appName2, appTe2, + user); hbi.stop(); } finally { if (hbi != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java index d6b0370..66aa3cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageApps.java @@ -161,7 +161,7 @@ public void testWriteNullApplicationToHBase() throws Exception { String flow = null; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; - hbi.write(cluster, user, flow, flowVersion, runid, appId, te); + hbi.write(cluster, user, flow, flowVersion, runid, appId, te, user); hbi.stop(); // retrieve the row @@ -279,7 +279,7 @@ public void testWriteApplicationToHBase() throws Exception { String flow = "s!ome_f\tlow _n am!e"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; - hbi.write(cluster, user, flow, flowVersion, runid, appId, te); + hbi.write(cluster, user, flow, flowVersion, runid, appId, te, user); // Write entity again, this time without created time. entity = new ApplicationEntity(); @@ -291,7 +291,7 @@ public void testWriteApplicationToHBase() throws Exception { entity.addInfo(infoMap1); te = new TimelineEntities(); te.addEntity(entity); - hbi.write(cluster, user, flow, flowVersion, runid, appId, te); + hbi.write(cluster, user, flow, flowVersion, runid, appId, te, user); hbi.stop(); infoMap.putAll(infoMap1); @@ -512,7 +512,8 @@ public void testEvents() throws IOException { String flowVersion = "1111F01C2287BA"; long runid = 1009876543218L; String appName = "application_123465899910_1001"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); + hbi.write(cluster, user, flow, flowVersion, runid, appName, entities, + user); hbi.stop(); // retrieve the row @@ -628,13 +629,14 @@ public void testNonIntegralMetricValues() throws IOException { hbi.start(); // Writing application entity. 
try { - hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp); + hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp, "u1"); Assert.fail("Expected an exception as metric values are non integral"); } catch (IOException e) {} // Writing generic entity. try { - hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity); + hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity, + "u1"); Assert.fail("Expected an exception as metric values are non integral"); } catch (IOException e) {} hbi.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java index 7ac5b36..8b0f39d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageEntities.java @@ -71,6 +71,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -196,12 +201,14 @@ public void testWriteEntityToHBase() throws Exception { hbi.start(); String cluster = "cluster_test_write_entity"; String user = "user1"; + String subAppUser = "subAppUser1"; String flow = "some_flow_name"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; String appName = ApplicationId.newInstance(System.currentTimeMillis() + 9000000L, 1).toString(); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, + subAppUser); hbi.stop(); // scan the table and see that entity exists @@ -352,6 +359,11 @@ public void testWriteEntityToHBase() throws Exception { assertEquals(metricValues.get(ts - 20000), metric.getValues().get(ts - 20000)); } + + // verify for sub application table entities. 
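+ // (written because subAppUser differs from the writing user; see the sub application branch added in HBaseTimelineWriterImpl#write)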
+ verifySubApplicationTableEntities(cluster, user, flow, flowVersion, runid, + appName, subAppUser, c1, entity, id, type, infoMap, isRelatedTo, + relatesTo, conf, metricValues, metrics, cTime, m1); } finally { if (hbi != null) { hbi.stop(); @@ -360,6 +372,98 @@ public void testWriteEntityToHBase() throws Exception { } } + private void verifySubApplicationTableEntities(String cluster, String user, + String flow, String flowVersion, Long runid, String appName, + String subAppUser, Configuration c1, TimelineEntity entity, String id, + String type, Map<String, Object> infoMap, + Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo, + Map<String, String> conf, Map<Long, Number> metricValues, + Set<TimelineMetric> metrics, Long cTime, TimelineMetric m1) + throws IOException { + Scan s = new Scan(); + // read from SubApplicationTable + byte[] startRow = new SubApplicationRowKeyPrefix(cluster, subAppUser, null, + null, null, null).getRowKeyPrefix(); + s.setStartRow(startRow); + s.setMaxVersions(Integer.MAX_VALUE); + Connection conn = ConnectionFactory.createConnection(c1); + ResultScanner scanner = + new SubApplicationTable().getResultScanner(c1, conn, s); + + int rowCount = 0; + int colCount = 0; + KeyConverter<String> stringKeyConverter = new StringKeyConverter(); + for (Result result : scanner) { + if (result != null && !result.isEmpty()) { + rowCount++; + colCount += result.size(); + byte[] row1 = result.getRow(); + assertTrue(verifyRowKeyForSubApplication(row1, subAppUser, cluster, + user, entity)); + + // check info column family + String id1 = SubApplicationColumn.ID.readResult(result).toString(); + assertEquals(id, id1); + + String type1 = SubApplicationColumn.TYPE.readResult(result).toString(); + assertEquals(type, type1); + + Long cTime1 = + (Long) SubApplicationColumn.CREATED_TIME.readResult(result); + assertEquals(cTime1, cTime); + + Map<String, Object> infoColumns = SubApplicationColumnPrefix.INFO + .readResults(result, new StringKeyConverter()); + assertEquals(infoMap, infoColumns); + + // Remember isRelatedTo is of type Map<String, Set<String>> + for (Map.Entry<String, Set<String>> isRelatedToEntry : isRelatedTo + .entrySet()) { + Object isRelatedToValue = SubApplicationColumnPrefix.IS_RELATED_TO + .readResult(result, isRelatedToEntry.getKey()); + String compoundValue = isRelatedToValue.toString(); + // id7?id9?id6 + Set<String> isRelatedToValues = + new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue)); + assertEquals(isRelatedTo.get(isRelatedToEntry.getKey()).size(), + isRelatedToValues.size()); + for (String v : isRelatedToEntry.getValue()) { + assertTrue(isRelatedToValues.contains(v)); + } + } + + // RelatesTo + for (Map.Entry<String, Set<String>> relatesToEntry : relatesTo + .entrySet()) { + String compoundValue = SubApplicationColumnPrefix.RELATES_TO + .readResult(result, relatesToEntry.getKey()).toString(); + // id3?id4?id5 + Set<String> relatesToValues = + new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue)); + assertEquals(relatesTo.get(relatesToEntry.getKey()).size(), + relatesToValues.size()); + for (String v : relatesToEntry.getValue()) { + assertTrue(relatesToValues.contains(v)); + } + } + + // Configuration + Map<String, Object> configColumns = SubApplicationColumnPrefix.CONFIG + .readResults(result, stringKeyConverter); + assertEquals(conf, configColumns); + + NavigableMap<String, NavigableMap<Long, Number>> metricsResult = + SubApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result, + stringKeyConverter); + + NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); + matchMetrics(metricValues, metricMap); + } + } + assertEquals(1, rowCount); + assertEquals(15, colCount); + } + private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, String flow, Long runid,
String appName, TimelineEntity te) { @@ -407,7 +511,8 @@ public void testEventsWithEmptyInfo() throws IOException { byte[] startRow = new EntityRowKeyPrefix(cluster, user, flow, runid, appName) .getRowKeyPrefix(); - hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); + hbi.write(cluster, user, flow, flowVersion, runid, appName, entities, + user); hbi.stop(); // scan the table and see that entity exists Scan s = new Scan(); @@ -510,7 +615,8 @@ public void testEventsEscapeTs() throws IOException { String flowVersion = "1111F01C2287BA"; long runid = 1009876543218L; String appName = "application_123465899910_2001"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); + hbi.write(cluster, user, flow, flowVersion, runid, appName, entities, + user); hbi.stop(); // read the timeline entity using the reader this time @@ -1758,4 +1864,15 @@ public void testListTypesInApp() throws Exception { public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster(); } + + private boolean verifyRowKeyForSubApplication(byte[] rowKey, + String subAppUser, String cluster, String user, TimelineEntity te) { + SubApplicationRowKey key = SubApplicationRowKey.parseRowKey(rowKey); + assertEquals(subAppUser, key.getSubAppUserId()); + assertEquals(cluster, key.getClusterId()); + assertEquals(te.getType(), key.getEntityType()); + assertEquals(te.getId(), key.getEntityId()); + assertEquals(user, key.getUserId()); + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 0923105..bde393e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -117,13 +117,13 @@ public void testWriteFlowRunMinMax() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // write another entity with the right min start time te = new TimelineEntities(); te.addEntity(entityMinStartTime); appName = "application_100000000000_3333"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // writer another entity for max end time TimelineEntity entityMaxEndTime = TestFlowDataGenerator @@ -131,7 +131,7 @@ public void testWriteFlowRunMinMax() throws Exception { te = new TimelineEntities(); te.addEntity(entityMaxEndTime); appName = "application_100000000000_4444"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // writer another entity with greater start time TimelineEntity entityGreaterStartTime = TestFlowDataGenerator @@ -139,7 +139,7 @@ public void testWriteFlowRunMinMax() throws Exception { te = new
TimelineEntities(); te.addEntity(entityGreaterStartTime); appName = "application_1000000000000000_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // flush everything to hbase hbi.flush(); @@ -227,7 +227,7 @@ public void testWriteFlowActivityOneFlow() throws Exception { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); String appName = "application_1111999999_1234"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); hbi.flush(); } finally { if (hbi != null) { @@ -341,19 +341,19 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); String appName = "application_11888888888_1111"; - hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te); + hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te, user); // write an application with to this flow but a different runid/ version te = new TimelineEntities(); te.addEntity(entityApp1); appName = "application_11888888888_2222"; - hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te); + hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te, user); // write an application with to this flow but a different runid/ version te = new TimelineEntities(); te.addEntity(entityApp1); appName = "application_11888888888_3333"; - hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te); + hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te, user); hbi.flush(); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index acfdc4d..57f42aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -181,13 +181,13 @@ public void testWriteFlowRunMinMax() throws Exception { try { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // write another entity with the right min start time te = new TimelineEntities(); te.addEntity(entityMinStartTime); appName = "application_100000000000_3333"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // writer another entity for max end time TimelineEntity entityMaxEndTime = TestFlowDataGenerator @@ -195,7 +195,7 @@ public void testWriteFlowRunMinMax() throws Exception { te = new TimelineEntities(); te.addEntity(entityMaxEndTime); appName = "application_100000000000_4444"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // writer another entity with greater start time 
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator @@ -203,7 +203,7 @@ public void testWriteFlowRunMinMax() throws Exception { te = new TimelineEntities(); te.addEntity(entityGreaterStartTime); appName = "application_1000000000000000_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // flush everything to hbase hbi.flush(); @@ -288,14 +288,14 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); hbi.flush(); } finally { if (hbi != null) { @@ -557,14 +557,16 @@ public void testWriteFlowRunMetricsPrefix() throws Exception { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te); + hbi.write(cluster, user, flow, flowVersion, 1002345678919L, appName, te, + user); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te); + hbi.write(cluster, user, flow, flowVersion, 1002345678918L, appName, te, + user); hbi.flush(); } finally { if (hbi != null) { @@ -644,14 +646,14 @@ public void testWriteFlowRunsMetricFields() throws Exception { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); String appName = "application_11111111111111_1111"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, user); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te, null); hbi.flush(); } finally { if (hbi != null) { @@ -746,7 +748,7 @@ public void testWriteFlowRunFlush() throws Exception { te1.addEntity(entityApp1); entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); te1.addEntity(entityApp2); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user); Thread.sleep(1); appName = "application_1001199480000_7" + appIdSuffix; @@ -758,7 +760,7 @@ public void testWriteFlowRunFlush() throws Exception { entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); te1.addEntity(entityApp2); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user); if (i % 1000 == 0) { hbi.flush(); 
checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, @@ -827,7 +829,7 @@ public void testFilterFlowRunsByCreatedTime() throws Exception { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, - "application_11111111111111_1111", te); + "application_11111111111111_1111", te, user); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( @@ -835,7 +837,7 @@ public void testFilterFlowRunsByCreatedTime() throws Exception { entityApp2.setCreatedTime(1425016502000L); te.addEntity(entityApp2); hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, - "application_11111111111111_2222", te); + "application_11111111111111_2222", te, user); hbi.flush(); } finally { if (hbi != null) { @@ -912,14 +914,14 @@ public void testMetricFilters() throws Exception { hbi = new HBaseTimelineWriterImpl(); hbi.init(c1); hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678919L, - "application_11111111111111_1111", te); + "application_11111111111111_1111", te, user); // write another application with same metric to this flow te = new TimelineEntities(); TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2( System.currentTimeMillis()); te.addEntity(entityApp2); hbi.write(cluster, user, flow, "CF7022C10F1354", 1002345678918L, - "application_11111111111111_2222", te); + "application_11111111111111_2222", te, user); hbi.flush(); } finally { if (hbi != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index fa9d029..faf9cfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -292,14 +292,14 @@ public void testWriteFlowRunCompaction() throws Exception { te1 = new TimelineEntities(); entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1); te1.addEntity(entityApp1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user); appName = "application_2048000000000_7" + appIdSuffix; insertTs++; te1 = new TimelineEntities(); entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs); te1.addEntity(entityApp1); - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, user); } } finally { String appName = "application_10240000000000_" + appIdSuffix; @@ -308,7 +308,7 @@ public void testWriteFlowRunCompaction() throws Exception { insertTs + 1, c1); te1.addEntity(entityApp1); if (hbi != null) { - hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1, null); hbi.flush(); hbi.close(); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index dfd63bf..55183dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -65,6 +65,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable; /** * This implements a hbase based backend for storing the timeline entity @@ -85,6 +89,7 @@ private TypedBufferedMutator<ApplicationTable> applicationTable; private TypedBufferedMutator<FlowActivityTable> flowActivityTable; private TypedBufferedMutator<FlowRunTable> flowRunTable; + private TypedBufferedMutator<SubApplicationTable> subApplicationTable; /** * Used to convert strings key components to and from storage format.
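[Reviewer note, not part of the patch: a minimal sketch of how the widened write() signature is exercised, with names and values borrowed from the tests above. Here conf and entities are assumed to be set up already; in the collector path the trailing argument is supplied as callerUgi.getShortUserName().

    HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl();
    writer.init(conf);
    writer.start();
    // the new trailing argument is the sub application (doAs) user
    writer.write("cluster1", "user1", "flow1", "CF7022C10F1354",
        1002345678919L, "application_1111111111_1111", entities, "subAppUser1");
    writer.flush();
    // for a non-application entity, because "user1" differs from "subAppUser1",
    // the writer additionally stores the entity under the row key
    // subAppUserId!clusterId!entityType!entityPrefix!entityId!userId
    // in the new sub application table
    writer.stop();

The row key layout in the comment mirrors the SubApplicationRowKey constructor arguments used in the write() path below.]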
@@ -116,6 +121,8 @@ protected void serviceInit(Configuration conf) throws Exception { flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn); flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn); + subApplicationTable = + new SubApplicationTable().getTableMutator(hbaseConf, conn); } /** @@ -124,7 +131,7 @@ protected void serviceInit(Configuration conf) throws Exception { @Override public TimelineWriteResponse write(String clusterId, String userId, String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntities data) throws IOException { + TimelineEntities data, String subApplicationUser) throws IOException { TimelineWriteResponse putStatus = new TimelineWriteResponse(); // defensive coding to avoid NPE during row key construction @@ -165,6 +172,18 @@ public TimelineWriteResponse write(String clusterId, String userId, storeMetrics(rowKey, te.getMetrics(), isApplication); storeRelations(rowKey, te, isApplication); + if (!isApplication && !userId.equals(subApplicationUser)) { + SubApplicationRowKey subApplicationRowKey = + new SubApplicationRowKey(subApplicationUser, clusterId, + te.getType(), te.getIdPrefix(), te.getId(), userId); + rowKey = subApplicationRowKey.getRowKey(); + storeInfoToSubApplicationTable(rowKey, te); + storeEventsToSubApplicationTable(rowKey, te.getEvents()); + storeConfigToSubApplicationTable(rowKey, te.getConfigs()); + storeMetricsToSubApplicationTable(rowKey, te.getMetrics()); + storeRelationsToSubApplicationTable(rowKey, te); + } + if (isApplication) { TimelineEvent event = ApplicationEntity.getApplicationEvent(te, @@ -500,6 +519,7 @@ public void flush() throws IOException { applicationTable.flush(); flowRunTable.flush(); flowActivityTable.flush(); + subApplicationTable.flush(); } /** @@ -532,6 +552,9 @@ protected void serviceStop() throws Exception { // The close API performs flushing and releases any resources held flowActivityTable.close(); } + if (subApplicationTable != null) { + subApplicationTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); @@ -539,4 +562,100 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + private void storeInfoToSubApplicationTable(byte[] rowKey, TimelineEntity te) + throws IOException { + + SubApplicationColumn.ID.store(rowKey, subApplicationTable, null, + te.getId()); + SubApplicationColumn.TYPE.store(rowKey, subApplicationTable, null, + te.getType()); + SubApplicationColumn.CREATED_TIME.store(rowKey, subApplicationTable, null, + te.getCreatedTime()); + Map<String, Object> info = te.getInfo(); + if (info != null) { + for (Map.Entry<String, Object> entry : info.entrySet()) { + SubApplicationColumnPrefix.INFO.store(rowKey, subApplicationTable, + stringKeyConverter.encode(entry.getKey()), null, entry.getValue()); + } + } + } + + private void storeMetricsToSubApplicationTable(byte[] rowKey, + Set<TimelineMetric> metrics) throws IOException { + if (metrics != null) { + for (TimelineMetric metric : metrics) { + byte[] metricColumnQualifier = + stringKeyConverter.encode(metric.getId()); + Map<Long, Number> timeseries = metric.getValues(); + for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { + Long timestamp = timeseriesEntry.getKey(); + SubApplicationColumnPrefix.METRIC.store(rowKey, subApplicationTable, + metricColumnQualifier, timestamp, timeseriesEntry.getValue()); + } + } + } + } + + private void storeConfigToSubApplicationTable(byte[] rowKey, + Map<String, String> config) throws IOException { + if (config == null) { + return; + } + for (Map.Entry<String, String> entry : config.entrySet()) { + byte[] configKey = stringKeyConverter.encode(entry.getKey()); + SubApplicationColumnPrefix.CONFIG.store(rowKey, subApplicationTable, + configKey, null, entry.getValue()); + } + } + + /** + * Stores the events from the {@linkplain TimelineEvent} object. + */ + private void storeEventsToSubApplicationTable(byte[] rowKey, + Set<TimelineEvent> events) throws IOException { + if (events != null) { + for (TimelineEvent event : events) { + if (event != null) { + String eventId = event.getId(); + if (eventId != null) { + long eventTimestamp = event.getTimestamp(); + // if the timestamp is not set, use the current timestamp + if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) { + LOG.warn("timestamp is not set for event " + eventId + + "! Using the current timestamp"); + eventTimestamp = System.currentTimeMillis(); + } + Map<String, Object> eventInfo = event.getInfo(); + if ((eventInfo == null) || (eventInfo.size() == 0)) { + byte[] columnQualifierBytes = + new EventColumnName(eventId, eventTimestamp, null) + .getColumnQualifier(); + SubApplicationColumnPrefix.EVENT.store(rowKey, + subApplicationTable, columnQualifierBytes, null, + Separator.EMPTY_BYTES); + } else { + for (Map.Entry<String, Object> info : eventInfo.entrySet()) { + // eventId=infoKey + byte[] columnQualifierBytes = + new EventColumnName(eventId, eventTimestamp, info.getKey()) + .getColumnQualifier(); + SubApplicationColumnPrefix.EVENT.store(rowKey, + subApplicationTable, columnQualifierBytes, null, + info.getValue()); + } // for info: eventInfo + } + } + } + } // event : events + } + } + + private void storeRelationsToSubApplicationTable(byte[] rowKey, + TimelineEntity te) throws IOException { + storeRelations(rowKey, te.getIsRelatedToEntities(), + SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable); + storeRelations(rowKey, te.getRelatesToEntities(), + SubApplicationColumnPrefix.RELATES_TO, subApplicationTable); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java index e42c6cd..0c04959 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationRowKeyPrefix.java @@ -56,26 +56,6 @@ public SubApplicationRowKeyPrefix(String subAppUserId, String clusterId, userId); } - /** - * Creates a prefix which generates the following rowKeyPrefixes for the sub - * application table: - * {@code subAppUserId!clusterId!entityType!entityPrefix!entityId!userId}. - * - * subAppUserId is usually the doAsUser. - * userId is the yarn user that the AM runs as.
- * - * @param clusterId - * identifying the cluster - * @param subAppUserId - * identifying the sub app user - * @param userId - * identifying the user who runs the AM - */ - public SubApplicationRowKeyPrefix(String clusterId, String subAppUserId, - String userId) { - this(subAppUserId, clusterId, null, null, null, userId); - } - /* * (non-Javadoc) * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 5416b26..f3271aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -138,7 +138,7 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, // flush the writer buffer concurrently and swallow any exception // caused by the timeline enitites that are being put here. synchronized (writer) { - response = writeTimelineEntities(entities); + response = writeTimelineEntities(entities, callerUgi); flushBufferedTimelineEntities(); } @@ -146,7 +146,8 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, } private TimelineWriteResponse writeTimelineEntities( - TimelineEntities entities) throws IOException { + TimelineEntities entities, UserGroupInformation callerUgi) + throws IOException { // Update application metrics for aggregation updateAggregateStatus(entities, aggregationGroups, getEntityTypesSkipAggregation()); @@ -154,7 +155,8 @@ private TimelineWriteResponse writeTimelineEntities( final TimelineCollectorContext context = getTimelineEntityContext(); return writer.write(context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowVersion(), - context.getFlowRunId(), context.getAppId(), entities); + context.getFlowRunId(), context.getAppId(), entities, + callerUgi.getShortUserName()); } /** @@ -186,7 +188,7 @@ public void putEntitiesAsync(TimelineEntities entities, callerUgi + ")"); } - writeTimelineEntities(entities); + writeTimelineEntities(entities, callerUgi); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 1f527f2..97eb75f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -70,7 +70,7 @@ @Override public TimelineWriteResponse write(String clusterId, String userId, String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntities entities) throws IOException { + TimelineEntities entities, 
String subApplicationUser) throws IOException { TimelineWriteResponse response = new TimelineWriteResponse(); for (TimelineEntity entity : entities.getEntities()) { write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 663a18a..7c1de9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -46,13 +46,15 @@ * @param appId context app ID. * @param data * a {@link TimelineEntities} object. + * @param subApplicationUser + * doAsUser taken from caller UGI. * @return a {@link TimelineWriteResponse} object. * @throws IOException if there is any exception encountered while storing * or writing entities to the backend storage. */ TimelineWriteResponse write(String clusterId, String userId, String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntities data) throws IOException; + TimelineEntities data, String subApplicationUser) throws IOException; /** * Aggregates the entity information to the timeline store based on which diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index 0f17553..0734677 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -158,7 +158,7 @@ public void testPutEntity() throws IOException { verify(writer, times(1)).write( anyString(), anyString(), anyString(), anyString(), anyLong(), - anyString(), any(TimelineEntities.class)); + anyString(), any(TimelineEntities.class), anyString()); verify(writer, times(1)).flush(); } @@ -177,7 +177,7 @@ public void testPutEntityAsync() throws IOException { verify(writer, times(1)).write( anyString(), anyString(), anyString(), anyString(), anyLong(), - anyString(), any(TimelineEntities.class)); + anyString(), any(TimelineEntities.class), anyString()); verify(writer, never()).flush(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index 4f12c57..5ce80ce 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -90,7 +90,7 @@ public void testWriteEntityToFile() throws Exception { fsi.init(conf); fsi.start(); fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L, - "app_id", te); + "app_id", te, "user_id"); String fileName = fsi.getOutputRoot() + File.separator + "entities" + File.separator + "cluster_id" + File.separator + "user_id" +