As it stands, anyone who wants several pipelines that do essentially the same thing on different datasets has to write a separate pipeline configuration for each dataset. If the configuration could reference properties (much like Tomcat's web.xml file, where the ${catalina.home} property is passed in), far fewer config files would need to be generated.
I've already made a change to PipelineRuleSet to accommodate this. Here is the code:
package org.apache.commons.pipeline.config;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.digester.*;
import org.apache.commons.pipeline.*;
import org.xml.sax.Attributes;
import org.apache.commons.digester.substitution.MultiVariableExpander;
import org.apache.commons.digester.substitution.VariableSubstitutor;
- This is a Digester RuleSet that provides rules for parsing a pipeline
- XML file.
- The rules defined by this object are used for parsing the following tags:
- <ul>
- <li>
- <B><code><pipeline></pipeline></code></B><br/>
- The root element of the XML configuration file for a pipeline. This tag
- supports two optional attributes that are for use only when configuring
- branch pipelines, <code>key</code> and <code>configURI</code>. These
- attributes are described more fully below in the <code><branch></code>
- documentation.
- </li>
- <li>
- <B><code><driverFactory className="<em>MyDriverFactory</em>" id="<em>my_id</em>" ... /></code></B><br/>
- This tag is used to create and configure a {@link StageDriverFactory} that
- will be used to create {@link StageDriver} instances for stages
- in the parent pipeline. Each {@link StageDriverFactory} is identified by a unique
- string identifier that is used by the <code><stage></code> tag
- to identify the factory used to create the driver for the stage.
- The class of the factory (which must supply a no-argument constructor)
- is specified by the <code>className</code> attribute, and all other
- additional attributes are used by Digester to configure the associated properties
- (using standard Java bean naming conventions) of the driver instance created.
- </li>
- <li>
- <B><code><stage className="<em>MyStageClass</em>" driverFactoryId="<i>name</i>" ... ></stage></code></B><br/>
- A single stage is created, configured, and added to the parent pipeline using
- this tag. Stages created in this manner are added to the pipeline in the order
- that they occur in the configuration file. The class of the stage (which must
- provide a no-argument constructor) is specified by the <em>className</em> attribute.
- Each stage should be associated with a previously declared driver factory
- by use of the <code>driverFactoryId</code> attribute. All other attributes are
- used by Digester to set bean properties on the newly created Stage object.
- </li>
- <li>
- <B><code><feed/></code></B><br/>
- Enqueue an object onto the first stage in the pipeline. Note that this
- must come <em>after</em> the first stage declaration in the configuration file,
- otherwise the queue for the first stage does not exist yet and the fed
- values will be discarded.
- <p/>
- There are two types of usage available, provided by the following subtags:
- <ul>
- <li>
- <B><code><value>my_value</value></code></B><br/>
- Feed the string value of the body of this tag to the first stage in the
- pipeline.
- </li>
- <li>
- <B><code><object className="MyClass" ... /></code></B><br/>
- This tag uses the standard Digester ObjectCreateRule to create an
- arbitrary object of the specified class (which must provide a
- no-argument constructor) and feed it to the first stage in the pipeline.
- All attributes other than <code>className</code> are used by
- Digester to set bean properties on the newly created object.
- </li>
- </ul>
- </li>
- <li>
- <B><code><branch/></code></B><br/>
- Add a branch to a pipeline. The contents of this tag should
- be one or more <code><pipeline/></code> declarations. Branch
- pipelines added in this fashion must be configured with an attribute
- named <code>key</code> that holds the name by which the branch pipeline
- will be referred to.
- <p/>
- Branch pipelines may be configured either inline in the main
- configuration file or in a separate file referred to by the
- <code>configURI</code> pipeline attribute.
- </li>
- </ul>
public class PipelineRuleSet extends RuleSetBase {
private static Class[] addBranchTypes = { String.class, Pipeline.class };
private static Class[] setEnvTypes = { String.class, Object.class };
private List<RuleSet> nestedRuleSets;
- Creates a new instance of the rule set used by Digester to configure a pipeline.
public PipelineRuleSet() { }
- Creates a new pipeline rule set with the specified collection of additional
- rule sets that may be used for recursive creation of branch pipelines.
- @param nestedRuleSets A list of other RuleSet instances that are being used in conjunction with the
- PipelineRuleSet. In the case that branch pipelines are referred to by URI, these
- rule sets will be added to a new Digester instance (along with a PipelineRuleSet
- instance) that is used to parse the branch configuration file.
public PipelineRuleSet(List<RuleSet> nestedRuleSets) { this.nestedRuleSets = nestedRuleSets; }
- Adds the rule instances for pipeline, stage, and enqueue
- tasks to the Digester instance supplied.
- @param digester The Digester instance to which the rules should be added.
public void addRuleInstances(Digester digester) {
    ObjectCreationFactory pipelineFactory = new PipelineFactory();
    ObjectCreationFactory driverFactoryFactory = new StageDriverFactoryFactory();

    //rules to create pipeline
    digester.addFactoryCreate("pipeline", pipelineFactory);
    digester.addSetProperties("pipeline");
    digester.addRule("pipeline", new PipelineDriverFactoriesRule());

    //these rules are used to add branches to the main pipeline
    digester.addFactoryCreate("*/branch/pipeline", pipelineFactory);
    digester.addRule("*/branch/pipeline", new CallMethodRule(1, "addBranch", 2, addBranchTypes));
    digester.addCallParam("*/branch/pipeline", 0, "key");
    digester.addCallParam("*/branch/pipeline", 1, 0);
    digester.addRule("*/branch/pipeline", new PipelineDriverFactoriesRule());

    //rules for adding values to the global pipeline environment
    digester.addObjectCreate("*/pipeline/env/object", "java.lang.Object", "className");
    digester.addSetProperties("*/pipeline/env/object");
    digester.addRule("*/pipeline/env/object", new CallMethodRule(1, "setEnv", 2, setEnvTypes));
    digester.addCallParam("*/pipeline/env/object", 0, "key");
    digester.addCallParam("*/pipeline/env/object", 1, 0);
    digester.addRule("*/pipeline/env/value", new CallMethodRule(0, "setEnv", 2, setEnvTypes));
    digester.addCallParam("*/pipeline/env/value", 0, "key");
    digester.addCallParam("*/pipeline/env/value", 1);

    //this rule is intended to be used to add a StageEventListener to the pipeline.
    digester.addObjectCreate("*/pipeline/listener", "org.apache.commons.pipeline.StageEventListener", "className");
    digester.addSetProperties("*/pipeline/listener");
    digester.addSetNext("*/pipeline/listener", "registerListener", "org.apache.commons.pipeline.StageEventListener");

    //this rule is intended to be used to add a StageDriverFactory to the pipeline.
    digester.addFactoryCreate("*/pipeline/driverFactory", driverFactoryFactory);
    digester.addSetProperties("*/pipeline/driverFactory");
    digester.addObjectCreate("*/driverFactory", "org.apache.commons.pipeline.StageDriverFactory", "className");
    digester.addSetProperties("*/driverFactory");
    digester.addSetNext("*/driverFactory", "setWrappedSDF", "org.apache.commons.pipeline.StageDriverFactory");

    //rules for setting an object property on the next-to-top object on the stack
    //similar to setNestedPropertiesRule
    digester.addObjectCreate("*/property", "java.lang.Object", "className");
    digester.addSetProperties("*/property");
    digester.addRule("*/property", new SetNestedPropertyObjectRule());

    //this rule is intended to be used to add a stage to a pipeline
    digester.addObjectCreate("*/pipeline/stage", "org.apache.commons.pipeline.BaseStage", "className");
    digester.addSetProperties("*/pipeline/stage");
    digester.addRule("*/pipeline/stage", new PipelineAddStageRule());

    //rule for feeding a string value to the source feeder
    digester.addRule("*/pipeline/feed/value", new PipelineFeedValueRule());

    //rules for enqueueing an object
    digester.addObjectCreate("*/pipeline/feed/object", "java.lang.Object", "className");
    digester.addSetProperties("*/pipeline/feed/object");
    digester.addRule("*/pipeline/feed/object", new PipelineFeedObjectRule());

    // add system property substitution
    enableDigesterSubstitutor(digester);
}
- This factory is used to create a pipeline. If the "configURI" parameter
- is specified, the pipeline is created based upon the configuration file
- located at that URI.
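private class PipelineFactory extends AbstractObjectCreationFactory {
    public Object createObject(Attributes attributes) throws java.lang.Exception {
        String configURI = attributes.getValue("configURI");
        if (configURI == null) {
            return new Pipeline();
        } else {
            Digester subDigester = new Digester();
            if (nestedRuleSets != null) {
                for (RuleSet ruleset : nestedRuleSets) {
                    subDigester.addRuleSet(ruleset);
                }
                Pipeline pipeline = (Pipeline) subDigester.parse(configURI);
                return pipeline;
            } else {
                // The body of this else branch is missing from the posted code;
                // failing fast with this exception and message is an assumption.
                throw new IllegalStateException("Unable to parse branch configuration file: no nested rule sets were provided.");
            }
        }
    }
}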
- Configure the storage for the map of driver factories for the pipeline.
private class PipelineDriverFactoriesRule extends Rule {
public void begin(String namespace, String name, Attributes attributes) throws Exception { digester.push("org.apache.commons.pipeline.config.DriverFactories", new HashMap<String, StageDriverFactory>()); super.begin(namespace, name, attributes); }
public void end(String namespace, String name) throws Exception
{ super.end(namespace, name); digester.pop("org.apache.commons.pipeline.config.DriverFactories"); }}
- Sets the property named by the propName attribute on the next-to-top object
- on the stack, using the object on top of the stack as the value.
private class SetNestedPropertyObjectRule extends Rule {
String propName;
public void begin(String namespace, String name, Attributes attributes) throws Exception
{ propName = attributes.getValue("propName"); super.begin(namespace, name, attributes); }public void end(String namespace, String name) throws Exception
{ super.end(namespace, name); BeanUtils.setProperty(digester.peek(1), propName, digester.peek()); }}
- This ObjectCreationFactory creates a stage driver factory and stores
- it in the scope of the rule set so that it can be retrieved by the stage
- creation rule.
private class StageDriverFactoryFactory extends AbstractObjectCreationFactory {
public Object createObject(Attributes attributes) throws Exception {
Map<String, StageDriverFactory> driverFactories =
(Map<String,StageDriverFactory>) digester.peek("org.apache.commons.pipeline.config.DriverFactories");
String className = attributes.getValue("className");
String id = attributes.getValue("id");
Class clazz = Class.forName(className);
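if (!StageDriverFactory.class.isAssignableFrom(clazz)) {
    // The body of this check is missing from the posted code;
    // the exception type and message here are assumed.
    throw new IllegalArgumentException("Class " + className + " does not implement StageDriverFactory.");
} else {
    StageDriverFactory factory = (StageDriverFactory) clazz.newInstance();
    driverFactories.put(id, factory);
    return factory;
}
}
}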
- This Rule adds a stage to the pipeline using the factory specified
- by the driverFactoryId attribute.
private class PipelineAddStageRule extends Rule {
public void begin(String namespace, String name, Attributes attributes) throws Exception { digester.push("org.apache.commons.pipeline.config.DriverFactoryIds", attributes.getValue("driverFactoryId")); super.begin(namespace, name, attributes); }
public void end(String namespace, String name) throws Exception
{ super.end(namespace, name); String factoryId = (String) digester.pop("org.apache.commons.pipeline.config.DriverFactoryIds"); Map<String, StageDriverFactory> driverFactories = (Map<String,StageDriverFactory>) digester.peek("org.apache.commons.pipeline.config.DriverFactories"); StageDriverFactory factory = driverFactories.get(factoryId); Stage stage = (Stage) digester.peek(); Pipeline pipeline = (Pipeline) digester.peek(1); pipeline.addStage(stage, factory); }}
- This Rule allows a string value to be fed to the pipeline.
private class PipelineFeedValueRule extends Rule {
    public void body(String namespace, String name, String text) throws Exception {
        Pipeline pipeline = (Pipeline) digester.peek();
        pipeline.getSourceFeeder().feed(text);
        super.body(namespace, name, text);
    }
}
- This Rule allows an object to be fed to the pipeline.
private class PipelineFeedObjectRule extends Rule {
    public void end(String namespace, String name) throws Exception {
        super.end(namespace, name);
        Pipeline pipeline = (Pipeline) digester.peek(1);
        pipeline.getSourceFeeder().feed(digester.peek());
    }
}
- Adds a substitutor to interpolate system properties
- @param digester The digester to which we add the substitutor
protected void enableDigesterSubstitutor(Digester digester) {
    MultiVariableExpander expander = new MultiVariableExpander();
    expander.addSource("$", System.getProperties());
    // allow expansion in both xml attributes and element text
    Substitutor substitutor = new VariableSubstitutor(expander);
    digester.setSubstitutor(substitutor);
}
}
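
To illustrate the effect I'm after, here is a minimal, dependency-free sketch of ${name} expansion applied to a single stage declaration. It does not use Digester at all and is not the code from the patch. The class name PropertyExpansionSketch, the expand helper, the dataset.dir property, and the com.example.LoadStage stage with its inputDir attribute are all hypothetical. Leaving unknown names untouched is a simplification of mine and may differ from how Digester's MultiVariableExpander treats undefined variables.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PropertyExpansionSketch {
    // Matches ${name} references and captures the name.
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([^}]+)\\}");

    /**
     * Replaces each ${name} in the text with the matching property value.
     * Names with no matching property are left as they are (an assumption
     * made for this sketch).
     */
    public static String expand(String text, Properties source) {
        Matcher matcher = REFERENCE.matcher(text);
        StringBuffer expanded = new StringBuffer();
        while (matcher.find()) {
            String value = source.getProperty(matcher.group(1));
            String replacement = (value != null) ? value : matcher.group();
            // quoteReplacement keeps '$' and '\' in the value from being
            // interpreted as group references.
            matcher.appendReplacement(expanded, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(expanded);
        return expanded.toString();
    }

    public static void main(String[] args) {
        // One hypothetical stage declaration reused for two datasets.
        String template =
            "<stage className=\"com.example.LoadStage\" inputDir=\"${dataset.dir}\"/>";
        for (String dir : new String[] { "/data/rainfall", "/data/temperature" }) {
            Properties props = new Properties();
            props.setProperty("dataset.dir", dir);
            System.out.println(expand(template, props));
        }
    }
}
```

With the patched rule set the same idea would apply to a whole configuration file: the property would be supplied as a system property (for example with -Ddataset.dir=... on the command line) and one file could serve every dataset.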