Task Graphs Dmf Quality Checks

Quickstart: Running DMFs as Quality Gate in ELT Pipeline

Snowflake has released Data Metric Functions (DMFs), a native solution for running a range of quality checks on your data (requires Enterprise Edition or higher). You can either choose from a growing library of system DMFs or write your own user-defined DMFs ("UDMFs") with custom logic and thresholds.

We can use Tasks, Snowflake's native orchestration capability, to schedule, modularize and orchestrate our ELT processing steps by connecting multiple Tasks into a Task Graph (aka DAG). Each Task runs a piece of code on a certain trigger and, optionally, only when a defined condition is met. Since Tasks can run almost anything (Python, Java, Scala, SQL, functions, stored procedures, notebooks,…), they can also run Data Metric Functions. This allows us to integrate data quality checks deeply into our ingestion and transformation pipelines.

In the following six steps we will set up a simple ELT data pipeline gated by data quality checks, a pattern you can easily apply to your existing or next Task pipeline.

1. Set up Demo Data Ingestion Stream

For simplicity we will just use the ACCOUNTADMIN role for this demo setup. If you don't have access to it, or want to use a separate role for this demo, check the Appendix at the end to grant all required privileges. All following code runs in the context of the DEMO schema, so make sure you keep that context or use your own schema and warehouse.

To make this a live demo, we first set up a Task that loads new rows into our source table, simulating continuous ingestion. In a real pipeline the data might come from a user interface, sensors, analytics delivered by a connector, or another database.

We will use some free weather data from the Snowflake Marketplace:

  • Go to Snowflake Marketplace
  • Get the free "Weather Source LLC: frostbyte" data share (This data may be used in connection with the Snowflake Quickstart, but is provided solely by WeatherSource, and not by or on behalf of Snowflake.)
  • Under "Options", rename the shared database to "DEMO_WEATHER_DATA" to shorten it

Now we can run the script below to create a Task that continuously loads small batches of data into a source table, while intentionally adding some quality issues to it.
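
The idea of "intentionally adding quality issues" can be illustrated with a small plain-Python model. This is not the notebook's script: the column name `avg_temp_f`, the injected value `999.0` and the two rates are hypothetical choices made only to show the kinds of defects (missing readings, faulty-sensor outliers) that the later checks are meant to catch.

```python
import random


def inject_quality_issues(rows, null_rate=0.05, outlier_rate=0.05, seed=None):
    """Return copies of the input rows, with some temperatures set to NULL (None)
    or to an implausible value, to mimic a dirty ingestion batch.

    The column name 'avg_temp_f' and the outlier value 999.0 are hypothetical.
    """
    rng = random.Random(seed)
    batch = []
    for row in rows:
        row = dict(row)  # copy so the caller's rows are left untouched
        roll = rng.random()
        if roll < null_rate:
            row["avg_temp_f"] = None       # missing reading
        elif roll < null_rate + outlier_rate:
            row["avg_temp_f"] = 999.0      # faulty-sensor outlier
        batch.append(row)
    return batch


# Usage: a batch of 20 clean rows with a few defects mixed in
clean = [{"date": f"2024-07-{d:02d}", "avg_temp_f": 60.0 + d} for d in range(1, 21)]
dirty = inject_quality_issues(clean, seed=42)
```

Using independent rates keeps the defect mix tunable, so you can make failing batches rarer or more frequent while watching how the Task Graph reacts.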

2. Setting up the demo transformation pipeline

For this demo setup we will use 4 tables:

  • Source table - where new data comes in
  • Landing table - where we load the new batch and run the quality checks on it
  • Target table - for all “clean” data that meets expectations
  • Quarantine table - for all “bad” data that failed expectations

We already have the source table from Step 1, so let's create the other three:

Now we can build a Task Graph that runs whenever new data is added to the source table. First, we set up a Stream on the source table CONTINUOUS_WEATHER_DATA:

Next we create the first Task to insert all new rows from the Stream into the landing table RAW_WEATHER_TABLE as soon as new data is available.

🔔 New Feature: “Triggered Tasks”. We can simplify orchestration by omitting the schedule for our Task and simply setting SYSTEM$STREAM_HAS_DATA as the condition for it to run.
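
To make the Stream-plus-triggered-Task pattern concrete, here is a toy plain-Python model of it. It is only an analogy and not how Snowflake implements streams. `Stream.has_data()` plays the role of the SYSTEM$STREAM_HAS_DATA condition, and `consume()` stands for reading the stream in a DML statement, which advances its offset. The class and function names are hypothetical.

```python
class Stream:
    """Toy change-tracking stream over an append-only list (a model, not Snowflake's implementation)."""

    def __init__(self, source):
        self.source = source
        # A standard stream starts at the table's current version,
        # so rows that already exist are not reported as changes.
        self.offset = len(source)

    def has_data(self):
        # Stand-in for SYSTEM$STREAM_HAS_DATA in the Task condition
        return len(self.source) > self.offset

    def consume(self):
        # Consuming the stream in a committed DML statement advances the offset
        new_rows = self.source[self.offset:]
        self.offset = len(self.source)
        return new_rows


def load_raw_data(stream, landing):
    """Body of the triggered root Task: copy new rows into the landing table."""
    if not stream.has_data():
        return 0  # condition not met, the Task is skipped
    new_rows = stream.consume()
    landing.extend(new_rows)
    return len(new_rows)
```

Because the condition is evaluated before the body runs, the root Task does no work (and in Snowflake consumes no warehouse compute for the body) when nothing new has arrived.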

Task 2: Transformation

This second Task runs directly after the first one and simulates a transformation of the new batch. A real transformation will likely be much more complex; for our demo we keep it simple and just filter for hot days with an average temperature above 68°F.

Once the new data is inserted into the target table CLEAN_WEATHER_DATA we empty the landing table again.
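
The transformation step can be sketched in plain Python to show its two effects: filtering the batch and then emptying the landing table. Tables are modeled as lists of dicts, and the column name `avg_temp_f` is a hypothetical stand-in for the temperature column used in the notebook.

```python
HOT_DAY_THRESHOLD_F = 68.0


def transform_batch(landing, target):
    """Insert hot days from the landing table into the target table,
    then empty the landing table. Returns the number of rows inserted."""
    hot_days = [
        row for row in landing
        # NULL temperatures never satisfy "> 68" in SQL, so they are skipped here too
        if row["avg_temp_f"] is not None and row["avg_temp_f"] > HOT_DAY_THRESHOLD_F
    ]
    target.extend(hot_days)
    landing.clear()  # corresponds to emptying RAW_WEATHER_TABLE after the insert
    return len(hot_days)
```

Clearing the landing table after the insert keeps each graph run working on exactly one batch, so the quality checks added later never see leftover rows from a previous run.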

Let’s switch to the Task Graph UI to

  • See the graph we created
  • Check the run history to see if we have any errors
  • Check the return value of each Task

3. Assigning quality checks to the landing table

Let’s first have a look at all system Data Metric Functions that are already available by default. We can see them in Snowsight as Functions under the SNOWFLAKE.CORE schema or alternatively query for all DMFs in the account that our role is allowed to see:

For our demo dataset we also want to add a range check, to make sure that temperature values are plausible and that downstream consumers' analyses are not skewed by unrealistic values caused by faulty sensors.

For that we can write a UDMF (user-defined Data Metric Function) that defines a range of plausible Fahrenheit values:
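
A DMF returns a number for the data it inspects. For a check like this, the natural result is the count of violating rows, so 0 means the check passed. The plain-Python model below mirrors that contract. The bounds of -80°F and 140°F are hypothetical; choose limits that fit your region and sensors. NULLs are deliberately not counted here, since a separate null-count check covers them.

```python
PLAUSIBLE_MIN_F = -80.0   # hypothetical lower bound
PLAUSIBLE_MAX_F = 140.0   # hypothetical upper bound


def count_implausible_temps(values, low=PLAUSIBLE_MIN_F, high=PLAUSIBLE_MAX_F):
    """Return how many non-null values fall outside [low, high].

    Mirrors a UDMF that counts rows failing a range check: 0 means 'passed'.
    NULLs are ignored, as 'NULL NOT BETWEEN a AND b' is not true in SQL.
    """
    return sum(1 for v in values if v is not None and not (low <= v <= high))


# Usage: one value too hot, one too cold, one NULL, one exactly on the upper bound
issues = count_implausible_temps([50.0, None, 200.0, -100.0, 140.0])
```

Returning a count rather than a boolean lets the quality gate simply add up all check results and treat any non-zero total as a failure.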

We can now test our UDMF by running it manually on our source table:

Now we can assign our UDMF together with a few system DMFs to our landing table:

The results of all scheduled checks performed by Data Metric Functions assigned to tables are stored in the view SNOWFLAKE.LOCAL.DATA_QUALITY_MONITORING_RESULTS. We can query them directly or build a simple Snowsight dashboard by running something like:

4. Run DMFs as "Quality gate" part of the pipeline

Our quality check Task should run every DMF currently assigned to the landing table, even if we add or remove DMFs later on, so we don't want to hard-code the DMF calls in the Task. Instead, we first build a helper function to modularize our code.

The function (a UDTF) accepts a table name as an argument and returns all DMFs that are currently assigned to a column of this table.
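
The lookup the UDTF performs can be modeled in plain Python as a filter over "DMF reference" records. In Snowflake, one possible basis for such a function is the INFORMATION_SCHEMA.DATA_METRIC_FUNCTION_REFERENCES table function; check its documentation for the exact arguments and output columns. The `DmfReference` record, its fields and the metric name `INVALID_TEMP_COUNT` are hypothetical simplifications.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DmfReference:
    """Hypothetical, simplified record of one DMF assigned to one column."""
    metric_name: str   # e.g. "SNOWFLAKE.CORE.NULL_COUNT"
    table_name: str
    column_name: str


def dmfs_for_table(references, table_name):
    """Return (metric, column) pairs for every DMF assigned to a column of table_name.

    Unquoted Snowflake identifiers are case-insensitive, so names are compared in upper case.
    """
    wanted = table_name.upper()
    return [
        (ref.metric_name, ref.column_name)
        for ref in references
        if ref.table_name.upper() == wanted
    ]
```

Because the quality check Task asks this function at run time, assigning or unassigning a DMF on the landing table changes what the gate runs without editing the Task itself.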

Before we call it within the Task, let's test it:

Now we can define a new Task to get all DMFs from this function and then run them all.

We store the result of each check in a TEST_RESULT variable and add it to a running total in a RESULTS_SUMMARY variable.

This gives us the total number of issues found across all checks, which we pass on as the return value of this Task.

If RESULTS_SUMMARY remains '0', we know that all checks have passed.
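
The gate's core logic is small enough to model in plain Python: run each check, add its result to a running total, and hand the total on as a string. In Snowflake, the Task would publish that string with SYSTEM$SET_RETURN_VALUE. Here it is simply returned. The `checks` mapping and the lambdas in the usage lines are hypothetical stand-ins for the assigned DMFs.

```python
def run_quality_gate(checks, batch):
    """Run every check against the batch and return the summed issue count as a string.

    'checks' maps a check name to a function returning the number of issues it found.
    The string form mirrors a Task return value, where '0' means all checks passed.
    """
    results_summary = 0
    for name, check in checks.items():
        test_result = check(batch)  # issues found by this one check
        results_summary += test_result
    return str(results_summary)


# Usage: two hypothetical checks over a list of temperatures
checks = {
    "null_count": lambda temps: sum(1 for t in temps if t is None),
    "implausible_count": lambda temps: sum(1 for t in temps if t is not None and t > 140),
}
passed = run_quality_gate(checks, [50, 60])        # no issues
failed = run_quality_gate(checks, [None, 200, 70])  # one NULL, one outlier
```

Summing counts, rather than stopping at the first failing check, keeps the return value informative: it tells you how many issues the batch had, not just that it failed.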

Now we just have to update our other transformation tasks to run AFTER the new quality check task.

We also add a condition so that they run ONLY if all quality checks have passed. For that we can use the Task return value as a condition.

🔔 New Feature: “Task Return Value as Condition”. We can add a condition for a Child Task to run based on the return value of a predecessor Task.

5. Isolate datasets with quality issues

We could simply ignore a failed batch, clear the landing table and wait for the next one. More likely, though, we want to analyze that batch and potentially fix its data quality issues. To do that later, we first isolate the batch in our quarantine table.

So we add another Task to our graph and invert the condition so that it only runs when a quality check failed:
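
The two child branches and their opposite conditions can be modeled as one plain-Python dispatch on the gate's return value. In Snowflake, each child Task reads that value in its condition, typically via SYSTEM$GET_PREDECESSOR_RETURN_VALUE, and exactly one branch's condition holds. The function name is hypothetical, and the transformation is passed in as a function. The model also assumes that the quarantine branch empties the landing table after copying the batch, just as the transformation branch does.

```python
def run_gate_branches(gate_return_value, landing, target, quarantine, transform):
    """Run exactly one of the two child branches, like two Tasks whose conditions
    compare the quality gate's return value with '0' in opposite ways."""
    if gate_return_value == "0":
        # Transformation branch: condition "return value = '0'" holds
        transform(landing, target)
        return "transformed"
    # Quarantine branch: inverted condition, so isolate the whole batch
    quarantine.extend(landing)
    landing.clear()  # assumption: the landing table is emptied for the next batch
    return "quarantined"
```

Because the conditions are complementary, every graph run ends in one of the two paths and completes successfully, whether or not the batch was clean.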

Now we can let this run, knowing that all batches with quality issues will be isolated and all good batches will be transformed further. Since we cannot predict if and when quality issues will occur, we finish this demo by adding a notification for them.

6. Add notification about quality issues

Let us add another Task to our graph to send a notification when quality issues have been detected and rows were isolated. But maybe we know our data is not perfect and we don't want to get a notification every single time.

So let's use DMFs one more time to define a threshold and notify only when more than 1% of new weather data was quarantined. First we create a new UDMF to compare the number of rows in the quarantine table to those in the target table:
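
The comparison this UDMF makes can be written out as a plain-Python function. Following the description above, the quarantine row count is expressed as a percentage of the target row count. The behavior when the target table is still empty (report 100% if anything was quarantined, otherwise 0%) is a hypothetical choice to avoid dividing by zero, not something taken from the notebook.

```python
def quarantine_percentage(quarantine_rows, target_rows):
    """Quarantined rows as a percentage of rows in the target table."""
    if target_rows == 0:
        # Hypothetical edge-case rule: nothing clean yet, so any quarantined row counts as 100 %
        return 100.0 if quarantine_rows > 0 else 0.0
    return 100.0 * quarantine_rows / target_rows


# Usage: 5 quarantined rows against 1,000 clean rows stays under a 1 % threshold
share = quarantine_percentage(5, 1000)
```

If you prefer the share of all processed rows, divide by `quarantine_rows + target_rows` instead. Keep in mind that the target table here holds only the filtered hot days, so the two definitions can differ noticeably.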

Now we assign it to the quarantine table:

Now we can create another Task that runs only if new rows were isolated, checks whether they exceed the 1% threshold, and only then sends us a notification.
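
The two-stage decision behind the notification Task can be sketched in plain Python. First comes the Task condition (new rows were isolated in this run), then the threshold check on the quarantine percentage. The function name and message text are hypothetical, and the actual sending step (for example, an email through a notification integration) is left out.

```python
def notification_message(new_rows_isolated, quarantine_percent, threshold=1.0):
    """Return an alert text when a notification should be sent, otherwise None."""
    if new_rows_isolated == 0:
        return None  # Task condition not met: nothing new was quarantined in this run
    if quarantine_percent <= threshold:
        return None  # known, tolerated level of bad data
    return (
        f"{new_rows_isolated} new rows quarantined; "
        f"quarantine is at {quarantine_percent:.1f}% (threshold {threshold:.1f}%)"
    )
```

Checking "new rows isolated" before the threshold is what suppresses repeat alerts. A quarantine share that stays above 1% triggers a message only in runs that actually added bad rows.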

With this dependency setup we are also reducing redundant notifications, as they will only trigger when new quality issues are detected and the percentage of bad rows is still above 1%.

Once our Task Graph has run a few times, we can see the two different paths it can take. Navigate to Monitoring / Task History and filter to our DEX_DB/DEMO schema and our LOAD_RAW_DATA root task to see the history of graph runs.

All runs are successful, because the graph handles both cases (quality checks passed or failed).

When we select a run from the history list, we will mostly see graphs where the checks passed and the data was processed, along with occasional runs that detected quality issues and isolated the batch instead.

Now make it yours!

While this setup should be generic enough for you to apply to your existing ELT Task Graphs, there are many opportunities to further customize and automate it according to your needs.

  • You can start by writing and running your own DMFs.
  • You can customize the notifications logic and message content.
  • Or you can automatically process the isolated rows by adding more Tasks to the isolated-data branch of the graph that delete, sanitize or extrapolate data and then merge it back into the clean-data table.
  • Or you can add a Streamlit app with a data editor so that a data expert can manually review and correct the isolated rows before merging them…