diff --git a/data/conf/hive-site-old.xml b/data/conf/hive-site-old.xml
index 4e6ff16..c0aee8a 100644
--- a/data/conf/hive-site-old.xml
+++ b/data/conf/hive-site-old.xml
@@ -157,7 +157,7 @@
<name>fs.pfile.impl</name>
- <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ <value>org.apache.hadoop.fs.PfileProxyLocalFileSystem</value>
<description>A proxy for local file system used for cross file system testing</description>
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index b09c159..3d9c1db 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -172,7 +172,7 @@
<name>fs.pfile.impl</name>
- <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ <value>org.apache.hadoop.fs.PfileProxyLocalFileSystem</value>
<description>A proxy for local file system used for cross file system testing</description>
diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml
index 05ab6ee..4c042a2 100644
--- a/data/conf/llap/hive-site.xml
+++ b/data/conf/llap/hive-site.xml
@@ -175,7 +175,7 @@
<name>fs.pfile.impl</name>
- <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ <value>org.apache.hadoop.fs.PfileProxyLocalFileSystem</value>
<description>A proxy for local file system used for cross file system testing</description>
diff --git a/data/conf/perf-reg/hive-site.xml b/data/conf/perf-reg/hive-site.xml
index 012369f..ec6df28 100644
--- a/data/conf/perf-reg/hive-site.xml
+++ b/data/conf/perf-reg/hive-site.xml
@@ -174,7 +174,7 @@
<name>fs.pfile.impl</name>
- <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ <value>org.apache.hadoop.fs.PfileProxyLocalFileSystem</value>
<description>A proxy for local file system used for cross file system testing</description>
diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index 989e65e..b58e504 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -145,7 +145,7 @@
<name>fs.pfile.impl</name>
- <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ <value>org.apache.hadoop.fs.PfileProxyLocalFileSystem</value>
<description>A proxy for local file system used for cross file system testing</description>
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index 4e63245..201efd0 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -145,7 +145,7 @@
<name>fs.pfile.impl</name>
- <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ <value>org.apache.hadoop.fs.PfileProxyLocalFileSystem</value>
<description>A proxy for local file system used for cross file system testing</description>
diff --git a/data/conf/tez/hive-site.xml b/data/conf/tez/hive-site.xml
index dbff10c..0d631f2 100644
--- a/data/conf/tez/hive-site.xml
+++ b/data/conf/tez/hive-site.xml
@@ -169,7 +169,7 @@
<name>fs.pfile.impl</name>
- <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ <value>org.apache.hadoop.fs.PfileProxyLocalFileSystem</value>
<description>A proxy for local file system used for cross file system testing</description>
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java
index 1823f2e..5dd0122 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitionPublish.java
@@ -90,7 +90,7 @@ public static void setup() throws Exception {
File workDir = handleWorkDir();
conf.set("yarn.scheduler.capacity.root.queues", "default");
conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
- conf.set("fs.pfile.impl", "org.apache.hadoop.fs.ProxyLocalFileSystem");
+ conf.set("fs.pfile.impl", "org.apache.hadoop.fs.PfileProxyLocalFileSystem");
fs = FileSystem.get(conf);
System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 95db9e8..9b7014b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -286,7 +286,8 @@ private String getResult(int rowNum, int colNum, boolean reuse) throws IOExcepti
throw new RuntimeException(e);
}
}
- return (lastResults.get(rowNum).split("\\001"))[colNum];
+ // Split around the 'tab' character
+ return (lastResults.get(rowNum).split("\\t"))[colNum];
}
private void verifyResults(String[] data) throws IOException {
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index c84570b..3d4057b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -28,10 +28,12 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.net.URI;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
@@ -1286,4 +1288,37 @@ private void checkForNotExist(ResultSet res) throws Exception {
}
assertTrue("Rows returned from describe function", numRows > 0);
}
+
+ @Test
+ public void testReplDumpResultSet() throws Exception {
+ String tid =
+ TestJdbcWithMiniHS2.class.getCanonicalName().toLowerCase().replace('.', '_') + "_"
+ + System.currentTimeMillis();
+ String testPathName = System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
+ Path testPath = new Path(testPathName);
+ FileSystem fs = testPath.getFileSystem(new HiveConf());
+ Statement stmt = conDefault.createStatement();
+ try {
+ stmt.execute("set hive.repl.rootdir = " + testPathName);
+ ResultSet rs = stmt.executeQuery("repl dump " + testDbName);
+ ResultSetMetaData rsMeta = rs.getMetaData();
+ assertEquals(2, rsMeta.getColumnCount());
+ int numRows = 0;
+ while (rs.next()) {
+ numRows++;
+ URI uri = new URI(rs.getString(1));
+ int notificationId = rs.getInt(2);
+ assertNotNull(uri);
+ assertEquals(testPath.toUri().getScheme(), uri.getScheme());
+ assertEquals(testPath.toUri().getAuthority(), uri.getAuthority());
+ // In test setup, we append '/next' to hive.repl.rootdir and use that as the dump location
+ assertEquals(testPath.toUri().getPath() + "/next", uri.getPath());
+ assertNotNull(notificationId);
+ }
+ assertEquals(1, numRows);
+ } finally {
+ // Clean up
+ fs.delete(testPath, true);
+ }
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 7b63c52..3e749eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Set;
import org.antlr.runtime.tree.Tree;
@@ -53,10 +54,12 @@
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -67,16 +70,20 @@
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.TextInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,7 +115,7 @@
* back and set it once we actually start running the query.
*/
protected Set<FileSinkDesc> acidFileSinks = new HashSet<FileSinkDesc>();
-
+
// whether any ACID table is involved in a query
protected boolean acidInQuery;
@@ -756,7 +763,7 @@ protected static void processForeignKeys(
String[] qualifiedTabName = getQualifiedTableName((ASTNode) parent.getChild(0));
// The ANTLR grammar looks like :
// 1. KW_CONSTRAINT idfr=identifier KW_FOREIGN KW_KEY fkCols=columnParenthesesList
- // KW_REFERENCES tabName=tableName parCols=columnParenthesesList
+ // KW_REFERENCES tabName=tableName parCols=columnParenthesesList
// enableSpec=enableSpecification validateSpec=validateSpecification relySpec=relySpecification
// -> ^(TOK_FOREIGN_KEY $idfr $fkCols $tabName $parCols $relySpec $enableSpec $validateSpec)
// when the user specifies the constraint name (i.e. child.getChildCount() == 11)
@@ -1324,7 +1331,7 @@ public QueryProperties getQueryProperties() {
public Set<FileSinkDesc> getAcidFileSinks() {
return acidFileSinks;
}
-
+
public boolean hasAcidInQuery() {
return acidInQuery;
}
@@ -1744,7 +1751,29 @@ protected String toMessage(ErrorMsg message, Object detail) {
public HashSet<WriteEntity> getAllOutputs() {
return outputs;
}
+
public QueryState getQueryState() {
return queryState;
}
+
+ /**
+ * Create a FetchTask for a given schema.
+ *
+ * @param schema the schema string, in "colNames#colTypes" form
+ */
+ protected FetchTask createFetchTask(String schema) {
+ Properties prop = new Properties();
+ // Sets delimiter to tab (ascii 9)
+ prop.setProperty(serdeConstants.SERIALIZATION_FORMAT, Integer.toString(Utilities.tabCode));
+ prop.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, " ");
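+ // e.g. "dump_dir,last_repl_id#string,string" -> column names before '#', column types after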
+ String[] colTypes = schema.split("#");
+ prop.setProperty("columns", colTypes[0]);
+ prop.setProperty("columns.types", colTypes[1]);
+ prop.setProperty(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
+ FetchWork fetch =
+ new FetchWork(ctx.getResFile(), new TableDesc(TextInputFormat.class,
+ IgnoreKeyTextOutputFormat.class, prop), -1);
+ fetch.setSerializationNullFormat(" ");
+ return (FetchTask) TaskFactory.get(fetch, conf);
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index c7389a8..3f58130 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -48,7 +48,6 @@
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -59,7 +58,6 @@
import org.apache.hadoop.hive.ql.index.HiveIndex;
import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.lib.Node;
@@ -72,7 +70,6 @@
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.PKInfo;
import org.apache.hadoop.hive.ql.parse.authorization.AuthorizationParseUtils;
import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactory;
import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactoryImpl;
@@ -104,7 +101,6 @@
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDefaultDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -139,7 +135,6 @@
import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -149,7 +144,6 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.StringUtils;
import java.io.FileNotFoundException;
@@ -169,7 +163,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Properties;
import java.util.Set;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DATABASELOCATION;
@@ -1938,27 +1931,6 @@ static public String getColPath(
}
- /**
- * Create a FetchTask for a given thrift ddl schema.
- *
- * @param schema
- * thrift ddl
- */
- private FetchTask createFetchTask(String schema) {
- Properties prop = new Properties();
-
- prop.setProperty(serdeConstants.SERIALIZATION_FORMAT, "9");
- prop.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, " ");
- String[] colTypes = schema.split("#");
- prop.setProperty("columns", colTypes[0]);
- prop.setProperty("columns.types", colTypes[1]);
- prop.setProperty(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
- FetchWork fetch = new FetchWork(ctx.getResFile(), new TableDesc(
- TextInputFormat.class,IgnoreKeyTextOutputFormat.class, prop), -1);
- fetch.setSerializationNullFormat(" ");
- return (FetchTask) TaskFactory.get(fetch, conf);
- }
-
private void validateDatabase(String databaseName) throws SemanticException {
try {
if (!db.databaseExists(databaseName)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index a0d492d..6e9602f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -154,40 +154,36 @@ static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticExceptio
String scheme = uri.getScheme();
String authority = uri.getAuthority();
String path = uri.getPath();
+ FileSystem fs = FileSystem.get(uri, conf);
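+ // Resolve the FileSystem for the URI; its scheme is used below in place of the raw URI scheme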
+
LOG.info("Path before norm :" + path);
// generate absolute path relative to home directory
if (!path.startsWith("/")) {
if (testMode) {
- path = (new Path(System.getProperty("test.tmp.dir"),
- path)).toUri().getPath();
- } else {
- path = (new Path(new Path("/user/" + System.getProperty("user.name")),
- path)).toUri().getPath();
- }
- }
- // set correct scheme and authority
- if (StringUtils.isEmpty(scheme)) {
- if (testMode) {
- scheme = "pfile";
+ path = (new Path(System.getProperty("test.tmp.dir"), path)).toUri().getPath();
} else {
- scheme = "hdfs";
+ path =
+ (new Path(new Path("/user/" + System.getProperty("user.name")), path)).toUri()
+ .getPath();
}
}
- // if scheme is specified but not authority then use the default
- // authority
+ // Get scheme from FileSystem
+ scheme = fs.getScheme();
+
+ // if scheme is specified but not authority then use the default authority
if (StringUtils.isEmpty(authority)) {
URI defaultURI = FileSystem.get(conf).getUri();
authority = defaultURI.getAuthority();
}
LOG.info("Scheme:" + scheme + ", authority:" + authority + ", path:" + path);
- Collection<String> eximSchemes = conf.getStringCollection(
- HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL.varname);
+ Collection<String> eximSchemes =
+ conf.getStringCollection(HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL.varname);
if (!eximSchemes.contains(scheme)) {
throw new SemanticException(
- ErrorMsg.INVALID_PATH.getMsg(
- "only the following file systems accepted for export/import : "
+ ErrorMsg.INVALID_PATH
+ .getMsg("only the following file systems accepted for export/import : "
+ conf.get(HiveConf.ConfVars.HIVE_EXIM_URI_SCHEME_WL.varname)));
}
@@ -197,7 +193,7 @@ static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticExceptio
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
}
} catch (IOException e) {
- throw new SemanticException(ErrorMsg.IO_ERROR.getMsg(), e);
+ throw new SemanticException(ErrorMsg.IO_ERROR.getMsg() + ": " + e.getMessage(), e);
}
}
@@ -210,29 +206,31 @@ static void validateTable(org.apache.hadoop.hive.ql.metadata.Table table) throws
}
}
- public static String relativeToAbsolutePath(HiveConf conf, String location) throws SemanticException {
- boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
- if (testMode) {
- URI uri = new Path(location).toUri();
- String scheme = uri.getScheme();
- String authority = uri.getAuthority();
- String path = uri.getPath();
- if (!path.startsWith("/")) {
- path = (new Path(System.getProperty("test.tmp.dir"),
- path)).toUri().getPath();
- }
- if (StringUtils.isEmpty(scheme)) {
- scheme = "pfile";
- }
- try {
- uri = new URI(scheme, authority, path, null, null);
- } catch (URISyntaxException e) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+ public static String relativeToAbsolutePath(HiveConf conf, String location)
+ throws SemanticException {
+ try {
+ boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
+ if (testMode) {
+ URI uri = new Path(location).toUri();
+ FileSystem fs = FileSystem.get(uri, conf);
+ String scheme = fs.getScheme();
+ String authority = uri.getAuthority();
+ String path = uri.getPath();
+ if (!path.startsWith("/")) {
+ path = (new Path(System.getProperty("test.tmp.dir"), path)).toUri().getPath();
+ }
+ try {
+ uri = new URI(scheme, authority, path, null, null);
+ } catch (URISyntaxException e) {
+ throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+ }
+ return uri.toString();
+ } else {
+ // no-op for non-test mode for now
+ return location;
}
- return uri.toString();
- } else {
- //no-op for non-test mode for now
- return location;
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.IO_ERROR.getMsg() + ": " + e.getMessage(), e);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 8007c4e..6fff98d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -74,6 +74,7 @@
private String path;
private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
+ private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
@@ -154,6 +155,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
Path dumpRoot = new Path(replRoot, getNextDumpDir());
Path dumpMetadata = new Path(dumpRoot,"_dumpmetadata");
+ String lastReplId;
try {
if (eventFrom == null){
// bootstrap case
@@ -192,8 +194,8 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
// FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
}
LOG.info("Consolidation done, preparing to return {},{}",dumpRoot.toUri(),bootDumpEndReplId);
- prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), bootDumpEndReplId),
- "dump_dir,last_repl_id#string,string");
+ // Set the correct last repl id to return to the user
+ lastReplId = bootDumpEndReplId;
} else {
// get list of events matching dbPattern & tblPattern
// go through each event, and dump out each event to a event-level dump dir inside dumproot
@@ -231,9 +233,11 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
LOG.info("Done dumping events, preparing to return {},{}",dumpRoot.toUri(),eventTo);
List<String> vals;
writeOutput(Arrays.asList("event", String.valueOf(eventFrom), String.valueOf(eventTo)), dumpMetadata);
- prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(eventTo)),
- "dump_dir,last_repl_id#string,string");
+ // Set the correct last repl id to return to the user
+ lastReplId = String.valueOf(eventTo);
}
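+ // Return the dump location and last repl id to the user through a fetch task over the result file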
+ prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), lastReplId), dumpSchema);
+ setFetchTask(createFetchTask(dumpSchema));
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
LOG.warn("Error during analyzeReplDump",e);
@@ -681,14 +685,7 @@ private void prepareReturnValues(List<String> values, String schema) throws Sema
for (String s : values) {
LOG.debug(" > " + s);
}
-
ctx.setResFile(ctx.getLocalTmpPath());
- // FIXME : this should not accessible by the user if we write to it from the frontend.
- // Thus, we should Desc/Work this, otherwise there is a security issue here.
- // Note: if we don't call ctx.setResFile, we get a NPE from the following code section
- // If we do call it, then FetchWork thinks that the "table" here winds up thinking that
- // this is a partitioned dir, which does not work. Thus, this does not work.
-
writeOutput(values,ctx.getResFile());
}
@@ -700,16 +697,14 @@ private void writeOutput(List<String> values, Path outputFile) throws SemanticEx
outStream = fs.create(outputFile);
outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
for (int i = 1; i < values.size(); i++) {
- outStream.write(Utilities.ctrlaCode);
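+ // Separate fields with tabs to match the tab-delimited schema of the repl dump fetch task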
+ outStream.write(Utilities.tabCode);
outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
}
outStream.write(Utilities.newLineCode);
} catch (IOException e) {
- throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
- // codes
+ throw new SemanticException(e);
} finally {
- IOUtils.closeStream(outStream); // TODO : we have other closes here, and in ReplCopyTask -
- // replace with this
+ IOUtils.closeStream(outStream);
}
}
diff --git a/ql/src/test/results/clientnegative/authorization_import.q.out b/ql/src/test/results/clientnegative/authorization_import.q.out
index 9972a8a..30a2be3 100644
--- a/ql/src/test/results/clientnegative/authorization_import.q.out
+++ b/ql/src/test/results/clientnegative/authorization_import.q.out
@@ -45,4 +45,4 @@ PREHOOK: query: set role public
PREHOOK: type: SHOW_ROLES
POSTHOOK: query: set role public
POSTHOOK: type: SHOW_ROLES
-FAILED: HiveAccessControlException Permission denied: Principal [name=hive_test_user, type=USER] does not have following privileges for operation IMPORT [[OBJECT OWNERSHIP] on Object [type=DATABASE, name=importer]]
+#### A masked pattern was here ####
diff --git a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
index 0caa42a..dbcf6f4 100644
--- a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
+++ b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
@@ -19,4 +19,4 @@ POSTHOOK: type: LOAD
#### A masked pattern was here ####
POSTHOOK: Output: default@exim_department
#### A masked pattern was here ####
-FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile,file
+FAILED: SemanticException [Error 10320]: Error while peforming IO operation : No FileSystem for scheme: nosuchschema
diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/PfileProxyLocalFileSystem.java b/shims/common/src/main/java/org/apache/hadoop/fs/PfileProxyLocalFileSystem.java
new file mode 100644
index 0000000..80a93b4
--- /dev/null
+++ b/shims/common/src/main/java/org/apache/hadoop/fs/PfileProxyLocalFileSystem.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+/****************************************************************
+ * A Proxy for LocalFileSystem
+ *
+ * Serves URIs corresponding to the 'pfile:///' namespace
+ * using a LocalFileSystem
+ *****************************************************************/
+
+public class PfileProxyLocalFileSystem extends FilterFileSystem {
+
+ protected LocalFileSystem localFs;
+
+ /**
+ * URI Scheme for pfile://namenode/ URIs.
+ */
+ public static final String PFILE_URI_SCHEME = "pfile";
+
+ public PfileProxyLocalFileSystem() {
+ localFs = new LocalFileSystem();
+ }
+
+ public PfileProxyLocalFileSystem(FileSystem fs) {
+ throw new RuntimeException("Unsupported Constructor");
+ }
+
+ @Override
+ public void initialize(URI name, Configuration conf) throws IOException {
+ // create a proxy for the local filesystem
+ // the scheme/authority serving as the proxy is derived
+ // from the supplied URI
+ String scheme = name.getScheme();
+ String nameUriString = name.toString();
+ if (Shell.WINDOWS) {
+ // Replace the encoded backward slash with forward slash
+ // Remove the windows drive letter
+ nameUriString = nameUriString.replaceAll("%5C", "/")
+ .replaceFirst("/[c-zC-Z]:", "/")
+ .replaceFirst("^[c-zC-Z]:", "");
+ name = URI.create(nameUriString);
+ }
+
+ String authority = name.getAuthority() != null ? name.getAuthority() : "";
+ String proxyUriString = scheme + "://" + authority + "/";
+
+ fs = ShimLoader.getHadoopShims().createProxyFileSystem(
+ localFs, URI.create(proxyUriString));
+
+ fs.initialize(name, conf);
+ }
+
+ @Override
+ public String getScheme() {
+ return PFILE_URI_SCHEME;
+ }
+}
diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
deleted file mode 100644
index 228a972..0000000
--- a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.shims.HadoopShims;
-
-/****************************************************************
- * A Proxy for LocalFileSystem
- *
- * Serves uri's corresponding to 'pfile:///' namespace with using
- * a LocalFileSystem
- *****************************************************************/
-
-public class ProxyLocalFileSystem extends FilterFileSystem {
-
- protected LocalFileSystem localFs;
-
- public ProxyLocalFileSystem() {
- localFs = new LocalFileSystem();
- }
-
- public ProxyLocalFileSystem(FileSystem fs) {
- throw new RuntimeException ("Unsupported Constructor");
- }
-
- @Override
- public void initialize(URI name, Configuration conf) throws IOException {
- // create a proxy for the local filesystem
- // the scheme/authority serving as the proxy is derived
- // from the supplied URI
- String scheme = name.getScheme();
- String nameUriString = name.toString();
- if (Shell.WINDOWS) {
- // Replace the encoded backward slash with forward slash
- // Remove the windows drive letter
- nameUriString = nameUriString.replaceAll("%5C", "/")
- .replaceFirst("/[c-zC-Z]:", "/")
- .replaceFirst("^[c-zC-Z]:", "");
- name = URI.create(nameUriString);
- }
-
- String authority = name.getAuthority() != null ? name.getAuthority() : "";
- String proxyUriString = scheme + "://" + authority + "/";
-
- fs = ShimLoader.getHadoopShims().createProxyFileSystem(
- localFs, URI.create(proxyUriString));
-
- fs.initialize(name, conf);
- }
-}