diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 31e3ea1..cc409ab 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -59,7 +60,6 @@
 import org.apache.hive.common.util.ShutdownHookManager;

 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
@@ -88,9 +88,6 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
-
-import javax.annotation.Nullable;

 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
@@ -235,9 +232,23 @@ public void preCreateTable(Table table) throws MetaException {
     }
     String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
     if (MetaStoreUtils.isExternalTable(table)) {
+      if (dataSourceName == null) {
+        throw new MetaException(
+            String.format("Datasource name should be specified using [%s] for external tables "
+                + "using Druid", Constants.DRUID_DATA_SOURCE));
+      }
+      // If it is an external table, we are done
       return;
     }
-    // If it is not an external table we need to check the metadata
+    // It is not an external table
+    // We need to check that datasource was not specified by user
+    if (dataSourceName != null) {
+      throw new MetaException(
+          String.format("Datasource name cannot be specified using [%s] for managed tables "
+              + "using Druid", Constants.DRUID_DATA_SOURCE));
+    }
+    // We need to check the Druid metadata
+    dataSourceName = Warehouse.getQualifiedName(table);
     try {
       connector.createSegmentTable();
     } catch (Exception e) {
@@ -250,6 +261,7 @@
     if (existingDataSources.contains(dataSourceName)) {
       throw new MetaException(String.format("Data source [%s] already existing", dataSourceName));
     }
+    table.getParameters().put(Constants.DRUID_DATA_SOURCE, dataSourceName);
   }

   @Override
@@ -279,13 +291,14 @@ public void rollbackCreateTable(Table table) throws MetaException {
   @Override
   public void commitCreateTable(Table table) throws MetaException {
     LOG.debug("commit create table {}", table.getTableName());
+    if (MetaStoreUtils.isExternalTable(table)) {
+      // For external tables, we do not need to do anything else
+      return;
+    }
     publishSegments(table, true);
   }

   public void publishSegments(Table table, boolean overwrite) throws MetaException {
-    if (MetaStoreUtils.isExternalTable(table)) {
-      return;
-    }
     final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
     final List segmentList = Lists.newArrayList();
     final Path tableDir = getSegmentDescriptorDir();
@@ -513,6 +526,9 @@ public void commitDropTable(Table table, boolean deleteData) throws MetaExceptio
   public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
     LOG.debug("commit insert into table {} overwrite {}",
         table.getTableName(), overwrite);
+    if (MetaStoreUtils.isExternalTable(table)) {
+      throw new MetaException("Cannot insert data into external table backed by Druid");
+    }
     this.publishSegments(table, overwrite);
   }

diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 35ea94f..e36c207 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -35,7 +35,6 @@
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
-import io.druid.query.BaseQuery;
 import io.druid.query.select.SelectQueryConfig;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
@@ -63,26 +62,14 @@
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.util.StringUtils;

-import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.io.CharStreams;
 import com.metamx.common.MapUtils;
-import com.metamx.emitter.EmittingLogger;
-import com.metamx.emitter.core.NoopEmitter;
-import com.metamx.emitter.service.ServiceEmitter;
-import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.Request;
-import com.metamx.http.client.response.InputStreamResponseHandler;

 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -97,13 +84,13 @@
 import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
-import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -121,7 +108,6 @@
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;


@@ -264,13 +250,21 @@ public static String getURL(HttpClient client, URL url) throws IOException {
       throws IOException {
     ImmutableList.Builder publishedSegmentsBuilder = ImmutableList.builder();
     FileSystem fs = taskDir.getFileSystem(conf);
-    for (FileStatus fileStatus : fs.listStatus(taskDir)) {
+    FileStatus[] fss;
+    try {
+      fss = fs.listStatus(taskDir);
+    } catch (FileNotFoundException e) {
+      // This is a CREATE TABLE statement or query executed for CTAS/INSERT
+      // did not produce any result. We do not need to do anything, this is
+      // expected behavior.
+      return publishedSegmentsBuilder.build();
+    }
+    for (FileStatus fileStatus : fss) {
       final DataSegment segment = JSON_MAPPER
           .readValue(fs.open(fileStatus.getPath()), DataSegment.class);
       publishedSegmentsBuilder.add(segment);
     }
-    List publishedSegments = publishedSegmentsBuilder.build();
-    return publishedSegments;
+    return publishedSegmentsBuilder.build();
   }

   /**
@@ -390,16 +384,23 @@ public static void publishSegments(final SQLMetadataConnector connector,
   ) throws CallbackFailedException {
     connector.getDBI().inTransaction(
         (TransactionCallback) (handle, transactionStatus) -> {
-          final List finalSegmentsToPublish = Lists.newArrayList();
+          // We create the timeline for the existing and new segments
           VersionedIntervalTimeline timeline;
           if (overwrite) {
+            // If we are overwriting, we disable existing sources
             disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource);
-            // When overwriting start with empty timeline, as we are overwriting segments with new versions
-            timeline = new VersionedIntervalTimeline<>(
-                Ordering.natural()
-            );
+
+            // When overwriting or when we do not have segments, we can just start with empty timeline,
+            // as we are overwriting segments with new versions and not inserting new segments at all,
+            // respectively
+            timeline = new VersionedIntervalTimeline<>(Ordering.natural());
           } else {
-            // Append Mode - build a timeline of existing segments in metadata storage.
+            // Append Mode
+            if (segments.isEmpty()) {
+              // If there are no new segments, we can just bail out
+              return null;
+            }
+            // Otherwise, build a timeline of existing segments in metadata storage
             Interval indexedInterval = JodaUtils
                 .umbrellaInterval(Iterables.transform(segments,
                     input -> input.getInterval()
@@ -408,6 +409,8 @@ public static void publishSegments(final SQLMetadataConnector connector,
             timeline = getTimelineForIntervalWithHandle(
                 handle, dataSource, indexedInterval, metadataStorageTablesConfig);
           }
+
+          final List finalSegmentsToPublish = Lists.newArrayList();
           for (DataSegment segment : segments) {
             List> existingChunks = timeline
                 .lookup(segment.getInterval());
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 56b437d..0b13a08 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -35,7 +35,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -64,8 +63,8 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;

 public class TestDruidStorageHandler {
@@ -76,7 +75,9 @@
   @Rule
   public final TemporaryFolder temporaryFolder = new TemporaryFolder();

-  private static final String DATA_SOURCE_NAME = "testName";
+  private static final String DB_NAME = "default";
+  private static final String TABLE_NAME = "testName";
+  private static final String DATA_SOURCE_NAME = "default.testName";

   private String segmentsTable;

@@ -103,13 +104,13 @@ private DataSegment createSegment(String location, Interval interval, String ver
   public void before() throws Throwable {
     tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath();
     segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
-    Map mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
-    Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
+    Mockito.when(tableMock.getParameters()).thenReturn(new HashMap<>());
     Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0);
     StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class);
     Mockito.when(storageDes.getBucketColsSize()).thenReturn(0);
     Mockito.when(tableMock.getSd()).thenReturn(storageDes);
-    Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME);
+    Mockito.when(tableMock.getDbName()).thenReturn(DB_NAME);
+    Mockito.when(tableMock.getTableName()).thenReturn(TABLE_NAME);
     config = new Configuration();
     config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
     config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
@@ -121,7 +122,6 @@ public void before() throws Throwable {
         derbyConnectorRule.metadataTablesConfigSupplier().get()
     );
     druidStorageHandler.setConf(config);
-
   }

   Table tableMock = Mockito.mock(Table.class);
@@ -180,7 +180,62 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
         DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
             derbyConnectorRule.metadataTablesConfigSupplier().get()
         )).toArray());
+  }
+
+  @Test
+  public void testCommitCreateTablePlusCommitDropTableWithPurge()
+      throws MetaException, IOException {
+    druidStorageHandler.preCreateTable(tableMock);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+    DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString());
+
+    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+        new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+        )).toArray());
+    druidStorageHandler.commitDropTable(tableMock, true);
+    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+        )).toArray());
+  }
+
+  @Test
+  public void testCommitCreateEmptyTablePlusCommitDropTableWithoutPurge()
+      throws MetaException, IOException {
+    druidStorageHandler.preCreateTable(tableMock);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+        )).toArray());
+    druidStorageHandler.commitDropTable(tableMock, false);
+    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+        )).toArray());
+  }
+  @Test
+  public void testCommitCreateEmptyTablePlusCommitDropTableWithPurge()
+      throws MetaException, IOException {
+    druidStorageHandler.preCreateTable(tableMock);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+        )).toArray());
+    druidStorageHandler.commitDropTable(tableMock, true);
+    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+        )).toArray());
   }

   @Test
@@ -201,6 +256,16 @@ public void testCommitInsertTable() throws MetaException, IOException {
   }

   @Test
+  public void testCommitEmptyInsertTable() throws MetaException, IOException {
+    druidStorageHandler.preCreateTable(tableMock);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+        )).toArray());
+  }
+
+  @Test
   public void testDeleteSegment() throws IOException, SegmentLoadingException {
     String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
@@ -295,6 +360,122 @@ public void testCommitInsertOverwriteTable() throws MetaException, IOException {
         FileUtils.readFileToString(new File(expectedFinalHadoopPath.toUri())));
   }

+  @Test
+  public void testCommitMultiInsertOverwriteTable() throws MetaException, IOException {
+    DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector();
+    MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule
+        .metadataTablesConfigSupplier().get();
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    druidStorageHandler.preCreateTable(tableMock);
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+    HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig();
+    pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)));
+    DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER);
+
+    // This create and publish the segment to be overwritten
+    List existingSegments = Arrays
+        .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
+            new Interval(100, 150), "v0", new LinearShardSpec(0)));
+    DruidStorageHandlerUtils
+        .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+            existingSegments,
+            true,
+            taskDirPath.toString(),
+            config,
+            dataSegmentPusher
+        );
+    // Check that there is one datasource with the published segment
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(connector,
+            metadataStorageTablesConfig
+        )).toArray());
+
+    // Sequence is the following:
+    // 1) INSERT with no segments -> Original segment still present in the datasource
+    // 2) INSERT OVERWRITE with no segments -> Datasource is empty
+    // 3) INSERT OVERWRITE with no segments -> Datasource is empty
+    // 4) INSERT with no segments -> Datasource is empty
+    // 5) INSERT with one segment -> Datasource has one segment
+    // 6) INSERT OVERWRITE with one segment -> Datasource has one segment
+    // 7) INSERT with one segment -> Datasource has two segments
+    // 8) INSERT OVERWRITE with no segments -> Datasource is empty
+
+    // We start:
+    // #1
+    druidStorageHandler.commitInsertTable(tableMock, false);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(connector,
+            metadataStorageTablesConfig
+        )).toArray());
+    Assert.assertEquals(1, getUsedSegmentsList(connector,
+        metadataStorageTablesConfig).size());
+
+    // #2
+    druidStorageHandler.commitInsertTable(tableMock, true);
+    Assert.assertEquals(0, getUsedSegmentsList(connector,
+        metadataStorageTablesConfig).size());
+
+    // #3
+    druidStorageHandler.commitInsertTable(tableMock, true);
+    Assert.assertEquals(0, getUsedSegmentsList(connector,
+        metadataStorageTablesConfig).size());
+
+    // #4
+    druidStorageHandler.commitInsertTable(tableMock, true);
+    Assert.assertEquals(0, getUsedSegmentsList(connector,
+        metadataStorageTablesConfig).size());
+
+    // #5
+    DataSegment dataSegment1 = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
+        new Interval(180, 250), "v1", new LinearShardSpec(0));
+    Path descriptorPath1 = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment1,
+        new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment1, descriptorPath1);
+    druidStorageHandler.commitInsertTable(tableMock, false);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(connector,
+            metadataStorageTablesConfig
+        )).toArray());
+    Assert.assertEquals(1, getUsedSegmentsList(connector,
+        metadataStorageTablesConfig).size());
+
+    // #6
+    DataSegment dataSegment2 = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
+        new Interval(200, 250), "v1", new LinearShardSpec(0));
+    Path descriptorPath2 = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment2,
+        new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment2, descriptorPath2);
+    druidStorageHandler.commitInsertTable(tableMock, true);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(connector,
+            metadataStorageTablesConfig
+        )).toArray());
+    Assert.assertEquals(1, getUsedSegmentsList(connector,
+        metadataStorageTablesConfig).size());
+
+    // #7
+    DataSegment dataSegment3 = createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
+        new Interval(100, 200), "v1", new LinearShardSpec(0));
+    Path descriptorPath3 = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment3,
+        new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment3, descriptorPath3);
+    druidStorageHandler.commitInsertTable(tableMock, false);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(connector,
+            metadataStorageTablesConfig
+        )).toArray());
+    Assert.assertEquals(2, getUsedSegmentsList(connector,
+        metadataStorageTablesConfig).size());
+
+    // #8
+    druidStorageHandler.commitInsertTable(tableMock, true);
+    Assert.assertEquals(0, getUsedSegmentsList(connector,
+        metadataStorageTablesConfig).size());
+  }
+
   private List getUsedSegmentsList(DerbyConnectorTestUtility connector,
       final MetadataStorageTablesConfig metadataStorageTablesConfig) {
     return connector.getDBI()
diff --git ql/src/test/queries/clientnegative/druid_datasource2.q ql/src/test/queries/clientnegative/druid_datasource2.q
new file mode 100644
index 0000000..cc20931
--- /dev/null
+++ ql/src/test/queries/clientnegative/druid_datasource2.q
@@ -0,0 +1,3 @@
+CREATE TABLE druid_table_1
+STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandler'
+TBLPROPERTIES ("property" = "localhost", "druid.datasource" = "mydatasource");
diff --git ql/src/test/results/clientnegative/druid_datasource2.q.out ql/src/test/results/clientnegative/druid_datasource2.q.out
new file mode 100644
index 0000000..1b74b06
--- /dev/null
+++ ql/src/test/results/clientnegative/druid_datasource2.q.out
@@ -0,0 +1,7 @@
+PREHOOK: query: CREATE TABLE druid_table_1
+STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandler'
+TBLPROPERTIES ("property" = "localhost", "druid.datasource" = "mydatasource")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@druid_table_1
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Datasource name cannot be specified using [druid.datasource] for managed tables using Druid)