diff --git accumulo-handler/src/test/results/positive/accumulo_queries.q.out accumulo-handler/src/test/results/positive/accumulo_queries.q.out
index a6d2632..d7cceec 100644
--- accumulo-handler/src/test/results/positive/accumulo_queries.q.out
+++ accumulo-handler/src/test/results/positive/accumulo_queries.q.out
@@ -40,7 +40,8 @@ POSTHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE accumulo_table_1 SELECT
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-1 is a root stage
+  Stage-1
+  Stage-2 is a root stage
 
 STAGE PLANS:
   Stage: Stage-0
@@ -52,6 +53,10 @@ STAGE PLANS:
           COLUMN_STATS_ACCURATE
 
   Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -490,8 +495,9 @@ ON (x.key = Y.key)
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-2 is a root stage
-  Stage-1 depends on stages: Stage-2
+  Stage-1
+  Stage-3 is a root stage
+  Stage-2 depends on stages: Stage-3
 
 STAGE PLANS:
   Stage: Stage-0
@@ -502,7 +508,11 @@ STAGE PLANS:
         properties:
           COLUMN_STATS_ACCURATE
 
-  Stage: Stage-2
+  Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
          TableScan
@@ -537,7 +547,7 @@ STAGE PLANS:
               output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
               serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
 
-  Stage: Stage-1
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
          TableScan
diff --git accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out
index 55e7176..7330746 100644
--- accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out
+++ accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out
@@ -34,14 +34,15 @@ select value,"" where a.key > 50 AND a.key < 100
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-2 is a root stage
-  Stage-8 depends on stages: Stage-2 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-1 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-3 depends on stages: Stage-1
-  Stage-4
+  Stage-1
+  Stage-3 is a root stage
+  Stage-9 depends on stages: Stage-3 , consists of Stage-6, Stage-5, Stage-7
   Stage-6
-  Stage-7 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-6, Stage-5, Stage-8
+  Stage-4 depends on stages: Stage-2
+  Stage-5
+  Stage-7
+  Stage-8 depends on stages: Stage-7
 
 STAGE PLANS:
   Stage: Stage-0
@@ -52,7 +53,11 @@ STAGE PLANS:
         properties:
          COLUMN_STATS_ACCURATE
 
-  Stage: Stage-2
+  Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
          TableScan
@@ -89,16 +94,16 @@ STAGE PLANS:
                     serde: org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe
                     name: default.src_x2
 
-  Stage: Stage-8
+  Stage: Stage-9
     Conditional Operator
 
-  Stage: Stage-5
+  Stage: Stage-6
     Move Operator
       files:
          hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-1
+  Stage: Stage-2
     Move Operator
       tables:
          replace: true
@@ -108,10 +113,10 @@ STAGE PLANS:
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.src_x1
 
-  Stage: Stage-3
+  Stage: Stage-4
     Stats-Aggr Operator
 
-  Stage: Stage-4
+  Stage: Stage-5
     Map Reduce
       Map Operator Tree:
          TableScan
@@ -123,7 +128,7 @@ STAGE PLANS:
                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                     name: default.src_x1
 
-  Stage: Stage-6
+  Stage: Stage-7
     Map Reduce
       Map Operator Tree:
          TableScan
@@ -135,7 +140,7 @@ STAGE PLANS:
                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                     name: default.src_x1
 
-  Stage: Stage-7
+  Stage: Stage-8
     Move Operator
       files:
          hdfs directory: true
diff --git druid-handler/pom.xml druid-handler/pom.xml
index b057fff..c1638f4 100644
--- druid-handler/pom.xml
+++ druid-handler/pom.xml
@@ -86,6 +86,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${joda.version}</version>
+    </dependency>
+    <dependency>
       <groupId>io.druid</groupId>
       <artifactId>druid-server</artifactId>
       <version>${druid.version}</version>
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index a08a4e3..b9965f7 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.serde.DruidSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaHookV2;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -62,7 +63,7 @@
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
 @SuppressWarnings({ "deprecation", "rawtypes" })
-public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook {
+public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHookV2 {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
 
@@ -341,6 +342,26 @@ public void commitDropTable(Table table, boolean deleteData) throws MetaExceptio
   }
 
   @Override
+  public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
+    if (overwrite) {
+      LOG.debug(String.format("commit insert overwrite into table [%s]", table.getTableName()));
+      this.commitCreateTable(table);
+    } else {
+      throw new MetaException("Insert into is not supported yet");
+    }
+  }
+
+  @Override
+  public void preInsertTable(Table table, boolean overwrite) throws MetaException {
+    // do nothing
+  }
+
+  @Override
+  public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
+    // do nothing
+  }
+
+  @Override
   public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties
   ) {
     jobProperties.put(Constants.DRUID_SEGMENT_VERSION, new DateTime().toString());
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
index 8770749..f784c46 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.druid;
 
 import com.google.common.collect.ImmutableMap;
@@ -43,14 +61,14 @@
 
   private String segmentsTable;
 
-  private String tablePath;
+  private String tableWorkingPath;
 
   private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
           .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
 
   @Before
   public void before() throws Throwable {
-    tablePath = temporaryFolder.newFolder().getAbsolutePath();
+    tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath();
     segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
     Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
     Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
@@ -112,14 +130,10 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
     config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
-    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tablePath);
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
     druidStorageHandler.setConf(config);
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
-    /*
-      final descriptor path is in the form tablePath/taskId_Attempt_ID/segmentDescriptorDir/segmentIdentifier.json
-      UUID.randomUUID() will fake the taskId_attemptID
-    */
-    Path taskDirPath = new Path(tablePath, druidStorageHandler.makeStagingName());
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
     Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
             new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
     );
@@ -138,6 +152,31 @@
   }
 
   @Test
+  public void testCommitInsertTable() throws MetaException, IOException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+    Configuration config = new Configuration();
+    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+            new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+                    derbyConnectorRule.metadataTablesConfigSupplier().get()
+            )).toArray());
+  }
+
+  @Test
   public void testDeleteSegment() throws IOException, SegmentLoadingException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
index 75c0129..1014ab6 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.druid;
 
 import com.google.common.base.Supplier;
diff --git druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
index a4272ee..b1310b6 100644
--- druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
+++ druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.io;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
diff --git hbase-handler/src/test/results/positive/hbase_queries.q.out hbase-handler/src/test/results/positive/hbase_queries.q.out
index d5c1cfa..1eeaf80 100644
--- hbase-handler/src/test/results/positive/hbase_queries.q.out
+++ hbase-handler/src/test/results/positive/hbase_queries.q.out
@@ -40,7 +40,8 @@ POSTHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-1 is a root stage
+  Stage-1
+  Stage-2 is a root stage
 
 STAGE PLANS:
   Stage: Stage-0
@@ -52,6 +53,10 @@ STAGE PLANS:
           COLUMN_STATS_ACCURATE
 
   Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
          TableScan
@@ -493,8 +498,9 @@ ON (x.key = Y.key)
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-2 is a root stage
-  Stage-1 depends on stages: Stage-2
+  Stage-1
+  Stage-3 is a root stage
+  Stage-2 depends on stages: Stage-3
 
 STAGE PLANS:
   Stage: Stage-0
@@ -505,7 +511,11 @@ STAGE PLANS:
         properties:
          COLUMN_STATS_ACCURATE
 
-  Stage: Stage-2
+  Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
          TableScan
@@ -540,7 +550,7 @@ STAGE PLANS:
               output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
               serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
 
-  Stage: Stage-1
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
          TableScan
diff --git hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
index a552350..079fb0e 100644
--- hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
+++ hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
@@ -34,14 +34,15 @@ select value,"" where a.key > 50 AND a.key < 100
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-2 is a root stage
-  Stage-8 depends on stages: Stage-2 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-1 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-3 depends on stages: Stage-1
-  Stage-4
+  Stage-1
+  Stage-3 is a root stage
+  Stage-9 depends on stages: Stage-3 , consists of Stage-6, Stage-5, Stage-7
   Stage-6
-  Stage-7 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-6, Stage-5, Stage-8
+  Stage-4 depends on stages: Stage-2
+  Stage-5
+  Stage-7
+  Stage-8 depends on stages: Stage-7
 
 STAGE PLANS:
   Stage: Stage-0
@@ -52,7 +53,11 @@ STAGE PLANS:
         properties:
          COLUMN_STATS_ACCURATE
 
-  Stage: Stage-2
+  Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
          TableScan
@@ -89,16 +94,16 @@ STAGE PLANS:
                     serde: org.apache.hadoop.hive.hbase.HBaseSerDe
                     name: default.src_x2
 
-  Stage: Stage-8
+  Stage: Stage-9
     Conditional Operator
 
-  Stage: Stage-5
+  Stage: Stage-6
     Move Operator
       files:
          hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-1
+  Stage: Stage-2
     Move Operator
       tables:
          replace: true
@@ -108,10 +113,10 @@ STAGE PLANS:
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.src_x1
 
-  Stage: Stage-3
+  Stage: Stage-4
     Stats-Aggr Operator
 
-  Stage: Stage-4
+  Stage: Stage-5
     Map Reduce
       Map Operator Tree:
          TableScan
@@ -123,7 +128,7 @@ STAGE PLANS:
                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                     name: default.src_x1
 
-  Stage: Stage-6
+  Stage: Stage-7
     Map Reduce
       Map Operator Tree:
          TableScan
@@ -135,7 +140,7 @@ STAGE PLANS:
                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                     name: default.src_x1
 
-  Stage: Stage-7
+  Stage: Stage-8
     Move Operator
       files:
          hdfs directory: true
diff --git hbase-handler/src/test/results/positive/hbasestats.q.out hbase-handler/src/test/results/positive/hbasestats.q.out
index cf4138e..4e47bf5 100644
--- hbase-handler/src/test/results/positive/hbasestats.q.out
+++ hbase-handler/src/test/results/positive/hbasestats.q.out
@@ -63,7 +63,8 @@ POSTHOOK: query: explain INSERT OVERWRITE TABLE users SELECT 'user1', 'IA', 'USA
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-1 is a root stage
+  Stage-1
+  Stage-2 is a root stage
 
 STAGE PLANS:
   Stage: Stage-0
@@ -75,6 +76,10 @@ STAGE PLANS:
           COLUMN_STATS_ACCURATE
 
   Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
          TableScan
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java
new file mode 100644
index 0000000..e691c1f
--- /dev/null
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+public interface HiveMetaHookV2 extends HiveMetaHook {
+  /**
+   * Called after an INSERT [OVERWRITE] statement has been executed successfully.
+   * @param table table definition
+   * @param overwrite true if it is INSERT OVERWRITE
+   *
+   * @throws MetaException
+   */
+  public void commitInsertTable(Table table, boolean overwrite) throws MetaException;
+
+  /**
+   * Called before the insert is committed, i.e. before commitInsertTable is invoked.
+   * @param table table definition
+   * @param overwrite true if it is INSERT OVERWRITE
+   *
+   * @throws MetaException
+   */
+  public void preInsertTable(Table table, boolean overwrite) throws MetaException;
+
+  /**
+   * Called when preInsertTable or commitInsertTable fails, so the handler can roll the insert back.
+   * @param table table definition
+   * @param overwrite true if it is INSERT OVERWRITE
+   *
+   * @throws MetaException
+   */
+  public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException;
+}
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 9eec56a..83b481c 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2178,6 +2178,25 @@ public void addDynamicPartitions(long txnId, String dbName, String tableName,
     client.add_dynamic_partitions(adp);
   }
 
+  @Override
+  public void insertTable(Table table, boolean overwrite) throws MetaException {
+    boolean failed = true;
+    HiveMetaHook hook = getHook(table);
+    if (hook == null || !(hook instanceof HiveMetaHookV2)) {
+      return;
+    }
+    HiveMetaHookV2 hiveMetaHook = (HiveMetaHookV2) hook;
+    try {
+      hiveMetaHook.preInsertTable(table, overwrite);
+      hiveMetaHook.commitInsertTable(table, overwrite);
+      failed = false;
+    } finally {
+      if (failed) {
+        hiveMetaHook.rollbackInsertTable(table, overwrite);
+      }
+    }
+  }
+
   @InterfaceAudience.LimitedPrivate({"HCatalog"})
   @Override
   public NotificationEventResponse getNextNotification(long lastEventId, int maxEvents,
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 8ba7352..fb61db1 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -1507,6 +1507,15 @@ void addDynamicPartitions(long txnId, String dbName, String tableName, List
 
+  void insertTable(Table table, boolean overwrite) throws MetaException;
+
   void createTableWithConstraints(org.apache.hadoop.hive.metastore.api.Table tTbl,
     List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
     throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException;
 
-  void dropConstraint(String dbName, String tableName, String constraintName) throws 
+  void dropConstraint(String dbName, String tableName, String constraintName) throws
     MetaException, NoSuchObjectException, TException;
 
   void addPrimaryKey(List<SQLPrimaryKey> primaryKeyCols) throws
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index fc156c7..a930408 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -165,6 +165,7 @@
 import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.GrantDesc;
 import org.apache.hadoop.hive.ql.plan.GrantRevokeRoleDDL;
+import org.apache.hadoop.hive.ql.plan.InsertTableDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
@@ -562,6 +563,10 @@ public int execute(DriverContext driverContext) {
       if (cacheMetadataDesc != null) {
         return cacheMetadata(db, cacheMetadataDesc);
       }
+      InsertTableDesc insertTableDesc = work.getInsertTableDesc();
+      if (insertTableDesc != null) {
+        return insertCommitWork(db, insertTableDesc);
+      }
     } catch (Throwable e) {
       failed(e);
       return 1;
@@ -570,6 +575,15 @@ public int execute(DriverContext driverContext) {
     return 0;
   }
 
+  private int insertCommitWork(Hive db, InsertTableDesc insertTableDesc) throws HiveException {
+    try {
+      db.getMSC().insertTable(insertTableDesc.getTable(), insertTableDesc.isOverwrite());
+      return 0;
+    } catch (MetaException e) {
+      throw new HiveException(e);
+    }
+  }
+
   private int cacheMetadata(Hive db, CacheMetadataDesc desc) throws HiveException {
     db.cacheFileMetadata(desc.getDbName(), desc.getTableName(), desc.getPartName(),
         desc.isAllParts());
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
index 7ea4754..7a4f22a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -128,8 +129,16 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         // Bail out, nothing to do
         return null;
       }
-      String segmentGranularity = parseCtx.getCreateTable().getTblProps()
-              .get(Constants.DRUID_SEGMENT_GRANULARITY);
+      String segmentGranularity = null;
+      final Table table = fsOp.getConf().getTable();
+      if (table != null) {
+        // case the statement is an INSERT
+        segmentGranularity = table.getParameters().get(Constants.DRUID_SEGMENT_GRANULARITY);
+      } else {
+        // case the statement is a CREATE TABLE AS
+        segmentGranularity = parseCtx.getCreateTable().getTblProps()
+                .get(Constants.DRUID_SEGMENT_GRANULARITY);
+      }
       segmentGranularity = !Strings.isNullOrEmpty(segmentGranularity)
               ? segmentGranularity
               : HiveConf.getVar(parseCtx.getConf(),
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index f275f6a..a1fda40 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -177,6 +177,7 @@
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.InsertTableDesc;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
@@ -1842,7 +1843,7 @@ private void handleInsertStatementSpecPhase1(ASTNode ast, QBParseInfo qbp, Phase
   public void getMaterializationMetadata(QB qb) throws SemanticException {
     try {
       gatherCTEReferences(qb, rootClause);
-      int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CTE_MATERIALIZE_THRESHOLD); 
+      int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CTE_MATERIALIZE_THRESHOLD);
       for (CTEClause cte : Sets.newHashSet(aliasToCTEs.values())) {
         if (threshold >= 0 && cte.reference >= threshold) {
           cte.materialize = true;
@@ -2556,7 +2557,7 @@ void parseJoinCondPopulateAlias(QBJoinTree joinTree, ASTNode condn,
     case HiveParser.TOK_CHARSETLITERAL:
     case HiveParser.KW_TRUE:
     case HiveParser.KW_FALSE:
-      break; 
+      break;
 
     case HiveParser.TOK_FUNCTION:
       // check all the arguments
@@ -6765,6 +6766,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       // This is a non-native table.
      // We need to set stats as inaccurate.
       setStatsForNonNativeTable(dest_tab);
+      createInsertDesc(dest_tab, !qb.getParseInfo().isInsertIntoTable(dest_tab.getTableName()));
     }
 
     WriteEntity output = null;
@@ -7029,7 +7031,7 @@
     }
 
     input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
-    
+
     inputRR = opParseCtx.get(input).getRowResolver();
 
     ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
@@ -7183,6 +7185,14 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     return output;
   }
 
+  private void createInsertDesc(Table table, boolean overwrite) {
+    Task[] tasks = new Task[this.rootTasks.size()];
+    tasks = this.rootTasks.toArray(tasks);
+    InsertTableDesc insertTableDesc = new InsertTableDesc(table.getTTable(), overwrite);
+    TaskFactory
+        .getAndMakeChild(new DDLWork(getInputs(), getOutputs(), insertTableDesc), conf, tasks);
+  }
+
   private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
       Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException {
     String tableName = table_desc.getTableName();
@@ -10757,7 +10767,7 @@ private void walkASTMarkTABREF(ASTNode ast, Set<String> cteAlias)
         colNames.add(col.getName());
         colTypes.add(col.getType());
       }
-      
+
       basicInfos.put(new HivePrivilegeObject(table.getDbName(), table.getTableName(), colNames),
           new MaskAndFilterInfo(colTypes, additionalTabInfo.toString(), alias, astNode, table.isView()));
     }
@@ -10782,7 +10792,7 @@
         }
       }
     }
-    
+
     // We walk through the AST.
     // We replace all the TOK_TABREF by adding additional masking and filter if
     // the table needs to be masked or filtered.
@@ -10904,7 +10914,7 @@ else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) {
     // masking and filtering should be created here
     // the basic idea is similar to unparseTranslator.
     tableMask = new TableMask(this, conf, ctx);
-    
+
     // 4. continue analyzing from the child ASTNode.
     Phase1Ctx ctx_1 = initPhase1Ctx();
     preProcessForInsert(child, qb);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
index e069acd..c4efb3f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
@@ -31,6 +31,8 @@
  */
 public class DDLWork implements Serializable {
   private static final long serialVersionUID = 1L;
+
+  private InsertTableDesc insertTableDesc;
   private CreateIndexDesc createIndexDesc;
   private AlterIndexDesc alterIndexDesc;
   private DropIndexDesc dropIdxDesc;
@@ -524,6 +526,12 @@ public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
     this.cacheMetadataDesc = cacheMetadataDesc;
   }
 
+  public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+      InsertTableDesc insertTableDesc) {
+    this(inputs, outputs);
+    this.insertTableDesc = insertTableDesc;
+  }
+
   /**
    * @return Create Database descriptor
    */
@@ -1185,4 +1193,13 @@ public ShowConfDesc getShowConfDesc() {
   public void setShowConfDesc(ShowConfDesc showConfDesc) {
     this.showConfDesc = showConfDesc;
   }
+
+  @Explain(displayName = "Insert operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public InsertTableDesc getInsertTableDesc() {
+    return insertTableDesc;
+  }
+
+  public void setInsertTableDesc(InsertTableDesc insertTableDesc) {
+    this.insertTableDesc = insertTableDesc;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java
new file mode 100644
index 0000000..1397b8a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+
+@Explain(displayName = "Insert", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED })
+public class InsertTableDesc extends DDLDesc {
+  private final Table table;
+  private final boolean overwrite;
+
+  public InsertTableDesc(Table table, boolean overwrite) {
+    this.table = table;
+    this.overwrite = overwrite;
+  }
+
+  public Table getTable() {
+    return table;
+  }
+
+  public boolean isOverwrite() {
+    return overwrite;
+  }
+}
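
For anyone wiring another storage handler into the new insert lifecycle, a minimal sketch of the handler side is shown below. Only the HiveMetaHookV2 methods come from this patch; the package, class name, and comments are illustrative, and the inherited HiveMetaHook methods are deliberately left unimplemented, so the class is declared abstract.

package org.apache.hadoop.hive.example;   // hypothetical package, not part of the patch

import org.apache.hadoop.hive.metastore.HiveMetaHookV2;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;

// DDLTask.insertCommitWork() calls IMetaStoreClient.insertTable(), which runs
// preInsertTable then commitInsertTable, and rollbackInsertTable if either fails.
public abstract class ExampleInsertAwareMetaHook implements HiveMetaHookV2 {

  @Override
  public void preInsertTable(Table table, boolean overwrite) throws MetaException {
    // stage or validate resources before any data is published
  }

  @Override
  public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
    if (!overwrite) {
      // mirrors DruidStorageHandler, which only supports INSERT OVERWRITE so far
      throw new MetaException("INSERT INTO is not supported by this handler");
    }
    // publish the newly written data for table.getTableName()
  }

  @Override
  public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
    // undo whatever preInsertTable staged, since the commit did not complete
  }

  // preCreateTable, commitCreateTable and the other HiveMetaHook methods are omitted here
}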