-
Notifications
You must be signed in to change notification settings - Fork 0
/
inner_eye
executable file
·108 lines (84 loc) · 3.66 KB
/
inner_eye
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from __future__ import print_function
import time
import sys
import logging
from pprint import pformat
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from gnuradio import gr, blocks, zeromq
from phychic import Snapshotlizer, clever_complex2ishort
# Template for the one-line status display; takes three floats, in order:
# input rate (MSps), output rate (kSps) and network load (kB/s).
STATUS_BAR_FMT = ' | '.join((
    'Input rate: %0.2f MSps',
    'Output rate: %0.2f kSps',
    'Network load: %0.2f kB/s',
))
class InnerEye(gr.top_block):
    '''
    Flowgraph that forwards a thinned-out sample stream between ZMQ sockets.

    ZMQ SUB SRC --> Snapshotlizer --> clever_complex2ishort
                --> stream_to_vector --> ZMQ PUB SNK
    '''

    def __init__(self, input_address, output_address, block_size, one_in):
        '''
        Build and connect the flowgraph. It is not started here.

        input_address  -- ZMQ address handed to the SUB source.
        output_address -- ZMQ address handed to the PUB sink.
        block_size     -- samples per block; forwarded to Snapshotlizer and
                          used to size the output vectors.
        one_in         -- forwarded to Snapshotlizer (one block for every
                          "one_in" is sent); also used by __call__ to
                          estimate the output rate.
        '''
        gr.top_block.__init__(self, self.__class__.__name__)
        self._set_logger()
        self.block_size = block_size
        self.one_in = one_in

        # NOTE(review): the trailing positionals (100, True, -1) are presumably
        # timeout in ms, pass_tags and high-water mark -- confirm against the
        # installed gr-zeromq signature.
        self.zmq_sub_src = zeromq.sub_source(gr.sizeof_gr_complex, 1,
                                             input_address,
                                             100, True, -1)
        snapshotlizer = Snapshotlizer(self.block_size, self.one_in)
        cc2is = clever_complex2ishort()

        # Two shorts for each complex
        vector_len = self.block_size * 2
        self.s2v = blocks.stream_to_vector(gr.sizeof_short, vector_len)
        self.zmq_pub_snk = zeromq.pub_sink(gr.sizeof_short,
                                           vector_len,
                                           output_address,
                                           100, True, -1)

        self.connect(self.zmq_sub_src, snapshotlizer, cc2is,
                     self.s2v, self.zmq_pub_snk)

    def _set_logger(self):
        '''
        Create self.logger and send it, together with the
        'phychic.snapshotlizer' logger, to one shared stream handler at
        DEBUG level.
        '''
        self.logger = logging.getLogger(self.__class__.__name__)
        sh = logging.StreamHandler()
        snapshot_logger = logging.getLogger('phychic.snapshotlizer')
        for logger in (self.logger, snapshot_logger):
            logger.setLevel(logging.DEBUG)
            logger.addHandler(sh)

    @classmethod
    def from_sys_args(cls):
        '''
        Alternate constructor: parse the command line and return an instance.

        Both addresses are optional positionals. Without nargs='?' argparse
        treats a positional as required and silently ignores its default, so
        the defaults advertised in --help could never be used.
        '''
        ap = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
        ap.add_argument('--block_size', type=int, default=40,
                        help='How many samples per block.')
        ap.add_argument('--one_in', type=int, default=48,
                        help='One block for every "one_in" will be sent.')
        ap.add_argument('input_address', type=str, nargs='?',
                        default='ipc:///tmp/some_full_iq',
                        help='ZMQ SUB input address')
        ap.add_argument('output_address', type=str, nargs='?',
                        default='tcp://*:35000',
                        help='ZMQ PUB output address')
        args = ap.parse_args()

        obj = cls(args.input_address, args.output_address,
                  args.block_size, args.one_in)
        obj.logger.info('Starting with:\n%s', pformat(vars(args)))
        return obj

    def __call__(self):
        '''
        Start the flowgraph and redraw a status bar every half second.

        The loop has no exit condition; it only ends when an exception
        (e.g. KeyboardInterrupt) propagates to the caller, who is then
        responsible for stopping the flowgraph.
        '''
        output_item_size = gr.sizeof_short
        self.start()
        self.logger.info('Rate measurements are not always precise. They are slow to react.')
        while True:
            input_rate = self.zmq_sub_src.pc_throughput_avg()
            # Each complex sample becomes two shorts, and only one block in
            # every "one_in" is forwarded.
            output_rate = 2 * input_rate * 1.0 / self.one_in
            network_load = output_item_size * output_rate
            status_bar = STATUS_BAR_FMT % (input_rate / 1e6,
                                           output_rate / 1e3,
                                           network_load / 1024.0)
            # '\r' without a newline rewrites the same terminal line in place.
            print('\r' + status_bar, end='')
            sys.stdout.flush()
            time.sleep(0.5)
if __name__ == '__main__':
    # Script entry point: build the flowgraph from the command line and show
    # the status bar until the user interrupts it.
    inner_eye = InnerEye.from_sys_args()
    try:
        inner_eye()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way out of the endless status loop.
        pass
    finally:
        # Shut the flowgraph down however the loop ended.
        class_name = inner_eye.__class__.__name__
        print('\nStopping %s' % class_name)
        inner_eye.stop()
        inner_eye.wait()
        print('The Inner Eye closes.')