diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 3323cc0..e2c5b9d 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -42,6 +42,7 @@
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.LinearShardSpec;
 import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
@@ -57,9 +58,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 public class DruidRecordWriter implements RecordWriter,
@@ -90,16 +93,21 @@ public DruidRecordWriter(
       final Path segmentsDescriptorsDir,
       final FileSystem fileSystem
   ) {
-    this.tuningConfig = Preconditions
-        .checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is null");
+    // Null-check before the first dereference so a missing config fails
+    // with the intended message instead of a bare NPE.
+    Preconditions.checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is null");
+    // Give each writer its own persist dir so concurrent writers on the
+    // same node do not share intermediate persist state.
+    File basePersistDir = new File(realtimeTuningConfig.getBasePersistDirectory(),
+        UUID.randomUUID().toString()
+    );
+    this.tuningConfig = realtimeTuningConfig.withBasePersistDirectory(basePersistDir);
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
+
     appenderator = Appenderators
-        .createOffline(this.dataSchema,
-            tuningConfig,
-            new FireDepartmentMetrics(), dataSegmentPusher,
-            DruidStorageHandlerUtils.JSON_MAPPER,
-            DruidStorageHandlerUtils.INDEX_IO,
-            DruidStorageHandlerUtils.INDEX_MERGER_V9
+        .createOffline(this.dataSchema, tuningConfig, new FireDepartmentMetrics(),
+            dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER,
+            DruidStorageHandlerUtils.INDEX_IO, DruidStorageHandlerUtils.INDEX_MERGER_V9
         );
     Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize need to be greater than 0");
     this.maxPartitionSize = maxPartitionSize;
@@ -260,6 +268,16 @@ public void close(boolean abort) throws IOException {
     } catch (InterruptedException e) {
       Throwables.propagate(e);
     } finally {
-      appenderator.close();
+      try {
+        appenderator.close();
+      } finally {
+        // Delete the per-writer persist dir only after the appenderator is
+        // done with it, but do so even when close() throws.
+        try {
+          FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
+        } catch (Exception e) {
+          LOG.error("Error cleaning up base persist directory", e);
+        }
+      }
     }
   }