From 23d49c3f8aacb09ce1e86708a4751d50b2ce1068 Mon Sep 17 00:00:00 2001
From: Nishant
Date: Thu, 13 Apr 2017 21:42:19 +0530
Subject: [PATCH] [HIVE-15571] Support INSERT INTO for the Druid storage handler

---
 .../hadoop/hive/druid/DruidStorageHandler.java    |  73 +++++++---
 .../hive/druid/DruidStorageHandlerUtils.java      | 155 +++++++++++++++++++++
 .../hadoop/hive/druid/io/DruidOutputFormat.java   |  11 +-
 .../hadoop/hive/druid/io/DruidRecordWriter.java   | 105 +++++++++++---
 .../hadoop/hive/ql/io/TestDruidRecordWriter.java  |   6 +-
 5 files changed, 308 insertions(+), 42 deletions(-)

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index d4f6865..9fe01b9 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -17,22 +17,6 @@
  */
 package org.apache.hadoop.hive.druid;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.metamx.common.RetryUtils;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
 import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
 import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
@@ -41,6 +25,7 @@
 import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -56,7 +41,6 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -67,13 +51,33 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.metamx.common.RetryUtils;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -82,6 +86,8 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import javax.annotation.Nullable;
+
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
@@ -94,6 +100,8 @@
 
   public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
 
+  public static final String EXISTING_SEGMENTS_FILE_NAME = "existingSegments.json";
+
   private final SQLMetadataConnector connector;
 
   private final SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler;
@@ -232,6 +240,10 @@ public void preCreateTable(Table table) throws MetaException {
 
   @Override
   public void rollbackCreateTable(Table table) throws MetaException {
+    deleteGeneratedSegments(table);
+  }
+
+  private void deleteGeneratedSegments(Table table) throws MetaException {
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
@@ -256,6 +268,10 @@ public void rollbackCreateTable(Table table) throws MetaException {
 
   @Override
   public void commitCreateTable(Table table) throws MetaException {
+    commitSegmentsToMetadataStore(table);
+  }
+
+  private void commitSegmentsToMetadataStore(Table table) throws MetaException {
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
@@ -490,17 +506,32 @@ public void commitInsertTable(Table table, boolean overwrite) throws MetaExcepti
       LOG.debug(String.format("commit insert overwrite into table [%s]", table.getTableName()));
       this.commitCreateTable(table);
     } else {
-      throw new MetaException("Insert into is not supported yet");
+      LOG.debug(String.format("commit insert into table [%s]", table.getTableName()));
+      this.commitSegmentsToMetadataStore(table);
     }
   }
 
   @Override
   public void preInsertTable(Table table, boolean overwrite) throws MetaException {
-    if (!overwrite) {
-      throw new MetaException("INSERT INTO statement is not allowed by druid storage handler");
+    Path existingSegmentsFile = getExistingSegmentsFilePath();
+    List<DataSegment> maxSegmentsList = overwrite ?
+        Lists.newArrayList() :
+        DruidStorageHandlerUtils
+            .getMaxPartitionSegments(connector, druidMetadataStorageTablesConfig,
+                table.getParameters().get(Constants.DRUID_DATA_SOURCE));
+    try {
+      DruidStorageHandlerUtils.writeSegmentsList(existingSegmentsFile.getFileSystem(getConf()),
+          existingSegmentsFile,
+          maxSegmentsList);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
     }
   }
 
+  private Path getExistingSegmentsFilePath() {
+    return new Path(getStagingWorkingDir(), EXISTING_SEGMENTS_FILE_NAME);
+  }
+
   @Override
   public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
     // do nothing
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 8d48e14..e756622 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -17,15 +17,23 @@
  */
 package org.apache.hadoop.hive.druid;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 import com.google.common.io.CharStreams;
+import com.metamx.common.IAE;
 import com.metamx.common.MapUtils;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.core.NoopEmitter;
@@ -33,30 +41,46 @@
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.Request;
 import com.metamx.http.client.response.InputStreamResponseHandler;
+
+import io.druid.common.utils.JodaUtils;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
+import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
 import io.druid.query.BaseQuery;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.column.ColumnConfig;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.TimelineObjectHolder;
+import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.LinearShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
+import io.druid.timeline.partition.PartitionChunk;
+import jodd.util.collection.JoddArrayList;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.util.StringUtils;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Interval;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
@@ -92,6 +116,7 @@
 import java.util.zip.ZipFile;
 
 import static org.apache.hadoop.hive.ql.exec.Utilities.jarFinderGetJar;
+import static org.apache.hadoop.hive.ql.exec.Utilities.makeProperties;
 
 /**
  * Utils class for Druid storage handler.
@@ -467,4 +492,134 @@ public static void addDependencyJars(Configuration conf, Class<?>... classes) th
     conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
   }
 
+  public static void writeSegmentsList(
+      final FileSystem outputFS,
+      final Path outputPath,
+      final List<DataSegment> segments
+  )
+      throws IOException {
+    final DataPusher pusher = (DataPusher) RetryProxy.create(
+        DataPusher.class, new DataPusher() {
+          @Override
+          public long push() throws IOException {
+            try {
+              if (outputFS.exists(outputPath)) {
+                if (!outputFS.delete(outputPath, false)) {
+                  throw new IOException(
+                      String.format("Failed to delete descriptor at [%s]", outputPath));
+                }
+              }
+              try (final OutputStream descriptorOut = outputFS.create(
+                  outputPath,
+                  true,
+                  DEFAULT_FS_BUFFER_SIZE
+              )) {
+                JSON_MAPPER.writeValue(descriptorOut, segments);
+                descriptorOut.flush();
+              }
+            } catch (RuntimeException | IOException ex) {
+              throw ex;
+            }
+            return -1;
+          }
+        },
+        RetryPolicies
+            .exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+    );
+    pusher.push();
+  }
+
+  public static VersionedIntervalTimeline<String, DataSegment> getExistingSegmentsTimeline(FileSystem fileSystem, Path existingSegmentsPath)
+      throws IOException {
+    List<DataSegment> existingSegments = JSON_MAPPER
+        .readValue(fileSystem.open(existingSegmentsPath),
+            new TypeReference<List<DataSegment>>() {
+            });
+    final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
+        Ordering.natural()
+    );
+    for (DataSegment segment : existingSegments) {
+      timeline.add(segment.getInterval(), segment.getVersion(),
+          segment.getShardSpec().createChunk(segment));
+    }
+    return timeline;
+  }
+
+
+  public static List<DataSegment> getMaxPartitionSegments(SQLMetadataConnector connector,
+      final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource) {
+    return connector.retryTransaction(
+        new TransactionCallback<List<DataSegment>>() {
+          @Override
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus transactionStatus)
+              throws Exception {
+            // Make up a pending segment based on existing segments
+            final List<DataSegment> maxSegmentsList = Lists.newArrayList();
+
+            final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForDataSourceWithHandle(
+                metadataStorageTablesConfig,
+                handle,
+                dataSource
+            ).lookup(new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT));
+
+            if (existingChunks.size() > 1) {
+              // Not possible to expand more than one chunk with a single segment.
+              throw new IAE("Cannot insert into dataSource [%s], found [%,d] existing timeline chunks but expected at most one", dataSource, existingChunks.size());
+            }
+            for (TimelineObjectHolder<String, DataSegment> chunk : existingChunks) {
+              DataSegment max = null;
+              for (PartitionChunk<DataSegment> existing : chunk.getObject()) {
+                if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject()
+                    .getShardSpec()
+                    .getPartitionNum()) {
+                  max = existing.getObject();
+                }
+              }
+              maxSegmentsList.add(max);
+            }
+            return maxSegmentsList;
+          }
+        },
+        0,
+        NUM_RETRIES
+    );
+  }
+
+  private static VersionedIntervalTimeline<String, DataSegment> getTimelineForDataSourceWithHandle(
+      final MetadataStorageTablesConfig metadataStorageTablesConfig,
+      final Handle handle,
+      final String dataSource
+  ) throws IOException {
+    Query<Map<String, Object>> sql = handle.createQuery(
+        String.format(
+            "SELECT payload FROM %s WHERE used = true AND dataSource = ?",
+            metadataStorageTablesConfig.getSegmentsTable()
+        )
+    ).bind(0, dataSource);
+
+    final ResultIterator<byte[]> dbSegments = sql
+        .map(ByteArrayMapper.FIRST)
+        .iterator();
+
+    final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
+        Ordering.natural()
+    );
+
+    while (dbSegments.hasNext()) {
+      final byte[] payload = dbSegments.next();
+
+      DataSegment segment = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
+          payload,
+          DataSegment.class
+      );
+
+      timeline.add(segment.getInterval(), segment.getVersion(),
+          segment.getShardSpec().createChunk(segment));
+
+    }
+
+    dbSegments.close();
+
+    return timeline;
+  }
 }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 4385dfe..6c1e53a 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -44,12 +44,16 @@
 import io.druid.segment.realtime.plumber.CustomVersioningPolicy;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.VersionedIntervalTimeline;
+
 import org.apache.calcite.adapter.druid.DruidTable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -225,10 +229,15 @@
             null
     );
 
+    Path existingSegmentsPath = new Path(workingPath, DruidStorageHandler.EXISTING_SEGMENTS_FILE_NAME);
+    VersionedIntervalTimeline<String, DataSegment> existingSegmentsTimeline = DruidStorageHandlerUtils
+        .getExistingSegmentsTimeline(existingSegmentsPath.getFileSystem(jc), existingSegmentsPath);
+
     LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
     return new DruidRecordWriter(dataSchema, realtimeTuningConfig, hdfsDataSegmentPusher,
         maxPartitionSize, new Path(workingPath, SEGMENTS_DESCRIPTOR_DIR_NAME),
-        finalOutPath.getFileSystem(jc)
+        finalOutPath.getFileSystem(jc),
+        existingSegmentsTimeline
     );
   }
 
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 8d22df6..ebc8823 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -24,11 +24,14 @@
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.metamx.common.Granularity;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataConnector;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.loading.DataSegmentPusher;
@@ -40,12 +43,19 @@
 import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
 import io.druid.segment.realtime.plumber.Committers;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.TimelineObjectHolder;
+import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.LinearShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
+import io.druid.timeline.partition.PartitionChunk;
+
 import org.apache.calcite.adapter.druid.DruidTable;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -85,13 +95,16 @@
 
   private final Supplier<Committer> committerSupplier;
 
+  private final VersionedIntervalTimeline<String, DataSegment> existingSegmentsTimeline;
+
   public DruidRecordWriter(
       DataSchema dataSchema,
       RealtimeTuningConfig realtimeTuningConfig,
       DataSegmentPusher dataSegmentPusher,
       int maxPartitionSize,
       final Path segmentsDescriptorsDir,
-      final FileSystem fileSystem
+      final FileSystem fileSystem,
+      VersionedIntervalTimeline<String, DataSegment> existingSegmentsTimeline
   ) {
     File basePersistDir = new File(realtimeTuningConfig.getBasePersistDirectory(),
         UUID.randomUUID().toString()
@@ -114,6 +127,7 @@ public DruidRecordWriter(
         .checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
     this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
     committerSupplier = Suppliers.ofInstance(Committers.nil());
+    this.existingSegmentsTimeline = existingSegmentsTimeline;
   }
 
   /**
@@ -136,16 +150,7 @@ private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
     );
 
     SegmentIdentifier retVal;
-    if (currentOpenSegment == null) {
-      retVal = new SegmentIdentifier(
-          dataSchema.getDataSource(),
-          interval,
-          tuningConfig.getVersioningPolicy().getVersion(interval),
-          new LinearShardSpec(0)
-      );
-      currentOpenSegment = retVal;
-      return retVal;
-    } else if (currentOpenSegment.getInterval().equals(interval)) {
+    if (currentOpenSegment != null && currentOpenSegment.getInterval().equals(interval)) {
       retVal = currentOpenSegment;
       int rowCount = appenderator.getRowCount(retVal);
       if (rowCount < maxPartitionSize) {
@@ -154,7 +159,7 @@ private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
         retVal = new SegmentIdentifier(
             dataSchema.getDataSource(),
             interval,
-            tuningConfig.getVersioningPolicy().getVersion(interval),
+            currentOpenSegment.getVersion(),
            new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1)
         );
         pushSegments(Lists.newArrayList(currentOpenSegment));
@@ -164,13 +169,75 @@ private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
         return retVal;
       }
     } else {
-      retVal = new SegmentIdentifier(
-          dataSchema.getDataSource(),
-          interval,
-          tuningConfig.getVersioningPolicy().getVersion(interval),
-          new LinearShardSpec(0)
-      );
-      pushSegments(Lists.newArrayList(currentOpenSegment));
+      // Lookup with incomplete partitions as we only set partitions with max ID for each interval
+      List<TimelineObjectHolder<String, DataSegment>> existingChunks = Lists.newArrayList(existingSegmentsTimeline
+          .lookupWithIncompletePartitions(interval));
+
+      if (existingChunks.size() > 1) {
+        // Not possible to expand more than one chunk with a single segment.
+        throw new IllegalStateException(
+            String.format(
+                "Cannot allocate new segment for dataSource[%s], interval[%s], already have [%,d] chunks.",
+                dataSchema.getDataSource(),
+                interval,
+                existingChunks.size()
+            )
+        );
+      }
+
+      SegmentIdentifier max = null;
+
+      if (!existingChunks.isEmpty()) {
+        TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
+        for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) {
+          if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject()
+              .getShardSpec()
+              .getPartitionNum()) {
+            max = SegmentIdentifier.fromDataSegment(existing.getObject());
+          }
+        }
+      }
+
+      if (max == null) {
+        // No existing segments for current interval
+        retVal = new SegmentIdentifier(
+            dataSchema.getDataSource(),
+            interval,
+            tuningConfig.getVersioningPolicy().getVersion(interval),
+            new LinearShardSpec(0)
+        );
+      } else if (max.getShardSpec() instanceof LinearShardSpec) {
+        retVal = new SegmentIdentifier(
+            dataSchema.getDataSource(),
+            max.getInterval(),
+            max.getVersion(),
+            new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1)
+        );
+      } else if (max.getShardSpec() instanceof NumberedShardSpec) {
+        retVal = new SegmentIdentifier(
+            dataSchema.getDataSource(),
+            max.getInterval(),
+            max.getVersion(),
+            new NumberedShardSpec(
+                max.getShardSpec().getPartitionNum() + 1,
+                ((NumberedShardSpec) max.getShardSpec()).getPartitions()
+            )
+        );
+      } else {
+        throw new IllegalStateException(
+            String.format(
+                "Cannot allocate new segment for dataSource[%s], interval[%s]: ShardSpec class[%s] used by [%s].",
+                dataSchema.getDataSource(),
+                interval,
+                max.getShardSpec().getClass(),
+                max.getIdentifierAsString()
+            )
+        );
+      }
+
+      if (currentOpenSegment != null) {
+        pushSegments(Lists.newArrayList(currentOpenSegment));
+      }
       LOG.info("Creating segment {}", retVal.getIdentifierAsString());
       currentOpenSegment = retVal;
       return retVal;
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index d9e01fe..fdb696f 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
 import com.metamx.common.Granularity;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.InputRow;
@@ -52,6 +53,8 @@
 import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
 import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.VersionedIntervalTimeline;
+
 import org.apache.calcite.adapter.druid.DruidTable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -156,7 +159,8 @@ public void testWrite() throws IOException, SegmentLoadingException {
         DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME
     );
     druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20,
-        segmentDescriptroPath, localFileSystem
+        segmentDescriptroPath, localFileSystem, new VersionedIntervalTimeline<String, DataSegment>(
+        Ordering.natural())
     );
 
     List<DruidWritable> druidWritables = Lists.transform(expectedRows,
-- 
2.8.4 (Apple Git-73)