1- package com .xxdb .streaming .cep ;
1+ package com .xxdb .streaming .client . cep ;
22
33import com .xxdb .DBConnection ;
44import com .xxdb .comm .ErrorCodeInfo ;
99import com .xxdb .streaming .client .AbstractClient ;
1010import com .xxdb .streaming .client .IMessage ;
1111import com .xxdb .streaming .client .MessageHandler ;
12+ import com .xxdb .streaming .client .Site ;
1213import org .slf4j .Logger ;
1314import org .slf4j .LoggerFactory ;
1415import java .io .IOException ;
1516import java .net .SocketException ;
17+ import java .text .DateFormat ;
18+ import java .text .SimpleDateFormat ;
1619import java .util .ArrayList ;
1720import java .util .Arrays ;
21+ import java .util .Date ;
1822import java .util .List ;
1923import java .util .concurrent .BlockingQueue ;
2024
@@ -118,7 +122,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
118122 if (sites == null || sites .length == 0 )
119123 ;
120124 for (int i = 0 ; i < sites .length ; i ++)
121- sites [i ].closed = true ;
125+ sites [i ].setClosed ( true ) ;
122126 }
123127 synchronized (queueManager ) {
124128 queueManager .removeQueue (topic );
@@ -135,6 +139,19 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
135139
136140 @ Override
137141 protected boolean doReconnect (Site site ) {
138- return false ;
142+ try {
143+ site .getHost ();
144+ subscribe (site .getHost (), site .getPort (), site .getTableName (), site .getActionName (), site .getHandler (), site .getMsgId () + 1 , true , site .getUserName (), site .getPassWord ());
145+ Date d = new Date ();
146+ DateFormat df = new SimpleDateFormat ("yyyy/MM/dd HH:mm:ss" );
147+ log .info (df .format (d ) + " Successfully reconnected and subscribed " + site .getHost () + ":" + site .getPort () + "/" + site .getTableName () + "/" + site .getActionName ());
148+ return true ;
149+ } catch (Exception ex ) {
150+ Date d = new Date ();
151+ DateFormat df = new SimpleDateFormat ("yyyy/MM/dd HH:mm:ss" );
152+ log .error (df .format (d ) + " Unable to subscribe table. Will try again after 1 seconds." + site .getHost () + ":" + site .getPort () + "/" + site .getTableName () + "/" + site .getActionName ());
153+ ex .printStackTrace ();
154+ return false ;
155+ }
139156 }
140157}
0 commit comments