@@ -58,12 +58,16 @@ import {
5858 WorkspaceFile ,
5959 WorkspaceFileOperation ,
6060} from '@/lib/copilot/generated/tool-catalog-v1'
61- import { parsePersistedStreamEventEnvelopeJson } from '@/lib/copilot/request/session/contract'
61+ import {
62+ type ParseStreamEventEnvelopeFailure ,
63+ parsePersistedStreamEventEnvelope ,
64+ parsePersistedStreamEventEnvelopeJson ,
65+ } from '@/lib/copilot/request/session/contract'
6266import {
6367 type FilePreviewSession ,
6468 isFilePreviewSession ,
6569} from '@/lib/copilot/request/session/file-preview-session-contract'
66- import { isStreamBatchEvent , type StreamBatchEvent } from '@/lib/copilot/request/session/types'
70+ import type { StreamBatchEvent } from '@/lib/copilot/request/session/types'
6771import {
6872 extractResourcesFromToolResult ,
6973 isResourceToolName ,
@@ -509,27 +513,75 @@ function isRecord(value: unknown): value is Record<string, unknown> {
509513 return Boolean ( value ) && typeof value === 'object' && ! Array . isArray ( value )
510514}
511515
516+ const STREAM_SCHEMA_ENFORCEMENT_PREFIX = 'Client stream schema enforcement failed.'
517+
518+ class StreamSchemaValidationError extends Error {
519+ constructor ( message : string ) {
520+ super ( message )
521+ this . name = 'StreamSchemaValidationError'
522+ }
523+ }
524+
525+ function createStreamSchemaValidationError (
526+ failure : ParseStreamEventEnvelopeFailure ,
527+ context ?: string
528+ ) : StreamSchemaValidationError {
529+ const details = failure . errors ?. filter ( Boolean ) . join ( '; ' )
530+ return new StreamSchemaValidationError (
531+ [ STREAM_SCHEMA_ENFORCEMENT_PREFIX , context , failure . message , details ] . filter ( Boolean ) . join ( ' ' )
532+ )
533+ }
534+
535+ function createBatchSchemaValidationError ( message : string ) : StreamSchemaValidationError {
536+ return new StreamSchemaValidationError ( [ STREAM_SCHEMA_ENFORCEMENT_PREFIX , message ] . join ( ' ' ) )
537+ }
538+
539+ function isStreamSchemaValidationError ( error : unknown ) : error is StreamSchemaValidationError {
540+ return error instanceof StreamSchemaValidationError
541+ }
542+
512543function parseStreamBatchResponse ( value : unknown ) : StreamBatchResponse {
513544 if ( ! isRecord ( value ) ) {
514545 throw new Error ( 'Invalid stream batch response' )
515546 }
516547
517548 const rawEvents = Array . isArray ( value . events ) ? value . events : [ ]
518549 const events : StreamBatchEvent [ ] = [ ]
519- for ( const entry of rawEvents ) {
520- if ( ! isStreamBatchEvent ( entry ) ) {
521- throw new Error ( 'Invalid stream batch event' )
550+ for ( const [ index , entry ] of rawEvents . entries ( ) ) {
551+ if ( ! isRecord ( entry ) ) {
552+ throw createBatchSchemaValidationError ( `Reconnect batch event ${ index + 1 } is not an object.` )
553+ }
554+ if (
555+ typeof entry . eventId !== 'number' ||
556+ ! Number . isFinite ( entry . eventId ) ||
557+ typeof entry . streamId !== 'string'
558+ ) {
559+ throw createBatchSchemaValidationError (
560+ `Reconnect batch event ${ index + 1 } is missing required metadata.`
561+ )
522562 }
523- events . push ( entry )
563+
564+ const parsedEvent = parsePersistedStreamEventEnvelope ( entry . event )
565+ if ( ! parsedEvent . ok ) {
566+ throw createStreamSchemaValidationError ( parsedEvent , `Reconnect batch event ${ index + 1 } .` )
567+ }
568+
569+ events . push ( {
570+ eventId : entry . eventId ,
571+ streamId : entry . streamId ,
572+ event : parsedEvent . event ,
573+ } )
524574 }
525575
526576 const rawPreviewSessions = Array . isArray ( value . previewSessions )
527577 ? value . previewSessions
528578 : undefined
529579 const previewSessions =
530- rawPreviewSessions ?. map ( ( session ) => {
580+ rawPreviewSessions ?. map ( ( session , index ) => {
531581 if ( ! isFilePreviewSession ( session ) ) {
532- throw new Error ( 'Invalid stream preview session' )
582+ throw createBatchSchemaValidationError (
583+ `Reconnect preview session ${ index + 1 } failed validation.`
584+ )
533585 }
534586 return session
535587 } ) ?? undefined
@@ -1579,12 +1631,14 @@ export function useChat(
15791631
15801632 const parsedResult = parsePersistedStreamEventEnvelopeJson ( raw )
15811633 if ( ! parsedResult . ok ) {
1582- logger . warn ( 'Failed to parse chat SSE event' , {
1634+ const error = createStreamSchemaValidationError ( parsedResult , 'Live SSE event.' )
1635+ logger . error ( 'Rejected chat SSE event due to client-side schema enforcement' , {
15831636 reason : parsedResult . reason ,
15841637 message : parsedResult . message ,
15851638 errors : parsedResult . errors ,
1639+ error : error . message ,
15861640 } )
1587- continue
1641+ throw error
15881642 }
15891643 const parsed = parsedResult . event
15901644
@@ -2533,6 +2587,17 @@ export function useChat(
25332587 }
25342588 return true
25352589 }
2590+ if ( isStreamSchemaValidationError ( err ) ) {
2591+ logger . error ( 'Reconnect halted by client-side stream schema enforcement' , {
2592+ streamId,
2593+ attempt : attempt + 1 ,
2594+ error : err . message ,
2595+ } )
2596+ if ( streamGenRef . current === gen ) {
2597+ setError ( err . message )
2598+ }
2599+ return false
2600+ }
25362601 logger . warn ( 'Reconnect attempt failed' , {
25372602 streamId,
25382603 attempt : attempt + 1 ,
@@ -2892,6 +2957,13 @@ export function useChat(
28922957 }
28932958 } catch ( err ) {
28942959 if ( err instanceof Error && err . name === 'AbortError' ) return consumedByTranscript
2960+ if ( isStreamSchemaValidationError ( err ) ) {
2961+ setError ( err . message )
2962+ if ( streamGenRef . current === gen ) {
2963+ finalize ( { error : true } )
2964+ }
2965+ return consumedByTranscript
2966+ }
28952967
28962968 const activeStreamId = streamIdRef . current
28972969 if ( activeStreamId && streamGenRef . current === gen ) {
0 commit comments