commit 35d6d0a96f928c2664ccdb05f63ed4a5e5229459
Author: Mithun RK
Date:   Tue Sep 26 13:44:51 2017 -0700

    HIVE-17609: Tool to manipulate delegation tokens.

diff --git a/bin/ext/tokentool.sh b/bin/ext/tokentool.sh
new file mode 100755
index 0000000000..e4b94d6253
--- /dev/null
+++ b/bin/ext/tokentool.sh
@@ -0,0 +1,88 @@
+#!/bin/bash
+
+# Hive CLI extension for "hive --tokentool": parses the tool's options and
+# runs org.apache.hadoop.hive.thrift.DelegationTokenTool with them.
+
+THISSERVICE=tokentool
+export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
+
+BUFFER_SIZE=20971520 # 20MB.
+JAAS_CONF="/home/y/libexec/hcat_server/conf/jaas-hcat-server.conf"
+HCAT_SERVER_CONF="/home/y/libexec/hcat_server/conf/hive-site.xml"
+# Arguments forwarded to the Java tool; an array keeps a value that contains
+# whitespace intact as a single argument.
+CMD_LINE=()
+
+# Prints the usage message.
+tokentool_help () {
+  echo "Usage: hive --tokentool [options]"
+  echo " -bufferSize : Jute buffer size in bytes (defaults to 20971520, or 20MB)"
+  echo " -jaasConfig : Path to jaas-config, to connect to the token-store (defaults to /home/y/libexec/hcat_server/conf/jaas-hcat-server.conf)"
+  echo " -confLocation : Path to the HCatServer's hive-site.xml (defaults to /home/y/libexec/hcat_server/conf/hive-site.xml)"
+  echo " -delete : To delete delegation tokens "
+  echo " -list : To list delegation tokens "
+  echo " -expired : Select expired tokens for deletion/listing "
+  echo " -olderThan : Select tokens older than the specified time-interval for deletion/listing (e.g. 3d for 3 days, 4h for 4 hours, 5m for 5 minutes, etc.)"
+  echo " -dryRun : Don't actually delete, but log which tokens might have been deleted "
+  echo " -batchSize : Number of tokens to drop between sleep intervals. "
+  echo " -sleepTime : Sleep-time in seconds, between batches of dropped delegation tokens. "
+  echo " -h | --help : Print this help message, to clarify usage "
+}
+
+# Exits with a usage error unless option $1 is followed by a value.
+# $2 is the number of arguments still left to parse, including the option.
+tokentool_require_value () {
+  if [[ $2 -lt 2 ]]
+  then
+    echo "Missing value for option: $1" >&2
+    tokentool_help >&2
+    exit 1
+  fi
+}
+
+# Parses the tokentool options and launches DelegationTokenTool.
+tokentool() {
+  while [[ $# -gt 0 ]]
+  do
+
+    case $1 in
+      -bufferSize|--bufferSize)
+        tokentool_require_value "$1" $#
+        BUFFER_SIZE="$2"
+        shift # past argument
+        ;;
+      -jaasConfig|--jaasConfig)
+        tokentool_require_value "$1" $#
+        JAAS_CONF="$2"
+        shift # past argument
+        ;;
+      -confLocation|--confLocation)
+        tokentool_require_value "$1" $#
+        HCAT_SERVER_CONF="$2"
+        shift # past argument
+        ;;
+      -delete|--delete|-list|--list|-dryRun|--dryRun|-expired|--expired)
+        CMD_LINE+=("$1")
+        ;;
+      -olderThan|--olderThan|-batchSize|--batchSize|-sleepTime|--sleepTime)
+        tokentool_require_value "$1" $#
+        CMD_LINE+=("$1" "$2")
+        shift # past argument
+        ;;
+      -h|--help)
+        tokentool_help
+        exit 0
+        ;;
+      *)
+        echo "Unrecognized option: $1" >&2
+        tokentool_help >&2
+        exit 1 # Exit statuses are limited to 0-255, so "-1" is not a valid one.
+        ;;
+    esac
+    shift # past option.
+  done # /while
+
+  CMD_LINE+=(--confLocation "$HCAT_SERVER_CONF")
+  export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Djute.maxbuffer=$BUFFER_SIZE -Djava.security.auth.login.config=$JAAS_CONF"
+  execHiveCmd org.apache.hadoop.hive.thrift.DelegationTokenTool "${CMD_LINE[@]}"
+}
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
index 4c1118134b..fb14ced675 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
@@ -1035,11 +1035,18 @@ private void cancelDelegationTokens(JobContext context) throws IOException{
         // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
         // the conf will not be set
         String tokenStrForm = client.getTokenStrForm();
+        String hCatKeyTokenSignature = context.getConfiguration().get(
+            HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
         if (tokenStrForm != null
-          && context.getConfiguration().get(
-          HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+            && hCatKeyTokenSignature != null) {
+          LOG.info("FileOutputCommitterContainer::cancelDelegationTokens(): " +
+              "Cancelling token fetched for HCAT_KEY_TOKEN_SIGNATURE == (" + hCatKeyTokenSignature + ").");
           client.cancelDelegationToken(tokenStrForm);
         }
+        else {
+          LOG.info("FileOutputCommitterContainer::cancelDelegationTokens(): " +
+              "Could not find tokenStrForm, or HCAT_KEY_TOKEN_SIGNATURE. Skipping token cancellation.");
+        }
       } catch (MetaException e) {
         LOG.warn("MetaException while cancelling delegation token.", e);
       } catch (TException e) {
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java
index 9b621952e6..3e05a0e5e7 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java
@@ -117,6 +117,8 @@ void handleSecurity(
         //set to empty "Text" if hive.metastore.token.signature property is set to null
         Token hiveToken = hiveTokenSelector.selectToken(
           new Text(), ugi.getTokens());
+        LOG.info("Security::handleSecurity(): Checking for pre-existing metastore token... " +
+            (hiveToken == null? "Not found. Creating a new one." : "Found. Using existing token."));
         if (hiveToken == null) {
           // we did not get token set up by oozie, let's get them ourselves here.
           // we essentially get a token per unique Output HCatTableInfo - this is
@@ -157,6 +159,7 @@ void handleSecurity(
         // this will be used by the outputcommitter to pass on to the metastore client
         // which in turn will pass on to the TokenSelector so that it can select
         // the right token.
+        LOG.info("Security::handleSecurity(): Setting signature of token to: " + tokenSignature);
         conf.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
       }
     }
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 84bfcdd53e..0d7ff6eab5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -457,11 +457,13 @@ private void open() throws MetaException {
               // tokenSig could be null
               tokenStrForm = Utils.getTokenStrForm(tokenSig);
               if(tokenStrForm != null) {
+                LOG.info("HMSC::open(): Found delegation token. Creating DIGEST-based thrift connection.");
                 // authenticate using delegation tokens via the "DIGEST" mechanism
                 transport = authBridge.createClientTransport(null, store.getHost(),
                     "DIGEST", tokenStrForm, transport,
                         MetaStoreUtils.getMetaStoreSaslProperties(conf));
               } else {
+                LOG.info("HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.");
                 String principalConfig =
                     conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
                 transport = authBridge.createClientTransport(
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenTool.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenTool.java
new file mode 100644
index 0000000000..a7d8bbd1eb
--- /dev/null
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenTool.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.hadoop.hive.thrift.HiveDelegationTokenManager.DELEGATION_TOKEN_STORE_CLS;
+
+/**
+ * Tool to twiddle MetaStore delegation tokens.
+ */
+public class DelegationTokenTool extends Configured implements Tool {
+  private static final Logger LOG = LoggerFactory.getLogger(DelegationTokenTool.class);
+
+  private static final long MILLIS_PER_SECOND = 1000L; // long, so that interval arithmetic is never done in int.
+  private static final long MILLIS_PER_MINUTE = 60 * MILLIS_PER_SECOND;
+  private static final long MILLIS_PER_HOUR = 60 * MILLIS_PER_MINUTE;
+  private static final long MILLIS_PER_DAY = 24 * MILLIS_PER_HOUR;
+
+  private DelegationTokenStore delegationTokenStore;
+  private String confLocation;
+
+  private enum OpType { DELETE, LIST }
+  private OpType opType = OpType.LIST;
+  private boolean isDryRun = false;
+  private long timeLimitMillis; // Cut-off instant (epoch milliseconds) that the selection predicates compare against.
+  private Predicate<DelegationTokenIdentifier> selectForDeletion = Predicates.alwaysTrue();
+
+  private static final int BATCH_SIZE_DEFAULT = 100;
+  private int batchSize = BATCH_SIZE_DEFAULT; // Number of tokens to drop, between sleep intervals;
+  private static final long SLEEP_TIME_MILLIS_DEFAULT = 10 * MILLIS_PER_SECOND;
+  private long sleepTimeMillis = SLEEP_TIME_MILLIS_DEFAULT; // Sleep-time in milliseconds, between batches of delegation tokens dropped.
+
+  private DelegationTokenTool() throws Exception {}
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new DelegationTokenTool(), args));
+  }
+
+  /**
+   * Parses the command line into the operation, the token-selection criterion
+   * and the batching parameters.
+   * @throws IllegalArgumentException if no operation is given, if delete lacks
+   *         a selection criterion, or if a numeric value is malformed.
+   */
+  private void readArgs(String[] args) throws Exception {
+    args = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+
+    Options options = new Options();
+    options.addOption(new Option("confLocation", true, "Location of HCat Server's hive-site."));
+    options.addOption(new Option("delete", false, "Delete delegation token."));
+    options.addOption(new Option("list", false, "List delegation tokens."));
+    options.addOption(new Option("olderThan", true, "Filter for token's issue-date. (e.g. 3d, 1h or 4m)."));
+    options.addOption(new Option("expired", false, "Select expired delegation tokens for listing/deletion."));
+    options.addOption(new Option("dryRun", false, "Don't actually delete delegation tokens."));
+    options.addOption(new Option("batchSize", true, "Number of tokens to drop between sleep intervals."));
+    options.addOption(new Option("sleepTime", true, "Sleep-time in seconds, between batches of dropped delegation tokens."));
+    // stopAtNonOption is false, so an unrecognized option fails the parse.
+    CommandLine commandLine = new GnuParser().parse(options, args, false);
+    confLocation = commandLine.getOptionValue("confLocation");
+    isDryRun = commandLine.hasOption("dryRun");
+
+    if (commandLine.hasOption("list")) {
+      opType = OpType.LIST;
+    } else if (commandLine.hasOption("delete")) {
+      opType = OpType.DELETE;
+    } else {
+      throw new IllegalArgumentException("Operation must be delete or list!");
+    }
+
+    if (commandLine.hasOption("expired")) {
+      LOG.info("Working on expired delegation tokens!");
+      timeLimitMillis = System.currentTimeMillis();
+      selectForDeletion = new Predicate<DelegationTokenIdentifier>() {
+        @Override
+        public boolean apply(DelegationTokenIdentifier input) {
+          return timeLimitMillis > input.getMaxDate();
+        }
+      };
+    } else if (commandLine.hasOption("olderThan")) {
+      timeLimitMillis = System.currentTimeMillis() - toMillis(commandLine.getOptionValue("olderThan"));
+      LOG.info("Working on delegation tokens older than current-time (" + timeLimitMillis + ").");
+      selectForDeletion = new Predicate<DelegationTokenIdentifier>() {
+        @Override
+        public boolean apply(DelegationTokenIdentifier input) {
+          return timeLimitMillis > input.getIssueDate();
+        }
+      };
+    } else if (opType == OpType.DELETE) {
+      // Neither "expired" nor "olderThan" criteria selected: refuse to delete every token.
+      throw new IllegalArgumentException("Attempting to delete tokens. " +
+          "Specify deletion criteria (either expired or time-range).");
+    }
+
+    if (commandLine.hasOption("batchSize")) {
+      batchSize = Integer.parseInt(commandLine.getOptionValue("batchSize"));
+      if (batchSize < 1) {
+        LOG.warn("Invalid batch-size! (" + batchSize + ") Resetting to defaults.");
+        batchSize = BATCH_SIZE_DEFAULT;
+      }
+      LOG.info("Batch-size for drop == " + batchSize);
+    }
+
+    if (commandLine.hasOption("sleepTime")) {
+      sleepTimeMillis = MILLIS_PER_SECOND * Integer.parseInt(commandLine.getOptionValue("sleepTime"));
+      if (sleepTimeMillis <= 0) {
+        LOG.warn("Invalid sleep-time! (" + sleepTimeMillis + ") Resetting to defaults.");
+        sleepTimeMillis = SLEEP_TIME_MILLIS_DEFAULT;
+      }
+      LOG.info("Sleep between drop-batches: " + sleepTimeMillis + " milliseconds.");
+    }
+  }
+
+  /**
+   * Converts an interval such as "3d", "4h" or "5m" into milliseconds, using
+   * range-checked long arithmetic. Done in int, 24*60*60*1000*days wraps from
+   * 25 days on, which moves the cut-off into the future so that every token matches.
+   * @throws IllegalArgumentException for an unknown unit, or for a count that
+   *         is negative, not a number, or too large.
+   */
+  private static long toMillis(String interval) {
+    final long unitMillis;
+    switch (interval.isEmpty() ? ' ' : interval.charAt(interval.length() - 1)) {
+      case 'd': case 'D': unitMillis = MILLIS_PER_DAY; break;
+      case 'h': case 'H': unitMillis = MILLIS_PER_HOUR; break;
+      case 'm': case 'M': unitMillis = MILLIS_PER_MINUTE; break;
+      default: throw new IllegalArgumentException("Unsupported time-limit: " + interval);
+    }
+    long count = Long.parseLong(interval.substring(0, interval.length() - 1));
+    if (count < 0 || count > Long.MAX_VALUE / unitMillis) {
+      throw new IllegalArgumentException("Unsupported time-limit: " + interval);
+    }
+    return count * unitMillis;
+  }
+
+  /** Loads the configuration at confLocation and initializes the token-store class it names. */
+  private void init() throws Exception {
+    if (StringUtils.isBlank(confLocation)) {
+      throw new IllegalArgumentException("confLocation must be specified.");
+    }
+    Configuration conf = new Configuration();
+    conf.addResource(new Path(confLocation));
+
+    String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+    if (StringUtils.isBlank(tokenStoreClassName)) {
+      throw new Exception("Could not find Delegation TokenStore implementation.");
+    }
+
+    Class<? extends DelegationTokenStore> clazz = Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class);
+    delegationTokenStore = ReflectionUtils.newInstance(clazz, conf);
+    delegationTokenStore.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
+  }
+
+  /** Prints every token that matches the selection criterion to stdout. */
+  private void doList() throws Exception {
+    for (DelegationTokenIdentifier tokenId : Iterables.filter(delegationTokenStore.getAllDelegationTokenIdentifiers(), selectForDeletion)) {
+      System.out.println(tokenId.toString());
+    }
+  }
+
+  /** Removes the matching tokens (a dry run only logs them), sleeping after every full batch. */
+  private void doDelete() throws Exception {
+    List<DelegationTokenIdentifier> allDelegationTokenIDs = delegationTokenStore.getAllDelegationTokenIdentifiers();
+    int nDeletedTokens = 0;
+    for (DelegationTokenIdentifier tokenId : Iterables.filter(allDelegationTokenIDs, selectForDeletion)) {
+      LOG.info("Deleting token: " + tokenId.toString());
+      if (!isDryRun) {
+        delegationTokenStore.removeToken(tokenId);
+      }
+      // Count after the removal, so that "Deleted N" is accurate when it is logged.
+      if ((++nDeletedTokens % batchSize) == 0) {
+        LOG.info("Deleted " + nDeletedTokens + "/" + allDelegationTokenIDs.size() +
+            " (" + ((100L * nDeletedTokens) / allDelegationTokenIDs.size()) + "%). " +
+            "Sleeping for " + sleepTimeMillis + "ms...");
+        try {
+          Thread.sleep(sleepTimeMillis);
+        } catch (InterruptedException interrupted) {
+          Thread.currentThread().interrupt(); // Stop deleting, and keep the interrupt visible to callers.
+          throw interrupted;
+        }
+      }
+    }
+  }
+
+  /** Runs the requested operation: returns 0 on success, or -1 after logging a failure. */
+  @Override
+  public int run(String[] args) throws Exception {
+    try {
+      readArgs(args);
+      init();
+      switch (opType) {
+        case LIST: doList(); break;
+        case DELETE: doDelete(); break;
+      }
+      return 0;
+    } catch (Exception exception) {
+      LOG.error("Unexpected exception: ", exception);
+      return -1;
+    }
+  }
+}