FLINK-5364

Rework JAAS configuration to support user-supplied entries

    Details

      Description

      Recent issues (see linked) have brought to light a critical deficiency in the handling of JAAS configuration.

      1. the MapR distribution relies on an explicit JAAS configuration file, rather than the in-memory configuration used by stock Hadoop.
      2. the ZK/Kafka/Hadoop security configuration is supposed to be independent (one can enable each element separately) but isn't.

      Perhaps we should rework the JAAS conf code to merge any user-supplied configuration with our defaults, rather than using an all-or-nothing approach.
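      For illustration only, the merge could look like the following sketch: a JAAS `Configuration` that consults the user-supplied configuration first and falls back to Flink's in-memory defaults (the class and field names here are hypothetical, not part of any patch):

          import javax.security.auth.login.AppConfigurationEntry;
          import javax.security.auth.login.Configuration;
          import java.util.Map;

          /**
           * Sketch: resolves JAAS entries from a user-supplied Configuration first,
           * falling back to Flink-provided defaults for entries the user did not define.
           */
          public class MergedJaasConfiguration extends Configuration {

              private final Configuration userConfig;                             // e.g. from -Djava.security.auth.login.config
              private final Map<String, AppConfigurationEntry[]> defaultEntries;  // Flink's in-memory defaults

              public MergedJaasConfiguration(Configuration userConfig,
                                             Map<String, AppConfigurationEntry[]> defaultEntries) {
                  this.userConfig = userConfig;
                  this.defaultEntries = defaultEntries;
              }

              @Override
              public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                  if (userConfig != null) {
                      AppConfigurationEntry[] userEntries = userConfig.getAppConfigurationEntry(name);
                      if (userEntries != null) {
                          return userEntries;  // user-supplied (static) entries take precedence
                      }
                  }
                  return defaultEntries.get(name);  // fall back to Flink's defaults
              }
          }

      Installing such a merged view via `javax.security.auth.login.Configuration.setConfiguration(...)` would let an explicit JAAS file (as MapR requires) coexist with Flink's dynamic entries instead of being an all-or-nothing choice.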

      We should also address some recent regressions:

      1. The HadoopSecurityContext should be installed regardless of auth mode, to log in with UserGroupInformation (see the sketch after this list), which:

      • handles the HADOOP_USER_NAME variable.
      • installs an OS-specific user principal (from UnixLoginModule etc.) unrelated to Kerberos.
      • picks up the HDFS/HBASE delegation tokens.

      2. Fix the use of alternative authentication methods: delegation tokens and the Kerberos ticket cache.
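
      A minimal sketch of the first point, assuming `HadoopSecurityContext` wraps a UGI login user (illustrative only, not the actual patch):

          import java.io.IOException;
          import org.apache.hadoop.security.UserGroupInformation;

          // Sketch: establish the UGI login user unconditionally, then wrap it in a
          // security context so user code runs inside loginUser.doAs(...).
          static HadoopSecurityContext installHadoopContext(org.apache.hadoop.conf.Configuration hadoopConf)
                  throws IOException {
              // Works with or without Kerberos: getLoginUser() honors HADOOP_USER_NAME,
              // derives an OS principal (e.g. via UnixLoginModule), and reads any
              // stored delegation tokens.
              UserGroupInformation.setConfiguration(hadoopConf);
              UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
              return new HadoopSecurityContext(loginUser);
          }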


          Activity

          eronwright Eron Wright added a comment -

          Till Rohrmann, Maximilian Michels: bringing this to your attention as a critical fix for Flink 1.2.

          eronwright Eron Wright added a comment -

          Here's a WIP to show how I plan to fix the problem. Basically this patch:
          a) makes it explicit how the login credentials are shared with connectors
          b) allows for the use of a user-supplied JAAS config (which fixes the MapR issue of FLINK-5055)
          c) decouples the various aspects into independent 'modules'

          https://github.com/EronWright/flink/pull/3

          Additional test work is needed.

          CC Vijay Srinivasaraghavan

          till.rohrmann Till Rohrmann added a comment -

          Hi Eron Wright, I think you're right, and we should definitely try to fix it for the final 1.2 release. Great that you're looking into it.

          Do you want to open the PR against the Flink Github repository? I guess we can already make a first round of review work.

          githubbot ASF GitHub Bot added a comment -

          GitHub user EronWright opened a pull request:

          https://github.com/apache/flink/pull/3057

          FLINK-5364 Rework JAAS configuration to support user-supplied entries

          Fixes FLINK-5364, FLINK-5361, FLINK-5350, FLINK-5055

          CC @tillrohrmann

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/EronWright/flink feature-FLINK-5364-rebase

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3057.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3057


          commit 4acf43624c16627aaa89560c8361fe4bf9a19fa6
          Author: wrighe3 <eron.wright@emc.com>
          Date: 2016-12-20T09:07:38Z

          FLINK-5364 Rework JAAS configuration to support user-supplied entries

          Fixes FLINK-5364, FLINK-5361, FLINK-5350, FLINK-5055

          commit 2d56de9fe1da2e0ecdfd02498b71a8477e9295b3
          Author: wrighe3 <eron.wright@emc.com>
          Date: 2017-01-04T05:18:12Z

          FLINK-5364 Rework JAAS configuration to support user-supplied entries

          Minor fixes and doc changes.


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3057

          @EronWright thanks a lot for this fix, I am looking through it now.

          From the description in the JIRAs, I take it that this adds logic to read custom JAAS security configurations, using Flink's internal one (Hadoop UGI based) as the fallback default?

          This should be relevant for the `master` and the 1.2 release branch, correct?

          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on the issue:

          https://github.com/apache/flink/pull/3057

          @StephanEwen yes, this PR does the following:
          1. isolates the configuration of Kerberos credentials from how those credentials are used
          2. modularizes the security code to reflect the independent aspects of configuring Hadoop vs. JAAS based on the credential
          3. makes explicit which JAAS login contexts are provided with the cluster's credential
          4. incorporates the user-supplied JAAS configuration (to satisfy the MapR scenario)
          5. updates `config.md` and `internals/security.md`

          The main user impact is that one must explicitly share the credential with specific contexts (the 'KafkaClient' login context, the ZooKeeper 'Client' context, etc.), based on whether the corresponding service is Kerberized. Earlier we had tried to always share the credential with all JAAS contexts, but this caused problems.

          With this patch, I believe the design goal of allowing Kerberos to be enabled independently for each external connection is met. For example, a Kerberized Kafka plus a non-Kerberized Hadoop is a valid scenario.
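
          For illustration, a deployment with Kerberized Kafka and ZooKeeper but non-Kerberized Hadoop might carry flink-conf.yaml entries along these lines (option names as documented in this PR; the keytab path and principal are placeholders):

              security.kerberos.login.keytab: /path/to/flink.keytab
              security.kerberos.login.principal: flink-user@EXAMPLE.COM
              security.kerberos.login.contexts: Client,KafkaClient

          Here `Client` shares the credential with ZooKeeper and `KafkaClient` with the Kafka connector; omitting a context simply withholds the credential from that service.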

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95009232

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java —
          @@ -71,163 +64,93 @@
          */
          public static void install(SecurityConfiguration config) throws Exception {

          - if (!config.securityIsEnabled()) {
          -     // do not perform any initialization if no Kerberos crendetails are provided
          -     return;
          - }
          -
          - // establish the JAAS config
          - JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
          - javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
          -
          - populateSystemSecurityProperties(config.flinkConf);
          -
          - // establish the UGI login user
          - UserGroupInformation.setConfiguration(config.hadoopConf);
          -
          - // only configure Hadoop security if we have security enabled
          - if (UserGroupInformation.isSecurityEnabled()) {
          -
          -     final UserGroupInformation loginUser;
          -
          -     if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
          -         String keytabPath = (new File(config.keytab)).getAbsolutePath();
          -
          -         UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
          -
          -         loginUser = UserGroupInformation.getLoginUser();
          -
          -         // supplement with any available tokens
          -         String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
          -         if (fileLocation != null) {
          -             /*
          -              * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
          -              * used in the context of reading the stored tokens from UGI.
          -              * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
          -              * loginUser.addCredentials(cred);
          -              */
          -             try {
          -                 Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
          -                     File.class, org.apache.hadoop.conf.Configuration.class);
          -                 Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
          -                     config.hadoopConf);
          -                 Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
          -                     Credentials.class);
          -                 addCredentialsMethod.invoke(loginUser, cred);
          -             } catch (NoSuchMethodException e) {
          -                 LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          -             }
          -         }
          -     } else {
          -         // login with current user credentials (e.g. ticket cache)
          -         try {
          -             //Use reflection API to get the login user object
          -             //UserGroupInformation.loginUserFromSubject(null);
          -             Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
          -             Subject subject = null;
          -             loginUserFromSubjectMethod.invoke(null, subject);
          -         } catch (NoSuchMethodException e) {
          -             LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          -         }
          -
          -         // note that the stored tokens are read automatically
          -         loginUser = UserGroupInformation.getLoginUser();
          + // install the security modules
          + List<SecurityModule> modules = new ArrayList();
          + try {
          +     for (Class<? extends SecurityModule> moduleClass : ...) { ... }
          + } catch (Exception ex) {
          +     throw new Exception("unable to establish the security context", ex);
          + }
          + installedModules = modules;

          -         LOG.info("Hadoop user set to {}", loginUser.toString());
          + // install a security context
          + // use the Hadoop login user as the subject of the installed security context
          + if (!(installedContext instanceof NoOpSecurityContext)) {
          +     LOG.warn("overriding previous security context");
          + }
          + UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
          + installedContext = new HadoopSecurityContext(loginUser);
          + }

          -         boolean delegationToken = false;
          -         final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
          -         Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
          -         for (Token<? extends TokenIdentifier> token : usrTok) {
          -             final Text id = new Text(token.getIdentifier());
          -             LOG.debug("Found user token " + id + " with " + token);
          -             if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
          -                 delegationToken = true;
          + static void uninstall() {
          +     if (installedModules != null) {
          +         for (SecurityModule module : Lists.reverse(installedModules)) {
          +             try {
          +                 module.uninstall();
          -             }
          -
          -         if (!loginUser.hasKerberosCredentials()) {
          -             // throw an error in non-yarn deployment if kerberos cache is not available
          -             if (!delegationToken) {
          -                 LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
          -                 throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
          +             } catch (UnsupportedOperationException e) {

          — End diff —

          It seems uninstalling is not really supported by some modules. Why do they throw an exception if the exception is ignored anyway? Can they not simply do nothing, maybe log a warning? Does it make sense to add a method `supportsUninstall()` to `SecurityModule`?
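
          The suggested shape might look roughly like this (a sketch of the review suggestion only, not the committed interface):

              /** Sketch: SecurityModule with an explicit capability check instead of a throwaway exception. */
              public interface SecurityModule {

                  void install(SecurityUtils.SecurityConfiguration config) throws Exception;

                  void uninstall() throws Exception;

                  /**
                   * Whether this module can revert its effects; callers would skip uninstall()
                   * for modules returning false rather than catch UnsupportedOperationException.
                   */
                  boolean supportsUninstall();
              }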

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r94997957

          — Diff: docs/internals/flink_security.md —
          @@ -24,64 +24,109 @@ specific language governing permissions and limitations
          under the License.
          -->

          -This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN)
          -and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers
          -who plans to run Flink on a secure environment.
          +This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos),
          +filesystems, connectors, and state backends.

           ## Objective

          +The primary goals of the Flink Kerberos security infrastructure are:
          +1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
          +2. to authenticate to ZooKeeper (if configured to use SASL)
          +3. to authenticate to Hadoop components (e.g. HDFS, HBase)

          -The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario,
          -streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure
          -data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the
          -context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster.
          +In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure
          +data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
          +or ticket cache entry.
          +
          +The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
          +or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster.

           ## How Flink Security works

          -Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI.
          -A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security
          -requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolves over a period
          -of time, at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security.
          +In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.). While satisfying the security requirements for all connectors is an ongoing effort,
          +Flink provides first-class support for Kerberos authentication only. The following services and connectors are tested for Kerberos authentication:

          -- Kafka (0.9)
          +- Kafka (0.9)
           - HDFS
          +- HBase
           - ZooKeeper

          -Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation
          -(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context.
          +Note that it is possible to enable the use of Kerberos independently for each service or connector. For example, the user may enable
          +Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa. The shared element is the configuration of
          +Kerbreros credentials, which is then explicitly used by each component.
          +
          +The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
          +are installed at startup. The next section describes each security module.
          +
          +### Hadoop Security Module
          +This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide login user context. The login user is
          +then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
          +
          +If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured. Otherwise,
          +the login user conveys only the user identity of the OS account that launched the cluster.
          +
          +### JAAS Security Module
          +This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
          +Kafka, and other such components that rely on JAAS.
          +
          +Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html). Static entries override any
          +dynamic entries provided by this module.
          +
          +### ZooKeeper Security Module
          +This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
          +and the JAAS login context name (default: `Client`).
          +
          +## Security Configuration
          +
          +### Flink Configuration
          +The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
          +
          +- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
          +
          +A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
          +
          +- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
          +
          +- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
          +
          +These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context. Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section). To be used in a JAAS context, the configuration specifies which JAAS login contexts (or applications) are enabled with the following configuration option:
          +
          +- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).

          -Services like Kafka and ZooKeeper use SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. It expects JAAS configuration with a platform-specific login
          -module name to be provided. Managing per-connector configuration files will be an overhead and to overcome this requirement, a process-wide JAAS configuration object is
          -instantiated which serves standard ApplicationConfigurationEntry for the connectors that authenticates using SASL/JAAS mechanism.
          +ZooKeeper-related configuration overrides:

          -It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself uses UGI's doAS() implementation to run under a specific user context, i.e. if Hadoop security is enabled
          -then the Flink processes will be running under a secure user account or else it will run as the OS login user account who starts the Flink cluster.
          +- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.

          -## Security Configurations
          +- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
          +one of the values specified in `security.kerberos.login.contexts`.

          -Secure credentials can be supplied by adding below configuration elements to Flink configuration file:
          +### Hadoop Configuration

          -- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret.
          +The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`). The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.

          -- `security.principal`: User principal name that the Flink cluster should run as.
          +Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts. In this scenario, the Flink CLI acquires Hadoop
          +delegation tokens (for HDFS and for HBase).

          -The delegation token mechanism (kinit cache) is still supported for backward compatibility but enabling security using keytab configuration is the preferred and recommended approach.
          +## Deployment Modes
          +Here is some information specific to each deployment mode.

          -### Standalone Mode:
          +### Standalone Mode

           Steps to run a secure Flink cluster in standalone/cluster mode:
          -- Add security configurations to Flink configuration file (on all cluster nodes)
          -- Make sure the Keytab file exist in the path as indicated in security.keytab configuration on all cluster nodes
          -- Deploy Flink cluster using cluster start/stop scripts or CLI
          +1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
          +2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
          +3. Deploy Flink cluster as normal.

          -### Yarn Mode:
          +### YARN/Mesos Mode

          -Steps to run secure Flink cluster in Yarn mode:
          -- Add security configurations to Flink configuration file (on the node from where cluster will be provisioned using Flink/Yarn CLI)
          -- Make sure the Keytab file exist in the path as indicated in security.keytab configuration
          -- Deploy Flink cluster using CLI
          +Steps to run a secure Flink cluster in YARN/Mesos mode:
          +1. Add security-related configuration options to the Flink configuration file on the client.
          +2. Ensure that the keytab file exists at the path as indicated by `security.kerberos.login.keytab` on the client node.
          +3. Deploy Flink cluster as normal.

          -In Yarn mode, the user supplied keytab will be copied over to the Yarn containers (App Master/JM and TM) as the Yarn local resource file.
          -Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">Yarn security</a>
          +In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.

          -### Token Renewal
          +For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.

          -UGI and Kafka/ZK login module implementations takes care of auto-renewing the tickets upon reaching expiry and no further action is needed on the part of Flink.
          \ No newline at end of file
          +## Further Details
          +### Ticket Renewal
          +Each component that uses Kerberos is independently responsible for renewing the Kerberos TGT. Hadoop, ZooKeeper, and Kafka all do so,
          — End diff –

          I would add a sentence here that this requires specifying a keytab to work. Hadoop credentials will expire if only the ticket cache / delegation tokens are used.
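
          For reference, a user-supplied static JAAS file of the kind discussed in this diff might contain an entry like the following (a generic example; the login module and its options depend on the installation):

              KafkaClient {
                  com.sun.security.auth.module.Krb5LoginModule required
                  useKeyTab=true
                  storeKey=true
                  keyTab="/path/to/flink.keytab"
                  principal="flink-user@EXAMPLE.COM";
              };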

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95009675

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java —
          @@ -0,0 +1,120 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.security.modules;
          +
          +import org.apache.commons.lang3.StringUtils;
          +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
          +import org.apache.flink.runtime.security.SecurityUtils;
          +import org.apache.hadoop.security.Credentials;
          +import org.apache.hadoop.security.UserGroupInformation;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +import javax.security.auth.Subject;
          +import java.io.File;
          +import java.lang.reflect.InvocationTargetException;
          +import java.lang.reflect.Method;
          +
          +/**
          + * Responsible for installing a Hadoop login user.
          + */
          +public class HadoopModule implements SecurityModule {
          +
          + private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);
          +
          + UserGroupInformation loginUser;
          +
          + @Override
          + public void install(SecurityUtils.SecurityConfiguration securityConfig) {
          +
          +     UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
          +
          +     try {
          +         if (UserGroupInformation.isSecurityEnabled() &&
          +             !StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
          +             String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
          +
          +             UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
          +
          +             loginUser = UserGroupInformation.getLoginUser();
          +
          +             // supplement with any available tokens
          +             String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
          +             if (fileLocation != null) {
          +                 /*
          +                  * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
          +                  * used in the context of reading the stored tokens from UGI.
          +                  * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
          +                  * loginUser.addCredentials(cred);
          +                  */
          +                 try {
          +                     Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
          +                         File.class, org.apache.hadoop.conf.Configuration.class);
          +                     Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
          +                         securityConfig.getHadoopConfiguration());
          +                     Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
          +                         Credentials.class);
          +                     addCredentialsMethod.invoke(loginUser, cred);
          +                 } catch (NoSuchMethodException e) {
          +                     LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          +                 } catch (InvocationTargetException e) {
          +                     throw e.getTargetException();
          +                 }
          +             }
          +         } else {
          +             // login with current user credentials (e.g. ticket cache, OS login)
          +             // note that the stored tokens are read automatically
          +             try {
          +                 //Use reflection API to get the login user object
          +                 //UserGroupInformation.loginUserFromSubject(null);
          +                 Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
          +                 Subject subject = null;
          +                 loginUserFromSubjectMethod.invoke(null, subject);
          +             } catch (NoSuchMethodException e) {
          +                 LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          +             } catch (InvocationTargetException e) {
          +                 throw e.getTargetException();
          +             }
          +
          +             loginUser = UserGroupInformation.getLoginUser();
          +         }
          +
          +         if (UserGroupInformation.isSecurityEnabled()) {
          +             // note: UGI::hasKerberosCredentials inaccurately reports false
          +             // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
          +             // so we check only in ticket cache scenario.
          +             if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) {
          +                 // a delegation token is an adequate substitute in most cases
          +                 if (!HadoopUtils.hasHDFSDelegationToken()) {
          +                     LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials");
          +                 }
          +             }
          +         }
          +
          +         LOG.info("Hadoop user set to {}", loginUser);
          +
          +     } catch (Throwable ex) {
          +         throw new RuntimeException("Hadoop login failure", ex);
          — End diff –

          I think throwing `RuntimeException` is sort of an antipattern. In this case, the exception is not a "programming error" type of exception; it should be explicitly declared and handled, so that calling methods are aware this method can fail and can decide how to handle that. I would declare a `SecurityException` or so in the signature.

          (parts of the existing Flink code are also guilty of doing that)
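
          A sketch of the declared-exception alternative (the name `SecurityInstallException` is an assumption, chosen to avoid clashing with `java.lang.SecurityException`):

              /** Hypothetical checked exception that makes the failure mode explicit to callers. */
              public class SecurityInstallException extends Exception {
                  public SecurityInstallException(String message, Throwable cause) {
                      super(message, cause);
                  }
              }

              // install(...) would then declare the failure instead of wrapping it unchecked:
              @Override
              public void install(SecurityUtils.SecurityConfiguration securityConfig) throws SecurityInstallException {
                  try {
                      // ... login logic as above ...
                  } catch (Throwable ex) {
                      throw new SecurityInstallException("Hadoop login failure", ex);
                  }
              }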

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95007256

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java —
          @@ -71,163 +64,93 @@
          */
          public static void install(SecurityConfiguration config) throws Exception {

          • if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - }

            -

          • // establish the JAAS config
          • JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
          • javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
            -
          • populateSystemSecurityProperties(config.flinkConf);
            -
          • // establish the UGI login user
          • UserGroupInformation.setConfiguration(config.hadoopConf);
            -
          • // only configure Hadoop security if we have security enabled
          • if (UserGroupInformation.isSecurityEnabled()) {
            -
          • final UserGroupInformation loginUser;
            -
          • if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
          • String keytabPath = (new File(config.keytab)).getAbsolutePath();
            -
          • UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
            -
          • loginUser = UserGroupInformation.getLoginUser();
            -
          • // supplement with any available tokens
          • String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
          • if (fileLocation != null) {
          • /*
          • * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
          • * used in the context of reading the stored tokens from UGI.
          • * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
          • * loginUser.addCredentials(cred);
          • */
          • try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - }

            catch (NoSuchMethodException e) {

          • LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          • }
          • }
          • } else {
          • // login with current user credentials (e.g. ticket cache)
          • try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - }

            catch (NoSuchMethodException e) {

          • LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          • }
            -
          • // note that the stored tokens are read automatically
          • loginUser = UserGroupInformation.getLoginUser();
            + // install the security modules
            + List<SecurityModule> modules = new ArrayList();
  + try {
  +     for (Class<? extends SecurityModule> moduleClass …
  + }
  + catch (Exception ex) {
  +     throw new Exception("unable to establish the security context", ex);
  + }

            + installedModules = modules;

          • LOG.info("Hadoop user set to {}", loginUser.toString());
            + // install a security context
            + // use the Hadoop login user as the subject of the installed security context
            + if (!(installedContext instanceof NoOpSecurityContext)) { + LOG.warn("overriding previous security context"); + }

            + UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
            + installedContext = new HadoopSecurityContext(loginUser);
            + }

          • boolean delegationToken = false;
          • final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
          • Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
          • for (Token<? extends TokenIdentifier> token : usrTok) {
          • final Text id = new Text(token.getIdentifier());
          • LOG.debug("Found user token " + id + " with " + token);
          • if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
          • delegationToken = true;
            + static void uninstall() {
            + if(installedModules != null) {
            + for (SecurityModule module : Lists.reverse(installedModules)) {
            + try { + module.uninstall(); }
          • }
            -
          • if (!loginUser.hasKerberosCredentials()) {
          • //throw an error in non-yarn deployment if kerberos cache is not available
          • if (!delegationToken) {
          • LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
          • throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
            + catch(UnsupportedOperationException e) {
              • End diff –

          Can you call this `catch (UnsupportedOperationException ignored) {`? Helps to avoid some warnings.

          Show
githubbot ASF GitHub Bot added a comment - Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95007256 — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java — @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - // establish the JAAS config JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - populateSystemSecurityProperties(config.flinkConf); - // establish the UGI login user UserGroupInformation.setConfiguration(config.hadoopConf); - // only configure Hadoop security if we have security enabled if (UserGroupInformation.isSecurityEnabled()) { - final UserGroupInformation loginUser; - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { String keytabPath = (new File(config.keytab)).getAbsolutePath(); - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - loginUser = UserGroupInformation.getLoginUser(); - // supplement with any available tokens String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); if (fileLocation != null) { /* * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are * used in the context of reading the stored tokens from UGI. * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); * loginUser.addCredentials(cred); */ try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); } } } else { // login with current user credentials (e.g. ticket cache) try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); } - // note that the stored tokens are read automatically loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List<SecurityModule> modules = new ArrayList(); + try { + for (Class<? extends SecurityModule> moduleClass … } + catch(Exception ex) { + throw new Exception("unable to establish the security context", ex); + } + installedModules = modules; LOG.info("Hadoop user set to {}", loginUser.toString()); + // install a security context + // use the Hadoop login user as the subject of the installed security context + if (!(installedContext instanceof NoOpSecurityContext)) { + LOG.warn("overriding previous security context"); + } + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + installedContext = new HadoopSecurityContext(loginUser); + } boolean delegationToken = false; final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN"); Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens(); for (Token<? extends TokenIdentifier> token : usrTok) { final Text id = new Text(token.getIdentifier()); LOG.debug("Found user token " + id + " with " + token); if (token.getKind().equals(HDFS_DELEGATION_KIND)) { delegationToken = true; + static void uninstall() { + if(installedModules != null) { + for (SecurityModule module : Lists.reverse(installedModules)) { + try { + module.uninstall(); } } - if (!loginUser.hasKerberosCredentials()) { //throw an error in non-yarn deployment if kerberos cache is not available if (!delegationToken) { LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); + catch(UnsupportedOperationException e) { End diff – Can you call this `catch (UnsupportedOperationException ignored) {`? Helps to avoid some warnings.
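A sketch of the suggested rename, assuming Guava's `Lists.reverse` and the PR's `SecurityModule` interface as shown in the diff; naming the caught exception `ignored` documents intent and silences unused-variable warnings:

```java
import com.google.common.collect.Lists;
import org.apache.flink.runtime.security.modules.SecurityModule;

import java.util.List;

final class UninstallSketch {
    // Uninstall the modules in reverse installation order, as in the PR.
    static void uninstall(List<SecurityModule> installedModules) {
        for (SecurityModule module : Lists.reverse(installedModules)) {
            try {
                module.uninstall();
            } catch (UnsupportedOperationException ignored) {
                // this module does not support uninstallation; skip it
            }
        }
    }
}
```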
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r94994079

          — Diff: docs/internals/flink_security.md —
          @@ -24,64 +24,109 @@ specific language governing permissions and limitations
          under the License.
          -->

          -This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN)
          -and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers
          -who plans to run Flink on a secure environment.
          +This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos),
          +filesystems, connectors, and state backends.

            1. Objective
              +The primary goals of the Flink Kerberos security infrastructure are:
              +1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
              +2. to authenticate to ZooKeeper (if configured to use SASL)
              +3. to authenticate to Hadoop components (e.g. HDFS, HBase)

          -The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario,
          -streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure
          -data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the
          -context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster.
          +In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure
          +data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
          +or ticket cache entry.
          +
          +The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
          +or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster.
          — End diff –

          Maybe point out here that this refers to a "Flink Cluster" (a set of JobManager/TaskManager processes). One can run different jobs with different credentials next to each other in YARN by starting different per-job-clusters or Yarn/Mesos sessions.

          Show
          githubbot ASF GitHub Bot added a comment - Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r94994079 — Diff: docs/internals/flink_security.md — @@ -24,64 +24,109 @@ specific language governing permissions and limitations under the License. --> -This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN) -and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers -who plans to run Flink on a secure environment. +This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos), +filesystems, connectors, and state backends. Objective +The primary goals of the Flink Kerberos security infrastructure are: +1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka) +2. to authenticate to ZooKeeper (if configured to use SASL) +3. to authenticate to Hadoop components (e.g. HDFS, HBase) -The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario, -streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure -data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the -context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster. +In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure +data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token +or ticket cache entry. + +The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential +or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. — End diff – Maybe point out here that this refers to a "Flink Cluster" (a set of JobManager/TaskManager processes). One can run different jobs with different credentials next to each other in YARN by starting different per-job-clusters or Yarn/Mesos sessions.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95008182

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java —
          @@ -71,163 +64,93 @@
          */
          public static void install(SecurityConfiguration config) throws Exception {

          • if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - }

            -

          • // establish the JAAS config
          • JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
          • javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
            -
          • populateSystemSecurityProperties(config.flinkConf);
            -
          • // establish the UGI login user
          • UserGroupInformation.setConfiguration(config.hadoopConf);
            -
          • // only configure Hadoop security if we have security enabled
          • if (UserGroupInformation.isSecurityEnabled()) {
            -
          • final UserGroupInformation loginUser;
            -
          • if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
          • String keytabPath = (new File(config.keytab)).getAbsolutePath();
            -
          • UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
            -
          • loginUser = UserGroupInformation.getLoginUser();
            -
          • // supplement with any available tokens
          • String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
          • if (fileLocation != null) {
          • /*
          • * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
          • * used in the context of reading the stored tokens from UGI.
          • * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
          • * loginUser.addCredentials(cred);
          • */
          • try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - }

            catch (NoSuchMethodException e) {

          • LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          • }
          • }
          • } else {
          • // login with current user credentials (e.g. ticket cache)
          • try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - }

            catch (NoSuchMethodException e) {

          • LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          • }
            -
          • // note that the stored tokens are read automatically
          • loginUser = UserGroupInformation.getLoginUser();
            + // install the security modules
            + List<SecurityModule> modules = new ArrayList();
  + try {
  +     for (Class<? extends SecurityModule> moduleClass …
  + }
  + catch (Exception ex) {
  +     throw new Exception("unable to establish the security context", ex);
  + }

            + installedModules = modules;

          • LOG.info("Hadoop user set to {}", loginUser.toString());
            + // install a security context
            + // use the Hadoop login user as the subject of the installed security context
            + if (!(installedContext instanceof NoOpSecurityContext)) { + LOG.warn("overriding previous security context"); + }

            + UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
            + installedContext = new HadoopSecurityContext(loginUser);
            + }

          • boolean delegationToken = false;
          • final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
          • Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
          • for (Token<? extends TokenIdentifier> token : usrTok) {
          • final Text id = new Text(token.getIdentifier());
          • LOG.debug("Found user token " + id + " with " + token);
          • if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
          • delegationToken = true;
            + static void uninstall() {
            + if(installedModules != null) {
            + for (SecurityModule module : Lists.reverse(installedModules)) {
            + try { + module.uninstall(); }
          • }
            -
          • if (!loginUser.hasKerberosCredentials()) {
          • //throw an error in non-yarn deployment if kerberos cache is not available
          • if (!delegationToken) {
          • LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
          • throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
            + catch(UnsupportedOperationException e) {
            }
            }
            -
          • if (!(installedContext instanceof NoOpSecurityContext)) { - LOG.warn("overriding previous security context"); - }

            -

          • installedContext = new HadoopSecurityContext(loginUser);
            + installedModules = null;
            }
          • }
          • static void clearContext() { installedContext = new NoOpSecurityContext(); }
          • boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
            + private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
            + new ArrayList<Class<? extends SecurityModule>>() {{
            + add(HadoopModule.class);
            + add(JaasModule.class);
            + add(ZooKeeperModule.class);
            + }});
          • if (disableSaslClient) { - LOG.info("SASL client auth for ZK will be disabled"); - //SASL auth is disabled by default but will be enabled if specified in configuration - System.setProperty(ZOOKEEPER_SASL_CLIENT,"false"); - return; - }

            + private final org.apache.hadoop.conf.Configuration hadoopConf;

          • // load Jaas config file to initialize SASL
          • final File jaasConfFile;
          • try { - Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, ""); - InputStream jaasConfStream = SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME); - Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING); - jaasConfFile = jaasConfPath.toFile(); - jaasConfFile.deleteOnExit(); - jaasConfStream.close(); - }

            catch (IOException e)

            { - throw new RuntimeException("SASL auth is enabled for ZK but unable to " + - "locate pseudo Jaas config provided with Flink", e); - }

            + private final boolean useTicketCache;

          • LOG.info("Enabling {} property with pseudo JAAS config file: {}",
          • JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath());
            + private final String keytab;
          • String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
          • if (!StringUtils.isBlank(zkSaslServiceName)) {
          • LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
          • System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName);
          • }
            + private List<String> loginContextNames;
          • }
            + private String zkServiceName;
              • End diff –

Some fields are final, others not. Can all be final here?

          Show
githubbot ASF GitHub Bot added a comment - Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95008182 — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java — @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - // establish the JAAS config JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - populateSystemSecurityProperties(config.flinkConf); - // establish the UGI login user UserGroupInformation.setConfiguration(config.hadoopConf); - // only configure Hadoop security if we have security enabled if (UserGroupInformation.isSecurityEnabled()) { - final UserGroupInformation loginUser; - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { String keytabPath = (new File(config.keytab)).getAbsolutePath(); - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - loginUser = UserGroupInformation.getLoginUser(); - // supplement with any available tokens String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); if (fileLocation != null) { /* * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are * used in the context of reading the stored tokens from UGI. * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); * loginUser.addCredentials(cred); */ try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); } } } else { // login with current user credentials (e.g. ticket cache) try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); } - // note that the stored tokens are read automatically loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List<SecurityModule> modules = new ArrayList(); + try { + for (Class<? extends SecurityModule> moduleClass … } + catch(Exception ex) { + throw new Exception("unable to establish the security context", ex); + } + installedModules = modules; LOG.info("Hadoop user set to {}", loginUser.toString()); + // install a security context + // use the Hadoop login user as the subject of the installed security context + if (!(installedContext instanceof NoOpSecurityContext)) { + LOG.warn("overriding previous security context"); + } + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + installedContext = new HadoopSecurityContext(loginUser); + } boolean delegationToken = false; final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN"); Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens(); for (Token<? extends TokenIdentifier> token : usrTok) { final Text id = new Text(token.getIdentifier()); LOG.debug("Found user token " + id + " with " + token); if (token.getKind().equals(HDFS_DELEGATION_KIND)) { delegationToken = true; + static void uninstall() { + if(installedModules != null) { + for (SecurityModule module : Lists.reverse(installedModules)) { + try { + module.uninstall(); } } - if (!loginUser.hasKerberosCredentials()) { //throw an error in non-yarn deployment if kerberos cache is not available if (!delegationToken) { LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); + catch(UnsupportedOperationException e) { } } - if (!(installedContext instanceof NoOpSecurityContext)) { - LOG.warn("overriding previous security context"); - } - installedContext = new HadoopSecurityContext(loginUser); + installedModules = null; } } static void clearContext() { installedContext = new NoOpSecurityContext(); } /* * This method configures some of the system properties that are require for ZK and Kafka SASL authentication * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289 * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900 * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and * Kafka current code behavior. + /** + * The global security configuration. + * + * See {@link SecurityOptions} for corresponding configuration options. */ private static void populateSystemSecurityProperties(Configuration configuration) { Preconditions.checkNotNull(configuration, "The supplied configuration was null"); + public static class SecurityConfiguration { boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE); + private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList( + new ArrayList<Class<? extends SecurityModule>>() {{ + add(HadoopModule.class); + add(JaasModule.class); + add(ZooKeeperModule.class); + }}); if (disableSaslClient) { - LOG.info("SASL client auth for ZK will be disabled"); - //SASL auth is disabled by default but will be enabled if specified in configuration - System.setProperty(ZOOKEEPER_SASL_CLIENT,"false"); - return; - } + private final org.apache.hadoop.conf.Configuration hadoopConf; // load Jaas config file to initialize SASL final File jaasConfFile; try { - Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, ""); - InputStream jaasConfStream = SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME); - Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING); - jaasConfFile = jaasConfPath.toFile(); - jaasConfFile.deleteOnExit(); - jaasConfStream.close(); - } catch (IOException e) { - throw new RuntimeException("SASL auth is enabled for ZK but unable to " + - "locate pseudo Jaas config provided with Flink", e); - } + private final boolean useTicketCache; LOG.info("Enabling {} property with pseudo JAAS config file: {}", JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath()); + private final String keytab; //ZK client module lookup the configuration to handle SASL. // https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900 System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath()); System.setProperty(ZOOKEEPER_SASL_CLIENT, "true"); + private final String principal; String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME); if (!StringUtils.isBlank(zkSaslServiceName)) { LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName); System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName); } + private List<String> loginContextNames; } + private String zkServiceName; End diff – Some fields are final, others not. Can all be final here?
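Applying the suggestion, each field would be assigned exactly once in the constructor and declared `final`; a condensed sketch using the field names from the diff (constructor shown only for the fields listed here):

```java
import java.util.List;

// Nested inside SecurityUtils in the PR; condensed to the fields under discussion.
public static class SecurityConfiguration {

    private final org.apache.hadoop.conf.Configuration hadoopConf;
    private final boolean useTicketCache;
    private final String keytab;
    private final String principal;
    private final List<String> loginContextNames; // now final as well
    private final String zkServiceName;           // now final as well

    public SecurityConfiguration(
            org.apache.hadoop.conf.Configuration hadoopConf,
            boolean useTicketCache,
            String keytab,
            String principal,
            List<String> loginContextNames,
            String zkServiceName) {
        this.hadoopConf = hadoopConf;
        this.useTicketCache = useTicketCache;
        this.keytab = keytab;
        this.principal = principal;
        this.loginContextNames = loginContextNames;
        this.zkServiceName = zkServiceName;
    }
}
```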
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95007601

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java —
          @@ -71,163 +64,93 @@
          */
          public static void install(SecurityConfiguration config) throws Exception {

          • if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - }

            -

          • // establish the JAAS config
          • JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
          • javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
            -
          • populateSystemSecurityProperties(config.flinkConf);
            -
          • // establish the UGI login user
          • UserGroupInformation.setConfiguration(config.hadoopConf);
            -
          • // only configure Hadoop security if we have security enabled
          • if (UserGroupInformation.isSecurityEnabled()) {
            -
          • final UserGroupInformation loginUser;
            -
          • if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
          • String keytabPath = (new File(config.keytab)).getAbsolutePath();
            -
          • UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
            -
          • loginUser = UserGroupInformation.getLoginUser();
            -
          • // supplement with any available tokens
          • String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
          • if (fileLocation != null) {
          • /*
          • * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
          • * used in the context of reading the stored tokens from UGI.
          • * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
          • * loginUser.addCredentials(cred);
          • */
          • try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - }

            catch (NoSuchMethodException e) {

          • LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          • }
          • }
          • } else {
          • // login with current user credentials (e.g. ticket cache)
          • try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - }

            catch (NoSuchMethodException e) {

          • LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          • }
            -
          • // note that the stored tokens are read automatically
          • loginUser = UserGroupInformation.getLoginUser();
            + // install the security modules
            + List<SecurityModule> modules = new ArrayList();
  + try {
  +     for (Class<? extends SecurityModule> moduleClass …
  + }
  + catch (Exception ex) {
  +     throw new Exception("unable to establish the security context", ex);
  + }

            + installedModules = modules;

          • LOG.info("Hadoop user set to {}", loginUser.toString());
            + // install a security context
            + // use the Hadoop login user as the subject of the installed security context
            + if (!(installedContext instanceof NoOpSecurityContext)) { + LOG.warn("overriding previous security context"); + }

            + UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
            + installedContext = new HadoopSecurityContext(loginUser);
            + }

          • boolean delegationToken = false;
          • final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
          • Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
          • for (Token<? extends TokenIdentifier> token : usrTok) {
          • final Text id = new Text(token.getIdentifier());
          • LOG.debug("Found user token " + id + " with " + token);
          • if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
          • delegationToken = true;
            + static void uninstall() {
            + if(installedModules != null) {
            + for (SecurityModule module : Lists.reverse(installedModules)) {
            + try { + module.uninstall(); }
          • }
            -
          • if (!loginUser.hasKerberosCredentials()) {
          • //throw an error in non-yarn deployment if kerberos cache is not available
          • if (!delegationToken) {
          • LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
          • throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
            + catch(UnsupportedOperationException e) {
            }
            }
            -
          • if (!(installedContext instanceof NoOpSecurityContext)) { - LOG.warn("overriding previous security context"); - }

            -

          • installedContext = new HadoopSecurityContext(loginUser);
            + installedModules = null;
            }
          • }
          • static void clearContext() { installedContext = new NoOpSecurityContext(); }
          • boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
            + private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList(
            + new ArrayList<Class<? extends SecurityModule>>() {{
              • End diff –

          I think you can use `Arrays.asList(...)` here, instead of creating an anonymous subclass.

          Show
githubbot ASF GitHub Bot added a comment - Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95007601 — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java — @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - // establish the JAAS config JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - populateSystemSecurityProperties(config.flinkConf); - // establish the UGI login user UserGroupInformation.setConfiguration(config.hadoopConf); - // only configure Hadoop security if we have security enabled if (UserGroupInformation.isSecurityEnabled()) { - final UserGroupInformation loginUser; - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { String keytabPath = (new File(config.keytab)).getAbsolutePath(); - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - loginUser = UserGroupInformation.getLoginUser(); - // supplement with any available tokens String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); if (fileLocation != null) { /* * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are * used in the context of reading the stored tokens from UGI. * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); * loginUser.addCredentials(cred); */ try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); } } } else { // login with current user credentials (e.g. ticket cache) try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); } - // note that the stored tokens are read automatically loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List<SecurityModule> modules = new ArrayList(); + try { + for (Class<? extends SecurityModule> moduleClass … } + catch(Exception ex) { + throw new Exception("unable to establish the security context", ex); + } + installedModules = modules; LOG.info("Hadoop user set to {}", loginUser.toString()); + // install a security context + // use the Hadoop login user as the subject of the installed security context + if (!(installedContext instanceof NoOpSecurityContext)) { + LOG.warn("overriding previous security context"); + } + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + installedContext = new HadoopSecurityContext(loginUser); + } boolean delegationToken = false; final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN"); Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens(); for (Token<? extends TokenIdentifier> token : usrTok) { final Text id = new Text(token.getIdentifier()); LOG.debug("Found user token " + id + " with " + token); if (token.getKind().equals(HDFS_DELEGATION_KIND)) { delegationToken = true; + static void uninstall() { + if(installedModules != null) { + for (SecurityModule module : Lists.reverse(installedModules)) { + try { + module.uninstall(); } } - if (!loginUser.hasKerberosCredentials()) { //throw an error in non-yarn deployment if kerberos cache is not available if (!delegationToken) { LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); + catch(UnsupportedOperationException e) { } } - if (!(installedContext instanceof NoOpSecurityContext)) { - LOG.warn("overriding previous security context"); - } - installedContext = new HadoopSecurityContext(loginUser); + installedModules = null; } } static void clearContext() { installedContext = new NoOpSecurityContext(); } /* * This method configures some of the system properties that are require for ZK and Kafka SASL authentication * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289 * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900 * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and * Kafka current code behavior. + /** + * The global security configuration. + * + * See {@link SecurityOptions} for corresponding configuration options. */ private static void populateSystemSecurityProperties(Configuration configuration) { Preconditions.checkNotNull(configuration, "The supplied configuration was null"); + public static class SecurityConfiguration { boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE); + private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList( + new ArrayList<Class<? extends SecurityModule>>() {{ End diff – I think you can use `Arrays.asList(...)` here, instead of creating an anonymous subclass.
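The suggested replacement, sketched with an explicit type witness so the call site stays unambiguous under older javac versions as well (module class names taken from the diff):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Same immutable list as the double-brace initializer, but without creating
// an anonymous ArrayList subclass.
private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES =
    Collections.unmodifiableList(
        Arrays.<Class<? extends SecurityModule>>asList(
            HadoopModule.class, JaasModule.class, ZooKeeperModule.class));
```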
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95007014

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java —
          @@ -71,163 +64,93 @@
          */
          public static void install(SecurityConfiguration config) throws Exception {

          • if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - }

            -

          • // establish the JAAS config
          • JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
          • javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
            -
          • populateSystemSecurityProperties(config.flinkConf);
            -
          • // establish the UGI login user
          • UserGroupInformation.setConfiguration(config.hadoopConf);
            -
          • // only configure Hadoop security if we have security enabled
          • if (UserGroupInformation.isSecurityEnabled()) {
            -
          • final UserGroupInformation loginUser;
            -
          • if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
          • String keytabPath = (new File(config.keytab)).getAbsolutePath();
            -
          • UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
            -
          • loginUser = UserGroupInformation.getLoginUser();
            -
          • // supplement with any available tokens
          • String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
          • if (fileLocation != null) {
          • /*
          • * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
          • * used in the context of reading the stored tokens from UGI.
          • * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
          • * loginUser.addCredentials(cred);
          • */
          • try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - }

            catch (NoSuchMethodException e) {

          • LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          • }
          • }
          • } else {
          • // login with current user credentials (e.g. ticket cache)
          • try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - }

            catch (NoSuchMethodException e) {

          • LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          • }
            -
          • // note that the stored tokens are read automatically
          • loginUser = UserGroupInformation.getLoginUser();
            + // install the security modules
            + List<SecurityModule> modules = new ArrayList();
              • End diff –

Can you use `new ArrayList<>()` here? In general, it would be nice to make a pass over the code with warnings for generics and serializability enabled. I get a lot of warnings printed when compiling this. Minimizing these kinds of warnings helps to spot the ones that point to actual subtle bugs.

          Show
          githubbot ASF GitHub Bot added a comment - Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95007014 — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java — @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - // establish the JAAS config JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - populateSystemSecurityProperties(config.flinkConf); - // establish the UGI login user UserGroupInformation.setConfiguration(config.hadoopConf); - // only configure Hadoop security if we have security enabled if (UserGroupInformation.isSecurityEnabled()) { - final UserGroupInformation loginUser; - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { String keytabPath = (new File(config.keytab)).getAbsolutePath(); - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - loginUser = UserGroupInformation.getLoginUser(); - // supplement with any available tokens String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); if (fileLocation != null) { /* * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are * used in the context of reading the stored tokens from UGI. * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); * loginUser.addCredentials(cred); */ try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); } } } else { // login with current user credentials (e.g. ticket cache) try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); } - // note that the stored tokens are read automatically loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List<SecurityModule> modules = new ArrayList(); End diff – Can you use `new ArrayList<>()` here? In general, it would be nice to make a pass over the code with warnings according to generics and serializability enabled. I get a lot of warnings printed when compiling this. Minimizing these kinds of warnings helps to spot the warnings that inform about actual subtle bugs.
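The fix is a one-liner; with the diamond operator the list is properly parameterized and the raw-type warning disappears:

```java
// Before: raw type, triggers an unchecked-assignment warning.
//     List<SecurityModule> modules = new ArrayList();

// After: parameterized via the diamond operator, warning-free.
List<SecurityModule> modules = new ArrayList<>();
```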
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95010038

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java —
          @@ -0,0 +1,147 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.security.modules;
          +
          +import org.apache.flink.annotation.Internal;
          +import org.apache.flink.runtime.security.DynamicConfiguration;
          +import org.apache.flink.runtime.security.KerberosUtils;
          +import org.apache.flink.runtime.security.SecurityUtils;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +import javax.security.auth.login.AppConfigurationEntry;
          +import java.io.File;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.file.Files;
          +import java.nio.file.Path;
          +import java.nio.file.StandardCopyOption;
          +
          +/**
          + * Responsible for installing a process-wide JAAS configuration.
          + * <p>
          + * The installed configuration combines login modules based on:
          + * - the user-supplied JAAS configuration file, if any
          + * - a Kerberos keytab, if configured
          + * - any cached Kerberos credentials from the current environment
          + * <p>
          + * The module also installs a default JAAS config file (if necessary) for
          + * compatibility with ZK and Kafka. Note that the JRE actually draws on numerous file locations.
          + * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
          + * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
          + */
          +@Internal
          +public class JaasModule implements SecurityModule {
          +
          + private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class);
          +
          + static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
          +
          + static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
          +
          + private String priorConfigFile;
          + private javax.security.auth.login.Configuration priorConfig;
          +
          + private DynamicConfiguration currentConfig;
          +
          + @Override
          + public void install(SecurityUtils.SecurityConfiguration securityConfig) {
          +
          + // ensure that a config file is always defined, for compatibility with
          + // ZK and Kafka which check for the system property and existence of the file
          + priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
+ if (priorConfigFile == null) {
+     File configFile = generateDefaultConfigFile();
+     System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
+ }
+
+ // read the JAAS configuration file
+ priorConfig = javax.security.auth.login.Configuration.getConfiguration();
+
+ // construct a dynamic JAAS configuration
+ currentConfig = new DynamicConfiguration(priorConfig);
+
+ // wire up the configured JAAS login contexts to use the krb5 entries
+ AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
+ if (krb5Entries != null) {
+     for (String app : securityConfig.getLoginContextNames()) {
+         currentConfig.addAppConfigurationEntry(app, krb5Entries);
+     }
+ }
+
+ javax.security.auth.login.Configuration.setConfiguration(currentConfig);
+ }
+
+ @Override
+ public void uninstall() {
+     if (priorConfigFile != null) {
+         System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
+     } else {
+         System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
+     }
+     javax.security.auth.login.Configuration.setConfiguration(priorConfig);
+ }
+
+ public DynamicConfiguration getCurrentConfiguration() {
+     return currentConfig;
+ }
+
+ private static AppConfigurationEntry[] getAppConfigurationEntries(SecurityUtils.SecurityConfiguration securityConfig) {
+
+     AppConfigurationEntry userKerberosAce = null;
+     if (securityConfig.useTicketCache()) {
+         userKerberosAce = KerberosUtils.ticketCacheEntry();
+     }
+     AppConfigurationEntry keytabKerberosAce = null;
+     if (securityConfig.getKeytab() != null) {
+         keytabKerberosAce = KerberosUtils.keytabEntry(securityConfig.getKeytab(), securityConfig.getPrincipal());
+     }
+
+     AppConfigurationEntry[] appConfigurationEntry;
+     if (userKerberosAce != null && keytabKerberosAce != null) {
+         appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce};
+     } else if (keytabKerberosAce != null) {
+         appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce};
+     } else if (userKerberosAce != null) {
+         appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce};
+     } else {
+         return null;
+     }
+
+     return appConfigurationEntry;
+ }
+
+ /**
+  * Generate the default JAAS config file.
+  */
+ private static File generateDefaultConfigFile() {
+     // load Jaas config file to initialize SASL
+     final File jaasConfFile;
+     try {
+         Path jaasConfPath = Files.createTempFile("jaas-", ".conf");
+         try (InputStream resourceStream = JaasModule.class.getClassLoader().getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
+             Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
+         }
+         jaasConfFile = jaasConfPath.toFile();
+         jaasConfFile.deleteOnExit();
+     } catch (IOException e) {
+         throw new RuntimeException("unable to generate a JAAS configuration file", e);
          — End diff –

          See above - why not declare that this method may throw an `IOException`?
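          For reference, the checked-exception variant being suggested would read roughly as follows. This is a sketch only — the `throws IOException` clause and the explicit `return` are the changes; everything else is taken from the diff above, and callers would then have to handle or propagate the exception:

              private static File generateDefaultConfigFile() throws IOException {
                  // extract the built-in flink-jaas.conf resource to a temp file
                  Path jaasConfPath = Files.createTempFile("jaas-", ".conf");
                  try (InputStream resourceStream = JaasModule.class.getClassLoader().getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
                      Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
                  }
                  File jaasConfFile = jaasConfPath.toFile();
                  jaasConfFile.deleteOnExit();
                  return jaasConfFile;
              }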

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95003470

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java —
          @@ -0,0 +1,112 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.security;
          +
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +import scala.Array;
          +
          +import javax.annotation.Nullable;
          +import javax.security.auth.login.AppConfigurationEntry;
          +import javax.security.auth.login.Configuration;
          +import java.util.Arrays;
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +
          +/**
          + * A dynamic JAAS configuration.
          + *
          + * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
          + * an (optional) underlying configuration. Entries from the underlying configuration take
          + * precedence over dynamic entries.
          + */
          +public class DynamicConfiguration extends Configuration {
          +
          + protected static final Logger LOG = LoggerFactory.getLogger(DynamicConfiguration.class);
          +
          + private final Configuration delegate;
          +
          + private final Map<String,AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
          +
          + /**
          + * Create a dynamic configuration.
          + * @param delegate an underlying configuration to delegate to, or null.
          + */
          + public DynamicConfiguration(@Nullable Configuration delegate) {
          + this.delegate = delegate;
          + }
          +
          + /**
          + * Add entries for the given application name.
          + */
          + public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) {
          + final AppConfigurationEntry[] existing = dynamicEntries.get(name);
          + final AppConfigurationEntry[] updated;
          + if (existing == null) {
          + updated = Arrays.copyOf(entry, entry.length);
          + } else {
          + updated = merge(existing, entry);
          + }
          + dynamicEntries.put(name, updated);
          + }
          +
          + /**
          + * Retrieve the AppConfigurationEntries for the specified <i>name</i>
          + * from this Configuration.
          + *
          + * <p>
          + *
          + * @param name the name used to index the Configuration.
          + *
          + * @return an array of AppConfigurationEntries for the specified <i>name</i>
          + * from this Configuration, or null if there are no entries
          + * for the specified <i>name</i>
          + */
          + @Override
          + public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
          + AppConfigurationEntry[] entry = null;
          + if (delegate != null) {
          + entry = delegate.getAppConfigurationEntry(name);
          + }
          + final AppConfigurationEntry[] existing = dynamicEntries.get(name);
          + if (existing != null) {
          + if (entry != null) {
          + entry = merge(entry, existing);
          + } else {
          + entry = Arrays.copyOf(existing, existing.length);
          + }
          + }
          + return entry;
          + }
          +
          + private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) {
          + AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length);
          + Array.copy(b, 0, merged, a.length, b.length);
          — End diff –

          Can we use `System.arraycopy()` here to avoid a dependency on Scala in this part of the code?
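          For illustration, the merge helper with the JDK call swapped in would look like this — a sketch; the trailing `return merged;` is cut off in the excerpt above, so its exact form is assumed:

              private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) {
                  // grow a copy of 'a', then append 'b' with the JDK's own array copy (no Scala dependency)
                  AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length);
                  System.arraycopy(b, 0, merged, a.length, b.length);
                  return merged;
              }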

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3057

          @EronWright Thanks for explaining. I figured it out concurrently by looking at the updated documentation you wrote. The docs are good, helped a lot!

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on the issue:

          https://github.com/apache/flink/pull/3057

          Thanks for the review, I'll address these comments ASAP.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95265401

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java —
          @@ -71,163 +64,93 @@
           */
           public static void install(SecurityConfiguration config) throws Exception {

          - if (!config.securityIsEnabled()) {
          - // do not perform any initialization if no Kerberos crendetails are provided
          - return;
          - }
          -
          - // establish the JAAS config
          - JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal);
          - javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
          -
          - populateSystemSecurityProperties(config.flinkConf);
          -
          - // establish the UGI login user
          - UserGroupInformation.setConfiguration(config.hadoopConf);
          -
          - // only configure Hadoop security if we have security enabled
          - if (UserGroupInformation.isSecurityEnabled()) {
          -
          - final UserGroupInformation loginUser;
          -
          - if (config.keytab != null && !StringUtils.isBlank(config.principal)) {
          - String keytabPath = (new File(config.keytab)).getAbsolutePath();
          -
          - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
          -
          - loginUser = UserGroupInformation.getLoginUser();
          -
          - // supplement with any available tokens
          - String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
          - if (fileLocation != null) {
          - /*
          - * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
          - * used in the context of reading the stored tokens from UGI.
          - * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
          - * loginUser.addCredentials(cred);
          - */
          - try {
          - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
          - File.class, org.apache.hadoop.conf.Configuration.class);
          - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
          - config.hadoopConf);
          - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
          - Credentials.class);
          - addCredentialsMethod.invoke(loginUser, cred);
          - } catch (NoSuchMethodException e) {
          - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          - }
          - }
          - } else {
          - // login with current user credentials (e.g. ticket cache)
          - try {
          - //Use reflection API to get the login user object
          - //UserGroupInformation.loginUserFromSubject(null);
          - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
          - Subject subject = null;
          - loginUserFromSubjectMethod.invoke(null, subject);
          - } catch (NoSuchMethodException e) {
          - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
          - }
          -
          - // note that the stored tokens are read automatically
          - loginUser = UserGroupInformation.getLoginUser();
          + // install the security modules
          + List<SecurityModule> modules = new ArrayList();
          + try
          + Unknown macro: { + for (Class<? extends SecurityModule> moduleClass }
          + catch (Exception ex) {
          + throw new Exception("unable to establish the security context", ex);
          + }
          + installedModules = modules;

          - LOG.info("Hadoop user set to {}", loginUser.toString());
          + // install a security context
          + // use the Hadoop login user as the subject of the installed security context
          + if (!(installedContext instanceof NoOpSecurityContext)) {
          + LOG.warn("overriding previous security context");
          + }
          + UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
          + installedContext = new HadoopSecurityContext(loginUser);
          + }

          - boolean delegationToken = false;
          - final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
          - Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens();
          - for (Token<? extends TokenIdentifier> token : usrTok) {
          - final Text id = new Text(token.getIdentifier());
          - LOG.debug("Found user token " + id + " with " + token);
          - if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
          - delegationToken = true;
          + static void uninstall() {
          + if (installedModules != null) {
          + for (SecurityModule module : Lists.reverse(installedModules)) {
          + try {
          + module.uninstall();
          + }
          - }
          -
          - if (!loginUser.hasKerberosCredentials()) {
          - //throw an error in non-yarn deployment if kerberos cache is not available
          - if (!delegationToken) {
          - LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
          - throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials");
          + catch (UnsupportedOperationException e) {
          — End diff –

          The uninstall isn't used in production, only in test code. Throwing gives more information to the unit test. Hopefully getting rid of the warning will suffice for now.
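          Assembled from the fragments above, the uninstall path would read roughly like this — a sketch; the method tail is cut off in the excerpt, so the final reset of `installedModules` and `installedContext` is an assumption about the intended shape, not a quote of the PR:

              static void uninstall() {
                  if (installedModules != null) {
                      // uninstall in reverse installation order
                      for (SecurityModule module : Lists.reverse(installedModules)) {
                          try {
                              module.uninstall();
                          }
                          catch (UnsupportedOperationException e) {
                              // tolerated: this module does not support uninstallation
                          }
                      }
                      installedModules = null;  // assumption: clear state for test re-installation
                  }
                  installedContext = new NoOpSecurityContext();  // assumption: restore the no-op context
              }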

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3057#discussion_r95283348

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java —
          @@ -0,0 +1,147 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.security.modules;
          +
          +import org.apache.flink.annotation.Internal;
          +import org.apache.flink.runtime.security.DynamicConfiguration;
          +import org.apache.flink.runtime.security.KerberosUtils;
          +import org.apache.flink.runtime.security.SecurityUtils;
          +import org.slf4j.Logger;
          +import org.slf4j.LoggerFactory;
          +
          +import javax.security.auth.login.AppConfigurationEntry;
          +import java.io.File;
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.nio.file.Files;
          +import java.nio.file.Path;
          +import java.nio.file.StandardCopyOption;
          +
          +/**
          + * Responsible for installing a process-wide JAAS configuration.
          + * <p>
          + * The installed configuration combines login modules based on:
          + * - the user-supplied JAAS configuration file, if any
          + * - a Kerberos keytab, if configured
          + * - any cached Kerberos credentials from the current environment
          + * <p>
          + * The module also installs a default JAAS config file (if necessary) for
          + * compatibility with ZK and Kafka. Note that the JRE actually draws on numerous file locations.
          + * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
          + * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
          + */
          +@Internal
          +public class JaasModule implements SecurityModule {
          +
          + private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class);
          +
          + static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
          +
          + static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
          +
          + private String priorConfigFile;
          + private javax.security.auth.login.Configuration priorConfig;
          +
          + private DynamicConfiguration currentConfig;
          +
          + @Override
          + public void install(SecurityUtils.SecurityConfiguration securityConfig) {
          +
          + // ensure that a config file is always defined, for compatibility with
          + // ZK and Kafka which check for the system property and existence of the file
          + priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
          + if (priorConfigFile == null) {
          + File configFile = generateDefaultConfigFile();
          + System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
          + }
          +
          + // read the JAAS configuration file
          + priorConfig = javax.security.auth.login.Configuration.getConfiguration();
          +
          + // construct a dynamic JAAS configuration
          + currentConfig = new DynamicConfiguration(priorConfig);
          +
          + // wire up the configured JAAS login contexts to use the krb5 entries
          + AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
          + if (krb5Entries != null) {
          + for (String app : securityConfig.getLoginContextNames()) {
          + currentConfig.addAppConfigurationEntry(app, krb5Entries);
          + }
          + }
          +
          + javax.security.auth.login.Configuration.setConfiguration(currentConfig);
          + }
          +
          + @Override
          + public void uninstall() {
          + if (priorConfigFile != null) {
          + System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
          + } else {
          + System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
          + }
          + javax.security.auth.login.Configuration.setConfiguration(priorConfig);
          + }
          +
          + public DynamicConfiguration getCurrentConfiguration() {
          + return currentConfig;
          + }
          +
          + private static AppConfigurationEntry[] getAppConfigurationEntries(SecurityUtils.SecurityConfiguration securityConfig) {
          +
          + AppConfigurationEntry userKerberosAce = null;
          + if (securityConfig.useTicketCache()) {
          + userKerberosAce = KerberosUtils.ticketCacheEntry();
          + }
          + AppConfigurationEntry keytabKerberosAce = null;
          + if (securityConfig.getKeytab() != null) {
          + keytabKerberosAce = KerberosUtils.keytabEntry(securityConfig.getKeytab(), securityConfig.getPrincipal());
          + }
          +
          + AppConfigurationEntry[] appConfigurationEntry;
          + if (userKerberosAce != null && keytabKerberosAce != null) {
          + appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce, userKerberosAce};
          + } else if (keytabKerberosAce != null) {
          + appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce};
          + } else if (userKerberosAce != null) {
          + appConfigurationEntry = new AppConfigurationEntry[] {userKerberosAce};
          + } else {
          + return null;
          + }
          +
          + return appConfigurationEntry;
          + }
          +
          + /**
          + * Generate the default JAAS config file.
          + */
          + private static File generateDefaultConfigFile() {
          + // load Jaas config file to initialize SASL
          + final File jaasConfFile;
          + try {
          + Path jaasConfPath = Files.createTempFile("jaas-", ".conf");
          + try (InputStream resourceStream = JaasModule.class.getClassLoader().getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
          + Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
          + }
          + jaasConfFile = jaasConfPath.toFile();
          + jaasConfFile.deleteOnExit();
          + } catch (IOException e) {
          + throw new RuntimeException("unable to generate a JAAS configuration file", e);
          — End diff –

          In this case, I do consider a failure to generate the default config a programming error; it simply extracts an in-built resource as a temp file. I definitely agree with your general point.
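          For context, the bundled flink-jaas.conf exists mainly so that the `java.security.auth.login.config` property points at a real file; substantive login modules come from a user-supplied JAAS file. A hypothetical user-supplied entry for Kafka might look like this (the context name `KafkaClient` follows Kafka's convention; the keytab path and principal are placeholders):

              KafkaClient {
                  com.sun.security.auth.module.Krb5LoginModule required
                  useKeyTab=true
                  storeKey=true
                  keyTab="/path/to/user.keytab"
                  principal="flink-user@EXAMPLE.COM";
              };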

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on the issue:

          https://github.com/apache/flink/pull/3057

          @StephanEwen updated based on feedback, thanks again.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3057

          Hide
          StephanEwen Stephan Ewen added a comment -

          Fixed in

          • 1.2.0 via 00193f7e238340cc18c57a44c7e6377432839373
          • 1.3.0 via fc3a778c0cafe1adc9efbd8796a8bd64122e4ad2

            People

            • Assignee:
              eronwright Eron Wright
            • Reporter:
              eronwright Eron Wright