Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExceptionGroup #5254

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add python-implemented ExceptionGroup
  • Loading branch information
youknowone committed Jun 22, 2024
commit 1815c43d7c191be63b0fa19a4f75ae3fa4fdb529
2 changes: 0 additions & 2 deletions Lib/test/test_baseexception.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ def verify_instance_interface(self, ins):
"%s missing %s attribute" %
(ins.__class__.__name__, attr))

# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_inheritance(self):
# Make sure the inheritance hierarchy matches the documentation
exc_set = set()
Expand Down
1 change: 1 addition & 0 deletions src/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub type InitHook = Box<dyn FnOnce(&mut VirtualMachine)>;
/// let mut settings = Settings::default();
/// settings.debug = true;
/// // You may want to add paths to `rustpython_vm::Settings::path_list` to allow importing Python libraries.
/// settings.path_list.push("Lib".to_owned()); // add standard library directory
/// settings.path_list.push("".to_owned()); // add current working directory
/// let interpreter = rustpython::InterpreterConfig::new()
/// .settings(settings)
Expand Down
330 changes: 330 additions & 0 deletions vm/Lib/python_builtins/_py_exceptiongroup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
# Copied from https://github.com/agronholm/ExceptionGroup/blob/1.2.1/src/exceptiongroup/_exceptions.py
# License: https://github.com/agronholm/exceptiongroup/blob/1.2.1/LICENSE
from __future__ import annotations

from collections.abc import Callable, Sequence
from functools import partial
from typing import TYPE_CHECKING, Generic, Type, TypeVar, cast, overload

# Type variables used by the annotations and Generic bases in this module.
# The "_co" variants are declared covariant; the plain ones are invariant.
# Each is bounded by BaseException or Exception, matching the two group classes.
_BaseExceptionT_co = TypeVar("_BaseExceptionT_co", bound=BaseException, covariant=True)
_BaseExceptionT = TypeVar("_BaseExceptionT", bound=BaseException)
_ExceptionT_co = TypeVar("_ExceptionT_co", bound=Exception, covariant=True)
_ExceptionT = TypeVar("_ExceptionT", bound=Exception)
# using typing.Self would require a typing_extensions dependency on py<3.11
# The bounds below are given as strings (forward references) because the
# classes they name are defined later in the file.
_ExceptionGroupSelf = TypeVar("_ExceptionGroupSelf", bound="ExceptionGroup")
_BaseExceptionGroupSelf = TypeVar("_BaseExceptionGroupSelf", bound="BaseExceptionGroup")


def check_direct_subclass(
    exc: BaseException, parents: tuple[type[BaseException], ...]
) -> bool:
    """Return whether the class of *exc* derives from any class in *parents*.

    The method resolution order of ``exc.__class__`` is searched for a member
    of *parents*. The last MRO entry (``object``) is deliberately left out, so
    passing ``object`` as a parent never matches.

    Args:
        exc: The exception instance to test.
        parents: Candidate exception classes; may be empty, in which case the
            result is always ``False``.

    Returns:
        ``True`` if at least one class in the MRO (excluding ``object``) is in
        *parents*, otherwise ``False``.
    """
    # ``cls.__mro__`` is exactly what ``inspect.getmro(cls)`` returns; reading
    # it directly avoids importing the ``inspect`` module just for this lookup.
    return any(cls in parents for cls in exc.__class__.__mro__[:-1])


def get_condition_filter(
    condition: type[_BaseExceptionT]
    | tuple[type[_BaseExceptionT], ...]
    | Callable[[_BaseExceptionT_co], bool],
) -> Callable[[_BaseExceptionT_co], bool]:
    """Turn a ``subgroup()``/``split()`` condition into a predicate.

    Args:
        condition: One of
            * an exception class,
            * a tuple whose items are all exception classes, or
            * any other callable, which is returned unchanged.

    Returns:
        A callable taking one exception and returning a truthy value when the
        exception matches. For class / tuple conditions this is a ``partial``
        of ``check_direct_subclass`` bound to the given classes.

    Raises:
        TypeError: If *condition* is none of the accepted forms (including a
            tuple containing anything that is not an exception class).
    """
    # ``isinstance(x, type)`` is the entire implementation of
    # ``inspect.isclass``; using it directly avoids importing ``inspect``.
    if isinstance(condition, type) and issubclass(
        cast(Type[BaseException], condition), BaseException
    ):
        return partial(check_direct_subclass, parents=(condition,))
    elif isinstance(condition, tuple):
        # A tuple is never treated as a callable: if any item is not an
        # exception class we fall through to the TypeError below.
        if all(
            isinstance(item, type) and issubclass(item, BaseException)
            for item in condition
        ):
            return partial(check_direct_subclass, parents=condition)
    elif callable(condition):
        return cast("Callable[[BaseException], bool]", condition)

    raise TypeError("expected a function, exception type or tuple of exception types")


def _derive_and_copy_attributes(self, excs):
eg = self.derive(excs)
eg.__cause__ = self.__cause__
eg.__context__ = self.__context__
eg.__traceback__ = self.__traceback__
if hasattr(self, "__notes__"):
# Create a new list so that add_note() only affects one exceptiongroup
eg.__notes__ = list(self.__notes__)
return eg


class BaseExceptionGroup(BaseException, Generic[_BaseExceptionT_co]):
    """A combination of multiple unrelated exceptions.

    Instances carry a message and a non-empty, immutable tuple of nested
    exceptions, and can be filtered with :meth:`subgroup` and :meth:`split`.
    """

    def __new__(
        cls: type[_BaseExceptionGroupSelf],
        __message: str,
        __exceptions: Sequence[_BaseExceptionT_co],
    ) -> _BaseExceptionGroupSelf:
        """Validate the arguments and create the group.

        Calling ``BaseExceptionGroup(...)`` directly yields an
        ``ExceptionGroup`` when every nested exception is an ``Exception``.

        Raises:
            TypeError: If the message is not a ``str``, the second argument is
                not a sequence, or an ``Exception``-derived group class is
                given a nested exception that is not an ``Exception``.
            ValueError: If the sequence is empty or contains a non-exception.
        """
        if not isinstance(__message, str):
            # Report the bare type name ("int"), not "<class 'int'>".
            raise TypeError(
                f"argument 1 must be str, not {type(__message).__name__}"
            )
        if not isinstance(__exceptions, Sequence):
            raise TypeError("second argument (exceptions) must be a sequence")
        if not __exceptions:
            raise ValueError(
                "second argument (exceptions) must be a non-empty sequence"
            )

        for i, exc in enumerate(__exceptions):
            if not isinstance(exc, BaseException):
                raise ValueError(
                    f"Item {i} of second argument (exceptions) is not an exception"
                )

        # Narrow the generic base class to ExceptionGroup when possible.
        if cls is BaseExceptionGroup:
            if all(isinstance(exc, Exception) for exc in __exceptions):
                cls = ExceptionGroup

        if issubclass(cls, Exception):
            for exc in __exceptions:
                if not isinstance(exc, Exception):
                    if cls is ExceptionGroup:
                        raise TypeError(
                            "Cannot nest BaseExceptions in an ExceptionGroup"
                        )
                    else:
                        raise TypeError(
                            f"Cannot nest BaseExceptions in {cls.__name__!r}"
                        )

        instance = super().__new__(cls, __message, __exceptions)
        instance._message = __message
        # Snapshot the members: keeping a reference to the caller's (possibly
        # mutable) sequence would let later mutations change this group.
        instance._exceptions = tuple(__exceptions)
        return instance

    def add_note(self, note: str) -> None:
        """Append *note* to ``__notes__``, creating the list on first use.

        Raises:
            TypeError: If *note* is not a ``str``.
        """
        if not isinstance(note, str):
            raise TypeError(
                f"Expected a string, got note={note!r} (type {type(note).__name__})"
            )

        if not hasattr(self, "__notes__"):
            self.__notes__: list[str] = []

        self.__notes__.append(note)

    @property
    def message(self) -> str:
        """The message given at construction time (read-only)."""
        return self._message

    @property
    def exceptions(
        self,
    ) -> tuple[_BaseExceptionT_co | BaseExceptionGroup[_BaseExceptionT_co], ...]:
        """The nested exceptions as a tuple (read-only)."""
        return tuple(self._exceptions)

    @overload
    def subgroup(
        self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
    ) -> ExceptionGroup[_ExceptionT] | None: ...

    @overload
    def subgroup(
        self, __condition: type[_BaseExceptionT] | tuple[type[_BaseExceptionT], ...]
    ) -> BaseExceptionGroup[_BaseExceptionT] | None: ...

    @overload
    def subgroup(
        self,
        __condition: Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool],
    ) -> BaseExceptionGroup[_BaseExceptionT_co] | None: ...

    def subgroup(
        self,
        __condition: type[_BaseExceptionT]
        | tuple[type[_BaseExceptionT], ...]
        | Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool],
    ) -> BaseExceptionGroup[_BaseExceptionT] | None:
        """Return the part of this group that matches the condition.

        The condition is an exception class, a tuple of exception classes, or
        a predicate callable.

        Returns:
            * ``self`` if the group itself matches, or if nothing had to be
              removed from it;
            * a derived group (metadata copied) holding only the matching
              members, with nested groups filtered recursively;
            * ``None`` if nothing matches.
        """
        condition = get_condition_filter(__condition)
        modified = False
        if condition(self):
            return self

        exceptions: list[BaseException] = []
        for exc in self.exceptions:
            if isinstance(exc, BaseExceptionGroup):
                subgroup = exc.subgroup(__condition)
                if subgroup is not None:
                    exceptions.append(subgroup)

                # Anything other than the nested group itself (a filtered
                # copy or None) means the tree differs from the original.
                if subgroup is not exc:
                    modified = True
            elif condition(exc):
                exceptions.append(exc)
            else:
                modified = True

        if not modified:
            return self
        elif exceptions:
            group = _derive_and_copy_attributes(self, exceptions)
            return group
        else:
            return None

    @overload
    def split(
        self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
    ) -> tuple[
        ExceptionGroup[_ExceptionT] | None,
        BaseExceptionGroup[_BaseExceptionT_co] | None,
    ]: ...

    @overload
    def split(
        self, __condition: type[_BaseExceptionT] | tuple[type[_BaseExceptionT], ...]
    ) -> tuple[
        BaseExceptionGroup[_BaseExceptionT] | None,
        BaseExceptionGroup[_BaseExceptionT_co] | None,
    ]: ...

    @overload
    def split(
        self,
        __condition: Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool],
    ) -> tuple[
        BaseExceptionGroup[_BaseExceptionT_co] | None,
        BaseExceptionGroup[_BaseExceptionT_co] | None,
    ]: ...

    def split(
        self,
        __condition: type[_BaseExceptionT]
        | tuple[type[_BaseExceptionT], ...]
        | Callable[[_BaseExceptionT_co], bool],
    ) -> (
        tuple[
            ExceptionGroup[_ExceptionT] | None,
            BaseExceptionGroup[_BaseExceptionT_co] | None,
        ]
        | tuple[
            BaseExceptionGroup[_BaseExceptionT] | None,
            BaseExceptionGroup[_BaseExceptionT_co] | None,
        ]
        | tuple[
            BaseExceptionGroup[_BaseExceptionT_co] | None,
            BaseExceptionGroup[_BaseExceptionT_co] | None,
        ]
    ):
        """Partition this group into ``(matching, non_matching)``.

        The condition takes the same forms as for :meth:`subgroup`. Either
        element of the result is ``None`` when it would be empty. If the group
        itself matches, ``(self, None)`` is returned; otherwise each non-empty
        half is a derived group with this group's metadata copied onto it.
        """
        condition = get_condition_filter(__condition)
        if condition(self):
            return self, None

        matching_exceptions: list[BaseException] = []
        nonmatching_exceptions: list[BaseException] = []
        for exc in self.exceptions:
            if isinstance(exc, BaseExceptionGroup):
                # The already-built predicate is itself a valid condition.
                matching, nonmatching = exc.split(condition)
                if matching is not None:
                    matching_exceptions.append(matching)

                if nonmatching is not None:
                    nonmatching_exceptions.append(nonmatching)
            elif condition(exc):
                matching_exceptions.append(exc)
            else:
                nonmatching_exceptions.append(exc)

        matching_group: _BaseExceptionGroupSelf | None = None
        if matching_exceptions:
            matching_group = _derive_and_copy_attributes(self, matching_exceptions)

        nonmatching_group: _BaseExceptionGroupSelf | None = None
        if nonmatching_exceptions:
            nonmatching_group = _derive_and_copy_attributes(
                self, nonmatching_exceptions
            )

        return matching_group, nonmatching_group

    @overload
    def derive(self, __excs: Sequence[_ExceptionT]) -> ExceptionGroup[_ExceptionT]: ...

    @overload
    def derive(
        self, __excs: Sequence[_BaseExceptionT]
    ) -> BaseExceptionGroup[_BaseExceptionT]: ...

    def derive(
        self, __excs: Sequence[_BaseExceptionT]
    ) -> BaseExceptionGroup[_BaseExceptionT]:
        """Return a new group with this group's message wrapping *__excs*.

        Used by :meth:`subgroup` and :meth:`split`. The result is built through
        ``BaseExceptionGroup(...)``, not through ``type(self)``.
        """
        return BaseExceptionGroup(self.message, __excs)

    def __str__(self) -> str:
        """Return ``"<message> (<n> sub-exception[s])"``."""
        suffix = "" if len(self._exceptions) == 1 else "s"
        return f"{self.message} ({len(self._exceptions)} sub-exception{suffix})"

    def __repr__(self) -> str:
        """Return ``ClassName(message, exceptions)``."""
        # Echo the sequence object the caller passed (BaseException keeps it
        # in ``args``) so a list still prints as a list; fall back to the
        # stored tuple if ``args`` was replaced with something shorter.
        shown = self.args[1] if len(self.args) > 1 else self._exceptions
        return f"{self.__class__.__name__}({self.message!r}, {shown!r})"


class ExceptionGroup(BaseExceptionGroup[_ExceptionT_co], Exception):
    """A BaseExceptionGroup that is also an ``Exception``.

    Construction is delegated to ``BaseExceptionGroup.__new__``. The members
    inside the ``TYPE_CHECKING`` block exist only to give type checkers
    narrower signatures; at runtime the inherited implementations are used.
    """

    def __new__(
        cls: type[_ExceptionGroupSelf],
        __message: str,
        __exceptions: Sequence[_ExceptionT_co],
    ) -> _ExceptionGroupSelf:
        # All validation and attribute setup happens in the base class.
        return super().__new__(cls, __message, __exceptions)

    if TYPE_CHECKING:
        # Everything below is skipped at runtime (TYPE_CHECKING is False), so
        # the stub bodies never shadow the inherited property and methods.

        @property
        def exceptions(
            self,
        ) -> tuple[_ExceptionT_co | ExceptionGroup[_ExceptionT_co], ...]: ...

        @overload  # type: ignore[override]
        def subgroup(
            self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
        ) -> ExceptionGroup[_ExceptionT] | None: ...

        @overload
        def subgroup(
            self, __condition: Callable[[_ExceptionT_co | _ExceptionGroupSelf], bool]
        ) -> ExceptionGroup[_ExceptionT_co] | None: ...

        def subgroup(
            self,
            __condition: type[_ExceptionT]
            | tuple[type[_ExceptionT], ...]
            | Callable[[_ExceptionT_co], bool],
        ) -> ExceptionGroup[_ExceptionT] | None:
            return super().subgroup(__condition)

        @overload
        def split(
            self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...]
        ) -> tuple[
            ExceptionGroup[_ExceptionT] | None, ExceptionGroup[_ExceptionT_co] | None
        ]: ...

        @overload
        def split(
            self, __condition: Callable[[_ExceptionT_co | _ExceptionGroupSelf], bool]
        ) -> tuple[
            ExceptionGroup[_ExceptionT_co] | None, ExceptionGroup[_ExceptionT_co] | None
        ]: ...

        def split(
            self: _ExceptionGroupSelf,
            __condition: type[_ExceptionT]
            | tuple[type[_ExceptionT], ...]
            | Callable[[_ExceptionT_co], bool],
        ) -> tuple[
            ExceptionGroup[_ExceptionT_co] | None, ExceptionGroup[_ExceptionT_co] | None
        ]:
            return super().split(__condition)


# Make both classes report ``builtins`` as their module instead of this
# helper module's name.
# NOTE(review): presumably to mirror interpreters where these classes are
# native builtins — confirm against the code that installs them.
BaseExceptionGroup.__module__ = 'builtins'
ExceptionGroup.__module__ = 'builtins'
20 changes: 19 additions & 1 deletion vm/src/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,11 @@ impl VirtualMachine {
}
}

let expect_stdlib =
cfg!(feature = "freeze-stdlib") || !self.state.settings.path_list.is_empty();

#[cfg(feature = "encodings")]
if cfg!(feature = "freeze-stdlib") || !self.state.settings.path_list.is_empty() {
if expect_stdlib {
if let Err(e) = self.import_encodings() {
eprintln!(
"encodings initialization failed. Only utf-8 encoding will be supported."
Expand All @@ -354,6 +357,21 @@ impl VirtualMachine {
);
}

if expect_stdlib {
// enable python-implemented ExceptionGroup when stdlib exists
let py_core_init = || -> PyResult<()> {
let exception_group = import::import_frozen(self, "_py_exceptiongroup")?;
let base_exception_group = exception_group.get_attr("BaseExceptionGroup", self)?;
self.builtins
.set_attr("BaseExceptionGroup", base_exception_group, self)?;
let exception_group = exception_group.get_attr("ExceptionGroup", self)?;
self.builtins
.set_attr("ExceptionGroup", exception_group, self)?;
Ok(())
};
self.expect_pyresult(py_core_init(), "exceptiongroup initialization failed");
}

self.initialized = true;
}

Expand Down