forked from MetOffice/localTables-GRIB-BUFR
-
Notifications
You must be signed in to change notification settings - Fork 0
/
uploadChanges.py
82 lines (67 loc) · 2.74 KB
/
uploadChanges.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import argparse
import json
import requests
"""
This script uploads content to the defined register
./prodRegister
This requires an authentication token and userID and structured
content.
The structured content is taken from the command line argument which
shall consist of a dictionary with the keys:
'PUT', 'POST'
and each key shall provide a list of .ttl files to upload to prodRegister
based on the relative path of the .ttl file.
"""
def authenticate(session, base, userid, pss):
    """Log ``session`` in against the API login endpoint under ``base``.

    The credentials are sent as form data to
    ``<base>/system/security/apilogin``.

    Args:
        session: object exposing a ``post(url, data=...)`` method, such as
            the ``requests.Session`` created by this script.
        base: root URL of the register, without a trailing slash.
        userid: value sent in the ``userid`` form field.
        pss: value sent in the ``password`` form field.

    Returns:
        The same ``session`` object that was passed in.

    Raises:
        ValueError: if the login request does not answer with status 200.
    """
    login_url = '{}/system/security/apilogin'.format(base)
    credentials = {'userid': userid, 'password': pss}
    reply = session.post(login_url, data=credentials)
    if reply.status_code == 200:
        return session
    raise ValueError('auth failed')
def parse_uploads(uploads):
    """Decode the JSON ``uploads`` string and validate its top-level keys.

    Args:
        uploads: JSON text that must decode to an object whose keys are
            exactly ``'PUT'`` and ``'POST'``.

    Returns:
        The decoded dictionary, unchanged.

    Raises:
        ValueError: if the text is not valid JSON, does not decode to an
            object, or its key set is not exactly ``{'PUT', 'POST'}``.
    """
    result = json.loads(uploads)
    # A JSON list, string or number has no keys(); reject it with the same
    # exception type used for every other malformed input.
    if not isinstance(result, dict):
        raise ValueError("Uploads inputs should be a JSON object with keys"
                         " 'PUT' and 'POST', not:\n"
                         "{!r}".format(result))
    if set(result) != {'PUT', 'POST'}:
        raise ValueError("Uploads inputs should have keys"
                         " set(('PUT', 'POST')) only, not:\n"
                         "{}".format(result.keys()))
    return result
def post(session, url, payload):
    """POST Turtle ``payload`` to ``url`` after checking that ``url`` answers.

    ``url`` is first fetched with GET and must return 200 before anything
    is sent. The payload is then POSTed with the query parameter
    ``status=Experimental``.

    Args:
        session: object exposing ``get`` and ``post`` in the style of
            ``requests.Session``.
        url: address the payload is POSTed to.
        payload: request body, sent with ``Content-type: text/turtle``.

    Returns:
        None. A POST answer other than 201 is reported on stdout only;
        no exception is raised for it.

    Raises:
        ValueError: if the preliminary GET does not return 200.
    """
    headers = {'Content-type': 'text/turtle'}
    response = session.get(url, headers=headers)
    if response.status_code != 200:
        # A non-200 answer means the target could not be retrieved, so the
        # message must say it is missing, not that it already exists.
        raise ValueError('Cannot POST to {}, it does not exist '
                         '(GET returned {}).'.format(url,
                                                     response.status_code))
    params = {'status': 'Experimental'}
    res = session.post(url, headers=headers, data=payload, params=params)
    if res.status_code != 201:
        print('POST failed with {}\n{}'.format(res.status_code, res.reason))
def put(session, url, payload):
    """PUT Turtle ``payload`` to ``url``, which must already exist.

    ``url`` is first fetched with GET and must return 200. The payload is
    then sent with the HTTP PUT method.

    Args:
        session: object exposing ``get`` and ``put`` in the style of
            ``requests.Session``.
        url: address of the existing resource to replace.
        payload: request body, sent with ``Content-type: text/turtle``.

    Returns:
        None. A non-2xx PUT answer is reported on stdout only, in the same
        way ``post`` reports its failures; no exception is raised for it.

    Raises:
        ValueError: if the preliminary GET does not return 200.
    """
    headers = {'Content-type': 'text/turtle'}
    response = session.get(url, headers=headers)
    if response.status_code != 200:
        raise ValueError('Cannot PUT to {}, it does not exist.'.format(url))
    # Use the PUT verb: this function is the update path, and its own
    # error message already describes the request as a PUT.
    res = session.put(url, headers=headers, data=payload)
    if not 200 <= res.status_code < 300:
        print('PUT failed with {}\n{}'.format(res.status_code, res.reason))
def post_uploads(session, rootURL, uploads):
    """POST each listed .ttl file to its parent path under ``rootURL``.

    For every entry the file ``'.' + entry`` is read relative to the
    current working directory. The entry's last path segment is dropped,
    and the file content is POSTed to ``rootURL`` plus the remaining path.

    Args:
        session: authenticated session handed through to ``post``.
        rootURL: root URL of the register; entries are appended to it as-is.
        uploads: iterable of path strings, each opened as ``'.' + entry``.
            Presumably these start with '/' - verify against caller.
    """
    for postfile in uploads:
        # Turtle documents are UTF-8, so do not depend on the locale default.
        with open('.{}'.format(postfile), 'r', encoding='utf-8') as pf:
            pdata = pf.read()
        # post, so remove last part of identity, this is in the payload
        relID = postfile.rpartition('/')[0]
        url = '{}{}'.format(rootURL, relID)
        print(url)
        post(session, url, pdata)
if __name__ == '__main__':
    # The first line of the local 'prodRegister' file is the register root URL.
    with open('prodRegister', 'r') as register_file:
        rooturl = register_file.read().partition('\n')[0]
    print('Running test with respect to {}'.format(rooturl))

    # Three positional command-line arguments, in this order.
    cli = argparse.ArgumentParser()
    for positional in ('user_id', 'passcode', 'uploads'):
        cli.add_argument(positional)
    args = cli.parse_args()

    session = requests.Session()
    # Validate the uploads argument before any network traffic happens.
    uploads = parse_uploads(args.uploads)
    session = authenticate(session, rooturl, args.user_id, args.passcode)
    print(uploads)
    # NOTE(review): only the 'POST' list is acted upon here; the 'PUT' list
    # is parsed and printed but never uploaded - confirm this is intended.
    post_uploads(session, rooturl, uploads['POST'])