-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdriver.py
115 lines (96 loc) · 3.72 KB
/
driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import json
import logging
from datetime import datetime
from datetime import timedelta
from string import Template
import requests
from data import *
import pandas as pd
# Module-level logger used by Driver to report fetch progress and empty results.
logger = logging.getLogger("ocps-covid-csv")
def readFile(file, mode="r"):
    """Return the full contents of ``file``, opened with ``mode``.

    The result is ``str`` for text modes and ``bytes`` for binary modes.
    """
    with open(file, mode) as handle:
        contents = handle.read()
    return contents
def writeFile(st, file, mode="w"):
    """Write ``st`` to ``file`` (opened with ``mode``) and return ``write()``'s result.

    With the default mode any existing content is replaced; pass ``"a"`` to
    append instead.
    """
    with open(file, mode) as handle:
        written = handle.write(st)
    return written
# Person categories queried for every date; each value is substituted into the
# request template as ``type``.
person_types = ['Student', 'Employee', 'Vendor/Visitor']
class Driver:
    """Download per-day, per-person-type counts and store them via ``data``.

    For every (date, person type) pair that ``data`` does not already hold,
    a JSON request built from ``<templates>/request.json`` is POSTed to
    ``dataset['request_url']``, the response is flattened into rows, and the
    rows are appended to ``data``. The collected data is finally written to
    ``dataset['file']``.
    """

    # Seconds to wait for the remote endpoint before giving up. Without a
    # timeout ``requests`` may block indefinitely on an unresponsive server.
    request_timeout = 60

    def __init__(self, dataset, data):
        """Load the request template and HTTP headers for ``dataset``.

        Args:
            dataset: Mapping that provides ``request_url``, ``templates``
                (directory holding ``request.json`` and ``request-headers``),
                ``cutoff``, ``file`` and optionally ``start_date``.
            data: Storage object; this class calls its ``clearAll``,
                ``haveDataFor``, ``append`` and ``toCsv`` methods.
        """
        self.data = data
        self.dataset = dataset
        self.url = dataset['request_url']
        self.request = readFile("%s/request.json" % (dataset['templates']))
        self.headers = {}
        headers_str = readFile("%s/request-headers" % (dataset['templates']))
        for line in headers_str.split("\n"):
            # Split on the first colon only so header values may contain ':'.
            name, sep, value = line.partition(":")
            if sep:
                self.headers[name.strip()] = value.strip()

    def go(self, all=False):
        """Fetch every missing (date, person type) pair and write the CSV.

        Args:
            all: When true, discard previously stored data first so that
                everything is fetched again.
        """
        if all:
            self.data.clearAll()
        for dt in self.iterAllDateTypes():
            if self.data.haveDataFor(dt):
                logger.debug("Already have the data for %s %s .. skipping",
                             dt['date'], dt['type'])
                continue
            logger.info("Getting data for %s %s", dt['date'], dt['type'])
            rows = self.getDataFor(dt)
            if not rows:
                # ``Logger.warn`` is a deprecated alias; use ``warning``.
                logger.warning("Did not get any data for %s %s",
                               dt['date'], dt['type'])
            else:
                self.data.append(rows)
        # NOTE(review): the original indentation was lost; this assumes the
        # CSV is written once after all pairs were processed -- confirm.
        self.data.toCsv(self.dataset['file'])

    def iterAllDateTypes(self):
        """Yield ``{'date': datetime, 'type': str}`` for every pair to query.

        For each entry of ``person_types`` the dates run backwards one day at
        a time from ``dataset['start_date']`` (today when absent) down to and
        including ``dataset['cutoff']``. Dates are normalised to midnight.
        """
        if "start_date" in self.dataset:
            start_date = self.dataset['start_date']
        else:
            start_date = datetime.today()
        # Drop any time-of-day component so the comparison is by whole days.
        first_day = datetime(start_date.year, start_date.month, start_date.day)
        one_day = timedelta(days=1)
        for pt in person_types:
            day = first_day
            while day >= self.dataset['cutoff']:
                yield {'date': day, 'type': pt}
                day = day - one_day

    def datetimeToQuery(self, d):
        """Format ``d`` as the ``YYYY-MM-DD`` string used in the request."""
        return d.strftime("%Y-%m-%d")

    def getDataFor(self, dt):
        """Request and parse the counts for one (date, person type) pair.

        Args:
            dt: Dict with ``date`` (a datetime) and ``type`` (person type).

        Returns:
            A list of dicts, each holding the queried ``date`` (as a string)
            and ``type`` merged with one parsed ``location``/``count`` row.

        Raises:
            requests.HTTPError: If the endpoint answers with an error status.
            requests.Timeout: If no answer arrives within
                ``request_timeout`` seconds.
        """
        m = {'date': self.datetimeToQuery(dt['date']), 'type': dt['type']}
        js = json.loads(Template(self.request).substitute(m))
        resp = requests.post(self.url, json=js, headers=self.headers,
                             timeout=self.request_timeout)
        # Fail with the HTTP error itself instead of an opaque parse error
        # on an error page further down.
        resp.raise_for_status()
        results = self.parseResult(json.loads(resp.text))
        return [{**m, **result} for result in results]

    def parseResult(self, result_json):
        """
        Read the result json that power bi gives us and translate it to a
        list of ``{'location': name, 'count': count}`` dictionaries, in the
        order the rows appear in the response.
        """
        ret = []
        for result in result_json['results']:
            dsr = result['result']['data']['dsr']
            # The carried-over count starts at 0 for every result.
            last_c = 0
            for ds in dsr['DS']:
                for ph in ds['PH']:
                    for dm1 in ph.get('DM1', ()):
                        if 'C' not in dm1:
                            continue
                        c = dm1['C']
                        # They only include a count if it's different than
                        # the one before; otherwise reuse the previous one.
                        if len(c) > 1:
                            last_c = c[1]
                        ret.append({'location': c[0], 'count': last_c})
        return ret