diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 3ac774689e..0e4b502918 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.collections.CollectionUtils;
@@ -30,23 +28,13 @@
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -54,44 +42,39 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.dump.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
-import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler;
-import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandlerFactory;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
-import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -131,7 +114,8 @@
   EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
   EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
   EVENT_INSERT("EVENT_INSERT"),
-  EVENT_UNKNOWN("EVENT_UNKNOWN");
+  EVENT_UNKNOWN("EVENT_UNKNOWN"),
+  EVENT_CREATE_FUNCTION("EVENT_CREATE_FUNCTION");
 
   String type = null;
   DUMPTYPE(String type) {
@@ -693,8 +677,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
 
     int evstage = 0;
     Long lastEvid = null;
-    Map<String, Long> dbsUpdated = new ReplicationSpec.ReplStateMap<String, Long>();
-    Map<String, Long> tablesUpdated = new ReplicationSpec.ReplStateMap<String, Long>();
+    Map<String, Long> dbsUpdated = new ReplicationSpec.ReplStateMap<>();
+    Map<String, Long> tablesUpdated = new ReplicationSpec.ReplStateMap<>();
 
     for (FileStatus dir : dirsInLoadPath){
       LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
@@ -784,7 +768,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
 
       for (String tableName : tablesUpdated.keySet()){
         // weird - AlterTableDesc requires a HashMap to update props instead of a Map.
-        HashMap<String, String> mapProp = new HashMap<String, String>();
+        HashMap<String, String> mapProp = new HashMap<>();
         mapProp.put(
             ReplicationSpec.KEY.CURR_STATE_ID.toString(),
             tablesUpdated.get(tableName).toString());
@@ -798,7 +782,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         taskChainTail = updateReplIdTask;
       }
       for (String dbName : dbsUpdated.keySet()){
-        Map<String, String> mapProp = new HashMap<String, String>();
+        Map<String, String> mapProp = new HashMap<>();
         mapProp.put(
             ReplicationSpec.KEY.CURR_STATE_ID.toString(),
             dbsUpdated.get(dbName).toString());
@@ -819,219 +803,28 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
   }
 
   private List<Task<? extends Serializable>> analyzeEventLoad(
-      String dbName, String tblName, String locn,
-      Task<? extends Serializable> precursor,
-      Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated,
-      DumpMetaData dmd) throws SemanticException {
-    MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
-    switch (dmd.getDumpType()) {
-    case EVENT_CREATE_TABLE: {
-      return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
-    }
-    case EVENT_ADD_PARTITION: {
-      return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
-    }
-    case EVENT_DROP_TABLE: {
-      DropTableMessage dropTableMessage = md.getDropTableMessage(dmd.getPayload());
-      String actualDbName = ((dbName == null) || dbName.isEmpty() ? dropTableMessage.getDB() : dbName);
-      String actualTblName = ((tblName == null) || tblName.isEmpty() ? dropTableMessage.getTable() : tblName);
-      DropTableDesc dropTableDesc = new DropTableDesc(
-          actualDbName + "." + actualTblName,
-          null, true, true,
-          getNewEventOnlyReplicationSpec(dmd.getEventFrom()));
-      Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf);
-      if (precursor != null){
-        precursor.addDependentTask(dropTableTask);
-      }
-      List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-      tasks.add(dropTableTask);
-      LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
-      dbsUpdated.put(actualDbName,dmd.getEventTo());
-      return tasks;
-    }
-    case EVENT_DROP_PARTITION: {
-      try {
-        DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload());
-        String actualDbName = ((dbName == null) || dbName.isEmpty() ? dropPartitionMessage.getDB() : dbName);
-        String actualTblName = ((tblName == null) || tblName.isEmpty() ? dropPartitionMessage.getTable() : tblName);
-        Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs;
-        partSpecs =
-            genPartSpecs(new Table(dropPartitionMessage.getTableObj()),
-                dropPartitionMessage.getPartitions());
-        if (partSpecs.size() > 0) {
-          DropTableDesc dropPtnDesc = new DropTableDesc(
-              actualDbName + "." + actualTblName,
-              partSpecs, null, true,
-              getNewEventOnlyReplicationSpec(dmd.getEventFrom()));
-          Task<DDLWork> dropPtnTask =
-              TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf);
-          if (precursor != null) {
-            precursor.addDependentTask(dropPtnTask);
-          }
-          List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-          tasks.add(dropPtnTask);
-          LOG.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
-              dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions());
-          dbsUpdated.put(actualDbName, dmd.getEventTo());
-          tablesUpdated.put(actualDbName + "." + actualTblName, dmd.getEventTo());
-          return tasks;
-        } else {
-          throw new SemanticException(
-              "DROP PARTITION EVENT does not return any part descs for event message :"
-                  + dmd.getPayload());
-        }
-      } catch (Exception e) {
-        if (!(e instanceof SemanticException)){
-          throw new SemanticException("Error reading message members", e);
-        } else {
-          throw (SemanticException)e;
-        }
-      }
-    }
-    case EVENT_ALTER_TABLE: {
-      return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
-    }
-    case EVENT_RENAME_TABLE: {
-      AlterTableMessage renameTableMessage = md.getAlterTableMessage(dmd.getPayload());
-      if ((tblName != null) && (!tblName.isEmpty())){
-        throw new SemanticException("RENAMES of tables are not supported for table-level replication");
-      }
-      try {
-        String oldDbName = renameTableMessage.getTableObjBefore().getDbName();
-        String newDbName = renameTableMessage.getTableObjAfter().getDbName();
-
-        if ((dbName != null) && (!dbName.isEmpty())){
-          // If we're loading into a db, instead of into the warehouse, then the oldDbName and
-          // newDbName must be the same
-          if (!oldDbName.equalsIgnoreCase(newDbName)){
-            throw new SemanticException("Cannot replicate an event renaming a table across"
-                + " databases into a db level load " + oldDbName +"->" + newDbName);
-          } else {
-            // both were the same, and can be replaced by the new db we're loading into.
-            oldDbName = dbName;
-            newDbName = dbName;
-          }
-        }
-
-        String oldName = oldDbName + "." + renameTableMessage.getTableObjBefore().getTableName();
-        String newName = newDbName + "." + renameTableMessage.getTableObjAfter().getTableName();
-        AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false);
-        Task<DDLWork> renameTableTask = TaskFactory.get(new DDLWork(inputs, outputs, renameTableDesc), conf);
-        if (precursor != null){
-          precursor.addDependentTask(renameTableTask);
-        }
-        List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-        tasks.add(renameTableTask);
-        LOG.debug("Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName);
-        dbsUpdated.put(newDbName, dmd.getEventTo()); // oldDbName and newDbName *will* be the same if we're here
-        tablesUpdated.remove(oldName);
-        tablesUpdated.put(newName, dmd.getEventTo());
-        // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out tablesUpdated
-        // However, we explicitly don't support repl of that sort, and error out above if so. If that should
-        // ever change, this will need reworking.
-        return tasks;
-      } catch (Exception e) {
-        if (!(e instanceof SemanticException)){
-          throw new SemanticException("Error reading message members", e);
-        } else {
-          throw (SemanticException)e;
-        }
-      }
-    }
-    case EVENT_ALTER_PARTITION: {
-      return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated);
-    }
-    case EVENT_RENAME_PARTITION: {
-      AlterPartitionMessage renamePtnMessage = md.getAlterPartitionMessage(dmd.getPayload());
-      String actualDbName = ((dbName == null) || dbName.isEmpty() ? renamePtnMessage.getDB() : dbName);
-      String actualTblName = ((tblName == null) || tblName.isEmpty() ? renamePtnMessage.getTable() : tblName);
-
-      Map<String, String> newPartSpec = new LinkedHashMap<String, String>();
-      Map<String, String> oldPartSpec = new LinkedHashMap<String, String>();
-      String tableName = actualDbName + "." + actualTblName;
-      try {
-        org.apache.hadoop.hive.metastore.api.Table tblObj = renamePtnMessage.getTableObj();
-        org.apache.hadoop.hive.metastore.api.Partition pobjBefore = renamePtnMessage.getPtnObjBefore();
-        org.apache.hadoop.hive.metastore.api.Partition pobjAfter = renamePtnMessage.getPtnObjAfter();
-        Iterator<String> beforeValIter = pobjBefore.getValuesIterator();
-        Iterator<String> afterValIter = pobjAfter.getValuesIterator();
-        for (FieldSchema fs : tblObj.getPartitionKeys()){
-          oldPartSpec.put(fs.getName(), beforeValIter.next());
-          newPartSpec.put(fs.getName(), afterValIter.next());
-        }
-      } catch (Exception e) {
-        if (!(e instanceof SemanticException)){
-          throw new SemanticException("Error reading message members", e);
-        } else {
-          throw (SemanticException)e;
-        }
-      }
-
-      RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec);
-      Task<DDLWork> renamePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, renamePtnDesc), conf);
-      if (precursor != null){
-        precursor.addDependentTask(renamePtnTask);
-      }
-      List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-      tasks.add(renamePtnTask);
-      LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec);
-      dbsUpdated.put(actualDbName, dmd.getEventTo());
-      tablesUpdated.put(tableName, dmd.getEventTo());
-      return tasks;
-    }
-    case EVENT_INSERT: {
-      md = MessageFactory.getInstance().getDeserializer();
-      InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
-      String actualDbName = ((dbName == null) || dbName.isEmpty() ? insertMessage.getDB() : dbName);
-      String actualTblName = ((tblName == null) || tblName.isEmpty() ? insertMessage.getTable() : tblName);
-
-      // Piggybacking in Import logic for now
-      return analyzeTableLoad(actualDbName, actualTblName, locn, precursor, dbsUpdated, tablesUpdated);
-    }
-    case EVENT_UNKNOWN: {
-      break;
-    }
-    default: {
-      break;
+      String dbName, String tblName, String locn, Task<? extends Serializable> precursor,
+      Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, DumpMetaData dmd)
+      throws SemanticException {
+    MessageHandler.Context context =
+        new MessageHandler.Context(dbName, tblName, locn, precursor, dmd, conf, db, ctx, LOG);
+    MessageHandler messageHandler = MessageHandlerFactory.handlerFor(dmd.getDumpType());
+    List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
+
+    if (precursor != null) {
+      for (Task<? extends Serializable> t : tasks) {
+        precursor.addDependentTask(t);
+        LOG.debug("Added {}:{} as a precursor of {}:{}",
+            precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
       }
     }
-    return null;
+    dbsUpdated.putAll(messageHandler.databasesUpdated());
+    tablesUpdated.putAll(messageHandler.tablesUpdated());
+    inputs.addAll(messageHandler.readEntities());
+    outputs.addAll(messageHandler.writeEntities());
+    return tasks;
   }
 
-  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
-      List<Map<String, String>> partitions) throws SemanticException {
-    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs =
-        new HashMap<Integer, List<ExprNodeGenericFuncDesc>>();
-    int partPrefixLength = 0;
-    if ((partitions != null) && (partitions.size() > 0)) {
-      partPrefixLength = partitions.get(0).size();
-      // pick the length of the first ptn, we expect all ptns listed to have the same number of
-      // key-vals.
-    }
-    List<ExprNodeGenericFuncDesc> ptnDescs = new ArrayList<ExprNodeGenericFuncDesc>();
-    for (Map<String, String> ptn : partitions) {
-      // convert each key-value-map to appropriate expression.
-      ExprNodeGenericFuncDesc expr = null;
-      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
-        String key = kvp.getKey();
-        Object val = kvp.getValue();
-        String type = table.getPartColByName(key).getType();
-        ;
-        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
-        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
-        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
-            "=", column, new ExprNodeConstantDesc(pti, val));
-        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
-      }
-      if (expr != null) {
-        ptnDescs.add(expr);
-      }
-    }
-    if (ptnDescs.size() > 0) {
-      partSpecs.put(partPrefixLength, ptnDescs);
-    }
-    return partSpecs;
-  }
-
   private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) throws SemanticException {
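The analyzeEventLoad rewrite above replaces a roughly 200-line switch with a lookup-and-merge shape: a MessageHandler is obtained per dump type, produces its tasks, and the caller folds the handler's accumulated replication state back into its own maps and entity sets. The following sketch is not part of the patch; it uses hypothetical stand-in types (plain Strings instead of Task<? extends Serializable>) purely to show that shape in isolation:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

interface ReplMessageHandler {
  List<String> handle(String payload);           // stands in for List<Task<...>>
  Map<String, Long> databasesUpdated();
  Map<String, Long> tablesUpdated();
}

class DropTableMessageHandler implements ReplMessageHandler {
  private final Map<String, Long> dbs = new HashMap<>();
  private final Map<String, Long> tables = new HashMap<>();

  @Override public List<String> handle(String payload) {
    dbs.put("default", 42L);                     // record the replication-state change
    return Collections.singletonList("DROP_TABLE task for " + payload);
  }
  @Override public Map<String, Long> databasesUpdated() { return dbs; }
  @Override public Map<String, Long> tablesUpdated() { return tables; }
}

public class DispatchSketch {
  public static void main(String[] args) {
    Map<String, ReplMessageHandler> registry = new HashMap<>();
    registry.put("EVENT_DROP_TABLE", new DropTableMessageHandler());

    Map<String, Long> dbsUpdated = new HashMap<>();
    Map<String, Long> tablesUpdated = new HashMap<>();

    ReplMessageHandler handler = registry.get("EVENT_DROP_TABLE");
    List<String> tasks = handler.handle("{json payload}");
    dbsUpdated.putAll(handler.databasesUpdated());   // caller merges, as analyzeEventLoad does
    tablesUpdated.putAll(handler.tablesUpdated());
    System.out.println(tasks + " " + dbsUpdated);
  }
}

The payoff of this shape is that supporting a new event type becomes a registry entry plus a handler class, rather than another case block in a monolithic method.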
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
similarity index 92%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
index ab059c20a4..ba699e3ed4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
@@ -23,7 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractHandler implements EventHandler {
+abstract class AbstractHandler implements EventHandler {
   static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);
 
   final NotificationEvent event;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
similarity index 97%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 9a4f8b9197..24487fcc6e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
@@ -37,7 +37,7 @@
 
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 
-public class AddPartitionHandler extends AbstractHandler {
+class AddPartitionHandler extends AbstractHandler {
   protected AddPartitionHandler(NotificationEvent notificationEvent) {
     super(notificationEvent);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
similarity index 96%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index 1073cd093c..d3889a2337 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -31,7 +31,7 @@
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
 
-public class AlterPartitionHandler extends AbstractHandler {
+class AlterPartitionHandler extends AbstractHandler {
   private final org.apache.hadoop.hive.metastore.api.Partition after;
   private final org.apache.hadoop.hive.metastore.api.Table tableObject;
   private final Scenario scenario;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
similarity index 96%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index 04d9d79d87..730ef0eded 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -26,7 +26,7 @@
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
 
-public class AlterTableHandler extends AbstractHandler {
+class AlterTableHandler extends AbstractHandler {
   private final org.apache.hadoop.hive.metastore.api.Table before;
   private final org.apache.hadoop.hive.metastore.api.Table after;
   private final Scenario scenario;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
new file mode 100644
index 0000000000..02b8ae8123
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.dump.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+
+class CreateFunctionHandler extends AbstractHandler {
+  CreateFunctionHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    CreateFunctionMessage createFunctionMessage =
+        deserializer.getCreateFunctionMessage(event.getMessage());
+    LOG.info("Processing#{} CREATE_FUNCTION message : {}", fromEventId(), event.getMessage());
+    Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
+
+    try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
+      new FunctionSerializer(createFunctionMessage.getFunctionObj())
+          .writeTo(jsonWriter, withinContext.replicationSpec);
+    }
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return DUMPTYPE.EVENT_CREATE_FUNCTION;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
similarity index 96%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 03f400de61..e58004f686 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,7 +30,7 @@
 
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 
-public class CreateTableHandler extends AbstractHandler {
+class CreateTableHandler extends AbstractHandler {
 
   CreateTableHandler(NotificationEvent event) {
     super(event);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
similarity index 93%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
index 61c5f37334..3906045dab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
@@ -15,14 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
 
-public class DefaultHandler extends AbstractHandler {
+class DefaultHandler extends AbstractHandler {
 
   DefaultHandler(NotificationEvent event) {
     super(event);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
similarity index 92%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
index 3ad794e383..261fa31998 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
@@ -15,14 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
 
-public class DropPartitionHandler extends AbstractHandler {
+class DropPartitionHandler extends AbstractHandler {
 
   DropPartitionHandler(NotificationEvent event) {
     super(event);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
similarity index 93%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
index cae379b4fc..d33e82f3ba 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
@@ -15,14 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
 
-public class DropTableHandler extends AbstractHandler {
+class DropTableHandler extends AbstractHandler {
 
   DropTableHandler(NotificationEvent event) {
     super(event);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
similarity index 97%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index 199145a5d4..41adbd75b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
similarity index 96%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index 53adea8af9..08dbd13822 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
@@ -36,6 +36,7 @@ private EventHandlerFactory() {
     register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
     register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
     register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
+    register(MessageFactory.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class);
     register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
     register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
     register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
similarity index 97%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 13462766b5..dada07507a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,7 +35,7 @@
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
 import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
 
-public class InsertHandler extends AbstractHandler {
+class InsertHandler extends AbstractHandler {
 
   InsertHandler(NotificationEvent event) {
     super(event);
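On the dump side, the new CreateFunctionHandler follows the same serialization pattern as the other event handlers: open a writer on the event's _metadata path inside try-with-resources so the stream is closed even if serialization throws. A self-contained approximation of that pattern, using java.nio stand-ins for the Hadoop FileSystem and for Hive's JsonWriter/FunctionSerializer (all names here are hypothetical):

import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class MetadataWriter implements AutoCloseable {
  private final Writer out;

  MetadataWriter(Path path) throws IOException {
    out = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
  }

  void writeJson(String json) throws IOException {
    out.write(json);
  }

  @Override public void close() throws IOException {
    out.close();  // guaranteed by try-with-resources, even on failure
  }
}

public class DumpEventSketch {
  public static void main(String[] args) throws IOException {
    Path eventRoot = Files.createTempDirectory("event-root");
    Path metadataPath = eventRoot.resolve("_metadata");   // mirrors EximUtil.METADATA_NAME
    try (MetadataWriter writer = new MetadataWriter(metadataPath)) {
      writer.writeJson("{\"function\":\"my_udf\"}");      // stands in for FunctionSerializer
    }
    System.out.println(new String(Files.readAllBytes(metadataPath), StandardCharsets.UTF_8));
  }
}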
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
new file mode 100644
index 0000000000..95e51e4f1a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+abstract class AbstractMessageHandler implements MessageHandler {
+  final HashSet<ReadEntity> readEntitySet = new HashSet<>();
+  final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
+  final Map<String, Long> tablesUpdated = new HashMap<>(),
+      databasesUpdated = new HashMap<>();
+  final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
+
+  @Override
+  public Set<ReadEntity> readEntities() {
+    return readEntitySet;
+  }
+
+  @Override
+  public Set<WriteEntity> writeEntities() {
+    return writeEntitySet;
+  }
+
+  @Override
+  public Map<String, Long> tablesUpdated() {
+    return tablesUpdated;
+  }
+
+  @Override
+  public Map<String, Long> databasesUpdated() {
+    return databasesUpdated;
+  }
+
+  ReplicationSpec eventOnlyReplicationSpec(Context forContext) throws SemanticException {
+    String eventId = forContext.dmd.getEventTo().toString();
+    return replicationSpec(eventId, eventId);
+  }
+
+  private ReplicationSpec replicationSpec(String fromId, String toId) throws SemanticException {
+    return new ReplicationSpec(true, false, fromId, toId, false, true, false);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
new file mode 100644
index 0000000000..6d346b6fe0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+class DefaultHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context withinContext)
+      throws SemanticException {
+    return new ArrayList<>();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
new file mode 100644
index 0000000000..4febf46cc5
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class DropPartitionHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context withinContext)
+      throws SemanticException {
+    try {
+      DropPartitionMessage dropPartitionMessage =
+          deserializer.getDropPartitionMessage(withinContext.dmd.getPayload());
+      String actualDbName =
+          withinContext.isDbNameEmpty() ? dropPartitionMessage.getDB() : withinContext.dbName;
+      String actualTblName = withinContext.isTableNameEmpty() ? dropPartitionMessage.getTable() :
+          withinContext.tableName;
+      Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs =
+          genPartSpecs(new Table(dropPartitionMessage.getTableObj()),
+              dropPartitionMessage.getPartitions());
+      if (partSpecs.size() > 0) {
+        DropTableDesc dropPtnDesc = new DropTableDesc(actualDbName + "." + actualTblName,
+            partSpecs, null, true, eventOnlyReplicationSpec(withinContext));
+        Task<DDLWork> dropPtnTask = TaskFactory.get(
+            new DDLWork(readEntitySet, writeEntitySet, dropPtnDesc),
+            withinContext.hiveConf
+        );
+        List<Task<? extends Serializable>> tasks = new ArrayList<>();
+        tasks.add(dropPtnTask);
+        withinContext.log.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
+            dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions());
+        databasesUpdated.put(actualDbName, withinContext.dmd.getEventTo());
+        tablesUpdated.put(actualDbName + "." + actualTblName, withinContext.dmd.getEventTo());
+        return tasks;
+      } else {
+        throw new SemanticException(
+            "DROP PARTITION EVENT does not return any part descs for event message :"
+                + withinContext.dmd.getPayload());
+      }
+    } catch (Exception e) {
+      throw (e instanceof SemanticException)
+          ? (SemanticException) e
+          : new SemanticException("Error reading message members", e);
+    }
+  }
+
+  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
+      List<Map<String, String>> partitions) throws SemanticException {
+    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
+    int partPrefixLength = 0;
+    if ((partitions != null) && (partitions.size() > 0)) {
+      partPrefixLength = partitions.get(0).size();
+      // pick the length of the first ptn, we expect all ptns listed to have the same number of
+      // key-vals.
+    }
+    List<ExprNodeGenericFuncDesc> ptnDescs = new ArrayList<>();
+    for (Map<String, String> ptn : partitions) {
+      // convert each key-value-map to appropriate expression.
+      ExprNodeGenericFuncDesc expr = null;
+      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
+        String key = kvp.getKey();
+        Object val = kvp.getValue();
+        String type = table.getPartColByName(key).getType();
+        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
+        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
+            "=", column, new ExprNodeConstantDesc(pti, val));
+        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+      }
+      if (expr != null) {
+        ptnDescs.add(expr);
+      }
+    }
+    if (ptnDescs.size() > 0) {
+      partSpecs.put(partPrefixLength, ptnDescs);
+    }
+    return partSpecs;
+  }
+}
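genPartSpecs, now private to DropPartitionHandler, folds each dropped partition's key/value map into a single conjunctive equality predicate via DDLSemanticAnalyzer.makeBinaryPredicate. A minimal sketch of that fold, not from the patch, with strings standing in for ExprNodeGenericFuncDesc:

import java.util.LinkedHashMap;
import java.util.Map;

public class PartPredicateSketch {
  // stand-in for DDLSemanticAnalyzer.makeBinaryPredicate
  static String makeBinaryPredicate(String op, String lhs, String rhs) {
    return "(" + lhs + " " + op + " " + rhs + ")";
  }

  static String toPredicate(Map<String, String> partSpec) {
    String expr = null;
    for (Map.Entry<String, String> kvp : partSpec.entrySet()) {
      String op = makeBinaryPredicate("=", kvp.getKey(), "'" + kvp.getValue() + "'");
      // fold: first predicate stands alone, the rest are AND-ed on
      expr = (expr == null) ? op : makeBinaryPredicate("and", expr, op);
    }
    return expr;
  }

  public static void main(String[] args) {
    Map<String, String> ptn = new LinkedHashMap<>();
    ptn.put("ds", "2017-03-20");
    ptn.put("hr", "11");
    System.out.println(toPredicate(ptn)); // ((ds = '2017-03-20') and (hr = '11'))
  }
}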
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
new file mode 100644
index 0000000000..b82b1671d0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+class DropTableHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context withinContext)
+      throws SemanticException {
+    DropTableMessage dropTableMessage =
+        deserializer.getDropTableMessage(withinContext.dmd.getPayload());
+    String actualDbName =
+        withinContext.isDbNameEmpty() ? dropTableMessage.getDB() : withinContext.dbName;
+    String actualTblName =
+        withinContext.isTableNameEmpty() ? dropTableMessage.getTable() : withinContext.tableName;
+    DropTableDesc dropTableDesc = new DropTableDesc(
+        actualDbName + "." + actualTblName,
+        null, true, true,
+        eventOnlyReplicationSpec(withinContext));
+    Task<DDLWork> dropTableTask = TaskFactory.get(
+        new DDLWork(readEntitySet, writeEntitySet, dropTableDesc),
+        withinContext.hiveConf
+    );
+    List<Task<? extends Serializable>> tasks = new ArrayList<>();
+    tasks.add(dropTableTask);
+    withinContext.log
+        .debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+    databasesUpdated.put(actualDbName, withinContext.dmd.getEventTo());
+    return tasks;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
new file mode 100644
index 0000000000..fa63169b7d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.List;
+
+class InsertHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context withinContext)
+      throws SemanticException {
+    InsertMessage insertMessage = deserializer.getInsertMessage(withinContext.dmd.getPayload());
+    String actualDbName =
+        withinContext.isDbNameEmpty() ? insertMessage.getDB() : withinContext.dbName;
+    String actualTblName =
+        withinContext.isTableNameEmpty() ? insertMessage.getTable() : withinContext.tableName;
+
+    Context currentContext = new Context(withinContext, actualDbName, actualTblName);
+    // Piggybacking in Import logic for now
+    TableHandler tableHandler = new TableHandler();
+    List<Task<? extends Serializable>> tasks = tableHandler.handle(currentContext);
+    readEntitySet.addAll(tableHandler.readEntities());
+    writeEntitySet.addAll(tableHandler.writeEntities());
+    databasesUpdated.putAll(tableHandler.databasesUpdated);
+    tablesUpdated.putAll(tableHandler.tablesUpdated);
+    return tasks;
+  }
+}
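InsertHandler keeps the old "piggyback on import" behavior by delegating to TableHandler and then merging the delegate's accumulated entities and replication-state maps into its own, so analyzeEventLoad still sees one handler's worth of side effects. A sketch of that composition, not from the patch and with hypothetical types:

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

abstract class AccumulatingHandler {
  final Set<String> readEntities = new HashSet<>();
  final Map<String, Long> tablesUpdated = new HashMap<>();

  abstract List<String> handle(String dbName, String tableName);
}

class TableLoadHandler extends AccumulatingHandler {
  @Override List<String> handle(String dbName, String tableName) {
    readEntities.add(dbName + "." + tableName);
    tablesUpdated.put(dbName + "." + tableName, 100L);
    return Collections.singletonList("import task for " + dbName + "." + tableName);
  }
}

class InsertLoadHandler extends AccumulatingHandler {
  @Override List<String> handle(String dbName, String tableName) {
    TableLoadHandler delegate = new TableLoadHandler();  // piggyback on table-load logic
    List<String> tasks = delegate.handle(dbName, tableName);
    readEntities.addAll(delegate.readEntities);          // fold the delegate's state into ours
    tablesUpdated.putAll(delegate.tablesUpdated);
    return tasks;
  }
}

public class DelegationSketch {
  public static void main(String[] args) {
    InsertLoadHandler handler = new InsertLoadHandler();
    System.out.println(handler.handle("default", "t1") + " " + handler.tablesUpdated);
  }
}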
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
new file mode 100644
index 0000000000..dc87b6213d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+
+public interface MessageHandler {
+
+  List<Task<? extends Serializable>> handle(Context withinContext) throws SemanticException;
+
+  Set<ReadEntity> readEntities();
+
+  Set<WriteEntity> writeEntities();
+
+  Map<String, Long> tablesUpdated();
+
+  Map<String, Long> databasesUpdated();
+
+  class Context {
+    final String dbName, tableName, location;
+    final Task<? extends Serializable> precursor;
+    DumpMetaData dmd;
+    final HiveConf hiveConf;
+    final Hive db;
+    final org.apache.hadoop.hive.ql.Context nestedContext;
+    final Logger log;
+
+    public Context(String dbName, String tableName, String location,
+        Task<? extends Serializable> precursor, DumpMetaData dmd, HiveConf hiveConf,
+        Hive db, org.apache.hadoop.hive.ql.Context nestedContext, Logger log) {
+      this.dbName = dbName;
+      this.tableName = tableName;
+      this.location = location;
+      this.precursor = precursor;
+      this.dmd = dmd;
+      this.hiveConf = hiveConf;
+      this.db = db;
+      this.nestedContext = nestedContext;
+      this.log = log;
+    }
+
+    public Context(Context other, String dbName, String tableName) {
+      this.dbName = dbName;
+      this.tableName = tableName;
+      this.location = other.location;
+      this.precursor = other.precursor;
+      this.dmd = other.dmd;
+      this.hiveConf = other.hiveConf;
+      this.db = other.db;
+      this.nestedContext = other.nestedContext;
+      this.log = other.log;
+    }
+
+    boolean isTableNameEmpty() {
+      return StringUtils.isEmpty(tableName);
+    }
+
+    boolean isDbNameEmpty() {
+      return StringUtils.isEmpty(dbName);
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
new file mode 100644
index 0000000000..8e99a6104d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+
+public class MessageHandlerFactory {
+  private static Map<DUMPTYPE, Class<? extends MessageHandler>> messageHandlers = new HashMap<>();
+
+  static {
+    register(DUMPTYPE.EVENT_CREATE_TABLE, TableHandler.class);
+    register(DUMPTYPE.EVENT_ADD_PARTITION, TableHandler.class);
+    register(DUMPTYPE.EVENT_ALTER_TABLE, TableHandler.class);
+    register(DUMPTYPE.EVENT_ALTER_PARTITION, TableHandler.class);
+
+    register(DUMPTYPE.EVENT_DROP_TABLE, DropTableHandler.class);
+    register(DUMPTYPE.EVENT_DROP_PARTITION, DropPartitionHandler.class);
+    register(DUMPTYPE.EVENT_RENAME_TABLE, RenameTableHandler.class);
+    register(DUMPTYPE.EVENT_RENAME_PARTITION, RenamePartitionHandler.class);
+    register(DUMPTYPE.EVENT_INSERT, InsertHandler.class);
+  }
+
+  private static void register(DUMPTYPE eventType, Class<? extends MessageHandler> handlerClazz) {
+    try {
+      Constructor<? extends MessageHandler> constructor =
+          handlerClazz.getDeclaredConstructor();
+      assert constructor != null;
+      assert !Modifier.isPrivate(constructor.getModifiers());
+      messageHandlers.put(eventType, handlerClazz);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
+          + " does not have a no-argument constructor, which handlerFor() needs"
+          + " to instantiate it", e);
+    }
+  }
+
+  public static MessageHandler handlerFor(DUMPTYPE eventType) {
+    if (messageHandlers.containsKey(eventType)) {
+      Class<? extends MessageHandler> handlerClazz = messageHandlers.get(eventType);
+      try {
+        Constructor<? extends MessageHandler> constructor =
+            handlerClazz.getDeclaredConstructor();
+        return constructor.newInstance();
+      } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+        // this should never happen. However, we want to make sure we propagate the exception
+        throw new RuntimeException(
+            "failed when creating handler for " + eventType
+                + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
+      }
+    }
+    return new DefaultHandler();
+  }
+}
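MessageHandlerFactory validates at registration time that each handler class is reflectively instantiable (it must expose a non-private no-argument constructor), instantiates a fresh handler on every lookup, and falls back to a do-nothing handler for unregistered dump types. A runnable sketch of that registry under the same assumptions, with illustrative names only:

import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

public class RegistrySketch {
  interface Handler { String handle(); }

  public static class NoOpHandler implements Handler {
    public String handle() { return "no-op"; }
  }

  public static class DropTableHandler implements Handler {
    public String handle() { return "drop-table"; }
  }

  private static final Map<String, Class<? extends Handler>> REGISTRY = new HashMap<>();

  static void register(String type, Class<? extends Handler> clazz) {
    try {
      // fail fast: a handler we cannot construct later is a programming error now
      Constructor<? extends Handler> ctor = clazz.getDeclaredConstructor();
      if (Modifier.isPrivate(ctor.getModifiers())) {
        throw new IllegalArgumentException(clazz + " has a private no-arg constructor");
      }
      REGISTRY.put(type, clazz);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(clazz + " has no no-arg constructor", e);
    }
  }

  static Handler handlerFor(String type) {
    Class<? extends Handler> clazz = REGISTRY.get(type);
    if (clazz == null) {
      return new NoOpHandler();           // unknown event types are skipped, not failed
    }
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("failed to create handler for " + type, e);
    }
  }

  public static void main(String[] args) {
    register("EVENT_DROP_TABLE", DropTableHandler.class);
    System.out.println(handlerFor("EVENT_DROP_TABLE").handle()); // drop-table
    System.out.println(handlerFor("EVENT_UNKNOWN").handle());    // no-op
  }
}

Instantiating per lookup (rather than caching one shared instance) matters here because the real handlers are stateful: they accumulate entities and updated-table maps for a single event.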
(SemanticException) e + : new SemanticException("Error reading message members", e); + } + + RenamePartitionDesc renamePtnDesc = + new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec); + Task renamePtnTask = TaskFactory.get( + new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), withinContext.hiveConf + ); + List> tasks = new ArrayList<>(); + tasks.add(renamePtnTask); + withinContext.log + .debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, + newPartSpec); + databasesUpdated.put(actualDbName, withinContext.dmd.getEventTo()); + tablesUpdated.put(tableName, withinContext.dmd.getEventTo()); + return tasks; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java new file mode 100644 index 0000000000..0f78478093 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AlterTableDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +class RenameTableHandler extends AbstractMessageHandler { + @Override + public List> handle(Context withinContext) + throws SemanticException { + + AlterTableMessage renameTableMessage = + deserializer.getAlterTableMessage(withinContext.dmd.getPayload()); + if (withinContext.isTableNameEmpty()) { + throw new SemanticException( + "RENAMES of tables are not supported for table-level replication"); + } + try { + String oldDbName = renameTableMessage.getTableObjBefore().getDbName(); + String newDbName = renameTableMessage.getTableObjAfter().getDbName(); + + if (withinContext.isDbNameEmpty()) { + // If we're loading into a db, instead of into the warehouse, then the oldDbName and + // newDbName must be the same + if (!oldDbName.equalsIgnoreCase(newDbName)) { + throw new SemanticException("Cannot replicate an event renaming a table across" + + " databases into a db level load " + oldDbName + "->" + newDbName); + } else { + // both were the same, and can be replaced by the new db we're loading into. + oldDbName = withinContext.dbName; + newDbName = withinContext.dbName; + } + } + + String oldName = oldDbName + "." + renameTableMessage.getTableObjBefore().getTableName(); + String newName = newDbName + "." 
+    try {
+      String oldDbName = renameTableMessage.getTableObjBefore().getDbName();
+      String newDbName = renameTableMessage.getTableObjAfter().getDbName();
+
+      if (withinContext.isDbNameEmpty()) {
+        // If we're loading into a specific db rather than into the warehouse,
+        // then the oldDbName and newDbName must be the same.
+        if (!oldDbName.equalsIgnoreCase(newDbName)) {
+          throw new SemanticException("Cannot replicate an event renaming a table across"
+              + " databases into a db level load " + oldDbName + "->" + newDbName);
+        } else {
+          // Both were the same, and can be replaced by the new db we're loading into.
+          oldDbName = withinContext.dbName;
+          newDbName = withinContext.dbName;
+        }
+      }
+
+      String oldName = oldDbName + "." + renameTableMessage.getTableObjBefore().getTableName();
+      String newName = newDbName + "." + renameTableMessage.getTableObjAfter().getTableName();
+      AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false);
+      Task<? extends Serializable> renameTableTask = TaskFactory.get(
+          new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), withinContext.hiveConf
+      );
+      List<Task<? extends Serializable>> tasks = new ArrayList<>();
+      tasks.add(renameTableTask);
+      withinContext.log.debug(
+          "Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName
+      );
+      // oldDbName and newDbName *will* be the same if we're here.
+      databasesUpdated.put(newDbName, withinContext.dmd.getEventTo());
+      tablesUpdated.remove(oldName);
+      tablesUpdated.put(newName, withinContext.dmd.getEventTo());
+      // Note: there is an edge-case here in the interaction with table-level REPL LOAD,
+      // which nukes out tablesUpdated. However, we explicitly don't support repl of that
+      // sort, and error out above if so. If that should ever change, this will need
+      // reworking.
+      return tasks;
+    } catch (Exception e) {
+      throw (e instanceof SemanticException)
+          ? (SemanticException) e
+          : new SemanticException("Error reading message members", e);
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
new file mode 100644
index 0000000000..df791a29bd
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+class TableHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
+    // The path being passed to us is a table dump location. We go ahead and load it in
+    // as needed. If tblName is null, we default to the table name specified in _metadata,
+    // which is good; if dbName and tblName are both specified, those are what we are
+    // intended to create the new table as.
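+    // Fail fast: a table-level load must resolve to a concrete target database.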
+    if (context.dbName == null || context.dbName.isEmpty()) {
+      throw new SemanticException("Database name cannot be null for a table load");
+    }
+    try {
+      // TODO: the variables below are constant for repl loads and could be made
+      // static / inlined; nothing here can update them.
+
+      // no location set on repl loads
+      boolean isLocationSet = false;
+      // all repl imports are non-external
+      boolean isExternalSet = false;
+      // bootstrap loads are not partition level
+      boolean isPartSpecSet = false;
+      // repl loads are not partition level
+      LinkedHashMap<String, String> parsedPartSpec = null;
+      // no location for repl imports
+      String parsedLocation = null;
+      List<Task<? extends Serializable>> importTasks = new ArrayList<>();
+
+      EximUtil.SemanticAnalyzerWrapperContext x =
+          new EximUtil.SemanticAnalyzerWrapperContext(
+              context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks,
+              context.log, context.nestedContext);
+      ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
+          (context.precursor != null), parsedLocation, context.tableName, context.dbName,
+          parsedPartSpec, context.location, x,
+          databasesUpdated, tablesUpdated);
+
+      return importTasks;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
similarity index 81%
rename from ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
rename to ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
index 0526700a8e..8e77652a3e 100644
--- ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
+++ ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java
@@ -1,7 +1,10 @@
-package org.apache.hadoop.hive.ql.parse.repl.events;
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.DefaultHandler;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;