Workers
Prerequisites
Docker installed and running on your machine.
Why workers
In the previous section of the tutorial, you learned how work pools are a bridge between the Prefect orchestration layer and infrastructure for flow runs that can be dynamically provisioned.
You saw how you can transition from persistent infrastructure to dynamic infrastructure by using flow.deploy instead of flow.serve.
Work pools that rely on client-side workers take this a step further by enabling you to run workflows in your own Docker containers, Kubernetes clusters, and serverless environments such as AWS ECS, Azure Container Instances, and GCP Cloud Run.
The architecture of a worker-based work pool deployment can be summarized with the following diagram:
graph TD
    subgraph your_infra["Your Execution Environment"]
        worker["Worker"]
        subgraph flow_run_infra[Flow Run Infra]
            flow_run_a(("Flow Run A"))
        end
        subgraph flow_run_infra_2[Flow Run Infra]
            flow_run_b(("Flow Run B"))
        end
    end
    subgraph api["Prefect API"]
        Deployment --> |assigned to| work_pool
        work_pool(["Work Pool"])
    end
    worker --> |polls| work_pool
    worker --> |creates| flow_run_infra
    worker --> |creates| flow_run_infra_2
Notice above that the worker is in charge of provisioning the flow run infrastructure. In the context of this tutorial, that flow run infrastructure is an ephemeral Docker container that hosts each flow run. Different worker types create different types of flow run infrastructure.
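The relationship in the diagram can be sketched as a toy model in plain Python. This is only an illustration of the polling idea, not Prefect's implementation: the WorkPool and Worker classes, the poll_once method, and the "container-for-..." labels are all hypothetical names invented for this sketch.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class WorkPool:
    """Toy stand-in for a work pool: a queue of scheduled flow run names."""
    name: str
    scheduled: deque = field(default_factory=deque)


@dataclass
class Worker:
    """Toy stand-in for a worker that polls a single work pool."""
    pool: WorkPool
    launched: list = field(default_factory=list)

    def poll_once(self) -> int:
        """Pick up every scheduled run and 'create' infrastructure for each one."""
        count = 0
        while self.pool.scheduled:
            run = self.pool.scheduled.popleft()
            # A real Docker worker would start a container here; we only record it.
            self.launched.append(f"container-for-{run}")
            count += 1
        return count


pool = WorkPool("my-docker-pool")
pool.scheduled.extend(["flow-run-a", "flow-run-b"])

worker = Worker(pool)
started = worker.poll_once()
print(started, worker.launched)
```

Each scheduled run gets its own piece of infrastructure, and a second poll finds nothing left to do until new runs are scheduled.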
Now that we’ve reviewed the concepts of a work pool and worker, let’s create them so that you can deploy your tutorial flow, and execute it later using the Prefect API.
Set up a work pool and worker
For this tutorial you will create a Docker type work pool via the CLI.
Using the Docker work pool type means that all work sent to this work pool will run within a dedicated Docker container using a Docker client available to the worker.
Other work pool types
There are work pool types for serverless computing environments such as AWS ECS, Azure Container Instances, Google Cloud Run, and Vertex AI. Kubernetes is also a popular work pool type.
These options are expanded upon in various How-to Guides.
Create a work pool
In your terminal, run the following command to set up a Docker type work pool.
prefect work-pool create --type docker my-docker-pool
Let’s confirm that the work pool was successfully created by running the following command in the same terminal.
prefect work-pool ls
You should see your new my-docker-pool listed in the output.
Finally, let’s double check that you can see this work pool in your Prefect UI.
Navigate to the Work Pools tab and verify that you see my-docker-pool listed.
When you click into my-docker-pool, you should see a red status icon signifying that this work pool is not ready.
To make the work pool ready, you need to start a worker.
Start a worker
A worker is a lightweight polling process that kicks off scheduled flow runs on a specific type of infrastructure (such as Docker).
To start a worker on your local machine, open a new terminal and confirm that your virtual environment has prefect installed.
Run the following command in this new terminal to start the worker:
prefect worker start --pool my-docker-pool
You should see the worker start. It's now polling the Prefect API to check for any scheduled flow runs it should pick up and then submit for execution. You’ll see your new worker listed in the UI under the Workers tab of the Work Pools page with a recent last polled date.
You should also be able to see a Ready status indicator on your work pool - progress!
You will need to keep this terminal session active for the worker to continue to pick up jobs. Since you are running this worker locally, the worker will terminate if you close the terminal. Therefore, in a production setting this worker should run as a daemonized or managed process.
Now that you’ve set up your work pool and worker, we have what we need to kick off and execute flow runs of flows deployed to this work pool.
Let's deploy your tutorial flow to my-docker-pool.
Create the deployment
From our previous steps, we now have:
- A flow
- A work pool
- A worker
Now it’s time to put it all together.
We're going to update our repo_info.py file to build a Docker image and update our deployment so our worker can execute it.
The updates that you need to make to repo_info.py are:
- Change flow.serve to flow.deploy.
- Tell flow.deploy which work pool to deploy to.
- Tell flow.deploy the name to use for the Docker image that will be built.
Here's what the updated repo_info.py looks like:
import httpx
from prefect import flow


@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    url = f"https://api.github.com/repos/{repo_name}"
    response = httpx.get(url)
    response.raise_for_status()
    repo = response.json()
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo['stargazers_count']}")
    print(f"Forks 🍴 : {repo['forks_count']}")


if __name__ == "__main__":
    get_repo_info.deploy(
        name="my-first-deployment",
        work_pool_name="my-docker-pool",
        image="my-first-deployment-image:tutorial",
        push=False
    )
Why the push=False?
For this tutorial, your Docker worker is running on your machine, so we don't need to push the image built by flow.deploy to a registry. When your worker is running on a remote machine, you will need to push the image to a registry that the worker can access.
Remove the push=False argument, include your registry name, and ensure you've authenticated with the Docker CLI to push the image to a registry.
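The choice between the two cases can be sketched as a small helper that returns the image-related keyword arguments for flow.deploy. The helper name image_settings and the registry value registry.example.com/team are hypothetical placeholders; only the image and push arguments come from the tutorial itself.

```python
from typing import Optional


def image_settings(name: str, tag: str, registry: Optional[str] = None) -> dict:
    """Return the image-related keyword arguments to pass to flow.deploy."""
    if registry is None:
        # Local worker: the image stays on this machine, so skip the push.
        return {"image": f"{name}:{tag}", "push": False}
    # Remote worker: prefix the registry and omit push=False so the image is pushed.
    return {"image": f"{registry}/{name}:{tag}"}


local = image_settings("my-first-deployment-image", "tutorial")
remote = image_settings(
    "my-first-deployment-image",
    "tutorial",
    registry="registry.example.com/team",  # placeholder, not a real registry
)
print(local)
print(remote)
```

Either dictionary could then be unpacked into the call, for example get_repo_info.deploy(name="my-first-deployment", work_pool_name="my-docker-pool", **remote).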
Now that you've updated your script, you can run it to deploy your flow to the work pool:
python repo_info.py
Prefect will build a custom Docker image containing your workflow code that the worker can use to dynamically spawn Docker containers whenever this workflow needs to run.
What Dockerfile?
In this example, Prefect generates a Dockerfile for you that will build an image based off of one of Prefect's published images. The generated Dockerfile will copy the current directory into the Docker image and install any dependencies listed in a requirements.txt file.
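To make that description concrete, here is a rough sketch of the kind of Dockerfile it implies, rendered from Python. It is not the file Prefect actually writes: the render_dockerfile helper, the working directory /opt/prefect/flows, and the base image tag passed in below are assumptions chosen for illustration.

```python
def render_dockerfile(base_image: str, has_requirements: bool) -> str:
    """Render an illustrative Dockerfile matching the description above."""
    lines = [
        f"FROM {base_image}",           # start from a published Prefect image
        "WORKDIR /opt/prefect/flows",   # assumed location for the flow code
        "COPY . .",                     # copy the current directory into the image
    ]
    if has_requirements:
        # Install dependencies only when a requirements.txt file is present.
        lines.append("RUN pip install -r requirements.txt")
    return "\n".join(lines) + "\n"


# The base image tag here is an assumption; use whichever Prefect image you need.
dockerfile = render_dockerfile("prefecthq/prefect:2-latest", has_requirements=True)
print(dockerfile)
```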
If you want to use a custom Dockerfile, you can specify the path to the Dockerfile using the DeploymentImage class:
import httpx
from prefect import flow
from prefect.deployments import DeploymentImage


@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    url = f"https://api.github.com/repos/{repo_name}"
    response = httpx.get(url)
    response.raise_for_status()
    repo = response.json()
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo['stargazers_count']}")
    print(f"Forks 🍴 : {repo['forks_count']}")


if __name__ == "__main__":
    get_repo_info.deploy(
        name="my-first-deployment",
        work_pool_name="my-docker-pool",
        image=DeploymentImage(
            name="my-first-deployment-image",
            tag="tutorial",
            dockerfile="Dockerfile"
        ),
        push=False
    )
Modify the deployment
If you need to make updates to your deployment, you can do so by modifying your script and rerunning it. You'll need to make one update to specify a value for job_variables to ensure your Docker worker can successfully execute scheduled runs for this flow. See the example below.
The job_variables section allows you to fine-tune the infrastructure settings for a specific deployment. These values override default values in the specified work pool's base job template.
When testing images locally without pushing them to a registry (to avoid potential errors like docker.errors.NotFound), it's recommended to include an image_pull_policy job variable set to Never. However, for production workflows, always consider pushing images to a remote registry for more reliability and accessibility.
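The override behavior can be pictured as a simple dictionary merge in which deployment-level values win. This is a simplified mental model rather than Prefect's actual template rendering; the effective_job_config helper and the default values shown for the pool are hypothetical.

```python
def effective_job_config(pool_defaults: dict, job_variables: dict) -> dict:
    """Merge settings so that deployment-level job variables override pool defaults."""
    return {**pool_defaults, **job_variables}


# Hypothetical defaults standing in for a work pool's base job template.
pool_defaults = {"image_pull_policy": "IfNotPresent", "auto_remove": False}

# The deployment overrides only the pull policy.
job_variables = {"image_pull_policy": "Never"}

config = effective_job_config(pool_defaults, job_variables)
print(config)
```

The pool's own defaults are left untouched, so other deployments that use the same work pool are not affected.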
Here's how you can quickly set the image_pull_policy to be Never for this tutorial deployment without affecting the default value set on your work pool:
import httpx
from prefect import flow


@flow(log_prints=True)
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
    url = f"https://api.github.com/repos/{repo_name}"
    response = httpx.get(url)
    response.raise_for_status()
    repo = response.json()
    print(f"{repo_name} repository statistics 🤓:")
    print(f"Stars 🌠 : {repo['stargazers_count']}")
    print(f"Forks 🍴 : {repo['forks_count']}")


if __name__ == "__main__":
    get_repo_info.deploy(
        name="my-first-deployment",
        work_pool_name="my-docker-pool",
        job_variables={"image_pull_policy": "Never"},
        image="my-first-deployment-image:tutorial",
        push=False
    )
To register this update to your deployment's parameters with Prefect's API, run:
python repo_info.py
Now everything is set for us to submit a flow run to the work pool:
prefect deployment run 'get-repo-info/my-first-deployment'
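The argument to prefect deployment run has the form flow-name/deployment-name. As a sketch, assuming the flow has no explicit name and that Prefect derives the default flow name from the function name with underscores replaced by hyphens, the identifier for this tutorial can be built as follows. The deployment_identifier helper is a hypothetical name used only here.

```python
def deployment_identifier(flow_function_name: str, deployment_name: str) -> str:
    """Build the 'flow-name/deployment-name' string used by the CLI."""
    # Assumption: the default flow name is the function name with "_" turned into "-".
    flow_name = flow_function_name.replace("_", "-")
    return f"{flow_name}/{deployment_name}"


identifier = deployment_identifier("get_repo_info", "my-first-deployment")

# The same command as an argument list, e.g. for use with subprocess.run.
command = ["prefect", "deployment", "run", identifier]
print(" ".join(command))
```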
Common Pitfall
- Store and run your deploy scripts at the root of your repo, otherwise the built Docker image may be missing files that it needs to execute!
Did you know?
A Prefect flow can have more than one deployment. This pattern can be useful if you want your flow to run in different execution environments.
Next steps
- Go deeper with deployments and learn about configuring deployments in YAML with prefect.yaml.
- Concepts contain deep dives into Prefect components.
- Guides provide step-by-step recipes for common Prefect operations including:
    - Deploying flows on Kubernetes
    - Deploying flows in Docker
    - Deploying flows on serverless infrastructure
    - Daemonizing workers
Happy building!