Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Extensions
    • Labels: None

      Description

      Creation of a new processor that reads Change Data Capture details from Microsoft SQL Server and outputs the changes as Records.
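As background, SQL Server exposes CDC data through generated table-valued functions. A minimal sketch of assembling such a per-table query (this uses SQL Server's standard `cdc.fn_cdc_get_all_changes_<capture_instance>` convention and illustrative column choices, not this processor's actual query):

```java
/**
 * Sketch of building a query against SQL Server's generated CDC change
 * function. The capture-instance naming and column list are illustrative
 * assumptions, not taken from the processor under review.
 */
public final class CdcQuerySketch {

    /** Builds a query returning all changes for one capture instance. */
    public static String changesQuery(String captureInstance) {
        return "SELECT __$start_lsn, __$operation, * "
             + "FROM cdc.fn_cdc_get_all_changes_" + captureInstance
             + "(sys.fn_cdc_get_min_lsn('" + captureInstance + "'), "
             + "sys.fn_cdc_get_max_lsn(), 'all') "
             + "ORDER BY __$start_lsn, __$seqval";
    }

    public static void main(String[] args) {
        System.out.println(changesQuery("dbo_Orders"));
    }
}
```

Ordering by `__$start_lsn` and `__$seqval` matches the "ordered by the time, and sequence" behavior the processor description promises.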

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157258895

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java —
          @@ -0,0 +1,387 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql.processors;
          +
          +import org.apache.commons.lang3.StringUtils;
          +import org.apache.nifi.annotation.behavior.DynamicProperty;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.Stateful;
          +import org.apache.nifi.annotation.behavior.TriggerSerially;
          +import org.apache.nifi.cdc.CDCException;
          +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
          +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
          +import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.state.Scope;
          +import org.apache.nifi.components.state.StateManager;
          +import org.apache.nifi.components.state.StateMap;
          +import org.apache.nifi.dbcp.DBCPService;
          +import org.apache.nifi.expression.AttributeExpression;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.behavior.WritesAttributes;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
          +import org.apache.nifi.processor.ProcessSessionFactory;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.schema.access.SchemaNotFoundException;
          +import org.apache.nifi.serialization.RecordSetWriter;
          +import org.apache.nifi.serialization.RecordSetWriterFactory;
          +import org.apache.nifi.serialization.WriteResult;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +import org.apache.nifi.serialization.record.ResultSetRecordSet;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.sql.Connection;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +import java.sql.SQLException;
          +import java.sql.Timestamp;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.concurrent.ConcurrentHashMap;
          +import java.util.concurrent.atomic.AtomicLong;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +@TriggerSerially
          +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
          +@Tags({"sql", "jdbc", "cdc", "mssql"})
          +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
          + + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.")
          +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so "
          + + "that it can continue from the same point in time if restarted.")
          +@WritesAttributes({
          +        @WritesAttribute(attribute = "tablename", description = "Name of the table this changeset was captured from."),
          +        @WritesAttribute(attribute = "mssqlcdc.row.count", description = "The number of rows in this changeset"),
          +        @WritesAttribute(attribute = "fullsnapshot", description = "Whether this was a full snapshot of the base table or not.")})
          +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
          + + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. "
          + + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.")
          +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
          + public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";
          +
          + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
          + .name("record-writer")
          + .displayName("Record Writer")
          + .description("Specifies the Controller Service to use for writing out the records")
          + .identifiesControllerService(RecordSetWriterFactory.class)
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
          + .name("cdcmssql-dbcp-service")
          + .displayName("Database Connection Pooling Service")
          + .description("The Controller Service that is used to obtain connection to database")
          + .required(true)
          + .identifiesControllerService(DBCPService.class)
          + .build();
          +
          + public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder()
          + .name("cdcmssql-cdc-table-list")
          + .displayName("CDC Table List")
          + .description("The comma delimited list of tables in the source database to monitor for changes. If no tables "
          + + "are specified the [cdc].[change_tables] table is queried for all of the available tables with change tracking enabled in the database.")
          + .required(false)
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder()
          + .name("cdcmssql-initial-snapshot")
          + .displayName("Generate an Initial Source Table Snapshot")
          + .description("Usually CDC only includes recent historic changes. Setting this property to true will cause a snapshot of the "
          + + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point "
          + + "for extracting CDC changes.")
          + .allowableValues("true", "false")
          + .defaultValue("false")
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor
          + .Builder().name("cdcmssql-full-snapshot-row-limit")
          + .displayName("Change Set Row Limit")
          + .description("If a very large change occurs on the source table, "
          + + "the generated change set may be too large too quickly merge into a destination system. "
          + + "Use this property to set a cut-off point where instead of returning a changeset a full snapshot will be generated instead. "
          + + "The fullsnapshot attribute will be set to true when this happens.")
          + .required(true)
          + .defaultValue("0")
          + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("Successfully created FlowFile from SQL query result set.")
          + .build();
          +
          + protected List<PropertyDescriptor> descriptors;
          + protected Set<Relationship> relationships;
          +
          + protected final Map<String, MSSQLTableInfo> schemaCache = new ConcurrentHashMap<String, MSSQLTableInfo>(1000);
          +
          + // A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
          + protected Map<String,String> maxValueProperties;
          + protected MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils();
          +
          + public MSSQLCDCUtils getMssqlcdcUtils() {
          +     return mssqlcdcUtils;
          + }

          +
          + @Override
          + protected void init(final ProcessorInitializationContext context) {
          +     final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
          +     descriptors.add(RECORD_WRITER);
          +     descriptors.add(DBCP_SERVICE);
          +     descriptors.add(CDC_TABLES);
          +     descriptors.add(TAKE_INITIAL_SNAPSHOT);
          +     descriptors.add(FULL_SNAPSHOT_ROW_LIMIT);
          +     this.descriptors = Collections.unmodifiableList(descriptors);
          +
          +     final Set<Relationship> relationships = new HashSet<Relationship>();
          +     relationships.add(REL_SUCCESS);
          +     this.relationships = Collections.unmodifiableSet(relationships);
          + }

          +
          + @Override
          + public Set<Relationship> getRelationships() {
          +     return this.relationships;
          + }

          +
          + @Override
          + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
          + if (!propertyDescriptorName.startsWith("initial.timestamp.")) {
          +     return null;
          + }

          +
          + return new PropertyDescriptor.Builder()
          + .name(propertyDescriptorName)
          — End diff –

          Nitpick to add displayName() as well, I know they're the same but this way it won't show up in any searches for PropertyDescriptors that have name() but not displayName()
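The prefix-matching behavior that the quoted diff implements can be sketched standalone. This is a simplified stand-in for NiFi's dynamic-property machinery, not the processor's code; `tableNameFromProperty` is a hypothetical helper illustrating how the `initial.timestamp.` prefix selects which properties are recognized:

```java
/**
 * Simplified stand-in for the dynamic-property handling in the diff:
 * only properties named initial.timestamp.<table_name> are recognized,
 * mirroring getSupportedDynamicPropertyDescriptor returning null for
 * everything else.
 */
public final class DynamicTimestampProps {
    static final String PREFIX = "initial.timestamp.";

    /** Returns the table name, or null when the property is not recognized. */
    public static String tableNameFromProperty(String propertyName) {
        if (propertyName == null || !propertyName.startsWith(PREFIX)) {
            return null;
        }
        return propertyName.substring(PREFIX.length());
    }
}
```

In the real builder, setting `.displayName(propertyDescriptorName)` alongside `.name(propertyDescriptorName)` addresses the reviewer's nitpick without changing this lookup behavior.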

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157258675

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java —
          @@ -0,0 +1,387 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql.processors;
          +
          +import org.apache.commons.lang3.StringUtils;
          +import org.apache.nifi.annotation.behavior.DynamicProperty;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.Stateful;
          +import org.apache.nifi.annotation.behavior.TriggerSerially;
          +import org.apache.nifi.cdc.CDCException;
          +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
          +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
          +import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.state.Scope;
          +import org.apache.nifi.components.state.StateManager;
          +import org.apache.nifi.components.state.StateMap;
          +import org.apache.nifi.dbcp.DBCPService;
          +import org.apache.nifi.expression.AttributeExpression;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.behavior.WritesAttributes;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
          +import org.apache.nifi.processor.ProcessSessionFactory;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.schema.access.SchemaNotFoundException;
          +import org.apache.nifi.serialization.RecordSetWriter;
          +import org.apache.nifi.serialization.RecordSetWriterFactory;
          +import org.apache.nifi.serialization.WriteResult;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +import org.apache.nifi.serialization.record.ResultSetRecordSet;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.sql.Connection;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +import java.sql.SQLException;
          +import java.sql.Timestamp;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.concurrent.ConcurrentHashMap;
          +import java.util.concurrent.atomic.AtomicLong;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +@TriggerSerially
          +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
          +@Tags({"sql", "jdbc", "cdc", "mssql"})
          +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
          + + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.")
          +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so "
          + + "that it can continue from the same point in time if restarted.")
          +@WritesAttributes({
          +        @WritesAttribute(attribute = "tablename", description = "Name of the table this changeset was captured from."),
          +        @WritesAttribute(attribute = "mssqlcdc.row.count", description = "The number of rows in this changeset"),
          +        @WritesAttribute(attribute = "fullsnapshot", description = "Whether this was a full snapshot of the base table or not.")})
          +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
          + + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. "
          + + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.")
          +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
          + public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";
          +
          + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
          + .name("record-writer")
          + .displayName("Record Writer")
          + .description("Specifies the Controller Service to use for writing out the records")
          + .identifiesControllerService(RecordSetWriterFactory.class)
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
          + .name("cdcmssql-dbcp-service")
          + .displayName("Database Connection Pooling Service")
          + .description("The Controller Service that is used to obtain connection to database")
          + .required(true)
          + .identifiesControllerService(DBCPService.class)
          + .build();
          +
          + public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder()
          + .name("cdcmssql-cdc-table-list")
          + .displayName("CDC Table List")
          + .description("The comma delimited list of tables in the source database to monitor for changes. If no tables "
          + + "are specified the [cdc].[change_tables] table is queried for all of the available tables with change tracking enabled in the database.")
          + .required(false)
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder()
          + .name("cdcmssql-initial-snapshot")
          + .displayName("Generate an Initial Source Table Snapshot")
          + .description("Usually CDC only includes recent historic changes. Setting this property to true will cause a snapshot of the "
          + + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point "
          + + "for extracting CDC changes.")
          + .allowableValues("true", "false")
          + .defaultValue("false")
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor
          + .Builder().name("cdcmssql-full-snapshot-row-limit")
          + .displayName("Change Set Row Limit")
          + .description("If a very large change occurs on the source table, "
          + + "the generated change set may be too large too quickly merge into a destination system. "
          + + "Use this property to set a cut-off point where instead of returning a changeset a full snapshot will be generated instead. "
          + + "The fullsnapshot attribute will be set to true when this happens.")
          + .required(true)
          + .defaultValue("0")
          + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
          — End diff –

          Consider adding Expression Language support here, the limit might be different between dev and production systems for example. Also there's a typo in the description, should read "too large to quickly merge", and the term "change set" or "changeset" should be used consistently (I'm fine with either) throughout the doc.
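The cut-off behavior that the Change Set Row Limit property describes can be sketched as a hypothetical helper (not the processor's actual code; it assumes, as the default value of 0 suggests, that a limit of 0 disables the fall-back entirely):

```java
/**
 * Hypothetical sketch of the change-set vs. full-snapshot decision:
 * when a change set exceeds the configured row limit, a full snapshot
 * of the base table is emitted instead and the fullsnapshot attribute
 * is set to true. A limit of 0 is assumed to mean "never fall back".
 */
public final class SnapshotDecision {

    /** True when the processor should emit a full snapshot instead. */
    public static boolean useFullSnapshot(long changeSetRows, long rowLimit) {
        return rowLimit > 0 && changeSetRows > rowLimit;
    }
}
```

With Expression Language support added per the review comment, `rowLimit` would simply be evaluated from the property at trigger time, so dev and production flows could configure different limits through variables.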

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157260327

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java —
          @@ -0,0 +1,792 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql;
          +
          +import org.apache.nifi.annotation.behavior.Stateful;
          +import org.apache.nifi.cdc.event.ColumnDefinition;
          +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition;
          +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
          +import org.apache.nifi.cdc.mssql.processors.CaptureChangeMSSQL;
          +import org.apache.nifi.components.state.Scope;
          +import org.apache.nifi.components.state.StateMap;
          +import org.apache.nifi.controller.AbstractControllerService;
          +import org.apache.nifi.dbcp.DBCPService;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.reporting.InitializationException;
          +import org.apache.nifi.serialization.record.MockRecordWriter;
          +import org.apache.nifi.util.MockFlowFile;
          +import org.apache.nifi.util.TestRunner;
          +import org.apache.nifi.util.TestRunners;
          +import org.apache.nifi.util.file.FileUtils;
          +import org.junit.After;
          +import org.junit.AfterClass;
          +import org.junit.Assert;
          +import org.junit.Before;
          +import org.junit.BeforeClass;
          +import org.junit.Test;
          +
          +import java.io.File;
          +import java.io.IOException;
          +import java.sql.Connection;
          +import java.sql.DriverManager;
          +import java.sql.SQLException;
          +import java.sql.SQLNonTransientConnectionException;
          +import java.sql.Statement;
          +import java.sql.Timestamp;
          +import java.sql.Types;
          +import java.util.ArrayList;
          +import java.util.HashMap;
          +import java.util.List;
          +import java.util.Map;
          +
          +public class CaptureChangeMSSQLTest {
          +
          + private TestRunner runner;
          + private MockCaptureChangeMSSQL processor;
          + private final static String DB_LOCATION = "target/db_qdt";
          +
          +
          + @BeforeClass
          + public static void setupBeforeClass() throws IOException, SQLException {
          + System.setProperty("derby.stream.error.file", "target/derby.log");
          +
          + // remove previous test database, if any
          + final File dbLocation = new File(DB_LOCATION);
          + try {
          + FileUtils.deleteFile(dbLocation, true);
          + } catch (IOException ioe) {
          + // Do nothing, may not have existed
          + }
          +
          + // load CDC schema to database
          + final DBCPService dbcp = new DBCPServiceSimpleImpl();
          + final Connection con = dbcp.getConnection();
          + Statement stmt = con.createStatement();
          +
          + stmt.execute("CREATE TABLE cdc.change_tables(\n" +
          + "object_id int,\n" +
          + //These four columns are computed from object_id/source_object_id in MS SQL, but for testing we put them as strings
          + "schemaName varchar(128),\n" +
          + "tableName varchar(128),\n" +
          + "sourceSchemaName varchar(128),\n" +
          + "sourceTableName varchar(128),\n" +
          +
          + "version int,\n" +
          + "capture_instance varchar(128),\n" +
          + "start_lsn int,\n" +
          + "end_lsn int,\n" +
          + "supports_net_changes BOOLEAN,\n" +
          + "has_drop_pending BOOLEAN,\n" +
          + "role_name varchar(128),\n" +
          + "index_name varchar(128),\n" +
          + "filegroup_name varchar(128),\n" +
          + "create_date TIMESTAMP,\n" +
          + "partition_switch BOOLEAN)");
          +
          + stmt.execute("CREATE TABLE cdc.lsn_time_mapping(\n" +
          + "start_lsn int,\n" +
          + "tran_begin_time TIMESTAMP,\n" +
          + "tran_end_time TIMESTAMP,\n" +
          + "tran_id int,\n" +
          + "tran_begin_lsn int)");
          +
          + stmt.execute("CREATE TABLE cdc.index_columns(\n" +
          + "object_id int,\n" +
          + "column_name varchar(128),\n" +
          + "index_ordinal int,\n" +
          + "column_id int)");
          +
          + stmt.execute("CREATE TABLE cdc.captured_columns(\n" +
          + "object_id int,\n" +
          + "column_name varchar(128),\n" +
          + "column_id int,\n" +
          + "column_type varchar(128),\n" +
          + "column_ordinal int,\n" +
          + "is_computed BOOLEAN)");
          + }
          +
          + @AfterClass
          + public static void cleanUpAfterClass() throws Exception {
          + try {
          + DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
          + } catch (SQLNonTransientConnectionException e) {
          + // Do nothing, this is what happens at Derby shutdown
          + }
          + // remove previous test database, if any
          + final File dbLocation = new File(DB_LOCATION);
          + try {
          + FileUtils.deleteFile(dbLocation, true);
          + } catch (IOException ioe) {
          + // Do nothing, may not have existed
          + }
          + }
          +
          + @Before
          + public void setup() throws InitializationException, IOException, SQLException {
          + final DBCPService dbcp = new DBCPServiceSimpleImpl();
          + final Map<String, String> dbcpProperties = new HashMap<>();
          +
          + processor = new MockCaptureChangeMSSQL();
          + runner = TestRunners.newTestRunner(processor);
          +
          + runner.addControllerService("dbcp", dbcp, dbcpProperties);
          + runner.enableControllerService(dbcp);
          + runner.setProperty(CaptureChangeMSSQL.DBCP_SERVICE, "dbcp");
          +
          + final MockRecordWriter writerService = new MockRecordWriter(null, false);
          + runner.addControllerService("writer", writerService);
          + runner.enableControllerService(writerService);
          + runner.setProperty(CaptureChangeMSSQL.RECORD_WRITER, "writer");
          +
          + runner.getStateManager().clear(Scope.CLUSTER);
          + }

          +
          + @After
          + public void teardown() throws IOException {
          + runner.getStateManager().clear(Scope.CLUSTER);
          + runner = null;
          + }

          +
          + @Test
          + public void testSelectGenerator() {
          + MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils();
          +
          + List<ColumnDefinition> columns = new ArrayList<>();
          + columns.add(new MSSQLColumnDefinition(Types.INTEGER, "ID", 1, true));
          + columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "LastName", 2, false));
          + columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "FirstName", 3, false));
          +
          + MSSQLTableInfo ti = new MSSQLTableInfo("NiFi", "cdc", "Names",
          + "dbo", "dbo_Names_CT", 1000L, columns);
          +
          + String noMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, null);
          +
          + Assert.assertEquals("SELECT t.tran_begin_time\n"
          + + ",t.tran_end_time \"tran_end_time\"\n"
          + + ",CAST(t.tran_id AS bigint) trans_id\n"
          + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n"
          + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n"
          + + ",\"o\".\"__$operation\" operation\n"
          + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n"
          + + ",\"o\".\"ID\"\n"
          + + ",\"o\".\"LastName\"\n"
          + + ",\"o\".\"FirstName\"\n"
          + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n"
          + + "FROM cdc.\"Names\" \"o\"\n"
          + + "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n"
          + + "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", noMaxTime);
          +
          + String withMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, new Timestamp(0));
          +
          + Assert.assertEquals("SELECT t.tran_begin_time\n"
          + + ",t.tran_end_time \"tran_end_time\"\n"
          + + ",CAST(t.tran_id AS bigint) trans_id\n"
          + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n"
          + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n"
          + + ",\"o\".\"__$operation\" operation\n"
          + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n"
          + + ",\"o\".\"ID\"\n"
          + + ",\"o\".\"LastName\"\n"
          + + ",\"o\".\"FirstName\"\n"
          + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n"
          + + "FROM cdc.\"Names\" \"o\"\n"
          + + "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n"
          + + "WHERE t.tran_end_time > ?\n"
          + + "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", withMaxTime);
          + }

          +
          + @Test
          + public void testRetrieveAllChanges() throws SQLException, IOException {
          + setupNamesTable();
          +
          — End diff –

          Line 215 is not blank, so it throws a CheckStyle violation
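
          The SELECT statements asserted in the diff above follow a fixed pattern: CDC bookkeeping columns, the captured source columns, a join to `cdc.lsn_time_mapping`, and an optional timestamp predicate. A minimal, self-contained sketch of that assembly (hypothetical helper, not the PR's `MSSQLCDCUtils`):

```java
import java.util.List;

// Hypothetical sketch of CDC SELECT assembly; not the PR's MSSQLCDCUtils.
public class CdcSelectSketch {
    public static String buildSelect(String captureTable, List<String> columns, boolean sinceTimestamp) {
        StringBuilder sql = new StringBuilder("SELECT t.tran_begin_time\n");
        sql.append(",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n"); // CDC bookkeeping column
        for (String column : columns) {
            sql.append(",\"o\".\"").append(column).append("\"\n");         // captured source columns
        }
        sql.append("FROM cdc.\"").append(captureTable).append("\" \"o\"\n");
        sql.append("INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n");
        if (sinceTimestamp) {
            sql.append("WHERE t.tran_end_time > ?\n"); // bound later to the last captured timestamp
        }
        sql.append("ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\"");
        return sql.toString();
    }
}
```

The `WHERE t.tran_end_time > ?` branch is what lets the processor resume from the timestamp stored in state between runs.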

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157257938

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java —
          @@ -0,0 +1,387 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql.processors;
          +
          +import org.apache.commons.lang3.StringUtils;
          +import org.apache.nifi.annotation.behavior.DynamicProperty;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.Stateful;
          +import org.apache.nifi.annotation.behavior.TriggerSerially;
          +import org.apache.nifi.cdc.CDCException;
          +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
          +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
          +import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.state.Scope;
          +import org.apache.nifi.components.state.StateManager;
          +import org.apache.nifi.components.state.StateMap;
          +import org.apache.nifi.dbcp.DBCPService;
          +import org.apache.nifi.expression.AttributeExpression;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.behavior.WritesAttributes;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
          +import org.apache.nifi.processor.ProcessSessionFactory;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.schema.access.SchemaNotFoundException;
          +import org.apache.nifi.serialization.RecordSetWriter;
          +import org.apache.nifi.serialization.RecordSetWriterFactory;
          +import org.apache.nifi.serialization.WriteResult;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +import org.apache.nifi.serialization.record.ResultSetRecordSet;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.sql.Connection;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +import java.sql.SQLException;
          +import java.sql.Timestamp;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.concurrent.ConcurrentHashMap;
          +import java.util.concurrent.atomic.AtomicLong;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +@TriggerSerially
          +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
          +@Tags( {"sql", "jdbc", "cdc", "mssql"} )
          +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
          + + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.")
          +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so "
          + + "that it can continue from the same point in time if restarted.")
          +@WritesAttributes( {
          + @WritesAttribute(attribute = "tablename", description="Name of the table this changeset was captured from."),
          + @WritesAttribute(attribute="mssqlcdc.row.count", description="The number of rows in this changeset"),
          + @WritesAttribute(attribute="fullsnapshot", description="Whether this was a full snapshot of the base table or not..")} )
          +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
          + + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. "
          + + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.")
          +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
          + public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";
          +
          + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
          + .name("record-writer")
          — End diff –

          Very awesome that you are using the Record API here!
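
          The dynamic `initial.timestamp.{table_name}` properties described in the diff above amount to a prefix-keyed lookup; a minimal plain-Java sketch of collecting them into a per-table map (hypothetical helper, not the processor's actual implementation):

```java
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: collect per-table initial timestamps from dynamic
// properties named "initial.timestamp.<table>"; other properties are ignored.
public class InitialTimestampSketch {
    static final String PREFIX = "initial.timestamp.";

    public static Map<String, Timestamp> extract(Map<String, String> properties) {
        Map<String, Timestamp> result = new HashMap<>();
        for (Map.Entry<String, String> e : properties.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                String table = e.getKey().substring(PREFIX.length());
                result.put(table, Timestamp.valueOf(e.getValue())); // expects "yyyy-MM-dd HH:mm:ss"
            }
        }
        return result;
    }
}
```

Per the description, such values would only seed the very first run; once state is written, the stored timestamp wins.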

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157259941

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java —
          @@ -0,0 +1,387 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql.processors;
          +
          +import org.apache.commons.lang3.StringUtils;
          +import org.apache.nifi.annotation.behavior.DynamicProperty;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.Stateful;
          +import org.apache.nifi.annotation.behavior.TriggerSerially;
          +import org.apache.nifi.cdc.CDCException;
          +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
          +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
          +import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.state.Scope;
          +import org.apache.nifi.components.state.StateManager;
          +import org.apache.nifi.components.state.StateMap;
          +import org.apache.nifi.dbcp.DBCPService;
          +import org.apache.nifi.expression.AttributeExpression;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.behavior.WritesAttributes;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
          +import org.apache.nifi.processor.ProcessSessionFactory;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.schema.access.SchemaNotFoundException;
          +import org.apache.nifi.serialization.RecordSetWriter;
          +import org.apache.nifi.serialization.RecordSetWriterFactory;
          +import org.apache.nifi.serialization.WriteResult;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +import org.apache.nifi.serialization.record.ResultSetRecordSet;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.sql.Connection;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +import java.sql.SQLException;
          +import java.sql.Timestamp;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.concurrent.ConcurrentHashMap;
          +import java.util.concurrent.atomic.AtomicLong;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +@TriggerSerially
          +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
          +@Tags({"sql", "jdbc", "cdc", "mssql"})
          +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
          + + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.")
          — End diff –

          Should probably mention here that in a cluster, this processor is recommended to be run on the Primary Node only.

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157259563

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java —
          @@ -0,0 +1,387 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql.processors;
          +
          +import org.apache.commons.lang3.StringUtils;
          +import org.apache.nifi.annotation.behavior.DynamicProperty;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.Stateful;
          +import org.apache.nifi.annotation.behavior.TriggerSerially;
          +import org.apache.nifi.cdc.CDCException;
          +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
          +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
          +import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.state.Scope;
          +import org.apache.nifi.components.state.StateManager;
          +import org.apache.nifi.components.state.StateMap;
          +import org.apache.nifi.dbcp.DBCPService;
          +import org.apache.nifi.expression.AttributeExpression;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.behavior.WritesAttributes;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
          +import org.apache.nifi.processor.ProcessSessionFactory;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.schema.access.SchemaNotFoundException;
          +import org.apache.nifi.serialization.RecordSetWriter;
          +import org.apache.nifi.serialization.RecordSetWriterFactory;
          +import org.apache.nifi.serialization.WriteResult;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +import org.apache.nifi.serialization.record.ResultSetRecordSet;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.sql.Connection;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +import java.sql.SQLException;
          +import java.sql.Timestamp;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.concurrent.ConcurrentHashMap;
          +import java.util.concurrent.atomic.AtomicLong;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +@TriggerSerially
          +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
          +@Tags({"sql", "jdbc", "cdc", "mssql"})
          +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
          + + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.")
          +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so "
          + + "that it can continue from the same point in time if restarted.")
          +@WritesAttributes({
          + @WritesAttribute(attribute = "tablename", description = "Name of the table this changeset was captured from."),
          + @WritesAttribute(attribute = "mssqlcdc.row.count", description = "The number of rows in this changeset."),
          + @WritesAttribute(attribute = "fullsnapshot", description = "Whether this was a full snapshot of the base table or not.")})
          +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
          + + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.{table_name}`, one for each table. "
          + + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.")
          +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
          + public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";
          +
          + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
          + .name("record-writer")
          + .displayName("Record Writer")
          + .description("Specifies the Controller Service to use for writing out the records")
          + .identifiesControllerService(RecordSetWriterFactory.class)
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
          + .name("cdcmssql-dbcp-service")
          + .displayName("Database Connection Pooling Service")
          + .description("The Controller Service that is used to obtain connection to database")
          + .required(true)
          + .identifiesControllerService(DBCPService.class)
          + .build();
          +
          + public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder()
          + .name("cdcmssql-cdc-table-list")
          + .displayName("CDC Table List")
          + .description("The comma delimited list of tables in the source database to monitor for changes. If no tables "
          + + "are specified the [cdc].[change_tables] table is queried for all of the available tables with change tracking enabled in the database.")
          + .required(false)
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder()
          + .name("cdcmssql-initial-snapshot")
          + .displayName("Generate an Initial Source Table Snapshot")
          + .description("Usually CDC only includes recent historic changes. Setting this property to true will cause a snapshot of the "
          + + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point "
          + + "for extracting CDC changes.")
          + .allowableValues("true", "false")
          + .defaultValue("false")
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor
          + .Builder().name("cdcmssql-full-snapshot-row-limit")
          + .displayName("Change Set Row Limit")
          + .description("If a very large change occurs on the source table, "
          + + "the generated change set may be too large to quickly merge into a destination system. "
          + + "Use this property to set a cut-off point where a full snapshot will be generated instead of a changeset. "
          + + "The fullsnapshot attribute will be set to true when this happens.")
          + .required(true)
          + .defaultValue("0")
          + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("Successfully created FlowFile from SQL query result set.")
          + .build();
          +
          + protected List<PropertyDescriptor> descriptors;
          + protected Set<Relationship> relationships;
          +
          + protected final Map<String, MSSQLTableInfo> schemaCache = new ConcurrentHashMap<String, MSSQLTableInfo>(1000);
          +
          + // A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
          + protected Map<String,String> maxValueProperties;
          + protected MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils();
          +
          + public MSSQLCDCUtils getMssqlcdcUtils() {
          + return mssqlcdcUtils;
          + }
          +
          + @Override
          + protected void init(final ProcessorInitializationContext context)

          { + final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); + descriptors.add(RECORD_WRITER); + descriptors.add(DBCP_SERVICE); + descriptors.add(CDC_TABLES); + descriptors.add(TAKE_INITIAL_SNAPSHOT); + descriptors.add(FULL_SNAPSHOT_ROW_LIMIT); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<Relationship>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + }

          +
          + @Override
          + public Set<Relationship> getRelationships()

          { + return this.relationships; + }

          +
          + @Override
          + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
          + if(!propertyDescriptorName.startsWith("initial.timestamp.")) {
          + return null;
          + }

          +
          + return new PropertyDescriptor.Builder()
          + .name(propertyDescriptorName)
          + .required(false)
          + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
          + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
          + .expressionLanguageSupported(false)
          + .dynamic(true)
          + .build();
          + }
          +
          + @Override
          + public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
          + ProcessSession session = processSessionFactory.createSession();
          +
          + final ComponentLog logger = getLogger();
          + final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
          + final DBCPService dbcpService = processContext.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
          +
          + final boolean takeInitialSnapshot = processContext.getProperty(TAKE_INITIAL_SNAPSHOT).asBoolean();
          + final int fullSnapshotRowLimit = processContext.getProperty(FULL_SNAPSHOT_ROW_LIMIT).asInteger();
          — End diff –

          Would have to add ".evaluateAttributeExpressions()" here if you add EL support to this field (see comment above)
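
          Relatedly, the `initial.timestamp.{table_name}` dynamic property quoted in the diff above supplies a per-table starting point; on the SQL Server side, such a timestamp can be translated to a CDC log sequence number with built-in mapping functions. A hedged sketch (the timestamp value is illustrative only):

          ```sql
          -- Hypothetical: map an operator-supplied initial timestamp to the
          -- starting LSN from which CDC changes would be read.
          DECLARE @start_ts datetime = '2017-12-01T00:00:00';
          DECLARE @from_lsn binary(10) =
              sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @start_ts);
          SELECT @from_lsn AS starting_lsn;
          ```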

descriptors.add(FULL_SNAPSHOT_ROW_LIMIT); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<Relationship>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + if(!propertyDescriptorName.startsWith("initial.timestamp.")) { + return null; + } + + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(false) + .dynamic(true) + .build(); + } + + @Override + public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException { + ProcessSession session = processSessionFactory.createSession(); + + final ComponentLog logger = getLogger(); + final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final DBCPService dbcpService = processContext.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + + final boolean takeInitialSnapshot = processContext.getProperty(TAKE_INITIAL_SNAPSHOT).asBoolean(); + final int fullSnapshotRowLimit = processContext.getProperty(FULL_SNAPSHOT_ROW_LIMIT).asInteger(); — End diff – Would have to add ".evaluateAttributeExpressions()" here if you add EL support to this field (see comment above)
          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157256910

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java —
          @@ -0,0 +1,264 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql;
          +
          +import org.apache.nifi.cdc.CDCException;
          +import org.apache.nifi.cdc.event.ColumnDefinition;
          +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition;
          +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
          +
          +import java.sql.Connection;
          +import java.sql.Timestamp;
          +import java.sql.Types;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +import java.sql.SQLException;
          +import java.sql.Statement;
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +public class MSSQLCDCUtils {
          — End diff –

          I didn't see any (non-constant) member variables, seems like all the variables and methods could be static?

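
          A minimal sketch of what that suggestion could look like. The class name, the abbreviated SQL constant, and the simplified helper below are hypothetical stand-ins; only the static, non-instantiable utility-class shape is the point:

          ```java
          // Sketch of the review suggestion: with no mutable instance state,
          // the utils class can become a non-instantiable utility class whose
          // constants and helpers are all static.
          final class MSSQLCDCUtilsSketch {

              // Abbreviated stand-in for the LIST_CHANGE_TRACKING_TABLES_SQL constant.
              static final String LIST_CHANGE_TRACKING_TABLES_SQL =
                      "SELECT object_id FROM [cdc].[change_tables]";

              private MSSQLCDCUtilsSketch() {
                  // Utility class: no instances.
              }

              // Static replacement for the getCURRENT_TIMESTAMP() instance getter;
              // callers no longer need to pass a utils instance around.
              static String currentTimestampKeyword() {
                  return "CURRENT_TIMESTAMP";
              }
          }
          ```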
          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157257350

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java —
          @@ -0,0 +1,264 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql;
          +
          +import org.apache.nifi.cdc.CDCException;
          +import org.apache.nifi.cdc.event.ColumnDefinition;
          +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition;
          +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
          +
          +import java.sql.Connection;
          +import java.sql.Timestamp;
          +import java.sql.Types;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +import java.sql.SQLException;
          +import java.sql.Statement;
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +public class MSSQLCDCUtils {
          + private static final String _columnSplit = "\n,";
          +
          + final String LIST_CHANGE_TRACKING_TABLES_SQL = "SELECT object_id,\n" +
          + " DB_NAME() AS [databaseName], \n" +
          + " SCHEMA_NAME(OBJECTPROPERTY(object_id, 'SchemaId')) AS [schemaName], \n" +
          + " OBJECT_NAME(object_id) AS [tableName], \n" +
          + " SCHEMA_NAME(OBJECTPROPERTY(source_object_id, 'SchemaId')) AS [sourceSchemaName],\n" +
          + " OBJECT_NAME(source_object_id) AS [sourceTableName] \n" +
          + "FROM [cdc].[change_tables]";
          +
          + final String LIST_TABLE_COLUMNS = "select cc.object_id\n" +
          + ",cc.column_name\n" +
          + ",cc.column_id\n" +
          + ",cc.column_type\n" +
          + ",cc.column_ordinal\n" +
          + ",CASE WHEN ic.object_id IS NULL THEN 0 ELSE 1 END \"key\"\n" +
          + "FROM cdc.captured_columns cc\n" +
          + "LEFT OUTER JOIN cdc.index_columns ic ON \n" +
          + "(ic.object_id = cc.object_id AND ic.column_name = cc.column_name)\n" +
          + "where cc.object_id=?\n" +
          + "ORDER BY cc.column_ordinal";
          +
          + public String getLIST_CHANGE_TRACKING_TABLES_SQL() {
          + return LIST_CHANGE_TRACKING_TABLES_SQL;
          + }
          +
          + public String getLIST_TABLE_COLUMNS() {
          + return LIST_TABLE_COLUMNS;
          + }
          +
          + public String getCURRENT_TIMESTAMP() {
          + return "CURRENT_TIMESTAMP";
          + }

          +
          + public List<MSSQLTableInfo> getCDCTableList(Connection con) throws SQLException, CDCException {
          + ArrayList<MSSQLTableInfo> cdcTables = new ArrayList<>();
          +
          + try(final Statement st = con.createStatement()){
          + final ResultSet resultSet = st.executeQuery(getLIST_CHANGE_TRACKING_TABLES_SQL());
          +
          + while (resultSet.next()) {
          + int objectId = resultSet.getInt("object_id");
          + String databaseName = resultSet.getString("databaseName");
          + String schemaName = resultSet.getString("schemaName");
          + String tableName = resultSet.getString("tableName");
          + String sourceSchemaName = resultSet.getString("sourceSchemaName");
          + String sourceTableName = resultSet.getString("sourceTableName");
          +
          + MSSQLTableInfo ti = new MSSQLTableInfo(databaseName, schemaName, tableName, sourceSchemaName, sourceTableName, Integer.toUnsignedLong(objectId), null);
          + cdcTables.add(ti);
          + }

          +
          + for (MSSQLTableInfo ti:cdcTables) {
          + List<ColumnDefinition> tableColums = getCaptureColumns(con, ti.getTableId());
          +
          + ti.setColumns(tableColums);
          + }

          + }
          +
          + return cdcTables;
          + }
          +
          + public List<ColumnDefinition> getCaptureColumns(Connection con, long objectId) throws SQLException, CDCException {
          + ArrayList<ColumnDefinition> tableColumns = new ArrayList<>();
          + try(final PreparedStatement st = con.prepareStatement(getLIST_TABLE_COLUMNS())){
          + st.setLong(1, objectId);
          +
          + final ResultSet resultSet = st.executeQuery();
          + while (resultSet.next()) {
          + String columnName = resultSet.getString("column_name");
          + int columnId = resultSet.getInt("column_id");
          + String columnType = resultSet.getString("column_type");
          + int columnOrdinal = resultSet.getInt("column_ordinal");
          + int isColumnKey = resultSet.getInt("key");
          +
          + int jdbcType = TranslateMSSQLTypeToJDBCTypes(columnType);
          +
          + //get column list
          + MSSQLColumnDefinition col = new MSSQLColumnDefinition(jdbcType, columnName, columnOrdinal, isColumnKey==1);
          + tableColumns.add(col);
          + }

          + } catch (SQLException e) {
          — End diff –

          You don't really need this catch block to rethrow the same exception, maybe cast it to CDCException or replace the catch block with an empty finally block?

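
          Both options the reviewer mentions are easy to sketch. The DemoCDCException class below is a hypothetical stand-in for NiFi's own CDCException (and is unchecked here purely for brevity), and integer parsing merely plays the role of the JDBC work that can fail:

          ```java
          // Option 1: drop the catch entirely -- try-with-resources still closes
          // the statement and the SQLException propagates unchanged.
          // Option 2, sketched here: wrap the low-level exception in a domain
          // exception so callers handle a single type.
          class DemoCDCException extends RuntimeException {
              DemoCDCException(String message, Throwable cause) {
                  super(message, cause);
              }
          }

          class WrapInsteadOfRethrow {
              // Stand-in for the column-reading work.
              static int readOrdinal(String raw) {
                  try {
                      return Integer.parseInt(raw);
                  } catch (NumberFormatException e) {
                      // Wrapping adds information for the caller;
                      // `catch (SQLException e) { throw e; }` would add none.
                      throw new DemoCDCException("Could not read column ordinal: " + raw, e);
                  }
              }
          }
          ```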
          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157257713

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/event/TableCapturePlan.java —
          @@ -0,0 +1,136 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql.event;
          +
          +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
          +import org.apache.nifi.util.StringUtils;
          +
          +import java.sql.Connection;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +import java.sql.SQLException;
          +import java.sql.Timestamp;
          +
          +public class TableCapturePlan {
          +
          + public enum PlanTypes {
          + CDC,
          + SNAPSHOT
          + }

          +
          + public MSSQLTableInfo getTable() {
          + return table;
          + }
          +
          + public int getRowLimit() {
          + return rowLimit;
          + }
          +
          + public boolean getCaptureBaseline() {
          + return captureBaseline;
          + }
          +
          + public Timestamp getMaxTime() {
          + return maxTime;
          + }
          +
          + public PlanTypes getPlanType() {
          + return planType;
          + }

          +
          + private MSSQLTableInfo table;
          + private int rowLimit;
          + private boolean captureBaseline;
          + private Timestamp maxTime;
          +
          + private PlanTypes planType;
          +
          + public TableCapturePlan(MSSQLTableInfo table, int rowLimit, boolean captureBaseline, String sTime){
          + this.table = table;
          +
          + this.rowLimit = rowLimit;
          + this.captureBaseline = captureBaseline;
          +
          + if(!StringUtils.isEmpty(sTime)) {
          + this.maxTime = Timestamp.valueOf(sTime);
          + }

          + }
          +
          + public void ComputeCapturePlan(Connection con, MSSQLCDCUtils mssqlcdcUtils) throws SQLException {
          — End diff –

          Nitpick on leading capital letter in method name

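
          One behavior of that constructor worth noting while it is being reviewed: Timestamp.valueOf, used above to seed maxTime, only accepts the JDBC escape format yyyy-[m]m-[d]d hh:mm:ss[.f...], so whatever string the processor persists to state must round-trip through that format. A quick standalone check (the class name is illustrative):

          ```java
          import java.sql.Timestamp;

          // Demonstrates the input format java.sql.Timestamp.valueOf expects.
          // Anything else -- e.g. ISO-8601 with a 'T' separator -- throws
          // IllegalArgumentException rather than parsing leniently.
          class TimestampRoundTrip {
              static Timestamp parse(String sTime) {
                  return Timestamp.valueOf(sTime);
              }
          }
          ```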
          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/2231#discussion_r157259349

          — Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java —
          @@ -0,0 +1,387 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.nifi.cdc.mssql.processors;
          +
          +import org.apache.commons.lang3.StringUtils;
          +import org.apache.nifi.annotation.behavior.DynamicProperty;
          +import org.apache.nifi.annotation.behavior.InputRequirement;
          +import org.apache.nifi.annotation.behavior.Stateful;
          +import org.apache.nifi.annotation.behavior.TriggerSerially;
          +import org.apache.nifi.cdc.CDCException;
          +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
          +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
          +import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
          +import org.apache.nifi.components.PropertyDescriptor;
          +import org.apache.nifi.components.state.Scope;
          +import org.apache.nifi.components.state.StateManager;
          +import org.apache.nifi.components.state.StateMap;
          +import org.apache.nifi.dbcp.DBCPService;
          +import org.apache.nifi.expression.AttributeExpression;
          +import org.apache.nifi.flowfile.FlowFile;
          +import org.apache.nifi.annotation.behavior.WritesAttribute;
          +import org.apache.nifi.annotation.behavior.WritesAttributes;
          +import org.apache.nifi.annotation.lifecycle.OnScheduled;
          +import org.apache.nifi.annotation.documentation.CapabilityDescription;
          +import org.apache.nifi.annotation.documentation.Tags;
          +import org.apache.nifi.logging.ComponentLog;
          +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
          +import org.apache.nifi.processor.ProcessSessionFactory;
          +import org.apache.nifi.processor.exception.ProcessException;
          +import org.apache.nifi.processor.ProcessContext;
          +import org.apache.nifi.processor.ProcessSession;
          +import org.apache.nifi.processor.ProcessorInitializationContext;
          +import org.apache.nifi.processor.Relationship;
          +import org.apache.nifi.processor.io.StreamCallback;
          +import org.apache.nifi.processor.util.StandardValidators;
          +import org.apache.nifi.schema.access.SchemaNotFoundException;
          +import org.apache.nifi.serialization.RecordSetWriter;
          +import org.apache.nifi.serialization.RecordSetWriterFactory;
          +import org.apache.nifi.serialization.WriteResult;
          +import org.apache.nifi.serialization.record.Record;
          +import org.apache.nifi.serialization.record.RecordSchema;
          +import org.apache.nifi.serialization.record.ResultSetRecordSet;
          +
          +import java.io.IOException;
          +import java.io.InputStream;
          +import java.io.OutputStream;
          +import java.sql.Connection;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +import java.sql.SQLException;
          +import java.sql.Timestamp;
          +import java.util.ArrayList;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.HashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.concurrent.ConcurrentHashMap;
          +import java.util.concurrent.atomic.AtomicLong;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +@TriggerSerially
          +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
          +@Tags({"sql", "jdbc", "cdc", "mssql"})
          +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
          + + "for each table are output as Record Sets, ordered by the time, and sequence, at which the operation occurred.")
          +@Stateful(scopes = Scope.CLUSTER, description = "Information including the timestamp of the last CDC event per table in the database is stored by this processor, so "
          + + "that it can continue from the same point in time if restarted.")
          +@WritesAttributes({
          + @WritesAttribute(attribute = "tablename", description="Name of the table this changeset was captured from."),
          + @WritesAttribute(attribute="mssqlcdc.row.count", description="The number of rows in this changeset"),
          + @WritesAttribute(attribute="fullsnapshot", description="Whether this was a full snapshot of the base table or not..")})
          +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
          + + "timestamp for reading CDC data from MS SQL. Properties should be added in the format `initial.timestamp.

          {table_name}

          `, one for each table. "
          + + "This property is ignored after the first successful run for a table writes to the state manager, and is only used again if state is cleared.")
          +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
          + public static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";
          +
          + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
          + .name("record-writer")
          + .displayName("Record Writer")
          + .description("Specifies the Controller Service to use for writing out the records")
          + .identifiesControllerService(RecordSetWriterFactory.class)
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
          + .name("cdcmssql-dbcp-service")
          + .displayName("Database Connection Pooling Service")
          + .description("The Controller Service that is used to obtain a connection to the database")
          + .required(true)
          + .identifiesControllerService(DBCPService.class)
          + .build();
          +
          + public static final PropertyDescriptor CDC_TABLES = new PropertyDescriptor.Builder()
          + .name("cdcmssql-cdc-table-list")
          + .displayName("CDC Table List")
          + .description("The comma-delimited list of tables in the source database to monitor for changes. If no tables "
          + + "are specified, the [cdc].[change_tables] table is queried for all of the available tables with CDC enabled in the database.")
          + .required(false)
          + .expressionLanguageSupported(true)
          + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
          + .build();
          +
          + public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new PropertyDescriptor.Builder()
          + .name("cdcmssql-initial-snapshot")
          + .displayName("Generate an Initial Source Table Snapshot")
          + .description("Usually CDC only includes recent historic changes. Setting this property to true will cause a snapshot of the "
          + + "source table to be taken using the same schema as the CDC extracts. The snapshot time will be used as the starting point "
          + + "for extracting CDC changes.")
          + .allowableValues("true", "false")
          + .defaultValue("false")
          + .required(true)
          + .build();
          +
          + public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new PropertyDescriptor
          + .Builder().name("cdcmssql-full-snapshot-row-limit")
          + .displayName("Change Set Row Limit")
          + .description("If a very large change occurs on the source table, "
          + + "the generated change set may be too large to quickly merge into a destination system. "
          + + "Use this property to set a cut-off point where a full snapshot will be generated instead of a changeset. "
          + + "The fullsnapshot attribute will be set to true when this happens.")
          + .required(true)
          + .defaultValue("0")
          + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
          + .build();
          +
          + public static final Relationship REL_SUCCESS = new Relationship.Builder()
          + .name("success")
          + .description("Successfully created FlowFile from SQL query result set.")
          + .build();
          +
          + protected List<PropertyDescriptor> descriptors;
          + protected Set<Relationship> relationships;
          +
          + protected final Map<String, MSSQLTableInfo> schemaCache = new ConcurrentHashMap<String, MSSQLTableInfo>(1000);
          — End diff –

          Should this be configurable due to memory concerns? If each MSSQLTableInfo is likely to be small (just a few short strings or whatever), then this number is probably fine.
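One hedged way to act on this review comment is to bound the cache's size rather than (or in addition to) exposing the initial capacity as a processor property. A minimal sketch with plain JDK collections follows; the `BoundedCache` helper name is hypothetical, not part of the PR:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedCache {
    // LRU-evicting map: once maxEntries is exceeded, the least-recently-used
    // entry is dropped, bounding memory regardless of how many tables exist.
    static <K, V> Map<K, V> create(final int maxEntries) {
        return Collections.synchronizedMap(new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        });
    }
}
```

If each MSSQLTableInfo really is just a few short strings, a fixed cap like this is mostly defensive; the eviction only matters for databases with a very large number of CDC-enabled tables.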

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user patricker opened a pull request:

          https://github.com/apache/nifi/pull/2231

          NIFI-4521 MS SQL CDC Processor

          A new Processor + new Bundle for Microsoft SQL Server CDC reading.

          Processor works similar to a multi-table QueryDatabaseTable. It stores state on each table, and has some features for handling first run, where it can load snapshots of existing tables, and for handling large change sets.

          I created Unit Tests also.

          You can read more about Microsoft's table schemas for storing CDC data here:
          https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server
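The "stores state on each table" behavior described above, combined with the `initial.timestamp.{table_name}` dynamic properties from the diff, can be sketched as follows. This is illustrative only: the class name, the plain maps standing in for NiFi's cluster state and dynamic properties, and the state-key format are all assumptions, not the PR's actual code.

```java
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

public class PerTableCdcState {
    static final String INITIAL_TIMESTAMP_PROP_START = "initial.timestamp.";

    private final Map<String, String> stateMap = new HashMap<>();      // stands in for NiFi cluster state
    private final Map<String, String> dynamicProps = new HashMap<>();  // stands in for dynamic properties

    void addInitialTimestamp(String table, String timestamp) {
        dynamicProps.put(INITIAL_TIMESTAMP_PROP_START + table, timestamp);
    }

    void recordMaxTime(String table, Timestamp maxTime) {
        stateMap.put(table, String.valueOf(maxTime.getTime()));
    }

    // Stored state wins; the initial.timestamp.<table> property is only a
    // first-run fallback, matching the @DynamicProperty description.
    Timestamp startTimeFor(String table) {
        String stored = stateMap.get(table);
        if (stored != null) {
            return new Timestamp(Long.parseLong(stored));
        }
        String initial = dynamicProps.get(INITIAL_TIMESTAMP_PROP_START + table);
        return initial == null ? null : Timestamp.valueOf(initial);
    }
}
```

This mirrors the documented precedence: once a successful run has written state for a table, the initial timestamp property is ignored until state is cleared.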

              1. For all changes:
          • [x] Is there a JIRA ticket associated with this PR? Is it referenced
            in the commit message?
          • [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          • [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
          • [x] Is your initial contribution a single, squashed commit?
              2. For code changes:
          • [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          • [x] Have you written or updated unit tests to verify your changes?
          • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
          • [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          • [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          • [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
              3. For documentation related changes:
          • [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
              4. Note:
                Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/patricker/nifi NIFI-4521

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/2231.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2231



          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user patricker closed the pull request at:

          https://github.com/apache/nifi/pull/2230

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user patricker opened a pull request:

          https://github.com/apache/nifi/pull/2230

          NIFI-4521 MS SQL CDC Processor

          A new Processor + new Bundle for Microsoft SQL Server CDC reading.

          Processor works similar to a multi-table QueryDatabaseTable. It stores state on each table, and has some features for handling first run, where it can load snapshots of existing tables, and for handling large change sets.

          I created Unit Tests also.

          You can read more about Microsoft's table schemas for storing CDC data here:
          https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server
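The "handling large change sets" feature mentioned above pairs the Change Set Row Limit property with a fallback to a full snapshot. The cut-off test itself is trivial; this sketch is an assumption about the logic implied by the property description, and the method name is hypothetical:

```java
public class SnapshotCutoff {
    // A limit of 0 means "no limit" (the property's default); otherwise a
    // change set larger than the limit triggers a full snapshot of the base
    // table, and the fullsnapshot FlowFile attribute is set to true.
    static boolean useFullSnapshot(long changeSetRows, long rowLimit) {
        return rowLimit > 0 && changeSetRows > rowLimit;
    }
}
```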

              1. For all changes:
          • [x] Is there a JIRA ticket associated with this PR? Is it referenced
            in the commit message?
          • [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          • [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
          • [x] Is your initial contribution a single, squashed commit?
              2. For code changes:
          • [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          • [x] Have you written or updated unit tests to verify your changes?
          • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
          • [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          • [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          • [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
              3. For documentation related changes:
          • [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
              4. Note:
                Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/patricker/nifi NIFI-4521

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/2230.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2230


          commit a17c7464d0dea11bebd1a013a3b6fa04a0e57181
          Author: patricker <patricker@gmail.com>
          Date: 2017-09-18T04:43:57Z

          NIFI-4521 MS SQL CDC Processor



            People

            • Assignee: patricker (Peter Wicks)
            • Reporter: patricker (Peter Wicks)
            • Votes: 0
            • Watchers: 3