diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index c0feb8d138..7c021d7718 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -26,7 +26,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.RetryUtils;
 import com.metamx.common.lifecycle.Lifecycle;
@@ -118,6 +117,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
+
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
@@ -329,7 +330,7 @@ private void updateKafkaIngestion(Table table){
             null
         ), "UTF-8");
-    Map inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+    Map inputParser = JSON_MAPPER
         .convertValue(inputRowParser, Map.class);
     final DataSchema dataSchema = new DataSchema(
         dataSourceName,
@@ -414,7 +415,7 @@ private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String
 
   private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) {
     try {
-      String task = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(spec);
+      String task = JSON_MAPPER.writeValueAsString(spec);
       console.printInfo("submitting kafka Spec {}", task);
       LOG.info("submitting kafka Supervisor Spec {}", task);
 
@@ -422,7 +423,7 @@ private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSuperv
               new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
               .setContent(
                   "application/json",
-                  DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(spec)),
+                  JSON_MAPPER.writeValueAsBytes(spec)),
           new StatusResponseHandler(
               Charset.forName("UTF-8"))).get();
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -504,7 +505,7 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
           input -> input instanceof IOException, getMaxRetryCount());
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
-        return DruidStorageHandlerUtils.JSON_MAPPER
+        return JSON_MAPPER
             .readValue(response.getContent(), KafkaSupervisorSpec.class);
         // Druid Returns 400 Bad Request when not found.
       } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)
           || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
@@ -522,38 +523,46 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
 
   protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException {
-    // at this point we have Druid segments from reducers but we need to atomically
-    // rename and commit to metadata
+
     final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
-    final List segmentList = Lists.newArrayList();
-    final Path tableDir = getSegmentDescriptorDir();
-    // Read the created segments metadata from the table staging directory
+    final Path segmentDescriptorDir = getSegmentDescriptorDir();
     try {
-      segmentList.addAll(DruidStorageHandlerUtils.getCreatedSegments(tableDir, getConf()));
+      if (!segmentDescriptorDir.getFileSystem(getConf()).exists(segmentDescriptorDir)) {
+        LOG.warn("Directory {} does not exist, ignore this if it is a CREATE statement or an INSERT of 0 rows",
+            segmentDescriptorDir.toString());
+        LOG.info("no Druid segments to move, cleaning working directory {}",
+            getStagingWorkingDir().toString());
+        cleanWorkingDir();
+        return;
+      }
     } catch (IOException e) {
-      LOG.error("Failed to load segments descriptor from directory {}", tableDir.toString());
+      LOG.error("Failed to load segments descriptor from directory {}", segmentDescriptorDir.toString());
       Throwables.propagate(e);
       cleanWorkingDir();
     }
-    // Moving Druid segments and committing to druid metadata as one transaction.
-    final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
-    List publishedDataSegmentList = Lists.newArrayList();
-    final String segmentDirectory =
-        table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
-            ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
-            : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
-    LOG.info(String.format(
-        "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
-        segmentList.size(),
-        getStagingWorkingDir(),
-        segmentDirectory
-    ));
-    hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
     try {
+      // at this point we have Druid segments from reducers but we need to atomically
+      // rename and commit to metadata
+      // Moving Druid segments and committing to druid metadata as one transaction.
+      List segmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorDir, getConf());
+      final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+      List publishedDataSegmentList;
+      final String segmentDirectory =
+          table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
+              ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
+              : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+      LOG.info(String.format(
+          "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
+          segmentList.size(),
+          getStagingWorkingDir(),
+          segmentDirectory
+
+      ));
+      hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
       DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(hdfsSegmentPusherConfig, getConf(),
-          DruidStorageHandlerUtils.JSON_MAPPER
+          JSON_MAPPER
       );
       publishedDataSegmentList = DruidStorageHandlerUtils.publishSegmentsAndCommit(
           getConnector(),
@@ -564,7 +573,7 @@ protected void loadDruidSegments(Table table, boolean overwrite) throws MetaExce
           getConf(),
           dataSegmentPusher
       );
-
+      checkLoadStatus(publishedDataSegmentList);
     } catch (CallbackFailedException | IOException e) {
       LOG.error("Failed to move segments from staging directory");
       if (e instanceof CallbackFailedException) {
@@ -574,7 +583,6 @@ protected void loadDruidSegments(Table table, boolean overwrite) throws MetaExce
     } finally {
       cleanWorkingDir();
     }
-    checkLoadStatus(publishedDataSegmentList);
   }
 
   /**
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 1aef565cf3..808351de74 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -299,14 +299,7 @@ public static String getURL(HttpClient client, URL url) throws IOException {
     ImmutableList.Builder publishedSegmentsBuilder = ImmutableList.builder();
     FileSystem fs = taskDir.getFileSystem(conf);
     FileStatus[] fss;
-    try {
-      fss = fs.listStatus(taskDir);
-    } catch (FileNotFoundException e) {
-      // This is a CREATE TABLE statement or query executed for CTAS/INSERT
-      // did not produce any result. We do not need to do anything, this is
-      // expected behavior.
-      return publishedSegmentsBuilder.build();
-    }
+    fss = fs.listStatus(taskDir);
     for (FileStatus fileStatus : fss) {
       final DataSegment segment = JSON_MAPPER
           .readValue((InputStream) fs.open(fileStatus.getPath()), DataSegment.class);
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 7d2bb91926..2ebca165e9 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -315,7 +315,7 @@ public void write(NullWritable key, DruidWritable value) throws IOException {
 
   @Override
   public void close(Reporter reporter) throws IOException {
-    this.close(true);
+    this.close(false);
   }
 
 }
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
new file mode 100644
index 0000000000..d079e4f031
--- /dev/null
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class DruidStorageHandlerUtilsTest {
+
+  @Test public void testCreateSelectStarQuery() throws IOException {
+    Assert.assertTrue("this should not be null",
+        DruidStorageHandlerUtils.createSelectStarQuery("dummy_ds").contains("dummy_ds"));
+  }
+}
\ No newline at end of file