-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathfhir.py
95 lines (77 loc) · 2.94 KB
/
fhir.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from fhirclient import client
from datetime import datetime
from datetime import date
from fhirclient.models import fhirsearch
def perform_in(srch_str,server,apiBase):
""" Execute the search URL against the given server.
:param server: The server against which to perform the search
:returns: A Bundle resource
"""
from fhirclient.models import bundle
if server is None:
raise Exception("Need a server url to perform search")
resources = []
bundleBase = server.request_json(srch_str)
bundleCur = bundleBase
from fhirclient.models import bundle
bundle = bundle.Bundle(bundleBase)
bundle.origin_server = server
if bundle is not None and bundle.entry is not None:
for entry in bundle.entry:
resources.append(entry.resource)
while True:
if len(bundleCur['link'])>1 and bundleCur['link'][1]['relation'] != 'previous' :
from fhirclient.models import bundle
url = bundleCur['link'][1]['url']
urlString = url.replace(apiBase, '')
bundleNext = server.request_json(urlString)
bundleN = bundle.Bundle(bundleNext)
bundleN.origin_server = server
if bundleN is not None and bundleN.entry is not None:
for entry in bundleN.entry:
resources.append(entry.resource)
bundleCur = bundleNext
else:
break
return resources
def runCohortCounter(endpointUrl, endpointToken):
#connect to FHIR server
smart = client.FHIRClient(settings={
'app_id': endpointToken,
'api_base': endpointUrl
})
urlBase = endpointUrl
#Patients diagnosed with diabetes
print('Patient cohort for Diabetes')
search_str = 'Condition?_include=Condition:patient&code:below=http://snomed.info/sct|73211009'
# print('Patient cohort for asthma')
#search_str = 'Condition?_include=Condition:patient&code=195917001'
cohort = perform_in(search_str,smart.server,urlBase)
results = len(cohort)
cohortSize = int(results/2)
print("Retrieved cohort size %s" %cohortSize)
ageSum = 0
def calculate_age(born):
# convert str to datetime format
dob = datetime.strptime(born, "%Y-%m-%d")
today = date.today()
return today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))
# Loop over all patients in bundle
for patients in cohort:
entry = patients.as_json()
if 'birthDate' in entry.keys():
dob = entry['birthDate']
age_i = calculate_age(dob)
ageSum = ageSum+age_i
else:
results = results - 1
print("Cohort size after eliminating patients with no age data: %s" % results)
# Calculate mean age
meanAge = None
if cohortSize > 0:
meanAge = ageSum / results
print("Mean age in cohort: %s" % meanAge)
return {
'cohortCount': results,
'meanAge': meanAge
}