-
Notifications
You must be signed in to change notification settings - Fork 12
/
wm3con.py
executable file
·277 lines (243 loc) · 8.93 KB
/
wm3con.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
#!/usr/bin/env python
'''
F-Secure Virus World Map console edition
See README.md for more details
Copyright 2012-2013 Jyrki Muukkonen
Released under the MIT license.
See LICENSE.txt or http://www.opensource.org/licenses/mit-license.php
ASCII map in map-world-01.txt is copyright:
"Map 1998 Matthew Thomas. Freely usable as long as this line is included"
'''
import curses
import json
import locale
import os
import random
import sys
import time
import urllib2
# Known stream sources, keyed by the short name accepted on the command line.
STREAMS = dict(
    filetest='wm3stream.json',
    wm3='http://worldmap3.f-secure.com/api/stream/',
)

# Available ASCII maps, keyed by map name.
MAPS = {
    'world': dict(
        # offset (as (y, x) for curses...)
        corners=(1, 4, 23, 73),
        # lat top, lon left, lat bottom, lon right
        coords=[90.0, -180.0, -90.0, 180.0],
        # text file holding the ASCII art for this map
        file='map-world-01.txt',
    ),
}
class AsciiMap(object):
    """
    Helper class for handling map drawing and coordinate calculations.

    Attributes set by the constructor:
        map: raw contents of the ASCII map file.
        coords: the 'coords' entry of the map configuration.
        corners: the 'corners' entry of the map configuration; indexes 0/2
            are used as the top/bottom rows and 1/3 as the left/right columns.
        window: curses window the map is composed in before being copied out.
        data: list of (lat, lon, char, desc, attrs, color) tuples to draw.
        data_timestamp: optional text shown next to the debug marker.
        encoding: encoding used when handing description text to curses.
        colors: mapping of color name -> curses color pair number (empty when
            the terminal has no color support).
    """

    def __init__(self, map_name='world', map_conf=None, window=None, encoding=None):
        """
        Load the map file and prepare the curses window and color pairs.

        map_name selects an entry from MAPS unless map_conf is given
        explicitly. window defaults to a new full-screen curses window and
        encoding to the locale's preferred encoding.
        """
        if map_conf is None:
            map_conf = MAPS[map_name]
        with open(map_conf['file'], 'rb') as mapf:
            self.map = mapf.read()
        self.coords = map_conf['coords']
        self.corners = map_conf['corners']
        if window is None:
            window = curses.newwin(0, 0)
        self.window = window

        self.data = []
        self.data_timestamp = None

        # JSON contents _should_ be UTF8 (so, python internal unicode here...)
        if encoding is None:
            encoding = locale.getpreferredencoding()
        self.encoding = encoding

        # check if we can use transparent background or not
        # NOTE(review): can_change_color() is used as the capability check
        # for use_default_colors() - confirm this is the intended test.
        if curses.can_change_color():
            curses.use_default_colors()
            background = -1
        else:
            background = curses.COLOR_BLACK

        tmp_colors = [
            ('red', curses.COLOR_RED, background),
            ('blue', curses.COLOR_BLUE, background),
            ('pink', curses.COLOR_MAGENTA, background),
        ]

        self.colors = {}
        if curses.has_colors():
            # pair 0 is reserved by curses, so numbering starts at 1
            for i, (name, fgcolor, bgcolor) in enumerate(tmp_colors, 1):
                curses.init_pair(i, fgcolor, bgcolor)
                self.colors[name] = i

    def latlon_to_coords(self, lat, lon):
        """
        Convert lat/lon coordinates to character positions.

        Returns an (x, y) tuple of ints, i.e. (column, row).

        Very naive version, assumes that we are drawing the whole world
        TODO: filter out stuff that doesn't fit
        TODO: make it possible to use "zoomed" maps
        """
        width = (self.corners[3] - self.corners[1])
        height = (self.corners[2] - self.corners[0])

        # change to 0-180, 0-360
        abs_lat = -lat + 90
        abs_lon = lon + 180
        x = (abs_lon / 360.0) * width + self.corners[1]
        y = (abs_lat / 180.0) * height + self.corners[0]
        return int(x), int(y)

    def set_data(self, data):
        """
        Set / convert internal data.

        data is the decoded JSON document; its 'detections' list is sampled.
        None (nothing fetched yet) or a document without detections simply
        clears the current entries. Detections whose 'lat'/'long' are missing
        or not numeric are skipped instead of aborting the whole update.

        For now it just selects a random set to show (good enough for demo purposes)
        TODO: could use deque to show all entries
        """
        entries = []
        # tried in order; the first format whose keys are all present wins
        formats = [
            u"{name} / {country} {city}",
            u"{name} / {country}",
            u"{name}",
            u"{type}",
        ]
        dets = (data or {}).get('detections') or []
        for det in random.sample(dets, min(len(dets), 5)):
            #"city": "Montoire-sur-le-loir",
            #"country": "FR",
            #"lat": "47.7500",
            #"long": "0.8667",
            #"name": "Trojan.Generic.7555308",
            #"type": "Trojan"
            try:
                lat = float(det['lat'])
                lon = float(det['long'])
            except (KeyError, TypeError, ValueError):
                # cannot be placed on the map, ignore it
                continue

            desc = "Detection"
            # keeping it unicode here, encode() for curses later on
            for fmt in formats:
                try:
                    desc = fmt.format(**det)
                    break
                except (KeyError, TypeError, ValueError):
                    pass
            entries.append((lat, lon, '*', desc, curses.A_BOLD, 'red'))
        self.data = entries
        # for debugging... maybe it could be shown again now that we have the live stream support
        #self.data_timestamp = data.get('response_generated')

    def draw(self, target):
        """
        Draw internal data to curses window.

        The map, the markers and up to five description lines are composed
        in self.window, which is then overwritten onto target.
        """
        self.window.clear()
        self.window.addstr(0, 0, self.map)
        debugdata = [
            (60.16, 24.94, '*', self.data_timestamp, curses.A_BOLD, 'blue'),  # Helsinki
            #(90, -180, '1', 'top left', curses.A_BOLD, 'blue'),
            #(-90, -180, '2', 'bottom left', curses.A_BOLD, 'blue'),
            #(90, 180, '3', 'top right', curses.A_BOLD, 'blue'),
            #(-90, 180, '4', 'bottom right', curses.A_BOLD, 'blue'),
        ]
        # FIXME: position to be defined in map config?
        row = self.corners[2] - 6
        items_to_show = 5
        for lat, lon, char, desc, attrs, color in debugdata + self.data:
            if items_to_show <= 0:
                break
            char_x, char_y = self.latlon_to_coords(lat, lon)
            if self.colors and color:
                attrs |= curses.color_pair(self.colors[color])
            try:
                self.window.addstr(char_y, char_x, char, attrs)
            except curses.error:
                # marker falls outside the window; still list its description
                pass
            if not desc:
                continue
            # to make this work almost everywhere. see http://docs.python.org/2/library/curses.html
            det_show = (u"%s %s" % (char, desc)).encode(self.encoding, 'ignore')
            try:
                self.window.addstr(row, 1, det_show, attrs)
            except curses.error:
                # addstr() signals "does not fit" with curses.error, which is
                # not a StandardError subclass, so it is caught explicitly.
                # FIXME: check window size before addstr()
                break
            row += 1
            items_to_show -= 1
        self.window.overwrite(target)
        self.window.leaveok(1)
class MapApp(object):
    """
    Virus World Map ncurses application.

    Attributes:
        stream_url: URL the JSON stream is fetched from.
        data: last successfully fetched JSON document (None until then).
        last_fetch: epoch seconds of the last successful fetch.
        sleep: getch() timeout in tenths of seconds.
    """

    def __init__(self, conf=None):
        """
        Resolve the stream location from conf.

        conf is an optional mapping (or iterable of pairs); only the
        'stream_url' key is read and it defaults to 'wm3'.
        """
        conf = dict(conf or [])

        # stream url can be a known name, filename or url
        stream_url = conf.get('stream_url', 'wm3')
        stream_url = STREAMS.get(stream_url, stream_url)
        if '://' not in stream_url and os.path.isfile(stream_url):
            stream_url = 'file://' + os.path.abspath(stream_url)
        self.stream_url = stream_url

        #self.replay = True
        self.data = None
        self.last_fetch = 0
        self.sleep = 10  # tenths of seconds, for curses.halfdelay()

    def fetch_data(self, epoch_now, force_refresh=False):
        """
        (Re)fetch data from JSON stream.

        A fetch is attempted when force_refresh is set, when nothing has been
        fetched yet, or when the polling interval announced by the previous
        document (60 seconds if absent or unusable) has elapsed since the
        last successful fetch.

        Fetching is best-effort: on I/O or parse errors, or when the payload
        is not a JSON object, the previous data and timestamp are kept.

        Returns True when a fetch was attempted (even if it failed), False
        otherwise.
        """
        if force_refresh or self.data is None:
            refresh = True
        else:
            # json data usually has: "polling_interval": 120
            try:
                poll_interval = int(self.data['polling_interval'])
            except (KeyError, TypeError, ValueError):
                poll_interval = 60
            refresh = self.last_fetch + poll_interval <= epoch_now

        if refresh:
            try:
                response = urllib2.urlopen(self.stream_url)
                try:
                    data = json.load(response)
                finally:
                    # always release the connection / file handle
                    response.close()
            except (EnvironmentError, ValueError):
                # network / file errors and malformed JSON: keep old data
                pass
            else:
                # the rest of the application expects a JSON object
                if isinstance(data, dict):
                    self.data = data
                    self.last_fetch = epoch_now
        return refresh

    def run_curses_app(self, scr):
        """
        Initialize and run the application.

        scr is the curses screen to draw on. Loops until 'q' is pressed.
        """
        m = AsciiMap()
        curses.halfdelay(self.sleep)
        while True:
            now = int(time.time())
            refresh = self.fetch_data(now)
            # self.data stays None until the first successful fetch
            m.set_data(self.data or {})
            m.draw(scr)
            scr.addstr(0, 1, "F-Secure Virus World Map '99", curses.A_BOLD)
            scr.addstr(0, 40, time.strftime("%c UTC", time.gmtime(now)).rjust(37), curses.A_BOLD)

            event = scr.getch()
            if event == ord("q"):
                break

            # if in replay mode?
            #elif event == ord('-'):
            #    self.sleep = min(self.sleep+10, 100)
            #    curses.halfdelay(self.sleep)
            #elif event == ord('+'):
            #    self.sleep = max(self.sleep-10, 10)
            #    curses.halfdelay(self.sleep)

            elif event == ord('r'):
                # force refresh
                refresh = True
            elif event == ord('c'):
                # enter config mode
                pass
            elif event == ord('h'):
                # show help screen
                pass
            elif event == ord('m'):
                # cycle maps
                pass

            # redraw window (to fix encoding/rendering bugs and to hide other messages to same tty)
            # user pressed 'r' or new data was fetched
            if refresh:
                m.window.redrawwin()
def main(argv=None):
    """
    Main function / entry point.

    argv is the list of command line arguments without the program name
    (taken from sys.argv when omitted). The first argument, if present,
    is used as the stream name, file or URL. Returns whatever
    curses.wrapper() returns for the application loop.
    """
    args = sys.argv[1:] if argv is None else argv
    conf = {}
    if len(args) > 0:
        conf['stream_url'] = args[0]
    return curses.wrapper(MapApp(conf).run_curses_app)


if __name__ == '__main__':
    sys.exit(main())