diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 5574057..1490e65 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -517,7 +517,7 @@ public void run() { containerId, containerMetricsPeriodMs).recordCpuUsage ((int)cpuUsagePercentPerCore, milliVcoresUsed); } - + boolean isMemoryOverLimit = false; String msg = ""; int containerExitStatus = ContainerExitStatus.INVALID; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/OfflineAggregator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/OfflineAggregator.java new file mode 100644 index 0000000..4d72bd3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/OfflineAggregator.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.timelineservice.aggregation.timebased;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseAggregatorReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.OfflineAggregationWriter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixOfflineAggregationWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Performs flow and user level time-based aggregation.
+ */
+public class OfflineAggregator {
+
+  public static final String FLOW_AGGREGATOR_JOB_NAME
+      = "timeline_v2_flow_aggregation";
+  public static final String USER_AGGREGATOR_JOB_NAME
+      = "timeline_v2_user_aggregation";
+  @VisibleForTesting
+  static final String KEY_SEPARATOR = ";";
+  private static final int KEY_IDX_CLUSTER_ID = 0;
+  private static final int KEY_IDX_USER = 1;
+  private static final int KEY_IDX_FLOW_NAME = 2;
+
+  public static class UserAggregatorMapper extends
+      Mapper<Text, TimelineEntityWritable, Text, TimelineEntityWritable> {
+    @Override
+    public void map(Text key, TimelineEntityWritable values, Context context)
+        throws IOException, InterruptedException {
+      // Chop down the key and remove the flow name
+      String[] rawContext = key.toString().split(KEY_SEPARATOR);
+      String[] newContext = new String[rawContext.length - 1];
+      System.arraycopy(rawContext, 0, newContext, 0, rawContext.length - 1);
+      context.write(new Text(StringUtils.join(newContext, KEY_SEPARATOR)),
+          values);
+    }
+  }
+
+  public static class UserAggregatorReducer extends
+      Reducer<Text, TimelineEntityWritable, Text, TimelineEntityWritable> {
+    @Override
+    public void reduce(Text key, Iterable<TimelineEntityWritable> values,
+        Context context) throws IOException, InterruptedException {
+      reduceTimelineEntities(key, values, context,
+          OfflineAggregationInfo.USER_AGGREGATION);
+    }
+  }
+
+  public static class FlowAggregatorMapper
+      extends Mapper<LongWritable, Text, Text, TimelineEntityWritable> {
+    @Override
+    public void map(LongWritable key, Text flowInfo, Context context)
+        throws IOException, InterruptedException {
+      String[] rawContext = flowInfo.toString().split(KEY_SEPARATOR);
+      String clusterId = rawContext[KEY_IDX_CLUSTER_ID];
+      String user = rawContext[KEY_IDX_USER];
+      String flowName = rawContext[KEY_IDX_FLOW_NAME];
+
+      try {
+        HBaseAggregatorReader reader = new HBaseAggregatorReader();
+        reader.connect(context.getConfiguration());
+
+        TimelineEntities entities
+            = reader.getEntities(clusterId, user, flowName);
+        TimelineEntity resultEntity
+            = aggregateEntities(entities, context.getJobName());
+
+        context.write(flowInfo, new TimelineEntityWritable(resultEntity));
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  public static class FlowAggregatorReducer extends
+      Reducer<Text, TimelineEntityWritable, Text, TimelineEntityWritable> {
+    @Override
+    public void reduce(Text key, Iterable<TimelineEntityWritable> values,
+        Context context)
+        throws IOException, InterruptedException {
+      reduceTimelineEntities(key, values, context,
+          OfflineAggregationInfo.FLOW_AGGREGATION);
+    }
+  }
+
+  static int runAggregations(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+
+    // Offline flow aggregation
+    long timestamp = System.currentTimeMillis();
+    Job flowJob = Job.getInstance(conf,
+        FLOW_AGGREGATOR_JOB_NAME + "_" + timestamp);
+    flowJob.setJarByClass(OfflineAggregator.class);
+    flowJob.setMapperClass(FlowAggregatorMapper.class);
+    flowJob.setReducerClass(FlowAggregatorReducer.class);
+    flowJob.setOutputKeyClass(Text.class);
+    flowJob.setOutputValueClass(TimelineEntityWritable.class);
+
+    FileInputFormat.addInputPath(flowJob, new Path(args[0]));
+    FileOutputFormat.setOutputPath(flowJob, new Path(args[1]));
+
+    int returnCode = flowJob.waitForCompletion(true) ? 0 : 1;
+
+    // Offline user aggregation
+    Job userJob = Job.getInstance(conf,
+        USER_AGGREGATOR_JOB_NAME + "_" + timestamp);
+    userJob.setJarByClass(OfflineAggregator.class);
+    userJob.setMapperClass(UserAggregatorMapper.class);
+    userJob.setReducerClass(UserAggregatorReducer.class);
+    userJob.setOutputKeyClass(Text.class);
+    userJob.setOutputValueClass(TimelineEntityWritable.class);
+
+    FileInputFormat.addInputPath(userJob, new Path(args[1]));
+    FileOutputFormat.setOutputPath(userJob, new Path(args[2]));
+
+    returnCode += userJob.waitForCompletion(true) ? 0 : 1;
+
+    return (returnCode == 0) ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.out.println("Usage: OfflineAggregator <flow key input dir> "
+          + "<flow aggregation output dir> <user aggregation output dir>");
+      return;
+    }
+    System.exit(runAggregations(args));
+  }
+
+  // Helper functions
+
+  private static TimelineEntity aggregateEntities(TimelineEntities entities,
+      String jobName) {
+    // The aggregated entity will have its id set to the mapreduce job name,
+    // and its type set to a predefined offline aggregation type. Note that we
+    // distinguish whether a job aggregates per flow or per user by its job
+    // name.
+    TimelineEntity resultEntity = new TimelineEntity();
+    resultEntity.setId(jobName);
+    resultEntity.setType(
+        TimelineEntityType.YARN_APPLICATION_AGGREGATION.toString());
+
+    Map<String, TimelineMetric> aggregateMap = new HashMap<>();
+
+    aggregateMap = TimelineCollector.aggregateMetrics(entities, aggregateMap,
+        new HashMap<>(),
+        new HashMap<>());
+    resultEntity.setMetrics(new HashSet<>(aggregateMap.values()));
+    return resultEntity;
+  }
+
+  private static TimelineEntities toTimelineEntities(
+      Iterable<TimelineEntityWritable> values) {
+    TimelineEntities entities = new TimelineEntities();
+    Iterator<TimelineEntityWritable> valuesIterator = values.iterator();
+    while (valuesIterator.hasNext()) {
+      TimelineEntityWritable entityWritable = valuesIterator.next();
+      TimelineEntity entity = entityWritable.get();
+      entities.addEntity(entity);
+    }
+    return entities;
+  }
+
+  private static TimelineCollectorContext parseContext(Text key,
+      OfflineAggregationInfo aggregationInfo) {
+
+    String[] rawContext = key.toString().split(KEY_SEPARATOR);
+    String clusterId = rawContext[KEY_IDX_CLUSTER_ID];
+    String user = rawContext[KEY_IDX_USER];
+    String flowName = null;
+
+    if (aggregationInfo.equals(OfflineAggregationInfo.FLOW_AGGREGATION)) {
+      flowName = rawContext[KEY_IDX_FLOW_NAME];
+    }
+    return new TimelineCollectorContext(clusterId, user, flowName, null, 0,
+        null);
+  }
+
+  private static void reduceTimelineEntities(Text key,
+      Iterable<TimelineEntityWritable> values,
+      Reducer<Text, TimelineEntityWritable, Text, TimelineEntityWritable>.Context context,
+      OfflineAggregationInfo aggregationInfo)
+      throws IOException, InterruptedException {
+    OfflineAggregationWriter aggregatorStorage
+        = new PhoenixOfflineAggregationWriterImpl();
+    try {
+      TimelineCollectorContext collectorContext
+          = parseContext(key, aggregationInfo);
+      TimelineEntities entities = toTimelineEntities(values);
+
+      TimelineEntity resultEntity
+          = aggregateEntities(entities, context.getJobName());
+      TimelineEntities resultEntities = new TimelineEntities();
+      resultEntities.addEntity(resultEntity);
+
+      aggregatorStorage.init(context.getConfiguration());
+      aggregatorStorage.writeAggregatedEntity(collectorContext,
+          resultEntities, aggregationInfo);
+      aggregatorStorage.stop();
+      context.write(key, new TimelineEntityWritable(resultEntity));
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/TimelineEntityWritable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/TimelineEntityWritable.java
new file mode 100644
index 0000000..9b79dab
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/TimelineEntityWritable.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.aggregation.timebased; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +/** + * Serialization wrapper for timeline entities + */ +@InterfaceStability.Unstable +@InterfaceAudience.Private +public class TimelineEntityWritable implements Writable { + + private TimelineEntity entityData; + + public TimelineEntityWritable() {} + + public TimelineEntityWritable(TimelineEntity entity) { + set(entity); + } + + public void set(TimelineEntity entity) { + entityData = entity; + } + + public TimelineEntity get() { + return entityData; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(entityData.getId()); + out.writeUTF(entityData.getType()); + out.writeLong(entityData.getCreatedTime()); + out.writeLong(entityData.getModifiedTime()); + // For aggregations only. 
Only stores metric data
+    out.writeInt(entityData.getMetrics().size());
+    for (TimelineMetric m : entityData.getMetrics()) {
+      WritableUtils.writeEnum(out, m.getType());
+      out.writeUTF(m.getId());
+      out.writeBoolean(m.getToAggregate());
+      writeMetricValues(m, out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (entityData == null) {
+      entityData = new TimelineEntity();
+    }
+    entityData.setId(in.readUTF());
+    entityData.setType(in.readUTF());
+    entityData.setCreatedTime(in.readLong());
+    entityData.setModifiedTime(in.readLong());
+    int numMetrics = in.readInt();
+    for (int i = 0; i < numMetrics; i++) {
+      TimelineMetric m = new TimelineMetric();
+      m.setType(WritableUtils.readEnum(in, TimelineMetric.Type.class));
+      m.setId(in.readUTF());
+      m.setToAggregate(in.readBoolean());
+      readMetricValues(m, in);
+      // Attach the deserialized metric to the entity being rebuilt.
+      entityData.addMetric(m);
+    }
+  }
+
+  private static void writeMetricValues(TimelineMetric m, DataOutput out)
+      throws IOException {
+    MapWritable writableValues = new MapWritable();
+    Map<Long, Number> values = m.getValues();
+    if (values != null) {
+      for (Map.Entry<Long, Number> entry : values.entrySet()) {
+        writableValues.put(new LongWritable(entry.getKey()),
+            new BytesWritable(GenericObjectMapper.write(entry.getValue())));
+      }
+    }
+    writableValues.write(out);
+  }
+
+  private static void readMetricValues(TimelineMetric m, DataInput in)
+      throws IOException {
+    MapWritable resultWritable = new MapWritable();
+    resultWritable.readFields(in);
+    for (Map.Entry<Writable, Writable> entryWritable :
+        resultWritable.entrySet()) {
+      Long key = ((LongWritable) entryWritable.getKey()).get();
+      Number value = (Number) GenericObjectMapper.read(
+          ((BytesWritable) entryWritable.getValue()).getBytes());
+      m.addValue(key, value);
+    }
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseAggregatorReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseAggregatorReader.java
new file mode 100644
index 0000000..1ae8248
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseAggregatorReader.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.NavigableMap; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HBaseAggregatorReader implements Closeable { + private Configuration hbaseConf; + private Connection conn; + private ApplicationTable appLevelAggregationTable; + + private static final Log LOG = LogFactory.getLog(HBaseAggregatorReader.class); + + public void connect(Configuration conf) throws Exception{ + hbaseConf = HBaseConfiguration.create(conf); + conn = ConnectionFactory.createConnection(hbaseConf); + // TODO: read data from a different table than entity table. 
+ appLevelAggregationTable = new ApplicationTable(); + + } + + @Override + public void close() throws IOException { + if (conn != null) { + LOG.info("closing the hbase Connection"); + conn.close(); + } + } + + public TimelineEntities getEntities(String clusterId, String userId, + String flowName) throws IOException { + + byte[] startPrefix = getAppLevelAggregationRowKeyPrefix(clusterId, + userId, flowName); + Scan scan = new Scan(); + scan.setRowPrefixFilter(startPrefix); + ResultScanner scanner = appLevelAggregationTable.getResultScanner( + hbaseConf, conn, scan); + TimelineEntities entities = new TimelineEntities(); + for (Result r : scanner) { + TimelineEntity entity = getEntity(r); + entities.addEntity(entity); + } + return entities; + } + + private static byte[] getAppLevelAggregationRowKeyPrefix( + String clusterId, String userId, String flowId) { + return + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId, + flowId)); + } + + private static TimelineEntity getEntity(Result r) throws IOException { + TimelineEntity entity = HBaseTimelineReaderImpl.parseEntity(r, + EnumSet.of(TimelineReader.Field.ALL), + false, HBaseTimelineReaderImpl.DEFAULT_BEGIN_TIME, + HBaseTimelineReaderImpl.DEFAULT_END_TIME, false, + HBaseTimelineReaderImpl.DEFAULT_BEGIN_TIME, + HBaseTimelineReaderImpl.DEFAULT_END_TIME, null, null, null, null, + null, null, true); + entity.setType(TimelineEntityType.YARN_APPLICATION_AGGREGATION.toString()); + return entity; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 68d54d6..8d70b8e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -69,8 +69,9 @@ private static final Log LOG = LogFactory .getLog(HBaseTimelineReaderImpl.class); - private static final long DEFAULT_BEGIN_TIME = 0L; - private static final long DEFAULT_END_TIME = Long.MAX_VALUE; + // TODO: merge these with HBaseAggregationReader + static final long DEFAULT_BEGIN_TIME = 0L; + static final long DEFAULT_END_TIME = Long.MAX_VALUE; private Configuration hbaseConf = null; private Connection conn; @@ -253,7 +254,7 @@ private static void validateParams(String userId, String clusterId, } } - private static TimelineEntity parseEntity( + static TimelineEntity parseEntity( Result result, EnumSet fieldsToRetrieve, boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd, boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java index e1219e0..819da62 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java @@ -60,7 +60,7 @@ public OfflineAggregationWriter(String name) { * @return a {@link TimelineWriteResponse} object. * @throws IOException */ - abstract TimelineWriteResponse writeAggregatedEntity( + public abstract TimelineWriteResponse writeAggregatedEntity( TimelineCollectorContext context, TimelineEntities entities, OfflineAggregationInfo info) throws IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java index 4c1352c..b66a309 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java @@ -199,7 +199,7 @@ public void createPhoenixTables() throws IOException { + "flow_name VARCHAR NOT NULL, " + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, " + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, " - + "metric_names VARCHAR, info_keys VARCHAR " + + "metric_names VARCHAR " + "CONSTRAINT pk PRIMARY KEY(" + "user, cluster, flow_name))"; stmt.executeUpdate(sql); @@ -208,7 +208,7 @@ public void createPhoenixTables() throws IOException { + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, " + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, " - + "metric_names VARCHAR, info_keys VARCHAR " + + "metric_names VARCHAR " + "CONSTRAINT pk PRIMARY KEY(user, cluster))"; stmt.executeUpdate(sql); conn.commit(); @@ -269,11 +269,18 @@ public DynamicColumns(String columnFamilyPrefix, String type, // Prepare the sql template by iterating through all keys for (K key : cfInfo.columns) { colNames.append(",").append(cfInfo.columnFamilyPrefix) - .append(key.toString()).append(cfInfo.type); + .append(formatKey(key)).append(cfInfo.type); } return colNames; } + private static StringBuilder formatKey (K key) { + String escape = "\""; + StringBuilder builder = new StringBuilder(); + builder.append(escape).append(key.toString()).append(escape); + return builder; + } + private static int setValuesForColumnFamily( PreparedStatement ps, Map keyValues, int startPos, boolean converToBytes) throws SQLException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/AggregationTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/AggregationTestUtil.java new file mode 100644 index 0000000..75a0bd3 --- 
/dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/AggregationTestUtil.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregation.timebased; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class AggregationTestUtil { + + public static final TimelineCollectorContext sampleContext1 = + new TimelineCollectorContext("cluster1", "user1", "some_flow_name", + "AB7822C10F1111", 1002345678919L, "some app name"); + + public static TimelineEntities getStandardTestTimelineEntities(int size) { + TimelineEntities entities = new TimelineEntities(); + for (int i = 0; i < size; i++) { + TimelineEntity entity = new TimelineEntity(); + String id = "hello" + i; + String type = TimelineEntityType.YARN_APPLICATION_AGGREGATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(1425016501000L + i); + entity.setModifiedTime(1425016502000L + i); + + TimelineMetric metric = new TimelineMetric(); + metric.setId("HDFS_BYTES_READ"); + metric.setToAggregate(true); + metric.addValue(1425016501100L + i, 8000 + i); + entity.addMetric(metric); + } + + return entities; + } + + public static TimelineEntity getTestAggregationTimelineEntity() { + TimelineEntity entity = new TimelineEntity(); + String id = "hello1"; + String type = TimelineEntityType.YARN_APPLICATION_AGGREGATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(1425016501000L); + entity.setModifiedTime(1425016502000L); + + TimelineMetric metric = new TimelineMetric(); + metric.setId("HDFS_BYTES_READ"); + metric.setToAggregate(true); + metric.addValue(1425016501100L, 8000); + entity.addMetric(metric); + + return entity; + } + + public static void createHBaseSchema(HBaseTestingUtility util) + throws IOException { + new EntityTable() + .createTable(util.getHBaseAdmin(), util.getConfiguration()); + new ApplicationTable() + 
.createTable(util.getHBaseAdmin(), util.getConfiguration()); + } + + public static void verifyEntity(TimelineEntity entity, TimelineEntity sample) { + assertEquals(entity.getCreatedTime(), sample.getCreatedTime()); + assertEquals(entity.getModifiedTime(), sample.getModifiedTime()); + + assertEquals(entity.getMetrics().size(), sample.getMetrics().size()); + for (TimelineMetric m : entity.getMetrics()) { + assertTrue(sample.getMetrics().contains(m)); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/TestOfflineAggregator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/TestOfflineAggregator.java new file mode 100644 index 0000000..a2aba8d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/TestOfflineAggregator.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.aggregation.timebased; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixRelatedTest; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestOfflineAggregator extends PhoenixRelatedTest { + + HBaseTestingUtility util; + YarnConfiguration confForPhoenix; + + @Before + public void setup() throws Exception { + // setup Phoenix + confForPhoenix = new YarnConfiguration(); + writer = setupPhoenixClusterAndWriterForTest(confForPhoenix); + // setup hbase + util = getUtility(); + AggregationTestUtil.createHBaseSchema(util); + TimelineEntity entity = + AggregationTestUtil.getTestAggregationTimelineEntity(); + TimelineEntities te = new TimelineEntities(); + te.addEntity(entity); + HBaseTimelineWriterImpl hbi = null; + try { + Configuration c1 = util.getConfiguration(); + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + TimelineCollectorContext sampleContext = AggregationTestUtil.sampleContext1; + hbi.write(sampleContext.getClusterId(), sampleContext.getUserId(), + sampleContext.getFlowName(), sampleContext.getFlowVersion(), + sampleContext.getFlowRunId(), sampleContext.getAppId(), te); + hbi.stop(); + } finally { + hbi.stop(); + hbi.close(); + } + } + + @Test(timeout = 90000) + public void testFlowLevelAggregation() throws Exception { + TimelineEntityWritable mapperResult = testFlowAggregationMapper(); + testOfflineAggregationReducer(mapperResult, + OfflineAggregationInfo.FLOW_AGGREGATION); + } + + @Test(timeout = 90000) + public void testUserLevelAggregation() throws Exception { + TimelineEntityWritable testEntityWritable = new TimelineEntityWritable( + AggregationTestUtil.getTestAggregationTimelineEntity()); + TimelineEntityWritable mapperResult + = testUserAggregationMapper(testEntityWritable); + testOfflineAggregationReducer(mapperResult, + OfflineAggregationInfo.USER_AGGREGATION); + } + + private TimelineEntityWritable testFlowAggregationMapper() throws Exception { + TimelineCollectorContext sampleContext = AggregationTestUtil.sampleContext1; + String sampleInput = + sampleContext.getClusterId() + OfflineAggregator.KEY_SEPARATOR + + sampleContext.getUserId() + OfflineAggregator.KEY_SEPARATOR + + sampleContext.getFlowName(); + + // Test mapper + Mapper.Context context = mock(Mapper.Context.class); + // Return conf for HBase here + 
when(context.getConfiguration()).thenReturn(util.getConfiguration()); + when(context.getJobName()).thenReturn("test_job_name"); + + OfflineAggregator.FlowAggregatorMapper testMapper + = new OfflineAggregator.FlowAggregatorMapper(); + testMapper.map(new LongWritable(1), new Text(sampleInput), context); + + ArgumentCaptor outputFlowInfo = ArgumentCaptor.forClass(Text.class); + ArgumentCaptor outputWritable + = ArgumentCaptor.forClass(TimelineEntityWritable.class); + verify(context).write(outputFlowInfo.capture(), outputWritable.capture()); + + TimelineEntityWritable mapperOutput = outputWritable.getValue(); + assertEquals(TimelineEntityType.YARN_APPLICATION_AGGREGATION.toString(), + mapperOutput.get().getType()); + assertEquals(AggregationTestUtil.getTestAggregationTimelineEntity() + .getMetrics().size() * 2, mapperOutput.get().getMetrics().size()); + return mapperOutput; + } + + private TimelineEntityWritable testUserAggregationMapper( + TimelineEntityWritable entityWritable) throws Exception { + TimelineCollectorContext sampleContext = AggregationTestUtil.sampleContext1; + String sampleInput = + sampleContext.getClusterId() + OfflineAggregator.KEY_SEPARATOR + + sampleContext.getUserId() + OfflineAggregator.KEY_SEPARATOR + + sampleContext.getFlowName(); + + // Test mapper + Mapper.Context context = mock(Mapper.Context.class); + // Return conf for HBase here + when(context.getConfiguration()).thenReturn(util.getConfiguration()); + when(context.getJobName()).thenReturn("test_job_name"); + + OfflineAggregator.UserAggregatorMapper testMapper + = new OfflineAggregator.UserAggregatorMapper(); + testMapper.map(new Text(sampleInput), entityWritable, context); + + ArgumentCaptor outputUserInfo = ArgumentCaptor.forClass(Text.class); + ArgumentCaptor outputWritable + = ArgumentCaptor.forClass(TimelineEntityWritable.class); + verify(context).write(outputUserInfo.capture(), outputWritable.capture()); + + TimelineEntityWritable mapperOutput = outputWritable.getValue(); + assertEquals(AggregationTestUtil.getTestAggregationTimelineEntity() + .getMetrics().size(), mapperOutput.get().getMetrics().size()); + + String[] newKeys = outputUserInfo.getValue() + .toString().split(OfflineAggregator.KEY_SEPARATOR); + assertEquals(2, newKeys.length); + + return mapperOutput; + } + + private void testOfflineAggregationReducer(TimelineEntityWritable mapperOutput, + OfflineAggregationInfo aggregationInfo) throws Exception { + TimelineCollectorContext sampleContext = AggregationTestUtil.sampleContext1; + String sampleInput = + sampleContext.getClusterId() + OfflineAggregator.KEY_SEPARATOR + + sampleContext.getUserId() + OfflineAggregator.KEY_SEPARATOR + + sampleContext.getFlowName(); + + // Test reducer + Reducer.Context reducerContext = mock(Reducer.Context.class); + // Return conf for Phoenix here + when(reducerContext.getConfiguration()).thenReturn(confForPhoenix); + when(reducerContext.getJobName()).thenReturn("test_job_name"); + + final List entityWritableList = new ArrayList(); + entityWritableList.add(mapperOutput); + + if (aggregationInfo.equals(OfflineAggregationInfo.FLOW_AGGREGATION)) { + OfflineAggregator.FlowAggregatorReducer testReducer + = new OfflineAggregator.FlowAggregatorReducer(); + testReducer.reduce(new Text(sampleInput), + new Iterable() { + @Override public Iterator iterator() { + return entityWritableList.iterator(); + } + }, + reducerContext); + } else if (aggregationInfo.equals(OfflineAggregationInfo.USER_AGGREGATION)) { + OfflineAggregator.UserAggregatorReducer testReducer + = new 
OfflineAggregator.UserAggregatorReducer(); + testReducer.reduce(new Text(sampleInput), + new Iterable() { + @Override public Iterator iterator() { + return entityWritableList.iterator(); + } + }, + reducerContext); + } + + // Verify if we're storing all entities + String[] primaryKeyList = aggregationInfo.getPrimaryKeyList(); + String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1] + +") FROM " + aggregationInfo.getTableName(); + verifySQLWithCount(sql, 1, "Number of entities should be "); + sql = "SELECT COUNT(m.\"HDFS_BYTES_READ\") FROM " + + aggregationInfo.getTableName() + "(m.\"HDFS_BYTES_READ\" VARBINARY) "; + verifySQLWithCount(sql, 1, + "Number of entities with metric should be "); + + } + + @After + public void cleanup() throws Exception { + dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME); + dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME); + writer.stop(); + tearDownMiniCluster(); + //util.shutdownMiniCluster(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/TestTimelineEntityWritable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/TestTimelineEntityWritable.java new file mode 100644 index 0000000..d671dab --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregation/timebased/TestTimelineEntityWritable.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.aggregation.timebased; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.junit.Assert; +import org.junit.Test; + +public class TestTimelineEntityWritable { + @Test + public void testTimelineEntityWritable() throws Exception { + int testListSize = 3; + TimelineEntities entityList = + AggregationTestUtil.getStandardTestTimelineEntities(testListSize); + for (TimelineEntity entity : entityList.getEntities()) { + checkTimelineEntityWritable(new TimelineEntityWritable(entity)); + } + } + + private void checkTimelineEntityWritable(TimelineEntityWritable before) + throws Exception { + DataOutputBuffer dob = new DataOutputBuffer(); + before.write(dob); + + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(dob.getData(), dob.getLength()); + + TimelineEntityWritable after = new TimelineEntityWritable(); + after.readFields(dib); + + verifyEntityEquivalance(before.get(), after.get()); + } + + private void verifyEntityEquivalance(TimelineEntity e1, TimelineEntity e2) { + if (e1 == e2) { + return; + } + if (e1 == null) { + Assert.fail("Incoming entity 1 is null! "); + } + if (!e1.getId().equals(e2.getId())) { + Assert.fail("id mismatch! "); + } + if (!e1.getType().equals(e2.getType())) { + Assert.fail("type mismatch!"); + } +/* if (!e1.getEvents().equals(e2.getEvents())) { + Assert.fail("events mismatch! "); + } + if (!e1.getInfo().equals(e2.getInfo())) { + Assert.fail("info mismatch! "); + } + if (!e1.getConfigs().equals(e2.getConfigs())) { + Assert.fail("config mismatch! "); + }*/ + if (!e1.getMetrics().equals(e2.getMetrics())) { + Assert.fail("metrics mismatch! "); + } + /*if (!e1.getIsRelatedToEntities().equals(e2.getIsRelatedToEntities())) { + Assert.fail("isRelatedTo mismatch! "); + } + if (!e1.getRelatesToEntities().equals(e2.getRelatesToEntities())) { + Assert.fail("relatesTo mismatch! "); + }*/ + if (e1.getCreatedTime() != e2.getCreatedTime()) { + Assert.fail("createdTime mismatch! "); + } + if (e1.getModifiedTime() != e2.getModifiedTime()) { + Assert.fail("modifiedTime mismatch! "); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixRelatedTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixRelatedTest.java new file mode 100644 index 0000000..fefc51d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixRelatedTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixOfflineAggregationWriterImpl; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class PhoenixRelatedTest extends BaseTest { + protected static PhoenixOfflineAggregationWriterImpl writer; + public static final int BATCH_SIZE = 3; + + + protected static PhoenixOfflineAggregationWriterImpl + setupPhoenixClusterAndWriterForTest(YarnConfiguration conf) + throws Exception { + Map props = new HashMap<>(); + // Must update config before starting server + props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + Boolean.FALSE.toString()); + props.put("java.security.krb5.realm", ""); + props.put("java.security.krb5.kdc", ""); + props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, + Boolean.FALSE.toString()); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); + props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); + // Make a small batch size to test multiple calls to reserve sequences + props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, + Long.toString(BATCH_SIZE)); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + + PhoenixOfflineAggregationWriterImpl myWriter + = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES); + // Change connection settings for test + conf.set( + YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, + getUrl()); + myWriter.serviceInit(conf); + myWriter.start(); + myWriter.createPhoenixTables(); + return myWriter; + } + + protected void verifySQLWithCount(String sql, int targetCount, String message) + throws Exception { + try ( + Statement stmt = + writer.getConnection().createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + assertTrue("Result set empty on statement " + sql, rs.next()); + assertNotNull("Fail to execute query " + sql, rs); + assertEquals(message + " " + targetCount, targetCount, rs.getInt(1)); + } catch (SQLException se) { + fail("SQL exception on query: " + sql + + " With exception message: " + se.getLocalizedMessage()); + } + } + + protected void dropTable(String tableName) throws Exception { + writer.dropTable(tableName); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseAggregatorReader.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseAggregatorReader.java new file mode 100644 index 0000000..30bdf33 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseAggregatorReader.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.aggregation.timebased.AggregationTestUtil; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestHBaseAggregatorReader { + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + util.startMiniCluster(); + AggregationTestUtil.createHBaseSchema(util); + + TimelineEntity entity = + AggregationTestUtil.getTestAggregationTimelineEntity(); + TimelineEntities te = new TimelineEntities(); + te.addEntity(entity); + HBaseTimelineWriterImpl hbi = null; + try { + Configuration c1 = util.getConfiguration(); + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + TimelineCollectorContext sampleContext = AggregationTestUtil.sampleContext1; + hbi.write(sampleContext.getClusterId(), sampleContext.getUserId(), + sampleContext.getFlowName(), sampleContext.getFlowVersion(), + sampleContext.getFlowRunId(), sampleContext.getAppId(), te); + hbi.stop(); + } finally { + hbi.stop(); + hbi.close(); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testHBaseAggregatorReader() throws Exception { + TimelineCollectorContext sampleContext = AggregationTestUtil.sampleContext1; + try (HBaseAggregatorReader reader = new HBaseAggregatorReader()) { + Configuration c1 = util.getConfiguration(); + reader.connect(c1); + + TimelineEntities entities = reader.getEntities(sampleContext.getClusterId(), + sampleContext.getUserId(), sampleContext.getFlowName()); + Assert.assertTrue(entities.getEntities().size() == 1); + + TimelineEntity entity = entities.getEntities().iterator().next(); + 
TimelineEntity sample = + AggregationTestUtil.getTestAggregationTimelineEntity(); + + Assert.assertEquals(entity, sample); + Assert.assertEquals(entity.getCreatedTime(), sample.getCreatedTime()); + Assert.assertEquals(entity.getModifiedTime(), sample.getModifiedTime()); + + TimelineMetric metric = entity.getMetrics().iterator().next(); + TimelineMetric sampleMetric = sample.getMetrics().iterator().next(); + Assert.assertEquals(metric.getId(), sampleMetric.getId()); + Assert.assertEquals(metric.getType(), sampleMetric.getType()); + + Long metricKey = metric.getValues().keySet().iterator().next(); + Long sampleMetricKey = sampleMetric.getValues().keySet().iterator().next(); + Assert.assertEquals(metricKey, sampleMetricKey); + Assert.assertEquals(metric.getValues().get(metricKey), + sampleMetric.getValues().get(sampleMetricKey)); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java index de66a17..4882bed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java @@ -23,113 +23,49 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.aggregation.timebased.AggregationTestUtil; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.phoenix.hbase.index.write.IndexWriterUtils; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.ReadOnlyProps; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; - -public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest { - private static PhoenixOfflineAggregationWriterImpl storage; - private static final int BATCH_SIZE = 3; +public class TestPhoenixOfflineAggregationWriterImpl extends PhoenixRelatedTest { @BeforeClass public static void setup() throws Exception { YarnConfiguration conf = new YarnConfiguration(); - storage = setupPhoenixClusterAndWriterForTest(conf); + writer = setupPhoenixClusterAndWriterForTest(conf); } @Test(timeout = 90000) public void testFlowLevelAggregationStorage() throws Exception { - testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION); + 
testAggregationWriter(OfflineAggregationInfo.FLOW_AGGREGATION); } @Test(timeout = 90000) public void testUserLevelAggregationStorage() throws Exception { - testAggregator(OfflineAggregationInfo.USER_AGGREGATION); + testAggregationWriter(OfflineAggregationInfo.USER_AGGREGATION); } @AfterClass public static void cleanup() throws Exception { - storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME); - storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME); + writer.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME); + writer.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME); tearDownMiniCluster(); } - private static PhoenixOfflineAggregationWriterImpl - setupPhoenixClusterAndWriterForTest(YarnConfiguration conf) - throws Exception{ - Map props = new HashMap<>(); - // Must update config before starting server - props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, - Boolean.FALSE.toString()); - props.put("java.security.krb5.realm", ""); - props.put("java.security.krb5.kdc", ""); - props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, - Boolean.FALSE.toString()); - props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); - props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); - // Make a small batch size to test multiple calls to reserve sequences - props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, - Long.toString(BATCH_SIZE)); - // Must update config before starting server - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - - // Change connection settings for test - conf.set( - YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, - getUrl()); - PhoenixOfflineAggregationWriterImpl - myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES); - myWriter.init(conf); - myWriter.start(); - myWriter.createPhoenixTables(); - return myWriter; - } - - private static TimelineEntity getTestAggregationTimelineEntity() { - TimelineEntity entity = new TimelineEntity(); - String id = "hello1"; - String type = "testAggregationType"; - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(1425016501000L); - entity.setModifiedTime(1425016502000L); - - TimelineMetric metric = new TimelineMetric(); - metric.setId("HDFS_BYTES_READ"); - metric.addValue(1425016501100L, 8000); - entity.addMetric(metric); - - return entity; - } - - private void testAggregator(OfflineAggregationInfo aggregationInfo) + private void testAggregationWriter(OfflineAggregationInfo aggregationInfo) throws Exception { // Set up a list of timeline entities and write them back to Phoenix int numEntity = 1; TimelineEntities te = new TimelineEntities(); - te.addEntity(getTestAggregationTimelineEntity()); + te.addEntity(AggregationTestUtil.getTestAggregationTimelineEntity()); TimelineCollectorContext context = new TimelineCollectorContext("cluster_1", "user1", "testFlow", null, 0, null); - storage.writeAggregatedEntity(context, te, + writer.writeAggregatedEntity(context, te, aggregationInfo); // Verify if we're storing all entities @@ -138,25 +74,10 @@ private void testAggregator(OfflineAggregationInfo aggregationInfo) +") FROM " + aggregationInfo.getTableName(); verifySQLWithCount(sql, numEntity, "Number of entities should be "); // Check metric - sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM " - + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) "; + sql = "SELECT COUNT(m.\"HDFS_BYTES_READ\") FROM " + + aggregationInfo.getTableName() + "(m.\"HDFS_BYTES_READ\" VARBINARY) "; verifySQLWithCount(sql, numEntity, 
"Number of entities with info should be "); } - - private void verifySQLWithCount(String sql, int targetCount, String message) - throws Exception { - try ( - Statement stmt = - storage.getConnection().createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - assertTrue("Result set empty on statement " + sql, rs.next()); - assertNotNull("Fail to execute query " + sql, rs); - assertEquals(message + " " + targetCount, targetCount, rs.getInt(1)); - } catch (SQLException se) { - fail("SQL exception on query: " + sql - + " With exception message: " + se.getLocalizedMessage()); - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java deleted file mode 100644 index 4f96f87..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import org.junit.Test; - -public class TestTimelineWriterUtils { - - @Test - public void test() { - // TODO: implement a test for the remaining method in TimelineWriterUtils. - } - -}