diff --git a/beeline/src/java/org/apache/hive/beeline/Rows.java b/beeline/src/java/org/apache/hive/beeline/Rows.java index 7180f34b00..3d5eaec9b9 100644 --- a/beeline/src/java/org/apache/hive/beeline/Rows.java +++ b/beeline/src/java/org/apache/hive/beeline/Rows.java @@ -166,7 +166,7 @@ public String toString(){ } else if (o instanceof byte[]) { value = convertBinaryArrayToString ? new String((byte[])o, StandardCharsets.UTF_8) : Base64.getEncoder().withoutPadding().encodeToString((byte[])o); } else { - value = rs.getString(i + 1); + value = o.toString(); } if (beeLine.getOpts().getEscapeCRLF()) { diff --git a/beeline/src/test/org/apache/hive/beeline/TestBufferedRows.java b/beeline/src/test/org/apache/hive/beeline/TestBufferedRows.java index e02c7530d6..1add3c01b3 100644 --- a/beeline/src/test/org/apache/hive/beeline/TestBufferedRows.java +++ b/beeline/src/test/org/apache/hive/beeline/TestBufferedRows.java @@ -107,15 +107,6 @@ public String answer(InvocationOnMock invocation) { return mockRow.getColumn(index); } }); - - when(mockResultSet.getString(anyInt())).thenAnswer(new Answer() { - @Override - public String answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - int index = ((Integer) args[0]).intValue(); - return mockRow.getColumn(index); - } - }); } static class MockRow { @@ -129,4 +120,4 @@ public String getColumn(int idx) { return rowData[idx - 1]; } } -} +} \ No newline at end of file diff --git a/beeline/src/test/org/apache/hive/beeline/TestIncrementalRows.java b/beeline/src/test/org/apache/hive/beeline/TestIncrementalRows.java index 62420f845d..eb4e27bb31 100644 --- a/beeline/src/test/org/apache/hive/beeline/TestIncrementalRows.java +++ b/beeline/src/test/org/apache/hive/beeline/TestIncrementalRows.java @@ -115,7 +115,6 @@ public void testIncrementalRowsWithNormalization() throws SQLException { initNrOfResultSetCalls(10); when(mockResultSet.getObject(1)).thenReturn("Hello World"); - when(mockResultSet.getString(1)).thenReturn("Hello World"); // IncrementalRows constructor should buffer the first "incrementalBufferRows" rows IncrementalRowsWithNormalization incrementalRowsWithNormalization = new IncrementalRowsWithNormalization( diff --git a/beeline/src/test/org/apache/hive/beeline/TestJSONFileOutputFormat.java b/beeline/src/test/org/apache/hive/beeline/TestJSONFileOutputFormat.java index 680510017f..7cb85ab75f 100644 --- a/beeline/src/test/org/apache/hive/beeline/TestJSONFileOutputFormat.java +++ b/beeline/src/test/org/apache/hive/beeline/TestJSONFileOutputFormat.java @@ -102,14 +102,5 @@ public String answer(final InvocationOnMock invocation) { return mockRow.getColumn(index); } }); - - when(mockResultSet.getString(anyInt())).thenAnswer(new Answer() { - @Override - public String answer(final InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - int index = ((Integer) args[0]); - return mockRow.getColumn(index); - } - }); } } diff --git a/beeline/src/test/org/apache/hive/beeline/TestJSONOutputFormat.java b/beeline/src/test/org/apache/hive/beeline/TestJSONOutputFormat.java index 1ea6cd60e0..ac216d73eb 100644 --- a/beeline/src/test/org/apache/hive/beeline/TestJSONOutputFormat.java +++ b/beeline/src/test/org/apache/hive/beeline/TestJSONOutputFormat.java @@ -104,14 +104,5 @@ public String answer(final InvocationOnMock invocation) { return mockRow.getColumn(index); } }); - - when(mockResultSet.getString(anyInt())).thenAnswer(new Answer() { - @Override - public String answer(final InvocationOnMock invocation) { - Object[] args = 
invocation.getArguments(); - int index = ((Integer) args[0]); - return mockRow.getColumn(index); - } - }); } } diff --git a/beeline/src/test/org/apache/hive/beeline/TestTableOutputFormat.java b/beeline/src/test/org/apache/hive/beeline/TestTableOutputFormat.java index 59e9408373..5b8407088f 100644 --- a/beeline/src/test/org/apache/hive/beeline/TestTableOutputFormat.java +++ b/beeline/src/test/org/apache/hive/beeline/TestTableOutputFormat.java @@ -105,14 +105,5 @@ public String answer(final InvocationOnMock invocation) { return mockRow.getColumn(index); } }); - - when(mockResultSet.getString(anyInt())).thenAnswer(new Answer() { - @Override - public String answer(final InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - int index = ((Integer) args[0]); - return mockRow.getColumn(index); - } - }); } } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1e6e5ca0be..04325aaf50 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4050,11 +4050,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " are produced by this query and finish earlier will be available for querying\n" + " much earlier. Since the locks are only released once the query finishes, this\n" + " does not apply if concurrency is enabled."), - HIVE_HDFS_ENCRYPTION_SHIM_CACHE_ON("hive.hdfs.encryption.shim.cache.on", true, - "Hive keeps a cache of hdfs encryption shims in SessionState. Each encryption shim in the cache stores a " - + "FileSystem object. If one of these FileSystems is closed anywhere in the system and HDFS config" - + "fs.hdfs.impl.disable.cache is false, its encryption shim in the cache will be unusable. 
" - + "If this is config set to false, then the encryption shim cache will be disabled."), + HIVE_INFER_BUCKET_SORT("hive.exec.infer.bucket.sort", false, "If this is set, when writing partitions, the metadata will include the bucketing/sorting\n" + "properties with which the data was written if any (this will not overwrite the metadata\n" + diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 70151eebb1..d47263db06 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -61,6 +61,7 @@ import java.util.Collections; import java.util.Map; + import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.junit.Assert.assertEquals; @@ -2207,131 +2208,154 @@ public void testORCTableRegularCopyWithCopyOnTarget() throws Throwable { public void testORCTableDistcpCopyWithCopyOnTarget() throws Throwable { //Distcp copy List withClause = Arrays.asList( - "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'", - "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", - "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", - "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", - "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" - + UserGroupInformation.getCurrentUser().getUserName() + "'"); + "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) - .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE tpart1(a int) partitioned by (name string)" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE text1(a string) STORED AS TEXTFILE") - .run("insert into t1 values (1)") - .run("insert into t1 values (11)") - .run("insert into t2 values (2)") - .run("insert into t2 values (22)") - .run("insert into t3 values (33)") - .run("insert into tpart1 partition(name='Tom') values(100)") - .run("insert into tpart1 partition(name='Jerry') values(101)") - .run("insert into tpart2 partition(name='Bob') values(200)") - .run("insert into tpart2 partition(name='Carl') values(201)") - .run("insert into text1 values ('ricky')") - .dump(primaryDbName, withClause); + .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE 
TABLE t2(a string) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE tpart1(a int) partitioned by (name string)" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE text1(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (11)") + .run("insert into t2 values (2)") + .run("insert into t2 values (22)") + .run("insert into t3 values (33)") + .run("insert into tpart1 partition(name='Tom') values(100)") + .run("insert into tpart1 partition(name='Jerry') values(101)") + .run("insert into tpart2 partition(name='Bob') values(200)") + .run("insert into tpart2 partition(name='Carl') values(201)") + .run("insert into text1 values ('ricky')") + .dump(primaryDbName, withClause); replica.run("DROP TABLE t3"); replica.load(replicatedDbName, primaryDbName, withClause) - .run("use " + replicatedDbName) - .run("show tables") - .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"}) - .run("select * from " + replicatedDbName + ".t1") - .verifyResults(new String[] {"1", "11"}) - .run("select * from " + replicatedDbName + ".t2") - .verifyResults(new String[]{"2", "22"}) - .run("select a from " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"100", "101"}) - .run("show partitions " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"name=Tom", "name=Jerry"}) - .run("select a from " + replicatedDbName + ".tpart2") - .verifyResults(new String[]{"200", "201"}) - .run("show partitions " + replicatedDbName + ".tpart2") - .verifyResults(new String[]{"name=Bob", "name=Carl"}) - .run("select a from " + replicatedDbName + ".text1") - .verifyResults(new String[]{"ricky"}); + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"}) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[]{"1", "11"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[]{"2", "22"}) + .run("select a from " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"100", "101"}) + .run("show partitions " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"name=Tom", "name=Jerry"}) + .run("select a from " + replicatedDbName + ".tpart2") + .verifyResults(new String[]{"200", "201"}) + .run("show partitions " + replicatedDbName + ".tpart2") + .verifyResults(new String[]{"name=Bob", "name=Carl"}) + .run("select a from " + replicatedDbName + ".text1") + .verifyResults(new String[]{"ricky"}); WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName) - .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE tpart3(a int) partitioned by (name string)" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("insert into t1 values (111)") - .run("insert into t2 values (222)") - .run("insert into t4 values (4)") - .run("insert into tpart1 partition(name='Tom') values(102)") - 
.run("insert into tpart1 partition(name='Jerry') values(103)") - .run("insert into tpart2 partition(name='Bob') values(202)") - .run("insert into tpart2 partition(name='Carl') values(203)") - .run("insert into tpart3 partition(name='Tom3') values(300)") - .run("insert into tpart3 partition(name='Jerry3') values(301)") - .run("insert into tpart4 partition(name='Bob4') values(400)") - .run("insert into tpart4 partition(name='Carl4') values(401)") - .run("insert into text1 values ('martin')") - .dump(primaryDbName, withClause); + .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE tpart3(a int) partitioned by (name string)" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t1 values (111)") + .run("insert into t2 values (222)") + .run("insert into t4 values (4)") + .run("insert into tpart1 partition(name='Tom') values(102)") + .run("insert into tpart1 partition(name='Jerry') values(103)") + .run("insert into tpart2 partition(name='Bob') values(202)") + .run("insert into tpart2 partition(name='Carl') values(203)") + .run("insert into tpart3 partition(name='Tom3') values(300)") + .run("insert into tpart3 partition(name='Jerry3') values(301)") + .run("insert into tpart4 partition(name='Bob4') values(400)") + .run("insert into tpart4 partition(name='Carl4') values(401)") + .run("insert into text1 values ('martin')") + .dump(primaryDbName, withClause); replica.load(replicatedDbName, primaryDbName, withClause) - .run("use " + replicatedDbName) - .run("show tables ") - .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"}) - .run("select * from " + replicatedDbName + ".t1") - .verifyResults(new String[] {"1", "11", "111"}) - .run("select * from " + replicatedDbName + ".t2") - .verifyResults(new String[]{"2", "22", "222"}) - .run("select * from " + replicatedDbName + ".t4") - .verifyResults(new String[]{"4"}) - .run("select a from " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"100", "101", "102", "103"}) - .run("show partitions " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"name=Tom", "name=Jerry"}) - .run("select a from " + replicatedDbName + ".tpart2") - .verifyResults(new String[]{"200", "201", "202", "203"}) - .run("show partitions " + replicatedDbName + ".tpart2") - .verifyResults(new String[]{"name=Bob", "name=Carl"}) - .run("select a from " + replicatedDbName + ".tpart3") - .verifyResults(new String[]{"300", "301"}) - .run("show partitions " + replicatedDbName + ".tpart3") - .verifyResults(new String[]{"name=Tom3", "name=Jerry3"}) - .run("select a from " + replicatedDbName + ".tpart4") - .verifyResults(new String[]{"400", "401"}) - .run("show partitions " + replicatedDbName + ".tpart4") - .verifyResults(new String[]{"name=Bob4", "name=Carl4"}) - .run("select a from " + replicatedDbName + ".text1") - .verifyResults(new String[]{"ricky", "martin"}); + .run("use " + replicatedDbName) + .run("show tables ") + .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"}) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[]{"1", "11", "111"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[]{"2", "22", "222"}) + .run("select * from " + replicatedDbName + 
".t4") + .verifyResults(new String[]{"4"}) + .run("select a from " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"100", "101", "102", "103"}) + .run("show partitions " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"name=Tom", "name=Jerry"}) + .run("select a from " + replicatedDbName + ".tpart2") + .verifyResults(new String[]{"200", "201", "202", "203"}) + .run("show partitions " + replicatedDbName + ".tpart2") + .verifyResults(new String[]{"name=Bob", "name=Carl"}) + .run("select a from " + replicatedDbName + ".tpart3") + .verifyResults(new String[]{"300", "301"}) + .run("show partitions " + replicatedDbName + ".tpart3") + .verifyResults(new String[]{"name=Tom3", "name=Jerry3"}) + .run("select a from " + replicatedDbName + ".tpart4") + .verifyResults(new String[]{"400", "401"}) + .run("show partitions " + replicatedDbName + ".tpart4") + .verifyResults(new String[]{"name=Bob4", "name=Carl4"}) + .run("select a from " + replicatedDbName + ".text1") + .verifyResults(new String[]{"ricky", "martin"}); incrementalDump = primary.run("use " + primaryDbName) - .run("insert into t4 values (44)") - .run("insert into t1 values (1111)") - .run("DROP TABLE t1") - .run("insert into t2 values (2222)") - .run("insert into tpart1 partition(name='Tom') values(104)") - .run("insert into tpart1 partition(name='Tom_del') values(1000)") - .run("insert into tpart1 partition(name='Harry') values(10001)") - .run("insert into tpart1 partition(name='Jerry') values(105)") - .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')") - .run("DROP TABLE tpart2") - .dump(primaryDbName, withClause); + .run("insert into t4 values (44)") + .run("insert into t1 values (1111)") + .run("DROP TABLE t1") + .run("insert into t2 values (2222)") + .run("insert into tpart1 partition(name='Tom') values(104)") + .run("insert into tpart1 partition(name='Tom_del') values(1000)") + .run("insert into tpart1 partition(name='Harry') values(10001)") + .run("insert into tpart1 partition(name='Jerry') values(105)") + .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')") + .run("DROP TABLE tpart2") + .dump(primaryDbName, withClause); replica.run("DROP TABLE t4") - .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')"); + .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')"); replica.load(replicatedDbName, primaryDbName, withClause) - .run("use " + replicatedDbName) - .run("show tables ") - .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"}) - .run("select * from " + replicatedDbName + ".t2") - .verifyResults(new String[]{"2", "22", "222", "2222"}) - .run("select a from " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"101", "103", "105", "1000", "10001"}) - .run("show partitions " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"}); + .run("use " + replicatedDbName) + .run("show tables ") + .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[]{"2", "22", "222", "2222"}) + .run("select a from " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"101", "103", "105", "1000", "10001"}) + .run("show partitions " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"}); + } + + public void testTableWithPartitionsInBatch() throws Throwable { + + List withClause = new ArrayList<>(); + withClause.add("'" + 
HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE.varname + "'='" + 1 + "'"); + + primary.run("use " + primaryDbName) + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into t2 partition(country='india') values ('bangalore')") + .run("insert into t2 partition(country='france') values ('paris')") + .run("insert into t2 partition(country='australia') values ('sydney')") + .dump(primaryDbName, withClause); + + replica.load(replicatedDbName, primaryDbName, withClause) + .run("use " + replicatedDbName) + .run("show tables like 't2'") + .verifyResults(new String[] { "t2" }) + .run("select distinct(country) from t2") + .verifyResults(new String[] { "india", "france", "australia" }) + .run("select place from t2") + .verifyResults(new String[] { "bangalore", "paris", "sydney" }) + .verifyReplTargetProperty(replicatedDbName); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index 7b639ecb4c..9294fb3a63 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -75,7 +75,6 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.security.JobTokenIdentifier; @@ -84,7 +83,6 @@ import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; @@ -810,18 +808,11 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) try { populateHeaders(mapIds, jobId, dagId, user, reduceId, response, keepAliveParam, mapOutputInfoMap); - } catch (DiskErrorException e) { // fatal error: fetcher should be aware of that - LOG.error("Shuffle error in populating headers (fatal: DiskErrorException):", e); - String errorMessage = getErrorMessage(e); - // custom message, might be noticed by fetchers - // it should reuse the current response object, as headers have been already set for it - sendFakeShuffleHeaderWithError(ctx, "DISK_ERROR_EXCEPTION: " + errorMessage, response); - return; - } catch (IOException e) { + } catch(IOException e) { ch.write(response); LOG.error("Shuffle error in populating headers :", e); String errorMessage = getErrorMessage(e); - sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR); + sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR); return; } ch.write(response); @@ -1054,37 +1045,22 @@ public void operationComplete(ChannelFuture future) { return writeFuture; } - protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { + protected void sendError(ChannelHandlerContext ctx, + HttpResponseStatus status) { sendError(ctx, "", status); } - protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { HttpResponse response = 
new DefaultHttpResponse(HTTP_1_1, status); - sendError(ctx, message, response); - } - - protected void sendError(ChannelHandlerContext ctx, String message, HttpResponse response) { - sendError(ctx, ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8), response); - } - - private void sendFakeShuffleHeaderWithError(ChannelHandlerContext ctx, String message, - HttpResponse response) throws IOException { - ShuffleHeader header = new ShuffleHeader(message, -1, -1, -1); - DataOutputBuffer out = new DataOutputBuffer(); - header.write(out); - - sendError(ctx, wrappedBuffer(out.getData(), 0, out.getLength()), response); - } - - protected void sendError(ChannelHandlerContext ctx, ChannelBuffer content, - HttpResponse response) { - response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.headers().add(CONTENT_TYPE, "text/plain; charset=UTF-8"); // Put shuffle version into http header response.headers().add(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); response.headers().add(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); - response.setContent(content); + response.setContent( + ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); // Close the connection as soon as the error message is sent. ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java index c4b2dab439..d61c575173 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.ddl.table.partition.add; import java.io.Serializable; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -105,6 +106,12 @@ public void setLocation(String location) { return params; } + public void addPartParams(Map partParams) { + if (params != null) { + params.putAll(partParams); + } + } + @Explain(displayName = "params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getPartParamsForExplain() { return params.toString(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 48c5e737ba..5dabbbfec0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -58,6 +58,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -175,7 +176,7 @@ private boolean isMetaDataOp() { * * @throws SemanticException */ - private void addConsolidatedPartitionDesc() throws Exception { + private void addConsolidatedPartitionDesc(Task ptnRootTask) throws Exception { //Load partitions equal to batch size at one go for metadata only and for external tables. int maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE); int currentPartitionCount = 0; @@ -205,7 +206,7 @@ private void addConsolidatedPartitionDesc() throws Exception { tableDesc.getTableName(), true, partitions); //don't need to add ckpt task separately. 
Added as part of add partition task - addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc, null); + addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc, ptnRootTask); if (partitions.size() > 0) { LOG.info("Added {} partitions", partitions.size()); } @@ -214,21 +215,8 @@ private void addConsolidatedPartitionDesc() throws Exception { } private TaskTracker forNewTable() throws Exception { - if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) { - // Place all partitions in single task to reduce load on HMS. - addConsolidatedPartitionDesc(); - return tracker; - } - - Iterator iterator = event.partitionDescriptions(tableDesc).iterator(); - while (iterator.hasNext() && tracker.canAddMoreTasks()) { - AlterTableAddPartitionDesc currentPartitionDesc = iterator.next(); - /* - the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the - current retrieved lastReplicatedPartition - */ - addPartition(iterator.hasNext(), currentPartitionDesc, null); - } + // Place all partitions in single task to reduce load on HMS. + addConsolidatedPartitionDesc(null); return tracker; } @@ -243,7 +231,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc } /** - * returns the root task for adding a partition + * returns the root task for adding all partitions in a batch */ private Task tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task ptnRootTask) throws MetaException, HiveException { @@ -251,7 +239,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc), context.hiveConf ); - //checkpointing task already added as part of add batch of partition in case for metadata only and external tables + //checkpointing task already added as part of add batch of partition if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) { if (ptnRootTask == null) { ptnRootTask = addPartTask; @@ -261,66 +249,68 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc return ptnRootTask; } - AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); - Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation()); - Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); - partSpec.setLocation(replicaWarehousePartitionLocation.toString()); - LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " - + partSpecToString(partSpec.getPartSpec()) + " with source location: " - + partSpec.getLocation()); - Task ckptTask = ReplUtils.getTableCheckpointTask( - tableDesc, - (HashMap)partSpec.getPartSpec(), - context.dumpDirectory, - context.hiveConf - ); - - Path stagingDir = replicaWarehousePartitionLocation; - // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. - LoadFileType loadFileType; - if (event.replicationSpec().isInReplicationScope() && - context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { - loadFileType = LoadFileType.IGNORE; - } else { - loadFileType = event.replicationSpec().isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; - stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); - } - boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); - Task copyTask = ReplCopyTask.getLoadCopyTask( + //Add Copy task for all partitions + List> copyTaskList = new ArrayList<>(); + List> moveTaskList = new ArrayList<>(); + for (AlterTableAddPartitionDesc.PartitionDesc partSpec : addPartitionDesc.getPartitions()) { + Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); + partSpec.setLocation(replicaWarehousePartitionLocation.toString()); + LOG.debug("adding dependent CopyWork for partition " + + partSpecToString(partSpec.getPartSpec()) + " with source location: " + + partSpec.getLocation()); + + Path stagingDir = replicaWarehousePartitionLocation; + // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. + LoadFileType loadFileType; + if (event.replicationSpec().isInReplicationScope() && + context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + loadFileType = LoadFileType.IGNORE; + } else { + loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); + } + boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); + Task copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), - new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)), + new Path(event.dataPath() + Path.SEPARATOR + Warehouse.makePartPath(partSpec.getPartSpec())), stagingDir, context.hiveConf, copyAtLoad, false - ); + ); + copyTaskList.add(copyTask); - Task movePartitionTask = null; - if (loadFileType != LoadFileType.IGNORE) { - // no need to create move task, if file is moved directly to target location. - movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType); + Task movePartitionTask = null; + if (loadFileType != LoadFileType.IGNORE) { + // no need to create move task, if file is moved directly to target location. + movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType); + moveTaskList.add(movePartitionTask); + } + } + Iterator> copyTaskIterator = copyTaskList.iterator(); + Iterator> moveTaskIterator = moveTaskList.iterator(); + //Add the copy tasks. once copy tasks are done, add partition metadata + while (copyTaskIterator.hasNext()) { + if (ptnRootTask == null) { + ptnRootTask = copyTaskIterator.next(); + } else { + ptnRootTask.addDependentTask(copyTaskIterator.next()); + } } - if (ptnRootTask == null) { - ptnRootTask = copyTask; + ptnRootTask = addPartTask; } else { - ptnRootTask.addDependentTask(copyTask); + ptnRootTask.addDependentTask(addPartTask); } - // Set Checkpoint task as dependant to the tail of add partition tasks. So, if same dump is - // retried for bootstrap, we skip current partition update. 
- copyTask.addDependentTask(addPartTask); - if (movePartitionTask != null) { - addPartTask.addDependentTask(movePartitionTask); - movePartitionTask.addDependentTask(ckptTask); - } else { - addPartTask.addDependentTask(ckptTask); + //Add the move tasks after add partition metadata is done + while (moveTaskIterator.hasNext()) { + if (ptnRootTask == null) { + ptnRootTask = moveTaskIterator.next(); + } else { + ptnRootTask.addDependentTask(moveTaskIterator.next()); + } } - return ptnRootTask; - } - private String getPartitionName(Path partitionMetadataFullPath) { - //Get partition name by removing the metadata base path. - //Needed for getting the data path - return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length()); + return ptnRootTask; } /** @@ -407,25 +397,36 @@ private TaskTracker forExistingTable(AlterTableAddPartitionDesc lastPartitionRep Map currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec); } - + Task ptnRootTask = null; while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) { AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next(); - Map partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); - Task ptnRootTask = null; + AlterTableAddPartitionDesc.PartitionDesc src = addPartitionDesc.getPartitions().get(0); + //Add check point task as part of add partition + Map partParams = new HashMap<>(); + partParams.put(REPL_CHECKPOINT_KEY, context.dumpDirectory); + Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, src); + src.setLocation(replicaWarehousePartitionLocation.toString()); + src.addPartParams(partParams); + Map partSpec = src.getPartSpec(); + ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec); switch (loadPtnType) { case LOAD_NEW: break; case LOAD_REPLACE: - ptnRootTask = dropPartitionTask(table, partSpec); + if (ptnRootTask == null) { + ptnRootTask = dropPartitionTask(table, partSpec); + } else { + ptnRootTask.addDependentTask(dropPartitionTask(table, partSpec)); + } break; case LOAD_SKIP: continue; default: break; } - addPartition(partitionIterator.hasNext(), addPartitionDesc, ptnRootTask); } + addConsolidatedPartitionDesc(ptnRootTask); return tracker; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index f6a9b349c3..d898e44420 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -4649,8 +4649,8 @@ static private boolean needToCopy(final HiveConf conf, Path srcf, Path destf, Fi return false; } //Check if different encryption zones - HadoopShims.HdfsEncryptionShim srcHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(srcFs, conf); - HadoopShims.HdfsEncryptionShim destHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(destFs, conf); + HadoopShims.HdfsEncryptionShim srcHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(srcFs); + HadoopShims.HdfsEncryptionShim destHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(destFs); try { return srcHdfsEncryptionShim != null && destHdfsEncryptionShim != null diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index d0026a7dad..b5a752e643 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -2507,8 +2507,7 @@ private void getMetaData(QB qb, ReadEntity parentInput) private boolean isPathEncrypted(Path path) throws HiveException { try { - HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = - SessionState.get().getHdfsEncryptionShim(path.getFileSystem(conf), conf); + HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(path.getFileSystem(conf)); if (hdfsEncryptionShim != null) { if (hdfsEncryptionShim.isPathEncrypted(path)) { return true; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 1c3537f02b..4bcf60272c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -549,7 +549,7 @@ public HiveTxnManager setTxnMgr(HiveTxnManager mgr) { } public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException { try { - return getHdfsEncryptionShim(FileSystem.get(sessionConf), sessionConf); + return getHdfsEncryptionShim(FileSystem.get(sessionConf)); } catch(HiveException hiveException) { throw hiveException; @@ -559,31 +559,20 @@ public HiveTxnManager setTxnMgr(HiveTxnManager mgr) { } } - public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim(FileSystem fs, HiveConf conf) throws HiveException { - - if (!"hdfs".equals(fs.getUri().getScheme())) { - LOG.warn("Unable to get hdfs encryption shim, because FileSystem URI schema is not hdfs. Returning null. " - + "FileSystem URI: " + fs.getUri()); - return null; - } - - if (conf.getBoolVar(ConfVars.HIVE_HDFS_ENCRYPTION_SHIM_CACHE_ON)) { - if (!hdfsEncryptionShims.containsKey(fs.getUri())) { - hdfsEncryptionShims.put(fs.getUri(), getHdfsEncryptionShimInternal(fs)); + public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim(FileSystem fs) throws HiveException { + if (!hdfsEncryptionShims.containsKey(fs.getUri())) { + try { + if ("hdfs".equals(fs.getUri().getScheme())) { + hdfsEncryptionShims.put(fs.getUri(), ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, sessionConf)); + } else { + LOG.info("Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem."); + } + } catch (Exception e) { + throw new HiveException(e); } - return hdfsEncryptionShims.get(fs.getUri()); - - } else { // skip the cache - return getHdfsEncryptionShimInternal(fs); } - } - private HadoopShims.HdfsEncryptionShim getHdfsEncryptionShimInternal(FileSystem fs) throws HiveException { - try { - return ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, sessionConf); - } catch (Exception e) { - throw new HiveException(e); - } + return hdfsEncryptionShims.get(fs.getUri()); } // SessionState is not available in runtime and Hive.get().getConf() is not safe to call diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index ba3513d5a4..29535a2b74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -95,7 +95,6 @@ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor throws IOException { Util.disableLlapCaching(conf); conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, true); - conf.setBoolVar(HiveConf.ConfVars.HIVE_HDFS_ENCRYPTION_SHIM_CACHE_ON, false); String user = compactionInfo.runAs; 
SessionState sessionState = DriverUtils.setUpSessionState(conf, user, true); long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
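
Reviewer note on the LoadPartitions changes above: forNewTable() and forExistingTable() now always route partitions through addConsolidatedPartitionDesc(), which groups them into batches sized by HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE (the new testTableWithPartitionsInBatch sets it to 1 so every partition lands in its own batch). The following is a minimal, hypothetical sketch of that grouping step only; PartitionBatchingSketch and its string-based partition specs are placeholders, not the actual Hive classes, and the real code carries full AlterTableAddPartitionDesc.PartitionDesc objects with locations and checkpoint params.

import java.util.ArrayList;
import java.util.List;

class PartitionBatchingSketch {
  /**
   * Groups partition specs into batches of at most batchSize, the way
   * addConsolidatedPartitionDesc() builds one consolidated add-partition
   * request per batch instead of one request per partition.
   */
  static List<List<String>> toBatches(List<String> partitionSpecs, int batchSize) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (String spec : partitionSpecs) {
      current.add(spec);
      if (current.size() == batchSize) {
        batches.add(current);            // flush a full batch as one consolidated request
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      batches.add(current);              // trailing partial batch
    }
    return batches;
  }
}

Per the patch, each such batch then gets its copy tasks, a single add-partition task, and any move tasks wired under one root task, which is why the per-partition checkpoint task wiring in tasksForAddPartition could be dropped.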
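
On the SessionState/HiveConf changes: the patch drops the HIVE_HDFS_ENCRYPTION_SHIM_CACHE_ON toggle (and its override in QueryCompactor) and goes back to an unconditional per-URI cache in getHdfsEncryptionShim(FileSystem). A rough sketch of that caching pattern follows, under the assumption that simplified placeholder types stand in for the real HadoopShims.HdfsEncryptionShim and ShimLoader APIs; EncryptionShimCacheSketch is not Hive code.

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

class EncryptionShimCacheSketch {

  /** Placeholder for HadoopShims.HdfsEncryptionShim; the real interface is not shown here. */
  interface EncryptionShim { }

  private final Map<URI, EncryptionShim> shimsByUri = new HashMap<>();

  /** Placeholder for ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf). */
  private EncryptionShim createShim(URI fsUri) {
    return new EncryptionShim() { };
  }

  /**
   * Lazily creates and caches one shim per filesystem URI; non-HDFS schemes get null,
   * mirroring the "only applicable to hdfs filesystem" branch in the patched method.
   */
  synchronized EncryptionShim getShim(URI fsUri) {
    if (!"hdfs".equals(fsUri.getScheme())) {
      return null;
    }
    return shimsByUri.computeIfAbsent(fsUri, this::createShim);
  }
}

The trade-off the removed config addressed still applies: if a cached FileSystem is closed elsewhere while fs.hdfs.impl.disable.cache is false, the cached shim keyed by its URI can become stale.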