Infrastructure¶
Workers are recommended
Infrastructure blocks are part of the agent based deployment model. Work Pools and Workers simplify the specification of each flow's infrastructure and runtime environment. If you have existing agents, you can upgrade from agents to workers to significantly enhance the experience of deploying flows.
Users may specify an infrastructure block when creating a deployment. This block will be used to specify infrastructure for flow runs created by the deployment at runtime.
Infrastructure can only be used with a deployment. When you run a flow directly by calling the flow yourself, you are responsible for the environment in which the flow executes.
Infrastructure overview¶
Prefect uses infrastructure to create the environment for a user's flow to execute.
Infrastructure is attached to a deployment and is propagated to flow runs created for that deployment. Infrastructure is deserialized by the agent and it has two jobs:
- Create execution environment infrastructure for the flow run.
- Run a Python command to start the
prefect.engine
in the infrastructure, which retrieves the flow from storage and executes the flow.
The engine acquires and calls the flow. Infrastructure doesn't know anything about how the flow is stored, it's just passing a flow run ID to the engine.
Infrastructure is specific to the environments in which flows will run. Prefect currently provides the following infrastructure types:
Process
runs flows in a local subprocess.DockerContainer
runs flows in a Docker container.KubernetesJob
runs flows in a Kubernetes Job.ECSTask
runs flows in an Amazon ECS Task.Cloud Run
runs flows in a Google Cloud Run Job.Container Instance
runs flows in an Azure Container Instance.
What about tasks?
Flows and tasks can both use configuration objects to manage the environment in which code runs.
Flows use infrastructure.
Tasks use task runners. For more on how task runners work, see Task Runners.
Using infrastructure¶
You may create customized infrastructure blocks through the Prefect UI or Prefect Cloud Blocks page or create them in code and save them to the API using the blocks .save()
method.
Once created, there are two distinct ways to use infrastructure in a deployment:
- Starting with Prefect defaults — this is what happens when you pass the
-i
or--infra
flag and provide a type when building deployment files. - Pre-configure infrastructure settings as blocks and base your deployment infrastructure on those settings — by passing
-ib
or--infra-block
and a block slug when building deployment files.
For example, when creating your deployment files, the supported Prefect infrastrucure types are:
process
docker-container
kubernetes-job
ecs-task
cloud-run-job
container-instance-job
$ prefect deployment build ./my_flow.py:my_flow -n my-flow-deployment -t test -i docker-container -sb s3/my-bucket --override env.EXTRA_PIP_PACKAGES=s3fs
Found flow 'my-flow'
Successfully uploaded 2 files to s3://bucket-full-of-sunshine
Deployment YAML created at '/Users/terry/test/flows/infra/deployment.yaml'.
In this example we specify the DockerContainer
infrastructure in addition to a preconfigured AWS S3 bucket storage block.
The default deployment YAML filename may be edited as needed to add an infrastructure type or infrastructure settings.
###
### A complete description of a Prefect Deployment for flow 'my-flow'
###
name: my-flow-deployment
description: null
version: e29de5d01b06d61b4e321d40f34a480c
# The work queue that will handle this deployment's runs
work_queue_name: default
work_pool_name: default-agent-pool
tags:
- test
parameters: {}
schedules: []
paused: true
infra_overrides:
env.EXTRA_PIP_PACKAGES: s3fs
infrastructure:
type: docker-container
env: {}
labels: {}
name: null
command:
- python
- -m
- prefect.engine
image: prefecthq/prefect:2-latest
image_pull_policy: null
networks: []
network_mode: null
auto_remove: false
volumes: []
stream_output: true
memswap_limit: null
mem_limit: null
privileged: false
block_type_slug: docker-container
_block_type_slug: docker-container
###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: my-flow
manifest_path: my_flow-manifest.json
storage:
bucket_path: bucket-full-of-sunshine
aws_access_key_id: '**********'
aws_secret_access_key: '**********'
_is_anonymous: true
_block_document_name: anonymous-xxxxxxxx-f1ff-4265-b55c-6353a6d65333
_block_document_id: xxxxxxxx-06c2-4c3c-a505-4a8db0147011
block_type_slug: s3
_block_type_slug: s3
path: ''
entrypoint: my_flow.py:my-flow
parameter_openapi_schema:
title: Parameters
type: object
properties: {}
required: null
definitions: null
timestamp: '2023-02-08T23:00:14.974642+00:00'
Editing deployment YAML
Note the big DO NOT EDIT comment in the deployment YAML: In practice, anything above this block can be freely edited before running prefect deployment apply
to create the deployment on the API.
Once the deployment exists, any flow runs that this deployment starts will use DockerContainer
infrastructure.
You can also create custom infrastructure blocks — either in the Prefect UI for in code via the API — and use the settings in the block to configure your infastructure. For example, here we specify settings for Kubernetes infrastructure in a block named k8sdev
.
from prefect.infrastructure import KubernetesJob, KubernetesImagePullPolicy
k8s_job = KubernetesJob(
namespace="dev",
image="prefecthq/prefect:2.0.0-python3.9",
image_pull_policy=KubernetesImagePullPolicy.IF_NOT_PRESENT,
)
k8s_job.save("k8sdev")
Now we can apply the infrastrucure type and settings in the block by specifying the block slug kubernetes-job/k8sdev
as the infrastructure type when building a deployment:
prefect deployment build flows/k8s_example.py:k8s_flow --name k8sdev --tag k8s -sb s3/dev -ib kubernetes-job/k8sdev
See Deployments for more information about deployment build options.
Configuring infrastructure¶
Every infrastrcture type has type-specific options.
Process¶
Process
infrastructure runs a command in a new process.
Current environment variables and Prefect settings will be included in the created process. Configured environment variables will override any current environment variables.
Process
supports the following settings:
Attributes | Description |
---|---|
command | A list of strings specifying the command to start the flow run. In most cases you should not override this. |
env | Environment variables to set for the new process. |
labels | Labels for the process. Labels are for metadata purposes only and cannot be attached to the process itself. |
name | A name for the process. For display purposes only. |
DockerContainer¶
DockerContainer
infrastructure executes flow runs in a container.
Requirements for DockerContainer
:
- Docker Engine must be available.
- You must configure remote Storage. Local storage is not supported for Docker.
- The API must be available from within the flow run container. To facilitate connections to locally hosted APIs,
localhost
and127.0.0.1
will be replaced withhost.docker.internal
. - The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
DockerContainer
supports the following settings:
Attributes | Description |
---|---|
auto_remove | Bool indicating whether the container will be removed on completion. If False, the container will remain after exit for inspection. |
command | A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this. |
env | Environment variables to set for the container. |
image | An optional string specifying the name of a Docker image to use. Defaults to the Prefect image. If the image is stored anywhere other than a public Docker Hub registry, use a corresponding registry block, e.g. DockerRegistry or ensure otherwise that your execution layer is authenticated to pull the image from the image registry. |
image_pull_policy | Specifies if the image should be pulled. One of 'ALWAYS', 'NEVER', 'IF_NOT_PRESENT'. |
image_registry | A DockerRegistry block containing credentials to use if image is stored in a private image registry. |
labels | An optional dictionary of labels, mapping name to value. |
name | An optional name for the container. |
networks | An optional list of strings specifying Docker networks to connect the container to. |
network_mode | Set the network mode for the created container. Defaults to 'host' if a local API url is detected, otherwise the Docker default of 'bridge' is used. If 'networks' is set, this cannot be set. |
stream_output | Bool indicating whether to stream output from the subprocess to local standard output. |
volumes | An optional list of volume mount strings in the format of "local_path:container_path". |
Prefect automatically sets a Docker image matching the Python and Prefect version you're using at deployment time. You can see all available images at Docker Hub.
KubernetesJob¶
KubernetesJob
infrastructure executes flow runs in a Kubernetes Job.
Requirements for KubernetesJob
:
kubectl
must be available.- You must configure remote Storage. Local storage is not supported for Kubernetes.
- The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
The Prefect CLI command prefect kubernetes manifest server
automatically generates a Kubernetes manifest with default settings for Prefect deployments. By default, it simply prints out the YAML configuration for a manifest. You can pipe this output to a file of your choice and edit as necessary.
KubernetesJob
supports the following settings:
Attributes | Description |
---|---|
cluster_config | An optional Kubernetes cluster config to use for this job. |
command | A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this. |
customizations | A list of JSON 6902 patches to apply to the base Job manifest. Alternatively, a valid JSON string is allowed (handy for deployments CLI). |
env | Environment variables to set for the container. |
finished_job_ttl | The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed. |
image | String specifying the tag of a Docker image to use for the Job. |
image_pull_policy | The Kubernetes image pull policy to use for job containers. |
job | The base manifest for the Kubernetes Job. |
job_watch_timeout_seconds | Number of seconds to watch for job creation before timing out (defaults to None). |
labels | Dictionary of labels to add to the Job. |
name | An optional name for the job. |
namespace | String signifying the Kubernetes namespace to use. |
pod_watch_timeout_seconds | Number of seconds to watch for pod creation before timing out (default 60). |
service_account_name | An optional string specifying which Kubernetes service account to use. |
stream_output | Bool indicating whether to stream output from the subprocess to local standard output. |
KubernetesJob overrides and customizations¶
When creating deployments using KubernetesJob
infrastructure, the infra_overrides
parameter expects a dictionary. For a KubernetesJob
, the customizations
parameter expects a list.
Containers expect a list of objects, even if there is only one.
For any patches applying to the container, the path value should be a list, for example:
/spec/templates/spec/containers/0/resources
A Kubernetes-Job
infrastructure block defined in Python:
customizations = [
{
"op": "add",
"path": "/spec/template/spec/containers/0/resources",
"value": {
"requests": {
"cpu": "2000m",
"memory": "4gi"
},
"limits": {
"cpu": "4000m",
"memory": "8Gi",
"nvidia.com/gpu": "1"
}
},
}
]
k8s_job = KubernetesJob(
namespace=namespace,
image=image_name,
image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
finished_job_ttl=300,
job_watch_timeout_seconds=600,
pod_watch_timeout_seconds=600,
service_account_name="prefect-server",
customizations=customizations,
)
k8s_job.save("devk8s")
A Deployment
with infra-overrides defined in Python:
infra_overrides={
"customizations": [
{
"op": "add",
"path": "/spec/template/spec/containers/0/resources",
"value": {
"requests": {
"cpu": "2000m",
"memory": "4gi"
},
"limits": {
"cpu": "4000m",
"memory": "8Gi",
"nvidia.com/gpu": "1"
}
},
}
]
}
# Load an already created K8s Block
k8sjob = k8s_job.load("devk8s")
deployment = Deployment.build_from_flow(
flow=my_flow,
name="s3-example",
version=2,
work_queue_name="aws",
infrastructure=k8sjob,
storage=storage,
infra_overrides=infra_overrides,
)
deployment.apply()
ECSTask¶
ECSTask
infrastructure runs your flow in an ECS Task.
Requirements for ECSTask
:
- The ephemeral Prefect API won't work with ECS directly. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
- The
prefect-aws
collection must be installed within the agent environment:pip install prefect-aws
- The
ECSTask
andAwsCredentials
blocks must be registered within the agent environment:prefect block register -m prefect_aws.ecs
- You must configure remote Storage. Local storage is not supported for ECS tasks. The most commonly used type of storage with
ECSTask
is S3. If you leverage that type of block, make sure thats3fs
is installed within your agent and flow run environment. The easiest way to satisfy all the installation-related points mentioned above is to include the following commands in your Dockerfile:
FROM prefecthq/prefect:2-python3.9 # example base image
RUN pip install s3fs prefect-aws
Make sure to allocate enough CPU and memory to your agent, and consider adding retries
When you start a Prefect agent on AWS ECS Fargate, allocate as much CPU and memory as needed for your workloads. Your agent needs enough resources to appropriately provision infrastructure for your flow runs and to monitor their execution. Otherwise, your flow runs may get stuck in a Pending
state. Alternatively, set a work-queue concurrency limit to ensure that the agent will not try to process all runs at the same time.
Some API calls to provision infrastructure may fail due to unexpected issues on the client side (for example, transient errors such as ConnectionError
, HTTPClientError
, or RequestTimeout
), or due to server-side rate limiting from the AWS service. To mitigate those issues, we recommend adding environment variables such as AWS_MAX_ATTEMPTS
(can be set to an integer value such as 10) and AWS_RETRY_MODE
(can be set to a string value including standard
or adaptive
modes). Those environment variables must be added within the agent environment, e.g. on your ECS service running the agent, rather than on the ECSTask
infrastructure block.
Docker images¶
Learn about options for Prefect-maintained Docker images in the Docker guide.