Details

    • Type: Improvement Improvement
    • Status: Closed
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.2.0
    • Component/s: camel-core, camel-spring
    • Labels:
      None
    • Environment:

      Java 6, Windows XP

    • Patch Info:
      Patch Available

      Description

      Attached is a patch that adds a stream-processing resequencer to Camel. The resequencing algorithm is based on the detection of gaps in a message stream rather than on a fixed batch size. Gap detection in combination with timeouts removes the constraint of having to know the number of messages of a sequence in advance (although a capacity parameter prevents the resequencer from running out of memory)

      Route builder examples for the stream-processing resequencer:

      from("direct:start").resequencer(header("seqnum")).stream().to("mock:result")

      is equivalent to:

      from("direct:start").resequencer(header("seqnum")).stream(StreamResequencerConfig.getDefault()).to("mock:result")

      Custom values for the resequencer's capacity and timeout can be set like in this example:

      from("direct:start").resequencer(header("seqnum")).stream(new StreamResequencerConfig(300, 4000L)).to("mock:result")

      The XML configuration looks like:

      <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
        <route>
          <from uri="direct:start"/>
          <resequencer>
            <simple>in.header.seqnum</simple>
            <to uri="mock:result" />
            <stream-config capacity="300", timeout="4000"/>
          </resequencer>
        </route>
      </camelContext>
      

      The existing batch-processing resequencer can be defined as usual:

      from("direct:start").resequencer(header("seqnum")).to("mock:result")

      which is now equivalent to

      from("direct:start").resequencer(header("seqnum")).batch().to("mock:result")

      It is now also possible to define a custom configuration for the existing batch-processing resequencer:

      from("direct:start").resequencer(header("seqnum")).batch(new BatchResequencerConfig(300, 4000L)).to("mock:result")

      This set the batchSize to 300 and the batchTimeout to 4000 ms.

      For the stream-processing resequencer to work, messages must contain a sequence number for which a predecessor and a successor is known. For example a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2,3,5 has a gap because the sucessor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs).

      Gap detection is done with strategies that implement the SequenceNumberComparator<E> interface. In addition to the java.util.Comparator<E>.compare(E, E) operation the SequenceNumberComparator<E> interface defines the predecessor(E, E) and successor(E, E) operations. The stream resequencer can be configured with cutstom SequenceNumberComparator<E> strategies.

      The stream-processing resequencer uses the same algorithm as the one in ServiceMix-3.2-SNAPSHOT (servicemix-eip). In order to avoid compile-time dependencies to ServiceMix I've copied the ServiceMix-independent resequencing engine over to Camel. This redundancy should be removed once Camel and servicemix-eip are going to be combined (are they?). I can contribute to this task, if needed.

        Activity

        Martin Krasser created issue -
        james strachan made changes -
        Field Original Value New Value
        Status Open [ 1 ] Resolved [ 5 ]
        Fix Version/s 1.2.0 [ 11822 ]
        Resolution Fixed [ 1 ]
        james strachan made changes -
        Link This issue is related to CAMEL-126 [ CAMEL-126 ]
        Claus Ibsen made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Jeff Turner made changes -
        Project Import Sat Nov 27 00:14:50 EST 2010 [ 1290834890113 ]

          People

          • Assignee:
            Unassigned
            Reporter:
            Martin Krasser
          • Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development