

How to implement incremental loading in Snowflake using Stream and Merge


Snowflake is a cloud-hosted relational database used to create data warehouses on demand. Data can be loaded into the warehouse either as a full load or as an incremental load. A full load deletes the whole existing data set and reloads it from scratch, which makes it time- and resource-consuming compared to an incremental load, which loads only the small amount of new or updated data instead of reloading everything every time. We can achieve incremental loading in Snowflake by implementing change data capture (CDC) using Stream and Merge objects. A stream object is used for change data capture: it records inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data. The data captured by the stream is then merged into the target table using matched and not-matched conditions.

 

What are Stream and Merge?

Merge

The MERGE command is used to apply changes to a table: update existing records, delete old/inactive records, or insert new rows from another table.

Snowflake offers two clauses to perform Merge:

  1. Matched Clause – performs UPDATE and DELETE operations on the target table when rows satisfy the join condition.
  2. Not Matched Clause – performs the INSERT operation when no matching row is found; the rows from the source table that are not matched with the target table are inserted (see the sketch below).
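In rough outline (the table and column names here are purely illustrative, not taken from the original post), a MERGE that uses both clauses looks like this:

MERGE INTO target_table t
USING source_table s
    ON t.id = s.id
WHEN MATCHED AND s.is_deleted THEN DELETE          -- remove old/inactive records
WHEN MATCHED THEN UPDATE SET t.value = s.value     -- update existing records
WHEN NOT MATCHED THEN INSERT (id, value)           -- add new rows from the source
    VALUES (s.id, s.value);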

Stream

A stream is an object created on top of a source table to capture change data; it tracks the changes made to the source table's rows.

The stream object itself only holds an offset from which change data capture can be tracked; the actual data in the source table remains unaltered.

When the stream is queried, three additional metadata columns are returned along with the source table's columns-

  METADATA$ACTION – Indicates the DML action recorded for the row; it has only two values, INSERT or DELETE.
  METADATA$ISUPDATE – Flagged as TRUE when the record is part of an update (an update is recorded as a DELETE and INSERT pair).
  METADATA$ROW_ID – A unique key tracked against each change, so that changes to a specific row can be followed over time.

Now that we know what Stream and Merge are, let's see how to use them to load the data-

Step 1-

Connect to the Snowflake database and create sample source and target tables-

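The screenshots from the original post are not available, so the SQL below is a minimal sketch; the EMPLOYEE_SOURCE and EMPLOYEE_TARGET table names and the PERSONAL_ID, NAME, and CITY columns are assumptions based on the data referenced later in this post.

-- Sample source and target tables (illustrative names and columns)
CREATE OR REPLACE TABLE employee_source (
    personal_id INT,
    name        VARCHAR,
    city        VARCHAR
);

CREATE OR REPLACE TABLE employee_target (
    personal_id INT,
    name        VARCHAR,
    city        VARCHAR
);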

Step 2-

Create a stream on the source table using the query below-

 

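A sketch, assuming the illustrative table name above:

-- The stream records every insert, update, and delete made to the source table
CREATE OR REPLACE STREAM employee_stream ON TABLE employee_source;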

Step 3-

Let’s insert some dummy data into the source table-

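For example (the rows below are dummy values, not the ones from the original screenshot):

INSERT INTO employee_source (personal_id, name, city) VALUES
    (1, 'Asha',  'Nagpur'),
    (2, 'Rohan', 'Pune'),
    (3, 'Meera', 'Delhi');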

After inserting data into the source, let's check the data captured in the stream-

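Querying the stream (illustrative name) returns the changed rows together with the metadata columns:

SELECT * FROM employee_stream;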

As we inserted data into the source for the first time, the newly inserted rows are flagged as INSERT in the METADATA$ACTION column and FALSE in the METADATA$ISUPDATE column of the stream.

 

Step 4-

Insert data into the target using the stream and merge, using the query below-

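The exact query from the original screenshot is not shown here; the following is one common pattern for merging stream data into a target (same illustrative names as above). The subquery filters out the DELETE half of each update pair so that every personal_id appears at most once in the merge source.

MERGE INTO employee_target t
USING (
    -- keep plain inserts, plain deletes, and the INSERT half of updates
    SELECT *
    FROM employee_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) s
    ON t.personal_id = s.personal_id
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN
    DELETE
WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' AND s.METADATA$ISUPDATE THEN
    UPDATE SET t.name = s.name, t.city = s.city
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (personal_id, name, city) VALUES (s.personal_id, s.name, s.city);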

As we are inserting data for the first time, there are no matching personal_id values in the target table, and because the METADATA$ACTION flag is INSERT, the merge command inserts all of the captured rows into the target table as they are.

 


 

Step 5-

Let's update a few source rows and load them into the target again-

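For example (an illustrative update matching the Nagpur-to-Mumbai change described below):

UPDATE employee_source
SET city = 'Mumbai'
WHERE personal_id = 1;   -- this row previously had city = 'Nagpur'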

 

As soon as we update the source table, the stream will capture these changes and update the stream data.

 

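Querying the stream again (same illustrative names) shows how the update is represented:

SELECT METADATA$ACTION, METADATA$ISUPDATE, personal_id, name, city
FROM employee_stream;
-- The updated row appears twice: a DELETE record carrying the old values and
-- an INSERT record carrying the new values, both with METADATA$ISUPDATE = TRUE.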

The new version of the row is marked as INSERT and the old version is marked as DELETE in the METADATA$ACTION column, so when we load the changed data from source to target, the old row with city Nagpur is removed and the updated row with city Mumbai takes its place.

Run the same merge command we used earlier to load only the changed data into the target; the target now holds the updated row with city Mumbai in place of the old row with city Nagpur.


 

With that, you have successfully achieved incremental loading in Snowflake.

To automate this load process, we can create a task that runs on a specified schedule and loads data into the target whenever the stream has captured source changes.
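A sketch of such a task (the task name, warehouse, and schedule are illustrative; the WHEN clause uses SYSTEM$STREAM_HAS_DATA so the task only does work when the stream contains new changes):

CREATE OR REPLACE TASK load_employee_target
    WAREHOUSE = compute_wh
    SCHEDULE  = '60 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('EMPLOYEE_STREAM')
AS
    MERGE INTO employee_target t
    USING (
        SELECT * FROM employee_stream
        WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
    ) s
        ON t.personal_id = s.personal_id
    WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN DELETE
    WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' AND s.METADATA$ISUPDATE THEN
        UPDATE SET t.name = s.name, t.city = s.city
    WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
        INSERT (personal_id, name, city) VALUES (s.personal_id, s.name, s.city);

-- Tasks are created in a suspended state and must be resumed to start running
ALTER TASK load_employee_target RESUME;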

Happy Reading!


Thoughts on “How to implement incremental loading in Snowflake using Stream and Merge”

  1. Shobhit Bisen

    I started using Snowflake last week and this blog is very helpful to me for understanding the Merge statement.
    Thank you so much for sharing.

  2. Hey Praful,

    I have implemented the incremental operations by creating a Snowflake stream and merge statement, and it's working perfectly fine with a data volume of 950 million rows in the target table.
    Incremental checks are quite fast and perform the upsert and delete operations within 2 minutes using a warehouse with size XSMALL.

    Just a query: if we wanted to execute this process on an hourly schedule, do we need to go with a Snowflake task, or is there some other way to schedule this process?

  3. How do we get information on which column changed using a stream?
    I have to capture changes in selected columns only and update them in log tables.

  4. How do you implement incremental load if your source data is a file from a local server and the target table is in Snowflake?

