From 93306e7f354393e638189a732c5f969c37066540 Mon Sep 17 00:00:00 2001
From: Nishant
Date: Wed, 26 Apr 2017 19:16:00 +0530
Subject: [PATCH] [HIVE-16518] Fix Insert override to disable all existing
 segments before publishing new segments

---
 .../hadoop/hive/druid/DruidStorageHandler.java      | 27 ++++----
 .../hive/druid/DruidStorageHandlerUtils.java        | 72 +++++++++++++++++---
 .../hadoop/hive/druid/TestDruidStorageHandler.java  | 78 ++++++++++++++++++++--
 3 files changed, 152 insertions(+), 25 deletions(-)

diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index d4f6865..5ce60e4 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -33,7 +33,6 @@
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
-import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
 import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
@@ -56,7 +55,6 @@
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -96,8 +94,6 @@
 
   private final SQLMetadataConnector connector;
 
-  private final SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler;
-
   private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
 
   private HttpClient httpClient;
@@ -151,17 +147,14 @@ public String getPassword() {
     } else {
       throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
     }
-    druidSqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(connector);
   }
 
   @VisibleForTesting
   public DruidStorageHandler(SQLMetadataConnector connector,
-          SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler,
           MetadataStorageTablesConfig druidMetadataStorageTablesConfig,
           HttpClient httpClient
   ) {
     this.connector = connector;
-    this.druidSqlMetadataStorageUpdaterJobHandler = druidSqlMetadataStorageUpdaterJobHandler;
     this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
     this.httpClient = httpClient;
   }
@@ -256,6 +249,12 @@ public void rollbackCreateTable(Table table) throws MetaException {
 
   @Override
   public void commitCreateTable(Table table) throws MetaException {
+    LOG.debug(String.format("commit create table table [%s]", table.getTableName()));
+    publishSegments(table, true);
+  }
+
+
+  public void publishSegments(Table table, boolean overwrite) throws MetaException {
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
@@ -266,15 +265,19 @@ public void commitCreateTable(Table table) throws MetaException {
     List<DataSegment> segmentList = DruidStorageHandlerUtils
             .getPublishedSegments(tableDir, getConf());
     LOG.info(String.format("Found [%d] segments under path [%s]", segmentList.size(), tableDir));
-    druidSqlMetadataStorageUpdaterJobHandler.publishSegments(
-            druidMetadataStorageTablesConfig.getSegmentsTable(),
+    final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+
+    DruidStorageHandlerUtils.publishSegments(
+            connector,
+            druidMetadataStorageTablesConfig,
+            dataSourceName,
             segmentList,
-            DruidStorageHandlerUtils.JSON_MAPPER
+            DruidStorageHandlerUtils.JSON_MAPPER,
+            overwrite
     );
     final String coordinatorAddress = HiveConf
             .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
     int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
-    final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
     LOG.info(String.format("checking load status from coordinator [%s]", coordinatorAddress));
 
     // check if the coordinator is up
@@ -488,7 +491,7 @@ public void commitDropTable(Table table, boolean deleteData) throws MetaExceptio
   public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
     if (overwrite) {
       LOG.debug(String.format("commit insert overwrite into table [%s]", table.getTableName()));
-      this.commitCreateTable(table);
+      this.publishSegments(table, overwrite);
     } else {
       throw new MetaException("Insert into is not supported yet");
     }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 8d48e14..adf013b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
 import com.google.common.collect.Lists;
@@ -43,6 +44,8 @@
 import io.druid.segment.column.ColumnConfig;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.LinearShardSpec;
+import io.druid.timeline.partition.NoneShardSpec;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -54,9 +57,11 @@
 import org.apache.hadoop.util.StringUtils;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
@@ -335,14 +340,7 @@ public static boolean disableDataSource(SQLMetadataConnector connector,
           new HandleCallback<Void>() {
             @Override
             public Void withHandle(Handle handle) throws Exception {
-              handle.createStatement(
-                      String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
-                              metadataStorageTablesConfig.getSegmentsTable()
-                      )
-              )
-                      .bind("dataSource", dataSource)
-                      .execute();
-
+              disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource);
               return null;
             }
           }
@@ -355,6 +353,64 @@ public Void withHandle(Handle handle) throws Exception {
     return true;
   }
 
+  public static void publishSegments(final SQLMetadataConnector connector,
+          final MetadataStorageTablesConfig metadataStorageTablesConfig,
+          final String dataSource,
+          final List<DataSegment> segments, final ObjectMapper mapper, boolean overwrite)
+  {
+    connector.getDBI().inTransaction(
+            new TransactionCallback<Void>()
+            {
+              @Override
+              public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+              {
+                if (overwrite) {
+                  disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource);
+                }
+                final PreparedBatch batch = handle.prepareBatch(
+                        String.format(
+                                "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+                                        + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                                metadataStorageTablesConfig.getSegmentsTable()
+                        )
+                );
+                for (final DataSegment segment : segments) {
+
+                  batch.add(
+                          new ImmutableMap.Builder<String, Object>()
+                                  .put("id", segment.getIdentifier())
+                                  .put("dataSource", segment.getDataSource())
+                                  .put("created_date", new DateTime().toString())
+                                  .put("start", segment.getInterval().getStart().toString())
+                                  .put("end", segment.getInterval().getEnd().toString())
+                                  .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
+                                  .put("version", segment.getVersion())
+                                  .put("used", true)
+                                  .put("payload", mapper.writeValueAsBytes(segment))
+                                  .build()
+                  );
+
+                  LOG.info(String.format("Published %s", segment.getIdentifier()));
+
+                }
+                batch.execute();
+                return null;
+              }
+            }
+    );
+  }
+
+  public static void disableDataSourceWithHandle(Handle handle,
+          MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource) {
+    handle.createStatement(
+            String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
+                    metadataStorageTablesConfig.getSegmentsTable()
+            )
+    )
+            .bind("dataSource", dataSource)
+            .execute();
+  }
+
   /**
    * @param connector SQL connector to metadata
    * @param metadataStorageTablesConfig Tables configuration
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index da6610a..05e3ec5 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.hive.druid;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import io.druid.indexer.JobHelper;
 import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataSegmentManager;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
@@ -42,10 +45,16 @@
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -85,7 +94,6 @@ public void before() throws Throwable {
   public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -114,7 +122,6 @@ public void testPreCreateTableWhenDataSourceExists() throws MetaException {
     );
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -126,7 +133,6 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
           throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -158,7 +164,6 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
   public void testCommitInsertTable() throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -184,7 +189,6 @@ public void testCommitInsertTable() throws MetaException, IOException {
   public void testDeleteSegment() throws IOException, SegmentLoadingException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -221,4 +225,68 @@ public void testDeleteSegment() throws IOException, SegmentLoadingException {
             localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
     );
   }
+
+  @Test
+  public void testCommitInsertOverwriteTable() throws MetaException, IOException {
+    DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector();
+    MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule
+            .metadataTablesConfigSupplier().get();
+
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            connector,
+            metadataStorageTablesConfig,
+            null
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+    Configuration config = new Configuration();
+    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+            new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    List<DataSegment> existingSegments = Arrays.asList(DataSegment.builder().dataSource(DATA_SOURCE_NAME)
+            .version("v0").interval(new Interval(1, 10)).shardSpec(NoneShardSpec.instance()).build());
+    DruidStorageHandlerUtils.publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+            existingSegments,
+            DruidStorageHandlerUtils.JSON_MAPPER,
+            true
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+    druidStorageHandler.commitInsertTable(tableMock, true);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(connector,
+                    metadataStorageTablesConfig
+            )).toArray());
+
+    final List<DataSegment> dataSegmentList = connector.getDBI()
+            .withHandle(new HandleCallback<List<DataSegment>>() {
+              @Override
+              public List<DataSegment> withHandle(Handle handle) throws Exception {
+                return handle
+                        .createQuery(String.format("SELECT payload FROM %s WHERE used=true",
+                                metadataStorageTablesConfig.getSegmentsTable()))
+                        .map(new ResultSetMapper<DataSegment>() {
+
+                          @Override
+                          public DataSegment map(int i, ResultSet resultSet,
+                                  StatementContext statementContext)
+                                  throws SQLException {
+                            try {
+                              return DruidStorageHandlerUtils.JSON_MAPPER.readValue(
+                                      resultSet.getBytes("payload"),
+                                      DataSegment.class
+                              );
+                            } catch (IOException e) {
+                              throw Throwables.propagate(e);
+                            }
+                          }
+                        }).list();
+              }
+            });
+    Assert.assertEquals(1, dataSegmentList.size());
+
+  }
 }
-- 
2.8.4 (Apple Git-73)
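
Note on usage (not part of the patch): a minimal caller-side sketch of the new overwrite-aware
publish path introduced above. The class name and the "wikipedia" data source are illustrative
assumptions; the connector, tables config, JSON_MAPPER, and segment builder calls mirror what the
patch and its test already use. With overwrite=true, publishSegments first flips used=false on
every existing segment of the data source and then inserts the new segment metadata, all within a
single metadata-store transaction.

import com.google.common.collect.ImmutableList;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.joda.time.Interval;

import java.util.List;

public class PublishOverwriteSketch {
  public static void publishWithOverwrite(SQLMetadataConnector connector,
          MetadataStorageTablesConfig tablesConfig) {
    // Build one new segment; in the storage handler this list comes from the
    // segment descriptors written under the staging directory.
    List<DataSegment> newSegments = ImmutableList.of(
            DataSegment.builder()
                    .dataSource("wikipedia")              // hypothetical data source name
                    .version("v1")
                    .interval(new Interval(1, 10))
                    .shardSpec(NoneShardSpec.instance())
                    .build());
    // overwrite=true: disable all currently used segments of the data source,
    // then insert the new rows, inside one transaction.
    DruidStorageHandlerUtils.publishSegments(
            connector,
            tablesConfig,
            "wikipedia",
            newSegments,
            DruidStorageHandlerUtils.JSON_MAPPER,
            true
    );
  }
}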