diff --git a/DataLayerService/src/main/java/org/microfunctions/data_layer/DataLayerServer.java b/DataLayerService/src/main/java/org/microfunctions/data_layer/DataLayerServer.java index 4b681658..9da1b557 100644 --- a/DataLayerService/src/main/java/org/microfunctions/data_layer/DataLayerServer.java +++ b/DataLayerService/src/main/java/org/microfunctions/data_layer/DataLayerServer.java @@ -1728,7 +1728,7 @@ else if(sys.containsKey(key)) InetAddress la = InetAddress.getByName(addr[0]); Socket s = new Socket(la, port); s.close(); - riakNodes.put(la.getHostAddress(), port); + riakNodes.put(la.getCanonicalHostName(), port); System.out.println("Using riak node: "+la.getHostAddress()+":"+port.toString()); break; } catch (UnknownHostException e) { diff --git a/FunctionWorker/MicroFunctionsAPI.thrift b/FunctionWorker/MicroFunctionsAPI.thrift index 0b1080a2..6fd18b67 100644 --- a/FunctionWorker/MicroFunctionsAPI.thrift +++ b/FunctionWorker/MicroFunctionsAPI.thrift @@ -15,6 +15,25 @@ */ namespace java org.microfunctions.mfnapi +struct TriggerAPIResult { + 1: bool success + 2: string message +} + +struct TriggerInfoAMQP { + 1: string amqp_addr + 2: string routing_key + 3: string exchange + 4: bool with_ack + 5: bool durable + 6: bool exclusive + 7: double ignr_msg_prob +} + +struct TriggerInfoTimer { + 1: i64 timerIntervalMilliseconds +} + service MicroFunctionsAPIService { string get_context_object_properties(), @@ -23,10 +42,10 @@ service MicroFunctionsAPIService { void update_metadata(1: string metadata_name, 2: string metadata_value, 3: bool is_privileged_metadata), - void send_to_running_function_in_session(1: string rgid, 2: string message, 3: bool send_now), // message? 
- void send_to_all_running_functions_in_session_with_function_name(1: string gname, 2: string message, 3: bool send_now), // message - void send_to_all_running_functions_in_session(1: string message, 2: bool send_now), //message - void send_to_running_function_in_session_with_alias(1: string als, 2: string message, 3: bool send_now), // message + void send_to_running_function_in_session(1: string rgid, 2: string message, 3: bool send_now), + void send_to_all_running_functions_in_session_with_function_name(1: string gname, 2: string message, 3: bool send_now), + void send_to_all_running_functions_in_session(1: string message, 2: bool send_now), + void send_to_running_function_in_session_with_alias(1: string als, 2: string message, 3: bool send_now), list get_session_update_messages(1: i32 count, 2: bool blck), @@ -46,11 +65,11 @@ service MicroFunctionsAPIService { bool is_still_running(), - void add_workflow_next(1: string nxt, 2: string value), // value - void add_dynamic_next(1: string nxt, 2: string value), // value - void send_to_function_now(1: string destination, 2: string value), // value - void add_dynamic_workflow(1: list> dynamic_trigger), // dynamic_trigger - list> get_dynamic_workflow(), // return value + void add_workflow_next(1: string nxt, 2: string value), + void add_dynamic_next(1: string nxt, 2: string value), + void send_to_function_now(1: string destination, 2: string value), + void add_dynamic_workflow(1: list> dynamic_trigger), + list> get_dynamic_workflow(), i64 get_remaining_time_in_millis(), void log(1: string text, 2: string level), @@ -89,7 +108,19 @@ service MicroFunctionsAPIService { void deleteCounter(1: string countername, 2: bool is_private, 3: bool is_queued), list getCounterNames(1: i32 start_index, 2: i32 end_index, 3: bool is_private), + bool addTriggerableBucket(1: string bucket_name), + bool addStorageTriggerForWorkflow(1: string workflow_name, 2: string bucket_name), + bool deleteTriggerableBucket(1: string bucket_name), + bool 
deleteStorageTriggerForWorkflow(1: string workflow_name, 2: string bucket_name), + + TriggerAPIResult addTriggerAMQP(1: string trigger_name, 2: TriggerInfoAMQP trigger_info), + TriggerAPIResult addTriggerTimer(1: string trigger_name, 2: TriggerInfoTimer trigger_info), + TriggerAPIResult addTriggerForWorkflow(1: string trigger_name, 2: string workflow_name, 3: string workflow_state), + TriggerAPIResult deleteTriggerForWorkflow(1: string trigger_name, 2: string workflow_name), + TriggerAPIResult deleteTrigger(1: string trigger_name), + map get_transient_data_output(1: bool is_private), map get_data_to_be_deleted(1: bool is_private) + } diff --git a/FunctionWorker/python/DataLayerClient.py b/FunctionWorker/python/DataLayerClient.py index b6d51200..345694e7 100644 --- a/FunctionWorker/python/DataLayerClient.py +++ b/FunctionWorker/python/DataLayerClient.py @@ -23,7 +23,7 @@ from data_layer.message.ttypes import Metadata from data_layer.service import DataLayerService -MAX_RETRIES=3 +MAX_RETRIES = 3 class DataLayerClient: @@ -50,7 +50,7 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals self.countertriggerstable = "counterTriggersTable" self.countertriggersinfotable = "counterTriggersInfoTable" - else: + elif suid is not None: self.keyspace = "storage_" + suid if tableName is not None: self.tablename = tableName @@ -62,6 +62,9 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals self.triggersinfotablename = "triggersInfoTable" self.countertriggerstable = "counterTriggersTable" self.countertriggersinfotable = "counterTriggersInfoTable" + else: + print("[DataLayerClient]: Error in initializing; no required values given.") + return #print("Creating datalayer client in keyspace=%s, tablename=%s, maptablename=%s, settablename=%s, countertablename=%s" % (self.keyspace,self.tablename, self.maptablename, self.settablename, self.countertablename)) self.locality = locality @@ -447,9 +450,17 @@ def deleteSet(self, 
setname): def getSetNames(self, start_index=0, end_index=2147483647): sets = [] + set_response = [] for retry in range(MAX_RETRIES): try: sets = self.datalayer.selectSets(self.keyspace, self.settablename, start_index, end_index, self.locality) + if sets is not None or isinstance(sets, list): + for name in sets: + if name.endswith("_outputkeys_set"): + continue + else: + set_response.append(name) + break except TTransport.TTransportException as exc: print("[DataLayerClient] Reconnecting because of failed getSetNames: " + str(exc)) @@ -457,7 +468,7 @@ def getSetNames(self, start_index=0, end_index=2147483647): except Exception as exc: print("[DataLayerClient] failed getSetNames: " + str(exc)) raise - return sets + return set_response # counter operations def createCounter(self, countername, count, tableName=None): diff --git a/FunctionWorker/python/FunctionWorker.py b/FunctionWorker/python/FunctionWorker.py index fab66c52..b9e371cc 100644 --- a/FunctionWorker/python/FunctionWorker.py +++ b/FunctionWorker/python/FunctionWorker.py @@ -338,18 +338,18 @@ def _fork_and_handle_message(self, key, encapsulated_value): if not has_error: try: if "__state_action" not in metadata or (metadata["__state_action"] != "post_map_processing" and metadata["__state_action"] != "post_parallel_processing"): - #self._logger.debug("[FunctionWorker] User code input(Before InputPath processing):" + str(type(raw_state_input)) + ":" + str(raw_state_input)) - function_input = self._state_utils.applyInputPath(raw_state_input) - #self._logger.debug("[FunctionWorker] User code input(Before applyParameter processing):" + str(type(function_input)) + ":" + str(function_input)) - function_input = self._state_utils.applyParameters(function_input) - #self._logger.debug("[FunctionWorker] User code input(Before ItemsPath processing):" + str(type(function_input)) + ":" + str(function_input)) - function_input = self._state_utils.applyItemsPath(function_input) # process map items path + 
#self._logger.debug("[FunctionWorker] User code input(Before InputPath processing):" + str(type(raw_state_input)) + ":" + str(raw_state_input)) + function_input = self._state_utils.applyInputPath(raw_state_input) + #self._logger.debug("[FunctionWorker] User code input(Before applyParameter processing):" + str(type(function_input)) + ":" + str(function_input)) + function_input = self._state_utils.applyParameters(function_input) + #self._logger.debug("[FunctionWorker] User code input(Before ItemsPath processing):" + str(type(function_input)) + ":" + str(function_input)) + function_input = self._state_utils.applyItemsPath(function_input) # process map items path #elif "Action" not in metadata or metadata["Action"] != "post_parallel_processing": # function_input = self._state_utils.applyInputPath(raw_state_input) else: - function_input = raw_state_input + function_input = raw_state_input except Exception as exc: self._logger.exception("InputPath processing exception: %s\n%s", str(instance_pid), str(exc)) error_type = "InputPath processing exception" @@ -728,4 +728,3 @@ def main(): if __name__ == '__main__': main() - diff --git a/FunctionWorker/python/LocalQueueClient.py b/FunctionWorker/python/LocalQueueClient.py index 86b35e8d..e9ba295d 100644 --- a/FunctionWorker/python/LocalQueueClient.py +++ b/FunctionWorker/python/LocalQueueClient.py @@ -15,13 +15,7 @@ import time import socket -from thrift import Thrift -from thrift.transport import TSocket -from thrift.transport import TTransport -from thrift.protocol import TCompactProtocol - -from local_queue.service import LocalQueueService -from local_queue.service.ttypes import LocalQueueMessage +import redis class LocalQueueClient: ''' @@ -32,93 +26,73 @@ class LocalQueueClient: ''' def __init__(self, connect="127.0.0.1:4999"): - self.qaddress = connect + self._qaddress = connect self.connect() def connect(self): - host, port = self.qaddress.split(':') - retry = 0.5 #s while True: try: - self.socket = TSocket.TSocket(host, 
int(port)) - self.transport = TTransport.TFramedTransport(self.socket) - self.protocol = TCompactProtocol.TCompactProtocol(self.transport) - self.queue = LocalQueueService.Client(self.protocol) - self.transport.open() + self._queue = redis.Redis.from_url("redis://" + self._qaddress, decode_responses=True) break - except Thrift.TException as exc: + except Exception as exc: if retry < 60: - print("[LocalQueueClient] Could not connect due to "+str(exc)+", retrying in "+str(retry)+"s") + print("[LocalQueueClient] Could not connect due to " + str(exc) + ", retrying in " + str(retry) + "s") time.sleep(retry) retry = retry * 2 else: raise - def addMessage(self, topic, lqcm, ack): + def addMessage(self, topic, message, ack): status = True - message = LocalQueueMessage() - message.payload = lqcm.get_serialized().encode() try: if ack: - status = self.queue.addMessage(topic, message) + status = bool(self._queue.xadd(topic, message.get_message())) else: - self.queue.addMessageNoack(topic, message) - except TTransport.TTransportException as exc: + self._queue.xadd(topic, message.get_message()) + except Exception as exc: print("[LocalQueueClient] Reconnecting because of failed addMessage: " + str(exc)) status = False self.connect() - except Exception as exc: - print("[LocalQueueClient] failed addMessage: " + str(exc)) - raise return status def getMessage(self, topic, timeout): + message = None try: - lqm = self.queue.getAndRemoveMessage(topic, timeout) - if lqm.index != 0: - return lqm - except TTransport.TTransportException as exc: + message_list = self._queue.xread({topic: 0}, block=timeout, count=1) + if message_list: + message = message_list[0][1][0][1] + # remove the message from the topic + msg_id = message_list[0][1][0][0] + self._queue.xdel(topic, msg_id) + except Exception as exc: print("[LocalQueueClient] Reconnecting because of failed getMessage: " + str(exc)) self.connect() - except Exception as exc: - print("[LocalQueueClient] failed getMessage: " + str(exc)) - 
raise - return None + return message def getMultipleMessages(self, topic, max_count, timeout): + message_list = [] try: - lqm_list = self.queue.getAndRemoveMultiMessages(topic, max_count, timeout) - except TTransport.TTransportException as exc: + message_list = self._queue.xread({topic: "0"}, block=timeout, count=max_count) + except Exception as exc: print("[LocalQueueClient] Reconnecting because of failed getMultipleMessages: " + str(exc)) - lqm_list = [] self.connect() - except Exception as exc: - print("[LocalQueueClient] failed getMultipleMessages: " + str(exc)) - raise - return lqm_list + msg_list = [] + for msg in message_list[0][1]: + msg_list.append(msg[1]) + # remove the message from the topic + self._queue.xdel(topic, msg[0]) + + return msg_list def shutdown(self): - if self.transport.isOpen(): - #self.socket.handle.shutdown(socket.SHUT_RDWR) - self.transport.close() + self._queue.close() def addTopic(self, topic): - try: - self.queue.addTopic(topic) - except Thrift.TException as exc: - print("[LocalQueueClient] failed addTopic: " + str(exc)) - except Exception as exc: - print("[LocalQueueClient] failed addTopic: " + str(exc)) - raise + # no op with regular streams + return def removeTopic(self, topic): - try: - self.queue.removeTopic(topic) - except Thrift.TException as exc: - print("[LocalQueueClient] failed removeTopic: " + str(exc)) - except Exception as exc: - print("[LocalQueueClient] failed removeTopic: " + str(exc)) - raise + self._queue.xtrim(topic, "0", approximate=False) diff --git a/FunctionWorker/python/LocalQueueClientMessage.py b/FunctionWorker/python/LocalQueueClientMessage.py index ce97bbb4..8bd186c3 100644 --- a/FunctionWorker/python/LocalQueueClientMessage.py +++ b/FunctionWorker/python/LocalQueueClientMessage.py @@ -12,35 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from struct import pack, unpack - class LocalQueueClientMessage: - ''' - This class defines the message data structure used by the function worker. - It provides the utilities to convert the local queue message - and make the key and value fields easily accessible. - ''' + def __init__(self, lqm=None, key=None, value=None): if lqm is None and key is None and value is None: return elif lqm is not None: - self._serialized = lqm.payload - self._deserialize() + self._message = lqm + self._key = self._message["key"] + self._value = self._message["value"] elif key is not None and value is not None: self._key = key self._value = value - self._serialize() - - def _serialize(self): - length = 4 + len(self._key) - self._serialized = pack('!I', length) - self._serialized = self._serialized + self._key.encode() + self._value.encode() - self._serialized = self._serialized.decode() - - def _deserialize(self): - length = unpack('!I', self._serialized[0:4])[0] - self._key = self._serialized[4:length].decode() - self._value = self._serialized[length:].decode() + self._message = {"key": self._key, "value": self._value} def get_key(self): return self._key @@ -48,6 +32,5 @@ def get_key(self): def get_value(self): return self._value - def get_serialized(self): - return self._serialized - + def get_message(self): + return self._message diff --git a/FunctionWorker/python/MicroFunctionsAPI.py b/FunctionWorker/python/MicroFunctionsAPI.py index 8de6a5c3..8dd49a00 100644 --- a/FunctionWorker/python/MicroFunctionsAPI.py +++ b/FunctionWorker/python/MicroFunctionsAPI.py @@ -194,9 +194,9 @@ def ping(self, num): output = num return 'pong ' + str(output) - def get_privileged_data_layer_client(self, suid=None, sid=None, is_wf_private=False, init_tables=False, drop_keyspace=False, tableName=None): + def get_privileged_data_layer_client(self, suid=None, sid=None, for_mfn=False, is_wf_private=False, init_tables=False, drop_keyspace=False, tableName=None): ''' - Obtain a privileged data layer client 
to access a user's storage or a workflow-private storage. + Obtain a privileged data layer client to access a user's storage, a workflow-private storage or mfn internal tables of a workflow. Only can be usable by the management service. Args: @@ -205,6 +205,7 @@ def get_privileged_data_layer_client(self, suid=None, sid=None, is_wf_private=Fa init_tables (boolean): whether relevant data layer tables should be initialized; default: False. drop_keyspace (boolean): whether the relevant keyspace for the user's storage should be dropped; default: False. + for_mfn: whether it is about the mfn internal table of a workflow; default: False. tableName (string): name of the table to be used for subsequent storage operations. By default, the default table will be used. If this method is called with is_wf_private = True, then the tableName parameter will be ignored. @@ -218,9 +219,11 @@ def get_privileged_data_layer_client(self, suid=None, sid=None, is_wf_private=Fa return DataLayerClient(locality=1, suid=suid, connect=self._datalayer, init_tables=init_tables, drop_keyspace=drop_keyspace, tableName=tableName) else: return DataLayerClient(locality=1, suid=suid, connect=self._datalayer, init_tables=init_tables, drop_keyspace=drop_keyspace) + elif for_mfn: + return DataLayerClient(locality=1, sid=sid, for_mfn=True, init_tables=init_tables, connect=self._datalayer, drop_keyspace=drop_keyspace) elif is_wf_private: - # we'll never try to access the metadata stored for mfn - return DataLayerClient(locality=1, sid=sid, wid=sid, for_mfn=False, is_wf_private=is_wf_private, connect=self._datalayer, drop_keyspace=drop_keyspace) + return DataLayerClient(locality=1, sid=sid, wid=sid, is_wf_private=True, init_tables=init_tables, connect=self._datalayer, drop_keyspace=drop_keyspace) + return None def update_metadata(self, metadata_name, metadata_value, is_privileged_metadata=False): @@ -1022,7 +1025,7 @@ def put(self, key, value, is_private=False, is_queued=False, bucketName=None): is_private 
(boolean): whether the item should be written to the private data layer of the workflow; default: False is_queued (boolean): whether the put operation should be reflected on the data layer after the execution finish; default: False (i.e., the put operation will be reflected on the data layer immediately) - bucketName (string): name of the bucket where to put the key. By default, it will be put in the default bucket. If this method is + bucketName (string): name of the bucket where to put the key. By default, it will be put in the default bucket. If this method is called with is_private = True, then the bucketName parameter will be ignored. Returns: @@ -1058,7 +1061,7 @@ def get(self, key, is_private=False, bucketName=None): Args: key (string): the key of the data item is_private (boolean): whether the item should be read from the private data layer of the workflow; default: False - bucketName (string): name of the bucket where to get the key from. By default, it will be fetched from the default bucket. If this method is + bucketName (string): name of the bucket where to get the key from. By default, it will be fetched from the default bucket. If this method is called with is_private = True, then the bucketName parameter will be ignored. 
Returns: @@ -1871,7 +1874,7 @@ def addTriggerableBucket(self, bucketName): bucketName (string): the name of the bucket to be added Returns: - Boolean, indicating whether the bucket was created successfully + Boolean, indicating whether the bucket was created successfully Raises: None @@ -1903,7 +1906,7 @@ def addStorageTriggerForWorkflow(self, workflowName, bucketName): bucketName (string): the name of the bucket with which to associate the workflow Returns: - Boolean, indicating whether the storage trigger was created successfully + Boolean, indicating whether the storage trigger was created successfully Raises: None @@ -1936,7 +1939,7 @@ def deleteTriggerableBucket(self, bucketName): bucketName (string): the name of the bucket to delete Returns: - Boolean, indicating whether the bucket was deleted successfully + Boolean, indicating whether the bucket was deleted successfully Raises: None @@ -1969,7 +1972,7 @@ def deleteStorageTriggerForWorkflow(self, workflowName, bucketName): bucketName (string): the name of the bucket currently associated with the workflow Returns: - Boolean, indicating whether the storage trigger was deleted successfully + Boolean, indicating whether the storage trigger was deleted successfully Raises: None @@ -2067,6 +2070,21 @@ def _getWorkflowDetails(self, workflowName): return None return response + def addTriggerTimer(self, trigger_name, trigger_info): + if "timer_interval_ms" not in trigger_info: + return False, "Missing parameter for timer trigger: timer_interval_ms" + + return self.addTrigger(trigger_name, trigger_info) + + def addTriggerAMQP(self, trigger_name, trigger_info): + + if "amqp_addr" not in trigger_info: + return False, "Missing parameter for AMQP trigger: amqp_addr" + + if "routing_key" not in trigger_info: + return False, "Missing parameter for AMQP trigger: routing_key" + + return self.addTrigger(trigger_name, trigger_info) def addTrigger(self, trigger_name, trigger_info): ''' @@ -2075,19 +2093,19 @@ def addTrigger(self, 
trigger_name, trigger_info): trigger_info (dict): Trigger specific information. { trigger_type (string): Type of trigger to associate with a workflow. Currently supported values: "amqp", "timer", For "amqp", - amqp_addr (string) - routing_key (string), + amqp_addr (string) + routing_key (string), exchange (string), "egress_exchange" (default) with_ack (boolean), False (default) - means automatic acks, durable (boolean), False (default), exclusive (boolean), False (default), ignore_message_probability (float, range = [0.0, 100.0)), 0.0 (default), - For 'timer', + For 'timer', timer_interval_ms: specified in milli-seconds. } Returns: (status, status_message) - status (boolean) - True, if the trigger was created successfully. False, otherwise + status (boolean) - True, if the trigger was created successfully. False, otherwise status_message (string) - status message, depending on 'status' Raises: None @@ -2115,7 +2133,7 @@ def addTrigger(self, trigger_name, trigger_info): return True, response["data"]["message"] - def addTriggerForWorkflow(self, trigger_name, workflow_name, workflow_state = ""): + def addTriggerForWorkflow(self, trigger_name, workflow_name, workflow_state=""): ''' Args: trigger_name (string): Name of an existing trigger to a workflow to. @@ -2123,7 +2141,7 @@ def addTriggerForWorkflow(self, trigger_name, workflow_name, workflow_state = "" workflow_state (string): (Optional) Name of the state within the workflow to invoke from the trigger. If not specified then the entry state will be invoked by default. Returns: (status, status_message) - status (boolean) - True, if the workflow was added to the trigger successfully. False, otherwise + status (boolean) - True, if the workflow was added to the trigger successfully. 
False, otherwise status_message (string) - status message, depending on 'status' Raises: None @@ -2136,7 +2154,7 @@ def addTriggerForWorkflow(self, trigger_name, workflow_name, workflow_state = "" workflow_state_topic = "" if type(workflow_state) == type("") and workflow_state is not "" and len(workflow_state) > 0: workflow_state_topic = self._sid + "-" + self._wid + "-" + workflow_state - + request = \ { "action": "addTriggerForWorkflow", @@ -2163,7 +2181,7 @@ def deleteTriggerForWorkflow(self, trigger_name, workflow_name): workflow_name (string): Name of the workflow to remove from the trigger. Returns: (status, status_message) - status (boolean) - True, if the workflow was remove from this trigger successfully. False, otherwise + status (boolean) - True, if the workflow was remove from this trigger successfully. False, otherwise status_message (string) - status message or error message, depending on 'status' Raises: None @@ -2195,7 +2213,7 @@ def deleteTrigger(self, trigger_name): trigger_name (string): Name of an existing trigger to delete Returns: (status, status_message) - status (boolean) - True, if the trigger was deleted successfully. False, otherwise + status (boolean) - True, if the trigger was deleted successfully. 
False, otherwise status_message (string) - status message, depending on 'status' Raises: None diff --git a/FunctionWorker/python/SessionHelperThread.py b/FunctionWorker/python/SessionHelperThread.py index 71b30a7c..1902ab7e 100644 --- a/FunctionWorker/python/SessionHelperThread.py +++ b/FunctionWorker/python/SessionHelperThread.py @@ -168,17 +168,6 @@ def run(self): if lqm is not None: self._process_message(lqm) - if self._heartbeat_enabled: - # send heartbeat - # this is part of the message loop, such that we can have a more precise heartbeat - # if it was only after the message loop, then there is a corner case, where the - # processing of the messages would take more than the heartbeat interval, - # meaning we would miss our deadline - t_cur = time.time() * 1000.0 - if (t_cur - last_heartbeat_time) >= self._heartbeat_interval: - self._send_heartbeat() - last_heartbeat_time = t_cur - if self._heartbeat_enabled: # send heartbeat # even if there are no messages, we might need to send a heartbeat @@ -190,6 +179,14 @@ def run(self): # if we sent a heartbeat recently, last_heartbeat and t_cur will cancel each other out poll_timeout = py3utils.ensure_long(last_heartbeat_time + self._local_poll_timeout - t_cur) #self._logger.debug("updated poll timeout: " + str(poll_timeout)) + if poll_timeout <= 0: + # we just missed a deadline; send a heartbeat right away + t_cur = time.time() * 1000.0 + self._send_heartbeat() + last_heartbeat_time = t_cur + # reset the poll timeout accordingly + poll_timeout = self._local_poll_timeout + #self._logger.debug("updated poll timeout (after missing deadline): " + str(poll_timeout)) self._cleanup() diff --git a/FunctionWorker/python/StateUtils.py b/FunctionWorker/python/StateUtils.py index 187c47e2..977f9632 100644 --- a/FunctionWorker/python/StateUtils.py +++ b/FunctionWorker/python/StateUtils.py @@ -44,7 +44,7 @@ class StateUtils: parallelStateType = 'Parallel' mapStateType = 'Map' - mapFunctionOutput = {} + mapFunctionOutput = {} def 
__init__(self, functionstatetype=defaultStateType, functionstatename='', functionstateinfo='{}', functionruntime="", logger=None, workflowid=None, sandboxid=None, functiontopic=None, datalayer=None, storage_userid=None, internal_endpoint=None): self.operators = ['And', 'BooleanEquals', 'Not', 'NumericEquals', 'NumericGreaterThan', 'NumericGreaterThanEquals',\ @@ -102,7 +102,7 @@ def helper(*args, **kwargs): helper.calls += 1 return func(*args, **kwargs) helper.calls = 0 - helper.__name__= func.__name__ + helper.__name__ = func.__name__ return helper # find target next for error in catcher list @@ -644,7 +644,7 @@ def evaluatePostMap(self, function_input, key, metadata, sapi): function_input_post_output = self.applyResultPath(function_input_post_result, function_input_post_result) if "Next" in self.parsedfunctionstateinfo: if self.parsedfunctionstateinfo["Next"]: - sapi.add_dynamic_next(self.parsedfunctionstateinfo["Next"], function_input_post_output ) + sapi.add_dynamic_next(self.parsedfunctionstateinfo["Next"], function_input_post_output) if "End" in self.parsedfunctionstateinfo: if self.parsedfunctionstateinfo["End"]: @@ -679,7 +679,7 @@ def evaluateParallelState(self, function_input, key, metadata, sapi): else: klist.append(total_branch_count) - counter_name_topic = self.functionstatename + "-" + self.sandboxid + counter_name_topic = self.functionstatename + "-" + self.sandboxid counter_name_trigger_metadata = {"k-list": klist, "total-branches": total_branch_count} counter_name_key = key @@ -1093,18 +1093,18 @@ def evaluateNonTaskState(self, function_input, key, metadata, sapi): function_output, metadata = self.evaluateMapState(tobeProcessednow, key, metadata, sapi) elif metadata["__state_action"] == "post_map_processing": - tobeProcessedlater = ast.literal_eval(sapi.get(name_prefix + "_" + "tobeProcessedlater")) # get all elements that have not yet been processed - self._logger.debug("[StateUtils] Map state post_map processing input:" + str(tobeProcessedlater)) 
- # we need to decide at this point if there is a need for more batches. if so: + tobeProcessedlater = ast.literal_eval(sapi.get(name_prefix + "_" + "tobeProcessedlater")) # get all elements that have not yet been processed + self._logger.debug("[StateUtils] Map state post_map processing input:" + str(tobeProcessedlater)) + # we need to decide at this point if there is a need for more batches. if so: - if len(tobeProcessedlater) > 0: # we need to start another batch - function_output, metadata2 = self.evaluatePostMap(function_input, key, metadata, sapi) # take care not to overwrite metadata - function_output, metadata = self.evaluateMapState(tobeProcessedlater[:maxConcurrency], key, metadata, sapi) # start a new batch - sapi.put(name_prefix + "_" + "tobeProcessedlater", str(tobeProcessedlater[maxConcurrency:])) # store remaining elements to be processed on DL + if len(tobeProcessedlater) > 0: # we need to start another batch + function_output, metadata2 = self.evaluatePostMap(function_input, key, metadata, sapi) # take care not to overwrite metadata + function_output, metadata = self.evaluateMapState(tobeProcessedlater[:maxConcurrency], key, metadata, sapi) # start a new batch + sapi.put(name_prefix + "_" + "tobeProcessedlater", str(tobeProcessedlater[maxConcurrency:])) # store remaining elements to be processed on DL - else:# no more batches required. we are at the iteration end, publish the final result - self._logger.debug("[StateUtils] Map state input final stage: " + str(function_input)) - function_output, metadata = self.evaluatePostMap(function_input, key, metadata, sapi) + else:# no more batches required. 
we are at the iteration end, publish the final result + self._logger.debug("[StateUtils] Map state input final stage: " + str(function_input)) + function_output, metadata = self.evaluatePostMap(function_input, key, metadata, sapi) else: raise Exception("Unknow action type in map state") @@ -1421,58 +1421,58 @@ def process_parameters(self, parameters, state_data): if parameters == "$": # return unfiltered input data ret_value = state_data elif parameters is None: #return empty json - ret_value = {} + ret_value = {} else: # contains a parameter filter, get it and return selected kv pairs ret_value = {} ret_index = {} for key in parameters.keys(): # process parameters keys - if key.casefold() == "comment".casefold(): # ignore - ret_value[key] = parameters[key] - elif parameters[key] == "$$.Map.Item.Value": # get Items key - value_key = key.split(".$")[0] - ret_value = value_key - ret_item_value = value_key - elif parameters[key] == "$$.Map.Item.Index": # get Index key - index_key = key.split(".$")[0] - ret_index = index_key - else: # processing more complex Parameters values - if isinstance(parameters[key], dict): # parameters key refers to dict value - ret_value[key] = {} - for k in parameters[key]: # get nested keys - if not k.split(".")[-1] == "$": # parse static value - print (parameters[key][k]) - ret_value[key][k] = parameters[key][k] - else: - new_key = k.split(".$")[0] # use the json paths in paramters to match - ret_value[key][new_key] = [match.value for match in parse(parameters[key][k]).find(state_data)][0] - return ret_value + if key.casefold() == "comment".casefold(): # ignore + ret_value[key] = parameters[key] + elif parameters[key] == "$$.Map.Item.Value": # get Items key + value_key = key.split(".$")[0] + ret_value = value_key + ret_item_value = value_key + elif parameters[key] == "$$.Map.Item.Index": # get Index key + index_key = key.split(".$")[0] + ret_index = index_key + else: # processing more complex Parameters values + if 
isinstance(parameters[key], dict): # parameters key refers to dict value + ret_value[key] = {} + for k in parameters[key]: # get nested keys + if not k.split(".")[-1] == "$": # parse static value + print(parameters[key][k]) + ret_value[key][k] = parameters[key][k] + else: + new_key = k.split(".$")[0] # use the json paths in paramters to match + ret_value[key][new_key] = [match.value for match in parse(parameters[key][k]).find(state_data)][0] + return ret_value - if isinstance(parameters[key], str): # parameters key refers to string value - ret_value = {} - new_key = key.split(".$")[0] # get the parameters key - query_key = parameters[key].split("$.")[1] # correct the correspondig value - new_value = state_data[query_key] # save the actual value before replacing the key - for kk in state_data.keys(): - if isinstance(state_data[kk], dict): # value encapsulates dict + if isinstance(parameters[key], str): # parameters key refers to string value + ret_value = {} + new_key = key.split(".$")[0] # get the parameters key + query_key = parameters[key].split("$.")[1] # correct the correspondig value + new_value = state_data[query_key] # save the actual value before replacing the key + for kk in state_data.keys(): + if isinstance(state_data[kk], dict): # value encapsulates dict ret_value[new_key] = new_value - if ret_item_value != None: - ret_value[ret_item_value] = state_data[kk] + if ret_item_value is not None: + ret_value[ret_item_value] = state_data[kk] else: - raise Exception("Error: item value is not set!") + raise Exception("Error: item value is not set!") ret_value_dict = {} ret_value_dict[kk] = ret_value return ret_value_dict - if isinstance(state_data[kk], list): # value encapsulates list + if isinstance(state_data[kk], list): # value encapsulates list ret_value_list = [] for data in state_data[kk]: ret_value_list.append({new_key: new_value, ret_item_value: data}) ret_value_dict = {} ret_value_dict[kk] = ret_value_list return ret_value_dict - else: - raise 
Exception("Error: invaldid Parmeters format: " + str(parameters[key])) + else: + raise Exception("Error: invaldid Parmeters format: " + str(parameters[key])) # calculate transformed state output provided to Iterator ret_total = [] @@ -1484,9 +1484,9 @@ def process_parameters(self, parameters, state_data): if ret_value != {} and ret_index == {}: ret_total.append({ret_value: key}) elif ret_value == {} and ret_index != {}: - ret_total.append({ret_index: state_data[kk].index(key) }) + ret_total.append({ret_index: state_data[kk].index(key)}) elif ret_value != {} and ret_index != {}: - ret_total.append({ret_value: key, ret_index: state_data[kk].index(key) }) + ret_total.append({ret_value: key, ret_index: state_data[kk].index(key)}) else: raise Exception("Map State Parameters parse error on dict input: " + str(state_data)) ret_total_dict[kk] = ret_total @@ -1497,9 +1497,9 @@ def process_parameters(self, parameters, state_data): if ret_value != {} and ret_index == {}: ret_total.append({ret_value: key}) elif ret_value == {} and ret_index != {}: - ret_total.append({ret_index: state_data.index(key) }) + ret_total.append({ret_index: state_data.index(key)}) elif ret_value != {} and ret_index != {}: - ret_total.append({ret_value: key, ret_index: state_data.index(key) }) + ret_total.append({ret_value: key, ret_index: state_data.index(key)}) else: raise Exception("Map State Parameters parse error on list input: " + str(list)) ret_value = ret_total @@ -1573,7 +1573,7 @@ def process_result_path(self, path_fields, state_data, task_output): elif result_path is None: ret_value = {} else: # result_path is not empty so is there a match? 
- self._logger.debug("inside ResultPath processing: " + str(result_path) + " " + str(task_output) ) + self._logger.debug("inside ResultPath processing: " + str(result_path) + " " + str(task_output)) keys = list(tokenize(result_path)) # get all keys filtered_state_data = self.nested_dict(keys[1:], task_output) if isinstance(state_data, dict): diff --git a/GUI/app/main.css b/GUI/app/main.css index a893a2af..116ec9de 100644 --- a/GUI/app/main.css +++ b/GUI/app/main.css @@ -3014,62 +3014,18 @@ a { /* Firefox 16+, IE 10+, Opera */ } } #preloader { + align-items: center; + background: rgb(23, 22, 22); + display: flex; + height: 100vh; + justify-content: center; + left: 0; position: fixed; top: 0; - left: 0; + transition: opacity 0.3s linear; width: 100%; - height: 100%; - z-index: 1000; - background: #000000; - -webkit-backface-visibility: hidden; - backface-visibility: hidden; } - #preloader > div { - display: block; - position: relative; - left: 50%; - top: 50%; - width: 150px; - height: 150px; - margin: -75px 0 0 -75px; - border-radius: 50%; - border: 3px solid transparent; - border-top-color: #e85656; - -webkit-backface-visibility: hidden; - backface-visibility: hidden; - -webkit-transform: translate3d(0, 0, 0); - transform: translate3d(0, 0, 0); - backface-visibility: hidden; - -webkit-animation: spin 2s linear infinite; - animation: spin 2s linear infinite; - /* Chrome, Firefox 16+, IE 10+, Opera */ } - #preloader > div:before { - content: ""; - position: absolute; - top: 5px; - left: 5px; - right: 5px; - bottom: 5px; - border-radius: 50%; - border: 3px solid transparent; - border-top-color: #209e91; - -webkit-animation: spin 3s linear infinite; - /* Chrome, Opera 15+, Safari 5+ */ - animation: spin 3s linear infinite; - /* Chrome, Firefox 16+, IE 10+, Opera */ } - #preloader > div:after { - content: ""; - position: absolute; - top: 15px; - left: 15px; - right: 15px; - bottom: 15px; - border-radius: 50%; - border: 3px solid transparent; - border-top-color: #dfb81c; 
- -webkit-animation: spin 1.5s linear infinite; - animation: spin 1.5s linear infinite; - /* Chrome, Firefox 16+, IE 10+, Opera */ } - + z-index: 9999; +} @font-face { font-family: 'socicon'; src: url("../assets/fonts/socicon.eot"); @@ -3831,13 +3787,13 @@ th { height: calc(100vh - 283px); font-size: 11px; } -/* -#chartdiv1 { - background: #3f3f4f; - color:#ffffff; - width : 100%; - height : 500px; - font-size : 11px; +/* +#chartdiv1 { + background: #3f3f4f; + color:#ffffff; + width : 100%; + height : 500px; + font-size : 11px; }*/ #map-lines { width: 100%; @@ -3863,8 +3819,8 @@ th { scrollbar-face-color: rgba(0, 0, 0, 0.6); scrollbar-track-color: rgba(255, 255, 255, 0.7); } .panel.animated { - -webkit-animation-duration: 0.5s; - animation-duration: 0.5s; } + -webkit-animation-duration: 0.2s; + animation-duration: 0.2s; } .panel.small-panel { height: 114px; } .panel.xsmall-panel { @@ -3963,9 +3919,9 @@ th { .light-text { font-weight: 300; } -/* -.panel-group .panel { - border-radius: 0; +/* +.panel-group .panel { + border-radius: 0; }*/ /** Different tabs positions, which were removed from bootstrap */ .tabs-below .nav-tabs, .tabs-right .nav-tabs, .tabs-left .nav-tabs { @@ -4103,33 +4059,33 @@ th { width: 100%; padding: 14px 22px; } -/* -.panel-group { - > .panel { - > .panel-heading { - padding: 0; - - > h4.panel-title { - height: 50px; - width: 100%; - padding: 0; - - > a { - display: block; - padding: 15px 22px; - width: 100%; - } - } - } - } - -} - -.panel-collapse { - transition: height $default-animation-duration $default-animation-style; - .panel-body { - padding: 15px; - } +/* +.panel-group { + > .panel { + > .panel-heading { + padding: 0; + + > h4.panel-title { + height: 50px; + width: 100%; + padding: 0; + + > a { + display: block; + padding: 15px 22px; + width: 100%; + } + } + } + } + +} + +.panel-collapse { + transition: height $default-animation-duration $default-animation-style; + .panel-body { + padding: 15px; + } }*/ .back-top { width: 52px; diff 
--git a/GUI/app/pages/functions/CodeEditorCtrl.js b/GUI/app/pages/functions/CodeEditorCtrl.js index be205903..8967184e 100644 --- a/GUI/app/pages/functions/CodeEditorCtrl.js +++ b/GUI/app/pages/functions/CodeEditorCtrl.js @@ -30,12 +30,12 @@ var dataPrefix = sharedProperties.getDataPrefix(); - var mfnAPI = [ {"word" : "log(\"\")", "score" : 1008 }, - {"word" : "get(\"\")", "score" : 1007}, {"word" : "put()", "score" : 1006}, {"word" : "delete(\"\")", "score" : 1005} ]; - var mfnAPITooltip = [ {"command" : "log()", "ttip" : "log(text)

Log text. Uses the instance id to indicate which function instance logged the text.

Args:
text (string): text to be logged.

Returns:
None.

Raises:
MicroFunctionsUserLogException: if there are any errors in the logging function."}, - {"command" : "put()", "ttip" : "put(key, value, is_private=False, is_queued=False)

Access to data layer to store a data item in the form of a (key, value) pair.
By default, the put operation is reflected on the data layer immediately.
If the put operation is queued (i.e., is_queued = True), the data item is put into the transient data table.
If the key was previously deleted by the function instance, it is removed from the list of items to be deleted.
When the function instance finishes, the transient data items are committed to the data layer.

Args:
key (string): the key of the data item
value (string): the value of the data item
is_private (boolean): whether the item should be written to the private data layer of the workflow; default: False
is_queued (boolean): whether the put operation should be reflected on the data layer after the execution finish; default: False

Returns:
None

Raises:
MicroFunctionsDataLayerException: when the key and/or value are not strings."}, - {"command" : "get(\"\")", "ttip" : "get(key, is_private=False)

Access to data layer to load the value of a given key. The key is first checked in the transient deleted items.
If it is not deleted, the key is then checked in the transient data table. If it is not there,
it is retrieved from the global data layer. As a result, the value returned is consistent with what this function
instance does with the data item. If the data item is not present in either the transient data table
nor in the global data layer, an empty string (i.e., \"\") will be returned.
If the function used put() and delete() operations with is_queued = False (default), then the checks
of the transient table will result in empty values, so that the item will be retrieved
from the global data layer.

Args:
key (string): the key of the data item
is_private (boolean): whether the item should be read from the private data layer of the workflow; default: False

Returns:
value (string): the value of the data item; empty string if the data item is not present.

Raises:
MicroFunctionsDataLayerException: when the key is not a string."}, - {"command" : "delete(\"\")", "ttip" : "delete(key, is_private=False, is_queued=False)

Access to data layer to delete data item associated with a given key.
By default, the delete operation is reflected on the data layer immediately.
If the delete operation is queued (i.e., is_queued = True), the key is removed from the transient data table.
It is also added to the list of items to be deleted from the global data layer when the function instance finishes.

Args:
key (string): the key of the data item
is_private (boolean): whether the item should be deleted from the private data layer of the workflow; default: False
is_queued (boolean): whether the delete operation should be reflected on the data layer after the execution finish; default: False

Returns:
None

Raises:
MicroFunctionsDataLayerException: when the key is not a string."} + var mfnAPI = [{ "word": "log(\"\")", "score": 1008 }, + { "word": "get(\"\")", "score": 1007 }, { "word": "put(\"\")", "score": 1006 }, { "word": "remove(\"\")", "score": 1005 }]; + var mfnAPITooltip = [{ "command": "log(\"\")", "ttip": "log(text, level='INFO')

Log text. Uses the instance id to indicate which function instance logged the text.

Args:
text (string): text to be logged.
level (string): log level to be used.

Returns:
None.

Raises:
MicroFunctionsUserLogException: if there are any errors in the logging function." }, + { "command": "put(\"\")", "ttip": "put(key, value, is_private=False, is_queued=False, bucketName=None)

Access to data layer to store a data item in the form of a (key, value) pair.
By default, the put operation is reflected on the data layer immediately.
If the put operation is queued (i.e., is_queued = True), the data item
is put into the transient data bucket.
If the key was previously deleted by the function instance,
it is removed from the list of items to be deleted.
When the function instance finishes, the transient data items are committed to the data layer.

Args
key (string): the key of the data item
value (string): the value of the data item
is_private (boolean): whether the item should be written to the private data layer of the workflow; default: False
is_queued (boolean): whether the put operation should be reflected on the data layer after the execution finish; default: False
(i.e., the put operation will be reflected on the data layer immediately)
bucketName (string): name of the bucket where to put the key. By default, it will be put in the default bucket.
If this method is called with is_private = True, then
the bucketName parameter will be ignored.

Returns:
None

Raises:
MicroFunctionsDataLayerException: when the key and/or value are not a strings."}, + { "command": "get(\"\")", "ttip": "get(key, is_private=False, bucketName=None)

Access to data layer to load the value of a given key. The key is first checked in the transient deleted items.
If it is not deleted, the key is then checked in the transient data bucket.
If it is not there, it is retrieved from the global data layer.
As a result, the value returned is consistent with what this function instance does with the data item.
If the data item is not present in either the transient data bucket nor in the global data layer, an empty string (i.e., \"\") will be returned.
If the function used put() and delete() operations with is_queued=False (default), then the checks of the transient bucket
will result in empty values, so that the item will be retrieved from the global data layer.

Args:
key (string): the key of the data item
value (string): the value of the data item
is_private (boolean): whether the item should be written to the private data layer of the workflow; default: False
is_queued (boolean): whether the put operation should be reflected on the data layer after the execution finish; default: False
bucketName (string): name of the bucket where to get the key from. By default, it will be fetched from the default bucket.
If this method is called with is_private = True, then the bucketName parameter will be ignored.

Returns:
value (string): the value of the data item; empty string if the data item is not present.

Raises:
MicroFunctionsDataLayerException: when the key is not a string." }, + { "command": "remove(\"\")", "ttip": "remove(key, is_private=False, is_queued=False, bucketName=None)

Access to data layer to remove data item associated with a given key.
By default, the remove operation is reflected on the data layer immediately.
If the remove operation is queued (i.e., is_queued = True), the key is removed from the transient data table.
It is also added to the list of items to be removed from the global data layer when the function instance finishes.

Args:
key (string): the key of the data item
is_private (boolean): whether the item should be removed from the private data layer of the workflow; default: False
is_queued (boolean): whether the delete operation should be reflected on the data layer after the execution finish; default: False
bucketName (string): name of the bucket where to remove the key from. By default, it will be removed from the default bucket.

Returns:
None

Raises:
MicroFunctionsDataLayerException: when the key is not a string." } ]; var zip = new JSZip(); diff --git a/GUI/app/pages/functions/FunctionTableCtrl.js b/GUI/app/pages/functions/FunctionTableCtrl.js index ae824cf9..64e1ff52 100644 --- a/GUI/app/pages/functions/FunctionTableCtrl.js +++ b/GUI/app/pages/functions/FunctionTableCtrl.js @@ -1,5 +1,5 @@ /* - Copyright 2020 The KNIX Authors + Copyright 2021 The KNIX Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -365,12 +365,12 @@ if (response.data.data.workflow.status=='deployed') { $interval.cancel(promise); testWorkflow(functionIndex, workflowId, response.data.data.workflow.endpoints[0]); - //testWorkflow(functionIndex, workflowId, response.data.data.workflow.endpoint); } else if (response.data.data.workflow.status=='failed') { $interval.cancel(promise); - + $scope.functionDeploymentModal.dismiss(); + setTimeout(function() { deleteTemporaryWorkflow(functionIndex, workflowId);}, 2000); console.log("Error in deployment: " + response.data.data.workflow.deployment_error); $scope.errorMessage = response.data.data.workflow.deployment_error; $uibModal.open({ @@ -651,6 +651,7 @@ } else { console.log("Failure status returned by addFunction"); console.log("Message:" + response.data.data.message); + $scope.reloadFunctions(); $scope.errorMessage = response.data.data.message; $uibModal.open({ animation: true, @@ -662,6 +663,7 @@ }, function errorCallback(response) { console.log("Error occurred during addFunction"); console.log("Response:" + response); + $scope.reloadFunctions(); if (response.statusText) { $scope.errorMessage = response.statusText; } else { diff --git a/GUI/app/pages/storage/StorageTableCtrl.js b/GUI/app/pages/storage/StorageTableCtrl.js index 679a65e0..64c85537 100644 --- a/GUI/app/pages/storage/StorageTableCtrl.js +++ b/GUI/app/pages/storage/StorageTableCtrl.js @@ -1,5 +1,5 @@ /* - Copyright 2020 The KNIX Authors + Copyright 
2021 The KNIX Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ $scope.storageLocations.selected = { name: "General Storage", type: "Default Bucket", id: '' }; $scope.itemsByPage = 10; + $scope.dataType = "kv"; const monthNames = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" @@ -245,14 +246,14 @@ } function getStorageObjectsList(data_type) { - var storageLoc = sharedProperties.getStorageLocation(); + var storageLoc = sharedProperties.getStorageLocation(); var table = ""; if (storageLoc.type == "Bucket") { table = storageLoc.name; } else { table = defaultTable; } - + var param_storage = {}; param_storage["data_type"] = data_type; param_storage["parameters"] = {}; @@ -268,7 +269,7 @@ param_storage["parameters"]["start"] = 0; param_storage["parameters"]["count"] = 2000; param_storage["workflowid"] = storageLoc.id; - + var req = { method: 'POST', url: urlPath, @@ -286,7 +287,9 @@ if (data_type == "kv") { for (var i=0;i + + + diff --git a/GUI/app/pages/storage/modals/deleteStorageObjectModal.html b/GUI/app/pages/storage/modals/deleteStorageObjectModal.html index d79aa1c7..c6a6e00e 100644 --- a/GUI/app/pages/storage/modals/deleteStorageObjectModal.html +++ b/GUI/app/pages/storage/modals/deleteStorageObjectModal.html @@ -20,9 +20,9 @@ Delete Storage Object diff --git a/GUI/app/pages/storage/modals/uploadStorageObjectModal.html b/GUI/app/pages/storage/modals/uploadStorageObjectModal.html index 503355bd..76ffd190 100644 --- a/GUI/app/pages/storage/modals/uploadStorageObjectModal.html +++ b/GUI/app/pages/storage/modals/uploadStorageObjectModal.html @@ -1,5 +1,5 @@ Actions @@ -46,6 +55,15 @@ + + + + + + {{ storageObject.key || 'empty' }} diff --git a/GUI/app/pages/storage/widgets/editableRowMaps.html b/GUI/app/pages/storage/widgets/editableRowMaps.html index 43a5c1a5..e165ed4e 100644 --- 
a/GUI/app/pages/storage/widgets/editableRowMaps.html +++ b/GUI/app/pages/storage/widgets/editableRowMaps.html @@ -1,5 +1,5 @@ Actions @@ -46,6 +55,15 @@ + + + + + + {{ storageObject.key || 'empty' }} diff --git a/GUI/app/pages/storage/widgets/editableRowSets.html b/GUI/app/pages/storage/widgets/editableRowSets.html index 472de1fc..03000548 100644 --- a/GUI/app/pages/storage/widgets/editableRowSets.html +++ b/GUI/app/pages/storage/widgets/editableRowSets.html @@ -1,5 +1,5 @@ @@ -46,6 +54,15 @@ + + + + + + {{ storageObject.key || 'empty' }} diff --git a/GUI/app/pages/storage/widgets/editableRowTable.html b/GUI/app/pages/storage/widgets/editableRowTable.html index d9309e72..b951927b 100644 --- a/GUI/app/pages/storage/widgets/editableRowTable.html +++ b/GUI/app/pages/storage/widgets/editableRowTable.html @@ -18,7 +18,7 @@
-      +         
@@ -35,6 +35,15 @@ + + + + + + Key Actions @@ -46,6 +55,15 @@ + + + + + + {{ storageObject.key || 'empty' }} diff --git a/GUI/app/pages/workflows/ExecutionCtrl.js b/GUI/app/pages/workflows/ExecutionCtrl.js index 22d60010..317d72f4 100644 --- a/GUI/app/pages/workflows/ExecutionCtrl.js +++ b/GUI/app/pages/workflows/ExecutionCtrl.js @@ -1,5 +1,5 @@ /* - Copyright 2020 The KNIX Authors + Copyright 2021 The KNIX Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -69,28 +69,18 @@ vm.$apply(function() { fileContents = reader.result.slice(reader.result.indexOf("base64,") + "base64,".length); var decodedFileContents = atob(fileContents); - /*if (decodedFileContents.match(/[^\u0000-\u007f]/)) { - // non-ASCII file - //$scope.aceInputSession.setValue(fileContents); - $scope.aceInputSession.setValue("Uploaded file: " + file.name); - sharedData.setWorkflowExecutionInputEditor(workflowId, "Uploaded file: " + file.name); - sharedData.setWorkflowExecutionInput(workflowId, fileContents); - $scope.aceInput.focus(); - } else {*/ - // ASCII file - $scope.aceInputSession.setValue(decodedFileContents); - if (workflowName.endsWith(" ")) { - // test function - sharedData.setWorkflowExecutionInputEditor("mfn-internal-" + workflowName, decodedFileContents); - sharedData.setWorkflowExecutionInput("mfn-internal-" + workflowName, decodedFileContents); - } else { - // execute workflow - sharedData.setWorkflowExecutionInputEditor(workflowId, decodedFileContents); - sharedData.setWorkflowExecutionInput(workflowId, decodedFileContents); - } - //$scope.aceInput.focus(); - //} - + + $scope.aceInputSession.setValue(decodedFileContents); + if (workflowName.endsWith(" ")) { + // test function + sharedData.setWorkflowExecutionInputEditor("mfn-internal-" + workflowName, decodedFileContents); + sharedData.setWorkflowExecutionInput("mfn-internal-" + workflowName, decodedFileContents); + } else { + // execute workflow + 
sharedData.setWorkflowExecutionInputEditor(workflowId, decodedFileContents); + sharedData.setWorkflowExecutionInput(workflowId, decodedFileContents); + } + }); }; reader.readAsDataURL(file); @@ -119,32 +109,36 @@ if (response.data.data.workflow.log != prevLogEntry) { var logStr = atob(response.data.data.workflow.log); - var log_lines = logStr.split('\n'); - - log_lines.sort(); - - for (var i = 0; i < log_lines.length; i++) + logStr = logStr.replace(/\[1[0-9]{15}\]/g, "#@!"); + + var logArr = logStr.split('#@!'); + logArr.sort(); + for (var i = 0; i < logArr.length; i++) { - var line = log_lines[i]; - log_lines[i] = line.substring(line.indexOf(" ") + 1); + var line = logArr[i]; + logArr[i] = line.substring(line.indexOf(" ") + 1); } - $scope.aceLogSession.setValue(log_lines.join("\n")); + $scope.aceLogSession.setValue(logArr.join("")); + + var n = $scope.aceLog.getValue().split("\n").length; + $scope.aceLog.gotoLine(n, 0, true); + $scope.aceInput.focus(); - var n = $scope.aceLog.getValue().split("\n").length; - $scope.aceLog.gotoLine(n, 0, true); - $scope.aceInput.focus(); + prevLogEntry = response.data.data.workflow.log; - prevLogEntry = response.data.data.workflow.log; } if (response.data.data.workflow.progress != prevProgressEntry) { - - //var logFile = $scope.aceLogSession.getDocument().getValue(); - var logFile = atob(response.data.data.workflow.log); - - var logArray = logFile.split(/\r?\n/); - + var logStr = atob(response.data.data.workflow.log); + logStr = logStr.replace(/\[1[0-9]{15}\]/g, "#@!"); + var logArray = logStr.split('#@!'); + logArray.sort(); + for (var i = 0; i < logArray.length; i++) + { + var line = logArray[i]; + logArray[i] = line.substring(line.indexOf(" ") + 1); + } prevProgressEntry = response.data.data.workflow.progress; var progress = atob(response.data.data.workflow.progress); @@ -165,6 +159,8 @@ var functionExecId = ""; var wfExecId = ""; + var numberOfLogLines = 0; + // [2019-10-07 19:36:07.272] [INFO] 
[b4caad66e93911e9b41f02dc5d3985be] [g] [__mfn_progress] b4caad66e93911e9b41f02dc5d3985be_46851328608099ff478b53c1a798a095-46851328608099ff478b53c1a798a095-g {"t_start_fork": 1570476966756.972, "t_start_pubutils": 1570476966766.0327, "t_start_sessutils": 1570476966774.294, "t_start_sapi": 1570476966774.2964, "t_start": 1570476966774.4314, "t_end": 1570476966775.858, "t_pub_start": 1570476966775.9727, "t_pub_end": 1570476967272.2407, "t_end_fork": 1570476967272.2407, "function_instance_id": "b4caad66e93911e9b41f02dc5d3985be_46851328608099ff478b53c1a798a095-46851328608099ff478b53c1a798a095-g"} const regex = /\[(.+?)\] \[(.+?)\] \[(.+?)\] \[(.+?)\] \[(.+?)\] (.+?) ({.+?})/; for (var i=0;i= dStart && logEntryDate <= dEnd) { + if (logEntryDate.getTime() >= (dStart.getTime()-5) && logEntryDate.getTime() <= (dEnd.getTime()+5)) { + if (logArray[t].includes("ERROR")) { execErr = 'Error'; } - var cutB = nthIndex(logArray[t],'[', 1); - var cutE = nthIndex(logArray[t],']', 1); - logArray[t] = logArray[t].replace(logArray[t].substring(cutB, cutE+1), ""); + + var cutB = nthIndex(logArray[t],'[', 3); + var cutE = nthIndex(logArray[t],']', 3); + logArray[t] = logArray[t].replace(logArray[t].substring(cutB, cutE+2), ""); cutB = nthIndex(logArray[t],'[', 3); cutE = nthIndex(logArray[t],']', 3); - logArray[t] = logArray[t].replace(logArray[t].substring(cutB, cutE+1), ""); - cutB = nthIndex(logArray[t],'[', 3); - cutE = nthIndex(logArray[t],']', 3); - var lStr = logArray[t].replace(logArray[t].substring(cutB, cutE+1), ""); + var lStr = logArray[t].replace(logArray[t].substring(cutB, cutE+2), ""); + lStr = lStr.substring(lStr.indexOf(" ") + 1); + if (!lStr.includes("(functionworker)")) { - logOutput += lStr + "
"; + logOutput += lStr; } + } + - var f = t+1; + } - while (logArray[f] && !logArray[f].startsWith('[') && (f-t)<25) { - logOutput += logArray[f]; - logOutput += "
"; - f++; - } + } + + if (logOutput!='') { + if (logOutput.length>=1200) { + logOutput = logOutput.substring(0, 1200); + logOutput+= "..."; + } + var logLines = logOutput.split('\n'); + var addedLinebreaks = 0; + if (logLines) { + for (var f=0;f100) { + addedLinebreaks += Math.floor(logLines[f].length / 100.0); + } } - + numberOfLogLines = logLines.length + addedLinebreaks; + + } else { + numberOfLogLines = 1; } + logOutput = '
' + logOutput + '
'; + } else { + numberOfLogLines = 0; } - - toolTip = '
Function Log OutputFunction Execution Statistics
' + truncate.apply(logOutput, [1200, true]) + 'State Name: ' + stateName + '
Function Name: ' + state2FunctionMapping[stateName] + '
Status: ' + execErr + '

Duration: ' + duration + '
Execution Start: ' + dateFormat(new Date(parseFloat(functionExecStart))) + '
Execution End: ' + dateFormat(new Date(parseFloat(functionExecEnd))) + '
Exec Id: ' + wfExecId + '
'; + + + toolTip = '
Function Log OutputFunction Execution Statistics
' + logOutput + 'State Name: ' + stateName + '
Function Name: ' + state2FunctionMapping[stateName] + '
Status: ' + execErr + '

Duration: ' + duration + '
Execution Start: ' + dateFormat(new Date(parseFloat(functionExecStart))) + '
Execution End: ' + dateFormat(new Date(parseFloat(functionExecEnd))) + '
Exec Id: ' + wfExecId + '
'; if (prevExecutedFunction!=stateName) { @@ -289,12 +302,10 @@ lastFunctionExecutionDate = parseFloat(functionExecStart); } - //console.log('dStart:' + dStart.getTime()); - //console.log('dEnd:' + dEnd.getTime()); if (dEnd.getTime() - dStart.getTime() > 0) { - ev[counter] = [{id: execCounter + counter, start: new Date(dStart.getTime()), end: new Date(dEnd.getTime()), content: stateName + " " + duration + "", ttip: toolTip}]; + ev[counter] = [{id: execCounter + counter, start: new Date(dStart.getTime()), end: new Date(dEnd.getTime()), content: stateName + " " + duration + "", ttip: toolTip, state: stateName, execId: wfExecId, log: logOutput}]; } else { - ev[counter] = [{id: execCounter + counter, start: new Date(dStart.getTime()), end: new Date(dStart.getTime()+1), content: stateName + " <1ms", ttip: toolTip}]; + ev[counter] = [{id: execCounter + counter, start: new Date(dStart.getTime()), end: new Date(dStart.getTime()+1), content: stateName + " <1ms", ttip: toolTip, execId: wfExecId, state: stateName,log: logOutput}]; } if (execErr=='Error') { ev[counter][0].style = "background-color: #f7a8a8; border-color: black;"; @@ -302,89 +313,66 @@ ev[counter][0].style = "background-color: #92d193; border-color: black;"; } ev[counter][0].status = execErr; + ev[counter][0].logLines = numberOfLogLines; counter++; } } } - - for (var t=0;t=0) { - errorLine = logOutput.substr(eLine + 5, logOutput.length-1); - if (errorLine.indexOf(',')>0) { - errorLine = errorLine.substr(0, errorLine.indexOf(',')); - } else { - errorLine = errorLine.substr(0, errorLine.indexOf('
')); - } - //console.log('lg:' + lastFunctionExecutionDate + ' ed:' + logEntryDate.getTime()); - if (lastFunctionExecutionDate < logEntryDate.getTime()) { - - codeError = state2FunctionMapping[stateName] + ':' + errorLine; - failedFunctions[0] = stateName; - - $scope.workflowButtonLabel = "Go to Error"; - } - } - - } - } - - //ev.sort(compareExecutionEvents); - for (var i=0;i=0 && execEvents[lastFailedExecution][0].execId==execEvents[execEvents.length-1][0].execId) { + var logOutput = execEvents[lastFailedExecution][0].log; + var eLine = logOutput.lastIndexOf("line "); + if (eLine>=0) { + errorLine = logOutput.substr(eLine + 5, logOutput.length-1); + if (errorLine.indexOf(',')>0) { + errorLine = errorLine.substr(0, errorLine.indexOf(',')); + } else { + errorLine = errorLine.substr(0, errorLine.indexOf('
')); + } + codeError = state2FunctionMapping[execEvents[lastFailedExecution][0].state] + ':' + errorLine; + failedFunctions[0] = execEvents[lastFailedExecution][0].state; + $scope.workflowButtonLabel = "Go to Error"; + + } + } if (initialized) { timeline.on('itemover', function (properties) { if (!popoverVisible) { - var logLines = (execEvents[properties.item][0].ttip.match(/
/g) || []).length; - //console.log('count:' + logLines); - logLines -= 12; - logLines = Math.max(0, logLines); - var marginTop = 11 - logLines; - if (marginTop>5 && logLines>0) { + + var numberOfLogLines = execEvents[properties.item][0].logLines; + + numberOfLogLines -= 12; + numberOfLogLines = Math.max(0, numberOfLogLines); + var marginTop = 11 - numberOfLogLines; + if (marginTop>5 && numberOfLogLines>0) { marginTop-=4; } + if (marginTop<3) { + marginTop = 3; + } document.getElementById('popover').style.top = marginTop.toString() + '%'; - if (logLines > 12) { - } else if (logLines > 20) { - document.getElementById('popover').style.top = '15%'; + if (numberOfLogLines > 12) { + } else if (numberOfLogLines > 20) { + document.getElementById('popover').style.top = '0%'; } document.getElementById('popover').innerHTML = execEvents[properties.item][0].ttip; document.getElementById('popover').style.display = 'block'; + document.getElementById('popover').style.width = '98%'; popoverVisible = true; } }); @@ -395,10 +383,6 @@ } }); } - /*if (execErr && lastFunctionExecutionDate < logEntryDate.getTime()) { - - setTimeout(function() { timeline.setWindow(new Date(logEntryDate.getTime()-300), new Date(logEntryDate.getTime()+450)) }, 500); -*/ if (counter>0) { setTimeout(function() { timeline.setWindow(new Date(dStart.getTime()-((dEnd-dStart)*2)), new Date(dEnd.getTime()+((dEnd-dStart)*2))) }, 500); @@ -462,14 +446,6 @@ return 0; } - function truncate( n, useWordBoundary ){ - if (this.length <= n) { return this; } - var subString = this.substr(0, n-1); - return (useWordBoundary - ? 
subString.substr(0, subString.lastIndexOf(' ')) - : subString) + "…"; - }; - function colorExecutedFunctions(workflowFunctions, workflowStateNames, workflowJson, states, curStateName, prevStateName, prevChoice, executed, parallelNext, prefix, wForNum) { var parallelNextState = parallelNext; var pfix = prefix; @@ -1163,23 +1139,16 @@ intervalCounter = 0; - //setTimeout(function() { prepareLogFile(); }, 3000); - setTimeout(function() { inter = $interval(function(){prepareLogFile();}, 3000); }, 250); $http(req).then(function successCallback(response) { - //console.log(response.data); - //console.log(atob(response.data.data.workflow.log)); - - //setTimeout(function() { promise = $interval(function(){prepareLogFile();}, 3000); }, 500); - + $interval.cancel(inter); setTimeout(function() { prepareLogFile(); }, 500); setTimeout(function() { workflowExecuted = true; prepareLogFile(); }, 4500); - //showTimeline(); if (response.data.status=="success") { console.log("executeWorkflow called succesfully."); result = response.data.data.result; @@ -1210,7 +1179,7 @@ console.log("Error occurred during workflow execution"); console.log("Response:" + response); $interval.cancel(inter); - //$interval.cancel(promise); + if (response.statusText) { $scope.errorMessage = response.statusText; console.log(response.statusText); @@ -1222,16 +1191,9 @@ } toastr.error('Workflow Execution Error: ' + $scope.errorMessage); $interval.cancel(inter); - /*if (executionModalVisible) { - $uibModal.open({ - animation: true, - scope: $scope, - templateUrl: 'app/pages/workflows/modals/errorModal.html', - size: 'md', - }); - }*/ + }); - //return $timeout(function() {}, 8000); + }); }; @@ -1514,3 +1476,4 @@ }()); + diff --git a/GUI/app/pages/workflows/WorkflowEditorCtrl.js b/GUI/app/pages/workflows/WorkflowEditorCtrl.js index d89a8430..0999af8d 100644 --- a/GUI/app/pages/workflows/WorkflowEditorCtrl.js +++ b/GUI/app/pages/workflows/WorkflowEditorCtrl.js @@ -1,5 +1,5 @@ /* - Copyright 2020 The KNIX Authors + 
Copyright 2021 The KNIX Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -92,14 +92,13 @@ var token = $cookies.get('token'); var mfnAPI = [{ "word": "log(\"\")", "score": 1008 }, - { "word": "get(\"\")", "score": 1007 }, { "word": "put()", "score": 1006 }, { "word": "delete(\"\")", "score": 1005 }]; - var mfnAPITooltip = [{ "command": "log()", "ttip": "log(text)

Log text. Uses the instance id to indicate which function instance logged the text.

Args:
text (string): text to be logged.

Returns:
None.

Raises:
MicroFunctionsUserLogException: if there are any errors in the logging function." }, - { "command": "put()", "ttip": "put(key, value, is_private=False, is_queued=False)

Access to data layer to store a data item in the form of a (key, value) pair.
By default, the put operation is reflected on the data layer immediately.
If the put operation is queued (i.e., is_queued = True), the data item is put into the transient data table.
If the key was previously deleted by the function instance, it is removed from the list of items to be deleted.
When the function instance finishes, the transient data items are committed to the data layer.

Args:
key (string): the key of the data item
value (string): the value of the data item
is_private (boolean): whether the item should be written to the private data layer of the workflow; default: False
is_queued (boolean): whether the put operation should be reflected on the data layer after the execution finish; default: False

Returns:
None

Raises:
MicroFunctionsDataLayerException: when the key and/or value are not a strings." }, - { "command": "get(\"\")", "ttip": "get(key, is_private=False)

Access to data layer to load the value of a given key. The key is first checked in the transient deleted items.
If it is not deleted, the key is then checked in the transient data table. If it is not there,
it is retrieved from the global data layer. As a result, the value returned is consistent with what this function
instance does with the data item. If the data item is not present in either the transient data table
nor in the global data layer, an empty string (i.e., \"\") will be returned.
If the function used put() and delete() operations with is_queued = False (default), then the checks
of the transient table will result in empty values, so that the item will be retrieved
from the global data layer.

Args:
key (string): the key of the data item
is_private (boolean): whether the item should be read from the private data layer of the workflow; default: False

Returns:
value (string): the value of the data item; empty string if the data item is not present.

Raises:
MicroFunctionsDataLayerException: when the key is not a string." }, - { "command": "delete(\"\")", "ttip": "delete(key, is_private=False, is_queued=False)

Access to data layer to delete data item associated with a given key.
By default, the delete operation is reflected on the data layer immediately.
If the delete operation is queued (i.e., is_queued = True), the key is removed from the transient data table.
It is also added to the list of items to be deleted from the global data layer when the function instance finishes.

Args:
key (string): the key of the data item
is_private (boolean): whether the item should be deleted from the private data layer of the workflow; default: False
is_queued (boolean): whether the delete operation should be reflected on the data layer after the execution finish; default: False

Returns:
None

Raises:
MicroFunctionsDataLayerException: when the key is not a string." } + { "word": "get(\"\")", "score": 1007 }, { "word": "put(\"\")", "score": 1006 }, { "word": "remove(\"\")", "score": 1005 }]; + var mfnAPITooltip = [{ "command": "log(\"\")", "ttip": "log(text, level='INFO')

Log text. Uses the instance id to indicate which function instance logged the text.

Args:
text (string): text to be logged.
level (string): log level to be used.

Returns:
None.

Raises:
MicroFunctionsUserLogException: if there are any errors in the logging function." }, + { "command": "put(\"\")", "ttip": "put(key, value, is_private=False, is_queued=False, bucketName=None)

Access to data layer to store a data item in the form of a (key, value) pair.
By default, the put operation is reflected on the data layer immediately.
If the put operation is queued (i.e., is_queued = True), the data item
is put into the transient data bucket.
If the key was previously deleted by the function instance,
it is removed from the list of items to be deleted.
When the function instance finishes, the transient data items are committed to the data layer.

Args
key (string): the key of the data item
value (string): the value of the data item
is_private (boolean): whether the item should be written to the private data layer of the workflow; default: False
is_queued (boolean): whether the put operation should be reflected on the data layer after the execution finish; default: False
(i.e., the put operation will be reflected on the data layer immediately)
bucketName (string): name of the bucket where to put the key. By default, it will be put in the default bucket.
If this method is called with is_private = True, then
the bucketName parameter will be ignored.

Returns:
None

Raises:
MicroFunctionsDataLayerException: when the key and/or value are not a strings."}, + { "command": "get(\"\")", "ttip": "get(key, is_private=False, bucketName=None)

Access to data layer to load the value of a given key. The key is first checked in the transient deleted items.
If it is not deleted, the key is then checked in the transient data bucket.
If it is not there, it is retrieved from the global data layer.
As a result, the value returned is consistent with what this function instance does with the data item.
If the data item is not present in either the transient data bucket nor in the global data layer, an empty string (i.e., \"\") will be returned.
If the function used put() and delete() operations with is_queued=False (default), then the checks of the transient bucket
will result in empty values, so that the item will be retrieved from the global data layer.

Args:
key (string): the key of the data item
is_private (boolean): whether the item should be read from the private data layer of the workflow; default: False
bucketName (string): name of the bucket where to get the key from. By default, it will be fetched from the default bucket.
If this method is called with is_private = True, then the bucketName parameter will be ignored.

Returns:
value (string): the value of the data item; empty string if the data item is not present.

Raises:
MicroFunctionsDataLayerException: when the key is not a string." }, + { "command": "remove(\"\")", "ttip": "remove(key, is_private=False, is_queued=False, bucketName=None)

Access to data layer to remove the data item associated with a given key.
By default, the remove operation is reflected on the data layer immediately.
If the remove operation is queued (i.e., is_queued = True), the key is removed from the transient data bucket.
It is also added to the list of items to be removed from the global data layer when the function instance finishes.

Args:
key (string): the key of the data item
is_private (boolean): whether the item should be removed from the private data layer of the workflow; default: False
is_queued (boolean): whether the delete operation should be reflected on the data layer after the execution finishes; default: False
bucketName (string): name of the bucket where to remove the key from. By default, it will be removed from the default bucket.

Returns:
None

Raises:
MicroFunctionsDataLayerException: when the key is not a string." } ]; - - //var workflowBlueprint = '{\r\n\t"name": "' + sharedProperties.getWorkflowName() + '",\r\n\t"entry": "",\r\n\t"functions": [\r\n\t\t{\r\n\t\t\t"name": "",\r\n\t\t\t"next": ["end"]\r\n\t\t}\r\n\t]\r\n}'; + var workflowBlueprint = '{\r\n\t"Comment": "' + sharedProperties.getWorkflowName() + ' Workflow",\r\n\t"StartAt": "",\r\n\t"States": {\r\n\t\t"": {\r\n\t\t\t"Type": "Task",\r\n\t\t\t"Resource": "",\r\n\t\t\t"End": true\r\n\t\t}\r\n\t}\r\n}'; $scope.showFunctionCodeTab = new Array(10); @@ -489,23 +488,30 @@ var urlPath = sharedProperties.getUrlPath(); var token = $cookies.get('token'); + var param_storage = {}; + param_storage["data_type"] = "kv"; + param_storage["parameters"] = {}; + param_storage["parameters"]["action"] = "listkeys"; + param_storage["parameters"]["tableName"] = "defaultTable"; + param_storage["parameters"]["start"] = 0; + param_storage["parameters"]["count"] = 2000; + var req = { method: 'POST', url: urlPath, headers: { 'Content-Type': 'application/json' }, - data: JSON.stringify({ "action": "performStorageAction", "data": { "user": { "token": token }, "storage": { "table": "defaultTable", "action": "listkeys", "start": 0, "count": 2000 } } }) + data: JSON.stringify({ "action": "performStorageAction", "data": { "user": { "token": token }, "storage": param_storage } }) } $http(req).then(function successCallback(response) { $scope.sObjects = []; var score = 2000; - - for (var i = 0; i < response.data.length; i++) { - if (!response.data[i].startsWith("grain_requirements_") && !response.data[i].startsWith("grain_source_") && !response.data[i].startsWith("workflow_json_")) { - $scope.sObjects.push({ "word": response.data[i], "score": score }); + for (var i=0;i { if (workflowJson.States[e].Branches[f].States[g].Resource) { - console.log(workflowJson.States[e].Branches[f].States[g].Resource); - + for (var key in $scope.functions) { var n = 
workflowJson.States[e].Branches[f].States[g].Resource.indexOf(':'); var resource = workflowJson.States[e].Branches[f].States[g].Resource.substring(0, n != -1 ? n : workflowJson.States[e].Branches[f].States[g].Resource.length); diff --git a/GUI/app/pages/workflows/WorkflowTableCtrl.js b/GUI/app/pages/workflows/WorkflowTableCtrl.js index 54c9517a..41130b76 100644 --- a/GUI/app/pages/workflows/WorkflowTableCtrl.js +++ b/GUI/app/pages/workflows/WorkflowTableCtrl.js @@ -1,5 +1,5 @@ /* - Copyright 2020 The KNIX Authors + Copyright 2021 The KNIX Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -313,6 +313,9 @@ { $interval.cancel(promise); $scope.workflows[index].status="failed"; + if ($scope.workflowDeploymentModal) { + $scope.workflowDeploymentModal.dismiss(); + } console.log("Error in deployment: " + response.data.data.workflow.deployment_error); $scope.errorMessage = response.data.data.workflow.deployment_error; $uibModal.open({ @@ -526,6 +529,7 @@ } else { console.log("Failure status returned by addWorkflow"); console.log("Message:" + response.data.data.message); + $scope.reloadWorkflows(); $scope.errorMessage = response.data.data.message; $uibModal.open({ animation: true, @@ -537,6 +541,7 @@ }, function errorCallback(response) { console.log("Error occurred during addWorkflow"); console.log("Response:" + response); + $scope.reloadWorkflows(); if (response.statusText) { $scope.errorMessage = response.statusText; } else { @@ -570,6 +575,7 @@ } else { console.log("Failure status returned by modifyWorkflow"); console.log("Message:" + response.data.data.message); + $scope.reloadWorkflows(); $scope.errorMessage = response.data.data.message; $uibModal.open({ animation: true, @@ -581,6 +587,7 @@ }, function errorCallback(response) { console.log("Error occurred during modifyWorkflow"); console.log("Response:" + response); + $scope.reloadWorkflows(); if (response.statusText) { 
$scope.errorMessage = response.statusText; } else { @@ -607,7 +614,7 @@ if ($scope.workflows[key].id==workflowId) { var index = key; if ($scope.workflows[index].status=='deployed') { - $scope.open('app/pages/workflows/modals/workflowExecutionModal.html', 'lg', $scope.workflows[index].id, $scope.workflows[index].name, $scope.workflows[index].status, $scope.workflows[index].endpoint); + $scope.open('app/pages/workflows/modals/workflowExecutionModal.html', 'lg', $scope.workflows[index].id, $scope.workflows[index].name, $scope.workflows[index].status, $scope.workflows[index].endpoints[0]); } } } diff --git a/GUI/app/pages/workflows/modals/workflowEditorModal.html b/GUI/app/pages/workflows/modals/workflowEditorModal.html index 62d2b2a5..baafd2d8 100644 --- a/GUI/app/pages/workflows/modals/workflowEditorModal.html +++ b/GUI/app/pages/workflows/modals/workflowEditorModal.html @@ -1,5 +1,5 @@