Skip to content

Commit

Permalink
Used dependency injection in network threads
Browse files Browse the repository at this point in the history
  • Loading branch information
anand-skss committed Jun 20, 2024
1 parent f2bb4d1 commit 7f87894
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 41 deletions.
2 changes: 1 addition & 1 deletion src/bitmessageqt/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def accept(self):
'bitmessagesettings', 'udp'):
self.config.set('bitmessagesettings', 'udp', str(udp_enabled))
if udp_enabled:
announceThread = AnnounceThread()
announceThread = AnnounceThread(self.config)
announceThread.daemon = True
announceThread.start()
else:
Expand Down
9 changes: 5 additions & 4 deletions src/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@ def start(config, state):
readKnownNodes()
connectionpool.pool.connectToStream(1)
for thread in (
BMNetworkThread(), InvThread(), AddrThread(),
DownloadThread(), UploadThread()
BMNetworkThread(queues), InvThread(protocol, state, queues, addresses),
AddrThread(protocol, queues), DownloadThread(state, protocol, addresses),
UploadThread(protocol, state)
):
thread.daemon = True
thread.start()

# Optional components
for i in range(config.getint('threads', 'receive')):
thread = ReceiveQueueThread(i)
thread = ReceiveQueueThread(queues, i)
thread.daemon = True
thread.start()
if config.safeGetBoolean('bitmessagesettings', 'udp'):
state.announceThread = AnnounceThread()
state.announceThread = AnnounceThread(config)
state.announceThread.daemon = True
state.announceThread.start()
15 changes: 9 additions & 6 deletions src/network/addrthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
# magic imports!
import connectionpool
from helper_random import randomshuffle
from protocol import assembleAddrMessage
from network import queues # FIXME: init with queue

from threads import StoppableThread

Expand All @@ -16,12 +14,17 @@ class AddrThread(StoppableThread):
"""(Node) address broadcasting thread"""
name = "AddrBroadcaster"

def __init__(self, protocol, queues):
self.protocol = protocol
self.queues = queues
StoppableThread.__init__(self)

def run(self):
while not self._stopped:
chunk = []
while True:
try:
data = queues.addrQueue.get(False)
data = self.queues.addrQueue.get(False)
chunk.append(data)
except queue.Empty:
break
Expand All @@ -41,9 +44,9 @@ def run(self):
continue
filtered.append((stream, peer, seen))
if filtered:
i.append_write_buf(assembleAddrMessage(filtered))
i.append_write_buf(self.protocol.assembleAddrMessage(filtered))

queues.addrQueue.iterate()
self.queues.addrQueue.iterate()
for i in range(len(chunk)):
queues.addrQueue.task_done()
self.queues.addrQueue.task_done()
self.stop.wait(1)
9 changes: 6 additions & 3 deletions src/network/announcethread.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

# magic imports!
import connectionpool
from network import config
from protocol import assembleAddrMessage

from node import Peer
Expand All @@ -17,18 +16,22 @@ class AnnounceThread(StoppableThread):
name = "Announcer"
announceInterval = 60

def __init__(self, config):
self.config = config
StoppableThread.__init__()

def run(self):
lastSelfAnnounced = 0
while not self._stopped:
processed = 0
if lastSelfAnnounced < time.time() - self.announceInterval:
self.announceSelf()
self.announceSelf(self.config)
lastSelfAnnounced = time.time()
if processed == 0:
self.stop.wait(10)

@staticmethod
def announceSelf():
def announceSelf(config):
"""Announce our presence"""
for connection in connectionpool.pool.udpSockets.values():
if not connection.announcing:
Expand Down
13 changes: 8 additions & 5 deletions src/network/downloadthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
`DownloadThread` class definition
"""
import time
from network import state, protocol, addresses, dandelion_ins
from network import dandelion_ins
import helper_random
import connectionpool
from objectracker import missingObjects
Expand All @@ -17,8 +17,11 @@ class DownloadThread(StoppableThread):
cleanInterval = 60
requestExpires = 3600

def __init__(self):
def __init__(self, state, protocol, addresses):
super(DownloadThread, self).__init__(name="Downloader")
self.state = state
self.protocol = protocol
self.addresses = addresses
self.lastCleaned = time.time()

def cleanPending(self):
Expand Down Expand Up @@ -57,7 +60,7 @@ def run(self):
payload = bytearray()
chunkCount = 0
for chunk in request:
if chunk in state.Inventory and not dandelion_ins.hasHash(chunk):
if chunk in self.state.Inventory and not dandelion_ins.hasHash(chunk):
try:
del i.objectsNewToMe[chunk]
except KeyError:
Expand All @@ -68,8 +71,8 @@ def run(self):
missingObjects[chunk] = now
if not chunkCount:
continue
payload[0:0] = addresses.encodeVarint(chunkCount)
i.append_write_buf(protocol.CreatePacket('getdata', payload))
payload[0:0] = self.addresses.encodeVarint(chunkCount)
i.append_write_buf(self.protocol.CreatePacket('getdata', payload))
self.logger.debug(
'%s:%i Requesting %i objects',
i.destination.host, i.destination.port, chunkCount)
Expand Down
28 changes: 17 additions & 11 deletions src/network/invthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import random
from time import time

from network import protocol, state, queues, addresses
import connectionpool
from network import dandelion_ins
from threads import StoppableThread
Expand Down Expand Up @@ -34,6 +33,13 @@ class InvThread(StoppableThread):

name = "InvBroadcaster"

def __init__(self, protocol, state, queues, addresses):
self.protocol = protocol
self.state = state
self.queues = queues
self.addresses = addresses
StoppableThread.__init__(self)

@staticmethod
def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling"""
Expand All @@ -45,13 +51,13 @@ def handleLocallyGenerated(stream, hashId):
connection.objectsNewToThem[hashId] = time()

def run(self): # pylint: disable=too-many-branches
while not state.shutdown: # pylint: disable=too-many-nested-blocks
while not self.state.shutdown: # pylint: disable=too-many-nested-blocks
chunk = []
while True:
# Dandelion fluff trigger by expiration
handleExpiredDandelion(dandelion_ins.expire(queues.invQueue))
handleExpiredDandelion(dandelion_ins.expire(self.queues.invQueue))
try:
data = queues.invQueue.get(False)
data = self.queues.invQueue.get(False)
chunk.append((data[0], data[1]))
# locally generated
if len(data) == 2 or data[2] is None:
Expand All @@ -78,7 +84,7 @@ def run(self): # pylint: disable=too-many-branches
if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0:
elif connection.services & self.protocol.NODE_DANDELION > 0:
stems.append(inv[1])
else:
fluffs.append(inv[1])
Expand All @@ -87,20 +93,20 @@ def run(self): # pylint: disable=too-many-branches

if fluffs:
random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket(
connection.append_write_buf(self.protocol.CreatePacket(
'inv',
addresses.encodeVarint(
self.addresses.encodeVarint(
len(fluffs)) + ''.join(fluffs)))
if stems:
random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket(
connection.append_write_buf(self.protocol.CreatePacket(
'dinv',
addresses.encodeVarint(
self.addresses.encodeVarint(
len(stems)) + ''.join(stems)))

queues.invQueue.iterate()
self.queues.invQueue.iterate()
for _ in range(len(chunk)):
queues.invQueue.task_done()
self.queues.invQueue.task_done()

dandelion_ins.reRandomiseStems()

Expand Down
7 changes: 5 additions & 2 deletions src/network/networkthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@
"""
import network.asyncore_pollchoose as asyncore
import connectionpool
from network import queues
from threads import StoppableThread


class BMNetworkThread(StoppableThread):
"""Main network thread"""
name = "Asyncore"

def __init__(self, queues):
self.queues = queues
StoppableThread.__init__(self)

def run(self):
try:
while not self._stopped:
connectionpool.pool.loop()
except Exception as e:
queues.excQueue.put((self.name, e))
self.queues.excQueue.put((self.name, e))
raise

def stopThread(self):
Expand Down
10 changes: 5 additions & 5 deletions src/network/receivequeuethread.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@

import connectionpool
from network.advanceddispatcher import UnknownStateError
from network import queues
from threads import StoppableThread


class ReceiveQueueThread(StoppableThread):
"""This thread processes data received from the network
(which is done by the asyncore thread)"""
def __init__(self, num=0):
def __init__(self, queues, num=0):
self.queues = queues
super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num)

def run(self):
while not self._stopped:
try:
dest = queues.receiveDataQueue.get(block=True, timeout=1)
dest = self.queues.receiveDataQueue.get(block=True, timeout=1)
except Queue.Empty:
continue

Expand All @@ -38,7 +38,7 @@ def run(self):
connection = connectionpool.pool.getConnectionByAddr(dest)
# connection object not found
except KeyError:
queues.receiveDataQueue.task_done()
self.queues.receiveDataQueue.task_done()
continue
try:
connection.process()
Expand All @@ -52,4 +52,4 @@ def run(self):
self.logger.error('Socket error: %s', err)
except: # noqa:E722
self.logger.error('Error processing', exc_info=True)
queues.receiveDataQueue.task_done()
self.queues.receiveDataQueue.task_done()
10 changes: 7 additions & 3 deletions src/network/uploadthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time

import helper_random
from network import protocol, state
import connectionpool
from randomtrackingdict import RandomTrackingDict
from network import dandelion_ins
Expand All @@ -18,6 +17,11 @@ class UploadThread(StoppableThread):
maxBufSize = 2097152 # 2MB
name = "Uploader"

def __init__(self, protocol, state):
self.protocol = protocol
self.state = state
StoppableThread.__init__(self)

def run(self):
while not self._stopped:
uploaded = 0
Expand Down Expand Up @@ -48,8 +52,8 @@ def run(self):
i.destination)
break
try:
payload.extend(protocol.CreatePacket(
'object', state.Inventory[chunk].payload))
payload.extend(self.protocol.CreatePacket(
'object', self.state.Inventory[chunk].payload))
chunk_count += 1
except KeyError:
i.antiIntersectionDelay()
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_udp(self):

for _ in range(10):
try:
self.state.announceThread.announceSelf()
self.state.announceThread.announceSelf(self.config)
except AttributeError:
self.fail('state.announceThread is not set properly')
time.sleep(1)
Expand Down

0 comments on commit 7f87894

Please sign in to comment.