Discover the new Snowpark ML Toolkit + dbt Python models
Let's do some feature engineering, training, and inference with Snowpark ML and dbt Python models: first with 54k rows and then with 54M rows, to show how this new toolkit helps us scale with Snowflake while the dbt Python models take care of all the boilerplate.
Intro
Snowpark ML is a new set of tools for building and deploying machine learning models in Snowflake. The best part is that you get familiar ML constructs (Scikit-Learn, XGBoost, LightGBM, …), paired with all the power, security, and scalability of Snowflake.
Why I’m writing this
- In my previous post I shared how dbt and Snowflake pair beautifully to power the new dbt Python models.
- The dbt Python models docs offer an in-depth use case of how to do ML prep, cleaning, training, and prediction with dbt Python + Snowpark.
- But — those docs don’t leverage the new Snowpark ML Toolkit — which will make these previous tasks perform even better.
- Meanwhile the Snowpark ML Toolkit quickstart offers an in-depth explanation of how to use this new ML Toolkit.
- But — that quickstart starts by asking us to download and install Python libraries in our VMs. I don’t like that step. I would rather use a cloud tool (like dbt Cloud), and have every library managed “magically”.
So let's do that: a full exploration of the Snowpark ML Toolkit using dbt Python models on dbt Cloud. Zero package installation, just scalable ML fun.
What’s interesting about dbt in this process:
- dbt Cloud is not aware of the Snowpark ML libraries, and it doesn't need to be.
- dbt Cloud's role here is mainly to wrap our code in a Snowpark boilerplate stored procedure and let Snowflake handle the rest.
- You could run the same with the open source dbt-core on your desktop — I just love how dbt Cloud keeps everything running in the cloud while we work on this code.
- Check my previous post for details.
Before we start
To follow along, you will need:
- A Snowflake account (free trial works).
- A dbt Cloud account (the free developer version works fine — or you could run all this locally, if you enjoy installing stuff).
- Set up a user to connect your dbt Cloud account to your Snowflake account.
- Set up a stage in your Snowflake account so it can read the typical ML example file (diamonds.csv) out of a public S3 bucket:
-- create csv format
CREATE FILE FORMAT IF NOT EXISTS CSVFORMAT
SKIP_HEADER = 1
TYPE = 'CSV';
-- create external stage with the csv format to stage the diamonds dataset
CREATE STAGE IF NOT EXISTS DIAMONDS_ASSETS
FILE_FORMAT = CSVFORMAT
URL = 's3://sfquickstarts/intro-to-machine-learning-with-snowpark-ml-for-python/diamonds.csv';
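To confirm the stage resolves before moving on, a quick check from any Snowpark Python session (the same kind of session object dbt hands to Python models) will do. This snippet is just a convenience, not part of the dbt project:
# optional: list the staged file to confirm the stage and file format are in place
session.sql("LIST @DIAMONDS_ASSETS").show()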
(Check out my previous post to understand what a dbt Python model is and how they work with Snowflake.)
Our first model: the diamonds data
This is a typical dbt SQL model that copies the data out of S3 (as set up in the previous step) into a Snowflake table:
-- diamonds.sql
{{config(materialized='table')}}
select $1::float "CARAT"
, trim(upper($2::string), '"') "CUT"
, trim(upper($3::string), '"') "COLOR"
, trim(upper($4::string), '"') "CLARITY"
, $5::float "DEPTH", $6::float "TABLE_PCT"
, $7::float "PRICE"
, $8::float "X", $9::float "Y", $10::float "Z"
from @DIAMONDS_ASSETS
Running this took less than 2 seconds — as it’s just a small file with 54k rows (we’ll worry about scalability and performance later in this post).
Normalizing and One-Hot encoding with Snowpark ML
This is where the fun starts. Check out this dbt Python model:
# snowpark_ml_diamonds_transform.py
import numpy as np
import snowflake.ml.modeling.preprocessing as preproc
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.snowpark.types as T

# https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_deployment.ipynb

def model(dbt, session):
    dbt.config(
        packages = ['snowflake-ml-python']
    )
    diamonds_df = dbt.ref('diamonds')

    # Normalize the CARAT column
    snowml_mms = preproc.MinMaxScaler(input_cols=["CARAT"], output_cols=["CARAT_NORM"])
    normalized_diamonds_df = snowml_mms.fit(diamonds_df).transform(diamonds_df)

    # Reduce the number of decimals
    new_col = normalized_diamonds_df.col("CARAT_NORM").cast(T.DecimalType(7, 6))
    normalized_diamonds_df = normalized_diamonds_df.with_column("CARAT_NORM", new_col)

    # Encode CUT and CLARITY, preserving their ordinal importance
    categories = {
        "CUT": np.array(["IDEAL", "PREMIUM", "VERY GOOD", "GOOD", "FAIR"]),
        "CLARITY": np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]),
    }
    snowml_oe = preproc.OrdinalEncoder(input_cols=["CUT", "CLARITY"], output_cols=["CUT_OE", "CLARITY_OE"], categories=categories)
    ord_encoded_diamonds_df = snowml_oe.fit(normalized_diamonds_df).transform(normalized_diamonds_df)

    # Encode categoricals to numeric columns
    snowml_ohe = preproc.OneHotEncoder(input_cols=["CUT", "COLOR", "CLARITY"], output_cols=["CUT_OHE", "COLOR_OHE", "CLARITY_OHE"])
    transformed_diamonds_df = snowml_ohe.fit(ord_encoded_diamonds_df).transform(ord_encoded_diamonds_df)

    return transformed_diamonds_df
What’s notable here:
- We don't need to install snowflake-ml-python, as it's already provided by Anaconda inside Snowflake. We just need to ask for packages = ['snowflake-ml-python'].
- The snowflake.ml.modeling.preprocessing module offers transformers that look very similar to sklearn's, but Snowflake takes care of making them run and scale within the Snowflake world.
- Among these familiar sklearn-style constructs, we used MinMaxScaler, OneHotEncoder, and OrdinalEncoder here.
- The last step of this model returns a Snowpark DataFrame, which gets persisted as a Snowflake table with the requested transformations (you can peek at it with the snippet right after this list).
- Running this step took 48s (we'll check later how it scales with more data).
- The main transformation and persistence step took only 0.44s on a Medium warehouse. The query profile looks cool, and hints that Snowflake will make this scale:
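Before moving on to the pipeline version, it's easy to eyeball the result: the table dbt materialized for this model can be inspected from any Snowpark session. This is a minimal sketch, and it assumes the table keeps the model file's name:
# sketch: peek at the transformed table (name assumed to follow the model file)
df = session.table("SNOWPARK_ML_DIAMONDS_TRANSFORM")
df.select("CARAT", "CARAT_NORM", "CUT", "CUT_OE", "CLARITY", "CLARITY_OE").show(5)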
A transformation pipeline
Let’s re-implement the above with a re-usable Pipeline:
# snowpark_ml_diamonds_transform_pipeline.py
import io
import joblib
import numpy as np
import snowflake.ml.modeling.preprocessing as preproc
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

# https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_deployment.ipynb
# https://docs.getdbt.com/guides/dbt-ecosystem/dbt-python-snowpark/12-machine-learning-training-prediction

def save_file(session, model, path, dest_filename):
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    session._conn.upload_stream(input_stream, path, dest_filename)
    return "successfully created file: " + path

def model(dbt, session):
    dbt.config(
        packages = ['snowflake-ml-python', 'joblib']
    )
    diamonds_df = dbt.ref('diamonds')

    CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
    CATEGORICAL_COLUMNS_OE = ["CUT_OE", "COLOR_OE", "CLARITY_OE"]  # To name the ordinal encoded columns
    NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]

    categories = {
        "CUT": np.array(["IDEAL", "PREMIUM", "VERY GOOD", "GOOD", "FAIR"]),
        "CLARITY": np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]),
        "COLOR": np.array(['D', 'E', 'F', 'G', 'H', 'I', 'J']),
    }

    preprocessing_pipeline = Pipeline(
        steps=[
            (
                "OE",
                preproc.OrdinalEncoder(
                    input_cols=CATEGORICAL_COLUMNS,
                    output_cols=CATEGORICAL_COLUMNS_OE,
                    categories=categories,
                )
            ),
            (
                "MMS",
                preproc.MinMaxScaler(
                    clip=True,
                    input_cols=NUMERICAL_COLUMNS,
                    output_cols=NUMERICAL_COLUMNS,
                )
            )
        ]
    )
    transformed_diamonds_df = preprocessing_pipeline.fit(diamonds_df).transform(diamonds_df)

    version = '1.0'
    session.sql('create or replace stage MODELSTAGE').collect()
    save_file(session, preprocessing_pipeline, '@MODELSTAGE/preprocessing_pipeline_'+version, 'preprocessing_pipeline_'+version+'.joblib')

    return transformed_diamonds_df
What’s notable here:
- We created a preprocessing_pipeline, which can be re-used for further training and deployment.
- The save_file() method takes care of serializing this pipeline with joblib and saving it into a Snowflake stage for future re-use (a sketch of how to load it back follows this list).
- We went for Ordinal Encoding here (instead of One-Hot), which is a good thing for the saved pipeline to remember.
- We used snowflake.ml.modeling.pipeline and the encoders in snowflake.ml.modeling.preprocessing, which look familiar yet are optimized for Snowflake.
- The Snowflake docs display a * next to the libraries that support distributed execution (and you can expect more on the list to do so).
- The model returns a DataFrame with all the diamonds pre-processed and ready for training our model, and it gets persisted into a Snowflake table.
- The whole process took 49s on a Medium warehouse, with the main transformation and persistence taking only 1.1s. The query profile is interesting too:
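As a counterpart to save_file(), here is a minimal sketch of how the serialized pipeline could be read back for later re-use, for example from a notebook or another model. The stage path mirrors the save_file() call above, and the local temp directory is an assumption:
import joblib

def load_pipeline(session, version='1.0'):
    # download the serialized pipeline from the stage, then deserialize it
    session.file.get('@MODELSTAGE/preprocessing_pipeline_' + version, '/tmp')
    return joblib.load('/tmp/preprocessing_pipeline_' + version + '.joblib')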
Train and predict with XGBoost
Another simple dbt Python model that trains and predicts with XGBoost:
import numpy as np
from snowflake.ml.modeling.xgboost import XGBRegressor
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

# https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_deployment.ipynb
# https://docs.getdbt.com/guides/dbt-ecosystem/dbt-python-snowpark/12-machine-learning-training-prediction

def model(dbt, session):
    dbt.config(
        packages = ['snowflake-ml-python']
    )
    diamonds_df = dbt.ref('snowpark_ml_diamonds_transform_pipeline')
    diamonds_train_df, diamonds_test_df = diamonds_df.random_split(weights=[0.9, 0.1], seed=0)
    train_df, test_df = diamonds_train_df, diamonds_test_df

    CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
    CATEGORICAL_COLUMNS_OE = ["CUT_OE", "COLOR_OE", "CLARITY_OE"]  # To name the ordinal encoded columns
    NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]
    LABEL_COLUMNS = ['PRICE']
    OUTPUT_COLUMNS = ['PREDICTED_PRICE']

    regressor = XGBRegressor(
        input_cols=CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS,
        label_cols=LABEL_COLUMNS,
        output_cols=OUTPUT_COLUMNS
    )
    regressor.fit(train_df)
    return regressor.predict(test_df)
What’s notable here:
- Instead of using the Pipeline we saved in the previous step, we can just use the transformed data persisted at the end. We could have used it if we wanted to show off reading that binary file out of a Snowflake stage.
- We get to divide our diamonds data into training and test sets with a simple diamonds_df.random_split(weights=[0.9, 0.1]).
- Then we train with regressor.fit(train_df) and persist the predictions for our test data with return regressor.predict(test_df).
- We can use Snowsight for a quick visual check that the predicted prices are pretty close to the expected results (a numeric error check is sketched right after this list):
- We are using XGBRegressor as implemented by snowflake.ml.modeling.xgboost. The docs say it hasn't been prepared for distributed execution yet, but by using it we will get any improvements Snowflake deploys to snowflake.ml.
- The whole process took 1m28s on a Medium warehouse.
- The fit step took 29s. It's interesting to look at the implementation of the stored procedure happening under the hood:
- The inference step took 8.2s, and the query profile shows how it gets distributed with a Python UDF:
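To put a number on how close those predictions are, an error metric can be computed straight on the persisted predictions with plain Snowpark functions. This is a minimal sketch, and the table name is a hypothetical one for the model above:
import snowflake.snowpark.functions as F

# sketch: mean absolute percentage error over the persisted predictions (table name is hypothetical)
preds = session.table("SNOWPARK_ML_DIAMONDS_XGB")
preds.select(
    F.avg(F.abs((F.col("PREDICTED_PRICE") - F.col("PRICE")) / F.col("PRICE"))).alias("MAPE")
).show()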
Will it scale?
That was quick and cool. Now let’s see if it will scale with 1000x the amount of data.
Generating 1000 times more data
This is how I generated 1000x random data, based on the existing diamonds, with a dbt Python model:
# diamonds_random_1000x.py
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
import random

def model(dbt, session):
    df = dbt.ref('diamonds')

    # Summarize the existing data to get random data in that range
    grouped_df = df.groupBy().agg(
        F.collect_set("CUT").alias("CUT"),
        F.collect_set("COLOR").alias("COLOR"),
        F.collect_set("CLARITY").alias("CLARITY"),
        F.array_construct(F.min("CARAT"), F.max("CARAT")).alias("CARAT"),
        F.array_construct(F.min("DEPTH"), F.max("DEPTH")).alias("DEPTH"),
        F.array_construct(F.min("TABLE_PCT"), F.max("TABLE_PCT")).alias("TABLE_PCT"),
        F.array_construct(F.min("X"), F.max("X")).alias("X"),
        F.array_construct(F.min("Y"), F.max("Y")).alias("Y"),
        F.array_construct(F.min("Z"), F.max("Z")).alias("Z"),
        F.array_construct(F.min("PRICE"), F.max("PRICE")).alias("PRICE"),
    )

    @F.udf(input_types=[T.ArrayType()], return_type=T.StringType())
    def random_from_array(x):
        return random.choice(x)

    num_rows = df.count() * 1000
    range_df = session.range(num_rows).withColumn("dummy", F.col("id"))
    exploded_df = grouped_df.crossJoin(range_df)

    # Generate random values within the specified range for numerical columns
    result_df = exploded_df.select(
        random_from_array(F.col('CUT')).alias('CUT'),
        random_from_array(F.col('COLOR')).alias('COLOR'),
        random_from_array(F.col('CLARITY')).alias('CLARITY'),
        (F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('CARAT')[1] - F.col('CARAT')[0]) + F.col('CARAT')[0]).alias('CARAT'),
        (F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('DEPTH')[1] - F.col('DEPTH')[0]) + F.col('DEPTH')[0]).alias('DEPTH'),
        (F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('TABLE_PCT')[1] - F.col('TABLE_PCT')[0]) + F.col('TABLE_PCT')[0]).alias('TABLE_PCT'),
        (F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('X')[1] - F.col('X')[0]) + F.col('X')[0]).alias('X'),
        (F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('Y')[1] - F.col('Y')[0]) + F.col('Y')[0]).alias('Y'),
        (F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('Z')[1] - F.col('Z')[0]) + F.col('Z')[0]).alias('Z'),
    )

    result_df = result_df.withColumn("PRICE", F.col("TABLE_PCT") * F.col("Y"))
    return result_df
What’s notable here:
- This is the step that took me the longest time to write.
- You can see how hard I had to work to find a way to produce a random number between the sample data min and max: (F.uniform(F.cast(F.lit(0), T.FloatType()), F.lit(1), F.random()) * (F.col('CARAT')[1] - F.col('CARAT')[0]) + F.col('CARAT')[0]).alias('CARAT').
- I wasn't able to find an easy DataFrame way to choose a random value within the summary arrays of the categorical data. The easy solution instead was writing a simple Python UDF (random_from_array()).
- I gave the price a simple pattern depending on a couple of these variables; it will then be XGBoost's job to discover this pattern within the data.
- Creating these 54 million rows took 1m7s on a Medium warehouse, with the main generated query taking 57s. As always, it's fun to look at the query profile (a quick sanity check of the generated table is sketched right after this list):
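Before feeding this table into the transformation pipeline, a quick look helps confirm the generated values fall in the expected ranges. A minimal sketch, assuming the table keeps the model file's name:
import snowflake.snowpark.functions as F

# sketch: confirm row count and value ranges of the generated data (table name assumed)
gen = session.table("DIAMONDS_RANDOM_1000X")
print(gen.count())  # expect roughly 54 million rows
gen.select(F.min("CARAT"), F.max("CARAT"), F.min("PRICE"), F.max("PRICE")).show()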
Transformation pipeline with 1000x data
We are going to skip the code here, as it's exactly the same code as above, but instead of depending on dbt.ref('diamonds') it asks for the just-created dbt.ref('diamonds_random_1000x') table.
What’s notable here:
- The whole process took 1m on a Medium warehouse, with the main transformation and persistence taking only 8.3s.
- These are cool numbers at scale: the original transformation pipeline took 49s, with the transformation and persistence taking 1.1s. We scaled to 54 million rows (from only 54 thousand) while taking only a little more time.
- The query profile:
Training and predicting with 1000x data
We are going to skip the code here, as it's exactly the same code as above, but instead of depending on dbt.ref('snowpark_ml_diamonds_transform_pipeline'), it asks for the just-created dbt.ref('snowpark_ml_diamonds_transform_pipeline_1000x').
What’s notable here:
- After 2m43s, this step failed (at first). This is the log:
- The reason is simple, and so is the fix: our Medium warehouse didn't have enough RAM for XGBoost to train the model on 1000x the data. That's why Snowflake now offers "Snowpark-optimized warehouses", which are like normal warehouses but with a lot more RAM and related optimizations.
- Once we switched to a Medium Snowpark-optimized warehouse, the whole process took 5m53s. That's a good result compared to the previous 1m28s, because now we have 1000x the data (switching the warehouse for a single dbt model is sketched right after this list).
- The training part took 4m48s (compared to 29s). This is where distributed execution support might help us a lot (when/if Snowflake implements it transparently for us).
- Predicting for 5.4 million rows took 15s (compared to 8.2s with 5.4 thousand rows). The query profile:
- Performance note: the Medium Snowpark-optimized warehouse performed similarly to a Large one, given that this XGBoost isn't (yet) distributed.
- The predicted prices versus the actual prices look even better in this chart. We can say that XGBoost successfully discovered the relationship I chose for pricing the random data:
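For reference, pointing a single dbt Python model at a Snowpark-optimized warehouse can be done from the model config itself, using dbt-snowflake's snowflake_warehouse setting. This is a minimal sketch where the warehouse name is an assumption (create the Snowpark-optimized warehouse in Snowflake first):
# sketch: run just this model on a Snowpark-optimized warehouse (warehouse name is hypothetical)
def model(dbt, session):
    dbt.config(
        packages=['snowflake-ml-python'],
        snowflake_warehouse='SNOWPARK_OPT_WH',  # hypothetical Snowpark-optimized warehouse
    )
    df = dbt.ref('snowpark_ml_diamonds_transform_pipeline_1000x')
    # the training and prediction code stays exactly as in the smaller model
    return df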
Summary
As we used dbt to connect all these transformations, it also helps us visualize how each model relates to each other:
- dbt allows us to switch between SQL and Python transformations in a pipeline, and takes care of adding boilerplate code when pushing these to Snowflake.
- snowflake.ml.modeling packages familiar Python ML tools (Scikit-Learn, XGBoost, LightGBM, …) and improves their performance and scalability on Snowflake.
- Scaling from 54 thousand rows to 54 million rows was easy and quick.
- We can expect Snowflake to keep improving these libraries and their scalability.
Next steps
- We saw how to persist binaries into a Snowflake stage, but registry and deployment of models could be better handled (stay tuned).
- Check “ML on Snowflake at scale with Snowpark (Part-2)” from Simran Khara and “ML on Snowflake at scale with Snowpark Python and XGBoost” from Chase Ginther.
Suggestions from my previous post:
- Try the dbt Python models in your Snowflake account, and share your results.
- Participate in the dbt community to shape the future of the dbt Python models.
- Read the dbt Python model docs, and the Snowflake Snowpark for Python library docs.
- Share your findings with the dbt community on the dbt Slack channels #dbt-core-python-models and #db-snowflake.
- Try some dbt+Snowflake quickstarts like “Data Engineering with Snowpark Python and dbt” and “Leverage dbt Cloud to Generate ML ready pipelines using Snowpark Python”.
- Compare the simplicity, power, and performance of the dbt Python models on Snowflake versus the setup that dbt had to pull off to run Python models on other platforms.
- Check the previous posts by Jeremiah Hansen and Eda Johnson “Data Engineering with Snowpark Python and dbt” and “A First Look at the dbt Python Models with Snowpark”. Also phData’s “How to Use dbt With Snowpark Python to Implement Sentiment Analysis” and “How to Build a Python Model in dbt with Snowflake”.
- For ML, dbt has an in-depth guide “Leverage dbt Cloud to generate analytics and ML-ready pipelines with SQL and Python with Snowflake”, and tropos published “Time-Series Forecasting With Python For Snowpark And Dbt Labs”.
Want more?
- Try this out with a Snowflake free trial account — you only need an email address to get started.
- Try dbt Cloud, which I used as a cool web IDE integrated with dbt and Snowflake to develop the examples in this post (it does a lot more than that too, but that's a story for another post).
I’m Felipe Hoffa, Data Cloud Advocate for Snowflake. Thanks for joining me on this adventure. You can follow me on Twitter and LinkedIn. And subscribe to reddit.com/r/snowflake for the most interesting Snowflake news.
Oh, and I’m on threads.net/fhoffa now too :)