Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
elasticsearch-3.0.0
-
None
-
None
Description
create elasticsearch in docker
version: '2.1' services: elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7 environment: - cluster.name=docker-cluster - bootstrap.memory_lock=true - "ES_JAVA_OPTS=-Xms512m -Xmx512m" - discovery.type=single-node ports: - "9200:9200" - "9300:9300" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 kibana: image: docker.elastic.co/kibana/kibana:7.17.7 ports: - "5601:5601"
create table: records in mysql
CREATE TABLE records ( id bigint unsigned NOT NULL AUTO_INCREMENT, user_id bigint unsigned NOT NULL, create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
insert some datas
INSERT INTO test.records (id, user_id, create_time) VALUES(default, 123, '2023-01-20 12:25:11'); INSERT INTO test.records (id, user_id, create_time) VALUES(default, 456, '2023-01-20 12:25:30'); INSERT INTO test.records (id, user_id, create_time) VALUES(default, 789, '2023-01-20 12:25:37');
create pipeline in es:
PUT /_ingest/pipeline/set_ingest_timestamp_fields { "processors": [ { "set": { "field": "ingest_timestamp", "value": "{{_ingest.timestamp}}" } } ] }
create index in es:
PUT enriched_records { "settings": { "default_pipeline": "set_ingest_timestamp_fields", "number_of_shards": "1", "number_of_replicas": "0" } }
excute flink sql:
CREATE TABLE records ( id INT, user_id INT, create_time TIMESTAMP(3), proc_time AS PROCTIME(), operation_time TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'test', 'table-name' = 'records', 'server-time-zone' = 'UTC' ); CREATE TABLE enriched_records ( id INT, user_id INT, create_time TIMESTAMP(3), proc_time TIMESTAMP_LTZ(3), operation_time TIMESTAMP_LTZ(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'enriched_records' ); INSERT INTO enriched_records SELECT o.id, o.user_id, o.create_time, o.proc_time, o.operation_time FROM records AS o;
We query the data in Elasticsearch use GET /enriched_records/_search and we find that each record has an ingest_timestamp field and the value is the recent time.
{ "_index":"enriched_records", "_type":"_doc", "_id":"3", "_score":1, "_source":{ "operation_time":"1970-01-01 00:00:00Z", "create_time":"2023-01-20 12:25:37", "user_id":789, "ingest_timestamp":"2023-01-28T05:21:40.539754251Z", "id":3, "proc_time":"2023-01-28 05:21:40.233Z" } }
When we modify a record in MySQL, the value of the ingest_timestamp field does not change, and it seems that the pipeline set for this index is not working at this moment.
{ "_index":"enriched_records", "_type":"_doc", "_id":"3", "_score":1, "_source":{ "operation_time":"2023-01-28 05:25:05Z", "create_time":"2023-01-20 12:25:37", "user_id":987, "ingest_timestamp":"2023-01-28T05:21:40.539754251Z", "id":3, "proc_time":"2023-01-28 05:25:05.529Z" } }
If we directly modify a field in Elasticsearch, we can find that the value of the ingest_timestamp field will change.