diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 11af116..a6eede5 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -26,7 +26,6 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,6 +34,7 @@
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -145,7 +145,8 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
     List<FileWithChksum> fileWithChksumList = new ArrayList<FileWithChksum>();
     for (FileStatus file : fs.listStatus(loc, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
       if (file.isFile()) {
-        fileWithChksumList.add(buildFileWithChksum(file.getPath(), fs));
+        fileWithChksumList.add(new FileWithChksum(file.getPath().toString(),
+            ReplChangeManager.getChksumString(file.getPath(), fs)));
       }
     }
@@ -208,7 +209,8 @@ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaExceptio
     List<FileWithChksum> fileWithChksumList = new ArrayList<FileWithChksum>();
     for (FileStatus file : fs.listStatus(loc, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
       if (file.isFile()) {
-        fileWithChksumList.add(buildFileWithChksum(file.getPath(), fs));
+        fileWithChksumList.add(new FileWithChksum(file.getPath().toString(),
+            ReplChangeManager.getChksumString(file.getPath(), fs)));
       }
     }
     fileWithChksumMap.put(Warehouse.makePartName(t.getPartitionKeys(), p.getValues()),
@@ -363,7 +365,7 @@ public void onInsert(InsertEvent insertEvent) throws MetaException {
     if (insertEvent.getFiles() != null) {
       for (int i=0;i{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
+
+    // Drop table after dump
+    run("DROP TABLE " + dbName + ".unptned_copy");
+    // Drop partition after dump
+    run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')");
+
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+
+    Exception e = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+      assertNull(tbl);
+    } catch (TException te) {
+      e = te;
+    }
+    assertNotNull(e);
+    assertEquals(NoSuchObjectException.class, e.getClass());
+
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + "_dupe.ptned");
+    verifyResults(ptn_data_1);
+
+    Exception e2 = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName + "_dupe", "ptned2");
+      assertNull(tbl);
+    } catch (TException te) {
+      e2 = te;
+    }
+    assertNotNull(e2);
+    assertEquals(NoSuchObjectException.class, e2.getClass());
+
+    run("SELECT a from " + dbName + "_dupe.unptned_copy");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + "_dupe.ptned_copy");
+    verifyResults(ptn_data_1);
+  }
 
   @Test
   public void testAlters() throws IOException {
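
The listener changes above drop the removed buildFileWithChksum() helper and pair each data file directly with its checksum at event-capture time. A minimal consolidated sketch of that pattern, assuming fs and loc are the table location's FileSystem and Path as in onCreateTable (variable names are illustrative, not part of the patch):

    // Collect one FileWithChksum entry per data file under the table/partition location.
    List<FileWithChksum> fileWithChksumList = new ArrayList<FileWithChksum>();
    for (FileStatus file : fs.listStatus(loc, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
      if (file.isFile()) {
        // getChksumString() may return null on filesystems without checksum support;
        // the entry then carries only the path and no "#checksum" fragment is emitted later.
        String chksum = ReplChangeManager.getChksumString(file.getPath(), fs);
        fileWithChksumList.add(new FileWithChksum(file.getPath().toString(), chksum));
      }
    }
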
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 99cba9d..525d826 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.messaging.FileWithChksum;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -55,6 +56,7 @@
   public static final String ORIG_LOC_TAG = "user.original-loc";
   public static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
+  public static final String URI_FRAGMENT_SEPARATOR = "#";
 
   public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaException {
     if (instance == null) {
@@ -121,7 +123,7 @@ public int recycle(Path path, boolean ifPurge) throws MetaException {
          count += recycle(file.getPath(), ifPurge);
        }
      } else {
-        Path cmPath = getCMPath(path, hiveConf, getCksumString(path, hiveConf));
+        Path cmPath = getCMPath(path, hiveConf, getChksumString(path, fs));
 
        if (LOG.isDebugEnabled()) {
          LOG.debug("Moving " + path.toString() + " to " + cmPath.toString());
@@ -151,7 +153,11 @@ public int recycle(Path path, boolean ifPurge) throws MetaException {
          // Note we currently only track the last known trace as
          // xattr has limited capacity. We shall revisit and store all original
          // locations if orig-loc becomes important
-          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+          try {
+            fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+          } catch (UnsupportedOperationException e) {
+            LOG.warn("Error setting xattr for " + path.toString());
+          }
 
          count++;
        }
@@ -159,7 +165,11 @@ public int recycle(Path path, boolean ifPurge) throws MetaException {
        // If multiple files share the same content, then
        // any file claim remain in trash would be granted
        if (!ifPurge) {
-          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
+          try {
+            fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
+          } catch (UnsupportedOperationException e) {
+            LOG.warn("Error setting xattr for " + cmPath.toString());
+          }
        }
      }
      return count;
@@ -169,16 +179,22 @@ public int recycle(Path path, boolean ifPurge) throws MetaException {
   }
 
   // Get checksum of a file
-  static public String getCksumString(Path path, Configuration conf) throws IOException {
+  static public String getChksumString(Path path, FileSystem fs) throws IOException {
     // TODO: fs checksum only available on hdfs, need to
     // find a solution for other fs (eg, local fs, s3, etc)
-    FileSystem fs = path.getFileSystem(conf);
+    String checksumString = null;
     FileChecksum checksum = fs.getFileChecksum(path);
-    String checksumString = StringUtils.byteToHexString(
-        checksum.getBytes(), 0, checksum.getLength());
+    if (checksum != null) {
+      checksumString = StringUtils.byteToHexString(
+          checksum.getBytes(), 0, checksum.getLength());
+    }
     return checksumString;
   }
 
+  static public void setCmRoot(Path cmRoot) {
+    ReplChangeManager.cmroot = cmRoot;
+  }
+
   /***
    * Convert a path of file inside a partition or table (if non-partitioned)
   * to a deterministic location of cmroot. So user can retrieve the file back
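
In the recycle() path above, the change-management location for a dropped file is now derived with a checksum taken from the file's own FileSystem, so the same content always maps to the same entry under cmroot and can be fetched back later. A hedged single-file sketch of that relationship (dataFile, fs and hiveConf are illustrative; the actual move and xattr tagging happen inside recycle()):

    // Checksum lookup is best-effort: getFileChecksum() can return null (e.g. on many
    // non-HDFS filesystems), and getChksumString() now passes that null through.
    String chksum = ReplChangeManager.getChksumString(dataFile, fs);
    // Deterministic location under cmroot for this file, derived from its path and checksum.
    Path cmPath = ReplChangeManager.getCMPath(dataFile, hiveConf, chksum);
    // recycle(dataFile, ifPurge) effectively moves dataFile to cmPath and, where the
    // filesystem supports xattrs, tags the copy with its original location.
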
@@ -205,6 +221,69 @@ static public Path getCMPath(Path path, Configuration conf, String chksum)
     return cmPath;
   }
 
+  /***
+   * Get original file specified by src and chksumString. If the file exists and checksum
+   * matches, return the file; otherwise, use chksumString to retrieve it from cmroot
+   * @param src Original file location
+   * @param chksumString Checksum of the original file
+   * @param conf
+   * @return Corresponding FileStatus object
+   * @throws MetaException
+   */
+  static public FileStatus getFileStatus(Path src, String chksumString,
+      HiveConf conf) throws MetaException {
+    try {
+      FileSystem srcFs = src.getFileSystem(conf);
+      if (chksumString == null) {
+        return srcFs.getFileStatus(src);
+      }
+
+      if (!srcFs.exists(src)) {
+        return srcFs.getFileStatus(getCMPath(src, conf, chksumString));
+      }
+
+      String currentChksumString = getChksumString(src, srcFs);
+      if (currentChksumString == null || chksumString.equals(currentChksumString)) {
+        return srcFs.getFileStatus(src);
+      } else {
+        return srcFs.getFileStatus(getCMPath(src, conf, chksumString));
+      }
+    } catch (IOException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
+    }
+  }
+
+  /***
+   * Concatenate filename and checksum with "#"
+   * @param fileUriStr Filename string
+   * @param fileChecksum Checksum string
+   * @return Concatenated Uri string
+   */
+  // TODO: this needs to be enhanced once change management based filesystem is implemented
+  // Currently using fileuri#checksum as the format
+  static public String encodeFileUri(String fileUriStr, String fileChecksum) {
+    if (fileChecksum != null) {
+      return fileUriStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
+    } else {
+      return fileUriStr;
+    }
+  }
+
+  /***
+   * Split uri with fragment into file uri and checksum
+   * @param fileURIStr uri with fragment
+   * @return FileWithChksum with original uri and checksum
+   */
+  static public FileWithChksum getFileWithChksumFromURI(String fileURIStr) {
+    String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR);
+    FileWithChksum result = new FileWithChksum();
+    result.setFile(uriAndFragment[0]);
+    if (uriAndFragment.length > 1) {
+      result.setChksum(uriAndFragment[1]);
+    }
+    return result;
+  }
+
   /**
    * Thread to clear old files of cmroot recursively
   */
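
These two helpers define the on-disk format of a _files entry: when a checksum is available it is appended to the file URI after a "#", otherwise the bare URI is written. A small round-trip sketch (the URI and checksum values are made up for illustration):

    String encoded = ReplChangeManager.encodeFileUri(
        "hdfs://ns1/warehouse/repldb.db/t1/000000_0", "2af7d3...");
    // encoded -> "hdfs://ns1/warehouse/repldb.db/t1/000000_0#2af7d3..."
    FileWithChksum decoded = ReplChangeManager.getFileWithChksumFromURI(encoded);
    // decoded.getFile()   -> "hdfs://ns1/warehouse/repldb.db/t1/000000_0"
    // decoded.getChksum() -> "2af7d3..."
    // With a null checksum, encodeFileUri() returns the bare URI and getChksum() stays null.
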
@@ -231,24 +310,28 @@ public void run() {
         for (FileStatus file : files) {
           long modifiedTime = file.getModificationTime();
           if (now - modifiedTime > secRetain*1000) {
-            if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
-              boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), hiveConf);
-              if (succ) {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Move " + file.toString() + " to trash");
+            try {
+              if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
+                boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), hiveConf);
+                if (succ) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Move " + file.toString() + " to trash");
+                  }
+                } else {
+                  LOG.warn("Fail to move " + file.toString() + " to trash");
                 }
               } else {
-                LOG.warn("Fail to move " + file.toString() + " to trash");
-              }
-            } else {
-              boolean succ = fs.delete(file.getPath(), false);
-              if (succ) {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Remove " + file.toString());
+                boolean succ = fs.delete(file.getPath(), false);
+                if (succ) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Remove " + file.toString());
+                  }
+                } else {
+                  LOG.warn("Fail to remove " + file.toString());
                 }
-              } else {
-                LOG.warn("Fail to remove " + file.toString());
               }
+            } catch (UnsupportedOperationException e) {
+              LOG.warn("Error getting xattr for " + file.getPath().toString());
             }
           }
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index e6b943b..b489c77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.messaging.FileWithChksum;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
@@ -126,15 +129,16 @@ protected int execute(DriverContext driverContext) {
         for (FileStatus oneSrc : srcFiles) {
           console.printInfo("Copying file: " + oneSrc.getPath().toString());
           LOG.debug("Copying file: " + oneSrc.getPath().toString());
+
+          FileSystem actualSrcFs = null;
+          if (rwork.getReadListFromInput()){
+            // TODO : filesystemcache prevents this from being a perf nightmare, but we
+            // should still probably follow up to see if we need to do something better here.
+            actualSrcFs = oneSrc.getPath().getFileSystem(conf);
+          } else {
+            actualSrcFs = srcFs;
+          }
           if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
-            FileSystem actualSrcFs = null;
-            if (rwork.getReadListFromInput()){
-              // TODO : filesystemcache prevents this from being a perf nightmare, but we
-              // should still probably follow up to see if we need to do something better here.
-              actualSrcFs = oneSrc.getPath().getFileSystem(conf);
-            } else {
-              actualSrcFs = srcFs;
-            }
             LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
 
             if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
@@ -148,7 +152,9 @@ protected int execute(DriverContext driverContext) {
           }else{
             LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri());
             console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
-            listBW.write(oneSrc.getPath().toUri().toString() + "\n");
+            String chksumString = ReplChangeManager.getChksumString(oneSrc.getPath(), actualSrcFs);
+            listBW.write(ReplChangeManager.encodeFileUri(
+                oneSrc.getPath().toUri().toString(), chksumString) + "\n");
           }
         }
@@ -183,12 +189,16 @@ protected int execute(DriverContext driverContext) {
       String line = null;
       while ( (line = br.readLine()) != null){
         LOG.debug("ReplCopyTask :_filesReadLine:" + line);
-        String fileUriStr = EximUtil.getCMDecodedFileName(line);
-        // TODO HIVE-15490: Add checksum validation here
-        Path p = new Path(fileUriStr);
-        // TODO: again, fs cache should make this okay, but if not, revisit
-        FileSystem srcFs = p.getFileSystem(conf);
-        ret.add(srcFs.getFileStatus(p));
+
+        FileWithChksum fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line);
+        try {
+          FileStatus f = ReplChangeManager.getFileStatus(new Path(fileWithChksum.getFile()),
+              fileWithChksum.getChksum(), conf);
+          ret.add(f);
+        } catch (MetaException e) {
+          // skip and issue warning for missing file
+          LOG.warn("Cannot find " + fileWithChksum.getFile() + " in source repo or cmroot");
+        }
         // Note - we need srcFs rather than fs, because it is possible that the _files lists files
         // which are from a different filesystem than the fs where the _files file itself was loaded
        // from. Currently, it is possible, for eg., to do REPL LOAD hdfs:///dir/ and for the _files
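
On the load side, the _files reader above no longer trusts the recorded URI blindly: each line is decoded and handed to ReplChangeManager.getFileStatus(), which returns the original file when its checksum still matches and otherwise falls back to the copy kept under cmroot; a file found in neither place is skipped with a warning. A condensed sketch of that resolution, with line, conf and the consumer copy() as illustrative stand-ins:

    FileWithChksum entry = ReplChangeManager.getFileWithChksumFromURI(line);
    try {
      // Resolution order inside getFileStatus():
      //  1. no checksum recorded     -> use the source path as-is
      //  2. source path missing      -> use the cmroot copy
      //  3. checksum no longer equal -> file was rewritten, use the cmroot copy
      FileStatus resolved = ReplChangeManager.getFileStatus(
          new Path(entry.getFile()), entry.getChksum(), conf);
      copy(resolved);   // hypothetical consumer of the resolved FileStatus
    } catch (MetaException e) {
      LOG.warn("Cannot find " + entry.getFile() + " in source repo or cmroot");
    }
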
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 34e53d2..796ccc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -31,7 +31,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -78,7 +77,6 @@
   public static final String METADATA_NAME = "_metadata";
   public static final String FILES_NAME = "_files";
   public static final String DATA_PATH_NAME = "data";
-  public static final String URI_FRAGMENT_SEPARATOR = "#";
 
   private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
@@ -574,19 +572,4 @@ public boolean accept(Path p) {
     };
   }
 
-  public static String getCMEncodedFileName(String fileURIStr, String fileChecksum) {
-    // The checksum is set as the fragment portion of the file uri
-    return fileURIStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
-  }
-
-  public static String getCMDecodedFileName(String encodedFileURIStr) {
-    String[] uriAndFragment = encodedFileURIStr.split(URI_FRAGMENT_SEPARATOR);
-    return uriAndFragment[0];
-  }
-
-  public static FileChecksum getCMDecodedChecksum(String encodedFileURIStr) {
-    // TODO: Implement this as part of HIVE-15490
-    return null;
-  }
-
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 46a9006..bb28c4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -43,7 +44,6 @@
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -140,22 +140,24 @@ public String toString(){
     private final Path dumpRoot;
     private final Path dumpFile;
+    private Path cmRoot;
 
     public DumpMetaData(Path dumpRoot) {
       this.dumpRoot = dumpRoot;
       dumpFile = new Path(dumpRoot, DUMPMETADATA);
     }
 
-    public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo){
+    public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){
       this(dumpRoot);
-      setDump(lvl, eventFrom, eventTo);
+      setDump(lvl, eventFrom, eventTo, cmRoot);
     }
 
-    public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo){
+    public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){
       this.dumpType = lvl;
       this.eventFrom = eventFrom;
       this.eventTo = eventTo;
       this.initialized = true;
+      this.cmRoot = cmRoot;
     }
 
    public void loadDumpFromFile() throws SemanticException {
@@ -165,9 +167,11 @@ public void loadDumpFromFile() throws SemanticException {
         BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
         String line = null;
         if ( (line = br.readLine()) != null){
-          String[] lineContents = line.split("\t", 4);
-          setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]));
-          setPayload(lineContents[3].equals(Utilities.nullStringOutput) ? null : lineContents[3]);
+          String[] lineContents = line.split("\t", 5);
+          setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]),
+              new Path(lineContents[3]));
+          setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]);
+          ReplChangeManager.setCmRoot(cmRoot);
         } else {
           throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString());
         }
@@ -200,6 +204,14 @@ public Long getEventTo() throws SemanticException {
       return eventTo;
     }
 
+    public Path getCmRoot() {
+      return cmRoot;
+    }
+
+    public void setCmRoot(Path cmRoot) {
+      this.cmRoot = cmRoot;
+    }
+
     public Path getDumpFilePath() {
       return dumpFile;
     }
@@ -216,7 +228,8 @@ private void initializeIfNot() throws SemanticException {
     }
 
     public void write() throws SemanticException {
-      writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(), payload), dumpFile);
+      writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(),
+          cmRoot.toString(), payload), dumpFile);
     }
  }
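
With the new cmRoot field, the _dumpmetadata line produced by write() and parsed by loadDumpFromFile() carries five tab-separated fields instead of four: dumpType, eventFrom, eventTo, cmRoot, payload. A hedged round-trip sketch (the helper name and all values are illustrative only):

    // Written line is roughly: BOOTSTRAP \t 110 \t 245 \t hdfs://ns1/cmroot \t <payload>
    private void dumpMetaDataRoundTrip(Path dumpRoot) throws SemanticException {
      DumpMetaData dmd = new DumpMetaData(dumpRoot, DUMPTYPE.BOOTSTRAP, 110L, 245L,
          new Path("hdfs://ns1/cmroot"));
      dmd.write();
      DumpMetaData reread = new DumpMetaData(dumpRoot);
      reread.loadDumpFromFile();   // split("\t", 5), then ReplChangeManager.setCmRoot(cmRoot)
      assert "hdfs://ns1/cmroot".equals(reread.getCmRoot().toString());
    }
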
@@ -300,6 +313,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
     String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
     Path dumpRoot = new Path(replRoot, getNextDumpDir());
     DumpMetaData dmd = new DumpMetaData(dumpRoot);
+    Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
     Long lastReplId;
     try {
       if (eventFrom == null){
@@ -337,7 +351,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
          // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
        }
        LOG.info("Consolidation done, preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
-        dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId);
+        dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
        dmd.write();
 
        // Set the correct last repl id to return to the user
@@ -372,12 +386,12 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
        while (evIter.hasNext()){
          NotificationEvent ev = evIter.next();
          Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
-          dumpEvent(ev, evRoot);
+          dumpEvent(ev, evRoot, cmRoot);
        }
 
        LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo);
        writeOutput(Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)), dmd.getDumpFilePath());
-        dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo);
+        dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo, cmRoot);
        dmd.write();
        // Set the correct last repl id to return to the user
        lastReplId = eventTo;
@@ -391,7 +405,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
     }
   }
 
-  private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception {
+  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
     long evid = ev.getEventId();
     String evidStr = String.valueOf(evid);
     ReplicationSpec replicationSpec = getNewEventOnlyReplicationSpec(evidStr);
@@ -427,14 +441,14 @@ private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception {
           new OutputStreamWriter(fs.create(filesPath)));
       try {
         for (FileWithChksum file : files) {
-          fileListWriter.write(encodeFileUri(file.getFile(), file.getChksum()) + "\n");
+          fileListWriter.write(ReplChangeManager.encodeFileUri(file.getFile(), file.getChksum()) + "\n");
         }
       } finally {
         fileListWriter.close();
       }
     }
 
-      (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid)).write();
+      (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid, cmRoot)).write();
      break;
    }
    case MessageFactory.ADD_PARTITION_EVENT : {
@@ -493,7 +507,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
             new OutputStreamWriter(fs.create(filesPath)));
         try {
           for (FileWithChksum file : files) {
-            fileListWriter.write(encodeFileUri(file.getFile(), file.getChksum()) + "\n");
+            fileListWriter.write(ReplChangeManager.encodeFileUri(file.getFile(), file.getChksum()) + "\n");
           }
         } finally {
           fileListWriter.close();
@@ -501,19 +515,19 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
         }
       }
 
-      (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid)).write();
+      (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid, cmRoot)).write();
      break;
    }
    case MessageFactory.DROP_TABLE_EVENT : {
      LOG.info("Processing#{} DROP_TABLE message : {}", ev.getEventId(), ev.getMessage());
-      DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid);
+      DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid, cmRoot);
      dmd.setPayload(ev.getMessage());
      dmd.write();
      break;
    }
    case MessageFactory.DROP_PARTITION_EVENT : {
      LOG.info("Processing#{} DROP_PARTITION message : {}", ev.getEventId(), ev.getMessage());
-      DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid);
+      DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid, cmRoot);
      dmd.setPayload(ev.getMessage());
      dmd.write();
      break;
@@ -537,12 +551,12 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
            null,
            replicationSpec);
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid);
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid, cmRoot);
        dmd.setPayload(ev.getMessage());
        dmd.write();
      } else {
        // rename scenario
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid);
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid, cmRoot);
        dmd.setPayload(ev.getMessage());
        dmd.write();
      }
@@ -579,13 +593,13 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
            qlMdTable,
            qlPtns,
            replicationSpec);
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid);
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid, cmRoot);
        dmd.setPayload(ev.getMessage());
        dmd.write();
        break;
      } else {
        // rename scenario
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid);
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid, cmRoot);
        dmd.setPayload(ev.getMessage());
        dmd.write();
        break;
@@ -615,7 +629,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
      try {
        for (FileWithChksum file : files) {
-          fileListWriter.write(encodeFileUri(file.getFile(), file.getChksum()) + "\n");
+          fileListWriter.write(ReplChangeManager.encodeFileUri(file.getFile(), file.getChksum()) + "\n");
        }
      } finally {
        fileListWriter.close();
@@ -623,7 +637,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
      }
 
      LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), ev.getMessage());
-      DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid);
+      DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid, cmRoot);
      dmd.setPayload(ev.getMessage());
      dmd.write();
      break;
@@ -631,7 +645,7 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
    // TODO : handle other event types
    default:
      LOG.info("Dummy processing#{} message : {}", ev.getEventId(), ev.getMessage());
-      DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid);
+      DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid, cmRoot);
      dmd.setPayload(ev.getMessage());
      dmd.write();
      break;
@@ -1287,14 +1301,4 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(String evState) throws Se
       return db.getDatabasesByPattern(dbPattern);
     }
   }
-
-  // TODO: this needs to be enhanced once change management based filesystem is implemented
-  // Currently using fileuri#checksum as the format
-  private String encodeFileUri(String fileUriStr, String fileChecksum) {
-    if (fileChecksum != null) {
-      return fileUriStr + "#" + fileChecksum;
-    } else {
-      return fileUriStr;
-    }
-  }
 }
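
Taken together, every event that materializes data now writes its _files entries through ReplChangeManager.encodeFileUri() and stamps cmRoot into the event's dump metadata, so REPL LOAD can resolve each entry even if the source file has since been dropped or rewritten. Under those assumptions, a dumped _files file would look roughly like this (URIs and checksum fragments are invented for illustration):

    hdfs://ns1/warehouse/repldb.db/unptned/000000_0#28f3a6...
    hdfs://ns1/warehouse/repldb.db/ptned/b=1/000000_0#9bc04d...
    file:/tmp/external/part-00000

The last entry has no "#" fragment because its filesystem reported no checksum, so the load path simply trusts the recorded location.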