diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
index fe747df..d311a0a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -50,6 +50,13 @@ protected InsertMessage() {
    */
   public abstract List<String> getFiles();
 
+  /**
+   * Get the list of fileChecksums for the files created by this DML operation. May be null.
+   *
+   * @return List of fileChecksums, or null.
+   */
+  public abstract List<String> getFileChecksums();
+
   @Override
   public EventMessage checkValid() {
     if (getTable() == null)
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index bd9f9ec..e7d8ab9 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -112,6 +112,7 @@ public Long getTimestamp() {
     return timestamp;
   }
 
+  @Override
   public List<String> getFileChecksums() {
     return fileChecksums;
   }
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 9954902..7b2b24e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -373,4 +373,17 @@ public static Index getIndexObj(ObjectNode jsonTree, String indexObjKey) throws
     }
     return hashMap;
   }
+
+  public static String getDBName(ObjectNode jsonTree) {
+    return jsonTree.get("db").asText();
+  }
+
+  public static String getTableName(ObjectNode jsonTree) {
+    return jsonTree.get("table").asText();
+  }
+
+  public static LinkedHashMap<String, String> getPartitionSpec(ObjectNode jsonTree) {
+    return JSONMessageFactory.getAsMap((ObjectNode) jsonTree.get("partKeyVals"),
+        new LinkedHashMap<String, String>());
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 4c0f817..1768c65 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
@@ -27,7 +27,6 @@
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -113,7 +112,7 @@ protected int execute(DriverContext driverContext) {
 
       BufferedWriter listBW = null;
       if (rwork.getListFilesOnOutputBehaviour()){
-        Path listPath = new Path(toPath,"_files");
+        Path listPath = new Path(toPath,EximUtil.FILES_NAME);
         LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString());
         if (dstFs.exists(listPath)){
           console.printError("Cannot make target _files file:" + listPath.toString());
@@ -169,7 +168,7 @@ protected int execute(DriverContext driverContext) {
 
   private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
       throws IOException {
-    Path fileListing = new Path(path, "_files");
+    Path fileListing = new Path(path, EximUtil.FILES_NAME);
     LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri());
     if (! fs.exists(fileListing)){
       LOG.debug("ReplCopyTask : _files does not exist");
@@ -184,8 +183,10 @@ protected int execute(DriverContext driverContext) {
     String line = null;
     while ( (line = br.readLine()) != null){
       LOG.debug("ReplCopyTask :_filesReadLine:" + line);
-      Path p = new Path(line);
-      FileSystem srcFs = p.getFileSystem(conf); // TODO : again, fs cache should make this okay, but if not, revisit
+      String fileUriStr = EximUtil.getCMDecodedFileName(line);
+      Path p = new Path(fileUriStr);
+      // TODO : again, fs cache should make this okay, but if not, revisit
+      FileSystem srcFs = p.getFileSystem(conf);
       ret.add(srcFs.getFileStatus(p));
       // Note - we need srcFs rather than fs, because it is possible that the _files lists files
       // which are from a different filesystem than the fs where the _files file itself was loaded
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 6e9602f..fca7a9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -19,9 +19,9 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import com.google.common.base.Function;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -31,6 +31,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -50,6 +51,7 @@
 import org.json.JSONObject;
 
 import javax.annotation.Nullable;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -73,7 +75,10 @@
  */
 public class EximUtil {
 
-  public static final String METADATA_NAME="_metadata";
+  public static final String METADATA_NAME = "_metadata";
+  public static final String FILES_NAME = "_files";
+  public static final String DATA_PATH_NAME = "data";
+  public static final String URI_FRAGMENT_SEPARATOR = "#";
 
   private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
 
@@ -278,9 +283,6 @@ public static void createExportDump(FileSystem fs, Path metadataPath,
     if (replicationSpec == null){
       replicationSpec = new ReplicationSpec(); // instantiate default values if not specified
     }
-    if (tableHandle == null){
-      replicationSpec.setNoop(true);
-    }
 
     OutputStream out = fs.create(metadataPath);
     JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out);
@@ -351,10 +353,6 @@ public static void createExportDump(FileSystem fs, Path metadataPath,
     jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close.
   }
 
-  private static void write(OutputStream out, String s) throws IOException {
-    out.write(s.getBytes("UTF-8"));
-  }
-
   /**
    * Utility class to help return complex value from readMetaData function
    */
@@ -571,4 +569,20 @@ public boolean accept(Path p) {
       }
     };
   }
+
+  public static String getCMEncodedFileName(String fileURIStr, String fileChecksum) {
+    // The checksum is set as the fragment portion of the file uri
+    return fileURIStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
+  }
+
+  public static String getCMDecodedFileName(String encodedFileURIStr) {
+    String[] uriAndFragment = encodedFileURIStr.split(URI_FRAGMENT_SEPARATOR);
+    return uriAndFragment[0];
+  }
+
+  public static FileChecksum getCMDecodedChecksum(String encodedFileURIStr) {
+    // TODO: Implement this as part of HIVE-15490
+    return null;
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index f61274b..08bad63 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -170,7 +170,7 @@ public static void prepareExport(
         partitions = null;
       }
 
-      Path path = new Path(ctx.getLocalTmpPath(), "_metadata");
+      Path path = new Path(ctx.getLocalTmpPath(), EximUtil.METADATA_NAME);
       EximUtil.createExportDump(
           FileSystem.getLocal(conf),
           path,
@@ -202,7 +202,7 @@ public static void prepareExport(
       }
     } else {
       Path fromPath = ts.tableHandle.getDataLocation();
-      Path toDataPath = new Path(parentPath, "data");
+      Path toDataPath = new Path(parentPath, EximUtil.DATA_PATH_NAME);
       Task<? extends Serializable> rTask =
           ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
       rootTasks.add(rTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 5561e06..8c5cac2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -344,7 +344,7 @@ private static CreateTableDesc getBaseCreateTableDescFromTable(String dbName,
 
   private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
       ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) {
-    Path dataPath = new Path(fromURI.toString(), "data");
+    Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
     Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf());
     LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
@@ -777,7 +777,7 @@ private static void createRegularImportTasks(
     x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
     if (tblDesc.isExternal() && (tblDesc.getLocation() == null)) {
       x.getLOG().debug("Importing in place, no emptiness check, no copying/loading");
-      Path dataPath = new Path(fromURI.toString(), "data");
+      Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
       tblDesc.setLocation(dataPath.toString());
     } else {
       Path tablePath = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 9b83407..64fb988 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -20,6 +20,7 @@
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
+
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,8 +37,10 @@
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
@@ -61,13 +64,18 @@
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.IOUtils;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
 
 import javax.annotation.Nullable;
+
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
@@ -108,6 +116,7 @@
     EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
     EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
     EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
+    EVENT_INSERT("EVENT_INSERT"),
     EVENT_UNKNOWN("EVENT_UNKNOWN");
 
     String type = null;
@@ -559,7 +568,49 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
           dmd.write();
          break;
        }
-
+      }
+      case MessageFactory.INSERT_EVENT: {
+        String tblName = JSONMessageFactory.getTableName(JSONMessageFactory.getJsonTree(ev));
+        Table qlMdTable = db.getTable(tblName);
+        LinkedHashMap<String, String> partSpec =
+            JSONMessageFactory.getPartitionSpec(JSONMessageFactory.getJsonTree(ev));
+        List<Partition> qlPtns = null;
+        if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
+          qlPtns = Arrays.asList(db.getPartition(qlMdTable, partSpec, false));
+        }
+        Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+        EximUtil.createExportDump(metaDataPath.getFileSystem(conf), metaDataPath, qlMdTable, qlPtns,
+            replicationSpec);
+        Path dataPath = new Path(evRoot, EximUtil.DATA_PATH_NAME);
+        Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+        FileSystem fs = dataPath.getFileSystem(conf);
+        BufferedWriter fileListWriter =
+            new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+        try {
+          // TODO: HIVE-15205: move this metadata generation to a task
+          // Parse the message field
+          ObjectNode jsonTree = JSONMessageFactory.getJsonTree(ev);
+          // Get files inserted
+          List<String> files =
+              JSONMessageFactory
+                  .getAsList((ArrayNode) jsonTree.get("files"), new ArrayList<String>());
+          // TODO: HIVE-15490: not using checksums for now
+          List<String> fileChecksums =
+              JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("fileChecksums"),
+                  new ArrayList<String>());
+          // Write encoded filename
+          for (int i = 0; i < files.size(); i++) {
+            String encodedFileName = EximUtil.getCMEncodedFileName(files.get(i), fileChecksums.get(i));
+            fileListWriter.write(encodedFileName + "\n");
+          }
+        } finally {
+          fileListWriter.close();
+        }
+        LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), ev.getMessage());
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
+        break;
       }
       // TODO : handle other event types
       default:
@@ -957,6 +1008,12 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
         LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec);
         return tasks;
       }
+      case EVENT_INSERT: {
+        md = MessageFactory.getInstance().getDeserializer();
+        InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
+        // Piggybacking in Import logic for now
+        return analyzeTableLoad(insertMessage.getDB(), insertMessage.getTable(), locn, precursor);
+      }
       case EVENT_UNKNOWN: {
         break;
       }
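
Note on the "_files" format introduced above: each line written at dump time is the source file URI with its checksum carried as a URI fragment, and ReplCopyTask.filesInFileListing strips that fragment before resolving the path (checksum verification itself is deferred to HIVE-15490). The following is a minimal, self-contained sketch that mirrors EximUtil.getCMEncodedFileName and getCMDecodedFileName; the class name, method names, and sample values are illustrative only, not part of the patch.

// Illustrative sketch, not part of the patch: mirrors the encode/decode helpers
// added to EximUtil so the "_files" line format can be exercised in isolation.
public class CmFileNameCodecSketch {

  // Same separator the patch defines as EximUtil.URI_FRAGMENT_SEPARATOR.
  private static final String URI_FRAGMENT_SEPARATOR = "#";

  // Dump side (getCMEncodedFileName): append the checksum as the URI fragment.
  static String encode(String fileURIStr, String fileChecksum) {
    return fileURIStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
  }

  // Load side (getCMDecodedFileName): keep only the URI portion before the fragment.
  static String decode(String encodedFileURIStr) {
    return encodedFileURIStr.split(URI_FRAGMENT_SEPARATOR)[0];
  }

  public static void main(String[] args) {
    // Hypothetical file URI and checksum string, for illustration only.
    String line = encode("hdfs://nn:8020/warehouse/t1/000000_0", "abc123");
    System.out.println(line);          // hdfs://nn:8020/warehouse/t1/000000_0#abc123
    System.out.println(decode(line));  // hdfs://nn:8020/warehouse/t1/000000_0
  }
}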
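
The dump-side handling of INSERT events reads the message payload by key: "db", "table", "partKeyVals", "files", and "fileChecksums", via the new JSONMessageFactory accessors. A hypothetical payload with those keys, parsed with the same Codehaus Jackson API the patch uses, could look like the sketch below; the database, table, partition, and file values are made up, and only the key names come from the patch.

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

// Illustrative sketch, not part of the patch: shows the shape of the fields the
// INSERT-event dump code reads from the message JSON tree.
public class InsertEventPayloadSketch {
  public static void main(String[] args) throws Exception {
    String payload = "{\"db\":\"default\",\"table\":\"t1\","
        + "\"partKeyVals\":{\"ds\":\"2016-12-01\"},"
        + "\"files\":[\"hdfs://nn:8020/warehouse/t1/ds=2016-12-01/000000_0\"],"
        + "\"fileChecksums\":[\"abc123\"]}";
    JsonNode tree = new ObjectMapper().readTree(payload);
    System.out.println(tree.get("db").asText());                     // default
    System.out.println(tree.get("table").asText());                  // t1
    System.out.println(tree.get("partKeyVals").get("ds").asText());  // 2016-12-01
    System.out.println(tree.get("files").get(0).asText());           // the inserted file URI
  }
}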