Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-3502

JDBC Persistent Aggregator

    XMLWordPrintableJSON

Details

    • New Feature
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 2.6.0
    • 2.6.0
    • None
    • None
    • JDBC Aggregator Database

    • Patch Available

    Description

      The patch provided is an implementation of a Aggregator for JDBC.

      It implements org.apache.camel.spi.AggregationRepository (2.4+) and has been tested on Camel 2.4 and 2.6.
      Some details have been disscussed on the mailling list with Claus.

      The implementation is very similar to the one for HawtDB.
      Unit tests have been included and use the H2 in-memory database.

      About that, the Test JdbcAggregateLoadAndRecoverTest has been adapted to exclude the redeliveries from the failling messages. It is possible for a message to be sent due to a scan() background operation evn before it is send by the onCompletion() method ?

      Here is a small documentation

      Database

      To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with "COMPLETED". The name must be configured in the Spring bean with the _RepositoryName property. In the following example aggregation will be used.

      The table structure definition of both table are identical: in both case a String value is used as key (id) whereas a Blob contains the exchange serialized in byte array.
      However one difference should be remembered: the id field does not have the same content depending on the table.
      In the aggregation table id holds the correlation Id used by the component to aggregate the messages. In the completed table, id holds the id of the exchange stored in corresponding the blob field.

      Here is the SQL query used to create the tables, just replace "aggregation" with your aggregator repository name.

      CREATE TABLE aggregation (
          id varchar(255) NOT NULL,
          exchange blob NOT NULL,
          constraint aggregation_pk PRIMARY KEY (id)
      );
      CREATE TABLE aggregation_completed (
          id varchar(255) NOT NULL,
          exchange blob NOT NULL,
          constraint aggregation_completed_pk PRIMARY KEY (id)
      );
      

      Codec (Serialization)

      Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field.
      All those conversions are handled by the JdbcCodec class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream.

      The ClassLoadingAwareObjectInputStream has been reused from the ActiveMQ project. It wraps an ObjectInputStream and use it with the ContextClassLoader rather than the currentThread one. The benefit is to be able to load classes exposed by other bundles.
      This allows the exchange body and headers to have custom types object references.

      Transactional

      TransactionTemplate is use to wrap all the calls to the database.
      Therefore a Transaction Manager is required (see bean-declaration)

      Service (Start/Stop)

      The JdbcAggregationRepository extends ServiceSupport. This includes the components in Camel component life cycle.
      The start() method verify the connection of the database and the presence of the required tables.

      Aggregator configuration

      Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of table created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the lobHandler property.

      Here is the declaration for Oracle :

          <bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
              <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
          </bean>
      
          <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
      
          <bean id="repo" class="org.apache.camel.component.jdbc.aggregationRepository.JdbcAggregationRepository">
              <constructor-arg name="transactionManager" ref="transactionManager"/>
              <constructor-arg name="repositoryName" value="aggregation"/>
              <constructor-arg name="dataSource" ref="dataSource"/>
              <!-- Only with Oracle, else use default -->
              <property name="lobHandler" ref="lobHandler"/>
          </bean>
      

      Feature

      A feature has been created : camel-jdbc-aggregator.

      Attachments

        1. camel-jdbc-aggregator.zip
          49 kB
          Olivier Roger
        2. jdbc-aggregator.patch
          143 kB
          Olivier Roger
        3. jdbc-aggregator.patch
          143 kB
          Olivier Roger
        4. jdbc-aggregator-other-changes.patch
          5 kB
          Olivier Roger
        5. jdbc-aggregator-setter-injection.patch
          6 kB
          Olivier Roger

        Activity

          People

            davsclaus Claus Ibsen
            olivier.roger Olivier Roger
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: