Change Data Capture (CDC) Using Streams And Tasks

Authors: Deepak Mishra, Onkar Bongirwar

A stream is a Snowflake object that provides change data capture (CDC) capabilities to track the changes in a table. It records changes made to a table, including information about inserts, updates, and deletes, as well as metadata about each change.

Streams are supported on standard (local) tables, external tables, and directory tables.

Types of Streams

1. Standard: A standard table stream tracks all DML changes to the source table, including inserts, updates, and deletes (including table truncates).

2. Append-only: An append-only table stream tracks row inserts only. Update and delete operations (including table truncates) are not recorded.

3. Insert-only (supported on external tables only): An insert-only stream tracks row inserts only; it does not record delete operations that remove rows from an inserted set.
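As a rough illustration of the three types above, the statements below create one stream of each kind. The table and stream names (`orders`, `ext_orders`, and the `*_stream` objects) are hypothetical and assume the underlying table and external table already exist.

```sql
-- Standard stream (the default): tracks inserts, updates, and deletes.
CREATE OR REPLACE STREAM orders_std_stream ON TABLE orders;

-- Append-only stream: tracks row inserts only.
CREATE OR REPLACE STREAM orders_append_stream ON TABLE orders
  APPEND_ONLY = TRUE;

-- Insert-only stream: available on external tables.
CREATE OR REPLACE STREAM ext_orders_stream ON EXTERNAL TABLE ext_orders
  INSERT_ONLY = TRUE;
```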

Introduction to Tasks

A task object defines a recurring schedule for executing a SQL statement, including statements that call stored procedures. Tasks can be chained together for successive execution to support more complex periodic processing.

Users can define a simple tree-like structure of tasks that executes consecutive SQL statements to process data and move it to various destination tables.

Currently, a task can execute a single SQL statement. Tasks can also be used independently to generate periodic reports by inserting or merging rows into a report table or performing other periodic work.
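A minimal sketch of a two-task chain, assuming a hypothetical warehouse `demo_wh` and hypothetical tables `raw_table`, `staging_table`, and `report_table` (with columns `id` and `row_count` on the report table). The root task carries the schedule; the child task uses `AFTER` and runs once the root task finishes.

```sql
-- Root task: runs every 5 minutes.
CREATE OR REPLACE TASK load_staging_task
  WAREHOUSE = demo_wh
  SCHEDULE = '5 MINUTE'
AS
  INSERT INTO staging_table SELECT * FROM raw_table;

-- Child task: has no schedule of its own; it runs after the root task.
CREATE OR REPLACE TASK build_report_task
  WAREHOUSE = demo_wh
  AFTER load_staging_task
AS
  INSERT INTO report_table (id, row_count)
  SELECT id, COUNT(*) FROM staging_table GROUP BY id;

-- Tasks are created suspended. Resume the child first, then the root.
ALTER TASK build_report_task RESUME;
ALTER TASK load_staging_task RESUME;
```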

CDC using Streams and Tasks

Streams and Tasks can be combined to create effective ELT pipelines. These built-in features can replace the external third-party ELT tools otherwise required for data processing and orchestration, which reduces both cost and external dependencies.

Source: Medium

Demo

Step 1: Creating source and target tables.
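The original tables are not shown, so the following is only a sketch with assumed names and columns: `source_table` and `target_table`, each with `id`, `name`, and `amount`.

```sql
-- Hypothetical source table; the column list is an assumption for illustration.
CREATE OR REPLACE TABLE source_table (
  id     NUMBER,
  name   VARCHAR,
  amount NUMBER(10, 2)
);

-- Target table with the same column definitions as the source.
CREATE OR REPLACE TABLE target_table LIKE source_table;
```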

Step 2: Creating a stream on the source table.

The syntax for creating a Stream:

CREATE STREAM stream_name ON TABLE table_name;

Next, query the stream. A stream can be queried like a table.

SELECT * FROM stream_name;
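In addition to the source table's columns, a stream exposes metadata columns that describe each change. The sketch below assumes a hypothetical stream `source_stream` on a table with columns `id` and `name`.

```sql
-- METADATA$ACTION   : 'INSERT' or 'DELETE'
-- METADATA$ISUPDATE : TRUE when the row is one half of an update
-- METADATA$ROW_ID   : unique, immutable identifier of the changed row
SELECT id,
       name,
       METADATA$ACTION,
       METADATA$ISUPDATE,
       METADATA$ROW_ID
FROM source_stream;

-- Returns TRUE when the stream currently holds change records.
SELECT SYSTEM$STREAM_HAS_DATA('source_stream');
```

A plain SELECT does not consume the stream. Its offset advances only when the stream is read by a DML statement that commits.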

Step 3: Creating a Task to consume the data from the stream on a schedule.

The syntax for creating a Task:

CREATE TASK task_name
  WAREHOUSE = warehouse_name
  SCHEDULE = '{ <num> MINUTE | USING CRON <expr> <time_zone> }'
  WHEN <boolean_expr>
AS
  <sql>

A newly created Task is suspended by default, so the next step is to resume it so that it starts running on its schedule. You can do this by running the following command.

ALTER TASK task_name RESUME;
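The demo's actual task body is not shown. One common way to apply stream changes to a target table is a MERGE, sketched below under these assumptions: hypothetical objects `cdc_task`, `demo_wh`, `source_stream`, and `target_table`, columns `id`, `name`, and `amount`, and `id` being a unique key that is never updated.

```sql
CREATE OR REPLACE TASK cdc_task
  WAREHOUSE = demo_wh
  SCHEDULE = '1 MINUTE'
  -- Skip the run when the stream holds no change records.
  WHEN SYSTEM$STREAM_HAS_DATA('source_stream')
AS
  MERGE INTO target_table AS t
  USING (
    -- An update appears as a DELETE/INSERT pair; keep only the INSERT half
    -- so that each key appears once in the merge source.
    SELECT *
    FROM source_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
  ) AS s
  ON t.id = s.id
  WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN
    DELETE
  WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    UPDATE SET t.name = s.name, t.amount = s.amount
  WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (id, name, amount) VALUES (s.id, s.name, s.amount);

-- Start the schedule for this task.
ALTER TASK cdc_task RESUME;
```

Because the MERGE reads the stream inside a DML statement, each successful run consumes the change records it processed.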

Step 4: Loading the data and performing various DML operations.

Once everything is set up, the next thing to do is add data and perform some DML operations on the source table.

To load data into the source table, follow these steps:

1. Click on the Database tab and select the respective database.

2. Choose the table into which you want to load the data and click Load Table. In the dialog box that appears, select the warehouse, source file location, and file format, then click Load to load the data into the table.
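The same load can also be done in SQL rather than through the web interface. The sketch below is an assumption about how that might look for a CSV file with a header row; the stage name `demo_stage`, the file path, and `source_table` are hypothetical.

```sql
-- Internal stage with a CSV file format (one header row is assumed).
CREATE OR REPLACE STAGE demo_stage
  FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);

-- Upload the local file from a client such as SnowSQL (hypothetical path):
-- PUT file:///tmp/source_data.csv @demo_stage;

-- Load the staged file into the source table.
COPY INTO source_table
  FROM @demo_stage
  FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);
```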

Let us perform some DML operations on the source table to check whether they are reflected in the target table.

  1. Insert:

The syntax for inserting a row:

INSERT INTO table_name (column1, column2, column3, …)
VALUES (value1, value2, value3, …);

Initially, there are 4,713 rows in the source table.

After new rows are inserted, the row count in the source table increases, and the target table reflects the same change.
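A small sketch of this check, using hypothetical names (`source_table`, `target_table`, `source_stream`) and made-up row values:

```sql
-- Insert one new row into the source table.
INSERT INTO source_table (id, name, amount)
VALUES (5001, 'Alice', 120.00);

-- The source row count grows by one.
SELECT COUNT(*) FROM source_table;

-- Until the task consumes it, the new row is visible in the stream as an INSERT.
SELECT id, name, amount, METADATA$ACTION
FROM source_stream
WHERE METADATA$ACTION = 'INSERT';

-- After the next task run, the target row count should match the source.
SELECT COUNT(*) FROM target_table;
```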

  2. Update:

The syntax for updating a row:

UPDATE table_name
SET column1 = value1, column2 = value2, …
WHERE condition;

All the updates made in the source table are reflected in the target table.
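To illustrate how an update shows up in a standard stream, here is a sketch with hypothetical names and values (`source_table`, `source_stream`, row `id = 5001`). It assumes the row already existed before the stream's current offset.

```sql
-- Change one column of an existing row.
UPDATE source_table
SET amount = 150.00
WHERE id = 5001;

-- The stream represents the update as a pair of records for the same row:
-- a DELETE carrying the old values and an INSERT carrying the new values,
-- both with METADATA$ISUPDATE = TRUE.
SELECT id, amount, METADATA$ACTION, METADATA$ISUPDATE
FROM source_stream
WHERE id = 5001
ORDER BY METADATA$ACTION;
```

If the row was inserted and updated within the same unconsumed window, a standard stream shows only the net change, that is, a single INSERT with the latest values.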

  3. Delete:

Similarly, write delete queries to delete records from the source table and observe the changes getting applied to the target table.

Deleting the data from the source table results in data deletion from the target table.
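A delete can be checked in the same way. The names and the row `id = 5001` below are hypothetical, and the row is assumed to have existed before the stream's current offset.

```sql
-- Remove one row from the source table.
DELETE FROM source_table
WHERE id = 5001;

-- The stream records the removal as a DELETE with METADATA$ISUPDATE = FALSE.
SELECT id, METADATA$ACTION, METADATA$ISUPDATE
FROM source_stream
WHERE id = 5001;

-- After the next task run, the row should no longer exist in the target.
SELECT COUNT(*) FROM target_table WHERE id = 5001;
```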

Also, you can keep track of your Tasks by running the following command.

SELECT * FROM TABLE(information_schema.task_history()) ORDER BY scheduled_time DESC;
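The `task_history` table function also accepts optional arguments that narrow the result. The sketch below assumes a hypothetical task named `cdc_task` and looks only at the last hour of runs.

```sql
SELECT name,
       state,            -- for example SCHEDULED, EXECUTING, SUCCEEDED, FAILED, or SKIPPED
       scheduled_time,
       completed_time,
       error_message
FROM TABLE(information_schema.task_history(
       task_name => 'CDC_TASK',
       scheduled_time_range_start => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
       result_limit => 20))
ORDER BY scheduled_time DESC;
```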