diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index c0feb8d138..7c021d7718 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -26,7 +26,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.RetryUtils;
 import com.metamx.common.lifecycle.Lifecycle;
@@ -118,6 +117,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
+
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
@@ -329,7 +330,7 @@ private void updateKafkaIngestion(Table table){
             null
         ), "UTF-8");
-    Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+    Map<String, Object> inputParser = JSON_MAPPER
         .convertValue(inputRowParser, Map.class);
     final DataSchema dataSchema = new DataSchema(
         dataSourceName,
@@ -414,7 +415,7 @@ private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String
 
   private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) {
     try {
-      String task = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(spec);
+      String task = JSON_MAPPER.writeValueAsString(spec);
       console.printInfo("submitting kafka Spec {}", task);
       LOG.info("submitting kafka Supervisor Spec {}", task);
 
@@ -422,7 +423,7 @@ private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSuperv
           new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
           .setContent(
               "application/json",
-              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(spec)),
+              JSON_MAPPER.writeValueAsBytes(spec)),
           new StatusResponseHandler(
               Charset.forName("UTF-8"))).get();
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -504,7 +505,7 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
               input -> input instanceof IOException, getMaxRetryCount());
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
-        return DruidStorageHandlerUtils.JSON_MAPPER
+        return JSON_MAPPER
             .readValue(response.getContent(), KafkaSupervisorSpec.class);
         // Druid Returns 400 Bad Request when not found.
       } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
@@ -522,38 +523,46 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
 
   protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException {
-    // at this point we have Druid segments from reducers but we need to atomically
-    // rename and commit to metadata
+    final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
-    final List<DataSegment> segmentList = Lists.newArrayList();
-    final Path tableDir = getSegmentDescriptorDir();
-    // Read the created segments metadata from the table staging directory
+    final Path segmentDescriptorDir = getSegmentDescriptorDir();
     try {
-      segmentList.addAll(DruidStorageHandlerUtils.getCreatedSegments(tableDir, getConf()));
+      if (!segmentDescriptorDir.getFileSystem(getConf()).exists(segmentDescriptorDir)) {
+        LOG.warn(
+            "Directory {} does not exist, ignore this if it is a CREATE statement or an insert of 0 rows", segmentDescriptorDir.toString());
+        LOG.info("no Druid segments to move, cleaning working directory {}",
+            getStagingWorkingDir().toString());
+        cleanWorkingDir();
+        return;
+      }
     } catch (IOException e) {
-      LOG.error("Failed to load segments descriptor from directory {}", tableDir.toString());
+      LOG.error("Failed to load segments descriptor from directory {}", segmentDescriptorDir.toString());
       Throwables.propagate(e);
       cleanWorkingDir();
     }
-    // Moving Druid segments and committing to druid metadata as one transaction.
-    final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
-    List<DataSegment> publishedDataSegmentList = Lists.newArrayList();
-    final String segmentDirectory =
-        table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
-            ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
-            : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
-    LOG.info(String.format(
-        "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
-        segmentList.size(),
-        getStagingWorkingDir(),
-        segmentDirectory
-    ));
-    hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
     try {
+      // at this point we have Druid segments from reducers but we need to atomically
+      // rename and commit to metadata
+      // Moving Druid segments and committing to druid metadata as one transaction.
+      List<DataSegment> segmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorDir, getConf());
+      final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+      List<DataSegment> publishedDataSegmentList;
+      final String segmentDirectory =
+          table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
+              ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
+              : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+      LOG.info(String.format(
+          "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
+          segmentList.size(),
+          getStagingWorkingDir(),
+          segmentDirectory
+
+      ));
+      hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
       DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(hdfsSegmentPusherConfig, getConf(),
-          DruidStorageHandlerUtils.JSON_MAPPER
+          JSON_MAPPER
       );
       publishedDataSegmentList = DruidStorageHandlerUtils.publishSegmentsAndCommit(
               getConnector(),
@@ -564,7 +573,7 @@ protected void loadDruidSegments(Table table, boolean overwrite) throws MetaExce
               getConf(),
               dataSegmentPusher
       );
-
+      checkLoadStatus(publishedDataSegmentList);
     } catch (CallbackFailedException | IOException e) {
       LOG.error("Failed to move segments from staging directory");
       if (e instanceof CallbackFailedException) {
@@ -574,7 +583,6 @@ protected void loadDruidSegments(Table table, boolean overwrite) throws MetaExce
     } finally {
       cleanWorkingDir();
     }
-    checkLoadStatus(publishedDataSegmentList);
   }
 
   /**
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 1aef565cf3..808351de74 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -299,14 +299,7 @@ public static String getURL(HttpClient client, URL url) throws IOException {
     ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
     FileSystem fs = taskDir.getFileSystem(conf);
     FileStatus[] fss;
-    try {
-      fss = fs.listStatus(taskDir);
-    } catch (FileNotFoundException e) {
-      // This is a CREATE TABLE statement or query executed for CTAS/INSERT
-      // did not produce any result. We do not need to do anything, this is
-      // expected behavior.
-      return publishedSegmentsBuilder.build();
-    }
+    fss = fs.listStatus(taskDir);
     for (FileStatus fileStatus : fss) {
       final DataSegment segment = JSON_MAPPER
           .readValue((InputStream) fs.open(fileStatus.getPath()), DataSegment.class);
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 7d2bb91926..2ebca165e9 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -315,7 +315,7 @@ public void write(NullWritable key, DruidWritable value) throws IOException {
 
   @Override
   public void close(Reporter reporter) throws IOException {
-    this.close(true);
+    this.close(false);
   }
 }
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
new file mode 100644
index 0000000000..d079e4f031
--- /dev/null
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class DruidStorageHandlerUtilsTest {
+
+  @Test public void testCreateSelectStarQuery() throws IOException {
+    Assert.assertTrue("this should not be null",
+        DruidStorageHandlerUtils.createSelectStarQuery("dummy_ds").contains("dummy_ds"));
+  }
+}
\ No newline at end of file
diff --git ql/src/test/queries/clientpositive/druidmini_test_insert.q ql/src/test/queries/clientpositive/druidmini_test_insert.q
index 558e246db6..454da7944b 100644
--- ql/src/test/queries/clientpositive/druidmini_test_insert.q
+++ ql/src/test/queries/clientpositive/druidmini_test_insert.q
@@ -51,3 +51,26 @@ SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
 SELECT COUNT(*) FROM druid_alltypesorc;
 
 DROP TABLE druid_alltypesorc;
+
+-- Test create then insert
+
+create database druid_test_create_then_insert;
+use druid_test_create_then_insert;
+
+create table test_table(`timecolumn` timestamp, `userid` string, `num_l` float);
+
+insert into test_table values ('2015-01-08 00:00:00', 'i1-start', 4);
+insert into test_table values ('2015-01-08 23:59:59', 'i1-end', 1);
+
+CREATE TABLE druid_table (`__time` timestamp with local time zone, `userid` string, `num_l` float)
+STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+TBLPROPERTIES ("druid.segment.granularity" = "DAY");
+
+
+INSERT INTO TABLE druid_table
+select cast(`timecolumn` as timestamp with local time zone) as `__time`, `userid`, `num_l` FROM test_table;
+
+select count(*) FROM druid_table;
+
+DROP TABLE test_table;
+DROP TABLE druid_table;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
index 8b79f6a794..625fe7029f 100644
--- ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
+++ ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
@@ -148,3 +148,92 @@ POSTHOOK: query: DROP TABLE druid_alltypesorc
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@druid_alltypesorc
 POSTHOOK: Output: default@druid_alltypesorc
+PREHOOK: query: create database druid_test_create_then_insert
+PREHOOK: type: CREATEDATABASE
+PREHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: query: create database druid_test_create_then_insert
+POSTHOOK: type: CREATEDATABASE
+POSTHOOK: Output: database:druid_test_create_then_insert
+PREHOOK: query: use druid_test_create_then_insert
+PREHOOK: type: SWITCHDATABASE
+PREHOOK: Input: database:druid_test_create_then_insert
+POSTHOOK: query: use druid_test_create_then_insert
+POSTHOOK: type: SWITCHDATABASE
+POSTHOOK: Input: database:druid_test_create_then_insert
+PREHOOK: query: create table test_table(`timecolumn` timestamp, `userid` string, `num_l` float)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:druid_test_create_then_insert
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: create table test_table(`timecolumn` timestamp, `userid` string, `num_l` float)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+PREHOOK: query: insert into test_table values ('2015-01-08 00:00:00', 'i1-start', 4)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: insert into test_table values ('2015-01-08 00:00:00', 'i1-start', 4)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: Lineage: test_table.num_l SCRIPT []
+POSTHOOK: Lineage: test_table.timecolumn SCRIPT []
+POSTHOOK: Lineage: test_table.userid SCRIPT []
+PREHOOK: query: insert into test_table values ('2015-01-08 23:59:59', 'i1-end', 1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: insert into test_table values ('2015-01-08 23:59:59', 'i1-end', 1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: Lineage: test_table.num_l SCRIPT []
+POSTHOOK: Lineage: test_table.timecolumn SCRIPT []
+POSTHOOK: Lineage: test_table.userid SCRIPT []
+PREHOOK: query: CREATE TABLE druid_table (`__time` timestamp with local time zone, `userid` string, `num_l` float)
+STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+TBLPROPERTIES ("druid.segment.granularity" = "DAY")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:druid_test_create_then_insert
+PREHOOK: Output: druid_test_create_then_insert@druid_table
+POSTHOOK: query: CREATE TABLE druid_table (`__time` timestamp with local time zone, `userid` string, `num_l` float)
+STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+TBLPROPERTIES ("druid.segment.granularity" = "DAY")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: Output: druid_test_create_then_insert@druid_table
+PREHOOK: query: INSERT INTO TABLE druid_table
+select cast(`timecolumn` as timestamp with local time zone) as `__time`, `userid`, `num_l` FROM test_table
+PREHOOK: type: QUERY
+PREHOOK: Input: druid_test_create_then_insert@test_table
+PREHOOK: Output: druid_test_create_then_insert@druid_table
+POSTHOOK: query: INSERT INTO TABLE druid_table
+select cast(`timecolumn` as timestamp with local time zone) as `__time`, `userid`, `num_l` FROM test_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: druid_test_create_then_insert@test_table
+POSTHOOK: Output: druid_test_create_then_insert@druid_table
+PREHOOK: query: select count(*) FROM druid_table
+PREHOOK: type: QUERY
+PREHOOK: Input: druid_test_create_then_insert@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) FROM druid_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: druid_test_create_then_insert@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+2
+PREHOOK: query: DROP TABLE test_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: druid_test_create_then_insert@test_table
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: DROP TABLE test_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: druid_test_create_then_insert@test_table
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+PREHOOK: query: DROP TABLE druid_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: druid_test_create_then_insert@druid_table
+PREHOOK: Output: druid_test_create_then_insert@druid_table
+POSTHOOK: query: DROP TABLE druid_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: druid_test_create_then_insert@druid_table
+POSTHOOK: Output: druid_test_create_then_insert@druid_table
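
For reference, the behavioral core of this patch is that the empty-result handling moves out of DruidStorageHandlerUtils.getCreatedSegments (which previously swallowed FileNotFoundException from listStatus) into DruidStorageHandler.loadDruidSegments, which now checks that the segment descriptor directory exists before reading it, so a plain CREATE TABLE or an INSERT that produced no rows simply cleans the staging directory and returns. The following is a minimal, self-contained sketch of that guard pattern using the standard Hadoop FileSystem API; the class and method names are illustrative and not part of the patch.

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SegmentDescriptorGuard {

  /**
   * Lists the segment descriptor files under the given directory, returning an empty
   * list when the directory was never created (e.g. CREATE TABLE, or an insert of 0 rows).
   */
  static List<FileStatus> listDescriptors(Path segmentDescriptorDir, Configuration conf)
      throws IOException {
    FileSystem fs = segmentDescriptorDir.getFileSystem(conf);
    // Explicit existence check instead of catching FileNotFoundException from listStatus().
    if (!fs.exists(segmentDescriptorDir)) {
      return Collections.emptyList();
    }
    return Arrays.asList(fs.listStatus(segmentDescriptorDir));
  }
}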