Ingest data with a pipeline in Microsoft Fabric
A data lakehouse is a common analytical data store for cloud-scale analytics solutions. One of the core tasks of a data engineer is to implement and manage the ingestion of data from multiple operational data sources into the lakehouse. In Microsoft Fabric, you can implement extract, transform, and load (ETL) or extract, load, and transform (ELT) solutions for data ingestion through the creation of pipelines.
Fabric also supports Apache Spark, enabling you to write and run code to process data at scale. By combining the pipeline and Spark capabilities in Fabric, you can implement complex data ingestion logic that copies data from external sources into the OneLake storage on which the lakehouse is based, and then uses Spark code to perform custom data transformations before loading it into tables for analysis.
This lab will take approximately 60 minutes to complete.
Note: You need a Microsoft Fabric trial to complete this exercise.
Create a workspace
Before working with data in Fabric, create a workspace with the Fabric trial enabled.
- On the Microsoft Fabric home page at https://app.fabric.microsoft.com/home?experience=fabric, select Synapse Data Engineering.
- In the menu bar on the left, select Workspaces (the icon looks similar to 🗇).
- Create a new workspace with a name of your choice, selecting a licensing mode that includes Fabric capacity (Trial, Premium, or Fabric). When your new workspace opens, it should be empty.
Create a lakehouse
Now that you have a workspace, it’s time to create a data lakehouse into which you will ingest data.
- In the Synapse Data Engineering home page, create a new Lakehouse with a name of your choice. After a minute or so, a new lakehouse with no Tables or Files will be created.
- On the Lake view tab in the pane on the left, in the … menu for the Files node, select New subfolder and create a subfolder named new_data.
Create a pipeline
A simple way to ingest data is to use a Copy Data activity in a pipeline to extract the data from a source and copy it to a file in the lakehouse.
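For context, the following notebook cell is a rough, hypothetical equivalent of what the Copy Data activity will automate; it downloads the same CSV file over HTTP and writes it into the lakehouse Files/new_data folder. It assumes the notebook has a default lakehouse attached (mounted at /lakehouse/default) and that the requests library is available in the Fabric Spark runtime. The lab itself uses the pipeline wizard described in the steps below.

```python
# Hypothetical notebook equivalent of the Copy Data activity (not a lab step).
# Assumes a default lakehouse is attached (mounted at /lakehouse/default)
# and that the requests library is available in the Fabric Spark runtime.
import os
import requests

source_url = "https://raw.githubusercontent.com/MicrosoftLearning/dp-data/main/sales.csv"
target_folder = "/lakehouse/default/Files/new_data"

os.makedirs(target_folder, exist_ok=True)

response = requests.get(source_url, timeout=60)
response.raise_for_status()

with open(os.path.join(target_folder, "sales.csv"), "wb") as file:
    file.write(response.content)

print("Copied sales.csv to Files/new_data")
```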
- On the Home page for your lakehouse, select Get data and then select New data pipeline, and create a new data pipeline named Ingest Sales Data.
- If the Copy Data wizard doesn’t open automatically, select Copy Data > Use copy assistant in the pipeline editor page.
- In the Copy Data wizard, on the Choose a data source page, type HTTP in the search bar and then select HTTP in the New sources section.
- In the Connect to data source pane, enter the following settings for the connection to your data source:
- URL: https://raw.githubusercontent.com/MicrosoftLearning/dp-data/main/sales.csv
- Connection: Create new connection
- Connection name: Specify a unique name
- Data gateway: (none)
- Authentication kind: Anonymous
- Select Next. Then ensure the following settings are selected:
- Relative URL: Leave blank
- Request method: GET
- Additional headers: Leave blank
- Binary copy: Unselected
- Request timeout: Leave blank
- Max concurrent connections: Leave blank
- Select Next and wait for the data to be sampled. Then ensure that the following settings are selected:
- File format: DelimitedText
- Column delimiter: Comma (,)
- Row delimiter: Line feed (\n)
- First row as header: Selected
- Compression type: None
- Select Preview data to see a sample of the data that will be ingested. Then close the data preview and select Next.
- On the Choose data destination page, select OneLake data hub and then select your existing lakehouse.
- Set the following data destination options, and then select Next:
- Root folder: Files
- Folder path name: new_data
- File name: sales.csv
- Copy behavior: None
- Set the following file format options and then select Next:
- File format: DelimitedText
- Column delimiter: Comma (,)
- Row delimiter: Line feed (\n)
- Add header to file: Selected
- Compression type: None
- On the Copy summary page, review the details of your copy operation and then select Save + Run. A new pipeline containing a Copy Data activity is created, as shown here:
- When the pipeline starts to run, you can monitor its status in the Output pane under the pipeline designer. Use the ↻ (Refresh) icon to refresh the status, and wait until it has succeeded.
- In the menu bar on the left, select your lakehouse.
- On the Home page, in the Lakehouse explorer pane, expand Files and select the new_data folder to verify that the sales.csv file has been copied.
Create a notebook
- On the Home page for your lakehouse, in the Open notebook menu, select New notebook. After a few seconds, a new notebook containing a single cell will open. Notebooks are made up of one or more cells that can contain code or markdown (formatted text).
- Select the existing cell in the notebook, which contains some simple code, and then replace the default code with the following variable declaration.

```python
table_name = "sales"
```
- In the … menu for the cell (at its top-right) select Toggle parameter cell. This configures the cell so that the variables declared in it are treated as parameters when running the notebook from a pipeline.
- Under the parameters cell, use the + Code button to add a new code cell. Then add the following code to it:

```python
from pyspark.sql.functions import *

# Read the new sales data
df = spark.read.format("csv").option("header","true").load("Files/new_data/*.csv")

# Add month and year columns
df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

# Derive FirstName and LastName columns
df = df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Filter and reorder columns
df = df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "EmailAddress", "Item", "Quantity", "UnitPrice", "TaxAmount"]

# Load the data into a table
df.write.format("delta").mode("append").saveAsTable(table_name)
```
This code loads the data from the sales.csv file that was ingested by the Copy Data activity, applies some transformation logic, and saves the transformed data as a table - appending the data if the table already exists.
- Verify that your notebook looks similar to this, and then use the ▷ Run all button on the toolbar to run all of the cells it contains.
Note: Since this is the first time you’ve run any Spark code in this session, the Spark pool must be started. This means that the first cell can take a minute or so to complete.
- When the notebook run has completed, in the Lakehouse explorer pane on the left, in the … menu for Tables, select Refresh and verify that a sales table has been created.
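As an optional extra check (a sketch only, not a required lab step), you could add and run one more cell to confirm the table contents from code; it assumes the default lakehouse attached to the notebook is the one containing the table.

```python
# Optional check: confirm the table was created and inspect a few transformed rows
sales_df = spark.read.table(table_name)
print(f"Rows loaded into {table_name}: {sales_df.count()}")
sales_df.select("SalesOrderNumber", "OrderDate", "Year", "Month", "FirstName", "LastName").show(5)
```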
- In the notebook menu bar, use the ⚙️ Settings icon to view the notebook settings. Then set the Name of the notebook to Load Sales and close the settings pane.
- In the hub menu bar on the left, select your lakehouse.
- In the Explorer pane, refresh the view. Then expand Tables, and select the sales table to see a preview of the data it contains.
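The sales table can also be queried directly from the notebook. The following cell is an optional sketch (not part of the lab steps) that uses Spark SQL to summarize the ingested data by year and month; column values are read from the CSV as strings, so Spark casts them implicitly for the arithmetic.

```python
# Optional: summarize the ingested sales data with Spark SQL (run in a new notebook cell)
summary_df = spark.sql("""
    SELECT Year, Month,
           COUNT(DISTINCT SalesOrderNumber) AS Orders,
           SUM(Quantity * UnitPrice) AS SalesAmount
    FROM sales
    GROUP BY Year, Month
    ORDER BY Year, Month
""")
summary_df.show()
```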
Modify the pipeline
Now that you’ve implemented a notebook to transform data and load it into a table, you can incorporate the notebook into a pipeline to create a reusable ETL process.
- In the hub menu bar on the left, select the Ingest Sales Data pipeline you created previously.
- On the Activities tab, in the More activities list, select Delete data. Then position the new Delete data activity to the left of the Copy data activity and connect its On completion output to the Copy data activity, as shown here:
- Select the Delete data activity, and in the pane below the design canvas, set the following properties:
- General:
- Name: Delete old files
- Source:
- Connection: Your lakehouse
- File path type: Wildcard file path
- Folder path: Files / new_data
- Wildcard file name: *.csv
- Recursively: Selected
- Logging settings:
- Enable logging: Unselected
These settings will ensure that any existing .csv files are deleted before copying the sales.csv file.
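For reference, a rough notebook-based sketch of the same cleanup is shown below (not a lab step). It assumes the mssparkutils file system helpers available in the Fabric Spark runtime and a default lakehouse attached to the notebook.

```python
# Hypothetical sketch of what the Delete old files activity does (not a lab step).
# Assumes mssparkutils is available and the lakehouse is attached as the default lakehouse.
from notebookutils import mssparkutils

for item in mssparkutils.fs.ls("Files/new_data"):
    if item.name.endswith(".csv"):
        mssparkutils.fs.rm(item.path, False)  # remove each existing .csv file
        print(f"Deleted {item.name}")
```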
- In the pipeline designer, on the Activities tab, select Notebook to add a Notebook activity to the pipeline.
- Select the Copy data activity and then connect its On completion output to the Notebook activity as shown here:
- Select the Notebook activity, and then in the pane below the design canvas, set the following properties:
- General:
- Name: Load Sales notebook
- Settings:
- Notebook: Load Sales
- Base parameters: Add a new parameter with the following properties:

| Name | Type | Value |
| ---- | ---- | ----- |
| table_name | String | new_sales |

The table_name parameter will be passed to the notebook and override the default value assigned to the table_name variable in the parameters cell.
- On the Home tab, use the 🖫 (Save) icon to save the pipeline. Then use the ▷ Run button to run the pipeline, and wait for all of the activities to complete.
Note: In case you receive the error message Spark SQL queries are only possible in the context of a lakehouse. Please attach a lakehouse to proceed: Open your notebook, select the lakehouse you created on the left pane, select Remove all Lakehouses and then add it again. Go back to the pipeline designer and select â–· Run.
- In the hub menu bar on the left edge of the portal, select your lakehouse.
- In the Explorer pane, expand Tables and select the new_sales table to see a preview of the data it contains. This table was created by the notebook when it was run by the pipeline.
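If you prefer to confirm this from code, an optional notebook cell like the following (a sketch, not a lab step) compares the table you loaded manually with the one loaded by the pipeline:

```python
# Optional: compare row counts of the manually loaded table and the pipeline-loaded table
for tbl in ["sales", "new_sales"]:
    print(tbl, spark.read.table(tbl).count())
```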
In this exercise, you implemented a data ingestion solution that uses a pipeline to copy data to your lakehouse from an external source, and then uses a Spark notebook to transform the data and load it into a table.
Clean up resources
In this exercise, you’ve learned how to implement a pipeline in Microsoft Fabric.
If you’ve finished exploring your lakehouse, you can delete the workspace you created for this exercise.
- In the bar on the left, select the icon for your workspace to view all of the items it contains.
- In the … menu on the toolbar, select Workspace settings.
- In the General section, select Remove this workspace.