prefect-gcp¶
prefect-gcp helps you leverage the capabilities of Google Cloud Platform (GCP) in your workflows.
For example, you can run flows on Vertex AI or Cloud Run, read and write data to BigQuery and Cloud Storage, and retrieve secrets with Secret Manager.
Getting Started¶
Prerequisites¶
- Prefect installed in a virtual environment.
- A GCP account and the necessary permissions to access the desired services.
Install prefect-gcp for Prefect 2¶
pip install 'prefect[gcp]<3'
If using BigQuery, Cloud Storage, Secret Manager, or Vertex AI, see additional installation options.
To install with all additional functionality, use the following command:
pip install "prefect-gcp[all_extras]<3"
Register newly installed block types¶
Register the block types in the module to make them available for use.
prefect block register -m prefect_gcp
Authenticate using a GCP Credentials block¶
Authenticate with a service account to use prefect-gcp services.
- Refer to the GCP service account documentation to create and download a service account key file.
- Copy the JSON contents.
- Use the Python code below, replacing the placeholders with your information.
from prefect_gcp import GcpCredentials

# replace this PLACEHOLDER dict with your own service account info
service_account_info = {
    "type": "service_account",
    "project_id": "PROJECT_ID",
    "private_key_id": "KEY_ID",
    "private_key": "-----BEGIN PRIVATE KEY-----\nPRIVATE_KEY\n-----END PRIVATE KEY-----\n",
    "client_email": "SERVICE_ACCOUNT_EMAIL",
    "client_id": "CLIENT_ID",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://accounts.google.com/o/oauth2/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/SERVICE_ACCOUNT_EMAIL"
}

GcpCredentials(
    service_account_info=service_account_info
).save("BLOCK-NAME-PLACEHOLDER")
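Instead of pasting the key contents into source code, the dictionary can also be read from the downloaded key file. The sketch below shows one way this might be done. The helper names `load_service_account_info` and `save_credentials_block`, and the set of required keys, are illustrative assumptions and not part of prefect-gcp.

```python
import json
from pathlib import Path

# Assumption: a minimal set of fields expected in a service account key file.
REQUIRED_KEYS = {"type", "project_id", "private_key", "client_email"}


def load_service_account_info(key_path):
    """Read a service account key file and return its contents as a dict."""
    info = json.loads(Path(key_path).read_text())
    missing = REQUIRED_KEYS - info.keys()
    if missing:
        raise ValueError(f"Key file is missing fields: {sorted(missing)}")
    if info["type"] != "service_account":
        raise ValueError(f"Expected type 'service_account', got {info['type']!r}")
    return info


def save_credentials_block(key_path, block_name):
    """Save the key file contents in a GcpCredentials block (hypothetical helper)."""
    # Imported here so the loader above works without prefect-gcp installed.
    from prefect_gcp import GcpCredentials

    info = load_service_account_info(key_path)
    GcpCredentials(service_account_info=info).save(block_name, overwrite=True)
```

Calling `save_credentials_block("key.json", "BLOCK-NAME-PLACEHOLDER")` would then store the same information as the hand-written dictionary, without the private key appearing in your code.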
service_account_info vs service_account_file
The advantage of using service_account_info, instead of service_account_file, is that it is accessible across containers.
If service_account_file is used, the provided file path must be available in the container executing the flow.
Alternatively, GCP can authenticate without storing credentials in a block. See the Third-party Secrets Guide for an analogous example that uses AWS Secrets Manager and Snowflake.
Run flows on Google Cloud Run or Vertex AI¶
Run flows on Google Cloud Run or Vertex AI to dynamically scale your infrastructure.
See the Google Cloud Run Worker Guide for a walkthrough of using Google Cloud Run to run workflows with a hybrid work pool.
If you're using Prefect Cloud, Google Cloud Run push work pools provide all the benefits of Google Cloud Run along with a quick setup and no worker needed.
Use Prefect with Google BigQuery¶
Read data from and write to Google BigQuery within your Prefect flows.
Be sure to install prefect-gcp with the BigQuery extra.
This code creates a new dataset in BigQuery, defines a table, inserts rows, and fetches data from the table:
from prefect import flow
from prefect_gcp.bigquery import GcpCredentials, BigQueryWarehouse


@flow
def bigquery_flow():
    all_rows = []
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")

    client = gcp_credentials.get_bigquery_client()
    client.create_dataset("test_example", exists_ok=True)

    with BigQueryWarehouse(gcp_credentials=gcp_credentials) as warehouse:
        warehouse.execute(
            "CREATE TABLE IF NOT EXISTS test_example.customers (name STRING, address STRING);"
        )
        warehouse.execute_many(
            "INSERT INTO test_example.customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Marvin", "address": "Highway 42"},
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = warehouse.fetch_many("SELECT * FROM test_example.customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)
    return all_rows


if __name__ == "__main__":
    bigquery_flow()
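The batching loop in the flow follows the common Python DB-API pattern of calling a fetch method until it returns no rows. As a local illustration that needs no GCP access, the sketch below swaps in the standard-library `sqlite3` module for BigQuery. `fetch_in_batches` is a hypothetical helper, and `sqlite3` uses `:name` placeholders rather than the `%(name)s` style used with BigQueryWarehouse.

```python
import sqlite3


def fetch_in_batches(cursor, size=2):
    """Collect all remaining rows from a DB-API cursor, `size` rows at a time."""
    all_rows = []
    while True:
        # Each call continues where the previous one stopped; the query
        # itself is not executed again.
        new_rows = cursor.fetchmany(size)
        if len(new_rows) == 0:
            break
        all_rows.extend(new_rows)
    return all_rows


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE customers (name TEXT, address TEXT)")
connection.executemany(
    "INSERT INTO customers (name, address) VALUES (:name, :address)",
    [
        {"name": "Marvin", "address": "Highway 42"},
        {"name": "Ford", "address": "Highway 42"},
        {"name": "Unknown", "address": "Highway 42"},
    ],
)
cursor = connection.execute("SELECT * FROM customers ORDER BY rowid")
rows = fetch_in_batches(cursor, size=2)
connection.close()
```

With three rows and a batch size of 2, the loop makes three fetch calls: one full batch, one partial batch, and a final empty batch that ends the loop.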
Use Prefect with Google Cloud Storage¶
Interact with Google Cloud Storage.
Be sure to install prefect-gcp with the Cloud Storage extra.
The code below uses prefect_gcp to upload a file to a Google Cloud Storage bucket and download the same file under a different file name.
from pathlib import Path
from prefect import flow
from prefect_gcp import GcpCredentials, GcsBucket


@flow
def cloud_storage_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    gcs_bucket = GcsBucket(
        bucket="BUCKET-NAME-PLACEHOLDER",
        gcp_credentials=gcp_credentials
    )

    gcs_bucket_path = gcs_bucket.upload_from_path(file_path)
    downloaded_file_path = gcs_bucket.download_object_to_path(
        gcs_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()


if __name__ == "__main__":
    cloud_storage_flow()
Upload and download directories
GcsBucket supports uploading and downloading entire directories. To view examples, check out the Examples Catalog!
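As a rough sketch of a directory round trip, the helper below assumes the `upload_from_folder` and `download_folder_to_path` methods of GcsBucket; check the prefect-gcp API documentation for their exact signatures in your installed version. `round_trip_folder` and its parameter names are hypothetical.

```python
from pathlib import Path


def round_trip_folder(gcs_bucket, local_folder, remote_folder, download_to):
    """Upload a local folder to the bucket, then download it somewhere else."""
    # Assumption: gcs_bucket is a GcsBucket block, constructed or loaded
    # the same way as in the single-file flow above.
    gcs_bucket.upload_from_folder(local_folder, to_folder=remote_folder)
    gcs_bucket.download_folder_to_path(remote_folder, download_to)
    # List the downloaded files relative to the download folder.
    return sorted(
        p.relative_to(download_to).as_posix()
        for p in Path(download_to).rglob("*")
        if p.is_file()
    )
```

Because the bucket is passed in as an argument, the helper can be called from inside a flow with a real GcsBucket, or exercised locally with a stand-in object that exposes the same two methods.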
Save secrets with Google Secret Manager¶
Read and write secrets with Google Secret Manager.
Be sure to install prefect-gcp with the Secret Manager extra.
The code below writes a secret to the Secret Manager, reads the secret data, and deletes the secret.
from prefect import flow
from prefect_gcp import GcpCredentials, GcpSecret


@flow
def secret_manager_flow():
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    gcp_secret = GcpSecret(secret_name="test-example", gcp_credentials=gcp_credentials)
    gcp_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = gcp_secret.read_secret()
    gcp_secret.delete_secret()
    return secret_data


if __name__ == "__main__":
    secret_manager_flow()
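The flow above writes the secret as bytes and returns the value from read_secret unchanged. If you need text, a small helper along these lines may be convenient. `read_secret_text` is a hypothetical name, and the sketch assumes that read_secret returns bytes, mirroring the bytes passed to write_secret.

```python
def read_secret_text(gcp_secret, encoding="utf-8"):
    """Return the secret value as a string instead of raw bytes."""
    # Assumption: gcp_secret is a GcpSecret block whose read_secret()
    # returns bytes, mirroring the bytes written by write_secret().
    return gcp_secret.read_secret().decode(encoding)
```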
Access Google credentials or clients from GcpCredentials¶
You can instantiate a Google Cloud client, such as bigquery.Client.
Note that a GcpCredentials object is NOT a valid input to the underlying BigQuery client - use the get_credentials_from_service_account method to access and pass a google.auth.Credentials object.
from google.cloud import bigquery
from prefect import flow
from prefect_gcp import GcpCredentials


@flow
def create_bigquery_client():
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    google_auth_credentials = gcp_credentials.get_credentials_from_service_account()
    bigquery_client = bigquery.Client(credentials=google_auth_credentials)
To access the underlying client, use the get_client method from GcpCredentials.
from prefect import flow
from prefect_gcp import GcpCredentials


@flow
def create_bigquery_client():
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    bigquery_client = gcp_credentials.get_client("bigquery")
Resources¶
For assistance using GCP, consult the Google Cloud documentation.
Refer to the prefect-gcp API documentation linked in the sidebar to explore all the capabilities of the prefect-gcp library.
Additional installation options¶
Additional installation options for GCP services are shown below.
To use Cloud Storage¶
pip install "prefect-gcp[cloud_storage]"
To use BigQuery¶
pip install "prefect-gcp[bigquery]<3"
To use Secret Manager¶
pip install "prefect-gcp[secret_manager]<3"
To use Vertex AI¶
pip install "prefect-gcp[aiplatform]<3"