Skip to content
This repository was archived by the owner on May 16, 2019. It is now read-only.

Commit 743f090

Browse files
committed
Refactor timeout handling
1 parent 4c99f69 commit 743f090

3 files changed

Lines changed: 31 additions & 61 deletions

File tree

dht/tests/test_protocol.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import nacl.hash
1111
from txrudp import connection, rudp, packet, constants
1212
from twisted.trial import unittest
13-
from twisted.internet import task, reactor, address, udp, defer
13+
from twisted.internet import task, address, udp, defer
1414
from dht.protocol import KademliaProtocol
1515
from dht.utils import digest
1616
from dht.storage import ForgetfulStorage
@@ -451,13 +451,11 @@ def handle_response(resp):
451451
self.assertTrue(resp[0])
452452
self.assertEqual(resp[1][0], "test")
453453
self.assertTrue(message_id not in self.protocol._outstanding)
454-
self.assertFalse(timeout.active())
455454

456455
message_id = digest("msgid")
457456
n = Node(digest("S"), self.addr1[0], self.addr1[1])
458457
d = defer.Deferred()
459-
timeout = reactor.callLater(5, self.protocol._timeout, message_id)
460-
self.protocol._outstanding[message_id] = (d, timeout)
458+
self.protocol._outstanding[message_id] = (d, self.addr1)
461459
self.protocol._acceptResponse(message_id, ["test"], n)
462460

463461
return d.addCallback(handle_response)
@@ -466,22 +464,17 @@ def test_unknownRPC(self):
466464
self.assertFalse(self.handler.receive_message(str(random.getrandbits(1400))))
467465

468466
def test_timeout(self):
469-
self._connecting_to_connected()
470-
self.wire_protocol[self.addr1] = self.con
471467

472-
def test_remove_outstanding():
473-
self.assertTrue(len(self.protocol._outstanding) == 0)
474-
475-
def test_deffered(d):
476-
self.assertFalse(d[0])
477-
test_remove_outstanding()
468+
def handle_response(resp, n):
469+
self.assertFalse(resp[0])
470+
self.assertIsNone(resp[1])
471+
self.assertTrue(self.protocol.router.isNewNode(n))
478472

479473
n = Node(digest("S"), self.addr1[0], self.addr1[1])
480-
d = self.protocol.callPing(n)
481-
self.clock.advance(6)
482-
connection.REACTOR.runUntilCurrent()
483-
self.clock.advance(6)
484-
return d.addCallback(test_deffered)
474+
d = defer.Deferred().addCallback(handle_response, n)
475+
self.protocol._outstanding["msgID"] = [d, self.addr1]
476+
self.protocol.router.addContact(n)
477+
self.protocol.timeout(self.addr1, n)
485478

486479
def test_transferKeyValues(self):
487480
self._connecting_to_connected()

rpcudp.py

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@
1111
from binascii import hexlify
1212
from hashlib import sha1
1313
from base64 import b64encode
14-
from twisted.internet import reactor
1514
from twisted.internet import defer
1615
from log import Logger
1716
from protos.message import Message, Command
1817
from dht import node
1918
from constants import PROTOCOL_VERSION
20-
from protos.message import NOT_FOUND, GET_IMAGE, GET_CONTRACT
19+
from protos.message import NOT_FOUND
2120

2221

2322
class RPCProtocol:
@@ -103,8 +102,7 @@ def _acceptResponse(self, msgID, data, sender):
103102
self.log.debug("received response for message id %s from %s" % msgargs)
104103
else:
105104
self.log.warning("received 404 error response from %s" % sender)
106-
d, timeout = self._outstanding[msgID]
107-
timeout.cancel()
105+
d = self._outstanding[msgID][0]
108106
d.callback((True, data))
109107
del self._outstanding[msgID]
110108

@@ -137,31 +135,17 @@ def _sendResponse(self, response, funcname, msgID, sender, connection):
137135
data = m.SerializeToString()
138136
connection.send_message(data)
139137

140-
def _timeout(self, msgID, address):
138+
def timeout(self, address, node_to_remove):
141139
"""
142-
If a message times out we are first going to try hole punching because
143-
the node may be behind a restricted NAT. If it is successful, the original
144-
should get through. This timeout will only fire if the hole punching
145-
fails.
140+
This timeout is called by the txrudp connection handler. We will run through the
141+
outstanding messages and callback false on any waiting on this IP address.
146142
"""
147-
# pylint: disable=pointless-string-statement
148-
"""
149-
Hole punching disabled for now
150-
151-
seed = SEED_NODE_TESTNET if self.multiplexer.testnet else SEED_NODE
152-
if not hp and self.multiplexer.ip_address[0] != seed[0]:
153-
args = (address[0], address[1], b64encode(msgID))
154-
self.log.debug("did not receive reply from %s:%s for msgID %s, trying hole punching..." % args)
155-
self.hole_punch(seed, address[0], address[1], "True")
156-
timeout = reactor.callLater(self._waitTimeout, self._timeout, msgID, address, True)
157-
self._outstanding[msgID][1] = timeout
158-
else:
159-
"""
160-
args = (b64encode(msgID), self._waitTimeout)
161-
self.log.warning("did not receive reply for msg id %s within %i seconds" % args)
162-
self._outstanding[msgID][0].callback((False, None))
163-
del self._outstanding[msgID]
164-
self.multiplexer[address].shutdown()
143+
if node_to_remove is not None:
144+
self.router.removeContact(node_to_remove)
145+
for msgID, val in self._outstanding.items():
146+
if address == val[1]:
147+
val[0].callback((False, None))
148+
del self._outstanding[msgID]
165149

166150
def rpc_hole_punch(self, sender, ip, port, relay="False"):
167151
"""
@@ -175,12 +159,6 @@ def rpc_hole_punch(self, sender, ip, port, relay="False"):
175159
self.log.debug("punching through NAT for %s:%s" % (ip, port))
176160
self.multiplexer.send_datagram(" ", (ip, int(port)))
177161

178-
def _get_waitTimeout(self, command):
179-
if command == GET_IMAGE or command == GET_CONTRACT:
180-
return 100
181-
else:
182-
return self._waitTimeout
183-
184162
def __getattr__(self, name):
185163
if name.startswith("_") or name.startswith("rpc_"):
186164
return object.__getattr__(self, name)
@@ -202,8 +180,7 @@ def func(address, *args):
202180
m.testnet = self.multiplexer.testnet
203181
data = m.SerializeToString()
204182
d = defer.Deferred()
205-
timeout = reactor.callLater(self._get_waitTimeout(m.command), self._timeout, msgID, address)
206-
self._outstanding[msgID] = [d, timeout]
183+
self._outstanding[msgID] = [d, address]
207184
self.multiplexer.send_message(data, address)
208185
self.log.debug("calling remote function %s on %s (msgid %s)" % (name, address, b64encode(msgID)))
209186
return d

wireprotocol.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from twisted.internet.task import LoopingCall
77
from twisted.internet import task, reactor
88
from interfaces import MessageProcessor
9-
from protos.message import Message, FIND_VALUE
9+
from protos.message import Message
1010
from log import Logger
1111
from dht.node import Node
1212
from protos.message import PING, NOT_FOUND
@@ -74,15 +74,15 @@ def receive_message(self, datagram):
7474
return False
7575

7676
def handle_shutdown(self):
77-
if self.connection is not None:
78-
for processor in self.processors:
79-
if FIND_VALUE in processor and self.node is not None:
80-
processor.router.removeContact(self.node)
81-
reactor.callLater(90, self.connection.unregister)
82-
if self.addr:
83-
self.log.info("connection with %s terminated" % self.addr)
77+
for processor in self.processors:
78+
processor.timeout((self.connection.dest_addr[0], self.connection.dest_addr[1]), self.node)
79+
reactor.callLater(90, self.connection.unregister)
80+
if self.addr:
81+
self.log.info("connection with %s terminated" % self.addr)
82+
try:
8483
self.keep_alive_loop.stop()
85-
self.connection = None
84+
except Exception:
85+
pass
8686

8787
def keep_alive(self):
8888
for processor in self.processors:

0 commit comments

Comments
 (0)