Details
-
Improvement
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
None
-
None
Description
Summarize
Flink is based on S3AInputStream.java to select datas stored in Tencent COS, it will call the getObject function of AmazonS3Client.java.
Now, Tencent COS have already support to pushdown the CSV and Parquert file format.
In these cases, using getObject to select datas will wastes a lots of bandwidth.
So, I think Flink SQL should support S3 Select, to reduce the waste of bandwidth.
Design
1. In HiveMapredSplitReader.java , we used int[] selectedFields to construct S3 SELECT SQL. And we created a new Class named S3SelectCsvReader which used AmazonS3Client.selectObjectContent function to readLine CSV File.
2. Flink Demo Table:
1) Table schema
Flink SQL> desc cos.test_s3a;
root
– name: STRING (col1) |
– age: INT (col2) |
– dt: STRING (col3,it's a partition column) |
2) Conversion relationship (FLINK SQL Convert To S3 SELECT SQL)
FlinkSQL S3 SELECT SQL
select name from cos.test_s3a; => SELECT s._1, null FROM S3Object s
select age from cos.test_s3a; => SELECT null, s._2 FROM S3Object s
select dt, name, age from cos.test_s3a; => SELECT s._1, s._2 FROM S3Object s
select dt from cos.test_s3a; => SELECT null, null FROM S3Object s
select * from cos.test_s3a; => SELECT s._1, s._2 FROM S3Object s
select name from cos.test_s3a where dt='2020-07-15'; => SELECT s._1, null FROM S3Object s
3) Patch Commit
https://github.com/Coderlxl/flink/commit/b211f4830a7301bf9283a6d37209000b176913ad