Flink / FLINK-5714

Use a builder pattern for creating CsvTableSource

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      Right now, the constructor of the CsvTableSource can take up to 9 parameters. In Scala this is less of a problem thanks to default parameter values, but Java does not support them.

      I propose to have a builder pattern here:

      CsvTableSource
      .builder()
      .field("myfield", Types.STRING)
      .field("myfield2", Types.INT)
      .quoteCharacter(';')
      .build()
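
The proposed fluent API can be sketched in Java, the language whose lack of default parameter values motivates this change. This is a hypothetical, simplified sketch with illustrative names (`CsvSourceConfig`, string-typed fields), not the actual Flink classes: each setter returns the builder so calls can be chained, and `build()` validates the required settings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative stand-in for the real CsvTableSource; field types are plain
// strings here to keep the sketch self-contained.
class CsvSourceConfig {
    final String path;
    final List<String> fieldNames;
    final List<String> fieldTypes;
    final Character quoteCharacter; // null means "no quoting"

    private CsvSourceConfig(Builder b) {
        this.path = b.path;
        this.fieldNames = Collections.unmodifiableList(b.fieldNames);
        this.fieldTypes = Collections.unmodifiableList(b.fieldTypes);
        this.quoteCharacter = b.quoteCharacter;
    }

    static Builder builder() {
        return new Builder();
    }

    static class Builder {
        private String path;
        private final List<String> fieldNames = new ArrayList<>();
        private final List<String> fieldTypes = new ArrayList<>();
        private Character quoteCharacter;

        Builder path(String path) { this.path = path; return this; }

        // Adds one field; name and type are kept in parallel lists.
        Builder field(String name, String type) {
            fieldNames.add(name);
            fieldTypes.add(type);
            return this;
        }

        Builder quoteCharacter(char quote) { this.quoteCharacter = quote; return this; }

        // Required settings are checked once, at the end of the chain.
        CsvSourceConfig build() {
            if (path == null) {
                throw new IllegalStateException("Path must be set.");
            }
            if (fieldNames.isEmpty()) {
                throw new IllegalStateException("At least one field must be defined.");
            }
            return new CsvSourceConfig(this);
        }
    }
}
```

A caller then never sees a 9-parameter constructor; optional settings are simply omitted from the chain.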
      

        Issue Links

          Activity

          jark Jark Wu added a comment -

          I like this idea! I will work on this.

          githubbot ASF GitHub Bot added a comment -

          GitHub user wuchong opened a pull request:

          https://github.com/apache/flink/pull/3273

          FLINK-5714 [table] Use a builder pattern for creating CsvTableSource

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [ ] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          Add a builder to create CsvTableSource. And I have also updated the documentation.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/wuchong/flink CsvTableSource-builder-FLINK-5714

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3273.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3273


          commit 10ffca16901f748c3e2c3818f76cebbc9fcc3153
          Author: Jark Wu <wuchong.wc@alibaba-inc.com>
          Date: 2017-02-06T13:57:04Z

          FLINK-5714 [table] Use a builder pattern for creating CsvTableSource


          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r100452526

          — Diff: docs/dev/table_api.md —
          @@ -253,56 +253,51 @@ Table result = tableEnvironment.ingest("kafka-source");

          The `CsvTableSource` is already included in `flink-table` without additional dependecies.

          -It can be configured with the following properties:
          -

          - - `path` The path to the CSV file, required.
          - - `fieldNames` The names of the table fields, required.
          - - `fieldTypes` The types of the table fields, required.
          - - `fieldDelim` The field delimiter, `","` by default.
          - - `rowDelim` The row delimiter, `"\n"` by default.
          - - `quoteCharacter` An optional quote character for String values, `null` by default.
          - - `ignoreFirstLine` Flag to ignore the first line, `false` by default.
          - - `ignoreComments` An optional prefix to indicate comments, `null` by default.
          - - `lenient` Flag to skip records with parse error instead to fail, `false` by default.
          +It can be created by `CsvTableSource.builder()`, the builder has the following methods to configure properties:
          +
          + - `path(String path)` Sets the path to the CSV file, required.
          + - `field(String fieldName, TypeInformation<?> fieldType)` Adds a field with the field name and field type information, required.
          + - `fieldDelimiter(String delim)` Sets the field delimiter, `","` by default.
          + - `lineDelimiter(String delim)` Sets the line delimiter, `"\n"` by default.
          + - `quoteCharacter(Character quote)` Sets the quote character for String values, `null` by default.
          + - `commentPrefix(String prefix)` Sets a prefix to indicate comments, null by default.
          — End diff –

          Shall we change null to `null` to keep consistency?

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r100452676

          — Diff: docs/dev/table_api.md —
          @@ -253,56 +253,51 @@ Table result = tableEnvironment.ingest("kafka-source");

          The `CsvTableSource` is already included in `flink-table` without additional dependecies.

          -It can be configured with the following properties:
          -

          - - `path` The path to the CSV file, required.
          - - `fieldNames` The names of the table fields, required.
          - - `fieldTypes` The types of the table fields, required.
          - - `fieldDelim` The field delimiter, `","` by default.
          - - `rowDelim` The row delimiter, `"\n"` by default.
          - - `quoteCharacter` An optional quote character for String values, `null` by default.
          - - `ignoreFirstLine` Flag to ignore the first line, `false` by default.
          - - `ignoreComments` An optional prefix to indicate comments, `null` by default.
          - - `lenient` Flag to skip records with parse error instead to fail, `false` by default.
          +It can be created by `CsvTableSource.builder()`, the builder has the following methods to configure properties:
          +
          + - `path(String path)` Sets the path to the CSV file, required.
          + - `field(String fieldName, TypeInformation<?> fieldType)` Adds a field with the field name and field type information, required.
          + - `fieldDelimiter(String delim)` Sets the field delimiter, `","` by default.
          + - `lineDelimiter(String delim)` Sets the line delimiter, `"\n"` by default.
          + - `quoteCharacter(Character quote)` Sets the quote character for String values, `null` by default.
          + - `commentPrefix(String prefix)` Sets a prefix to indicate comments, null by default.
          + - `ignoreFirstLine()` Ignore the first line. Not skip the first line by default.
          — End diff –

          disabled by default?

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r100453644

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala —
          @@ -138,4 +141,160 @@ class CsvTableSource(

          inputFormat
          }
          +
          + override def equals(other: Any): Boolean = other match {
          +   case that: CsvTableSource => returnType == that.returnType &&
          +     path == that.path &&
          +     fieldDelim == that.fieldDelim &&
          +     rowDelim == that.rowDelim &&
          +     quoteCharacter == that.quoteCharacter &&
          +     ignoreFirstLine == that.ignoreFirstLine &&
          +     ignoreComments == that.ignoreComments &&
          +     lenient == that.lenient
          +   case _ => false
          + }
          +
          + override def hashCode(): Int = {
          +   val state = Seq(returnType)
          +   state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
          + }
          +}
          +
          +object CsvTableSource {
          +
          + /**
          + * A builder for creating [[CsvTableSource]] instances.
          + *
          + * For example:
          + *
          + * {{{
          + * val source: CsvTableSource = new CsvTableSourceBuilder()
          — End diff –

          `CsvTableSourceBuilder` should be `CsvTableSource.builder()`.
          Also, isn't this comment a little duplicated with the `builder` method?

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r100454106

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala —
          @@ -138,4 +141,160 @@ class CsvTableSource(

          inputFormat
          }
          +
          + override def equals(other: Any): Boolean = other match {
          +   case that: CsvTableSource => returnType == that.returnType &&
          +     path == that.path &&
          +     fieldDelim == that.fieldDelim &&
          +     rowDelim == that.rowDelim &&
          +     quoteCharacter == that.quoteCharacter &&
          +     ignoreFirstLine == that.ignoreFirstLine &&
          +     ignoreComments == that.ignoreComments &&
          +     lenient == that.lenient
          +   case _ => false
          + }
          +
          + override def hashCode(): Int = {
          +   val state = Seq(returnType)
          +   state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
          + }
          +}
          +
          +object CsvTableSource {
          +
          + /**
          + * A builder for creating [[CsvTableSource]] instances.
          + *
          + * For example:
          + *
          + * {{{
          + *   val source: CsvTableSource = new CsvTableSourceBuilder()
          + *     .path("/path/to/your/file.csv")
          + *     .field("myfield", Types.STRING)
          + *     .field("myfield2", Types.INT)
          + *     .build()
          + * }}}
          + *
          + */
          + class Builder {
          +
          + private val fieldNames: ListBuffer[String] = ListBuffer[String]()
          — End diff –

          Can we bind the name and type together in a mutable Map and do some checks when adding fields? For example, disallow duplicated field names?
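
The suggested check could look roughly like this (a hypothetical Java sketch, not the actual Flink implementation): a `LinkedHashMap` keeps fields in insertion order while making duplicate detection an O(1) lookup at add time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical builder fragment: name -> type bound in one ordered map,
// with duplicate names rejected as soon as field() is called.
class FieldsBuilder {
    private final Map<String, String> fields = new LinkedHashMap<>();

    FieldsBuilder field(String name, String type) {
        if (fields.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate field name: " + name);
        }
        fields.put(name, type);
        return this;
    }

    // Insertion order is preserved, so field positions stay stable.
    List<String> fieldNames() {
        return new ArrayList<>(fields.keySet());
    }
}
```

Failing early in `field()` gives the user the offending name directly, instead of a confusing error at `build()` time.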

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r100455433

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala —
          @@ -44,15 +47,15 @@ import org.apache.flink.table.api.TableException

           * @param lenient Flag to skip records with parse error instead to fail, false by default.
           */
          class CsvTableSource(
          -   path: String,
          -   fieldNames: Array[String],
          -   fieldTypes: Array[TypeInformation[_]],
          -   fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
          -   rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
          -   quoteCharacter: Character = null,
          -   ignoreFirstLine: Boolean = false,
          -   ignoreComments: String = null,
          -   lenient: Boolean = false)
          +   private val path: String,
          — End diff –

          I think these two forms are identical

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r100455645

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala —
          @@ -138,4 +141,160 @@ class CsvTableSource(

          inputFormat
          }
          +
          + override def equals(other: Any): Boolean = other match {
          +   case that: CsvTableSource => returnType == that.returnType &&
          +     path == that.path &&
          +     fieldDelim == that.fieldDelim &&
          +     rowDelim == that.rowDelim &&
          +     quoteCharacter == that.quoteCharacter &&
          +     ignoreFirstLine == that.ignoreFirstLine &&
          +     ignoreComments == that.ignoreComments &&
          +     lenient == that.lenient
          +   case _ => false
          + }
          +
          + override def hashCode(): Int = {
          +   val state = Seq(returnType)
          +   state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
          + }
          +}
          +
          +object CsvTableSource {
          +
          + /**
          + * A builder for creating [[CsvTableSource]] instances.
          + *
          + * For example:
          + *
          + * {{{
          + *   val source: CsvTableSource = new CsvTableSourceBuilder()
          + *     .path("/path/to/your/file.csv")
          + *     .field("myfield", Types.STRING)
          + *     .field("myfield2", Types.INT)
          + *     .build()
          + * }}}
          + *
          + */
          + class Builder {
          +
          +   private val fieldNames: ListBuffer[String] = ListBuffer[String]()
          +   private val fieldTypes: ListBuffer[TypeInformation[_]] = ListBuffer[TypeInformation[_]]()
          +   private var quoteCharacter: Character = _
          +   private var path: String = _
          +   private var fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER
          +   private var lineDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER
          +   private var isIgnoreFirstLine: Boolean = false
          +   private var commentPrefix: String = _
          +   private var lenient: Boolean = false
          +
          +   /**
          +   * Sets the path to the CSV file.
          +   * @param path the path to the CSV file
          +   */
          +   def path(path: String): Builder = {
          +     this.path = path
          +     this
          +   }
          +
          +   /**
          +   * Sets the field delimiter, "," by default.
          +   * @param delim the field delimiter
          +   */
          +   def fieldDelimiter(delim: String): Builder = {
          +     this.fieldDelim = delim
          +     this
          +   }
          +
          +   /**
          +   * Sets the line delimiter, "\n" by default.
          +   * @param delim the line delimiter
          +   */
          +   def lineDelimiter(delim: String): Builder = {
          +     this.lineDelim = delim
          +     this
          +   }
          +
          +   /**
          +   * Add a field with the field name and the type information.
          +   * @param fieldName the field name
          +   * @param fieldType the type information of the field
          +   */
          +   def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
          +     this.fieldNames += fieldName
          +     this.fieldTypes += fieldType
          +     this
          +   }
          +
          +   /**
          +   * Sets a quote character for String values, null by default.
          +   * @param quote the quote character
          +   */
          +   def quoteCharacter(quote: Character): Builder = {
          +     this.quoteCharacter = quote
          +     this
          +   }
          +
          +   /**
          +   * Sets a prefix to indicate comments, null by default.
          +   * @param prefix the prefix to indicate comments
          +   */
          +   def commentPrefix(prefix: String): Builder = {
          +     this.commentPrefix = prefix
          +     this
          +   }
          +
          +   /**
          +   * Ignore the first line. Not skip the first line by default.
          +   */
          +   def ignoreFirstLine: Builder = {
          +     this.isIgnoreFirstLine = true
          +     this
          +   }
          +
          +   /**
          +   * Skip records with parse error instead to fail. Throw an exception by default.
          +   */
          +   def ignoreParseErrors: Builder = {
          +     this.lenient = true
          +     this
          +   }
          +
          +   /**
          +   * Apply the current values and constructs a newly-created [[CsvTableSource]].
          +   * @return a newly-created [[CsvTableSource]].
          +   */
          +   def build: CsvTableSource = {
          +     Preconditions.checkNotNull(path, "Path must not be null.")
          — End diff –

          Fields cannot be empty either, right?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r100458026

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala —
          @@ -138,4 +141,160 @@ class CsvTableSource(

          inputFormat
          }
          +
          + override def equals(other: Any): Boolean = other match {
          +   case that: CsvTableSource => returnType == that.returnType &&
          +     path == that.path &&
          +     fieldDelim == that.fieldDelim &&
          +     rowDelim == that.rowDelim &&
          +     quoteCharacter == that.quoteCharacter &&
          +     ignoreFirstLine == that.ignoreFirstLine &&
          +     ignoreComments == that.ignoreComments &&
          +     lenient == that.lenient
          +   case _ => false
          + }
          +
          + override def hashCode(): Int = {
          +   val state = Seq(returnType)
          +   state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
          + }
          +}
          +
          +object CsvTableSource {
          +
          + /**
          + * A builder for creating [[CsvTableSource]] instances.
          + *
          + * For example:
          + *
          + * {{{
          + * val source: CsvTableSource = new CsvTableSourceBuilder()
          — End diff –

          Yes, `new CsvTableSourceBuilder()` should be `new CsvTableSource.Builder()`. The comment is a little duplicated, but I think it is reasonable to let users create a builder via the constructor, not only via the static `builder` method.
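
The two entry points under discussion can be sketched like this (hypothetical Java code with illustrative names, not the actual Flink API): the nested `Builder` can be instantiated directly with `new`, while a static `builder()` factory offers the shorter fluent form.

```java
// Illustrative stand-in for CsvTableSource, reduced to a single setting
// to keep the two-entry-point pattern visible.
class CsvSource {
    final String path;

    private CsvSource(String path) { this.path = path; }

    // Entry point 1: static factory, reads well in a fluent chain.
    static Builder builder() { return new Builder(); }

    // Entry point 2: the nested class itself can be constructed directly.
    static class Builder {
        private String path;

        Builder path(String path) { this.path = path; return this; }

        CsvSource build() {
            if (path == null) {
                throw new IllegalStateException("Path must be set.");
            }
            return new CsvSource(path);
        }
    }
}
```

Both forms produce identical results; offering both costs nothing since `builder()` is a one-line delegation to the constructor.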

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r100458044

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala —
          @@ -138,4 +141,160 @@ class CsvTableSource(
              inputFormat
            }
          +
          +  override def equals(other: Any): Boolean = other match {
          +    case that: CsvTableSource => returnType == that.returnType &&
          +      path == that.path &&
          +      fieldDelim == that.fieldDelim &&
          +      rowDelim == that.rowDelim &&
          +      quoteCharacter == that.quoteCharacter &&
          +      ignoreFirstLine == that.ignoreFirstLine &&
          +      ignoreComments == that.ignoreComments &&
          +      lenient == that.lenient
          +    case _ => false
          +  }
          +
          +  override def hashCode(): Int = {
          +    val state = Seq(returnType)
          +    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
          +  }
          +}
          +
          +object CsvTableSource {
          +
          +  /**
          +   * A builder for creating [[CsvTableSource]] instances.
          +   *
          +   * For example:
          +   *
          +   * {{{
          +   *   val source: CsvTableSource = new CsvTableSourceBuilder()
          +   *     .path("/path/to/your/file.csv")
          +   *     .field("myfield", Types.STRING)
          +   *     .field("myfield2", Types.INT)
          +   *     .build()
          +   * }}}
          +   */
          +  class Builder {
          +
          +    private val fieldNames: ListBuffer[String] = ListBuffer[String]()
          — End diff –

          Good point!

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r100458059

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala —
          @@ -44,15 +47,15 @@ import org.apache.flink.table.api.TableException
            * @param lenient Flag to skip records with parse error instead to fail, false by default.
            */
           class CsvTableSource(
          -    path: String,
          -    fieldNames: Array[String],
          -    fieldTypes: Array[TypeInformation[_]],
          -    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
          -    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
          -    quoteCharacter: Character = null,
          -    ignoreFirstLine: Boolean = false,
          -    ignoreComments: String = null,
          -    lenient: Boolean = false)
          +    private val path: String,
          — End diff –

          Actually, I want to access these fields in `equals` via `that.path`. The `private val` modifier will still create a getter implicitly (a `val` has no setter).
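The point being made here holds in Java as well as Scala: `private` members of a class are visible on *other instances of the same class*, which is exactly what an `equals` implementation relies on. A small illustrative sketch (not Flink code; the class name is hypothetical):

```java
// Demonstrates that `equals` can read the private field of another
// instance of the same class, e.g. `that.path`.
public class PathHolder {
    private final String path;

    public PathHolder(String path) { this.path = path; }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof PathHolder)) return false;
        PathHolder that = (PathHolder) other;
        // Accessing the private field of another instance compiles fine,
        // because access control in Java is per-class, not per-instance.
        return java.util.Objects.equals(this.path, that.path);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hashCode(path);
    }
}
```

In the Scala original, marking the constructor parameters `private val` turns them into class-private fields with the same cross-instance visibility.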

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3273

          Thank you for your advice. I will update the PR soon.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3273

          updated!

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3273#discussion_r101234016

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala —
          @@ -266,11 +266,16 @@ object CsvTableSource {
            * @return a newly-created [[CsvTableSource]].
            */
           def build: CsvTableSource = {
          -  Preconditions.checkNotNull(path, "Path must not be null.")
          +  if (path == null) {
          +    throw new IllegalArgumentException("Path must be defined.")
          — End diff –

          You can still use Preconditions.checkArgument though.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3273

          Looks good to merge. Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3273

          I found another issue: we should use a `LinkedHashMap`, otherwise the order of fields and types is not guaranteed. I fixed it and will merge this now.
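The ordering concern above is easy to reproduce: `LinkedHashMap` iterates entries in insertion order, so fields registered with `field(...)` keep their declaration order, while a plain `HashMap` gives no such guarantee and could scramble the mapping between field names and CSV columns. An illustrative sketch (not Flink code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldOrder {
    // Collect field names in the order they were registered, relying on
    // LinkedHashMap's insertion-order iteration guarantee.
    public static String[] names(Map<String, String> fields) {
        return fields.keySet().toArray(new String[0]);
    }
}
```

With a `LinkedHashMap`, calling `field("myfield", ...)` before `field("myfield2", ...)` guarantees `names(...)` returns them in that order.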

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3273

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: 6c310a7628f6da09289dedc465db9294507fb10e


            People

            • Assignee: jark Jark Wu
            • Reporter: twalthr Timo Walther
            • Votes: 0
            • Watchers: 4
