  Apache NiFi / NIFI-3609

ConnectWebSocket should be able to reconnect automatically

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.2.0
    • Component/s: Extensions
    • Labels:
      None

      Description

      A WebSocket connection can be kept alive by triggering PutWebSocket periodically (typically with an empty message), but this does not help when the server goes down.

      Currently, if the server restarts, ConnectWebSocket has to be restarted manually to establish a new connection.

      Ideally, the processor should recover from this situation automatically. It would be even better if existing session IDs could be reused, so that recovery is transparent.


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user ijokarumawak opened a pull request:

          https://github.com/apache/nifi/pull/1597

          NIFI-3609: ConnectWebSocket auto session recovery

          Before this fix, ConnectWebSocket had to be restarted manually if its target WebSocket server restarted, or if its session expired because there was no communication for a certain period of time (apparently 3 min). This PR adds automatic session maintenance logic so that ConnectWebSocket keeps its session alive as long as it is running.

          • Removed the unused disconnect method from the WebSocketService interface.
          • Added a session maintenance background thread to JettyWebSocketClient
            which reconnects sessions that are still referenced by a ConnectWebSocket
            processor but are no longer active.
          • Added a Session Maintenance Interval property to JettyWebSocketClient.
          • Allowed specifying an existing session ID so that the session can be
            recovered transparently.
          • Moved test classes to the appropriate package.
          • Added test cases that verify the same session ID can be used after
            the WebSocket server restarts.
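The session maintenance described in this PR can be pictured with the plain-Java sketch below. It illustrates the idea under stated assumptions and is not the JettyWebSocketClient code. `SessionMaintainer`, the `Connector` interface and the `isAlive` predicate are hypothetical stand-ins for the Jetty client, the message router and `router.containsSession(sessionId)`. The sketch maps each client ID to its last known session ID and checks that map on a fixed delay. Any dead session is reconnected under a lock, and the old session ID is passed along so it can be reused.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical sketch of a background session-maintenance loop (not NiFi code).
class SessionMaintainer {

    // Hypothetical connector: opens a connection, reusing sessionId when it is
    // non-null, and returns the session ID that is now in use.
    interface Connector {
        String connect(String clientId, String sessionId) throws IOException;
    }

    private final Map<String, String> activeSessions = new ConcurrentHashMap<>();
    private final ReentrantLock connectionLock = new ReentrantLock();
    private final Connector connector;
    private final Predicate<String> isAlive; // stand-in for router.containsSession(sessionId)
    private ScheduledExecutorService executor;

    SessionMaintainer(Connector connector, Predicate<String> isAlive) {
        this.connector = connector;
        this.isAlive = isAlive;
    }

    // Initial connection: there is no previous session ID to reuse.
    void connect(String clientId) throws IOException {
        connect(clientId, null);
    }

    // The (reentrant) lock keeps user-initiated connects and maintenance passes from interleaving.
    private void connect(String clientId, String sessionId) throws IOException {
        connectionLock.lock();
        try {
            activeSessions.put(clientId, connector.connect(clientId, sessionId));
        } finally {
            connectionLock.unlock();
        }
    }

    // One maintenance pass: reconnect every tracked session that is no longer alive.
    void maintainSessions() {
        connectionLock.lock();
        try {
            for (Map.Entry<String, String> entry : activeSessions.entrySet()) {
                if (isAlive.test(entry.getValue())) {
                    continue; // still connected, nothing to do
                }
                try {
                    // Pass the old session ID so it can be reused transparently.
                    connect(entry.getKey(), entry.getValue());
                } catch (IOException | RuntimeException e) {
                    // Leave the entry in place; the next pass retries.
                }
            }
        } finally {
            connectionLock.unlock();
        }
    }

    // Runs maintainSessions() every intervalMillis (cf. Session Maintenance Interval).
    void start(long intervalMillis) {
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(this::maintainSessions,
                intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    String sessionIdOf(String clientId) {
        return activeSessions.get(clientId);
    }
}
```

With `scheduleWithFixedDelay`, each pass starts only after the full interval has elapsed since the previous pass finished. The sketch catches `RuntimeException` as well as `IOException` because, once a scheduled task throws, the executor suppresses all of its later runs.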

          Thank you for submitting a contribution to Apache NiFi.

          In order to streamline the review of the contribution we ask you
          to ensure the following steps have been taken:

              For all changes:
          • [x] Is there a JIRA ticket associated with this PR? Is it referenced
            in the commit message?
          • [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          • [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
          • [x] Is your initial contribution a single, squashed commit?
              For code changes:
          • [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          • [x] Have you written or updated unit tests to verify your changes?
          • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
          • [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          • [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          • [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
              For documentation related changes:
          • [x] Have you ensured that format looks appropriate for the output in which it is rendered?
              Note:
                Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/ijokarumawak/nifi nifi-3609

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/1597.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1597


          commit 0da822fb52c698d04726b9b6bd060480dc21ed1b
          Author: Koji Kawamura <ijokarumawak@apache.org>
          Date: 2017-03-16T08:10:38Z

          NIFI-3609: ConnectWebSocket auto session recovery

          • Removed the unused disconnect method from the WebSocketService interface.
          • Added a session maintenance background thread to JettyWebSocketClient
            which reconnects sessions that are still referenced by a ConnectWebSocket
            processor but are no longer active.
          • Added a Session Maintenance Interval property to JettyWebSocketClient.
          • Allowed specifying an existing session ID so that the session can be
            recovered transparently.
          • Moved test classes to the appropriate package.
          • Added test cases that verify the same session ID can be used after
            the WebSocket server restarts.

          githubbot ASF GitHub Bot added a comment -

          Github user jdye64 commented on the issue:

          https://github.com/apache/nifi/pull/1597

          reviewing

          githubbot ASF GitHub Bot added a comment -

          Github user jdye64 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1597#discussion_r114116075

          --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
          @@ -81,6 +87,16 @@
                       .defaultValue("3 sec")
                       .build();

          +    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
          +            .name("session-maintenance-interval")
          +            .displayName("Session Maintenance Interval")
          +            .description("The interval between session maintenance activities.")
          --- End diff --

          Could we add some more detail about what this is doing? I get that it's the interval for the maintenance thread, but I'm worried that end users might not really understand what that means. Maybe add a brief sentence about what this "maintenance" actually does and why it is necessary, so they understand how to use it a little better.

          githubbot ASF GitHub Bot added a comment -

          Github user jdye64 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1597#discussion_r114116475

          --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
          @@ -135,27 +177,81 @@ public void stopClient() throws Exception {

               @Override
               public void connect(final String clientId) throws IOException {
          +        connect(clientId, null);
          +    }
          +
          +    private void connect(final String clientId, String sessionId) throws IOException {
          +
          +        connectionLock.lock();

          -        final WebSocketMessageRouter router;
                   try {
          -            router = routers.getRouterOrFail(clientId);
          -        } catch (WebSocketConfigurationException e) {
          -            throw new IllegalStateException("Failed to get router due to: " + e, e);
          +            final WebSocketMessageRouter router;
          +            try {
          +                router = routers.getRouterOrFail(clientId);
          +            } catch (WebSocketConfigurationException e) {
          +                throw new IllegalStateException("Failed to get router due to: " + e, e);
          +            }
          +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
          +            listener.setSessionId(sessionId);
          +
          +            final ClientUpgradeRequest request = new ClientUpgradeRequest();
          +            final Future<Session> connect = client.connect(listener, webSocketUri, request);
          +            getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
          +
          +            final Session session;
          +            try {
          +                session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
          +            } catch (Exception e) {
          +                throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
          +            }
          +            getLogger().info("Connected, session={}", new Object[]{session});
          +            activeSessions.put(clientId, listener.getSessionId());
          +
          +        } finally {
          +            connectionLock.unlock();
                   }
          -        final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);

          -        final ClientUpgradeRequest request = new ClientUpgradeRequest();
          -        final Future<Session> connect = client.connect(listener, webSocketUri, request);
          -        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
          +    }
          +
          +    private Map<String, String> activeSessions = new ConcurrentHashMap<>();

          -        final Session session;
          +    void maintainSessions() throws Exception {
          +        if (client == null) {
          +            return;
          +        }
          +
          +        connectionLock.lock();
          +
          +        final ComponentLog logger = getLogger();
                   try {
          -            session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
          -        } catch (Exception e) {
          -            throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
          +            // Loop through existing sessions and reconnect.
          +            for (String clientId : activeSessions.keySet()) {
          +                final WebSocketMessageRouter router;
          +                try {
          +                    router = routers.getRouterOrFail(clientId);
          +                } catch (final WebSocketConfigurationException e) {
          +                    if (logger.isDebugEnabled()) {
          +                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
          +                    }
          +                    activeSessions.remove(clientId);
          +                    continue;
          +                }
          +
          +                final String sessionId = activeSessions.get(clientId);
          +                // If this session is stil alive, do nothing.
          +                if (!router.containsSession(sessionId)) {
          +                    // This session is no longer active, reconnect it.
          +                    // If it fails, the sessionId will remain in activeSessions, and retries later.
          --- End diff --

          More of a question: should we limit the number of times this is attempted before the sessionId is just removed? I could see an argument either way, but I'm curious about your thoughts here.
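The two options raised in this question can be made concrete with a per-client failure counter that decides when to give up. This is a hypothetical sketch: `RetryTracker` and `maxAttempts` are illustrative names and are not part of the PR. A positive `maxAttempts` tells the caller to drop the client ID after that many consecutive failed reconnects. A value of `maxAttempts <= 0` keeps retrying until the processor or service is stopped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: counts consecutive reconnect failures per client ID.
class RetryTracker {
    private final int maxAttempts; // <= 0 means "retry until stopped"
    private final Map<String, Integer> failures = new ConcurrentHashMap<>();

    RetryTracker(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Records one failed reconnect and returns true if the caller should give up
    // and remove the client ID from its active sessions.
    boolean recordFailure(String clientId) {
        int count = failures.merge(clientId, 1, Integer::sum);
        return maxAttempts > 0 && count >= maxAttempts;
    }

    // A successful reconnect resets the count for that client ID.
    void recordSuccess(String clientId) {
        failures.remove(clientId);
    }

    int failureCount(String clientId) {
        return failures.getOrDefault(clientId, 0);
    }
}
```

Capping retries means a long server outage can silently leave the processor disconnected while it still appears to be running. An unlimited policy avoids this at the cost of one failed attempt per maintenance pass.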

          githubbot ASF GitHub Bot added a comment -

          Github user jdye64 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1597#discussion_r114116276

          --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
          @@ -81,6 +87,16 @@
                       .defaultValue("3 sec")
                       .build();

          +    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
          +            .name("session-maintenance-interval")
          +            .displayName("Session Maintenance Interval")
          +            .description("The interval between session maintenance activities.")
          +            .required(true)
          +            .expressionLanguageSupported(true)
          +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
          +            .defaultValue("10 sec")
          --- End diff --

          Since this invokes maintainSessions(), which effectively reconnects sessions, would it make sense to make this default value smaller than the default value of CONNECTION_TIMEOUT, so that by default connections would never time out at all? This would help when the client is inactive for longer than the default CONNECTION_TIMEOUT, and would help ensure that the client does not miss any messages.

          githubbot ASF GitHub Bot added a comment -

          Github user ijokarumawak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1597#discussion_r114280064

          --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
          @@ -81,6 +87,16 @@
                       .defaultValue("3 sec")
                       .build();

          +    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
          +            .name("session-maintenance-interval")
          +            .displayName("Session Maintenance Interval")
          +            .description("The interval between session maintenance activities.")
          --- End diff --

          Good point. I added more description of what this maintenance does. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user ijokarumawak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1597#discussion_r114280792

          --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
          @@ -81,6 +87,16 @@
                       .defaultValue("3 sec")
                       .build();

          +    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder()
          +            .name("session-maintenance-interval")
          +            .displayName("Session Maintenance Interval")
          +            .description("The interval between session maintenance activities.")
          +            .required(true)
          +            .expressionLanguageSupported(true)
          +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
          +            .defaultValue("10 sec")
          --- End diff --

          CONNECTION_TIMEOUT is a timeout for the initial connection attempt; the idle connection timeout is a different setting (IIRC 5 min by default, and not configurable from the controller service at the moment, though we should probably make it configurable). The maintenance has no effect before a connection has been established.
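The two timeouts distinguished in this reply can be illustrated in plain Java. This is a hypothetical sketch: `TimeoutKinds` is not part of NiFi or Jetty, and the 5-minute idle figure from the reply ("IIRC") is not verified here. The connection timeout only bounds the wait for the handshake `Future`, as `connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS)` does in the diff. The idle timeout applies only to a session that already exists.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch contrasting a connection timeout with an idle timeout.
class TimeoutKinds {

    // Connection timeout: bounds only the wait for the initial handshake.
    // Returns true if the handshake completed in time.
    static boolean awaitHandshake(Future<?> handshake, long connectionTimeoutMillis) {
        try {
            handshake.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            handshake.cancel(true); // give up on this attempt
            return false;
        } catch (ExecutionException e) {
            return false; // handshake failed outright, e.g. server not running
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Idle timeout: applies to an established session with no traffic
    // for at least idleTimeoutMillis.
    static boolean isIdleExpired(long lastActivityMillis, long nowMillis, long idleTimeoutMillis) {
        return nowMillis - lastActivityMillis >= idleTimeoutMillis;
    }
}
```

The maintenance pass only inspects sessions that were established at least once. Making Session Maintenance Interval shorter than CONNECTION_TIMEOUT would therefore not change how initial connection attempts behave.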

          githubbot ASF GitHub Bot added a comment -

          Github user ijokarumawak commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1597#discussion_r114281896

          --- Diff: nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java ---
          @@ -135,27 +177,81 @@ public void stopClient() throws Exception {

               @Override
               public void connect(final String clientId) throws IOException {
          +        connect(clientId, null);
          +    }
          +
          +    private void connect(final String clientId, String sessionId) throws IOException {
          +
          +        connectionLock.lock();

          -        final WebSocketMessageRouter router;
                   try {
          -            router = routers.getRouterOrFail(clientId);
          -        } catch (WebSocketConfigurationException e) {
          -            throw new IllegalStateException("Failed to get router due to: " + e, e);
          +            final WebSocketMessageRouter router;
          +            try {
          +                router = routers.getRouterOrFail(clientId);
          +            } catch (WebSocketConfigurationException e) {
          +                throw new IllegalStateException("Failed to get router due to: " + e, e);
          +            }
          +            final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
          +            listener.setSessionId(sessionId);
          +
          +            final ClientUpgradeRequest request = new ClientUpgradeRequest();
          +            final Future<Session> connect = client.connect(listener, webSocketUri, request);
          +            getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
          +
          +            final Session session;
          +            try {
          +                session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
          +            } catch (Exception e) {
          +                throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
          +            }
          +            getLogger().info("Connected, session={}", new Object[]{session});
          +            activeSessions.put(clientId, listener.getSessionId());
          +
          +        } finally {
          +            connectionLock.unlock();
                   }
          -        final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);

          -        final ClientUpgradeRequest request = new ClientUpgradeRequest();
          -        final Future<Session> connect = client.connect(listener, webSocketUri, request);
          -        getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
          +    }
          +
          +    private Map<String, String> activeSessions = new ConcurrentHashMap<>();

          -        final Session session;
          +    void maintainSessions() throws Exception {
          +        if (client == null) {
          +            return;
          +        }
          +
          +        connectionLock.lock();
          +
          +        final ComponentLog logger = getLogger();
                   try {
          -            session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
          -        } catch (Exception e) {
          -            throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
          +            // Loop through existing sessions and reconnect.
          +            for (String clientId : activeSessions.keySet()) {
          +                final WebSocketMessageRouter router;
          +                try {
          +                    router = routers.getRouterOrFail(clientId);
          +                } catch (final WebSocketConfigurationException e) {
          +                    if (logger.isDebugEnabled()) {
          +                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
          +                    }
          +                    activeSessions.remove(clientId);
          +                    continue;
          +                }
          +
          +                final String sessionId = activeSessions.get(clientId);
          +                // If this session is stil alive, do nothing.
          +                if (!router.containsSession(sessionId)) {
          +                    // This session is no longer active, reconnect it.
          +                    // If it fails, the sessionId will remain in activeSessions, and retries later.
          --- End diff --

          Good point. I think we should keep retrying until the user stops the processor or the controller service. The goal is to get a successful WebSocket connection. We don't know how long it will take for a connection to recover, but as long as the processor is running, we can assume the user wants it to be connected again.

          While thinking through and testing the scenarios above, I found that if the WebSocket server is not running, ConnectWebSocket fails to connect, stays in the 'STARTED' state, and does not retry. I added a de-registering call for when the initial connection attempt fails, so that it can retry connecting the next time it is scheduled.

          Once it has successfully established a connection and obtained a session ID, the maintenance activity does its job.

          Does this sound reasonable?
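The fix described in the second paragraph of this reply, de-registering when the first connection attempt fails, can be sketched as follows. `ConnectOnTrigger` and its `Client` interface are hypothetical. They model only the general idea: a processor that marks itself as registered before connecting must clear that mark if the connection fails. Otherwise every later trigger skips the connection and the processor never retries.

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a processor that retries its first connection on each trigger.
class ConnectOnTrigger {

    // Hypothetical stand-in for the WebSocket client service's connect(clientId).
    interface Client {
        void connect(String clientId) throws IOException;
    }

    private final Set<String> registered = ConcurrentHashMap.newKeySet();
    private final Client client;

    ConnectOnTrigger(Client client) {
        this.client = client;
    }

    // Called on every scheduled run; connects only if this clientId is not registered yet.
    void onTrigger(String clientId) {
        if (!registered.add(clientId)) {
            return; // already registered: background maintenance takes over from here
        }
        try {
            client.connect(clientId);
        } catch (IOException e) {
            // De-register so the next scheduled run tries again instead of
            // staying "started" without ever connecting.
            registered.remove(clientId);
        }
    }

    boolean isRegistered(String clientId) {
        return registered.contains(clientId);
    }
}
```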

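The retry policy described in this review thread can be sketched in plain Java. The policy is: keep reconnecting for as long as the processor runs, reuse the old session id, and record nothing when the very first connection attempt fails. This is a hypothetical, simplified model, not the code merged in PR #1597. `SessionMaintainer`, its `Transport` interface, and the `unregister()`/`sessionIdFor()` methods are invented names. They stand in for the Jetty client and for NiFi's router registration. Only `activeSessions`, `connectionLock` and `maintainSessions()` echo names from the diff. In this sketch, a failed initial connect simply leaves the client unrecorded. That is a stand-in for the de-registering call mentioned above.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, not NiFi's actual JettyWebSocketClient: remembers which
// session id each client should hold and periodically reconnects dead sessions.
class SessionMaintainer {

    // Hypothetical stand-in for the underlying WebSocket client (e.g. Jetty).
    interface Transport {
        // Opens a session, reusing requestedSessionId when non-null; returns the id in use.
        String connect(String clientId, String requestedSessionId) throws IOException;

        boolean isActive(String sessionId);
    }

    private final Transport transport;
    private final ReentrantLock connectionLock = new ReentrantLock();
    // clientId -> session id that should be kept alive while the processor runs
    private final Map<String, String> activeSessions = new ConcurrentHashMap<>();

    SessionMaintainer(Transport transport) {
        this.transport = transport;
    }

    // Initial connect. On failure nothing is recorded, so the processor can
    // simply try again the next time it is scheduled.
    void connect(String clientId) throws IOException {
        connect(clientId, null);
    }

    private void connect(String clientId, String sessionId) throws IOException {
        connectionLock.lock(); // reentrant, so maintainSessions() may call this while holding it
        try {
            String established = transport.connect(clientId, sessionId);
            activeSessions.put(clientId, established);
        } finally {
            connectionLock.unlock();
        }
    }

    // Called when the processor or controller service stops: stop maintaining this client.
    void unregister(String clientId) {
        activeSessions.remove(clientId);
    }

    String sessionIdFor(String clientId) {
        return activeSessions.get(clientId);
    }

    // Run periodically (e.g. every "Session Maintenance Interval").
    // Returns how many sessions were re-established during this pass.
    int maintainSessions() {
        int reconnected = 0;
        connectionLock.lock();
        try {
            for (Map.Entry<String, String> entry : activeSessions.entrySet()) {
                String sessionId = entry.getValue();
                if (transport.isActive(sessionId)) {
                    continue; // still alive, nothing to do
                }
                try {
                    // Reuse the old id so the recovery is transparent to the flow.
                    connect(entry.getKey(), sessionId);
                    reconnected++;
                } catch (IOException e) {
                    // Keep the entry; the next pass retries, until unregister() is called.
                }
            }
        } finally {
            connectionLock.unlock();
        }
        return reconnected;
    }
}
```

A scheduler such as `ScheduledExecutorService.scheduleWithFixedDelay` could call `maintainSessions()` at the configured interval. A fixed delay, rather than a fixed rate, prevents one maintenance pass from piling up behind another when reconnect attempts are slow.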
          githubbot ASF GitHub Bot added a comment -

          Github user ijokarumawak commented on the issue:

          https://github.com/apache/nifi/pull/1597

          @jdye64 Thanks for your feedback, much appreciated! I've added a new commit and comments to address your concerns. Please take another look when you have time. Thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/1597


            People

            • Assignee: Koji Kawamura (ijokarumawak)
            • Reporter: Koji Kawamura (ijokarumawak)
            • Votes: 0
            • Watchers: 2