Skip to content

prefect.packaging.serializers

ImportSerializer

Bases: Serializer

Serializes objects by storing their importable path.

Source code in prefect/packaging/serializers.py
188
189
190
191
192
193
194
195
196
197
198
199
class ImportSerializer(Serializer):
    """
    Serializes objects by storing their importable path.

    The payload is the UTF-8 encoded qualified name of the object. Loading resolves
    that name back to an object with `from_qualified_name`.
    """

    type: Literal["import"] = "import"

    def dumps(self, obj: Any) -> bytes:
        """Return the qualified (importable) name of `obj`, encoded as bytes."""
        qualified_name = to_qualified_name(obj)
        return qualified_name.encode("utf-8")

    def loads(self, blob: bytes) -> Any:
        """Decode a qualified name and resolve it back to the object it names."""
        qualified_name = blob.decode("utf-8")
        return from_qualified_name(qualified_name)

PickleSerializer

Bases: Serializer

Serializes objects using the pickle protocol.

If using cloudpickle, you may specify a list of 'pickle_modules'. These modules will be serialized by value instead of by reference, which means they do not have to be installed in the runtime location. This is especially useful for serializing objects that rely on local packages.

Wraps pickles in base64 for safe transmission.

Source code in prefect/packaging/serializers.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class PickleSerializer(Serializer):
    """
    Serializes objects using the pickle protocol.

    If using cloudpickle, you may specify a list of 'pickle_modules'. These modules will
    be serialized by value instead of by reference, which means they do not have to be
    installed in the runtime location. This is especially useful for serializing objects
    that rely on local packages.

    Wraps pickles in base64 for safe transmission.
    """

    type: Literal["pickle"] = "pickle"

    # Qualified name of the pickle library; must expose callable `dumps`/`loads`.
    picklelib: str = "cloudpickle"
    # Version of `picklelib` requested; filled from the installed library's
    # `__version__` when left unset (see `check_picklelib_version`).
    picklelib_version: str = None
    # Qualified module names to pickle by value instead of by reference.
    # Only allowed together with cloudpickle (see `check_picklelib_and_modules`).
    pickle_modules: List[str] = pydantic.Field(default_factory=list)

    @pydantic.validator("picklelib")
    def check_picklelib(cls, value):
        """
        Check that the given pickle library is importable and has dumps/loads methods.
        """
        try:
            pickler = from_qualified_name(value)
        except (ImportError, AttributeError) as exc:
            raise ValueError(
                f"Failed to import requested pickle library: {value!r}."
            ) from exc

        if not callable(getattr(pickler, "dumps", None)):
            raise ValueError(
                f"Pickle library at {value!r} does not have a 'dumps' method."
            )

        if not callable(getattr(pickler, "loads", None)):
            raise ValueError(
                f"Pickle library at {value!r} does not have a 'loads' method."
            )

        return value

    @pydantic.root_validator
    def check_picklelib_version(cls, values):
        """
        Infers a default value for `picklelib_version` if null or warns when it does
        not match the version retrieved from the `picklelib`.
        """
        picklelib = values.get("picklelib")
        picklelib_version = values.get("picklelib_version")

        if not picklelib:
            raise ValueError("Unable to check version of unrecognized picklelib module")

        pickler = from_qualified_name(picklelib)
        pickler_version = getattr(pickler, "__version__", None)

        if not picklelib_version:
            values["picklelib_version"] = pickler_version
        elif picklelib_version != pickler_version:
            warnings.warn(
                (
                    f"Mismatched {picklelib!r} versions. Found {pickler_version} in the"
                    f" environment but {picklelib_version} was requested. This may"
                    " cause the serializer to fail."
                ),
                RuntimeWarning,
                stacklevel=3,
            )

        return values

    @pydantic.root_validator
    def check_picklelib_and_modules(cls, values):
        """
        Prevents modules from being specified if picklelib is not cloudpickle
        """
        if values.get("picklelib") != "cloudpickle" and values.get("pickle_modules"):
            raise ValueError(
                "`pickle_modules` cannot be used without 'cloudpickle'. Got"
                f" {values.get('picklelib')!r}."
            )
        return values

    def dumps(self, obj: Any) -> bytes:
        """
        Pickle `obj` with `picklelib` and return the result base64-encoded.

        Each module in `pickle_modules` is registered to be pickled by value for the
        duration of this call only. The registrations are always undone, even when
        pickling fails, so the pickler's state is left as it was found.
        """
        pickler = from_qualified_name(self.picklelib)
        # Resolve every module up front so an import failure happens before any
        # registration has been made.
        modules = [from_qualified_name(name) for name in self.pickle_modules]

        registered = []
        try:
            for module in modules:
                pickler.register_pickle_by_value(module)
                registered.append(module)
            blob = pickler.dumps(obj)
        finally:
            # Restore the pickler settings; only undo registrations that succeeded.
            for module in registered:
                pickler.unregister_pickle_by_value(module)

        return base64.encodebytes(blob)

    def loads(self, blob: bytes) -> Any:
        """
        Decode a base64-wrapped pickle produced by `dumps` and unpickle it.

        Unpickling can execute arbitrary code; only load blobs from trusted sources.
        """
        pickler = from_qualified_name(self.picklelib)
        return pickler.loads(base64.decodebytes(blob))

check_picklelib

Check that the given pickle library is importable and has dumps/loads methods.

Source code in prefect/packaging/serializers.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@pydantic.validator("picklelib")
def check_picklelib(cls, value):
    """
    Validate the `picklelib` field.

    The value must resolve through `from_qualified_name` to an object exposing
    callable `dumps` and `loads` attributes; otherwise a `ValueError` is raised.
    On success the value is returned unchanged.
    """
    try:
        library = from_qualified_name(value)
    except (ImportError, AttributeError) as import_error:
        raise ValueError(
            f"Failed to import requested pickle library: {value!r}."
        ) from import_error

    # `dumps` is checked before `loads`, so the first missing method is reported.
    for method_name in ("dumps", "loads"):
        if not callable(getattr(library, method_name, None)):
            raise ValueError(
                f"Pickle library at {value!r} does not have a '{method_name}' method."
            )

    return value

check_picklelib_and_modules

Prevents modules from being specified if picklelib is not cloudpickle

Source code in prefect/packaging/serializers.py
 99
100
101
102
103
104
105
106
107
108
109
@pydantic.root_validator
def check_picklelib_and_modules(cls, values):
    """
    Reject a non-empty `pickle_modules` unless `picklelib` is exactly "cloudpickle".

    Returns `values` unchanged when the combination is allowed.
    """
    picklelib = values.get("picklelib")
    has_modules = bool(values.get("pickle_modules"))

    if has_modules and picklelib != "cloudpickle":
        raise ValueError(
            "`pickle_modules` cannot be used without 'cloudpickle'. Got"
            f" {picklelib!r}."
        )

    return values

check_picklelib_version

Infers a default value for `picklelib_version` if it is unset; otherwise warns if it does not match the version of the installed `picklelib`.

Source code in prefect/packaging/serializers.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@pydantic.root_validator
def check_picklelib_version(cls, values):
    """
    Fill in or verify `picklelib_version`.

    When `picklelib_version` is unset, it is set to the `__version__` of the
    installed `picklelib` (or None if it has none). When it is set but differs from
    the installed version, a `RuntimeWarning` is emitted. A missing `picklelib`
    raises `ValueError`.
    """
    library_name = values.get("picklelib")
    requested_version = values.get("picklelib_version")

    if not library_name:
        raise ValueError("Unable to check version of unrecognized picklelib module")

    installed_version = getattr(from_qualified_name(library_name), "__version__", None)

    if not requested_version:
        values["picklelib_version"] = installed_version
        return values

    if requested_version != installed_version:
        message = (
            f"Mismatched {library_name!r} versions. Found {installed_version} in the"
            f" environment but {requested_version} was requested. This may"
            " cause the serializer to fail."
        )
        warnings.warn(message, RuntimeWarning, stacklevel=3)

    return values

SourceSerializer

Bases: Serializer

Serializes objects by retrieving the source code of the module they are defined in.

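Creates a JSON blob with the keys `source` (the source code), `file_name` (the name of the file the source was in), and `symbol_name` (the name of the object to extract from the source code).

Deserialization requires the code to run with `exec`.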

Source code in prefect/packaging/serializers.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class SourceSerializer(Serializer):
    """
    Serializes objects by retrieving the source code of the module they are defined in.

    Creates a JSON blob with keys:
        source: The source code
        file_name: The name of the file the source was in
        symbol_name: The name of the object to extract from the source code

    Deserialization requires the code to run with `exec`.
    """

    type: Literal["source"] = "source"

    def dumps(self, obj: Any) -> bytes:
        """
        Serialize `obj` as the full source of its defining module plus its name.

        Raises:
            ValueError: if the defining module cannot be determined or has no
                `__file__`.
        """
        module = inspect.getmodule(obj)

        if module is None:
            raise ValueError(f"Cannot determine source module for object: {obj!r}.")

        if not getattr(module, "__file__", None):
            raise ValueError(
                f"Found module {module!r} without source code file while serializing "
                f"object: {obj!r}."
            )

        source = inspect.getsource(module)

        return json.dumps(
            {
                "source": source,
                "file_name": os.path.basename(module.__file__),
                "symbol_name": obj.__name__,
            }
        ).encode()

    def loads(self, blob: bytes) -> Any:
        """
        Rebuild an object from a blob produced by `dumps`.

        The embedded source is written to a file in a temporary directory, loaded
        as a module, and the attribute named by 'symbol_name' is returned.

        Loading executes the embedded source code; only load blobs from trusted
        sources.

        Raises:
            ValueError: if the blob is not a dict with exactly the keys 'source',
                'file_name', and 'symbol_name', or if 'file_name' does not name a
                file.
        """
        document = json.loads(blob)
        if not isinstance(document, dict) or set(document.keys()) != {
            "source",
            "file_name",
            "symbol_name",
        }:
            raise ValueError(
                "Invalid serialized data. "
                "Expected dictionary with keys 'source', 'file_name', and "
                "'symbol_name'. "
                f"Got: {document}"
            )

        # `dumps` stores only a basename. Keep only the final path component here
        # too, so a crafted name such as "../../evil.py" cannot write outside the
        # temporary directory.
        file_name = Path(document["file_name"]).name
        if file_name in ("", ".."):
            raise ValueError(
                f"Invalid file name in serialized data: {document['file_name']!r}"
            )

        with TemporaryDirectory() as tmpdir:
            temp_script = Path(tmpdir) / file_name
            # Python decodes source files as UTF-8 by default (PEP 3120), so write
            # with that encoding rather than the platform's locale default.
            temp_script.write_text(document["source"], encoding="utf-8")
            module = load_script_as_module(str(temp_script))

        return getattr(module, document["symbol_name"])