From c58fe497acb99097f8e3313874e2cd629744b32d Mon Sep 17 00:00:00 2001 From: Nishant Date: Fri, 12 May 2017 05:04:04 +0530 Subject: [PATCH] [HIVE-15571] Support Insert into for druid storage handler review comments --- .../org/apache/hadoop/hive/conf/Constants.java | 2 + .../hadoop/hive/druid/DruidStorageHandler.java | 83 +++-- .../hive/druid/DruidStorageHandlerUtils.java | 358 ++++++++++++++++---- .../hadoop/hive/druid/io/DruidOutputFormat.java | 18 +- .../hadoop/hive/druid/TestDruidStorageHandler.java | 372 ++++++++++++++++----- 5 files changed, 625 insertions(+), 208 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index 7695e02ab8..794b697dc0 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -36,6 +36,8 @@ public static final String DRUID_QUERY_TYPE = "druid.query.type"; public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory"; + public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate"; + public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index 4ed4df1e17..257048b111 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -17,22 +17,6 @@ */ package org.apache.hadoop.hive.druid; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Strings; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.base.Throwables; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; -import com.metamx.common.RetryUtils; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; @@ -40,6 +24,7 @@ import io.druid.metadata.storage.postgresql.PostgreSQLConnector; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -66,13 +51,30 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hive.common.util.ShutdownHookManager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Strings; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; +import 
com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.metamx.common.RetryUtils; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; + import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Period; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -81,6 +83,8 @@ import java.util.Map; import java.util.concurrent.Callable; +import javax.annotation.Nullable; + /** * DruidStorageHandler provides a HiveStorageHandler implementation for Druid. */ @@ -92,7 +96,11 @@ protected static final SessionState.LogHelper console = new SessionState.LogHelper(LOG); public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir"; + + public static final String INTERMEDIATE_SEGMENT_DIR_NAME = "intermediateSegmentDir"; + private static final HttpClient HTTP_CLIENT; + static { final Lifecycle lifecycle = new Lifecycle(); try { @@ -101,10 +109,9 @@ LOG.error("Issues with lifecycle start", e); } HTTP_CLIENT = makeHttpClient(lifecycle); - ShutdownHookManager.addShutdownHook(()-> lifecycle.stop()); + ShutdownHookManager.addShutdownHook(() -> lifecycle.stop()); } - private final SQLMetadataConnector connector; private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig; @@ -268,7 +275,6 @@ public void commitCreateTable(Table table) throws MetaException { publishSegments(table, true); } - public void publishSegments(Table table, boolean overwrite) throws MetaException { if (MetaStoreUtils.isExternalTable(table)) { return; @@ -281,14 +287,19 @@ public void publishSegments(Table table, boolean overwrite) throws MetaException .getPublishedSegments(tableDir, getConf()); LOG.info(String.format("Found [%d] segments under path [%s]", segmentList.size(), tableDir)); final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE); - + final String segmentDirectory = + table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null + ? 
table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) + : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY); DruidStorageHandlerUtils.publishSegments( connector, druidMetadataStorageTablesConfig, dataSourceName, segmentList, - DruidStorageHandlerUtils.JSON_MAPPER, - overwrite + overwrite, + segmentDirectory, + getConf() + ); final String coordinatorAddress = HiveConf .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS); @@ -393,7 +404,7 @@ public boolean apply(URL input) { @VisibleForTesting protected void deleteSegment(DataSegment segment) throws SegmentLoadingException { - final Path path = getPath(segment); + final Path path = DruidStorageHandlerUtils.getPath(segment); LOG.info(String.format("removing segment[%s], located at path[%s]", segment.getIdentifier(), path )); @@ -437,10 +448,6 @@ protected void deleteSegment(DataSegment segment) throws SegmentLoadingException } } - private static Path getPath(DataSegment dataSegment) { - return new Path(String.valueOf(dataSegment.getLoadSpec().get("path"))); - } - private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) { try { return fs.delete(path, false); @@ -497,19 +504,14 @@ public void commitDropTable(Table table, boolean deleteData) throws MetaExceptio @Override public void commitInsertTable(Table table, boolean overwrite) throws MetaException { - if (overwrite) { - LOG.debug(String.format("commit insert overwrite into table [%s]", table.getTableName())); - this.publishSegments(table, overwrite); - } else { - throw new MetaException("Insert into is not supported yet"); - } + LOG.debug(String.format("commit insert into table [%s] overwrite[%s]", table.getTableName(), + overwrite)); + this.publishSegments(table, overwrite); } @Override public void preInsertTable(Table table, boolean overwrite) throws MetaException { - if (!overwrite) { - throw new MetaException("INSERT INTO statement is not allowed by druid storage handler"); - } + } @Override @@ -522,6 +524,9 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map segments, final ObjectMapper mapper, boolean overwrite) - { - connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - if(overwrite){ - disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); - } - final PreparedBatch batch = handle.prepareBatch( - String.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - metadataStorageTablesConfig.getSegmentsTable() - ) - ); - for (final DataSegment segment : segments) { - - batch.add( - new ImmutableMap.Builder() - .put("id", segment.getIdentifier()) - .put("dataSource", segment.getDataSource()) - .put("created_date", new DateTime().toString()) - .put("start", segment.getInterval().getStart().toString()) - .put("end", segment.getInterval().getEnd().toString()) - .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 
false : true) - .put("version", segment.getVersion()) - .put("used", true) - .put("payload", mapper.writeValueAsBytes(segment)) - .build() - ); - - LOG.info("Published %s", segment.getIdentifier()); + final MetadataStorageTablesConfig metadataStorageTablesConfig, + final String dataSource, + final List segments, + boolean overwrite, + String segmentDirectory, + Configuration conf) { + try { + connector.getDBI().inTransaction( + new TransactionCallback() { + @Override + public Void inTransaction(Handle handle, TransactionStatus transactionStatus) + throws Exception { + final List finalSegmentsToPublish = Lists.newArrayList(); + VersionedIntervalTimeline timeline; + if (overwrite) { + disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); + // When overwriting start with empty timeline, as we are overwriting segments with new versions + timeline = new VersionedIntervalTimeline<>( + Ordering.natural() + ); + } else { + // Append Mode - build a timeline of existing segments in metadata storage. + Interval indexedInterval = JodaUtils + .umbrellaInterval(Iterables.transform(segments, + new Function() { + @Override + public Interval apply(@Nullable DataSegment input) { + return input.getInterval(); + } + })); + timeline = getTimelineForIntervalWithHandle( + handle, dataSource, indexedInterval, metadataStorageTablesConfig); + } + for (DataSegment segment : segments) { + List> existingChunks = timeline + .lookup(segment.getInterval()); + if (existingChunks.size() > 1) { + // Not possible to expand since we have more than one chunk with a single segment. + // This is the case when user wants to append a segment with coarser granularity. + // e.g If metadata storage already has segments for with granularity HOUR and segments to append have DAY granularity. + // Druid shard specs does not support multiple partitions for same interval with different granularity. + throw new IllegalStateException( + String.format( + "Cannot allocate new segment for dataSource[%s], interval[%s], already have [%,d] chunks. Not possible to append new segment.", + dataSource, + segment.getInterval(), + existingChunks.size() + ) + ); + } + // Find out the segment with latest version and maximum partition number + SegmentIdentifier max = null; + final ShardSpec newShardSpec; + final String newVersion; + if (!existingChunks.isEmpty()) { + // Some existing chunk, Find max + TimelineObjectHolder existingHolder = Iterables + .getOnlyElement(existingChunks); + for (PartitionChunk existing : existingHolder.getObject()) { + if (max == null || + max.getShardSpec().getPartitionNum() < existing.getObject() + .getShardSpec() + .getPartitionNum()) { + max = SegmentIdentifier.fromDataSegment(existing.getObject()); + } + } + } - } - batch.execute(); + if (max == null) { + // No existing shard present in the database, use the current version. 
+ newShardSpec = segment.getShardSpec(); + newVersion = segment.getVersion(); + } else { + // use version of existing max segment to generate new shard spec + newShardSpec = getNextPartitionShardSpec(max.getShardSpec()); + newVersion = max.getVersion(); + } - return null; - } - } - ); + DataSegment publishedSegment = publishSegmentWithShardSpec(segment, + newShardSpec, newVersion, + segmentDirectory, getPath(segment).getFileSystem(conf)); + finalSegmentsToPublish.add(publishedSegment); + timeline.add(publishedSegment.getInterval(), publishedSegment.getVersion(), + publishedSegment.getShardSpec().createChunk(publishedSegment)); + + } + + // Publish new segments to metadata storage + final PreparedBatch batch = handle.prepareBatch( + String.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + metadataStorageTablesConfig.getSegmentsTable() + ) + + ); + + for (final DataSegment segment : finalSegmentsToPublish) { + + batch.add( + new ImmutableMap.Builder() + .put("id", segment.getIdentifier()) + .put("dataSource", segment.getDataSource()) + .put("created_date", new DateTime().toString()) + .put("start", segment.getInterval().getStart().toString()) + .put("end", segment.getInterval().getEnd().toString()) + .put("partitioned", + (segment.getShardSpec() instanceof NoneShardSpec) ? + false : + true) + .put("version", segment.getVersion()) + .put("used", true) + .put("payload", JSON_MAPPER.writeValueAsBytes(segment)) + .build() + ); + + LOG.info("Published {}", segment.getIdentifier()); + + } + batch.execute(); + + return null; + } + } + ); + } catch (CallbackFailedException e) { + LOG.error("Exception while publishing segments", e.getCause()); + throw Throwables.propagate(e.getCause()); + } } - public static void disableDataSourceWithHandle(Handle handle, MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource){ + public static void disableDataSourceWithHandle(Handle handle, + MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource) { handle.createStatement( - String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", - metadataStorageTablesConfig.getSegmentsTable() - ) + String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", + metadataStorageTablesConfig.getSegmentsTable() + ) ) - .bind("dataSource", dataSource) - .execute(); + .bind("dataSource", dataSource) + .execute(); } /** @@ -537,4 +638,113 @@ public static void addDependencyJars(Configuration conf, Class... classes) th conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); } + private static VersionedIntervalTimeline getTimelineForIntervalWithHandle( + final Handle handle, + final String dataSource, + final Interval interval, + final MetadataStorageTablesConfig dbTables + ) throws IOException { + Query> sql = handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE used = true AND dataSource = ? AND start <= ? 
AND \"end\" >= ?", + dbTables.getSegmentsTable() + ) + ).bind(0, dataSource) + .bind(1, interval.getEnd().toString()) + .bind(2, interval.getStart().toString()); + + final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( + Ordering.natural() + ); + final ResultIterator dbSegments = sql + .map(ByteArrayMapper.FIRST) + .iterator(); + try { + while (dbSegments.hasNext()) { + final byte[] payload = dbSegments.next(); + DataSegment segment = JSON_MAPPER.readValue( + payload, + DataSegment.class + ); + timeline.add(segment.getInterval(), segment.getVersion(), + segment.getShardSpec().createChunk(segment)); + } + } finally { + dbSegments.close(); + } + return timeline; + } + + public static DataSegmentPusher createSegmentPusherForDirectory(String segmentDirectory, + Configuration configuration) throws IOException { + final HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig(); + hdfsDataSegmentPusherConfig.setStorageDirectory(segmentDirectory); + return new HdfsDataSegmentPusher( + hdfsDataSegmentPusherConfig, configuration, JSON_MAPPER); + } + + public static DataSegment publishSegmentWithShardSpec(DataSegment segment, ShardSpec shardSpec, + String version, String segmentDirectory, FileSystem fs) + throws IOException { + boolean retry = true; + DataSegment.Builder dataSegmentBuilder = new DataSegment.Builder(segment).version(version); + Path finalPath = null; + while (retry) { + retry = false; + dataSegmentBuilder.shardSpec(shardSpec); + final Path intermediatePath = getPath(segment); + finalPath = finalPathForSegment(segmentDirectory, dataSegmentBuilder.build()); + + // Create parent if it does not exist, recreation is not an error + fs.mkdirs(finalPath.getParent()); + + if (!fs.rename(intermediatePath, finalPath)) { + if (fs.exists(finalPath)) { + // Someone else is also trying to append + shardSpec = getNextPartitionShardSpec(shardSpec); + retry = true; + } else { + throw new IOException(String.format( + "Failed to rename intermediate segment[%s] to final segment[%s] is not present.", + intermediatePath, + finalPath + )); + } + } + } + DataSegment dataSegment = dataSegmentBuilder + .loadSpec(ImmutableMap.of("type", "hdfs", "path", finalPath.toString())) + .build(); + + writeSegmentDescriptor(fs, dataSegment, new Path(finalPath.getParent(), "descriptor.json")); + + return dataSegment; + } + + public static Path finalPathForSegment(String segmentDirectory, DataSegment segment) { + return new Path( + String.format("%s/%s/index.zip", segmentDirectory, + DataSegmentPusherUtil.getHdfsStorageDir(segment))); + } + + private static ShardSpec getNextPartitionShardSpec(ShardSpec shardSpec) { + if (shardSpec instanceof LinearShardSpec) { + return new LinearShardSpec(shardSpec.getPartitionNum() + 1); + } else if (shardSpec instanceof NumberedShardSpec) { + return new NumberedShardSpec(shardSpec.getPartitionNum(), + ((NumberedShardSpec) shardSpec).getPartitions()); + } else { + // Druid only support appending more partitions to Linear and Numbered ShardSpecs. 
+ throw new IllegalStateException( + String.format( + "Cannot expand shard spec [%s]", + shardSpec + ) + ); + } + } + + public static Path getPath(DataSegment dataSegment) { + return new Path(String.valueOf(dataSegment.getLoadSpec().get("path"))); + } } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java index 31db86ae42..5e1deac185 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java @@ -39,10 +39,8 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; -import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.plumber.CustomVersioningPolicy; -import io.druid.storage.hdfs.HdfsDataSegmentPusher; -import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig; + import org.apache.calcite.adapter.druid.DruidTable; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -63,6 +61,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.util.Progressable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,15 +93,7 @@ tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) : HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY); final String dataSource = tableProperties.getProperty(Constants.DRUID_DATA_SOURCE); - final String segmentDirectory = - tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY) != null - ? tableProperties.getProperty(Constants.DRUID_SEGMENT_DIRECTORY) - : HiveConf.getVar(jc, HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY); - - final HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig(); - hdfsDataSegmentPusherConfig.setStorageDirectory(segmentDirectory); - final DataSegmentPusher hdfsDataSegmentPusher = new HdfsDataSegmentPusher( - hdfsDataSegmentPusherConfig, jc, DruidStorageHandlerUtils.JSON_MAPPER); + final String segmentDirectory = jc.get(Constants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY); final GranularitySpec granularitySpec = new UniformGranularitySpec( Granularity.fromString(segmentGranularity), @@ -225,7 +216,8 @@ ); LOG.debug(String.format("running with Data schema [%s] ", dataSchema)); - return new DruidRecordWriter(dataSchema, realtimeTuningConfig, hdfsDataSegmentPusher, + return new DruidRecordWriter(dataSchema, realtimeTuningConfig, + DruidStorageHandlerUtils.createSegmentPusherForDirectory(segmentDirectory, jc), maxPartitionSize, new Path(workingPath, SEGMENTS_DESCRIPTOR_DIR_NAME), finalOutPath.getFileSystem(jc) ); diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java index dca558e8b3..3ba3196428 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java @@ -18,15 +18,16 @@ package org.apache.hadoop.hive.druid; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import io.druid.indexer.JobHelper; import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler; import 
io.druid.metadata.MetadataStorageTablesConfig; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.LinearShardSpec; import io.druid.timeline.partition.NoneShardSpec; +import io.druid.timeline.partition.ShardSpec; + +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -36,6 +37,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -48,6 +55,7 @@ import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.ResultSetMapper; +import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.sql.ResultSet; @@ -71,8 +79,22 @@ private String tableWorkingPath; - private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1") - .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build(); + private Configuration config; + + private DruidStorageHandler druidStorageHandler; + + private DataSegment createSegment(String location) throws IOException { + return createSegment(location, new Interval(100, 170), "v1", new LinearShardSpec(0)); + } + + private DataSegment createSegment(String location, Interval interval, String version, + ShardSpec shardSpec) throws IOException { + FileUtils.writeStringToFile(new File(location), "dummySegmentData"); + DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version(version) + .interval(interval).shardSpec(shardSpec) + .loadSpec(ImmutableMap.of("path", location)).build(); + return dataSegment; + } @Before public void before() throws Throwable { @@ -85,16 +107,23 @@ public void before() throws Throwable { Mockito.when(storageDes.getBucketColsSize()).thenReturn(0); Mockito.when(tableMock.getSd()).thenReturn(storageDes); Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME); + config = new Configuration(); + config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString()); + config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath); + config.set(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY), + new Path(tableWorkingPath, "finalSegmentDir").toString()); + druidStorageHandler = new DruidStorageHandler( + derbyConnectorRule.getConnector(), + derbyConnectorRule.metadataTablesConfigSupplier().get() + ); + druidStorageHandler.setConf(config); + } Table tableMock = Mockito.mock(Table.class); @Test public void testPreCreateTableWillCreateSegmentsTable() throws MetaException { - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - derbyConnectorRule.metadataTablesConfigSupplier().get() - ); try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) { Assert.assertFalse(derbyConnectorRule.getConnector() @@ -111,34 +140,28 @@ public void testPreCreateTableWillCreateSegmentsTable() throws MetaException { } @Test(expected = MetaException.class) - public void testPreCreateTableWhenDataSourceExists() throws MetaException { + public void 
testPreCreateTableWhenDataSourceExists() throws MetaException, IOException { derbyConnectorRule.getConnector().createSegmentTable(); SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler( derbyConnectorRule.getConnector()); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + DataSegment dataSegment = createSegment(new Path(taskDirPath, "intermediatePath").toString()); + sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment), DruidStorageHandlerUtils.JSON_MAPPER ); - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - derbyConnectorRule.metadataTablesConfigSupplier().get() - ); + druidStorageHandler.preCreateTable(tableMock); } @Test public void testCommitCreateTablePlusCommitDropTableWithoutPurge() throws MetaException, IOException { - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - derbyConnectorRule.metadataTablesConfigSupplier().get() - ); druidStorageHandler.preCreateTable(tableMock); - Configuration config = new Configuration(); - config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString()); - config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath); - druidStorageHandler.setConf(config); LocalFileSystem localFileSystem = FileSystem.getLocal(config); Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString()); + Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) ); @@ -158,17 +181,10 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge() @Test public void testCommitInsertTable() throws MetaException, IOException { - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - derbyConnectorRule.metadataTablesConfigSupplier().get() - ); druidStorageHandler.preCreateTable(tableMock); - Configuration config = new Configuration(); - config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString()); - config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath); - druidStorageHandler.setConf(config); LocalFileSystem localFileSystem = FileSystem.getLocal(config); Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString()); Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) ); @@ -182,16 +198,10 @@ public void testCommitInsertTable() throws MetaException, IOException { @Test public void testDeleteSegment() throws IOException, SegmentLoadingException { - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - derbyConnectorRule.getConnector(), - derbyConnectorRule.metadataTablesConfigSupplier().get() - ); - String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath(); - Configuration config = new Configuration(); - druidStorageHandler.setConf(config); LocalFileSystem localFileSystem = FileSystem.getLocal(config); - + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + DataSegment dataSegment 
= createSegment(new Path(taskDirPath, "index.zip").toString()); Path segmentOutputPath = JobHelper .makeFileNamePath(new Path(segmentRootPath), localFileSystem, dataSegment, JobHelper.INDEX_ZIP); Path indexPath = new Path(segmentOutputPath, "index.zip"); @@ -224,62 +234,256 @@ public void testDeleteSegment() throws IOException, SegmentLoadingException { public void testCommitInsertOverwriteTable() throws MetaException, IOException { DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule - .metadataTablesConfigSupplier().get(); + .metadataTablesConfigSupplier().get(); - DruidStorageHandler druidStorageHandler = new DruidStorageHandler( - connector, - metadataStorageTablesConfig - ); druidStorageHandler.preCreateTable(tableMock); - Configuration config = new Configuration(); - config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString()); - config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath); - druidStorageHandler.setConf(config); LocalFileSystem localFileSystem = FileSystem.getLocal(config); Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + new Interval(180, 250), "v1", new LinearShardSpec(0)); Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, - new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) ); - List existingSegments = Arrays.asList(DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v0") - .interval(new Interval(1, 10)).shardSpec(NoneShardSpec.instance()).build()); - DruidStorageHandlerUtils.publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, - existingSegments, - DruidStorageHandlerUtils.JSON_MAPPER, - true - ); + List existingSegments = Arrays + .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(), + new Interval(100, 150), "v0", new LinearShardSpec(0))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); druidStorageHandler.commitInsertTable(tableMock, true); Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( - DruidStorageHandlerUtils.getAllDataSourceNames(connector, - metadataStorageTablesConfig - )).toArray()); - - final List dataSegmentList = connector.getDBI() - .withHandle(new HandleCallback>() { - @Override - public List withHandle(Handle handle) throws Exception { - return handle - .createQuery(String.format("SELECT payload FROM %s WHERE used=true", - metadataStorageTablesConfig.getSegmentsTable())) - .map(new ResultSetMapper() { - - @Override - public DataSegment map(int i, ResultSet resultSet, - StatementContext statementContext) - throws SQLException { - try { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue( - resultSet.getBytes("payload"), - DataSegment.class - ); - } catch (IOException e) { - throw Throwables.propagate(e); - } - } - }).list(); - } - }); + DruidStorageHandlerUtils.getAllDataSourceNames(connector, + metadataStorageTablesConfig + )).toArray()); + + final List dataSegmentList = getUsedSegmentsList(connector, + metadataStorageTablesConfig); 
Assert.assertEquals(1, dataSegmentList.size()); + DataSegment persistedSegment = Iterables.getOnlyElement(dataSegmentList); + Assert.assertEquals(dataSegment, persistedSegment); + Assert.assertEquals(dataSegment.getVersion(), persistedSegment.getVersion()); + String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment( + config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), persistedSegment) + .toString(); + Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalPath), + persistedSegment.getLoadSpec()); + Assert.assertEquals("dummySegmentData", + FileUtils.readFileToString(new File(expectedFinalPath))); + } + + private List getUsedSegmentsList(DerbyConnectorTestUtility connector, + final MetadataStorageTablesConfig metadataStorageTablesConfig) { + return connector.getDBI() + .withHandle(new HandleCallback>() { + @Override + public List withHandle(Handle handle) throws Exception { + return handle + .createQuery(String.format( + "SELECT payload FROM %s WHERE used=true ORDER BY created_date ASC", + metadataStorageTablesConfig.getSegmentsTable())) + .map(new ResultSetMapper() { + + @Override + public DataSegment map(int i, ResultSet resultSet, + StatementContext statementContext) + throws SQLException { + try { + return DruidStorageHandlerUtils.JSON_MAPPER.readValue( + resultSet.getBytes("payload"), + DataSegment.class + ); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + }).list(); + } + }); + } + + @Test + public void testCommitInsertIntoTable() throws MetaException, IOException { + DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); + MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule + .metadataTablesConfigSupplier().get(); + druidStorageHandler.preCreateTable(tableMock); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + List existingSegments = Arrays + .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(), + new Interval(100, 150), "v0", new LinearShardSpec(1))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); + DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + new Interval(100, 150), "v1", new LinearShardSpec(0)); + Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); + druidStorageHandler.commitInsertTable(tableMock, false); + Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( + DruidStorageHandlerUtils.getAllDataSourceNames(connector, + metadataStorageTablesConfig + )).toArray()); + + final List dataSegmentList = getUsedSegmentsList(connector, + metadataStorageTablesConfig); + Assert.assertEquals(2, dataSegmentList.size()); + + DataSegment persistedSegment = dataSegmentList.get(1); + // Insert into appends to old version + Assert.assertEquals("v0", persistedSegment.getVersion()); + Assert.assertTrue(persistedSegment.getShardSpec() instanceof LinearShardSpec); + Assert.assertEquals(2, persistedSegment.getShardSpec().getPartitionNum()); + String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment( + 
config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), persistedSegment) + .toString(); + Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalPath), + persistedSegment.getLoadSpec()); + Assert.assertEquals("dummySegmentData", + FileUtils.readFileToString(new File(expectedFinalPath))); + } + + @Test + public void testCommitInsertIntoWhenDestinationSegmentFileExist() + throws MetaException, IOException { + DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); + MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule + .metadataTablesConfigSupplier().get(); + druidStorageHandler.preCreateTable(tableMock); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + List existingSegments = Arrays + .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(), + new Interval(100, 150), "v0", new LinearShardSpec(1))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); + DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + new Interval(100, 150), "v1", new LinearShardSpec(0)); + Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); + + // Create segment file at the destination location with LinearShardSpec(2) + FileUtils.writeStringToFile(new File(DruidStorageHandlerUtils.finalPathForSegment( + config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), + createSegment(new Path(taskDirPath, "index_conflict.zip").toString(), + new Interval(100, 150), "v1", new LinearShardSpec(1))).toString()), "dummy"); + druidStorageHandler.commitInsertTable(tableMock, false); + Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( + DruidStorageHandlerUtils.getAllDataSourceNames(connector, + metadataStorageTablesConfig + )).toArray()); + + final List dataSegmentList = getUsedSegmentsList(connector, + metadataStorageTablesConfig); + Assert.assertEquals(2, dataSegmentList.size()); + + DataSegment persistedSegment = dataSegmentList.get(1); + // Insert into appends to old version + Assert.assertEquals("v0", persistedSegment.getVersion()); + Assert.assertTrue(persistedSegment.getShardSpec() instanceof LinearShardSpec); + // insert into should skip and increment partition number to 3 + Assert.assertEquals(2, persistedSegment.getShardSpec().getPartitionNum()); + String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment( + config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), persistedSegment) + .toString(); + Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalPath), + persistedSegment.getLoadSpec()); + Assert.assertEquals("dummySegmentData", + FileUtils.readFileToString(new File(expectedFinalPath))); + } + + @Test(expected = IllegalStateException.class) + public void testCommitInsertIntoWithConflictingIntervalSegment() + throws MetaException, IOException { + DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); + MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule + .metadataTablesConfigSupplier().get(); + 
druidStorageHandler.preCreateTable(tableMock); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + List existingSegments = Arrays.asList( + createSegment(new Path(taskDirPath, "index_old_1.zip").toString(), + new Interval(100, 150), + "v0", new LinearShardSpec(0)), + createSegment(new Path(taskDirPath, "index_old_2.zip").toString(), + new Interval(150, 200), + "v0", new LinearShardSpec(0)), + createSegment(new Path(taskDirPath, "index_old_3.zip").toString(), + new Interval(200, 300), + "v0", new LinearShardSpec(0))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); + + // Try appending segment with conflicting interval + DataSegment conflictingSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + new Interval(100, 300), "v1", new LinearShardSpec(0)); + Path descriptorPath = DruidStorageHandlerUtils + .makeSegmentDescriptorOutputPath(conflictingSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils + .writeSegmentDescriptor(localFileSystem, conflictingSegment, descriptorPath); + druidStorageHandler.commitInsertTable(tableMock, false); + } + + @Test(expected = IllegalStateException.class) + public void testCommitInsertIntoWithNonExtendableSegment() throws MetaException, IOException { + DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); + MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule + .metadataTablesConfigSupplier().get(); + druidStorageHandler.preCreateTable(tableMock); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + List existingSegments = Arrays + .asList(createSegment(new Path(taskDirPath, "index_old_1.zip").toString(), + new Interval(100, 150), "v0", new NoneShardSpec()), + createSegment(new Path(taskDirPath, "index_old_2.zip").toString(), + new Interval(200, 250), "v0", new LinearShardSpec(0)), + createSegment(new Path(taskDirPath, "index_old_3.zip").toString(), + new Interval(250, 300), "v0", new LinearShardSpec(0))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); + + // Try appending to non extendable shard spec + DataSegment conflictingSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + new Interval(100, 150), "v1", new LinearShardSpec(0)); + Path descriptorPath = DruidStorageHandlerUtils + .makeSegmentDescriptorOutputPath(conflictingSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils + .writeSegmentDescriptor(localFileSystem, conflictingSegment, descriptorPath); + + druidStorageHandler.commitInsertTable(tableMock, false); } + } -- 2.11.0 (Apple Git-81)