prefect-ray
Ray can run the tasks in your flow in parallel by distributing them over multiple machines.
The prefect-ray integration makes it easy to speed up your flow runs by integrating Ray into your code.
Getting Started
Prerequisites
- Prefect installed in a virtual environment.
Install prefect-ray for Prefect 2
pip install prefect-ray
Ray environment limitations
While we're excited about parallel task execution via Ray, there are a few limitations with Ray you should be aware of:
- Support for Python 3.11 is experimental.
- Ray support for non-x86/64 architectures, such as ARM/M1 processors, is not available with installation from pip alone, and Ray will be skipped during installation of Prefect. It is possible to manually install the blocking component with conda. See the Ray documentation for instructions.
- Ray's Windows support is currently in beta.
See the Ray installation documentation for further compatibility information.
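You can check the limitations above before installing. The sketch below is a hypothetical helper, ray_platform_warnings, that uses only the standard library platform and sys modules. Its checks simply mirror the list on this page. They are not an official Ray compatibility matrix, which may have changed since this page was written.

```python
import platform
import sys


def ray_platform_warnings(machine=None, system=None, version=None):
    """Return one message per Ray limitation listed on this page that applies.

    Hypothetical helper; each argument defaults to the current machine or interpreter.
    """
    machine = (machine or platform.machine()).lower()
    system = system or platform.system()
    version = tuple(version or sys.version_info[:2])[:2]

    messages = []
    # ARM/M1 machines usually report "arm64" (macOS) or "aarch64" (Linux).
    if machine in {"arm64", "aarch64"}:
        messages.append("ARM: Ray may not install from pip alone; see Ray's conda instructions.")
    if version == (3, 11):
        messages.append("Python 3.11: Ray support is experimental.")
    if system == "Windows":
        messages.append("Windows: Ray support is in beta.")
    return messages


if __name__ == "__main__":
    for message in ray_platform_warnings():
        print(message)
```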
Running tasks on Ray
The RayTaskRunner is a Prefect task runner that submits tasks to Ray for parallel execution.
By default, a temporary Ray instance is created for the duration of the flow run.
For example, this flow prints the numbers 0 through 9 in parallel.
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    # Each submit call hands the task to Ray without waiting for it to finish
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)
# outputs (the order varies from run to run)
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
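You do not need Prefect or Ray to see why the printed order differs from the submission order. The sketch below is a local stand-in that uses only the standard library. ThreadPoolExecutor plays the role of the task runner, and the random delays are an assumption added to make the reordering visible. This is not how RayTaskRunner is implemented.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def shout(number):
    # A random delay makes tasks finish in an unpredictable order.
    time.sleep(random.uniform(0.01, 0.05))
    print(f"#{number}")
    return number


def count_to(highest_number):
    # Submit every call before waiting on any of them, as the flow does with shout.submit.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(shout, number) for number in range(highest_number)]
        # Results come back in submission order even though the prints did not.
        return [future.result() for future in futures]


if __name__ == "__main__":
    count_to(10)
```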
If you already have a Ray instance running, you can provide the connection URL via an address argument.
To configure your flow to use the RayTaskRunner:
- Make sure the prefect-ray collection is installed as described earlier: pip install prefect-ray.
- In your flow code, import RayTaskRunner from prefect_ray.task_runners.
- Assign it as the task runner when the flow is defined, using the task_runner=RayTaskRunner argument.
For example, this flow uses the RayTaskRunner with a local, temporary Ray instance created by Prefect at flow run time.
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

# No address: Prefect starts a temporary local Ray instance for this flow run
@flow(task_runner=RayTaskRunner())
def my_flow():
    ...
This flow uses the RayTaskRunner configured to access an existing Ray instance at ray://192.0.2.255:8786.
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

# Connect to an already running Ray instance instead of starting a new one
@flow(task_runner=RayTaskRunner(address="ray://192.0.2.255:8786"))
def my_flow():
    ...
RayTaskRunner accepts the following optional parameters:
| Parameter | Description |
|---|---|
| address | Address of a currently running Ray instance, starting with the ray:// URI. |
| init_kwargs | Additional kwargs to use when calling ray.init. |
Note that Ray Client uses the ray:// URI to indicate the address of a Ray instance. If you don't provide the address of a Ray instance, Prefect creates a temporary instance automatically.
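A malformed address usually surfaces later as a timeout, so validating it up front can help. The following sketch uses a hypothetical parse_ray_address helper that is not part of prefect-ray. It is built on urllib.parse.urlsplit and assumes port 10001, Ray Client's default server port, when the address omits one.

```python
from urllib.parse import urlsplit

# Ray Client's default server port, also used in the remote-cluster examples on this page.
DEFAULT_RAY_CLIENT_PORT = 10001


def parse_ray_address(address):
    """Split a Ray Client address such as 'ray://192.0.2.255:10001' into (host, port).

    Hypothetical helper, not part of prefect-ray; raises ValueError on malformed input.
    """
    parts = urlsplit(address)
    if parts.scheme != "ray":
        raise ValueError(f"expected a ray:// URI, got {address!r}")
    if not parts.hostname:
        raise ValueError(f"missing host in {address!r}")
    # .port itself raises ValueError if the port is not a valid number.
    return parts.hostname, parts.port or DEFAULT_RAY_CLIENT_PORT


if __name__ == "__main__":
    print(parse_ray_address("ray://192.0.2.255:8786"))  # ('192.0.2.255', 8786)
```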
Running tasks on a Ray remote cluster
When using the RayTaskRunner with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance. To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster:
- By default, Prefect will not persist any data to the filesystem of the remote Ray worker. However, if you want to take advantage of Prefect's caching, you need to configure remote result storage so that results persist across task runs.
We recommend using the Prefect UI to configure a storage block for remote result storage.
Here's an example of a flow that uses caching and remote result storage:
from typing import List

from prefect import flow, get_run_logger, task
from prefect.filesystems import S3  # block type behind the "s3/..." slug below
from prefect.tasks import task_input_hash
from prefect_ray.task_runners import RayTaskRunner

# The result of this task will be cached in the configured result storage
@task(cache_key_fn=task_input_hash)
def say_hello(name: str) -> str:
    logger = get_run_logger()
    # This log statement will print only on the first run. Subsequent runs will be cached.
    logger.info(f"hello {name}!")
    return name

@flow(
    task_runner=RayTaskRunner(
        address="ray://<instance_public_ip_address>:10001",
    ),
    # Using an S3 block that has already been created via the Prefect UI
    result_storage="s3/my-result-storage",
)
def greetings(names: List[str]) -> None:
    for name in names:
        say_hello.submit(name)

if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
- If you get an error stating that the module 'prefect' cannot be found, ensure prefect is installed on the remote cluster with:
pip install prefect
- If you get an error with a message similar to "File system created with scheme 's3' could not be created", ensure the required Python modules are installed on both local and remote machines. The required prerequisite modules can be found in the Prefect documentation. For example, if using S3 for storage:
pip install s3fs
- If you are seeing timeout or other connection errors, double check the address provided to the RayTaskRunner. The address should have the form address='ray://<head_node_ip_address>:10001', for example:
RayTaskRunner(address="ray://1.23.199.255:10001")
Specifying remote options
The remote_options context can be used to control the task’s remote options.
For example, we can set the number of CPUs and GPUs to use for the process task:
from prefect import flow, task
from prefect_ray.context import remote_options
from prefect_ray.task_runners import RayTaskRunner

@task
def process(x):
    return x + 1

@flow(task_runner=RayTaskRunner())
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
    with remote_options(num_cpus=4, num_gpus=2):
        process.submit(42)
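The remote_options context manager scopes resource requests to the submissions made inside the with block. The sketch below models that scoping with contextvars. The names remote_options_sketch and submit are hypothetical, and this is not prefect-ray's implementation. Merging nested blocks is a choice made only in this sketch.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Options that apply to submissions made in the current context (empty by default).
_current_options = ContextVar("remote_options", default={})


@contextmanager
def remote_options_sketch(**options):
    """Hypothetical stand-in for prefect_ray.context.remote_options."""
    # This sketch layers nested blocks on top of the enclosing ones.
    merged = {**_current_options.get(), **options}
    token = _current_options.set(merged)
    try:
        yield merged
    finally:
        # Restore whatever options were active before the with block.
        _current_options.reset(token)


def submit(fn, *args):
    """Hypothetical submit: run fn and report the options it would be sent with."""
    return fn(*args), dict(_current_options.get())


def process(x):
    return x + 1


if __name__ == "__main__":
    with remote_options_sketch(num_cpus=4, num_gpus=2):
        print(submit(process, 42))  # (43, {'num_cpus': 4, 'num_gpus': 2})
    print(submit(process, 42))  # (43, {})
```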
Resources
Refer to the prefect-ray API documentation linked in the sidebar to explore all the capabilities of the prefect-ray library.
For assistance using Ray, consult the Ray documentation.