Debezium

Working with database systems has always been a difficult and frustrating job. Recently, I had the opportunity to work with a solution that detects changes in one database and copies that data to another database of the same or a different type. The technique behind it is the one in the title: Change Data Capture, or simply CDC.

What is Change Data Capture?

As its name implies, this is the technique we use to capture changes to the data stored in a database. Capturing data changes helps us solve quite a few data-processing problems, which we will look at in the next section.

There are many ways to capture these changes. The most primitive is the TRIGGER mechanism that most databases already support, which lets us catch UPDATE, INSERT, and DELETE actions. More conveniently, we can use a dedicated tool, and Debezium is the most prominent one. As a quick illustration of the trigger approach, see the sketch below.
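A minimal sketch of trigger-based capture in MySQL, assuming a hypothetical customers table and a customers_audit table to record changes (these names are illustrative and are not part of the Debezium setup later in this post):

CREATE TABLE customers_audit (
  audit_id    INT AUTO_INCREMENT PRIMARY KEY,
  customer_id INT NOT NULL,
  action      VARCHAR(10) NOT NULL,          -- 'INSERT', 'UPDATE' or 'DELETE'
  changed_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- record every update on customers into the audit table
CREATE TRIGGER customers_after_update
AFTER UPDATE ON customers
FOR EACH ROW
INSERT INTO customers_audit (customer_id, action) VALUES (NEW.id, 'UPDATE');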

CDC Benefits

The first benefit everyone can see is copying data to other systems. At this point some of you may say: database systems already support replication, so why bring in an external tool that is a headache to run and less stable? Fair enough! If you only need to copy data between databases of the same type (MySQL => MySQL, Mongo => Mongo), the database's built-in features are the best choice. However, if you want to copy data from MySQL to MongoDB, or from MySQL to PostgreSQL, there is no such built-in mechanism. Here CDC stands in the middle: it detects changes in the database being monitored, and your code can then process those change events and push the data to the system that needs the copy.

Another equally important benefit is the ability to back up data. Data change events are stored, so if your database happens to go down at 9am, you can take the 3am backup and reapply the saved changes from 3am to 9am. In theory, if no events are missed, your data is fully restored to its state before the failure. Pretty great, right?

Continuing from the first benefit: once we have copied data to another system, we can use that copy for testing instead of querying the real database directly. It is not uncommon for developers to run test queries that take minutes to process, or worse, cause locks. In mild cases this degrades performance; in severe cases it crashes the system. CDC is also a way to minimize such incidents.

In addition, CDC also supports problems specific to particular systems, as well as Big Data processing. If you have ever applied CDC to such problems, please share in the comments below.

Debezium - CDC Tool

Theory is not much use without examples, so let me introduce a tool I have had time to work with and find quite good: Debezium. At its core, Debezium uses Kafka to produce messages corresponding to data change events, and it uses connectors to connect to database systems and detect changes. Debezium 1.9 currently supports MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Cassandra, and Vitess. You can find instructions for each connector in the official documentation: https://debezium.io/documentation/reference/2.0/connectors/index.html

In Debezium's model, we first have the source DB, where data changes are tracked. Kafka Connect plays the role of detecting changes and pushing events into Apache Kafka. From there, data can be pushed to sinks depending on your needs.

In this section, I will walk through the setup steps for MySQL, probably the database most of us work with.

For MySQL, Debezium relies on the binlog to detect data changes, so on the system you want to monitor you need to enable the binlog feature and make sure the user Debezium connects with has permission to read it. The sketch below shows the typical settings.
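A sketch of the usual requirements, following the Debezium MySQL connector documentation (the debezium user name matches the demo Secret we create later):

# my.cnf: turn on row-based binlog
[mysqld]
server-id        = 184054
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL

Then grant the connector user the privileges Debezium needs:

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';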

I work with Kubernetes a lot, so I will guide everyone through building this tool on K8s. For other environments such as VMs or Docker, the components are basically similar.

When running on K8s, Debezium uses the Strimzi Operator (an Operator that manages Kafka on Kubernetes). First, we create a separate namespace for this application:

kubectl create ns debezium-example

Then we need to install the Operator Lifecycle Manager (OLM), which we will use to install the Strimzi Operator:

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0
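The script above only installs OLM itself. With OLM in place, the Strimzi operator can then be installed from OperatorHub (this is the manifest the Debezium deployment guide uses; check operatorhub.io if the path has changed):

kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml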

Create a Secret containing the credentials for the demo database:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: debezium-example
type: Opaque
data:
  username: ZGViZXppdW0=
  password: ZGJ6
EOF
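The values are base64-encoded; you can check that they decode to the demo credentials debezium / dbz:

echo -n debezium | base64   # ZGViZXppdW0=
echo -n dbz | base64        # ZGJ6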

Next, create a Role and RoleBinding so that the Kafka Connect service account can read this Secret:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: debezium-example
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
EOF
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: debezium-example
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: debezium-example
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF
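As a quick sanity check, confirm both RBAC objects exist:

kubectl -n debezium-example get role,rolebinding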

Now comes the important part: we create a Kafka cluster to store the change events. The configuration below creates one Kafka pod (corresponding to one broker) and one ZooKeeper pod.

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: debezium-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF
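The cluster can take a while to start. One way to wait for it (assuming the Ready condition Strimzi sets on the Kafka resource):

kubectl -n debezium-example wait kafka/debezium-cluster --for=condition=Ready --timeout=300s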

Next, we deploy a MySQL database to test with; the user and password of this DB are mysqluser / mysqlpw:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: quay.io/debezium/example-mysql:1.9
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: debezium
        - name: MYSQL_USER
          value: mysqluser
        - name: MYSQL_PASSWORD
          value: mysqlpw
        ports:
        - containerPort: 3306
          name: mysql
EOF
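Before moving on, check that the MySQL pod is running:

kubectl -n debezium-example get pods -l app=mysql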

Now we deploy the components that connect to MySQL and detect changes. First, we create a KafkaConnect cluster:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: 10.110.154.103/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF
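Note that build.output.image must point to a registry you can push to (the address above is my private registry; replace it with your own), and {debezium-version} must be replaced with a concrete version. Once Strimzi finishes building the connector image, the Connect pod should appear (assuming Strimzi's usual strimzi.io/cluster pod label):

kubectl -n debezium-example get pods -l strimzi.io/cluster=debezium-connect-cluster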

Then we deploy a KafkaConnector that connects to MySQL, attached to the KafkaConnect cluster created above:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: ${secrets:debezium-example/debezium-secret:username}
    database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    database.server.name: mysql
    database.include.list: inventory
    database.history.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
EOF
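You can verify the connector is running by inspecting the status section of the KafkaConnector resource:

kubectl -n debezium-example get kafkaconnector debezium-connector-mysql -o yaml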

The setup is now complete and we can monitor changes in the database. Run this command to listen for messages in Kafka:

kubectl run -n debezium-example -it --rm --image=quay.io/debezium/tooling:1.2  --restart=Never watcher -- kcat -b debezium-cluster-kafka-bootstrap:9092 -C -o beginning -t mysql.inventory.customers

Open another terminal. Now we will access the DB and modify a record to test:

kubectl run -n debezium-example -it --rm --image=mysql:8.0 --restart=Never --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium

Select the inventory database and update a record:

sql> use inventory;
sql> update customers set first_name="Sally Marie" where id=1001;

If you see a JSON message like the following, the setup was successful:

{
...
  "payload": {
    "before": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "after": {
      "id": 1001,
      "first_name": "Sally Marie",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "{debezium-version}",
      "connector": "mysql",
      "name": "mysql",
      "ts_ms": 1646300467000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 401,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "u",
    "ts_ms": 1646300467746,
    "transaction": null
  }
}

Each event message has three main parts: source (metadata about where the change came from, e.g. which binlog file and position), before (the data before the change) and after (the data after the change). From before, after, and the op field we can tell how the data changed (c = created, u = updated, d = deleted).
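You can see the other op values by trying an insert and a delete yourself: an insert produces an event with before: null and op "c", while a delete produces after: null and op "d". For example, using the sample inventory schema (the values here are just hypothetical test data):

sql> insert into customers values (1005, "Bob", "Jones", "bob@example.com");
sql> delete from customers where id = 1005;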

Beyond this, Debezium can do many other things, such as automatically replicating changes between two different DBs, and more.
