  1. Flink
  2. FLINK-12584

Add Bucket File System Table Sink


Details

    Description

      1. Motivation

      In Flink, the file system (especially HDFS) is a very common output target. However, SQL users cannot write data to a file system directly with SQL. I therefore propose adding a bucket file system table sink that users can register with a StreamTableEnvironment, so that both the Table API and the SQL API can use it to write streaming data to a file system.

      2. Example

      tEnv.connect(new Bucket().basePath("hdfs://localhost/tmp/flink-data"))
          .withFormat(new Json().deriveSchema())
          .withSchema(new Schema()
              .field("name", Types.STRING())
              .field("age", Types.INT()))
          .inAppendMode()
          .registerTableSink("myhdfssink");

      tEnv.sqlUpdate("insert into myhdfssink SELECT * FROM mytablesource");

       

      3. Implementation ideas

      1) Add a class Bucket that extends ConnectorDescriptor and carries properties such as basePath.

      2) Add a class BucketValidator that extends ConnectorDescriptorValidator and checks the properties of the bucket descriptor.

      3) Add a class FileSystemTableSink that implements the StreamTableSink interface. Its emitDataStream method constructs a StreamingFileSink, configured from the descriptor properties, to write the data to the file system.

      4) Add a factory class FileSystemTableSinkFactory that implements the StreamTableSinkFactory interface and constructs the FileSystemTableSink.

      5) The withFormat method takes FormatDescriptor implementations such as Json and Csv; Parquet and Orc can be added later.
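      Steps 1) and 2) could be sketched roughly as follows. This is a dependency-free stand-in written in plain Java, not the proposed implementation: the classes here do not extend Flink's ConnectorDescriptor or ConnectorDescriptorValidator, and the type value "bucket" and the property key "connector.basepath" are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BucketDescriptorSketch {

    // Property keys used by this sketch. The "bucket" type value and the
    // "connector.basepath" key are hypothetical names, not confirmed by the issue.
    static final String CONNECTOR_TYPE = "connector.type";
    static final String CONNECTOR_TYPE_VALUE = "bucket";
    static final String CONNECTOR_BASEPATH = "connector.basepath";

    /** Stand-in for step 1: collects options fluently and exposes them as string properties. */
    static final class Bucket {
        private String basePath;

        Bucket basePath(String basePath) {
            this.basePath = basePath;
            return this;
        }

        Map<String, String> toProperties() {
            Map<String, String> props = new LinkedHashMap<>();
            props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE);
            if (basePath != null) {
                props.put(CONNECTOR_BASEPATH, basePath);
            }
            return props;
        }
    }

    /** Stand-in for step 2: rejects property maps that a sink could not be built from. */
    static final class BucketValidator {
        static void validate(Map<String, String> props) {
            if (!CONNECTOR_TYPE_VALUE.equals(props.get(CONNECTOR_TYPE))) {
                throw new IllegalArgumentException(
                        "Expected " + CONNECTOR_TYPE + "=" + CONNECTOR_TYPE_VALUE);
            }
            String basePath = props.get(CONNECTOR_BASEPATH);
            if (basePath == null || basePath.trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "Missing required property " + CONNECTOR_BASEPATH);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> props =
                new Bucket().basePath("hdfs://localhost/tmp/flink-data").toProperties();
        BucketValidator.validate(props); // throws if the base path is missing
        System.out.println(props);
    }
}
```

      In the real design, a factory such as FileSystemTableSinkFactory would presumably receive a property map like this one, validate it, and then build the sink from the validated values.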


            People

              zhangjun Jun Zhang

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 20m