Skip to content

Commit

Permalink
Merge pull request #196 from kmarwah/raf_fix
Browse files Browse the repository at this point in the history
Rearchitect RAFPlugin to fix bug
  • Loading branch information
nttoole authored Aug 23, 2022
2 parents 378a12d + d5c086a commit 56dbb13
Showing 1 changed file with 17 additions and 25 deletions.
42 changes: 17 additions & 25 deletions ait/dsn/plugins/raf_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,15 @@
from ait.dsn.sle import RAF
from ait.core.server.plugins import Plugin

class RAFModified(RAF):
"""
A modified version of the RAF class which publishes received frames to a plugin"s output topic
(instead of sending them to a UDP socket)
"""
def __init__(self, publish_function, *args, **kwargs):
super().__init__(self, *args, **kwargs)
self.publish = publish_function

def _transfer_data_invoc_handler(self, pdu):
""""""
frame = pdu.getComponent()
if "data" in frame and frame["data"].isValue:
tm_data = frame["data"].asOctets()
else:
err = (
"RafTransferBuffer received but data cannot be located. "
"Skipping further processing of this PDU ..."
)
ait.core.log.info(err)
return

self.publish(tm_data)

class RAFPlugin(Plugin):
"""
A plugin which creates a RAF instance using the SLE parameters specified in config.yaml.
All received frames are published to the output topic.
"""
def __init__(self, inputs=None, outputs=None, zmq_args=None):
super().__init__(inputs, outputs, zmq_args)
self.raf_object = RAFModified(publish_function= self.publish)
self.raf_object = RAF()
self.raf_object._handlers['AnnotatedFrame']=[self._transfer_data_invoc_handler]
self.raf_object.connect()
time.sleep(2)
self.raf_object.bind()
Expand All @@ -53,3 +30,18 @@ def __del__(self):
self.raf_object.stop()
self.raf_object.unbind()
self.raf_object.disconnect()

def _transfer_data_invoc_handler(self, pdu):
""""""
frame = pdu.getComponent()
if "data" in frame and frame["data"].isValue:
tm_data = frame["data"].asOctets()
else:
err = (
"RafTransferBuffer received but data cannot be located. "
"Skipping further processing of this PDU ..."
)
ait.core.log.info(err)
return

self.publish(tm_data)

0 comments on commit 56dbb13

Please sign in to comment.