1515import io .netty .channel .ChannelHandlerContext ;
1616import io .netty .channel .ChannelInitializer ;
1717import io .netty .channel .EventLoopGroup ;
18+ import io .netty .channel .MultiThreadIoEventLoopGroup ;
1819import io .netty .channel .SimpleChannelInboundHandler ;
19- import io .netty .channel .nio .NioEventLoopGroup ;
20+ import io .netty .channel .nio .NioIoHandler ;
2021import io .netty .channel .socket .DatagramChannel ;
2122import io .netty .channel .socket .nio .NioDatagramChannel ;
2223
@@ -28,7 +29,7 @@ public class QueryClient {
2829 private Map <InetSocketAddress , Map .Entry <Query , CompletableFuture <?>>> requests = new HashMap <InetSocketAddress , Map .Entry <Query , CompletableFuture <?>>>();
2930
3031 public QueryClient () {
31- worker = new NioEventLoopGroup ( );
32+ worker = new MultiThreadIoEventLoopGroup ( NioIoHandler . newFactory () );
3233 Bootstrap bootstrap = new Bootstrap ()
3334 .group (worker )
3435 .channel (NioDatagramChannel .class )
@@ -40,12 +41,11 @@ protected void initChannel(NioDatagramChannel ch) throws Exception {
4041 new MessageCodec (),
4142 new SimpleChannelInboundHandler <Reply >() {
4243 protected void channelRead0 (ChannelHandlerContext ctx , Reply msg ) throws Exception {
43- Map .Entry <Query , CompletableFuture <?>> request = requests .get (msg .remoteAddress ());
44+ Map .Entry <Query , CompletableFuture <?>> request = requests .remove (msg .remoteAddress ());
4445 if (request != null ) {
4546 @ SuppressWarnings ("unchecked" )
4647 CompletableFuture <Object > future = (CompletableFuture <Object >) request .getValue ();
4748 future .complete (msg .payload ());
48- clearRequest (msg .remoteAddress ());
4949 }
5050 }
5151 },
@@ -70,9 +70,12 @@ private void writeRequest(Query request, CompletableFuture<?> future) {
7070 }
7171
7272 private void clearRequest (InetSocketAddress remoteAddress ) {
73- if (requests .get (remoteAddress ) != null ) {
74- requests .get (remoteAddress ).getValue ().cancel (true );
75- requests .put (remoteAddress , null );
73+ Map .Entry <Query , CompletableFuture <?>> entry = requests .remove (remoteAddress );
74+ if (entry != null ) {
75+ CompletableFuture <?> future = entry .getValue ();
76+ if (!future .isDone ()) {
77+ future .cancel (true );
78+ }
7679 }
7780 }
7881
0 commit comments