commit b171ccb52570dd9bf49dc3823e14e650b87994ca
Author: Ashutosh Chauhan
Date:   Thu Nov 27 11:23:20 2014 -0800

    Deleted shims/common-secure

diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java
index 7661ad5..39ef86e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/Security.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -37,7 +38,6 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.security.token.delegation.DelegationTokenSelector;
 import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.thrift.TException;
diff --git a/shims/aggregator/pom.xml b/shims/aggregator/pom.xml
index 20e51a2..3772177 100644
--- a/shims/aggregator/pom.xml
+++ b/shims/aggregator/pom.xml
@@ -41,12 +41,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive.shims</groupId>
-      <artifactId>hive-shims-common-secure</artifactId>
-      <version>${project.version}</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive.shims</groupId>
       <artifactId>hive-shims-0.20S</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
diff --git a/shims/common-secure/pom.xml b/shims/common-secure/pom.xml
deleted file mode 100644
index 2c56f5d..0000000
--- a/shims/common-secure/pom.xml
+++ /dev/null
@@ -1,93 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.hive</groupId>
-    <artifactId>hive</artifactId>
-    <version>0.15.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.hive.shims</groupId>
-  <artifactId>hive-shims-common-secure</artifactId>
-  <packaging>jar</packaging>
-  <name>Hive Shims Secure Common</name>
-
-  <properties>
-    <hive.path.to.root>../..</hive.path.to.root>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hive.shims</groupId>
-      <artifactId>hive-shims-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <version>${commons-codec.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>${commons-lang.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-      <version>${commons-logging.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
-      <version>${hadoop-20S.version}</version>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-tools</artifactId>
-      <version>${hadoop-20S.version}</version>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.thrift</groupId>
-      <artifactId>libthrift</artifactId>
-      <version>${libthrift.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-framework</artifactId>
-      <version>${curator.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <version>${zookeeper.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
deleted file mode 100644
index bf196ae..0000000
--- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.shims; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.net.URI; -import java.security.AccessControlException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.DefaultFileAccess; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsShell; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; -import org.apache.hadoop.mapred.ClusterStatus; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.lib.CombineFileInputFormat; -import org.apache.hadoop.mapred.lib.CombineFileSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.Progressable; - -import com.google.common.primitives.Longs; - -/** - * Base implemention for shims against secure Hadoop 0.20.3/0.23. 
- */ -public abstract class HadoopShimsSecure implements HadoopShims { - - static final Log LOG = LogFactory.getLog(HadoopShimsSecure.class); - - @Override - public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() { - return new CombineFileInputFormatShim() { - @Override - public RecordReader getRecordReader(InputSplit split, - JobConf job, Reporter reporter) throws IOException { - throw new IOException("CombineFileInputFormat.getRecordReader not needed."); - } - }; - } - - public static class InputSplitShim extends CombineFileSplit { - long shrinkedLength; - boolean _isShrinked; - public InputSplitShim() { - super(); - _isShrinked = false; - } - - public InputSplitShim(JobConf conf, Path[] paths, long[] startOffsets, - long[] lengths, String[] locations) throws IOException { - super(conf, paths, startOffsets, lengths, dedup(locations)); - _isShrinked = false; - } - - public void shrinkSplit(long length) { - _isShrinked = true; - shrinkedLength = length; - } - - public boolean isShrinked() { - return _isShrinked; - } - - public long getShrinkedLength() { - return shrinkedLength; - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - _isShrinked = in.readBoolean(); - if (_isShrinked) { - shrinkedLength = in.readLong(); - } - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - out.writeBoolean(_isShrinked); - if (_isShrinked) { - out.writeLong(shrinkedLength); - } - } - } - - /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once - * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. This code should be removed - it is a copy - * of org.apache.hadoop.mapred.lib.CombineFileRecordReader - */ - public static class CombineFileRecordReader implements RecordReader { - - static final Class[] constructorSignature = new Class[] { - InputSplit.class, - Configuration.class, - Reporter.class, - Integer.class - }; - - protected CombineFileSplit split; - protected JobConf jc; - protected Reporter reporter; - protected Class> rrClass; - protected Constructor> rrConstructor; - protected FileSystem fs; - - protected int idx; - protected long progress; - protected RecordReader curReader; - protected boolean isShrinked; - protected long shrinkedLength; - - @Override - public boolean next(K key, V value) throws IOException { - - while ((curReader == null) - || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(), - value)) { - if (!initNextRecordReader(key)) { - return false; - } - } - return true; - } - - @Override - public K createKey() { - K newKey = curReader.createKey(); - return (K)(new CombineHiveKey(newKey)); - } - - @Override - public V createValue() { - return curReader.createValue(); - } - - /** - * Return the amount of data processed. - */ - @Override - public long getPos() throws IOException { - return progress; - } - - @Override - public void close() throws IOException { - if (curReader != null) { - curReader.close(); - curReader = null; - } - } - - /** - * Return progress based on the amount of data processed so far. - */ - @Override - public float getProgress() throws IOException { - return Math.min(1.0f, progress / (float) (split.getLength())); - } - - /** - * A generic RecordReader that can hand out different recordReaders - * for each chunk in the CombineFileSplit. 
- */ - public CombineFileRecordReader(JobConf job, CombineFileSplit split, - Reporter reporter, - Class> rrClass) - throws IOException { - this.split = split; - this.jc = job; - this.rrClass = rrClass; - this.reporter = reporter; - this.idx = 0; - this.curReader = null; - this.progress = 0; - - isShrinked = false; - - assert (split instanceof InputSplitShim); - if (((InputSplitShim) split).isShrinked()) { - isShrinked = true; - shrinkedLength = ((InputSplitShim) split).getShrinkedLength(); - } - - try { - rrConstructor = rrClass.getDeclaredConstructor(constructorSignature); - rrConstructor.setAccessible(true); - } catch (Exception e) { - throw new RuntimeException(rrClass.getName() + - " does not have valid constructor", e); - } - initNextRecordReader(null); - } - - /** - * do next and handle exception inside it. - * @param key - * @param value - * @return - * @throws IOException - */ - private boolean doNextWithExceptionHandler(K key, V value) throws IOException { - try { - return curReader.next(key, value); - } catch (Exception e) { - return HiveIOExceptionHandlerUtil - .handleRecordReaderNextException(e, jc); - } - } - - /** - * Get the record reader for the next chunk in this CombineFileSplit. - */ - protected boolean initNextRecordReader(K key) throws IOException { - - if (curReader != null) { - curReader.close(); - curReader = null; - if (idx > 0) { - progress += split.getLength(idx - 1); // done processing so far - } - } - - // if all chunks have been processed, nothing more to do. - if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) { - return false; - } - - // get a record reader for the idx-th chunk - try { - curReader = rrConstructor.newInstance(new Object[] - {split, jc, reporter, Integer.valueOf(idx)}); - - // change the key if need be - if (key != null) { - K newKey = curReader.createKey(); - ((CombineHiveKey)key).setKey(newKey); - } - - // setup some helper config variables. - jc.set("map.input.file", split.getPath(idx).toString()); - jc.setLong("map.input.start", split.getOffset(idx)); - jc.setLong("map.input.length", split.getLength(idx)); - } catch (Exception e) { - curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException( - e, jc); - } - idx++; - return true; - } - } - - public abstract static class CombineFileInputFormatShim extends - CombineFileInputFormat - implements HadoopShims.CombineFileInputFormatShim { - - @Override - public Path[] getInputPathsShim(JobConf conf) { - try { - return FileInputFormat.getInputPaths(conf); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void createPool(JobConf conf, PathFilter... 
filters) { - super.createPool(conf, filters); - } - - @Override - public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOException { - long minSize = job.getLong(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"), 0); - - // For backward compatibility, let the above parameter be used - if (job.getLong(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZEPERNODE"), 0) == 0) { - super.setMinSplitSizeNode(minSize); - } - - if (job.getLong(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZEPERRACK"), 0) == 0) { - super.setMinSplitSizeRack(minSize); - } - - if (job.getLong(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE"), 0) == 0) { - super.setMaxSplitSize(minSize); - } - - InputSplit[] splits = super.getSplits(job, numSplits); - - ArrayList inputSplitShims = new ArrayList(); - for (int pos = 0; pos < splits.length; pos++) { - CombineFileSplit split = (CombineFileSplit) splits[pos]; - Set dirIndices = getDirIndices(split.getPaths(), job); - if (dirIndices.size() != split.getPaths().length) { - List prunedPaths = prune(dirIndices, Arrays.asList(split.getPaths())); - List prunedStartOffsets = prune(dirIndices, Arrays.asList( - ArrayUtils.toObject(split.getStartOffsets()))); - List prunedLengths = prune(dirIndices, Arrays.asList( - ArrayUtils.toObject(split.getLengths()))); - inputSplitShims.add(new InputSplitShim(job, prunedPaths.toArray(new Path[prunedPaths.size()]), - Longs.toArray(prunedStartOffsets), - Longs.toArray(prunedLengths), split.getLocations())); - } - } - return inputSplitShims.toArray(new InputSplitShim[inputSplitShims.size()]); - } - - @Override - public InputSplitShim getInputSplitShim() throws IOException { - return new InputSplitShim(); - } - - @Override - public RecordReader getRecordReader(JobConf job, CombineFileSplit split, - Reporter reporter, - Class> rrClass) - throws IOException { - CombineFileSplit cfSplit = split; - return new CombineFileRecordReader(job, cfSplit, reporter, rrClass); - } - - } - - @Override - abstract public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception; - - @Override - abstract public org.apache.hadoop.mapreduce.TaskAttemptContext newTaskAttemptContext( - Configuration conf, final Progressable progressable); - - @Override - abstract public org.apache.hadoop.mapreduce.JobContext newJobContext(Job job); - - @Override - abstract public boolean isLocalMode(Configuration conf); - - @Override - abstract public void setJobLauncherRpcAddress(Configuration conf, String val); - - @Override - abstract public String getJobLauncherHttpAddress(Configuration conf); - - @Override - abstract public String getJobLauncherRpcAddress(Configuration conf); - - @Override - abstract public short getDefaultReplication(FileSystem fs, Path path); - - @Override - abstract public long getDefaultBlockSize(FileSystem fs, Path path); - - @Override - abstract public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration conf) - throws IOException; - - @Override - abstract public FileSystem createProxyFileSystem(FileSystem fs, URI uri); - - @Override - abstract public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException; - - protected void run(FsShell shell, String[] command) throws Exception { - LOG.debug(ArrayUtils.toString(command)); - int retval = shell.run(command); - LOG.debug("Return value is :" + retval); - } - - /** - * CombineFileInputFormat sometimes returns directories as splits, need to 
prune them. - */ - private static Set getDirIndices(Path[] paths, JobConf conf) throws IOException { - Set result = new HashSet(); - for (int i = 0; i < paths.length; i++) { - FileSystem fs = paths[i].getFileSystem(conf); - if (!fs.isFile(paths[i])) { - result.add(i); - } - } - return result; - } - - private static List prune(Set indicesToPrune, List elms) { - List result = new ArrayList(); - int i = 0; - for (K elm : elms) { - if (indicesToPrune.contains(i)) { - continue; - } - result.add(elm); - i++; - } - return result; - } - - private static String[] dedup(String[] locations) throws IOException { - Set dedup = new HashSet(); - Collections.addAll(dedup, locations); - return dedup.toArray(new String[dedup.size()]); - } - - @Override - public void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action) - throws IOException, AccessControlException, Exception { - DefaultFileAccess.checkFileAccess(fs, stat, action); - } -} diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java deleted file mode 100644 index 0f56b8c..0000000 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DBTokenStore implements DelegationTokenStore { - private static final Logger LOG = LoggerFactory.getLogger(DBTokenStore.class); - - @Override - public int addMasterKey(String s) throws TokenStoreException { - if (LOG.isTraceEnabled()) { - LOG.trace("addMasterKey: s = " + s); - } - return (Integer)invokeOnRawStore("addMasterKey", new Object[]{s},String.class); - } - - @Override - public void updateMasterKey(int keySeq, String s) throws TokenStoreException { - if (LOG.isTraceEnabled()) { - LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq); - } - invokeOnRawStore("updateMasterKey", new Object[] {Integer.valueOf(keySeq), s}, - Integer.class, String.class); - } - - @Override - public boolean removeMasterKey(int keySeq) { - return (Boolean)invokeOnRawStore("removeMasterKey", new Object[] {Integer.valueOf(keySeq)}, - Integer.class); - } - - @Override - public String[] getMasterKeys() throws TokenStoreException { - return (String[])invokeOnRawStore("getMasterKeys", new Object[0]); - } - - @Override - public boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) throws TokenStoreException { - - try { - String identifier = TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); - String tokenStr = Base64.encodeBase64URLSafeString( - HiveDelegationTokenSupport.encodeDelegationTokenInformation(token)); - boolean result = (Boolean)invokeOnRawStore("addToken", new Object[] {identifier, tokenStr}, - String.class, String.class); - if (LOG.isTraceEnabled()) { - LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", addded = " + result); - } - return result; - } catch (IOException e) { - throw new TokenStoreException(e); - } - } - - @Override - public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) - throws TokenStoreException { - try { - String tokenStr = (String)invokeOnRawStore("getToken", new Object[] { - TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); - DelegationTokenInformation result = null; - if (tokenStr != null) { - result = HiveDelegationTokenSupport.decodeDelegationTokenInformation(Base64.decodeBase64(tokenStr)); - } - if (LOG.isTraceEnabled()) { - LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); - } - return result; - } catch (IOException e) { - throw new TokenStoreException(e); - } - } - - @Override - public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException{ - try { - boolean result = (Boolean)invokeOnRawStore("removeToken", new Object[] { - TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); - if (LOG.isTraceEnabled()) { - LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", addded = " + result); - } - return result; - } catch (IOException e) { - throw new TokenStoreException(e); - } - } - - 
@Override - public List getAllDelegationTokenIdentifiers() throws TokenStoreException{ - - List tokenIdents = (List)invokeOnRawStore("getAllTokenIdentifiers", new Object[0]); - List delTokenIdents = new ArrayList(tokenIdents.size()); - - for (String tokenIdent : tokenIdents) { - DelegationTokenIdentifier delToken = new DelegationTokenIdentifier(); - try { - TokenStoreDelegationTokenSecretManager.decodeWritable(delToken, tokenIdent); - } catch (IOException e) { - throw new TokenStoreException(e); - } - delTokenIdents.add(delToken); - } - return delTokenIdents; - } - - private Object rawStore; - - @Override - public void init(Object rawStore, ServerMode smode) throws TokenStoreException { - this.rawStore = rawStore; - } - - private Object invokeOnRawStore(String methName, Object[] params, Class ... paramTypes) - throws TokenStoreException{ - - try { - return rawStore.getClass().getMethod(methName, paramTypes).invoke(rawStore, params); - } catch (IllegalArgumentException e) { - throw new TokenStoreException(e); - } catch (SecurityException e) { - throw new TokenStoreException(e); - } catch (IllegalAccessException e) { - throw new TokenStoreException(e); - } catch (InvocationTargetException e) { - throw new TokenStoreException(e.getCause()); - } catch (NoSuchMethodException e) { - throw new TokenStoreException(e); - } - } - - @Override - public void setConf(Configuration conf) { - // No-op - } - - @Override - public Configuration getConf() { - return null; - } - - @Override - public void close() throws IOException { - // No-op. - } - -} diff --git a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java b/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java deleted file mode 100644 index e46f293..0000000 --- a/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java +++ /dev/null @@ -1,472 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.thrift; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; -import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooDefs.Perms; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ZooKeeper token store implementation. - */ -public class ZooKeeperTokenStore implements DelegationTokenStore { - - private static final Logger LOGGER = - LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName()); - - protected static final String ZK_SEQ_FORMAT = "%010d"; - private static final String NODE_KEYS = "/keys"; - private static final String NODE_TOKENS = "/tokens"; - - private String rootNode = ""; - private volatile CuratorFramework zkSession; - private String zkConnectString; - private int connectTimeoutMillis; - private List newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS)); - - /** - * ACLProvider permissions will be used in case parent dirs need to be created - */ - private final ACLProvider aclDefaultProvider = new ACLProvider() { - - @Override - public List getDefaultAcl() { - return newNodeAcl; - } - - @Override - public List getAclForPath(String path) { - return getDefaultAcl(); - } - }; - - - private ServerMode serverMode; - - private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled" - + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")"; - - private Configuration conf; - - /** - * Default constructor for dynamic instantiation w/ Configurable - * (ReflectionUtils does not support Configuration constructor injection). 
- */ - protected ZooKeeperTokenStore() { - } - - private CuratorFramework getSession() { - if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { - synchronized (this) { - if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { - zkSession = - CuratorFrameworkFactory.builder().connectString(zkConnectString) - .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); - zkSession.start(); - } - } - } - return zkSession; - } - - private void setupJAASConfig(Configuration conf) throws IOException { - if (!UserGroupInformation.getLoginUser().isFromKeytab()) { - // The process has not logged in using keytab - // this should be a test mode, can't use keytab to authenticate - // with zookeeper. - LOGGER.warn("Login is not from keytab"); - return; - } - - String principal; - String keytab; - switch (serverMode) { - case METASTORE: - principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal"); - keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file"); - break; - case HIVESERVER2: - principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal"); - keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab"); - break; - default: - throw new AssertionError("Unexpected server mode " + serverMode); - } - Utils.setZookeeperClientKerberosJaasConfig(principal, keytab); - } - - private String getNonEmptyConfVar(Configuration conf, String param) throws IOException { - String val = conf.get(param); - if (val == null || val.trim().isEmpty()) { - throw new IOException("Configuration parameter " + param + " should be set, " - + WHEN_ZK_DSTORE_MSG); - } - return val; - } - - /** - * Create a path if it does not already exist ("mkdir -p") - * @param path string with '/' separator - * @param acl list of ACL entries - * @throws TokenStoreException - */ - public void ensurePath(String path, List acl) - throws TokenStoreException { - try { - CuratorFramework zk = getSession(); - String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) - .withACL(acl).forPath(path); - LOGGER.info("Created path: {} ", node); - } catch (KeeperException.NodeExistsException e) { - // node already exists - } catch (Exception e) { - throw new TokenStoreException("Error creating path " + path, e); - } - } - - /** - * Parse ACL permission string, from ZooKeeperMain private method - * @param permString - * @return - */ - public static int getPermFromString(String permString) { - int perm = 0; - for (int i = 0; i < permString.length(); i++) { - switch (permString.charAt(i)) { - case 'r': - perm |= ZooDefs.Perms.READ; - break; - case 'w': - perm |= ZooDefs.Perms.WRITE; - break; - case 'c': - perm |= ZooDefs.Perms.CREATE; - break; - case 'd': - perm |= ZooDefs.Perms.DELETE; - break; - case 'a': - perm |= ZooDefs.Perms.ADMIN; - break; - default: - LOGGER.error("Unknown perm type: " + permString.charAt(i)); - } - } - return perm; - } - - /** - * Parse comma separated list of ACL entries to secure generated nodes, e.g. 
- * sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa - * @param aclString - * @return ACL list - */ - public static List parseACLs(String aclString) { - String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ","); - List acl = new ArrayList(aclComps.length); - for (String a : aclComps) { - if (StringUtils.isBlank(a)) { - continue; - } - a = a.trim(); - // from ZooKeeperMain private method - int firstColon = a.indexOf(':'); - int lastColon = a.lastIndexOf(':'); - if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { - LOGGER.error(a + " does not have the form scheme:id:perm"); - continue; - } - ACL newAcl = new ACL(); - newAcl.setId(new Id(a.substring(0, firstColon), a.substring( - firstColon + 1, lastColon))); - newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); - acl.add(newAcl); - } - return acl; - } - - private void initClientAndPaths() { - if (this.zkSession != null) { - this.zkSession.close(); - } - try { - ensurePath(rootNode + NODE_KEYS, newNodeAcl); - ensurePath(rootNode + NODE_TOKENS, newNodeAcl); - } catch (TokenStoreException e) { - throw e; - } - } - - @Override - public void setConf(Configuration conf) { - if (conf == null) { - throw new IllegalArgumentException("conf is null"); - } - this.conf = conf; - } - - @Override - public Configuration getConf() { - return null; // not required - } - - private Map getAllKeys() throws KeeperException, InterruptedException { - - String masterKeyNode = rootNode + NODE_KEYS; - - // get children of key node - List nodes = zkGetChildren(masterKeyNode); - - // read each child node, add to results - Map result = new HashMap(); - for (String node : nodes) { - String nodePath = masterKeyNode + "/" + node; - byte[] data = zkGetData(nodePath); - if (data != null) { - result.put(getSeq(node), data); - } - } - return result; - } - - private List zkGetChildren(String path) { - CuratorFramework zk = getSession(); - try { - return zk.getChildren().forPath(path); - } catch (Exception e) { - throw new TokenStoreException("Error getting children for " + path, e); - } - } - - private byte[] zkGetData(String nodePath) { - CuratorFramework zk = getSession(); - try { - return zk.getData().forPath(nodePath); - } catch (KeeperException.NoNodeException ex) { - return null; - } catch (Exception e) { - throw new TokenStoreException("Error reading " + nodePath, e); - } - } - - private int getSeq(String path) { - String[] pathComps = path.split("/"); - return Integer.parseInt(pathComps[pathComps.length-1]); - } - - @Override - public int addMasterKey(String s) { - String keysPath = rootNode + NODE_KEYS + "/"; - CuratorFramework zk = getSession(); - String newNode; - try { - newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl) - .forPath(keysPath, s.getBytes()); - } catch (Exception e) { - throw new TokenStoreException("Error creating new node with path " + keysPath, e); - } - LOGGER.info("Added key {}", newNode); - return getSeq(newNode); - } - - @Override - public void updateMasterKey(int keySeq, String s) { - CuratorFramework zk = getSession(); - String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); - try { - zk.setData().forPath(keyPath, s.getBytes()); - } catch (Exception e) { - throw new TokenStoreException("Error setting data in " + keyPath, e); - } - } - - @Override - public boolean removeMasterKey(int keySeq) { - String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); - zkDelete(keyPath); - return true; - } - - 
private void zkDelete(String path) { - CuratorFramework zk = getSession(); - try { - zk.delete().forPath(path); - } catch (KeeperException.NoNodeException ex) { - // already deleted - } catch (Exception e) { - throw new TokenStoreException("Error deleting " + path, e); - } - } - - @Override - public String[] getMasterKeys() { - try { - Map allKeys = getAllKeys(); - String[] result = new String[allKeys.size()]; - int resultIdx = 0; - for (byte[] keyBytes : allKeys.values()) { - result[resultIdx++] = new String(keyBytes); - } - return result; - } catch (KeeperException ex) { - throw new TokenStoreException(ex); - } catch (InterruptedException ex) { - throw new TokenStoreException(ex); - } - } - - - private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) { - try { - return rootNode + NODE_TOKENS + "/" - + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); - } catch (IOException ex) { - throw new TokenStoreException("Failed to encode token identifier", ex); - } - } - - @Override - public boolean addToken(DelegationTokenIdentifier tokenIdentifier, - DelegationTokenInformation token) { - byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token); - String tokenPath = getTokenPath(tokenIdentifier); - CuratorFramework zk = getSession(); - String newNode; - try { - newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl) - .forPath(tokenPath, tokenBytes); - } catch (Exception e) { - throw new TokenStoreException("Error creating new node with path " + tokenPath, e); - } - - LOGGER.info("Added token: {}", newNode); - return true; - } - - @Override - public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { - String tokenPath = getTokenPath(tokenIdentifier); - zkDelete(tokenPath); - return true; - } - - @Override - public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { - byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier)); - try { - return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); - } catch (Exception ex) { - throw new TokenStoreException("Failed to decode token", ex); - } - } - - @Override - public List getAllDelegationTokenIdentifiers() { - String containerNode = rootNode + NODE_TOKENS; - final List nodes = zkGetChildren(containerNode); - List result = new java.util.ArrayList( - nodes.size()); - for (String node : nodes) { - DelegationTokenIdentifier id = new DelegationTokenIdentifier(); - try { - TokenStoreDelegationTokenSecretManager.decodeWritable(id, node); - result.add(id); - } catch (Exception e) { - LOGGER.warn("Failed to decode token '{}'", node); - } - } - return result; - } - - @Override - public void close() throws IOException { - if (this.zkSession != null) { - this.zkSession.close(); - } - } - - @Override - public void init(Object objectStore, ServerMode smode) { - this.serverMode = smode; - zkConnectString = - conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); - if (zkConnectString == null || zkConnectString.trim().isEmpty()) { - // try alternate config param - zkConnectString = - conf.get( - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, - null); - if (zkConnectString == null || zkConnectString.trim().isEmpty()) { - throw new IllegalArgumentException("Zookeeper connect string has to be specifed through " - + "either " + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR - + " or " - + 
HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE - + WHEN_ZK_DSTORE_MSG); - } - } - connectTimeoutMillis = - conf.getInt( - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, - CuratorFrameworkFactory.builder().getConnectionTimeoutMs()); - String aclStr = conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null); - if (StringUtils.isNotBlank(aclStr)) { - this.newNodeAcl = parseACLs(aclStr); - } - rootNode = - conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, - HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode; - - try { - // Install the JAAS Configuration for the runtime - setupJAASConfig(conf); - } catch (IOException e) { - throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client " - + e.getMessage(), e); - } - initClientAndPaths(); - } - -} diff --git a/shims/common/pom.xml b/shims/common/pom.xml index 0981a86..739504d 100644 --- a/shims/common/pom.xml +++ b/shims/common/pom.xml @@ -67,6 +67,11 @@ ${libthrift.version} + org.apache.curator + curator-framework + ${curator.version} + + org.apache.zookeeper zookeeper ${zookeeper.version} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java new file mode 100644 index 0000000..bf196ae --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java @@ -0,0 +1,441 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.shims; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.URI; +import java.security.AccessControlException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DefaultFileAccess; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; +import org.apache.hadoop.mapred.ClusterStatus; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.lib.CombineFileInputFormat; +import org.apache.hadoop.mapred.lib.CombineFileSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Progressable; + +import com.google.common.primitives.Longs; + +/** + * Base implemention for shims against secure Hadoop 0.20.3/0.23. + */ +public abstract class HadoopShimsSecure implements HadoopShims { + + static final Log LOG = LogFactory.getLog(HadoopShimsSecure.class); + + @Override + public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() { + return new CombineFileInputFormatShim() { + @Override + public RecordReader getRecordReader(InputSplit split, + JobConf job, Reporter reporter) throws IOException { + throw new IOException("CombineFileInputFormat.getRecordReader not needed."); + } + }; + } + + public static class InputSplitShim extends CombineFileSplit { + long shrinkedLength; + boolean _isShrinked; + public InputSplitShim() { + super(); + _isShrinked = false; + } + + public InputSplitShim(JobConf conf, Path[] paths, long[] startOffsets, + long[] lengths, String[] locations) throws IOException { + super(conf, paths, startOffsets, lengths, dedup(locations)); + _isShrinked = false; + } + + public void shrinkSplit(long length) { + _isShrinked = true; + shrinkedLength = length; + } + + public boolean isShrinked() { + return _isShrinked; + } + + public long getShrinkedLength() { + return shrinkedLength; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + _isShrinked = in.readBoolean(); + if (_isShrinked) { + shrinkedLength = in.readLong(); + } + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeBoolean(_isShrinked); + if (_isShrinked) { + out.writeLong(shrinkedLength); + } + } + } + + /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once + * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. 
This code should be removed - it is a copy + * of org.apache.hadoop.mapred.lib.CombineFileRecordReader + */ + public static class CombineFileRecordReader implements RecordReader { + + static final Class[] constructorSignature = new Class[] { + InputSplit.class, + Configuration.class, + Reporter.class, + Integer.class + }; + + protected CombineFileSplit split; + protected JobConf jc; + protected Reporter reporter; + protected Class> rrClass; + protected Constructor> rrConstructor; + protected FileSystem fs; + + protected int idx; + protected long progress; + protected RecordReader curReader; + protected boolean isShrinked; + protected long shrinkedLength; + + @Override + public boolean next(K key, V value) throws IOException { + + while ((curReader == null) + || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(), + value)) { + if (!initNextRecordReader(key)) { + return false; + } + } + return true; + } + + @Override + public K createKey() { + K newKey = curReader.createKey(); + return (K)(new CombineHiveKey(newKey)); + } + + @Override + public V createValue() { + return curReader.createValue(); + } + + /** + * Return the amount of data processed. + */ + @Override + public long getPos() throws IOException { + return progress; + } + + @Override + public void close() throws IOException { + if (curReader != null) { + curReader.close(); + curReader = null; + } + } + + /** + * Return progress based on the amount of data processed so far. + */ + @Override + public float getProgress() throws IOException { + return Math.min(1.0f, progress / (float) (split.getLength())); + } + + /** + * A generic RecordReader that can hand out different recordReaders + * for each chunk in the CombineFileSplit. + */ + public CombineFileRecordReader(JobConf job, CombineFileSplit split, + Reporter reporter, + Class> rrClass) + throws IOException { + this.split = split; + this.jc = job; + this.rrClass = rrClass; + this.reporter = reporter; + this.idx = 0; + this.curReader = null; + this.progress = 0; + + isShrinked = false; + + assert (split instanceof InputSplitShim); + if (((InputSplitShim) split).isShrinked()) { + isShrinked = true; + shrinkedLength = ((InputSplitShim) split).getShrinkedLength(); + } + + try { + rrConstructor = rrClass.getDeclaredConstructor(constructorSignature); + rrConstructor.setAccessible(true); + } catch (Exception e) { + throw new RuntimeException(rrClass.getName() + + " does not have valid constructor", e); + } + initNextRecordReader(null); + } + + /** + * do next and handle exception inside it. + * @param key + * @param value + * @return + * @throws IOException + */ + private boolean doNextWithExceptionHandler(K key, V value) throws IOException { + try { + return curReader.next(key, value); + } catch (Exception e) { + return HiveIOExceptionHandlerUtil + .handleRecordReaderNextException(e, jc); + } + } + + /** + * Get the record reader for the next chunk in this CombineFileSplit. + */ + protected boolean initNextRecordReader(K key) throws IOException { + + if (curReader != null) { + curReader.close(); + curReader = null; + if (idx > 0) { + progress += split.getLength(idx - 1); // done processing so far + } + } + + // if all chunks have been processed, nothing more to do. 
+ if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) { + return false; + } + + // get a record reader for the idx-th chunk + try { + curReader = rrConstructor.newInstance(new Object[] + {split, jc, reporter, Integer.valueOf(idx)}); + + // change the key if need be + if (key != null) { + K newKey = curReader.createKey(); + ((CombineHiveKey)key).setKey(newKey); + } + + // setup some helper config variables. + jc.set("map.input.file", split.getPath(idx).toString()); + jc.setLong("map.input.start", split.getOffset(idx)); + jc.setLong("map.input.length", split.getLength(idx)); + } catch (Exception e) { + curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException( + e, jc); + } + idx++; + return true; + } + } + + public abstract static class CombineFileInputFormatShim extends + CombineFileInputFormat + implements HadoopShims.CombineFileInputFormatShim { + + @Override + public Path[] getInputPathsShim(JobConf conf) { + try { + return FileInputFormat.getInputPaths(conf); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void createPool(JobConf conf, PathFilter... filters) { + super.createPool(conf, filters); + } + + @Override + public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOException { + long minSize = job.getLong(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"), 0); + + // For backward compatibility, let the above parameter be used + if (job.getLong(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZEPERNODE"), 0) == 0) { + super.setMinSplitSizeNode(minSize); + } + + if (job.getLong(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZEPERRACK"), 0) == 0) { + super.setMinSplitSizeRack(minSize); + } + + if (job.getLong(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE"), 0) == 0) { + super.setMaxSplitSize(minSize); + } + + InputSplit[] splits = super.getSplits(job, numSplits); + + ArrayList inputSplitShims = new ArrayList(); + for (int pos = 0; pos < splits.length; pos++) { + CombineFileSplit split = (CombineFileSplit) splits[pos]; + Set dirIndices = getDirIndices(split.getPaths(), job); + if (dirIndices.size() != split.getPaths().length) { + List prunedPaths = prune(dirIndices, Arrays.asList(split.getPaths())); + List prunedStartOffsets = prune(dirIndices, Arrays.asList( + ArrayUtils.toObject(split.getStartOffsets()))); + List prunedLengths = prune(dirIndices, Arrays.asList( + ArrayUtils.toObject(split.getLengths()))); + inputSplitShims.add(new InputSplitShim(job, prunedPaths.toArray(new Path[prunedPaths.size()]), + Longs.toArray(prunedStartOffsets), + Longs.toArray(prunedLengths), split.getLocations())); + } + } + return inputSplitShims.toArray(new InputSplitShim[inputSplitShims.size()]); + } + + @Override + public InputSplitShim getInputSplitShim() throws IOException { + return new InputSplitShim(); + } + + @Override + public RecordReader getRecordReader(JobConf job, CombineFileSplit split, + Reporter reporter, + Class> rrClass) + throws IOException { + CombineFileSplit cfSplit = split; + return new CombineFileRecordReader(job, cfSplit, reporter, rrClass); + } + + } + + @Override + abstract public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception; + + @Override + abstract public org.apache.hadoop.mapreduce.TaskAttemptContext newTaskAttemptContext( + Configuration conf, final Progressable progressable); + + @Override + abstract public org.apache.hadoop.mapreduce.JobContext 
newJobContext(Job job); + + @Override + abstract public boolean isLocalMode(Configuration conf); + + @Override + abstract public void setJobLauncherRpcAddress(Configuration conf, String val); + + @Override + abstract public String getJobLauncherHttpAddress(Configuration conf); + + @Override + abstract public String getJobLauncherRpcAddress(Configuration conf); + + @Override + abstract public short getDefaultReplication(FileSystem fs, Path path); + + @Override + abstract public long getDefaultBlockSize(FileSystem fs, Path path); + + @Override + abstract public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration conf) + throws IOException; + + @Override + abstract public FileSystem createProxyFileSystem(FileSystem fs, URI uri); + + @Override + abstract public FileSystem getNonCachedFileSystem(URI uri, Configuration conf) throws IOException; + + protected void run(FsShell shell, String[] command) throws Exception { + LOG.debug(ArrayUtils.toString(command)); + int retval = shell.run(command); + LOG.debug("Return value is :" + retval); + } + + /** + * CombineFileInputFormat sometimes returns directories as splits, need to prune them. + */ + private static Set getDirIndices(Path[] paths, JobConf conf) throws IOException { + Set result = new HashSet(); + for (int i = 0; i < paths.length; i++) { + FileSystem fs = paths[i].getFileSystem(conf); + if (!fs.isFile(paths[i])) { + result.add(i); + } + } + return result; + } + + private static List prune(Set indicesToPrune, List elms) { + List result = new ArrayList(); + int i = 0; + for (K elm : elms) { + if (indicesToPrune.contains(i)) { + continue; + } + result.add(elm); + i++; + } + return result; + } + + private static String[] dedup(String[] locations) throws IOException { + Set dedup = new HashSet(); + Collections.addAll(dedup, locations); + return dedup.toArray(new String[dedup.size()]); + } + + @Override + public void checkFileAccess(FileSystem fs, FileStatus stat, FsAction action) + throws IOException, AccessControlException, Exception { + DefaultFileAccess.checkFileAccess(fs, stat, action); + } +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java index 3439b1b..2648d1d 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.hive.thrift.DelegationTokenSelector; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -35,7 +36,6 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; -import org.apache.hadoop.security.token.delegation.DelegationTokenSelector; import org.apache.zookeeper.client.ZooKeeperSaslClient; public class Utils { diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java new file mode 100644 index 0000000..0f56b8c --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.thrift; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DBTokenStore implements DelegationTokenStore { + private static final Logger LOG = LoggerFactory.getLogger(DBTokenStore.class); + + @Override + public int addMasterKey(String s) throws TokenStoreException { + if (LOG.isTraceEnabled()) { + LOG.trace("addMasterKey: s = " + s); + } + return (Integer)invokeOnRawStore("addMasterKey", new Object[]{s},String.class); + } + + @Override + public void updateMasterKey(int keySeq, String s) throws TokenStoreException { + if (LOG.isTraceEnabled()) { + LOG.trace("updateMasterKey: s = " + s + ", keySeq = " + keySeq); + } + invokeOnRawStore("updateMasterKey", new Object[] {Integer.valueOf(keySeq), s}, + Integer.class, String.class); + } + + @Override + public boolean removeMasterKey(int keySeq) { + return (Boolean)invokeOnRawStore("removeMasterKey", new Object[] {Integer.valueOf(keySeq)}, + Integer.class); + } + + @Override + public String[] getMasterKeys() throws TokenStoreException { + return (String[])invokeOnRawStore("getMasterKeys", new Object[0]); + } + + @Override + public boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) throws TokenStoreException { + + try { + String identifier = TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); + String tokenStr = Base64.encodeBase64URLSafeString( + HiveDelegationTokenSupport.encodeDelegationTokenInformation(token)); + boolean result = (Boolean)invokeOnRawStore("addToken", new Object[] {identifier, tokenStr}, + String.class, String.class); + if (LOG.isTraceEnabled()) { + LOG.trace("addToken: tokenIdentifier = " + tokenIdentifier + ", addded = " + result); + } + return result; + } catch (IOException e) { + throw new TokenStoreException(e); + } + } + + @Override + public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) + throws TokenStoreException { + try { + String tokenStr = (String)invokeOnRawStore("getToken", new Object[] { + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); + DelegationTokenInformation result = null; + if (tokenStr != null) { + result = HiveDelegationTokenSupport.decodeDelegationTokenInformation(Base64.decodeBase64(tokenStr)); + } + if 
(LOG.isTraceEnabled()) { + LOG.trace("getToken: tokenIdentifier = " + tokenIdentifier + ", result = " + result); + } + return result; + } catch (IOException e) { + throw new TokenStoreException(e); + } + } + + @Override + public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreException{ + try { + boolean result = (Boolean)invokeOnRawStore("removeToken", new Object[] { + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier)}, String.class); + if (LOG.isTraceEnabled()) { + LOG.trace("removeToken: tokenIdentifier = " + tokenIdentifier + ", addded = " + result); + } + return result; + } catch (IOException e) { + throw new TokenStoreException(e); + } + } + + @Override + public List getAllDelegationTokenIdentifiers() throws TokenStoreException{ + + List tokenIdents = (List)invokeOnRawStore("getAllTokenIdentifiers", new Object[0]); + List delTokenIdents = new ArrayList(tokenIdents.size()); + + for (String tokenIdent : tokenIdents) { + DelegationTokenIdentifier delToken = new DelegationTokenIdentifier(); + try { + TokenStoreDelegationTokenSecretManager.decodeWritable(delToken, tokenIdent); + } catch (IOException e) { + throw new TokenStoreException(e); + } + delTokenIdents.add(delToken); + } + return delTokenIdents; + } + + private Object rawStore; + + @Override + public void init(Object rawStore, ServerMode smode) throws TokenStoreException { + this.rawStore = rawStore; + } + + private Object invokeOnRawStore(String methName, Object[] params, Class ... paramTypes) + throws TokenStoreException{ + + try { + return rawStore.getClass().getMethod(methName, paramTypes).invoke(rawStore, params); + } catch (IllegalArgumentException e) { + throw new TokenStoreException(e); + } catch (SecurityException e) { + throw new TokenStoreException(e); + } catch (IllegalAccessException e) { + throw new TokenStoreException(e); + } catch (InvocationTargetException e) { + throw new TokenStoreException(e.getCause()); + } catch (NoSuchMethodException e) { + throw new TokenStoreException(e); + } + } + + @Override + public void setConf(Configuration conf) { + // No-op + } + + @Override + public Configuration getConf() { + return null; + } + + @Override + public void close() throws IOException { + // No-op. + } + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java new file mode 100644 index 0000000..f6e2420 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.thrift; + +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; + +/** + * A delegation token that is specialized for Hive + */ + +public class DelegationTokenSelector + extends AbstractDelegationTokenSelector{ + + public DelegationTokenSelector() { + super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND); + } +} diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java new file mode 100644 index 0000000..e46f293 --- /dev/null +++ b/shims/common/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java @@ -0,0 +1,472 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.thrift; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.Perms; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ZooKeeper token store implementation. 
+ */ +public class ZooKeeperTokenStore implements DelegationTokenStore { + + private static final Logger LOGGER = + LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName()); + + protected static final String ZK_SEQ_FORMAT = "%010d"; + private static final String NODE_KEYS = "/keys"; + private static final String NODE_TOKENS = "/tokens"; + + private String rootNode = ""; + private volatile CuratorFramework zkSession; + private String zkConnectString; + private int connectTimeoutMillis; + private List newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS)); + + /** + * ACLProvider permissions will be used in case parent dirs need to be created + */ + private final ACLProvider aclDefaultProvider = new ACLProvider() { + + @Override + public List getDefaultAcl() { + return newNodeAcl; + } + + @Override + public List getAclForPath(String path) { + return getDefaultAcl(); + } + }; + + + private ServerMode serverMode; + + private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled" + + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")"; + + private Configuration conf; + + /** + * Default constructor for dynamic instantiation w/ Configurable + * (ReflectionUtils does not support Configuration constructor injection). + */ + protected ZooKeeperTokenStore() { + } + + private CuratorFramework getSession() { + if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { + synchronized (this) { + if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { + zkSession = + CuratorFrameworkFactory.builder().connectString(zkConnectString) + .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zkSession.start(); + } + } + } + return zkSession; + } + + private void setupJAASConfig(Configuration conf) throws IOException { + if (!UserGroupInformation.getLoginUser().isFromKeytab()) { + // The process has not logged in using keytab + // this should be a test mode, can't use keytab to authenticate + // with zookeeper. 
+ LOGGER.warn("Login is not from keytab"); + return; + } + + String principal; + String keytab; + switch (serverMode) { + case METASTORE: + principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file"); + break; + case HIVESERVER2: + principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab"); + break; + default: + throw new AssertionError("Unexpected server mode " + serverMode); + } + Utils.setZookeeperClientKerberosJaasConfig(principal, keytab); + } + + private String getNonEmptyConfVar(Configuration conf, String param) throws IOException { + String val = conf.get(param); + if (val == null || val.trim().isEmpty()) { + throw new IOException("Configuration parameter " + param + " should be set, " + + WHEN_ZK_DSTORE_MSG); + } + return val; + } + + /** + * Create a path if it does not already exist ("mkdir -p") + * @param path string with '/' separator + * @param acl list of ACL entries + * @throws TokenStoreException + */ + public void ensurePath(String path, List acl) + throws TokenStoreException { + try { + CuratorFramework zk = getSession(); + String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .withACL(acl).forPath(path); + LOGGER.info("Created path: {} ", node); + } catch (KeeperException.NodeExistsException e) { + // node already exists + } catch (Exception e) { + throw new TokenStoreException("Error creating path " + path, e); + } + } + + /** + * Parse ACL permission string, from ZooKeeperMain private method + * @param permString + * @return + */ + public static int getPermFromString(String permString) { + int perm = 0; + for (int i = 0; i < permString.length(); i++) { + switch (permString.charAt(i)) { + case 'r': + perm |= ZooDefs.Perms.READ; + break; + case 'w': + perm |= ZooDefs.Perms.WRITE; + break; + case 'c': + perm |= ZooDefs.Perms.CREATE; + break; + case 'd': + perm |= ZooDefs.Perms.DELETE; + break; + case 'a': + perm |= ZooDefs.Perms.ADMIN; + break; + default: + LOGGER.error("Unknown perm type: " + permString.charAt(i)); + } + } + return perm; + } + + /** + * Parse comma separated list of ACL entries to secure generated nodes, e.g. 
+ * sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa + * @param aclString + * @return ACL list + */ + public static List parseACLs(String aclString) { + String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ","); + List acl = new ArrayList(aclComps.length); + for (String a : aclComps) { + if (StringUtils.isBlank(a)) { + continue; + } + a = a.trim(); + // from ZooKeeperMain private method + int firstColon = a.indexOf(':'); + int lastColon = a.lastIndexOf(':'); + if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { + LOGGER.error(a + " does not have the form scheme:id:perm"); + continue; + } + ACL newAcl = new ACL(); + newAcl.setId(new Id(a.substring(0, firstColon), a.substring( + firstColon + 1, lastColon))); + newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); + acl.add(newAcl); + } + return acl; + } + + private void initClientAndPaths() { + if (this.zkSession != null) { + this.zkSession.close(); + } + try { + ensurePath(rootNode + NODE_KEYS, newNodeAcl); + ensurePath(rootNode + NODE_TOKENS, newNodeAcl); + } catch (TokenStoreException e) { + throw e; + } + } + + @Override + public void setConf(Configuration conf) { + if (conf == null) { + throw new IllegalArgumentException("conf is null"); + } + this.conf = conf; + } + + @Override + public Configuration getConf() { + return null; // not required + } + + private Map getAllKeys() throws KeeperException, InterruptedException { + + String masterKeyNode = rootNode + NODE_KEYS; + + // get children of key node + List nodes = zkGetChildren(masterKeyNode); + + // read each child node, add to results + Map result = new HashMap(); + for (String node : nodes) { + String nodePath = masterKeyNode + "/" + node; + byte[] data = zkGetData(nodePath); + if (data != null) { + result.put(getSeq(node), data); + } + } + return result; + } + + private List zkGetChildren(String path) { + CuratorFramework zk = getSession(); + try { + return zk.getChildren().forPath(path); + } catch (Exception e) { + throw new TokenStoreException("Error getting children for " + path, e); + } + } + + private byte[] zkGetData(String nodePath) { + CuratorFramework zk = getSession(); + try { + return zk.getData().forPath(nodePath); + } catch (KeeperException.NoNodeException ex) { + return null; + } catch (Exception e) { + throw new TokenStoreException("Error reading " + nodePath, e); + } + } + + private int getSeq(String path) { + String[] pathComps = path.split("/"); + return Integer.parseInt(pathComps[pathComps.length-1]); + } + + @Override + public int addMasterKey(String s) { + String keysPath = rootNode + NODE_KEYS + "/"; + CuratorFramework zk = getSession(); + String newNode; + try { + newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl) + .forPath(keysPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + keysPath, e); + } + LOGGER.info("Added key {}", newNode); + return getSeq(newNode); + } + + @Override + public void updateMasterKey(int keySeq, String s) { + CuratorFramework zk = getSession(); + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); + try { + zk.setData().forPath(keyPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error setting data in " + keyPath, e); + } + } + + @Override + public boolean removeMasterKey(int keySeq) { + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); + zkDelete(keyPath); + return true; + } + + 
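// [Illustrative aside from the editor, not part of the commit.] The three
// master-key methods above lean on ZooKeeper sequential znodes: addMasterKey()
// creates a PERSISTENT_SEQUENTIAL child under <rootNode>/keys, so ZooKeeper
// appends a zero-padded counter to the node name; getSeq() parses that suffix
// back into the key sequence number, which updateMasterKey() and
// removeMasterKey() later turn into the same path via ZK_SEQ_FORMAT ("%010d").
// A hedged sketch with a made-up root and node name:
//
//     String newNode = "/hive/tokenstore/keys/0000000042";       // as returned by create()
//     String[] comps = newNode.split("/");
//     int keySeq = Integer.parseInt(comps[comps.length - 1]);    // 42 -- what addMasterKey() returns
//     String keyPath = "/hive/tokenstore/keys/"
//         + String.format("%010d", keySeq);                      // back to ".../keys/0000000042"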
private void zkDelete(String path) { + CuratorFramework zk = getSession(); + try { + zk.delete().forPath(path); + } catch (KeeperException.NoNodeException ex) { + // already deleted + } catch (Exception e) { + throw new TokenStoreException("Error deleting " + path, e); + } + } + + @Override + public String[] getMasterKeys() { + try { + Map allKeys = getAllKeys(); + String[] result = new String[allKeys.size()]; + int resultIdx = 0; + for (byte[] keyBytes : allKeys.values()) { + result[resultIdx++] = new String(keyBytes); + } + return result; + } catch (KeeperException ex) { + throw new TokenStoreException(ex); + } catch (InterruptedException ex) { + throw new TokenStoreException(ex); + } + } + + + private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) { + try { + return rootNode + NODE_TOKENS + "/" + + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); + } catch (IOException ex) { + throw new TokenStoreException("Failed to encode token identifier", ex); + } + } + + @Override + public boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) { + byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token); + String tokenPath = getTokenPath(tokenIdentifier); + CuratorFramework zk = getSession(); + String newNode; + try { + newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl) + .forPath(tokenPath, tokenBytes); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + tokenPath, e); + } + + LOGGER.info("Added token: {}", newNode); + return true; + } + + @Override + public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { + String tokenPath = getTokenPath(tokenIdentifier); + zkDelete(tokenPath); + return true; + } + + @Override + public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { + byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier)); + try { + return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); + } catch (Exception ex) { + throw new TokenStoreException("Failed to decode token", ex); + } + } + + @Override + public List getAllDelegationTokenIdentifiers() { + String containerNode = rootNode + NODE_TOKENS; + final List nodes = zkGetChildren(containerNode); + List result = new java.util.ArrayList( + nodes.size()); + for (String node : nodes) { + DelegationTokenIdentifier id = new DelegationTokenIdentifier(); + try { + TokenStoreDelegationTokenSecretManager.decodeWritable(id, node); + result.add(id); + } catch (Exception e) { + LOGGER.warn("Failed to decode token '{}'", node); + } + } + return result; + } + + @Override + public void close() throws IOException { + if (this.zkSession != null) { + this.zkSession.close(); + } + } + + @Override + public void init(Object objectStore, ServerMode smode) { + this.serverMode = smode; + zkConnectString = + conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); + if (zkConnectString == null || zkConnectString.trim().isEmpty()) { + // try alternate config param + zkConnectString = + conf.get( + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, + null); + if (zkConnectString == null || zkConnectString.trim().isEmpty()) { + throw new IllegalArgumentException("Zookeeper connect string has to be specifed through " + + "either " + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR + + " or " + + 
HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE + + WHEN_ZK_DSTORE_MSG); + } + } + connectTimeoutMillis = + conf.getInt( + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, + CuratorFrameworkFactory.builder().getConnectionTimeoutMs()); + String aclStr = conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null); + if (StringUtils.isNotBlank(aclStr)) { + this.newNodeAcl = parseACLs(aclStr); + } + rootNode = + conf.get(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE, + HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode; + + try { + // Install the JAAS Configuration for the runtime + setupJAASConfig(conf); + } catch (IOException e) { + throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client " + + e.getMessage(), e); + } + initClientAndPaths(); + } + +} diff --git a/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java b/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java deleted file mode 100644 index 83257f5..0000000 --- a/shims/common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenSelector.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.security.token.delegation; - -import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; - -/** - * A delegation token that is specialized for Hive - */ - -public class DelegationTokenSelector - extends AbstractDelegationTokenSelector{ - - public DelegationTokenSelector() { - super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND); - } -} diff --git a/shims/pom.xml b/shims/pom.xml index 48bb421..679b7c0 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -33,7 +33,6 @@ common - common-secure 0.20S 0.23 scheduler
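For orientation, a rough round-trip through the DelegationTokenStore contract that both DBTokenStore and ZooKeeperTokenStore above implement might look as follows. This is an illustrative sketch only, not code from the commit: the demo class name, the ZooKeeper quorum, the open ACL string, the renew date and the password bytes are all invented, the identifier is default-constructed purely to exercise the API, and a reachable (unsecured, for this demo) ZooKeeper ensemble is assumed. The class sits in org.apache.hadoop.hive.thrift so the configuration constants resolve, and it is instantiated through ReflectionUtils as the store's constructor javadoc suggests.

package org.apache.hadoop.hive.thrift;   // hypothetical demo class, placed here so the config constants resolve

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.util.ReflectionUtils;

public class TokenStoreRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder quorum and a wide-open ACL -- suitable only for an unsecured test ensemble.
    conf.set(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, "zk1:2181,zk2:2181");
    conf.set(HadoopThriftAuthBridge.Server.DELEGATION_TOKEN_STORE_ZK_ACL, "world:anyone:cdrwa");

    // The store's constructor is protected and its javadoc points at ReflectionUtils,
    // which also injects the Configuration via setConf().
    DelegationTokenStore store = ReflectionUtils.newInstance(ZooKeeperTokenStore.class, conf);
    store.init(null, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);  // first argument is unused by this store

    // Default-constructed identifier and made-up renew date/password, purely to exercise the API.
    DelegationTokenIdentifier id = new DelegationTokenIdentifier();
    DelegationTokenInformation info = new DelegationTokenInformation(
        System.currentTimeMillis() + 3600 * 1000L, "not-a-real-password".getBytes());

    store.addToken(id, info);                                   // persists under <znode>/tokens
    DelegationTokenInformation readBack = store.getToken(id);
    System.out.println("stored renew date = " + readBack.getRenewDate());
    store.removeToken(id);
    store.close();
  }
}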