diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 2688f35c95..9fa2e2e66b 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -34,14 +34,14 @@
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.util.Shell;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,17 +61,22 @@ public class TestReplicationScenarios {
-  final static String DBNOTIF_LISTENER_CLASSNAME = "org.apache.hive.hcatalog.listener.DbNotificationListener";
+  @Rule
+  public final TestName testName = new TestName();
+
+  private final static String DBNOTIF_LISTENER_CLASSNAME =
+      "org.apache.hive.hcatalog.listener.DbNotificationListener";
       // FIXME : replace with hive copy once that is copied
   final static String tid =
       TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis();
-  final static String TEST_PATH = System.getProperty("test.warehouse.dir","/tmp") + Path.SEPARATOR + tid;
+  private final static String TEST_PATH =
+      System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
 
-  static HiveConf hconf;
-  static boolean useExternalMS = false;
-  static int msPort;
-  static Driver driver;
-  static HiveMetaStoreClient metaStoreClient;
+  private static HiveConf hconf;
+  private static boolean useExternalMS = false;
+  private static int msPort;
+  private static Driver driver;
+  private static HiveMetaStoreClient metaStoreClient;
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private ArrayList lastResults;
@@ -140,6 +145,32 @@ private synchronized void advanceDumpDir() {
     ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
   }
 
+  @Test
+  public void testFunctionReplicationAsPartOfBootstrap() throws IOException {
+    String dbName = createDB(testName.getMethodName());
+    run("CREATE FUNCTION " + dbName
+        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+        + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+
+    String replicatedDbName = loadAndVerify(dbName);
+    run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'");
+    verifyResults(new String[] { replicatedDbName + ".testFunction" });
+  }
+
+  private String loadAndVerify(String dbName) throws IOException {
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String dumpLocation = getResult(0, 0);
+    String lastReplicationId = getResult(0, 1, true);
+    String replicatedDbName = dbName + "_replicated";
+    run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    printOutput();
+    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId);
+    return replicatedDbName;
+  }
+
+
   /**
    * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
    * Inserts data into one of the ptned tables, and one of the unptned tables,
@@ -148,12 +179,8 @@ private synchronized void advanceDumpDir() {
    */
   @Test
   public void testBasic() throws IOException {
-
-    String testName = "basic";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -164,9 +191,9 @@ public void testBasic() throws IOException {
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -181,31 +208,19 @@ public void testBasic() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
     verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
 
-    advanceDumpDir();
-    run("REPL DUMP " + dbName);
-    String replDumpLocn = getResult(0,0);
-    String replDumpId = getResult(0,1,true);
-    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-    printOutput();
-    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-
-    verifyRun("REPL STATUS " + dbName + "_dupe", replDumpId);
+    String replicatedDbName = loadAndVerify(dbName);
 
-    verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
-    verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2);
+    verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptn_data_2);
     verifyRun("SELECT a from " + dbName + ".ptned_empty", empty);
     verifyRun("SELECT * from " + dbName + ".unptned_empty", empty);
   }
 
   @Test
   public void testBasicWithCM() throws Exception {
-
-    String testName = "basic_with_cm";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
@@ -217,10 +232,10 @@ public void testBasicWithCM() throws Exception {
     String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
-    String ptn_locn_2_later = new Path(TEST_PATH , testName + "_ptn2_later").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+    String ptn_locn_2_later = new Path(TEST_PATH, name + "_ptn2_later").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -281,11 +296,8 @@ public void testBasicWithCM() throws Exception {
 
   @Test
   public void testIncrementalAdds() throws IOException {
-    String testName = "incrementalAdds";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
 
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -304,9 +316,9 @@ public void testIncrementalAdds() throws IOException {
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -368,11 +380,8 @@ public void testIncrementalAdds() throws IOException {
 
   @Test
   public void testDrops() throws IOException {
-    String testName = "drops";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -383,9 +392,9 @@ public void testDrops() throws IOException {
     String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
     String[] empty = new String[]{};
 
-    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
-    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
-    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
 
     createTestDataFile(unptn_locn, unptn_data);
     createTestDataFile(ptn_locn_1, ptn_data_1);
@@ -482,10 +491,7 @@ public void testDrops() throws IOException {
   public void testDropsWithCM() throws IOException {
 
     String testName = "drops_with_cm";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -608,10 +614,7 @@ public void testDropsWithCM() throws IOException {
   public void testAlters() throws IOException {
 
     String testName = "alters";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE");
@@ -793,10 +796,7 @@ public void testAlters() throws IOException {
   @Test
   public void testIncrementalLoad() throws IOException {
     String testName = "incrementalLoad";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
 
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -878,10 +878,7 @@ public void testIncrementalLoad() throws IOException {
   @Test
   public void testIncrementalInserts() throws IOException {
     String testName = "incrementalInserts";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
 
     advanceDumpDir();
@@ -934,10 +931,7 @@ public void testIncrementalInserts() throws IOException {
   @Test
   public void testViewsReplication() throws IOException {
     String testName = "viewsReplication";
-    LOG.info("Testing "+testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String dbName = createDB(testName);
 
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
     run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
@@ -1014,11 +1008,8 @@ public void testViewsReplication() throws IOException {
 
   @Test
   public void testDumpLimit() throws IOException {
-    String testName = "dumpLimit";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
 
     advanceDumpDir();
@@ -1099,11 +1090,8 @@ public void testStatus() throws IOException {
 
     // Now, to actually testing status - first, we bootstrap.
 
-    String testName = "incrementalStatus";
-    LOG.info("Testing " + testName);
-    String dbName = testName + "_" + tid;
-
-    run("CREATE DATABASE " + dbName);
+    String name = testName.getMethodName();
+    String dbName = createDB(name);
 
     advanceDumpDir();
     run("REPL DUMP " + dbName);
     String lastReplDumpLocn = getResult(0, 0);
@@ -1158,6 +1146,13 @@ public void testStatus() throws IOException {
 
   }
 
+  private static String createDB(String name) {
+    LOG.info("Testing " + name);
+    String dbName = name + "_" + tid;
+    run("CREATE DATABASE " + dbName);
+    return dbName;
+  }
+
   @Test
   public void testEventFilters(){
     // Test testing that the filters introduced by EventUtils are working correctly.
@@ -1318,13 +1313,18 @@ private String getResult(int rowNum, int colNum, boolean reuse) throws IOExcepti
     return (lastResults.get(rowNum).split("\\t"))[colNum];
   }
 
+  /**
+   * All the results that are read from the hive output will not preserve
+   * case sensitivity and will all be in lower case, hence we will check against
+   * only lower case data values.
+   */
   private void verifyResults(String[] data) throws IOException {
     List results = getOutput();
     LOG.info("Expecting {}",data);
     LOG.info("Got {}",results);
     assertEquals(data.length,results.size());
     for (int i = 0; i < data.length; i++){
-      assertEquals(data[i],results.get(i));
+      assertEquals(data[i].toLowerCase(), results.get(i));
     }
   }
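The test refactor above replaces hard-coded test names with JUnit's TestName rule, so each test derives its database name from its own method name via the shared createDB helper. Below is a minimal, self-contained sketch of that pattern; the SimpleNamingTest class and its stand-in createDB helper are illustrative only and are not part of the patch.

// Illustrative sketch, not part of the patch: how the TestName rule yields a
// per-test database name, mirroring createDB(...) in the refactored test class.
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

public class SimpleNamingTest {
  @Rule
  public final TestName testName = new TestName();

  // Unique suffix per run, playing the role of the 'tid' field in the patched class.
  private static final String TID =
      SimpleNamingTest.class.getSimpleName() + "_" + System.currentTimeMillis();

  @Test
  public void testSomething() {
    // getMethodName() returns "testSomething" here, so no hard-coded name is needed.
    String dbName = createDB(testName.getMethodName());
    // ... the real tests would now issue Hive statements against dbName ...
  }

  private static String createDB(String name) {
    String dbName = name + "_" + TID;
    // In the real helper this also runs "CREATE DATABASE " + dbName.
    return dbName;
  }
}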
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 1ea51824eb..750e71ed39 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -18,43 +18,36 @@
 package org.apache.hadoop.hive.ql.parse;
 
-import com.google.common.base.Function;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.parse.repl.dump.DBSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.dump.ReplicationSpecSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.TableSerializer;
-import org.apache.thrift.TDeserializer;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetadataJson;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.json.JSONArray;
 import org.json.JSONException;
-import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -270,124 +263,30 @@ public static void createExportDump(FileSystem fs, Path metadataPath,
     }
   }
 
-  /**
-   * Utility class to help return complex value from readMetaData function
-   */
-  public static class ReadMetaData {
-    private final Database db;
-    private final Table table;
-    private final Iterable partitions;
-    private final ReplicationSpec replicationSpec;
-
-    public ReadMetaData(){
-      this(null,null,null,new ReplicationSpec());
-    }
-    public ReadMetaData(Database db, Table table, Iterable partitions, ReplicationSpec replicationSpec){
-      this.db = db;
-      this.table = table;
-      this.partitions = partitions;
-      this.replicationSpec = replicationSpec;
-    }
-
-    public Database getDatabase(){
-      return db;
-    }
-
-    public Table getTable() {
-      return table;
-    }
-
-    public Iterable getPartitions() {
-      return partitions;
-    }
-
-    public ReplicationSpec getReplicationSpec() {
-      return replicationSpec;
-    }
-  };
-
-  public static ReadMetaData readMetaData(FileSystem fs, Path metadataPath)
+  static MetaData readMetaData(FileSystem fs, Path metadataPath)
       throws IOException, SemanticException {
-    FSDataInputStream mdstream = null;
+    String message = readAsString(fs, metadataPath);
     try {
-      mdstream = fs.open(metadataPath);
+      return new MetadataJson(message).getMetaData();
+    } catch (TException | JSONException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
+    }
+  }
+
+  private static String readAsString(final FileSystem fs, final Path fromMetadataPath)
+      throws IOException {
+    try (FSDataInputStream stream = fs.open(fromMetadataPath)) {
       byte[] buffer = new byte[1024];
       ByteArrayOutputStream sb = new ByteArrayOutputStream();
-      int read = mdstream.read(buffer);
+      int read = stream.read(buffer);
       while (read != -1) {
         sb.write(buffer, 0, read);
-        read = mdstream.read(buffer);
-      }
-      String md = new String(sb.toByteArray(), "UTF-8");
-      JSONObject jsonContainer = new JSONObject(md);
-      String version = jsonContainer.getString("version");
-      String fcversion = getJSONStringEntry(jsonContainer, "fcversion");
-      checkCompatibility(version, fcversion);
-
-      String dbDesc = getJSONStringEntry(jsonContainer, "db");
-      String tableDesc = getJSONStringEntry(jsonContainer,"table");
-      TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
-
-      Database db = null;
-      if (dbDesc != null){
-        db = new Database();
-        deserializer.deserialize(db, dbDesc, "UTF-8");
-      }
-
-      Table table = null;
-      List partitionsList = null;
-      if (tableDesc != null){
-        table = new Table();
-        deserializer.deserialize(table, tableDesc, "UTF-8");
-        // TODO : jackson-streaming-iterable-redo this
-        JSONArray jsonPartitions = new JSONArray(jsonContainer.getString("partitions"));
-        partitionsList = new ArrayList(jsonPartitions.length());
-        for (int i = 0; i < jsonPartitions.length(); ++i) {
-          String partDesc = jsonPartitions.getString(i);
-          Partition partition = new Partition();
-          deserializer.deserialize(partition, partDesc, "UTF-8");
-          partitionsList.add(partition);
-        }
-      }
-
-      return new ReadMetaData(db, table, partitionsList,readReplicationSpec(jsonContainer));
-    } catch (JSONException e) {
-      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
-    } catch (TException e) {
-      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
-    } finally {
-      if (mdstream != null) {
-        mdstream.close();
+        read = stream.read(buffer);
       }
+      return new String(sb.toByteArray(), "UTF-8");
     }
   }
 
-  private static ReplicationSpec readReplicationSpec(final JSONObject jsonContainer){
-    Function keyFetcher = new Function() {
-      @Override
-      public String apply(@Nullable String s) {
-        return getJSONStringEntry(jsonContainer,s);
-      }
-    };
-    return new ReplicationSpec(keyFetcher);
-  }
-
-  private static String getJSONStringEntry(JSONObject jsonContainer, String name) {
-    String retval = null;
-    try {
-      retval = jsonContainer.getString(name);
-    } catch (JSONException ignored) {}
-    return retval;
-  }
-
-  /* check the forward and backward compatibility */
-  private static void checkCompatibility(String version, String fcVersion) throws SemanticException {
-    doCheckCompatibility(
-        METADATA_FORMAT_VERSION,
-        version,
-        fcVersion);
-  }
-
   /* check the forward and backward compatibility */
   public static void doCheckCompatibility(String currVersion, String version, String fcVersion)
       throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 245c48357b..4314de5a63 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -25,7 +25,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -38,7 +37,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -60,6 +58,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -186,7 +185,7 @@ public static boolean prepareImport(
     FileSystem fs = FileSystem.get(fromURI, x.getConf());
     x.getInputs().add(toReadEntity(fromPath, x.getConf()));
 
-    EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+    MetaData rv = new MetaData();
     try {
       rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
     } catch (IOException e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 05d7be1859..0ba951a8cd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.google.common.primitives.Ints;
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
@@ -30,39 +30,39 @@
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventUtils;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.dump.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler;
 import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -73,16 +73,15 @@
 import javax.annotation.Nullable;
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -253,7 +252,6 @@ public void write() throws SemanticException {
           hiveConf
       );
     }
-
   }
 
   public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
@@ -446,7 +444,6 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex
         getNewEventOnlyReplicationSpec(ev.getEventId())
     );
     EventHandlerFactory.handlerFor(ev).handle(context);
-
   }
 
   public static void injectNextDumpDirForTest(String dumpdir){
@@ -1038,7 +1035,7 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
     // associated with that
     // Then, we iterate over all subdirs, and create table imports for each.
 
-    EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+    MetaData rv = new MetaData();
     try {
       rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME));
     } catch (IOException e) {
@@ -1059,6 +1056,7 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
       dbName = dbObj.getName();
     }
 
+    // Database load
     CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
     createDbDesc.setName(dbName);
     createDbDesc.setComment(dbObj.getDescription());
@@ -1072,17 +1070,71 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
     Task createDbTask = TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf);
     rootTasks.add(createDbTask);
 
-    FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
+    // Table load
+    List<FileStatus> dirsInDbPath =
+        Arrays.asList(fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs)));
 
-    for (FileStatus tableDir : dirsInDbPath) {
+    for (FileStatus tableDir : Collections2.filter(dirsInDbPath, new TableDirPredicate())) {
       analyzeTableLoad(
           dbName, null, tableDir.getPath().toUri().toString(), createDbTask, null, null);
     }
+
+    //Function load
+    Path functionMetaDataRoot = new Path(dir.getPath(), FUNCTIONS_ROOT_DIR_NAME);
+    if (fs.exists(functionMetaDataRoot)) {
+      List<FileStatus> functionDirectories =
+          Arrays.asList(fs.listStatus(functionMetaDataRoot, EximUtil.getDirectoryFilter(fs)));
+      for (FileStatus functionDir : functionDirectories) {
+        analyzeFunctionLoad(dbName, functionDir, createDbTask);
+      }
+    }
     } catch (Exception e) {
       throw new SemanticException(e);
     }
   }
 
+  private static class TableDirPredicate implements Predicate<FileStatus> {
+    @Override
+    public boolean apply(FileStatus fileStatus) {
+      return !fileStatus.getPath().getName().contains(FUNCTIONS_ROOT_DIR_NAME);
+    }
+  }
+
+  private void analyzeFunctionLoad(String dbName, FileStatus functionDir,
+      Task createDbTask) throws IOException, SemanticException {
+    URI fromURI = EximUtil
+        .getValidatedURI(conf, stripQuotes(functionDir.getPath().toUri().toString()));
+    Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+
+    FileSystem fs = FileSystem.get(fromURI, conf);
+    inputs.add(toReadEntity(fromPath, conf));
+
+    try {
+      MetaData metaData = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+      ReplicationSpec replicationSpec = metaData.getReplicationSpec();
+      if (replicationSpec.isNoop()) {
+        // nothing to do here, silently return.
+        return;
+      }
+      CreateFunctionDesc desc = new CreateFunctionDesc(
+          dbName + "." + metaData.function.getFunctionName(),
+          false,
+          metaData.function.getClassName(),
+          metaData.function.getResourceUris()
+      );
+
+      Task currentTask = TaskFactory.get(new FunctionWork(desc), conf);
+      if (createDbTask != null) {
+        createDbTask.addDependentTask(currentTask);
+        LOG.debug("Added {}:{} as a precursor of {}:{}",
+            createDbTask.getClass(), createDbTask.getId(), currentTask.getClass(),
+            currentTask.getId());
+      }
+    } catch (IOException e) {
+      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+    }
+  }
+
   private List<Task<? extends Serializable>> analyzeTableLoad(
       String dbName, String tblName, String locn, Task precursor,
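The bootstrap load above now walks the dump directory once for tables, skipping the functions subdirectory through a Guava Predicate applied with Collections2.filter, and once for functions, chaining each created task after the create-database task. A small standalone sketch of that filtering idea follows; it assumes a directory name of "_functions" for illustration (the patch itself reads the real name from FUNCTIONS_ROOT_DIR_NAME), and the DumpDirFiltering class exists only for this example.

// Illustrative sketch, not part of the patch: Guava-predicate filtering of dump
// subdirectories so that only table directories are handed to the table-load path.
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class DumpDirFiltering {
  // Assumed value for illustration; the analyzer uses its FUNCTIONS_ROOT_DIR_NAME constant.
  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";

  public static void main(String[] args) {
    List<String> dirsInDbPath = Arrays.asList("t1", "t2", FUNCTIONS_ROOT_DIR_NAME);

    // Keep only directories that are not the functions root; these become table loads.
    Collection<String> tableDirs = Collections2.filter(dirsInDbPath, new Predicate<String>() {
      @Override
      public boolean apply(String dirName) {
        return !dirName.contains(FUNCTIONS_ROOT_DIR_NAME);
      }
    });

    System.out.println(tableDirs); // prints [t1, t2]
  }
}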
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
index 40770debee..dda279f670 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 
 public class DBSerializer implements JsonWriter.Serializer {
+  public static final String FIELD_NAME = "db";
   private final Database dbObject;
 
   public DBSerializer(Database dbObject) {
@@ -43,8 +44,8 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
     );
     TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
     try {
-      String value = serializer.toString(dbObject, "UTF-8");
-      writer.jsonGenerator.writeStringField("db", value);
+      String value = serializer.toString(dbObject, UTF_8);
+      writer.jsonGenerator.writeStringField(FIELD_NAME, value);
     } catch (TException e) {
       throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
index 6b03766c01..fe0afbfe05 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 
 public class FunctionSerializer implements JsonWriter.Serializer {
+  public static final String FIELD_NAME="function";
   private Function function;
 
   public FunctionSerializer(Function function) {
@@ -40,7 +41,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
     TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
     try {
       writer.jsonGenerator
-          .writeStringField("function", serializer.toString(function, "UTF-8"));
+          .writeStringField(FIELD_NAME, serializer.toString(function, UTF_8));
     } catch (TException e) {
       throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
index 1aa11957d2..45b4fd1ae2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
@@ -48,6 +48,7 @@ public void close() throws IOException {
   }
 
   public interface Serializer {
+    String UTF_8 = "UTF-8";
     void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
         throws SemanticException, IOException;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
index 313d1085f0..f8a91be889 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
@@ -28,7 +28,8 @@
 import java.io.IOException;
 import java.util.Map;
 
-class PartitionSerializer implements JsonWriter.Serializer {
+public class PartitionSerializer implements JsonWriter.Serializer {
+  public static final String FIELD_NAME="partitions";
   private Partition partition;
 
   PartitionSerializer(Partition partition) {
@@ -49,7 +50,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
         partition.putToParameters("EXTERNAL", "FALSE");
         }
       }
-      writer.jsonGenerator.writeString(serializer.toString(partition, "UTF-8"));
+      writer.jsonGenerator.writeString(serializer.toString(partition, UTF_8));
       writer.jsonGenerator.flush();
     } catch (TException e) {
       throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
index a2e258f064..4d4fc34bf6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
@@ -31,6 +31,7 @@
 import java.util.Map;
 
 public class TableSerializer implements JsonWriter.Serializer {
+  public static final String FIELD_NAME = "table";
   private final org.apache.hadoop.hive.ql.metadata.Table tableHandle;
   private final Iterable partitions;
 
@@ -52,8 +53,8 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
     try {
       TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
       writer.jsonGenerator
-          .writeStringField("table", serializer.toString(tTable, "UTF-8"));
-      writer.jsonGenerator.writeFieldName("partitions");
+          .writeStringField(FIELD_NAME, serializer.toString(tTable, UTF_8));
+      writer.jsonGenerator.writeFieldName(PartitionSerializer.FIELD_NAME);
       writePartitions(writer, additionalPropertiesProvider);
     } catch (TException e) {
       throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
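The FIELD_NAME constants introduced above pin down the JSON field names that the dump-side serializers write and that the new load-side reader looks up, while each metastore object itself travels as a Thrift-JSON string produced with TSerializer and read back with TDeserializer. The following is a hedged sketch of that round trip using a toy Database object; the ThriftJsonRoundTrip class and its values are illustrative only.

// Illustrative sketch, not part of the patch: the Thrift JSON round trip behind the
// dump-side serializers and the load-side MetadataJson deserialization.
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TJSONProtocol;

public class ThriftJsonRoundTrip {
  public static void main(String[] args) throws TException {
    Database db = new Database();
    db.setName("repl_demo_db");

    // Dump side: object -> JSON string (this is what lands under the "db" field).
    String json = new TSerializer(new TJSONProtocol.Factory()).toString(db, "UTF-8");

    // Load side: JSON string -> object, as MetadataJson.deserialize(...) does.
    Database restored = new Database();
    new TDeserializer(new TJSONProtocol.Factory()).deserialize(restored, json, "UTF-8");

    System.out.println(restored.getName()); // repl_demo_db
  }
}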
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
new file mode 100644
index 0000000000..e160e7027a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java
@@ -0,0 +1,47 @@
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+
+/**
+ * Utility class to help return complex value from readMetaData function
+ */
+public class MetaData {
+  private final Database db;
+  private final Table table;
+  private final Iterable<Partition> partitions;
+  private final ReplicationSpec replicationSpec;
+  public final Function function;
+
+  public MetaData() {
+    this(null, null, null, new ReplicationSpec(), null);
+  }
+
+  MetaData(Database db, Table table, Iterable<Partition> partitions,
+      ReplicationSpec replicationSpec, Function function) {
+    this.db = db;
+    this.table = table;
+    this.partitions = partitions;
+    this.replicationSpec = replicationSpec;
+    this.function = function;
+  }
+
+  public Database getDatabase() {
+    return db;
+  }
+
+  public Table getTable() {
+    return table;
+  }
+
+  public Iterable<Partition> getPartitions() {
+    return partitions;
+  }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
new file mode 100644
index 0000000000..23a52c51ef
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
@@ -0,0 +1,111 @@
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.DBSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.PartitionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.TableSerializer;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter.Serializer.UTF_8;
+
+public class MetadataJson {
+  private final JSONObject json;
+  private final TDeserializer deserializer;
+  private final String tableDesc;
+
+  public MetadataJson(String message) throws JSONException, SemanticException {
+    deserializer = new TDeserializer(new TJSONProtocol.Factory());
+    json = new JSONObject(message);
+    checkCompatibility();
+    tableDesc = jsonEntry(TableSerializer.FIELD_NAME);
+  }
+
+  public MetaData getMetaData() throws TException, JSONException {
+    return new MetaData(
+        database(),
+        table(),
+        partitions(),
+        readReplicationSpec(),
+        function()
+    );
+  }
+
+  private Function function() throws TException {
+    return deserialize(new Function(), jsonEntry(FunctionSerializer.FIELD_NAME));
+  }
+
+  private Database database() throws TException {
+    return deserialize(new Database(), jsonEntry(DBSerializer.FIELD_NAME));
+  }
+
+  private Table table() throws TException {
+    return deserialize(new Table(), tableDesc);
+  }
+
+  private <T extends TBase> T deserialize(T intoObject, String json) throws TException {
+    if (json == null) {
+      return null;
+    }
+    deserializer.deserialize(intoObject, json, UTF_8);
+    return intoObject;
+  }
+
+  private List<Partition> partitions() throws JSONException, TException {
+    if (tableDesc == null) {
+      return null;
+    }
+    // TODO : jackson-streaming-iterable-redo this
+    JSONArray jsonPartitions = new JSONArray(json.getString(PartitionSerializer.FIELD_NAME));
+    List<Partition> partitionsList = new ArrayList<>(jsonPartitions.length());
+    for (int i = 0; i < jsonPartitions.length(); ++i) {
+      String partDesc = jsonPartitions.getString(i);
+      partitionsList.add(deserialize(new Partition(), partDesc));
+    }
+    return partitionsList;
+  }
+
+  private ReplicationSpec readReplicationSpec() {
+    com.google.common.base.Function<String, String> keyFetcher =
+        new com.google.common.base.Function<String, String>() {
+          @Override
+          public String apply(@Nullable String s) {
+            return jsonEntry(s);
+          }
+        };
+    return new ReplicationSpec(keyFetcher);
+  }
+
+  private void checkCompatibility() throws SemanticException, JSONException {
+    String version = json.getString("version");
+    String fcVersion = jsonEntry("fcversion");
+    EximUtil.doCheckCompatibility(
+        EximUtil.METADATA_FORMAT_VERSION,
+        version,
+        fcVersion);
+  }
+
+  private String jsonEntry(String forName) {
+    try {
+      return json.getString(forName);
+    } catch (JSONException ignored) {
+      return null;
+    }
+  }
+}
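Taken together, the load-side classes above let EximUtil.readMetaData hand the raw _metadata contents to MetadataJson and get back a MetaData that may now carry a Function as well as a database, table, and partitions. A hedged usage sketch follows; the MetadataJsonUsage class, its describe helper, and the printouts are illustrative only, while MetadataJson and MetaData themselves come from the patch.

// Illustrative sketch, not part of the patch: consuming a dumped _metadata string
// through the new load-side classes, including the function branch that the
// bootstrap function load relies on.
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.MetadataJson;
import org.apache.thrift.TException;
import org.json.JSONException;

public class MetadataJsonUsage {
  public static void describe(String metadataFileContents)
      throws SemanticException, TException, JSONException {
    MetaData metaData = new MetadataJson(metadataFileContents).getMetaData();

    if (metaData.getTable() != null) {
      System.out.println("table dump: " + metaData.getTable().getTableName());
    }
    if (metaData.function != null) {
      // Presence of a function is what analyzeFunctionLoad keys off.
      System.out.println("function dump: " + metaData.function.getFunctionName());
    }
    if (metaData.getReplicationSpec().isNoop()) {
      System.out.println("nothing to load for this object");
    }
  }
}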