Skip to content

CDC Postgres

February 1, 2024

Change Data Capture (CDC)

What is CDC?

CDC tracks changes in a database (inserts, updates, deletes) in real time. It’s essential for systems that rely on up-to-the-minute data or need to replicate data across multiple systems or even if you want to keep things in sync

Methods to Implement CDC

When to Use CDC

CDC shines in:

Where we used this mostly

Ways to Implement CDC

Database Triggers Example

This is a way we implemented Change Data Capture (CDC) mechanism for tracking changes in stock and price info in our PostgreSQL database

Step 1: Create the Audit Table

First, let’s create an audit table that will store changes made to the products table. This includes stock and price changes among other potential modifications.

CREATE TABLE product_changes (
    change_id SERIAL PRIMARY KEY,
    operation_type CHAR(1) NOT NULL,
    product_id INT NOT NULL,
    old_data JSONB,
    new_data JSONB,
    changed_at TIMESTAMP NOT NULL DEFAULT NOW()
);

Step 2: Create the Trigger Function

The trigger function, audit_products, captures the before and after states of a row for insert, update, and delete operations, and logs this information to the product_changes table.

CREATE OR REPLACE FUNCTION audit_products()
RETURNS TRIGGER AS $$
BEGIN
    IF (TG_OP = 'DELETE') THEN
        INSERT INTO product_changes(operation_type, product_id, old_data, new_data)
        VALUES ('D', OLD.id, row_to_json(OLD), NULL);
        RETURN OLD;
    ELSIF (TG_OP = 'UPDATE') THEN
        INSERT INTO product_changes(operation_type, product_id, old_data, new_data)
        VALUES ('U', NEW.id, row_to_json(OLD), row_to_json(NEW));
        RETURN NEW;
    ELSIF (TG_OP = 'INSERT') THEN
        INSERT INTO product_changes(operation_type, product_id, old_data, new_data)
        VALUES ('I', NEW.id, NULL, row_to_json(NEW));
        RETURN NEW;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

Step 3: Create Triggers for Each Operation

Finally, set up triggers on the products table to automatically call audit_products upon any insert, update, or delete operation. This ensures that any change to a product’s stock or price is captured.

CREATE TRIGGER products_insert AFTER INSERT ON products
FOR EACH ROW EXECUTE FUNCTION audit_products();

CREATE TRIGGER products_update AFTER UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION audit_products();

CREATE TRIGGER products_delete AFTER DELETE ON products
FOR EACH ROW EXECUTE FUNCTION audit_products();

In this type of setup, every insert, update, or delete operation on the products table will trigger the audit_products function, capturing the details of the operation in the product_changes audit table. This includes all fields of the products table, allowing us to track how stock and price change over time along with any other modifications to product data.

Polling-Based CDC: A Simple Approach

Polling involves scripting a regular check for new or changed data in the database. It’s easy but can be resource-heavy, if not implemented optimally.

Here is a simple approach we used to sync sales orders from one db to other for analytics purposes. this can work on cross db or cross server as well. like you can use it to sync between oracle and postgres or other ones, this script needs to be ran as a cron job or a scheduled task.

Step 1: Database Setup

First, create or modify a table to store the last sync timestamp. This table will hold a single record indicating the last time the sync was successfully completed.

CREATE TABLE sync_log (
    id SERIAL PRIMARY KEY,
    last_synced_at TIMESTAMP NOT NULL
);

-- Initialize with a far past date if no record exists
INSERT INTO sync_log (last_synced_at) VALUES ('1970-01-01 00:00:00');

Step 2: Python Script for Syncing Sales Data

This script fetches the last sync timestamp from the sync_log table, uses it to fetch new or updated sales records, processes them, and then updates the sync_log with the current timestamp.

import psycopg2
import psycopg2.extras
from datetime import datetime

def get_last_synced_at(cursor):
    cursor.execute("SELECT last_synced_at FROM sync_log ORDER BY id DESC LIMIT 1;")
    return cursor.fetchone()[0]

def update_last_synced_at(cursor, timestamp):
    cursor.execute("UPDATE sync_log SET last_synced_at = %s WHERE id = 1;", (timestamp,))

def fetch_new_sales(cursor, last_synced_at):
    query = """
    SELECT * FROM sales
    WHERE last_modified > %s
    ORDER BY last_modified ASC;
    """
    cursor.execute(query, (last_synced_at,))
    return cursor.fetchall()

def main():
    conn = psycopg2.connect(dbname="sales_db", user="db_user", password="db_pass", host="db_host")
    cursor = conn.cursor()

    try:
        last_synced_at = get_last_synced_at(cursor)
        print(f"Last synced at: {last_synced_at}")

        new_sales = fetch_new_sales(cursor, last_synced_at)
        for sale in new_sales:
            # Process each sale here (e.g., sync to another system)
            print(f"Syncing new sale: {sale}")

        if new_sales:
            # Update the last_synced_at to the timestamp of the last sale processed
            update_last_synced_at(cursor, new_sales[-1]['last_modified'])
            conn.commit()
            print("Sync completed successfully.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    main()

We can ensure minimal data transfer and also do the needed transformations before syncing the data to the other db.

Log-Based CDC with Kafka with Debezium

Using Kafka Connect for log-based CDC involves less intrusion, directly streaming database changes to Kafka.

Debezium is a popular Kafka Connect plugin for CDC, and it’s easy to set up with PostgreSQL. It is mostly used in microservices architecture, where we can have a service that listens to the kafka topic and then do the needed processing.

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "harry",
    "database.password": "sorry_not_telling",
    "database.dbname": "pg_prod",
    "database.server.name": "camila1",
    "table.include.list": "schema.transactions,schema.users",
    "plugin.name": "pgoutput",
    "snapshot.mode": "initial"
  }
}

Using Solutions like Airbyte and PeerDB

If you are doing this for a large amount of data and a lot of tables, you might want to use a solution like Airbyte or PeerDB. These tools are built to handle the complexities of CDC and data replication at scale, so you don’t have to reinvent the wheel.