StreamCoordinator

Correlates outbound server->client requests with the client's responses, which arrive on a *separate* POST. One instance is shared across all handlers of a single server mount.

Members

Functions

alloc
long alloc()

Allocate a fresh outbound request id.

allocStream
long allocStream()

Allocate a fresh stream ordinal. Combined with a per-stream monotonic event sequence, this yields globally-unique SSE event ids across every stream served by this mount, as the Resumability/Redelivery requirement demands ("If present, the ID MUST be globally unique ... within ... the session").

await
Json await(long id, Duration timeout, string sessionToken)

Block the current task until the client responds to (sessionToken, id) (or timeout elapses). Returns the result, or throws McpException on error/timeout.

awaitLive
Json awaitLive(long id, bool delegate() @(safe) alive, McpException disconnectError, Duration timeout, Duration slice, string sessionToken)

Block the current task until the client responds to (sessionToken, id), while polling alive in short slices so a dropped connection releases the awaiting fiber promptly rather than parking for the full timeout. When alive returns false before a response arrives, the pending request is failed with disconnectError (mirroring the GET path's failPendingForIds on listener disconnect) and that error is thrown. A null alive probe degrades to the plain await(id, timeout, sessionToken) behaviour.

cancel
void cancel(long id, string sessionToken)

Drop a registered-but-unawaited request (sessionToken, id) (e.g. when delivery failed so the request will never get a response). Idempotent; unknown keys are ignored. Keeps the waiter table from leaking when register is not followed by await.

failPending
void failPending(long id, McpException error, string sessionToken)

Fail a single still-pending outbound request (sessionToken, id) with error and wake its awaiting task immediately (mirrors DuplexCoordinator.failPending, but targeted at one waiter). Used when the GET SSE listener a server->client request was delivered on disconnects before the client could respond: rather than letting the awaiter block for the full timeout, it is released promptly with an McpException. Unknown keys are ignored.

failPendingForIds
void failPendingForIds(long[] ids, McpException error, string sessionToken)

Fail several pending outbound requests at once for one session (see failPending).

register
void register(long id, string sessionToken)

Begin tracking a pending outbound request, keyed by (sessionToken, id). Only a response that arrives on the SAME session token may resolve it, so one session cannot satisfy another session's pending server->client request even when both happen to use the same numeric id. The empty token registers an unscoped (stateless / shared mode) waiter, resolvable only by a reply that likewise carries the empty token.

resolve
bool resolve(Json idJson, Json result, Json error, string responderToken)

Deliver a client response/errorResponse from the peer identified by responderToken. Returns true if it matched a pending outbound request issued to the SAME session token. The waiter is looked up by (responderToken, id), so a reply arriving on one session can never resolve (or even reach) another session's waiter — closing the cross-session hijack without a separate owner check. A waiter registered under the empty token (stateless / shared mode) is matched only by a reply that likewise carries the empty token.