Skip to content

Add warning about non-blocking mode and TTYs #3315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

dcolascione
Copy link

Add a note reminding users that FdStream might cause "spooky action at a distance" in interactive use-cases, hopefully saving them time during debugging.

@dcolascione dcolascione force-pushed the tweak-fdstream-docs branch from b8fca35 to 0cf4690 Compare August 9, 2025 04:20
Copy link
Contributor

@A5rocks A5rocks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine as is but could be better!

@dcolascione
Copy link
Author

dcolascione commented Aug 11, 2025

Not germane to this PR in particular, but, I wish we had a better option for people. I've found this approach works for me. (I got the atomically-replace-the-FD idea from #174 (comment) but tweaked a bit.)

@cache
def dummy_fd() -> int:
    # directory FDs always fail with read and write
    return os.open("/", os.O_DIRECTORY)

class SyncFdWork(Protocol[T_covariant]):
    def __call__(self, fd: int, /) -> T_covariant: ...

class SafeFdStreamBase(AsyncResource):
    @classmethod
    def dup(cls, fd: int) -> Self:
        return cls(_fd=os.fdopen(os.dup(fd), "w+b", buffering=0))

    @classmethod
    def closing(cls, fd: io.RawIOBase):
        return cls(_fd=fd)

    def __init__(self, *, _fd: io.RawIOBase):
        self.__fd = _fd
        self.__spare_fds: list[io.RawIOBase] = []
        self.fileno = _fd.fileno

    async def _sync_op(self, sync_fn: SyncFdWork[T_covariant]) -> T_covariant:
        lock = threading.Lock()
        spare_fds = self.__spare_fds
        op_fd = (
            self.__spare_fds.pop()
            if spare_fds
            else os.fdopen(os.dup(self.__fd.fileno()), "w+b", buffering=0)
        )
        running_thread: int = 0
        cancelled = False
        interrupted = False

        try:
            async with trio.open_nursery() as nursery:

                async def _cancel_watcher():
                    nonlocal cancelled, interrupted
                    try:
                        await trio.sleep_forever()
                    finally:
                        with lock:
                            if running_thread:  # Wakes up blockers in kernel
                                os.dup2(dummy_fd(), op_fd.fileno())
                                # This pthread_kill is safe.  We know thread it running:
                                # it can't exit while we hold the lock.  It doesn't
                                # randomly exit with lock held either.  We use SIGURG because
                                # it's harmless and defaults to doing nothing.
                                signal.pthread_kill(running_thread, signal.SIGURG)
                                interrupted = True
                            cancelled = True

                start(_cancel_watcher).soon(nursery)

                def _sync_work() -> T_covariant:
                    nonlocal running_thread
                    with lock:
                        assert not running_thread
                        if cancelled:
                            raise IsADirectoryError()
                        running_thread = threading.get_ident()
                        assert running_thread
                    try:
                        return sync_fn(op_fd.fileno())
                    finally:
                        with lock:
                            assert running_thread == threading.get_ident()
                            assert running_thread
                            running_thread = 0

                try:
                    ret = await trio.to_thread.run_sync(_sync_work)
                except IsADirectoryError:
                    if cancelled:
                        await trio.sleep_forever()  # Will raise
                    raise
                nursery.cancel_scope.cancel()
                return ret
        finally:
            if interrupted:
                op_fd.close()
            else:
                spare_fds.append(op_fd)

    @override
    async def aclose(self) -> None:
        self.__fd.close()  # N.B. sync close
        for fd in self.__spare_fds:
            fd.close()


class SafeFdReceiveStream(ReceiveStream, SafeFdStreamBase):
    @override
    async def receive_some(self, max_bytes: int | None = None) -> bytes | bytearray:
        if max_bytes is None:
            max_bytes = DEFAULT_READ_SIZE
        await wait_readable(self)
        return await self._sync_op(lambda fd: os.read(fd, max_bytes))


class SafeFdSendStream(SendStream, SafeFdStreamBase):
    @override
    async def send_all(self, data: bytes | bytearray | memoryview) -> None:
        await wait_writable(self)

        def _write_all(fd: int) -> None:
            mv = memoryview(data)
            while mv:
                mv = mv[os.write(fd, mv) :]

        return await self._sync_op(_write_all)

    @override
    async def wait_send_all_might_not_block(self) -> None:
        await wait_writable(self)

Copy link

codecov bot commented Aug 11, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 100.00000%. Comparing base (f8a51b6) to head (fb665e6).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@               Coverage Diff               @@
##                 main        #3315   +/-   ##
===============================================
  Coverage   100.00000%   100.00000%           
===============================================
  Files             125          125           
  Lines           19253        19253           
  Branches         1304         1304           
===============================================
  Hits            19253        19253           
Files with missing lines Coverage Δ
src/trio/_unix_pipes.py 100.00000% <ø> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Contributor

@A5rocks A5rocks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good enough for me.

(I can't comment on your fd thing because I don't know too much about file descriptors lol)

@A5rocks A5rocks added the skip newsfragment Newsfragment is not required label Aug 11, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
skip newsfragment Newsfragment is not required
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants