@@ -53,11 +53,13 @@ class FakeMessageBatch {
5353 created : number ;
5454 messages : p . PubsubMessage [ ] ;
5555 options : b . BatchPublishOptions ;
56+ bytes : number ;
5657 constructor ( options = { } as b . BatchPublishOptions ) {
5758 this . callbacks = [ ] ;
5859 this . created = Date . now ( ) ;
5960 this . messages = [ ] ;
6061 this . options = options ;
62+ this . bytes = 0 ;
6163 }
6264 // eslint-disable-next-line @typescript-eslint/no-unused-vars
6365 add ( message : p . PubsubMessage , callback : p . PublishCallback ) : void { }
@@ -332,6 +334,75 @@ describe('Message Queues', () => {
332334 assert . strictEqual ( messages , batch . messages ) ;
333335 assert . strictEqual ( callbacks , batch . callbacks ) ;
334336 } ) ;
337+
338+ describe ( 'publish chaining' , ( ) => {
339+ let fakeMessages : p . PubsubMessage [ ] ;
340+ let spies : p . PublishCallback [ ] ;
341+ beforeEach ( ( ) => {
342+ fakeMessages = [ { } , { } ] as p . PubsubMessage [ ] ;
343+ spies = [ sandbox . spy ( ) , sandbox . spy ( ) ] as p . PublishCallback [ ] ;
344+ } ) ;
345+
346+ it ( 'should begin another publish(drain) if there are pending batches' , ( ) => {
347+ const stub = sandbox . stub ( queue , '_publish' ) ;
348+ let once = false ;
349+ stub . callsFake ( ( m , c , done ) => {
350+ if ( ! once ) {
351+ // Drop in a second batch before calling the callback.
352+ const secondBatch = new FakeMessageBatch ( ) ;
353+ secondBatch . messages = fakeMessages ;
354+ secondBatch . callbacks = spies ;
355+ queue . batch = secondBatch ;
356+ }
357+ once = true ;
358+
359+ done ! ( null ) ;
360+ } ) ;
361+
362+ queue . batch = new FakeMessageBatch ( ) ;
363+ queue . batch . messages = fakeMessages ;
364+ queue . batch . callbacks = spies ;
365+ queue . publishDrain ( ) ;
366+
367+ assert . strictEqual ( stub . callCount , 2 ) ;
368+ } ) ;
369+
370+ it ( 'should not begin another publish(non-drain) if there are pending batches' , ( ) => {
371+ const stub = sandbox . stub ( queue , '_publish' ) ;
372+ let once = false ;
373+ stub . callsFake ( ( m , c , done ) => {
374+ if ( ! once ) {
375+ // Drop in a second batch before calling the callback.
376+ const secondBatch = new FakeMessageBatch ( ) ;
377+ secondBatch . messages = fakeMessages ;
378+ secondBatch . callbacks = spies ;
379+ queue . batch = secondBatch ;
380+ }
381+ once = true ;
382+
383+ done ! ( null ) ;
384+ } ) ;
385+
386+ queue . batch = new FakeMessageBatch ( ) ;
387+ queue . batch . messages = fakeMessages ;
388+ queue . batch . callbacks = spies ;
389+ queue . publish ( ) ;
390+
391+ assert . strictEqual ( stub . callCount , 1 ) ;
392+ } ) ;
393+
394+ it ( 'should emit "drain" if there is nothing left to publish' , ( ) => {
395+ const spy = sandbox . spy ( ) ;
396+ sandbox
397+ . stub ( queue , '_publish' )
398+ . callsFake ( ( m , c , done ) => done ! ( null ) ) ;
399+
400+ queue . on ( 'drain' , spy ) ;
401+ queue . publish ( ) ;
402+
403+ assert . strictEqual ( spy . callCount , 1 ) ;
404+ } ) ;
405+ } ) ;
335406 } ) ;
336407 } ) ;
337408
0 commit comments