Discover the new Snowpark ML Toolkit + dbt Python models

Let’s do some feature engineering, training, and inference with Snowpark ML and the dbt Python models. First with with 50k rows and then with 50M rows, to prove how this new toolkit helps us scale with Snowflake — while the dbt Python models take care of all the boilerplate.

Image generated by AI

Intro

Snowpark ML is a new set of tools for building and deploying machine learning models in Snowflake. The best part is that you get familiar ML constructs (Scikit-Learn, XGBoost, LightGBM, …), paired with all the power, security, and scalability of Snowflake.

Why I’m writing this

So let’s do that: A full exploration of the Snowpark ML Toolkit using the dbt Python models on dbt Cloud. Zero packages installation, just scalable ML fun.

What’s interesting about dbt in this process:

  • dbt Cloud is not aware of the Snowpark ML libraries, and it doesn’t need to.
  • dbt Cloud’s here is mainly to wrap our code within a Snowpark boilerplate stored procedure and let Snowflake handle the rest.
  • You could run the same with the open source dbt-core on your desktop — I just love how dbt Cloud keeps everything running in the cloud while we work on this code.
  • Check my previous post for details:

Before we start

To follow along, you will need:

  • A Snowflake account (free trial works).
  • A dbt Cloud account (the free developer version works fine — or you could run all this locally, if you enjoy installing stuff).
  • Set up a user to connect your dbt Cloud account to your Snowflake account.
  • Set up a stage in your Snowflake account so it can read the typical ML example file (diamonds.csv) out of a public S3 bucket:
-- create csv format
CREATE FILE FORMAT IF NOT EXISTS CSVFORMAT
SKIP_HEADER = 1
TYPE = 'CSV';

-- create external stage with the csv format to stage the diamonds dataset
CREATE STAGE IF NOT EXISTS DIAMONDS_ASSETS
FILE_FORMAT = CSVFORMAT
URL = 's3://sfquickstarts/intro-to-machine-learning-with-snowpark-ml-for-python/diamonds.csv';

(Check out my previous post to understand what a dbt Python model is and they work with Snowflake)

Our first model: the diamonds data

This one is a typical dbt SQL model, that copies the data out of S3 (as set up in the previous step) into a Snowflake table:

-- diamonds.sql
{{config(materialized='table')}}

select $1::float "CARAT"
, trim(upper($2::string), '"') "CUT"
, trim(upper($3::string), '"') "COLOR"
, trim(upper($4::string), '"') "CLARITY"
, $5::float "DEPTH", $6::float "TABLE_PCT"
, $7::float "PRICE"
, $8::float "X", $9::float "Y", $10::float "Z"
from @DIAMONDS_ASSETS

Running this took less than 2 seconds — as it’s just a small file with 54k rows (we’ll worry about scalability and performance later in this post).

The diamonds table
Image generated by AI

Normalizing and One-Hot encoding with Snowpark ML

This where the fun starts, check out this dbt Python model:

# snowpark_ml_diamonds_transform.py

import numpy as np

import snowflake.ml.modeling.preprocessing as preproc
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.snowpark.types as T

# https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_deployment.ipynb

def model(dbt, session):
dbt.config(
packages = ['snowflake-ml-python']
)

diamonds_df = dbt.ref('diamonds')

# Normalize the CARAT column
snowml_mms = preproc.MinMaxScaler(input_cols=["CARAT"], output_cols=["CARAT_NORM"])
normalized_diamonds_df = snowml_mms.fit(diamonds_df).transform(diamonds_df)

# Reduce the number of decimals
new_col = normalized_diamonds_df.col("CARAT_NORM").cast(T.DecimalType(7, 6))
normalized_diamonds_df = normalized_diamonds_df.with_column("CARAT_NORM", new_col)

# Encode CUT and CLARITY preserve ordinal importance
categories = {
"CUT": np.array(["IDEAL", "PREMIUM", "VERY GOOD", "GOOD", "FAIR"]),
"CLARITY": np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]),
}
snowml_oe = preproc.OrdinalEncoder(input_cols=["CUT", "CLARITY"], output_cols=["CUT_OE", "CLARITY_OE"], categories=categories)
ord_encoded_diamonds_df = snowml_oe.fit(normalized_diamonds_df).transform(normalized_diamonds_df)
# Encode categoricals to numeric columns
snowml_ohe = preproc.OneHotEncoder(input_cols=["CUT", "COLOR", "CLARITY"], output_cols=["CUT_OHE", "COLOR_OHE", "CLARITY_OHE"])
transformed_diamonds_df = snowml_ohe.fit(ord_encoded_diamonds_df).transform(ord_encoded_diamonds_df)

# return ord_encoded_diamonds_df
return transformed_diamonds_df

What’s notable here:

  • We don’t need to install snowflake-ml-python as it’s already provided by Anaconda in Snowflake. We just need to ask for packages = [‘snowflake-ml-python’].
  • The snowflake.ml.modeling.preprocessing libraries offer libraries that look very similar to sklearn — but Snowflake takes care of making these run and scale within the Snowflake world.
  • Within these familiar sklearn constructs, we used here MinMaxScaler, OneHotEncoder, and OrdinalEncoder.
  • The last step of this model returns a Snowpark Dataframe, that gets persisted as a Snowflake table with the requested transformations.
Transformed and one-hot encoded diamonds
  • Running this step took 48s (we’ll check later how it scales with more data).
  • The main transformation and persistence step took only 0.44s on a Medium-wh. The query profile looks cool, and hints that Snowflake will make this scale:
Query profile — transforming, encoding, and persisting

A transformation pipeline

Let’s re-implement the above with a re-usable Pipeline:

# snowpark_ml_diamonds_transform_pipeline.py

import io
import joblib
import numpy as np

import snowflake.ml.modeling.preprocessing as preproc
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

# https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_deployment.ipynb
# https://docs.getdbt.com/guides/dbt-ecosystem/dbt-python-snowpark/12-machine-learning-training-prediction

def save_file(session, model, path, dest_filename):
input_stream = io.BytesIO()
joblib.dump(model, input_stream)
session._conn.upload_stream(input_stream, path, dest_filename)
return "successfully created file: " + path

def model(dbt, session):
dbt.config(
packages = ['snowflake-ml-python', 'joblib']
)

diamonds_df = dbt.ref('diamonds')

CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
CATEGORICAL_COLUMNS_OE = ["CUT_OE", "COLOR_OE", "CLARITY_OE"] # To name the ordinal encoded columns
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]

categories = {
"CUT": np.array(["IDEAL", "PREMIUM", "VERY GOOD", "GOOD", "FAIR"]),
"CLARITY": np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]),
"COLOR": np.array(['D', 'E', 'F', 'G', 'H', 'I', 'J']),
}
preprocessing_pipeline = Pipeline(
steps=[
(
"OE",
preproc.OrdinalEncoder(
input_cols=CATEGORICAL_COLUMNS,
output_cols=CATEGORICAL_COLUMNS_OE,
categories=categories,
)
),
(
"MMS",
preproc.MinMaxScaler(
clip=True,
input_cols=NUMERICAL_COLUMNS,
output_cols=NUMERICAL_COLUMNS,
)
)
]
)

transformed_diamonds_df = preprocessing_pipeline.fit(diamonds_df).transform(diamonds_df)

version = '1.0'
session.sql('create or replace stage MODELSTAGE').collect()
save_file(session, preprocessing_pipeline, '@MODELSTAGE/preprocessing_pipeline_'+version, 'preprocessing_pipeline_'+version+'.joblib' )

return transformed_diamonds_df

What’s notable here:

  • We created a preprocessing_pipeline — which can be re-used for further training and deployment.
  • The save_file() method takes care of encoding and saving this binary encoded pipeline into a Snowflake stage for future re-use.
  • We went here for Ordinal Encoding (instead of One-Hot), which is a good thing for the saved pipeline to remember.
  • We used snowflake.ml.modeling.pipeline and the encoders in snowflake.ml.modeling.preprocessing— which look familiar, yet optimized for Snowflake.
  • The Snowflake docs display a * next to the libraries that support distributed execution (and you can expect more on the list to do so).
  • The model returns a data frame with all the diamonds pre-processed and ready for training our model — which gets persisted into a Snowflake table.
Diamonds ready for ML mining
  • The whole process took 49s on a M-wh, with the main transformation and persistence taking only 1.1s. The query profile is interesting too:
Query profile of the transformation pipeline

Train and predict with XGBoost

Another simple dbt Python model that trains and predicts with XGBoost:

import numpy as np

from snowflake.ml.modeling.xgboost import XGBRegressor
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

# https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_deployment.ipynb
# https://docs.getdbt.com/guides/dbt-ecosystem/dbt-python-snowpark/12-machine-learning-training-prediction

def model(dbt, session):
dbt.config(
packages = ['snowflake-ml-python']
)

diamonds_df = dbt.ref('snowpark_ml_diamonds_transform_pipeline')
diamonds_train_df, diamonds_test_df = diamonds_df.random_split(weights=[0.9, 0.1], seed=0)
train_df, test_df = diamonds_train_df, diamonds_test_df

CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
CATEGORICAL_COLUMNS_OE = ["CUT_OE", "COLOR_OE", "CLARITY_OE"] # To name the ordinal encoded columns
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]
LABEL_COLUMNS = ['PRICE']
OUTPUT_COLUMNS = ['PREDICTED_PRICE']
regressor = XGBRegressor(
input_cols=CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS,
label_cols=LABEL_COLUMNS,
output_cols=OUTPUT_COLUMNS
)

regressor.fit(train_df)
return regressor.predict(test_df)

What’s notable here:

  • Instead of using the Pipeline we saved in the previous step, we can just use the transformed data persisted at the end. We could have used it, if we wanted to show off using that binary file out of a Snowflake stage.
  • We get to divide our diamonds data into training and test with a simple diamonds_df.random_split(weights=[0.9, 0.1]).
  • Then we get to train with regressor.fit(train_df) and persist the result of our test data with return regressor.predict(test_df).
  • We can use Snowsight for a quick visual check that the predicted prices are pretty close to the expected results:
Predicted prices vs actual prices
  • We are using XGBRegressor as implemented by snowflake.ml.modeling.xgboost. The docs say it hasn’t been prepared for distributed execution yet — but by using it we will get any improvements deployed by Snowflake to snowflake.ml.
  • The whole process took 1m28s on a Medium-wh.
  • The fit step took 29s. It’s interesting to look at the implementation of the stored procedure happening under the hood:
XGBoost training in action inside Snowflake
  • The inference step took 8.2s, and the query profile shows how it gets distributed with a Python UDF:
Predicting price for 10% of the rows (test set)

Will it scale?

That was quick and cool. Now let’s see if it will scale with 1000x the amount of data.

Generating 1000 times more data

This is how I generated 1000x random data, based on the existing diamonds, with a dbt Python model:

# diamonds_random_1000x.py
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

import random

def model(dbt, session):
df = dbt.ref('diamonds')

# Summarize the existing data to get random data in that range
grouped_df = df.groupBy().agg(
F.collect_set("CUT").alias("CUT"),
F.collect_set("COLOR").alias("COLOR"),
F.collect_set("CLARITY").alias("CLARITY"),
F.array_construct(F.min("CARAT"), F.max("CARAT")).alias("CARAT"),
F.array_construct(F.min("DEPTH"), F.max("DEPTH")).alias("DEPTH"),
F.array_construct(F.min("TABLE_PCT"), F.max("TABLE_PCT")).alias("TABLE_PCT"),
F.array_construct(F.min("X"), F.max("X")).alias("X"),
F.array_construct(F.min("Y"), F.max("Y")).alias("Y"),
F.array_construct(F.min("Z"), F.max("Z")).alias("Z"),
F.array_construct(F.min("PRICE"), F.max("PRICE")).alias("PRICE"),
)

@F.udf(input_types=[T.ArrayType()], return_type=T.StringType())
def random_from_array(x):
return random.choice(x)

num_rows = df.count() * 1000
range_df = session.range(num_rows).withColumn("dummy", F.col("id"))
exploded_df = grouped_df.crossJoin(range_df)

# Generate random values within the specified range for numerical columns
result_df = exploded_df.select(
random_from_array(F.col('CUT')).alias('CUT'),
random_from_array(F.col('COLOR')).alias('COLOR'),
random_from_array(F.col('CLARITY')).alias('CLARITY'),
(F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('CARAT')[1] - F.col('CARAT')[0]) + F.col('CARAT')[0]).alias('CARAT'),
(F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('DEPTH')[1] - F.col('DEPTH')[0]) + F.col('DEPTH')[0]).alias('DEPTH'),
(F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('TABLE_PCT')[1] - F.col('TABLE_PCT')[0]) + F.col('TABLE_PCT')[0]).alias('TABLE_PCT'),
(F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('X')[1] - F.col('X')[0]) + F.col('X')[0]).alias('X'),
(F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('Y')[1] - F.col('Y')[0]) + F.col('Y')[0]).alias('Y'),
(F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('Z')[1] - F.col('Z')[0]) + F.col('Z')[0]).alias('Z'),
)

result_df = result_df.withColumn("PRICE", F.col("TABLE_PCT") * F.col("Y"))
return result_df

What’s notable here:

  • This is the step that took me the longest time to write.
  • You can see how hard I had to work to find a way to produce a random number between the sample data min and max: (F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col(‘CARAT’)[1] — F.col(‘CARAT’)[0]) + F.col(‘CARAT’)[0]).alias(‘CARAT’) .
  • Then I wasn’t able to find an easy Dataframe way to choose a random value within the summary arrays of the categorical data. The easy solution instead was writing a simple Python UDF (random_from_array()).
  • I gave the price a simple pattern depending on a couple of these variables —then it will be XGBoost’s job to discover this pattern within the data.
  • Creating these 54 million rows took 1m7s in a Medium-wh, with the main generated query taking 57s. As always, it’s fun to look at that query profile:

Transformation pipeline with 1000x data

We are going to skip the code here, as it’s exactly the code as above, but instead of depending on dbt.ref(‘diamonds’) it asks for the just created dbt.ref(‘diamonds_random_1000x’) table.

What’s notable here:

  • The whole process took 1m on a M-wh, with the main transformation and persistence taking only 8.3s.
  • These are cool numbers at scale: The original transformation pipeline took 49s on a Medium-wh, with the transformation and persistence taking 1.1s. That’s pretty cool, as we scaled to 54 million rows (from only 54 thousand), taking only a little bit more time.
  • The query profile:
Only 8s with 1000x more data

Training and predicting with 1000x data

We are going to skip the code here, as it’s exactly the code as above. But instead of depending on dbt.ref(‘snowpark_ml_diamonds_transform_pipeline’), it asks for the just created dbt.ref(‘snowpark_ml_diamonds_transform_pipeline_1000x’).

What’s notable here:

  • After 2m43s, this step failed (at first). This is the log:
Failure log of training with 1000x the data
  • The reason is simple, and the fix is too: Our Medium-wh didn’t have enough RAM for XGBoost to train the model with 1000x the data. That’s why Snowflake has now the “Snowpark Optimized Warehouses” — which are like normal warehouses, but with a lot more RAM and related optimizations.
  • Once switched to a Snowpark-Medium-wh, the whole process took 5m53s. This is cool, compared to the previous 1m28s, because now we have 1000x the data.
  • The training part took 4m48s (compared to 29s). This is where distributed execution support might help us a lot (when/if Snowflake implements it transparently for you).
  • Predicting for 5.4 million rows took 15s (compared to 8.2s with 5.4 thousand rows). The query profile:
Predicting for 5.4 million rows of diamonds
  • Performance note: The Medium-Snowpark-wh had a similar performance than a Large-Snowpark-wh — given that this XGBoost isn’t (yet) distributed.
  • The predicted prices versus the actual prices look even better in this chart. We can say that XGBoost successfully found out the linear relationship I chose for the pricing of the random data:

Summary

As we used dbt to connect all these transformations, it also helps us visualize how each model relates to each other:

How dbt transformed `diamonds` through augmentation, transforming, and training.
  • dbt allows us to switch between SQL and Python transformations in a pipeline, and takes care of adding boilerplate code when pushing these to Snowflake.
  • snowflake.ml.modeling packages familiar Python ML tools (Scikit-Learn, XGBoost, LightGBM, …) and improves their performance and scalability on Snowflake.
  • Scaling from 50 thousand rows to 50 million rows was easy and quick.
  • We can expect Snowflake to keep improving these libraries and their scalability.
Image created by AI

Next steps

Suggestions from my previous post:

Want more?

  • Try this out with a Snowflake free trial account — you only need an email address to get started.
  • Try dbt Cloud — which I used as a cool web IDE integrated to dbt and Snowflake to develop the examples in this post (it does a lot more than that too, but that’s a story for another post).

I’m Felipe Hoffa, Data Cloud Advocate for Snowflake. Thanks for joining me on this adventure. You can follow me on Twitter and LinkedIn. And subscribe to reddit.com/r/snowflake for the most interesting Snowflake news.

Oh, and I’m on threads.net/fhoffa now too :)

--

--

Felipe Hoffa
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Data Cloud Advocate at Snowflake ❄️. Originally from Chile, now in San Francisco and around the world. Previously at Google. Let’s talk data.