diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java index e2c5b9d..8f8e3a5 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java @@ -158,6 +158,7 @@ private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) { new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1) ); pushSegments(Lists.newArrayList(currentOpenSegment)); + LOG.info(String.format("Creating new partition for segment [%s], partition num [%d]", retVal.getIdentifierAsString(), retVal.getShardSpec().getPartitionNum())); currentOpenSegment = retVal; return retVal; } @@ -169,6 +170,7 @@ private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) { new LinearShardSpec(0) ); pushSegments(Lists.newArrayList(currentOpenSegment)); + LOG.info(String.format("Creating segment [%s]", retVal.getIdentifierAsString())); currentOpenSegment = retVal; return retVal; } @@ -187,7 +189,6 @@ private void pushSegments(List segmentsToPush) { .makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir); DruidStorageHandlerUtils .writeSegmentDescriptor(fileSystem, pushedSegment, segmentDescriptorOutputPath); - LOG.info( String.format( "Pushed the segment [%s] and persisted the descriptor located at [%s]", @@ -217,6 +218,10 @@ public String apply( Joiner.on(", ").join(pushedSegmentIdentifierHashSet) )); } + for (SegmentIdentifier dataSegmentId : segmentsToPush) { + LOG.info(String.format("Dropping segment [%s]", dataSegmentId.toString())); + appenderator.drop(dataSegmentId).get(); + } LOG.info(String.format("Published [%,d] segments.", segmentsToPush.size())); } catch (InterruptedException e) {