Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.4.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      It would be nice to integrate the Table API with the JDBC connectors so that rows of a table can be written directly to a JDBC database.
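
      For illustration, usage could look like the following minimal sketch, based on the builder shown in the documentation diff later in this thread. The "(?)" placeholder in the query and the Table.writeToSink(...) call are assumptions from the Flink 1.4 Table API, not verbatim from this issue.

        import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
        import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
        import org.apache.flink.table.api.Table;

        public class BooksSinkExample {
          // Writes the rows of the given Table into a JDBC database.
          static void writeBooks(Table table) {
            JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
              .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
              .setDBUrl("jdbc:derby:memory:ebookshop")
              .setQuery("INSERT INTO books (id) VALUES (?)") // "(?)" is an assumption
              .setFieldTypes(BasicTypeInfo.INT_TYPE_INFO)
              .build();
            table.writeToSink(sink); // Flink 1.4 Table API
          }
        }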

        Issue Links

          Activity

          fhueske Fabian Hueske added a comment -

          Implemented for 1.4.0 with 43e5a81d4e95f4f7b239ab90f12dfb66e7ae8a48

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3712

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3712

          Thanks for the update @haohui.

          I'll merge this PR. I have to change some of the type mappings to make it work with `JDBCOutputFormat`. I think it would be a good idea to redesign the type handling in `JDBCOutputFormat`. I'll open a JIRA for that.

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3712

          Thanks @fhueske! Updated the PR to address the comments.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132162563

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java —
          @@ -0,0 +1,62 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.functions.RuntimeContext;
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.runtime.state.FunctionInitializationContext;
          +import org.apache.flink.runtime.state.FunctionSnapshotContext;
          +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
          +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
          +import org.apache.flink.types.Row;
          +
          +class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
          + final JDBCOutputFormat outputFormat;
          +
          + JDBCSinkFunction(JDBCOutputFormat outputFormat) {
          +   this.outputFormat = outputFormat;
          + }
          +
          + @Override
          + public void invoke(Row value) throws Exception {
          +   outputFormat.writeRecord(value);
          + }
          +
          + @Override
          + public void snapshotState(FunctionSnapshotContext context) throws Exception {
          +   outputFormat.flush();
          + }
          +
          + @Override
          + public void initializeState(FunctionInitializationContext context) throws Exception {
          + }

          +
          + @Override
          + public void open(Configuration parameters) throws Exception {
          + super.open(parameters);
          + RuntimeContext ctx = getRuntimeContext();
          + outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
          — End diff –

          Add a call to `outputFormat.setRuntimeContext(ctx);` before calling `open()`.
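
          For clarity, the adjusted open() could look like this minimal sketch (assuming JDBCOutputFormat inherits setRuntimeContext() from RichOutputFormat):

            @Override
            public void open(Configuration parameters) throws Exception {
              super.open(parameters);
              RuntimeContext ctx = getRuntimeContext();
              // Suggested addition: hand the runtime context to the output format
              // before opening it.
              outputFormat.setRuntimeContext(ctx);
              outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
            }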

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132166866

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java —
          @@ -0,0 +1,87 @@
          +/* Apache License 2.0 header (identical to the header quoted above) */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.CompositeType;
          +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
          +
          +import java.sql.Types;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_INT_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.CHAR_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DATE_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
          +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
          +
          +class JDBCTypeUtil {
          + private static final Map<BasicTypeInfo<?>, Integer> BASIC_TYPES;
          +
          + static {
          + HashMap<BasicTypeInfo<?>, Integer> m = new HashMap<>();
          + m.put(STRING_TYPE_INFO, Types.VARCHAR);
          + m.put(BOOLEAN_TYPE_INFO, Types.BOOLEAN);
          + m.put(BYTE_TYPE_INFO, Types.TINYINT);
          + m.put(SHORT_TYPE_INFO, Types.SMALLINT);
          + m.put(INT_TYPE_INFO, Types.INTEGER);
          + m.put(LONG_TYPE_INFO, Types.BIGINT);
          + m.put(FLOAT_TYPE_INFO, Types.FLOAT);
          + m.put(DOUBLE_TYPE_INFO, Types.DOUBLE);
          + m.put(CHAR_TYPE_INFO, Types.SMALLINT);
          — End diff –

          `JDBCOutputFormat` will insert a `SMALLINT` by casting to `short`. This cast will fail for `Character`.

          Please double check the type assignment and align it with `JDBCOutputFormat.writeRecord()`.
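
          A minimal sketch of the failure mode (assuming the SMALLINT write path casts the field value to Short, as described above):

            public class CharCastDemo {
              public static void main(String[] args) {
                Object field = Character.valueOf('x'); // value of a CHAR_TYPE_INFO field
                short s = (Short) field;               // throws ClassCastException at runtime
              }
            }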

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132141735

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java —
          @@ -0,0 +1,97 @@
          +/* Apache License 2.0 header (identical to the header quoted above) */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.BatchTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + *
          + * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink.
          + * However, one common use case is to run idempotent queries (e.g., <code>REPLACE</code> or
          + * <code>INSERT OVERWRITE</code>) to upsert into the database and achieve exactly-once semantic.</p>
          + */
          +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
          + private final JDBCSinkFunction sink;
          +
          + private String[] fieldNames;
          + private TypeInformation[] fieldTypes;
          +
          + JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
          +   this.sink = new JDBCSinkFunction(outputFormat);
          + }
          +
          + public static JDBCAppendTableSinkBuilder builder() {
          +   return new JDBCAppendTableSinkBuilder();
          + }
          +
          + @Override
          + public void emitDataStream(DataStream<Row> dataStream) {
          +   dataStream.addSink(sink);
          + }
          +
          + @Override
          + public void emitDataSet(DataSet<Row> dataSet) {
          +   dataSet.output(sink.outputFormat);
          + }
          +
          + @Override
          + public TypeInformation<Row> getOutputType() {
          +   return new RowTypeInfo(fieldTypes, fieldNames);
          + }
          +
          + @Override
          + public String[] getFieldNames() {
          +   return fieldNames;
          + }
          +
          + @Override
          + public TypeInformation<?>[] getFieldTypes() {
          +   return fieldTypes;
          + }

          +
          + @Override
          + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
          + int[] types = sink.outputFormat.getTypesArray();
          + Preconditions.checkArgument(fieldTypes.length == types.length);
          — End diff –

          Give a detailed error message like: `"Schema of output table incompatible with JDBCAppendTableSink: expected [type1, type2, type3, ...], actual [type1, type2]"`
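
          One possible phrasing of the suggested check, as a sketch (the message format is an assumption):

            import java.util.Arrays;

            import org.apache.flink.util.Preconditions;

            class SchemaCheck {
              static void checkSchema(int[] expectedTypes, int[] actualTypes) {
                Preconditions.checkArgument(expectedTypes.length == actualTypes.length,
                  "Schema of output table incompatible with JDBCAppendTableSink: expected %s, actual %s",
                  Arrays.toString(expectedTypes), Arrays.toString(actualTypes));
              }
            }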

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132146531

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java —
          @@ -0,0 +1,97 @@
          +/* Apache License 2.0 header (identical to the header quoted above) */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.util.Preconditions;
          +
          +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
          +
          +/**
          + * A builder to configure and build the JDBCAppendTableSink.
          + */
          +public class JDBCAppendTableSinkBuilder {
          + private String username;
          + private String password;
          + private String driverName;
          + private String dbURL;
          + private String query;
          + private int batchSize = DEFAULT_BATCH_INTERVAL;
          + private TypeInformation<?>[] fieldTypes;
          +
          + public JDBCAppendTableSinkBuilder setUsername(String username) {
          +   this.username = username;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setPassword(String password) {
          +   this.password = password;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
          +   this.driverName = drivername;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
          +   this.dbURL = dbURL;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setQuery(String query) {
          +   this.query = query;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
          +   this.batchSize = batchSize;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation<?>... fieldTypes) {
          +   this.fieldTypes = fieldTypes;
          +   return this;
          + }

          +
          + /**
          + * Finalizes the configuration and checks validity.
          + *
          + * @return Configured JDBCOutputFormat
          + */
          + public JDBCAppendTableSink build() {
          + Preconditions.checkNotNull(fieldTypes, "Row type is unspecified");
          — End diff –

          Change the error message to `"Types of the query parameters are not specified. Please specify types using the setFieldTypes() method."` (or `setParameterTypes()` if we rename the method).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132168767

          — Diff: docs/dev/table/sourceSinks.md —
          @@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
          Provided TableSinks
          -------------------

          -*TODO*
          +### JDBCAppendSink
          +
          +<code>JDBCAppendSink</code> allows you to bridge the data stream to the JDBC driver. The sink only supports append-only data. It does not support retractions and upserts from Flink's perspectives. However, you can customize the query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to implement upsert inside the database.
          +
          +To use the JDBC sink, you have to add the JDBC connector dependency (<code>flink-jdbc</code>) to your project. Then you can create the sink using <code>JDBCAppendSinkBuilder</code>:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +
          {% highlight java %}
          +
          +JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
          + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
          + .setDBUrl("jdbc:derby:memory:ebookshop")
          + .setQuery("INSERT INTO books (id) VALUES ")
          + .setFieldTypes(new TypeInformation<?>[] {INT_TYPE_INFO})
          — End diff –

          Change to `setParameterTypes()` if we rename the method.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132157359

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java —
          @@ -0,0 +1,97 @@
          +/* Apache License 2.0 header (identical to the header quoted above) */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.util.Preconditions;
          +
          +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
          +
          +/**
          + * A builder to configure and build the JDBCAppendTableSink.
          + */
          +public class JDBCAppendTableSinkBuilder {
          + private String username;
          + private String password;
          + private String driverName;
          + private String dbURL;
          + private String query;
          + private int batchSize = DEFAULT_BATCH_INTERVAL;
          + private TypeInformation<?>[] fieldTypes;
          +
          + public JDBCAppendTableSinkBuilder setUsername(String username) {
          +   this.username = username;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setPassword(String password) {
          +   this.password = password;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
          +   this.driverName = drivername;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
          +   this.dbURL = dbURL;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setQuery(String query) {
          +   this.query = query;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
          +   this.batchSize = batchSize;
          +   return this;
          + }

          +
          + public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation<?>... fieldTypes) {
          — End diff –

          Should we rename the method to `setParameterTypes()` and offer an overloaded version `setParameterTypes(int... paramTypes)` that allows to specify types as `java.sql.Types`?
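
          A sketch of the proposed rename and overload (the method names and the parameterTypes field are suggestions, not merged code):

            import org.apache.flink.api.common.typeinfo.TypeInformation;

            public class JDBCAppendTableSinkBuilder {
              private TypeInformation<?>[] fieldTypes;
              private int[] parameterTypes; // java.sql.Types codes (hypothetical field)

              /** Specifies the query parameter types via Flink type information. */
              public JDBCAppendTableSinkBuilder setParameterTypes(TypeInformation<?>... types) {
                this.fieldTypes = types;
                return this;
              }

              /** Overload: specifies the parameter types directly as java.sql.Types codes. */
              public JDBCAppendTableSinkBuilder setParameterTypes(int... paramTypes) {
                this.parameterTypes = paramTypes;
                return this;
              }
            }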

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132160714

          — Diff: docs/dev/table/sourceSinks.md —
          @@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
          Provided TableSinks
          -------------------

          -*TODO*
          +### JDBCAppendSink
          +
          +<code>JDBCAppendSink</code> allows you to bridge the data stream to the JDBC driver. The sink only supports append-only data. It does not support retractions and upserts from Flink's perspectives. However, you can customize the query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to implement upsert inside the database.
          +
          +To use the JDBC sink, you have to add the JDBC connector dependency (<code>flink-jdbc</code>) to your project. Then you can create the sink using <code>JDBCAppendSinkBuilder</code>:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +
          {% highlight java %}
          +
          +JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
          + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
          + .setDBUrl("jdbc:derby:memory:ebookshop")
          + .setQuery("INSERT INTO books (id) VALUES ")
          + .setFieldTypes(new TypeInformation<?>[] {INT_TYPE_INFO})
          — End diff –

          use varargs?
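
          Since setFieldTypes() is declared with varargs (TypeInformation<?>...), the documentation example could drop the explicit array, e.g. (sketch; the trailing "(?)" in the query is assumed):

            JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
              .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
              .setDBUrl("jdbc:derby:memory:ebookshop")
              .setQuery("INSERT INTO books (id) VALUES (?)")
              .setFieldTypes(INT_TYPE_INFO) // varargs, no array wrapper needed
              .build();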

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132160738

          — Diff: docs/dev/table/sourceSinks.md —
          @@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
          Provided TableSinks
          -------------------

          -*TODO*
          +### JDBCAppendSink
          +
          +<code>JDBCAppendSink</code> allows you to bridge the data stream to the JDBC driver. The sink only supports append-only data. It does not support retractions and upserts from Flink's perspectives. However, you can customize the query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to implement upsert inside the database.
          +
          +To use the JDBC sink, you have to add the JDBC connector dependency (<code>flink-jdbc</code>) to your project. Then you can create the sink using <code>JDBCAppendSinkBuilder</code>:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +
          {% highlight java %}
          +
          +JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
          + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
          + .setDBUrl("jdbc:derby:memory:ebookshop")
          + .setQuery("INSERT INTO books (id) VALUES ")
          + .setFieldTypes(new TypeInformation<?>[] {INT_TYPE_INFO})
          + .build();
          +
          {% endhighlight %}
          +</div>
          +
          +<div data-lang="scala" markdown="1">
          +
          {% highlight scala %}
          +val sink = JDBCAppendTableSink.builder()
          + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
          + .setDBUrl("jdbc:derby:memory:ebookshop")
          + .setQuery("INSERT INTO books (id) VALUES ")
          + .setFieldTypes(Array(INT_TYPE_INFO))
          — End diff –

          use varargs?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132146108

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java —
          @@ -0,0 +1,97 @@
          +/* Apache License 2.0 header (identical to the header quoted above) */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.util.Preconditions;
          +
          +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
          +
          +/**
          + * A builder to configure and build the JDBCAppendTableSink.
          + */
          +public class JDBCAppendTableSinkBuilder {
          + private String username;
          + private String password;
          + private String driverName;
          + private String dbURL;
          + private String query;
          + private int batchSize = DEFAULT_BATCH_INTERVAL;
          + private TypeInformation<?>[] fieldTypes;
          +
          + public JDBCAppendTableSinkBuilder setUsername(String username) {
          — End diff –

          Add JavaDocs to the public configuration methods.
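
          For illustration, a JavaDoc along these lines on one of the setters (the wording is a suggestion, not the merged text):

            /**
             * Sets the fully qualified class name of the JDBC driver,
             * e.g. "org.apache.derby.jdbc.EmbeddedDriver".
             *
             * @param drivername name of the JDBC driver class
             * @return this builder, for call chaining
             */
            public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
              this.driverName = drivername;
              return this;
            }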

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132141831

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java —
          @@ -0,0 +1,97 @@
          +/* Apache License 2.0 header (identical to the header quoted above) */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.BatchTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + *
          + * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink.
          + * However, one common use case is to run idempotent queries (e.g., <code>REPLACE</code> or
          + * <code>INSERT OVERWRITE</code>) to upsert into the database and achieve exactly-once semantic.</p>
          + */
          +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
          + private final JDBCSinkFunction sink;
          +
          + private String[] fieldNames;
          + private TypeInformation[] fieldTypes;
          +
          + JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
          +   this.sink = new JDBCSinkFunction(outputFormat);
          + }
          +
          + public static JDBCAppendTableSinkBuilder builder() {
          +   return new JDBCAppendTableSinkBuilder();
          + }
          +
          + @Override
          + public void emitDataStream(DataStream<Row> dataStream) {
          +   dataStream.addSink(sink);
          + }
          +
          + @Override
          + public void emitDataSet(DataSet<Row> dataSet) {
          +   dataSet.output(sink.outputFormat);
          + }
          +
          + @Override
          + public TypeInformation<Row> getOutputType() {
          +   return new RowTypeInfo(fieldTypes, fieldNames);
          + }
          +
          + @Override
          + public String[] getFieldNames() {
          +   return fieldNames;
          + }
          +
          + @Override
          + public TypeInformation<?>[] getFieldTypes() {
          +   return fieldTypes;
          + }

          +
          + @Override
          + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
          + int[] types = sink.outputFormat.getTypesArray();
          + Preconditions.checkArgument(fieldTypes.length == types.length);
          + for (int i = 0; i < types.length; ++i) {
          + Preconditions.checkArgument(JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
          — End diff –

          Add more details to the error message, like: `"Schema of output table incompatible with JDBCAppendTableSink: expected [type1, type2, type3, ...], actual [type1, type2]"`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132142854

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java —
          @@ -0,0 +1,97 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.BatchTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.util.Preconditions;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + *
          + * <p>Flink's fault-tolerance mechanisms guarantee that messages are delivered at least once to this sink.
          + * However, one common use case is to run idempotent queries (e.g., <code>REPLACE</code> or
          + * <code>INSERT OVERWRITE</code>) to upsert into the database and achieve exactly-once semantics.</p>
          + */
          +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
          + private final JDBCSinkFunction sink;
          +
          + private String[] fieldNames;
          + private TypeInformation[] fieldTypes;
          +
          + JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
          +   this.sink = new JDBCSinkFunction(outputFormat);
          + }
          +
          + public static JDBCAppendTableSinkBuilder builder() {
          +   return new JDBCAppendTableSinkBuilder();
          + }
          +
          + @Override
          + public void emitDataStream(DataStream<Row> dataStream) {
          +   dataStream.addSink(sink);
          + }
          +
          + @Override
          + public void emitDataSet(DataSet<Row> dataSet) {
          +   dataSet.output(sink.outputFormat);
          + }
          +
          + @Override
          + public TypeInformation<Row> getOutputType() {
          +   return new RowTypeInfo(fieldTypes, fieldNames);
          + }
          +
          + @Override
          + public String[] getFieldNames() {
          +   return fieldNames;
          + }
          +
          + @Override
          + public TypeInformation<?>[] getFieldTypes() {
          +   return fieldTypes;
          + }
          +
          + @Override
          + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
          +   int[] types = sink.outputFormat.getTypesArray();
          +   Preconditions.checkArgument(fieldTypes.length == types.length);
          +   for (int i = 0; i < types.length; ++i) {
          +     Preconditions.checkArgument(JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
          +       "Incompatible types between fields and JDBC format at " + i);
          +   }
          +
          +   JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
          — End diff –

          Passing the reference should be fine, but to be sure we could create a deep copy via `SerializationUtils.clone()`
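
          A minimal sketch of that defensive copy inside `configure()`, assuming `JDBCOutputFormat` is `Serializable` (it is shipped to the cluster with the sink) and that Apache Commons Lang 3 is on the classpath:

          ```
          import org.apache.commons.lang3.SerializationUtils;

          // Hypothetical variant of configure(): clone the output format so the
          // configured copy cannot share mutable state with this sink instance.
          @Override
          public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            JDBCAppendTableSink copy =
              new JDBCAppendTableSink(SerializationUtils.clone(sink.outputFormat));
            copy.fieldNames = fieldNames;
            copy.fieldTypes = fieldTypes;
            return copy;
          }
          ```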

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r132022891

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java —
          @@ -0,0 +1,85 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.BatchTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + */
          +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
          + private final JDBCSinkFunction sink;
          +
          + private String[] fieldNames;
          + private TypeInformation[] fieldTypes;
          +
          + JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
          +   this.sink = new JDBCSinkFunction(outputFormat);
          + }
          +
          + public static JDBCAppendTableSinkBuilder builder() {
          +   return new JDBCAppendTableSinkBuilder();
          + }
          +
          + @Override
          + public void emitDataStream(DataStream<Row> dataStream) {
          +   dataStream.addSink(sink);
          + }
          +
          + @Override
          + public void emitDataSet(DataSet<Row> dataSet) {
          +   dataSet.output(sink.outputFormat);
          + }
          +
          + @Override
          + public TypeInformation<Row> getOutputType() {
          +   return new RowTypeInfo(fieldTypes, fieldNames);
          + }
          +
          + @Override
          + public String[] getFieldNames() {
          +   return fieldNames;
          + }
          +
          + @Override
          + public TypeInformation<?>[] getFieldTypes() {
          +   return fieldTypes;
          + }
          +
          + @Override
          + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
          + JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
          — End diff –

          The `JDBCOutputFormat` is now constructed only via the `JDBCAppendTableSinkBuilder`, so the types should always match, but it is a good idea to add the checks to catch potential bugs.
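
          A sketch of what such a check inside `configure()` could look like, with an error message that names the offending field and both types (the message wording is illustrative; `JDBCTypeUtil` and `getTypesArray()` are from this PR):

          ```
          int[] types = sink.outputFormat.getTypesArray();
          Preconditions.checkArgument(fieldTypes.length == types.length,
            "Field count of the table (%s) does not match the JDBC format (%s).",
            fieldTypes.length, types.length);
          for (int i = 0; i < types.length; ++i) {
            int expected = JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]);
            Preconditions.checkArgument(expected == types[i],
              "Field %s: table declares SQL type %s but the JDBC format expects %s.",
              i, expected, types[i]);
          }
          ```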

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r128220251

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java —
          @@ -218,10 +225,7 @@ public void writeRecord(Row row) throws IOException {
          @Override
          public void close() throws IOException {
          try {

          - if (upload != null) {
          -   upload.executeBatch();
          -   upload.close();
          — End diff –

          we should `close()` the `PreparedStatement`
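
          A sketch of a `close()` along those lines, guarding each step so a failure while closing the statement does not leak the connection (the `upload`, `dbConn`, and `LOG` fields are the ones in `JDBCOutputFormat`; the exact exception handling is illustrative):

          ```
          @Override
          public void close() throws IOException {
            try {
              if (upload != null) {
                upload.executeBatch();  // flush any buffered rows
                upload.close();         // release the PreparedStatement
              }
            } catch (SQLException e) {
              throw new IOException("Error while closing statement.", e);
            } finally {
              upload = null;
              try {
                if (dbConn != null) {
                  dbConn.close();       // always release the JDBC connection
                }
              } catch (SQLException e) {
                LOG.warn("Error while closing connection.", e);
              } finally {
                dbConn = null;
              }
            }
          }
          ```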

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r128226313

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java —
          @@ -0,0 +1,79 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.ExecutionConfig;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.api.datastream.DataStreamSink;
          +import org.apache.flink.streaming.api.datastream.DataStreamSource;
          +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          +import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
          +import org.apache.flink.streaming.api.operators.StreamSource;
          +import org.apache.flink.types.Row;
          +
          +import org.junit.Test;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +
          +import java.io.IOException;
          +
          +import static org.junit.Assert.assertSame;
          +import static org.mockito.Matchers.any;
          +import static org.mockito.Mockito.doAnswer;
          +import static org.mockito.Mockito.mock;
          +
          +/**
          + * Test for JDBCAppendTableSink.
          + */
          +public class JDBCAppendTableSinkTest {
          + private static final String[] FIELD_NAMES = new String[] {"foo"};
          + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {
          +   BasicTypeInfo.STRING_TYPE_INFO
          + };
          + private static final RowTypeInfo ROW_TYPE = new RowTypeInfo(FIELD_TYPES, FIELD_NAMES);
          +
          + @Test
          + public void testAppendTableSink() throws IOException {
          + JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
          + .setDrivername("foo")
          + .setDBUrl("bar")
          + .setQuery("insert into %s (id) values ")
          + .setFieldTypes(FIELD_TYPES)
          + .build();
          +
          + StreamExecutionEnvironment env =
          + mock(StreamExecutionEnvironment.class);
          + doAnswer(new Answer() {
          + @Override
          + public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
          +   return invocationOnMock.getArguments()[0];
          + }
          + }).when(env).clean(any());
          +
          + TypeSerializer<Row> ts = ROW_TYPE.createSerializer(mock(ExecutionConfig.class));
          + FromElementsFunction<Row> func = new FromElementsFunction<>(ts, Row.of("foo"));
          + DataStream<Row> ds = new DataStreamSource<>(env, ROW_TYPE, new StreamSource<>(func), false, "foo");
          + DataStreamSink<Row> dsSink = ds.addSink(sink.getSink());
          — End diff –

          I think we should test the correctness of the `emitDataStream()` method.
          This could be done as follows:

          ```
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          DataStream<Row> ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
          sink.emitDataStream(ds);

          Collection<Integer> sinkIds = env.getStreamGraph().getSinkIDs();
          assertEquals(1, sinkIds.size());
          int sinkId = sinkIds.iterator().next();

          StreamSink planSink = (StreamSink)env.getStreamGraph().getStreamNode(sinkId).getOperator();
          assertSame(sink.getSink(), planSink.getUserFunction());
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r128219296

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java —
          @@ -0,0 +1,85 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.BatchTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + */
          +public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
          + private final JDBCSinkFunction sink;
          +
          + private String[] fieldNames;
          + private TypeInformation[] fieldTypes;
          +
          + JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
          +   this.sink = new JDBCSinkFunction(outputFormat);
          + }
          +
          + public static JDBCAppendTableSinkBuilder builder() {
          +   return new JDBCAppendTableSinkBuilder();
          + }
          +
          + @Override
          + public void emitDataStream(DataStream<Row> dataStream) {
          +   dataStream.addSink(sink);
          + }
          +
          + @Override
          + public void emitDataSet(DataSet<Row> dataSet) {
          +   dataSet.output(sink.outputFormat);
          + }
          +
          + @Override
          + public TypeInformation<Row> getOutputType() {
          +   return new RowTypeInfo(fieldTypes, fieldNames);
          + }
          +
          + @Override
          + public String[] getFieldNames() {
          +   return fieldNames;
          + }
          +
          + @Override
          + public TypeInformation<?>[] getFieldTypes() {
          +   return fieldTypes;
          + }
          +
          + @Override
          + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
          + JDBCAppendTableSink copy = new JDBCAppendTableSink(sink.outputFormat);
          — End diff –

          We could validate that the types of the `JDBCOutputFormat` match the `fieldTypes` which are provided by the optimizer.
          Or do you have concerns regarding such a check?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r128219589

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java —
          @@ -0,0 +1,97 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.util.Preconditions;
          +
          +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
          +
          +/**
          + * A builder to configure and build the JDBCAppendTableSink.
          + */
          +public class JDBCAppendTableSinkBuilder {
          + private String username;
          + private String password;
          + private String driverName;
          + private String dbURL;
          + private String query;
          + private int batchInterval = DEFAULT_BATCH_INTERVAL;
          + private TypeInformation<?>[] fieldTypes;
          +
          + public JDBCAppendTableSinkBuilder setUsername(String username) {
          +   this.username = username;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setPassword(String password) {
          +   this.password = password;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
          +   this.driverName = drivername;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
          +   this.dbURL = dbURL;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setQuery(String query) {
          +   this.query = query;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setBatchInterval(int batchInterval) {
          — End diff –

          IMO, interval has a temporal connotation. `batchInterval` -> `batchSize`?
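
          A sketch of the suggested rename (both the method and the `batchSize` field are the suggestion, not the existing API):

          ```
          /** Number of buffered rows after which the batch is written to the database. */
          public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
            this.batchSize = batchSize;  // hypothetical rename of the batchInterval field
            return this;
          }
          ```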

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r128221151

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java —
          @@ -0,0 +1,97 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.util.Preconditions;
          +
          +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
          +
          +/**
          + * A builder to configure and build the JDBCAppendTableSink.
          + */
          +public class JDBCAppendTableSinkBuilder {
          + private String username;
          + private String password;
          + private String driverName;
          + private String dbURL;
          + private String query;
          + private int batchInterval = DEFAULT_BATCH_INTERVAL;
          + private TypeInformation<?>[] fieldTypes;
          +
          + public JDBCAppendTableSinkBuilder setUsername(String username) {
          +   this.username = username;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setPassword(String password) {
          +   this.password = password;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
          +   this.driverName = drivername;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
          +   this.dbURL = dbURL;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setQuery(String query) {
          +   this.query = query;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setBatchInterval(int batchInterval) {
          +   this.batchInterval = batchInterval;
          +   return this;
          + }
          +
          + public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation<?>[] fieldTypes) {
          — End diff –

          Make `fieldTypes` a vararg for convenience?

          I would think that `java.sql.Types` would be more natural in the context of a JDBC sink but I'm open for `TypeInformation` as well.
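
          A sketch of the two variants being discussed: a vararg `TypeInformation` overload and an alternative taking raw `java.sql.Types` constants (the method names and the `sqlTypes` field are illustrative):

          ```
          /** Vararg variant: builder.setFieldTypes(STRING_TYPE_INFO, INT_TYPE_INFO). */
          public JDBCAppendTableSinkBuilder setFieldTypes(TypeInformation<?>... fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
          }

          /** Alternative taking java.sql.Types constants, e.g. Types.VARCHAR, Types.INTEGER. */
          public JDBCAppendTableSinkBuilder setSqlTypes(int... sqlTypes) {
            this.sqlTypes = sqlTypes;  // hypothetical field holding raw SQL type codes
            return this;
          }
          ```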

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r128218692

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java —
          @@ -0,0 +1,85 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.DataSet;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.BatchTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          — End diff –

          Add a comment that `exactly-once` can be achieved by idempotent insert operations?
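
          For instance (an illustrative MySQL-flavored example; the table and columns are made up), an idempotent statement makes replayed rows overwrite rather than duplicate earlier writes:

          ```
          // REPLACE is keyed on the primary key, so re-executing the same row after a
          // recovery overwrites the previous write instead of inserting a duplicate.
          String query = "REPLACE INTO word_counts (word, frequency) VALUES (?, ?)";
          ```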

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3712

          It would also be good to add the `JdbcTableSink` to the documentation (incl. an example) once the API is fixed.
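
          A sketch of the sort of snippet such documentation could include, using the builder API from this PR (the Derby driver/URL and the `wordCounts` table are placeholders; `writeToSink` is the generic Table API sink method):

          ```
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

          JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
            .setDBUrl("jdbc:derby:memory:example")
            .setQuery("INSERT INTO word_counts (word, frequency) VALUES (?, ?)")
            .setFieldTypes(new TypeInformation<?>[] {
              BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO })
            .build();

          // Emit the rows of a Table into the database via the sink.
          Table wordCounts = tEnv.scan("wordCounts");
          wordCounts.writeToSink(sink);
          ```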

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r126736740

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java —
          @@ -0,0 +1,128 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.functions.RuntimeContext;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.types.Row;
          +
          +import org.junit.After;
          +import org.junit.Test;
          +
          +import java.io.IOException;
          +import java.sql.Connection;
          +import java.sql.DriverManager;
          +import java.sql.PreparedStatement;
          +import java.sql.ResultSet;
          +
          +import static org.junit.Assert.assertArrayEquals;
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertFalse;
          +import static org.junit.Assert.assertNotSame;
          +import static org.mockito.Mockito.doReturn;
          +import static org.mockito.Mockito.mock;
          +
          +/**
          + * Test for JDBCTableSink.
          + */
          +public class JDBCTableSinkTest extends JDBCTestBase {
          + private static final String[] FIELD_NAMES = new String[] {"foo"};
          + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {
          +   BasicTypeInfo.STRING_TYPE_INFO
          + };
          +
          +
          + private JDBCOutputFormat jdbcOutputFormat;
          +
          + @After
          + public void tearDown() throws IOException {
          + if (jdbcOutputFormat != null) {
          +   jdbcOutputFormat.close();
          + }
          + jdbcOutputFormat = null;
          + }
          +
          + @Test
          + public void testFlush() throws Exception {
          — End diff –

          This rather tests the `JDBCOutputFormat.flush()` method. I would move (and adapt) this test to the `JDBCOutputFormatTest`.

          For the `JDBCTableSink` we need tests that check the configuration (which is done below) and a check that the `emitDataStream()` method (and the returned `SinkFunction`) works correctly. I would do this by calling `emitDataStream()` with a mocked `DataStream<Row>` and fetching the `SinkFunction` from the returned `DataStreamSink` (`sink.getTransformation().getOperator().getUserFunction()`).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r126718250

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java —
          @@ -0,0 +1,102 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.functions.RuntimeContext;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.runtime.state.FunctionInitializationContext;
          +import org.apache.flink.runtime.state.FunctionSnapshotContext;
          +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + */
          +public class JDBCTableSink extends RichSinkFunction<Row>
          — End diff –

          I would not extend `RichSinkFunction`. Although this might work in practice, I think this mixes the logical representation of a table (in the catalog and during optimization) with the actual runtime code. I'd rather implement a separate JdbcSinkFunction (within this file) and instantiate it in `emitDataStream()`.
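
A minimal sketch of that split, reusing the lifecycle of the existing `JDBCOutputFormat` (imports as in the diff above); this illustrates the suggestion and is not the merged implementation:

```
// Plain runtime sink that wraps the JDBCOutputFormat; the table sink would
// instantiate it in emitDataStream() instead of extending RichSinkFunction itself.
class JDBCSinkFunction extends RichSinkFunction<Row> {
    private final JDBCOutputFormat outputFormat;

    JDBCSinkFunction(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Forward the lifecycle calls that the OutputFormat contract expects.
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(Row value) throws Exception {
        outputFormat.writeRecord(value);
    }

    @Override
    public void close() throws Exception {
        outputFormat.close();
    }
}
```

`emitDataStream()` would then simply call `dataStream.addSink(new JDBCSinkFunction(outputFormat))`.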

          I also think that we should implement the `BatchTableSink` interface which would directly use the `JdbcOutputFormat`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r126724241

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java —
          @@ -0,0 +1,102 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.functions.RuntimeContext;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.runtime.state.FunctionInitializationContext;
          +import org.apache.flink.runtime.state.FunctionSnapshotContext;
          +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + */
          +public class JDBCTableSink extends RichSinkFunction<Row>
          + implements AppendStreamTableSink<Row>, CheckpointedFunction {
          + private final JDBCOutputFormat outputFormat;
          +
          + private String[] fieldNames;
          + private TypeInformation[] fieldTypes;
          +
+ public JDBCTableSink(JDBCOutputFormat outputFormat) {
+ this.outputFormat = outputFormat;
+ }
+
+ @Override
+ public void emitDataStream(DataStream<Row> dataStream) {
+ dataStream.addSink(this);
+ }
+
+ @Override
+ public TypeInformation<Row> getOutputType() {
+ return new RowTypeInfo(fieldTypes, fieldNames);
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return fieldTypes;
+ }
          +
          + @Override
          + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
          + JDBCTableSink copy = new JDBCTableSink(outputFormat);
          + copy.fieldNames = fieldNames;
          + copy.fieldTypes = fieldTypes;
          — End diff –

          We could validate that the types provided by the Table API are compatible with the types that the `JdbcOutputFormat` expects to avoid exceptions during execution.
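
A minimal sketch of such a check in `configure()`; restricting the sink to `BasicTypeInfo` fields is an assumption for illustration, not the actual `JDBCOutputFormat` contract:

```
// Fail fast when the sink is configured instead of throwing during execution.
private static void validateFieldTypes(TypeInformation<?>[] fieldTypes) {
    for (TypeInformation<?> type : fieldTypes) {
        if (!(type instanceof BasicTypeInfo)) {
            throw new IllegalArgumentException(
                "Field type " + type + " is not supported by the JDBC sink.");
        }
    }
}
```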

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3712

Yes, I think that would work well. We could provide a builder for the table sink and forward calls to the `JDBCOutputFormatBuilder`.

          Regarding the query template: we can also make it an optional parameter to override the standard `INSERT INTO ... VALUES` template if set.
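
A hypothetical builder along those lines; `builder()`, `setTableName()`, and `build()` are assumptions for illustration, while `setDrivername()`, `setDBUrl()`, and `setQuery()` mirror the existing `JDBCOutputFormatBuilder` methods:

```
JDBCTableSink sink = JDBCTableSink.builder()
    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    .setDBUrl("jdbc:derby:memory:ebookshop")
    .setTableName("books") // default: derive "INSERT INTO books (...) VALUES (...)"
    // Optional override of the standard insert template:
    .setQuery("REPLACE INTO books (id, title) VALUES (?, ?)")
    .build();
```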

If you want the sink to support upsert writes, you might want to implement `UpsertStreamTableSink` rather than `AppendStreamTableSink`. The `UpsertStreamTableSink` supports append writes as well as updates and deletes by exposing the unique key fields of a `Table`.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3712

It is important to have some flexibility on the query, as different SQL engines have slightly different syntax for DML.

For example, SQLite supports INSERT OR REPLACE while MySQL supports REPLACE INTO to upsert records with unique keys.
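
For reference, the kind of dialect-specific templates the sink would have to accept (table and column names are illustrative):

```
// Each engine spells "insert or update on conflict" differently, so the
// statement template must be configurable rather than hard-coded.
String ansiInsert   = "INSERT INTO books (id, title) VALUES (?, ?)";
String mysqlUpsert  = "REPLACE INTO books (id, title) VALUES (?, ?)";
String sqliteUpsert = "INSERT OR REPLACE INTO books (id, title) VALUES (?, ?)";
```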

I like the APIs you proposed. Do you think it addresses your concerns if the sink forwards the parameters to `JDBCOutputFormat` and constructs it internally?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r122972333

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java —
          @@ -0,0 +1,102 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.functions.RuntimeContext;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.runtime.state.FunctionInitializationContext;
          +import org.apache.flink.runtime.state.FunctionSnapshotContext;
          +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + */
          +public class JDBCTableSink extends RichSinkFunction<Row>
          — End diff –

I think it could also be easily extended to support batch output by implementing the `BatchTableSink` interface, with `emitDataSet(dataSet: DataSet[Row])` implemented as:
          ```
def emitDataSet(dataSet: DataSet[Row]): Unit = {
  dataSet.output(jdbcOutputFormat)
}
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r122971462

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java —
          @@ -0,0 +1,102 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.functions.RuntimeContext;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.runtime.state.FunctionInitializationContext;
          +import org.apache.flink.runtime.state.FunctionSnapshotContext;
          +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + */
          +public class JDBCTableSink extends RichSinkFunction<Row>
          — End diff –

          Rename to `JDBCAppendTableSink`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r122970495

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java —
          @@ -0,0 +1,102 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.functions.RuntimeContext;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.runtime.state.FunctionInitializationContext;
          +import org.apache.flink.runtime.state.FunctionSnapshotContext;
          +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
          +import org.apache.flink.table.sinks.AppendStreamTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +/**
          + * An at-least-once Table sink for JDBC.
          + */
          +public class JDBCTableSink extends RichSinkFunction<Row>
          + implements AppendStreamTableSink<Row>, CheckpointedFunction {
          + private final JDBCOutputFormat outputFormat;
          +
          + private String[] fieldNames;
          + private TypeInformation[] fieldTypes;
          +
          + public JDBCTableSink(JDBCOutputFormat outputFormat) {
          — End diff –

What do you think about not exposing the `JDBCOutputFormat` to the user, but configuring it internally?

Of course we would need many of the configuration parameters (user, password, driver, DB URL, and table name). Users could either specify the field names of the table to write to (fields are mapped by position) or not (we use the field names of the `Table` to emit). From this information we can construct a parameterized insert query: `INSERT INTO $table ($f1, $f2, $f3) VALUES (?, ?, ?)`. The field types are automatically provided by the `configure()` call.
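
A minimal sketch of deriving that template from the configured schema (identifier quoting is omitted and the helper name is hypothetical; uses Java 8's `String.join` and `java.util.Collections`):

```
// Builds "INSERT INTO $table ($f1, $f2, ...) VALUES (?, ?, ...)" from the
// field names passed to configure().
static String buildInsertQuery(String table, String[] fieldNames) {
    String columns = String.join(", ", fieldNames);
    String params = String.join(", ", Collections.nCopies(fieldNames.length, "?"));
    return "INSERT INTO " + table + " (" + columns + ") VALUES (" + params + ")";
}
```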

          This would be a tighter integration with the Table API (using provided field types and possibly field names).
          Does this work for your use case or do you need the flexibility of specifying your own query?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3712

          Thanks @zentol!
          I'll have a look at it as well.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3712

          +1.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r120765156

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java —
          @@ -202,14 +202,20 @@ public void writeRecord(Row row) throws IOException {
          upload.addBatch();
          batchCount++;
if (batchCount >= batchInterval) {
- upload.executeBatch();
- batchCount = 0;
+ flush();
}
} catch (SQLException | IllegalArgumentException e) {
throw new IllegalArgumentException("writeRecord() failed", e);
}
}

          + void flush() throws SQLException {
          + if (upload != null) {
          + upload.executeBatch();
          — End diff –

          It is a synchronous call. It will throw `SQLException` and abort the sink. The behavior has not been changed.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r120575757

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java —
          @@ -0,0 +1,79 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.runtime.state.FunctionSnapshotContext;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.types.Row;
          +
          +import org.junit.Test;
          +
          +import static org.junit.Assert.assertArrayEquals;
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertNotSame;
          +import static org.mockito.Mockito.mock;
          +import static org.mockito.Mockito.verify;
          +
          +/**
          + * Test for JDBCTableSink.
          + */
          +public class JDBCTableSinkTest {
+ private static final String[] FIELD_NAMES = new String[] {"foo"};
+ private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {
+ BasicTypeInfo.STRING_TYPE_INFO
+ };
+
+ @Test
+ public void testOutputSink() throws Exception {
+ JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
+ JDBCTableSink sink = new JDBCTableSink(outputFormat);
+ @SuppressWarnings("unchecked")
+ DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
+ sink.emitDataStream(dataStream);
+ verify(dataStream).addSink(sink);
+ }
          +
          + @Test
          + public void testFlush() throws Exception {
          + JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
          + JDBCTableSink sink = new JDBCTableSink(outputFormat);
          + @SuppressWarnings("unchecked")
          + DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
          + sink.emitDataStream(dataStream);
          + sink.snapshotState(mock(FunctionSnapshotContext.class));
          + verify(dataStream).addSink(sink);
          + verify(outputFormat).flush();
          — End diff –

Let's not use mocking for this test. Just create an actual format/sink, give it N values where N < batchSize, verify they haven't been written yet, call flush, and verify they were written.
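
A sketch of that test; the `books` table, the `JDBCTestBase` constants, and the `countRows()` helper are assumptions for illustration:

```
@Test
public void testFlushWritesBufferedRows() throws Exception {
    JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(DRIVER_CLASS)
        .setDBUrl(DB_URL)
        .setQuery("INSERT INTO books (id) VALUES (?)")
        .setBatchInterval(10) // larger than the number of rows we write
        .finish();
    format.open(0, 1);

    for (int i = 0; i < 5; i++) { // N = 5 < batchSize = 10
        format.writeRecord(Row.of(i));
    }
    assertEquals(0, countRows("books")); // nothing executed yet
    format.flush();
    assertEquals(5, countRows("books")); // flush() ran the buffered batch
    format.close();
}
```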

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r120576118

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java —
          @@ -202,14 +202,20 @@ public void writeRecord(Row row) throws IOException {
          upload.addBatch();
          batchCount++;
if (batchCount >= batchInterval) {
- upload.executeBatch();
- batchCount = 0;
+ flush();
}
} catch (SQLException | IllegalArgumentException e) {
throw new IllegalArgumentException("writeRecord() failed", e);
}
}

          + void flush() throws SQLException {
          + if (upload != null) {
          + upload.executeBatch();
          — End diff –

It's been a while since I worked with JDBC; I take it this is a synchronous call? What happens if this call fails?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r120576032

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java —
          @@ -0,0 +1,79 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.runtime.state.FunctionSnapshotContext;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.types.Row;
          +
          +import org.junit.Test;
          +
          +import static org.junit.Assert.assertArrayEquals;
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertNotSame;
          +import static org.mockito.Mockito.mock;
          +import static org.mockito.Mockito.verify;
          +
          +/**
          + * Test for JDBCTableSink.
          + */
          +public class JDBCTableSinkTest {
+ private static final String[] FIELD_NAMES = new String[] {"foo"};
+ private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {
+ BasicTypeInfo.STRING_TYPE_INFO
+ };
          +
          + @Test
          + public void testOutputSink() throws Exception {
          + JDBCOutputFormat outputFormat = mock(JDBCOutputFormat.class);
          + JDBCTableSink sink = new JDBCTableSink(outputFormat);
          + @SuppressWarnings("unchecked")
          + DataStream<Row> dataStream = (DataStream<Row>) mock(DataStream.class);
          + sink.emitDataStream(dataStream);
          + verify(dataStream).addSink(sink);
          — End diff –

You don't have to test this, as it is not a detail of the `JDBCTableSink` but of the Table API.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3712

          @fhueske @zentol can you please take another look? Thanks.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3712

          Correct me if I'm wrong – will something like the following work?

          ```
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ outputFormat.flush();
+ }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3712

          Great, just wanted to make sure we're on the same page.

          Why did you revert the usage of the GenericWriteAheadSink? Now we're back to where we started, not having any guarantee that data is written when a checkpoint is being completed.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3712

          I think an at-least-once sink with support for upserts would also be very useful.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3712

Thanks for the pointer to the prototype!

          > Do you intend to provide exactly-once guarantees for arbitrary updates?

As I think about it a little more, it might make sense to start with at-least-once semantics first. In practice we make the JDBC call idempotent using `INSERT IF NOT EXISTS`.

The exactly-once part is trickier; let's separate it out for now. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r113411053

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java —
          @@ -0,0 +1,71 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.ExecutionConfig;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
          +import org.apache.flink.types.Row;
          +import org.junit.Test;
          +import org.mockito.Mockito;
          +
          +import static org.junit.Assert.assertArrayEquals;
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertNotSame;
          +import static org.mockito.Matchers.anyString;
          +import static org.mockito.Matchers.same;
          +import static org.mockito.Mockito.mock;
          +import static org.mockito.Mockito.verify;
          +
          +public class JDBCTableSinkTest {
+ private static final String[] FIELD_NAMES = new String[] {"foo"};
          + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {
          — End diff –

Remove the space after `[]` and move `STRING_TYPE_INFO` to this line.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r113412006

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java —
          @@ -0,0 +1,91 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
          +import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
          +import org.apache.flink.table.sinks.StreamTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +import java.io.IOException;
          +import java.util.UUID;
          +
          +public class JDBCTableSink extends GenericWriteAheadSink<Row> implements StreamTableSink<Row> {
          + private final JDBCOutputFormat outputFormat;
          + private final CheckpointCommitter committer;
          + private final String[] fieldNames;
          + private final TypeInformation[] fieldTypes;
          +
          + public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
          + JDBCOutputFormat outputFormat, String[] fieldNames,
          + TypeInformation[] fieldTypes) throws Exception {
          + super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
          + this.outputFormat = outputFormat;
          + this.committer = committer;
          + this.fieldNames = fieldNames;
          + this.fieldTypes = fieldTypes;
          + }
          +
          + @Override
          + public void emitDataStream(DataStream<Row> dataStream) {
          + dataStream.transform("JDBC Sink", getOutputType(), this);
          + }
          +
          + @Override
          + public TypeInformation<Row> getOutputType() {
          + return new RowTypeInfo(fieldTypes, fieldNames);
          + }
          +
          + @Override
          + public String[] getFieldNames() {
          + return fieldNames;
          + }
          +
          + @Override
          + public TypeInformation<?>[] getFieldTypes() {
          + return fieldTypes;
          + }
          +
          + @Override
          + public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
          + try {
          + return new JDBCTableSink(committer, serializer, outputFormat, fieldNames, fieldTypes);
          + } catch (Exception e) {
          + LOG.warn("Failed to create a copy of the sink.", e);
          + return null;
          + }
          + }
          +
          + @Override
          + protected boolean sendValues(Iterable<Row> value, long timestamp) throws Exception {
          + for (Row r : value) {
          + try {
          + outputFormat.writeRecord(r);
          — End diff –

          This doesn't guarantee in any way that the values are actually being sent; you need some kind of flushing functionality for this to work properly.

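          To make the flushing concern concrete, here is a minimal sketch of `sendValues` with an explicit flush step. The `flush()` method on `JDBCOutputFormat` is an assumption for illustration; at this point the format only executed its internal batch once a batch-size threshold was reached.

              @Override
              protected boolean sendValues(Iterable<Row> values, long timestamp) throws Exception {
                  for (Row row : values) {
                      // writeRecord only adds the row to the pending JDBC batch;
                      // nothing is guaranteed to reach the database yet.
                      outputFormat.writeRecord(row);
                  }
                  // Hypothetical: execute the pending batch so every row of this
                  // checkpoint is in the database before the checkpoint is committed.
                  outputFormat.flush();
                  return true;
              }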
          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r113410894

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java —
          @@ -0,0 +1,91 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
          +import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
          +import org.apache.flink.table.sinks.StreamTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +import java.io.IOException;
          +import java.util.UUID;
          +
          +public class JDBCTableSink extends GenericWriteAheadSink<Row> implements StreamTableSink<Row> {
          + private final JDBCOutputFormat outputFormat;
          + private final CheckpointCommitter committer;
          + private final String[] fieldNames;
          + private final TypeInformation[] fieldTypes;
          +
          + public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
          + JDBCOutputFormat outputFormat, String[] fieldNames,
          + TypeInformation[] fieldTypes) throws Exception {
          — End diff –

          Like the Cassandra sink, the `fieldNames`/`fieldTypes` constructor parameters should be removed to provide a clean API to the user.

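          For comparison, a minimal sketch of the cleaner API suggested here, modeled on the Cassandra sink: the user passes only the output format, and the Table API injects field names and types through `configure()`. Everything except the Flink interfaces is illustrative, and the emit path is deliberately simplified.

              import org.apache.flink.api.common.typeinfo.TypeInformation;
              import org.apache.flink.api.java.typeutils.RowTypeInfo;
              import org.apache.flink.streaming.api.datastream.DataStream;
              import org.apache.flink.table.sinks.StreamTableSink;
              import org.apache.flink.table.sinks.TableSink;
              import org.apache.flink.types.Row;

              public class JDBCTableSink implements StreamTableSink<Row> {
                  private final JDBCOutputFormat outputFormat;
                  private String[] fieldNames;
                  private TypeInformation<?>[] fieldTypes;

                  public JDBCTableSink(JDBCOutputFormat outputFormat) {
                      this.outputFormat = outputFormat;
                  }

                  @Override
                  public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
                      // The Table API calls this with the schema of the table being emitted,
                      // so the user never has to spell out names and types manually.
                      JDBCTableSink copy = new JDBCTableSink(outputFormat);
                      copy.fieldNames = fieldNames;
                      copy.fieldTypes = fieldTypes;
                      return copy;
                  }

                  @Override
                  public String[] getFieldNames() {
                      return fieldNames;
                  }

                  @Override
                  public TypeInformation<?>[] getFieldTypes() {
                      return fieldTypes;
                  }

                  @Override
                  public TypeInformation<Row> getOutputType() {
                      return new RowTypeInfo(fieldTypes, fieldNames);
                  }

                  @Override
                  public void emitDataStream(DataStream<Row> dataStream) {
                      // Simplification for the sketch: this path is not checkpoint-aware,
                      // which is exactly the concern raised elsewhere in this thread.
                      dataStream.writeUsingOutputFormat(outputFormat);
                  }
              }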
          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r113412806

          — Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSink.java —
          @@ -0,0 +1,91 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
          +import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
          +import org.apache.flink.table.sinks.StreamTableSink;
          +import org.apache.flink.table.sinks.TableSink;
          +import org.apache.flink.types.Row;
          +
          +import java.io.IOException;
          +import java.util.UUID;
          +
          +public class JDBCTableSink extends GenericWriteAheadSink<Row> implements StreamTableSink<Row> {
          + private final JDBCOutputFormat outputFormat;
          + private final CheckpointCommitter committer;
          + private final String[] fieldNames;
          + private final TypeInformation[] fieldTypes;
          +
          + public JDBCTableSink(CheckpointCommitter committer, TypeSerializer<Row> serializer,
          — End diff –

          I would propose either adding a JDBCCheckpointCommitter that cooperates with the sink (as seen in this [prototype](https://github.com/zentol/flink/commit/92e878b59a7371ac9cad402d0b009c7439cd1900)) or omitting the `CheckpointCommitter` argument and providing a dummy to the `GenericWriteAheadSink`.

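          A sketch of the second option, a dummy committer handed to `GenericWriteAheadSink`. The overridden method set follows the `CheckpointCommitter` base class of that time and should be treated as illustrative.

              import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;

              // No-op committer: keeps no external bookkeeping, so recovered
              // checkpoints are always re-sent (at-least-once semantics).
              public class NoOpCheckpointCommitter extends CheckpointCommitter {
                  @Override
                  public void open() throws Exception {}

                  @Override
                  public void close() throws Exception {}

                  @Override
                  public void createResource() throws Exception {}

                  @Override
                  public void commitCheckpoint(int subtaskIdx, long checkpointID) throws Exception {}

                  @Override
                  public boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) throws Exception {
                      // Never report a checkpoint as committed, so pending data is
                      // replayed after a failure rather than silently dropped.
                      return false;
                  }
              }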
          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3712#discussion_r113410993

          — Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSinkTest.java —
          @@ -0,0 +1,71 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.api.java.io.jdbc;
          +
          +import org.apache.flink.api.common.ExecutionConfig;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
          +import org.apache.flink.types.Row;
          +import org.junit.Test;
          +import org.mockito.Mockito;
          +
          +import static org.junit.Assert.assertArrayEquals;
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertNotSame;
          +import static org.mockito.Matchers.anyString;
          +import static org.mockito.Matchers.same;
          +import static org.mockito.Mockito.mock;
          +import static org.mockito.Mockito.verify;
          +
          +public class JDBCTableSinkTest {
          + private static final String[] FIELD_NAMES = new String[] {"foo"};
          — End diff –

          remove space after `[]`.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3712

          The `GenericWriteAheadSink` would work for this. It can be implemented just like the `CassandraWriteAheadSink`. Since we can also use transactions, it can even be a bit more sophisticated.

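          A sketch of what the transactional variant could look like inside `sendValues`, assuming the sink keeps a JDBC `connection` and a prepared `upsertStatement` as fields (both illustrative, not part of the PR):

              import java.sql.SQLException;

              @Override
              protected boolean sendValues(Iterable<Row> values, long timestamp) throws Exception {
                  connection.setAutoCommit(false);
                  try {
                      for (Row row : values) {
                          for (int i = 0; i < row.getArity(); i++) {
                              upsertStatement.setObject(i + 1, row.getField(i));
                          }
                          upsertStatement.addBatch();
                      }
                      upsertStatement.executeBatch();
                      // All rows of the checkpoint become visible atomically.
                      connection.commit();
                      return true;
                  } catch (SQLException e) {
                      // Discard the partial batch; the write-ahead log will retry.
                      connection.rollback();
                      return false;
                  }
              }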
          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3712

          There is `GenericWriteAheadSink`, which buffers elements and writes them out when a checkpoint completes; the Cassandra sink is built on it. I think this needs some more thought, maybe a design outline on the Jira issue. I'm also not 100% sure that the generic write-ahead sink will work for this. @zentol might have a better answer, though.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3712

          Thanks for the review. I was not aware that OutputFormats are not integrated with Flink's checkpointing mechanism.

          To address this problem, maybe we can do something similar to the `FlinkKafkaProducerBase`? What do you think, @zentol and @aljoscha?

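          For illustration, a minimal sketch of that approach: a checkpointed sink function that flushes pending writes when a checkpoint barrier arrives, in the spirit of `FlinkKafkaProducerBase`. The `flush()` call on `JDBCOutputFormat` is again an assumption, and opening/closing the format is omitted for brevity.

              import org.apache.flink.runtime.state.FunctionInitializationContext;
              import org.apache.flink.runtime.state.FunctionSnapshotContext;
              import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
              import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
              import org.apache.flink.types.Row;

              public class FlushingJdbcSink extends RichSinkFunction<Row> implements CheckpointedFunction {
                  private final JDBCOutputFormat outputFormat;

                  public FlushingJdbcSink(JDBCOutputFormat outputFormat) {
                      this.outputFormat = outputFormat;
                  }

                  @Override
                  public void invoke(Row value) throws Exception {
                      // Rows are only buffered in the output format's JDBC batch here.
                      outputFormat.writeRecord(value);
                  }

                  @Override
                  public void snapshotState(FunctionSnapshotContext context) throws Exception {
                      // Hypothetical: drain the batch before the checkpoint completes,
                      // so no record that precedes the barrier can be lost.
                      outputFormat.flush();
                  }

                  @Override
                  public void initializeState(FunctionInitializationContext context) throws Exception {
                      // Nothing to restore: flushed rows are already in the database.
                  }
              }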
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3712

          Hi @haohui, I think a JdbcTableSink would be a great feature!

          However, there is a big issue with wrapping the `JdbcOutputFormat`. OutputFormats are not integrated with Flink's checkpointing mechanism. The `JdbcOutputFormat` buffers rows to write them out in batches. Rows that arrived before the last completed checkpoint but are still sitting in the buffer will be lost in case of a failure, because they will not be replayed.

          The JdbcTableSink should be integrated with Flink's checkpointing mechanism. In a nutshell, it should buffer records and commit them to the database when a checkpoint is taken. I think we need to think a bit more about a proper design for this feature. @zentol and @aljoscha might have some thoughts on this as well as they are more familiar with the implementation of checkpoint-aware sinks.

          What do you think?

          githubbot ASF GitHub Bot added a comment -

          GitHub user haohui opened a pull request:

          https://github.com/apache/flink/pull/3712

          FLINK-6281 Create TableSink for JDBC.

          This PR implements the `StreamTableSink` interface for the JDBC connectors so that the streaming SQL APIs can directly interact with them.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/haohui/flink FLINK-6281

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3712.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3712


          commit 2bfd014bedc5bd64f346652dfb5ddb41cc36cc3f
          Author: Haohui Mai <wheat9@apache.org>
          Date: 2017-04-12T06:56:56Z

          FLINK-6281 Create TableSink for JDBC.



            People

            • Assignee:
              wheat9 Haohui Mai
              Reporter:
              wheat9 Haohui Mai
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development