@@ -322,22 +322,22 @@ async function handleResumeRequestBody({
322322 const flushEvents = async ( ) => {
323323 const events = await readEvents ( streamId , cursor )
324324 if ( events . length > 0 ) {
325- totalEventsFlushed += events . length
326325 logger . debug ( '[Resume] Flushing events' , {
327326 streamId,
328327 afterCursor : cursor ,
329328 eventCount : events . length ,
330329 } )
331330 }
332331 for ( const envelope of events ) {
332+ if ( ! enqueueEvent ( envelope ) ) {
333+ break
334+ }
335+ totalEventsFlushed += 1
333336 cursor = envelope . stream . cursor ?? String ( envelope . seq )
334337 currentRequestId = extractEnvelopeRequestId ( envelope ) || currentRequestId
335338 if ( envelope . type === MothershipStreamV1EventType . complete ) {
336339 sawTerminalEvent = true
337340 }
338- if ( ! enqueueEvent ( envelope ) ) {
339- break
340- }
341341 }
342342 }
343343
@@ -357,13 +357,13 @@ async function handleResumeRequestBody({
357357 reason : options ?. reason ,
358358 requestId : currentRequestId ,
359359 } ) ) {
360+ if ( ! enqueueEvent ( envelope ) ) {
361+ break
362+ }
360363 cursor = envelope . stream . cursor ?? String ( envelope . seq )
361364 if ( envelope . type === MothershipStreamV1EventType . complete ) {
362365 sawTerminalEvent = true
363366 }
364- if ( ! enqueueEvent ( envelope ) ) {
365- break
366- }
367367 }
368368 }
369369
@@ -373,14 +373,14 @@ async function handleResumeRequestBody({
373373 const gap = await checkForReplayGap ( streamId , afterCursor , currentRequestId )
374374 if ( gap ) {
375375 for ( const envelope of gap . envelopes ) {
376+ if ( ! enqueueEvent ( envelope ) ) {
377+ break
378+ }
376379 cursor = envelope . stream . cursor ?? String ( envelope . seq )
377380 currentRequestId = extractEnvelopeRequestId ( envelope ) || currentRequestId
378381 if ( envelope . type === MothershipStreamV1EventType . complete ) {
379382 sawTerminalEvent = true
380383 }
381- if ( ! enqueueEvent ( envelope ) ) {
382- break
383- }
384384 }
385385 return
386386 }
0 commit comments