|
| 1 | +import os |
| 2 | +import signal |
| 3 | +import sys |
| 4 | +import time |
| 5 | + |
| 6 | + |
| 7 | +def test_ki_newline_injection() -> None: |
| 8 | + assert sys.platform != "win32" |
| 9 | + |
| 10 | + import pty |
| 11 | + |
| 12 | + # NOTE: this cannot be subprocess.Popen because pty.fork |
| 13 | + # does some magic to set the controlling terminal. |
| 14 | + # (which I don't know how to replicate... so I copied this |
| 15 | + # structure from pty.spawn...) |
| 16 | + pid, pty_fd = pty.fork() # type: ignore[attr-defined,unused-ignore] |
| 17 | + if pid == 0: |
| 18 | + os.execlp(sys.executable, *[sys.executable, "-u", "-m", "trio"]) |
| 19 | + |
| 20 | + # setup: |
| 21 | + buffer = b"" |
| 22 | + while not buffer.endswith(b"import trio\r\n>>> "): |
| 23 | + buffer += os.read(pty_fd, 4096) |
| 24 | + |
| 25 | + # sanity check: |
| 26 | + print("AAAAAAAAAAAAAAAAAA") |
| 27 | + print(buffer.decode()) |
| 28 | + buffer = b"" |
| 29 | + os.write(pty_fd, b'print("hello!")\n') |
| 30 | + while not buffer.endswith(b">>> "): |
| 31 | + buffer += os.read(pty_fd, 4096) |
| 32 | + |
| 33 | + assert buffer.count(b"hello!") == 2 |
| 34 | + |
| 35 | + # press ctrl+c |
| 36 | + time.sleep(5) # hypothesis: things are slow... |
| 37 | + print("BBBBBBBBBBBBBBBB") |
| 38 | + print(buffer.decode()) |
| 39 | + buffer = b"" |
| 40 | + os.kill(pid, signal.SIGINT) |
| 41 | + while not buffer.endswith(b">>> "): |
| 42 | + buffer += os.read(pty_fd, 4096) |
| 43 | + print(5, buffer) |
| 44 | + |
| 45 | + assert b"KeyboardInterrupt" in buffer |
| 46 | + |
| 47 | + # press ctrl+c later |
| 48 | + print("CCCCCCCCCCCCCCCC") |
| 49 | + print(buffer.decode()) |
| 50 | + buffer = b"" |
| 51 | + os.write(pty_fd, b'print("hello!")') |
| 52 | + os.kill(pid, signal.SIGINT) |
| 53 | + while not buffer.endswith(b">>> "): |
| 54 | + buffer += os.read(pty_fd, 4096) |
| 55 | + |
| 56 | + assert b"KeyboardInterrupt" in buffer |
| 57 | + print("DDDDDDDDDDDDDDDDDDD") |
| 58 | + print(buffer.decode()) |
| 59 | + os.close(pty_fd) |
| 60 | + os.waitpid(pid, 0)[1] |
| 61 | + |
| 62 | + |
| 63 | +test_ki_newline_injection() |
0 commit comments