From 52cea673736edf1d4a5c53391a3b9e70452efa55 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Tue, 16 Jun 2026 14:18:58 -0700 Subject: [PATCH 1/3] fix(ocap-kernel): use length-prefixed framing for remote messages Replace byteStream with lpStream so write/read are 1:1 regardless of underlying webRTC chunking, which splits payloads >16 KB. Co-Authored-By: Claude Opus 4.7 --- .../platform/connection-factory.test.ts | 7 ++- .../remotes/platform/connection-factory.ts | 11 +++-- .../src/remotes/platform/handshake.test.ts | 5 +- .../src/remotes/platform/handshake.ts | 3 -- .../src/remotes/platform/transport.test.ts | 7 ++- .../src/remotes/platform/transport.ts | 49 +++++++++++++++---- packages/ocap-kernel/src/remotes/types.ts | 4 +- 7 files changed, 63 insertions(+), 23 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts b/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts index a1fe73f358..ad3c5746d9 100644 --- a/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/connection-factory.test.ts @@ -220,7 +220,7 @@ function makeTestMultiaddr(protoNames: string[], host: string) { }; } -// Simple ByteStream mock +// Simple LengthPrefixedStream mock type MockByteStream = { write: (chunk: Uint8Array) => Promise; read: () => Promise; @@ -229,7 +229,7 @@ type MockByteStream = { const streamMap = new WeakMap(); vi.mock('@libp2p/utils', () => ({ - byteStream: (stream: object) => { + lpStream: (stream: object) => { const bs: MockByteStream = { writes: [], async write(chunk: Uint8Array) { @@ -243,6 +243,9 @@ vi.mock('@libp2p/utils', () => ({ return bs; }, getByteStreamFor: (stream: object) => streamMap.get(stream), + InvalidDataLengthError: class InvalidDataLengthError extends Error {}, + InvalidDataLengthLengthError: class InvalidDataLengthLengthError extends Error {}, + UnexpectedEOFError: class UnexpectedEOFError extends Error {}, })); const createLibp2p = vi.fn(); diff --git a/packages/ocap-kernel/src/remotes/platform/connection-factory.ts b/packages/ocap-kernel/src/remotes/platform/connection-factory.ts index 696db93ff1..3f3b56112a 100644 --- a/packages/ocap-kernel/src/remotes/platform/connection-factory.ts +++ b/packages/ocap-kernel/src/remotes/platform/connection-factory.ts @@ -10,7 +10,7 @@ import { } from '@libp2p/interface'; import type { PrivateKey, Libp2p } from '@libp2p/interface'; import { ping } from '@libp2p/ping'; -import { byteStream } from '@libp2p/utils'; +import { lpStream } from '@libp2p/utils'; import { webRTC } from '@libp2p/webrtc'; import { webSockets } from '@libp2p/websockets'; import { webTransport } from '@libp2p/webtransport'; @@ -26,6 +26,7 @@ import type { Multiaddr } from '@multiformats/multiaddr'; import { createLibp2p } from 'libp2p'; import { + DEFAULT_MAX_MESSAGE_SIZE_BYTES, RELAY_RECONNECT_BASE_DELAY_MS, RELAY_RECONNECT_MAX_DELAY_MS, RELAY_RECONNECT_MAX_ATTEMPTS, @@ -217,7 +218,9 @@ export class ConnectionFactory { // Set up inbound handler await this.#libp2p.handle('whatever', async (stream, connection) => { - const msgStream = byteStream(stream); + const msgStream = lpStream(stream, { + maxDataLength: DEFAULT_MAX_MESSAGE_SIZE_BYTES, + }); const remotePeerId = connection.remotePeer.toString(); const connType = connection.direct ? 'direct' : 'relayed'; this.#logger.log( @@ -401,7 +404,9 @@ export class ConnectionFactory { this.#logger.log( `successfully connected to ${peerId} via ${addressString}`, ); - const msgStream = byteStream(stream); + const msgStream = lpStream(stream, { + maxDataLength: DEFAULT_MAX_MESSAGE_SIZE_BYTES, + }); const channel: Channel = { msgStream, stream, peerId }; this.#logger.log(`opened channel to ${peerId}`); return channel; diff --git a/packages/ocap-kernel/src/remotes/platform/handshake.test.ts b/packages/ocap-kernel/src/remotes/platform/handshake.test.ts index a54f85e9ce..8e04823015 100644 --- a/packages/ocap-kernel/src/remotes/platform/handshake.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/handshake.test.ts @@ -1,3 +1,4 @@ +import { UnexpectedEOFError } from '@libp2p/utils'; import { Logger } from '@metamask/logger'; import { describe, it, expect, beforeEach, vi } from 'vitest'; @@ -298,11 +299,11 @@ describe('handshake', () => { it('throws when channel closes during read', async () => { vi.spyOn(mockChannel.msgStream, 'read') .mockImplementation() - .mockResolvedValueOnce(undefined); + .mockRejectedValueOnce(new UnexpectedEOFError('stream closed')); await expect( performInboundHandshake(mockChannel, mockDeps), - ).rejects.toThrow('Channel closed during handshake'); + ).rejects.toThrow(UnexpectedEOFError); }); }); }); diff --git a/packages/ocap-kernel/src/remotes/platform/handshake.ts b/packages/ocap-kernel/src/remotes/platform/handshake.ts index 9e8ccd2627..b6dc3219f1 100644 --- a/packages/ocap-kernel/src/remotes/platform/handshake.ts +++ b/packages/ocap-kernel/src/remotes/platform/handshake.ts @@ -89,9 +89,6 @@ async function readWithTimeout( const readPromise = (async () => { const readBuf = await channel.msgStream.read(); - if (!readBuf) { - throw new Error('Channel closed during handshake'); - } return bufToString(readBuf.subarray()); })(); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index 32b6ad7fa2..b78a2f02e7 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -1,3 +1,4 @@ +import { UnexpectedEOFError } from '@libp2p/utils'; import { AbortError } from '@metamask/kernel-errors'; import { makeAbortSignalMock } from '@ocap/repo-tools/test-utils'; import { @@ -748,8 +749,10 @@ describe('transport.initTransport', () => { await initTransport('0x1234', {}, remoteHandler); const mockChannel = createMockChannel('peer-1'); - // First read returns undefined, which means stream ended - loop should break - mockChannel.msgStream.read.mockResolvedValueOnce(undefined); + // First read throws UnexpectedEOFError, which means stream ended - loop should break + mockChannel.msgStream.read.mockRejectedValueOnce( + new UnexpectedEOFError('stream closed'), + ); inboundHandler?.(mockChannel); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index ba39f0cf92..a963754c5f 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -1,5 +1,10 @@ import { StreamResetError } from '@libp2p/interface'; import type { StreamCloseEvent } from '@libp2p/interface'; +import { + InvalidDataLengthError, + InvalidDataLengthLengthError, + UnexpectedEOFError, +} from '@libp2p/utils'; import { AbortError, IntentionalCloseError, @@ -463,6 +468,38 @@ export async function initTransport( try { readBuf = await channel.msgStream.read(); } catch (problem) { + if ( + problem instanceof InvalidDataLengthError || + problem instanceof InvalidDataLengthLengthError + ) { + // Peer announced a payload larger than maxMessageSizeBytes. The + // length-prefixed framing is now poisoned (subsequent bytes are + // not on a message boundary), so we cannot continue on this + // stream. Surface a uniform "message too long" error to match the + // sender-side validator. + const sizeError = new ResourceLimitError( + `Inbound message exceeds size limit: ${problem.message}`, + { + cause: problem, + data: { limitType: 'messageSize' }, + }, + ); + outputError( + channel.peerId, + `reading message from ${channel.peerId}`, + sizeError, + ); + handleConnectionLoss(channel.peerId); + logger.log(`closed channel to ${channel.peerId}`); + throw sizeError; + } + if (problem instanceof UnexpectedEOFError) { + // Peer closed while we were waiting for the next message (or + // partway through one). Treat as graceful stream end — there is + // no in-flight kernel-level operation to recover. + logger.log(`${channel.peerId}:: stream ended`); + break; + } if (problem instanceof StreamResetError) { // Remote-initiated stream reset: treat as connection loss and // reconnect. Do NOT mark as intentionally closed — a malicious @@ -487,15 +524,9 @@ export async function initTransport( logger.log(`closed channel to ${channel.peerId}`); throw problem; } - if (readBuf) { - reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic - peerStateManager.updateConnectionTime(channel.peerId); - await receiveMessage(channel.peerId, bufToString(readBuf.subarray())); - } else { - // Stream ended (returned undefined), exit the read loop - logger.log(`${channel.peerId}:: stream ended`); - break; - } + reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic + peerStateManager.updateConnectionTime(channel.peerId); + await receiveMessage(channel.peerId, bufToString(readBuf.subarray())); } } finally { // Always remove the channel when readChannel exits to prevent stale channels diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index 943be5d39f..8dd3bffde2 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -1,5 +1,5 @@ import type { Stream } from '@libp2p/interface'; -import type { ByteStream } from '@libp2p/utils'; +import type { LengthPrefixedStream } from '@libp2p/utils'; import type { Logger } from '@metamask/logger'; import type { KRef } from '../types.ts'; @@ -11,7 +11,7 @@ export type InboundConnectionHandler = ( export type PeerDisconnectHandler = (peerId: string) => void; export type Channel = { - msgStream: ByteStream; + msgStream: LengthPrefixedStream; stream: Stream; peerId: string; }; From 32c1a18e6a502701ba1aa388035d5dd756c91b9a Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Tue, 16 Jun 2026 16:13:57 -0700 Subject: [PATCH 2/3] docs(ocap-kernel): changelog entry for #957 Co-Authored-By: Claude Opus 4.7 --- packages/ocap-kernel/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/ocap-kernel/CHANGELOG.md b/packages/ocap-kernel/CHANGELOG.md index 9197180954..e2903d804a 100644 --- a/packages/ocap-kernel/CHANGELOG.md +++ b/packages/ocap-kernel/CHANGELOG.md @@ -41,6 +41,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Regenerate `incarnationId` when `resetStorage=true` clears the rest of kernel state, completing the #948 peer-restart detection on browser/extension kernel reloads ([#950](https://github.com/MetaMask/ocap-kernel/pull/950)) - The previous except-list preserved `incarnationId` across `resetStorage` wipes, so a restarted sender signalled the same incarnation it had before the wipe and the matching receiver's handshake decided "no restart" — leaving stale `highestReceivedSeq` in place and silently dropping the sender's fresh `seq=1` messages - Register a new vat with its subcluster before awaiting `runVat`, so a garbage-collection pass during bundle load cannot delete the still-empty subcluster out from under the in-progress vat creation ([#952](https://github.com/MetaMask/ocap-kernel/pull/952)) +- Use length-prefixed framing for remote messages so payloads larger than the underlying transport's per-frame cutoff (e.g. `@libp2p/webrtc`'s 16 KB datachannel limit) are reassembled correctly on the receiver ([#957](https://github.com/MetaMask/ocap-kernel/pull/957)) + - Replace `byteStream` with `lpStream` on every remote channel; the byte-oriented stream did not preserve `write()` boundaries, so any message the transport split into multiple frames was parsed from the first frame only, silently dropped without acknowledgement, and the sender retried until giving up after `MAX_RETRIES` + - Surface receiver-side framing-cap violations (`InvalidDataLengthError`, `InvalidDataLengthLengthError`) as `ResourceLimitError` with `limitType: 'messageSize'` so size errors look the same whether they tripped on the sender's `validateMessageSize` or the receiver's framing decoder ## [0.7.0] From 383fb93798ade4387c2a2c6c9d8ed1ff5f54cee6 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Wed, 17 Jun 2026 10:38:29 -0700 Subject: [PATCH 3/3] fix(ocap-kernel): address PR #957 review (size threading, EOF, test) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thread maxMessageSizeBytes through to ConnectionFactory and use it as maxDataLength on every lpStream so the sender's size limit and the receiver's framing cap stay in sync across deployment-time overrides. Stop treating UnexpectedEOFError as a graceful stream end — lpStream throws the same error for clean inter-message close and for partial in-flight message loss, and silently breaking left the sender retransmitting into a dead channel. Fall through to the existing non-intentional-disconnect path so reconnection is triggered. Add a regression test that drives a chunked transport through lpStream end-to-end: write a single 20 KB message over a streamPair with a 1 KB per-frame cap, verify it arrives as one read(); also verify back-to-back boundaries and the receiver-side size-cap rejection. Co-Authored-By: Claude Opus 4.7 --- .../remotes/platform/connection-factory.ts | 10 ++- .../src/remotes/platform/lp-framing.test.ts | 87 +++++++++++++++++++ .../src/remotes/platform/transport.test.ts | 18 ++-- .../src/remotes/platform/transport.ts | 9 +- packages/ocap-kernel/src/remotes/types.ts | 8 ++ 5 files changed, 117 insertions(+), 15 deletions(-) create mode 100644 packages/ocap-kernel/src/remotes/platform/lp-framing.test.ts diff --git a/packages/ocap-kernel/src/remotes/platform/connection-factory.ts b/packages/ocap-kernel/src/remotes/platform/connection-factory.ts index 3f3b56112a..7dce604391 100644 --- a/packages/ocap-kernel/src/remotes/platform/connection-factory.ts +++ b/packages/ocap-kernel/src/remotes/platform/connection-factory.ts @@ -60,6 +60,8 @@ export class ConnectionFactory { readonly #maxRetryAttempts: number; + readonly #maxDataLength: number; + readonly #directTransports: DirectTransport[]; readonly #allowedWsHosts: string[]; @@ -88,6 +90,7 @@ export class ConnectionFactory { * @param options.logger - The logger to use for the libp2p node. * @param options.signal - The signal to use for the libp2p node. * @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). + * @param options.maxMessageSizeBytes - Maximum inbound message size in bytes, used as `maxDataLength` on every `lpStream`. Defaults to 1 MB. * @param options.directTransports - Optional direct transports (e.g. QUIC, TCP) with listen addresses. * @param options.allowedWsHosts - Hostnames/IPs allowed for plain ws:// connections beyond private ranges. */ @@ -98,6 +101,8 @@ export class ConnectionFactory { this.#logger = options.logger; this.#signal = options.signal; this.#maxRetryAttempts = options.maxRetryAttempts ?? 0; + this.#maxDataLength = + options.maxMessageSizeBytes ?? DEFAULT_MAX_MESSAGE_SIZE_BYTES; this.#directTransports = options.directTransports ?? []; const explicitHosts = options.allowedWsHosts ?? []; const relayHosts: string[] = []; @@ -138,6 +143,7 @@ export class ConnectionFactory { * @param options.logger - The logger to use for the libp2p node. * @param options.signal - The signal to use for the libp2p node. * @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). + * @param options.maxMessageSizeBytes - Maximum inbound message size in bytes, used as `maxDataLength` on every `lpStream`. Defaults to 1 MB. * @param options.directTransports - Optional direct transports (e.g. QUIC, TCP) with listen addresses. * @param options.allowedWsHosts - Hostnames/IPs allowed for plain ws:// connections beyond private ranges. * @returns A promise for the new ConnectionFactory instance. @@ -219,7 +225,7 @@ export class ConnectionFactory { // Set up inbound handler await this.#libp2p.handle('whatever', async (stream, connection) => { const msgStream = lpStream(stream, { - maxDataLength: DEFAULT_MAX_MESSAGE_SIZE_BYTES, + maxDataLength: this.#maxDataLength, }); const remotePeerId = connection.remotePeer.toString(); const connType = connection.direct ? 'direct' : 'relayed'; @@ -405,7 +411,7 @@ export class ConnectionFactory { `successfully connected to ${peerId} via ${addressString}`, ); const msgStream = lpStream(stream, { - maxDataLength: DEFAULT_MAX_MESSAGE_SIZE_BYTES, + maxDataLength: this.#maxDataLength, }); const channel: Channel = { msgStream, stream, peerId }; this.#logger.log(`opened channel to ${peerId}`); diff --git a/packages/ocap-kernel/src/remotes/platform/lp-framing.test.ts b/packages/ocap-kernel/src/remotes/platform/lp-framing.test.ts new file mode 100644 index 0000000000..fc16d8dd3c --- /dev/null +++ b/packages/ocap-kernel/src/remotes/platform/lp-framing.test.ts @@ -0,0 +1,87 @@ +import type { Stream } from '@libp2p/interface'; +import { lpStream, streamPair } from '@libp2p/utils'; +import { fromString, toString as bufToString } from 'uint8arrays'; +import { describe, expect, it } from 'vitest'; + +/** + * Regression test for the framing bug that motivated switching from + * `byteStream` to `lpStream`: when the underlying transport (e.g. + * `@libp2p/webrtc`) splits a single write across multiple frames, + * `byteStream`'s reader would wake on the first chunk and return a + * truncated payload. `lpStream` adds a length-prefix so the reader + * waits until the full message is present and returns it intact. + * + * Each test forces chunking by capping the underlying stream's + * `maxMessageSize` well below the payload size; the abstract stream + * splits the write into several `message` events on the receiver side. + * `lpStream.read()` should still return exactly one complete payload + * per `lpStream.write()`. + */ +describe('lpStream framing over a chunked transport', () => { + /** + * Build a connected pair of message streams whose underlying + * AbstractStream splits any write larger than `chunkSize` bytes into + * multiple `message` events on the receiving end. + * + * @param chunkSize - Per-frame cap to apply to both ends. + * @returns The outbound/inbound paired streams. + */ + async function chunkedStreamPair( + chunkSize: number, + ): Promise<[Stream, Stream]> { + return streamPair({ + outbound: { maxMessageSize: chunkSize }, + inbound: { maxMessageSize: chunkSize }, + }); + } + + it('reassembles a single payload that the transport splits into many frames', async () => { + const [outbound, inbound] = await chunkedStreamPair(1024); + const sender = lpStream(outbound, { maxDataLength: 1024 * 1024 }); + const receiver = lpStream(inbound, { maxDataLength: 1024 * 1024 }); + + // 20 KB payload — well above the 1 KB per-frame cap, so the transport + // will split it into ~20 frames on the receiver side. + const payload = 'A'.repeat(20_000); + await sender.write(fromString(payload)); + + const received = await receiver.read(); + expect(received.byteLength).toBe(20_000); + expect(bufToString(received.subarray())).toBe(payload); + }); + + it('preserves message boundaries when several large payloads are sent back-to-back', async () => { + const [outbound, inbound] = await chunkedStreamPair(2048); + const sender = lpStream(outbound, { maxDataLength: 1024 * 1024 }); + const receiver = lpStream(inbound, { maxDataLength: 1024 * 1024 }); + + const payloads = ['alpha', 'bravo', 'charlie'].map( + (label) => `${label}:${'x'.repeat(8_000)}`, + ); + for (const payload of payloads) { + await sender.write(fromString(payload)); + } + + for (const expected of payloads) { + const received = await receiver.read(); + expect(bufToString(received.subarray())).toBe(expected); + } + }); + + it('rejects an inbound message that announces a payload larger than maxDataLength', async () => { + const [outbound, inbound] = await chunkedStreamPair(1024); + const sender = lpStream(outbound, { maxDataLength: 1024 * 1024 }); + // Receiver caps inbound at 8 KB to exercise the cross-machine + // mismatch sirtimid called out: a sender that allows larger messages + // than the receiver does should produce a clean InvalidDataLengthError + // on the receiver, not a silent reassembly stall. + const receiver = lpStream(inbound, { maxDataLength: 8 * 1024 }); + + const oversized = fromString('B'.repeat(16_000)); + await sender.write(oversized); + + await expect(receiver.read()).rejects.toThrow( + /Message length too long|InvalidDataLength/u, + ); + }); +}); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index b78a2f02e7..fdb544ce7b 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -289,6 +289,7 @@ describe('transport.initTransport', () => { logger: expect.any(Object), signal: expect.any(AbortSignal), maxRetryAttempts: undefined, + maxMessageSizeBytes: 1024 * 1024, directTransports: undefined, }); }); @@ -306,6 +307,7 @@ describe('transport.initTransport', () => { logger: expect.any(Object), signal: expect.any(AbortSignal), maxRetryAttempts, + maxMessageSizeBytes: 1024 * 1024, directTransports: undefined, }); }); @@ -334,6 +336,7 @@ describe('transport.initTransport', () => { logger: expect.any(Object), signal: expect.any(AbortSignal), maxRetryAttempts: undefined, + maxMessageSizeBytes: 1024 * 1024, directTransports, }); }); @@ -737,7 +740,7 @@ describe('transport.initTransport', () => { }); }); - it('exits read loop when readBuf is undefined (stream ended)', async () => { + it('treats UnexpectedEOFError as connection loss and triggers reconnection', async () => { let inboundHandler: ((channel: MockChannel) => void) | undefined; mockConnectionFactory.onInboundConnection.mockImplementation( (handler) => { @@ -749,7 +752,12 @@ describe('transport.initTransport', () => { await initTransport('0x1234', {}, remoteHandler); const mockChannel = createMockChannel('peer-1'); - // First read throws UnexpectedEOFError, which means stream ended - loop should break + // lpStream throws UnexpectedEOFError on EOF — both for clean + // end-of-stream between messages and for a partial-message drop. + // The read loop can't distinguish the two, so it must treat any + // EOF as a potential mid-message drop and trigger reconnection + // (rather than silently exiting and leaving the sender to + // retransmit into a dead channel). mockChannel.msgStream.read.mockRejectedValueOnce( new UnexpectedEOFError('stream closed'), ); @@ -757,10 +765,10 @@ describe('transport.initTransport', () => { inboundHandler?.(mockChannel); await vi.waitFor(() => { - // Stream ended, so no messages should be processed expect(remoteHandler).not.toHaveBeenCalled(); - // Should log that stream ended - expect(mockLogger.log).toHaveBeenCalledWith('peer-1:: stream ended'); + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-1', + ); }); }); diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index a963754c5f..908ea3c4cc 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -3,7 +3,6 @@ import type { StreamCloseEvent } from '@libp2p/interface'; import { InvalidDataLengthError, InvalidDataLengthLengthError, - UnexpectedEOFError, } from '@libp2p/utils'; import { AbortError, @@ -176,6 +175,7 @@ export async function initTransport( logger, signal, maxRetryAttempts, + maxMessageSizeBytes, directTransports, allowedWsHosts, }); @@ -493,13 +493,6 @@ export async function initTransport( logger.log(`closed channel to ${channel.peerId}`); throw sizeError; } - if (problem instanceof UnexpectedEOFError) { - // Peer closed while we were waiting for the next message (or - // partway through one). Treat as graceful stream end — there is - // no in-flight kernel-level operation to recover. - logger.log(`${channel.peerId}:: stream ended`); - break; - } if (problem instanceof StreamResetError) { // Remote-initiated stream reset: treat as connection loss and // reconnect. Do NOT mark as intentionally closed — a malicious diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index 8dd3bffde2..a84e8fe080 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -211,6 +211,14 @@ export type ConnectionFactoryOptions = { logger: Logger; signal: AbortSignal; maxRetryAttempts?: number | undefined; + /** + * Maximum inbound message payload size in bytes. Used as `maxDataLength` + * on every `lpStream` constructed for a channel — must match the + * sender-side validator's limit (`maxMessageSizeBytes` on + * `RemoteCommsOptions`) so that a deployment which raises one also raises + * the other. Defaults to `DEFAULT_MAX_MESSAGE_SIZE_BYTES` (1 MB). + */ + maxMessageSizeBytes?: number | undefined; directTransports?: DirectTransport[] | undefined; allowedWsHosts?: string[] | undefined; };