From 6d23a83fcbfa0c3b93c36c32e8c27a698a9bc7f9 Mon Sep 17 00:00:00 2001 From: A5rocks Date: Tue, 16 Jan 2024 13:06:36 +0900 Subject: [PATCH 1/9] Remove deprecated unbounded queue --- check.sh | 6 +- src/trio/_core/__init__.py | 1 - src/trio/_core/_generated_io_kqueue.py | 4 +- src/trio/_core/_generated_io_windows.py | 6 +- src/trio/_core/_io_kqueue.py | 30 ++-- src/trio/_core/_io_windows.py | 20 ++- src/trio/_core/_parking_lot.py | 11 +- src/trio/_core/_tests/test_unbounded_queue.py | 154 ----------------- src/trio/_core/_tests/test_windows.py | 20 +-- src/trio/_core/_unbounded_queue.py | 161 ------------------ src/trio/_tools/gen_exports.py | 4 +- src/trio/lowlevel.py | 2 - 12 files changed, 49 insertions(+), 370 deletions(-) delete mode 100644 src/trio/_core/_tests/test_unbounded_queue.py delete mode 100644 src/trio/_core/_unbounded_queue.py diff --git a/check.sh b/check.sh index 9d26a21468..0d2fcdd26f 100755 --- a/check.sh +++ b/check.sh @@ -1,6 +1,7 @@ #!/bin/bash set -ex +set -o pipefail ON_GITHUB_CI=true EXIT_STATUS=0 @@ -55,8 +56,7 @@ MYPY=0 echo "::group::Mypy" # Cleanup previous runs. rm -f mypy_annotate.dat -# Pipefail makes these pipelines fail if mypy does, even if mypy_annotate.py succeeds. -set -o pipefail + mypy --show-error-end --platform linux | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Linux \ || { echo "* Mypy (Linux) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; } # Darwin tests FreeBSD too @@ -64,7 +64,7 @@ mypy --show-error-end --platform darwin | python ./src/trio/_tools/mypy_annotate || { echo "* Mypy (Mac) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; } mypy --show-error-end --platform win32 | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Windows \ || { echo "* Mypy (Windows) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; } -set +o pipefail + # Re-display errors using Github's syntax, read out of mypy_annotate.dat python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat # Then discard. diff --git a/src/trio/_core/__init__.py b/src/trio/_core/__init__.py index 71f5f17eb2..3cec8a52e0 100644 --- a/src/trio/_core/__init__.py +++ b/src/trio/_core/__init__.py @@ -63,7 +63,6 @@ temporarily_detach_coroutine_object, wait_task_rescheduled, ) -from ._unbounded_queue import UnboundedQueue, UnboundedQueueStatistics # Windows imports if sys.platform == "win32": diff --git a/src/trio/_core/_generated_io_kqueue.py b/src/trio/_core/_generated_io_kqueue.py index 39662fd902..a39e8dd02c 100644 --- a/src/trio/_core/_generated_io_kqueue.py +++ b/src/trio/_core/_generated_io_kqueue.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: import select - from .. import _core + from .. import _channel from .._file_io import _HasFileNo from ._traps import Abort, RaiseCancelT import sys @@ -43,7 +43,7 @@ def current_kqueue() -> select.kqueue: def monitor_kevent( ident: int, filter: int -) -> ContextManager[_core.UnboundedQueue[select.kevent]]: +) -> ContextManager[_channel.MemoryRecvChannel[select.kevent]]: """TODO: these are implemented, but are currently more of a sketch than anything real. See `#26 `__. diff --git a/src/trio/_core/_generated_io_windows.py b/src/trio/_core/_generated_io_windows.py index bb23e630c2..5644838ca9 100644 --- a/src/trio/_core/_generated_io_windows.py +++ b/src/trio/_core/_generated_io_windows.py @@ -11,8 +11,8 @@ if TYPE_CHECKING: from typing_extensions import Buffer + from .._channel import MemoryReceiveChannel from .._file_io import _HasFileNo - from ._unbounded_queue import UnboundedQueue from ._windows_cffi import CData, Handle import sys @@ -189,7 +189,9 @@ def current_iocp() -> int: raise RuntimeError("must be called from async context") from None -def monitor_completion_key() -> ContextManager[tuple[int, UnboundedQueue[object]]]: +def monitor_completion_key() -> ( + ContextManager[tuple[int, MemoryReceiveChannel[object]]] +): """TODO: these are implemented, but are currently more of a sketch than anything real. See `#26 `__ and `#52 diff --git a/src/trio/_core/_io_kqueue.py b/src/trio/_core/_io_kqueue.py index 61c37fe26e..4c44605e6e 100644 --- a/src/trio/_core/_io_kqueue.py +++ b/src/trio/_core/_io_kqueue.py @@ -1,6 +1,7 @@ from __future__ import annotations import errno +import math import select import sys from contextlib import contextmanager @@ -9,14 +10,14 @@ import attr import outcome -from .. import _core +from .. import _channel, _core from ._run import _public from ._wakeup_socketpair import WakeupSocketpair if TYPE_CHECKING: from typing_extensions import TypeAlias - from .._core import Abort, RaiseCancelT, Task, UnboundedQueue + from .._core import Abort, RaiseCancelT, Task from .._file_io import _HasFileNo assert not TYPE_CHECKING or (sys.platform != "linux" and sys.platform != "win32") @@ -34,10 +35,9 @@ class _KqueueStatistics: @attr.s(slots=True, eq=False) class KqueueIOManager: _kqueue: select.kqueue = attr.ib(factory=select.kqueue) - # {(ident, filter): Task or UnboundedQueue} - _registered: dict[tuple[int, int], Task | UnboundedQueue[select.kevent]] = attr.ib( - factory=dict - ) + _registered: dict[ + tuple[int, int], Task | _channel.MemorySendChannel[select.kevent] + ] = attr.ib(factory=dict) _force_wakeup: WakeupSocketpair = attr.ib(factory=WakeupSocketpair) _force_wakeup_fd: int | None = attr.ib(default=None) @@ -94,7 +94,7 @@ def process_events(self, events: EventResult) -> None: if isinstance(receiver, _core.Task): _core.reschedule(receiver, outcome.Value(event)) else: - receiver.put_nowait(event) + receiver.send_nowait(event) # kevent registration is complicated -- e.g. aio submission can # implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will @@ -119,7 +119,7 @@ def current_kqueue(self) -> select.kqueue: @_public def monitor_kevent( self, ident: int, filter: int - ) -> Iterator[_core.UnboundedQueue[select.kevent]]: + ) -> Iterator[_channel.MemoryRecvChannel[select.kevent]]: """TODO: these are implemented, but are currently more of a sketch than anything real. See `#26 `__. @@ -129,11 +129,12 @@ def monitor_kevent( raise _core.BusyResourceError( "attempt to register multiple listeners for same ident/filter pair" ) - q = _core.UnboundedQueue[select.kevent]() - self._registered[key] = q + send, recv = _channel.open_memory_channel[select.kevent](math.inf) + self._registered[key] = send try: - yield q + yield recv finally: + send.close() del self._registered[key] @_public @@ -274,8 +275,5 @@ def notify_closing(self, fd: int | _HasFileNo) -> None: _core.reschedule(receiver, outcome.Error(exc)) del self._registered[key] else: - # XX this is an interesting example of a case where being able - # to close a queue would be useful... - raise NotImplementedError( - "can't close an fd that monitor_kevent is using" - ) + receiver.close() + del self._registered[key] diff --git a/src/trio/_core/_io_windows.py b/src/trio/_core/_io_windows.py index bdc100ddb5..0520cfabd8 100644 --- a/src/trio/_core/_io_windows.py +++ b/src/trio/_core/_io_windows.py @@ -2,6 +2,7 @@ import enum import itertools +import math import socket import sys from contextlib import contextmanager @@ -43,9 +44,9 @@ if TYPE_CHECKING: from typing_extensions import Buffer, TypeAlias + from .._channel import MemoryReceiveChannel, MemorySendChannel from .._file_io import _HasFileNo from ._traps import Abort, RaiseCancelT - from ._unbounded_queue import UnboundedQueue EventResult: TypeAlias = int T = TypeVar("T") @@ -435,7 +436,7 @@ def __init__(self) -> None: self._overlapped_waiters: dict[CData, _core.Task] = {} self._posted_too_late_to_cancel: set[CData] = set() - self._completion_key_queues: dict[int, UnboundedQueue[object]] = {} + self._completion_key_queues: dict[int, MemorySendChannel[object]] = {} self._completion_key_counter = itertools.count(CKeys.USER_DEFINED) with socket.socket() as s: @@ -610,7 +611,7 @@ def process_events(self, received: EventResult) -> None: info = CompletionKeyEventInfo( lpOverlapped=overlapped, dwNumberOfBytesTransferred=transferred ) - queue.put_nowait(info) + queue.send_nowait(info) def _register_with_iocp(self, handle_: int | CData, completion_key: int) -> None: handle = _handle(handle_) @@ -981,16 +982,21 @@ def current_iocp(self) -> int: @contextmanager @_public - def monitor_completion_key(self) -> Iterator[tuple[int, UnboundedQueue[object]]]: + def monitor_completion_key( + self, + ) -> Iterator[tuple[int, MemoryReceiveChannel[object]]]: """TODO: these are implemented, but are currently more of a sketch than anything real. See `#26 `__ and `#52 `__. """ + from .._channel import open_memory_channel + key = next(self._completion_key_counter) - queue = _core.UnboundedQueue[object]() - self._completion_key_queues[key] = queue + send, recv = open_memory_channel[object](math.inf) + self._completion_key_queues[key] = send try: - yield (key, queue) + yield (key, recv) finally: + send.close() del self._completion_key_queues[key] diff --git a/src/trio/_core/_parking_lot.py b/src/trio/_core/_parking_lot.py index 1ad1762cfd..5b7713b241 100644 --- a/src/trio/_core/_parking_lot.py +++ b/src/trio/_core/_parking_lot.py @@ -21,12 +21,11 @@ # theirs and our tasks are lighter, so for us #objects is smaller and #tasks # is larger. # -# This is in the core because for two reasons. First, it's used by -# UnboundedQueue, and UnboundedQueue is used for a number of things in the -# core. And second, it's responsible for providing fairness to all of our -# high-level synchronization primitives (locks, queues, etc.). For now with -# our FIFO scheduler this is relatively trivial (it's just a FIFO waitqueue), -# but in the future we ever start support task priorities or fair scheduling +# This is in the core because it's responsible for providing fairness to all +# of our high-level synchronization primitives (locks, queues, etc.). For now +# with our FIFO scheduler this is relatively trivial (it's just a FIFO +# waitqueue), but in the future we ever start support task priorities or fair +# scheduling # # https://github.com/python-trio/trio/issues/32 # diff --git a/src/trio/_core/_tests/test_unbounded_queue.py b/src/trio/_core/_tests/test_unbounded_queue.py deleted file mode 100644 index 33eb41a5b3..0000000000 --- a/src/trio/_core/_tests/test_unbounded_queue.py +++ /dev/null @@ -1,154 +0,0 @@ -from __future__ import annotations - -import itertools - -import pytest - -from ... import _core -from ...testing import assert_checkpoints, wait_all_tasks_blocked - -pytestmark = pytest.mark.filterwarnings( - "ignore:.*UnboundedQueue:trio.TrioDeprecationWarning" -) - - -async def test_UnboundedQueue_basic() -> None: - q: _core.UnboundedQueue[str | int | None] = _core.UnboundedQueue() - q.put_nowait("hi") - assert await q.get_batch() == ["hi"] - with pytest.raises(_core.WouldBlock): - q.get_batch_nowait() - q.put_nowait(1) - q.put_nowait(2) - q.put_nowait(3) - assert q.get_batch_nowait() == [1, 2, 3] - - assert q.empty() - assert q.qsize() == 0 - q.put_nowait(None) - assert not q.empty() - assert q.qsize() == 1 - - stats = q.statistics() - assert stats.qsize == 1 - assert stats.tasks_waiting == 0 - - # smoke test - repr(q) - - -async def test_UnboundedQueue_blocking() -> None: - record = [] - q = _core.UnboundedQueue[int]() - - async def get_batch_consumer() -> None: - while True: - batch = await q.get_batch() - assert batch - record.append(batch) - - async def aiter_consumer() -> None: - async for batch in q: - assert batch - record.append(batch) - - for consumer in (get_batch_consumer, aiter_consumer): - record.clear() - async with _core.open_nursery() as nursery: - nursery.start_soon(consumer) - await _core.wait_all_tasks_blocked() - stats = q.statistics() - assert stats.qsize == 0 - assert stats.tasks_waiting == 1 - q.put_nowait(10) - q.put_nowait(11) - await _core.wait_all_tasks_blocked() - q.put_nowait(12) - await _core.wait_all_tasks_blocked() - assert record == [[10, 11], [12]] - nursery.cancel_scope.cancel() - - -async def test_UnboundedQueue_fairness() -> None: - q = _core.UnboundedQueue[int]() - - # If there's no-one else around, we can put stuff in and take it out - # again, no problem - q.put_nowait(1) - q.put_nowait(2) - assert q.get_batch_nowait() == [1, 2] - - result = None - - async def get_batch(q: _core.UnboundedQueue[int]) -> None: - nonlocal result - result = await q.get_batch() - - # But if someone else is waiting to read, then they get dibs - async with _core.open_nursery() as nursery: - nursery.start_soon(get_batch, q) - await _core.wait_all_tasks_blocked() - q.put_nowait(3) - q.put_nowait(4) - with pytest.raises(_core.WouldBlock): - q.get_batch_nowait() - assert result == [3, 4] - - # If two tasks are trying to read, they alternate - record = [] - - async def reader(name: str) -> None: - while True: - record.append((name, await q.get_batch())) - - async with _core.open_nursery() as nursery: - nursery.start_soon(reader, "a") - await _core.wait_all_tasks_blocked() - nursery.start_soon(reader, "b") - await _core.wait_all_tasks_blocked() - - for i in range(20): - q.put_nowait(i) - await _core.wait_all_tasks_blocked() - - nursery.cancel_scope.cancel() - - assert record == list(zip(itertools.cycle("ab"), [[i] for i in range(20)])) - - -async def test_UnboundedQueue_trivial_yields() -> None: - q = _core.UnboundedQueue[None]() - - q.put_nowait(None) - with assert_checkpoints(): - await q.get_batch() - - q.put_nowait(None) - with assert_checkpoints(): - async for _ in q: # pragma: no branch - break - - -async def test_UnboundedQueue_no_spurious_wakeups() -> None: - # If we have two tasks waiting, and put two items into the queue... then - # only one task wakes up - record = [] - - async def getter(q: _core.UnboundedQueue[int], i: int) -> None: - got = await q.get_batch() - record.append((i, got)) - - async with _core.open_nursery() as nursery: - q = _core.UnboundedQueue[int]() - nursery.start_soon(getter, q, 1) - await wait_all_tasks_blocked() - nursery.start_soon(getter, q, 2) - await wait_all_tasks_blocked() - - for i in range(10): - q.put_nowait(i) - await wait_all_tasks_blocked() - - assert record == [(1, list(range(10)))] - - nursery.cancel_scope.cancel() diff --git a/src/trio/_core/_tests/test_windows.py b/src/trio/_core/_tests/test_windows.py index f378fd8114..f11ccaf256 100644 --- a/src/trio/_core/_tests/test_windows.py +++ b/src/trio/_core/_tests/test_windows.py @@ -77,10 +77,6 @@ def test_winerror(monkeypatch: pytest.MonkeyPatch) -> None: assert exc.value.filename2 == "b/file" -# The undocumented API that this is testing should be changed to stop using -# UnboundedQueue (or just removed until we have time to redo it), but until -# then we filter out the warning. -@pytest.mark.filterwarnings("ignore:.*UnboundedQueue:trio.TrioDeprecationWarning") async def test_completion_key_listen() -> None: from .. import _io_windows @@ -96,17 +92,13 @@ async def post(key: int) -> None: with _core.monitor_completion_key() as (key, queue): async with _core.open_nursery() as nursery: nursery.start_soon(post, key) - i = 0 + print("loop") - async for batch in queue: # pragma: no branch - print("got some", batch) - for info in batch: - assert isinstance(info, _io_windows.CompletionKeyEventInfo) - assert info.lpOverlapped == 0 - assert info.dwNumberOfBytesTransferred == i - i += 1 - if i == 10: - break + for i in range(10): + info = await queue.receive() + assert isinstance(info, _io_windows.CompletionKeyEventInfo) + assert info.lpOverlapped == 0 + assert info.dwNumberOfBytesTransferred == i print("end loop") diff --git a/src/trio/_core/_unbounded_queue.py b/src/trio/_core/_unbounded_queue.py deleted file mode 100644 index 7c5c536676..0000000000 --- a/src/trio/_core/_unbounded_queue.py +++ /dev/null @@ -1,161 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Generic, TypeVar - -import attr - -from .. import _core -from .._deprecate import deprecated -from .._util import final - -T = TypeVar("T") - -if TYPE_CHECKING: - from typing_extensions import Self - - -@attr.s(slots=True, frozen=True) -class UnboundedQueueStatistics: - """An object containing debugging information. - - Currently, the following fields are defined: - - * ``qsize``: The number of items currently in the queue. - * ``tasks_waiting``: The number of tasks blocked on this queue's - :meth:`get_batch` method. - - """ - - qsize: int = attr.ib() - tasks_waiting: int = attr.ib() - - -@final -class UnboundedQueue(Generic[T]): - """An unbounded queue suitable for certain unusual forms of inter-task - communication. - - This class is designed for use as a queue in cases where the producer for - some reason cannot be subjected to back-pressure, i.e., :meth:`put_nowait` - has to always succeed. In order to prevent the queue backlog from actually - growing without bound, the consumer API is modified to dequeue items in - "batches". If a consumer task processes each batch without yielding, then - this helps achieve (but does not guarantee) an effective bound on the - queue's memory use, at the cost of potentially increasing system latencies - in general. You should generally prefer to use a memory channel - instead if you can. - - Currently each batch completely empties the queue, but `this may change in - the future `__. - - A :class:`UnboundedQueue` object can be used as an asynchronous iterator, - where each iteration returns a new batch of items. I.e., these two loops - are equivalent:: - - async for batch in queue: - ... - - while True: - obj = await queue.get_batch() - ... - - """ - - @deprecated( - "0.9.0", - issue=497, - thing="trio.lowlevel.UnboundedQueue", - instead="trio.open_memory_channel(math.inf)", - ) - def __init__(self) -> None: - self._lot = _core.ParkingLot() - self._data: list[T] = [] - # used to allow handoff from put to the first task in the lot - self._can_get = False - - def __repr__(self) -> str: - return f"" - - def qsize(self) -> int: - """Returns the number of items currently in the queue.""" - return len(self._data) - - def empty(self) -> bool: - """Returns True if the queue is empty, False otherwise. - - There is some subtlety to interpreting this method's return value: see - `issue #63 `__. - - """ - return not self._data - - @_core.enable_ki_protection - def put_nowait(self, obj: T) -> None: - """Put an object into the queue, without blocking. - - This always succeeds, because the queue is unbounded. We don't provide - a blocking ``put`` method, because it would never need to block. - - Args: - obj (object): The object to enqueue. - - """ - if not self._data: - assert not self._can_get - if self._lot: - self._lot.unpark(count=1) - else: - self._can_get = True - self._data.append(obj) - - def _get_batch_protected(self) -> list[T]: - data = self._data.copy() - self._data.clear() - self._can_get = False - return data - - def get_batch_nowait(self) -> list[T]: - """Attempt to get the next batch from the queue, without blocking. - - Returns: - list: A list of dequeued items, in order. On a successful call this - list is always non-empty; if it would be empty we raise - :exc:`~trio.WouldBlock` instead. - - Raises: - ~trio.WouldBlock: if the queue is empty. - - """ - if not self._can_get: - raise _core.WouldBlock - return self._get_batch_protected() - - async def get_batch(self) -> list[T]: - """Get the next batch from the queue, blocking as necessary. - - Returns: - list: A list of dequeued items, in order. This list is always - non-empty. - - """ - await _core.checkpoint_if_cancelled() - if not self._can_get: - await self._lot.park() - return self._get_batch_protected() - else: - try: - return self._get_batch_protected() - finally: - await _core.cancel_shielded_checkpoint() - - def statistics(self) -> UnboundedQueueStatistics: - """Return an :class:`UnboundedQueueStatistics` object containing debugging information.""" - return UnboundedQueueStatistics( - qsize=len(self._data), tasks_waiting=self._lot.statistics().tasks_waiting - ) - - def __aiter__(self) -> Self: - return self - - async def __anext__(self) -> list[T]: - return await self.get_batch() diff --git a/src/trio/_tools/gen_exports.py b/src/trio/_tools/gen_exports.py index 7156d7015d..2cfe6604af 100755 --- a/src/trio/_tools/gen_exports.py +++ b/src/trio/_tools/gen_exports.py @@ -382,7 +382,7 @@ def main() -> None: # pragma: no cover if TYPE_CHECKING: import select - from .. import _core + from .. import _core, _channel from ._traps import Abort, RaiseCancelT from .._file_io import _HasFileNo """ @@ -395,7 +395,7 @@ def main() -> None: # pragma: no cover from ._windows_cffi import Handle, CData from typing_extensions import Buffer - from ._unbounded_queue import UnboundedQueue + from .._channel import MemoryReceiveChannel """ diff --git a/src/trio/lowlevel.py b/src/trio/lowlevel.py index 964dabb556..2ce49b2e44 100644 --- a/src/trio/lowlevel.py +++ b/src/trio/lowlevel.py @@ -18,8 +18,6 @@ RunVarToken as RunVarToken, Task as Task, TrioToken as TrioToken, - UnboundedQueue as UnboundedQueue, - UnboundedQueueStatistics as UnboundedQueueStatistics, add_instrument as add_instrument, cancel_shielded_checkpoint as cancel_shielded_checkpoint, checkpoint as checkpoint, From 6dd38089868f2b99e61c7bd665b13083fb6e9b1a Mon Sep 17 00:00:00 2001 From: A5rocks Date: Tue, 16 Jan 2024 13:11:48 +0900 Subject: [PATCH 2/9] Fix dumb typing mistake Also make the generation script always use unix newlines, even on Windows --- src/trio/_core/_generated_io_kqueue.py | 2 +- src/trio/_core/_io_kqueue.py | 2 +- src/trio/_tools/gen_exports.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/trio/_core/_generated_io_kqueue.py b/src/trio/_core/_generated_io_kqueue.py index a39e8dd02c..716612efe9 100644 --- a/src/trio/_core/_generated_io_kqueue.py +++ b/src/trio/_core/_generated_io_kqueue.py @@ -43,7 +43,7 @@ def current_kqueue() -> select.kqueue: def monitor_kevent( ident: int, filter: int -) -> ContextManager[_channel.MemoryRecvChannel[select.kevent]]: +) -> ContextManager[_channel.MemoryReceiveChannel[select.kevent]]: """TODO: these are implemented, but are currently more of a sketch than anything real. See `#26 `__. diff --git a/src/trio/_core/_io_kqueue.py b/src/trio/_core/_io_kqueue.py index 4c44605e6e..5e2761b2c5 100644 --- a/src/trio/_core/_io_kqueue.py +++ b/src/trio/_core/_io_kqueue.py @@ -119,7 +119,7 @@ def current_kqueue(self) -> select.kqueue: @_public def monitor_kevent( self, ident: int, filter: int - ) -> Iterator[_channel.MemoryRecvChannel[select.kevent]]: + ) -> Iterator[_channel.MemoryReceiveChannel[select.kevent]]: """TODO: these are implemented, but are currently more of a sketch than anything real. See `#26 `__. diff --git a/src/trio/_tools/gen_exports.py b/src/trio/_tools/gen_exports.py index 2cfe6604af..19aa6f7ec3 100755 --- a/src/trio/_tools/gen_exports.py +++ b/src/trio/_tools/gen_exports.py @@ -300,7 +300,7 @@ def process(files: Iterable[File], *, do_test: bool) -> None: print("Generated sources are up to date.") else: for new_path, new_source in new_files.items(): - with open(new_path, "w", encoding="utf-8") as f: + with open(new_path, "w", encoding="utf-8", newline="\n") as f: f.write(new_source) print("Regenerated sources successfully.") From a28490ea0184b6d2e8aa13b9128f2eb18168d5bb Mon Sep 17 00:00:00 2001 From: A5rocks Date: Tue, 16 Jan 2024 13:15:28 +0900 Subject: [PATCH 3/9] Fix circular import on MacOS --- src/trio/_core/_generated_io_kqueue.py | 4 ++-- src/trio/_core/_io_kqueue.py | 11 +++++++---- src/trio/_tools/gen_exports.py | 4 ++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/trio/_core/_generated_io_kqueue.py b/src/trio/_core/_generated_io_kqueue.py index 716612efe9..c7c69ef45f 100644 --- a/src/trio/_core/_generated_io_kqueue.py +++ b/src/trio/_core/_generated_io_kqueue.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: import select - from .. import _channel + from .._channel import MemoryReceiveChannel from .._file_io import _HasFileNo from ._traps import Abort, RaiseCancelT import sys @@ -43,7 +43,7 @@ def current_kqueue() -> select.kqueue: def monitor_kevent( ident: int, filter: int -) -> ContextManager[_channel.MemoryReceiveChannel[select.kevent]]: +) -> ContextManager[MemoryReceiveChannel[select.kevent]]: """TODO: these are implemented, but are currently more of a sketch than anything real. See `#26 `__. diff --git a/src/trio/_core/_io_kqueue.py b/src/trio/_core/_io_kqueue.py index 5e2761b2c5..c581da212d 100644 --- a/src/trio/_core/_io_kqueue.py +++ b/src/trio/_core/_io_kqueue.py @@ -10,13 +10,14 @@ import attr import outcome -from .. import _channel, _core +from .. import _core from ._run import _public from ._wakeup_socketpair import WakeupSocketpair if TYPE_CHECKING: from typing_extensions import TypeAlias + from .._channel import MemoryReceiveChannel, MemorySendChannel from .._core import Abort, RaiseCancelT, Task from .._file_io import _HasFileNo @@ -36,7 +37,7 @@ class _KqueueStatistics: class KqueueIOManager: _kqueue: select.kqueue = attr.ib(factory=select.kqueue) _registered: dict[ - tuple[int, int], Task | _channel.MemorySendChannel[select.kevent] + tuple[int, int], Task | MemorySendChannel[select.kevent] ] = attr.ib(factory=dict) _force_wakeup: WakeupSocketpair = attr.ib(factory=WakeupSocketpair) _force_wakeup_fd: int | None = attr.ib(default=None) @@ -119,17 +120,19 @@ def current_kqueue(self) -> select.kqueue: @_public def monitor_kevent( self, ident: int, filter: int - ) -> Iterator[_channel.MemoryReceiveChannel[select.kevent]]: + ) -> Iterator[MemoryReceiveChannel[select.kevent]]: """TODO: these are implemented, but are currently more of a sketch than anything real. See `#26 `__. """ + from .._channel import open_memory_channel + key = (ident, filter) if key in self._registered: raise _core.BusyResourceError( "attempt to register multiple listeners for same ident/filter pair" ) - send, recv = _channel.open_memory_channel[select.kevent](math.inf) + send, recv = open_memory_channel[select.kevent](math.inf) self._registered[key] = send try: yield recv diff --git a/src/trio/_tools/gen_exports.py b/src/trio/_tools/gen_exports.py index 19aa6f7ec3..c70dbe0aac 100755 --- a/src/trio/_tools/gen_exports.py +++ b/src/trio/_tools/gen_exports.py @@ -382,9 +382,9 @@ def main() -> None: # pragma: no cover if TYPE_CHECKING: import select - from .. import _core, _channel - from ._traps import Abort, RaiseCancelT + from .._channel import MemoryReceiveChannel from .._file_io import _HasFileNo + from ._traps import Abort, RaiseCancelT """ IMPORTS_WINDOWS = """\ From 31891d1fe72769b20ed6ddd034b02b6918e57dd2 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 22 Dec 2024 09:53:01 +0000 Subject: [PATCH 4/9] remove filterwarning about UnboundedQueue --- src/trio/_core/_tests/test_io.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/trio/_core/_tests/test_io.py b/src/trio/_core/_tests/test_io.py index 379daa025e..b06cff2026 100644 --- a/src/trio/_core/_tests/test_io.py +++ b/src/trio/_core/_tests/test_io.py @@ -384,7 +384,6 @@ def check(*, expected_readers: int, expected_writers: int) -> None: check(expected_readers=1, expected_writers=0) -@pytest.mark.filterwarnings("ignore:.*UnboundedQueue:trio.TrioDeprecationWarning") async def test_io_manager_kqueue_monitors_statistics() -> None: def check( *, From 009cbb502a26c020f40741575ce158a7137873a3 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 23 Dec 2024 07:51:55 +0000 Subject: [PATCH 5/9] add some tests for kqueue --- src/trio/_core/_tests/test_io.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/trio/_core/_tests/test_io.py b/src/trio/_core/_tests/test_io.py index b06cff2026..2a9a2c1176 100644 --- a/src/trio/_core/_tests/test_io.py +++ b/src/trio/_core/_tests/test_io.py @@ -410,13 +410,21 @@ def check( # 1 for call_soon_task check(expected_monitors=0, expected_readers=1, expected_writers=0) - with _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ): + with _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ) as q: with ( pytest.raises(_core.BusyResourceError), _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ), ): pass # pragma: no cover check(expected_monitors=1, expected_readers=1, expected_writers=0) + b1.send(b"\x00") + with trio.fail_after(1): + await q.receive() + + _core.notify_closing(a1) + a1.close() + with trio.fail_after(1): + assert len([v async for v in q]) == 0 check(expected_monitors=0, expected_readers=1, expected_writers=0) From 289ed25ddcc67a361acf7f8d691b0411806192c0 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 23 Dec 2024 09:00:49 +0000 Subject: [PATCH 6/9] just check for close --- src/trio/_core/_tests/test_io.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/trio/_core/_tests/test_io.py b/src/trio/_core/_tests/test_io.py index 2a9a2c1176..e3f7a324fb 100644 --- a/src/trio/_core/_tests/test_io.py +++ b/src/trio/_core/_tests/test_io.py @@ -417,10 +417,6 @@ def check( ): pass # pragma: no cover check(expected_monitors=1, expected_readers=1, expected_writers=0) - b1.send(b"\x00") - with trio.fail_after(1): - await q.receive() - _core.notify_closing(a1) a1.close() with trio.fail_after(1): From 3644bcf20dd16e24a6a3e60bdea98c33c3a4a04c Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 Dec 2024 08:05:05 +0000 Subject: [PATCH 7/9] catch a KeyError in monitor_kevent --- src/trio/_core/_io_kqueue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/trio/_core/_io_kqueue.py b/src/trio/_core/_io_kqueue.py index 2383ed64bd..48e4aa82fa 100644 --- a/src/trio/_core/_io_kqueue.py +++ b/src/trio/_core/_io_kqueue.py @@ -142,7 +142,10 @@ def monitor_kevent( yield recv finally: send.close() - del self._registered[key] + try: + del self._registered[key] + except KeyError: + pass @_public async def wait_kevent( From 68435cd5e593250dab44706c7f095db9c09b1860 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 Dec 2024 08:05:27 +0000 Subject: [PATCH 8/9] dedupe unregistering kevent --- src/trio/_core/_io_kqueue.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/trio/_core/_io_kqueue.py b/src/trio/_core/_io_kqueue.py index 48e4aa82fa..54f5ec3f5c 100644 --- a/src/trio/_core/_io_kqueue.py +++ b/src/trio/_core/_io_kqueue.py @@ -280,9 +280,9 @@ def notify_closing(self, fd: int | _HasFileNo) -> None: for filter_ in [select.KQ_FILTER_READ, select.KQ_FILTER_WRITE]: key = (fd, filter_) - receiver = self._registered.get(key) - - if receiver is None: + try: + receiver = self._registered.pop(key) + except KeyError: continue if type(receiver) is _core.Task: @@ -290,7 +290,5 @@ def notify_closing(self, fd: int | _HasFileNo) -> None: self._kqueue.control([event], 0) exc = _core.ClosedResourceError("another task closed this fd") _core.reschedule(receiver, outcome.Error(exc)) - del self._registered[key] else: receiver.close() - del self._registered[key] From df90319716863b69c929d0889fc088a814527b35 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 24 Dec 2024 08:19:19 +0000 Subject: [PATCH 9/9] fix ruff --- src/trio/_core/_io_kqueue.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/trio/_core/_io_kqueue.py b/src/trio/_core/_io_kqueue.py index 54f5ec3f5c..fed4da83f4 100644 --- a/src/trio/_core/_io_kqueue.py +++ b/src/trio/_core/_io_kqueue.py @@ -142,10 +142,7 @@ def monitor_kevent( yield recv finally: send.close() - try: - del self._registered[key] - except KeyError: - pass + self._registered.pop(key, None) @_public async def wait_kevent(