-
Notifications
You must be signed in to change notification settings - Fork 0
/
ibm_cf_connector.py
120 lines (98 loc) · 4.14 KB
/
ibm_cf_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import requests
import os
import base64
class CloudFunctions:
    """
    Small client for the actions REST API of IBM Cloud Functions.

    The client keeps one ``requests`` session with HTTP Basic
    authentication and JSON headers, and exposes create / get / delete /
    invoke operations on actions of a single namespace.
    """

    def __init__(self, config):
        """
        Constructor

        :param config: mapping with the keys ``'api_key'`` (str, sent
            base64-encoded as the Basic credential), ``'endpoint'`` (base
            URL of the service; a leading ``http:`` scheme is upgraded to
            ``https:``) and ``'namespace'``.
        """
        self.api_key = config['api_key'].encode('utf-8')

        # Force TLS, but only rewrite the scheme prefix so that any
        # 'http:' appearing later in the URL is left untouched.
        endpoint = config['endpoint']
        if endpoint.startswith('http:'):
            endpoint = 'https:' + endpoint[len('http:'):]
        self.endpoint = endpoint
        self.namespace = config['namespace']

        # b64encode never inserts line breaks, so no newline stripping
        # is required.
        auth = base64.b64encode(self.api_key)

        self.session = requests.session()
        default_user_agent = self.session.headers['User-Agent']
        self.headers = {
            'content-type': 'application/json',
            'Authorization': 'Basic %s' % auth.decode('UTF-8'),
            'User-Agent': default_user_agent + ' pywren-ibm-cloud'
        }
        self.session.headers.update(self.headers)
        adapter = requests.adapters.HTTPAdapter()
        self.session.mount('https://', adapter)

        msg = 'IBM Cloud Functions init for'
        print("{} Namespace: {}".format(msg, self.namespace))
        print("{} Host: {}".format(msg, self.endpoint))

    def _action_url(self, action_name):
        """
        Build the REST URL of an action in the configured namespace.

        Always joins with '/' (never the OS path separator) and tolerates
        a trailing slash on the endpoint.
        """
        return '/'.join([str(self.endpoint).rstrip('/'), 'api', 'v1',
                         'namespaces', str(self.namespace),
                         'actions', str(action_name)])

    @staticmethod
    def _build_action_body(code, kind, image, is_binary, timeout, memory):
        """
        Build the JSON body sent when creating or updating an action.

        ``code`` is omitted from the body when it is None. For binary
        actions the code is base64-encoded; str input is encoded as UTF-8
        first.
        """
        body = {}
        # Limits are only sent when both values are set (non-zero).
        if timeout and memory:
            body['limits'] = {'timeout': timeout, 'memory': memory}

        cfexec = {'kind': kind}
        if kind == 'blackbox':
            cfexec['image'] = image
        cfexec['binary'] = is_binary
        if code is not None:
            if is_binary:
                raw = code.encode('utf-8') if isinstance(code, str) else code
                cfexec['code'] = base64.b64encode(raw).decode("utf-8")
            else:
                cfexec['code'] = code
        body['exec'] = cfexec
        return body

    def create_action(self, action_name, code=None, kind='blackbox',
                      image='ibmfunctions/action-python-v3.6', is_binary=True,
                      overwrite=True, timeout=60000, memory=1024):
        """
        Create an IBM Cloud Function

        :param action_name: name of the action to create or update.
        :param code: action code; bytes (or str) when ``is_binary`` is
            true, otherwise sent as given. Left out of the request when None.
        :param kind: runtime kind; ``'blackbox'`` additionally sends ``image``.
        :param image: container image used for ``'blackbox'`` actions.
        :param is_binary: whether ``code`` must be base64-encoded.
        :param overwrite: value of the ``overwrite`` query parameter.
        :param timeout: action timeout limit (presumably milliseconds -
            TODO confirm against the service documentation).
        :param memory: action memory limit (presumably MB - TODO confirm).
        """
        print('I am about to create a new cloud function action')
        # The query string carries a lowercase boolean ('true' / 'false').
        url = self._action_url(action_name) + "?overwrite=" + str(overwrite).lower()
        data = self._build_action_body(code, kind, image, is_binary,
                                       timeout, memory)

        res = self.session.put(url, json=data)
        if res.status_code != 200:
            print('An error occurred updating action {}'.format(action_name))
        else:
            print("OK --> Updated action {}".format(action_name))

    def get_action(self, action_name):
        """
        Get an IBM Cloud Function

        :return: the decoded JSON body of the response.
        """
        print("I am about to get a cloud function action: {}".format(action_name))
        res = self.session.get(self._action_url(action_name))
        return res.json()

    def delete_action(self, action_name):
        """
        Delete an IBM Cloud Function

        Prints an error message when the response status is not 200.
        """
        print("Delete cloud function action: {}".format(action_name))
        res = self.session.delete(self._action_url(action_name))
        if res.status_code != 200:
            print('An error occurred deleting action {}'.format(action_name))

    def invoke(self, action_name, payload, max_retries=5):
        """
        Invoke an IBM Cloud Function

        :param action_name: name of the action to invoke.
        :param payload: JSON-serialisable object sent as the request body.
        :param max_retries: how many times a failed request (transport
            error or undecodable JSON body) is retried before the last
            error is re-raised.
        :return: the activation id, or None when the response carries none
            (the response is printed in that case).
        :raises requests.exceptions.RequestException, ValueError: when
            every attempt failed.
        """
        url = self._action_url(action_name)
        retries_left = max(0, max_retries)

        # Bounded retry loop: only transport / decoding errors are retried,
        # so KeyboardInterrupt and programming errors propagate at once.
        while True:
            try:
                resp = self.session.post(url, json=payload)
                data = resp.json()
                break
            except (requests.exceptions.RequestException, ValueError):
                if retries_left <= 0:
                    raise
                retries_left -= 1

        resp_time = format(round(resp.elapsed.total_seconds(), 3), '.3f')

        if 'activationId' in data:
            log_msg = ('Activation ID: {} - Time: {} seconds'.format(data["activationId"], resp_time))
            print(log_msg)
            return data["activationId"]

        print(data)
        return None

    def invoke_with_result(self, action_name, payload=None):
        """
        Invoke an IBM Cloud Function waiting for the result.

        :param action_name: name of the action to invoke.
        :param payload: JSON-serialisable request body; an empty object is
            sent when omitted.
        :return: the decoded JSON body of the blocking invocation.
        """
        if payload is None:
            # Fresh dict per call instead of a shared mutable default.
            payload = {}
        url = self._action_url(action_name) + "?blocking=true&result=true"
        resp = self.session.post(url, json=payload)
        return resp.json()