通过 Apache-kafka 将删除事件从 MySQL 流式传输到 PostgreSQL

时间:2023-04-03
本文介绍了通过 Apache-kafka 将删除事件从 MySQL 流式传输到 PostgreSQL的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我正在尝试使用 Apache Kafka 将事件从 MySQL 流式传输到 PostgreSQL. 虽然插入和更新工作正常,但我无法理解了解如何从 MySQL 中删除记录并将此事件流式传输到 PostgreSQL.

I am trying to stream events from MySQL to PostgreSQL using Apache Kafka. Although insertions and updates work fine, I can't figure out how to delete a record from MySQL and stream this event to PostgreSQL.

假设以下拓扑:

               +-------------+
               |             |
               |    MySQL    |
               |             |
               +------+------+
                      |
                      |
                      |
      +---------------v------------------+
      |                                  |
      |           Kafka Connect          |
      |  (Debezium, JDBC connectors)     |
      |                                  |
      +---------------+------------------+
                      |
                      |
                      |
                      |
              +-------v--------+
              |                |
              |   PostgreSQL   |
              |                |
              +----------------+

我正在使用以下 docker 镜像;

I am using the following docker images;

  1. Apache-Zookeper
  2. Apache-Kafka
  3. Debezium/JDBC 连接器

然后

# Start the application
export DEBEZIUM_VERSION=0.6
docker-compose up

# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json

# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

这里是MySQL数据库的内容;

Here is the content of MySQL database;

docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

并且我们可以验证PostgresSQL的内容是一样的;

And we can verify that the content of PostgresSQL is identical;

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email         
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
(4 rows)

假设我想从 MySQL 数据库中删除 id=1004 的记录;

Assume that I want to delete the record with id=1004 from MySQL database;

docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'
mysql> delete from customers where id = 1004;


docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
+------+------------+-----------+-----------------------+

虽然从 MySQL 中删除了该记录,但该条目仍然出现在 PostgresSQL 中

Although the record is deleted from MySQL, the entry still appears in PostgresSQL

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'

 last_name |  id  | first_name |         email         
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
(4 rows)

我知道支持软删除,但是是否可以从 PostgresSQL 中完全删除该特定条目(通过 Apache-Kafka 从 MySQL 流式传输 del 事件)?

I understand that soft deletes are supported however, is it possible to completely delete that particular entry from PostgresSQL as well (by streaming the del event from MySQL via Apache-Kafka)?

这是source.json文件的内容

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3"
    }
}

这里是jdbc-sink.json文件的内容

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

我也尝试设置 "pk.mode": "record_key""delete.enabled": "true" (错误修复建议) 但这种修改似乎不起作用.

I have also tried to set "pk.mode": "record_key" and "delete.enabled": "true" (bug fix suggestion) but this modification doesn't seem to work.

推荐答案

Confluent JDBC 接收器连接器当前不支持删除.有一个待处理的拉取请求(您已链接到它),但尚未合并.

Deletes are currently not supported by the Confluent JDBC sink connector. There's a pending pull request (you already linked to it), but this hasn't been merged yet.

目前,您可以自己基于该分支构建 JDBC 接收器连接器,也可以创建一个简单的自定义接收器连接器,该连接器通过在目标数据库上执行相应的 DELETE 语句来处理逻辑删除事件.

For the time being, you could either build the JDBC sink connector based on that branch yourself, or you create a simple custom sink connector which just handles tombstone events by executing a corresponding DELETE statement on the target database.

这篇关于通过 Apache-kafka 将删除事件从 MySQL 流式传输到 PostgreSQL的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

上一篇:重置 JDBC Kafka 连接器以从一开始就开始拉行? 下一篇:用于生产中 rds 的 Mysql debezium 连接器导致死锁

相关文章