@@ -234,46 +234,23 @@ function done(stream, er, data) {
234234}
235235
236236const from = require ( 'internal/streams/from' ) ;
237- const createReadableStreamAsyncIterator = require ( 'internal/streams/async_iterator' ) ;
238237
239238Transform . by = function by ( asyncGeneratorFn , opts ) {
240- let resume = null ;
241- function next ( ) {
242- if ( resume ) {
243- const doResume = resume ;
244- resume = null ;
245- doResume ( ) ;
239+ let _resolve ;
240+ let _promise = new Promise ( ( resolve ) => _resolve = resolve ) ;
241+ return from ( Duplex , asyncGeneratorFn ( async function * ( ) {
242+ while ( true ) {
243+ const { chunk, done, cb } = await _promise ;
244+ if ( done ) return cb ( ) ;
245+ yield chunk ;
246+ _promise = new Promise ( ( resolve ) => _resolve = resolve ) ;
247+ cb ( ) ;
246248 }
247- }
248- const input = new Readable ( {
249- objectMode : true ,
250- autoDestroy : true ,
251- read : next ,
252- highWaterMark : 1 , // TODO: Buffer here?
253- destroy ( err , callback ) {
254- next ( ) ;
255- callback ( err ) ;
256- }
257- } ) ;
258-
259- const iterator = createReadableStreamAsyncIterator ( input ) ;
260- return from ( Duplex , asyncGeneratorFn ( iterator ) , {
249+ } ( ) ) , {
261250 objectMode : true ,
262251 autoDestroy : true ,
263252 ...opts ,
264- write ( chunk , encoding , callback ) {
265- if ( ! input . push ( chunk ) ) {
266- resume = callback ;
267- } else {
268- callback ( ) ;
269- }
270- } ,
271- final ( callback ) {
272- input . push ( null ) ;
273- resume = callback ;
274- } ,
275- destroy ( err , callback ) {
276- input . destroy ( err , callback ) ;
277- }
253+ write : ( chunk , encoding , cb ) => _resolve ( { chunk, done : false , cb } ) ,
254+ final : ( cb ) => _resolve ( { done : true , cb } )
278255 } ) ;
279256} ;
0 commit comments