From 655bd6dd19899260d88fa9078687330c6f9d149f Mon Sep 17 00:00:00 2001
From: Nishant
Date: Sat, 13 Apr 2019 01:41:20 +0530
Subject: [PATCH] [HIVE-21612] Upgrade to druid 0.14.0-incubating

---
 .../hive/druid/DruidStorageHandler.java       |  6 ++--
 .../hive/druid/DruidStorageHandlerUtils.java  | 12 +++----
 .../hive/druid/io/DruidRecordWriter.java      | 32 +++++++++----------
 .../hive/druid/json/KafkaTuningConfig.java    |  2 +-
 pom.xml                                       |  4 +--
 5 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index c3bf491414..ea11ebf811 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -559,7 +559,7 @@ private void checkLoadStatus(List segments) {
         return new URL(String.format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
             coordinatorAddress,
             dataSegment.getDataSource(),
-            dataSegment.getIdentifier()));
+            dataSegment.getId().toString()));
       } catch (MalformedURLException e) {
         Throwables.propagate(e);
       }
@@ -604,7 +604,7 @@ private void checkLoadStatus(List segments) {
   @VisibleForTesting void deleteSegment(DataSegment segment) throws SegmentLoadingException {
     final Path path = DruidStorageHandlerUtils.getPath(segment);
-    LOG.info("removing segment {}, located at path {}", segment.getIdentifier(), path);
+    LOG.info("removing segment {}, located at path {}", segment.getId().toString(), path);
     try {
       if (path.getName().endsWith(".zip")) {
@@ -691,7 +691,7 @@ private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) {
       try {
         deleteSegment(dataSegment);
       } catch (SegmentLoadingException e) {
-        LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getIdentifier()), e);
+        LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getId().toString()), e);
       }
     }
   }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 7cf9bc7d77..1fab1e006a 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -75,7 +75,7 @@
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.apache.druid.storage.hdfs.HdfsDataSegmentPusher;
 import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
@@ -464,7 +464,7 @@ static boolean disableDataSource(SQLMetadataConnector connector,
           existingChunks.size()));
     }
     // Find out the segment with latest version and maximum partition number
-    SegmentIdentifier max = null;
+    SegmentIdWithShardSpec max = null;
     final ShardSpec newShardSpec;
     final String newVersion;
     if (!existingChunks.isEmpty()) {
@@ -474,7 +474,7 @@ static boolean disableDataSource(SQLMetadataConnector connector,
         if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject()
             .getShardSpec()
             .getPartitionNum()) {
-          max = SegmentIdentifier.fromDataSegment(existing.getObject());
+          max = SegmentIdWithShardSpec.fromDataSegment(existing.getObject());
         }
       }
     }
@@ -514,7 +514,7 @@ static boolean disableDataSource(SQLMetadataConnector connector,
       for (final DataSegment segment : finalSegmentsToPublish) {
-        batch.add(new ImmutableMap.Builder().put("id", segment.getIdentifier())
+        batch.add(new ImmutableMap.Builder().put("id", segment.getId().toString())
             .put("dataSource", segment.getDataSource())
             .put("created_date", new DateTime().toString())
             .put("start", segment.getInterval().getStart().toString())
@@ -525,7 +525,7 @@ static boolean disableDataSource(SQLMetadataConnector connector,
             .put("payload", JSON_MAPPER.writeValueAsBytes(segment))
             .build());
-        LOG.info("Published {}", segment.getIdentifier());
+        LOG.info("Published {}", segment.getId().toString());
       }
       batch.execute();
@@ -584,7 +584,7 @@ private static int getStreamingFetchSize(SQLMetadataConnector connector) {
    * @return a sanitize file name
    */
   public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment, Path segmentsDescriptorDir) {
-    return new Path(segmentsDescriptorDir, String.format("%s.json", pushedSegment.getIdentifier().replace(":", "")));
+    return new Path(segmentsDescriptorDir, String.format("%s.json", pushedSegment.getId().toString().replace(":", "")));
   }

   public static String createScanAllQuery(String dataSourceName, List columns) throws JsonProcessingException {
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 8c3f18e616..248b59aae6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -34,7 +34,7 @@
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.Appenderators;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.segment.realtime.appenderator.SegmentNotWritableException;
 import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
 import org.apache.druid.segment.realtime.plumber.Committers;
@@ -81,7 +81,7 @@
   private final Path segmentsDescriptorDir;

-  private SegmentIdentifier currentOpenSegment = null;
+  private SegmentIdWithShardSpec currentOpenSegment = null;

   private final int maxPartitionSize;
@@ -128,15 +128,15 @@ public DruidRecordWriter(DataSchema dataSchema,
    *
    * @return segmentIdentifier with of the truncatedTime and maybe push the current open segment.
    */
-  private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
+  private SegmentIdWithShardSpec getSegmentIdentifierAndMaybePush(long truncatedTime) {
     DateTime truncatedDateTime = segmentGranularity.bucketStart(DateTimes.utc(truncatedTime));
     final Interval interval = new Interval(truncatedDateTime, segmentGranularity.increment(truncatedDateTime));

-    SegmentIdentifier retVal;
+    SegmentIdWithShardSpec retVal;
     if (currentOpenSegment == null) {
       currentOpenSegment =
-          new SegmentIdentifier(dataSchema.getDataSource(),
+          new SegmentIdWithShardSpec(dataSchema.getDataSource(),
               interval,
               tuningConfig.getVersioningPolicy().getVersion(interval),
               new LinearShardSpec(0));
@@ -148,37 +148,37 @@ private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
         return retVal;
       } else {
         retVal =
-            new SegmentIdentifier(dataSchema.getDataSource(),
+            new SegmentIdWithShardSpec(dataSchema.getDataSource(),
                 interval,
                 tuningConfig.getVersioningPolicy().getVersion(interval),
                 new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1));
         pushSegments(Lists.newArrayList(currentOpenSegment));
         LOG.info("Creating new partition for segment {}, partition num {}",
-            retVal.getIdentifierAsString(),
+            retVal.toString(),
             retVal.getShardSpec().getPartitionNum());
         currentOpenSegment = retVal;
         return retVal;
       }
     } else {
       retVal =
-          new SegmentIdentifier(dataSchema.getDataSource(),
+          new SegmentIdWithShardSpec(dataSchema.getDataSource(),
              interval,
              tuningConfig.getVersioningPolicy().getVersion(interval),
              new LinearShardSpec(0));
       pushSegments(Lists.newArrayList(currentOpenSegment));
-      LOG.info("Creating segment {}", retVal.getIdentifierAsString());
+      LOG.info("Creating segment {}", retVal.toString());
       currentOpenSegment = retVal;
       return retVal;
     }
   }

-  private void pushSegments(List<SegmentIdentifier> segmentsToPush) {
+  private void pushSegments(List<SegmentIdWithShardSpec> segmentsToPush) {
     try {
       SegmentsAndMetadata segmentsAndMetadata =
           appenderator.push(segmentsToPush, committerSupplier.get(), false).get();
       final Set pushedSegmentIdentifierHashSet = new HashSet<>();
       for (DataSegment pushedSegment : segmentsAndMetadata.getSegments()) {
-        pushedSegmentIdentifierHashSet.add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
+        pushedSegmentIdentifierHashSet.add(SegmentIdWithShardSpec.fromDataSegment(pushedSegment).toString());
         final Path segmentDescriptorOutputPath =
             DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir);
@@ -191,7 +191,7 @@ private void pushSegments(List segmentsToPush) {
       final Set toPushSegmentsHashSet = segmentsToPush.stream()
-          .map(SegmentIdentifier::getIdentifierAsString)
+          .map(SegmentIdWithShardSpec::toString)
           .collect(Collectors.toCollection(HashSet::new));

       if (!pushedSegmentIdentifierHashSet.equals(toPushSegmentsHashSet)) {
@@ -199,7 +199,7 @@
             Joiner.on(", ").join(toPushSegmentsHashSet),
             Joiner.on(", ").join(pushedSegmentIdentifierHashSet)));
       }
-      for (SegmentIdentifier dataSegmentId : segmentsToPush) {
+      for (SegmentIdWithShardSpec dataSegmentId : segmentsToPush) {
         LOG.info("Dropping segment {}", dataSegmentId.toString());
         appenderator.drop(dataSegmentId).get();
       }
@@ -242,14 +242,14 @@ private void pushSegments(List segmentsToPush) {
           || !currentOpenSegment.getInterval().equals(interval)) {
         pushSegments(ImmutableList.of(currentOpenSegment));
         currentOpenSegment =
-            new SegmentIdentifier(dataSchema.getDataSource(),
+            new SegmentIdWithShardSpec(dataSchema.getDataSource(),
                 interval,
                 tuningConfig.getVersioningPolicy().getVersion(interval),
                 new LinearShardSpec(partitionNumber));
       }
     } else {
       currentOpenSegment =
-          new SegmentIdentifier(dataSchema.getDataSource(),
+          new SegmentIdWithShardSpec(dataSchema.getDataSource(),
              interval,
              tuningConfig.getVersioningPolicy().getVersion(interval),
              new LinearShardSpec(partitionNumber));
@@ -276,7 +276,7 @@ private void pushSegments(List segmentsToPush) {
   @Override public void close(boolean abort) throws IOException {
     try {
       if (!abort) {
-        final List<SegmentIdentifier> segmentsToPush = Lists.newArrayList();
+        final List<SegmentIdWithShardSpec> segmentsToPush = Lists.newArrayList();
         segmentsToPush.addAll(appenderator.getSegments());
         pushSegments(segmentsToPush);
       }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
index 353a6d0abe..45ac77ba35 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
@@ -140,7 +140,7 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config) {
     return maxBytesInMemory;
   }

-  @Override @JsonProperty public int getMaxRowsPerSegment() {
+  @Override @JsonProperty public Integer getMaxRowsPerSegment() {
     return maxRowsPerSegment;
   }
diff --git a/pom.xml b/pom.xml
index 6a4b2501b3..601557523e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,7 +108,7 @@
     2.4
     2.4
     2.4
-    3.1.0
+    3.2.0
     2.21.0
     2.4
     2.8
@@ -147,7 +147,7 @@
     10.14.1.0
     3.1.0
     0.1.2
-    0.13.0-incubating
+    0.14.0-incubating
     1.2.0-3f79e055
     19.0
     2.4.11
--
2.17.2 (Apple Git-113)
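
Reviewer note (illustrative, not part of the patch): the diff above tracks two Druid API renames between 0.13.0-incubating and 0.14.0-incubating. SegmentIdentifier becomes SegmentIdWithShardSpec, and DataSegment.getIdentifier() is replaced by DataSegment.getId().toString(). A minimal Java sketch of the 0.14.x-style calls follows, using only classes and methods that appear in this diff; the helper class and method names are hypothetical.

import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.joda.time.Interval;

// Hypothetical helper illustrating the 0.14.x identifier API this patch adopts.
public class SegmentIdSketch {

  // 0.13.x used segment.getIdentifier(); in 0.14.x the String form is reached
  // through the SegmentId object returned by getId().
  static String idString(DataSegment segment) {
    return segment.getId().toString();
  }

  // SegmentIdentifier was renamed to SegmentIdWithShardSpec; the constructor
  // arguments (dataSource, interval, version, shardSpec) are unchanged.
  static SegmentIdWithShardSpec firstShard(String dataSource, Interval interval, String version) {
    return new SegmentIdWithShardSpec(dataSource, interval, version, new LinearShardSpec(0));
  }

  // fromDataSegment() still exists on the renamed class; toString() replaces
  // the old getIdentifierAsString().
  static String idStringViaShardSpec(DataSegment segment) {
    return SegmentIdWithShardSpec.fromDataSegment(segment).toString();
  }
}

The remaining pieces of the same upgrade are KafkaTuningConfig.getMaxRowsPerSegment() changing from int to Integer and the pom.xml bump from 0.13.0-incubating to 0.14.0-incubating.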