prefect-snowflake¶
Welcome¶
The prefect-snowflake collection makes it easy to connect to a Snowflake database in your Prefect flows. Check out the examples below to get started!
Getting Started¶
Integrate with Prefect flows¶
Prefect works with Snowflake by providing dataflow automation for faster, more efficient data pipeline creation, execution, and monitoring.
This results in reduced errors, increased confidence in your data, and ultimately, faster insights.
To set up a table, use the execute
and execute_many
methods. Then, use the fetch_many
method to retrieve data in a stream until there's no more data.
By using the SnowflakeConnector
as a context manager, you can make sure that the Snowflake connection and cursors are closed properly after you're done with them.
Be sure to install prefect-snowflake, register the blocks, and create a credentials block to run the examples below!
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector
@task
def setup_table(block_name: str) -> None:
with SnowflakeConnector.load(block_name) as connector:
connector.execute(
"CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)
connector.execute_many(
"INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
seq_of_parameters=[
{"name": "Ford", "address": "Highway 42"},
{"name": "Unknown", "address": "Space"},
{"name": "Me", "address": "Myway 88"},
],
)
@task
def fetch_data(block_name: str) -> list:
all_rows = []
with SnowflakeConnector.load(block_name) as connector:
while True:
# Repeated fetch* calls using the same operation will
# skip re-executing and instead return the next set of results
new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
if len(new_rows) == 0:
break
all_rows.append(new_rows)
return all_rows
@flow
def snowflake_flow(block_name: str) -> list:
setup_table(block_name)
all_rows = fetch_data(block_name)
return all_rows
if __name__=="__main__":
snowflake_flow()
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector
import asyncio
@task
async def setup_table(block_name: str) -> None:
with await SnowflakeConnector.load(block_name) as connector:
await connector.execute(
"CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)
await connector.execute_many(
"INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
seq_of_parameters=[
{"name": "Ford", "address": "Highway 42"},
{"name": "Unknown", "address": "Space"},
{"name": "Me", "address": "Myway 88"},
],
)
@task
async def fetch_data(block_name: str) -> list:
all_rows = []
with await SnowflakeConnector.load(block_name) as connector:
while True:
# Repeated fetch* calls using the same operation will
# skip re-executing and instead return the next set of results
new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
if len(new_rows) == 0:
break
all_rows.append(new_rows)
return all_rows
@flow
async def snowflake_flow(block_name: str) -> list:
await setup_table(block_name)
all_rows = await fetch_data(block_name)
return all_rows
if __name__=="__main__":
asyncio.run(snowflake_flow("example"))
Access underlying Snowflake connection¶
If the native methods of the block don't meet your requirements, don't worry.
You have the option to access the underlying Snowflake connection and utilize its built-in methods as well.
import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas
@flow
def snowflake_write_pandas_flow():
connector = SnowflakeConnector.load("my-block")
with connector.get_connection() as connection:
table_name = "TABLE_NAME"
ddl = "NAME STRING, NUMBER INT"
statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
with connection.cursor() as cursor:
cursor.execute(statement)
# case sensitivity matters here!
df = pd.DataFrame([('Marvin', 42), ('Ford', 88)], columns=['NAME', 'NUMBER'])
success, num_chunks, num_rows, _ = write_pandas(
conn=connection,
df=df,
table_name=table_name,
database=snowflake_connector.database,
schema=snowflake_connector.schema_ # note the "_" suffix
)
Resources¶
For more tips on how to use tasks and flows in an integration, check out Using Collections!
Installation¶
Install prefect-snowflake
for Prefect:
pip install 'prefect[snowflake]<3'
Registering blocks¶
Register blocks in this module to make them available for use.
prefect block register -m prefect_aws
Saving credentials to block¶
Note, to use the load
method on a block, you must already have a block document saved through code or saved through the UI.
Below is a walkthrough on saving a SnowflakeCredentials
block through code.
- Head over to https://app.snowflake.com/.
- Login to your Snowflake account, e.g. nh12345.us-east-2.aws, with your username and password.
- Use those credentials to fill replace the placeholders below.
from prefect_snowflake import SnowflakeCredentials
credentials = SnowflakeCredentials(
account="ACCOUNT-PLACEHOLDER", # resembles nh12345.us-east-2.aws
user="USER-PLACEHOLDER",
password="PASSWORD-PLACEHOLDER"
)
credentials.save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
Then, to create a SnowflakeConnector
block:
- After logging in, click on any worksheet.
- On the left side, select a database and schema.
- On the top right, select a warehouse.
- Create a short script, replacing the placeholders below.
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector
credentials = SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
connector = SnowflakeConnector(
credentials=credentials,
database="DATABASE-PLACEHOLDER",
schema="SCHEMA-PLACEHOLDER",
warehouse="COMPUTE_WH",
)
connector.save("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
Congrats! You can now easily load the saved block, which holds your credentials and connection info:
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector
SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
SnowflakeConnector.load("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
Registering blocks
Register blocks in this module to view and edit them on Prefect Cloud:
prefect block register -m prefect_snowflake
A list of available blocks in prefect-snowflake
and their setup instructions can be found here.
Feedback¶
If you encounter any bugs while using prefect-snowflake
, feel free to open an issue in the prefect repository.
If you have any questions or issues while using prefect-snowflake
, you can find help in the Prefect Slack community.
Contributing¶
If you'd like to help contribute to fix an issue or add a feature to prefect-snowflake
, please propose changes through a pull request from a fork of the Prefect repository.