Skip to content

Commit

Permalink
Simplification and refactoring of state machines
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Aug 27, 2024
1 parent d5d0347 commit 83d22ae
Show file tree
Hide file tree
Showing 16 changed files with 318 additions and 311 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

# [unreleased]

# [v0.3.0]

## Changed

- Simplified state machine usage: Packets are now inserted using an optional `packet` argument
of the `state_machine` call.

## Removed

- `insert_packet` API of the source and destination handler. Packet insertion is now performed
using the `state_machine` call.

# [v0.2.0] 2024-08-27

## Fixed
Expand Down
25 changes: 16 additions & 9 deletions docs/introduction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ CFDP source entity

The :py:class:`cfdppy.handler.source.SourceHandler` converts a
:py:class:`cfdppy.request.PutRequest` into all packet data units (PDUs) which need to be
sent to a remote CFDP entity to perform a File Copy operation to a remote entity. The source entity
allows freedom of communcation by only generating the packets required to be sent, and leaving the
actual transmission of those packets to the user. The packet is returned to the user using
the :py:class:`spacepackets.cfdp.pdu.helper.PduHolder` field of the
:py:class:`cfdppy.handler.source.FsmResult` structure returned in the
:py:meth:`cfdppy.handler.source.SourceHandler.state_machine` call.
sent to a remote CFDP entity to perform a File Copy operation to a remote entity.

The source entity allows freedom of communcation by only generating the packets required to be
sent, and leaving the actual transmission of those packets to the user. The packets are returned
to the user using the :py:meth:`cfdppy.handler.source.SourceHandler.get_next_packet` call.
It should be noted that for regular file transfers, each state machine call will map to one
generated file data PDU. This allows flow control for the user of the state machine.

The state machine of the source entity will generally perform the following steps, after
a valid :py:class:`cfdppy.request.PutRequest` to perform a File Copy operation was received:
Expand Down Expand Up @@ -58,10 +59,16 @@ CFDP destination entity
The :py:class:`cfdppy.handler.dest.DestHandler` can convert the PDUs sent from a remote
source entity ID back to a file. A file copy operation on the receiver side is started with
the reception of a Metadata PDU, for example one generated by the
:py:class:`spacepackets.cfdp.pdu.metadata.MetadataPdu` class . After that, file packet PDUs, for
:py:class:`spacepackets.cfdp.pdu.metadata.MetadataPdu` class. After that, file packet PDUs, for
example generated by the :py:class:`spacepackets.cfdp.pdu.file_data.FileDataPdu`, can be inserted
into the destination handler and will be assembled into a file. The transaction will be finished
for the following conditions:
into the destination handler and will be assembled into a file.

A destination entity might still generate packets which need to be sent back to the source entity
of the file transfer. However, it allows freedom of communcation like the source entity by leaving
the actual transmission of generated packets to the user. The packets are returned to the user
using the :py:meth:`cfdppy.handler.source.DestHandler.get_next_packet` call.

The transaction will be finished for the following conditions:

1. A valid EOF PDU, for example generated by the :py:class:`spacepackets.cfdp.pdu.eof.EofPdu`
class, has been inserted into the class.
Expand Down
83 changes: 50 additions & 33 deletions examples/cfdp-cli-udp/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import select
import socket
import time
import threading
from datetime import timedelta
from multiprocessing import Queue
from pathlib import Path
Expand Down Expand Up @@ -31,7 +32,6 @@
from spacepackets.cfdp import (
ChecksumType,
ConditionCode,
GenericPduPacket,
TransactionId,
TransmissionMode,
)
Expand Down Expand Up @@ -299,6 +299,7 @@ def __init__(
tx_queue: Queue,
source_entity_rx_queue: Queue,
dest_entity_rx_queue: Queue,
stop_signal: threading.Event,
):
super().__init__()
self.sleep_time = sleep_time
Expand All @@ -308,12 +309,15 @@ def __init__(
self.udp_socket.bind(addr)
self.tm_queue = tx_queue
self.last_sender = None
self.stop_signal = stop_signal
self.source_entity_queue = source_entity_rx_queue
self.dest_entity_queue = dest_entity_rx_queue

def run(self):
_LOGGER.info(f"Starting UDP server on {self.addr}")
while True:
if self.stop_signal.is_set():
break
self.periodic_operation()
time.sleep(self.sleep_time)

Expand All @@ -324,6 +328,7 @@ def periodic_operation(self):
break
# Perform PDU routing.
packet_dest = get_packet_destination(next_packet.pdu)
_LOGGER.debug(f"UDP server: Routing {next_packet} to {packet_dest}")
if packet_dest == PacketDestination.DEST_HANDLER:
self.dest_entity_queue.put(next_packet.pdu)
elif packet_dest == PacketDestination.SOURCE_HANDLER:
Expand Down Expand Up @@ -369,6 +374,7 @@ def __init__(
put_req_queue: Queue,
source_entity_queue: Queue,
tm_queue: Queue,
stop_signal: threading.Event,
):
super().__init__()
self.base_str = base_str
Expand All @@ -377,6 +383,7 @@ def __init__(
self.put_req_queue = put_req_queue
self.source_entity_queue = source_entity_queue
self.tm_queue = tm_queue
self.stop_signal = stop_signal

def _idle_handling(self) -> bool:
try:
Expand All @@ -401,33 +408,40 @@ def _idle_handling(self) -> bool:
def _busy_handling(self):
# We are getting the packets from a Queue here, they could for example also be polled
# from a network.
no_packet_received = True
packet_received = False
packet = None
try:
# We are getting the packets from a Queue here, they could for example also be polled
# from a network.
packet: AbstractFileDirectiveBase = self.source_entity_queue.get(False)
try:
self.source_handler.insert_packet(packet)
except InvalidDestinationId as e:
_LOGGER.warning(
f"invalid destination ID {e.found_dest_id} on packet {packet}, expected "
f"{e.expected_dest_id}"
)
no_packet_received = False
packet = self.source_entity_queue.get(False)
packet_received = True
except Empty:
no_packet_received = True
pass
try:
no_packet_sent = self._call_source_state_machine()
packet_sent = self._call_source_state_machine(packet)
# If there is no work to do, put the thread to sleep.
if no_packet_received and no_packet_sent:
if not packet_received and not packet_sent:
return False
except SourceFileDoesNotExist:
_LOGGER.warning("Source file does not exist")
self.source_handler.reset()

def _call_source_state_machine(self) -> bool:
def _call_source_state_machine(
self, packet: Optional[AbstractFileDirectiveBase]
) -> bool:
"""Returns whether a packet was sent."""
fsm_result = self.source_handler.state_machine()

if packet is not None:
_LOGGER.debug(f"{self.base_str}: Inserting {packet}")
try:
fsm_result = self.source_handler.state_machine(packet)
except InvalidDestinationId as e:
_LOGGER.warning(
f"invalid destination ID {e.found_dest_id} on packet {packet}, expected "
f"{e.expected_dest_id}"
)
fsm_result = self.source_handler.state_machine(None)
packet_sent = False
if fsm_result.states.num_packets_ready > 0:
while fsm_result.states.num_packets_ready > 0:
next_pdu_wrapper = self.source_handler.get_next_packet()
Expand All @@ -438,13 +452,14 @@ def _call_source_state_machine(self) -> bool:
)
# Send all packets which need to be sent.
self.tm_queue.put(next_pdu_wrapper.pack())
return False
else:
return True
packet_sent = True
return packet_sent

def run(self):
_LOGGER.info(f"Starting {self.base_str}")
while True:
if self.stop_signal.is_set():
break
if self.source_handler.state == CfdpState.IDLE:
if not self._idle_handling():
time.sleep(0.2)
Expand All @@ -462,44 +477,46 @@ def __init__(
dest_handler: DestHandler,
dest_entity_queue: Queue,
tm_queue: Queue,
stop_signal: threading.Event,
):
super().__init__()
self.base_str = base_str
self.verbose_level = verbose_level
self.dest_handler = dest_handler
self.dest_entity_queue = dest_entity_queue
self.tm_queue = tm_queue
self.stop_signal = stop_signal

def run(self):
_LOGGER.info(
f"Starting {self.base_str}. Local ID {self.dest_handler.cfg.local_entity_id}"
)
first_packet = True
no_packet_received = False
while True:
packet_received = False
packet = None
if self.stop_signal.is_set():
break
try:
packet: GenericPduPacket = self.dest_entity_queue.get(False)
self.dest_handler.insert_packet(packet)
no_packet_received = False
if first_packet:
first_packet = False
packet = self.dest_entity_queue.get(False)
packet_received = True
except Empty:
no_packet_received = True
fsm_result = self.dest_handler.state_machine()
pass
if packet is not None:
_LOGGER.debug(f"{self.base_str}: Inserting {packet}")
fsm_result = self.dest_handler.state_machine(packet)
packet_sent = False
if fsm_result.states.num_packets_ready > 0:
no_packet_sent = False
while fsm_result.states.num_packets_ready > 0:
next_pdu_wrapper = self.dest_handler.get_next_packet()
assert next_pdu_wrapper is not None
if self.verbose_level >= 1:
_LOGGER.debug(
f"REMOTE DEST ENTITY: Sending packet {next_pdu_wrapper.pdu}"
f"{self.base_str}: Sending packet {next_pdu_wrapper.pdu}"
)
self.tm_queue.put(next_pdu_wrapper.pack())
else:
no_packet_sent = True
packet_sent = True
# If there is no work to do, put the thread to sleep.
if no_packet_received and no_packet_sent:
if not packet_received and not packet_sent:
time.sleep(0.5)


Expand Down
14 changes: 13 additions & 1 deletion examples/cfdp-cli-udp/local.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#!/usr/bin/env python3
"""This component simulates the local component."""
import argparse
import time
import logging
import ipaddress
import threading
from logging import basicConfig
from multiprocessing import Queue
from pathlib import Path
Expand Down Expand Up @@ -62,6 +64,8 @@ def main():
parser.add_argument("-v", "--verbose", action="count", default=0)
add_cfdp_procedure_arguments(parser)
args = parser.parse_args()
stop_signal = threading.Event()

logging_level = logging.INFO
if args.verbose >= 1:
logging_level = logging.DEBUG
Expand Down Expand Up @@ -98,6 +102,7 @@ def main():
PUT_REQ_QUEUE,
SOURCE_ENTITY_QUEUE,
TM_QUEUE,
stop_signal,
)

# Enable all indications.
Expand All @@ -115,6 +120,7 @@ def main():
dest_handler,
DEST_ENTITY_QUEUE,
TM_QUEUE,
stop_signal,
)

# Address Any to accept CFDP packets from other address than localhost.
Expand All @@ -136,13 +142,19 @@ def main():
tx_queue=TM_QUEUE,
source_entity_rx_queue=SOURCE_ENTITY_QUEUE,
dest_entity_rx_queue=DEST_ENTITY_QUEUE,
stop_signal=stop_signal,
)

# TODO: Graceful shutdown handling.
source_entity_task.start()
dest_entity_task.start()
udp_server.start()

try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop_signal.set()

source_entity_task.join()
dest_entity_task.join()
udp_server.join()
Expand Down
13 changes: 12 additions & 1 deletion examples/cfdp-cli-udp/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"""This component simulates the remote component."""
import argparse
import logging
import time
import threading
from logging import basicConfig
from multiprocessing import Queue

Expand Down Expand Up @@ -47,6 +49,7 @@
def main():
parser = argparse.ArgumentParser(prog="CFDP Remote Entity Application")
parser.add_argument("-v", "--verbose", action="count", default=0)
stop_signal = threading.Event()
args = parser.parse_args()
logging_level = logging.INFO
if args.verbose >= 1:
Expand Down Expand Up @@ -74,6 +77,7 @@ def main():
PUT_REQ_QUEUE,
SOURCE_ENTITY_QUEUE,
TM_QUEUE,
stop_signal,
)

# Enable all indications.
Expand All @@ -91,6 +95,7 @@ def main():
dest_handler,
DEST_ENTITY_QUEUE,
TM_QUEUE,
stop_signal,
)

# Address Any to accept CFDP packets from other address than localhost.
Expand All @@ -103,13 +108,19 @@ def main():
tx_queue=TM_QUEUE,
source_entity_rx_queue=SOURCE_ENTITY_QUEUE,
dest_entity_rx_queue=DEST_ENTITY_QUEUE,
stop_signal=stop_signal,
)

# TODO: Graceful shutdown.
source_entity_task.start()
dest_entity_task.start()
udp_server.start()

try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop_signal.set()

source_entity_task.join()
dest_entity_task.join()
udp_server.join()
Expand Down
Loading

0 comments on commit 83d22ae

Please sign in to comment.