diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 25b6ab34a9..859a543434 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -19,7 +19,6 @@ package org.apache.hive.service.cli.operation;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
@@ -27,18 +26,21 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.io.SessionStream;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
@@ -78,22 +80,19 @@
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * SQLOperation.
- *
 */
 public class SQLOperation extends ExecuteStatementOperation {
   private IDriver driver = null;
-  private TableSchema resultSchema;
+  private Optional<TableSchema> resultSchema;
   private AbstractSerDe serde = null;
   private boolean fetchStarted = false;
   private volatile MetricsScope currentSQLStateScope;
-  private QueryInfo queryInfo;
-  private long queryTimeout;
+  private final QueryInfo queryInfo;
+  private final long queryTimeout;
   private ScheduledExecutorService timeoutExecutor;
   private final boolean runAsync;
   private final long operationLogCleanupDelayMs;
@@ -102,21 +101,25 @@
   /**
    * A map to track query count running by each user
    */
-  private static Map<String, AtomicInteger> userQueries = new HashMap<String, AtomicInteger>();
+  private static final Map<String, AtomicInteger> USER_QUERIES = new ConcurrentHashMap<>();
   private static final String ACTIVE_SQL_USER = MetricsConstant.SQL_OPERATION_PREFIX + "active_user";
-  private MetricsScope submittedQryScp;
+  private final Optional<MetricsScope> submittedQryScp;
 
   public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay,
       boolean runInBackground, long queryTimeout) {
     // TODO: call setRemoteUser in ExecuteStatementOperation or higher.
     super(parentSession, statement, confOverlay, runInBackground);
     this.runAsync = runInBackground;
-    this.queryTimeout = queryTimeout;
-    long timeout = HiveConf.getTimeVar(queryState.getConf(),
-        HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    this.resultSchema = Optional.empty();
+
+    final long timeout =
+        HiveConf.getTimeVar(queryState.getConf(), HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
     if (timeout > 0 && (queryTimeout <= 0 || timeout < queryTimeout)) {
       this.queryTimeout = timeout;
+    } else {
+      this.queryTimeout = queryTimeout;
     }
+
     this.operationLogCleanupDelayMs = HiveConf.getTimeVar(queryState.getConf(),
         HiveConf.ConfVars.HIVE_SERVER2_OPERATION_LOG_CLEANUP_DELAY, TimeUnit.MILLISECONDS);
@@ -125,10 +128,9 @@ public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay,
-      if (queryTimeout > 0) {
-        timeoutExecutor = new ScheduledThreadPoolExecutor(1);
-        Runnable timeoutTask = new Runnable() {
-          @Override
-          public void run() {
-            try {
-              String queryId = queryState.getQueryId();
-              LOG.info("Query timed out after: " + queryTimeout
-                  + " seconds. Cancelling the execution now: " + queryId);
-              SQLOperation.this.cancel(OperationState.TIMEDOUT);
-            } catch (HiveSQLException e) {
-              LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);
-            } finally {
-              // Stop
-              timeoutExecutor.shutdown();
-            }
+      if (queryTimeout > 0L) {
+        timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+        timeoutExecutor.schedule(() -> {
+          try {
+            final String queryId = queryState.getQueryId();
+            LOG.info("Query timed out after: " + queryTimeout + " seconds. Cancelling the execution now: " + queryId);
+            SQLOperation.this.cancel(OperationState.TIMEDOUT);
+          } catch (HiveSQLException e) {
+            LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);
           }
-        };
-        timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS);
+          return null;
+        }, queryTimeout, TimeUnit.SECONDS);
       }
 
       queryInfo.setQueryDisplay(driver.getQueryDisplay());
@@ -211,7 +206,7 @@ public void run() {
       throw toSQLException("Error while compiling statement", e);
     } catch (Throwable e) {
       setState(OperationState.ERROR);
-      throw new HiveSQLException("Error running query: " + e.toString(), e);
+      throw new HiveSQLException("Error running query", e);
     }
   }
@@ -247,7 +242,7 @@ private void runQuery() throws HiveSQLException {
       } else if (e instanceof HiveSQLException) {
         throw (HiveSQLException) e;
       } else {
-        throw new HiveSQLException("Error running query: " + e.toString(), e);
+        throw new HiveSQLException("Error running query", e);
       }
     }
     setState(OperationState.FINISHED);
@@ -257,7 +252,7 @@ private void runQuery() throws HiveSQLException {
   public void runInternal() throws HiveSQLException {
     setState(OperationState.PENDING);
 
-    boolean runAsync = shouldRunAsync();
+    final boolean runAsync = shouldRunAsync();
     final boolean asyncPrepare = runAsync
       && HiveConf.getBoolVar(queryState.getConf(),
         HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_ASYNC_COMPILE);
@@ -329,7 +324,7 @@ public Object run() throws HiveSQLException {
         } catch (HiveSQLException e) {
          // TODO: why do we invent our own error path op top of the one from Future.get?
           setOperationException(e);
-          LOG.error("Error running hive query: ", e);
+          LOG.error("Error running hive query", e);
         } finally {
           LogUtils.unregisterLoggingContext();
         }
@@ -440,10 +435,10 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
     // Since compilation is always a blocking RPC call, and schema is ready after compilation,
     // we can return when are in the RUNNING state.
     assertState(Arrays.asList(OperationState.RUNNING, OperationState.FINISHED));
-    if (resultSchema == null) {
-      resultSchema = new TableSchema(driver.getSchema());
+    if (!resultSchema.isPresent()) {
+      resultSchema = Optional.of(new TableSchema(driver.getSchema()));
     }
-    return resultSchema;
+    return resultSchema.get();
   }
 
   @Override
@@ -493,26 +488,11 @@ public String getTaskStatus() throws HiveSQLException {
     if (driver != null) {
       List<QueryDisplay.TaskDisplay> statuses = driver.getQueryDisplay().getTaskDisplays();
       if (statuses != null) {
-        ByteArrayOutputStream out = null;
-        try {
-          ObjectMapper mapper = new ObjectMapper();
-          out = new ByteArrayOutputStream();
-          mapper.writeValue(out, statuses);
-          return out.toString("UTF-8");
-        } catch (JsonGenerationException e) {
+        try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+          new ObjectMapper().writeValue(out, statuses);
+          return out.toString(StandardCharsets.UTF_8.name());
+        } catch (Exception e) {
           throw new HiveSQLException(e);
-        } catch (JsonMappingException e) {
-          throw new HiveSQLException(e);
-        } catch (IOException e) {
-          throw new HiveSQLException(e);
-        } finally {
-          if (out != null) {
-            try {
-              out.close();
-            } catch (IOException e) {
-              throw new HiveSQLException(e);
-            }
-          }
         }
       }
     }
@@ -520,18 +500,13 @@ public String getTaskStatus() throws HiveSQLException {
     return null;
   }
 
-  private RowSet decode(List<Object> rows, RowSet rowSet) throws Exception {
-    if (driver.isFetchingTable()) {
-      return prepareFromRow(rows, rowSet);
-    }
-    return decodeFromString(rows, rowSet);
+  private RowSet decode(final List<Object> rows, final RowSet rowSet) throws Exception {
+    return (driver.isFetchingTable()) ? prepareFromRow(rows, rowSet) : decodeFromString(rows, rowSet);
   }
 
   // already encoded to thrift-able object in ThriftFormatter
-  private RowSet prepareFromRow(List<Object> rows, RowSet rowSet) throws Exception {
-    for (Object row : rows) {
-      rowSet.addRow((Object[]) row);
-    }
+  private RowSet prepareFromRow(final List<Object> rows, final RowSet rowSet) throws Exception {
+    rows.forEach(row -> rowSet.addRow((Object[]) row));
     return rowSet;
   }
 
@@ -560,43 +535,30 @@ private RowSet decodeFromString(List<Object> rows, RowSet rowSet)
   }
 
   private AbstractSerDe getSerDe() throws SQLException {
-    if (serde != null) {
-      return serde;
-    }
-    try {
-      Schema mResultSchema = driver.getSchema();
+    if (serde == null) {
+      try {
+        this.serde = new LazySimpleSerDe();
 
-      List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
-      StringBuilder namesSb = new StringBuilder();
-      StringBuilder typesSb = new StringBuilder();
+        final Schema mResultSchema = driver.getSchema();
+        final List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
+        final Properties props = new Properties();
 
-      if (fieldSchemas != null && !fieldSchemas.isEmpty()) {
-        for (int pos = 0; pos < fieldSchemas.size(); pos++) {
-          if (pos != 0) {
-            namesSb.append(",");
-            typesSb.append(",");
-          }
-          namesSb.append(fieldSchemas.get(pos).getName());
-          typesSb.append(fieldSchemas.get(pos).getType());
+        if (!fieldSchemas.isEmpty()) {
+
+          final String names = fieldSchemas.stream().map(i -> i.getName()).collect(Collectors.joining(","));
+          final String types = fieldSchemas.stream().map(i -> i.getType()).collect(Collectors.joining(","));
+
+          LOG.debug("Column names: " + names);
+          LOG.debug("Column types: " + types);
+
+          props.setProperty(serdeConstants.LIST_COLUMNS, names);
+          props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
         }
-      }
-      String names = namesSb.toString();
-      String types = typesSb.toString();
-
-      serde = new LazySimpleSerDe();
-      Properties props = new Properties();
-      if (names.length() > 0) {
-        LOG.debug("Column names: " + names);
-        props.setProperty(serdeConstants.LIST_COLUMNS, names);
-      }
-      if (types.length() > 0) {
-        LOG.debug("Column types: " + types);
-        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
-      }
-      SerDeUtils.initializeSerDe(serde, queryState.getConf(), props, null);
-    } catch (Exception ex) {
-      throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
+        SerDeUtils.initializeSerDe(serde, queryState.getConf(), props, null);
+      } catch (Exception ex) {
+        throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
+      }
     }
     return serde;
   }
@@ -609,82 +571,103 @@ public QueryInfo getQueryInfo() {
   }
 
   @Override
-  protected void onNewState(OperationState state, OperationState prevState) {
-
+  protected void onNewState(final OperationState state, final OperationState prevState) {
     super.onNewState(state, prevState);
+
     currentSQLStateScope = updateOperationStateMetrics(currentSQLStateScope, MetricsConstant.SQL_OPERATION_PREFIX,
         MetricsConstant.COMPLETED_SQL_OPERATION_PREFIX, state);
-    Metrics metrics = MetricsFactory.getInstance();
-    if (metrics != null) {
+    final Optional<Metrics> metrics = Optional.ofNullable(MetricsFactory.getInstance());
+    if (metrics.isPresent()) {
       // New state is changed to running from something else (user is active)
       if (state == OperationState.RUNNING && prevState != state) {
-        incrementUserQueries(metrics);
+        incrementUserQueries(metrics.get());
      }
       // New state is not running (user not active) any more
       if (prevState == OperationState.RUNNING && prevState != state) {
-        decrementUserQueries(metrics);
+        decrementUserQueries(metrics.get());
      }
    }
 
-    if (state == OperationState.FINISHED || state == OperationState.CANCELED || state == OperationState.ERROR) {
-      //update runtime
+    switch (state) {
+    case CANCELED:
       queryInfo.setRuntime(getOperationComplete() - getOperationStart());
-      if (metrics != null && submittedQryScp != null) {
-        metrics.endScope(submittedQryScp);
+      if (metrics.isPresent() && submittedQryScp.isPresent()) {
+        metrics.get().endScope(submittedQryScp.get());
      }
-    }
-
-    if (state == OperationState.CLOSED) {
-      queryInfo.setEndTime();
-    } else {
-      //CLOSED state not interesting, state before (FINISHED, ERROR) is.
       queryInfo.updateState(state.toString());
-    }
-
-    if (state == OperationState.ERROR) {
+      break;
+    case CLOSED:
+      queryInfo.setEndTime();
+      break;
+    case ERROR:
+      queryInfo.setRuntime(getOperationComplete() - getOperationStart());
+      if (metrics.isPresent() && submittedQryScp.isPresent()) {
+        metrics.get().endScope(submittedQryScp.get());
+      }
       markQueryMetric(MetricsFactory.getInstance(), MetricsConstant.HS2_FAILED_QUERIES);
-    }
-    if (state == OperationState.FINISHED) {
+      queryInfo.updateState(state.toString());
+      break;
+    case FINISHED:
+      queryInfo.setRuntime(getOperationComplete() - getOperationStart());
+      if (metrics.isPresent() && submittedQryScp.isPresent()) {
+        metrics.get().endScope(submittedQryScp.get());
+      }
       markQueryMetric(MetricsFactory.getInstance(), MetricsConstant.HS2_SUCCEEDED_QUERIES);
+      queryInfo.updateState(state.toString());
+      break;
+    case INITIALIZED:
+      /* fall through */
+    case PENDING:
+      /* fall through */
+    case RUNNING:
+      /* fall through */
+    case TIMEDOUT:
+      /* fall through */
+    case UNKNOWN:
+      /* fall through */
+    default:
+      queryInfo.updateState(state.toString());
+      break;
    }
  }
 
-  private void incrementUserQueries(Metrics metrics) {
-    String username = parentSession.getUserName();
-    if (username != null) {
-      synchronized (userQueries) {
-        AtomicInteger count = userQueries.get(username);
-        if (count == null) {
-          count = new AtomicInteger(0);
-          AtomicInteger prev = userQueries.put(username, count);
-          if (prev == null) {
-            metrics.incrementCounter(ACTIVE_SQL_USER);
-          } else {
-            count = prev;
-          }
+  private void incrementUserQueries(final Metrics metrics) {
+    final String username = parentSession.getUserName();
+    if (StringUtils.isNotBlank(username)) {
+      USER_QUERIES.compute(username, (key, value) -> {
+        if (value == null) {
+          metrics.incrementCounter(ACTIVE_SQL_USER);
+          return new AtomicInteger(1);
+        } else {
+          value.incrementAndGet();
+          return value;
        }
-        count.incrementAndGet();
-      }
+      });
    }
  }
 
-  private void decrementUserQueries(Metrics metrics) {
-    String username = parentSession.getUserName();
-    if (username != null) {
-      synchronized (userQueries) {
-        AtomicInteger count = userQueries.get(username);
-        if (count != null && count.decrementAndGet() <= 0) {
-          metrics.decrementCounter(ACTIVE_SQL_USER);
-          userQueries.remove(username);
+  private void decrementUserQueries(final Metrics metrics) {
+    final String username = parentSession.getUserName();
+    if (StringUtils.isNotBlank(username)) {
+      USER_QUERIES.compute(username, (key, value) -> {
+        if (value == null) {
+          return null;
+        } else {
+          final int newValue = value.decrementAndGet();
+          if (newValue == 0) {
+            metrics.decrementCounter(ACTIVE_SQL_USER);
+            return null;
+          }
+          return value;
        }
-      }
+      });
    }
  }
 
   private void markQueryMetric(Metrics metric, String name) {
-    if(metric != null) {
+    if (metric != null) {
       metric.markMeter(name);
    }
  }
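
A note on the timeout rewrite in prepare(): the trailing "return null;" makes the lambda a Callable<Object> rather than a Runnable, so the schedule(Callable, long, TimeUnit) overload is selected; also, unlike the removed anonymous class, the new task body no longer shuts the executor down itself, so that has to happen elsewhere. A minimal standalone sketch of the idiom (class name and message are illustrative, not from the patch):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutDemo {
  public static void main(String[] args) throws Exception {
    final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    // The trailing "return null;" turns the lambda into a Callable<Object>,
    // so the schedule(Callable, long, TimeUnit) overload is chosen here.
    final ScheduledFuture<Object> timeout = timeoutExecutor.schedule(() -> {
      System.out.println("query timed out, cancelling");
      return null;
    }, 1, TimeUnit.SECONDS);

    timeout.get();              // demo only: block until the timeout task has run
    timeoutExecutor.shutdown(); // the factory's single worker thread is non-daemon
  }
}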
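On the getTaskStatus() cleanup: try-with-resources replaces the null-checked finally block, and the single catch (Exception e) subsumes the three removed catches, since JsonGenerationException, JsonMappingException, and IOException are all subtypes of Exception. A self-contained sketch using the same Jackson 1.x ObjectMapper the file imports (the payload list is a stand-in, not the real TaskDisplay type):

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import org.codehaus.jackson.map.ObjectMapper;

public class TaskStatusJsonDemo {
  public static void main(String[] args) throws Exception {
    final List<String> statuses = Arrays.asList("Stage-1:RUNNING", "Stage-2:PENDING");

    // try-with-resources replaces the hand-rolled finally/close bookkeeping;
    // writeValue() can throw any of the three removed exception types, all
    // of which land in the single catch block below.
    try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      new ObjectMapper().writeValue(out, statuses);
      System.out.println(out.toString(StandardCharsets.UTF_8.name()));
    } catch (Exception e) {
      throw new IllegalStateException("could not serialize task statuses", e);
    }
  }
}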
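On the user-query counters: ConcurrentHashMap.compute runs its remapping function atomically per key, and returning null from the function removes the entry, so the last decrement and the removal happen as one step; the old code needed the map-wide synchronized block for the same guarantee. A minimal standalone sketch of the counter pattern (class and method names are illustrative, not from the patch):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class UserQueryCounter {
  private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

  /** Returns true when this is the user's first active query (entry created). */
  public boolean increment(String user) {
    final boolean[] created = {false};
    counts.compute(user, (key, value) -> {
      if (value == null) {
        created[0] = true;           // first query for this user
        return new AtomicInteger(1);
      }
      value.incrementAndGet();       // compute() holds the bin lock, so this is atomic
      return value;
    });
    return created[0];
  }

  /** Returns true when the user's last active query finished (entry removed). */
  public boolean decrement(String user) {
    final boolean[] removed = {false};
    counts.compute(user, (key, value) -> {
      if (value == null) {
        return null;                 // nothing to decrement; map stays unchanged
      }
      if (value.decrementAndGet() == 0) {
        removed[0] = true;
        return null;                 // returning null removes the entry atomically
      }
      return value;
    });
    return removed[0];
  }

  public static void main(String[] args) {
    final UserQueryCounter counter = new UserQueryCounter();
    System.out.println(counter.increment("alice")); // true  -> first active query
    System.out.println(counter.increment("alice")); // false -> second concurrent query
    System.out.println(counter.decrement("alice")); // false -> one query still running
    System.out.println(counter.decrement("alice")); // true  -> last query finished
  }
}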