diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 33387b2b0d..bc08bd8a2f 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -81,7 +81,6 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
@@ -95,7 +94,6 @@
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
-
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
@@ -110,6 +108,7 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -190,7 +189,7 @@ public HiveMetaHook getMetaHook() {
   }
 
   @Override
-  public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+  public HiveAuthorizationProvider getAuthorizationProvider() {
     return new DefaultHiveAuthorizationProvider();
   }
 
@@ -255,7 +254,7 @@ public void preCreateTable(Table table) throws MetaException {
   }
 
   @Override
-  public void rollbackCreateTable(Table table) throws MetaException {
+  public void rollbackCreateTable(Table table) {
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
@@ -287,7 +286,7 @@ public void commitCreateTable(Table table) throws MetaException {
     if(isKafkaStreamingTable(table)){
       updateKafkaIngestion(table);
     }
-    loadDruidSegments(table, true);
+    this.commitInsertTable(table, true);
   }
 
   private void updateKafkaIngestion(Table table){
@@ -521,44 +520,30 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
     }
   }
-
-  protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException {
-
+  /**
+   * Moves the segments to deep storage, then commits the segments' metadata to the Druid metadata store in one transaction.
+   *
+   * @param table Hive table
+   * @param overwrite true if it is an insert overwrite table
+   *
+   * @throws IOException if an error occurs while moving or publishing the segments.
+   */
+  protected List<DataSegment> loadAndCommitDruidSegments(Table table, boolean overwrite, List<DataSegment> segmentsToLoad)
+      throws IOException, CallbackFailedException {
     final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
-    final Path segmentDescriptorDir = getSegmentDescriptorDir();
-    try {
-      if (!segmentDescriptorDir.getFileSystem(getConf()).exists(segmentDescriptorDir)) {
-        LOG.info(
-            "Directory {} does not exist, ignore this if it is create statement or inserts of 0 rows,"
-                + " no Druid segments to move, cleaning working directory {}",
-            segmentDescriptorDir.getName(), getStagingWorkingDir().getName()
-        );
-        cleanWorkingDir();
-        return;
-      }
-    } catch (IOException e) {
-      LOG.error("Failed to load segments descriptor from directory {}", segmentDescriptorDir.toString());
-      cleanWorkingDir();
-      Throwables.propagate(e);
-    }
+    final String segmentDirectory =
+        table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
+            ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
+            : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
-    try {
-      // at this point we have Druid segments from reducers but we need to atomically
-      // rename and commit to metadata
-      // Moving Druid segments and committing to druid metadata as one transaction.
-      List<DataSegment> segmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorDir, getConf());
     final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
     List<DataSegment> publishedDataSegmentList;
-      final String segmentDirectory =
-          table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
-              ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
-              : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+
     LOG.info(String.format(
         "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
-        segmentList.size(),
-        getStagingWorkingDir(),
+        segmentsToLoad.size(),
+        getStagingWorkingDir().toString(),
         segmentDirectory
-
     ));
     hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
     DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(hdfsSegmentPusherConfig,
@@ -569,21 +554,12 @@ protected void loadDruidSegments(Table table, boolean overwrite) throws MetaExce
         getConnector(),
         getDruidMetadataStorageTablesConfig(),
         dataSourceName,
-        segmentList,
+        segmentsToLoad,
         overwrite,
         getConf(),
         dataSegmentPusher
     );
-      checkLoadStatus(publishedDataSegmentList);
-    } catch (CallbackFailedException | IOException e) {
-      LOG.error("Failed to move segments from staging directory");
-      if (e instanceof CallbackFailedException) {
-        Throwables.propagate(e.getCause());
-      }
-      Throwables.propagate(e);
-    } finally {
-      cleanWorkingDir();
-    }
+    return publishedDataSegmentList;
   }
 
   /**
@@ -715,17 +691,17 @@ private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) {
   }
 
   @Override
-  public void preDropTable(Table table) throws MetaException {
+  public void preDropTable(Table table) {
     // Nothing to do
   }
 
   @Override
-  public void rollbackDropTable(Table table) throws MetaException {
+  public void rollbackDropTable(Table table) {
     // Nothing to do
   }
 
   @Override
-  public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+  public void commitDropTable(Table table, boolean deleteData) {
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
@@ -773,16 +749,54 @@ public void commitInsertTable(Table table, boolean overwrite) throws MetaExcepti
     if (MetaStoreUtils.isExternalTable(table)) {
       throw new MetaException("Cannot insert data into external table backed by Druid");
     }
-    this.loadDruidSegments(table, overwrite);
+    try {
+      // Check if there are segments to load
+      final Path segmentDescriptorDir = getSegmentDescriptorDir();
+      final List<DataSegment> segmentsToLoad = fetchSegmentsMetadata(segmentDescriptorDir);
+      final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+      // No segments to load, but we still need to honor overwrite
+      if (segmentsToLoad.isEmpty() && overwrite) {
+        // Disable datasource
+        // Case it is an insert overwrite we have to disable the existing Druid DataSource
+        DruidStorageHandlerUtils
+            .disableDataSource(getConnector(), getDruidMetadataStorageTablesConfig(),
+                dataSourceName
+            );
+        return;
+      } else if (!segmentsToLoad.isEmpty()) {
+        // at this point we have Druid segments from reducers but we need to atomically
+        // rename and commit to metadata
+        // Moving Druid segments and committing to druid metadata as one transaction.
+        checkLoadStatus(loadAndCommitDruidSegments(table, overwrite, segmentsToLoad));
+      }
+    } catch (IOException e) {
+      throw new MetaException(e.getMessage());
+    } catch (CallbackFailedException c) {
+      throw new MetaException(c.getCause().getMessage());
+    } finally {
+      cleanWorkingDir();
+    }
+  }
+
+  private List<DataSegment> fetchSegmentsMetadata(Path segmentDescriptorDir) throws IOException {
+    if (!segmentDescriptorDir.getFileSystem(getConf()).exists(segmentDescriptorDir)) {
+      LOG.info(
+          "Directory {} does not exist, ignore this if it is create statement or inserts of 0 rows,"
+              + " no Druid segments to move, cleaning working directory {}",
+          segmentDescriptorDir.toString(), getStagingWorkingDir().toString()
+      );
+      return Collections.EMPTY_LIST;
+    }
+    return DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorDir, getConf());
   }
 
   @Override
-  public void preInsertTable(Table table, boolean overwrite) throws MetaException {
+  public void preInsertTable(Table table, boolean overwrite) {
 
   }
 
   @Override
-  public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
+  public void rollbackInsertTable(Table table, boolean overwrite) {
     // do nothing
   }
@@ -871,7 +885,7 @@ private SQLMetadataConnector getConnector() {
         .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
 
-    final Supplier storageConnectorConfigSupplier = Suppliers.ofInstance(
+    final Supplier storageConnectorConfigSupplier = Suppliers.ofInstance(
         new MetadataStorageConnectorConfig() {
           @Override
           public String getConnectURI() {
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 808351de74..93d3e5ce3b 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -17,6 +17,27 @@
  */
 package org.apache.hadoop.hive.druid;
 
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.io.CharStreams;
+import com.metamx.common.JodaUtils;
+import com.metamx.common.MapUtils;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.NoopEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.response.InputStreamResponseHandler;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.StringDimensionSchema;
 import io.druid.jackson.DefaultObjectMapper;
@@ -27,6 +48,9 @@
 import io.druid.metadata.SQLMetadataConnector;
 import io.druid.metadata.storage.mysql.MySQLConnector;
 import io.druid.query.Druids;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.expression.LikeExprMacro;
 import io.druid.query.expression.RegexpExtractExprMacro;
 import io.druid.query.expression.TimestampCeilExprMacro;
@@ -41,9 +65,6 @@
 import io.druid.query.spec.MultipleIntervalSegmentSpec;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.DoubleSumAggregatorFactory;
-import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.segment.IndexSpec;
 import io.druid.segment.data.ConciseBitmapSerdeFactory;
 import io.druid.segment.data.RoaringBitmapSerdeFactory;
@@ -62,7 +83,6 @@
 import io.druid.timeline.partition.NumberedShardSpec;
 import io.druid.timeline.partition.PartitionChunk;
 import io.druid.timeline.partition.ShardSpec;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -78,29 +98,6 @@
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.util.StringUtils;
-
-import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
-import com.google.common.io.CharStreams;
-import com.metamx.common.JodaUtils;
-import com.metamx.common.MapUtils;
-import com.metamx.emitter.EmittingLogger;
-import com.metamx.emitter.core.NoopEmitter;
-import com.metamx.emitter.service.ServiceEmitter;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.Request;
-import com.metamx.http.client.response.InputStreamResponseHandler;
-
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
@@ -117,7 +114,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 6a496c20df..b96a13fada 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -47,6 +47,7 @@
 import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -113,7 +114,7 @@ public void before() throws Throwable {
     Mockito.when(tableMock.getDbName()).thenReturn(DB_NAME);
     Mockito.when(tableMock.getTableName()).thenReturn(TABLE_NAME);
     config = new Configuration();
-    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), "hive-" + UUID.randomUUID().toString());
     config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
     config.set(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY),
         new Path(tableWorkingPath, "finalSegmentDir").toString());
@@ -125,6 +126,11 @@
     druidStorageHandler.setConf(config);
   }
 
+  @After
+  public void tearDown() {
+    temporaryFolder.delete();
+  }
+
   Table tableMock = Mockito.mock(Table.class);
 
   @Test
@@ -672,7 +678,7 @@ public void testCommitInsertIntoWhenDestinationSegmentFileExist()
         FileUtils.readFileToString(new File(expectedFinalHadoopPath.toUri())));
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test(expected = MetaException.class)
   public void testCommitInsertIntoWithConflictingIntervalSegment()
       throws MetaException, IOException {
     DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector();
@@ -714,7 +720,7 @@ public void testCommitInsertIntoWithConflictingIntervalSegment()
     druidStorageHandler.commitInsertTable(tableMock, false);
   }
 
-  @Test(expected = IllegalStateException.class)
+  @Test(expected = MetaException.class)
   public void testCommitInsertIntoWithNonExtendableSegment() throws MetaException, IOException {
     DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector();
     MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index f7c78e271f..45a3d4be32 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -47,9 +48,7 @@
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 
 import java.io.FileNotFoundException;
 import java.io.Serializable;