Building a Scalable Data Pipeline: Extracting Sales Data from Navision to Snowflake

In this blog post, we will learn how to extract sales data from a database, specifically Navision, using SQL. We will define a SQL query to extract the data and execute it with Python. The resulting sales data can then be further analyzed or visualized.

As someone interested in the field of data engineering, I've been curious about how businesses can extract, transform, and load (ETL) data from various sources to gain valuable insights. That's why I decided to explore the process of building a scalable data pipeline using SQL and Python. In this blog post, I'll specifically focus on extracting sales data from Navision, a popular ERP system, and loading it into Snowflake, a cloud-based data warehouse.

While I'm still relatively new to the world of data pipelines, I'm excited to walk through each step of the process and provide code examples showing how to extract, transform, and load data using Python and the Snowflake Python Connector. By writing blog posts about what I'm learning, I hope to build an understanding of how to create a robust data pipeline that can handle large volumes of data and provide valuable insights to a business. So, let's dive into the world of data pipelines together and learn how to extract value from our data!

Step 1: Connect to Navision using Python and ODBC

The first step in building our data pipeline is to establish a connection to Navision using Python and the Open Database Connectivity (ODBC) API. ODBC is a widely used standard for accessing databases, and it allows us to connect to Navision using a driver provided by Microsoft.

To connect to Navision, we'll first need to install the pyodbc package, a Python module for accessing ODBC databases. Then, we can create a connection string that includes the necessary details for connecting to our NAV database. Once we're connected, we can create a cursor and use it to execute SQL queries against the database and retrieve the data we need for our pipeline.

import pyodbc

# Connect to the Microsoft Dynamics NAV ERP system and return a cursor
# for running queries. Replace the driver, server, and database names
# with the values for your own NAV installation.
def connect_to_navision():
    connection_string = "Driver={Microsoft ODBC for Navision};Server=server_name;Database=database_name;"
    connection = pyodbc.connect(connection_string)
    return connection.cursor()

Step 2: Extract data from Navision using SQL

In this step, we'll use SQL queries to extract the sales data from Navision.

First we identify the tables and columns that contain the sales data we want to extract. Then, we can write SQL queries to retrieve the data. For example, we might use a query to retrieve all sales orders for a given date range, or to retrieve sales data for a particular customer or product.

We may need to join multiple tables together to retrieve all the necessary information. We'll also need to ensure that our SQL queries are optimized for performance, as we might be working with large volumes of data.

We then execute our SQL query with the cursor and fetch the results. We'll store the extracted data in memory for now, as we'll need to transform and load it in the next steps of our pipeline.

# Define a SQL query to extract sales data
# (table and column names are simplified here; adjust them to your NAV schema)
query = """
    SELECT
        SalesHeader.DocNo_,
        SalesHeader.Sell_to_Customer,
        SalesLine.LineNo,
        SalesLine.ItemNo,
        SalesLine.Variant,
        SalesLine.Quantity,
        SalesLine.LineAmount
    FROM SalesHeader
    JOIN SalesLine ON SalesHeader.DocumentType = SalesLine.DocumentType AND SalesHeader.DocNo_ = SalesLine.DocumentNo_
    WHERE SalesHeader.PostingDate >= '2023-01-01' AND SalesHeader.PostingDate < '2023-02-01'
"""

# Execute the SQL query and fetch the result set
navision_cursor = connect_to_navision()
navision_cursor.execute(query)
sales_data_result = navision_cursor.fetchall()
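
Since we might be working with large volumes of data, hard-coding the dates and calling fetchall() may not hold up well as the date range grows. As a rough sketch of an alternative (assuming the NAV ODBC driver accepts pyodbc's standard ? parameter markers), we could parameterize the date range and fetch the results in batches:

# Build the same query with ? placeholders instead of hard-coded dates
base_query = """
    SELECT
        SalesHeader.DocNo_,
        SalesHeader.Sell_to_Customer,
        SalesLine.LineNo,
        SalesLine.ItemNo,
        SalesLine.Variant,
        SalesLine.Quantity,
        SalesLine.LineAmount
    FROM SalesHeader
    JOIN SalesLine ON SalesHeader.DocumentType = SalesLine.DocumentType AND SalesHeader.DocNo_ = SalesLine.DocumentNo_
    WHERE SalesHeader.PostingDate >= ? AND SalesHeader.PostingDate < ?
"""
navision_cursor.execute(base_query, ('2023-01-01', '2023-02-01'))

# Fetch the result set in batches so we don't hold everything in memory at once
sales_data_result = []
while True:
    batch = navision_cursor.fetchmany(10000)
    if not batch:
        break
    sales_data_result.extend(batch)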

Step 3: Transform the extracted data

With the sales data extracted, we can now start transforming it into a format that is suitable for loading into our data warehouse. This may involve cleaning, filtering, and restructuring the data to ensure it meets the requirements of our data warehouse schema.

For example, we may need to perform data cleaning operations such as removing duplicates, correcting misspellings, or filling in missing values. We may also need to filter the data to exclude irrelevant or incomplete records, or to limit the data to a specific date range.

In addition, we may need to restructure the data to match the schema of our data warehouse. This could involve splitting data into multiple tables, merging data from multiple sources, or aggregating data to a higher level of granularity.

We can do this by using Python libraries such as Pandas. We'll load the extracted data into a Pandas DataFrame and apply our transformations using the DataFrame API. Once our transformations are complete, we'll have a clean and structured dataset ready for loading into our data warehouse.

import pandas as pd

# Load the extracted sales data into a pandas DataFrame
# (converting the pyodbc Row objects to plain tuples first)
df = pd.DataFrame(
    [tuple(row) for row in sales_data_result],
    columns=['DocNo_', 'Sell_to_Customer', 'LineNo', 'ItemNo', 'Variant', 'Quantity', 'LineAmount']
)

# Clean the data
df = df.drop_duplicates()
df = df.fillna(0)

# Filter the data
df = df[df['DocNo_'].str.startswith('SO')]

# Add a new column for the total amount
df['TotalAmount'] = df['Quantity'] * df['LineAmount']

# Print the transformed data
print(df.head())

In this example, we load the extracted data into a pandas DataFrame and perform the following transformations:

  • Drop any duplicate records in the dataset using the drop_duplicates method.
  • Fill in any missing values with zero using the fillna method.
  • Filter the dataset to only include records where the DocNo_ column starts with the string 'SO'.
  • Add a new column to the dataset that calculates the total amount for each sales line item by multiplying the Quantity and LineAmount columns together.

Finally, we print the transformed data using the head method to show the first few rows of the DataFrame. As always, the specific transformations needed will depend on the data and requirements of the project.
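
I mentioned earlier that we may need to aggregate data to a higher level of granularity. As a small, hypothetical example of what that could look like with this dataset, we could roll the line-level records up to one row per sales order:

# Aggregate line-level data up to one row per sales order
order_totals = (
    df.groupby(['DocNo_', 'Sell_to_Customer'], as_index=False)
      .agg(TotalQuantity=('Quantity', 'sum'), OrderAmount=('TotalAmount', 'sum'))
)

print(order_totals.head())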

Step 4: Load the transformed data into Snowflake

A data warehouse is a large repository of data that is optimized for reporting and analysis. It typically stores data in a structured way that makes it easy to query and analyze. There are several options for data warehouses, including cloud-based solutions like Amazon Redshift, Google BigQuery, and Microsoft Azure Synapse Analytics, as well as on-premises solutions like Microsoft SQL Server. For this pipeline we'll use Snowflake, connecting to it with the Snowflake Connector for Python (the snowflake-connector-python package) to create a table and load our transformed data.

import snowflake.connector

# Connect to Snowflake
conn = snowflake.connector.connect(
    user='<your_user>',
    password='<your_password>',
    account='<your_account>',
    warehouse='<your_warehouse>',
    database='<your_database>',
    schema='<your_schema>'
)

# Create a cursor object
cur = conn.cursor()

# Create the table if it doesn't exist
cur.execute("""
    CREATE TABLE IF NOT EXISTS sales (
        DocNo_ VARCHAR(50),
        Sell_to_Customer VARCHAR(50),
        LineNo INT,
        ItemNo VARCHAR(50),
        Variant VARCHAR(50),
        Quantity INT,
        LineAmount NUMBER,
        TotalAmount NUMBER
    )
""")

# Load the transformed data into the table using parameterized inserts,
# casting the NumPy values from the DataFrame to plain Python types before binding
for _, row in df.iterrows():
    cur.execute("""
        INSERT INTO sales (DocNo_, Sell_to_Customer, LineNo, ItemNo, Variant, Quantity, LineAmount, TotalAmount)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row['DocNo_'],
        row['Sell_to_Customer'],
        int(row['LineNo']),
        row['ItemNo'],
        row['Variant'],
        int(row['Quantity']),
        float(row['LineAmount']),
        float(row['TotalAmount'])
    ))

# Commit the transaction
conn.commit()

# Close the cursor and connection
cur.close()
conn.close()
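
Inserting one row at a time works for a small dataset, but it will be slow at scale. A faster option I came across is the write_pandas helper that ships with the Snowflake connector's pandas extras (installed via snowflake-connector-python[pandas]). A minimal sketch, which would replace the row-by-row loop above and run before the connection is closed, could look like this:

from snowflake.connector.pandas_tools import write_pandas

# Snowflake stores unquoted identifiers in uppercase, so align the
# DataFrame's column names with the table we created above
df_upper = df.rename(columns=str.upper)

# Bulk-load the DataFrame into the existing SALES table in one call
success, num_chunks, num_rows, _ = write_pandas(conn, df_upper, 'SALES')
print(f"Loaded {num_rows} rows in {num_chunks} chunk(s), success={success}")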

We could then look into using something like Apache Airflow to automate the extraction, transformation, and loading of the data from our ERP to our data warehouse, and schedule it to run as a recurring, automated task. I believe stored procedures could also play a role here, but I don't know how to use those yet, so that's all for this blog post!