Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      Apex supports modifying operator properties at runtime, but the current implementation has the following shortcomings.

      1. A property is not set on all partitions in the same window. Because individual partitions can be processing different windows when the client initiates the property change, the data for those windows ends up inconsistent, and I am being generous in calling it merely inconsistent.
      2. Sometimes properties need to be set on more than one logical operator at the same time to achieve the change the user is seeking. Today these are applied as two separate changes on two different windows, again resulting in inconsistent data for some windows. They need to happen as a single transaction.
      3. If an operator fails after a property has been changed dynamically but before a checkpoint containing the change has been committed, the operator restarts with the old property value and the change is not re-applied.

      Tim and I did some brainstorming, and we have a proposal to overcome these shortcomings. The main problem in all of the above cases is that property changes happen out-of-band of the data flow and hence independently of windowing. The proposal is to bring property change requests into the in-band data flow so that they are handled consistently with windowing and in a distributed manner.

      The idea is to inject a special property change tuple, containing the property changes and identification information for the operators they affect, into the data flow at the input operator. The tuple will be injected at a window boundary, after end window and before begin window, and as it flows through the DAG the properties of the intended operators will be modified. They will all be modified consistently in the same window. The tuple can contain multiple property changes for multiple logical operators, and the changes will be applied consistently to the different logical operators in the same window. In case of failure, the replay of tuples will ensure that the property change is reapplied at the correct window.
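      A minimal sketch of such a control tuple, assuming a simple in-memory model: the names `PropertyChange`, `PropertyChangeTuple` and `applyTo`, and the modeling of an operator's properties as a `Map<String, String>`, are hypothetical illustrations, not Apex APIs. It shows one tuple carrying changes for two logical operators, with each operator applying only the entries addressed to it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo driver: one tuple carries changes for two logical operators.
class PropertyChangeDemo {
    public static void main(String[] args) {
        PropertyChangeTuple tuple = new PropertyChangeTuple(List.of(
                new PropertyChange("parser", "delimiter", ";"),
                new PropertyChange("writer", "maxFileSize", "1024")));

        Map<String, String> parserProps = new HashMap<>(Map.of("delimiter", ","));
        Map<String, String> writerProps = new HashMap<>(Map.of("maxFileSize", "512"));

        // In the proposal the tuple arrives between endWindow() and beginWindow(),
        // so each operator applies its changes before processing the next window's data.
        tuple.applyTo("parser", parserProps);
        tuple.applyTo("writer", writerProps);
        System.out.println(parserProps + " " + writerProps);
    }
}

// One property assignment addressed to a logical operator (hypothetical type).
record PropertyChange(String logicalOperator, String property, String value) {}

// Hypothetical control tuple that travels in-band with the data at a window boundary.
final class PropertyChangeTuple {
    private final List<PropertyChange> changes;

    PropertyChangeTuple(List<PropertyChange> changes) {
        this.changes = new ArrayList<>(changes);
    }

    // Applies only the changes addressed to this logical operator; returns how many were applied.
    int applyTo(String logicalOperator, Map<String, String> properties) {
        int applied = 0;
        for (PropertyChange change : changes) {
            if (change.logicalOperator().equals(logicalOperator)) {
                properties.put(change.property(), change.value());
                applied++;
            }
        }
        return applied;
    }
}
```

      Because all changes travel in one tuple injected at one window boundary, every targeted operator applies its part at the same window, which is what addresses shortcomings 1 and 2.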

          Activity

          timothyfarkas Timothy Farkas added a comment -

          I was thinking, and we may have to do even more. Consider the following iteration case:

          A -> B -> C
          And
          A <- B

          If we change a property on C, then neither A nor B removes anything from the control tuple, so the copy that B sends back to A is never dropped. The control tuple will loop between A and B indefinitely.

          We will also need to tag the control tuple with each operator that has seen it. If the tuple returns to an operator that has already seen it, it should be dropped.
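          A sketch of the proposed tagging, assuming each forwarded copy of the control tuple carries the set of operators it has already visited. `Delivery`, `seenBy` and `propagate` are hypothetical names, and the DAG is the A -> B -> C, A <- B example above modeled as an adjacency map.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ControlTupleLoopDetection {
    // Hypothetical in-flight copy of the control tuple: its destination and the operators it has visited.
    record Delivery(String target, Set<String> seenBy) {}

    // Forwards the control tuple from the input operator and returns the operators that processed it, in order.
    static List<String> propagate(String input, Map<String, List<String>> downstream) {
        List<String> processed = new ArrayList<>();
        Deque<Delivery> inFlight = new ArrayDeque<>();
        inFlight.add(new Delivery(input, Set.of()));
        while (!inFlight.isEmpty()) {
            Delivery delivery = inFlight.poll();
            if (delivery.seenBy().contains(delivery.target())) {
                continue; // the tuple came back to an operator that has already seen it: drop it
            }
            processed.add(delivery.target());
            Set<String> seen = new HashSet<>(delivery.seenBy());
            seen.add(delivery.target());
            for (String next : downstream.getOrDefault(delivery.target(), List.of())) {
                inFlight.add(new Delivery(next, seen));
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // A -> B -> C plus the feedback edge A <- B from the comment above.
        Map<String, List<String>> downstream = Map.of(
                "A", List.of("B"),
                "B", List.of("C", "A"),
                "C", List.of());
        System.out.println("processed by: " + propagate("A", downstream));
    }
}
```

          Because the visited set travels with each copy, this only stops cycles; an operator reached along two different paths (a diamond) would still see the tuple twice, so a real design would also need to deduplicate at operators with several inputs.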

          PramodSSImmaneni Pramod Immaneni added a comment -

          Yes, that makes sense. Also remove the tuple from the stream when there are no more properties left in it.

          timothyfarkas Timothy Farkas added a comment -

          We also have to consider iteration. The MODIFY_CONFIGURATION control tuple should not be sent in an infinite loop.

          I think as configurations are applied, they should be removed from the control tuple before it is passed downstream. If the control tuple no longer contains any configurations, it should not be passed downstream.
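          A sketch of applying and stripping entries, under the assumption that the tuple is a list of (operator, property, value) entries. `Change` and `applyAndStrip` are hypothetical names, and returning `Optional.empty()` stands for "do not pass the tuple downstream".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class StripAppliedChanges {
    // Hypothetical entry in the control tuple: target logical operator plus one property assignment.
    record Change(String operator, String property, String value) {}

    // Applies this operator's entries to its properties and strips them from the tuple.
    // Returns the remaining entries to pass downstream, or Optional.empty() to drop the tuple.
    static Optional<List<Change>> applyAndStrip(String operator, Map<String, String> properties,
                                                List<Change> tuple) {
        List<Change> remaining = new ArrayList<>();
        for (Change change : tuple) {
            if (change.operator().equals(operator)) {
                properties.put(change.property(), change.value());
            } else {
                remaining.add(change);
            }
        }
        return remaining.isEmpty() ? Optional.empty() : Optional.of(remaining);
    }

    public static void main(String[] args) {
        List<Change> tuple = List.of(
                new Change("B", "threshold", "10"),
                new Change("C", "mode", "fast"));
        Map<String, String> bProps = new HashMap<>();
        Map<String, String> cProps = new HashMap<>();

        Optional<List<Change>> afterB = applyAndStrip("B", bProps, tuple);
        Optional<List<Change>> afterC = afterB.flatMap(rest -> applyAndStrip("C", cProps, rest));
        System.out.println("B forwards: " + afterB.isPresent() + ", C forwards: " + afterC.isPresent());
    }
}
```

          Stripping alone does not terminate the cycle described in the earlier comment, since a change addressed to C passes through A and B unchanged; it complements the seen-operator tag rather than replacing it.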

          timothyfarkas Timothy Farkas added a comment -

          From the discussion on the dev mailing list, the algorithm for deciding the window on which to set a property has been changed slightly to the following:

          1.) Pick a window N windows ahead of the current max window of the operators. Let's call this window W.
          2.) Send a property change request to the operators to change the property at window W.
          3.) As part of handling the property change request, each operator will do one of two things:
          a) Reply with a failure if it has already passed window W.
          b) Reply with a success if it has not yet passed window W.
          4.) Operators that replied with a success will asynchronously wait for a confirmation message to apply the property. If an operator reaches window W before it receives the confirmation, it will block until the confirmation is received.
          5.) Meanwhile, the app master collects the responses to the property change requests. If all operators replied with a success, a confirmation message is sent to all the operators to apply the property. If one or more operators replied with a failure, a property change cancellation is sent to all the operators, and the whole process is retried.

          This way, 99.99% of the time a property change would be applied without pausing operators. Operators will only be paused on rare occasions, and only to prevent application errors that could be triggered by an incorrect application of a property.
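          A single-threaded sketch of this handshake, assuming operators can be modeled as objects that expose their current window. `SimulatedOperator`, `prepare`, `confirm`, `cancel`, `tryBeginNextWindow` and `changeProperty` are hypothetical names, not Apex APIs, and the `lagOnFirstRequest` field only exists to simulate an operator racing past W so that the retry path runs. Step 4's blocking is modeled as `tryBeginNextWindow` returning `false` rather than a real wait, and an operator already inside window W is treated as having passed it.

```java
import java.util.List;

class TwoPhasePropertyChange {
    static final long NONE = -1;

    // Hypothetical operator model; a real operator would receive these calls from the app master.
    static final class SimulatedOperator {
        private long window;
        private long lagOnFirstRequest;    // windows processed while the first request was in flight
        private long pendingWindow = NONE; // window W this operator agreed to apply the change on
        private boolean confirmed;
        private String property;
        private String pendingValue;

        SimulatedOperator(long window, long lagOnFirstRequest, String property) {
            this.window = window;
            this.lagOnFirstRequest = lagOnFirstRequest;
            this.property = property;
        }

        long currentWindow() { return window; }
        String property() { return property; }

        // Step 3: reply with success only if window W has not started yet.
        boolean prepare(long w, String value) {
            window += lagOnFirstRequest;
            lagOnFirstRequest = 0;
            if (window >= w) {
                return false;
            }
            pendingWindow = w;
            pendingValue = value;
            confirmed = false;
            return true;
        }

        void confirm() { confirmed = true; }

        void cancel() {
            pendingWindow = NONE;
            pendingValue = null;
            confirmed = false;
        }

        // Step 4: begin the next window, or return false if the operator must block at W.
        boolean tryBeginNextWindow() {
            long next = window + 1;
            if (next == pendingWindow) {
                if (!confirmed) {
                    return false; // reached W before the confirmation arrived
                }
                property = pendingValue; // apply the change at the start of W
                pendingWindow = NONE;
            }
            window = next;
            return true;
        }
    }

    // Steps 1, 2 and 5 as run by the app master; returns the window W the change was scheduled on.
    static long changeProperty(List<SimulatedOperator> ops, String value, long n, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            long w = ops.stream().mapToLong(SimulatedOperator::currentWindow).max().orElseThrow() + n;
            boolean allAccepted = true;
            for (SimulatedOperator op : ops) {
                if (!op.prepare(w, value)) {
                    allAccepted = false; // keep asking the rest: step 5 collects every response
                }
            }
            for (SimulatedOperator op : ops) {
                if (allAccepted) {
                    op.confirm();
                } else {
                    op.cancel();
                }
            }
            if (allAccepted) {
                return w;
            }
        }
        throw new IllegalStateException("property change not scheduled after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        SimulatedOperator fast = new SimulatedOperator(100, 5, "old"); // races past W on the first attempt
        SimulatedOperator slow = new SimulatedOperator(98, 0, "old");
        long w = changeProperty(List.of(fast, slow), "new", 3, 5);
        while (slow.currentWindow() < w) {
            if (!slow.tryBeginNextWindow()) {
                throw new IllegalStateException("blocked despite confirmation");
            }
        }
        System.out.println("scheduled on window " + w + ", slow operator property: " + slow.property());
    }
}
```

          With the values in `main`, the first attempt targets window 103 but the fast operator has already reached 105, so both operators are cancelled and the retry schedules the change on window 108.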

          PramodSSImmaneni Pramod Immaneni added a comment -

          Follow development here: https://github.com/PramodSSImmaneni/incubator-apex-core/tree/feature-property-modifications

          timothyfarkas Timothy Farkas added a comment -

          Furthermore, this approach is not limited to DAGs with a single input operator. When a DAG has multiple input operators, property changes can be set within the same window across all input operators by enforcing some synchronization at the input operator level when setting the property. This synchronization would look like the following:

          1. When a property change request is received, ask all input operators to stop and report their current window.
          2. Take the max window + 1 (not technically correct, but you get the idea).
          3. Send the property change request to all the input operators and tell them to apply the change at the max window ID + 1.
          4. Resume the input operators.

          This ensures that the change is applied at the same window ID, and also at a window ID that the input operators have never played before. Therefore, property changes will not interfere with the idempotence of operators.
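          A sketch of the four synchronization steps, assuming input operators can be paused and queried for their current window through some control channel. `InputOperator`, `pauseAndReportWindow`, `scheduleChange`, `resume` and `synchronizeChange` are hypothetical names, and window IDs are treated as plain longs even though, as step 2 admits, max + 1 is a simplification.

```java
import java.util.List;

class InputSynchronization {
    // Hypothetical stand-in for an input operator; not an Apex API.
    static final class InputOperator {
        private long window;
        private boolean paused;
        private long changeWindow = -1;

        InputOperator(long window) {
            this.window = window;
        }

        // Step 1: stop emitting windows and report the current one.
        long pauseAndReportWindow() {
            paused = true;
            return window;
        }

        // Step 3: remember the window at which the change must be applied.
        void scheduleChange(long windowId) {
            changeWindow = windowId;
        }

        // Step 4: allow window emission again.
        void resume() {
            paused = false;
        }

        // Emits the next window unless paused; returns false while paused.
        boolean tryEmitNextWindow() {
            if (paused) {
                return false;
            }
            window++;
            return true;
        }

        long changeWindow() { return changeWindow; }
        boolean isPaused() { return paused; }
    }

    // Returns the window ID at which every input operator will apply the change.
    static long synchronizeChange(List<InputOperator> inputs) {
        try {
            long max = Long.MIN_VALUE;
            for (InputOperator input : inputs) {
                max = Math.max(max, input.pauseAndReportWindow());
            }
            // Step 2: max + 1, the simplification the comment calls "not technically correct".
            long target = max + 1;
            for (InputOperator input : inputs) {
                input.scheduleChange(target);
            }
            return target;
        } finally {
            // Step 4 runs even if scheduling fails, so inputs are never left paused.
            for (InputOperator input : inputs) {
                input.resume();
            }
        }
    }

    public static void main(String[] args) {
        List<InputOperator> inputs = List.of(new InputOperator(40), new InputOperator(42), new InputOperator(41));
        long target = synchronizeChange(inputs);
        System.out.println("change scheduled for window " + target);
    }
}
```

          Resuming in a `finally` block is a choice made here so that an error while scheduling never leaves the inputs paused; the comment itself does not specify failure handling.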


            People

            • Assignee:
              PramodSSImmaneni Pramod Immaneni
              Reporter:
              PramodSSImmaneni Pramod Immaneni
            • Votes:
              0
              Watchers:
              7
