Flink / FLINK-29729

Credential info configured in flink-conf.yaml is lost when creating ParquetReader


Details

    Description

      Hi, I'm wondering whether we can include the properties configured in flink-conf.yaml, in addition to the Hadoop configuration, when creating the ParquetReader in `ParquetVectorizedInputFormat`.

       

      My use case: I want to query a Parquet table stored in an S3 bucket through the filesystem connector, and I configured the AWS credentials in `flink-conf.yaml`, e.g. fs.s3a.access.key, fs.s3a.secret.key, etc.

       

      The JobManager (SourceCoordinator) has no problem calling "getFileStatus" on the S3 objects and generating splits, but the TaskManager (SourceOperator -> ParquetVectorizedInputFormat -> ParquetReader) fails because the AWS credentials are missing.

       

      After digging into the source code that creates the ParquetReader to read the footer, I found that the AWS credentials are not passed along when the S3AFileSystem is created and initialized; the details are shown in the screenshot below.

       

      The `hadoopConfig` only contains the properties from the table format options plus the default Hadoop properties from core-site.xml, hdfs-site.xml, etc., because it is injected via `ParquetFileFormatFactory#createRuntimeDecoder` -> `ParquetColumnarRowInputFormat.createPartitionedFormat` -> `ParquetFileFormatFactory.getParquetConfiguration`:

       

      @Override
      public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
              DynamicTableSource.Context sourceContext,
              DataType producedDataType,
              int[][] projections) {

          return ParquetColumnarRowInputFormat.createPartitionedFormat(
                  getParquetConfiguration(formatOptions),
                  (RowType) Projection.of(projections).project(producedDataType).getLogicalType(),
                  sourceContext.createTypeInformation(producedDataType),
                  Collections.emptyList(),
                  null,
                  VectorizedColumnBatch.DEFAULT_SIZE,
                  formatOptions.get(UTC_TIMEZONE),
                  true);
      }

      private static Configuration getParquetConfiguration(ReadableConfig options) {
          Configuration conf = new Configuration();
          Properties properties = new Properties();
          ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties);
          properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
          return conf;
      }
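
      To make the effect concrete, here is a minimal stand-alone sketch of the `getParquetConfiguration` logic. It is a model, not the Flink code: plain Java maps stand in for Flink's `ReadableConfig` and Hadoop's `Configuration`, the names `ParquetConfSketch` and `toParquetConf` are made up for illustration, `IDENTIFIER` is assumed to be "parquet", the `utc-timezone` option and the placeholder credential values are only sample inputs, and the defaults that `new Configuration()` would load from core-site.xml are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-alone model: plain maps replace Flink's ReadableConfig and Hadoop's
// Configuration, so this runs without Flink or Hadoop on the classpath.
class ParquetConfSketch {

    // Assumed value of ParquetFileFormatFactory.IDENTIFIER.
    static final String IDENTIFIER = "parquet";

    // Same shape as getParquetConfiguration: only the table's format options
    // are copied, each one under the "parquet." prefix.
    static Map<String, String> toParquetConf(Map<String, String> formatOptions) {
        Map<String, String> conf = new LinkedHashMap<>();
        formatOptions.forEach((k, v) -> conf.put(IDENTIFIER + "." + k, v));
        return conf;
    }

    public static void main(String[] args) {
        // Cluster-level settings as they might appear in flink-conf.yaml
        // (placeholder values). Note that nothing below ever reads this map.
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("fs.s3a.access.key", "<access-key>");
        flinkConf.put("fs.s3a.secret.key", "<secret-key>");

        // Format options declared on the table (sample input).
        Map<String, String> formatOptions = new LinkedHashMap<>();
        formatOptions.put("utc-timezone", "true");

        Map<String, String> conf = toParquetConf(formatOptions);
        System.out.println("keys handed to the reader: " + conf.keySet());
        System.out.println("has fs.s3a.access.key: " + conf.containsKey("fs.s3a.access.key"));
    }
}
```

      Nothing on this path consults the cluster-level settings, which would explain why the `fs.s3a.*` keys never reach the reader on the TaskManager side.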
      
      

       

      I know I can put the AWS credentials into core-site.xml or hdfs-site.xml so that the `ParquetReader` can pick them up, but I don't think that is good practice, especially when different Flink jobs use different AWS credentials. So I'm wondering whether we can combine the default Hadoop configuration (static) with the properties from `flink-conf.yaml` (dynamic) when creating the `ParquetReader`.

      For example, something like what this PR does: https://github.com/apache/flink/pull/21130
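
      The kind of combination I have in mind could look roughly like the sketch below. It is only an illustration under stated assumptions and not necessarily what the PR does: plain Java maps again stand in for the Hadoop and Flink configuration objects, `MergedConfSketch` and `merge` are hypothetical names rather than Flink APIs, and forwarding only keys that start with "fs." is a filter chosen for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-alone model of layering static Hadoop defaults with dynamic
// flink-conf.yaml properties; not an existing Flink API.
class MergedConfSketch {

    // Hypothetical helper: later sources override earlier ones.
    static Map<String, String> merge(
            Map<String, String> hadoopDefaults,
            Map<String, String> flinkConf,
            Map<String, String> parquetOptions) {
        // 1. Start from the static defaults (core-site.xml, hdfs-site.xml).
        Map<String, String> merged = new LinkedHashMap<>(hadoopDefaults);
        // 2. Overlay file system keys from flink-conf.yaml. The "fs." prefix
        //    filter is an assumption of this sketch.
        flinkConf.forEach((k, v) -> {
            if (k.startsWith("fs.")) {
                merged.put(k, v);
            }
        });
        // 3. Overlay the already prefixed Parquet format options.
        merged.putAll(parquetOptions);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> hadoopDefaults = new LinkedHashMap<>();
        hadoopDefaults.put("fs.s3a.access.key", "<key-from-core-site>");

        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("fs.s3a.access.key", "<key-for-this-job>");
        flinkConf.put("taskmanager.numberOfTaskSlots", "4");

        Map<String, String> parquetOptions = new LinkedHashMap<>();
        parquetOptions.put("parquet.utc-timezone", "true");

        Map<String, String> merged = merge(hadoopDefaults, flinkConf, parquetOptions);
        merged.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

      With this layering, the per-job value from `flink-conf.yaml` overrides whatever core-site.xml provides, so different jobs could use different credentials without touching the static Hadoop files.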

       

      BTW, I'm using Flink 1.15.1 in a standalone cluster to validate the whole process, but I don't think the problem is limited to 1.15.1 or to AWS S3; objects/files in any other cloud object store are probably affected as well.

       

      Besides changing the code, is there any other way to work around this problem? Thanks.

          People

            lsy dalongliu
            stayrascal Rascal Wu