diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 76540b76b3..8f52e5d083 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -26,7 +26,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.RetryUtils;
 import com.metamx.common.lifecycle.Lifecycle;
@@ -59,6 +58,7 @@
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -99,6 +99,7 @@
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.slf4j.Logger;
@@ -117,6 +118,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
+
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
@@ -156,6 +159,11 @@
   private Configuration conf;
 
+  private static final DataSegment MARKER_DATA_SEGMENT =
+      DataSegment.builder().dataSource("marker_hive_druid_dummy_data_source")
+          .shardSpec(NoneShardSpec.instance()).interval(new Interval("2000/2001")).version("0")
+          .build();
+
   public DruidStorageHandler() {
   }
 
@@ -328,14 +336,14 @@ private void updateKafkaIngestion(Table table){
             null
         ), "UTF-8");
-    Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+    Map<String, Object> inputParser = JSON_MAPPER
         .convertValue(inputRowParser, Map.class);
     final DataSchema dataSchema = new DataSchema(
         dataSourceName,
         inputParser,
         dimensionsAndAggregates.rhs,
         granularitySpec,
-        DruidStorageHandlerUtils.JSON_MAPPER
+        JSON_MAPPER
     );
 
     IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(getConf());
@@ -412,7 +420,7 @@ private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String
 
   private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) {
     try {
-      String task = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(spec);
+      String task = JSON_MAPPER.writeValueAsString(spec);
       console.printInfo("submitting kafka Spec {}", task);
       LOG.info("submitting kafka Supervisor Spec {}", task);
@@ -420,7 +428,7 @@ private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSuperv
           new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
               .setContent(
                   "application/json",
-                  DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(spec)),
+                  JSON_MAPPER.writeValueAsBytes(spec)),
           new StatusResponseHandler(
               Charset.forName("UTF-8"))).get();
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -502,7 +510,7 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
           input -> input instanceof IOException,
           getMaxRetryCount());
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
-        return DruidStorageHandlerUtils.JSON_MAPPER
+        return JSON_MAPPER
             .readValue(response.getContent(), KafkaSupervisorSpec.class);
         // Druid Returns 400 Bad Request when not found.
       } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)
           || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
@@ -520,38 +528,46 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
 
   protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException {
-    // at this point we have Druid segments from reducers but we need to atomically
-    // rename and commit to metadata
+
     final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
-    final List<DataSegment> segmentList = Lists.newArrayList();
-    final Path tableDir = getSegmentDescriptorDir();
-    // Read the created segments metadata from the table staging directory
+    final Path segmentDescriptorDir = getSegmentDescriptorDir();
     try {
-      segmentList.addAll(DruidStorageHandlerUtils.getCreatedSegments(tableDir, getConf()));
+      if (!segmentDescriptorDir.getFileSystem(getConf()).exists(segmentDescriptorDir)) {
+        LOG.warn(
+            "Directory {} does not exist, ignore this if it is a CREATE statement or an INSERT of 0 rows",
+            segmentDescriptorDir.toString());
+        LOG.info("no Druid segments to move, cleaning working directory {}",
+            getStagingWorkingDir().toString());
+        cleanWorkingDir();
+        return;
+      }
     } catch (IOException e) {
-      LOG.error("Failed to load segments descriptor from directory {}", tableDir.toString());
+      LOG.error("Failed to load segments descriptor from directory {}", segmentDescriptorDir.toString());
       Throwables.propagate(e);
       cleanWorkingDir();
     }
-    // Moving Druid segments and committing to druid metadata as one transaction.
-    final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
-    List<DataSegment> publishedDataSegmentList = Lists.newArrayList();
-    final String segmentDirectory =
-        table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
-            ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
-            : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
-    LOG.info(String.format(
-        "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
-        segmentList.size(),
-        getStagingWorkingDir(),
-        segmentDirectory
-    ));
-    hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
     try {
+      // at this point we have Druid segments from reducers but we need to atomically
+      // rename and commit to metadata
+      // Moving Druid segments and committing to druid metadata as one transaction.
+      List<DataSegment> segmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorDir, getConf());
+      final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+      List<DataSegment> publishedDataSegmentList;
+      final String segmentDirectory =
+          table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
+              ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
+              : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+      LOG.info(String.format(
+          "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
+          segmentList.size(),
+          getStagingWorkingDir(),
+          segmentDirectory
+      ));
+      hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
       DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(hdfsSegmentPusherConfig, getConf(),
-          DruidStorageHandlerUtils.JSON_MAPPER
+          JSON_MAPPER
       );
       publishedDataSegmentList = DruidStorageHandlerUtils.publishSegmentsAndCommit(
           getConnector(),
@@ -562,7 +578,7 @@ protected void loadDruidSegments(Table table, boolean overwrite) throws MetaExce
           getConf(),
           dataSegmentPusher
       );
-
+      checkLoadStatus(publishedDataSegmentList);
     } catch (CallbackFailedException | IOException e) {
       LOG.error("Failed to move segments from staging directory");
       if (e instanceof CallbackFailedException) {
@@ -572,7 +588,6 @@ protected void loadDruidSegments(Table table, boolean overwrite) throws MetaExce
     } finally {
       cleanWorkingDir();
     }
-    checkLoadStatus(publishedDataSegmentList);
   }
 
   /**
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 14242378ff..3997c9d71b 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -290,14 +290,7 @@ public static String getURL(HttpClient client, URL url) throws IOException {
     ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
     FileSystem fs = taskDir.getFileSystem(conf);
     FileStatus[] fss;
-    try {
-      fss = fs.listStatus(taskDir);
-    } catch (FileNotFoundException e) {
-      // This is a CREATE TABLE statement or query executed for CTAS/INSERT
-      // did not produce any result. We do not need to do anything, this is
-      // expected behavior.
-      return publishedSegmentsBuilder.build();
-    }
+    fss = fs.listStatus(taskDir);
     for (FileStatus fileStatus : fss) {
       final DataSegment segment = JSON_MAPPER
           .readValue((InputStream) fs.open(fileStatus.getPath()), DataSegment.class);
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 7d2bb91926..2ebca165e9 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -315,7 +315,7 @@ public void write(NullWritable key, DruidWritable value) throws IOException {
 
   @Override
   public void close(Reporter reporter) throws IOException {
-    this.close(true);
+    this.close(false);
  }
 }
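Note on the descriptor-directory check: the patch removes the FileNotFoundException fallback from DruidStorageHandlerUtils.getCreatedSegments() and instead has loadDruidSegments() return early when the staging descriptor directory is missing (a CREATE TABLE statement, or an INSERT that produced zero rows). The sketch below isolates that guard under those assumptions; the class and method names are illustrative only and are not part of the patch.

// Illustrative sketch only (not part of the patch): the early-exit guard that
// loadDruidSegments() now performs before touching the Druid metadata store.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public final class SegmentDescriptorDirCheck {

  private SegmentDescriptorDirCheck() {
  }

  // Returns true when the staging descriptor directory exists. A missing directory
  // corresponds to a CREATE TABLE statement or an INSERT that produced zero rows,
  // in which case the caller cleans its working directory and returns without
  // publishing any segments.
  public static boolean hasSegmentDescriptors(Path segmentDescriptorDir, Configuration conf)
      throws IOException {
    FileSystem fs = segmentDescriptorDir.getFileSystem(conf);
    return fs.exists(segmentDescriptorDir);
  }
}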