-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.py
103 lines (78 loc) · 2.92 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import bromine
import sys
from twisted.names import client, dns
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
# https://twistedmatrix.com/documents/13.1.0/core/howto/servers.html
TESTING = False
if TESTING:
SLOW = 1e-2
FAST = SLOW
REQS = 2
else:
SLOW = 1 # dont pump too fast when not busy
FAST = 1e-2
REQS = 5 # when busy, we may need help
class SocketInDns(Protocol):
def __init__(self):
if TESTING:
self.resolver = client.Resolver(servers=[('127.0.0.1', 5553)])
else:
self.resolver = client.Resolver('/etc/resolv.conf')
# data going out
self.score_board = bromine.Scoreboard()
self.last_ack = bromine.INVALID_MID
# data coming in
self.systems = bromine.Systems()
# keep track of callbacks
self.requested = 0 # looping call not withstanding
LoopingCall(self.pump).start(SLOW)
def dataReceived(self, data):
self.score_board.push_data(data)
reactor.callLater(FAST, self.pump)
def clientConnectionLost(self, connector, reason):
print('connection lost:', reason.getErrorMessage())
sys.exit(0)
def empty(self):
return all(l.type_ == bromine.TYPE_ACK for l in self.score_board.backlog.values())
def pump(self):
self.requested -= 1
if self.requested > REQS:
return
last_seen = self.systems.last_seen_remote_mid
if last_seen % bromine.CONFIG['ackperiod'] == 0 and self.last_ack != last_seen:
self.last_ack = last_seen
self.score_board.push_ack(last_seen)
else:
self.score_board.last_seen_remote_mid = last_seen
for d in self.systems.data:
self.transport.write(d)
for ack in self.systems.acks:
remote_last_seen_remote_mid = ack[0]
self.score_board.retire(remote_last_seen_remote_mid)
self.systems.commit()
host = self.score_board.transmit()
query = dns.Query(host, dns.CNAME, dns.IN)
task = self.resolver.queryUDP([query], [20 * SLOW])
task.addCallback(self.ok_)
task.addErrback(self.error_)
def ok_(self, reply):
for a in reply.answers:
cname = a.payload.name.name
self.systems.add(cname)
if not self.empty() or len(reply.answers) > 1:
reactor.callLater(SLOW, self.pump)
self.requested += 1
def error_(self, failure):
reactor.callLater(FAST, self.pump)
self.requested += 1
class ClientFactory(Factory):
def buildProtocol(self, addr):
print("connection from", addr)
return SocketInDns()
if __name__ == '__main__':
endpoint = TCP4ServerEndpoint(reactor, bromine.CONFIG['port'])
endpoint.listen(ClientFactory())
reactor.run()