diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 4c9a1a2975..44bc7e5ce3 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -42,7 +42,9 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,17 +64,22 @@ public class TestReplicationScenarios { - final static String DBNOTIF_LISTENER_CLASSNAME = "org.apache.hive.hcatalog.listener.DbNotificationListener"; + @Rule + public final TestName testName = new TestName(); + + private final static String DBNOTIF_LISTENER_CLASSNAME = + "org.apache.hive.hcatalog.listener.DbNotificationListener"; // FIXME : replace with hive copy once that is copied - final static String tid = + private final static String tid = TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis(); - final static String TEST_PATH = System.getProperty("test.warehouse.dir","/tmp") + Path.SEPARATOR + tid; + private final static String TEST_PATH = + System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid; - static HiveConf hconf; - static boolean useExternalMS = false; - static int msPort; - static Driver driver; - static HiveMetaStoreClient metaStoreClient; + private static HiveConf hconf; + private static boolean useExternalMS = false; + private static int msPort; + private static Driver driver; + private static HiveMetaStoreClient metaStoreClient; protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); private ArrayList lastResults; @@ -141,6 +148,32 @@ private synchronized void advanceDumpDir() { ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next)); } + @Test + public void testFunctionReplicationAsPartOfBootstrap() throws IOException { + String dbName = createDB(testName.getMethodName()); + run("CREATE FUNCTION " + dbName + + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' " + + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'"); + + String replicatedDbName = loadAndVerify(dbName); + run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'"); + verifyResults(new String[] { replicatedDbName + ".testFunction" }); + } + + private String loadAndVerify(String dbName) throws IOException { + advanceDumpDir(); + run("REPL DUMP " + dbName); + String dumpLocation = getResult(0, 0); + String lastReplicationId = getResult(0, 1, true); + String replicatedDbName = dbName + "_replicated"; + run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); + printOutput(); + run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); + verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId); + return replicatedDbName; + } + + /** * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned. 
* Inserts data into one of the ptned tables, and one of the unptned tables, @@ -149,12 +182,8 @@ private synchronized void advanceDumpDir() { */ @Test public void testBasic() throws IOException { - - String testName = "basic"; - LOG.info("Testing "+testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String name = testName.getMethodName(); + String dbName = createDB(name); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE"); @@ -165,9 +194,9 @@ public void testBasic() throws IOException { String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; String[] empty = new String[]{}; - String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath(); - String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath(); - String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath(); + String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath(); createTestDataFile(unptn_locn, unptn_data); createTestDataFile(ptn_locn_1, ptn_data_1); @@ -182,31 +211,19 @@ public void testBasic() throws IOException { verifySetup("SELECT a from " + dbName + ".ptned_empty", empty); verifySetup("SELECT * from " + dbName + ".unptned_empty", empty); - advanceDumpDir(); - run("REPL DUMP " + dbName); - String replDumpLocn = getResult(0,0); - String replDumpId = getResult(0,1,true); - run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); - printOutput(); - run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); - - verifyRun("REPL STATUS " + dbName + "_dupe", replDumpId); + String replicatedDbName = loadAndVerify(dbName); - verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1); - verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2); + verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data); + verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1); + verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptn_data_2); verifyRun("SELECT a from " + dbName + ".ptned_empty", empty); verifyRun("SELECT * from " + dbName + ".unptned_empty", empty); } @Test public void testBasicWithCM() throws Exception { - - String testName = "basic_with_cm"; - LOG.info("Testing "+testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String name = testName.getMethodName(); + String dbName = createDB(name); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE"); @@ -218,10 +235,10 @@ public void testBasicWithCM() throws Exception { String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"}; String[] empty = new String[]{}; - String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath(); - String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath(); - String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath(); - 
String ptn_locn_2_later = new Path(TEST_PATH , testName + "_ptn2_later").toUri().getPath(); + String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath(); + String ptn_locn_2_later = new Path(TEST_PATH, name + "_ptn2_later").toUri().getPath(); createTestDataFile(unptn_locn, unptn_data); createTestDataFile(ptn_locn_1, ptn_data_1); @@ -282,11 +299,8 @@ public void testBasicWithCM() throws Exception { @Test public void testIncrementalAdds() throws IOException { - String testName = "incrementalAdds"; - LOG.info("Testing "+testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String name = testName.getMethodName(); + String dbName = createDB(name); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); @@ -305,9 +319,9 @@ public void testIncrementalAdds() throws IOException { String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; String[] empty = new String[]{}; - String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath(); - String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath(); - String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath(); + String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath(); createTestDataFile(unptn_locn, unptn_data); createTestDataFile(ptn_locn_1, ptn_data_1); @@ -369,11 +383,8 @@ public void testIncrementalAdds() throws IOException { @Test public void testDrops() throws IOException { - String testName = "drops"; - LOG.info("Testing "+testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String name = testName.getMethodName(); + String dbName = createDB(name); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE"); @@ -384,9 +395,9 @@ public void testDrops() throws IOException { String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; String[] empty = new String[]{}; - String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath(); - String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath(); - String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath(); + String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath(); createTestDataFile(unptn_locn, unptn_data); createTestDataFile(ptn_locn_1, ptn_data_1); @@ -483,10 +494,7 @@ public void testDrops() throws IOException { public void testDropsWithCM() throws IOException { String testName = "drops_with_cm"; - LOG.info("Testing "+testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String dbName = createDB(testName); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned 
by (b string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE"); @@ -609,10 +617,7 @@ public void testDropsWithCM() throws IOException { public void testAlters() throws IOException { String testName = "alters"; - LOG.info("Testing "+testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String dbName = createDB(testName); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".unptned2(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE"); @@ -794,10 +799,7 @@ public void testAlters() throws IOException { @Test public void testIncrementalLoad() throws IOException { String testName = "incrementalLoad"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String dbName = createDB(testName); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); @@ -879,10 +881,7 @@ public void testIncrementalLoad() throws IOException { @Test public void testIncrementalInserts() throws IOException { String testName = "incrementalInserts"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String dbName = createDB(testName); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); advanceDumpDir(); @@ -935,10 +934,7 @@ public void testIncrementalInserts() throws IOException { @Test public void testViewsReplication() throws IOException { String testName = "viewsReplication"; - LOG.info("Testing "+testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String dbName = createDB(testName); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); @@ -1015,11 +1011,8 @@ public void testViewsReplication() throws IOException { @Test public void testDumpLimit() throws IOException { - String testName = "dumpLimit"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String name = testName.getMethodName(); + String dbName = createDB(name); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); advanceDumpDir(); @@ -1100,11 +1093,8 @@ public void testStatus() throws IOException { // Now, to actually testing status - first, we bootstrap. - String testName = "incrementalStatus"; - LOG.info("Testing " + testName); - String dbName = testName + "_" + tid; - - run("CREATE DATABASE " + dbName); + String name = testName.getMethodName(); + String dbName = createDB(name); advanceDumpDir(); run("REPL DUMP " + dbName); String lastReplDumpLocn = getResult(0, 0); @@ -1159,6 +1149,13 @@ public void testStatus() throws IOException { } + private static String createDB(String name) { + LOG.info("Testing " + name); + String dbName = name + "_" + tid; + run("CREATE DATABASE " + dbName); + return dbName; + } + @Test public void testEventFilters(){ // Test testing that the filters introduced by EventUtils are working correctly. 
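
For reference, a minimal sketch of how the helpers introduced in this patch (the JUnit TestName rule, createDB and loadAndVerify) are intended to compose in a new bootstrap test. The test method, table name and expected output below are hypothetical and only illustrate the pattern; all helpers referenced already exist in TestReplicationScenarios after this change.

  @Test
  public void testTableOnlyBootstrap() throws IOException {
    // Hypothetical test, not part of this patch: db name derives from the JUnit method name.
    String dbName = createDB(testName.getMethodName());
    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");

    // REPL DUMP + EXPLAIN/REPL LOAD + REPL STATUS round trip shared by the bootstrap tests.
    String replicatedDbName = loadAndVerify(dbName);

    run("SHOW TABLES IN " + replicatedDbName);
    verifyResults(new String[] { "unptned" });   // compared case-insensitively, see verifyResults below
  }
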
@@ -1319,13 +1316,18 @@ private String getResult(int rowNum, int colNum, boolean reuse) throws IOExcepti return (lastResults.get(rowNum).split("\\t"))[colNum]; } + /** + * All the results that are read from the hive output will not preserve + * case sensitivity and will all be in lower case, hence we will check against + * only lower case data values. + */ private void verifyResults(String[] data) throws IOException { List results = getOutput(); LOG.info("Expecting {}",data); LOG.info("Got {}",results); assertEquals(data.length,results.size()); for (int i = 0; i < data.length; i++){ - assertEquals(data[i],results.get(i)); + assertEquals(data[i].toLowerCase(), results.get(i)); } } @@ -1406,5 +1408,4 @@ private static void createTestDataFile(String filename, String[] lines) throws I } } } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 6deea96a61..1884db3d05 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1406,7 +1406,6 @@ public Table apply(org.apache.hadoop.hive.metastore.api.Table table) { */ public List getTablesByType(String dbName, String pattern, TableType type) throws HiveException { - List retList = new ArrayList(); if (dbName == null) dbName = SessionState.get().getCurrentDatabase(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 1ea51824eb..a9384be707 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -18,43 +18,36 @@ package org.apache.hadoop.hive.ql.parse; -import com.google.common.base.Function; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.parse.repl.dump.DBSerializer; -import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter; -import org.apache.hadoop.hive.ql.parse.repl.dump.ReplicationSpecSerializer; -import org.apache.hadoop.hive.ql.parse.repl.dump.TableSerializer; -import org.apache.thrift.TDeserializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.ReplicationSpecSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.TableSerializer; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; +import 
org.apache.hadoop.hive.ql.parse.repl.load.MetadataJson; import org.apache.thrift.TException; -import org.apache.thrift.protocol.TJSONProtocol; -import org.json.JSONArray; import org.json.JSONException; -import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; @@ -270,124 +263,30 @@ public static void createExportDump(FileSystem fs, Path metadataPath, } } - /** - * Utility class to help return complex value from readMetaData function - */ - public static class ReadMetaData { - private final Database db; - private final Table table; - private final Iterable partitions; - private final ReplicationSpec replicationSpec; - - public ReadMetaData(){ - this(null,null,null,new ReplicationSpec()); - } - public ReadMetaData(Database db, Table table, Iterable partitions, ReplicationSpec replicationSpec){ - this.db = db; - this.table = table; - this.partitions = partitions; - this.replicationSpec = replicationSpec; - } - - public Database getDatabase(){ - return db; - } - - public Table getTable() { - return table; - } - - public Iterable getPartitions() { - return partitions; - } - - public ReplicationSpec getReplicationSpec() { - return replicationSpec; - } - }; - - public static ReadMetaData readMetaData(FileSystem fs, Path metadataPath) + static MetaData readMetaData(FileSystem fs, Path metadataPath) throws IOException, SemanticException { - FSDataInputStream mdstream = null; + String message = readAsString(fs, metadataPath); try { - mdstream = fs.open(metadataPath); + return new MetadataJson(message).getMetaData(); + } catch (TException | JSONException e) { + throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e); + } + } + + private static String readAsString(final FileSystem fs, final Path fromMetadataPath) + throws IOException { + try (FSDataInputStream stream = fs.open(fromMetadataPath)) { byte[] buffer = new byte[1024]; ByteArrayOutputStream sb = new ByteArrayOutputStream(); - int read = mdstream.read(buffer); + int read = stream.read(buffer); while (read != -1) { sb.write(buffer, 0, read); - read = mdstream.read(buffer); - } - String md = new String(sb.toByteArray(), "UTF-8"); - JSONObject jsonContainer = new JSONObject(md); - String version = jsonContainer.getString("version"); - String fcversion = getJSONStringEntry(jsonContainer, "fcversion"); - checkCompatibility(version, fcversion); - - String dbDesc = getJSONStringEntry(jsonContainer, "db"); - String tableDesc = getJSONStringEntry(jsonContainer,"table"); - TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory()); - - Database db = null; - if (dbDesc != null){ - db = new Database(); - deserializer.deserialize(db, dbDesc, "UTF-8"); - } - - Table table = null; - List partitionsList = null; - if (tableDesc != null){ - table = new Table(); - deserializer.deserialize(table, tableDesc, "UTF-8"); - // TODO : jackson-streaming-iterable-redo this - JSONArray jsonPartitions = new JSONArray(jsonContainer.getString("partitions")); - partitionsList = new ArrayList(jsonPartitions.length()); - for (int i = 0; i < jsonPartitions.length(); ++i) { - String partDesc = jsonPartitions.getString(i); - Partition partition = new Partition(); - deserializer.deserialize(partition, partDesc, "UTF-8"); - 
partitionsList.add(partition); - } - } - - return new ReadMetaData(db, table, partitionsList,readReplicationSpec(jsonContainer)); - } catch (JSONException e) { - throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e); - } catch (TException e) { - throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e); - } finally { - if (mdstream != null) { - mdstream.close(); + read = stream.read(buffer); } + return new String(sb.toByteArray(), "UTF-8"); } } - private static ReplicationSpec readReplicationSpec(final JSONObject jsonContainer){ - Function keyFetcher = new Function() { - @Override - public String apply(@Nullable String s) { - return getJSONStringEntry(jsonContainer,s); - } - }; - return new ReplicationSpec(keyFetcher); - } - - private static String getJSONStringEntry(JSONObject jsonContainer, String name) { - String retval = null; - try { - retval = jsonContainer.getString(name); - } catch (JSONException ignored) {} - return retval; - } - - /* check the forward and backward compatibility */ - private static void checkCompatibility(String version, String fcVersion) throws SemanticException { - doCheckCompatibility( - METADATA_FORMAT_VERSION, - version, - fcVersion); - } - /* check the forward and backward compatibility */ public static void doCheckCompatibility(String currVersion, String version, String fcVersion) throws SemanticException { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 245c48357b..4314de5a63 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -38,7 +37,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; @@ -60,6 +58,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; @@ -186,7 +185,7 @@ public static boolean prepareImport( FileSystem fs = FileSystem.get(fromURI, x.getConf()); x.getInputs().add(toReadEntity(fromPath, x.getConf())); - EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData(); + MetaData rv = new MetaData(); try { rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME)); } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 3ac774689e..01d8d0529b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -17,74 +17,67 @@ */ package org.apache.hadoop.hive.ql.parse; -import com.google.common.base.Function; -import 
com.google.common.collect.Iterables; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; import com.google.common.primitives.Ints; import org.antlr.runtime.tree.Tree; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.Predicate; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter; -import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; -import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; -import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.repl.dump.FunctionSerializer; -import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler; import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import 
org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.FunctionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.io.Serializable; import java.net.URI; import java.util.ArrayList; @@ -117,152 +110,7 @@ private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; - public static final String DUMPMETADATA = "_dumpmetadata"; - - public enum DUMPTYPE { - BOOTSTRAP("BOOTSTRAP"), - INCREMENTAL("INCREMENTAL"), - EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"), - EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"), - EVENT_DROP_TABLE("EVENT_DROP_TABLE"), - EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"), - EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"), - EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"), - EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"), - EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"), - EVENT_INSERT("EVENT_INSERT"), - EVENT_UNKNOWN("EVENT_UNKNOWN"); - - String type = null; - DUMPTYPE(String type) { - this.type = type; - } - - @Override - public String toString(){ - return type; - } - - }; - - public static class DumpMetaData { - // wrapper class for reading and writing metadata about a dump - // responsible for _dumpmetadata files - - private DUMPTYPE dumpType; - private Long eventFrom = null; - private Long eventTo = null; - private String payload = null; - private boolean initialized = false; - - private final Path dumpRoot; - private final Path dumpFile; - private final HiveConf hiveConf; - private Path cmRoot; - - public DumpMetaData(Path dumpRoot, HiveConf hiveConf) { - this.dumpRoot = dumpRoot; - this.hiveConf = hiveConf; - dumpFile = new Path(dumpRoot, DUMPMETADATA); - } - - public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot, - HiveConf hiveConf){ - this(dumpRoot,hiveConf); - setDump(lvl, eventFrom, eventTo, cmRoot); - } - - public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){ - this.dumpType = lvl; - this.eventFrom = eventFrom; - this.eventTo = eventTo; - this.initialized = true; - this.cmRoot = cmRoot; - } - - public void loadDumpFromFile() throws SemanticException { - try { - // read from dumpfile and instantiate self - FileSystem fs = dumpFile.getFileSystem(hiveConf); - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); - String line = null; - if ( (line = br.readLine()) != null){ - String[] lineContents = line.split("\t", 5); - setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]), - new Path(lineContents[3])); - setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? 
null : lineContents[4]); - ReplChangeManager.setCmRoot(cmRoot); - } else { - throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString()); - } - } catch (IOException ioe){ - throw new SemanticException(ioe); - } - } - - public DUMPTYPE getDumpType() throws SemanticException { - initializeIfNot(); - return this.dumpType; - } - - public String getPayload() throws SemanticException { - initializeIfNot(); - return this.payload; - } - - public void setPayload(String payload) { - this.payload = payload; - } - - public Long getEventFrom() throws SemanticException { - initializeIfNot(); - return eventFrom; - } - - public Long getEventTo() throws SemanticException { - initializeIfNot(); - return eventTo; - } - - public Path getCmRoot() { - return cmRoot; - } - - public void setCmRoot(Path cmRoot) { - this.cmRoot = cmRoot; - } - - public Path getDumpFilePath() { - return dumpFile; - } - - public boolean isIncrementalDump() throws SemanticException { - initializeIfNot(); - return (this.dumpType == DUMPTYPE.INCREMENTAL); - } - - private void initializeIfNot() throws SemanticException { - if (!initialized){ - loadDumpFromFile(); - } - } - - public void write() throws SemanticException { - writeOutput( - Arrays.asList( - dumpType.toString(), - eventFrom.toString(), - eventTo.toString(), - cmRoot.toString(), - payload), - dumpFile, - hiveConf - ); - } - - } - - public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { + ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); } @@ -382,7 +230,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { LOG.info( "Consolidation done, preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); - dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); + dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); dmd.write(); // Set the correct last repl id to return to the user @@ -428,10 +276,14 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { } LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); - writeOutput( - Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(lastReplId)), + Utils.writeOutput( + Arrays.asList( + "incremental", + String.valueOf(eventFrom), + String.valueOf(lastReplId) + ), dmd.getDumpFilePath(), conf); - dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, lastReplId, cmRoot); + dmd.setDump(DumpType.INCREMENTAL, eventFrom, lastReplId, cmRoot); dmd.write(); } prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema); @@ -452,14 +304,13 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex getNewEventOnlyReplicationSpec(ev.getEventId()) ); EventHandlerFactory.handlerFor(ev).handle(context); - } public static void injectNextDumpDirForTest(String dumpdir){ testInjectDumpDir = dumpdir; } - String getNextDumpDir() { + private String getNextDumpDir() { if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { // make it easy to write .q unit tests, instead of unique id generation. // however, this does mean that in writing tests, we have to be aware that @@ -490,8 +341,8 @@ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticExcepti // TODO : instantiating FS objects are generally costly. 
Refactor FileSystem fs = dbRoot.getFileSystem(conf); Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); - Database dbObj = db.getDatabase(dbName); - EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec()); + HiveWrapper.Tuple database = new HiveWrapper(db, dbName).database(); + EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec); } catch (Exception e) { // TODO : simple wrap & rethrow for now, clean up with error codes throw new SemanticException(e); @@ -509,9 +360,16 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticE // TODO : This should ideally return the Function Objects and not Strings(function names) that should be done by the caller, Look at this separately. List functionNames = db.getFunctions(dbName, "*"); for (String functionName : functionNames) { - org.apache.hadoop.hive.metastore.api.Function function = - db.getFunction(dbName, functionName); - if (function.getResourceUris().isEmpty()) { + HiveWrapper.Tuple tuple; + try { + tuple = new HiveWrapper(db, dbName).function(functionName); + } catch (HiveException e) { + //This can happen as we are querying the getFunctions before we are getting the actual function + //in between there can be a drop function by a user in which case our call will fail. + LOG.info("Function " + functionName + " could not be found, we are ignoring it as it can be a valid state ", e); + continue; + } + if (tuple.object.getResourceUris().isEmpty()) { SESSION_STATE_LOG.warn( "Not replicating function: " + functionName + " as it seems to have been created " + "without USING clause"); @@ -522,7 +380,7 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticE new Path(new Path(functionsRoot, functionName), FUNCTION_METADATA_DIR_NAME); try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf), functionMetadataRoot)) { - new FunctionSerializer(function).writeTo(jsonWriter, getNewReplicationSpec()); + new FunctionSerializer(tuple.object).writeTo(jsonWriter, tuple.replicationSpec); } } } catch (Exception e) { @@ -734,7 +592,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); taskChainTail = barrierTask; evstage++; - lastEvid = dmd.eventTo; + lastEvid = dmd.getEventTo(); } } @@ -1044,7 +902,7 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) // associated with that // Then, we iterate over all subdirs, and create table imports for each. 
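
A short illustrative sketch, not code from this patch, of the ordering contract behind HiveWrapper.Tuple as used in dumpDbMetadata and dumpFunctionMetadata above: the ReplicationSpec, and with it the current notification event id, is captured before the object is fetched from the metastore, so anything that changes in between is replayed by a later incremental load instead of being silently lost. Imports are as in this file; the method, dbName and function name are placeholders, and the getCurrentReplicationState getter name is assumed from the setter used in BootStrapReplicationSpecFunction.

  private void illustrateTupleOrdering(Hive db, String dbName) throws HiveException {
    // Assumed usage of HiveWrapper (the class is added later in this patch).
    HiveWrapper.Tuple<Function> tuple = new HiveWrapper(db, dbName).function("testFunction");
    // 1. The spec was built first and pins the last notification event id...
    String replState = tuple.replicationSpec.getCurrentReplicationState();
    // 2. ...and only afterwards was the Function object fetched from the metastore.
    String udfClass = tuple.object.getClassName();
    LOG.debug("captured repl state {} before reading function class {}", replState, udfClass);
  }
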
- EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData(); + MetaData rv = new MetaData(); try { rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME)); } catch (IOException e) { @@ -1065,6 +923,7 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) dbName = dbObj.getName(); } + // Database load CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); createDbDesc.setName(dbName); createDbDesc.setComment(dbObj.getDescription()); @@ -1078,17 +937,71 @@ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) Task createDbTask = TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf); rootTasks.add(createDbTask); - FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs)); + // Table load + List dirsInDbPath = + Arrays.asList(fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs))); - for (FileStatus tableDir : dirsInDbPath) { + for (FileStatus tableDir : Collections2.filter(dirsInDbPath, new TableDirPredicate())) { analyzeTableLoad( dbName, null, tableDir.getPath().toUri().toString(), createDbTask, null, null); } + + //Function load + Path functionMetaDataRoot = new Path(dir.getPath(), FUNCTIONS_ROOT_DIR_NAME); + if (fs.exists(functionMetaDataRoot)) { + List functionDirectories = + Arrays.asList(fs.listStatus(functionMetaDataRoot, EximUtil.getDirectoryFilter(fs))); + for (FileStatus functionDir : functionDirectories) { + analyzeFunctionLoad(dbName, functionDir, createDbTask); + } + } } catch (Exception e) { throw new SemanticException(e); } } + private static class TableDirPredicate implements Predicate { + @Override + public boolean apply(FileStatus fileStatus) { + return !fileStatus.getPath().getName().contains(FUNCTIONS_ROOT_DIR_NAME); + } + } + + private void analyzeFunctionLoad(String dbName, FileStatus functionDir, + Task createDbTask) throws IOException, SemanticException { + URI fromURI = EximUtil + .getValidatedURI(conf, stripQuotes(functionDir.getPath().toUri().toString())); + Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); + + FileSystem fs = FileSystem.get(fromURI, conf); + inputs.add(toReadEntity(fromPath, conf)); + + try { + MetaData metaData = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME)); + ReplicationSpec replicationSpec = metaData.getReplicationSpec(); + if (replicationSpec.isNoop()) { + // nothing to do here, silently return. + return; + } + CreateFunctionDesc desc = new CreateFunctionDesc( + dbName + "." 
+ metaData.function.getFunctionName(), + false, + metaData.function.getClassName(), + metaData.function.getResourceUris() + ); + + Task currentTask = TaskFactory.get(new FunctionWork(desc), conf); + if (createDbTask != null) { + createDbTask.addDependentTask(currentTask); + LOG.debug("Added {}:{} as a precursor of {}:{}", + createDbTask.getClass(), createDbTask.getId(), currentTask.getClass(), + currentTask.getId()); + } + } catch (IOException e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); + } + } + private List> analyzeTableLoad( String dbName, String tblName, String locn, Task precursor, @@ -1187,27 +1100,7 @@ private void prepareReturnValues(List values, String schema) throws Sema LOG.debug(" > " + s); } ctx.setResFile(ctx.getLocalTmpPath()); - writeOutput(values, ctx.getResFile(), conf); - } - - private static void writeOutput(List values, Path outputFile, HiveConf hiveConf) - throws SemanticException { - FileSystem fs = null; - DataOutputStream outStream = null; - try { - fs = outputFile.getFileSystem(hiveConf); - outStream = fs.create(outputFile); - outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0))); - for (int i = 1; i < values.size(); i++) { - outStream.write(Utilities.tabCode); - outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i))); - } - outStream.write(Utilities.newLineCode); - } catch (IOException e) { - throw new SemanticException(e); - } finally { - IOUtils.closeStream(outStream); - } + Utils.writeOutput(values, ctx.getResFile(), conf); } private ReplicationSpec getNewReplicationSpec() throws SemanticException { @@ -1244,14 +1137,11 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws Sema SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase(); static Iterable removeValuesTemporaryTables(List tableNames) { - List allTables = new ArrayList<>(tableNames); - CollectionUtils.filter(allTables, new Predicate() { - @Override - public boolean evaluate(Object tableName) { - return !tableName.toString().toLowerCase().startsWith(TMP_TABLE_PREFIX); - } - }); - return allTables; + return Collections2.filter(tableNames, + tableName -> { + assert tableName != null; + return !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX); + }); } private Iterable matchesDb(String dbPattern) throws HiveException { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java new file mode 100644 index 0000000000..a764fad477 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl; + +public enum DumpType { + BOOTSTRAP("BOOTSTRAP"), + INCREMENTAL("INCREMENTAL"), + EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"), + EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"), + EVENT_DROP_TABLE("EVENT_DROP_TABLE"), + EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"), + EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"), + EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"), + EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"), + EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"), + EVENT_INSERT("EVENT_INSERT"), + EVENT_UNKNOWN("EVENT_UNKNOWN"); + + String type = null; + DumpType(String type) { + this.type = type; + } + + @Override + public String toString(){ + return type; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java new file mode 100644 index 0000000000..ae37c73d84 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function { + private final Hive db; + + BootStrapReplicationSpecFunction(Hive db) { + this.db = db; + } + + @Override + public ReplicationSpec fromMetaStore() throws HiveException { + try { + ReplicationSpec replicationSpec = + new ReplicationSpec( + true, + false, + "replv2", + "will-be-set", + false, + true, + false + ); + long currentNotificationId = db.getMSC() + .getCurrentNotificationEventId().getEventId(); + replicationSpec.setCurrentReplicationState(String.valueOf(currentNotificationId)); + return replicationSpec; + } catch (Exception e) { + throw new SemanticException(e); + // TODO : simple wrap & rethrow for now, clean up with error codes + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java new file mode 100644 index 0000000000..1dcaec2701 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; + +/** + * The idea for this class is that since we need to make sure that + * we query the replication id from the db before we do any queries + * to get the object from metastore like tables/functions/partitions etc + * we are devising this wrapper to wrap all such ordering of statements here. + */ + +public class HiveWrapper { + private final Hive db; + private final String dbName; + private final BootStrapReplicationSpecFunction functionForSpec; + + public HiveWrapper(Hive db, String dbName) { + this.dbName = dbName; + this.db = db; + this.functionForSpec = new BootStrapReplicationSpecFunction(db); + } + + public Tuple function(final String name) + throws HiveException { + return new Tuple<>(functionForSpec, () -> db.getFunction(dbName, name)); + } + + public Tuple database() throws HiveException { + return new Tuple<>(functionForSpec, () -> db.getDatabase(dbName)); + } + + public static class Tuple { + + interface Function { + T fromMetaStore() throws HiveException; + } + + public final ReplicationSpec replicationSpec; + public final T object; + + /** + * we have to get the replicationspec before we query for the function object + * from the hive metastore as the spec creation captures the latest event id for replication + * and we dont want to miss any events hence we are ok replaying some events as part of + * incremental load to achieve a consistent state of the warehouse. + */ + Tuple(Function replicationSpecFunction, + Function functionForObject) throws HiveException { + this.replicationSpec = replicationSpecFunction.fromMetaStore(); + this.object = functionForObject.fromMetaStore(); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java new file mode 100644 index 0000000000..846b6f5b45 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.io.IOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +public class Utils { + public static void writeOutput(List values, Path outputFile, HiveConf hiveConf) + throws SemanticException { + DataOutputStream outStream = null; + try { + FileSystem fs = outputFile.getFileSystem(hiveConf); + outStream = fs.create(outputFile); + outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0))); + for (int i = 1; i < values.size(); i++) { + outStream.write(Utilities.tabCode); + outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i))); + } + outStream.write(Utilities.newLineCode); + } catch (IOException e) { + throw new SemanticException(e); + } finally { + IOUtils.closeStream(outStream); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java similarity index 89% rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java index 40770debee..15b7e138ed 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hive.ql.parse.repl.dump; +package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -28,6 +28,7 @@ import java.io.IOException; public class DBSerializer implements JsonWriter.Serializer { + public static final String FIELD_NAME = "db"; private final Database dbObject; public DBSerializer(Database dbObject) { @@ -43,8 +44,8 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi ); TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); try { - String value = serializer.toString(dbObject, "UTF-8"); - writer.jsonGenerator.writeStringField("db", value); + String value = serializer.toString(dbObject, UTF_8); + writer.jsonGenerator.writeStringField(FIELD_NAME, value); } catch (TException e) { throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java similarity index 90% rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java index 6b03766c01..5dc702386e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hive.ql.parse.repl.dump; +package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -28,6 +28,7 @@ import java.io.IOException; public class FunctionSerializer implements JsonWriter.Serializer { + public static final String FIELD_NAME="function"; private Function function; public FunctionSerializer(Function function) { @@ -40,7 +41,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); try { writer.jsonGenerator - .writeStringField("function", serializer.toString(function, "UTF-8")); + .writeStringField(FIELD_NAME, serializer.toString(function, UTF_8)); } catch (TException e) { throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java similarity index 95% rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java index 1aa11957d2..e20be68b6b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/JsonWriter.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hive.ql.parse.repl.dump; +package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -48,6 +48,7 @@ public void close() throws IOException { } public interface Serializer { + String UTF_8 = "UTF-8"; void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws SemanticException, IOException; } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java similarity index 92% rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java index 313d1085f0..077d39b844 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hive.ql.parse.repl.dump; +package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -28,7 +28,8 @@ import java.io.IOException; import java.util.Map; -class PartitionSerializer implements JsonWriter.Serializer { +public class PartitionSerializer implements JsonWriter.Serializer { + public static final String FIELD_NAME="partitions"; private Partition partition; PartitionSerializer(Partition partition) { @@ -49,7 +50,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi partition.putToParameters("EXTERNAL", "FALSE"); } } - writer.jsonGenerator.writeString(serializer.toString(partition, "UTF-8")); + writer.jsonGenerator.writeString(serializer.toString(partition, UTF_8)); writer.jsonGenerator.flush(); } catch (TException e) { throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java similarity index 96% rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java index d88a553063..3a92e8a51a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ReplicationSpecSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hive.ql.parse.repl.dump; +package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java similarity index 94% rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index a2e258f064..948cb3956c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hive.ql.parse.repl.dump; +package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Table; @@ -31,6 +31,7 @@ import java.util.Map; public class TableSerializer implements JsonWriter.Serializer { + public static final String FIELD_NAME = "table"; private final org.apache.hadoop.hive.ql.metadata.Table tableHandle; private final Iterable<Partition> partitions; @@ -52,8 +53,8 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi try { TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); writer.jsonGenerator - .writeStringField("table", serializer.toString(tTable, "UTF-8")); - writer.jsonGenerator.writeFieldName("partitions"); + .writeStringField(FIELD_NAME, serializer.toString(tTable, UTF_8)); + writer.jsonGenerator.writeFieldName(PartitionSerializer.FIELD_NAME); writePartitions(writer, additionalPropertiesProvider); } catch (TException e) { throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java similarity index 96% rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java index 3ebc803696..8201173d93 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/VersionCompatibleSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ -package org.apache.hadoop.hive.ql.parse.repl.dump; +package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java index 9a4f8b9197..1616ab9c4d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java @@ -35,7 +35,7 @@ import java.io.OutputStreamWriter; import java.util.Iterator; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; public class AddPartitionHandler extends AbstractHandler { protected AddPartitionHandler(NotificationEvent notificationEvent) { @@ -108,7 +108,7 @@ private BufferedWriter writer(Context withinContext, Partition qlPtn) } @Override - public DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_ADD_PARTITION; + public DumpType dumpType() { + return DumpType.EVENT_ADD_PARTITION; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java index 1073cd093c..650f9e0ccb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java @@ -28,8 +28,9 @@ import java.util.Iterator; import java.util.List; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; public class AlterPartitionHandler extends AbstractHandler { private final org.apache.hadoop.hive.metastore.api.Partition after; @@ -48,18 +49,18 @@ private enum Scenario { ALTER { @Override - DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_ALTER_PARTITION; + DumpType dumpType() { + return DumpType.EVENT_ALTER_PARTITION; } }, RENAME { @Override - DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_RENAME_PARTITION; + DumpType dumpType() { + return DumpType.EVENT_RENAME_PARTITION; } }; - abstract DUMPTYPE dumpType(); + abstract DumpType dumpType(); } private static Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before, @@ -97,7 +98,7 @@ public void handle(Context withinContext) throws Exception { } @Override - public DUMPTYPE dumpType() { + public DumpType dumpType() { return scenario.dumpType(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java index 04d9d79d87..be6cbea8fb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java @@ -23,8 +23,9 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; 
public class AlterTableHandler extends AbstractHandler { private final org.apache.hadoop.hive.metastore.api.Table before; @@ -34,18 +35,18 @@ private enum Scenario { ALTER { @Override - DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_ALTER_TABLE; + DumpType dumpType() { + return DumpType.EVENT_ALTER_TABLE; } }, RENAME { @Override - DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_RENAME_TABLE; + DumpType dumpType() { + return DumpType.EVENT_RENAME_TABLE; } }; - abstract DUMPTYPE dumpType(); + abstract DumpType dumpType(); } AlterTableHandler(NotificationEvent event) throws Exception { @@ -86,7 +87,7 @@ public void handle(Context withinContext) throws Exception { } @Override - public DUMPTYPE dumpType() { + public DumpType dumpType() { return scenario.dumpType(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java index 03f400de61..88600fde8b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java @@ -28,7 +28,7 @@ import java.io.IOException; import java.io.OutputStreamWriter; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; public class CreateTableHandler extends AbstractHandler { @@ -80,7 +80,7 @@ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOExc } @Override - public DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_CREATE_TABLE; + public DumpType dumpType() { + return DumpType.EVENT_CREATE_TABLE; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java index 61c5f37334..78cd74f2d3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java @@ -19,8 +19,9 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; public class DefaultHandler extends AbstractHandler { @@ -37,7 +38,7 @@ public void handle(Context withinContext) throws Exception { } @Override - public DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_UNKNOWN; + public DumpType dumpType() { + return DumpType.EVENT_UNKNOWN; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java index 3ad794e383..c4a0908b33 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java @@ -19,8 +19,9 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; public class DropPartitionHandler extends AbstractHandler { @@ -37,7 +38,7 @@ public void handle(Context 
withinContext) throws Exception { } @Override - public DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_DROP_PARTITION; + public DumpType dumpType() { + return DumpType.EVENT_DROP_PARTITION; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java index cae379b4fc..e3addafe46 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java @@ -19,8 +19,9 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; public class DropTableHandler extends AbstractHandler { @@ -37,7 +38,7 @@ public void handle(Context withinContext) throws Exception { } @Override - public DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_DROP_TABLE; + public DumpType dumpType() { + return DumpType.EVENT_DROP_TABLE; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java index 199145a5d4..29f3b42789 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java @@ -22,8 +22,8 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; public interface EventHandler { void handle(Context withinContext) throws Exception; @@ -32,7 +32,7 @@ long toEventId(); - DUMPTYPE dumpType(); + DumpType dumpType(); class Context { final Path eventRoot, cmRoot; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java index 13462766b5..aa83ea81d4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java @@ -32,8 +32,9 @@ import java.util.List; import java.util.Map; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; -import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; public class InsertHandler extends AbstractHandler { @@ -90,7 +91,7 @@ private BufferedWriter writer(Context withinContext) throws IOException { } @Override - public DUMPTYPE dumpType() { - return DUMPTYPE.EVENT_INSERT; + public DumpType dumpType() { + return DumpType.EVENT_INSERT; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java new file mode 100644 index 0000000000..12ad19b2ea --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; + +public class DumpMetaData { + // wrapper class for reading and writing metadata about a dump + // responsible for _dumpmetadata files + private static final String DUMP_METADATA = "_dumpmetadata"; + + private DumpType dumpType; + private Long eventFrom = null; + private Long eventTo = null; + private String payload = null; + private boolean initialized = false; + + private final Path dumpFile; + private final HiveConf hiveConf; + private Path cmRoot; + + public DumpMetaData(Path dumpRoot, HiveConf hiveConf) { + this.hiveConf = hiveConf; + dumpFile = new Path(dumpRoot, DUMP_METADATA); + } + + public DumpMetaData(Path dumpRoot, DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, + HiveConf hiveConf) { + this(dumpRoot, hiveConf); + setDump(lvl, eventFrom, eventTo, cmRoot); + } + + public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot) { + this.dumpType = lvl; + this.eventFrom = eventFrom; + this.eventTo = eventTo; + this.initialized = true; + this.cmRoot = cmRoot; + } + + private void loadDumpFromFile() throws SemanticException { + try { + // read from dumpfile and instantiate self + FileSystem fs = dumpFile.getFileSystem(hiveConf); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); + String line = null; + if ((line = br.readLine()) != null) { + String[] lineContents = line.split("\t", 5); + setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), + Long.valueOf(lineContents[2]), + new Path(lineContents[3])); + setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? 
null : lineContents[4]); + ReplChangeManager.setCmRoot(cmRoot); + } else { + throw new IOException( + "Unable to read valid values from dumpFile:" + dumpFile.toUri().toString()); + } + } catch (IOException ioe) { + throw new SemanticException(ioe); + } + } + + public DumpType getDumpType() throws SemanticException { + initializeIfNot(); + return this.dumpType; + } + + public String getPayload() throws SemanticException { + initializeIfNot(); + return this.payload; + } + + public void setPayload(String payload) { + this.payload = payload; + } + + public Long getEventFrom() throws SemanticException { + initializeIfNot(); + return eventFrom; + } + + public Long getEventTo() throws SemanticException { + initializeIfNot(); + return eventTo; + } + + public Path getDumpFilePath() { + return dumpFile; + } + + public boolean isIncrementalDump() throws SemanticException { + initializeIfNot(); + return (this.dumpType == DumpType.INCREMENTAL); + } + + private void initializeIfNot() throws SemanticException { + if (!initialized) { + loadDumpFromFile(); + } + } + + + public void write() throws SemanticException { + Utils.writeOutput( + Arrays.asList( + dumpType.toString(), + eventFrom.toString(), + eventTo.toString(), + cmRoot.toString(), + payload), + dumpFile, + hiveConf + ); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java new file mode 100644 index 0000000000..fc02dfd528 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.load; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; + +/** + * Utility class to help return complex value from readMetaData function + */ +public class MetaData { + private final Database db; + private final Table table; + private final Iterable<Partition> partitions; + private final ReplicationSpec replicationSpec; + public final Function function; + + public MetaData() { + this(null, null, null, new ReplicationSpec(), null); + } + + MetaData(Database db, Table table, Iterable<Partition> partitions, + ReplicationSpec replicationSpec, Function function) { + this.db = db; + this.table = table; + this.partitions = partitions; + this.replicationSpec = replicationSpec; + this.function = function; + } + + public Database getDatabase() { + return db; + } + + public Table getTable() { + return table; + } + + public Iterable<Partition> getPartitions() { + return partitions; + } + + public ReplicationSpec getReplicationSpec() { + return replicationSpec; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java new file mode 100644 index 0000000000..b7a568088c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql.parse.repl.load; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.PartitionSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.TableSerializer; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TJSONProtocol; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter.Serializer.UTF_8; + +public class MetadataJson { + private final JSONObject json; + private final TDeserializer deserializer; + private final String tableDesc; + + public MetadataJson(String message) throws JSONException, SemanticException { + deserializer = new TDeserializer(new TJSONProtocol.Factory()); + json = new JSONObject(message); + checkCompatibility(); + tableDesc = jsonEntry(TableSerializer.FIELD_NAME); + } + + public MetaData getMetaData() throws TException, JSONException { + return new MetaData( + database(), + table(), + partitions(), + readReplicationSpec(), + function() + ); + } + + private Function function() throws TException { + return deserialize(new Function(), jsonEntry(FunctionSerializer.FIELD_NAME)); + } + + private Database database() throws TException { + return deserialize(new Database(), jsonEntry(DBSerializer.FIELD_NAME)); + } + + private Table table() throws TException { + return deserialize(new Table(), tableDesc); + } + + private <T extends TBase> T deserialize(T intoObject, String json) throws TException { + if (json == null) { + return null; + } + deserializer.deserialize(intoObject, json, UTF_8); + return intoObject; + } + + private List<Partition> partitions() throws JSONException, TException { + if (tableDesc == null) { + return null; + } + // TODO : jackson-streaming-iterable-redo this + JSONArray jsonPartitions = new JSONArray(json.getString(PartitionSerializer.FIELD_NAME)); + List<Partition> partitionsList = new ArrayList<>(jsonPartitions.length()); + for (int i = 0; i < jsonPartitions.length(); ++i) { + String partDesc = jsonPartitions.getString(i); + partitionsList.add(deserialize(new Partition(), partDesc)); + } + return partitionsList; + } + + private ReplicationSpec readReplicationSpec() { + com.google.common.base.Function<String, String> keyFetcher = + new com.google.common.base.Function<String, String>() { + @Override + public String apply(@Nullable String s) { + return jsonEntry(s); + } + }; + return new ReplicationSpec(keyFetcher); + } + + private void checkCompatibility() throws SemanticException, JSONException { + String version = json.getString("version"); + String fcVersion = jsonEntry("fcversion"); + EximUtil.doCheckCompatibility( + EximUtil.METADATA_FORMAT_VERSION, + version, + fcVersion); + } + + private String jsonEntry(String forName) { + try { + return json.getString(forName); + } catch (JSONException ignored) { + return null; + } + } +} diff --git
ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java new file mode 100644 index 0000000000..3028e76518 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapperTest.java @@ -0,0 +1,27 @@ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class HiveWrapperTest { + @Mock + private HiveWrapper.Tuple.Function<ReplicationSpec> specFunction; + @Mock + private HiveWrapper.Tuple.Function<Table> tableFunction; + + @Test + public void replicationIdIsRequestedBeforeObjectDefinition() throws HiveException { + new HiveWrapper.Tuple<>(specFunction, tableFunction); + InOrder inOrder = Mockito.inOrder(specFunction, tableFunction); + inOrder.verify(specFunction).fromMetaStore(); + inOrder.verify(tableFunction).fromMetaStore(); + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java index 0526700a8e..5b365b0aa0 100644 --- ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java +++ ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java @@ -1,7 +1,7 @@ package org.apache.hadoop.hive.ql.parse.repl.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.junit.Test; import static org.junit.Assert.assertTrue; @@ -26,7 +26,7 @@ public long toEventId() { } @Override - public ReplicationSemanticAnalyzer.DUMPTYPE dumpType() { + public DumpType dumpType() { return null; } }
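Note: the hunks above replace the old ReplicationSemanticAnalyzer.DUMPTYPE constants with the new org.apache.hadoop.hive.ql.parse.repl.DumpType enum, but the enum's own source file is not part of this section. The sketch below is reconstructed only from the constants that the event handlers and DumpMetaData reference here; it is illustrative rather than the actual file from this change, and the BOOTSTRAP member is an assumption (only INCREMENTAL is referenced directly, via DumpMetaData.isIncrementalDump()).

package org.apache.hadoop.hive.ql.parse.repl;

// Minimal sketch of DumpType as implied by the handlers above (illustrative only).
public enum DumpType {
  BOOTSTRAP,               // assumed full-dump counterpart of INCREMENTAL
  INCREMENTAL,             // checked by DumpMetaData.isIncrementalDump()
  EVENT_CREATE_TABLE,      // CreateTableHandler
  EVENT_DROP_TABLE,        // DropTableHandler
  EVENT_ALTER_TABLE,       // AlterTableHandler, ALTER scenario
  EVENT_RENAME_TABLE,      // AlterTableHandler, RENAME scenario
  EVENT_ADD_PARTITION,     // AddPartitionHandler
  EVENT_DROP_PARTITION,    // DropPartitionHandler
  EVENT_ALTER_PARTITION,   // AlterPartitionHandler, ALTER scenario
  EVENT_RENAME_PARTITION,  // AlterPartitionHandler, RENAME scenario
  EVENT_INSERT,            // InsertHandler
  EVENT_UNKNOWN            // DefaultHandler
}

With an enum of that shape, the DumpMetaData round trip introduced above can be exercised roughly as follows; dumpRoot, cmRoot and hiveConf stand in for values supplied by the caller, and exception handling is omitted:

  // Write side: record the dump type, event-id range and CM root for this dump directory.
  DumpMetaData written = new DumpMetaData(dumpRoot, DumpType.INCREMENTAL, 100L, 200L, cmRoot, hiveConf);
  written.write();  // persists a single tab-separated line into <dumpRoot>/_dumpmetadata

  // Read side: the file is parsed lazily on the first accessor call.
  DumpMetaData read = new DumpMetaData(dumpRoot, hiveConf);
  boolean incremental = read.isIncrementalDump();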