Bases: Serializer
Serializes objects using the pickle protocol.
If using cloudpickle, you may specify a list of 'pickle_modules'. These modules will
be serialized by value instead of by reference, which means they do not have to be
installed in the runtime location. This is especially useful for serializing objects
that rely on local packages.
Wraps pickles in base64 for safe transmission.
Source code in prefect/packaging/serializers.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class PickleSerializer(Serializer):
    """
    Serializes objects using the pickle protocol.

    If using cloudpickle, you may specify a list of 'pickle_modules'. These modules will
    be serialized by value instead of by reference, which means they do not have to be
    installed in the runtime location. This is especially useful for serializing objects
    that rely on local packages.

    Wraps pickles in base64 for safe transmission.

    Attributes:
        type: Discriminator for this serializer; always "pickle".
        picklelib: Qualified name of the pickle library to use. It must be
            importable and expose callable `dumps` and `loads` attributes.
        picklelib_version: Version of the pickle library. When not given, it is
            filled in from the library's `__version__` attribute (if any).
        pickle_modules: Qualified names of modules to serialize by value. Only
            allowed when `picklelib` is "cloudpickle".
    """

    type: Literal["pickle"] = "pickle"
    picklelib: str = "cloudpickle"
    picklelib_version: str = None
    pickle_modules: List[str] = pydantic.Field(default_factory=list)

    @pydantic.validator("picklelib")
    def check_picklelib(cls, value):
        """
        Check that the given pickle library is importable and has dumps/loads methods.

        Raises:
            ValueError: If the library cannot be imported or lacks a callable
                `dumps` or `loads` attribute.
        """
        try:
            pickler = from_qualified_name(value)
        except (ImportError, AttributeError) as exc:
            raise ValueError(
                f"Failed to import requested pickle library: {value!r}."
            ) from exc

        if not callable(getattr(pickler, "dumps", None)):
            raise ValueError(
                f"Pickle library at {value!r} does not have a 'dumps' method."
            )

        if not callable(getattr(pickler, "loads", None)):
            raise ValueError(
                f"Pickle library at {value!r} does not have a 'loads' method."
            )

        return value

    @pydantic.root_validator
    def check_picklelib_version(cls, values):
        """
        Infers a default value for `picklelib_version` if null or ensures it matches
        the version retrieved from the `picklelib`.

        A mismatch only emits a `RuntimeWarning`; it does not fail validation.

        Raises:
            ValueError: If `picklelib` is missing from the validated values.
        """
        picklelib = values.get("picklelib")
        picklelib_version = values.get("picklelib_version")

        if not picklelib:
            raise ValueError("Unable to check version of unrecognized picklelib module")

        pickler = from_qualified_name(picklelib)
        # Libraries without a `__version__` attribute yield `None` here.
        pickler_version = getattr(pickler, "__version__", None)

        if not picklelib_version:
            values["picklelib_version"] = pickler_version
        elif picklelib_version != pickler_version:
            warnings.warn(
                (
                    f"Mismatched {picklelib!r} versions. Found {pickler_version} in the"
                    f" environment but {picklelib_version} was requested. This may"
                    " cause the serializer to fail."
                ),
                RuntimeWarning,
                stacklevel=3,
            )

        return values

    @pydantic.root_validator
    def check_picklelib_and_modules(cls, values):
        """
        Prevents modules from being specified if picklelib is not cloudpickle

        Raises:
            ValueError: If `pickle_modules` is non-empty and `picklelib` is not
                "cloudpickle".
        """
        if values.get("picklelib") != "cloudpickle" and values.get("pickle_modules"):
            raise ValueError(
                "`pickle_modules` cannot be used without 'cloudpickle'. Got"
                f" {values.get('picklelib')!r}."
            )
        return values

    def dumps(self, obj: Any) -> bytes:
        """
        Pickle `obj` with the configured library and return it base64-encoded.

        Each module in `pickle_modules` is registered for pickling by value for
        the duration of the call and unregistered afterwards, even when
        pickling fails.
        """
        pickler = from_qualified_name(self.picklelib)

        # Resolve every module before touching the pickler's registry so that an
        # import failure cannot leave earlier modules registered.
        modules = [from_qualified_name(name) for name in self.pickle_modules]

        registered = []
        try:
            for module in modules:
                pickler.register_pickle_by_value(module)
                registered.append(module)
            blob = pickler.dumps(obj)
        finally:
            # Restore the pickler settings, including when `dumps` raised.
            for module in registered:
                pickler.unregister_pickle_by_value(module)

        return base64.encodebytes(blob)

    def loads(self, blob: bytes) -> Any:
        """
        Decode a base64 blob produced by `dumps` and unpickle it.

        NOTE: unpickling can execute arbitrary code; only pass blobs that come
        from a trusted source.
        """
        pickler = from_qualified_name(self.picklelib)
        return pickler.loads(base64.decodebytes(blob))
|
check_picklelib
Check that the given pickle library is importable and has dumps/loads methods.
Source code in prefect/packaging/serializers.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@pydantic.validator("picklelib")
def check_picklelib(cls, value):
    """
    Validate that `value` names an importable pickle library.

    The library must expose callable `dumps` and `loads` attributes; the
    validated value is returned unchanged.

    Raises:
        ValueError: If the import fails or either attribute is missing or
            not callable.
    """
    try:
        library = from_qualified_name(value)
    except (ImportError, AttributeError) as exc:
        raise ValueError(
            f"Failed to import requested pickle library: {value!r}."
        ) from exc

    dumps_attr = getattr(library, "dumps", None)
    if not callable(dumps_attr):
        raise ValueError(
            f"Pickle library at {value!r} does not have a 'dumps' method."
        )

    loads_attr = getattr(library, "loads", None)
    if not callable(loads_attr):
        raise ValueError(
            f"Pickle library at {value!r} does not have a 'loads' method."
        )

    return value
|
check_picklelib_and_modules
Prevents modules from being specified if picklelib is not cloudpickle
Source code in prefect/packaging/serializers.py
99
100
101
102
103
104
105
106
107
108
@pydantic.root_validator
def check_picklelib_and_modules(cls, values):
    """
    Reject `pickle_modules` unless the pickle library is cloudpickle.

    Raises:
        ValueError: If `pickle_modules` is non-empty and `picklelib` is not
            "cloudpickle".
    """
    modules = values.get("pickle_modules")

    # Nothing to enforce when no modules were requested or cloudpickle is used.
    if not modules or values.get("picklelib") == "cloudpickle":
        return values

    raise ValueError(
        "`pickle_modules` cannot be used without 'cloudpickle'. Got"
        f" {values.get('picklelib')!r}."
    )
|
check_picklelib_version
Infers a default value for picklelib_version if null, or ensures it matches the version retrieved from the picklelib.
Source code in prefect/packaging/serializers.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@pydantic.root_validator
def check_picklelib_version(cls, values):
    """
    Fill in or cross-check `picklelib_version` against the imported library.

    When no version was supplied, the library's `__version__` attribute (or
    `None` if it has none) is stored. When a version was supplied and differs
    from the one found, a `RuntimeWarning` is emitted and validation continues.

    Raises:
        ValueError: If `picklelib` is missing from the validated values.
    """
    picklelib = values.get("picklelib")
    picklelib_version = values.get("picklelib_version")

    if not picklelib:
        raise ValueError("Unable to check version of unrecognized picklelib module")

    pickler_version = getattr(from_qualified_name(picklelib), "__version__", None)

    # No version requested: adopt whatever the environment provides.
    if not picklelib_version:
        values["picklelib_version"] = pickler_version
        return values

    if picklelib_version != pickler_version:
        message = (
            f"Mismatched {picklelib!r} versions. Found {pickler_version} in the"
            f" environment but {picklelib_version} was requested. This may"
            " cause the serializer to fail."
        )
        warnings.warn(message, RuntimeWarning, stacklevel=3)

    return values
|