@@ -25,6 +25,7 @@ let api = function Binance() {
2525 const HttpsProxyAgent = require ( 'https-proxy-agent' ) ;
2626 const SocksProxyAgent = require ( 'socks-proxy-agent' ) ;
2727 const stringHash = require ( 'string-hash' ) ;
28+ const async = require ( 'async' ) ;
2829 const base = 'https://api.binance.com/api/' ;
2930 const wapi = 'https://api.binance.com/wapi/' ;
3031 const stream = 'wss://stream.binance.com:9443/ws/' ;
@@ -1666,26 +1667,38 @@ let api = function Binance() {
16661667 }
16671668 } ;
16681669
1669- let getSymbolDepthSnapshot = function ( symbol ) {
1670- publicRequest ( base + 'v1/depth' , { symbol :symbol , limit :limit } , function ( error , json ) {
1671- // Initialize depth cache from snapshot
1672- depthCache [ symbol ] = depthData ( json ) ;
1673- // Prepare depth cache context
1674- let context = depthCacheContext [ symbol ] ;
1675- context . snapshotUpdateId = json . lastUpdateId ;
1676- context . messageQueue = context . messageQueue . filter ( depth => depth . u > context . snapshotUpdateId ) ;
1677- // Process any pending depth messages
1678- for ( let depth of context . messageQueue ) {
1679-
1680- /* Although sync errors shouldn't ever happen here, we catch and swallow them anyway
1681- just in case. The stream handler function above will deal with broken caches. */
1682- try { depthHandler ( depth ) ; } catch ( err ) {
1683- // do nothing
1684- }
1670+ let getSymbolDepthSnapshot = function ( symbol , cb ) {
1671+
1672+ publicRequest ( base + 'v1/depth' , { symbol :symbol , limit :limit } , function ( error , json ) {
1673+ if ( error ) {
1674+ return cb ( error , null ) ;
1675+ }
1676+ // Store symbol next use
1677+ json . symb = symbol ;
1678+ cb ( null , json )
1679+ } ) ;
1680+ } ;
1681+
1682+ let updateSymbolDepthCache = function ( json ) {
1683+ // Get previous store symbol
1684+ let symbol = json . symb ;
1685+ // Initialize depth cache from snapshot
1686+ depthCache [ symbol ] = depthData ( json ) ;
1687+ // Prepare depth cache context
1688+ let context = depthCacheContext [ symbol ] ;
1689+ context . snapshotUpdateId = json . lastUpdateId ;
1690+ context . messageQueue = context . messageQueue . filter ( depth => depth . u > context . snapshotUpdateId ) ;
1691+ // Process any pending depth messages
1692+ for ( let depth of context . messageQueue ) {
1693+
1694+ /* Although sync errors shouldn't ever happen here, we catch and swallow them anyway
1695+ just in case. The stream handler function above will deal with broken caches. */
1696+ try { depthHandler ( depth ) ; } catch ( err ) {
1697+ // do nothing
16851698 }
1686- delete context . messageQueue ;
1687- if ( callback ) callback ( symbol , depthCache [ symbol ] ) ;
1688- } ) ;
1699+ }
1700+ delete context . messageQueue ;
1701+ if ( callback ) callback ( symbol , depthCache [ symbol ] ) ;
16891702 } ;
16901703
16911704 /* If an array of symbols are sent we use a combined stream connection rather.
@@ -1700,14 +1713,20 @@ let api = function Binance() {
17001713 return symbol . toLowerCase ( ) + '@depth' ;
17011714 } ) ;
17021715 subscription = subscribeCombined ( streams , handleDepthStreamData , reconnect , function ( ) {
1703- symbols . forEach ( getSymbolDepthSnapshot ) ;
1716+ async . mapLimit ( symbols , symbols . length , getSymbolDepthSnapshot , ( err , results ) => {
1717+ if ( err ) throw err
1718+ results . forEach ( updateSymbolDepthCache ) ;
1719+ } ) ;
17041720 } ) ;
17051721 symbols . forEach ( s => assignEndpointIdToContext ( s , subscription . endpoint ) ) ;
17061722 } else {
17071723 let symbol = symbols ;
17081724 symbolDepthInit ( symbol ) ;
17091725 subscription = subscribe ( symbol . toLowerCase ( ) + '@depth' , handleDepthStreamData , reconnect , function ( ) {
1710- getSymbolDepthSnapshot ( symbol ) ;
1726+ async . mapLimit ( [ symbol ] , 1 , getSymbolDepthSnapshot , ( err , results ) => {
1727+ if ( err ) throw err
1728+ results . forEach ( updateSymbolDepthCache ) ;
1729+ } ) ;
17111730 } ) ;
17121731 assignEndpointIdToContext ( symbol , subscription . endpoint ) ;
17131732 }
0 commit comments