prefect-shell¶
Execute shell commands from within Prefect flows.
Getting Started¶
Prerequisites¶
Install prefect-shell
for Prefect 2¶
pip install 'prefect[shell]<3'
Register newly installed block types¶
Register the block types in the prefect-aws module to make them available for use.
prefect block register -m prefect_shell
Examples¶
Integrate shell commands with Prefect flows¶
With prefect-shell, you can use shell commands (and/or scripts) in Prefect flows to provide observability and resiliency. Prefect-shell can be a useful tool if you're transitioning your orchestration from shell scripts to Prefect.
Let's get the shell-abration started!
The Python code below has shell commands embedded in a Prefect flow:
from prefect import flow
from datetime import datetime
from prefect_shell import ShellOperation
@flow
def download_data():
today = datetime.today().strftime("%Y%m%d")
# for short running operations, you can use the `run` method
# which automatically manages the context
ShellOperation(
commands=[
"mkdir -p data",
"mkdir -p data/${today}"
],
env={"today": today}
).run()
# for long running operations, you can use a context manager
with ShellOperation(
commands=[
"curl -O https://masie_web.apps.nsidc.org/pub/DATASETS/NOAA/G02135/north/daily/data/N_seaice_extent_daily_v3.0.csv",
],
working_dir=f"data/{today}",
) as download_csv_operation:
# trigger runs the process in the background
download_csv_process = download_csv_operation.trigger()
# then do other things here in the meantime, like download another file
...
# when you're ready, wait for the process to finish
download_csv_process.wait_for_completion()
# if you'd like to get the output lines, you can use the `fetch_result` method
output_lines = download_csv_process.fetch_result()
if __name__ == "__main__":
download_data()
Running this script results in output like this:
14:48:16.550 | INFO | prefect.engine - Created flow run 'tentacled-chachalaca' for flow 'download-data'
14:48:17.977 | INFO | Flow run 'tentacled-chachalaca' - PID 19360 triggered with 2 commands running inside the '.' directory.
14:48:17.987 | INFO | Flow run 'tentacled-chachalaca' - PID 19360 completed with return code 0.
14:48:17.994 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 triggered with 1 commands running inside the PosixPath('data/20230201') directory.
14:48:18.009 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
% Total % Received % Xferd Average Speed Time Time Time Current
Dl
14:48:18.010 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
oad Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
14:48:18.840 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
11 1630k 11 192k 0 0 229k 0 0:00:07 --:--:-- 0:00:07 231k
14:48:19.839 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
83 1630k 83 1368k 0 0 745k 0 0:00:02 0:00:01 0:00:01 747k
14:48:19.993 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
100 1630k 100 1630k 0 0 819k 0 0
14:48:19.994 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
:00:01 0:00:01 --:--:-- 821k
14:48:19.996 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 completed with return code 0.
14:48:19.998 | INFO | Flow run 'tentacled-chachalaca' - Successfully closed all open processes.
14:48:20.203 | INFO | Flow run 'tentacled-chachalaca' - Finished in state Completed()
Save shell commands in Prefect Blocks¶
You can save commands within a ShellOperation
block, then reuse them across multiple flows.
Save the block with desired commands:
from prefect_shell import ShellOperation
ping_op = ShellOperation(commands=["ping -t 1 prefect.io"])
ping_op.save("block-name")
# Load the saved block:
ping_op = ShellOperation.load("block-name")
Resources¶
Refer to the prefect-shell API documentation linked in the sidebar to explore all the capabilities of the prefect-shell library.