diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 76540b76b3..e8711dc32f 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -59,6 +59,8 @@
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.SingleDimensionShardSpec;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -99,6 +101,7 @@
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.slf4j.Logger;
@@ -110,6 +113,7 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -117,6 +121,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
+
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
@@ -156,6 +162,11 @@
 
   private Configuration conf;
 
+  private static final DataSegment MARKER_DATA_SEGMENT =
+      DataSegment.builder().dataSource("marker_hive_druid_dummy_data_source")
+          .shardSpec(NoneShardSpec.instance()).interval(new Interval("2000/2001")).version("0")
+          .build();
+
   public DruidStorageHandler() {
   }
 
@@ -249,6 +260,15 @@ public void preCreateTable(Table table) throws MetaException {
     if (existingDataSources.contains(dataSourceName)) {
       throw new MetaException(String.format("Data source [%s] already existing", dataSourceName));
     }
+    final Path segmentDescriptorOutputPath = DruidStorageHandlerUtils
+        .makeSegmentDescriptorOutputPath(MARKER_DATA_SEGMENT, getSegmentDescriptorDir());
+    try {
+      FileSystem fs = segmentDescriptorOutputPath.getFileSystem(conf);
+      DruidStorageHandlerUtils.writeSegmentDescriptor(fs, MARKER_DATA_SEGMENT, segmentDescriptorOutputPath);
+    } catch (IOException e) {
+      LOG.info("can not create the initial segment count file at {}", segmentDescriptorOutputPath.toString());
+      throw new MetaException(e.getMessage());
+    }
     table.getParameters().put(Constants.DRUID_DATA_SOURCE, dataSourceName);
   }
 
@@ -328,14 +348,14 @@ private void updateKafkaIngestion(Table table){
             null
         ), "UTF-8");
 
-    Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+    Map<String, Object> inputParser = JSON_MAPPER
         .convertValue(inputRowParser, Map.class);
     final DataSchema dataSchema = new DataSchema(
         dataSourceName,
         inputParser,
         dimensionsAndAggregates.rhs,
         granularitySpec,
-        DruidStorageHandlerUtils.JSON_MAPPER
+        JSON_MAPPER
     );
 
     IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(getConf());
@@ -412,7 +432,7 @@ private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String
 
   private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) {
     try {
-      String task = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(spec);
+      String task = JSON_MAPPER.writeValueAsString(spec);
       console.printInfo("submitting kafka Spec {}", task);
LOG.info("submitting kafka Supervisor Spec {}", task); @@ -420,7 +440,7 @@ private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSuperv new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress))) .setContent( "application/json", - DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(spec)), + JSON_MAPPER.writeValueAsBytes(spec)), new StatusResponseHandler( Charset.forName("UTF-8"))).get(); if (response.getStatus().equals(HttpResponseStatus.OK)) { @@ -502,7 +522,7 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { input -> input instanceof IOException, getMaxRetryCount()); if (response.getStatus().equals(HttpResponseStatus.OK)) { - return DruidStorageHandlerUtils.JSON_MAPPER + return JSON_MAPPER .readValue(response.getContent(), KafkaSupervisorSpec.class); // Druid Returns 400 Bad Request when not found. } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) { @@ -523,16 +543,23 @@ protected void loadDruidSegments(Table table, boolean overwrite) throws MetaExce // at this point we have Druid segments from reducers but we need to atomically // rename and commit to metadata final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE); - final List segmentList = Lists.newArrayList(); + List tmpSegmentList = Collections.EMPTY_LIST; final Path tableDir = getSegmentDescriptorDir(); - // Read the created segments metadata from the table staging directory try { - segmentList.addAll(DruidStorageHandlerUtils.getCreatedSegments(tableDir, getConf())); + tmpSegmentList = DruidStorageHandlerUtils.getCreatedSegments(tableDir, getConf()); + //Need to make sure that we have at least + Preconditions.checkState(tmpSegmentList.size() >= 1, + "Segments count is not matching expected greater than 1 found[%s]", tmpSegmentList.size() + ); + } catch (IOException e) { LOG.error("Failed to load segments descriptor from directory {}", tableDir.toString()); Throwables.propagate(e); cleanWorkingDir(); } + final List segmentList = tmpSegmentList.stream().filter( + dataSegment -> !dataSegment.getDataSource() + .equalsIgnoreCase(MARKER_DATA_SEGMENT.getDataSource())).collect(Collectors.toList()); // Moving Druid segments and committing to druid metadata as one transaction. 
     final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
     List<DataSegment> publishedDataSegmentList = Lists.newArrayList();
@@ -551,7 +578,7 @@ protected void loadDruidSegments(Table table, boolean overwrite) throws MetaExce
 
     try {
       DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(hdfsSegmentPusherConfig, getConf(),
-          DruidStorageHandlerUtils.JSON_MAPPER
+          JSON_MAPPER
       );
       publishedDataSegmentList = DruidStorageHandlerUtils.publishSegmentsAndCommit(
           getConnector(),
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 14242378ff..3997c9d71b 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -290,14 +290,7 @@ public static String getURL(HttpClient client, URL url) throws IOException {
     ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
     FileSystem fs = taskDir.getFileSystem(conf);
     FileStatus[] fss;
-    try {
-      fss = fs.listStatus(taskDir);
-    } catch (FileNotFoundException e) {
-      // This is a CREATE TABLE statement or query executed for CTAS/INSERT
-      // did not produce any result. We do not need to do anything, this is
-      // expected behavior.
-      return publishedSegmentsBuilder.build();
-    }
+    fss = fs.listStatus(taskDir);
     for (FileStatus fileStatus : fss) {
       final DataSegment segment = JSON_MAPPER
           .readValue((InputStream) fs.open(fileStatus.getPath()), DataSegment.class);
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 7d2bb91926..2ebca165e9 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -315,7 +315,7 @@ public void write(NullWritable key, DruidWritable value) throws IOException {
 
   @Override
   public void close(Reporter reporter) throws IOException {
-    this.close(true);
+    this.close(false);
   }
 
 }