From ddd2a378a6c1273594ec21dff8c37cda0ab2e1ac Mon Sep 17 00:00:00 2001 From: David Hayes Date: Sat, 21 Feb 2026 13:02:12 +0000 Subject: [PATCH 1/5] fix: change _handle_reconnection to return bool and fix infinite recursion _handle_reconnection previously returned None, making it impossible for callers to distinguish between a successful response delivery and exhausted retries. This changes the return type to bool (True on success, False when max attempts exceeded) and fixes two issues: - The attempt counter at line 426 was reset to 0 on each reconnection, causing infinite recursion when streams kept ending without delivering a response. Now increments attempt on each recursion. - All recursive calls now use `return await` so the result propagates back to the original caller. MAX_RECONNECTION_ATTEMPTS increased from 2 to 5 to accommodate legitimate multi-reconnection patterns where the server intentionally closes streams between checkpoints. Github-Issue: #1401 --- src/mcp/client/streamable_http.py | 23 +++++++++------- tests/shared/test_streamable_http.py | 39 +++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9f3dd5e0b..0a50f53b5 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -47,7 +47,7 @@ # Reconnection defaults DEFAULT_RECONNECTION_DELAY_MS = 1000 # 1 second fallback when server doesn't provide retry -MAX_RECONNECTION_ATTEMPTS = 2 # Max retry attempts before giving up +MAX_RECONNECTION_ATTEMPTS = 5 # Max retry attempts before giving up class StreamableHTTPError(Exception): @@ -377,12 +377,17 @@ async def _handle_reconnection( last_event_id: str, retry_interval_ms: int | None = None, attempt: int = 0, - ) -> None: - """Reconnect with Last-Event-ID to resume stream after server disconnect.""" + ) -> bool: + """Reconnect with Last-Event-ID to resume stream after server disconnect. + + Returns: + True if the response was successfully delivered, False if max + reconnection attempts were exceeded without delivering a response. + """ # Bail if max retries exceeded - if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover + if attempt >= MAX_RECONNECTION_ATTEMPTS: logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") - return + return False # Always wait - use server value or default delay_ms = retry_interval_ms if retry_interval_ms is not None else DEFAULT_RECONNECTION_DELAY_MS @@ -419,15 +424,15 @@ async def _handle_reconnection( ) if is_complete: await event_source.response.aclose() - return + return True - # Stream ended again without response - reconnect again (reset attempt counter) + # Stream ended again without response - reconnect again logger.info("SSE stream disconnected, reconnecting...") - await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0) + return await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, attempt + 1) except Exception as e: # pragma: no cover logger.debug(f"Reconnection failed: {e}") # Try to reconnect again if we still have an event ID - await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1) + return await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1) async def post_writer( self, diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 42b1a3698..55c08f0f9 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -29,7 +29,14 @@ from mcp import MCPError, types from mcp.client.session import ClientSession -from mcp.client.streamable_http import StreamableHTTPTransport, streamable_http_client +from mcp.client.streamable_http import ( + MAX_RECONNECTION_ATTEMPTS, + StreamableHTTPTransport, + streamable_http_client, +) +from mcp.client.streamable_http import ( + RequestContext as TransportRequestContext, +) from mcp.server import Server, ServerRequestContext from mcp.server.streamable_http import ( MCP_PROTOCOL_VERSION_HEADER, @@ -2247,3 +2254,33 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert "content-type" in headers_data assert headers_data["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_handle_reconnection_returns_false_on_max_attempts(): + """_handle_reconnection returns False when max attempts exceeded.""" + transport = StreamableHTTPTransport(url="http://localhost:9999/mcp") + + read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](1) + + message = JSONRPCRequest(jsonrpc="2.0", id=42, method="tools/call", params={"name": "test"}) + session_message = SessionMessage(message) + + ctx = TransportRequestContext( + client=httpx.AsyncClient(), + session_id="test-session", + session_message=session_message, + metadata=None, + read_stream_writer=read_stream_writer, + ) + + try: + with anyio.fail_after(5): + result = await transport._handle_reconnection( + ctx, last_event_id="evt-1", retry_interval_ms=None, attempt=MAX_RECONNECTION_ATTEMPTS + ) + assert result is False + finally: + await read_stream_writer.aclose() + await read_stream.aclose() + await ctx.client.aclose() From 75cd668aa8dd8a67a673b1dc5315dd9f88c719de Mon Sep 17 00:00:00 2001 From: David Hayes Date: Sat, 21 Feb 2026 13:09:49 +0000 Subject: [PATCH 2/5] fix: propagate error when SSE stream ends without a response When an SSE stream ends prematurely (e.g. due to a read timeout), the client would hang forever waiting for a response that will never arrive. Now _handle_sse_response checks the return value of _handle_reconnection and, if reconnection did not deliver a response, sends a JSONRPCError with INTERNAL_ERROR to the read stream. This unblocks the waiting request and surfaces the failure as an MCPError to the caller. Github-Issue: #1401 --- src/mcp/client/streamable_http.py | 14 ++++++++++---- tests/shared/test_streamable_http.py | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 0a50f53b5..7c2fea5d4 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -364,12 +364,18 @@ async def _handle_sse_response( await response.aclose() return # Normal completion, no reconnect needed except Exception: - logger.debug("SSE stream ended", exc_info=True) # pragma: no cover + logger.debug("SSE stream error", exc_info=True) - # Stream ended without response - reconnect if we received an event with ID - if last_event_id is not None: # pragma: no branch + # Stream ended without a complete response — attempt reconnection if possible + if last_event_id is not None: logger.info("SSE stream disconnected, reconnecting...") - await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) + if await self._handle_reconnection(ctx, last_event_id, retry_interval_ms): + return # Reconnection delivered the response + + # No response delivered — unblock the waiting request with an error + error_data = ErrorData(code=INTERNAL_ERROR, message="SSE stream ended without a response") + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data)) + await ctx.read_stream_writer.send(error_msg) async def _handle_reconnection( self, diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 55c08f0f9..f91c869ba 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -2256,6 +2256,25 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert headers_data["content-type"] == "application/json" +@pytest.mark.anyio +async def test_sse_read_timeout_propagates_error(basic_server: None, basic_server_url: str): + """SSE read timeout should propagate MCPError instead of hanging.""" + # Create client with very short SSE read timeout + short_timeout = httpx.Timeout(30.0, read=0.5) + async with httpx.AsyncClient(timeout=short_timeout, follow_redirects=True) as http_client: + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=http_client) as ( + read_stream, + write_stream, + ): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + # Read a "slow" resource that takes 2s — longer than our 0.5s read timeout + with pytest.raises(MCPError): + with anyio.fail_after(10): + await session.read_resource("slow://test") + + @pytest.mark.anyio async def test_handle_reconnection_returns_false_on_max_attempts(): """_handle_reconnection returns False when max attempts exceeded.""" From 0efb39253a245d0edb69c43311adb0e558e3ac30 Mon Sep 17 00:00:00 2001 From: David Hayes Date: Sat, 21 Feb 2026 13:12:51 +0000 Subject: [PATCH 3/5] fix: stop infinite GET stream reconnection on repeated empty connections Only reset the attempt counter when events were actually received during the connection. Connections that close immediately without delivering events now count toward MAX_RECONNECTION_ATTEMPTS. Github-Issue:#1401 --- src/mcp/client/streamable_http.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 7c2fea5d4..df5986656 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -197,7 +197,9 @@ async def handle_get_stream(self, client: httpx.AsyncClient, read_stream_writer: event_source.response.raise_for_status() logger.debug("GET SSE connection established") + received_events = False async for sse in event_source.aiter_sse(): + received_events = True # Track last event ID for reconnection if sse.id: last_event_id = sse.id @@ -207,8 +209,12 @@ async def handle_get_stream(self, client: httpx.AsyncClient, read_stream_writer: await self._handle_sse_event(sse, read_stream_writer) - # Stream ended normally (server closed) - reset attempt counter - attempt = 0 + # Only reset attempts if we actually received events; + # empty connections count toward MAX_RECONNECTION_ATTEMPTS + if received_events: + attempt = 0 + else: + attempt += 1 except Exception: # pragma: lax no cover logger.debug("GET stream error", exc_info=True) From ae0bc81a22ba997a2de1eac6c6fd83b58594f37d Mon Sep 17 00:00:00 2001 From: David Hayes Date: Sat, 21 Feb 2026 13:12:09 +0000 Subject: [PATCH 4/5] Log warning for unhandled exceptions in default message handler Transport errors that are not tied to a specific pending request (e.g., GET stream failures) were silently swallowed by the default message handler. Add a warning log so these exceptions are at least visible in logs as an observability safety net. Github-Issue: #1401 --- src/mcp/client/session.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mcp/client/session.py b/src/mcp/client/session.py index a0ca751bd..e165001f2 100644 --- a/src/mcp/client/session.py +++ b/src/mcp/client/session.py @@ -57,6 +57,8 @@ async def __call__( async def _default_message_handler( message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception, ) -> None: + if isinstance(message, Exception): + logger.warning("Unhandled exception in message handler: %s", message) await anyio.lowlevel.checkpoint() From f0af07e10e3b7fb6333806468532089ff5edf03a Mon Sep 17 00:00:00 2001 From: David Hayes Date: Sat, 21 Feb 2026 13:34:33 +0000 Subject: [PATCH 5/5] test: cover branch where reconnection exhausts max attempts Add test_sse_error_when_reconnection_exhausted to exercise the _handle_sse_response path where SSE events are received (setting last_event_id) but reconnection fails, ensuring the JSONRPCError is sent to unblock the waiting request. --- src/mcp/client/streamable_http.py | 5 +---- tests/shared/test_streamable_http.py | 33 +++++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index df5986656..963e4e33c 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -211,10 +211,7 @@ async def handle_get_stream(self, client: httpx.AsyncClient, read_stream_writer: # Only reset attempts if we actually received events; # empty connections count toward MAX_RECONNECTION_ATTEMPTS - if received_events: - attempt = 0 - else: - attempt += 1 + attempt = 0 if received_events else attempt + 1 except Exception: # pragma: lax no cover logger.debug("GET stream error", exc_info=True) diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index f91c869ba..83fa25d3f 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -2266,15 +2266,42 @@ async def test_sse_read_timeout_propagates_error(basic_server: None, basic_serve read_stream, write_stream, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch await session.initialize() # Read a "slow" resource that takes 2s — longer than our 0.5s read timeout - with pytest.raises(MCPError): - with anyio.fail_after(10): + with pytest.raises(MCPError): # pragma: no branch + with anyio.fail_after(10): # pragma: no branch await session.read_resource("slow://test") +@pytest.mark.anyio +async def test_sse_error_when_reconnection_exhausted( + event_server: tuple[SimpleEventStore, str], + monkeypatch: pytest.MonkeyPatch, +): + """When SSE stream closes after events and reconnection fails, MCPError is raised.""" + _, server_url = event_server + + async def _always_fail_reconnection( + self: Any, ctx: Any, last_event_id: Any, retry_interval_ms: Any = None, attempt: int = 0 + ) -> bool: + return False + + monkeypatch.setattr(StreamableHTTPTransport, "_handle_reconnection", _always_fail_reconnection) + + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch + await session.initialize() + + # tool_with_stream_close sends a priming event (setting last_event_id), + # then closes the SSE stream. With reconnection patched to fail, + # _handle_sse_response falls through to send the error. + with pytest.raises(MCPError): # pragma: no branch + with anyio.fail_after(10): # pragma: no branch + await session.call_tool("tool_with_stream_close", {}) + + @pytest.mark.anyio async def test_handle_reconnection_returns_false_on_max_attempts(): """_handle_reconnection returns False when max attempts exceeded."""