diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/BufferedMutatorDelegator.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/BufferedMutatorDelegator.java new file mode 100644 index 0000000..1d59ef3 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/BufferedMutatorDelegator.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hbase.stubbs.BufferedMutator; +import org.apache.hbase.stubbs.TableName; + +/** + * To be used to wrap an actual {@link BufferedMutator} in a type safe manner + * @param The class referring to the table to be written to. + */ +class BufferedMutatorDelegator implements TypedBufferedMutator { + + private final BufferedMutator bufferedMutator; + + /** + * Constructor + * + * @param bufferedMutator + * the mutator to be wrapped for delegation. Shall not be null. + */ + public BufferedMutatorDelegator(BufferedMutator bufferedMutator) { + this.bufferedMutator = bufferedMutator; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.yarn.server.timelineservice.storage.hbasestubb. + * BufferedMutator#getName() + */ + public TableName getName() { + return bufferedMutator.getName(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.yarn.server.timelineservice.storage.hbasestubb. + * BufferedMutator#getConfiguration() + */ + public Configuration getConfiguration() { + return bufferedMutator.getConfiguration(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.yarn.server.timelineservice.storage.hbasestubb. + * BufferedMutator#mutate(org.apache.hadoop.hbase.client.Mutation) + */ + public void mutate(Mutation mutation) throws IOException { + bufferedMutator.mutate(mutation); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.yarn.server.timelineservice.storage.hbasestubb. + * BufferedMutator#mutate(java.util.List) + */ + public void mutate(List mutations) throws IOException { + bufferedMutator.mutate(mutations); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.yarn.server.timelineservice.storage.hbasestubb. + * BufferedMutator#close() + */ + public void close() throws IOException { + bufferedMutator.close(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.yarn.server.timelineservice.storage.hbasestubb. + * BufferedMutator#flush() + */ + public void flush() throws IOException { + bufferedMutator.flush(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.yarn.server.timelineservice.storage.hbasestubb. + * BufferedMutator#getWriteBufferSize() + */ + public long getWriteBufferSize() { + return bufferedMutator.getWriteBufferSize(); + } + +} diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Column.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Column.java new file mode 100644 index 0000000..0aaed46 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Column.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +/** + * A Column represents the way to store a fully qualified column in a specific + * table. + */ +public interface Column { + + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey + * identifying the row to write. Nothing gets written when null. + * @param tableMutator + * used to modify the underlying HBase table. Caller is responsible + * to pass a mutator for the table that actually has this column. + * @param timestamp + * version timestamp. When null the server timestamp will be used. + * @param inputValue + * the value to write to the rowKey and column qualifier. Nothing + * gets written when null. + * @throws IOException + */ + public void store(byte[] rowKey, TypedBufferedMutator tableMutator, + Long timestamp, Object inputValue) throws IOException; + +} \ No newline at end of file diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ColumnFamily.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ColumnFamily.java new file mode 100644 index 0000000..53cb457 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ColumnFamily.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +/** + * Type safe column family. + * + * @param + * refers to the table for which this column family is used for. + */ +public interface ColumnFamily { + + /** + * @return the byte representation of the column family. + */ + public byte[] getBytes(); + +} \ No newline at end of file diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ColumnImpl.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ColumnImpl.java new file mode 100644 index 0000000..8bc2a1a --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ColumnImpl.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hbase.stubbs.GenericObjectMapper; + +/** + * This class is meant to be used only by explicit Columns, and not directly to + * write by clients. + * @param refers to the table. + */ +class ColumnImpl { + + private final ColumnFamily columnFamily; + + public ColumnImpl(ColumnFamily columnFamily) { + this.columnFamily = columnFamily; + } + + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey + * identifying the row to write. Nothing gets written when null. + * @param tableMutator + * used to modify the underlying HBase table + * @param columnQualifier + * column qualifier. Nothing gets written when null. + * @param timestamp + * version timestamp. When null the server timestamp will be used. + * @param inputValue + * the value to write to the rowKey and column qualifier. Nothing + * gets written when null. + * @throws IOException + */ + public void store(byte[] rowKey, TypedBufferedMutator tableMutator, + byte[] columnQualifier, Long timestamp, Object inputValue) + throws IOException { + if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) { + return; + } + Put p = new Put(rowKey); + + if (timestamp == null) { + // TODO: in HBase1 this is p.addColumn + p.add(columnFamily.getBytes(), columnQualifier, + GenericObjectMapper.write(inputValue)); + tableMutator.mutate(p); + } else { + p.add(columnFamily.getBytes(), columnQualifier, timestamp, + GenericObjectMapper.write(inputValue)); + } + } + + /** + * @return the folumn family for this column implementation. + */ + public ColumnFamily getColumnFamily() { + return columnFamily; + } + +} diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ColumnPrefix.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ColumnPrefix.java new file mode 100644 index 0000000..df0a5e4 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ColumnPrefix.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +/** + * Used to represent a partially qualified column, where the actual column name + * will be composed of a prefix and the remainder of the column qualifier. The + * prefix can be null, in which case the column qualifier will be completely + * determined when the values are stored. + */ +public interface ColumnPrefix { + + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey + * identifying the row to write. Nothing gets written when null. + * @param tableMutator + * used to modify the underlying HBase table. Caller is responsible + * to pass a mutator for the table that actually has this column. + * @param qualifier + * column qualifier. Nothing gets written when null. + * @param timestamp + * version timestamp. When null the server timestamp will be used. + * @param inputValue + * the value to write to the rowKey and column qualifier. Nothing + * gets written when null. + * @throws IOException + */ + public void store(byte[] rowKey, TypedBufferedMutator tableMutator, + String qualifier, Long timestamp, Object inputValue) throws IOException; + +} \ No newline at end of file diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumn.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumn.java new file mode 100644 index 0000000..0969363 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumn.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Identifies fully qualified columns for the {@link EntityTable}. + */ +public enum EntityColumn implements Column { + + ID(EntityColumnFamily.INFO, "id"), + TYPE(EntityColumnFamily.INFO, "type"), + CREATED_TIME(EntityColumnFamily.INFO, "created_time"), + MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"), + FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"); + + private final ColumnImpl column; + private final ColumnFamily columnFamily; + private final String columnQualifier; + private final byte[] columnQualifierBytes; + + private EntityColumn(ColumnFamily columnFamily, String value) { + this.columnFamily = columnFamily; + this.columnQualifier = value; + this.columnQualifierBytes = Bytes.toBytes(this.columnQualifier.toLowerCase()); + column = new ColumnImpl(columnFamily); + } + + + /** + * @return the column name value + */ + private String getColumnQualifier() { + return columnQualifier; + } + + + /* + * (non-Javadoc) + * + * @see com.twitter.hraven.ats.Column#store(byte[], + * com.twitter.hraven.ats.TypedBufferedMutator, java.lang.Long, + * java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, Long timestamp, + Object inputValue) throws IOException { + column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, + inputValue); + } + + /** + * Retrieve an {@link EntityColumn} given a name, or null if there is no match. + * The following holds true: {@code columnFor(x) == columnFor(y)} if and only + * if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnQualifier + * Name of the column to retrieve + * @return the corresponding {@link EntityColumn} or null + */ + public static final EntityColumn columnFor(String columnQualifier) { + + // Match column based on value, assume column family matches. + for (EntityColumn eic : EntityColumn.values()) { + // Find a match based only on name. + if (eic.getColumnQualifier().equals(columnQualifier)) { + return eic; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link EntityColumn} given a name, or null if there is no match. + * The following holds true: {@code columnFor(a,x) == columnFor(b,y)} if and + * only if {@code a.equals(b) & x.equals(y)} or {@code (x == y == null)} + * + * @param columnFamily + * The columnFamily for which to retrieve the column. + * @param name + * Name of the column to retrieve + * @return the corresponding {@link EntityColumn} or null if both arguments don't + * match. + */ + public static final EntityColumn columnFor(EntityColumnFamily columnFamily, + String name) { + + for (EntityColumn eic : EntityColumn.values()) { + // Find a match based column family and on name. + if (eic.columnFamily.equals(columnFamily) && eic.getColumnQualifier().equals(name)) { + return eic; + } + } + + // Default to null + return null; + } + +} diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java new file mode 100644 index 0000000..8c0ddf7 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Represents the entity table column families. + */ +public enum EntityColumnFamily implements ColumnFamily { + + INFO("i"), CONFIG("c"), METRICS("m"); + + private final String value; + private final byte[] bytes; + + /** + * Constructor. + * + * @param value + * create a column family with this name. Will be considered to be + * lower case. + */ + private EntityColumnFamily(String value) { + this.value = value; + this.bytes = Bytes.toBytes(this.value.toLowerCase()); + } + + /* + * (non-Javadoc) + * + * @see com.twitter.hraven.ats.ColumnFamily#getBytes() + */ + public byte[] getBytes() { + return bytes; + } + +} diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnPrefix.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnPrefix.java new file mode 100644 index 0000000..12e1b78 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnPrefix.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Identifies partially qualified columns for the entity table. + */ +public enum EntityColumnPrefix implements ColumnPrefix { + + NONE(EntityColumnFamily.INFO, null), + // TODO: how to retrieve column prefix enum for null qualifier? + IS_RELATED_TO(EntityColumnFamily.INFO, "s"), // Note that r and s are flipped + // compared to original impl. + RELATES_TO(EntityColumnFamily.INFO, "r"), + EVENTS(EntityColumnFamily.INFO, "e"); + + private final ColumnImpl column; + private final ColumnFamily columnFamily; + + /** + * Can be null + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + private EntityColumnPrefix(ColumnFamily columnFamily, String columnPrefix) { + column = new ColumnImpl(columnFamily); + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + this.columnPrefixBytes = Bytes.toBytes(columnPrefix.toLowerCase()); + } + } + + /** + * @return the column name value + */ + private String getColumnPrefix() { + return columnPrefix; + } + + + /* + * (non-Javadoc) + * + * @see com.twitter.hraven.ats.ColumnPrefix#store(byte[], + * com.twitter.hraven.ats.TypedBufferedMutator, java.lang.String, + * java.lang.Long, java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, String qualifier, + Long timestamp, Object inputValue) throws IOException { + + // Convert qualifier to lower case, strip of separators and tag on column + // prefix. + byte[] columnQualifier = TimelineWriterUtils.join( + TimelineEntitySchemaConstants.SEPARATOR_BYTES, this.columnPrefixBytes, + Bytes.toBytes(qualifier.toLowerCase())); + // TODO: confirm that join properly deals with nulls and does not add + // superfluous prefix for null prefix. + + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); + } + + /** + * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no + * match. The following holds true: {@code columnFor(x) == columnFor(y)} if + * and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link EntityColumnPrefix} or null + */ + public static final EntityColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (EntityColumnPrefix eic : EntityColumnPrefix.values()) { + // Find a match based only on name. + if (eic.getColumnPrefix().equals(columnPrefix)) { + return eic; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no + * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} + * if and only if {@code a.equals(b) & x.equals(y)} or + * {@code (x == y == null)} + * + * @param columnFamily + * The columnFamily for which to retrieve the column. + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link EntityColumnPrefix} or null if both + * arguments don't match. + */ + public static final EntityColumnPrefix columnFor( + EntityColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (EntityColumnPrefix eic : EntityColumnPrefix.values()) { + // Find a match based column family and on name. + if (eic.columnFamily.equals(columnFamily) + && (((columnPrefix == null) && (eic.getColumnPrefix() == null)) || (eic + .getColumnPrefix().equals(columnPrefix)))) { + return eic; + } + } + + // Default to null + return null; + } + +} diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTable.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTable.java new file mode 100644 index 0000000..5868826 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTable.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hbase.stubbs.BufferedMutator; +import org.apache.hbase.stubbs.Connection; +import org.apache.hbase.stubbs.TableName; + +/** + * Define the entity table. + */ +public class EntityTable { + /** entity prefix */ + public static final String ENTITY_PREFIX = + // TODO: add this back in (I don't have trunk setup correctly + // YarnConfiguration.TIMELINE_SERVICE_PREFIX + + ".entity"; + + /** config param name that specifies the entity table name */ + public static final String ENTITY_TABLE_NAME = ENTITY_PREFIX + ".table.name"; + + /** + * config param name that specifies the TTL for metrics column family in + * entity table + */ + public static final String ENTITY_TABLE_METRICS_TTL = ENTITY_PREFIX + + ".table.metrics.ttl"; + + /** default value for entity table name */ + public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity"; + + /** in bytes default value for entity table name */ + static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = Bytes + .toBytes(DEFAULT_ENTITY_TABLE_NAME); + + /** default TTL is 30 days for metrics timeseries */ + public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000; + + /** default max number of versions */ + public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 1000; + + /** + * Used to create a type-safe mutator for this table. + * + * @param hbaseConf + * used to read table name + * @param conn + * used to create a table from. + * @return a {@link BufferedMutator} + */ + static final TypedBufferedMutator getTableMutator( + HBaseConfiguration hbaseConf, Connection conn) { + + TableName entityTableName = TableName.valueOf(hbaseConf.get( + ENTITY_TABLE_NAME, DEFAULT_ENTITY_TABLE_NAME)); + + // Plain buffered mutator + BufferedMutator bufferedMutator = conn.getBufferedMutator(entityTableName); + + // Now make this thing type safe. + // This is how service initialization should hang on to this variable, with + // the proper type + TypedBufferedMutator entityTable = new BufferedMutatorDelegator( + bufferedMutator); + + return entityTable; + } + +} diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java new file mode 100644 index 0000000..829b14c --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.hbase.util.Bytes; + +public class TimelineEntitySchemaConstants { + + /** separator in key fields */ + public static final String ROW_KEY_SEPARATOR = "!"; + + /** byte representation of the separator in key fields */ + static final byte[] SEPARATOR_BYTES = Bytes + .toBytes(ROW_KEY_SEPARATOR); + +} diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java new file mode 100644 index 0000000..ef0fa41 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * bunch of utility functions used across TimelineWriter classes + * TODO: Stripped the store methods from here, as they are not needed. + * TODO: also dropped some other methods, that will be needed for reading, + * but that I did not have handy due to incorrect HBase setup + * For example, missing range. those need to be added back in. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineWriterUtils { + + /** empty bytes */ + public static final byte[] EMPTY_BYTES = new byte[0]; + private static final String SPACE = " "; + private static final String UNDERSCORE = "_"; + private static final String EMPTY_STRING = ""; + + /** + * Returns a single byte array containing all of the individual component + * arrays separated by the separator array. + * + * @param separator + * @param components + * @return byte array after joining the components + */ + public static byte[] join(byte[] separator, byte[]... components) { + if (components == null || components.length == 0) { + return EMPTY_BYTES; + } + + int finalSize = 0; + if (separator != null) { + finalSize = separator.length * (components.length - 1); + } + for (byte[] comp : components) { + if (comp != null) { + finalSize += comp.length; + } + } + + byte[] buf = new byte[finalSize]; + int offset = 0; + for (int i = 0; i < components.length; i++) { + if (components[i] != null) { + System.arraycopy(components[i], 0, buf, offset, components[i].length); + offset += components[i].length; + if (i < (components.length - 1) && separator != null + && separator.length > 0) { + System.arraycopy(separator, 0, buf, offset, separator.length); + offset += separator.length; + } + } + } + return buf; + } + + + + + /** + * converts run id into it's inverse timestamp + * + * @param flowRunId + * @return inverted long + */ + public static long encodeRunId(Long flowRunId) { + return Long.MAX_VALUE - flowRunId; + } + + + + + /** + * concates the values from a Set to return a single delimited string + * value + * + * @param rowKeySeparator + * @param values + * @return Value from the set of strings as a string + */ + public static String getValueAsString(String rowKeySeparator, + Set values) { + + if (values == null) { + return EMPTY_STRING; + } + StringBuilder concatStrings = new StringBuilder(); + for (String value : values) { + concatStrings.append(value); + concatStrings.append(rowKeySeparator); + } + // remove the last separator + if (concatStrings.length() > 1) { + concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator)); + } + return concatStrings.toString(); + } + + /** + * Constructs a row key prefix for the entity table + * + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @param appId + * @return byte array with the row key prefix + */ + static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId, + Long flowRunId, String appId) { + return TimelineWriterUtils.join( + TimelineEntitySchemaConstants.SEPARATOR_BYTES, + Bytes.toBytes(cleanse(userId)), Bytes.toBytes(cleanse(clusterId)), + Bytes.toBytes(cleanse(flowId)), + Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)), + Bytes.toBytes(cleanse(appId))); + } + + /** + * Takes a string token to be used as a key or qualifier and cleanses out + * reserved tokens. This operation is not symmetrical. Logic is to replace all + * spaces and separator chars in input with underscores. + * + * @param token + * token to cleanse. + * @return String with no spaces and no separator chars + */ + public static String cleanse(String token) { + if (token == null || token.length() == 0) { + return token; + } + + String cleansed = token.replaceAll(SPACE, UNDERSCORE); + cleansed = cleansed.replaceAll( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, UNDERSCORE); + + return cleansed; + } + + +} diff --git a/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TypedBufferedMutator.java b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TypedBufferedMutator.java new file mode 100644 index 0000000..80fb9a7 --- /dev/null +++ b/hraven-core/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TypedBufferedMutator.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +// Note that this is just a stubb (because I don't have HBase correctly setup. +// This should be the real HBase BufferedMutator +import org.apache.hbase.stubbs.BufferedMutator; + +/** + * Just a typed wrapper around {@link BufferedMutator} used to ensure that + * columns can write only to the table mutator for the right table. + */ +public interface TypedBufferedMutator extends BufferedMutator { + // This class is intentionally left (almost) blank +}