plexpy/lib/pydantic/_internal/_utils.py
2024-03-24 17:55:28 -07:00

363 lines
12 KiB
Python

"""Bucket of reusable internal utilities.
This should be reduced as much as possible with functions only used in one place, moved to that place.
"""
from __future__ import annotations as _annotations
import dataclasses
import keyword
import typing
import weakref
from collections import OrderedDict, defaultdict, deque
from copy import deepcopy
from itertools import zip_longest
from types import BuiltinFunctionType, CodeType, FunctionType, GeneratorType, LambdaType, ModuleType
from typing import Any, Mapping, TypeVar
from typing_extensions import TypeAlias, TypeGuard
from . import _repr, _typing_extra
if typing.TYPE_CHECKING:
    # Aliases used in annotations for include/exclude specifications: either a mapping
    # keyed by int or str, or a set of ints or strs. They are only defined for static
    # type checkers and do not exist at runtime.
    MappingIntStrAny: TypeAlias = 'typing.Mapping[int, Any] | typing.Mapping[str, Any]'
    AbstractSetIntStr: TypeAlias = 'typing.AbstractSet[int] | typing.AbstractSet[str]'

    # Needed for annotations only; `is_model_class` re-imports it locally at call time.
    # NOTE(review): presumably guarded to avoid a circular import with `..main` - confirm.
    from ..main import BaseModel
# Types whose instances `smart_deepcopy` returns as-is instead of copying.
IMMUTABLE_NON_COLLECTIONS_TYPES: set[type[Any]] = {
    # scalars
    int,
    float,
    complex,
    bool,
    str,
    bytes,
    # singletons
    _typing_extra.NoneType,
    type(NotImplemented),
    type(Ellipsis),
    # classes, callables and code objects
    type,
    FunctionType,
    BuiltinFunctionType,
    LambdaType,
    CodeType,
    weakref.ref,
    # note: including ModuleType will differ from behaviour of deepcopy by not producing error.
    # It might be not a good idea in general, but considering that this function used only internally
    # against default values of fields, this will allow to actually have a field with module as default value
    ModuleType,
}
# Built-in collection types: an empty instance of one of these can be duplicated
# with a cheap copy() rather than a full deepcopy().
BUILTIN_COLLECTIONS: set[type[Any]] = {
    # sequences
    list,
    tuple,
    deque,
    # sets
    set,
    frozenset,
    # mappings
    dict,
    OrderedDict,
    defaultdict,
}
def sequence_like(v: Any) -> bool:
    """Return whether `v` is a list, tuple, set, frozenset, generator or deque."""
    accepted_types = (list, tuple, set, frozenset, GeneratorType, deque)
    return isinstance(v, accepted_types)
def lenient_isinstance(o: Any, class_or_tuple: type[Any] | tuple[type[Any], ...] | None) -> bool:  # pragma: no cover
    """Like `isinstance`, but return `False` when `isinstance` raises `TypeError`."""
    try:
        matches = isinstance(o, class_or_tuple)  # type: ignore[arg-type]
    except TypeError:
        # `class_or_tuple` was not something isinstance() accepts (e.g. None)
        matches = False
    return matches
def lenient_issubclass(cls: Any, class_or_tuple: Any) -> bool:  # pragma: no cover
    """Like `issubclass`, but return `False` when `cls` is not a class.

    A `TypeError` raised by the check is swallowed (returning `False`) only when `cls`
    is an instance of `_typing_extra.WithArgsTypes`; otherwise it is re-raised.
    """
    try:
        if not isinstance(cls, type):
            return False
        return issubclass(cls, class_or_tuple)
    except TypeError:
        if not isinstance(cls, _typing_extra.WithArgsTypes):
            raise  # pragma: no cover
        return False
def is_model_class(cls: Any) -> TypeGuard[type[BaseModel]]:
    """Return whether `cls` is a _proper_ subclass of BaseModel (BaseModel itself is excluded).

    Unlike a raw call to lenient_issubclass, this narrows the type of `cls` for type checkers.
    """
    from ..main import BaseModel

    if not lenient_issubclass(cls, BaseModel):
        return False
    return cls is not BaseModel
def is_valid_identifier(identifier: str) -> bool:
    """Tell whether a string can be used as a Python identifier, i.e. it has identifier syntax and is not a keyword.

    :param identifier: The identifier to test.
    :return: True if the identifier is valid.
    """
    if not identifier.isidentifier():
        return False
    return not keyword.iskeyword(identifier)
KeyType = TypeVar('KeyType')


def deep_update(mapping: dict[KeyType, Any], *updating_mappings: dict[KeyType, Any]) -> dict[KeyType, Any]:
    """Return a copy of `mapping` with each of `updating_mappings` applied in order.

    When a key holds a dict on both sides, the two dicts are merged recursively;
    in every other case the updating value replaces the existing one.
    The top level of `mapping` is copied, so `mapping` itself is not modified.
    """
    merged = mapping.copy()
    for overlay in updating_mappings:
        for key, incoming in overlay.items():
            nested_on_both_sides = key in merged and isinstance(merged[key], dict) and isinstance(incoming, dict)
            merged[key] = deep_update(merged[key], incoming) if nested_on_both_sides else incoming
    return merged
def update_not_none(mapping: dict[Any, Any], **update: Any) -> None:
    """Update `mapping` in place with the keyword arguments whose value is not `None`."""
    non_null_entries = {name: value for name, value in update.items() if value is not None}
    mapping.update(non_null_entries)
T = TypeVar('T')


def unique_list(
    input_list: list[T] | tuple[T, ...],
    *,
    name_factory: typing.Callable[[T], str] = str,
) -> list[T]:
    """Make a list unique while maintaining order.

    Items are identified by `name_factory(item)`. When a later item has the same name
    as an earlier one, it replaces the earlier item at the earlier item's position
    (e.g. model validator overridden in subclass).

    :param input_list: the items to de-duplicate
    :param name_factory: callable producing the (hashable) name that identifies an item
    :return: a new list holding the last item seen for each name, ordered by the
        first occurrence of each name
    """
    # A dict keeps keys in first-insertion order, and re-assigning an existing key
    # replaces its value without moving it. This gives the same result as searching a
    # list of names, but with O(1) lookups instead of O(n).
    latest_by_name: dict[str, T] = {}
    for v in input_list:
        latest_by_name[name_factory(v)] = v
    return list(latest_by_name.values())
class ValueItems(_repr.Representation):
    """Class for more convenient calculation of excluded or included fields on values."""

    __slots__ = ('_items', '_type')

    def __init__(self, value: Any, items: AbstractSetIntStr | MappingIntStrAny) -> None:
        """Store `items` as a mapping, normalizing indexes when `value` is a list or tuple.

        :param value: the value the include/exclude specification applies to
        :param items: set or mapping of keys/indexes
        :raises TypeError: if `items` is neither a mapping nor a set
        """
        items = self._coerce_items(items)

        if isinstance(value, (list, tuple)):
            items = self._normalize_indexes(items, len(value))  # type: ignore

        self._items: MappingIntStrAny = items  # type: ignore

    def is_excluded(self, item: Any) -> bool:
        """Check if item is fully excluded.

        :param item: key or index of a value
        :return: True if the entry stored for `item` is `True` or `...`
        """
        return self.is_true(self._items.get(item))

    def is_included(self, item: Any) -> bool:
        """Check if value is contained in self._items.

        :param item: key or index of value
        """
        return item in self._items

    def for_element(self, e: int | str) -> AbstractSetIntStr | MappingIntStrAny | None:
        """Return the nested specification stored for an element.

        :param e: key or index of element on value
        :return: the entry stored for `e`, or None when there is no entry or the
            entry is `True` or `...`
        """
        item = self._items.get(e)  # type: ignore
        return item if not self.is_true(item) else None

    def _normalize_indexes(self, items: MappingIntStrAny, v_length: int) -> dict[int | str, Any]:
        """Normalize the index keys of `items` for a sequence of length `v_length`.

        Negative indexes are offset by `v_length`, and the special key `'__all__'` is
        applied to every index in `range(v_length)`.

        :param items: mapping of indexes which will be normalized
        :param v_length: length of the sequence whose indexes are being normalized
        :raises TypeError: if a value is not a mapping, a set, `True` or `...`, or if
            a key is neither an int nor `'__all__'`

        >>> self._normalize_indexes({0: True, -2: True, -1: True}, 4)
        {0: True, 2: True, 3: True}
        >>> self._normalize_indexes({'__all__': True}, 4)
        {0: Ellipsis, 1: Ellipsis, 2: Ellipsis, 3: Ellipsis}
        """
        normalized_items: dict[int | str, Any] = {}
        all_items = None
        for i, v in items.items():
            if not (isinstance(v, (typing.Mapping, typing.AbstractSet)) or self.is_true(v)):
                raise TypeError(f'Unexpected type of exclude value for index "{i}" {v.__class__}')
            if i == '__all__':
                all_items = self._coerce_value(v)
                continue
            if not isinstance(i, int):
                raise TypeError(
                    'Excluding fields from a sequence of sub-models or dicts must be performed index-wise: '
                    'expected integer keys or keyword "__all__"'
                )
            normalized_i = v_length + i if i < 0 else i
            # the same position may be given twice (e.g. as 1 and as -3): merge both entries
            normalized_items[normalized_i] = self.merge(v, normalized_items.get(normalized_i))

        if not all_items:
            return normalized_items
        if self.is_true(all_items):
            # '__all__' selects every element entirely; explicitly listed indexes keep their entry
            for i in range(v_length):
                normalized_items.setdefault(i, ...)
            return normalized_items
        for i in range(v_length):
            normalized_item = normalized_items.setdefault(i, {})
            # an index already marked True/... is left untouched
            if not self.is_true(normalized_item):
                normalized_items[i] = self.merge(all_items, normalized_item)
        return normalized_items

    @classmethod
    def merge(cls, base: Any, override: Any, intersect: bool = False) -> Any:
        """Merge a `base` item with an `override` item.

        Both `base` and `override` are converted to dictionaries if possible.
        Sets are converted to dictionaries with the sets entries as keys and
        Ellipsis as values.

        Each key-value pair existing in `base` is merged with `override`,
        while the rest of the key-value pairs are updated recursively with this function.

        Merging takes place based on the "union" of keys if `intersect` is
        set to `False` (default) and on the intersection of keys if
        `intersect` is set to `True`.

        :raises TypeError: if `base` or `override` is not None, `True`, `...`, a mapping or a set
        """
        override = cls._coerce_value(override)
        base = cls._coerce_value(base)
        if override is None:
            return base
        if cls.is_true(base) or base is None:
            return override
        if cls.is_true(override):
            return base if intersect else override

        # intersection or union of keys while preserving ordering:
        if intersect:
            # Common keys, in `base` order. Listing the same keys a second time in
            # `override` order would only repeat each recursive merge with an identical result.
            merge_keys = [k for k in base if k in override]
        else:
            merge_keys = list(base) + [k for k in override if k not in base]

        merged: dict[int | str, Any] = {}
        for k in merge_keys:
            merged_item = cls.merge(base.get(k), override.get(k), intersect=intersect)
            if merged_item is not None:
                merged[k] = merged_item

        return merged

    @staticmethod
    def _coerce_items(items: AbstractSetIntStr | MappingIntStrAny) -> MappingIntStrAny:
        """Return `items` as a mapping.

        A mapping is returned unchanged; a set becomes a dict with `...` as every value.

        :raises TypeError: if `items` is neither a mapping nor a set
        """
        if isinstance(items, typing.Mapping):
            pass
        elif isinstance(items, typing.AbstractSet):
            items = dict.fromkeys(items, ...)  # type: ignore
        else:
            class_name = getattr(items, '__class__', '???')
            raise TypeError(f'Unexpected type of exclude value {class_name}')
        return items  # type: ignore

    @classmethod
    def _coerce_value(cls, value: Any) -> Any:
        """Return None, `True` and `...` unchanged; coerce anything else with `_coerce_items`."""
        if value is None or cls.is_true(value):
            return value
        return cls._coerce_items(value)

    @staticmethod
    def is_true(v: Any) -> bool:
        """Return whether `v` is exactly `True` or `...` (identity check, not truthiness)."""
        return v is True or v is ...

    def __repr_args__(self) -> _repr.ReprArgs:
        """Expose the normalized items as the single, unnamed repr argument."""
        return [(None, self._items)]
if typing.TYPE_CHECKING:

    # For static type checkers, ClassAttribute(name, value) simply has the type of `value`.
    def ClassAttribute(name: str, value: T) -> T:
        ...

else:

    class ClassAttribute:
        """Hide class attribute from its instances.

        Descriptor that returns `value` when looked up on the owning class and raises
        `AttributeError` when looked up on an instance.
        """

        __slots__ = 'name', 'value'

        def __init__(self, name: str, value: Any) -> None:
            """
            :param name: attribute name, used only in the error message
            :param value: the value returned for class-level access
            """
            self.name = name
            self.value = value

        def __get__(self, instance: Any, owner: type[Any]) -> Any:
            """Return the stored value for class access.

            :raises AttributeError: if accessed through an instance
            """
            # `instance` is None when the attribute is looked up on the class itself
            if instance is None:
                return self.value
            raise AttributeError(f'{self.name!r} attribute of {owner.__name__!r} is class-only')
Obj = TypeVar('Obj')


def smart_deepcopy(obj: Obj) -> Obj:
    """Copy `obj` using the cheapest strategy that is safe for its type.

    - instances of IMMUTABLE_NON_COLLECTIONS_TYPES are returned as is
    - empty instances of BUILTIN_COLLECTIONS are duplicated with obj.copy()
      (an empty tuple is returned as is)
    - everything else goes through copy.deepcopy()
    """
    kind = obj.__class__
    if kind in IMMUTABLE_NON_COLLECTIONS_TYPES:
        # fastest case: obj is immutable and not collection therefore will not be copied anyway
        return obj

    try:
        is_empty_builtin = not obj and kind in BUILTIN_COLLECTIONS
        if is_empty_builtin:
            # faster way for empty collections, no need to copy its members
            if kind is tuple:
                return obj  # tuple doesn't have copy method
            return obj.copy()  # type: ignore
    except (TypeError, ValueError, RuntimeError):
        # do we really dare to catch ALL errors? Seems a bit risky
        pass

    # slowest way when we actually might need a deepcopy
    return deepcopy(obj)
_SENTINEL = object()


def all_identical(left: typing.Iterable[Any], right: typing.Iterable[Any]) -> bool:
    """Tell whether `left` and `right` yield the very same objects, position by position.

    Items are compared by identity (`is`), not equality. If one iterable is shorter, it
    is padded with a private sentinel, so iterables of different lengths do not match.

    >>> a, b = object(), object()
    >>> all_identical([a, b, a], [a, b, a])
    True
    >>> all_identical([a, b, [a]], [a, b, [a]])  # new list object, while "equal" is not "identical"
    False
    """
    paired_items = zip_longest(left, right, fillvalue=_SENTINEL)
    return all(from_left is from_right for from_left, from_right in paired_items)
@dataclasses.dataclass(frozen=True)
class SafeGetItemProxy:
    """Frozen wrapper around a mapping that redirects `__getitem__` to the mapping's `get`.

    A missing key yields the module-level `_SENTINEL` object instead of raising, which
    makes it safe to use in `operator.itemgetter` when some keys may be missing.
    """

    # Define __slots__ manually for performance:
    # @dataclasses.dataclass() only supports slots=True in python>=3.10
    __slots__ = ('wrapped',)

    wrapped: Mapping[str, Any]

    def __getitem__(self, __key: str) -> Any:
        source = self.wrapped
        return source.get(__key, _SENTINEL)

    # required to pass the object to operator.itemgetter() instances due to a quirk of typeshed
    # https://github.com/python/mypy/issues/13713
    # https://github.com/python/typeshed/pull/8785
    # Since this is typing-only, hide it in a typing.TYPE_CHECKING block
    if typing.TYPE_CHECKING:

        def __contains__(self, __key: str) -> bool:
            return __key in self.wrapped