Flink / FLINK-30809

flink-connector-elasticsearch7: index ingest pipeline does not run when data is updated


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: elasticsearch-3.0.0
    • Fix Version/s: None
    • Component/s: None

    Description

      Start Elasticsearch and Kibana with Docker Compose:

      version: '2.1'
      services: 
       elasticsearch: 
         image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
         environment: 
           - cluster.name=docker-cluster
           - bootstrap.memory_lock=true
           - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
           - discovery.type=single-node
         ports: 
           - "9200:9200"
           - "9300:9300"
         ulimits: 
           memlock: 
             soft: -1
             hard: -1
           nofile: 
             soft: 65536
             hard: 65536
       kibana: 
         image: docker.elastic.co/kibana/kibana:7.17.7
         ports: 
           - "5601:5601"
      

      Create the records table in MySQL:

      CREATE TABLE records (
       id bigint unsigned NOT NULL AUTO_INCREMENT,
       user_id bigint unsigned NOT NULL,
       create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
       PRIMARY KEY (id)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
      

      Insert some data:

      INSERT INTO test.records (id, user_id, create_time) VALUES(default, 123, '2023-01-20 12:25:11');
      INSERT INTO test.records (id, user_id, create_time) VALUES(default, 456, '2023-01-20 12:25:30');
      INSERT INTO test.records (id, user_id, create_time) VALUES(default, 789, '2023-01-20 12:25:37');
      

      Create an ingest pipeline in Elasticsearch:

      PUT /_ingest/pipeline/set_ingest_timestamp_fields
      {
       "processors": [
         {
           "set": {
             "field": "ingest_timestamp",
             "value": "{{_ingest.timestamp}}"
           }
         }
       ]
      }
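To make the pipeline's effect concrete, here is a small Python model of what the set processor above does to a document's _source. It is a local simulation for illustration only: the function name apply_set_ingest_timestamp is hypothetical, Elasticsearch renders {{_ingest.timestamp}} itself, and the real value carries nanoseconds (as in the output further down), whereas Python's datetime only has microsecond precision.

```python
from datetime import datetime, timezone


def apply_set_ingest_timestamp(source: dict, now: datetime) -> dict:
    """Hypothetical local model of the "set" processor defined above.

    Returns a copy of `source` with "ingest_timestamp" set to `now`,
    formatted as an ISO-8601 UTC string. Other fields are left untouched.
    """
    doc = dict(source)  # copy so the caller's dict is not mutated
    utc_now = now.astimezone(timezone.utc)
    doc["ingest_timestamp"] = utc_now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    return doc


t = datetime(2023, 1, 28, 5, 21, 40, 539754, tzinfo=timezone.utc)
print(apply_set_ingest_timestamp({"id": 3, "user_id": 789}, t))
```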

      Create the index, with the pipeline above as its default pipeline:

      PUT enriched_records
      {
       "settings": {
         "default_pipeline": "set_ingest_timestamp_fields",
         "number_of_shards": "1",
         "number_of_replicas": "0"
       }
      }
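For scripted reproduction, the two setup calls above can be built with Python's standard library. This is a sketch that only constructs the requests; the helper name build_setup_requests is hypothetical, and the base URL assumes the 9200 port mapping from the compose file.

```python
import json
import urllib.request

ES_URL = "http://localhost:9200"  # port published by the compose file above


def build_setup_requests(es_url: str = ES_URL) -> list:
    """Build (but do not send) the pipeline and index setup requests shown above."""
    pipeline = {
        "processors": [
            {"set": {"field": "ingest_timestamp", "value": "{{_ingest.timestamp}}"}}
        ]
    }
    index_settings = {
        "settings": {
            "default_pipeline": "set_ingest_timestamp_fields",
            "number_of_shards": "1",
            "number_of_replicas": "0",
        }
    }
    headers = {"Content-Type": "application/json"}
    return [
        urllib.request.Request(
            f"{es_url}/_ingest/pipeline/set_ingest_timestamp_fields",
            data=json.dumps(pipeline).encode("utf-8"),
            headers=headers,
            method="PUT",
        ),
        urllib.request.Request(
            f"{es_url}/enriched_records",
            data=json.dumps(index_settings).encode("utf-8"),
            headers=headers,
            method="PUT",
        ),
    ]
```

With the container running, each request can be sent in order with urllib.request.urlopen(req); the pipeline must exist before the index that names it as default_pipeline.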

      Execute the following Flink SQL:

      CREATE TABLE records (
         id INT,
         user_id INT,
         create_time TIMESTAMP(3),
         proc_time AS PROCTIME(),
         operation_time TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
         PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
         'connector' = 'mysql-cdc',
         'hostname' = 'localhost',
         'port' = '3306',
         'username' = 'root',
         'password' = '123456',
         'database-name' = 'test',
         'table-name' = 'records',
         'server-time-zone' = 'UTC'
      );
      CREATE TABLE enriched_records (
         id INT,
         user_id INT,
         create_time TIMESTAMP(3),
         proc_time TIMESTAMP_LTZ(3),
         operation_time TIMESTAMP_LTZ(3),
         PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
         'connector' = 'elasticsearch-7',
         'hosts' = 'http://localhost:9200',
         'index' = 'enriched_records'
      );
      INSERT INTO enriched_records
      SELECT
         o.id,
         o.user_id,
         o.create_time,
         o.proc_time,
         o.operation_time
      FROM records AS o; 
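Because enriched_records declares PRIMARY KEY (id), the sink writes in upsert mode and the key becomes the document _id (document "3" in the output below is the row with id 3). Our understanding, which is an assumption and not something the configuration above shows, is that the connector sends INSERT and UPDATE_AFTER rows as update requests carrying a partial doc plus an upsert document, not as plain index requests. The Python sketch below models that mapping as bulk-API NDJSON lines; the function name and exact action shape are hypothetical.

```python
import json


def to_bulk_lines(row_kind: str, row: dict, index: str = "enriched_records") -> list:
    """Hypothetical model: turn one changelog row into Elasticsearch bulk (NDJSON) lines.

    Assumes the sink table has PRIMARY KEY (id), so the key becomes the _id.
    """
    doc_id = str(row["id"])
    if row_kind in ("INSERT", "UPDATE_AFTER"):
        # Assumed shape: an update with a partial "doc" plus an "upsert" fallback,
        # rather than a plain "index" action.
        action = {"update": {"_index": index, "_id": doc_id}}
        return [json.dumps(action), json.dumps({"doc": row, "upsert": row})]
    if row_kind == "DELETE":
        return [json.dumps({"delete": {"_index": index, "_id": doc_id}})]
    # UPDATE_BEFORE rows are assumed to be dropped before reaching an upsert sink.
    raise ValueError(f"unexpected row kind for an upsert sink: {row_kind}")


for line in to_bulk_lines("UPDATE_AFTER", {"id": 3, "user_id": 987}):
    print(line)
```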
      

      When we query the data in Elasticsearch with GET /enriched_records/_search, we find that each document has an ingest_timestamp field whose value is the time it was written:

      {
          "_index":"enriched_records",
          "_type":"_doc",
          "_id":"3",
          "_score":1,
          "_source":{
              "operation_time":"1970-01-01 00:00:00Z",
              "create_time":"2023-01-20 12:25:37",
              "user_id":789,
              "ingest_timestamp":"2023-01-28T05:21:40.539754251Z",
              "id":3,
              "proc_time":"2023-01-28 05:21:40.233Z"
          }
      } 
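To compare ingest_timestamp across runs, it is enough to pull that one field out of each hit of the search response. A minimal sketch; the helper names build_search_request and ingest_timestamps are hypothetical, and the request is only built, not sent.

```python
import urllib.request


def build_search_request(es_url: str = "http://localhost:9200",
                         index: str = "enriched_records") -> urllib.request.Request:
    """Build (but do not send) GET /<index>/_search."""
    return urllib.request.Request(f"{es_url}/{index}/_search", method="GET")


def ingest_timestamps(search_response: dict) -> dict:
    """Map each document _id to its ingest_timestamp (None if the field is missing)."""
    return {
        hit["_id"]: hit["_source"].get("ingest_timestamp")
        for hit in search_response["hits"]["hits"]
    }
```

Against a running cluster, json.load(urllib.request.urlopen(build_search_request())) yields the response dict to pass to ingest_timestamps; for the hit above the result would map "3" to "2023-01-28T05:21:40.539754251Z".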

      When we then modify the record in MySQL (here, user_id of id 3 changes from 789 to 987), the document in Elasticsearch is updated, but its ingest_timestamp stays the same, so the default pipeline configured for the index does not appear to run on these updates:

      {
          "_index":"enriched_records",
          "_type":"_doc",
          "_id":"3",
          "_score":1,
          "_source":{
              "operation_time":"2023-01-28 05:25:05Z",
              "create_time":"2023-01-20 12:25:37",
              "user_id":987,
              "ingest_timestamp":"2023-01-28T05:21:40.539754251Z",
              "id":3,
              "proc_time":"2023-01-28 05:25:05.529Z"
          }
      }
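Diffing the two _source objects above makes the symptom explicit: the row-derived fields changed, but ingest_timestamp did not. A small sketch using exactly the values from the two documents in this report (the helper name changed_fields is hypothetical):

```python
BEFORE = {
    "operation_time": "1970-01-01 00:00:00Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 789,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:21:40.233Z",
}
AFTER = {
    "operation_time": "2023-01-28 05:25:05Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 987,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:25:05.529Z",
}


def changed_fields(before: dict, after: dict) -> set:
    """Return the names of fields whose values differ (or exist on only one side)."""
    keys = before.keys() | after.keys()
    return {k for k in keys if before.get(k) != after.get(k)}


changed = changed_fields(BEFORE, AFTER)
pipeline_reran = "ingest_timestamp" in changed
print(sorted(changed))  # ['operation_time', 'proc_time', 'user_id']
print(pipeline_reran)   # False
```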
      

      If we instead modify a field directly in Elasticsearch, the value of the ingest_timestamp field does change.
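One way to reconcile these observations is the toy model below. It is our assumption about Elasticsearch's behavior, not something this report demonstrates: the default pipeline runs for documents written with the index API, and for the upsert document of an update request when the target document does not exist yet; but when an update request hits an existing document, its partial doc is merged into the stored _source without running the pipeline. It also assumes the direct modification in Elasticsearch was a full-document index write (for example PUT /enriched_records/_doc/3). The class ToyIndex, its integer clock, and the value 654 are hypothetical.

```python
from itertools import count


class ToyIndex:
    """Hypothetical in-memory model of an index with a default ingest pipeline."""

    def __init__(self):
        self.docs = {}
        self._clock = count(1)  # stand-in for _ingest.timestamp; grows on every pipeline run

    def _pipeline(self, source: dict) -> dict:
        out = dict(source)
        out["ingest_timestamp"] = next(self._clock)
        return out

    def index(self, doc_id: str, source: dict) -> None:
        # Index API: the whole document goes through the default pipeline.
        self.docs[doc_id] = self._pipeline(source)

    def update(self, doc_id: str, doc: dict, upsert: dict) -> None:
        if doc_id not in self.docs:
            # Missing document: the upsert document is indexed through the pipeline.
            self.docs[doc_id] = self._pipeline(upsert)
        else:
            # Existing document (assumed): partial doc merged, pipeline NOT run.
            merged = dict(self.docs[doc_id])
            merged.update(doc)
            self.docs[doc_id] = merged


idx = ToyIndex()
row = {"id": 3, "user_id": 789}
idx.update("3", doc=row, upsert=row)          # initial row from the CDC snapshot
first = idx.docs["3"]["ingest_timestamp"]

row = {"id": 3, "user_id": 987}
idx.update("3", doc=row, upsert=row)          # UPDATE in MySQL, written as an update request
after_update = idx.docs["3"]["ingest_timestamp"]

idx.index("3", {"id": 3, "user_id": 654})     # hypothetical direct full-document write
after_index = idx.docs["3"]["ingest_timestamp"]

print(first, after_update, after_index)       # 1 1 2
```

Under this model, the MySQL update leaves ingest_timestamp untouched while the direct write refreshes it, matching the two outputs in this report.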

       


          People

            Assignee: Unassigned
            Reporter: iduanyingjie
            Votes: 0
            Watchers: 2
