Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-18549

flink 1.11 can not commit partition automatically

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Won't Fix
    • 1.11.0
    • None
    • Table SQL / Runtime
    • None

    Description

      I use the sql of flink 1.11, read from kafka and writing to hdfs, I found that the partition cannot be submitted automatically. This is my complete code。

      My checkpoint interval is 10s. I think it should be normal that there will be _SUCCESS file under the partition of hdfs every 10s, but in fact there is no

       

             StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            bsEnv.enableCheckpointing(10000);
            bsEnv.setParallelism(1);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
      
            String sqlSource = "CREATE TABLE  source_kafka (\n" +
                               "    appName  STRING,\n" +
                               "    appVersion STRING,\n" +
                               "    uploadTime STRING\n" +
                               ") WITH (\n" +
                               "  'connector.type' = 'kafka',       \n" +
                               "  'connector.version' = '0.10',\n" +
                               "  'connector.topic' = 'test_topic',\n" +
                               "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                               "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                               "  'connector.properties.group.id' = 'testGroup',\n" +
                               "  'format.type'='json',\n" +
                               "  'update-mode' = 'append' )";
      
            tEnv.executeSql(sqlSource);
      
      
            String sql = "CREATE TABLE fs_table (\n" +
                         "    appName  STRING,\n" +
                         "    appVersion STRING,\n" +
                         "    uploadTime STRING,\n" +
                         "  dt STRING," +
                         "  h string" +
                         ")  PARTITIONED BY (dt,h)  WITH (\n" +
                         "  'connector'='filesystem',\n" +
                           "  'path'='hdfs://localhost/tmp/',\n" +
                           " 'sink.partition-commit.policy.kind' = 'success-file', " +
                           "  'format'='orc'\n" +
                           ")";
            tEnv.executeSql(sql);
      
            String insertSql = "insert into  fs_table SELECT appName ,appVersion,uploadTime, " +
                               " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
      
            tEnv.executeSql(insertSql);
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            zhangjun Jun Zhang
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: