diff --git common/src/java/org/apache/hadoop/hive/ant/GenHiveTemplate.java common/src/java/org/apache/hadoop/hive/ant/GenHiveTemplate.java
index 4293b7c..5eaea01 100644
--- common/src/java/org/apache/hadoop/hive/ant/GenHiveTemplate.java
+++ common/src/java/org/apache/hadoop/hive/ant/GenHiveTemplate.java
@@ -110,7 +110,12 @@ private Document generateTemplate() throws Exception {
       Element property = appendElement(root, "property", null);
       appendElement(property, "name", confVars.varname);
       appendElement(property, "value", confVars.getDefaultExpr());
-      appendElement(property, "description", normalize(confVars.getDescription()));
+      String validator = confVars.validatorDescription();
+      String description = confVars.getDescription();
+      if (validator != null) {
+        description = validator + ".\n" + description;
+      }
+      appendElement(property, "description", normalize(description));
     }
     return doc;

diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 74bb863..2032b6b 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

@@ -44,6 +45,7 @@ import org.apache.hadoop.hive.conf.Validator.PatternSet;
 import org.apache.hadoop.hive.conf.Validator.RangeValidator;
 import org.apache.hadoop.hive.conf.Validator.StringSet;
+import org.apache.hadoop.hive.conf.Validator.TimeValidator;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -367,9 +369,9 @@
       "JDBC connect string for a JDBC metastore"),
   HMSHANDLERATTEMPTS("hive.hmshandler.retry.attempts", 1,
-      "The number of times to retry a HMSHandler call if there were a connection error"),
-  HMSHANDLERINTERVAL("hive.hmshandler.retry.interval", 1000,
-      "The number of milliseconds between HMSHandler retry attempts"),
+      "The number of times to retry an HMSHandler call if there was a connection error."),
+  HMSHANDLERINTERVAL("hive.hmshandler.retry.interval", "1000ms",
+      new TimeValidator(TimeUnit.MILLISECONDS), "The time between HMSHandler retry attempts on failure."),
   HMSHANDLERFORCERELOADCONF("hive.hmshandler.force.reload.conf", false,
       "Whether to force reloading of the HMSHandler configuration (including\n" +
       "the connection URL, before the next metastore query that accesses the\n" +
@@ -464,10 +466,12 @@
       "for operations like drop-partition (disallow the drop-partition if the user in\n" +
       "question doesn't have permissions to delete the corresponding directory\n" +
       "on the storage)."),
-  METASTORE_EVENT_CLEAN_FREQ("hive.metastore.event.clean.freq", 0L,
-      "Frequency at which timer task runs to purge expired events in metastore(in seconds)."),
-  METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration", 0L,
-      "Duration after which events expire from events table (in seconds)"),
+  METASTORE_EVENT_CLEAN_FREQ("hive.metastore.event.clean.freq", "0s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "Frequency at which timer task runs to purge expired events in metastore."),
+  METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration", "0s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "Duration after which events expire from the events table."),
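The two hunks above work together: a time-typed ConfVar now carries its unit in the value string ("1000ms"), and the template generator prepends the validator's self-description. A minimal sketch of reading such a value through the `getTimeVar` API this patch adds further down (illustrative only; the output unit is the caller's choice):

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;

public class TimeVarExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Any unit accepted by unitFor() may be used in the value string.
    conf.set(HiveConf.ConfVars.HMSHANDLERINTERVAL.varname, "2s");
    // The caller names the unit it wants back; toTime() does the conversion.
    long intervalMs = conf.getTimeVar(
        HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS);
    System.out.println(intervalMs); // 2000
  }
}
```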
METASTORE_EXECUTE_SET_UGI("hive.metastore.execute.setugi", true, "In unsecure mode, setting this property to true will cause the metastore to execute DFS operations using \n" + "the client's reported user and group permissions. Note that this property must be set on \n" + @@ -579,8 +583,9 @@ HIVE_CURRENT_DATABASE("hive.current.database", "", "Database name used by current session. Internal usage only.", true), // for hive script operator - HIVES_AUTO_PROGRESS_TIMEOUT("hive.auto.progress.timeout", 0, - "How long to run autoprogressor for the script/UDTF operators (in seconds).\n" + + HIVES_AUTO_PROGRESS_TIMEOUT("hive.auto.progress.timeout", "0s", + new TimeValidator(TimeUnit.SECONDS), + "How long to run autoprogressor for the script/UDTF operators.\n" + "Set to 0 for forever."), HIVETABLENAME("hive.table.name", "", ""), HIVEPARTITIONNAME("hive.partition.name", "", ""), @@ -689,10 +694,9 @@ "because this may prevent TaskTracker from killing tasks with infinite loops."), HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile", new StringSet("TextFile", "SequenceFile", "RCfile", "ORC"), - "Default file format for CREATE TABLE statement. \n" + - "Options are TextFile, SequenceFile, RCfile and ORC. Users can explicitly override it by CREATE TABLE ... STORED AS [FORMAT]"), + "Default file format for CREATE TABLE statement. Users can explicitly override it by CREATE TABLE ... STORED AS [FORMAT]"), HIVEQUERYRESULTFILEFORMAT("hive.query.result.fileformat", "TextFile", new StringSet("TextFile", "SequenceFile", "RCfile"), - "Default file format for storing result of the query. Allows TextFile, SequenceFile and RCfile"), + "Default file format for storing result of the query."), HIVECHECKFILEFORMAT("hive.fileformat.check", true, "Whether to check file format or not when loading data files"), // default serde for rcfile @@ -719,8 +723,8 @@ "Whether to log the plan's progress every time a job's progress is checked.\n" + "These logs are written to the location specified by hive.querylog.location"), - HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL("hive.querylog.plan.progress.interval", 60000L, - "The interval to wait between logging the plan's progress in milliseconds.\n" + + HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL("hive.querylog.plan.progress.interval", "60000ms", + "The interval to wait between logging the plan's progress.\n" + "If there is a whole number percentage change in the progress of the mappers or the reducers,\n" + "the progress is logged regardless of this value.\n" + "The actual interval will be the ceiling of (this value divided by the value of\n" + @@ -840,8 +844,7 @@ HIVE_ORC_ENCODING_STRATEGY("hive.exec.orc.encoding.strategy", "SPEED", new StringSet("SPEED", "COMPRESSION"), "Define the encoding strategy to use while writing data. Changing this will\n" + "only affect the light weight encoding for integers. This flag will not\n" + - "change the compression level of higher level compression codec (like ZLIB).\n" + - "Possible options are SPEED and COMPRESSION."), + "change the compression level of higher level compression codec (like ZLIB)."), HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false, "If turned on splits generated by orc will include metadata about the stripes in the file. 
This\n" +
@@ -1098,15 +1101,16 @@
       "The Java class (implementing the StatsPublisher interface) that is used by default if hive.stats.dbclass is custom type."),
   HIVE_STATS_DEFAULT_AGGREGATOR("hive.stats.default.aggregator", "",
       "The Java class (implementing the StatsAggregator interface) that is used by default if hive.stats.dbclass is custom type."),
-  HIVE_STATS_JDBC_TIMEOUT("hive.stats.jdbc.timeout", 30,
-      "Timeout value (number of seconds) used by JDBC connection and statements."),
+  HIVE_STATS_JDBC_TIMEOUT("hive.stats.jdbc.timeout", "30s",
+      "Timeout value used by JDBC connection and statements."),
   HIVE_STATS_ATOMIC("hive.stats.atomic", false,
       "whether to update metastore stats only if all stats are available"),
   HIVE_STATS_RETRIES_MAX("hive.stats.retries.max", 0,
       "Maximum number of retries when stats publisher/aggregator got an exception updating intermediate database. \n" +
       "Default is no tries on failures."),
-  HIVE_STATS_RETRIES_WAIT("hive.stats.retries.wait", 3000,
-      "The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by " +
+  HIVE_STATS_RETRIES_WAIT("hive.stats.retries.wait", "3000ms",
+      new TimeValidator(TimeUnit.MILLISECONDS),
+      "The base waiting window before the next retry. The actual wait time is calculated by " +
       "baseWindow * failures + baseWindow * (failures + 1) * (random number between [0.0,1.0])."),
   HIVE_STATS_COLLECT_RAWDATASIZE("hive.stats.collect.rawdatasize", true,
       "should the raw data size be collected when analyzing tables"),
@@ -1217,8 +1221,9 @@
       "The number of times you want to try to get all the locks"),
   HIVE_UNLOCK_NUMRETRIES("hive.unlock.numretries", 10,
       "The number of times you want to retry to do one unlock"),
-  HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60,
-      "The sleep time (in seconds) between various retries"),
+  HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", "60s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "The sleep time between various retries"),
   HIVE_LOCK_MAPRED_ONLY("hive.lock.mapred.only.operation", false,
       "This param is to control whether or not only do lock on queries\n" +
       "that need to execute at least one mapred job."),
@@ -1238,8 +1243,8 @@
   // Transactions
   HIVE_TXN_MANAGER("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager", ""),
-  HIVE_TXN_TIMEOUT("hive.txn.timeout", 300,
-      "time after which transactions are declared aborted if the client has not sent a heartbeat, in seconds."),
+  HIVE_TXN_TIMEOUT("hive.txn.timeout", "300s", new TimeValidator(TimeUnit.SECONDS),
+      "Time after which transactions are declared aborted if the client has not sent a heartbeat."),
   HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000,
       "Maximum number of transactions that can be fetched in one call to open_txns().\n" +
@@ -1253,12 +1258,14 @@
   HIVE_COMPACTOR_WORKER_THREADS("hive.compactor.worker.threads", 0,
       "Number of compactor worker threads to run on this metastore instance."),
-  HIVE_COMPACTOR_WORKER_TIMEOUT("hive.compactor.worker.timeout", 86400L,
-      "Time in seconds, before a given compaction in working state is declared a failure\n" +
+  HIVE_COMPACTOR_WORKER_TIMEOUT("hive.compactor.worker.timeout", "86400s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "Time before a given compaction in working state is declared a failure\n" +
       "and returned to the initiated state."),
-  HIVE_COMPACTOR_CHECK_INTERVAL("hive.compactor.check.interval", 300L,
-      "Time in seconds between checks to see if any partitions need compacted.\n" +
+  HIVE_COMPACTOR_CHECK_INTERVAL("hive.compactor.check.interval", "300s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "Time between checks to see if any partitions need to be compacted.\n" +
       "This should be kept high because each check for compaction requires many calls against the NameNode."),
   HIVE_COMPACTOR_DELTA_NUM_THRESHOLD("hive.compactor.delta.num.threshold", 10,
@@ -1295,7 +1302,7 @@
       "Currently the query should be single sourced not having any subquery and should not have\n" +
       "any aggregations or distincts (which incurs RS), lateral views and joins.\n" +
       "1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only\n" +
-      "2. more    : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)\n" +
+      "2. more    : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)"
   ),
   HIVEFETCHTASKCONVERSIONTHRESHOLD("hive.fetch.task.conversion.threshold", 1073741824L,
       "Input threshold for applying hive.fetch.task.conversion. If target table is native, input length\n" +
@@ -1467,12 +1474,12 @@
       "table. From 0.12 onwards, they are displayed separately. This flag will let you\n" +
       "get old behavior, if desired. See, test-case in patch for HIVE-6689."),
-  HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, Long.MAX_VALUE),
+  HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, null),
       "This number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds between retries. \n" +
       "The default of 30 will keep trying for 30 minutes."),
   HIVE_SERVER2_TRANSPORT_MODE("hive.server2.transport.mode", "binary", new StringSet("binary", "http"),
-      "Server transport mode. \"binary\" or \"http\""),
+      "Transport mode of HiveServer2."),
   // http (over thrift) transport settings
   HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,
@@ -1483,11 +1490,11 @@
       "Minimum number of worker threads when in HTTP mode."),
   HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS("hive.server2.thrift.http.max.worker.threads", 500,
       "Maximum number of worker threads when in HTTP mode."),
-  HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000,
-      "Maximum idle time in milliseconds for a connection on the server when in HTTP mode."),
-  HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME("hive.server2.thrift.http.worker.keepalive.time", 60,
-      "Keepalive time (in seconds) for an idle http worker thread. When number of workers > min workers, " +
-      "excess threads are killed after this time interval."),
+  HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", "1800000ms",
+      "Maximum idle time for a connection on the server when in HTTP mode."),
+  HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME("hive.server2.thrift.http.worker.keepalive.time", "60s",
+      "Keepalive time for an idle http worker thread. When the number of workers exceeds min workers, " +
+      "excess threads are killed after this time interval."),
   // binary transport settings
   HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000,
@@ -1511,22 +1518,24 @@
   HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 500,
       "Maximum number of Thrift worker threads"),
   HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME("hive.server2.thrift.worker.keepalive.time", 60,
-      "Keepalive time (in seconds) for an idle worker thread. When number of workers > min workers, " +
-      "excess threads are killed after this time interval."),
+      "Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, " +
+      "excess threads are killed after this time interval."),
   // Configuration for async thread pool in SessionManager
   HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100,
       "Number of threads in the async thread pool for HiveServer2"),
-  HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10,
-      "Time (in seconds) for which HiveServer2 shutdown will wait for async"),
+  HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", "10s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "Maximum time for which HiveServer2 shutdown will wait for async threads to terminate"),
   HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE("hive.server2.async.exec.wait.queue.size", 100,
       "Size of the wait queue for async thread pool in HiveServer2.\n" +
       "After hitting this limit, the async thread pool will reject new requests."),
-  HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10,
-      "Time (in seconds) that an idle HiveServer2 async thread (from the thread pool) will wait\n" +
-      "for a new task to arrive before terminating"),
-  HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000L,
-      "Time in milliseconds that HiveServer2 will wait,\n" +
-      "before responding to asynchronous calls that use long polling"),
+  HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", "10s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "Time that an idle HiveServer2 async thread (from the thread pool) will wait for a new task\n" +
+      "to arrive before terminating"),
+  HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", "5000ms",
+      new TimeValidator(TimeUnit.MILLISECONDS),
+      "Time that HiveServer2 will wait before responding to asynchronous calls that use long polling"),
   // HiveServer2 auth configuration
   HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE",
@@ -1591,6 +1600,18 @@
   HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,compile",
       "Comma separated list of non-SQL Hive commands users are authorized to execute"),
+  HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "0ms",
+      new TimeValidator(TimeUnit.MILLISECONDS, 3000L, true, null, false),
+      "The check interval for session/operation timeout, which can be disabled by setting it to a zero or negative value."),
+  HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", "0ms",
+      new TimeValidator(TimeUnit.MILLISECONDS),
+      "Session will be closed when it is not accessed for this duration, which can be disabled by setting it to a zero or negative value."),
+  HIVE_SERVER2_IDLE_OPERATION_TIMEOUT("hive.server2.idle.operation.timeout", "0ms",
+      new TimeValidator(TimeUnit.MILLISECONDS),
+      "Operation will be closed when it is not accessed for this duration, which can be disabled by setting it to a zero value.\n" +
+      "  With a positive value, it is checked for operations in terminal state only (FINISHED, CANCELED, CLOSED, ERROR).\n" +
+      "  With a negative value, it is checked for all operations regardless of state."),
+
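The three new session/operation timeout settings above are exercised end-to-end by the TestHiveServer2SessionTimeout added later in this patch. A small sketch of configuring them programmatically with the new `setTimeVar` API (the 3-second values mirror the test, not a recommendation):

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class SessionTimeoutConfigExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Sweep for expired sessions/operations every 3 seconds.
    conf.setTimeVar(ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, 3, TimeUnit.SECONDS);
    // Close operations idle in a terminal state for more than 3 seconds.
    conf.setTimeVar(ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, 3, TimeUnit.SECONDS);
    // setTimeVar stores the unit-qualified string "3sec"; any caller can
    // read it back in whatever unit it needs.
    System.out.println(conf.get(ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL.varname)); // 3sec
    System.out.println(conf.getTimeVar(
        ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS)); // 3000
  }
}
```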
   HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
       "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role",
       "Comma separated list of configuration options which are immutable at runtime"),
@@ -1648,8 +1669,9 @@
       "Enable list bucketing optimizer. Default value is false so that we disable it by default."),

   // Allow TCP Keep alive socket option for for HiveServer or a maximum timeout for the socket.
-  SERVER_READ_SOCKET_TIMEOUT("hive.server.read.socket.timeout", 10,
-      "Timeout for the HiveServer to close the connection if no response from the client in N seconds, defaults to 10 seconds."),
+  SERVER_READ_SOCKET_TIMEOUT("hive.server.read.socket.timeout", "10s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "Timeout for the HiveServer to close the connection if there is no response from the client. By default, 10 seconds."),
   SERVER_TCP_KEEP_ALIVE("hive.server.tcp.keepalive", true,
       "Whether to enable TCP keepalive for the Hive Server. Keepalive will prevent accumulation of half-open connections."),

@@ -1708,8 +1730,9 @@
       "turning on Tez for HiveServer2. The user could potentially want to run queries\n" +
       "over Tez without the pool of sessions."),
-  HIVE_QUOTEDID_SUPPORT("hive.support.quoted.identifiers", "column", new PatternSet("none", "column"),
-      "Whether to use quoted identifier. 'none' ot 'column' can be used. \n" +
+  HIVE_QUOTEDID_SUPPORT("hive.support.quoted.identifiers", "column",
+      new StringSet("none", "column"),
+      "Whether to use quoted identifier. 'none' or 'column' can be used. \n" +
       "  none: default(past) behavior. Implies only alphaNumeric and underscore are valid characters in identifiers.\n" +
       "  column: implies column names can contain any character."
   ),
@@ -1729,8 +1752,9 @@
   HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true,
       "Check if a plan contains a Cross Product. If there is one, output a warning to the Session's console."),
-  HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", 5000L,
-      "Time in milliseconds to wait for another thread to localize the same resource for hive-tez."),
+  HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", "5000ms",
+      new TimeValidator(TimeUnit.MILLISECONDS),
+      "Time to wait for another thread to localize the same resource for hive-tez."),
   HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS("hive.localize.resource.num.wait.attempts", 5,
       "The number of attempts waiting for localizing a resource in hive-tez."),
   TEZ_AUTO_REDUCER_PARALLELISM("hive.tez.auto.reducer.parallelism", false,
@@ -1840,6 +1864,10 @@
     public String validate(String value) {
       return validator == null ? null : validator.validate(value);
     }
+
+    public String validatorDescription() {
+      return validator == null ? null : validator.toDescription();
+    }
+
     public String typeString() {
       return valType.typeString();
     }
@@ -1980,6 +2008,82 @@
   public void setIntVar(ConfVars var, int val) {
     setIntVar(this, var, val);
   }
+
+  public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) {
+    return toTime(getVar(conf, var), getDefaultTimeUnit(var), outUnit);
+  }
+
+  public static void setTimeVar(Configuration conf, ConfVars var, long time, TimeUnit timeunit) {
+    assert (var.valClass == String.class) : var.varname;
+    conf.set(var.varname, time + stringFor(timeunit));
+  }
+
+  public long getTimeVar(ConfVars var, TimeUnit outUnit) {
+    return getTimeVar(this, var, outUnit);
+  }
+
+  public void setTimeVar(ConfVars var, long time, TimeUnit outUnit) {
+    setTimeVar(this, var, time, outUnit);
+  }
+
+  private static TimeUnit getDefaultTimeUnit(ConfVars var) {
+    TimeUnit inputUnit = null;
+    if (var.validator instanceof TimeValidator) {
+      inputUnit = ((TimeValidator)var.validator).getTimeUnit();
+    }
+    return inputUnit;
+  }
+
+  public static long toTime(String value, TimeUnit inputUnit, TimeUnit outUnit) {
+    String[] parsed = parseTime(value.trim());
+    return outUnit.convert(Long.valueOf(parsed[0].trim()), unitFor(parsed[1].trim(), inputUnit));
+  }
+
+  private static String[] parseTime(String value) {
+    char[] chars = value.toCharArray();
+    int i = 0;
+    for (; i < chars.length && (chars[i] == '-' || Character.isDigit(chars[i])); i++) {
+    }
+    return new String[] {value.substring(0, i), value.substring(i)};
+  }
+
+  public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) {
+    unit = unit.trim().toLowerCase();
+    if (unit.isEmpty()) {
+      if (defaultUnit == null) {
+        throw new IllegalArgumentException("Time unit is not specified");
+      }
+      return defaultUnit;
+    } else if (unit.equals("d") || unit.startsWith("day")) {
+      return TimeUnit.DAYS;
+    } else if (unit.equals("h") || unit.startsWith("hour")) {
+      return TimeUnit.HOURS;
+    } else if (unit.equals("m") || unit.startsWith("min")) {
+      return TimeUnit.MINUTES;
+    } else if (unit.equals("s") || unit.startsWith("sec")) {
+      return TimeUnit.SECONDS;
+    } else if (unit.equals("ms") || unit.startsWith("msec")) {
+      return TimeUnit.MILLISECONDS;
+    } else if (unit.equals("us") || unit.startsWith("usec")) {
+      return TimeUnit.MICROSECONDS;
+    } else if (unit.equals("ns") || unit.startsWith("nsec")) {
+      return TimeUnit.NANOSECONDS;
+    }
+    throw new IllegalArgumentException("Invalid time unit " + unit);
+  }
+
+  public static String stringFor(TimeUnit timeunit) {
+    switch (timeunit) {
+      case DAYS: return "day";
+      case HOURS: return "hour";
+      case MINUTES: return "min";
+      case SECONDS: return "sec";
+      case MILLISECONDS: return "msec";
+      case MICROSECONDS: return "usec";
+      case NANOSECONDS: return "nsec";
+    }
+    throw new IllegalArgumentException("Invalid timeunit " + timeunit);
+  }
+
   public static long getLongVar(Configuration conf, ConfVars var) {
     assert (var.valClass == Long.class) : var.varname;
     return conf.getLong(var.varname, var.defaultLongVal);

diff --git common/src/java/org/apache/hadoop/hive/conf/Validator.java common/src/java/org/apache/hadoop/hive/conf/Validator.java
index cea9c41..b29b2cb 100644
--- common/src/java/org/apache/hadoop/hive/conf/Validator.java
+++ common/src/java/org/apache/hadoop/hive/conf/Validator.java
@@ -22,6 +22,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;

 /**
@@ -31,57 +32,85 @@
   String validate(String value);

-  static class StringSet implements Validator {
+  String toDescription();
+
+  class StringSet implements Validator {
+
+    private final boolean caseSensitive;
     private final Set<String> expected = new LinkedHashSet<String>();

     public StringSet(String... values) {
+      this(false, values);
+    }
+
+    public StringSet(boolean caseSensitive, String... values) {
+      this.caseSensitive = caseSensitive;
       for (String value : values) {
-        expected.add(value.toLowerCase());
+        expected.add(caseSensitive ? value : value.toLowerCase());
       }
     }

     @Override
     public String validate(String value) {
-      if (value == null || !expected.contains(value.toLowerCase())) {
+      if (value == null || !expected.contains(caseSensitive ? value : value.toLowerCase())) {
         return "Invalid value.. expects one of " + expected;
       }
       return null;
     }
+
+    @Override
+    public String toDescription() {
+      return "Expects one of " + expected;
+    }
   }

-  static enum RANGE_TYPE {
+  enum TYPE {
     INT {
       @Override
       protected boolean inRange(String value, Object lower, Object upper) {
         int ivalue = Integer.parseInt(value);
-        return (Integer)lower <= ivalue && ivalue <= (Integer)upper;
+        if (lower != null && ivalue < (Integer)lower) {
+          return false;
+        }
+        if (upper != null && ivalue > (Integer)upper) {
+          return false;
+        }
+        return true;
       }
     },
     LONG {
       @Override
       protected boolean inRange(String value, Object lower, Object upper) {
         long lvalue = Long.parseLong(value);
-        return (Long)lower <= lvalue && lvalue <= (Long)upper;
+        if (lower != null && lvalue < (Long)lower) {
+          return false;
+        }
+        if (upper != null && lvalue > (Long)upper) {
+          return false;
+        }
+        return true;
       }
     },
     FLOAT {
       @Override
       protected boolean inRange(String value, Object lower, Object upper) {
         float fvalue = Float.parseFloat(value);
-        return (Float)lower <= fvalue && fvalue <= (Float)upper;
+        if (lower != null && fvalue < (Float)lower) {
+          return false;
+        }
+        if (upper != null && fvalue > (Float)upper) {
+          return false;
+        }
+        return true;
       }
     };

-    public static RANGE_TYPE valueOf(Object lower, Object upper) {
-      if (lower instanceof Integer && upper instanceof Integer) {
-        assert (Integer)lower < (Integer)upper;
+    public static TYPE valueOf(Object lower, Object upper) {
+      if (lower instanceof Integer || upper instanceof Integer) {
         return INT;
-      } else if (lower instanceof Long && upper instanceof Long) {
-        assert (Long)lower < (Long)upper;
+      } else if (lower instanceof Long || upper instanceof Long) {
         return LONG;
-      } else if (lower instanceof Float && upper instanceof Float) {
-        assert (Float)lower < (Float)upper;
+      } else if (lower instanceof Float || upper instanceof Float) {
         return FLOAT;
       }
       throw new IllegalArgumentException("invalid range from " + lower + " to " + upper);
@@ -90,15 +119,15 @@ public static RANGE_TYPE valueOf(Object lower, Object upper) {
     protected abstract boolean inRange(String value, Object lower, Object upper);
   }

-  static class RangeValidator implements Validator {
+  class RangeValidator implements Validator {

-    private final RANGE_TYPE type;
+    private final TYPE type;
     private final Object lower, upper;

     public RangeValidator(Object lower, Object upper) {
       this.lower = lower;
       this.upper = upper;
-      this.type = RANGE_TYPE.valueOf(lower, upper);
+      this.type = TYPE.valueOf(lower, upper);
     }

     @Override
@@ -115,9 +144,23 @@ public String validate(String value) {
       }
       return null;
     }
+
+    @Override
+    public String toDescription() {
+      if (lower == null && upper == null) {
+        return null;
+      }
+      if (lower != null && upper != null) {
+        return "Expects value between " + lower + " and " + upper;
+      }
+      if (lower != null) {
+        return "Expects value greater than " + lower;
+      }
+      return "Expects value less than " + upper;
+    }
   }

-  static class PatternSet implements Validator {
+  class PatternSet implements Validator {

     private final List<Pattern> expected = new ArrayList<Pattern>();

@@ -139,9 +182,14 @@ public String validate(String value) {
       }
       return "Invalid value.. expects one of patterns " + expected;
     }
+
+    @Override
+    public String toDescription() {
+      return "Expects one of the patterns in " + expected;
+    }
   }

-  static class RatioValidator implements Validator {
+  class RatioValidator implements Validator {

     @Override
     public String validate(String value) {
@@ -155,5 +203,77 @@ public String validate(String value) {
       }
       return null;
     }
+
+    @Override
+    public String toDescription() {
+      return "Expects value between 0.0f and 1.0f";
+    }
+  }
+
+  class TimeValidator implements Validator {
+
+    private final TimeUnit timeUnit;
+
+    private final Long min;
+    private final boolean minInclusive;
+
+    private final Long max;
+    private final boolean maxInclusive;
+
+    public TimeValidator(TimeUnit timeUnit) {
+      this(timeUnit, null, false, null, false);
+    }
+
+    public TimeValidator(TimeUnit timeUnit,
+        Long min, boolean minInclusive, Long max, boolean maxInclusive) {
+      this.timeUnit = timeUnit;
+      this.min = min;
+      this.minInclusive = minInclusive;
+      this.max = max;
+      this.maxInclusive = maxInclusive;
+    }
+
+    public TimeUnit getTimeUnit() {
+      return timeUnit;
+    }
+
+    @Override
+    public String validate(String value) {
+      try {
+        long time = HiveConf.toTime(value, timeUnit, timeUnit);
+        if (min != null && (minInclusive ? time < min : time <= min)) {
+          return value + " is less than " + timeString(min);
+        }
+        if (max != null && (maxInclusive ? time > max : time >= max)) {
+          return value + " is greater than " + timeString(max);
+        }
+      } catch (Exception e) {
+        return e.toString();
+      }
+      return null;
+    }
+
+    public String toDescription() {
+      String description =
+          "Expects a time value with unit " +
+          "(d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec)" +
+          ", which is " + HiveConf.stringFor(timeUnit) + " if not specified";
+      if (min != null && max != null) {
+        description += ".\nThe time should be between " +
+            timeString(min) + (minInclusive ? " (inclusive)" : " (exclusive)") + " and " +
+            timeString(max) + (maxInclusive ? " (inclusive)" : " (exclusive)");
+      } else if (min != null) {
+        description += ".\nThe time should be greater than " +
+            (minInclusive ? "or equal to " : "") + timeString(min);
+      } else if (max != null) {
+        description += ".\nThe time should be less than " +
+            (maxInclusive ? "or equal to " : "") + timeString(max);
+      }
+      return description;
+    }
+
+    private String timeString(long time) {
+      return time + " " + HiveConf.stringFor(timeUnit);
+    }
+  }
 }
"or equal to " : "") + timeString(max); + } + return description; + } + + private String timeString(long time) { + return time + " " + HiveConf.stringFor(timeUnit); + } } } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRetryingHMSHandler.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRetryingHMSHandler.java index 39e7005..e51ced7 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRetryingHMSHandler.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestRetryingHMSHandler.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import junit.framework.Assert; import junit.framework.TestCase; @@ -58,7 +59,7 @@ protected void setUp() throws Exception { hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port); hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); hiveConf.setIntVar(HiveConf.ConfVars.HMSHANDLERATTEMPTS, 2); - hiveConf.setIntVar(HiveConf.ConfVars.HMSHANDLERINTERVAL, 0); + hiveConf.setTimeVar(HiveConf.ConfVars.HMSHANDLERINTERVAL, 0, TimeUnit.MILLISECONDS); hiveConf.setBoolVar(HiveConf.ConfVars.HMSHANDLERFORCERELOADCONF, false); msc = new HiveMetaStoreClient(hiveConf, null); } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2SessionTimeout.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2SessionTimeout.java new file mode 100644 index 0000000..c4da73e --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2SessionTimeout.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.jdbc.miniHS2; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.util.StringUtils; +import org.apache.hive.service.cli.CLIServiceClient; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.SessionHandle; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestHiveServer2SessionTimeout { + + private static MiniHS2 miniHS2 = null; + private Map confOverlay; + + @BeforeClass + public static void beforeTest() throws Exception { + miniHS2 = new MiniHS2(new HiveConf()); + } + + @Before + public void setUp() throws Exception { + confOverlay = new HashMap(); + confOverlay.put(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + confOverlay.put(ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL.varname, "3s"); + confOverlay.put(ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT.varname, "3s"); + miniHS2.start(confOverlay); + } + + @After + public void tearDown() throws Exception { + miniHS2.stop(); + } + + @Test + public void testConnection() throws Exception { + CLIServiceClient serviceClient = miniHS2.getServiceClient(); + SessionHandle sessHandle = serviceClient.openSession("foo", "bar"); + OperationHandle handle = serviceClient.executeStatement(sessHandle, "SELECT 1", confOverlay); + Thread.sleep(7000); + try { + serviceClient.closeOperation(handle); + fail("Operation should have been closed by timeout!"); + } catch (HiveSQLException e) { + assertTrue(StringUtils.stringifyException(e), + e.getMessage().contains("Invalid OperationHandle")); + } + } +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 9e3481a..6c1b644 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -42,6 +42,7 @@ import java.util.Properties; import java.util.Set; import java.util.Timer; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -445,7 +446,7 @@ public void init() throws MetaException { partitionValidationPattern = null; } - long cleanFreq = hiveConf.getLongVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ) * 1000L; + long cleanFreq = hiveConf.getTimeVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ, TimeUnit.MILLISECONDS); if (cleanFreq > 0) { // In default config, there is no timer. 
Timer cleaner = new Timer("Metastore Events Cleaner Thread", true); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 4e76236..0d53bae 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -35,6 +35,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -6178,7 +6179,7 @@ public long cleanupEvents() { boolean commited = false; long delCnt; LOG.debug("Begin executing cleanupEvents"); - Long expiryTime = HiveConf.getLongVar(getConf(), ConfVars.METASTORE_EVENT_EXPIRY_DURATION) * 1000L; + Long expiryTime = HiveConf.getTimeVar(getConf(), ConfVars.METASTORE_EVENT_EXPIRY_DURATION, TimeUnit.MILLISECONDS); Long curTime = System.currentTimeMillis(); try { openTransaction(); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java index 84e6dcd..63a113c 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java @@ -23,6 +23,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.lang.reflect.UndeclaredThrowableException; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; @@ -80,8 +81,8 @@ public Object invoke(final Object proxy, final Method method, final Object[] arg boolean gotNewConnectUrl = false; boolean reloadConf = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HMSHANDLERFORCERELOADCONF); - int retryInterval = HiveConf.getIntVar(hiveConf, - HiveConf.ConfVars.HMSHANDLERINTERVAL); + long retryInterval = HiveConf.getTimeVar(hiveConf, + HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS); int retryLimit = HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 063dee6..6ada45a 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.sql.*; import java.util.*; +import java.util.concurrent.TimeUnit; /** * A handler to answer transaction related calls that come into the metastore @@ -119,7 +120,7 @@ public TxnHandler(HiveConf conf) { throw new RuntimeException(e); } - timeout = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT) * 1000; + timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); deadlockCnt = 0; buildJumpTable(); } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 8287c60..787952e 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.metastore.txn; -import junit.framework.Assert; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -26,11 +25,11 @@ import org.apache.log4j.LogManager; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import static junit.framework.Assert.*; @@ -868,7 +867,7 @@ public void testHeartbeatNoTxn() throws Exception { @Test public void testHeartbeatLock() throws Exception { - conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1); + conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS); HeartbeatRequest h = new HeartbeatRequest(); LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java index d7323cb..a46bf6b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java @@ -44,7 +44,7 @@ // Name of the class to report for String logClassName = null; int notificationInterval; - int timeout; + long timeout; Reporter reporter; class ReporterTask extends TimerTask { @@ -116,7 +116,7 @@ public void run() { * @param timeout - when the autoprogressor should stop reporting (in ms) */ AutoProgressor(String logClassName, Reporter reporter, - int notificationInterval, int timeout) { + int notificationInterval, long timeout) { this.logClassName = logClassName; this.reporter = reporter; this.notificationInterval = notificationInterval; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java index 7fdb4e7..567890a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.lockmgr.LockException; import java.io.IOException; +import java.util.concurrent.TimeUnit; /** * Class to handle heartbeats for MR and Tez tasks. @@ -64,7 +65,8 @@ public void heartbeat() throws IOException { if (heartbeatInterval == 0) { // Multiply the heartbeat interval by 1000 to convert to milliseconds, // but divide by 2 to give us a safety factor. 
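The replacement below keeps the old arithmetic intact: multiplying a seconds value by 500 is the same as converting it to milliseconds and halving it, which is what the new getTimeVar-based line does (the "multiply by 1000" wording in the comment above refers to the old seconds-based value). A tiny check with the default 300s transaction timeout:

```java
import java.util.concurrent.TimeUnit;

public class HeartbeatIntervalCheck {
  public static void main(String[] args) {
    long oldMs = 300 * 500;                          // old code: seconds * 500
    long newMs = TimeUnit.SECONDS.toMillis(300) / 2; // new code: milliseconds / 2
    System.out.println(oldMs == newMs);              // true, both 150000
  }
}
```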
- heartbeatInterval = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT) * 500; + heartbeatInterval = HiveConf.getTimeVar( + conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2; if (heartbeatInterval == 0) { LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent"); dontHeartbeat = true; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java index 5b857e2..df39a76 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -358,7 +359,8 @@ public void processOp(Object row, int tag) throws HiveException { .getBoolVar(hconf, HiveConf.ConfVars.HIVESCRIPTAUTOPROGRESS)) { autoProgressor = new AutoProgressor(this.getClass().getName(), reporter, Utilities.getDefaultNotificationInterval(hconf), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000); + HiveConf.getTimeVar( + hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS)); autoProgressor.go(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java index afd7bcf..fe3538a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,7 +87,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUDTFAUTOPROGRESS)) { autoProgressor = new AutoProgressor(this.getClass().getName(), reporter, Utilities.getDefaultNotificationInterval(hconf), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000); + HiveConf.getTimeVar( + hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS)); autoProgressor.go(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 70047a2..d362e40 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2743,7 +2743,7 @@ public T run(PreparedStatement stmt) throws SQLException { * first time it is caught, or SQLTransientException when the maxRetries has reached. */ public static T executeWithRetry(SQLCommand cmd, PreparedStatement stmt, - int baseWindow, int maxRetries) throws SQLException { + long baseWindow, int maxRetries) throws SQLException { Random r = new Random(); T result = null; @@ -2785,7 +2785,7 @@ public T run(PreparedStatement stmt) throws SQLException { * first time it is caught, or SQLTransientException when the maxRetries has reached. 
*/ public static Connection connectWithRetry(String connectionString, - int waitWindow, int maxRetries) throws SQLException { + long waitWindow, int maxRetries) throws SQLException { Random r = new Random(); @@ -2827,7 +2827,7 @@ public static Connection connectWithRetry(String connectionString, * first time it is caught, or SQLTransientException when the maxRetries has reached. */ public static PreparedStatement prepareWithRetry(Connection conn, String stmt, - int waitWindow, int maxRetries) throws SQLException { + long waitWindow, int maxRetries) throws SQLException { Random r = new Random(); @@ -2867,7 +2867,7 @@ public static PreparedStatement prepareWithRetry(Connection conn, String stmt, * @param r a random generator. * @return number of milliseconds for the next wait time. */ - public static long getRandomWaitTime(int baseWindow, int failures, Random r) { + public static long getRandomWaitTime(long baseWindow, int failures, Random r) { return (long) ( baseWindow * failures + // grace period for the last round of attempt baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index eb2851b..df8640c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -219,8 +220,8 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException { SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); //DecimalFormat longFormatter = new DecimalFormat("###,###"); long reportTime = System.currentTimeMillis(); - long maxReportInterval = - HiveConf.getLongVar(job, HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL); + long maxReportInterval = HiveConf.getTimeVar( + job, HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS); boolean fatal = false; StringBuilder errMsg = new StringBuilder(); long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index ebe9f92..941e632 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import javax.security.auth.login.LoginException; @@ -830,9 +831,8 @@ public LocalResource localizeResource(Path src, Path dest, Configuration conf) int waitAttempts = conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal); - long sleepInterval = - conf.getLong(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.varname, - HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.defaultLongVal); + long sleepInterval = HiveConf.getTimeVar( + conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS); LOG.info("Number of wait attempts: " + waitAttempts + ". 
Wait interval: " + sleepInterval); boolean found = false; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java index 11434a0..7d7e7c0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.metadata.*; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** @@ -38,7 +39,7 @@ private HiveLockManagerCtx ctx; - private int sleepTime = 1000; + private long sleepTime = 1000; private int numRetriesForLock = 0; private int numRetriesForUnLock = 0; @@ -82,12 +83,13 @@ public void prepareRetry() { public void refresh() { HiveConf conf = ctx.getConf(); - sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; + sleepTime = conf.getTimeVar( + HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS); numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES); } - public HiveLock lock(HiveLockObject key, HiveLockMode mode, int numRetriesForLock, int sleepTime) + public HiveLock lock(HiveLockObject key, HiveLockMode mode, int numRetriesForLock, long sleepTime) throws LockException { for (int i = 0; i <= numRetriesForLock; i++) { if (i > 0) { @@ -101,7 +103,7 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode, int numRetriesForLoc return null; } - private void sleep(int sleepTime) { + private void sleep(long sleepTime) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { @@ -109,7 +111,7 @@ private void sleep(int sleepTime) { } } - public List lock(List objs, int numRetriesForLock, int sleepTime) + public List lock(List objs, int numRetriesForLock, long sleepTime) throws LockException { sortLocks(objs); for (int i = 0; i <= numRetriesForLock; i++) { @@ -132,7 +134,7 @@ private HiveLock lockPrimitive(HiveLockObject key, HiveLockMode mode) throws Loc } private List lockPrimitive(List objs, int numRetriesForLock, - int sleepTime) throws LockException { + long sleepTime) throws LockException { List locks = new ArrayList(); for (HiveLockObj obj : objs) { HiveLock lock = lockPrimitive(obj.getObj(), obj.getMode()); @@ -164,7 +166,7 @@ public int compare(HiveLockObj o1, HiveLockObj o2) { }); } - public void unlock(HiveLock hiveLock, int numRetriesForUnLock, int sleepTime) + public void unlock(HiveLock hiveLock, int numRetriesForUnLock, long sleepTime) throws LockException { String[] paths = hiveLock.getHiveLockObject().getPaths(); HiveLockObjectData data = hiveLock.getHiveLockObject().getData(); @@ -179,7 +181,7 @@ public void unlock(HiveLock hiveLock, int numRetriesForUnLock, int sleepTime) throw new LockException("Failed to release lock " + hiveLock); } - public void releaseLocks(List hiveLocks, int numRetriesForUnLock, int sleepTime) { + public void releaseLocks(List hiveLocks, int numRetriesForUnLock, long sleepTime) { for (HiveLock locked : hiveLocks) { try { unlock(locked, numRetriesForUnLock, sleepTime); diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 46044d0..6449d4a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.net.InetAddress; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -53,7 +54,7 @@ private int sessionTimeout; private String quorumServers; - private int sleepTime; + private long sleepTime; private int numRetriesForLock; private int numRetriesForUnLock; @@ -106,7 +107,8 @@ public void setContext(HiveLockManagerCtx ctx) throws LockException { sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf); - sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; + sleepTime = conf.getTimeVar( + HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS); numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES); @@ -132,7 +134,8 @@ public void setContext(HiveLockManagerCtx ctx) throws LockException { @Override public void refresh() { HiveConf conf = ctx.getConf(); - sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; + sleepTime = conf.getTimeVar( + HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS); numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES); } diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java index f636cff..5102c63 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,7 +47,8 @@ private final Log LOG = LogFactory.getLog(this.getClass().getName()); private int timeout = 30; private final String comment = "Hive stats aggregation: " + this.getClass().getName(); - private int maxRetries, waitWindow; + private int maxRetries; + private long waitWindow; private final Random r; public JDBCStatsAggregator() { @@ -57,11 +59,13 @@ public JDBCStatsAggregator() { @Override public boolean connect(Configuration hiveconf, Task sourceTask) { this.hiveconf = hiveconf; - timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT); + timeout = (int) HiveConf.getTimeVar( + hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS); connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING); String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER); maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX); - waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT); + waitWindow = HiveConf.getTimeVar( + hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS); try { Class.forName(driver).newInstance(); diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java index db62721..5e317ab 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,7 +49,8 @@ private int timeout; // default timeout in sec. for JDBC connection and statements // SQL comment that identifies where the SQL statement comes from private final String comment = "Hive stats publishing: " + this.getClass().getName(); - private int maxRetries, waitWindow; + private int maxRetries; + private long waitWindow; private final Random r; public JDBCStatsPublisher() { @@ -59,9 +61,11 @@ public JDBCStatsPublisher() { public boolean connect(Configuration hiveconf) { this.hiveconf = hiveconf; maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX); - waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT); + waitWindow = HiveConf.getTimeVar( + hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS); connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING); - timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT); + timeout = (int) HiveConf.getTimeVar( + hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS); String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER); try { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 3211759..9c27eca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -43,6 +43,7 @@ import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * A class to initiate compactions. This will run in a separate thread. @@ -140,13 +141,13 @@ public void run() { public void init(BooleanPointer stop) throws MetaException { super.init(stop); checkInterval = - HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL) * 1000; + conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ; } private void recoverFailedCompactions(boolean remoteOnly) throws MetaException { if (!remoteOnly) txnHandler.revokeFromLocalWorkers(Worker.hostname()); - txnHandler.revokeTimedoutWorkers(HiveConf.getLongVar(conf, - HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT)); + txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS)); } // Figure out if there are any currently running compactions on the same table or partition. diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index f34b5ad..b0cf224 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Tests for the compactor Initiator thread. 
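Initiator above now asks for hive.compactor.worker.timeout in milliseconds no matter how the value was written. A short sketch of the spellings `HiveConf.toTime` treats as equivalent (example values only):

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;

public class UnitSpellingExample {
  public static void main(String[] args) {
    // Equivalent spellings of the default compactor worker timeout:
    long a = HiveConf.toTime("86400s", TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
    long b = HiveConf.toTime("24h", TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
    long c = HiveConf.toTime("1d", TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
    // A bare number falls back to the validator's unit (seconds here).
    long d = HiveConf.toTime("86400", TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
    System.out.println(a == b && b == c && c == d); // true: 86400000 ms
  }
}
```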
@@ -89,7 +90,7 @@ public void recoverFailedRemoteWorkers() throws Exception { txnHandler.findNextToCompact("nosuchhost-193892"); HiveConf conf = new HiveConf(); - HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L); + conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.SECONDS); startInitiator(conf); diff --git ql/src/test/results/clientnegative/set_hiveconf_validation2.q.out ql/src/test/results/clientnegative/set_hiveconf_validation2.q.out index 33f9360..9b8292d 100644 --- ql/src/test/results/clientnegative/set_hiveconf_validation2.q.out +++ ql/src/test/results/clientnegative/set_hiveconf_validation2.q.out @@ -8,4 +8,4 @@ POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@src key string default value string default -Query returned non-zero code: 1, cause: 'SET hive.fetch.task.conversion=true' FAILED in validation : Invalid value.. expects one of [minimal, more]. +Query returned non-zero code: 1, cause: 'SET hive.fetch.task.conversion=true' FAILED in validation : Invalid value.. expects one of [MINIMAL, MORE]. diff --git service/src/java/org/apache/hadoop/hive/service/HiveServer.java service/src/java/org/apache/hadoop/hive/service/HiveServer.java index 32729f2..27c7722 100644 --- service/src/java/org/apache/hadoop/hive/service/HiveServer.java +++ service/src/java/org/apache/hadoop/hive/service/HiveServer.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.logging.Log; @@ -62,8 +63,6 @@ import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import com.facebook.fb303.fb_status; /** @@ -670,8 +669,11 @@ public static void main(String[] args) { boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.SERVER_TCP_KEEP_ALIVE); + int timeout = (int) HiveConf.getTimeVar( + conf, HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); - TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(cli.port) : new TServerSocket(cli.port, 1000 * conf.getIntVar(HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT)); + TServerTransport serverTransport = + tcpKeepAlive ? new TServerSocketKeepAlive(cli.port) : new TServerSocket(cli.port, timeout); // set all properties specified on the command line for (Map.Entry item : hiveconf.entrySet()) { diff --git service/src/java/org/apache/hive/service/cli/CLIService.java service/src/java/org/apache/hive/service/cli/CLIService.java index ff5de4a..40e9bad 100644 --- service/src/java/org/apache/hive/service/cli/CLIService.java +++ service/src/java/org/apache/hive/service/cli/CLIService.java @@ -362,8 +362,9 @@ public OperationStatus getOperationStatus(OperationHandle opHandle) * However, if the background operation is complete, we return immediately. 
*/ if (operation.shouldRunAsync()) { - long timeout = operation.getParentSession().getHiveConf().getLongVar( - HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + HiveConf conf = operation.getParentSession().getHiveConf(); + long timeout = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS); try { operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { diff --git service/src/java/org/apache/hive/service/cli/OperationState.java service/src/java/org/apache/hive/service/cli/OperationState.java index 3e15f0c..51ffb40 100644 --- service/src/java/org/apache/hive/service/cli/OperationState.java +++ service/src/java/org/apache/hive/service/cli/OperationState.java @@ -25,29 +25,26 @@ * */ public enum OperationState { - INITIALIZED(TOperationState.INITIALIZED_STATE), - RUNNING(TOperationState.RUNNING_STATE), - FINISHED(TOperationState.FINISHED_STATE), - CANCELED(TOperationState.CANCELED_STATE), - CLOSED(TOperationState.CLOSED_STATE), - ERROR(TOperationState.ERROR_STATE), - UNKNOWN(TOperationState.UKNOWN_STATE), - PENDING(TOperationState.PENDING_STATE); + INITIALIZED(TOperationState.INITIALIZED_STATE, false), + RUNNING(TOperationState.RUNNING_STATE, false), + FINISHED(TOperationState.FINISHED_STATE, true), + CANCELED(TOperationState.CANCELED_STATE, true), + CLOSED(TOperationState.CLOSED_STATE, true), + ERROR(TOperationState.ERROR_STATE, true), + UNKNOWN(TOperationState.UKNOWN_STATE, false), + PENDING(TOperationState.PENDING_STATE, false); private final TOperationState tOperationState; + private final boolean terminal; - OperationState(TOperationState tOperationState) { + OperationState(TOperationState tOperationState, boolean terminal) { this.tOperationState = tOperationState; + this.terminal = terminal; } + // must be kept in sync with the ordering of TOperationState public static OperationState getOperationState(TOperationState tOperationState) { - // TODO: replace this with a Map?
- for (OperationState opState : values()) { - if (tOperationState.equals(opState.tOperationState)) { - return opState; - } - } - return OperationState.UNKNOWN; + return OperationState.values()[tOperationState.getValue()]; } public static void validateTransition(OperationState oldState, @@ -91,7 +88,8 @@ public static void validateTransition(OperationState oldState, default: // fall-through } - throw new HiveSQLException("Illegal Operation state transition"); + throw new HiveSQLException("Illegal Operation state transition " + + "from " + oldState + " to " + newState); } public void validateTransition(OperationState newState) @@ -102,4 +100,8 @@ public void validateTransition(OperationState newState) public TOperationState toTOperationState() { return tOperationState; } + + public boolean isTerminal() { + return terminal; + } } diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java index 0d6436e..acb95cb 100644 --- service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.util.EnumSet; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,14 +53,19 @@ protected OperationLog operationLog; protected boolean isOperationLogEnabled; + private long operationTimeout; + private long lastAccessTime; + protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) { - super(); this.parentSession = parentSession; this.runAsync = runInBackground; this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); + lastAccessTime = System.currentTimeMillis(); + operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), + HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); } public Future<?> getBackgroundHandle() { @@ -111,7 +117,6 @@ protected void setHasResultSet(boolean hasResultSet) { opHandle.setHasResultSet(hasResultSet); } - public OperationLog getOperationLog() { return operationLog; } @@ -119,9 +124,33 @@ public OperationLog getOperationLog() { protected final OperationState setState(OperationState newState) throws HiveSQLException { state.validateTransition(newState); this.state = newState; + this.lastAccessTime = System.currentTimeMillis(); return this.state; } + public boolean isTimedOut(long current) { + if (operationTimeout == 0) { + return false; + } + if (operationTimeout > 0) { + // check only when it's in terminal state + return state.isTerminal() && lastAccessTime + operationTimeout <= current; + } + return lastAccessTime + -operationTimeout <= current; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public long getOperationTimeout() { + return operationTimeout; + } + + public void setOperationTimeout(long operationTimeout) { + this.operationTimeout = operationTimeout; + } + protected void setOperationException(HiveSQLException operationException) { this.operationException = operationException; } @@ -130,6 +159,7 @@ protected final void assertState(OperationState state) throws HiveSQLException { if (this.state != state) { throw new HiveSQLException("Expected state " + state + ", but found " +
this.state); } + this.lastAccessTime = System.currentTimeMillis(); } public boolean isRunning() { diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 2867301..f5a8f27 100644 --- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -19,6 +19,7 @@ package org.apache.hive.service.cli.operation; import java.util.Enumeration; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -155,15 +156,27 @@ public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession, return operation; } - public synchronized Operation getOperation(OperationHandle operationHandle) - throws HiveSQLException { - Operation operation = handleToOperation.get(operationHandle); + public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException { + Operation operation = getOperationInternal(operationHandle); if (operation == null) { throw new HiveSQLException("Invalid OperationHandle: " + operationHandle); } return operation; } + private synchronized Operation getOperationInternal(OperationHandle operationHandle) { + return handleToOperation.get(operationHandle); + } + + private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) { + Operation operation = handleToOperation.get(operationHandle); + if (operation != null && operation.isTimedOut(System.currentTimeMillis())) { + handleToOperation.remove(operationHandle); + return operation; + } + return null; + } + private synchronized void addOperation(Operation operation) { handleToOperation.put(operation.getHandle(), operation); } @@ -252,4 +265,16 @@ private Schema getLogSchema() { public OperationLog getOperationLogByThread() { return OperationLog.getCurrentOperationLog(); } + + public List<Operation> removeExpiredOperations(OperationHandle[] handles) { + List<Operation> removed = new ArrayList<Operation>(); + for (OperationHandle handle : handles) { + Operation operation = removeTimedOutOperation(handle); + if (operation != null) { + LOG.warn("Operation " + handle + " is timed out and will be closed"); + removed.add(operation); + } + } + return removed; + } } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSession.java service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 270e4a6..6359a5b 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -27,9 +27,9 @@ public interface HiveSession extends HiveSessionBase { - public void open(); + void open(); - public IMetaStoreClient getMetaStoreClient() throws HiveSQLException; + IMetaStoreClient getMetaStoreClient() throws HiveSQLException; /** * getInfo operation handler @@ -37,7 +37,7 @@ * @return * @throws HiveSQLException */ - public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException; + GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException; /** * execute operation handler @@ -46,7 +46,7 @@ * @return * @throws HiveSQLException */ - public OperationHandle executeStatement(String statement, + OperationHandle executeStatement(String statement, Map<String, String> confOverlay) throws HiveSQLException; /** @@ -56,7 +56,7 @@ public OperationHandle executeStatement(String statement, * @return * @throws HiveSQLException */ - public OperationHandle
executeStatementAsync(String statement, + OperationHandle executeStatementAsync(String statement, Map<String, String> confOverlay) throws HiveSQLException; /** @@ -64,14 +64,14 @@ public OperationHandle executeStatementAsync(String statement, * @return * @throws HiveSQLException */ - public OperationHandle getTypeInfo() throws HiveSQLException; + OperationHandle getTypeInfo() throws HiveSQLException; /** * getCatalogs operation handler * @return * @throws HiveSQLException */ - public OperationHandle getCatalogs() throws HiveSQLException; + OperationHandle getCatalogs() throws HiveSQLException; /** * getSchemas operation handler @@ -80,7 +80,7 @@ public OperationHandle executeStatementAsync(String statement, * @return * @throws HiveSQLException */ - public OperationHandle getSchemas(String catalogName, String schemaName) + OperationHandle getSchemas(String catalogName, String schemaName) throws HiveSQLException; /** @@ -92,7 +92,7 @@ public OperationHandle getSchemas(String catalogName, String schemaName) * @return * @throws HiveSQLException */ - public OperationHandle getTables(String catalogName, String schemaName, + OperationHandle getTables(String catalogName, String schemaName, String tableName, List<String> tableTypes) throws HiveSQLException; /** @@ -100,7 +100,7 @@ public OperationHandle getTables(String catalogName, String schemaName, * @return * @throws HiveSQLException */ - public OperationHandle getTableTypes() throws HiveSQLException ; + OperationHandle getTableTypes() throws HiveSQLException; /** * getColumns operation handler @@ -111,7 +111,7 @@ public OperationHandle getTables(String catalogName, String schemaName, * @return * @throws HiveSQLException */ - public OperationHandle getColumns(String catalogName, String schemaName, + OperationHandle getColumns(String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException; /** @@ -122,31 +122,33 @@ public OperationHandle getColumns(String catalogName, String schemaName, * @return * @throws HiveSQLException */ - public OperationHandle getFunctions(String catalogName, String schemaName, + OperationHandle getFunctions(String catalogName, String schemaName, String functionName) throws HiveSQLException; /** * close the session * @throws HiveSQLException */ - public void close() throws HiveSQLException; + void close() throws HiveSQLException; - public void cancelOperation(OperationHandle opHandle) throws HiveSQLException; + void cancelOperation(OperationHandle opHandle) throws HiveSQLException; - public void closeOperation(OperationHandle opHandle) throws HiveSQLException; + void closeOperation(OperationHandle opHandle) throws HiveSQLException; - public TableSchema getResultSetMetadata(OperationHandle opHandle) + TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException; - public String getDelegationToken(HiveAuthFactory authFactory, String owner, + String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException; - public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr) + void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr) throws HiveSQLException; - public void renewDelegationToken(HiveAuthFactory authFactory, String
tokenStr) throws HiveSQLException; + + void closeExpiredOperations(); } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java index 84e1c7e..25ae2da 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java @@ -92,4 +92,6 @@ String getIpAddress(); void setIpAddress(String ipAddress); + + long getLastAccessTime(); } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 4e5f595..bc71a7c 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -54,6 +54,7 @@ import org.apache.hive.service.cli.operation.GetTableTypesOperation; import org.apache.hive.service.cli.operation.GetTypeInfoOperation; import org.apache.hive.service.cli.operation.MetadataOperation; +import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; import org.apache.hive.service.server.ThreadWithGarbageCleanup; @@ -84,6 +85,8 @@ private boolean isOperationLogEnabled; private File sessionLogDir; + private volatile long lastAccessTime; + public HiveSessionImpl(TProtocolVersion protocol, String username, String password, HiveConf serverhiveConf, String ipAddress) { this.username = username; @@ -108,6 +111,8 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo sessionState = new SessionState(hiveConf, username); sessionState.setUserIpAddress(ipAddress); sessionState.setIsHiveServerQuery(true); + + lastAccessTime = System.currentTimeMillis(); SessionState.start(sessionState); } @@ -239,10 +244,13 @@ public void open() { SessionState.start(sessionState); } - protected synchronized void acquire() throws HiveSQLException { + protected synchronized void acquire(boolean userAccess) { // Need to make sure that the this HiveServer2's session's session state is // stored in the thread local for the handler thread. SessionState.setCurrentSessionState(sessionState); + if (userAccess) { + lastAccessTime = System.currentTimeMillis(); + } } /** @@ -252,14 +260,16 @@ protected synchronized void acquire() throws HiveSQLException { * when this thread is garbage collected later. 
* @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() */ - protected synchronized void release() { - assert sessionState != null; + protected synchronized void release(boolean userAccess) { SessionState.detachSession(); if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { ThreadWithGarbageCleanup currentThread = (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); currentThread.cacheThreadLocalRawStore(); } + if (userAccess) { + lastAccessTime = System.currentTimeMillis(); + } } @Override @@ -298,7 +308,7 @@ public IMetaStoreClient getMetaStoreClient() throws HiveSQLException { @Override public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException { - acquire(); + acquire(true); try { switch (getInfoType) { case CLI_SERVER_NAME: @@ -318,7 +328,7 @@ public GetInfoValue getInfo(GetInfoType getInfoType) throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString()); } } finally { - release(); + release(true); } } @@ -337,7 +347,7 @@ public OperationHandle executeStatementAsync(String statement, Map confOverlay, boolean runAsync) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); ExecuteStatementOperation operation = operationManager @@ -355,14 +365,14 @@ private OperationHandle executeStatementInternal(String statement, Map tableTypes) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); MetadataOperation operation = @@ -438,14 +448,14 @@ public OperationHandle getTables(String catalogName, String schemaName, String t operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getTableTypes() throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession()); @@ -458,14 +468,14 @@ public OperationHandle getTableTypes() operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getColumns(String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(), @@ -479,14 +489,14 @@ public OperationHandle getColumns(String catalogName, String schemaName, operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public OperationHandle getFunctions(String catalogName, String schemaName, String functionName) throws HiveSQLException { - acquire(); + acquire(true); OperationManager operationManager = getOperationManager(); GetFunctionsOperation operation = operationManager @@ -500,14 +510,14 @@ public OperationHandle getFunctions(String catalogName, String schemaName, Strin operationManager.closeOperation(opHandle); throw e; } finally { - release(); + release(true); } } @Override public void close() throws HiveSQLException { try { - acquire(); + acquire(true); /** * For metadata operations like getTables(), getColumns() etc, * the session allocates a private metastore handler which should be @@ -532,7 +542,7 @@ public void close() throws HiveSQLException { } catch (IOException ioe) { throw new HiveSQLException("Failure to close", ioe); } 
finally { - release(); + release(true); } } @@ -562,50 +572,79 @@ public void setUserName(String userName) { } @Override + public long getLastAccessTime() { + return lastAccessTime; + } + + @Override + public void closeExpiredOperations() { + OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]); + if (handles.length > 0) { + List<Operation> operations = operationManager.removeExpiredOperations(handles); + if (!operations.isEmpty()) { + closeTimedOutOperations(operations); + } + } + } + + private void closeTimedOutOperations(List<Operation> operations) { + acquire(false); + try { + for (Operation operation : operations) { + opHandleSet.remove(operation.getHandle()); + try { + operation.close(); + } catch (Exception e) { + LOG.warn("Exception thrown while closing timed-out operation " + operation.getHandle(), e); + } + } + } finally { + release(false); + } + } + + @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { sessionManager.getOperationManager().cancelOperation(opHandle); } finally { - release(); + release(true); } } @Override public void closeOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { operationManager.closeOperation(opHandle); opHandleSet.remove(opHandle); } finally { - release(); + release(true); } } @Override public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { - acquire(); + acquire(true); try { return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle); } finally { - release(); + release(true); } } @Override public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException { - acquire(); + acquire(true); try { if (fetchType == FetchType.QUERY_OUTPUT) { - return sessionManager.getOperationManager() - .getOperationNextRowSet(opHandle, orientation, maxRows); - } else { - return sessionManager.getOperationManager() - .getOperationLogRowSet(opHandle, orientation, maxRows); + return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows); } + return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows); } finally { - release(); + release(true); } } diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java index 7668904..bad533b 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java @@ -19,7 +19,6 @@ package org.apache.hive.service.cli.session; import java.io.IOException; -import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -83,8 +82,8 @@ public String getDelegationToken () { } @Override - protected synchronized void acquire() throws HiveSQLException { - super.acquire(); + protected synchronized void acquire(boolean userAccess) { + super.acquire(userAccess); // if we have a metastore connection with impersonation, then set it first if (sessionHive != null) { Hive.set(sessionHive); @@ -98,11 +97,11 @@ protected synchronized void acquire() throws HiveSQLException { @Override public void close() throws HiveSQLException { try { - acquire(); + acquire(true); ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi); cancelDelegationToken(); } finally { - release(); + release(true);
super.close(); } } diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 17c1c7b..7d86859 100644 --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -20,6 +20,8 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -59,6 +61,11 @@ private boolean isOperationLogEnabled; private File operationLogRootDir; + private long checkInterval; + private long sessionTimeout; + + private volatile boolean shutdown; + public SessionManager() { super("SessionManager"); } @@ -81,20 +88,28 @@ public synchronized void init(HiveConf hiveConf) { } private void createBackgroundOperationPool() { - int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); - LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize); - int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); - LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize); - int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME); - LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime); - // Create a thread pool with #backgroundPoolSize threads + int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); + LOG.info("HiveServer2: Background operation thread pool size: " + poolSize); + int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE); + LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize); + long keepAliveTime = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS); + LOG.info( + "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds"); + + // Create a thread pool with #poolSize threads // Threads terminate when they are idle for more than the keepAliveTime - // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize + // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize String threadPoolName = "HiveServer2-Background-Pool"; - backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize, - keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize), + backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize, + keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize), new ThreadFactoryWithGarbageCleanup(threadPoolName)); backgroundOperationPool.allowCoreThreadTimeOut(true); + + checkInterval = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + sessionTimeout = HiveConf.getTimeVar( + hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); } private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException { @@ -139,20 +154,61 @@ private void initOperationLogRootDir() { @Override public synchronized void start() { super.start(); + if (checkInterval > 0) { + startTimeoutChecker(); + } + } + + private void startTimeoutChecker() { + final long interval = Math.max(checkInterval, 3000L); //
minimum 3 seconds + Runnable timeoutChecker = new Runnable() { + @Override + public void run() { + for (sleepInterval(interval); !shutdown; sleepInterval(interval)) { + long current = System.currentTimeMillis(); + for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) { + if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) { + SessionHandle handle = session.getSessionHandle(); + LOG.warn("Session " + handle + " is timed out (last access: " + + new Date(session.getLastAccessTime()) + ") and will be closed"); + try { + closeSession(handle); + } catch (HiveSQLException e) { + LOG.warn("Exception thrown while closing session " + handle, e); + } + } else { + session.closeExpiredOperations(); + } + } + } + } + + private void sleepInterval(long interval) { + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + // ignore + } + } + }; + backgroundOperationPool.execute(timeoutChecker); } @Override public synchronized void stop() { super.stop(); + shutdown = true; if (backgroundOperationPool != null) { backgroundOperationPool.shutdown(); - int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); + long timeout = hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); try { backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS); } catch (InterruptedException e) { LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout + " seconds has been exceeded. RUNNING background operations will be shut down", e); } + backgroundOperationPool = null; } cleanupLoggingRootDir(); } diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 86ed4b4..443c371 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -61,7 +61,7 @@ protected int minWorkerThreads; protected int maxWorkerThreads; - protected int workerKeepAliveTime; + protected long workerKeepAliveTime; protected static HiveAuthFactory hiveAuthFactory; diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 21d1563..4067106 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -69,7 +69,8 @@ public void run() { minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS); maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS); - workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME); + workerKeepAliveTime = hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS); String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); @@ -110,7 +111,8 @@ public void run() { // Linux:yes, Windows:no connector.setReuseAddress(!Shell.WINDOWS); - int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME); + int maxIdleTime = (int) hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS); connector.setMaxIdleTime(maxIdleTime); httpServer.addConnector(connector); diff --git service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index d01e819..7530d59 100644 --- service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -26,9 +26,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -202,7 +202,8 @@ public void testExecuteStatementAsync() throws Exception { * to give a compile time error. * (compilation is done synchronous as of now) */ - longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT); + longPollingTimeout = HiveConf.getTimeVar(new HiveConf(), + HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS); queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName; try { runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout); @@ -295,7 +296,7 @@ private OperationStatus runQueryAsync(SessionHandle sessionHandle, String queryS long longPollingTimeDelta; OperationStatus opStatus = null; OperationState state = null; - confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout)); + confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms"); OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); int count = 0; while (true) {
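[Editor's note] The idle-operation plumbing threaded through Operation, OperationManager, HiveSessionImpl and SessionManager above hinges on Operation.isTimedOut(long): a zero hive.server2.idle.operation.timeout disables expiry; a positive value expires an operation only once it is in a terminal state (FINISHED, CANCELED, CLOSED, ERROR, per OperationState.isTerminal()); a negative value expires it in any state after the absolute timeout of idleness. A self-contained restatement of that rule follows; the class and parameter names are illustrative, since the real check lives on Operation and reads its own fields:

public class OperationTimeoutRule {
  // Mirrors Operation.isTimedOut(current) from the patch:
  //   timeout == 0 -> never expires
  //   timeout  > 0 -> expires only once terminal, after 'timeout' ms idle
  //   timeout  < 0 -> expires in any state, after |timeout| ms idle
  static boolean isTimedOut(long lastAccessTime, long operationTimeout,
                            boolean terminalState, long current) {
    if (operationTimeout == 0) {
      return false;
    }
    if (operationTimeout > 0) {
      return terminalState && lastAccessTime + operationTimeout <= current;
    }
    // equivalent to the patch's "lastAccessTime + -operationTimeout <= current"
    return lastAccessTime - operationTimeout <= current;
  }

  public static void main(String[] args) {
    // positive timeout: a RUNNING (non-terminal) operation never expires ...
    System.out.println(isTimedOut(0L, 1000L, false, 5000L)); // false
    // ... but a FINISHED (terminal) one does once idle long enough
    System.out.println(isTimedOut(0L, 1000L, true, 5000L));  // true
    // negative timeout: expires regardless of state
    System.out.println(isTimedOut(0L, -1000L, false, 5000L)); // true
  }
}

The SessionManager checker above then only has to call session.closeExpiredOperations() on each sweep; the per-operation expiry decision stays encapsulated in Operation.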