commit b7ba61b5da7fcd06eadd4e0883a70ba241b39528
Author: Mithun RK
Date:   Fri Dec 8 14:17:59 2017 -0800

    HIVE-17794: HCatLoader breaks when a member is added to a struct-column of a table (Mithun Radhakrishnan, reviewed by Owen O'Malley)

diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java
index d9d8251c85..b44e87c246 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniCluster.java
@@ -21,183 +21,70 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 
-/**
- * This class builds a single instance of itself with the Singleton
- * design pattern. While building the single instance, it sets up a
- * mini cluster that actually consists of a mini DFS cluster and a
- * mini MapReduce cluster on the local machine and also sets up the
- * environment for Pig to run on top of the mini cluster.
- */
-public class MiniCluster {
-  private MiniDFSCluster m_dfs = null;
-  private MiniMRCluster m_mr = null;
-  private FileSystem m_fileSys = null;
-  private JobConf m_conf = null;
+public class MiniCluster extends MiniGenericCluster {
+  private static final File CONF_DIR = new File("target/test/pigtest/conf/");
+  private static final File CONF_FILE = new File(CONF_DIR, "hadoop-site.xml");
 
-  private final static MiniCluster INSTANCE = new MiniCluster();
-  private static boolean isSetup = true;
+  private MiniMRCluster m_mr = null;
 
-  private MiniCluster() {
-    setupMiniDfsAndMrClusters();
+  @Override
+  public ExecType getExecType() {
+    return ExecType.MAPREDUCE;
   }
 
-  private void setupMiniDfsAndMrClusters() {
+  @Override
+  protected void setupMiniDfsAndMrClusters(Configuration config) {
     try {
-      final int dataNodes = 1; // There will be 4 data nodes
-      final int taskTrackers = 1; // There will be 4 task tracker nodes
-      Configuration config = new Configuration();
+      System.setProperty("hadoop.log.dir", "target/test/logs");
+      final int dataNodes = 1;
+      final int taskTrackers = 1;
+
+      // Create the dir that holds hadoop-site.xml file.
+      // Delete if hadoop-site.xml exists already.
+      CONF_DIR.mkdirs();
+      if (CONF_FILE.exists()) {
+        CONF_FILE.delete();
+      }
 
       // Builds and starts the mini dfs and mapreduce clusters
-      if(System.getProperty("hadoop.log.dir") == null) {
-        System.setProperty("hadoop.log.dir", "target/tmp/logs/");
-      }
       m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
-
       m_fileSys = m_dfs.getFileSystem();
       m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
 
-      // Create the configuration hadoop-site.xml file
-      File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/");
-      conf_dir.mkdirs();
-      File conf_file = new File(conf_dir, "hadoop-site.xml");
-
       // Write the necessary config info to hadoop-site.xml
      m_conf = m_mr.createJobConf();
-      m_conf.setInt("mapred.submit.replication", 1);
+      m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2);
+      m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2);
+      m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2);
       m_conf.set("dfs.datanode.address", "0.0.0.0:0");
       m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
-      m_conf.writeXml(new FileOutputStream(conf_file));
+      m_conf.set("pig.jobcontrol.sleep", "100");
+      m_conf.writeXml(new FileOutputStream(CONF_FILE));
 
       // Set the system properties needed by Pig
-      System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
+      System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER));
       System.setProperty("namenode", m_conf.get("fs.default.name"));
-      System.setProperty("junit.hadoop.conf", conf_dir.getPath());
+      System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
-  /**
-   * Returns the single instance of class MiniClusterBuilder that
-   * represents the resouces for a mini dfs cluster and a mini
-   * mapreduce cluster.
-   */
-  public static MiniCluster buildCluster() {
-    if (!isSetup) {
-      INSTANCE.setupMiniDfsAndMrClusters();
-      isSetup = true;
-    }
-    return INSTANCE;
-  }
-
-  public void shutDown() {
-    INSTANCE.shutdownMiniDfsAndMrClusters();
-  }
-
   @Override
-  protected void finalize() {
-    shutdownMiniDfsAndMrClusters();
-  }
-
-  private void shutdownMiniDfsAndMrClusters() {
-    isSetup = false;
-    try {
-      if (m_fileSys != null) {
-        m_fileSys.close();
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-    if (m_dfs != null) {
-      m_dfs.shutdown();
+  protected void shutdownMiniMrClusters() {
+    // Delete hadoop-site.xml on shutDown
+    if (CONF_FILE.exists()) {
+      CONF_FILE.delete();
     }
-    if (m_mr != null) {
-      m_mr.shutdown();
-    }
-    m_fileSys = null;
-    m_dfs = null;
+    if (m_mr != null) { m_mr.shutdown(); }
     m_mr = null;
   }
-
-  public Properties getProperties() {
-    errorIfNotSetup();
-    Properties properties = new Properties();
-    assert m_conf != null;
-    Iterator<Map.Entry<String, String>> iter = m_conf.iterator();
-    while (iter.hasNext()) {
-      Map.Entry<String, String> entry = iter.next();
-      properties.put(entry.getKey(), entry.getValue());
-    }
-    return properties;
-  }
-
-  public void setProperty(String name, String value) {
-    errorIfNotSetup();
-    m_conf.set(name, value);
-  }
-
-  public FileSystem getFileSystem() {
-    errorIfNotSetup();
-    return m_fileSys;
-  }
-
-  /**
-   * Throw RunTimeException if isSetup is false
-   */
-  private void errorIfNotSetup() {
-    if (isSetup) {
-      return;
-    }
-    String msg = "function called on MiniCluster that has been shutdown";
-    throw new RuntimeException(msg);
-  }
-
-  static public void createInputFile(MiniCluster miniCluster, String fileName,
-    String[] inputData)
-    throws IOException {
-    FileSystem fs = miniCluster.getFileSystem();
-    createInputFile(fs, fileName, inputData);
-  }
-
-  static public void createInputFile(FileSystem fs, String fileName,
-    String[] inputData) throws IOException {
-    Path path = new Path(fileName);
-    if (fs.exists(path)) {
-      throw new IOException("File " + fileName + " already exists on the minicluster");
-    }
-    FSDataOutputStream stream = fs.create(path);
-    PrintWriter pw = new PrintWriter(new OutputStreamWriter(stream, "UTF-8"));
-    for (int i = 0; i < inputData.length; i++) {
-      pw.println(inputData[i]);
-    }
-    pw.close();
-  }
-
-  /**
-   * Helper to remove a dfs file from the minicluster DFS
-   *
-   * @param miniCluster reference to the Minicluster where the file should be deleted
-   * @param fileName pathname of the file to be deleted
-   * @throws IOException
-   */
-  static public void deleteFile(MiniCluster miniCluster, String fileName)
-    throws IOException {
-    FileSystem fs = miniCluster.getFileSystem();
-    fs.delete(new Path(fileName), true);
-  }
 }
+
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniGenericCluster.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniGenericCluster.java
new file mode 100644
index 0000000000..703f7ef84f
--- /dev/null
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/MiniGenericCluster.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+
+import java.io.IOException;
+import java.util.Properties;
+
+abstract public class MiniGenericCluster {
+  protected MiniDFSCluster m_dfs = null;
+  protected FileSystem m_fileSys = null;
+  protected Configuration m_conf = null;
+
+  protected static MiniGenericCluster INSTANCE = null;
+  protected static boolean isSetup = false;
+
+  public static String EXECTYPE_MR = "mr";
+
+  /**
+   * Returns the single instance of class MiniGenericCluster that represents
+   * the resources for a mini dfs cluster and a mini mr (or tez) cluster. The
+   * system property "test.exec.type" is used to decide whether a mr or tez mini
+   * cluster will be returned.
+   */
+  public static MiniGenericCluster buildCluster() {
+    return buildCluster(new Configuration());
+  }
+
+  /**
+   * Returns the single instance of class MiniGenericCluster that represents
+   * the resources for a mini dfs cluster and a mini mr (or tez) cluster. The
+   * system property "test.exec.type" is used to decide whether a mr or tez mini
+   * cluster will be returned.
+   */
+  public static MiniGenericCluster buildCluster(Configuration configuration) {
+    if (INSTANCE == null) {
+      String execType = System.getProperty("test.exec.type");
+      if (execType == null) {
+        // Default to MR
+        System.setProperty("test.exec.type", EXECTYPE_MR);
+        return buildCluster(configuration, EXECTYPE_MR);
+      }
+
+      return buildCluster(configuration, execType);
+    }
+    return INSTANCE;
+  }
+
+  public static MiniGenericCluster buildCluster(Configuration configuration, String execType) {
+    if (INSTANCE == null) {
+      if (execType.equalsIgnoreCase(EXECTYPE_MR)) {
+        INSTANCE = new MiniCluster();
+      }
+      // TODO: Add support for TezMiniCluster.
+      else {
+        throw new RuntimeException("Unknown test.exec.type: " + execType);
+      }
+    }
+    if (!isSetup) {
+      INSTANCE.setupMiniDfsAndMrClusters(configuration);
+      isSetup = true;
+    }
+    return INSTANCE;
+  }
+
+  abstract public ExecType getExecType();
+
+  abstract protected void setupMiniDfsAndMrClusters(Configuration configuration);
+
+  public void shutDown() {
+    INSTANCE.shutdownMiniDfsAndMrClusters();
+  }
+
+  @Override
+  protected void finalize() {
+    shutdownMiniDfsAndMrClusters();
+  }
+
+  protected void shutdownMiniDfsAndMrClusters() {
+    isSetup = false;
+    shutdownMiniDfsClusters();
+    shutdownMiniMrClusters();
+    m_conf = null;
+  }
+
+  protected void shutdownMiniDfsClusters() {
+    try {
+      if (m_fileSys != null) { m_fileSys.close(); }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    if (m_dfs != null) { m_dfs.shutdown(); }
+    m_fileSys = null;
+    m_dfs = null;
+  }
+
+  abstract protected void shutdownMiniMrClusters();
+
+  public Properties getProperties() {
+    errorIfNotSetup();
+    return ConfigurationUtil.toProperties(m_conf);
+  }
+
+  public Configuration getConfiguration() {
+    return new Configuration(m_conf);
+  }
+
+  public void setProperty(String name, String value) {
+    errorIfNotSetup();
+    m_conf.set(name, value);
+  }
+
+  public FileSystem getFileSystem() {
+    errorIfNotSetup();
+    return m_fileSys;
+  }
+
+  /**
+   * Throw RuntimeException if isSetup is false.
+   */
+  private void errorIfNotSetup() {
+    if (isSetup) {
+      return;
+    }
+    String msg = "function called on MiniCluster that has been shutdown";
+    throw new RuntimeException(msg);
+  }
+}
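The new MiniGenericCluster keeps the old MiniCluster's lazily-initialized singleton, but hides the concrete implementation behind the "test.exec.type" system property. A minimal usage sketch of that factory, not part of this patch (the class name and main-method wrapper are illustrative; with the property unset, the MR implementation is chosen):

    import java.util.Properties;
    import org.apache.hive.hcatalog.MiniGenericCluster;
    import org.apache.pig.PigServer;

    public class MiniClusterUsageSketch {
      public static void main(String[] args) throws Exception {
        // "test.exec.type" unset: buildCluster() defaults to "mr" and returns the MiniCluster subclass.
        MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
        Properties pigProperties = cluster.getProperties();  // cluster conf, converted for Pig
        PigServer pig = new PigServer(cluster.getExecType(), pigProperties);
        // ... register and run Pig queries against the mini DFS/MR clusters ...
        pig.shutdown();
        cluster.shutDown();  // clears isSetup; a later buildCluster() re-initializes the singleton
      }
    }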
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java
index ff56234cc1..c04bbaff26 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/HCatDataCheckUtil.java
@@ -55,15 +55,6 @@ public static Driver instantiateDriver(MiniCluster cluster) {
     return driver;
   }
 
-  public static void generateDataFile(MiniCluster cluster, String fileName) throws IOException {
-    MiniCluster.deleteFile(cluster, fileName);
-    String[] input = new String[50];
-    for (int i = 0; i < 50; i++) {
-      input[i] = (i % 5) + "\t" + i + "\t" + "_S" + i + "S_";
-    }
-    MiniCluster.createInputFile(cluster, fileName, input);
-  }
-
   public static void createTable(Driver driver, String tableName, String createTableArgs)
     throws CommandNeedRetryException, IOException {
     String createTable = "create table " + tableName + createTableArgs;
diff --git a/hcatalog/hcatalog-pig-adapter/pom.xml b/hcatalog/hcatalog-pig-adapter/pom.xml
index c50a4d5b09..f8a0ab0dc0 100644
--- a/hcatalog/hcatalog-pig-adapter/pom.xml
+++ b/hcatalog/hcatalog-pig-adapter/pom.xml
@@ -110,6 +110,17 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-servlet</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-common</artifactId>
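With MiniCluster.createInputFile()/deleteFile() and HCatDataCheckUtil.generateDataFile() removed, test data now goes through HcatTestUtils and a plain copyFromLocalFile(), as the new test further below does. A minimal sketch of that replacement pattern (the file path and row contents here are illustrative):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hive.hcatalog.HcatTestUtils;
    import org.apache.hive.hcatalog.MiniGenericCluster;

    public class TestDataSketch {
      public static void main(String[] args) throws Exception {
        MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
        String dataFile = "/tmp/example.input.data";  // illustrative path
        // Write the rows to the local filesystem...
        HcatTestUtils.createTestDataFile(dataFile, new String[] {"1\tone", "2\ttwo"});
        // ...then push them into the mini-cluster's DFS under the same path.
        FileSystem fs = cluster.getFileSystem();
        fs.copyFromLocalFile(new Path(dataFile), new Path(dataFile));
        cluster.shutDown();
      }
    }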
diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java
index 2e756b47d7..46d8074b12 100644
--- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java
+++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java
@@ -462,11 +462,15 @@ private static Tuple transformToTuple(List<?> objList, HCatSchema hs) throws Exception {
     if (objList == null) {
       return null;
     }
-    Tuple t = tupFac.newTuple(objList.size());
     List<HCatFieldSchema> subFields = hs.getFields();
-    for (int i = 0; i < subFields.size(); i++) {
+    Tuple t = tupFac.newTuple(subFields.size());
+    int i = 0;
+    for (; i < objList.size(); ++i) {
       t.set(i, extractPigObject(objList.get(i), subFields.get(i)));
     }
+    for (; i < subFields.size(); ++i) {
+      t.set(i, null);
+    }
     return t;
   }
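This hunk is the actual fix. The old code sized the tuple from the value list (objList) but iterated over the schema's fields, so once a struct gained a member after data was written, subFields.size() exceeded the tuple's size and the loader failed. The new code sizes the tuple from the schema and null-pads the trailing members. A standalone sketch of that padding behaviour (the two-member value list and three-field schema size are illustrative stand-ins for what HCatLoader supplies):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class NullPaddingSketch {
      public static void main(String[] args) throws Exception {
        TupleFactory tupFac = TupleFactory.getInstance();
        List<Object> values = Arrays.<Object>asList(1, "ONE");  // written under the old 2-member struct
        int schemaSize = 3;  // the table was since altered to a 3-member struct

        Tuple t = tupFac.newTuple(schemaSize);  // size taken from the schema, as in the fix above
        int i = 0;
        for (; i < values.size(); ++i) {
          t.set(i, values.get(i));  // copy the members that exist in the data
        }
        for (; i < schemaSize; ++i) {
          t.set(i, null);  // members absent from the old data read back as null
        }
        System.out.println(t);  // prints: (1,ONE,)
      }
    }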
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStructColumnAddition.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStructColumnAddition.java
new file mode 100644
index 0000000000..faf6f0aeb3
--- /dev/null
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStructColumnAddition.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.pig;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.HcatTestUtils;
+import org.apache.hive.hcatalog.MiniGenericCluster;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+public class TestHCatLoaderStructColumnAddition {
+  private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderStructColumnAddition.class);
+  private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir")
+      + File.separator
+      + TestHCatLoaderStructColumnAddition.class.getCanonicalName()
+      + "-" + System.currentTimeMillis());
+  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+  private static final String TEXT_DATA_FILE = TEST_DATA_DIR + "/basic.input.data";
+  private static final String TABLE_NAME = "TestHCatLoaderStructColumnAddition_table";
+
+  private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+  private static FileSystem clusterFS = cluster.getFileSystem();
+  private static Driver driver;
+  private static Set<String> tablesCreated = Sets.newHashSet();
+
+  private String tableName;
+
+  public TestHCatLoaderStructColumnAddition() {
+    this.tableName = TABLE_NAME;
+  }
+
+  @BeforeClass
+  public static void setupAllTests() throws Exception {
+    setUpCluster();
+    setUpLocalFileSystemDirectories();
+    setUpClusterFileSystemDirectories();
+    setUpHiveDriver();
+    createTextData();
+  }
+
+  @Before
+  public void setupSingleTest() throws Exception {
+    if (!tablesCreated.contains(tableName)) {
+      createInputTable();
+      tablesCreated.add(tableName);
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAllTests() throws Exception {
+    for (String table : tablesCreated) {
+      dropTable(table);
+    }
+    tearDownCluster();
+    clearUpLocalFileSystemDirectories();
+  }
+
+  private static void setUpCluster() throws Exception {
+    cluster = MiniGenericCluster.buildCluster();
+    clusterFS = cluster.getFileSystem();
+  }
+
+  private static void tearDownCluster() throws Exception {
+    cluster.shutDown();
+  }
+
+  private static void setUpLocalFileSystemDirectories() {
+    File f = new File(TEST_DATA_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if (!(new File(TEST_DATA_DIR).mkdirs())) {
+      throw new RuntimeException("Could not create test-directory " + TEST_DATA_DIR + " on local filesystem.");
+    }
+  }
+
+  private static void clearUpLocalFileSystemDirectories() {
+    File f = new File(TEST_DATA_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+  }
+
+  private static void setUpClusterFileSystemDirectories() throws IOException {
+    FileSystem clusterFS = cluster.getFileSystem();
+    Path warehouseDir = new Path(TEST_WAREHOUSE_DIR);
+    if (clusterFS.exists(warehouseDir)) {
+      clusterFS.delete(warehouseDir, true);
+    }
+    clusterFS.mkdirs(warehouseDir);
+  }
+
+  private static void setUpHiveDriver() throws IOException {
+    HiveConf hiveConf = createHiveConf();
+    driver = new Driver(hiveConf);
+    driver.setMaxRows(1000);
+    SessionState.start(new CliSessionState(hiveConf));
+  }
+
+  private static HiveConf createHiveConf() {
+    HiveConf hiveConf = new HiveConf(cluster.getConfiguration(), TestHCatLoaderStructColumnAddition.class);
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+    hiveConf.set(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER.varname, "");
+    return hiveConf;
+  }
+
+  /**
+   * Create data laid out as:
+   *   foo \u0001 struct_member_1 \u0002 struct_member_2
+   * i.e. using Hive's default field and struct-member delimiters.
+   * @throws Exception
+   */
+  private static void createTextData() throws Exception {
+    ArrayList<String> input = Lists.newArrayListWithExpectedSize(3);
+
+    input.add("1\u00011\u0002ONE");
+    input.add("2\u00012\u0002TWO");
+    input.add("3\u00013\u0002THREE");
+
+    HcatTestUtils.createTestDataFile(TEXT_DATA_FILE, input.toArray(new String[input.size()]));
+    cluster.getFileSystem().copyFromLocalFile(new Path(TEXT_DATA_FILE), new Path(TEXT_DATA_FILE));
+  }
+
+  private void createInputTable() throws IOException, CommandNeedRetryException {
+    createTable(tableName, "foo STRING, bar STRUCT<num:INT, str:STRING>", "dt STRING", "TEXTFILE", "('foo'='bar')");
+    executeStatementOnDriver(
+        "LOAD DATA INPATH '" + TEXT_DATA_FILE + "' INTO TABLE " + tableName + " PARTITION (dt='1')"
+    );
+  }
+
+  private void createTable(String tableName, String schema,
+                           String partitionedBy, String storageFormat, String tblProperties)
+      throws IOException, CommandNeedRetryException {
+    String createTable;
+    createTable = "create table " + tableName + "(" + schema + ") ";
+    if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
+      createTable = createTable + "partitioned by (" + partitionedBy + ") ";
+    }
+    createTable = createTable + "stored as " + storageFormat;
+    createTable += " TBLPROPERTIES " + tblProperties;
+    executeStatementOnDriver(createTable);
+
+    String describeTable = "describe formatted " + tableName;
+    executeStatementOnDriver(describeTable);
+  }
+
+  /**
+   * Execute Hive CLI statement
+   * @param cmd arbitrary statement to execute
+   */
+  private void executeStatementOnDriver(String cmd) throws IOException, CommandNeedRetryException {
+    LOG.debug("Executing: " + cmd);
+    CommandProcessorResponse cpr = driver.run(cmd);
+    if (cpr.getResponseCode() != 0) {
+      throw new IOException("Failed to execute \"" + cmd + "\". "
+          + "Driver returned " + cpr.getResponseCode()
+          + " Error: " + cpr.getErrorMessage());
+    }
+    else {
+      List<String> results = Lists.newArrayList();
+      if (driver.getResults(results)) {
+        System.out.println("Got results: ");
+        for (String result : results) {
+          System.out.println(result);
+        }
+      }
+      else {
+        System.out.println("Got no results!");
+      }
+    }
+  }
+
+  private static void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+    driver.run("drop table if exists " + tablename);
+  }
+
+  private Iterator<Tuple> getResultsForAlias(PigServer server, String query, String resultAlias) throws IOException {
+    for (String line : query.split("\n")) {
+      server.registerQuery(line);
+    }
+    return server.openIterator(resultAlias);
+  }
+
+  private PigServer getPigServer() throws IOException {
+    return new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+  }
+
+  private static final String HCAT_LOADER = "org.apache.hive.hcatalog.pig.HCatLoader";
+
+  private void addColumnToStruct() throws IOException, CommandNeedRetryException {
+    executeStatementOnDriver("ALTER TABLE " + tableName + " CHANGE bar bar STRUCT<num:INT, str:STRING, new_member:STRING>");
+  }
+
+  @Test
+  public void testBeforeAndAfterSchemaChange() throws IOException, CommandNeedRetryException {
+
+    String query =
+        "x = LOAD '" + tableName + "' USING " + HCAT_LOADER + "();\n"
+            + "x = FOREACH x GENERATE dt,bar ;\n"
+            + "x = FILTER x BY dt == '1' ;\n"
+        ;
+
+    Iterator<Tuple> beforeSchemaChange = getResultsForAlias(getPigServer(), query, "x");
+
+    addColumnToStruct();
+
+    Iterator<Tuple> afterSchemaChange = getResultsForAlias(getPigServer(), query, "x");
+
+    while (beforeSchemaChange.hasNext() && afterSchemaChange.hasNext()) {
+      assertTuplesAreEquivalent((Tuple) beforeSchemaChange.next().get(1), (Tuple) afterSchemaChange.next().get(1));
+    }
+  }
+
+  private static void assertTuplesAreEquivalent(Tuple beforeSchemaChange, Tuple afterSchemaChange) {
+    // For pre-existing struct-members, check that the values before and after schema-change match.
+    Iterator<Object> structMembersBeforeSchemaChange = beforeSchemaChange.getAll().iterator();
+    Iterator<Object> structMembersAfterSchemaChange = afterSchemaChange.getAll().iterator();
+
+    while (structMembersBeforeSchemaChange.hasNext()) {
+      Assert.assertEquals("Pre-existing struct members should have been equal",
+          structMembersBeforeSchemaChange.next(), structMembersAfterSchemaChange.next());
+    }
+
+    // New columns should be null for partitions added before the schema change.
+    Assert.assertNull("New struct-members should have been filled in with null.",
+        structMembersAfterSchemaChange.next());
+  }
+}
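Note that assertTuplesAreEquivalent() checks for exactly one added member: a single assertNull() after the comparison loop, matching the one member added by addColumnToStruct(). (The struct-member definitions num, str, and new_member above stand in for the originals, whose exact names were lost; they are chosen to be consistent with the test data.) A test that added several members at once would generalize the tail check to a loop, e.g. this hypothetical helper:

    import java.util.Iterator;
    import org.junit.Assert;

    public class StructAssertions {
      // Hypothetical generalization of the single assertNull above: with N newly-added
      // members, every trailing member of the post-change tuple should be null.
      public static void assertTrailingMembersAreNull(Iterator<Object> membersAfterChange) {
        while (membersAfterChange.hasNext()) {
          Assert.assertNull("New struct-members should have been filled in with null.",
              membersAfterChange.next());
        }
      }
    }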
diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
index 78e767e7fc..c805348829 100644
--- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
+++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
@@ -1030,12 +1030,16 @@ public void testTableSchemaPropagation() throws Exception {
     assertEquals("Table after deserialization should have been identical to sourceTable.",
         HCatTable.NO_DIFF, sourceTable.diff(targetTable));
 
+    EnumSet<HCatTable.TableAttribute> ignoreTableProperties
+        = EnumSet.copyOf(HCatTable.DEFAULT_COMPARISON_ATTRIBUTES);
+    ignoreTableProperties.remove(HCatTable.TableAttribute.TABLE_PROPERTIES);
+
     // Create table on Target.
     targetMetaStore().createTable(HCatCreateTableDesc.create(targetTable).build());
     // Verify that the created table is identical to sourceTable.
     targetTable = targetMetaStore().getTable(dbName, tableName);
     assertEquals("Table after deserialization should have been identical to sourceTable.",
-        HCatTable.NO_DIFF, sourceTable.diff(targetTable));
+        HCatTable.NO_DIFF, sourceTable.diff(targetTable, ignoreTableProperties)); // Ignore differences in table-properties.
 
     // Modify sourceTable.
     List<HCatFieldSchema> newColumnSchema = new ArrayList<HCatFieldSchema>(columnSchema);
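The TestHCatClient change illustrates a reusable idiom: copy HCatTable.DEFAULT_COMPARISON_ATTRIBUTES, remove the attributes that may legitimately differ, and pass the narrowed set to diff(). A hedged sketch of that idiom as a helper (the helper class and method are hypothetical; sourceTable and targetTable come from the surrounding test):

    import java.util.EnumSet;
    import org.apache.hive.hcatalog.api.HCatTable;
    import org.junit.Assert;

    public class TableDiffSketch {
      // Hypothetical helper: assert that two tables match while ignoring tblproperties,
      // which can drift between source and target metastores.
      public static void assertTablesMatchIgnoringTableProperties(HCatTable sourceTable, HCatTable targetTable) {
        EnumSet<HCatTable.TableAttribute> comparisonAttributes =
            EnumSet.copyOf(HCatTable.DEFAULT_COMPARISON_ATTRIBUTES);
        comparisonAttributes.remove(HCatTable.TableAttribute.TABLE_PROPERTIES);
        Assert.assertEquals("Tables should have matched, ignoring table-properties.",
            HCatTable.NO_DIFF, sourceTable.diff(targetTable, comparisonAttributes));
      }
    }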