diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index f691a2c..0fb58fe 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -86,6 +86,11 @@
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>${joda.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${druid.version}</version>
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index a08a4e3..b9965f7 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
import org.apache.hadoop.hive.druid.serde.DruidSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaHookV2;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -62,7 +63,7 @@
* DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
*/
@SuppressWarnings({ "deprecation", "rawtypes" })
-public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook {
+public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHookV2 {
protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
@@ -341,6 +342,26 @@ public void commitDropTable(Table table, boolean deleteData) throws MetaExceptio
}
@Override
+ public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
+ if (overwrite) {
+ LOG.debug(String.format("commit insert overwrite into table [%s]", table.getTableName()));
+ this.commitCreateTable(table);
+ } else {
+ throw new MetaException("Insert into is not supported yet");
+ }
+ }
+
+ @Override
+ public void preInsertTable(Table table, boolean overwrite) throws MetaException {
+ //do nothing
+ }
+
+ @Override
+ public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
+ // do nothing
+ }
+
+ @Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties
) {
jobProperties.put(Constants.DRUID_SEGMENT_VERSION, new DateTime().toString());
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
index 8770749..f784c46 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.druid;
import com.google.common.collect.ImmutableMap;
@@ -43,14 +61,14 @@
private String segmentsTable;
- private String tablePath;
+ private String tableWorkingPath;
private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
.interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
@Before
public void before() throws Throwable {
- tablePath = temporaryFolder.newFolder().getAbsolutePath();
+ tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath();
segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
@@ -112,14 +130,10 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
druidStorageHandler.preCreateTable(tableMock);
Configuration config = new Configuration();
config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
- config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tablePath);
+ config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
druidStorageHandler.setConf(config);
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
- /*
- final descriptor path is in the form tablePath/taskId_Attempt_ID/segmentDescriptorDir/segmentIdentifier.json
- UUID.randomUUID() will fake the taskId_attemptID
- */
- Path taskDirPath = new Path(tablePath, druidStorageHandler.makeStagingName());
+ Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
);
@@ -138,6 +152,31 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
}
@Test
+ public void testCommitInsertTable() throws MetaException, IOException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+ druidStorageHandler.preCreateTable(tableMock);
+ Configuration config = new Configuration();
+ config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+ config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+ druidStorageHandler.setConf(config);
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+ Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+ Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+ new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+ );
+ DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+ druidStorageHandler.commitInsertTable(tableMock, true);
+ Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+ DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ )).toArray());
+ }
+
+ @Test
public void testDeleteSegment() throws IOException, SegmentLoadingException {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
index 75c0129..1014ab6 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.druid;
import com.google.common.base.Supplier;
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
index a4272ee..b1310b6 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.io;
import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/hbase-handler/src/test/results/positive/hbase_queries.q.out b/hbase-handler/src/test/results/positive/hbase_queries.q.out
index d5c1cfa..1eeaf80 100644
--- a/hbase-handler/src/test/results/positive/hbase_queries.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_queries.q.out
@@ -40,7 +40,8 @@ POSTHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-0 is a root stage
- Stage-1 is a root stage
+ Stage-1
+ Stage-2 is a root stage
STAGE PLANS:
Stage: Stage-0
@@ -52,6 +53,10 @@ STAGE PLANS:
COLUMN_STATS_ACCURATE
Stage: Stage-1
+ Insert operator:
+ Insert
+
+ Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
@@ -493,8 +498,9 @@ ON (x.key = Y.key)
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-0 is a root stage
- Stage-2 is a root stage
- Stage-1 depends on stages: Stage-2
+ Stage-1
+ Stage-3 is a root stage
+ Stage-2 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-0
@@ -505,7 +511,11 @@ STAGE PLANS:
properties:
COLUMN_STATS_ACCURATE
- Stage: Stage-2
+ Stage: Stage-1
+ Insert operator:
+ Insert
+
+ Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
@@ -540,7 +550,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
- Stage: Stage-1
+ Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
diff --git a/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out b/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
index e487a0b..4082997 100644
--- a/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
@@ -36,14 +36,15 @@ select value,"" where a.key > 50 AND a.key < 100
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-0 is a root stage
- Stage-2 is a root stage
- Stage-8 depends on stages: Stage-2 , consists of Stage-5, Stage-4, Stage-6
- Stage-5
- Stage-1 depends on stages: Stage-5, Stage-4, Stage-7
- Stage-3 depends on stages: Stage-1
- Stage-4
+ Stage-1
+ Stage-3 is a root stage
+ Stage-9 depends on stages: Stage-3 , consists of Stage-6, Stage-5, Stage-7
Stage-6
- Stage-7 depends on stages: Stage-6
+ Stage-2 depends on stages: Stage-6, Stage-5, Stage-8
+ Stage-4 depends on stages: Stage-2
+ Stage-5
+ Stage-7
+ Stage-8 depends on stages: Stage-7
STAGE PLANS:
Stage: Stage-0
@@ -54,7 +55,11 @@ STAGE PLANS:
properties:
COLUMN_STATS_ACCURATE
- Stage: Stage-2
+ Stage: Stage-1
+ Insert operator:
+ Insert
+
+ Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
@@ -91,16 +96,16 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.hbase.HBaseSerDe
name: default.src_x2
- Stage: Stage-8
+ Stage: Stage-9
Conditional Operator
- Stage: Stage-5
+ Stage: Stage-6
Move Operator
files:
hdfs directory: true
#### A masked pattern was here ####
- Stage: Stage-1
+ Stage: Stage-2
Move Operator
tables:
replace: true
@@ -110,10 +115,10 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.src_x1
- Stage: Stage-3
+ Stage: Stage-4
Stats-Aggr Operator
- Stage: Stage-4
+ Stage: Stage-5
Map Reduce
Map Operator Tree:
TableScan
@@ -125,7 +130,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.src_x1
- Stage: Stage-6
+ Stage: Stage-7
Map Reduce
Map Operator Tree:
TableScan
@@ -137,7 +142,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.src_x1
- Stage: Stage-7
+ Stage: Stage-8
Move Operator
files:
hdfs directory: true
diff --git a/hbase-handler/src/test/results/positive/hbasestats.q.out b/hbase-handler/src/test/results/positive/hbasestats.q.out
index cf4138e..4e47bf5 100644
--- a/hbase-handler/src/test/results/positive/hbasestats.q.out
+++ b/hbase-handler/src/test/results/positive/hbasestats.q.out
@@ -63,7 +63,8 @@ POSTHOOK: query: explain INSERT OVERWRITE TABLE users SELECT 'user1', 'IA', 'USA
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-0 is a root stage
- Stage-1 is a root stage
+ Stage-1
+ Stage-2 is a root stage
STAGE PLANS:
Stage: Stage-0
@@ -75,6 +76,10 @@ STAGE PLANS:
COLUMN_STATS_ACCURATE
Stage: Stage-1
+ Insert operator:
+ Insert
+
+ Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java
new file mode 100644
index 0000000..e691c1f
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+public interface HiveMetaHookV2 extends HiveMetaHook {
+ /**
+ * Called after an INSERT [OVERWRITE] statement has been executed successfully.
+ * @param table table definition
+ * @param overwrite true if it is INSERT OVERWRITE
+ *
+ * @throws MetaException
+ */
+ public void commitInsertTable(Table table, boolean overwrite) throws MetaException;
+
+ /**
+ * Called before the insert is committed, i.e. before commitInsertTable is invoked.
+ * @param table table definition
+ * @param overwrite true if it is INSERT OVERWRITE
+ *
+ * @throws MetaException
+ */
+ public void preInsertTable(Table table, boolean overwrite) throws MetaException;
+
+ /**
+ * Called if preInsertTable or commitInsertTable fails, so partial changes can be rolled back.
+ * @param table table definition
+ * @param overwrite true if it is INSERT OVERWRITE
+ *
+ * @throws MetaException
+ */
+ public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException;
+}
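For illustration only, and not part of this patch: a minimal HiveMetaHookV2 implementation might look like the sketch below. The class name NoopInsertMetaHook is hypothetical; like the Druid handler above, it accepts only INSERT OVERWRITE and treats the remaining callbacks as no-ops.

import org.apache.hadoop.hive.metastore.HiveMetaHookV2;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;

// Hypothetical example; demonstrates only the V2 insert lifecycle.
public class NoopInsertMetaHook implements HiveMetaHookV2 {

  @Override
  public void preInsertTable(Table table, boolean overwrite) throws MetaException {
    // Validate early so an unsupported statement never reaches the commit phase.
    if (!overwrite) {
      throw new MetaException("INSERT INTO is not supported by this handler");
    }
  }

  @Override
  public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
    // Publish the data written by the statement, e.g. register new segments/files.
  }

  @Override
  public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
    // Remove any staging output left behind by the failed statement.
  }

  // Callbacks inherited from HiveMetaHook (v1); intentionally no-ops in this sketch.
  @Override public void preCreateTable(Table table) throws MetaException { }
  @Override public void rollbackCreateTable(Table table) throws MetaException { }
  @Override public void commitCreateTable(Table table) throws MetaException { }
  @Override public void preDropTable(Table table) throws MetaException { }
  @Override public void rollbackDropTable(Table table) throws MetaException { }
  @Override public void commitDropTable(Table table, boolean deleteData) throws MetaException { }
}

With such a hook registered, HiveMetaStoreClient#insertTable (below) drives the sequence preInsertTable then commitInsertTable, falling back to rollbackInsertTable when either step throws.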
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 9eec56a..83b481c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2178,6 +2178,24 @@ public void addDynamicPartitions(long txnId, String dbName, String tableName,
client.add_dynamic_partitions(adp);
}
+ @Override
+ public void insertTable(Table table, boolean overwrite) throws MetaException {
+ boolean failed = true;
+ HiveMetaHook hook = getHook(table);
+ if (hook == null || !(hook instanceof HiveMetaHookV2)) {
+ return;
+ }
+ HiveMetaHookV2 hiveMetaHook = (HiveMetaHookV2) hook;
+ try {
+ hiveMetaHook.preInsertTable(table, overwrite);
+ hiveMetaHook.commitInsertTable(table, overwrite);
+ failed = false;
+ } finally {
+ if (failed) {
+ hiveMetaHook.rollbackInsertTable(table, overwrite);
+ }
+ }
+ }
+
@InterfaceAudience.LimitedPrivate({"HCatalog"})
@Override
public NotificationEventResponse getNextNotification(long lastEventId, int maxEvents,
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 8ba7352..fb61db1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -1507,6 +1507,15 @@ void addDynamicPartitions(long txnId, String dbName, String tableName,
List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException;
- void dropConstraint(String dbName, String tableName, String constraintName) throws
+ void dropConstraint(String dbName, String tableName, String constraintName) throws
MetaException, NoSuchObjectException, TException;
void addPrimaryKey(List<SQLPrimaryKey> primaryKeyCols) throws
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index a1fb874..f74e1e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -165,6 +165,7 @@
import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
import org.apache.hadoop.hive.ql.plan.GrantDesc;
import org.apache.hadoop.hive.ql.plan.GrantRevokeRoleDDL;
+import org.apache.hadoop.hive.ql.plan.InsertTableDesc;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.LockTableDesc;
@@ -562,6 +563,10 @@ public int execute(DriverContext driverContext) {
if (cacheMetadataDesc != null) {
return cacheMetadata(db, cacheMetadataDesc);
}
+ InsertTableDesc insertTableDesc = work.getInsertTableDesc();
+ if (insertTableDesc != null) {
+ return insertCommitWork(db, insertTableDesc);
+ }
} catch (Throwable e) {
failed(e);
return 1;
@@ -570,6 +575,15 @@ public int execute(DriverContext driverContext) {
return 0;
}
+ private int insertCommitWork(Hive db, InsertTableDesc insertTableDesc) throws HiveException {
+ try {
+ db.getMSC().insertTable(insertTableDesc.getTable(), insertTableDesc.isOverwrite());
+ return 0;
+ } catch (MetaException e) {
+ throw new HiveException(e);
+ }
+ }
+
private int cacheMetadata(Hive db, CacheMetadataDesc desc) throws HiveException {
db.cacheFileMetadata(desc.getDbName(), desc.getTableName(),
desc.getPartName(), desc.isAllParts());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
index 7ea4754..7a4f22a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -128,8 +129,16 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
// Bail out, nothing to do
return null;
}
- String segmentGranularity = parseCtx.getCreateTable().getTblProps()
- .get(Constants.DRUID_SEGMENT_GRANULARITY);
+ String segmentGranularity = null;
+ final Table table = fsOp.getConf().getTable();
+ if (table != null) {
+ // the statement is an INSERT into an existing table
+ segmentGranularity = table.getParameters().get(Constants.DRUID_SEGMENT_GRANULARITY);
+ } else {
+ // the statement is a CREATE TABLE AS SELECT
+ segmentGranularity = parseCtx.getCreateTable().getTblProps()
+ .get(Constants.DRUID_SEGMENT_GRANULARITY);
+ }
segmentGranularity = !Strings.isNullOrEmpty(segmentGranularity)
? segmentGranularity
: HiveConf.getVar(parseCtx.getConf(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index d0131b7..e8ef720 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -177,6 +177,7 @@
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.InsertTableDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
@@ -1822,7 +1823,7 @@ private void handleInsertStatementSpecPhase1(ASTNode ast, QBParseInfo qbp, Phase
public void getMaterializationMetadata(QB qb) throws SemanticException {
try {
gatherCTEReferences(qb, rootClause);
- int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CTE_MATERIALIZE_THRESHOLD);
+ int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CTE_MATERIALIZE_THRESHOLD);
for (CTEClause cte : Sets.newHashSet(aliasToCTEs.values())) {
if (threshold >= 0 && cte.reference >= threshold) {
cte.materialize = true;
@@ -2536,7 +2537,7 @@ void parseJoinCondPopulateAlias(QBJoinTree joinTree, ASTNode condn,
case HiveParser.TOK_CHARSETLITERAL:
case HiveParser.KW_TRUE:
case HiveParser.KW_FALSE:
- break;
+ break;
case HiveParser.TOK_FUNCTION:
// check all the arguments
@@ -6745,6 +6746,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
// This is a non-native table.
// We need to set stats as inaccurate.
setStatsForNonNativeTable(dest_tab);
+ createInsertDesc(dest_tab, !qb.getParseInfo().isInsertIntoTable(dest_tab.getTableName()));
}
WriteEntity output = null;
@@ -7009,7 +7011,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
}
input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
-
+
inputRR = opParseCtx.get(input).getRowResolver();
ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
@@ -7163,6 +7165,14 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
return output;
}
+ private void createInsertDesc(Table table, boolean overwrite) {
+ Task<? extends Serializable>[] tasks = new Task[this.rootTasks.size()];
+ tasks = this.rootTasks.toArray(tasks);
+ InsertTableDesc insertTableDesc = new InsertTableDesc(table.getTTable(), overwrite);
+ TaskFactory
+ .getAndMakeChild(new DDLWork(getInputs(), getOutputs(), insertTableDesc), conf, tasks);
+ }
+
private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException {
String tableName = table_desc.getTableName();
@@ -10732,7 +10742,7 @@ private void walkASTMarkTABREF(ASTNode ast, Set<String> cteAlias)
colNames.add(col.getName());
colTypes.add(col.getType());
}
-
+
basicInfos.put(new HivePrivilegeObject(table.getDbName(), table.getTableName(), colNames),
new MaskAndFilterInfo(colTypes, additionalTabInfo.toString(), alias, astNode, table.isView()));
}
@@ -10757,7 +10767,7 @@ private void walkASTMarkTABREF(ASTNode ast, Set<String> cteAlias)
}
}
}
-
+
// We walk through the AST.
// We replace all the TOK_TABREF by adding additional masking and filter if
// the table needs to be masked or filtered.
@@ -10879,7 +10889,7 @@ else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) {
// masking and filtering should be created here
// the basic idea is similar to unparseTranslator.
tableMask = new TableMask(this, conf, ctx);
-
+
// 4. continue analyzing from the child ASTNode.
Phase1Ctx ctx_1 = initPhase1Ctx();
preProcessForInsert(child, qb);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
index e069acd..c4efb3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
@@ -31,6 +31,8 @@
*/
public class DDLWork implements Serializable {
private static final long serialVersionUID = 1L;
+
+ private InsertTableDesc insertTableDesc;
private CreateIndexDesc createIndexDesc;
private AlterIndexDesc alterIndexDesc;
private DropIndexDesc dropIdxDesc;
@@ -524,6 +526,12 @@ public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
this.cacheMetadataDesc = cacheMetadataDesc;
}
+ public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+ InsertTableDesc insertTableDesc) {
+ this(inputs, outputs);
+ this.insertTableDesc = insertTableDesc;
+ }
+
/**
* @return Create Database descriptor
*/
@@ -1185,4 +1193,13 @@ public ShowConfDesc getShowConfDesc() {
public void setShowConfDesc(ShowConfDesc showConfDesc) {
this.showConfDesc = showConfDesc;
}
+
+ @Explain(displayName = "Insert operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public InsertTableDesc getInsertTableDesc() {
+ return insertTableDesc;
+ }
+
+ public void setInsertTableDesc(InsertTableDesc insertTableDesc) {
+ this.insertTableDesc = insertTableDesc;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java
new file mode 100644
index 0000000..1397b8a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+
+@Explain(displayName = "Insert", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED })
+public class InsertTableDesc extends DDLDesc {
+ private final Table table;
+ private final boolean overwrite;
+
+ public InsertTableDesc(Table table, boolean overwrite) {
+ this.table = table;
+ this.overwrite = overwrite;
+ }
+
+ public Table getTable() {
+ return table;
+ }
+
+ public boolean isOverwrite() {
+ return overwrite;
+ }
+}
diff --git a/ql/src/test/results/clientpositive/case_sensitivity.q.out b/ql/src/test/results/clientpositive/case_sensitivity.q.out
index 1952c8a..b3969cc 100644
--- a/ql/src/test/results/clientpositive/case_sensitivity.q.out
+++ b/ql/src/test/results/clientpositive/case_sensitivity.q.out
@@ -30,7 +30,6 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: src_thrift
- Pruned Column Paths: lintstring.mystring
Statistics: Num rows: 11 Data size: 3070 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (lint[0] > 0) (type: boolean)
diff --git a/ql/src/test/results/clientpositive/input_testxpath.q.out b/ql/src/test/results/clientpositive/input_testxpath.q.out
index 422a936..e07628a 100644
--- a/ql/src/test/results/clientpositive/input_testxpath.q.out
+++ b/ql/src/test/results/clientpositive/input_testxpath.q.out
@@ -30,7 +30,6 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: src_thrift
- Pruned Column Paths: lintstring.mystring
Statistics: Num rows: 11 Data size: 3070 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: lint[1] (type: int), lintstring[0].mystring (type: string), mstringstring['key_2'] (type: string)
diff --git a/ql/src/test/results/clientpositive/udf_coalesce.q.out b/ql/src/test/results/clientpositive/udf_coalesce.q.out
index dc3d9df..361d173 100644
--- a/ql/src/test/results/clientpositive/udf_coalesce.q.out
+++ b/ql/src/test/results/clientpositive/udf_coalesce.q.out
@@ -140,7 +140,6 @@ STAGE PLANS:
Processor Tree:
TableScan
alias: src_thrift
- Pruned Column Paths: lintstring.mystring
Statistics: Num rows: 11 Data size: 3070 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: COALESCE(lint[1],999) (type: int), COALESCE(lintstring[0].mystring,'999') (type: string), COALESCE(mstringstring['key_2'],'999') (type: string)