Upgrade from Agents to Workers¶
Upgrading from agents to workers significantly enhances the experience of deploying flows. It simplifies the specification of each flow's infrastructure and runtime environment.
A worker is the fusion of an agent with an infrastructure block. Like agents, workers poll a work pool for flow runs that are scheduled to start. Like infrastructure blocks, workers are typed - they work with only one kind of infrastructure, and they specify the default configuration for jobs submitted to that infrastructure.
Accordingly, workers are not a drop-in replacement for agents. Using workers requires deploying flows differently. In particular, deploying a flow with a worker does not involve specifying an infrastructure block. Instead, infrastructure configuration is specified on the work pool and passed to each worker that polls work from that pool.
This guide provides an overview of the differences between agents and workers. It also describes how to upgrade from agents to workers in just a few quick steps.
Enhancements¶
Workers¶
- Improved visibility into the status of each worker, including when a worker was started and when it last polled.
- Better handling of race conditions for high availability use cases.
Work pools¶
- Work pools allow greater customization and governance of infrastructure parameters for deployments via their base job template.
- Prefect Cloud push work pools enable flow execution in your cloud provider environment without needing to host a worker.
- Prefect Cloud managed work pools allow you to run flows on Prefect's infrastructure, without needing to host a worker or configure cloud provider infrastructure.
Improved deployment interfaces¶
- The Python deployment experience with .deploy() and the alternative deployment experience with prefect.yaml are more flexible and easier to use than block and agent-based deployments.
- Both options allow you to deploy multiple flows with a single command.
- Both options allow you to build Docker images for your flows to create portable execution environments.
- The YAML-based API supports templating to enable DRYer (less repetitive) deployment definitions.
What's different¶
- Deployment CLI and Python SDK:
  prefect deployment build <entrypoint> / prefect deployment apply --> prefect deploy
  Prefect will now automatically detect flows in your repo and provide a wizard 🧙 to guide you through setting required attributes for your deployments.
  Deployment.build_from_flow --> flow.deploy
- Configuring remote flow code storage:
  storage blocks --> pull action
  When using the YAML-based deployment API, you can configure a pull action in your prefect.yaml file to specify how to retrieve flow code for your deployments. You can use configuration from your existing storage blocks to define your pull action via templating.
  When using the Python deployment API, you can pass any storage block to the flow.from_source method to specify how to retrieve flow code for your deployment.
- Configuring flow run infrastructure:
  infrastructure blocks --> typed work pool
  Default infrastructure config is now set on the typed work pool, and can be overridden by individual deployments.
- Managing multiple deployments:
  Create and/or update many deployments at once through a prefect.yaml file or use the deploy function.
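The override behavior described under "Configuring flow run infrastructure" can be pictured as a dictionary merge: the work pool supplies defaults, and each deployment replaces only the values it names. The sketch below is a conceptual model, not Prefect's implementation. The function name resolve_job_config and the example keys are assumptions chosen for illustration.

```python
from typing import Any, Dict, Optional


def resolve_job_config(
    pool_defaults: Dict[str, Any],
    job_variables: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return the work pool defaults with deployment-level overrides applied.

    Hypothetical helper: it only models "defaults on the pool, overrides on
    the deployment" and does not call any Prefect API.
    """
    resolved = dict(pool_defaults)        # copy so the pool defaults stay untouched
    resolved.update(job_variables or {})  # deployment values win on conflicts
    return resolved


if __name__ == "__main__":
    # Illustrative values only; real work pools define their own variables.
    pool_defaults = {"image": "my-repo/my-image:latest", "pull_policy": "IfNotPresent"}
    print(resolve_job_config(pool_defaults, {"pull_policy": "Never"}))
```

A deployment that sets no job variables simply runs with the pool's defaults.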
What's similar¶
- Storage blocks can be set as the pull action in a prefect.yaml file.
- Infrastructure blocks have configuration fields similar to typed work pools.
- Deployment-level infrastructure overrides operate in much the same way.
  infra_overrides --> job_variables
- The process for starting an agent and the process for starting a worker in your environment are virtually identical.
  prefect agent start --pool <work pool name> --> prefect worker start --pool <work pool name>
Worker Helm chart
If you host your agents in a Kubernetes cluster, you can use the Prefect worker Helm chart to host workers in your cluster.
Upgrade guide¶
If you have existing deployments that use infrastructure blocks, you can quickly upgrade them to be compatible with workers by following these steps:
- Create a work pool
This new work pool will replace your infrastructure block.
You can use the .publish_as_work_pool method on any infrastructure block to create a work pool with the same configuration.
For example, if you have a KubernetesJob infrastructure block named 'my-k8s-job', you can create a work pool with the same configuration with this script:
from prefect.infrastructure import KubernetesJob
KubernetesJob.load("my-k8s-job").publish_as_work_pool()
Running this script will create a work pool named 'my-k8s-job' with the same configuration as your infrastructure block.
Serving flows
If you are using a Process infrastructure block and a LocalFilesystem storage block (or aren't using an infrastructure and storage block at all), you can use flow.serve to create a deployment without needing to specify a work pool name or start a worker.
This is a quick way to create a deployment for a flow and is a great way to manage your deployments if you don't need the dynamic infrastructure creation or configuration offered by workers.
Check out our Docker guide for how to build a served flow into a Docker image and host it in your environment.
- Start a worker
This worker will replace your agent and poll your new work pool for flow runs to execute.
prefect worker start -p <work pool name>
- Deploy your flows to the new work pool
To deploy your flows to the new work pool, you can use flow.deploy for a Pythonic deployment experience or prefect deploy for a YAML-based deployment experience.
If you currently use Deployment.build_from_flow, we recommend using flow.deploy.
If you currently use prefect deployment build and prefect deployment apply, we recommend using prefect deploy.
flow.deploy¶
If you have a Python script that uses Deployment.build_from_flow, you can replace it with flow.deploy.
Most arguments to Deployment.build_from_flow can be translated directly to flow.deploy, but here are some changes that you may need to make:
- Replace infrastructure with work_pool_name.
  - If you've used the .publish_as_work_pool method on your infrastructure block, use the name of the created work pool.
- Replace infra_overrides with job_variables.
- Replace storage with a call to flow.from_source.
  - flow.from_source will load your flow from a remote storage location and make it deployable. Your existing storage block can be passed to the source argument of flow.from_source.
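The renames in the list above can be sketched as a small keyword-argument translation. This is a hypothetical helper for reasoning about the mapping, not part of Prefect: the name translate_kwargs and its return shape are assumptions, and it handles plain dictionaries rather than calling any Prefect API. It also moves entrypoint to the flow.from_source side, following the examples later in this guide.

```python
from typing import Any, Dict, Tuple


def translate_kwargs(
    old_kwargs: Dict[str, Any], work_pool_name: str
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split Deployment.build_from_flow-style kwargs into two dicts.

    Returns (from_source_kwargs, deploy_kwargs). Hypothetical helper that
    only rearranges dictionary keys.
    """
    deploy_kwargs = dict(old_kwargs)
    from_source_kwargs: Dict[str, Any] = {}

    # storage -> flow.from_source(source=...)
    if "storage" in deploy_kwargs:
        from_source_kwargs["source"] = deploy_kwargs.pop("storage")
    # entrypoint accompanies the source in the from_source call
    if "entrypoint" in deploy_kwargs:
        from_source_kwargs["entrypoint"] = deploy_kwargs.pop("entrypoint")

    # infrastructure -> work_pool_name (the pool created from the block)
    deploy_kwargs.pop("infrastructure", None)
    deploy_kwargs["work_pool_name"] = work_pool_name

    # infra_overrides -> job_variables
    if "infra_overrides" in deploy_kwargs:
        deploy_kwargs["job_variables"] = deploy_kwargs.pop("infra_overrides")

    return from_source_kwargs, deploy_kwargs


if __name__ == "__main__":
    old = {
        "name": "my-deployment",
        "storage": "<storage block>",
        "entrypoint": "example.py:my_flow",
        "infrastructure": "<infrastructure block>",
        "infra_overrides": {"pull_policy": "Never"},
        "parameters": {"name": "Marvin"},
    }
    source_kwargs, deploy_kwargs = translate_kwargs(old, work_pool_name="my-k8s-job")
    print(source_kwargs)
    print(deploy_kwargs)
```

Arguments that are not renamed, such as name and parameters, pass through unchanged.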
Below are some examples of how to translate Deployment.build_from_flow to flow.deploy.
Deploying without any blocks¶
If you aren't using any blocks:
from prefect import flow
from prefect.deployments import Deployment


@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Python script!")


if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        parameters=dict(name="Marvin"),
    )
You can replace Deployment.build_from_flow with flow.serve:
from prefect import flow


@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Python script!")


if __name__ == "__main__":
    my_flow.serve(
        name="my-deployment",
        parameters=dict(name="Marvin"),
    )
This will start a process that will serve your flow and execute any flow runs that are scheduled to start.
Deploying using a storage block¶
If you currently use a storage block to load your flow code but no infrastructure block:
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub


@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")


if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        storage=GitHub.load("demo-repo"),
        parameters=dict(name="Marvin"),
    )
you can use flow.from_source to load your flow from the same location and flow.serve to create a deployment:
from prefect import flow
from prefect.filesystems import GitHub

if __name__ == "__main__":
    flow.from_source(
        source=GitHub.load("demo-repo"),
        entrypoint="example.py:my_flow",
    ).serve(
        name="my-deployment",
        parameters=dict(name="Marvin"),
    )
This will allow you to execute scheduled flow runs without starting a worker. Additionally, the process serving your flow will regularly check for updates to your flow code and automatically update the flow if it detects any changes to the code.
Deploying using an infrastructure and storage block¶
For the code below, we'll need to create a work pool from our infrastructure block and pass it to flow.deploy as the work_pool_name argument. We'll also need to pass our storage block to flow.from_source as the source argument.
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub
from prefect.infrastructure.kubernetes import KubernetesJob


@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")


if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        storage=GitHub.load("demo-repo"),
        entrypoint="example.py:my_flow",
        infrastructure=KubernetesJob.load("my-k8s-job"),
        infra_overrides=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )
The equivalent deployment code using flow.deploy would look like this:
from prefect import flow
from prefect.filesystems import GitHub

if __name__ == "__main__":
    flow.from_source(
        source=GitHub.load("demo-repo"),
        entrypoint="example.py:my_flow",
    ).deploy(
        name="my-deployment",
        work_pool_name="my-k8s-job",
        job_variables=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )
Note that when using flow.from_source(...).deploy(...), the flow you're deploying does not need to be available locally before running your script.
Deploying via a Docker image¶
If you currently bake your flow code into a Docker image before deploying, you can use the image argument of flow.deploy to build a Docker image as part of your deployment process:
from prefect import flow


@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Docker image!")


if __name__ == "__main__":
    my_flow.deploy(
        name="my-deployment",
        image="my-repo/my-image:latest",
        work_pool_name="my-k8s-job",
        job_variables=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )
You can skip a flow.from_source call when building an image with flow.deploy. Prefect will keep track of the flow's source code location in the image and load it from that location when the flow is executed.
Using prefect deploy¶
Always run prefect deploy commands from the root level of your repo!
With agents, you might have had multiple deployment.yaml files, but under worker deployment patterns, each repo will have a single prefect.yaml file located at the root of the repo that contains deployment configuration for all flows in that repo.
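Because the prefect.yaml file lives at the repo root, a script that wraps prefect deploy may want to confirm it is running from the right directory first. The sketch below is one possible check once a prefect.yaml already exists. The function name find_prefect_root is hypothetical and is not provided by Prefect.

```python
from pathlib import Path
from typing import Optional


def find_prefect_root(start: Path) -> Optional[Path]:
    """Walk upward from `start` and return the first directory that
    contains a prefect.yaml file, or None if none is found.

    Hypothetical helper; it only inspects the filesystem.
    """
    start = start.resolve()
    for directory in [start, *start.parents]:
        if (directory / "prefect.yaml").is_file():
            return directory
    return None


if __name__ == "__main__":
    root = find_prefect_root(Path.cwd())
    if root is None:
        print("No prefect.yaml found in this directory or its parents.")
    elif root != Path.cwd().resolve():
        print(f"Run prefect deploy from {root} instead.")
    else:
        print("Current directory contains prefect.yaml.")
```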
To set up a new prefect.yaml file for your deployments, run the following command from the root level of your repo:
prefect deploy
This will start a wizard that will guide you through setting up your deployment.
For step 4, select y on the last prompt to save the configuration for the deployment.
Saving the configuration for your deployment will result in a prefect.yaml file populated with your first deployment. You can use this YAML file to edit and define multiple deployments for this repo.
You can add more deployments to the deployments list in your prefect.yaml file and/or by continuing to use the deployment creation wizard.
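To illustrate what adding an entry to the deployments list involves, the sketch below operates on a prefect.yaml that has already been parsed into a Python dictionary (the parsing and writing steps, for example with a YAML library, are left out). The helper name upsert_deployment is hypothetical, and the deployment values are illustrative.

```python
from typing import Any, Dict


def upsert_deployment(
    config: Dict[str, Any], deployment: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a copy of `config` with `deployment` added to its
    "deployments" list, replacing any existing entry with the same name.

    Hypothetical helper; `config` is assumed to be the parsed contents
    of a prefect.yaml file.
    """
    existing = config.get("deployments") or []  # tolerate a missing or empty list
    kept = [d for d in existing if d.get("name") != deployment["name"]]
    return {**config, "deployments": kept + [deployment]}


if __name__ == "__main__":
    config = {
        "deployments": [
            {
                "name": "my-deployment",
                "entrypoint": "example.py:my_flow",
                "work_pool": {"name": "my-k8s-job"},
            }
        ]
    }
    second = {
        "name": "my-second-deployment",
        "entrypoint": "example.py:my_flow",
        "parameters": {"name": "Marvin"},
        "work_pool": {
            "name": "my-k8s-job",
            "job_variables": {"pull_policy": "Never"},
        },
    }
    updated = upsert_deployment(config, second)
    print([d["name"] for d in updated["deployments"]])
```

Returning a new dictionary rather than mutating the input keeps the original configuration available for comparison before anything is written back to disk.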
For more information on deployments, check out our in-depth guide for deploying flows to work pools.