Skip to content

prefect.packaging.docker

DockerPackageManifest

Bases: PackageManifest

Represents a flow packaged in a Docker image

Source code in prefect/packaging/docker.py
29
30
31
32
33
34
35
36
37
38
39
40
class DockerPackageManifest(PackageManifest):
    """
    Manifest describing a flow that has been packaged into a Docker image.
    """

    type: Literal["docker"] = "docker"

    # Reference to the image that holds the flow
    image: str
    # Path of the flow script handed to `load_flow_from_script` on unpackage
    image_flow_location: str

    async def unpackage(self) -> Flow:
        """
        Load the flow named `flow_name` from the script at `image_flow_location`.
        """
        script_path, flow_name = self.image_flow_location, self.flow_name
        return load_flow_from_script(script_path, flow_name)

DockerPackager

Bases: Packager

This packager builds a Docker image containing the flow and the runtime environment necessary to run the flow. The resulting image is optionally pushed to the container registry given by `registry_url`.

Source code in prefect/packaging/docker.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class DockerPackager(Packager):
    """
    This packager builds a Docker image containing the flow and the runtime environment
    necessary to run the flow.  The resulting image is optionally pushed to a container
    registry, given by `registry_url`.

    The image is built either from a user-supplied `dockerfile` or from a
    `base_image` plus the install commands of `python_environment`; the two
    build modes are mutually exclusive.
    """

    type: Literal["docker"] = "docker"

    # Image to build on top of; defaulted by `set_default_base_image` when
    # neither `base_image` nor `dockerfile` is given.
    base_image: Optional[str] = None
    # Environment whose install commands are added to the image; defaulted by
    # `default_python_environment` when a base image is in use.
    python_environment: Optional[Union[PythonEnvironment, CondaEnvironment]] = None
    # Path to a Dockerfile; its parent directory is used as the build context.
    dockerfile: Optional[Path] = None
    # Platform value forwarded to the image build.  The default must be `None`
    # (not a one-element tuple) so that "unset" is not forwarded as a value.
    platform: Optional[str] = None
    # Path inside the image where the flow's source is written.
    image_flow_location: str = "/flow.py"
    # When set, the built image is pushed to this registry.
    registry_url: Optional[AnyHttpUrl] = None

    @root_validator
    def set_default_base_image(cls, values):
        """
        Fill in `base_image` when neither `base_image` nor `dockerfile` is set.

        The "conda" flavor is requested when `python_environment` is a
        `CondaEnvironment`; otherwise no flavor is passed.
        """
        if not values.get("base_image") and not values.get("dockerfile"):
            values["base_image"] = get_prefect_image_name(
                flavor=(
                    "conda"
                    if isinstance(values.get("python_environment"), CondaEnvironment)
                    else None
                )
            )
        return values

    @root_validator
    def base_image_and_dockerfile_exclusive(cls, values: Mapping[str, Any]):
        """
        Reject configurations that provide both `base_image` and `dockerfile`.

        Raises:
            ValueError: if both fields are truthy.
        """
        if values.get("base_image") and values.get("dockerfile"):
            raise ValueError(
                "Either `base_image` or `dockerfile` should be provided, but not both"
            )
        return values

    @root_validator
    def default_python_environment(cls, values: Mapping[str, Any]):
        """
        Default `python_environment` via `PythonEnvironment.from_environment()`
        when building from a base image and no environment was given.
        """
        if values.get("base_image") and not values.get("python_environment"):
            values["python_environment"] = PythonEnvironment.from_environment()
        return values

    @validator("registry_url", pre=True)
    def ensure_registry_url_is_prefixed(cls, value):
        """
        Prepend "https://" to string registry URLs that carry no scheme.

        Non-string values and strings already containing "://" pass through
        unchanged.
        """
        if isinstance(value, str) and "://" not in value:
            return "https://" + value
        return value

    async def package(self, flow: Flow) -> DockerPackageManifest:
        """
        Package a flow as a Docker image and, optionally, push it to a registry
        """
        image_reference = await self._build_image(flow)

        if self.registry_url:
            image_name = slugify(flow.name)
            # The push result replaces the local build reference in the manifest.
            image_reference = await run_sync_in_worker_thread(
                push_image, image_reference, self.registry_url, image_name
            )

        return self.base_manifest(flow).finalize(
            image=image_reference, image_flow_location=self.image_flow_location
        )

    async def _build_image(self, flow: Flow) -> str:
        """
        Build the image from `dockerfile` when set, otherwise from `base_image`.
        """
        if self.dockerfile:
            return await self._build_from_dockerfile()
        return await self._build_from_base_image(flow)

    async def _build_from_dockerfile(self) -> str:
        """
        Build an image from `self.dockerfile`, using its directory as context.
        """
        # Resolve once and derive both the context and the relative Dockerfile
        # path from the resolved path.  Calling `relative_to` on the unresolved
        # path raises ValueError whenever `self.dockerfile` is relative.
        dockerfile_path = self.dockerfile.resolve()
        context = dockerfile_path.parent
        return await run_sync_in_worker_thread(
            build_image,
            platform=self.platform,
            context=context,
            dockerfile=str(dockerfile_path.relative_to(context)),
        )

    async def _build_from_base_image(self, flow: Flow) -> str:
        """
        Build an image on top of `self.base_image` containing the flow source.

        The install commands of `self.python_environment` are added as build
        lines, and the flow's source is written to `self.image_flow_location`.
        Build progress is streamed to stdout.
        """
        with ImageBuilder(
            base_image=self.base_image, platform=self.platform
        ) as builder:
            for command in self.python_environment.install_commands():
                builder.add_line(to_run_command(command))

            # The serialized flow is JSON; its "source" entry is what gets
            # written into the image.
            source_info = json.loads(SourceSerializer().dumps(flow))

            builder.write_text(source_info["source"], self.image_flow_location)

            return await run_sync_in_worker_thread(
                builder.build, stream_progress_to=sys.stdout
            )

package async

Package a flow as a Docker image and, optionally, push it to a registry

Source code in prefect/packaging/docker.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
async def package(self, flow: Flow) -> DockerPackageManifest:
    """
    Build a Docker image for the flow, push it when a registry is configured,
    and return the finalized manifest
    """
    built_reference = await self._build_image(flow)

    if not self.registry_url:
        final_reference = built_reference
    else:
        repository = f"{slugify(flow.name)}"
        # The reference returned by the push is the one recorded in the manifest.
        final_reference = await run_sync_in_worker_thread(
            push_image, built_reference, self.registry_url, repository
        )

    manifest = self.base_manifest(flow)
    return manifest.finalize(
        image=final_reference, image_flow_location=self.image_flow_location
    )