FLINK-22874

Flink table partition commit trigger doesn't take effect as expected when sinking into a Hive table


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.13.1
    • Fix Version/s: None
    • Component/s: Connectors / Hive
    • Labels: None
    • Release Note: My misunderstanding of the streaming partition sink caused this issue; it worked normally after I enabled checkpointing.

    Description

      I am trying to sink into a Hive partitioned table whose partition commit trigger is declared as "partition-time", and I have assigned watermarks on the DataStream. When I feed data into the stream, the Hive partitions are never committed. Here's my code:

      -- DDL of the Hive table
      create table test_table(username string)
      partitioned by (ts bigint)
      stored as orc
      TBLPROPERTIES (
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.policy.kind'='metastore,success-file'
      );
      // Flink application code

      import java.time.Duration

      import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
      import org.apache.flink.streaming.api.functions.ProcessFunction
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.table.api.EnvironmentSettings
      import org.apache.flink.util.Collector

      val streamEnv = ...
      val dataStream: DataStream[(String, Long)] = ...
      
      
      // assign watermarks, and emit the current watermark alongside each record in a ProcessFunction
      class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] {
        override def processElement(value: (String, Long), ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: Collector[(String, Long, Long)]): Unit = {
          out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
        }
      }
      
      val resultStream = dataStream
        .assignTimestampsAndWatermarks(
          WatermarkStrategy
            .forBoundedOutOfOrderness[(String, Long)](Duration.ZERO)
            .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
              override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long =
                element._2 * 1000 // the second tuple field holds epoch seconds; convert to milliseconds
            }))
        .process(new MyProcessFunction)
      
      // build the streaming table environment (buildStreamTableEnv is a user-defined helper)
      val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
      
      // convert dataStream into hive catalog table and sink into hive
      streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
      val catalog = ...
      streamTableEnv.registerCatalog("hive", catalog)
      streamTableEnv.useCatalog("hive")
      streamTableEnv.executeSql("insert into test_table select _1,_2 from default_catalog.default_database.test_catalog_t").print()
      
      
      // Flink uses the default parallelism of 4
      // input data
      (a, 1)
      (b, 2)
      (c, 3)
      (d, 4)
      (a, 5)
       ...
      
      // result
      Many partition directories are created on HDFS, but they all contain only in-progress files, and the partitions are never committed to the Hive metastore.
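
      As the Release Note in Details says, this turned out not to be a bug: in streaming mode the Hive/filesystem sink only finalizes in-progress files and commits partitions when a checkpoint completes, so with checkpointing disabled nothing is ever committed. A minimal sketch of the missing call (the 60-second interval is an arbitrary choice for illustration):

      // enable checkpointing so the streaming sink can roll in-progress
      // files and trigger partition commits on checkpoint completion
      streamEnv.enableCheckpointing(60 * 1000)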


          People

            Assignee: Unassigned
            Reporter: Spongebob (SpongebobZ)
