Skip to content

prefect_shell.commands

Tasks for interacting with shell commands

ShellOperation

Bases: JobBlock

A block representing a shell operation, containing multiple commands.

For long-lasting operations, use the trigger method and utilize the block as a context manager for automatic closure of processes when context is exited. If not, manually call the close method to close processes.

For short-lasting operations, use the run method. Context is automatically managed with this method.

Attributes:

Name Type Description
commands List[str]

A list of commands to execute sequentially.

stream_output bool

Whether to stream output.

env Dict[str, str]

A dictionary of environment variables to set for the shell operation.

working_dir DirectoryPath

The working directory context the commands will be executed within.

shell str

The shell to use to execute the commands.

extension Optional[str]

The extension to use for the temporary file. if unset defaults to .ps1 on Windows and .sh on other platforms.

Examples:

Load a configured block:

from prefect_shell import ShellOperation

shell_operation = ShellOperation.load("BLOCK_NAME")

Source code in prefect_shell/commands.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
class ShellOperation(JobBlock):
    """
    A block representing a shell operation, containing multiple commands.

    For long-lasting operations, use the trigger method and utilize the block as a
    context manager for automatic closure of processes when context is exited.
    If not, manually call the close method to close processes.

    For short-lasting operations, use the run method. Context is automatically managed
    with this method.

    Attributes:
        commands: A list of commands to execute sequentially.
        stream_output: Whether to stream output.
        env: A dictionary of environment variables to set for the shell operation.
        working_dir: The working directory context the commands
            will be executed within.
        shell: The shell to use to execute the commands.
        extension: The extension to use for the temporary file.
            if unset defaults to `.ps1` on Windows and `.sh` on other platforms.

    Examples:
        Load a configured block:
        ```python
        from prefect_shell import ShellOperation

        shell_operation = ShellOperation.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Shell Operation"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/0b47a017e1b40381de770c17647c49cdf6388d1c-250x250.png"  # noqa: E501
    _documentation_url = "https://prefecthq.github.io/prefect-shell/commands/#prefect_shell.commands.ShellOperation"  # noqa: E501

    commands: List[str] = Field(
        default=..., description="A list of commands to execute sequentially."
    )
    stream_output: bool = Field(default=True, description="Whether to stream output.")
    env: Dict[str, str] = Field(
        default_factory=dict,
        title="Environment Variables",
        description="Environment variables to use for the subprocess.",
    )
    working_dir: DirectoryPath = Field(
        default=None,
        title="Working Directory",
        description=(
            "The absolute path to the working directory "
            "the command will be executed within."
        ),
    )
    shell: str = Field(
        default=None,
        description=(
            "The shell to run the command with; if unset, "
            "defaults to `powershell` on Windows and `bash` on other platforms."
        ),
    )
    extension: Optional[str] = Field(
        default=None,
        description=(
            "The extension to use for the temporary file; if unset, "
            "defaults to `.ps1` on Windows and `.sh` on other platforms."
        ),
    )

    _exit_stack: AsyncExitStack = PrivateAttr(
        default_factory=AsyncExitStack,
    )

    @contextmanager
    def _prep_trigger_command(self) -> Generator[str, None, None]:
        """
        Write the commands to a temporary file, handling all the details of
        creating the file and cleaning it up afterwards. Then, return the command
        to run the temporary file.
        """
        try:
            extension = self.extension or (".ps1" if sys.platform == "win32" else ".sh")
            temp_file = tempfile.NamedTemporaryFile(
                prefix="prefect-",
                suffix=extension,
                delete=False,
            )

            joined_commands = os.linesep.join(self.commands)
            self.logger.debug(
                f"Writing the following commands to "
                f"{temp_file.name!r}:{os.linesep}{joined_commands}"
            )
            temp_file.write(joined_commands.encode())

            if self.shell is None and sys.platform == "win32" or extension == ".ps1":
                shell = "powershell"
            elif self.shell is None:
                shell = "bash"
            else:
                shell = self.shell.lower()

            if shell == "powershell":
                # if powershell, set exit code to that of command
                temp_file.write("\r\nExit $LastExitCode".encode())
            temp_file.close()

            trigger_command = [shell, temp_file.name]
            yield trigger_command
        finally:
            if os.path.exists(temp_file.name):
                os.remove(temp_file.name)

    def _compile_kwargs(self, **open_kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Helper method to compile the kwargs for `open_process` so it's not repeated
        across the run and trigger methods.
        """
        trigger_command = self._exit_stack.enter_context(self._prep_trigger_command())
        input_env = os.environ.copy()
        input_env.update(self.env)
        input_open_kwargs = dict(
            command=trigger_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=input_env,
            cwd=self.working_dir,
            **open_kwargs,
        )
        return input_open_kwargs

    @sync_compatible
    async def trigger(self, **open_kwargs: Dict[str, Any]) -> ShellProcess:
        """
        Triggers a shell command and returns the shell command run object
        to track the execution of the run. This method is ideal for long-lasting
        shell commands; for short-lasting shell commands, it is recommended
        to use the `run` method instead.

        Args:
            **open_kwargs: Additional keyword arguments to pass to `open_process`.

        Returns:
            A `ShellProcess` object.

        Examples:
            Sleep for 5 seconds and then print "Hello, world!":
            ```python
            from prefect_shell import ShellOperation

            with ShellOperation(
                commands=["sleep 5", "echo 'Hello, world!'"],
            ) as shell_operation:
                shell_process = shell_operation.trigger()
                shell_process.wait_for_completion()
                shell_output = shell_process.fetch_result()
            ```
        """
        input_open_kwargs = self._compile_kwargs(**open_kwargs)
        process = await self._exit_stack.enter_async_context(
            open_process(**input_open_kwargs)
        )
        num_commands = len(self.commands)
        self.logger.info(
            f"PID {process.pid} triggered with {num_commands} commands running "
            f"inside the {(self.working_dir or '.')!r} directory."
        )
        shell_process = ShellProcess(shell_operation=self, process=process)
        await asyncio.create_task(shell_process._capture_output(process.stdout))
        await asyncio.create_task(shell_process._capture_output(process.stderr))
        return shell_process

    @sync_compatible
    async def run(self, **open_kwargs: Dict[str, Any]) -> List[str]:
        """
        Runs a shell command, but unlike the trigger method,
        additionally waits and fetches the result directly, automatically managing
        the context. This method is ideal for short-lasting shell commands;
        for long-lasting shell commands, it is
        recommended to use the `trigger` method instead.

        Args:
            **open_kwargs: Additional keyword arguments to pass to `open_process`.

        Returns:
            The lines output from the shell command as a list.

        Examples:
            Sleep for 5 seconds and then print "Hello, world!":
            ```python
            from prefect_shell import ShellOperation

            shell_output = ShellOperation(
                commands=["sleep 5", "echo 'Hello, world!'"]
            ).run()
            ```
        """
        input_open_kwargs = self._compile_kwargs(**open_kwargs)
        async with open_process(**input_open_kwargs) as process:
            shell_process = ShellProcess(shell_operation=self, process=process)
            num_commands = len(self.commands)
            self.logger.info(
                f"PID {process.pid} triggered with {num_commands} commands running "
                f"inside the {(self.working_dir or '.')!r} directory."
            )
            await asyncio.gather(
                shell_process._capture_output(process.stdout),
                shell_process._capture_output(process.stderr),
            )
            await shell_process.wait_for_completion()
            result = await shell_process.fetch_result()

        return result

    @sync_compatible
    async def close(self):
        """
        Close the job block.
        """
        await self._exit_stack.aclose()
        self.logger.info("Successfully closed all open processes.")

    async def aclose(self):
        """
        Asynchronous version of the close method.
        """
        await self.close()

    async def __aenter__(self) -> "ShellOperation":
        """
        Asynchronous version of the enter method.
        """
        return self

    async def __aexit__(self, *exc_info):
        """
        Asynchronous version of the exit method.
        """
        await self.close()

    def __enter__(self) -> "ShellOperation":
        """
        Enter the context of the job block.
        """
        return self

    def __exit__(self, *exc_info):
        """
        Exit the context of the job block.
        """
        self.close()

aclose async

Asynchronous version of the close method.

Source code in prefect_shell/commands.py
418
419
420
421
422
async def aclose(self):
    """
    Asynchronous version of the close method.
    """
    await self.close()

close async

Close the job block.

Source code in prefect_shell/commands.py
410
411
412
413
414
415
416
@sync_compatible
async def close(self):
    """
    Close the job block.
    """
    await self._exit_stack.aclose()
    self.logger.info("Successfully closed all open processes.")

run async

Runs a shell command, but unlike the trigger method, additionally waits and fetches the result directly, automatically managing the context. This method is ideal for short-lasting shell commands; for long-lasting shell commands, it is recommended to use the trigger method instead.

Parameters:

Name Type Description Default
**open_kwargs Dict[str, Any]

Additional keyword arguments to pass to open_process.

{}

Returns:

Type Description
List[str]

The lines output from the shell command as a list.

Examples:

Sleep for 5 seconds and then print "Hello, world!":

from prefect_shell import ShellOperation

shell_output = ShellOperation(
    commands=["sleep 5", "echo 'Hello, world!'"]
).run()

Source code in prefect_shell/commands.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
@sync_compatible
async def run(self, **open_kwargs: Dict[str, Any]) -> List[str]:
    """
    Runs a shell command, but unlike the trigger method,
    additionally waits and fetches the result directly, automatically managing
    the context. This method is ideal for short-lasting shell commands;
    for long-lasting shell commands, it is
    recommended to use the `trigger` method instead.

    Args:
        **open_kwargs: Additional keyword arguments to pass to `open_process`.

    Returns:
        The lines output from the shell command as a list.

    Examples:
        Sleep for 5 seconds and then print "Hello, world!":
        ```python
        from prefect_shell import ShellOperation

        shell_output = ShellOperation(
            commands=["sleep 5", "echo 'Hello, world!'"]
        ).run()
        ```
    """
    input_open_kwargs = self._compile_kwargs(**open_kwargs)
    async with open_process(**input_open_kwargs) as process:
        shell_process = ShellProcess(shell_operation=self, process=process)
        num_commands = len(self.commands)
        self.logger.info(
            f"PID {process.pid} triggered with {num_commands} commands running "
            f"inside the {(self.working_dir or '.')!r} directory."
        )
        await asyncio.gather(
            shell_process._capture_output(process.stdout),
            shell_process._capture_output(process.stderr),
        )
        await shell_process.wait_for_completion()
        result = await shell_process.fetch_result()

    return result

trigger async

Triggers a shell command and returns the shell command run object to track the execution of the run. This method is ideal for long-lasting shell commands; for short-lasting shell commands, it is recommended to use the run method instead.

Parameters:

Name Type Description Default
**open_kwargs Dict[str, Any]

Additional keyword arguments to pass to open_process.

{}

Returns:

Type Description
ShellProcess

A ShellProcess object.

Examples:

Sleep for 5 seconds and then print "Hello, world!":

from prefect_shell import ShellOperation

with ShellOperation(
    commands=["sleep 5", "echo 'Hello, world!'"],
) as shell_operation:
    shell_process = shell_operation.trigger()
    shell_process.wait_for_completion()
    shell_output = shell_process.fetch_result()

Source code in prefect_shell/commands.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
@sync_compatible
async def trigger(self, **open_kwargs: Dict[str, Any]) -> ShellProcess:
    """
    Triggers a shell command and returns the shell command run object
    to track the execution of the run. This method is ideal for long-lasting
    shell commands; for short-lasting shell commands, it is recommended
    to use the `run` method instead.

    Args:
        **open_kwargs: Additional keyword arguments to pass to `open_process`.

    Returns:
        A `ShellProcess` object.

    Examples:
        Sleep for 5 seconds and then print "Hello, world!":
        ```python
        from prefect_shell import ShellOperation

        with ShellOperation(
            commands=["sleep 5", "echo 'Hello, world!'"],
        ) as shell_operation:
            shell_process = shell_operation.trigger()
            shell_process.wait_for_completion()
            shell_output = shell_process.fetch_result()
        ```
    """
    input_open_kwargs = self._compile_kwargs(**open_kwargs)
    process = await self._exit_stack.enter_async_context(
        open_process(**input_open_kwargs)
    )
    num_commands = len(self.commands)
    self.logger.info(
        f"PID {process.pid} triggered with {num_commands} commands running "
        f"inside the {(self.working_dir or '.')!r} directory."
    )
    shell_process = ShellProcess(shell_operation=self, process=process)
    await asyncio.create_task(shell_process._capture_output(process.stdout))
    await asyncio.create_task(shell_process._capture_output(process.stderr))
    return shell_process

ShellProcess

Bases: JobRun

A class representing a shell process.

Source code in prefect_shell/commands.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
class ShellProcess(JobRun):
    """
    A class representing a shell process.
    """

    def __init__(self, shell_operation: "ShellOperation", process: Process):
        self._shell_operation = shell_operation
        self._process = process
        self._output = []

    @property
    def pid(self) -> int:
        """
        The PID of the process.

        Returns:
            The PID of the process.
        """
        return self._process.pid

    @property
    def return_code(self) -> Optional[int]:
        """
        The return code of the process.

        Returns:
            The return code of the process, or `None` if the process is still running.
        """
        return self._process.returncode

    async def _capture_output(self, source):
        """
        Capture output from source.
        """
        async for output in TextReceiveStream(source):
            text = output.rstrip()
            if self._shell_operation.stream_output:
                self.logger.info(f"PID {self.pid} stream output:{os.linesep}{text}")
            self._output.extend(text.split(os.linesep))

    @sync_compatible
    async def wait_for_completion(self) -> None:
        """
        Wait for the shell command to complete after a process is triggered.
        """
        self.logger.debug(f"Waiting for PID {self.pid} to complete.")

        await self._process.wait()

        if self.return_code != 0:
            raise RuntimeError(
                f"PID {self.pid} failed with return code {self.return_code}."
            )
        self.logger.info(
            f"PID {self.pid} completed with return code {self.return_code}."
        )

    @sync_compatible
    async def fetch_result(self) -> List[str]:
        """
        Retrieve the output of the shell operation.

        Returns:
            The lines output from the shell operation as a list.
        """
        if self._process.returncode is None:
            self.logger.info("Process is still running, result may be incomplete.")
        return self._output

pid: int property

The PID of the process.

Returns:

Type Description
int

The PID of the process.

return_code: Optional[int] property

The return code of the process.

Returns:

Type Description
Optional[int]

The return code of the process, or None if the process is still running.

fetch_result async

Retrieve the output of the shell operation.

Returns:

Type Description
List[str]

The lines output from the shell operation as a list.

Source code in prefect_shell/commands.py
186
187
188
189
190
191
192
193
194
195
196
@sync_compatible
async def fetch_result(self) -> List[str]:
    """
    Retrieve the output of the shell operation.

    Returns:
        The lines output from the shell operation as a list.
    """
    if self._process.returncode is None:
        self.logger.info("Process is still running, result may be incomplete.")
    return self._output

wait_for_completion async

Wait for the shell command to complete after a process is triggered.

Source code in prefect_shell/commands.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@sync_compatible
async def wait_for_completion(self) -> None:
    """
    Wait for the shell command to complete after a process is triggered.
    """
    self.logger.debug(f"Waiting for PID {self.pid} to complete.")

    await self._process.wait()

    if self.return_code != 0:
        raise RuntimeError(
            f"PID {self.pid} failed with return code {self.return_code}."
        )
    self.logger.info(
        f"PID {self.pid} completed with return code {self.return_code}."
    )

shell_run_command async

Runs arbitrary shell commands.

Parameters:

Name Type Description Default
command str

Shell command to be executed; can also be provided post-initialization by calling this task instance.

required
env Optional[dict]

Dictionary of environment variables to use for the subprocess; can also be provided at runtime.

None
helper_command Optional[str]

String representing a shell command, which will be executed prior to the command in the same process. Can be used to change directories, define helper functions, etc. for different commands in a flow.

None
shell Optional[str]

Shell to run the command with.

None
extension Optional[str]

File extension to be appended to the command to be executed.

None
return_all bool

Whether this task should return all lines of stdout as a list, or just the last line as a string.

False
stream_level int

The logging level of the stream; defaults to 20 equivalent to logging.INFO.

INFO
cwd Union[str, bytes, PathLike, None]

The working directory context the command will be executed within

None

Returns:

Type Description
Union[List, str]

If return all, returns all lines as a list; else the last line as a string.

Example

List contents in the current directory.

from prefect import flow
from prefect_shell import shell_run_command

@flow
def example_shell_run_command_flow():
    return shell_run_command(command="ls .", return_all=True)

example_shell_run_command_flow()

Source code in prefect_shell/commands.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@task
async def shell_run_command(
    command: str,
    env: Optional[dict] = None,
    helper_command: Optional[str] = None,
    shell: Optional[str] = None,
    extension: Optional[str] = None,
    return_all: bool = False,
    stream_level: int = logging.INFO,
    cwd: Union[str, bytes, os.PathLike, None] = None,
) -> Union[List, str]:
    """
    Runs arbitrary shell commands.

    Args:
        command: Shell command to be executed; can also be
            provided post-initialization by calling this task instance.
        env: Dictionary of environment variables to use for
            the subprocess; can also be provided at runtime.
        helper_command: String representing a shell command, which
            will be executed prior to the `command` in the same process.
            Can be used to change directories, define helper functions, etc.
            for different commands in a flow.
        shell: Shell to run the command with.
        extension: File extension to be appended to the command to be executed.
        return_all: Whether this task should return all lines of stdout as a list,
            or just the last line as a string.
        stream_level: The logging level of the stream;
            defaults to 20 equivalent to `logging.INFO`.
        cwd: The working directory context the command will be executed within

    Returns:
        If return all, returns all lines as a list; else the last line as a string.

    Example:
        List contents in the current directory.
        ```python
        from prefect import flow
        from prefect_shell import shell_run_command

        @flow
        def example_shell_run_command_flow():
            return shell_run_command(command="ls .", return_all=True)

        example_shell_run_command_flow()
        ```
    """
    logger = get_run_logger()

    current_env = os.environ.copy()
    current_env.update(env or {})

    if shell is None:
        # if shell is not specified:
        # use powershell for windows
        # use bash for other platforms
        shell = "powershell" if sys.platform == "win32" else "bash"

    extension = ".ps1" if shell.lower() == "powershell" else extension

    tmp = tempfile.NamedTemporaryFile(prefix="prefect-", suffix=extension, delete=False)
    try:
        if helper_command:
            tmp.write(helper_command.encode())
            tmp.write(os.linesep.encode())
        tmp.write(command.encode())
        if shell.lower() == "powershell":
            # if powershell, set exit code to that of command
            tmp.write("\r\nExit $LastExitCode".encode())
        tmp.close()

        shell_command = [shell, tmp.name]

        lines = []
        async with await anyio.open_process(
            shell_command, env=current_env, cwd=cwd
        ) as process:
            async for text in TextReceiveStream(process.stdout):
                logger.log(level=stream_level, msg=text)
                lines.extend(text.rstrip().split("\n"))

            await process.wait()
            if process.returncode:
                stderr = "\n".join(
                    [text async for text in TextReceiveStream(process.stderr)]
                )
                if not stderr and lines:
                    stderr = f"{lines[-1]}\n"
                msg = (
                    f"Command failed with exit code {process.returncode}:\n" f"{stderr}"
                )
                raise RuntimeError(msg)
    finally:
        if os.path.exists(tmp.name):
            os.remove(tmp.name)

    line = lines[-1] if lines else ""
    return lines if return_all else line