diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3be5a8d..cf87116 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2059,7 +2059,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal , "druid deep storage location."), DRUID_METADATA_BASE("hive.druid.metadata.base", "druid", "Default prefix for metadata tables"), DRUID_METADATA_DB_TYPE("hive.druid.metadata.db.type", "mysql", - new PatternSet("mysql", "postgresql"), "Type of the metadata database." + new PatternSet("mysql", "postgresql", "derby"), "Type of the metadata database." ), DRUID_METADATA_DB_USERNAME("hive.druid.metadata.username", "", "Username to connect to Type of the metadata DB." @@ -2074,7 +2074,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Default hdfs working directory used to store some intermediate metadata" ), HIVE_DRUID_MAX_TRIES("hive.druid.maxTries", 5, "Maximum number of retries before giving up"), - HIVE_DRUID_PASSIVE_WAIT_TIME("hive.druid.passiveWaitTimeMs", 30000, + HIVE_DRUID_PASSIVE_WAIT_TIME("hive.druid.passiveWaitTimeMs", 30000L, "Wait time in ms default to 30 seconds." ), HIVE_DRUID_BITMAP_FACTORY_TYPE("hive.druid.bitmap.type", "roaring", new PatternSet("roaring", "concise"), "Coding algorithm use to encode the bitmaps"), diff --git a/data/scripts/q_test_cleanup_druid.sql b/data/scripts/q_test_cleanup_druid.sql new file mode 100644 index 0000000..b0d3425 --- /dev/null +++ b/data/scripts/q_test_cleanup_druid.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS alltypesorc; +DROP TABLE IF EXISTS druid_table; diff --git a/data/scripts/q_test_druid_init.sql b/data/scripts/q_test_druid_init.sql new file mode 100644 index 0000000..ee025f1 --- /dev/null +++ b/data/scripts/q_test_druid_init.sql @@ -0,0 +1,29 @@ +set hive.stats.dbclass=fs; +-- +-- Table alltypesorc +-- +DROP TABLE IF EXISTS alltypesorc; +CREATE TABLE alltypesorc( + ctinyint TINYINT, + csmallint SMALLINT, + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cstring1 STRING, + cstring2 STRING, + ctimestamp1 TIMESTAMP, + ctimestamp2 TIMESTAMP, + cboolean1 BOOLEAN, + cboolean2 BOOLEAN) + STORED AS ORC; + +LOAD DATA LOCAL INPATH "${hiveconf:test.data.dir}/alltypesorc" +OVERWRITE INTO TABLE alltypesorc; + +ANALYZE TABLE alltypesorc COMPUTE STATISTICS; + +ANALYZE TABLE alltypesorc COMPUTE STATISTICS FOR COLUMNS ctinyint,csmallint,cint,cbigint,cfloat,cdouble,cstring1,cstring2,ctimestamp1,ctimestamp2,cboolean1,cboolean2; + +-- Druid Table + diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml index 5c8b521..2a62b90 100644 --- a/druid-handler/pom.xml +++ b/druid-handler/pom.xml @@ -341,6 +341,7 @@ net.jpountz.lz4:* org.apache.commons:* org.roaringbitmap:* + org.apache.derby:* diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index 8117633..8f78de1 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -17,10 +17,25 @@ */ package org.apache.hadoop.hive.druid; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import 
com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.common.RetryUtils; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; +import io.druid.metadata.storage.derby.DerbyConnector; +import io.druid.metadata.storage.derby.DerbyMetadataStorage; import io.druid.metadata.storage.mysql.MySQLConnector; import io.druid.metadata.storage.postgresql.PostgreSQLConnector; import io.druid.segment.loading.DataSegmentPusher; @@ -28,7 +43,6 @@ import io.druid.storage.hdfs.HdfsDataSegmentPusher; import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig; import io.druid.timeline.DataSegment; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -58,25 +72,7 @@ import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Strings; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.base.Throwables; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; -import com.metamx.common.RetryUtils; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; - import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.joda.time.Period; import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.slf4j.Logger; @@ -88,6 +84,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * DruidStorageHandler provides a HiveStorageHandler implementation for Druid. 
@@ -116,9 +114,9 @@ ShutdownHookManager.addShutdownHook(() -> lifecycle.stop()); } - private final SQLMetadataConnector connector; + private SQLMetadataConnector connector; - private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig; + private MetadataStorageTablesConfig druidMetadataStorageTablesConfig = null; private String uniqueId = null; @@ -127,48 +125,6 @@ private Configuration conf; public DruidStorageHandler() { - //this is the default value in druid - final String base = HiveConf - .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_BASE); - final String dbType = HiveConf - .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE); - final String username = HiveConf - .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME); - final String password = HiveConf - .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD); - final String uri = HiveConf - .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI); - druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(base); - - final Supplier storageConnectorConfigSupplier = Suppliers.ofInstance( - new MetadataStorageConnectorConfig() { - @Override - public String getConnectURI() { - return uri; - } - - @Override - public String getUser() { - return username; - } - - @Override - public String getPassword() { - return password; - } - }); - - if (dbType.equals("mysql")) { - connector = new MySQLConnector(storageConnectorConfigSupplier, - Suppliers.ofInstance(druidMetadataStorageTablesConfig) - ); - } else if (dbType.equals("postgresql")) { - connector = new PostgreSQLConnector(storageConnectorConfigSupplier, - Suppliers.ofInstance(druidMetadataStorageTablesConfig) - ); - } else { - throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType)); - } } @VisibleForTesting @@ -250,13 +206,13 @@ public void preCreateTable(Table table) throws MetaException { // We need to check the Druid metadata dataSourceName = Warehouse.getQualifiedName(table); try { - connector.createSegmentTable(); + getConnector().createSegmentTable(); } catch (Exception e) { LOG.error("Exception while trying to create druid segments table", e); throw new MetaException(e.getMessage()); } Collection existingDataSources = DruidStorageHandlerUtils - .getAllDataSourceNames(connector, druidMetadataStorageTablesConfig); + .getAllDataSourceNames(getConnector(), getDruidMetadataStorageTablesConfig()); LOG.debug("pre-create data source with name {}", dataSourceName); if (existingDataSources.contains(dataSourceName)) { throw new MetaException(String.format("Data source [%s] already existing", dataSourceName)); @@ -272,7 +228,7 @@ public void rollbackCreateTable(Table table) throws MetaException { final Path segmentDescriptorDir = getSegmentDescriptorDir(); try { List dataSegmentList = DruidStorageHandlerUtils - .getPublishedSegments(segmentDescriptorDir, getConf()); + .getCreatededSegments(segmentDescriptorDir, getConf()); for (DataSegment dataSegment : dataSegmentList) { try { deleteSegment(dataSegment); @@ -290,144 +246,146 @@ public void rollbackCreateTable(Table table) throws MetaException { @Override public void commitCreateTable(Table table) throws MetaException { - LOG.debug("commit create table {}", table.getTableName()); if (MetaStoreUtils.isExternalTable(table)) { // For external tables, we do not need to do anything else return; } - publishSegments(table, true); + 
loadDruidSegments(table, true); } - public void publishSegments(Table table, boolean overwrite) throws MetaException { + + protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException { + // at this point we have Druid segments from reducers but we need to atomically + // rename and commit to metadata final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE); final List segmentList = Lists.newArrayList(); final Path tableDir = getSegmentDescriptorDir(); - console.logInfo(String.format("Committing hive table {} druid data source {} to the druid metadata store", - table.getTableName(), dataSourceName - )); + // Read the created segments metadata from the table staging directory try { - segmentList.addAll(DruidStorageHandlerUtils.getPublishedSegments(tableDir, getConf())); + segmentList.addAll(DruidStorageHandlerUtils.getCreatededSegments(tableDir, getConf())); } catch (IOException e) { LOG.error("Failed to load segments descriptor from directory {}", tableDir.toString()); Throwables.propagate(e); cleanWorkingDir(); } + // Moving Druid segments and committing to druid metadata as one transaction. + final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig(); + List publishedDataSegmentList = Lists.newArrayList(); + final String segmentDirectory = + table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null + ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) + : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY); + LOG.info(String.format( + "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]", + segmentList.size(), + getStagingWorkingDir(), + segmentDirectory + + )); + hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory); try { - final String segmentDirectory = - table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null - ? 
table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) - : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY); - LOG.info( - String.format("Will move [%s] druid segments from [%s] to [%s]", - segmentList.size(), - getStagingWorkingDir(), - segmentDirectory - - )); - HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig(); - pusherConfig.setStorageDirectory(segmentDirectory); - DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, getConf(), DruidStorageHandlerUtils.JSON_MAPPER); - DruidStorageHandlerUtils.publishSegments( - connector, - druidMetadataStorageTablesConfig, + DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(hdfsSegmentPusherConfig, + getConf(), + DruidStorageHandlerUtils.JSON_MAPPER + ); + publishedDataSegmentList = DruidStorageHandlerUtils.publishSegmentsAndCommit( + getConnector(), + getDruidMetadataStorageTablesConfig(), dataSourceName, segmentList, overwrite, - segmentDirectory, getConf(), dataSegmentPusher - ); - } catch (CallbackFailedException | IOException e ) { - LOG.error("Failed to publish segments"); - if (e instanceof CallbackFailedException) { + + } catch (CallbackFailedException | IOException e) { + LOG.error("Failed to move segments from staging directory"); + if (e instanceof CallbackFailedException) { Throwables.propagate(e.getCause()); } Throwables.propagate(e); } finally { cleanWorkingDir(); } - final String coordinatorAddress = HiveConf - .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS); - int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES); - if (maxTries == 0) { - return; - } - LOG.debug("checking load status from coordinator {}", coordinatorAddress); + checkLoadStatus(publishedDataSegmentList); + } + + /** + * This function checks the load status of Druid segments by polling druid coordinator. + * @param segments List of druid segments to check for + * + * @return count of yet to load segments. 
+ */ + private int checkLoadStatus(List segments){ + final String coordinatorAddress = HiveConf + .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS); + int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES); + if (maxTries == 0) { + return segments.size(); + } + LOG.debug("checking load status from coordinator {}", coordinatorAddress); - String coordinatorResponse = null; + String coordinatorResponse; + try { + coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(), + new URL(String.format("http://%s/status", coordinatorAddress)) + ), input -> input instanceof IOException, maxTries); + } catch (Exception e) { + console.printInfo( + "Will skip waiting for data loading, coordinator unavailable"); + return segments.size(); + } + if (Strings.isNullOrEmpty(coordinatorResponse)) { + console.printInfo( + "Will skip waiting for data loading empty response from coordinator"); + return segments.size(); + } + console.printInfo( + String.format("Waiting for the loading of [%s] segments", segments.size())); + long passiveWaitTimeMs = HiveConf + .getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME); + Set UrlsOfUnloadedSegments = segments.stream().map(dataSegment -> { try { - coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(), - new URL(String.format("http://%s/status", coordinatorAddress)) - ), input -> input instanceof IOException, maxTries); - } catch (Exception e) { - console.printInfo( - "Will skip waiting for data loading, coordinator unavailable"); - return; - } - if (Strings.isNullOrEmpty(coordinatorResponse)) { - console.printInfo( - "Will skip waiting for data loading empty response from coordinator"); - return; + //Need to make sure that we are using segment identifier + return new URL(String.format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s", + coordinatorAddress, dataSegment.getDataSource(), dataSegment.getIdentifier() + )); + } catch (MalformedURLException e) { + Throwables.propagate(e); } - console.printInfo( - String.format("Waiting for the loading of [%s] segments", segmentList.size())); - long passiveWaitTimeMs = HiveConf - .getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME); - ImmutableSet setOfUrls = FluentIterable.from(segmentList) - .transform(dataSegment -> { - try { - //Need to make sure that we are using UTC since most of the druid cluster use UTC by default - return new URL(String - .format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s", - coordinatorAddress, dataSourceName, DataSegment - .makeDataSegmentIdentifier(dataSegment.getDataSource(), - new DateTime(dataSegment.getInterval() - .getStartMillis(), DateTimeZone.UTC), - new DateTime(dataSegment.getInterval() - .getEndMillis(), DateTimeZone.UTC), - dataSegment.getVersion(), - dataSegment.getShardSpec() - ) - )); - } catch (MalformedURLException e) { - Throwables.propagate(e); - } - return null; - }).toSet(); - - int numRetries = 0; - while (numRetries++ < maxTries && !setOfUrls.isEmpty()) { - setOfUrls = ImmutableSet.copyOf(Sets.filter(setOfUrls, new Predicate() { - @Override - public boolean apply(URL input) { - try { - String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input); - LOG.debug("Checking segment {} response is {}", input, result); - return Strings.isNullOrEmpty(result); - } catch (IOException e) { - LOG.error(String.format("Error while checking URL [%s]", input), e); - return true; - } - } - })); + 
return null; + }).collect(Collectors.toSet()); + int numRetries = 0; + while (numRetries++ < maxTries && !UrlsOfUnloadedSegments.isEmpty()) { + UrlsOfUnloadedSegments = ImmutableSet.copyOf(Sets.filter(UrlsOfUnloadedSegments, input -> { try { - if (!setOfUrls.isEmpty()) { - Thread.sleep(passiveWaitTimeMs); - } - } catch (InterruptedException e) { - Thread.interrupted(); - Throwables.propagate(e); + String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input); + LOG.debug("Checking segment [{}] response is [{}]", input, result); + return Strings.isNullOrEmpty(result); + } catch (IOException e) { + LOG.error(String.format("Error while checking URL [%s]", input), e); + return true; } + })); + + try { + if (!UrlsOfUnloadedSegments.isEmpty()) { + Thread.sleep(passiveWaitTimeMs); + } + } catch (InterruptedException e) { + Thread.interrupted(); + Throwables.propagate(e); } - if (!setOfUrls.isEmpty()) { - // We are not Throwing an exception since it might be a transient issue that is blocking loading - console.printError(String.format( - "Wait time exhausted and we have [%s] out of [%s] segments not loaded yet", - setOfUrls.size(), segmentList.size() - )); - } + } + if (!UrlsOfUnloadedSegments.isEmpty()) { + // We are not Throwing an exception since it might be a transient issue that is blocking loading + console.printError(String.format( + "Wait time exhausted and we have [%s] out of [%s] segments not loaded yet", + UrlsOfUnloadedSegments.size(), segments.size() + )); + } + return UrlsOfUnloadedSegments.size(); } @VisibleForTesting @@ -503,7 +461,7 @@ public void commitDropTable(Table table, boolean deleteData) throws MetaExceptio if (deleteData == true) { LOG.info("Dropping with purge all the data for data source {}", dataSourceName); List dataSegmentList = DruidStorageHandlerUtils - .getDataSegmentList(connector, druidMetadataStorageTablesConfig, dataSourceName); + .getDataSegmentList(getConnector(), getDruidMetadataStorageTablesConfig(), dataSourceName); if (dataSegmentList.isEmpty()) { LOG.info("Nothing to delete for data source {}", dataSourceName); return; @@ -517,7 +475,7 @@ public void commitDropTable(Table table, boolean deleteData) throws MetaExceptio } } if (DruidStorageHandlerUtils - .disableDataSource(connector, druidMetadataStorageTablesConfig, dataSourceName)) { + .disableDataSource(getConnector(), getDruidMetadataStorageTablesConfig(), dataSourceName)) { LOG.info("Successfully dropped druid data source {}", dataSourceName); } } @@ -529,7 +487,7 @@ public void commitInsertTable(Table table, boolean overwrite) throws MetaExcepti if (MetaStoreUtils.isExternalTable(table)) { throw new MetaException("Cannot insert data into external table backed by Druid"); } - this.publishSegments(table, overwrite); + this.loadDruidSegments(table, overwrite); } @Override @@ -602,6 +560,69 @@ private Path getStagingWorkingDir() { return new Path(getRootWorkingDir(), makeStagingName()); } + private MetadataStorageTablesConfig getDruidMetadataStorageTablesConfig() { + if (druidMetadataStorageTablesConfig != null) { + return druidMetadataStorageTablesConfig; + } + final String base = HiveConf + .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_BASE); + druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(base); + return druidMetadataStorageTablesConfig; + } + + private SQLMetadataConnector getConnector() { + if (connector != null) { + return connector; + } + + final String dbType = HiveConf + .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE); + final String 
username = HiveConf + .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME); + final String password = HiveConf + .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD); + final String uri = HiveConf + .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI); + + + final Supplier storageConnectorConfigSupplier = Suppliers.ofInstance( + new MetadataStorageConnectorConfig() { + @Override + public String getConnectURI() { + return uri; + } + + @Override + public String getUser() { + return Strings.emptyToNull(username); + } + + @Override + public String getPassword() { + return Strings.emptyToNull(password); + } + }); + if (dbType.equals("mysql")) { + connector = new MySQLConnector(storageConnectorConfigSupplier, + Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()) + ); + } else if (dbType.equals("postgresql")) { + connector = new PostgreSQLConnector(storageConnectorConfigSupplier, + Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()) + ); + + } else if (dbType.equals("derby")) { + connector = new DerbyConnector(new DerbyMetadataStorage(storageConnectorConfigSupplier.get()), + storageConnectorConfigSupplier, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()) + ); + } + else { + throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType)); + } + + return connector; + } + @VisibleForTesting protected String makeStagingName() { return ".staging-".concat(getUniqueId().replace(":", "")); diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index fbceaac..8cf5d55 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -83,7 +83,6 @@ import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.ResultIterator; import org.skife.jdbi.v2.StatementContext; -import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -101,7 +100,9 @@ import java.net.UnknownHostException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -246,7 +247,7 @@ public static String getURL(HttpClient client, URL url) throws IOException { * * @throws IOException can be for the case we did not produce data. 
*/ - public static List getPublishedSegments(Path taskDir, Configuration conf) + public static List getCreatededSegments(Path taskDir, Configuration conf) throws IOException { ImmutableList.Builder publishedSegmentsBuilder = ImmutableList.builder(); FileSystem fs = taskDir.getFileSystem(conf); @@ -373,17 +374,34 @@ public static boolean disableDataSource(SQLMetadataConnector connector, return true; } - public static void publishSegments(final SQLMetadataConnector connector, + /** + * First computes the segments timeline to accommodate new segments for insert into case + * Then moves segments to druid deep storage with updated metadata/version + * ALL IS DONE IN ONE TRANSACTION + * + * @param connector DBI connector to commit + * @param metadataStorageTablesConfig Druid metadata tables definitions + * @param dataSource Druid datasource name + * @param segments List of segments to move and commit to metadata + * @param overwrite if it is an insert overwrite + * @param conf Configuration + * @param dataSegmentPusher segment pusher + * + * @return List of successfully published Druid segments. + * This list has the updated versions and metadata about segments after move and timeline sorting + * + * @throws CallbackFailedException + */ + public static List publishSegmentsAndCommit(final SQLMetadataConnector connector, final MetadataStorageTablesConfig metadataStorageTablesConfig, final String dataSource, final List segments, boolean overwrite, - String segmentDirectory, Configuration conf, DataSegmentPusher dataSegmentPusher ) throws CallbackFailedException { - connector.getDBI().inTransaction( - (TransactionCallback) (handle, transactionStatus) -> { + return connector.getDBI().inTransaction( + (handle, transactionStatus) -> { // We create the timeline for the existing and new segments VersionedIntervalTimeline timeline; if (overwrite) { @@ -397,7 +415,7 @@ public static void publishSegments(final SQLMetadataConnector connector, // Append Mode if (segments.isEmpty()) { // If there are no new segments, we can just bail out - return null; + return Collections.EMPTY_LIST; } // Otherwise, build a timeline of existing segments in metadata storage Interval indexedInterval = JodaUtils @@ -504,7 +522,7 @@ public static void publishSegments(final SQLMetadataConnector connector, } batch.execute(); - return null; + return finalSegmentsToPublish; } ); } diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java index 0b13a08..6f7fc78 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java @@ -323,10 +323,9 @@ public void testCommitInsertOverwriteTable() throws MetaException, IOException { .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), new Interval(100, 150), "v0", new LinearShardSpec(0))); DruidStorageHandlerUtils - .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, - taskDirPath.toString(), config, dataSegmentPusher ); @@ -377,10 +376,9 @@ public void testCommitMultiInsertOverwriteTable() throws MetaException, IOExcept .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), new Interval(100, 150), "v0", new LinearShardSpec(0))); DruidStorageHandlerUtils - 
.publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, - taskDirPath.toString(), config, dataSegmentPusher ); @@ -522,10 +520,9 @@ public void testCommitInsertIntoTable() throws MetaException, IOException { DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); DruidStorageHandlerUtils - .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, - taskDirPath.toString(), config, dataSegmentPusher ); @@ -576,10 +573,9 @@ public void testInsertIntoAppendOneMorePartition() throws MetaException, IOExcep .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(), new Interval(100, 150), "v0", new LinearShardSpec(0))); DruidStorageHandlerUtils - .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, - taskDirPath.toString(), config, dataSegmentPusher ); @@ -630,10 +626,9 @@ public void testCommitInsertIntoWhenDestinationSegmentFileExist() pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY))); DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); DruidStorageHandlerUtils - .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, - taskDirPath.toString(), config, dataSegmentPusher ); @@ -699,10 +694,9 @@ public void testCommitInsertIntoWithConflictingIntervalSegment() pusherConfig.setStorageDirectory(taskDirPath.toString()); DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); DruidStorageHandlerUtils - .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, - taskDirPath.toString(), config, dataSegmentPusher ); @@ -738,10 +732,9 @@ public void testCommitInsertIntoWithNonExtendableSegment() throws MetaException, pusherConfig.setStorageDirectory(taskDirPath.toString()); DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER); DruidStorageHandlerUtils - .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, existingSegments, true, - taskDirPath.toString(), config, dataSegmentPusher ); diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java index af75bfb..465c512 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java @@ -187,7 +187,7 @@ public DruidWritable apply(@Nullable ImmutableMap input } druidRecordWriter.close(false); List dataSegmentList = DruidStorageHandlerUtils - .getPublishedSegments(segmentDescriptroPath, config); + .getCreatededSegments(segmentDescriptroPath, 
config); Assert.assertEquals(1, dataSegmentList.size()); File tmpUnzippedSegmentDir = temporaryFolder.newFolder(); new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir); diff --git a/itests/pom.xml b/itests/pom.xml index 3bf29f9..a782cd2 100644 --- a/itests/pom.xml +++ b/itests/pom.xml @@ -46,6 +46,7 @@ hive-jmh hive-unit-hadoop2 hive-minikdc + qtest-druid diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml new file mode 100644 index 0000000..d1f26cb --- /dev/null +++ b/itests/qtest-druid/pom.xml @@ -0,0 +1,278 @@ + + + + + + hive-it + org.apache.hive + 3.0.0-SNAPSHOT + ../pom.xml + + 4.0.0 + + hive-it-druid + + jar + Hive Integration - QFile Druid Tests + + + + + ../.. + 2.11.0 + 1.19.3 + 9.3.19.v20170502 + 10.11.1.1 + 16.0.1 + + + + io.druid + druid-services + ${druid.version} + + + jersey-server + com.sun.jersey + + + + + io.druid + druid-server + ${druid.version} + + + jersey-server + com.sun.jersey + + + jersey-servlet + com.sun.jersey + + + jersey-core + com.sun.jersey + + + + + com.google.guava + guava + ${druid.guava.version} + + + org.apache.hadoop + hadoop-client + + + com.google.inject + guice + + + com.google.inject.extensions + guice-servlet + + + io.netty + netty + + + com.google.guava + guava + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + + jersey-servlet + com.sun.jersey + + + jersey-core + com.sun.jersey + + + jersey-client + com.sun.jersey + + + + + io.druid.extensions + druid-hdfs-storage + ${druid.version} + + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + + + org.apache.logging.log4j + log4j-core + ${log4j2.version} + + + org.apache.curator + curator-framework + ${druid.curator.version} + + + org.apache.curator + curator-client + ${druid.curator.version} + + + org.apache.curator + curator-x-discovery + ${druid.curator.version} + + + org.apache.curator + curator-recipes + ${druid.curator.version} + + + com.sun.jersey + jersey-bundle + ${druid.jersey.version} + + + org.eclipse.jetty + jetty-server + ${druid.jetty.version} + + + org.eclipse.jetty + jetty-servlet + ${druid.jetty.version} + + + org.eclipse.jetty + jetty-servlets + ${druid.jetty.version} + + + org.eclipse.jetty + jetty-proxy + ${druid.jetty.version} + + + org.eclipse.jetty + jetty-util + ${druid.jetty.version} + + + org.eclipse.jetty + jetty-security + ${druid.jetty.version} + + + org.apache.derby + derbynet + ${druid.derby.version} + + + org.apache.derby + derbyclient + ${druid.derby.version} + + + org.apache.derby + derby + ${druid.derby.version} + + + + junit + junit + ${junit.version} + test + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven.shade.plugin.version} + + + + package + + shade + + + + + false + false + + + io.druid.cli.Main + + + + + + junit:* + jmock:* + *:xml-apis + org.apache.maven:lib:tests + javax.ws.rs:jsr311-api + *:javax.el-api + *:jsp-api* + org.apache.logging.log4j:log4j-1.2-api + org.apache.hive:hive-exec + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + + \ No newline at end of file diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/DruidNode.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/DruidNode.java new file mode 100644 index 0000000..1911144 --- /dev/null +++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/DruidNode.java @@ -0,0 +1,26 @@ +package org.apache.hive.druid; + +import java.io.Closeable; 
+import java.io.IOException; + +public abstract class DruidNode implements Closeable{ + + private final String nodeType; + + public DruidNode(String nodeId) {this.nodeType = nodeId;} + + final public String getNodeType() { + return nodeType; + } + + /** + * starts the druid node + */ + public abstract void start() throws IOException; + + /** + * @return true if the process is working + */ + public abstract boolean isAlive(); + +} diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java new file mode 100644 index 0000000..f81a0ca --- /dev/null +++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.druid; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ForkingDruidNode extends DruidNode { + private final static String DEFAULT_JAVA_CMD = "java"; + + private final static Logger log = LoggerFactory.getLogger(ForkingDruidNode.class); + + private final String classpath; + + private final Map properties; + + private final List jvmArgs; + + private final File logLocation; + + private final File logFile; + + private final String javaCmd; + + private final ProcessBuilder processBuilder = new ProcessBuilder(); + + private Process druidProcess = null; + + private Boolean started = false; + + private final List allowedPrefixes = Lists.newArrayList( + "com.metamx", + "druid", + "io.druid", + "java.io.tmpdir", + "hadoop" + ); + + public ForkingDruidNode(String nodeType, + String extraClasspath, + Map properties, + List jvmArgs, + File logLocation, + String javaCmd + ) { + super(nodeType); + + final List command = Lists.newArrayList(); + this.classpath = Strings.isNullOrEmpty(extraClasspath) + ? System.getProperty("java.class.path") + : extraClasspath; + this.properties = properties == null ? new HashMap<>() : properties; + this.jvmArgs = Preconditions.checkNotNull(jvmArgs); + this.logLocation = logLocation == null ? new File("/tmp/druid") : logLocation; + if (!this.logLocation.exists()) { + this.logLocation.mkdirs(); + } + + this.javaCmd = javaCmd == null ? 
DEFAULT_JAVA_CMD : javaCmd; + + logFile = new File(this.logLocation, getNodeType() + ".log"); + // set the log stream + processBuilder.redirectErrorStream(true); + processBuilder.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile)); + command.add(this.javaCmd); + command.addAll(this.jvmArgs); + command.add("-server"); + command.add("-cp"); + command.add(classpath); + + // inject properties from the main App that matches allowedPrefix + for (String propName : System.getProperties().stringPropertyNames()) { + for (String allowedPrefix : allowedPrefixes) { + if (propName.startsWith(allowedPrefix)) { + command.add( + String.format( + "-D%s=%s", + propName, + System.getProperty(propName) + ) + ); + } + } + } + this.properties + .forEach((key, value) -> command.add(String.format("-D%s=%s", key, value))); + command.addAll(Lists.newArrayList("io.druid.cli.Main", "server", getNodeType())); + processBuilder.command(command); + log.info("Creating forking druid node with " + String.join(" ", processBuilder.command())); + } + + @Override + public void start() throws IOException { + synchronized (started) { + if (started == false) { + druidProcess = processBuilder.start(); + started = true; + } + log.info("Started " + getNodeType()); + } + } + + @Override + public boolean isAlive() { + synchronized (started) { + return started && druidProcess != null && druidProcess.isAlive(); + } + } + + @Override + public void close() throws IOException { + synchronized (started) { + if (druidProcess != null && druidProcess.isAlive()) { + druidProcess.destroy(); + } + try { + log.info("Waiting for " + getNodeType()); + if (druidProcess.waitFor(5000, TimeUnit.MILLISECONDS)) { + log.info(String.format("Shutdown completed for node [%s]", getNodeType())); + } else { + log.info(String.format("Waiting to shutdown node [%s] exhausted shutting down forcibly", getNodeType())); + druidProcess.destroyForcibly(); + } + } catch (InterruptedException e) { + Thread.interrupted(); + Throwables.propagate(e); + } + } + } +} diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java new file mode 100644 index 0000000..71259dc --- /dev/null +++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.druid; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * This class has the hooks to start and stop the external Druid Nodes + */ +public class MiniDruidCluster extends AbstractService { + private static final Logger log = LoggerFactory.getLogger(MiniDruidCluster.class); + + private static final String COMMON_DRUID_JVM_PROPERTIES = "-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info"; + + private static final List<String> HISTORICAL_JVM_CONF = Arrays + .asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xmx512m", "-Xms512m", + COMMON_DRUID_JVM_PROPERTIES + ); + + private static final List<String> COORDINATOR_JVM_CONF = Arrays + .asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m", + COMMON_DRUID_JVM_PROPERTIES + ); + + private static final Map<String, String> COMMON_DRUID_CONF = ImmutableMap.of( + "druid.metadata.storage.type", "derby" + ); + + private static final Map<String, String> COMMON_DRUID_HISTORICAL = ImmutableMap.of( + "druid.processing.buffer.sizeBytes", "213870912", + "druid.processing.numThreads", "2", + "druid.server.maxSize", "130000000000" + ); + + private static final Map<String, String> COMMON_COORDINATOR_INDEXER = ImmutableMap + .of( + "druid.indexer.logs.type", "file", + "druid.coordinator.asOverlord.enabled", "true", + "druid.coordinator.asOverlord.overlordService", "druid/overlord", + "druid.coordinator.period", "PT10S", + "druid.manager.segments.pollDuration", "PT10S" + ); + + private final DruidNode historical; + + private final DruidNode broker; + + // Coordinator is running as Overlord as well. 
+ private final DruidNode coordinator; + + private final List<DruidNode> druidNodes; + + private final File dataDirectory; + + private final File logDirectory; + + public MiniDruidCluster(String name) { + this(name, "/tmp/miniDruid/log", "/tmp/miniDruid/data", 2181, null); + } + + + public MiniDruidCluster(String name, String logDir, String dataDir, Integer zookeeperPort, String classpath) { + super(name); + this.dataDirectory = new File(dataDir, "druid-data"); + this.logDirectory = new File(logDir); + try { + + if (dataDirectory.exists()) { + // need to clean the data directory to ensure that there is no interference from old runs + // Cleaning happens here to allow debugging in case tests fail + // we don't have to clean the logs since they are opened in append mode + log.info("Cleaning the druid-data directory [{}]", dataDirectory.getAbsolutePath()); + FileUtils.deleteDirectory(dataDirectory); + } else { + log.info("Creating the druid-data directory [{}]", dataDirectory.getAbsolutePath()); + dataDirectory.mkdirs(); + } + } catch (IOException e) { + log.error("Failed to clean data directory"); + Throwables.propagate(e); + } + String derbyURI = String + .format("jdbc:derby://localhost:1527/%s/druid_derby/metadata.db;create=true", + dataDirectory.getAbsolutePath() + ); + String segmentsCache = String + .format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]", + dataDirectory.getAbsolutePath() + ); + String indexingLogDir = new File(logDirectory, "indexer-log").getAbsolutePath(); + + ImmutableMap.Builder<String, String> coordinatorMapBuilder = new ImmutableMap.Builder<>(); + ImmutableMap.Builder<String, String> historicalMapBuilder = new ImmutableMap.Builder<>(); + + Map<String, String> coordinatorProperties = coordinatorMapBuilder.putAll(COMMON_DRUID_CONF) + .putAll(COMMON_COORDINATOR_INDEXER) + .put("druid.metadata.storage.connector.connectURI", derbyURI) + .put("druid.indexer.logs.directory", indexingLogDir) + .put("druid.zk.service.host", "localhost:" + zookeeperPort) + .put("druid.coordinator.startDelay", "PT1S") + .build(); + Map<String, String> historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF) + .putAll(COMMON_DRUID_HISTORICAL) + .put("druid.zk.service.host", "localhost:" + zookeeperPort) + .put("druid.segmentCache.locations", segmentsCache) + .put("druid.storage.storageDirectory", getDeepStorageDir()) + .put("druid.storage.type", "hdfs") + .build(); + coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties, + COORDINATOR_JVM_CONF, + logDirectory, null + ); + historical = new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF, + logDirectory, null + ); + broker = new ForkingDruidNode("broker", classpath, historicalProperties, HISTORICAL_JVM_CONF, + logDirectory, null + ); + druidNodes = Arrays.asList(coordinator, historical, broker); + + } + + @Override + protected void serviceStart() throws Exception { + druidNodes.stream().forEach(node -> { + try { + node.start(); + } catch (IOException e) { + log.error("Failed to start node " + node.getNodeType() + + ". Consequently the cluster will be destroyed"); + druidNodes.stream().filter(node1 -> node1.isAlive()).forEach(nodeToStop -> { + try { + log.info("Stopping Node " + nodeToStop.getNodeType()); + nodeToStop.close(); + } catch (IOException e1) { + log.error("Error while stopping " + nodeToStop.getNodeType(), e1); + } + }); + Throwables.propagate(e); + } + }); + } + + @Override + protected void serviceStop() throws Exception { + druidNodes.stream().forEach(node -> { + try { + node.close(); + } catch (IOException e) { + // nothing that 
we can really do about it + log.error(String.format("Failed to stop druid node [%s]", node.getNodeType()), e); + } + }); + } + + + public String getMetadataURI() { + return String.format("jdbc:derby://localhost:1527/%s/druid_derby/metadata.db", + dataDirectory.getAbsolutePath() + ); + } + + public String getDeepStorageDir() { + return dataDirectory.getAbsolutePath() + File.separator + "deep-storage"; + } +} diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml index 7f7d5f3..7cccd3b 100644 --- a/itests/qtest/pom.xml +++ b/itests/qtest/pom.xml @@ -383,6 +383,26 @@ + + org.apache.hive + hive-it-druid + ${project.version} + test + + + com.sun.jersey + jersey-bundle + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + @@ -428,7 +448,7 @@ - + diff --git a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidCliDriver.java new file mode 100644 index 0000000..fa75d65 --- /dev/null +++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidCliDriver.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.cli; + +import org.apache.hadoop.hive.cli.control.CliAdapter; +import org.apache.hadoop.hive.cli.control.CliConfigs; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.File; +import java.util.List; + +@RunWith(Parameterized.class) +public class TestMiniDruidCliDriver { + + static CliAdapter adapter = new CliConfigs.MiniDruidCliConfig().getCliAdapter(); + + @Parameters(name = "{0}") + public static List getParameters() throws Exception { + return adapter.getParameters(); + } + + @ClassRule + public static TestRule cliClassRule = adapter.buildClassRule(); + + @Rule + public TestRule cliTestRule = adapter.buildTestRule(); + + private String name; + private File qfile; + + public TestMiniDruidCliDriver(String name, File qfile) { + this.name = name; + this.qfile = qfile; + } + + @Test + public void testCliDriver() throws Exception { + adapter.runTest(name, qfile); + } + +} diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index cca1055..ce28a0c 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1593,3 +1593,7 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\ spark.perf.disabled.query.files=query14.q,\ query64.q + +druid.query.files=druidmini_test1.q,\ + druidmini_test_insert.q + diff --git a/itests/util/pom.xml b/itests/util/pom.xml index 16118b5..6dfe69c 100644 --- a/itests/util/pom.xml +++ b/itests/util/pom.xml @@ -93,6 +93,12 @@ org.apache.hive hive-metastore ${project.version} + + + guice + com.google.inject + + org.apache.hive @@ -126,11 +132,23 @@ org.apache.hadoop hadoop-common ${hadoop.version} + + + guice + com.google.inject + + org.apache.hadoop hadoop-mapreduce-client-core ${hadoop.version} + + + guice + com.google.inject + + org.apache.hbase @@ -192,5 +210,24 @@ ${hbase.version} tests + + org.apache.hive + hive-it-druid + ${project.version} + + + com.sun.jersey + jersey-bundle + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 438d296..267e31c 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -163,6 +163,29 @@ public MiniLlapCliConfig() { } } + public static class MiniDruidCliConfig extends AbstractCliConfig { + public MiniDruidCliConfig() { + super(CoreCliDriver.class); + try { + setQueryDir("ql/src/test/queries/clientpositive"); + + includesFrom(testConfigProps, "druid.query.files"); + + setResultsDir("ql/src/test/results/clientpositive/druid"); + setLogDir("itests/qtest/target/tmp/log"); + + setInitScript("q_test_druid_init.sql"); + setCleanupScript("q_test_cleanup_druid.sql"); + setHiveConfDir(""); + setClusterType(MiniClusterType.druid); + setMetastoreType(MetastoreType.sql); + setFsType(QTestUtil.FsType.hdfs); + } catch (Exception e) { + throw new RuntimeException("can't construct cliconfig", e); + } + } + } + public static class MiniLlapLocalCliConfig extends AbstractCliConfig { public MiniLlapLocalCliConfig() { diff --git 
a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 36ad581..e1c6562 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -96,6 +96,7 @@ import org.apache.hadoop.hive.common.io.SortPrintStream; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hive.druid.MiniDruidCluster; import org.apache.hadoop.hive.llap.LlapItUtils; import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; import org.apache.hadoop.hive.llap.io.api.LlapProxy; @@ -200,6 +201,8 @@ private final String initScript; private final String cleanupScript; + private MiniDruidCluster druidCluster; + public interface SuiteAddTestFunctor { public void addTestToSuite(TestSuite suite, Object setup, String tName); } @@ -363,6 +366,17 @@ public void initConf() throws Exception { conf.set(confEntry.getKey(), clusterSpecificConf.get(confEntry.getKey())); } } + if (druidCluster != null) { + final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir())); + fs.mkdirs(druidDeepStorage); + conf.set("hive.druid.storage.storageDirectory", druidDeepStorage.toUri().getPath()); + conf.set("hive.druid.metadata.db.type", "derby"); + conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI()); + final Path scratchDir = fs + .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); + fs.mkdirs(scratchDir); + conf.set("hive.druid.working.directory", scratchDir.toUri().getPath()); + } } private void setFsRelatedProperties(HiveConf conf, boolean isLocalFs, FileSystem fs) { @@ -438,7 +452,8 @@ private void createRemoteDirs() { private enum CoreClusterType { MR, TEZ, - SPARK + SPARK, + DRUID } public enum FsType { @@ -456,7 +471,8 @@ private void createRemoteDirs() { miniSparkOnYarn(CoreClusterType.SPARK, FsType.hdfs), llap(CoreClusterType.TEZ, FsType.hdfs), llap_local(CoreClusterType.TEZ, FsType.local), - none(CoreClusterType.MR, FsType.local); + none(CoreClusterType.MR, FsType.local), + druid(CoreClusterType.DRUID, FsType.hdfs); private final CoreClusterType coreClusterType; @@ -491,6 +507,8 @@ public static MiniClusterType valueForString(String type) { return llap; } else if (type.equals("llap_local")) { return llap_local; + } else if (type.equals("druid")) { + return druid; } else { return none; } @@ -645,6 +663,15 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws mr = shims.getMiniSparkCluster(conf, 2, uriString, 1); } else if (clusterType == MiniClusterType.mr) { mr = shims.getMiniMrCluster(conf, 2, uriString, 1); + } else if (clusterType == MiniClusterType.druid) { + final String tempDir = System.getProperty("test.tmp.dir"); + druidCluster = new MiniDruidCluster("mini-druid", getLogDirectory(), tempDir, + setup.zkPort, Utilities.jarFinderGetJar(MiniDruidCluster.class) + ); + druidCluster.init(conf); + final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir())); + fs.mkdirs(druidDeepStorage); + druidCluster.start(); } } @@ -657,6 +684,10 @@ public void shutdown() throws Exception { if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { SessionState.get().getTezSession().destroy(); } + if (druidCluster != null) { + druidCluster.stop(); + druidCluster = null; + } setup.tearDown(); if (sparkSession != null) { try { diff --git a/pom.xml b/pom.xml index 1682f47..92d496b 100644 
--- a/pom.xml +++ b/pom.xml @@ -137,7 +137,7 @@ 3.2 1.5.4 1.4 - 10.10.2.0 + 10.11.1.1 3.1.0 0.1.2 0.10.1 @@ -168,7 +168,7 @@ 3.0.1 1.1 9.3.8.v20160314 - 1.14 + 1.19 2.22.2 2.12 diff --git a/ql/src/test/queries/clientpositive/druidmini_test1.q b/ql/src/test/queries/clientpositive/druidmini_test1.q new file mode 100644 index 0000000..630e617 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidmini_test1.q @@ -0,0 +1,121 @@ +CREATE TABLE druid_table +STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' +TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE") +AS +SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp1 IS NOT NULL; + +-- Time Series Query +explain select count(*) FROM druid_table; +SELECT count(*) FROM druid_table; + + +EXPLAIN SELECT floor_year(`__time`), SUM(cfloat), SUM(cdouble), SUM(ctinyint), SUM(csmallint),SUM(cint), SUM(cbigint) +FROM druid_table GROUP BY floor_year(`__time`); + +SELECT floor_year(`__time`), SUM(cfloat), SUM(cdouble), SUM(ctinyint), SUM(csmallint),SUM(cint), SUM(cbigint) +FROM druid_table GROUP BY floor_year(`__time`); + +EXPLAIN SELECT floor_year(`__time`), MIN(cfloat), MIN(cdouble), MIN(ctinyint), MIN(csmallint),MIN(cint), MIN(cbigint) +FROM druid_table GROUP BY floor_year(`__time`); + +SELECT floor_year(`__time`), MIN(cfloat), MIN(cdouble), MIN(ctinyint), MIN(csmallint),MIN(cint), MIN(cbigint) +FROM druid_table GROUP BY floor_year(`__time`); + + +EXPLAIN SELECT floor_year(`__time`), MAX(cfloat), MAX(cdouble), MAX(ctinyint), MAX(csmallint),MAX(cint), MAX(cbigint) +FROM druid_table GROUP BY floor_year(`__time`); + +SELECT floor_year(`__time`), MAX(cfloat), MAX(cdouble), MAX(ctinyint), MAX(csmallint),MAX(cint), MAX(cbigint) +FROM druid_table GROUP BY floor_year(`__time`); + + +-- Group By + + +EXPLAIN SELECT cstring1, SUM(cdouble) as s FROM druid_table GROUP BY cstring1 ORDER BY s ASC LIMIT 10; + +SELECT cstring1, SUM(cdouble) as s FROM druid_table GROUP BY cstring1 ORDER BY s ASC LIMIT 10; + + +EXPLAIN SELECT cstring2, MAX(cdouble) FROM druid_table GROUP BY cstring2 ORDER BY cstring2 ASC LIMIT 10; + +SELECT cstring2, MAX(cdouble) FROM druid_table GROUP BY cstring2 ORDER BY cstring2 ASC LIMIT 10; + + +-- TIME STUFF + +EXPLAIN +SELECT `__time` +FROM druid_table ORDER BY `__time` ASC LIMIT 10; + +SELECT `__time` +FROM druid_table ORDER BY `__time` ASC LIMIT 10; + +EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` < '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10; + + +SELECT `__time` +FROM druid_table +WHERE `__time` < '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10; + + +EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10; + + +SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10; + + +EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' + AND `__time` < '2011-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10; + + +SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' + AND `__time` < '2011-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10; + + +EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` BETWEEN 
'1968-01-01 00:00:00' AND '1970-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10; + + +SELECT `__time` +FROM druid_table +WHERE `__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10; + + +EXPLAIN +SELECT `__time` +FROM druid_table +WHERE (`__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00') + OR (`__time` BETWEEN '1968-02-01 00:00:00' AND '1970-04-01 00:00:00') ORDER BY `__time` ASC LIMIT 10; + + +SELECT `__time` +FROM druid_table +WHERE (`__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00') + OR (`__time` BETWEEN '1968-02-01 00:00:00' AND '1970-04-01 00:00:00') ORDER BY `__time` ASC LIMIT 10; diff --git a/ql/src/test/queries/clientpositive/druidmini_test_insert.q b/ql/src/test/queries/clientpositive/druidmini_test_insert.q new file mode 100644 index 0000000..558e246 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidmini_test_insert.q @@ -0,0 +1,53 @@ +CREATE TABLE druid_alltypesorc +STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' +TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE") +AS +SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp2 IS NOT NULL; + +SELECT COUNT(*) FROM druid_alltypesorc; + +INSERT INTO TABLE druid_alltypesorc +SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp1 IS NOT NULL; + + +SELECT COUNT(*) FROM druid_alltypesorc; + +INSERT OVERWRITE TABLE druid_alltypesorc +SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp1 IS NOT NULL; + +SELECT COUNT(*) FROM druid_alltypesorc; + +DROP TABLE druid_alltypesorc; diff --git a/ql/src/test/results/clientpositive/druid/druidmini_test1.q.out b/ql/src/test/results/clientpositive/druid/druidmini_test1.q.out new file mode 100644 index 0000000..9577e72 --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidmini_test1.q.out @@ -0,0 +1,748 @@ +PREHOOK: query: CREATE TABLE druid_table +STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' +TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE") +AS +SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp1 IS NOT NULL +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_table +POSTHOOK: query: CREATE TABLE druid_table +STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' +TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE") +AS +SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp1 IS NOT NULL +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: database:default +POSTHOOK: Output: 
default@druid_table +POSTHOOK: Lineage: druid_table.__time EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ] +POSTHOOK: Lineage: druid_table.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: druid_table.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ] +POSTHOOK: Lineage: druid_table.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ] +POSTHOOK: Lineage: druid_table.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: druid_table.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: druid_table.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: druid_table.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ] +POSTHOOK: Lineage: druid_table.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: druid_table.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ] +POSTHOOK: Lineage: druid_table.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ] +PREHOOK: query: explain select count(*) FROM druid_table +PREHOOK: type: QUERY +POSTHOOK: query: explain select count(*) FROM druid_table +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: druid_table + properties: + druid.query.json {"queryType":"timeseries","dataSource":"default.druid_table","descending":false,"granularity":"all","aggregations":[{"type":"count","name":"$f0"}],"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],"context":{"skipEmptyBuckets":true}} + druid.query.type timeseries + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: $f0 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + ListSink + +PREHOOK: query: SELECT count(*) FROM druid_table +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT count(*) FROM druid_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +6105 +PREHOOK: query: EXPLAIN SELECT floor_year(`__time`), SUM(cfloat), SUM(cdouble), SUM(ctinyint), SUM(csmallint),SUM(cint), SUM(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT floor_year(`__time`), SUM(cfloat), SUM(cdouble), SUM(ctinyint), SUM(csmallint),SUM(cint), SUM(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: druid_table + properties: + druid.query.json 
{"queryType":"timeseries","dataSource":"default.druid_table","descending":false,"granularity":"year","aggregations":[{"type":"doubleSum","name":"$f1","fieldName":"cfloat"},{"type":"doubleSum","name":"$f2","fieldName":"cdouble"},{"type":"longSum","name":"$f3","fieldName":"ctinyint"},{"type":"longSum","name":"$f4","fieldName":"csmallint"},{"type":"longSum","name":"$f5","fieldName":"cint"},{"type":"longSum","name":"$f6","fieldName":"cbigint"}],"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],"context":{"skipEmptyBuckets":true}} + druid.query.type timeseries + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: __time (type: timestamp with local time zone), $f1 (type: float), $f2 (type: float), $f3 (type: bigint), $f4 (type: bigint), $f5 (type: bigint), $f6 (type: bigint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + ListSink + +PREHOOK: query: SELECT floor_year(`__time`), SUM(cfloat), SUM(cdouble), SUM(ctinyint), SUM(csmallint),SUM(cint), SUM(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT floor_year(`__time`), SUM(cfloat), SUM(cdouble), SUM(ctinyint), SUM(csmallint),SUM(cint), SUM(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1968-12-31 16:00:00.0 US/Pacific -4532.57 3660538.8 -4611 3658030 688783835691 8060200254 +1969-12-31 16:00:00.0 US/Pacific -35057.676 2.3648124E7 -35356 4123059 719285966109 2932345033 +PREHOOK: query: EXPLAIN SELECT floor_year(`__time`), MIN(cfloat), MIN(cdouble), MIN(ctinyint), MIN(csmallint),MIN(cint), MIN(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT floor_year(`__time`), MIN(cfloat), MIN(cdouble), MIN(ctinyint), MIN(csmallint),MIN(cint), MIN(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: druid_table + properties: + druid.query.json {"queryType":"timeseries","dataSource":"default.druid_table","descending":false,"granularity":"year","aggregations":[{"type":"doubleMin","name":"$f1","fieldName":"cfloat"},{"type":"doubleMin","name":"$f2","fieldName":"cdouble"},{"type":"longMin","name":"$f3","fieldName":"ctinyint"},{"type":"longMin","name":"$f4","fieldName":"csmallint"},{"type":"longMin","name":"$f5","fieldName":"cint"},{"type":"longMin","name":"$f6","fieldName":"cbigint"}],"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],"context":{"skipEmptyBuckets":true}} + druid.query.type timeseries + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: __time (type: timestamp with local time zone), $f1 (type: float), $f2 (type: float), $f3 (type: bigint), $f4 (type: bigint), $f5 (type: bigint), $f6 (type: bigint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + ListSink + +PREHOOK: query: SELECT floor_year(`__time`), MIN(cfloat), MIN(cdouble), MIN(ctinyint), MIN(csmallint),MIN(cint), MIN(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) 
+PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT floor_year(`__time`), MIN(cfloat), MIN(cdouble), MIN(ctinyint), MIN(csmallint),MIN(cint), MIN(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1968-12-31 16:00:00.0 US/Pacific -1790.778 -308691.84 -1790 -313425 0 -8577981133 +1969-12-31 16:00:00.0 US/Pacific -964.719 -287404.84 -1051 -292138 -1073279343 -2147311592 +PREHOOK: query: EXPLAIN SELECT floor_year(`__time`), MAX(cfloat), MAX(cdouble), MAX(ctinyint), MAX(csmallint),MAX(cint), MAX(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT floor_year(`__time`), MAX(cfloat), MAX(cdouble), MAX(ctinyint), MAX(csmallint),MAX(cint), MAX(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: druid_table + properties: + druid.query.json {"queryType":"timeseries","dataSource":"default.druid_table","descending":false,"granularity":"year","aggregations":[{"type":"doubleMax","name":"$f1","fieldName":"cfloat"},{"type":"doubleMax","name":"$f2","fieldName":"cdouble"},{"type":"longMax","name":"$f3","fieldName":"ctinyint"},{"type":"longMax","name":"$f4","fieldName":"csmallint"},{"type":"longMax","name":"$f5","fieldName":"cint"},{"type":"longMax","name":"$f6","fieldName":"cbigint"}],"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],"context":{"skipEmptyBuckets":true}} + druid.query.type timeseries + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: __time (type: timestamp with local time zone), $f1 (type: float), $f2 (type: float), $f3 (type: bigint), $f4 (type: bigint), $f5 (type: bigint), $f6 (type: bigint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + ListSink + +PREHOOK: query: SELECT floor_year(`__time`), MAX(cfloat), MAX(cdouble), MAX(ctinyint), MAX(csmallint),MAX(cint), MAX(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT floor_year(`__time`), MAX(cfloat), MAX(cdouble), MAX(ctinyint), MAX(csmallint),MAX(cint), MAX(cbigint) +FROM druid_table GROUP BY floor_year(`__time`) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1968-12-31 16:00:00.0 US/Pacific 62.0 57235.0 62 57235 314088763179 2144274348 +1969-12-31 16:00:00.0 US/Pacific 769.164 1.9565518E7 723 57435 319104152611 4923772860 +PREHOOK: query: EXPLAIN SELECT cstring1, SUM(cdouble) as s FROM druid_table GROUP BY cstring1 ORDER BY s ASC LIMIT 10 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT cstring1, SUM(cdouble) as s FROM druid_table GROUP BY cstring1 ORDER BY s ASC LIMIT 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: druid_table + properties: + druid.query.json 
{"queryType":"groupBy","dataSource":"default.druid_table","granularity":"all","dimensions":[{"type":"default","dimension":"cstring1"}],"limitSpec":{"type":"default","limit":10,"columns":[{"dimension":"$f1","direction":"ascending","dimensionOrder":"numeric"}]},"aggregations":[{"type":"doubleSum","name":"$f1","fieldName":"cdouble"}],"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"]} + druid.query.type groupBy + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: cstring1 (type: string), $f1 (type: float) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + ListSink + +PREHOOK: query: SELECT cstring1, SUM(cdouble) as s FROM druid_table GROUP BY cstring1 ORDER BY s ASC LIMIT 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT cstring1, SUM(cdouble) as s FROM druid_table GROUP BY cstring1 ORDER BY s ASC LIMIT 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1cGVWH7n1QU -596096.7 +821UdmGbkEf4j -14161.827 +00iT08 0.0 +02v8WnLuYDos3Cq 0.0 +yv1js 0.0 +02VRbSC5I 0.0 +014ILGhXxNY7g02hl0Xw 0.0 +02vDyIVT752 0.0 +00PafC7v 0.0 +ytpx1RL8F2I 0.0 +PREHOOK: query: EXPLAIN SELECT cstring2, MAX(cdouble) FROM druid_table GROUP BY cstring2 ORDER BY cstring2 ASC LIMIT 10 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT cstring2, MAX(cdouble) FROM druid_table GROUP BY cstring2 ORDER BY cstring2 ASC LIMIT 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: druid_table + properties: + druid.query.json {"queryType":"groupBy","dataSource":"default.druid_table","granularity":"all","dimensions":[{"type":"default","dimension":"cstring2"}],"limitSpec":{"type":"default","limit":10,"columns":[{"dimension":"cstring2","direction":"ascending","dimensionOrder":"alphanumeric"}]},"aggregations":[{"type":"doubleMax","name":"$f1","fieldName":"cdouble"}],"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"]} + druid.query.type groupBy + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: cstring2 (type: string), $f1 (type: float) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + ListSink + +PREHOOK: query: SELECT cstring2, MAX(cdouble) FROM druid_table GROUP BY cstring2 ORDER BY cstring2 ASC LIMIT 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT cstring2, MAX(cdouble) FROM druid_table GROUP BY cstring2 ORDER BY cstring2 ASC LIMIT 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +NULL 1.9565518E7 +0AAE3daA78MISbsRsHJrp2PI 0.0 +0amu3m60U20Xa3 -200.0 +0aO3Lwer 0.0 +0aQBRP67JY0gpi 15601.0 +0b1WvXy 0.0 +0b03cuG3B4ASx4es1411336I -7196.0 +0B5S310g 0.0 +0bffMd8KSbW32A8A5 0.0 +0bke07kBhD1s33AV3R1X7j7j 0.0 +PREHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + 
alias: druid_table + properties: + druid.query.json {"queryType":"select","dataSource":"default.druid_table","descending":false,"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],"dimensions":[],"metrics":[],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}} + druid.query.type select + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: __time (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: timestamp with local time zone) + sort order: + + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + TopN Hash Memory Usage: 0.1 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Limit + Number of rows: 10 + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: SELECT `__time` +FROM druid_table ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT `__time` +FROM druid_table ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +PREHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` < '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` < '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: druid_table + properties: + druid.query.json {"queryType":"select","dataSource":"default.druid_table","descending":false,"intervals":["1900-01-01T00:00:00.000/1970-03-01T08:00:00.000"],"dimensions":[],"metrics":[],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}} + druid.query.type select + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: __time (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: timestamp with local time zone) + sort order: + + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL 
Column stats: NONE + TopN Hash Memory Usage: 0.1 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Limit + Number of rows: 10 + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: SELECT `__time` +FROM druid_table +WHERE `__time` < '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT `__time` +FROM druid_table +WHERE `__time` < '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +PREHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: druid_table + properties: + druid.query.json {"queryType":"select","dataSource":"default.druid_table","descending":false,"intervals":["1968-01-01T08:00:00.000/1970-03-01T08:00:00.001"],"dimensions":[],"metrics":[],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}} + druid.query.type select + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: __time (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: timestamp with local time zone) + sort order: + + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + TopN Hash Memory Usage: 0.1 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Limit + Number of rows: 10 + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +PREHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' + AND `__time` < '2011-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' + AND `__time` < '2011-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: druid_table + properties: + druid.query.json {"queryType":"select","dataSource":"default.druid_table","descending":false,"intervals":["1968-01-01T08:00:00.000/1970-03-01T08:00:00.001"],"dimensions":[],"metrics":[],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}} + druid.query.type select + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: __time (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: timestamp with local time zone) + sort order: + + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + TopN Hash Memory Usage: 0.1 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Limit + Number of rows: 10 + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' + AND `__time` < '2011-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was 
here #### +POSTHOOK: query: SELECT `__time` +FROM druid_table +WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' + AND `__time` < '2011-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +PREHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE `__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: druid_table + properties: + druid.query.json {"queryType":"select","dataSource":"default.druid_table","descending":false,"intervals":["1968-01-01T08:00:00.000/1970-01-01T08:00:00.001"],"dimensions":[],"metrics":[],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}} + druid.query.type select + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: __time (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: timestamp with local time zone) + sort order: + + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + TopN Hash Memory Usage: 0.1 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Limit + Number of rows: 10 + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: SELECT `__time` +FROM druid_table +WHERE `__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT `__time` +FROM druid_table +WHERE `__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific 
+1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +PREHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE (`__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00') + OR (`__time` BETWEEN '1968-02-01 00:00:00' AND '1970-04-01 00:00:00') ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT `__time` +FROM druid_table +WHERE (`__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00') + OR (`__time` BETWEEN '1968-02-01 00:00:00' AND '1970-04-01 00:00:00') ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: druid_table + properties: + druid.query.json {"queryType":"select","dataSource":"default.druid_table","descending":false,"intervals":["1968-01-01T08:00:00.000/1970-04-01T08:00:00.001"],"dimensions":[],"metrics":[],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}} + druid.query.type select + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: __time (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: timestamp with local time zone) + sort order: + + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + TopN Hash Memory Usage: 0.1 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: timestamp with local time zone) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Limit + Number of rows: 10 + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 10 Data size: 0 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 10 + Processor Tree: + ListSink + +PREHOOK: query: SELECT `__time` +FROM druid_table +WHERE (`__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00') + OR (`__time` BETWEEN '1968-02-01 00:00:00' AND '1970-04-01 00:00:00') ORDER BY `__time` ASC LIMIT 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT `__time` +FROM druid_table +WHERE (`__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00') + OR (`__time` BETWEEN '1968-02-01 00:00:00' AND '1970-04-01 00:00:00') ORDER BY `__time` ASC LIMIT 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table +#### A masked pattern was here #### +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific +1969-12-31 15:59:00.0 US/Pacific diff --git a/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out b/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out new file mode 100644 index 0000000..7e01b0d --- /dev/null +++ 
b/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out @@ -0,0 +1,150 @@ +PREHOOK: query: CREATE TABLE druid_alltypesorc +STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' +TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE") +AS +SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp2 IS NOT NULL +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_alltypesorc +POSTHOOK: query: CREATE TABLE druid_alltypesorc +STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' +TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE") +AS +SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp2 IS NOT NULL +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_alltypesorc +POSTHOOK: Lineage: druid_alltypesorc.__time EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ] +POSTHOOK: Lineage: druid_alltypesorc.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ] +PREHOOK: query: SELECT COUNT(*) FROM druid_alltypesorc +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: SELECT COUNT(*) FROM druid_alltypesorc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_alltypesorc +#### A masked pattern was here #### +6057 +PREHOOK: query: INSERT INTO TABLE druid_alltypesorc +SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp1 IS NOT NULL +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@druid_alltypesorc +POSTHOOK: query: INSERT INTO TABLE 
druid_alltypesorc +SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp1 IS NOT NULL +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@druid_alltypesorc +PREHOOK: query: SELECT COUNT(*) FROM druid_alltypesorc +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: SELECT COUNT(*) FROM druid_alltypesorc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_alltypesorc +#### A masked pattern was here #### +12162 +PREHOOK: query: INSERT OVERWRITE TABLE druid_alltypesorc +SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp1 IS NOT NULL +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@druid_alltypesorc +POSTHOOK: query: INSERT OVERWRITE TABLE druid_alltypesorc +SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`, + cstring1, + cstring2, + cdouble, + cfloat, + ctinyint, + csmallint, + cint, + cbigint, + cboolean1, + cboolean2 + FROM alltypesorc where ctimestamp1 IS NOT NULL +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@druid_alltypesorc +PREHOOK: query: SELECT COUNT(*) FROM druid_alltypesorc +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: SELECT COUNT(*) FROM druid_alltypesorc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_alltypesorc +#### A masked pattern was here #### +6105 +PREHOOK: query: DROP TABLE druid_alltypesorc +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_alltypesorc +PREHOOK: Output: default@druid_alltypesorc +POSTHOOK: query: DROP TABLE druid_alltypesorc +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_alltypesorc +POSTHOOK: Output: default@druid_alltypesorc
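Note on the expected counts in druidmini_test_insert.q.out: the CTAS keeps only rows with a non-null ctimestamp2 (6057 rows), the INSERT INTO appends the rows with a non-null ctimestamp1 (6105 rows, the same count returned in druidmini_test1.q.out), giving 6057 + 6105 = 12162, and the INSERT OVERWRITE then replaces the datasource so only the 6105 ctimestamp1 rows remain. As a minimal sanity-check sketch (not part of the patch, and assuming the standard alltypesorc test table is loaded and that Druid ingestion at MINUTE query granularity does not merge any rows for these counts), the same numbers can be reproduced directly from the ORC source table:

-- Hypothetical verification queries against the source table, not part of the q-files;
-- their results should line up with the 6057 and 6105 counts in the expected output above,
-- and their sum with the 12162 returned after the INSERT INTO.
SELECT COUNT(*) FROM alltypesorc WHERE ctimestamp2 IS NOT NULL;  -- expected 6057
SELECT COUNT(*) FROM alltypesorc WHERE ctimestamp1 IS NOT NULL;  -- expected 6105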