Lightweight batch streaming Meetup RSVPs to Snowflake with GCP Pub/Sub
Let’s take a real-time streaming source — all the RSVPs from Meetup.com — into Snowflake, with the help of Google Cloud Pub/Sub
2022 Update
Meetup stopped supporting the streaming API. This means we can’t keep streaming Meetup RSVPs, but the pattern is still useful with any other real-life feed.
Use case
We want to move a stream of data with ~4k messages per hour to Snowflake, using Google Pub/Sub as temporary storage.
This is a real use case, consisting of all worldwide RSVPs on meetup.com being published in real time. This data has been collected from Meetup through their official API.
With this we can observe the top cities in North America by # of RSVPs since October 2021:
And the top topics by RSVPs for each city:
Quick summary of solution
Most of this fits into the permanent GCP free tier:
- Python script in an e2-micro VM captures messages 24x7 and publishes them to Pub/Sub. Free tier, or $6.00/month.
- Pub/Sub moves ~4GB/month. Free tier, or well under $1.00/month.
- Python script launches every 10 minutes, reads from Pub/Sub, and sends files to GCS. Costs included within the above e2-micro.
- Snowflake Snowpipe detects new GCS files and ingests them into Snowflake. If running Snowflake on AWS, consider GCP egress costs. Less than 2 credits per month.
- Snowflake then makes it easy to parse the incoming JSON objects and analyze our data.
Quick survey of alternatives
Some paths considered, but not followed:
- Use Kafka: It would be more complex and expensive than Pub/Sub.
- Use Pub/Sub Lite: It requires provisioning, which should be cheaper for streams with more data — but not for this case.
- Use Dataflow: Recommended by Google, but requires running 24/7 in streaming mode — gets expensive.
- Use external functions: Calling Pub/Sub directly from Snowflake through a proxy to unload messages sounds like an interesting idea, but unexplored.
- Skip Pub/Sub, have the Python script produce GCS files: High risk of losing messages while composing files. Pub/Sub instead serves as great durable temporary storage.
- Use a different cloud toolset: Azure and AWS have good alternatives to this pipeline. I chose GCP just because I’m more familiar with it — which might be the case for people interested in this particular solution.
The code
Python — Capture and publish to Pub/Sub:
read.py: Opens the Meetup RSVP stream and reads from it forever, sending each line to Pub/Sub.
import requests
import time
from google.cloud import pubsub_v1

project_id = "myproject"
topic_id = "meetup202011"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
futures = dict()

# Start the stream 10 minutes in the past (the API expects epoch milliseconds).
minutes_ago = 1000*(int(time.time())-600)
url = "https://stream.meetup.com/2/rsvps?since_mtime="+str(minutes_ago)
print(url)

# Read the HTTP stream forever, publishing each JSON line to Pub/Sub.
r = requests.get(url, stream=True)
for line in r.iter_lines(decode_unicode=True):
    # print(line)
    publisher.publish(topic_path, line.encode("utf-8"))
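Note that publisher.publish() returns a future that the script above ignores (the futures dict is never used). If you want to confirm that each message was accepted by Pub/Sub, here's a minimal sketch of that variation, reusing publisher and topic_path from read.py:

# Hypothetical variation of the loop body in read.py: wait on the publish
# future to confirm Pub/Sub accepted the message (at the cost of throughput).
for line in r.iter_lines(decode_unicode=True):
    future = publisher.publish(topic_path, line.encode("utf-8"))
    message_id = future.result()  # raises an exception if the publish failed
    # print(message_id)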
Python — Drain Pub/Sub for 20 seconds, send files:
write.py: Reads the messages stored in Pub/Sub and creates a file to send to GCS. Because of how Pub/Sub pull works, we need to ask it repeatedly for more messages within a 20-second window. After the file has been written to GCS, the script acknowledges the messages back to Pub/Sub so they won’t be delivered again.
import gzip
import os
import time
from google.cloud import pubsub_v1
from google.cloud import storage

project_id = "myproject"
subscription_id = "meetup20201201"
gcs_bucket = 'my-bucket'

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

max_messages = 20000
max_time = 20
start_time = int(time.time())
filename = 'file%i.gz' % start_time

with subscriber:
    ack_ids = []
    # Pull messages repeatedly for up to max_time seconds, writing each one
    # as a line in a local gzipped file.
    with gzip.open(filename, 'wb') as f:
        while (
            len(ack_ids) < max_messages
            and time.time() < start_time + max_time
        ):
            response = subscriber.pull(
                request={
                    "subscription": subscription_path,
                    "max_messages": max_messages - len(ack_ids)}
            )
            print("<received %i messages" % len(response.received_messages))
            if not response.received_messages:
                break
            for received_message in response.received_messages:
                f.write(received_message.message.data)
                f.write('\n'.encode())
                ack_ids.append(received_message.ack_id)
            print("received %i messages>" % len(response.received_messages))

    if ack_ids:
        # Upload the file to GCS.
        gcs = storage.Client()
        bucket = gcs.get_bucket(gcs_bucket)
        blob = bucket.blob('logged/%s' % filename)
        blob.upload_from_filename(filename=filename)
        # Acknowledge the received messages so they will not be sent again.
        ack_arrays = [ack_ids[i:i + 1000] for i in range(0, len(ack_ids), 1000)]
        for x in ack_arrays:
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": x}
            )
            print("ack %i" % len(x))

os.remove(filename)
print(
    f"Received and acknowledged {len(ack_ids)} messages from {subscription_path}."
)
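One caveat worth noting: if a run takes longer than the subscription’s acknowledgement deadline, Pub/Sub may redeliver messages before write.py gets to acknowledge them, which is one source of the duplicates and slow draining mentioned below. A possible mitigation, not part of the original script, is to extend the deadline on the messages still being held. A minimal sketch, reusing subscriber, subscription_path, and ack_ids from write.py:

# Hypothetical addition to write.py, before the upload step: push back the ack
# deadline of pulled-but-unacknowledged messages so they are less likely to be
# redelivered while the file is still being written and uploaded.
for chunk in [ack_ids[i:i + 1000] for i in range(0, len(ack_ids), 1000)]:
    subscriber.modify_ack_deadline(
        request={
            "subscription": subscription_path,
            "ack_ids": chunk,
            "ack_deadline_seconds": 120,
        }
    )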
Pub/Sub — Create the topic and subscriptions for the above code to work
Follow the Pub/Sub docs to create the topic_path and subscription_path used above.
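If you’d rather create them in code than in the console, here’s a minimal sketch with the same google-cloud-pubsub client, assuming the project, topic, and subscription names used in the scripts above:

from google.cloud import pubsub_v1

project_id = "myproject"
topic_id = "meetup202011"
subscription_id = "meetup20201201"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Create the topic, then a pull subscription attached to it.
publisher.create_topic(request={"name": topic_path})
subscriber.create_subscription(
    request={"name": subscription_path, "topic": topic_path}
)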
VM shell — Run the Python code
Read from the Meetup stream forever. If there’s any error that aborts the Python script, wait 10 seconds, and run the script again:
while true; do python3 read.py; sleep 10; done
Wake up every 10 minutes to read messages stored in Pub/Sub:
while true; do python3 write.py; sleep 600; done
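These shell loops are a deliberately simple supervisor. An alternative is to handle reconnection inside read.py itself; here’s a minimal sketch of that variation, reusing publisher and topic_path from read.py:

import time
import requests

# Hypothetical variation of read.py's main loop: reconnect to the stream on
# errors instead of relying only on the outer shell loop.
while True:
    try:
        since_mtime = 1000 * (int(time.time()) - 600)
        url = "https://stream.meetup.com/2/rsvps?since_mtime=" + str(since_mtime)
        r = requests.get(url, stream=True)
        for line in r.iter_lines(decode_unicode=True):
            if line:  # skip keep-alive empty lines
                publisher.publish(topic_path, line.encode("utf-8"))
    except requests.RequestException as e:
        print("stream error, reconnecting in 10 seconds:", e)
        time.sleep(10)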
Snowflake — Set up the integration with GCS
Follow the docs “Configuring an Integration for Google Cloud Storage”:
create or replace storage integration temp_fhoffa_gcs_meetup
type = external_stage
storage_provider = gcs
enabled = true
storage_allowed_locations = ('gcs://my-bucket/folder/');
desc storage integration temp_fhoffa_gcs_meetup;

create or replace stage temp_fhoffa_gcs_meetup
storage_integration = temp_fhoffa_gcs_meetup
url = 'gcs://my-bucket/folder/'
file_format = (type=json);
Play with the files to make sure they exist:
list @temp_fhoffa_gcs_meetup;

select parse_json($1), METADATA$FILENAME, METADATA$FILE_ROW_NUMBER
from '@temp_fhoffa_gcs_meetup' (pattern=>'.*file1674367459.gz')
;

select count(*), array_agg(distinct metadata$filename)
from '@temp_fhoffa_gcs_meetup';
Set up an integration with GCS and Pub/Sub for new file notifications:
create notification integration temp_meetup202011_pubsub_int
type = queue
notification_provider = gcp_pubsub
enabled = true
gcp_pubsub_subscription_name = 'projects/fhoffa/subscriptions/meetup20201125_snowpipe_gcs';
desc notification integration temp_meetup202011_pubsub_int;
Create a table to ingest the arriving semi-structured data:
create table meetup202011_rsvps (x variant);
Tell Snowpipe to load into this table every time a new file arrives to GCS:
create pipe temp.public.meetup202011_pipe
auto_ingest = true
integration = temp_meetup202011_pubsub_int
as
copy into temp.public.meetup202011_rsvps
from @temp_fhoffa_gcs_meetup;
Writing queries in Snowflake
Thanks to the VARIANT type, writing queries in Snowflake is straightforward, even without defining views or schemas.
Top cities in North America by # of RSVPs
The top 2 cities by # of RSVPs in North America since October 2021 are Toronto and New York, with New York having almost 3 times more RSVPs than Toronto:
select x:group.group_city
|| ', '
|| coalesce(x:group.group_state, x:group.group_country) city
, count(*) rsvps
from meetup202011_rsvps
where x:group.group_country in ('us', 'ca', 'mx')
and x:mtime > date_part(epoch_millisecond, '2021-10-01'::timestamp)
group by 1
order by rsvps desc
limit 500;
Top topics per city
It’s interesting to see how different cities prefer different topics.
For example, these are the top 3 topics for Meetup events in San Francisco, Boston, Austin, Denver, and New York since October 1st, 2021. My personal highlights for each city:
- Austin: New in town
- Boston: Social
- Denver: Adventure
- Miami: Fun times
- New York: Social network
- San Francisco: Software development
select x:group.group_city::string city
, topic.value:urlkey::string topic
, count(*) rsvps
from meetup202011_rsvps, table(flatten(input=>x:group.group_topics)) topic
where x:mtime > date_part(epoch_millisecond, '2021-10-01'::timestamp)
and x:group.group_country = 'us'
and x:group.group_city in ('Austin', 'San Francisco', 'New York', 'Boston', 'Miami')
group by 1, 2
qualify row_number() over(partition by city order by rsvps desc) <= 3
order by city, rsvps desc;
Observed metrics from running this pipeline in practice
These dashboards live within the Google Cloud console.
During a regular week we can see peaks of 2 messages per second:
The average message size fluctuates around 1.4KB:
We can see how messages accumulate within 10-minute windows, and then drain according to the script schedule:
Due to the behavior of Pub/Sub, some messages don’t drain as expected, staying in Pub/Sub for 20 minutes (or sometimes much more):
Highlights and notes
GCP
- Pub/Sub requires no setup or provisioning, and ensures data won’t be lost. It’s pretty cheap at low streaming rates.
- The e2-micro instance is included within the free tier.
- GCP makes it easy to set up alerts in case the stream stops.
Snowflake
- It’s easy to configure Snowpipe to trigger automatically with new GCS files, even when running Snowflake on AWS.
- Snowpipe brings in each new file immediately into Snowflake.
- The VARIANT type makes it easy to query semi-structured data, like the data contained in these JSON messages.
Next steps
- Handle duplicate rows with Snowflake “Streams and Tasks”.
- My teammates are working hard on how real-time streaming data should work, and you can join the team building the future of big data and stream processing.
Want more?
- Try this out with a Snowflake free trial account — you only need an email address to get started.
I’m Felipe Hoffa, Data Cloud Advocate for Snowflake. Thanks for joining me on this adventure. You can follow me on Twitter and LinkedIn. Check reddit.com/r/snowflake for the most interesting Snowflake news.