diff --git a/hcatalog/webhcat/java-client/pom.xml b/hcatalog/webhcat/java-client/pom.xml index 9acf12c..b3b9d5e 100644 --- a/hcatalog/webhcat/java-client/pom.xml +++ b/hcatalog/webhcat/java-client/pom.xml @@ -46,6 +46,11 @@ ${project.version} + org.apache.hive.hcatalog + hive-hcatalog-server-extensions + ${project.version} + + org.apache.hive hive-exec ${project.version} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java index 65e8bc7..e3806c9 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java @@ -18,6 +18,7 @@ */ package org.apache.hive.hcatalog.api; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -27,6 +28,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; @@ -467,6 +469,23 @@ public abstract void cancelDelegationToken(String tokenStrForm) */ public abstract String getMessageBusTopicName(String dbName, String tableName) throws HCatException; + + /** + * Get an iterator that iterates over a list of replication tasks needed to replicate all the + * events that have taken place for a given db/table. + * @param lastEventId : The last event id that was processed for this reader. The returned + * replication tasks will start from this point forward + * @param maxEvents : Maximum number of events to consider for generating the + * replication tasks. If < 1, then all available events will be considered. 
+ * @param dbName : The database name whose events we're interested in. + * @param tableName : The table name whose events we're interested in - if null, + * then this function will behave as if it were running at a db level. + * @return an iterator over a list of replication events that can be processed one by one. + * @throws HCatException + */ + public abstract Iterator getReplicationTasks( + long lastEventId, int maxEvents, String dbName, String tableName) throws HCatException; + + /** * Get a list of notifications * @param lastEventId The last event id that was consumed by this reader. The returned diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java index 8cb1961..54c3073 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java @@ -21,9 +21,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; +import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; @@ -63,6 +65,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hive.hcatalog.api.repl.HCatReplicationTaskIterator; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; @@ -72,6 +76,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
javax.annotation.Nullable; + /** * The HCatClientHMSImpl is the Hive Metastore client based implementation of * HCatClient. @@ -965,18 +971,27 @@ public String getMessageBusTopicName(String dbName, String tableName) throws HCa } @Override + public Iterator getReplicationTasks( + long lastEventId, int maxEvents, String dbName, String tableName) throws HCatException { + return new HCatReplicationTaskIterator(this,lastEventId,maxEvents,dbName,tableName); + } + + @Override public List getNextNotification(long lastEventId, int maxEvents, IMetaStoreClient.NotificationFilter filter) throws HCatException { try { - List events = new ArrayList(); NotificationEventResponse rsp = hmsClient.getNextNotification(lastEventId, maxEvents, filter); if (rsp != null && rsp.getEvents() != null) { - for (NotificationEvent event : rsp.getEvents()) { - events.add(new HCatNotificationEvent(event)); - } + return Lists.transform(rsp.getEvents(), new Function() { + @Override + public HCatNotificationEvent apply(@Nullable NotificationEvent notificationEvent) { + return new HCatNotificationEvent(notificationEvent); + } + }); + } else { + return new ArrayList(); } - return events; } catch (TException e) { throw new ConnectionFailureException("TException while getting notifications", e); } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java index 9205c56..2f830fd 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java @@ -32,6 +32,8 @@ private String tableName; private String message; + public enum Scope { DB, TABLE, UNKNOWN }; + HCatNotificationEvent(NotificationEvent event) { eventId = event.getEventId(); eventTime = event.getEventTime(); @@ -45,6 +47,20 @@ public long 
getEventId() { return eventId; } + public Scope getEventScope() { + // Eventually, we want this to be a richer description of having + // a DB, TABLE, ROLE, etc scope. For now, we have a trivial impl + // of having only DB and TABLE scopes, as determined by whether + // or not the tableName is null. + if (dbName != null){ + if (tableName != null){ + return Scope.TABLE; + } + return Scope.DB; + } + return Scope.UNKNOWN; + } + public int getEventTime() { return eventTime; } diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/Command.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/Command.java new file mode 100644 index 0000000..c479ca3 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/Command.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.api.repl; + +import org.apache.hadoop.io.Writable; + +import java.util.List; + +/** + * Interface that abstracts the notion of one atomic command to execute. 
+ * If the command does not execute and raises some exception, then Command + * provides a conditional to check if the operation is intended to be + * retriable - i.e. whether the command is considered idempotent. If it is, + * then the user could attempt to redo the particular command they were + * running. If not, then they can check another conditional to check + * if their action is undo-able. If undoable, then they can then attempt + * to undo the action by asking the command how to undo it. If not, they + * can then in turn act upon the exception in whatever manner they see + * fit (typically by raising an error). + * + * We also have two more methods that help cleanup of temporary locations + * used by this Command. cleanupLocationsPerRetry() provides a list of + * directories that are intended to be cleaned up every time this Command + * needs to be retried. cleanupLocationsAfterEvent() provides a list of + * directories that should be cleaned up after the event for which this + * Command is generated is successfully processed. + */ +public interface Command extends Writable { + List get(); + boolean isRetriable(); + boolean isUndoable(); + List getUndo(); + List cleanupLocationsPerRetry(); + List cleanupLocationsAfterEvent(); + long getEventId(); +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java new file mode 100644 index 0000000..3b8e2ae --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/HCatReplicationTaskIterator.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.api.repl; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.common.HCatException; + +import java.util.Iterator; + +public class HCatReplicationTaskIterator implements Iterator{ + private Iterator notifIter = null; + + private class HCatReplicationTaskIteratorNotificationFilter implements IMetaStoreClient.NotificationFilter { + + private String dbName; + private String tableName; + public HCatReplicationTaskIteratorNotificationFilter(String dbName, String tableName){ + this.dbName = dbName; + this.tableName = tableName; + } + @Override + public boolean accept(NotificationEvent event) { + if (event == null){ + return false; // get rid of trivial case first, so that we can safely assume non-null + } + if (this.dbName == null){ + return true; // if our dbName is null, we're interested in all wh events + } + if (this.dbName.equalsIgnoreCase(event.getDbName())){ + if ( + (this.tableName == null) + // if our dbName is equal, but tableName is blank, we're interested in this db-level event + || (this.tableName.equalsIgnoreCase(event.getTableName())) + // table level event that matches us + ){ + return true; + } + } + return false; + } + } + 
+ public HCatReplicationTaskIterator( + HCatClient hcatClient, long eventFrom, int maxEvents, String dbName, String tableName) throws HCatException { + + init(hcatClient,eventFrom,maxEvents, new HCatReplicationTaskIteratorNotificationFilter(dbName,tableName)); + } + + public HCatReplicationTaskIterator( + HCatClient hcatClient, long eventFrom, int maxEvents, + IMetaStoreClient.NotificationFilter filter) throws HCatException{ + init(hcatClient,eventFrom,maxEvents,filter); + } + private void init(HCatClient hcatClient, long eventFrom, int maxEvents, IMetaStoreClient.NotificationFilter filter) throws HCatException { + // Simple implementation for now, this will later expand to do DAG evaluation. + this.notifIter = hcatClient.getNextNotification(eventFrom, maxEvents,filter).iterator(); + } + + @Override + public boolean hasNext() { + return notifIter.hasNext(); + } + + @Override + public ReplicationTask next() { + return ReplicationTask.create(notifIter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove() not supported on HCatReplicationTaskIterator"); + } + + + +} + + + diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java new file mode 100644 index 0000000..00f6d4e --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/NoopReplicationTask.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.api.repl; + +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.api.repl.commands.NoopCommand; + +import java.util.ArrayList; +import java.util.List; + +/** + * This class is there to help testing, and to help initial development + * and will be the default Replication Task for under-development replication + * tasks to override. + * + * This is not intended to be a permanent class, and will likely move to the test + * package after initial implementation. + */ + +public class NoopReplicationTask extends ReplicationTask { + + List noopReturn = null; + + public NoopReplicationTask(HCatNotificationEvent event) { + super(event); + noopReturn = new ArrayList(); + noopReturn.add(new NoopCommand(event.getEventId())); + } + + @Override + public boolean needsStagingDirs() { + return false; + } + + @Override + public boolean isActionable(){ + return true; + } + + /** + * Returns a list of commands to send to a hive driver on the source warehouse + * @return a list of commands to send to a hive driver on the source warehouse + */ + @Override + public Iterable getSrcWhCommands() { + verifyActionable(); + return noopReturn; + } + + /** + * Returns a list of commands to send to a hive driver on the dest warehouse + * @return a list of commands to send to a hive driver on the dest warehouse + */ + @Override + public Iterable getDstWhCommands() { + verifyActionable(); + return noopReturn; + } + +} + diff --git 
a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java new file mode 100644 index 0000000..075a3f9 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationTask.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.api.repl; + +import com.google.common.base.Function; +import org.apache.hive.hcatalog.api.HCatNotificationEvent; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.messaging.MessageFactory; + + +/** + * ReplicationTask captures the concept of what it'd take to replicate changes from + * one warehouse to another given a notification event that captures what changed. 
+ */ +public abstract class ReplicationTask { + protected HCatNotificationEvent event; + protected StagingDirectoryProvider srcStagingDirProvider = null; + protected StagingDirectoryProvider dstStagingDirProvider = null; + protected Function tableNameMapping = null; + protected Function dbNameMapping = null; + + protected static MessageFactory messageFactory = MessageFactory.getInstance(); + + public interface Factory { + public ReplicationTask create(HCatNotificationEvent event); + } + + /** + * Dummy NoopFactory for testing, returns a NoopReplicationTask for all recognized events. + * Warning : this will eventually go away or move to the test section - it's intended only + * for integration testing purposes. + */ + public static class NoopFactory implements Factory { + @Override + public ReplicationTask create(HCatNotificationEvent event) { + // TODO : Java 1.7+ support using String with switches, but IDEs don't all seem to know that. + // If casing is fine for now. But we should eventually remove this. Also, I didn't want to + // create another enum just for this. 
+ String eventType = event.getEventType(); + if (eventType.equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT)) { + return new NoopReplicationTask(event); + } else if (eventType.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) { + return new NoopReplicationTask(event); + } else if (eventType.equals(HCatConstants.HCAT_CREATE_TABLE_EVENT)) { + return new NoopReplicationTask(event); + } else if (eventType.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) { + return new NoopReplicationTask(event); + } else if (eventType.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) { + return new NoopReplicationTask(event); + } else if (eventType.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) { + return new NoopReplicationTask(event); + } else if (eventType.equals(HCatConstants.HCAT_ALTER_TABLE_EVENT)) { + return new NoopReplicationTask(event); + } else if (eventType.equals(HCatConstants.HCAT_ALTER_PARTITION_EVENT)) { + return new NoopReplicationTask(event); + } else if (eventType.equals(HCatConstants.HCAT_INSERT_EVENT)) { + return new NoopReplicationTask(event); + } else { + throw new IllegalStateException("Unrecognized Event type, no replication task available"); + } + } + } + + private static Factory factoryInstance = null; + private static Factory getFactoryInstance() { + if (factoryInstance == null){ + // TODO: Eventually, we'll have a bit here that looks at a config param to instantiate + // the appropriate factory, with EXIMFactory being the default - that allows + // others to implement their own ReplicationTask.Factory for other replication + // implementations. + // That addition will be brought in by the EXIMFactory patch. 
+ factoryInstance = new NoopFactory(); + } + return factoryInstance; + } + + /** + * Factory method to return appropriate subtype of ReplicationTask for given event + * @param event HCatEventMessage returned by the notification subsystem + * @return corresponding ReplicationTask + */ + public static ReplicationTask create(HCatNotificationEvent event){ + if (event == null){ + throw new IllegalArgumentException("event should not be null"); + } + return getFactoryInstance().create(event); + } + + // Primary entry point is a factory method instead of ctor + // to allow for future ctor mutabulity in design + protected ReplicationTask(HCatNotificationEvent event) { + this.event = event; + } + + /** + * Returns the event that this ReplicationTask is attempting to replicate + * @return underlying event + */ + public HCatNotificationEvent getEvent(){ + return this.event; + } + + /** + * Returns true if the replication task in question needs to create staging + * directories to complete its operation. This will mean that you will need + * to copy these directories over to the destination warehouse for each + * source-destination warehouse pair. + * If this is true, you will need to call .withSrcStagingDirProvider(...) + * and .withDstStagingDirProvider(...) before this ReplicationTask is usable + */ + public abstract boolean needsStagingDirs(); + + /** + * Returns true if this ReplicationTask is prepared with all info it needs, and is + * ready to be used + */ + public boolean isActionable(){ + if (! 
this.needsStagingDirs()) { + return true; + } + if ((srcStagingDirProvider != null) && (dstStagingDirProvider != null)){ + return true; + } + return false; + } + + /** + * See {@link org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider} + * @param srcStagingDirProvider Staging Directory Provider for the source warehouse + * @return this + */ + public ReplicationTask withSrcStagingDirProvider(StagingDirectoryProvider srcStagingDirProvider){ + this.srcStagingDirProvider = srcStagingDirProvider; + return this; + } + + /** + * See {@link org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider} + * @param dstStagingDirProvider Staging Directory Provider for the destination warehouse + * @return this replication task + */ + public ReplicationTask withDstStagingDirProvider(StagingDirectoryProvider dstStagingDirProvider){ + this.dstStagingDirProvider = dstStagingDirProvider; + return this; + } + + /** + * Allows a user to specify a table name mapping, where the function provided maps the name of + * the table in the source warehouse to the name of the table in the dest warehouse. It is expected + * that if the mapping does not exist, and it throws an IllegalArgumentException, we will use the + * same name in the destination as in the source. + * + * If you want to use a Map mapping instead of a Function, + * simply call this function as .withTableNameMapping(com.google.common.base.Functions.forMap(tableMap)) + * @param tableNameMapping + * @return this replication task + */ + public ReplicationTask withTableNameMapping(Function tableNameMapping){ + this.tableNameMapping = tableNameMapping; + return this; + } + + /** + * Allows a user to specify a db name mapping, where the function provided maps the name of + * the db in the source warehouse to the name of the db in the dest warehouse. It is expected + * that if the mapping does not exist, and it throws an IllegalArgumentException, we will use the + * same name in the destination as in the source. 
+ * + * If you want to use a Map mapping instead of a Function, + * simply call this function as .withDbNameMapping(com.google.common.base.Functions.forMap(dbMap)) + * @param dbNameMapping + * @return this replication task + */ + public ReplicationTask withDbNameMapping(Function dbNameMapping){ + this.dbNameMapping = dbNameMapping; + return this; + } + + protected void verifyActionable() { + if (!this.isActionable()){ + throw new IllegalStateException("actionable command on task called when ReplicationTask is still not actionable."); + } + } + + /** + * Returns an Iterable to send to a hive driver on the source warehouse + * + * If you *need* a List instead, you can use guava's + * ImmutableList.copyOf(iterable) or Lists.newArrayList(iterable) to + * get the underlying list, but this defeats the purpose of making this + * interface an Iterable rather than a List, since it is very likely + * that the number of Commands returned here will cause your process + * to run OOM. + */ + abstract public Iterable getSrcWhCommands(); + + /** + * Returns an Iterable to send to a hive driver on the destination warehouse + * + * If you *need* a List instead, you can use guava's + * ImmutableList.copyOf(iterable) or Lists.newArrayList(iterable) to + * get the underlying list, but this defeats the purpose of making this + * interface an Iterable rather than a List, since it is very likely + * that the number of Commands returned here will cause your process + * to run OOM. 
+ */ + abstract public Iterable getDstWhCommands(); + + protected void validateEventType(HCatNotificationEvent event, String allowedEventType) { + if (event == null || !allowedEventType.equals(event.getEventType())){ + throw new IllegalStateException(this.getClass().getName() + " valid only for " + + allowedEventType + " events."); + } + } +} + diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java new file mode 100644 index 0000000..15b125d --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.api.repl; + +import com.google.common.base.Function; +import com.google.common.base.Objects; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.IOExceptionWithCause; +import org.apache.hive.hcatalog.api.HCatDatabase; +import org.apache.hive.hcatalog.api.HCatPartition; +import org.apache.hive.hcatalog.api.HCatTable; +import org.apache.hive.hcatalog.data.ReaderWriter; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; + +public class ReplicationUtils { + + private final static String REPL_STATE_ID = "repl.last.id"; // TODO : define in ReplicationSpec, and point this to that once that's patched in. + + private ReplicationUtils(){ + // dummy private constructor, since this class is a collection of static utility methods. + } + + /** + * Gets the last known replication state of this db. This is + * applicable only if it is the destination of a replication + * and has had data replicated into it via imports previously. + * Defaults to 0. + */ + public static long getLastReplicationId(HCatDatabase db){ + Map props = db.getProperties(); + if (props != null){ + if (props.containsKey(REPL_STATE_ID)){ + return Long.parseLong(props.get(REPL_STATE_ID)); + } + } + return 0l; // default is to return earliest possible state. + } + + + /** + * Gets the last known replication state of the provided table. This + * is applicable only if it is the destination of a replication + * and has had data replicated into it via imports previously. + * Defaults to 0. 
+ */ + public static long getLastReplicationId(HCatTable tbl) { + Map tblProps = tbl.getTblProps(); + if (tblProps != null){ + if (tblProps.containsKey(REPL_STATE_ID)){ + return Long.parseLong(tblProps.get(REPL_STATE_ID)); + } + } + return 0l; // default is to return earliest possible state. + } + + /** + * Gets the last known replication state of the provided partition. + * This is applicable only if it is the destination of a replication + * and has had data replicated into it via imports previously. + * If that is not available, but parent table is provided, + * defaults to parent table's replication state. If that is also + * unknown, defaults to 0. + */ + public static long getLastReplicationId(HCatPartition ptn, @Nullable HCatTable parentTable) { + Map parameters = ptn.getParameters(); + if (parameters != null){ + if (parameters.containsKey(REPL_STATE_ID)){ + return Long.parseLong(parameters.get(REPL_STATE_ID)); + } + } + + if (parentTable != null){ + return getLastReplicationId(parentTable); + } + return 0l; // default is to return earliest possible state. + } + + /** + * Used to generate a unique key for a combination of given event id, dbname, + * tablename and partition keyvalues. This is used to feed in a name for creating + * staging directories for exports and imports. This should be idempotent given + * the same values, i.e. hashcode-like, but at the same time, be guaranteed to be + * different for every possible partition, while being "readable-ish". 
Basically, + * we concat the alphanumberic versions of all of the above, along with a hashcode + * of the db, tablename and ptn key-value pairs + */ + public static String getUniqueKey(long eventId, String db, String table, Map ptnDesc) { + StringBuilder sb = new StringBuilder(); + sb.append(eventId); + sb.append('.'); + sb.append(toStringWordCharsOnly(db)); + sb.append('.'); + sb.append(toStringWordCharsOnly(table)); + sb.append('.'); + sb.append(toStringWordCharsOnly(ptnDesc)); + sb.append('.'); + sb.append(Objects.hashCode(db, table, ptnDesc)); + return sb.toString(); + } + + /** + * Return alphanumeric(and '_') representation of a Map + * + */ + private static String toStringWordCharsOnly(Map map) { + if (map == null){ + return "null"; + } + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (Map.Entry e : map.entrySet()){ + if (!first){ + sb.append(','); + } + sb.append(toStringWordCharsOnly(e.getKey())); + sb.append('='); + sb.append(toStringWordCharsOnly(e.getValue())); + first = false; + } + return sb.toString(); + } + + /** + * Return alphanumeric(and '_') chars only of a string, lowercased + */ + public static String toStringWordCharsOnly(String s){ + return (s == null) ? "null" : s.replaceAll("[\\W]", "").toLowerCase(); + } + + /** + * Return a mapping from a given map function if available, and the key itself if not. 
+ */ + public static String mapIfMapAvailable(String s, Function mapping){ + try { + return mapping.apply(s); + } catch (IllegalArgumentException iae){ + // The key wasn't present in the mapping, return the key itself, since no mapping was available + return s; + } + } + + public static String partitionDescriptor(Map ptnDesc) { + StringBuilder sb = new StringBuilder(); + if ((ptnDesc != null) && (!ptnDesc.isEmpty())){ + boolean first = true; + sb.append(" PARTITION ("); + for (Map.Entry e : ptnDesc.entrySet()){ + if (!first){ + sb.append(", "); + } else { + first = false; + } + sb.append(e.getKey()); // TODO : verify if any quoting is needed for keys + sb.append('='); + sb.append('"'); + sb.append(e.getValue()); // TODO : verify if any escaping is needed for values + sb.append('"'); + } + sb.append(')'); + } + return sb.toString(); + } + + /** + * Command implements Writable, but that's not terribly easy to use compared + * to String, even if it plugs in easily into the rest of Hadoop. Provide + * utility methods to easily serialize and deserialize Commands + * + * serializeCommand returns a base64 String representation of given command + */ + public static String serializeCommand(Command command) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(baos); + ReaderWriter.writeDatum(dataOutput,command.getClass().getName()); + command.write(dataOutput); + return Base64.encodeBase64URLSafeString(baos.toByteArray()); + } + + /** + * Command implements Writable, but that's not terribly easy to use compared + * to String, even if it plugs in easily into the rest of Hadoop. Provide + * utility methods to easily serialize and deserialize Commands + * + * deserializeCommand instantiates a concrete Command and initializes it, + * given a base64 String representation of it. 
+ */ + public static Command deserializeCommand(String s) throws IOException { + DataInput dataInput = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(s))); + String clazz = (String) ReaderWriter.readDatum(dataInput); + Command cmd; + try { + cmd = (Command)Class.forName(clazz).newInstance(); + } catch (Exception e) { + throw new IOExceptionWithCause("Error instantiating class "+clazz,e); + } + cmd.readFields(dataInput); + return cmd; + } + +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/StagingDirectoryProvider.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/StagingDirectoryProvider.java new file mode 100644 index 0000000..20ec1e3 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/StagingDirectoryProvider.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.api.repl; + +/** + * Interface for a client to provide a Staging Directory specification + */ +public interface StagingDirectoryProvider { + + /** + * Return a temporary staging directory for a given key + * @param key key for the directory, usually a name of a partition + * Note that when overriding this method, no guarantees are made about the + * contents of the key, other than that it is unique per partition. + * @return A path specification to use as a temporary staging directory + */ + String getStagingDirectory(String key); + + /** + * Trivial implementation of this interface - creates a directory per key under a given base directory + */ + public class TrivialImpl implements StagingDirectoryProvider { + + String prefix = null; + + /** + * Trivial implementation of StagingDirectoryProvider which takes a temporary directory + * and creates directories inside that for each key. Note that this is intended as a + * trivial implementation, and if any further "advanced" behaviour is desired, + * it is better that the user roll their own. + * + * @param base temp directory inside which other tmp dirs are created + * @param separator path separator. Usually should be "/" + */ + public TrivialImpl(String base,String separator){ + this.prefix = base + separator; + } + + @Override + public String getStagingDirectory(String key) { + return prefix + key; + } + } +} diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/NoopCommand.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/NoopCommand.java new file mode 100644 index 0000000..348b391 --- /dev/null +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/commands/NoopCommand.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hive.hcatalog.api.repl.commands; + + +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.data.ReaderWriter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * This class is there to help testing, and to help initial development + * and will be the default Command for NoopReplicationTask + * + * This is not intended to be a permanent class, and will likely move to the test + * package after initial implementation. 
+ */ + +public class NoopCommand implements Command { + private long eventId; + + public NoopCommand(){ + // trivial ctor to support Writable reflections instantiation + // do not expect to use this object as-is, unless you call + // readFields after using this ctor + } + + public NoopCommand(long eventId){ + this.eventId = eventId; + } + + @Override + public List get() { + return new ArrayList(); + } + + @Override + public boolean isRetriable() { + return true; + } + + @Override + public boolean isUndoable() { + return true; + } + + @Override + public List getUndo() { + return new ArrayList(); + } + + @Override + public List cleanupLocationsPerRetry() { + return new ArrayList(); + } + + @Override + public List cleanupLocationsAfterEvent() { + return new ArrayList(); + } + + @Override + public long getEventId() { + return eventId; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + ReaderWriter.writeDatum(dataOutput, Long.valueOf(eventId)); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + eventId = ((Long)ReaderWriter.readDatum(dataInput)).longValue(); + } +} + diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java index 321a86d..845c4cc 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java @@ -18,18 +18,24 @@ */ package org.apache.hive.hcatalog.api; +import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.ql.WindowsPathUtil; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; @@ -42,12 +48,17 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider; import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema.Type; import org.apache.hive.hcatalog.NoExitSecurityManager; +import org.apache.hive.hcatalog.listener.DbNotificationListener; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -63,6 +74,8 @@ import org.apache.hadoop.util.Shell; +import javax.annotation.Nullable; + public class TestHCatClient { private static final Logger LOG = LoggerFactory.getLogger(TestHCatClient.class); private static final String msPort = "20101"; @@ -113,6 +126,8 @@ public static void startMetaStoreServer() throws Exception { WindowsPathUtil.convertPathsFromWindowsToHdfs(hcatConf); } + System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname, + DbNotificationListener.class.getName()); // turn on db notification listener on metastore Thread t = new Thread(new RunMS(msPort)); t.start(); 
Thread.sleep(10000); @@ -793,6 +808,113 @@ private void startReplicationTargetMetaStoreIfRequired() throws Exception { } /** + * Test for event-based replication scenario + * + * Does not test if replication actually happened, merely tests if we're able to consume a repl task + * iter appropriately, calling all the functions expected of the interface, without errors. + */ + @Test + public void testReplicationTaskIter() throws Exception { + + HCatClient sourceMetastore = HCatClient.create(new Configuration(hcatConf)); + + List notifs = sourceMetastore.getNextNotification( + 0, 0, new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent event) { + return true; + } + }); + for(HCatNotificationEvent n : notifs){ + LOG.info("notif from dblistener:" + n.getEventId() + + ":" + n.getEventTime() + ",t:" + n.getEventType() + ",o:" + n.getDbName() + "." + n.getTableName()); + } + + Iterator taskIter = sourceMetastore.getReplicationTasks(0, 0, "mydb", null); + while(taskIter.hasNext()){ + ReplicationTask task = taskIter.next(); + HCatNotificationEvent n = task.getEvent(); + LOG.info("notif from tasks:" + n.getEventId() + + ":" + n.getEventTime() + ",t:" + n.getEventType() + ",o:" + n.getDbName() + "." 
+ n.getTableName() + + ",s:" + n.getEventScope()); + LOG.info("task :" + task.getClass().getName()); + if (task.needsStagingDirs()){ + StagingDirectoryProvider provider = new StagingDirectoryProvider() { + @Override + public String getStagingDirectory(String key) { + LOG.info("getStagingDirectory(" + key + ") called!"); + return "/tmp/" + key.replaceAll(" ","_"); + } + }; + task + .withSrcStagingDirProvider(provider) + .withDstStagingDirProvider(provider); + } + if (task.isActionable()){ + LOG.info("task was actionable!"); + Function commandDebugPrinter = new Function() { + @Override + public String apply(@Nullable Command cmd) { + StringBuilder sb = new StringBuilder(); + String serializedCmd = null; + try { + serializedCmd = ReplicationUtils.serializeCommand(cmd); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + sb.append("SERIALIZED:"+serializedCmd+"\n"); + Command command = null; + try { + command = ReplicationUtils.deserializeCommand(serializedCmd); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + sb.append("CMD:[" + command.getClass().getName() + "]\n"); + sb.append("EVENTID:[" +command.getEventId()+"]\n"); + for (String s : command.get()) { + sb.append("CMD:" + s); + sb.append("\n"); + } + sb.append("Retriable:" + command.isRetriable() + "\n"); + sb.append("Undoable:" + command.isUndoable() + "\n"); + if (command.isUndoable()) { + for (String s : command.getUndo()) { + sb.append("UNDO:" + s); + sb.append("\n"); + } + } + List locns = command.cleanupLocationsPerRetry(); + sb.append("cleanupLocationsPerRetry entries :" + locns.size()); + for (String s : locns){ + sb.append("RETRY_CLEANUP:"+s); + sb.append("\n"); + } + locns = command.cleanupLocationsAfterEvent(); + sb.append("cleanupLocationsAfterEvent entries :" + locns.size()); + for (String s : locns){ + sb.append("AFTER_EVENT_CLEANUP:"+s); + sb.append("\n"); + } + return sb.toString(); + } + }; + LOG.info("On src:"); + for 
(String s : Iterables.transform(task.getSrcWhCommands(), commandDebugPrinter)){ + System.err.print(s); + } + LOG.info("On dest:"); + for (String s : Iterables.transform(task.getDstWhCommands(), commandDebugPrinter)){ + System.err.print(s); + } + } else { + LOG.info("task was not actionable."); + } + } + } + + /** * Test for detecting schema-changes for an HCatalog table, across 2 different HCat instances. * A table is created with the same schema on 2 HCat instances. The table-schema is modified on the source HCat * instance (columns, I/O formats, SerDe definitions, etc.). The table metadata is compared between source