forked from eve-val/nrds-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathChatKosLookup.py
executable file
·259 lines (220 loc) · 8.29 KB
/
ChatKosLookup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
#!/usr/bin/env python
"""Checks pilots mentioned in the EVE chatlogs against a KOS list."""
from eveapi import eveapi
import sys, string, os, tempfile, time, json, urllib2, zlib, cPickle, urllib
# KOS lookup endpoint (JSON output).  The trailing %s is replaced with a
# URL-encoded query string by KosChecker.koscheck_internal; the same string is
# also used there as the host/path part of the cache key.
KOS_CHECKER_URL = 'http://kos.cva-eve.org/api/?c=json&type=unit&%s'
# Sentinel returned by KosChecker.koscheck_internal when the matched entry is
# flagged 'npc' and nothing KOS was found for it; KosChecker.koscheck reacts by
# walking the pilot's employment history.
NPC = 'npc'
# Prefix of the reason string KosChecker.koscheck builds when the KOS hit comes
# from a corporation in the pilot's employment history.
LASTCORP = 'lastcorp'
class SimpleCache(object):
    """Implements a memory and disk-based cache of previous API calls.

    Entries are ``(expiry_timestamp, document)`` tuples.  They live in the
    ``self.cache`` dict and are mirrored as zlib-compressed pickles in an
    ``eveapi`` directory under the system temp directory.  ``retrieve`` and
    ``store`` form the interface that is handed to eveapi as ``cacheHandler``.

    NOTE(review): keys come from the builtin ``hash()``.  On an interpreter
    with randomized string hashing a later process would not find the files
    written by an earlier one -- confirm whether a stable digest is wanted.
    """

    def __init__(self, debug=False):
        """Sets up the in-memory store and makes sure the cache dir exists.

        Args:
          debug: when true, log() prints a trace line for each cache event.
        """
        self.debug = debug
        self.count = 0  # number of retrieve() calls so far; shown by log()
        self.cache = {}
        self.tempdir = os.path.join(tempfile.gettempdir(), 'eveapi')
        try:
            os.makedirs(self.tempdir)
        except OSError:
            # Normally "already exists", possibly created by another process
            # a moment ago.  Anything else is a genuine failure.
            if not os.path.isdir(self.tempdir):
                raise

    def log(self, what):
        """Outputs debug information if the debug flag is set."""
        if self.debug:
            print('[%d] %s' % (self.count, what))

    def _key(self, host, path, params):
        """Returns the cache key for one request."""
        return hash((host, path, frozenset(params.items())))

    def _cache_path(self, key):
        """Returns the path of the disk file that mirrors `key`."""
        return os.path.join(self.tempdir, str(key) + '.cache')

    def _remove_file(self, cache_file):
        """Deletes a cache file, ignoring the case where it is already gone."""
        try:
            os.remove(cache_file)
        except OSError:
            pass  # never written, or removed by someone else: nothing to do

    def _load(self, cache_file):
        """Reads one entry from disk; returns None if it cannot be used."""
        try:
            with open(cache_file, 'rb') as handle:
                payload = handle.read()
            # NOTE(security): unpickling runs code chosen by whoever wrote the
            # file, and the temp directory may be writable by other users.
            # Flagged for review rather than silently replaced.
            entry = cPickle.loads(zlib.decompress(payload))
        except Exception:
            # A truncated or corrupt file can fail in many ways (zlib.error,
            # EOFError, unpickling errors, ...).  For a cache that is just a
            # miss; the caller logs it and discards the file.
            return None
        if isinstance(entry, tuple) and len(entry) == 2:
            return entry
        return None

    def retrieve(self, host, path, params):
        """Retrieves a cached value, or returns None if not cached.

        Looks in memory first, then on disk.  Expired or unreadable entries
        are purged from both places.

        Args:
          host, path, params: identify the request; params is a dict.

        Returns:
          The cached document, or None when the caller must fetch it.
        """
        key = self._key(host, path, params)
        cache_file = self._cache_path(key)
        self.count += 1  # for logging

        cached = self.cache.get(key)
        if not cached and os.path.exists(cache_file):
            # Not in memory, but a previous run may have left it on disk.
            self.log('%s: retrieving from disk at %s' % (path, cache_file))
            cached = self._load(cache_file)
            if cached is None:
                self.log('%s: unreadable cache file, purging!' % path)
                self._remove_file(cache_file)
            else:
                self.cache[key] = cached

        if cached:
            if time.time() < cached[0]:
                self.log('%s: returning cached document' % path)
                return cached[1]
            # Stale.  Drop the disk mirror as well, even when the entry came
            # from memory, so the next call does not reload it just to purge.
            self.log('%s: cache expired, purging!' % path)
            self.cache.pop(key, None)
            self._remove_file(cache_file)

        self.log('%s: not cached, fetching from server...' % path)
        # No usable hit: None tells the caller to request the data.
        return None

    def store(self, host, path, params, doc, obj):
        """Saves a cached value to the backing stores.

        Args:
          host, path, params: identify the request; params is a dict.
          doc: the document to cache.
          obj: object with numeric cachedUntil and currentTime attributes;
            their difference is the lifetime in seconds.  Nothing is stored
            when the lifetime is not positive.
        """
        cached_for = obj.cachedUntil - obj.currentTime
        if cached_for <= 0:
            return  # would already be expired; do not bother writing it

        self.log('%s: cached (%d seconds)' % (path, cached_for))
        key = self._key(host, path, params)
        cached = self.cache[key] = (time.time() + cached_for, doc)
        # Serialize before opening so a pickling error leaves no empty file.
        payload = zlib.compress(cPickle.dumps(cached, -1))
        with open(self._cache_path(key), 'wb') as handle:
            handle.write(payload)
class CacheObject:
    """Allows caching objects that do not come from the EVE api.

    Carries the two attributes SimpleCache.store reads from its `obj`
    argument: cachedUntil and currentTime.
    """

    def __init__(self, valid_duration_seconds):
        """Builds an object whose lifetime is `valid_duration_seconds`."""
        # store() takes cachedUntil - currentTime as the lifetime, so with
        # "now" pinned at zero the duration itself is the expiry value.
        self.currentTime = 0
        self.cachedUntil = valid_duration_seconds
class FileTailer(object):
    """Follows a log file and extracts lookup requests from new lines.

    A request is a line containing the marker '> xxx '.  What precedes the
    marker names the speaker; what follows is a space-separated list of
    names, optionally followed by '#' and a free-text comment.
    """

    # Characters poll() keeps: printable ASCII minus the line terminators.
    _KEEP = frozenset(string.printable) - frozenset('\r\n')

    def __init__(self, filename):
        """Opens `filename` for binary reading, positioned at its start."""
        self.handle = open(filename, 'rb')
        self.where = 0  # offset at which the most recent poll() started

    def poll(self):
        """Reads at most one line and parses it.

        Returns:
          (names, comment) for a request line, where names is the list from
          splitting the name portion on single spaces (it may contain empty
          strings) and comment is '<speaker> >' or '<speaker> > <text>'.
          (None, None) when no new line is available or the line is not a
          request.
        """
        self.where = self.handle.tell()
        line = self.handle.readline()
        if not line:
            # Nothing new yet: rewind so a later poll() sees appended data.
            self.handle.seek(self.where)
            return (None, None)

        if not isinstance(line, str):
            # Binary reads give bytes objects on Python 3.  latin-1 maps each
            # byte to one character, so the filter below sees the same values
            # it would see when the read returns a byte string.
            line = line.decode('latin-1')
        keep = self._KEEP
        sanitized = ''.join(ch for ch in line if ch in keep)

        if '> xxx ' not in sanitized:
            return (None, None)
        person, command = sanitized.split('> xxx ', 1)

        # The speaker is expected after a leading '[...]' field.  Fall back
        # to the whole prefix when there is no ']' instead of raising
        # IndexError and killing the caller's loop.
        pieces = person.split(']')
        person = (pieces[1] if len(pieces) > 1 else pieces[0]).strip()

        names, has_comment, remark = command.partition('#')
        if has_comment:
            comment = '%s > %s' % (person, remark.strip())
        else:
            comment = '%s >' % person
        # NOTE(review): splitting on a single space breaks names that contain
        # spaces into separate tokens -- confirm the intended separator.
        return (names.split(' '), comment)
class KosChecker(object):
    """Maintains API state and performs KOS checks."""

    def __init__(self):
        """Creates the shared cache and the eveapi connection that uses it."""
        self.cache = SimpleCache()
        self.eveapi = eveapi.EVEAPIConnection(cacheHandler=self.cache)

    def koscheck(self, player):
        """Checks a given player against the KOS list, including esoteric rules.

        Args:
          player: name to look up.

        Returns:
          False when nothing KOS was found, otherwise a reason string:
          '<type>: <label>' for a direct hit, or 'lastcorp: <corp name>' when
          the hit comes from the employment history of a pilot whose first
          history entry resolved to an NPC corp.

        Raises:
          Whatever the lookups raise (network or API errors, IndexError on an
          empty history); koscheck_logentry reports those as errors.
        """
        kos = self.koscheck_internal(player)
        if kos is None or kos == NPC:
            # We were unable to find the player. Use employment history to
            # get their current corp and look that up. If it's an NPC corp,
            # we'll get bounced again.
            history = self.employment_history(player)
            kos = self.koscheck_internal(history[0])
            in_npc_corp = (kos == NPC)
            idx = 0
            while kos == NPC and (idx + 1) < len(history):
                idx = idx + 1
                kos = self.koscheck_internal(history[idx])
            # Only a real KOS reason (not None / NPC / False) is re-labelled.
            if (in_npc_corp and kos is not None and kos != NPC
                    and kos is not False):
                kos = '%s: %s' % (LASTCORP, history[idx])
        if kos is None or kos == NPC:
            kos = False
        return kos

    def koscheck_internal(self, entity):
        """Looks up KOS entries by directly calling the CVA KOS API.

        Responses are cached through self.cache for one hour.

        Args:
          entity: name to look up; matched case-insensitively against the
            'label' of each result.

        Returns:
          None when no result matches, NPC when the match is flagged 'npc'
          and nothing KOS has been found for it, False when the match and
          everything it belongs to is clean, otherwise '<type>: <label>' of
          the last KOS entry seen while walking entry -> corp -> alliance.
        """
        cache_params = {'entity': entity}
        result = self.cache.retrieve(KOS_CHECKER_URL, KOS_CHECKER_URL,
                                     cache_params)
        if not result:
            response = urllib2.urlopen(
                KOS_CHECKER_URL % urllib.urlencode({'q': entity}))
            try:
                result = json.load(response)
            finally:
                response.close()  # do not leak the connection
            self.cache.store(KOS_CHECKER_URL, KOS_CHECKER_URL,
                             cache_params, result, CacheObject(60 * 60))

        wanted = entity.lower()
        for value in result['results']:
            # Require exact match (case-insensitively).
            if value['label'].lower() != wanted:
                continue
            if value['type'] == 'alliance' and value['ticker'] is None:
                # Bogus alliance created instead of NPC corp.
                continue
            kos = False
            # Walk up the nesting.  Every path through this loop returns, so
            # only the first matching result is ever examined.
            while True:
                if value['kos']:
                    kos = '%s: %s' % (value['type'], value['label'])
                if 'npc' in value and value['npc'] and not kos:
                    # Signal that further lookup is needed of player's last
                    # corp
                    return NPC
                if 'corp' in value:
                    value = value['corp']
                elif 'alliance' in value:
                    value = value['alliance']
                else:
                    return kos
        return None

    def employment_history(self, character):
        """Retrieves a player's most recent corporations via EVE api.

        Args:
          character: character name.

        Returns:
          Corporation names, de-duplicated on corporation ID with the first
          occurrence kept.  NOTE(review): assumes the name lookup returns
          rows in the order the IDs were passed -- confirm.
        """
        cid = self.eveapi.eve.CharacterID(
            names=character).characters[0].characterID
        cdata = self.eveapi.eve.CharacterInfo(characterID=cid)
        seen = set()
        unique_corps = []
        for row in cdata.employmentHistory:
            corp_id = row.corporationID
            if corp_id not in seen:
                seen.add(corp_id)
                unique_corps.append(corp_id)
        named = self.eveapi.eve.CharacterName(
            ids=','.join(str(x) for x in unique_corps))
        return [row.name for row in named.characters]

    def loop(self, filename, handler):
        """Performs KOS processing on each line read from the log file.

        Never returns; sleeps one second whenever there is nothing to do.

        Args:
          filename: path of the log file to follow.
          handler: function of 4 args, (comment, kos, notkos, error), called
            every time there is a new KOS result.
        """
        tailer = FileTailer(filename)
        while True:
            entry, comment = tailer.poll()
            if not entry:
                time.sleep(1.0)
                continue
            kos, not_kos, error = self.koscheck_logentry(entry)
            handler(comment, kos, not_kos, error)

    def koscheck_logentry(self, entry):
        """Runs koscheck on every name of one log entry.

        Args:
          entry: iterable of raw name tokens; surrounding spaces and dots are
            stripped, and tokens that are empty afterwards are skipped.

        Returns:
          (kos, notkos, error): kos is a list of (name, reason) tuples,
          notkos and error are lists of names.  A name goes to error when
          its lookup raised an Exception.
        """
        kos = []
        notkos = []
        error = []
        for raw in entry:
            # Strip first so tokens such as '.' do not reach the lookup as ''.
            person = raw.strip(' .')
            if not person or person.isspace():
                continue
            try:
                result = self.koscheck(person)
            except Exception:
                # One failed lookup must not abort the batch.  Not a bare
                # except: Ctrl-C and SystemExit still stop the program.
                error.append(person)
            else:
                if result is False:
                    notkos.append(person)
                else:
                    kos.append((person, result))
        return (kos, notkos, error)
def stdout_handler(comment, kos, notkos, error):
    """Prints one KOS result to stdout using ANSI colours.

    Output order: the comment (if any), one summary line per category with a
    count and a bar of asterisks (the Error line only when there are errors),
    then the KOS, not-KOS and error names in three groups, then '-----'.

    Args:
      comment: text printed first; skipped when empty.
      kos: list of (person, reason) tuples.
      notkos: list of names.
      error: list of names whose lookup failed.
    """
    fmt = '%s%6s (%3d) %s\033[0m'
    if comment:
        print(comment)

    # (colour escape, label, members) for each summary line, in print order.
    summary = [('\033[31m', 'KOS', kos), ('\033[34m', 'NotKOS', notkos)]
    if len(error) > 0:
        summary.append(('\033[33m', 'Error', error))
    for colour, label, members in summary:
        print(fmt % (colour, label, len(members), len(members) * '*'))
    print('')

    for person, reason in kos:
        print(u'\033[31m[\u2212] %s\033[0m (%s)' % (person, reason))
    print('')

    for person in notkos:
        print('\033[34m[+] %s\033[0m' % person)
    print('')

    for person in error:
        print('\033[33m[?] %s\033[0m' % person)
    print('-----')
if __name__ == '__main__':
    # Script entry point: the first argument is the chat log to follow.
    if len(sys.argv) <= 1:
        # No log file given: show how to call the script and stop.
        print('Usage: %s ~/EVE/logs/ChatLogs/Fleet_YYYYMMDD_HHMMSS.txt' %
              sys.argv[0])
    else:
        KosChecker().loop(sys.argv[1], stdout_handler)