Lab 09: Implementing Data Quality Constraints for Insurance Claims
Introduction
You are a data engineer at ClearCover Insurance, a fictional insurance provider. Each day, raw claims data arrives from regional offices and partner brokers. Unfortunately, the data is inconsistent: some records are missing required identifiers, claim amounts are formatted as strings or carry negative values, dates are occasionally malformed, and the source schema may silently gain new columns over time.
Your mission is to build a Lakeflow Spark Declarative Pipeline that enforces data quality constraints at every layer of the pipeline — catching bad records before they reach actuarial models and reporting dashboards.
You work through the following exercises:
| Exercise | Topic |
|---|---|
| Exercise 1 | Set up the ClearCover Insurance Data Platform (notebook) |
| Exercise 2 | Explore data quality issues in Catalog Explorer |
| Exercise 3 | Implement nullability and status validation |
| Exercise 4 | Add data type checks using col().cast() |
| Exercise 5 | Handle schema drift with rescued data |
| Exercise 6 | Create, run, and monitor the pipeline |
🤖 Databricks Assistant — use it throughout
Use the Databricks Assistant in every exercise of this lab; each one includes a suggested prompt to get you started. The Assistant is your pair programmer — use it to generate code, understand errors, and explore alternatives.
To open the Databricks Assistant, select the Assistant icon on the right side of any notebook cell, or use the keyboard shortcut.
Prerequisites
- An Azure Databricks Premium workspace is already provisioned and available.
- You are familiar with basic Python and PySpark concepts.
- You have completed the earlier labs in this learning path (or are comfortable with Unity Catalog basics).
Importing the Setup Notebook
- In the Databricks workspace, click Workspace in the left sidebar.
- Navigate to or create a folder where you want to store the lab.
- Click the ⋮ (kebab) menu or right-click the folder, then select Import.
- Choose URL, enter the following URL, and click Import:
https://raw.githubusercontent.com/MicrosoftLearning/DP-750T00-Implement-Data-Engineering-Solutions-using-Azure-Databricks/refs/heads/main/Allfiles/09-implement-manage-data-quality-constraints-unity-catalog.ipynb
- Open the imported notebook and, in the compute selector at the top, choose Serverless compute.
Exercise 1: Set up the ClearCover Insurance Data Platform
Run all cells in the setup notebook 09-implement-manage-data-quality-constraints-unity-catalog from top to bottom.
The notebook creates the following objects:
| Object | Description |
|---|---|
| `insurance_lab` catalog | Top-level namespace for the ClearCover Insurance platform |
| `insurance_lab.bronze` schema | Raw, unprocessed claims data as received from source systems |
| `insurance_lab.silver` schema | Validated and type-safe records |
| `insurance_lab.gold` schema | Aggregated reporting data |
| `insurance_lab.bronze.raw_files` volume | Landing zone for raw CSV claim files |
| `insurance_lab.bronze.claims_raw` table | Delta table with 20 raw claims records |
After the notebook completes, verify the objects in Catalog Explorer before continuing.
Exercise 2: Explore Data Quality Issues
Before writing any pipeline code, explore the raw data to understand the quality problems you need to fix.
Task 2.1: Query the raw claims table
Open a new SQL query editor (or a notebook cell) and run:
SELECT *
FROM insurance_lab.bronze.claims_raw
ORDER BY claim_id NULLS LAST;
Review the results and find at least one row for each of the following issues:
| Issue | Column(s) to check |
|---|---|
| Missing primary identifier | claim_id or customer_id is NULL |
| Unparseable date | claim_date contains a non-date string |
| Unparseable amount | claim_amount contains N/A or is empty |
| Negative amount | claim_amount is a negative number |
| Invalid status | status is not OPEN, PENDING, or CLOSED |
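The checks in the table above can be mimicked in plain Python to build intuition before writing pipeline code. This is a hypothetical sketch for exploration only — the column names match `claims_raw`, but the rows you pass in and the helper itself are invented; in the lab, these rules become SQL expectation conditions.

```python
from datetime import date

VALID_STATUSES = {"OPEN", "PENDING", "CLOSED"}

def find_issues(row):
    """Return the list of quality issues present in one raw claim record (dict)."""
    issues = []
    # Missing primary identifier
    if row.get("claim_id") is None or row.get("customer_id") is None:
        issues.append("missing_identifier")
    # Unparseable date (claims arrive as STRING)
    try:
        date.fromisoformat(row.get("claim_date") or "")
    except ValueError:
        issues.append("unparseable_date")
    # Unparseable or negative amount
    try:
        if float(row.get("claim_amount")) < 0:
            issues.append("negative_amount")
    except (TypeError, ValueError):
        issues.append("unparseable_amount")
    # Invalid status
    if row.get("status") not in VALID_STATUSES:
        issues.append("invalid_status")
    return issues
```

For example, a row with `claim_amount = "N/A"` reports `unparseable_amount`, while a fully clean row returns an empty list.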
Task 2.2: Inspect the schema
Run the following to confirm that claim_date and claim_amount are stored as STRING:
DESCRIBE TABLE insurance_lab.bronze.claims_raw;
These columns are intentionally strings in the bronze layer. The pipeline exercises will enforce the correct types during ingestion into silver.
Exercise 3: Nullability and Status Validation
Task 3.0: Create the ETL pipeline and import the pipeline file
Before writing any pipeline code, you need to create a Lakeflow Spark Declarative Pipeline in Databricks and import the starter pipeline file.
Create the pipeline:
- In the Databricks workspace left sidebar, click Jobs & Pipelines.
- Click Create ETL pipeline (start with an Empty file, Python).
- Configure the pipeline with the following settings:

  | Setting | Value |
  |---|---|
  | Pipeline name | ClearCover Claims Quality Pipeline |
  | Pipeline mode | Triggered |
  | Target catalog | `insurance_lab`, schema `silver` |
  | Compute | Serverless |

- Click Create — do not add source code yet.
Import the pipeline file:
- Select the transformations folder and select the ⋮ (kebab) menu or right-click the folder, then select Import.
- Choose URL, enter the following URL, and click Import:
https://raw.githubusercontent.com/MicrosoftLearning/DP-750T00-Implement-Data-Engineering-Solutions-using-Azure-Databricks/refs/heads/main/Allfiles/09-implement-manage-data-quality-constraints-unity-catalog.py
- The file appears in your workspace as a Python source file — open it and keep it open throughout exercises 3–5.
With the pipeline configured, you will now edit the pipeline file to add data quality constraints.
Task 3.1: Add nullability and status expectations to claims_validated()
Open 09-implement-manage-data-quality-constraints-unity-catalog.py and add the following expectations to the claims_validated() function. Place all decorators between @dp.table(...) and def claims_validated():.
| Expectation name | Condition | Action |
|---|---|---|
| `valid_claim_id` | `claim_id IS NOT NULL` | Drop |
| `valid_customer_id` | `customer_id IS NOT NULL` | Drop |
| `valid_status` | `status IN ('OPEN', 'PENDING', 'CLOSED')` | Warn (keep) |
| `valid_coverage` | `coverage_amount > 0` | Fail pipeline |
Use @dp.expect_or_drop to drop violating rows, @dp.expect to warn without dropping, and @dp.expect_or_fail to stop the pipeline on a violation.
🤖 Ask the Databricks Assistant: “Show me how to use `expect_or_drop`, `expect`, and `expect_or_fail` decorators in a Lakeflow Spark Declarative Pipelines Python function”
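Taken together, the decorators might look like the sketch below. The import line and the function body are assumptions — use whatever import the starter file already has (the decorator names above suggest it is aliased as `dp`), and keep the starter file's own read logic. `spark` is provided by the pipeline runtime, so this block only runs inside a pipeline.

```python
from pyspark import pipelines as dp            # assumed import; match the starter file
from pyspark.sql.functions import col

@dp.table(comment="Claims that passed nullability and status validation")
@dp.expect_or_drop("valid_claim_id", "claim_id IS NOT NULL")           # drop violating rows
@dp.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")     # drop violating rows
@dp.expect("valid_status", "status IN ('OPEN', 'PENDING', 'CLOSED')")  # warn, keep rows
@dp.expect_or_fail("valid_coverage", "coverage_amount > 0")            # stop the pipeline on violation
def claims_validated():
    # Placeholder body: keep the read logic already in the starter file.
    return spark.read.table("insurance_lab.bronze.claims_raw")
```

Note that all expectation decorators sit between `@dp.table(...)` and the `def` line, exactly as the task describes.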
Exercise 4: Data Type Checks
The claim_date and claim_amount columns arrive as strings. When col().cast() cannot parse a value, it returns NULL instead of raising an error. You can use that behaviour to identify and drop invalid records.
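To build intuition for this cast-to-NULL behaviour without a cluster, here is a plain-Python analogue. The helpers `try_cast_date` and `try_cast_decimal` are hypothetical names invented for this sketch, not Spark APIs — each returns `None` exactly where Spark's `cast` would return NULL.

```python
from datetime import date
from decimal import Decimal, InvalidOperation

def try_cast_date(s):
    """Analogue of col(...).cast('date'): None instead of an error on bad input."""
    try:
        return date.fromisoformat(s)
    except (TypeError, ValueError):
        return None

def try_cast_decimal(s):
    """Analogue of col(...).cast('decimal(12,2)'): None on unparseable input."""
    try:
        return Decimal(s).quantize(Decimal("0.01"))
    except (InvalidOperation, TypeError):
        return None
```

So `try_cast_decimal("1250.50")` yields a value, while `try_cast_decimal("N/A")` and `try_cast_date("15/03/2024")` yield `None` — which is why a post-cast `IS NOT NULL` expectation catches bad records.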
Task 4.1: Apply col().cast() inside claims_validated()
Inside the claims_validated() function body, before the return statement, add two withColumn calls:
- Convert `claim_date` from STRING to DATE using `col('claim_date').cast('date')`
- Convert `claim_amount` from STRING to DECIMAL(12,2) using `col('claim_amount').cast('decimal(12,2)')`
The transformed columns replace the originals, so downstream expectations and consumers see typed values.
🤖 Ask the Databricks Assistant: “In PySpark, use `withColumn` and `col().cast()` to convert a streaming dataframe column from STRING to DATE type, and another column from STRING to DECIMAL(12,2). Show me the full withColumn syntax.”
Task 4.2: Drop records with unparseable dates
After the cast in task 4.1, any row where claim_date is still NULL had an invalid original value. Add an @dp.expect_or_drop decorator to drop these rows:
expectation name: valid_claim_date
condition: claim_date IS NOT NULL
Task 4.3: Drop records with unparseable or missing amounts
Similarly, drop rows where claim_amount could not be cast:
expectation name: valid_claim_amount
condition: claim_amount IS NOT NULL
Task 4.4: Drop records with negative claim amounts
A negative claim amount is invalid in any insurance context. Drop these rows:
expectation name: non_negative_amount
condition: claim_amount >= 0
💡 Hint: Place all expectation decorators between `@dp.table(...)` and `def claims_validated():`. Their order does not affect the result — all expectations are evaluated on each row.
🤖 Ask the Databricks Assistant: “I’m using Lakeflow Spark Declarative Pipelines in Python. After applying col().cast() to convert a column from STRING to DATE, which expectation condition do I use to drop rows where the cast failed?”
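Assembled, Tasks 4.1–4.4 amount to something like the following sketch (Exercise 3's decorators stay in place above these). As before, the import line and the read call are assumptions; keep the starter file's own versions. Expectations are evaluated against the dataframe the function returns, so the `IS NOT NULL` checks see the post-cast values.

```python
from pyspark import pipelines as dp            # assumed import; match the starter file
from pyspark.sql.functions import col

@dp.table(comment="Validated, type-safe claims")
@dp.expect_or_drop("valid_claim_date", "claim_date IS NOT NULL")      # NULL here means the cast failed
@dp.expect_or_drop("valid_claim_amount", "claim_amount IS NOT NULL")  # unparseable or missing amount
@dp.expect_or_drop("non_negative_amount", "claim_amount >= 0")        # negative claims are invalid
def claims_validated():
    return (
        spark.read.table("insurance_lab.bronze.claims_raw")  # placeholder; keep the starter file's read
        .withColumn("claim_date", col("claim_date").cast("date"))
        .withColumn("claim_amount", col("claim_amount").cast("decimal(12,2)"))
    )
```

The `withColumn` calls overwrite the string columns in place, so everything downstream of this table sees typed values.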
Exercise 5: Handle Schema Drift with Rescued Data
ClearCover receives claims files from several partner brokers. Occasionally a broker adds extra columns — such as broker_reference or fraud_score — without prior notice. Rather than crashing the pipeline when this happens, you want to capture unexpected data in a separate column for investigation.
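To see what "capturing unexpected data" means before touching Auto Loader, here is a pure-Python analogue of rescue behaviour. The function name, the sample rows, and the trimmed `EXPECTED` column list are all invented for illustration — Auto Loader does this for you; you never write this code in the lab.

```python
import csv
import io
import json

# Trimmed expected schema for the sketch; the real table has more columns.
EXPECTED = ["claim_id", "customer_id", "claim_date", "claim_amount", "status"]

def rescue_rows(csv_text):
    """Mimic rescue mode: unexpected columns land in _rescued_data as JSON."""
    rows = []
    for rec in csv.DictReader(io.StringIO(csv_text)):
        row = {k: rec.get(k) for k in EXPECTED}
        # Anything outside the expected schema is rescued, not rejected.
        extras = {k: v for k, v in rec.items() if k not in EXPECTED and v is not None}
        row["_rescued_data"] = json.dumps(extras) if extras else None
        rows.append(row)
    return rows
```

A file with a surprise `fraud_score` column still loads: the known columns fill the row, and `{"fraud_score": ...}` appears in `_rescued_data` instead of crashing the ingest.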
Task 5.1: Implement Auto Loader with rescue schema evolution mode
Complete the claims_rescued() function in 09-implement-manage-data-quality-constraints-unity-catalog.py.
Use spark.readStream with Auto Loader (cloudFiles format) to read CSV files from:
/Volumes/insurance_lab/bronze/raw_files/
Configure it with the following options:
| Option | Value |
|---|---|
| `cloudFiles.format` | `csv` |
| `header` | `true` |
| `cloudFiles.schemaLocation` | `/Volumes/insurance_lab/bronze/raw_files/_schema` |
| `cloudFiles.schemaEvolutionMode` | `rescue` |
| `rescuedDataColumn` | `_rescued_data` |
| `cloudFiles.inferColumnTypes` | `true` |
Remove the pass statement and return the configured readStream.
🤖 Ask the Databricks Assistant: “Write a complete PySpark Auto Loader readStream block using cloudFiles format CSV with schemaEvolutionMode rescue and a _rescued_data column. Explain what each option does.”
💡 Hint: When the source file matches the expected schema, `_rescued_data` will be `NULL` for every row. If a future file adds new columns (like `fraud_score`), their values are captured as JSON in `_rescued_data` instead of breaking the pipeline.
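Putting the options from the table together, `claims_rescued()` might look like the sketch below. The option strings come from the task itself; the surrounding decorator, comment, and import are assumptions based on the starter file, and the block only runs inside a pipeline (where `spark` is provided).

```python
from pyspark import pipelines as dp  # assumed import; match the starter file

@dp.table(comment="Claims ingested with schema-drift rescue")
def claims_rescued():
    return (
        spark.readStream.format("cloudFiles")                 # Auto Loader
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.schemaLocation", "/Volumes/insurance_lab/bronze/raw_files/_schema")
        .option("cloudFiles.schemaEvolutionMode", "rescue")   # capture drift, don't fail
        .option("rescuedDataColumn", "_rescued_data")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/insurance_lab/bronze/raw_files/")
    )
```

The `schemaLocation` directory is where Auto Loader persists the inferred schema between runs, which is why it lives outside the data files themselves.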
Exercise 6: Create, Run, and Monitor the Pipeline
With the pipeline code complete, return to the Databricks UI to run the Lakeflow Spark Declarative Pipeline you created in Exercise 3 and monitor its data quality metrics.
Task 6.1: Save your pipeline file
Make sure you have saved all changes to 09-implement-manage-data-quality-constraints-unity-catalog.py in the workspace editor before continuing.
Task 6.2: Open and verify the pipeline
- In the Databricks workspace left sidebar, click Jobs & Pipelines.
- Open the ClearCover Claims Quality Pipeline you created in Exercise 3.
- Confirm the pipeline uses the following settings:

  | Setting | Value |
  |---|---|
  | Pipeline name | ClearCover Claims Quality Pipeline |
  | Pipeline mode | Triggered |
  | Source code | The imported 09-implement-manage-data-quality-constraints-unity-catalog.py file |
  | Target catalog | `insurance_lab` |
  | Compute | Serverless |
Task 6.3: Run the pipeline
Click Start to trigger a full pipeline run. Wait for the run to complete.
Observe the pipeline DAG in the graph view. You should see three dataset nodes:
- `silver.claims_validated`
- `silver.claims_rescued`
- `gold.claims_summary`
Task 6.4: Monitor data quality metrics
- In the pipeline graph, click the `claims_validated` dataset node.
- In the right-hand panel, open the Data quality tab.
- Review the expectation results and answer the following:
  - Which expectations dropped records, and how many?
  - Which expectation issued warnings (kept records but logged violations)?
  - Did `valid_coverage` trigger a fail? If so, this indicates a row in the source with `coverage_amount <= 0` — investigate which row caused it.
💡 Hint: If `valid_coverage` fails the pipeline, examine `insurance_lab.bronze.claims_raw` for rows where `coverage_amount` is zero or NULL. The error message in the pipeline event log will also show the violating record.
Task 6.5: Query the output tables
Run the following queries to verify the pipeline output:
-- How many claims made it through all validations?
SELECT COUNT(*) AS valid_claim_count
FROM insurance_lab.silver.claims_validated;
-- What types and statuses appear in the validated silver layer?
SELECT claim_type, status, COUNT(*) AS count
FROM insurance_lab.silver.claims_validated
GROUP BY claim_type, status
ORDER BY claim_type, status;
-- Review the gold summary
SELECT *
FROM insurance_lab.gold.claims_summary
ORDER BY claim_type, status;
-- Did Auto Loader capture any rescued data?
SELECT claim_id, _rescued_data
FROM insurance_lab.silver.claims_rescued
WHERE _rescued_data IS NOT NULL;
🤖 Ask the Databricks Assistant: “Looking at the counts in `insurance_lab.bronze.claims_raw` vs `insurance_lab.silver.claims_validated`, explain what each row reduction tells me about the data quality issues in the bronze data.”
Clean Up (Optional)
To remove all lab resources when you are done:
DROP CATALOG IF EXISTS insurance_lab CASCADE;