prefect-gcp

prefect-gcp helps you leverage the capabilities of Google Cloud Platform (GCP) in your workflows. For example, you can run flows on Vertex AI or Cloud Run, read and write data to BigQuery and Cloud Storage, and retrieve secrets with Secret Manager.

Getting Started

Prerequisites

Install prefect-gcp for Prefect 2

pip install 'prefect[gcp]<3'

If using BigQuery, Cloud Storage, Secret Manager, or Vertex AI, see additional installation options.

To install with all additional functionality, use the following command:

pip install "prefect-gcp[all_extras]<3"

Register newly installed block types

Register the block types in the module to make them available for use.

prefect block register -m prefect_gcp

Authenticate using a GCP Credentials block

Authenticate with a service account to use prefect-gcp services.

  1. Refer to the GCP service account documentation to create and download a service account key file.
  2. Copy the JSON contents.
  3. Use the Python code below, replacing the placeholders with your information.

from prefect_gcp import GcpCredentials

# replace this PLACEHOLDER dict with your own service account info
service_account_info = {
  "type": "service_account",
  "project_id": "PROJECT_ID",
  "private_key_id": "KEY_ID",
  "private_key": "-----BEGIN PRIVATE KEY-----\nPRIVATE_KEY\n-----END PRIVATE KEY-----\n",
  "client_email": "SERVICE_ACCOUNT_EMAIL",
  "client_id": "CLIENT_ID",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://accounts.google.com/o/oauth2/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/SERVICE_ACCOUNT_EMAIL"
}

GcpCredentials(
    service_account_info=service_account_info
).save("BLOCK-NAME-PLACEHOLDER")

service_account_info vs service_account_file

The advantage of using service_account_info instead of service_account_file is that the credentials are stored in the block itself, so they are accessible across containers.

If service_account_file is used, the provided file path must be available in the container executing the flow.
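
If the key file is on disk but you want the portability of service_account_info, one option is to read the file into a dict before saving the block. The sketch below is a minimal example under stated assumptions: the helper name load_service_account_info and the set of keys it checks are hypothetical, chosen for illustration, and are not part of prefect-gcp.

```python
import json
from pathlib import Path

# Keys this sketch expects in a service account key file. This set is an
# assumption based on the placeholder dict above, not an official schema.
REQUIRED_KEYS = {"type", "project_id", "private_key", "client_email"}


def load_service_account_info(key_file):
    """Read a service account key file and return its contents as a dict."""
    info = json.loads(Path(key_file).read_text())
    # Fail early if the file does not look like a service account key.
    missing = REQUIRED_KEYS - info.keys()
    if missing:
        raise ValueError(f"key file is missing fields: {sorted(missing)}")
    return info
```

The returned dict could then be passed as service_account_info when creating the GcpCredentials block, in place of the hand-pasted placeholder dict.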

Alternatively, you can authenticate with GCP without storing credentials in a block. See the Third-party Secrets Guide for an analogous example that uses AWS Secrets Manager and Snowflake.

Run flows on Google Cloud Run or Vertex AI

Run flows on Google Cloud Run or Vertex AI to dynamically scale your infrastructure.

See the Google Cloud Run Worker Guide for a walkthrough of using Google Cloud Run to run workflows with a hybrid work pool.

If you're using Prefect Cloud, Google Cloud Run push work pools provide all the benefits of Google Cloud Run along with a quick setup and no worker needed.

Use Prefect with Google BigQuery

Read data from and write to Google BigQuery within your Prefect flows.

Be sure to install prefect-gcp with the BigQuery extra.

This code creates a new dataset in BigQuery, defines a table, inserts rows, and fetches data from the table:

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import BigQueryWarehouse

@flow
def bigquery_flow():
    all_rows = []
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")

    client = gcp_credentials.get_bigquery_client()
    client.create_dataset("test_example", exists_ok=True)

    with BigQueryWarehouse(gcp_credentials=gcp_credentials) as warehouse:
        warehouse.execute(
            "CREATE TABLE IF NOT EXISTS test_example.customers (name STRING, address STRING);"
        )
        warehouse.execute_many(
            "INSERT INTO test_example.customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Marvin", "address": "Highway 42"},
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = warehouse.fetch_many("SELECT * FROM test_example.customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)
    return all_rows


if __name__ == "__main__":
    bigquery_flow()
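
The fetch loop in the flow above can be factored into a small reusable helper. This is only a sketch: drain_batches and its fetch_batch parameter are hypothetical names, and the helper assumes, as the comment in the flow states, that repeated calls with the same operation return successive batches, followed by an empty batch once the results are exhausted.

```python
from typing import Any, Callable, List, Sequence


def drain_batches(fetch_batch: Callable[[], Sequence[Any]]) -> List[Any]:
    """Call fetch_batch repeatedly until it returns an empty batch."""
    rows: List[Any] = []
    while True:
        batch = fetch_batch()
        # An empty batch signals that no results remain.
        if len(batch) == 0:
            break
        rows.extend(batch)
    return rows
```

Inside the BigQueryWarehouse context manager, this might be called as drain_batches(lambda: warehouse.fetch_many("SELECT * FROM test_example.customers", size=2)).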

Use Prefect with Google Cloud Storage

Interact with Google Cloud Storage.

Be sure to install prefect-gcp with the Cloud Storage extra.

The code below uses prefect_gcp to upload a file to a Google Cloud Storage bucket and download the same file under a different file name.

from pathlib import Path
from prefect import flow
from prefect_gcp import GcpCredentials, GcsBucket


@flow
def cloud_storage_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    gcs_bucket = GcsBucket(
        bucket="BUCKET-NAME-PLACEHOLDER",
        gcp_credentials=gcp_credentials
    )

    gcs_bucket_path = gcs_bucket.upload_from_path(file_path)
    downloaded_file_path = gcs_bucket.download_object_to_path(
        gcs_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()


if __name__ == "__main__":
    cloud_storage_flow()

Upload and download directories

GcsBucket supports uploading and downloading entire directories. To view examples, check out the Examples Catalog!

Save secrets with Google Secret Manager

Read and write secrets with Google Secret Manager.

Be sure to install prefect-gcp with the Secret Manager extra.

The code below writes a secret to the Secret Manager, reads the secret data, and deletes the secret.

from prefect import flow
from prefect_gcp import GcpCredentials, GcpSecret


@flow
def secret_manager_flow():
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    gcp_secret = GcpSecret(secret_name="test-example", gcp_credentials=gcp_credentials)
    gcp_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = gcp_secret.read_secret()
    gcp_secret.delete_secret()
    return secret_data


if __name__ == "__main__":
    secret_manager_flow()
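
The flow above writes the secret as bytes, so structured values need to be serialized before writing and parsed after reading. A minimal sketch, assuming read_secret returns the same bytes that write_secret stored; encode_secret and decode_secret are hypothetical helpers, not part of prefect-gcp.

```python
import json


def encode_secret(payload: dict) -> bytes:
    """Serialize a dict to UTF-8 JSON bytes suitable for writing as a secret."""
    return json.dumps(payload).encode("utf-8")


def decode_secret(secret_data: bytes) -> dict:
    """Parse UTF-8 JSON bytes read back from a secret into a dict."""
    return json.loads(secret_data.decode("utf-8"))
```

With these helpers, the value passed as secret_data would be encode_secret(...), and the result of read_secret would be passed through decode_secret.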

Access Google credentials or clients from GcpCredentials

You can instantiate a Google Cloud client, such as bigquery.Client.

Note that a GcpCredentials object is NOT a valid input to the underlying BigQuery client - use the get_credentials_from_service_account method to access and pass a google.auth.Credentials object.

from google.cloud import bigquery
from prefect import flow
from prefect_gcp import GcpCredentials

@flow
def create_bigquery_client():
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    google_auth_credentials = gcp_credentials.get_credentials_from_service_account()
    bigquery_client = bigquery.Client(credentials=google_auth_credentials)

To access the underlying client, use the get_client method from GcpCredentials.

from prefect import flow
from prefect_gcp import GcpCredentials

@flow
def create_bigquery_client():
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    bigquery_client = gcp_credentials.get_client("bigquery")

Resources

For assistance using GCP, consult the Google Cloud documentation.

Refer to the prefect-gcp API documentation linked in the sidebar to explore all the capabilities of the prefect-gcp library.

Additional installation options

Additional installation options for GCP services are shown below.

To use Cloud Storage

pip install "prefect-gcp[cloud_storage]<3"

To use BigQuery

pip install "prefect-gcp[bigquery]<3"

To use Secret Manager

pip install "prefect-gcp[secret_manager]<3"

To use Vertex AI

pip install "prefect-gcp[aiplatform]<3"
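
After installing, it can be useful to confirm which optional dependencies are actually present in the environment. The sketch below uses only the standard library. The mapping from extra names to Google client modules is an assumption for illustration, and is_importable and installed_extras are hypothetical helpers, not part of prefect-gcp.

```python
import importlib.util

# Extra name -> module the extra is expected to provide. This mapping is
# an assumption for illustration; check the package metadata if in doubt.
EXTRA_MODULES = {
    "cloud_storage": "google.cloud.storage",
    "bigquery": "google.cloud.bigquery",
    "secret_manager": "google.cloud.secretmanager",
    "aiplatform": "google.cloud.aiplatform",
}


def is_importable(module_name: str) -> bool:
    """Return True if module_name can be found on the import path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package (e.g. google.cloud) is absent.
        return False


def installed_extras() -> dict:
    """Map each extra name to whether its module appears to be installed."""
    return {extra: is_importable(mod) for extra, mod in EXTRA_MODULES.items()}


if __name__ == "__main__":
    for extra, present in installed_extras().items():
        print(f"{extra}: {'installed' if present else 'missing'}")
```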