Skip to content

Commit

Permalink
Merge pull request #112 from knix-microfunctions/release/0.8.9
Browse files Browse the repository at this point in the history
Release/0.8.9
  • Loading branch information
iakkus authored Mar 29, 2021
2 parents 8287a3b + 9549933 commit ff6184f
Show file tree
Hide file tree
Showing 101 changed files with 2,180 additions and 1,429 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1728,7 +1728,7 @@ else if(sys.containsKey(key))
InetAddress la = InetAddress.getByName(addr[0]);
Socket s = new Socket(la, port);
s.close();
riakNodes.put(la.getHostAddress(), port);
riakNodes.put(la.getCanonicalHostName(), port);
System.out.println("Using riak node: "+la.getHostAddress()+":"+port.toString());
break;
} catch (UnknownHostException e) {
Expand Down
49 changes: 40 additions & 9 deletions FunctionWorker/MicroFunctionsAPI.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@
*/
namespace java org.microfunctions.mfnapi

struct TriggerAPIResult {
1: bool success
2: string message
}

struct TriggerInfoAMQP {
1: string amqp_addr
2: string routing_key
3: string exchange
4: bool with_ack
5: bool durable
6: bool exclusive
7: double ignr_msg_prob
}

struct TriggerInfoTimer {
1: i64 timerIntervalMilliseconds
}

service MicroFunctionsAPIService {

string get_context_object_properties(),
Expand All @@ -23,10 +42,10 @@ service MicroFunctionsAPIService {

void update_metadata(1: string metadata_name, 2: string metadata_value, 3: bool is_privileged_metadata),

void send_to_running_function_in_session(1: string rgid, 2: string message, 3: bool send_now), // message?
void send_to_all_running_functions_in_session_with_function_name(1: string gname, 2: string message, 3: bool send_now), // message
void send_to_all_running_functions_in_session(1: string message, 2: bool send_now), //message
void send_to_running_function_in_session_with_alias(1: string als, 2: string message, 3: bool send_now), // message
void send_to_running_function_in_session(1: string rgid, 2: string message, 3: bool send_now),
void send_to_all_running_functions_in_session_with_function_name(1: string gname, 2: string message, 3: bool send_now),
void send_to_all_running_functions_in_session(1: string message, 2: bool send_now),
void send_to_running_function_in_session_with_alias(1: string als, 2: string message, 3: bool send_now),

list<string> get_session_update_messages(1: i32 count, 2: bool blck),

Expand All @@ -46,11 +65,11 @@ service MicroFunctionsAPIService {

bool is_still_running(),

void add_workflow_next(1: string nxt, 2: string value), // value
void add_dynamic_next(1: string nxt, 2: string value), // value
void send_to_function_now(1: string destination, 2: string value), // value
void add_dynamic_workflow(1: list<map<string, string>> dynamic_trigger), // dynamic_trigger
list<map<string, string>> get_dynamic_workflow(), // return value
void add_workflow_next(1: string nxt, 2: string value),
void add_dynamic_next(1: string nxt, 2: string value),
void send_to_function_now(1: string destination, 2: string value),
void add_dynamic_workflow(1: list<map<string, string>> dynamic_trigger),
list<map<string, string>> get_dynamic_workflow(),

i64 get_remaining_time_in_millis(),
void log(1: string text, 2: string level),
Expand Down Expand Up @@ -89,7 +108,19 @@ service MicroFunctionsAPIService {
void deleteCounter(1: string countername, 2: bool is_private, 3: bool is_queued),
list<string> getCounterNames(1: i32 start_index, 2: i32 end_index, 3: bool is_private),

bool addTriggerableBucket(1: string bucket_name),
bool addStorageTriggerForWorkflow(1: string workflow_name, 2: string bucket_name),
bool deleteTriggerableBucket(1: string bucket_name),
bool deleteStorageTriggerForWorkflow(1: string workflow_name, 2: string bucket_name),

TriggerAPIResult addTriggerAMQP(1: string trigger_name, 2: TriggerInfoAMQP trigger_info),
TriggerAPIResult addTriggerTimer(1: string trigger_name, 2: TriggerInfoTimer trigger_info),
TriggerAPIResult addTriggerForWorkflow(1: string trigger_name, 2: string workflow_name, 3: string workflow_state),
TriggerAPIResult deleteTriggerForWorkflow(1: string trigger_name, 2: string workflow_name),
TriggerAPIResult deleteTrigger(1: string trigger_name),

map<string, string> get_transient_data_output(1: bool is_private),
map<string, bool> get_data_to_be_deleted(1: bool is_private)

}

17 changes: 14 additions & 3 deletions FunctionWorker/python/DataLayerClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from data_layer.message.ttypes import Metadata
from data_layer.service import DataLayerService

MAX_RETRIES=3
MAX_RETRIES = 3

class DataLayerClient:

Expand All @@ -50,7 +50,7 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals
self.countertriggerstable = "counterTriggersTable"
self.countertriggersinfotable = "counterTriggersInfoTable"

else:
elif suid is not None:
self.keyspace = "storage_" + suid
if tableName is not None:
self.tablename = tableName
Expand All @@ -62,6 +62,9 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals
self.triggersinfotablename = "triggersInfoTable"
self.countertriggerstable = "counterTriggersTable"
self.countertriggersinfotable = "counterTriggersInfoTable"
else:
print("[DataLayerClient]: Error in initializing; no required values given.")
return

#print("Creating datalayer client in keyspace=%s, tablename=%s, maptablename=%s, settablename=%s, countertablename=%s" % (self.keyspace,self.tablename, self.maptablename, self.settablename, self.countertablename))
self.locality = locality
Expand Down Expand Up @@ -447,17 +450,25 @@ def deleteSet(self, setname):

def getSetNames(self, start_index=0, end_index=2147483647):
sets = []
set_response = []
for retry in range(MAX_RETRIES):
try:
sets = self.datalayer.selectSets(self.keyspace, self.settablename, start_index, end_index, self.locality)
if sets is not None or isinstance(sets, list):
for name in sets:
if name.endswith("_outputkeys_set"):
continue
else:
set_response.append(name)

break
except TTransport.TTransportException as exc:
print("[DataLayerClient] Reconnecting because of failed getSetNames: " + str(exc))
self.connect()
except Exception as exc:
print("[DataLayerClient] failed getSetNames: " + str(exc))
raise
return sets
return set_response

# counter operations
def createCounter(self, countername, count, tableName=None):
Expand Down
15 changes: 7 additions & 8 deletions FunctionWorker/python/FunctionWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,18 @@ def _fork_and_handle_message(self, key, encapsulated_value):
if not has_error:
try:
if "__state_action" not in metadata or (metadata["__state_action"] != "post_map_processing" and metadata["__state_action"] != "post_parallel_processing"):
#self._logger.debug("[FunctionWorker] User code input(Before InputPath processing):" + str(type(raw_state_input)) + ":" + str(raw_state_input))
function_input = self._state_utils.applyInputPath(raw_state_input)
#self._logger.debug("[FunctionWorker] User code input(Before applyParameter processing):" + str(type(function_input)) + ":" + str(function_input))
function_input = self._state_utils.applyParameters(function_input)
#self._logger.debug("[FunctionWorker] User code input(Before ItemsPath processing):" + str(type(function_input)) + ":" + str(function_input))
function_input = self._state_utils.applyItemsPath(function_input) # process map items path
#self._logger.debug("[FunctionWorker] User code input(Before InputPath processing):" + str(type(raw_state_input)) + ":" + str(raw_state_input))
function_input = self._state_utils.applyInputPath(raw_state_input)
#self._logger.debug("[FunctionWorker] User code input(Before applyParameter processing):" + str(type(function_input)) + ":" + str(function_input))
function_input = self._state_utils.applyParameters(function_input)
#self._logger.debug("[FunctionWorker] User code input(Before ItemsPath processing):" + str(type(function_input)) + ":" + str(function_input))
function_input = self._state_utils.applyItemsPath(function_input) # process map items path

#elif "Action" not in metadata or metadata["Action"] != "post_parallel_processing":
# function_input = self._state_utils.applyInputPath(raw_state_input)

else:
function_input = raw_state_input
function_input = raw_state_input
except Exception as exc:
self._logger.exception("InputPath processing exception: %s\n%s", str(instance_pid), str(exc))
error_type = "InputPath processing exception"
Expand Down Expand Up @@ -728,4 +728,3 @@ def main():

if __name__ == '__main__':
main()

90 changes: 32 additions & 58 deletions FunctionWorker/python/LocalQueueClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@
import time
import socket

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TCompactProtocol

from local_queue.service import LocalQueueService
from local_queue.service.ttypes import LocalQueueMessage
import redis

class LocalQueueClient:
'''
Expand All @@ -32,93 +26,73 @@ class LocalQueueClient:
'''
def __init__(self, connect="127.0.0.1:4999"):
self.qaddress = connect
self._qaddress = connect
self.connect()

def connect(self):
host, port = self.qaddress.split(':')
retry = 0.5 #s
while True:
try:
self.socket = TSocket.TSocket(host, int(port))
self.transport = TTransport.TFramedTransport(self.socket)
self.protocol = TCompactProtocol.TCompactProtocol(self.transport)
self.queue = LocalQueueService.Client(self.protocol)
self.transport.open()
self._queue = redis.Redis.from_url("redis://" + self._qaddress, decode_responses=True)
break
except Thrift.TException as exc:
except Exception as exc:
if retry < 60:
print("[LocalQueueClient] Could not connect due to "+str(exc)+", retrying in "+str(retry)+"s")
print("[LocalQueueClient] Could not connect due to " + str(exc) + ", retrying in " + str(retry) + "s")
time.sleep(retry)
retry = retry * 2
else:
raise

def addMessage(self, topic, lqcm, ack):
def addMessage(self, topic, message, ack):
status = True
message = LocalQueueMessage()
message.payload = lqcm.get_serialized().encode()
try:
if ack:
status = self.queue.addMessage(topic, message)
status = bool(self._queue.xadd(topic, message.get_message()))
else:
self.queue.addMessageNoack(topic, message)
except TTransport.TTransportException as exc:
self._queue.xadd(topic, message.get_message())
except Exception as exc:
print("[LocalQueueClient] Reconnecting because of failed addMessage: " + str(exc))
status = False
self.connect()
except Exception as exc:
print("[LocalQueueClient] failed addMessage: " + str(exc))
raise

return status

def getMessage(self, topic, timeout):
message = None
try:
lqm = self.queue.getAndRemoveMessage(topic, timeout)
if lqm.index != 0:
return lqm
except TTransport.TTransportException as exc:
message_list = self._queue.xread({topic: 0}, block=timeout, count=1)
if message_list:
message = message_list[0][1][0][1]
# remove the message from the topic
msg_id = message_list[0][1][0][0]
self._queue.xdel(topic, msg_id)
except Exception as exc:
print("[LocalQueueClient] Reconnecting because of failed getMessage: " + str(exc))
self.connect()
except Exception as exc:
print("[LocalQueueClient] failed getMessage: " + str(exc))
raise

return None
return message

def getMultipleMessages(self, topic, max_count, timeout):
message_list = []
try:
lqm_list = self.queue.getAndRemoveMultiMessages(topic, max_count, timeout)
except TTransport.TTransportException as exc:
message_list = self._queue.xread({topic: "0"}, block=timeout, count=max_count)
except Exception as exc:
print("[LocalQueueClient] Reconnecting because of failed getMultipleMessages: " + str(exc))
lqm_list = []
self.connect()
except Exception as exc:
print("[LocalQueueClient] failed getMultipleMessages: " + str(exc))
raise

return lqm_list
msg_list = []
for msg in message_list[0][1]:
msg_list.append(msg[1])
# remove the message from the topic
self._queue.xdel(topic, msg[0])

return msg_list

def shutdown(self):
if self.transport.isOpen():
#self.socket.handle.shutdown(socket.SHUT_RDWR)
self.transport.close()
self._queue.close()

def addTopic(self, topic):
try:
self.queue.addTopic(topic)
except Thrift.TException as exc:
print("[LocalQueueClient] failed addTopic: " + str(exc))
except Exception as exc:
print("[LocalQueueClient] failed addTopic: " + str(exc))
raise
# no op with regular streams
return

def removeTopic(self, topic):
try:
self.queue.removeTopic(topic)
except Thrift.TException as exc:
print("[LocalQueueClient] failed removeTopic: " + str(exc))
except Exception as exc:
print("[LocalQueueClient] failed removeTopic: " + str(exc))
raise
self._queue.xtrim(topic, "0", approximate=False)
31 changes: 7 additions & 24 deletions FunctionWorker/python/LocalQueueClientMessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from struct import pack, unpack

class LocalQueueClientMessage:
'''
This class defines the message data structure used by the function worker.
It provides the utilities to convert the local queue message
and make the key and value fields easily accessible.
'''

def __init__(self, lqm=None, key=None, value=None):
if lqm is None and key is None and value is None:
return
elif lqm is not None:
self._serialized = lqm.payload
self._deserialize()
self._message = lqm
self._key = self._message["key"]
self._value = self._message["value"]
elif key is not None and value is not None:
self._key = key
self._value = value
self._serialize()

def _serialize(self):
length = 4 + len(self._key)
self._serialized = pack('!I', length)
self._serialized = self._serialized + self._key.encode() + self._value.encode()
self._serialized = self._serialized.decode()

def _deserialize(self):
length = unpack('!I', self._serialized[0:4])[0]
self._key = self._serialized[4:length].decode()
self._value = self._serialized[length:].decode()
self._message = {"key": self._key, "value": self._value}

def get_key(self):
return self._key

def get_value(self):
return self._value

def get_serialized(self):
return self._serialized

def get_message(self):
return self._message
Loading

0 comments on commit ff6184f

Please sign in to comment.