-
Notifications
You must be signed in to change notification settings - Fork 0
/
cv-stats.py
138 lines (116 loc) · 4.55 KB
/
cv-stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from pymongo import MongoClient
from os import path, environ
from datetime import datetime, date, timedelta
import json
import pandas as pd
class CoronavirusStats():
    """Derive case-growth statistics from the ``florida`` collection.

    Constructing an instance loads the configuration, connects to MongoDB
    and reads the case records into ``self.data`` (a pandas DataFrame).
    The other methods compute cumulative case counts, day-over-day growth
    factors and a compounding projection from that frame; ``push_stats``
    writes the results to the ``florida_growth`` and
    ``florida_growth_rates`` collections.
    """

    # constructor
    def __init__(self):
        """Load configuration, connect to MongoDB and read the case data.

        The configuration is read from ``config.json`` in the current
        working directory when that file exists; otherwise it is assembled
        from environment variables (``DATABASE_URL``, ``DATABASE_NAME``,
        ``DASHBOARD_URL``, ``SMTP_USER``, ``SMTP_PASSWORD``, ``EMAIL_FROM``,
        ``EMAIL_TO``).  Variables that are not set end up as ``None``.
        """
        # read config, if config.json file is not available then try OS environment vars
        if path.exists('config.json'):
            with open('config.json') as config_file:
                self.config = json.load(config_file)
        else:
            self.config = {
                "mongodb": {
                    "url": environ.get("DATABASE_URL"),
                    "database": environ.get("DATABASE_NAME")
                },
                "other": {
                    "dashboard_url": environ.get("DASHBOARD_URL")
                },
                "smtp": {
                    "user": environ.get("SMTP_USER"),
                    "password": environ.get("SMTP_PASSWORD"),
                    "email_from": environ.get("EMAIL_FROM"),
                    "email_to": environ.get("EMAIL_TO"),
                }
            }

        # connect to MongoDB/Atlas
        self.client = MongoClient(self.config["mongodb"]["url"])
        self.db = self.client.get_database(self.config["mongodb"]["database"])

        # Only records added more than 24 hours before "now" are loaded.
        # NOTE(review): presumably this skips a still-incomplete current
        # day -- confirm against how ``date_added`` is populated.
        cutoff = datetime.today() - timedelta(days=1)
        self.data = self.read_mongo("florida", {"date_added": {"$lt": cutoff}})

    # Convert MongoDB cursor to Pandas dataframe
    def read_mongo(self, collection, query=None, no_id=True):
        """Run ``query`` against ``collection`` and return the rows as a DataFrame.

        Args:
            collection: Name of the collection to look up on ``self.db``.
            query: Filter document passed to ``find``.  ``None`` (the
                default) means "match everything".
            no_id: When true, the ``_id`` column is removed from the result.

        Returns:
            A pandas DataFrame with one row per returned document.  It is
            empty (no columns) when the query matches nothing.
        """
        # A fresh dict per call: a literal ``{}`` default would be shared
        # between all invocations.
        if query is None:
            query = {}

        # Make a query to the specific DB and Collection
        cursor = self.db[collection].find(query)

        # Expand the cursor and construct the DataFrame
        df = pd.DataFrame(list(cursor))

        # Delete the _id; an empty result has no such column to delete.
        if no_id and '_id' in df.columns:
            del df['_id']

        return df

    # get case count cumulative sum by date
    def cum_sum(self):
        """Return the running total of cases, indexed by ``date_added``.

        Rows of ``self.data`` are counted per ``date_added`` value (using
        the non-null ``case_number`` entries) and accumulated in index order.
        """
        count_by_date = self.data.groupby("date_added")["case_number"].count()
        return count_by_date.cumsum()

    # calculate daily growth of case count
    def cum_growth(self, tail = None):
        """Return the growth factor of the cumulative count per date.

        Each value is the cumulative count divided by the preceding one
        (fractional change plus one).  The first entry has no predecessor
        and is therefore NaN.

        Args:
            tail: When truthy, only the last ``tail`` entries are returned.
                ``None`` or ``0`` returns the whole series.
        """
        growth_rate = self.cum_sum().pct_change() + 1
        if tail:
            return growth_rate.tail(tail)
        return growth_rate

    # simulate case growth
    def growth_sim(self, count, growth_factor):
        """Extend the cumulative series by compounding ``growth_factor``.

        Args:
            count: Number of additional days to project.
            growth_factor: Multiplier applied to the previous day's total
                to obtain the next one.

        Returns:
            A dict mapping date to count.  It contains every observed
            cumulative count followed by ``count`` projected entries, one
            per day after the last observed date.  An empty dict is
            returned when there is no observed data to start from.
        """
        prediction_dict = self.cum_sum().to_dict()
        if not prediction_dict:
            # Nothing to extrapolate from.
            return prediction_dict

        last_date = list(prediction_dict)[-1]
        last_count = prediction_dict[last_date]

        for _ in range(count):
            last_date = last_date + timedelta(days=1)
            last_count = last_count * growth_factor
            prediction_dict[last_date] = last_count

        return prediction_dict

    # push stats
    def push_stats(self, recalculate_sim = False, sim_days = 14, growth_window = 5):
        """Rebuild the stored growth series and growth rates.

        The ``actual`` entries of ``florida_growth`` and all of
        ``florida_growth_rates`` are deleted and re-inserted from the
        current data.  Insert failures are printed, not raised.

        Args:
            recalculate_sim: When true, the ``predicted`` entries of
                ``florida_growth`` are deleted and regenerated as well;
                otherwise they are left untouched.
            sim_days: Number of days to project when recalculating.
            growth_window: Number of most recent growth factors averaged to
                obtain the projection's growth factor.
        """
        # rebuild simulation
        self.db.florida_growth.delete_many({"series": "actual"})
        self.db.florida_growth_rates.delete_many({})
        if recalculate_sim:
            self.db.florida_growth.delete_many({"series": "predicted"})

        # get cumulated sums
        data = [
            {"date": day, "count": total, "series": "actual"}
            for day, total in self.cum_sum().to_dict().items()
        ]

        # simulate cumulated sums
        if recalculate_sim:
            # average growth rate over the most recent days
            average_growth_rate = self.cum_growth(growth_window).mean()

            # Project forward at that rate.  This must go through ``self``:
            # relying on a module-level instance breaks for any other
            # instance and when the class is used from another module.
            simulated_growth = self.growth_sim(sim_days, average_growth_rate)
            data.extend(
                {"date": day, "count": total, "series": "predicted"}
                for day, total in simulated_growth.items()
            )

        try:
            self.db.florida_growth.insert_many(data)
        except Exception as e:
            print(str(e))

        # store growth rates
        data = [
            {"date": day, "rate": rate}
            for day, rate in self.cum_growth().to_dict().items()
        ]

        try:
            self.db.florida_growth_rates.insert_many(data)
        except Exception as e:
            print(str(e))
# Script entry point. There is no ``__main__`` guard, so these statements run
# whenever this module is executed or imported.
# Constructing the object loads the configuration, connects to MongoDB and
# reads the case data.
stats = CoronavirusStats()
# Called without arguments, so ``recalculate_sim`` keeps its default (False).
stats.push_stats()