diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7ceb322..6ad4562 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1966,6 +1966,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
             + "declared"
     ),
+    HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081",
+        "Address of the Druid coordinator. It is used to check the load status of newly created segments"
+    ),
     HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
         "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +
         "per query. In order to do that, we obtain the estimated size for the complete result. If the\n" +
@@ -1976,6 +1979,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "the HTTP client."),
     HIVE_DRUID_HTTP_READ_TIMEOUT("hive.druid.http.read.timeout", "PT1M", "Read timeout period for the HTTP\n" +
         "client in ISO8601 format (for example P2W, P3M, PT1H30M, PT0.750S), default is period of 1 minute."),
+    HIVE_DRUID_SLEEP_TIME("hive.druid.sleep.time", "PT10S",
+        "Sleep time between retries in ISO8601 format (for example P2W, P3M, PT1H30M, PT0.750S), default is period of 10 seconds."
+    ),
     HIVE_DRUID_BASE_PERSIST_DIRECTORY("hive.druid.basePersistDirectory", "/tmp",
         "Local temporary directory used to persist intermediate indexing state."
     ),
@@ -1997,6 +2003,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     DRUID_WORKING_DIR("hive.druid.working.directory", "/tmp/workingDirectory",
         "Default hdfs working directory used to store some intermediate metadata"
     ),
+    HIVE_DRUID_MAX_TRIES("hive.druid.maxTries", 5, "Maximum number of retries before giving up"),
+    HIVE_DRUID_PASSIVE_WAIT_TIME("hive.druid.passiveWaitTimeMs", 30000,
+        "Wait time in ms, defaults to 30 seconds."
+    ),
     // For HBase storage handler
     HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
         "Whether writes to HBase should be forced to the write-ahead log. \n" +
diff --git druid-handler/pom.xml druid-handler/pom.xml
index b057fff..71c372e 100644
--- druid-handler/pom.xml
+++ druid-handler/pom.xml
@@ -189,6 +189,11 @@
       <artifactId>calcite-druid</artifactId>
       <version>${calcite.version}</version>
     </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.8.2</version>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index a08a4e3..2fa942f 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -18,11 +18,21 @@
 package org.apache.hadoop.hive.druid;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.metamx.common.RetryUtils;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
 import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
 import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
@@ -50,13 +60,19 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
@@ -66,6 +82,8 @@
   protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
 
+  protected static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+
   public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
 
   private final SQLMetadataConnector connector;
@@ -74,6 +92,8 @@
   private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
 
+  private HttpClient httpClient;
+
   private String uniqueId = null;
 
   private String rootWorkingDir = null;
@@ -114,7 +134,7 @@ public String getPassword() {
       connector = new MySQLConnector(storageConnectorConfigSupplier,
               Suppliers.ofInstance(druidMetadataStorageTablesConfig)
       );
-    } else if (dbType.equals("postgres")) {
+    } else if (dbType.equals("postgresql")) {
       connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
               Suppliers.ofInstance(druidMetadataStorageTablesConfig)
       );
@@ -127,11 +147,13 @@ public String getPassword() {
   @VisibleForTesting
   public DruidStorageHandler(SQLMetadataConnector connector,
           SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler,
-          MetadataStorageTablesConfig druidMetadataStorageTablesConfig
+          MetadataStorageTablesConfig druidMetadataStorageTablesConfig,
+          HttpClient httpClient
   ) {
     this.connector = connector;
     this.druidSqlMetadataStorageUpdaterJobHandler = druidSqlMetadataStorageUpdaterJobHandler;
     this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
+    this.httpClient = httpClient;
   }
 
   @Override
@@ -216,6 +238,7 @@ public void commitCreateTable(Table table) throws MetaException {
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
+    Lifecycle lifecycle = new Lifecycle();
     LOG.info(String.format("Committing table [%s] to the druid metastore", table.getDbName()));
     final Path tableDir = getSegmentDescriptorDir();
     try {
@@ -227,11 +250,111 @@ public void commitCreateTable(Table table) throws MetaException {
               segmentList, DruidStorageHandlerUtils.JSON_MAPPER
       );
+      final String coordinatorAddress = HiveConf
+              .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
+      int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
+      final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+      LOG.info(String.format("Checking load status from coordinator [%s]", coordinatorAddress));
+
+      // Check if the coordinator is up
+      httpClient = makeHttpClient(lifecycle);
+      try {
+        lifecycle.start();
+      } catch (Exception e) {
+        Throwables.propagate(e);
+      }
+      String coordinatorResponse = null;
+      try {
+        coordinatorResponse = RetryUtils.retry(new Callable<String>() {
+          @Override
+          public String call() throws Exception {
+            return DruidStorageHandlerUtils.getURL(httpClient,
+                    new URL(String.format("http://%s/status", coordinatorAddress))
+            );
+          }
+        }, new Predicate<Throwable>() {
+          @Override
+          public boolean apply(@Nullable Throwable input) {
+            return input instanceof IOException;
+          }
+        }, maxTries);
+      } catch (Exception e) {
+        console.printInfo(
+                "Will skip waiting for data loading");
+        return;
+      }
+      if (Strings.isNullOrEmpty(coordinatorResponse)) {
+        console.printInfo(
+                "Will skip waiting for data loading");
+        return;
+      }
+      console.printInfo(
+              String.format("Waiting for the loading of [%s] segments", segmentList.size()));
+      long passiveWaitTimeMs = HiveConf
+              .getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME);
+      ImmutableSet<URL> setOfUrls = FluentIterable.from(segmentList)
+              .transform(new Function<DataSegment, URL>() {
+                @Override
+                public URL apply(DataSegment dataSegment) {
+                  try {
+                    // Need to make sure that we are using UTC since most Druid clusters use UTC by default
+                    return new URL(String
+                            .format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
+                                    coordinatorAddress, dataSourceName, DataSegment
+                                            .makeDataSegmentIdentifier(dataSegment.getDataSource(),
+                                                    new DateTime(dataSegment.getInterval()
+                                                            .getStartMillis(), DateTimeZone.UTC),
+                                                    new DateTime(dataSegment.getInterval()
+                                                            .getEndMillis(), DateTimeZone.UTC),
+                                                    dataSegment.getVersion(),
+                                                    dataSegment.getShardSpec()
+                                            )
+                            ));
+                  } catch (MalformedURLException e) {
+                    Throwables.propagate(e);
+                  }
+                  return null;
+                }
+              }).toSet();
+
+      int numRetries = 0;
+      while (numRetries++ < maxTries && !setOfUrls.isEmpty()) {
+        setOfUrls = ImmutableSet.copyOf(Sets.filter(setOfUrls, new Predicate<URL>() {
+          @Override
+          public boolean apply(URL input) {
+            try {
+              String result = DruidStorageHandlerUtils.getURL(httpClient, input);
+              LOG.debug(String.format("Checking segment [%s] response is [%s]", input, result));
+              return Strings.isNullOrEmpty(result);
+            } catch (IOException e) {
+              LOG.error(String.format("Error while checking URL [%s]", input), e);
+              return true;
+            }
+          }
+        }));
+
+        try {
+          if (!setOfUrls.isEmpty()) {
+            Thread.sleep(passiveWaitTimeMs);
+          }
+        } catch (InterruptedException e) {
+          Thread.interrupted();
+          Throwables.propagate(e);
+        }
+      }
+      if (!setOfUrls.isEmpty()) {
+        // We are not throwing an exception since it might be a transient issue that is blocking loading
+        console.printError(String.format(
+                "Wait time exhausted and we have [%s] out of [%s] segments not loaded yet",
+                setOfUrls.size(), segmentList.size()
+        ));
+      }
     } catch (IOException e) {
       LOG.error("Exception while commit", e);
       Throwables.propagate(e);
     } finally {
       cleanWorkingDir();
+      lifecycle.stop();
     }
   }
@@ -391,4 +514,21 @@ private String getRootWorkingDir() {
     }
     return rootWorkingDir;
   }
+
+  private HttpClient makeHttpClient(Lifecycle lifecycle) {
+    final int numConnection = HiveConf
+            .getIntVar(getConf(),
+                    HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION
+            );
+    final Period readTimeout = new Period(
+            HiveConf.getVar(getConf(),
+                    HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT
+            ));
+
+    return HttpClientInit.createClient(
+            HttpClientConfig.builder().withNumConnections(numConnection)
+                    .withReadTimeout(new Period(readTimeout).toStandardDuration()).build(),
+            lifecycle
+    );
+  }
 }
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 193e4aa..52e7e8d 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -20,16 +20,21 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
 import com.google.common.collect.Lists;
+import com.google.common.io.CharStreams;
 import com.metamx.common.MapUtils;
+import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.core.NoopEmitter;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
 import com.metamx.http.client.Request;
 import com.metamx.http.client.response.InputStreamResponseHandler;
 import io.druid.jackson.DefaultObjectMapper;
@@ -46,10 +51,14 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Period;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
@@ -61,8 +70,11 @@
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.io.Reader;
 import java.net.InetAddress;
+import java.net.URI;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.sql.SQLException;
@@ -80,7 +92,6 @@
 public final class DruidStorageHandlerUtils {
 
   private static final String SMILE_CONTENT_TYPE = "application/x-jackson-smile";
 
-
   /**
    * Mapper to use to serialize/deserialize Druid objects (JSON)
    */
@@ -179,6 +190,14 @@
     return response;
   }
 
+
+  public static String getURL(HttpClient client, URL url) throws IOException {
+    try (Reader reader = new InputStreamReader(
+        DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, url)))) {
+      return CharStreams.toString(reader);
+    }
+  }
+
   /**
    * @param taskDir path to the directory containing the segments descriptor info
    *                the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json
diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
index 8770749..e7eb4cc 100644
--- druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
+++ druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
@@ -68,7 +68,8 @@ public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
             new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get()
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
     );
 
     try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
@@ -96,7 +97,8 @@ public void testPreCreateTableWhenDataSourceExists() throws MetaException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
             new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get()
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
     );
     druidStorageHandler.preCreateTable(tableMock);
   }
@@ -107,7 +109,8 @@ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
             new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get()
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
    );
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
@@ -142,7 +145,8 @@ public void testDeleteSegment() throws IOException, SegmentLoadingException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
             new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get()
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
     );
 
     String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
@@ -177,5 +181,4 @@ public void testDeleteSegment() throws IOException, SegmentLoadingException {
             localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
     );
   }
-
 }
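Taken together, the DruidStorageHandler changes boil down to one flow: build a Druid HttpClient, confirm the coordinator answers on /status (retrying IOExceptions up to hive.druid.maxTries), then repeatedly re-check each segment's coordinator URL, sleeping hive.druid.passiveWaitTimeMs between passes, until every segment is reported loaded or the retry budget runs out. Below is a condensed, self-contained sketch of that flow; the CoordinatorPollSketch class name, the connection count, and the datasource/segment identifiers are illustrative placeholders, not part of the patch.

import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.metamx.common.RetryUtils;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.joda.time.Period;

import java.io.IOException;
import java.net.URL;
import java.util.concurrent.Callable;

public class CoordinatorPollSketch {
  public static void main(String[] args) throws Exception {
    final String coordinatorAddress = "localhost:8081"; // hive.druid.coordinator.address.default
    final int maxTries = 5;                              // hive.druid.maxTries
    final long passiveWaitTimeMs = 30000L;               // hive.druid.passiveWaitTimeMs

    Lifecycle lifecycle = new Lifecycle();
    final HttpClient httpClient = HttpClientInit.createClient(
        HttpClientConfig.builder()
            .withNumConnections(20) // illustrative; the handler reads hive.druid.http.numConnection
            .withReadTimeout(new Period("PT1M").toStandardDuration())
            .build(),
        lifecycle);
    lifecycle.start();
    try {
      // 1. Make sure the coordinator is reachable, retrying transient IO failures.
      String status = RetryUtils.retry(new Callable<String>() {
        @Override
        public String call() throws Exception {
          return DruidStorageHandlerUtils.getURL(httpClient,
              new URL(String.format("http://%s/status", coordinatorAddress)));
        }
      }, new Predicate<Throwable>() {
        @Override
        public boolean apply(Throwable input) {
          return input instanceof IOException;
        }
      }, maxTries);
      if (Strings.isNullOrEmpty(status)) {
        return; // coordinator not answering: skip waiting, as the handler does
      }

      // 2. Poll one segment URL; a non-empty body means the coordinator knows about the segment.
      URL segmentUrl = new URL(String.format(
          "http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
          coordinatorAddress, "my_datasource", "my_segment_id")); // placeholder identifiers
      for (int i = 0; i < maxTries; i++) {
        if (!Strings.isNullOrEmpty(DruidStorageHandlerUtils.getURL(httpClient, segmentUrl))) {
          return; // segment loaded
        }
        Thread.sleep(passiveWaitTimeMs);
      }
      System.err.println("Wait time exhausted and the segment is still not loaded");
    } finally {
      lifecycle.stop();
    }
  }
}

The real commitCreateTable() builds one such URL per DataSegment with DataSegment.makeDataSegmentIdentifier(...) and, when the wait is exhausted, only prints an error instead of failing the DDL, since unloaded segments are treated as a transient condition.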