ConPTY may fail to reach EOF after child exit, with possibly incomplete output #565

@ofek

Summary

I am seeing what looks like an EOF / post-exit drain problem on ConPTY-backed Windows PTYs created by pywinpty 3.0.3.

With a simple Python producer that copies a large file to stdout and exits cleanly, get_exitstatus() returns 0, but the PTY never reports EOF within the 2-second post-exit drain window. Only a small fraction of the expected output is observed, which suggests that some trailing output may never become readable after the process exits.

Main reproducer:

uv run --script measure_pywinpty.py --producer python

I stripped the app-side relay logic down pretty aggressively and this still reproduces when talking to pywinpty directly, so this does not appear to be caused by my Python-side output handling.

I also reviewed the current source of both projects:

  • In pywinpty/src/lib.rs, read() only takes blocking: bool and delegates directly to Rust under py.detach(...), so Python does not control read size and the GIL is not held during the PTY read/write calls.
  • In winpty-rs/src/pty/base.rs, the shared read helper already uses a fixed 32 KiB ReadFile buffer.
  • The same shared winpty-rs base implementation already uses a background reading thread, OVERLAPPED, and CancelIoEx, so this does not appear to be the old pre-async ConPTY implementation.
  • In winpty-rs/src/pty/conpty/pty_impl.rs, the ConPTY path creates named pipes with 128 KiB quotas.

That suggests Python-side relay logic is not obviously responsible for this EOF / drain behavior.

Possibly related:

This issue is specifically about post-exit EOF / drain behavior. I am filing the throughput / fragmentation issue separately.

Environment

  • Windows build: 10.0.26100
  • Python: 3.14
  • pywinpty: 3.0.3

Reproduction

I attached a self-contained script measure_pywinpty.py that generates a deterministic large text fixture, measures a direct non-PTY pipe-capture baseline, and then measures the same producer through a ConPTY-backed PTY created by pywinpty.
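Because the fixture is deterministic, its size can be predicted before running anything, which makes it easy to tell how much output is missing. The following is a minimal sketch that mirrors the format strings used by create_fixture() in the script below; expected_fixture_bytes is a hypothetical helper and is not part of the attached script.

```python
LEFT_BLOCK = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" * 2
RIGHT_BLOCK = LEFT_BLOCK[::-1]


def expected_fixture_bytes(line_count: int) -> int:
    """Predict the size of the fixture written by create_fixture().

    Hypothetical helper: it rebuilds the same header and data lines in
    memory and sums their encoded lengths. The fixture is pure ASCII and is
    written with newline="\\n", so one character is one byte on disk.
    """
    header = f"PyWinPTY large output throughput fixture\nlines={line_count}\n"
    data_bytes = sum(
        len(f"{i:06d} | {LEFT_BLOCK} | {RIGHT_BLOCK}\n".encode("utf-8"))
        for i in range(1, line_count + 1)
    )
    return len(header.encode("utf-8")) + data_bytes


if __name__ == "__main__":
    # Default line count used by the reproducer.
    print(expected_fixture_bytes(200_000))
```

The result can be compared with the fixture.bytes field that the script prints in its JSON output.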

Run it with:

uv run --script measure_pywinpty.py --producer python
Benchmark reproducer script: measure_pywinpty.py
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "pywinpty==3.0.3",
# ]
# ///
from __future__ import annotations

import argparse
import json
import platform
import shutil
import statistics
import subprocess
import sys
import time
from pathlib import Path

import winpty

CONPTY_BACKEND_NAME = "conpty"
DEFAULT_LINE_COUNT = 200_000
LEFT_BLOCK = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" * 2
RIGHT_BLOCK = LEFT_BLOCK[::-1]
POST_EXIT_DRAIN_SECONDS = 2.0
POST_EXIT_DRAIN_POLL_SECONDS = 0.01


def create_fixture(path: Path, line_count: int) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)

    with path.open("w", encoding="utf-8", newline="\n") as stream:
        stream.write("PyWinPTY large output throughput fixture\n")
        stream.write(f"lines={line_count}\n")
        for i in range(1, line_count + 1):
            stream.write(f"{i:06d} | {LEFT_BLOCK} | {RIGHT_BLOCK}\n")


def build_command(producer: str, fixture_path: Path) -> list[str]:
    fixture_literal = "'" + str(fixture_path).replace("'", "''") + "'"

    if producer == "type":
        return ["cmd.exe", "/d", "/c", "type", str(fixture_path)]

    if producer == "bat":
        return ["bat", "-P", str(fixture_path)]

    if producer == "cat":
        return ["cat", str(fixture_path)]

    if producer == "get_content":
        return [
            "pwsh.exe",
            "-NoProfile",
            "-Command",
            f"Get-Content -Path {fixture_literal}",
        ]

    if producer == "python":
        payload = """\
import shutil
import sys

with open(sys.argv[1], "rb") as f:
    shutil.copyfileobj(f, sys.stdout.buffer)
"""
        return [sys.executable, "-u", "-c", payload, str(fixture_path)]

    message = f"Unsupported producer: {producer}"
    raise ValueError(message)


def resolve_executable(executable: str) -> str:
    return shutil.which(executable) or executable


def build_cmdline(args: list[str]) -> str | None:
    if not args:
        return None

    return subprocess.list2cmdline(args)


def measure_direct(command: list[str]) -> dict[str, object]:
    started_at = time.perf_counter()
    process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False)
    elapsed_seconds = time.perf_counter() - started_at

    output = process.stdout
    total_chars = len(output)
    total_bytes = len(output.encode("utf-8", errors="replace"))

    return {
        "kind": "direct",
        "returncode": process.returncode,
        "elapsed_seconds": round(elapsed_seconds, 3),
        "total_chars": total_chars,
        "total_bytes": total_bytes,
        "chars_per_second": round(total_chars / elapsed_seconds, 1),
        "mb_per_second": round(total_bytes / elapsed_seconds / 1024 / 1024, 2),
    }


def create_pty(cols: int, rows: int) -> winpty.PTY:
    return winpty.PTY(cols, rows, backend=winpty.Backend.ConPTY)


def accumulate_output(
    output: str, read_sizes: list[int], total_chars: list[int], total_bytes: list[int]
) -> None:
    read_sizes.append(len(output))
    total_chars[0] += len(output)
    total_bytes[0] += len(output.encode("utf-8", errors="replace"))


def get_exitstatus(pty: winpty.PTY) -> int | None:
    return pty.get_exitstatus()


def drain_after_exit(
    pty: winpty.PTY,
    read_sizes: list[int],
    total_chars: list[int],
    total_bytes: list[int],
    *,
    grace_seconds: float,
    poll_seconds: float,
) -> tuple[bool, bool]:
    deadline = time.monotonic() + grace_seconds

    while True:
        if pty.iseof():
            return True, False

        try:
            output = pty.read(blocking=False)
        except winpty.WinptyError:
            if pty.iseof():
                return True, False
            if time.monotonic() >= deadline:
                return False, True

            time.sleep(poll_seconds)
            continue

        if output:
            accumulate_output(output, read_sizes, total_chars, total_bytes)
            deadline = time.monotonic() + grace_seconds
            continue

        if time.monotonic() >= deadline:
            return False, True

        time.sleep(poll_seconds)


def measure_pty(command: list[str], *, cols: int, rows: int) -> dict[str, object]:
    pty = create_pty(cols, rows)
    pty.spawn(
        resolve_executable(command[0]),
        cmdline=build_cmdline(command[1:]),
    )

    read_sizes: list[int] = []
    total_chars = [0]
    total_bytes = [0]
    started_at = time.perf_counter()
    reached_eof = False
    drain_timed_out = False
    exitstatus: int | None = None

    while True:
        try:
            output = pty.read(blocking=True)
        except winpty.WinptyError as error:
            exitstatus = get_exitstatus(pty)
            if pty.iseof() or exitstatus is not None:
                if pty.iseof():
                    reached_eof = True
                else:
                    reached_eof, drain_timed_out = drain_after_exit(
                        pty,
                        read_sizes,
                        total_chars,
                        total_bytes,
                        grace_seconds=POST_EXIT_DRAIN_SECONDS,
                        poll_seconds=POST_EXIT_DRAIN_POLL_SECONDS,
                    )
                break

            message = f"PTY read failed unexpectedly: {error}"
            raise RuntimeError(message) from error

        if not output:
            if pty.iseof():
                reached_eof = True
                break
            exitstatus = get_exitstatus(pty)
            if exitstatus is not None:
                reached_eof, drain_timed_out = drain_after_exit(
                    pty,
                    read_sizes,
                    total_chars,
                    total_bytes,
                    grace_seconds=POST_EXIT_DRAIN_SECONDS,
                    poll_seconds=POST_EXIT_DRAIN_POLL_SECONDS,
                )
                break
            continue

        accumulate_output(output, read_sizes, total_chars, total_bytes)

    elapsed_seconds = time.perf_counter() - started_at

    return {
        "kind": "pty",
        "backend": CONPTY_BACKEND_NAME,
        "exitstatus": get_exitstatus(pty),
        "elapsed_seconds": round(elapsed_seconds, 3),
        "total_chars": total_chars[0],
        "total_bytes": total_bytes[0],
        "reads": len(read_sizes),
        "chars_per_second": round(total_chars[0] / elapsed_seconds, 1),
        "mb_per_second": round(total_bytes[0] / elapsed_seconds / 1024 / 1024, 2),
        "mean_chars_per_read": round(statistics.mean(read_sizes), 1) if read_sizes else 0.0,
        "median_chars_per_read": round(statistics.median(read_sizes), 1) if read_sizes else 0.0,
        "max_chars_per_read": max(read_sizes) if read_sizes else 0,
        "eof_reached": reached_eof,
        "post_exit_drain_timed_out": drain_timed_out,
    }


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Measure pywinpty ConPTY behavior for a large plain-text producer on Windows."
    )
    parser.add_argument(
        "--fixture",
        type=Path,
        default=Path(__file__).with_name("large_output_fixture.txt"),
        help="Path to the generated text fixture.",
    )
    parser.add_argument(
        "--lines",
        type=int,
        default=DEFAULT_LINE_COUNT,
        help="Number of generated data lines in the fixture.",
    )
    parser.add_argument(
        "--overwrite-fixture",
        action="store_true",
        help="Regenerate the fixture even if it already exists.",
    )
    parser.add_argument(
        "--producer",
        choices=["type", "bat", "cat", "get_content", "python"],
        default="type",
        help=(
            "Producer command to benchmark. 'type' uses cmd.exe built-in type, "
            "'bat' runs bat -P, 'cat' uses the cat executable on PATH, "
            "'get_content' uses pwsh Get-Content, and 'python' copies the "
            "fixture to stdout with shutil.copyfileobj."
        ),
    )
    parser.add_argument("--cols", type=int, default=120, help="PTY column count.")
    parser.add_argument("--rows", type=int, default=40, help="PTY row count.")
    parser.add_argument(
        "--skip-direct",
        action="store_true",
        help="Skip the direct non-PTY subprocess baseline.",
    )
    return parser.parse_args()


def main() -> None:
    args = parse_args()

    if args.overwrite_fixture or not args.fixture.exists():
        create_fixture(args.fixture, args.lines)

    command = build_command(args.producer, args.fixture)

    results: dict[str, object] = {
        "environment": {
            "python": sys.version.split()[0],
            "platform": platform.platform(),
            "windows_version": platform.version(),
            "pywinpty": winpty.__version__,
        },
        "fixture": {
            "path": str(args.fixture.resolve()),
            "bytes": args.fixture.stat().st_size,
            "lines_argument": args.lines,
        },
        "command": command,
    }

    if not args.skip_direct:
        results["direct"] = measure_direct(command)

    results["pty"] = measure_pty(command, cols=args.cols, rows=args.rows)

    print(json.dumps(results, indent=2))


if __name__ == "__main__":
    main()

Expected behavior

After the child process exits, the PTY should eventually reach EOF and allow any remaining output to be drained.
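Stated as code, the contract I am expecting looks roughly like the sketch below. It relies only on the iseof() and read(blocking=False) calls that the reproducer already uses; FakePty and drain_until_eof are hypothetical names introduced for illustration, and FakePty is a stand-in object, not the real winpty.PTY.

```python
import time


class FakePty:
    """Hypothetical stand-in for a PTY: yields queued chunks, then reports EOF."""

    def __init__(self, chunks: list[str]) -> None:
        self._chunks = list(chunks)

    def iseof(self) -> bool:
        # EOF only once every queued chunk has been read.
        return not self._chunks

    def read(self, blocking: bool = False) -> str:
        return self._chunks.pop(0) if self._chunks else ""


def drain_until_eof(
    pty, grace_seconds: float = 2.0, poll_seconds: float = 0.01
) -> tuple[str, bool]:
    """Collect output until EOF, or give up after grace_seconds without data.

    Returns (collected_text, reached_eof).
    """
    collected: list[str] = []
    deadline = time.monotonic() + grace_seconds
    while not pty.iseof():
        output = pty.read(blocking=False)
        if output:
            collected.append(output)
            # Any new data restarts the idle timer.
            deadline = time.monotonic() + grace_seconds
            continue
        if time.monotonic() >= deadline:
            return "".join(collected), False
        time.sleep(poll_seconds)
    return "".join(collected), True


if __name__ == "__main__":
    text, reached_eof = drain_until_eof(FakePty(["tail ", "of output"]))
    print(repr(text), reached_eof)
```

With a well-behaved PTY the second return value should always end up True after the child has exited. The behavior reported here corresponds to the False branch: the idle timer expires while iseof() is still false.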

Actual behavior

For the Python producer:

  • direct pipe capture: about 0.2s
  • PTY: about 5.2s
  • the fixture is about 31.4 MB, but only 370,981 chars are observed through the PTY before the drain window expires
  • get_exitstatus() returns 0, but eof_reached remains false and the post-exit drain times out

Representative result:

{
  "direct": {
    "elapsed_seconds": 0.2,
    "chars_per_second": 157381447.9,
    "mb_per_second": 150.09
  },
  "pty": {
    "backend": "conpty",
    "exitstatus": 0,
    "elapsed_seconds": 5.187,
    "total_chars": 370981,
    "reads": 32,
    "chars_per_second": 71520.3,
    "mb_per_second": 0.07,
    "mean_chars_per_read": 11593.2,
    "median_chars_per_read": 12365.0,
    "max_chars_per_read": 12366,
    "eof_reached": false,
    "post_exit_drain_timed_out": true
  }
}
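To put those numbers in proportion, the short calculation below derives the observed fraction and re-checks two of the reported fields. The inputs are copied from the representative result; the fixture size is the approximate 31.4 MB figure quoted above, so the fraction is an estimate.

```python
# Figures copied from the representative result above. The fixture size is
# the approximate 31.4 MB quoted in this report, not an exact measurement.
FIXTURE_BYTES_APPROX = 31_400_000
observed_chars = 370_981
reads = 32
elapsed_seconds = 5.187

# Share of the fixture that was actually readable through the PTY.
fraction_observed = observed_chars / FIXTURE_BYTES_APPROX

# Recompute two derived fields to confirm the result is self-consistent.
mean_chars_per_read = round(observed_chars / reads, 1)
chars_per_second = observed_chars / elapsed_seconds

print(f"fraction observed: {fraction_observed:.2%}")
print(f"mean chars per read: {mean_chars_per_read}")
print(f"chars per second: {chars_per_second:.0f}")
```

Roughly 1.2% of the fixture arrives. Note that elapsed_seconds includes the final idle drain window, since the timer in measure_pty() stops only after drain_after_exit() returns, so the throughput figure understates the rate at which the data that did arrive was delivered.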

Notes

  • I also saw eof_reached = false after process exit with type, pwsh Get-Content, and real cat.exe. For those producers the observed totals look roughly like a full stream plus terminal overhead, so the EOF bug can happen even when data delivery appears complete.
  • The Python producer is the clearest reproducer because it is just a straightforward copy-to-stdout and exit, and in this case only a small fraction of the expected output is observed before the drain window expires.
  • The PTY raw stream can be larger than the file because it includes CRLF line endings and ConPTY startup VT sequences, but that does not explain observing only a small fraction of the expected output here.
