From 1f96558e8902f4a4977c9c37dba43d2cf41ba479 Mon Sep 17 00:00:00 2001 From: Tim Wagner Date: Thu, 7 May 2015 13:41:52 +0200 Subject: [PATCH 1/4] Fixed #790 - Long running messages in Message Queue blocks other messages Closed #777 - Remove remote http://www.w3.org/2001/03/xml.xsd from schemas and configurations --- CHANGELOG.md | 2 + UPGRADE-1.0.5.md | 3 + resources/schema/appserver.xsd | 2 +- resources/schema/xml.xsd | 117 +++++++++ .../Appserver/MessageQueue/QueueWorker.php | 243 +++++++----------- 5 files changed, 212 insertions(+), 155 deletions(-) create mode 100644 UPGRADE-1.0.5.md create mode 100644 resources/schema/xml.xsd diff --git a/CHANGELOG.md b/CHANGELOG.md index 79302c813..fc23335e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,11 @@ ## Bugfixes * Fixed [#784](https://github.com/appserver-io/appserver/issues/784) - Application Deployment after switching to safe user +* Fixed [#790](https://github.com/appserver-io/appserver/issues/790) - Long running messages in Message Queue blocks other messages ## Features +* Remove [#777](https://github.com/appserver-io/appserver/issues/777) - Remove remote http://www.w3.org/2001/03/xml.xsd from schemas and configurations * Closed [#758](https://github.com/appserver-io/appserver/issues/758) - Update to latest PHP 5.5.24 # Version 1.0.4 diff --git a/UPGRADE-1.0.5.md b/UPGRADE-1.0.5.md new file mode 100644 index 000000000..0545fdcec --- /dev/null +++ b/UPGRADE-1.0.5.md @@ -0,0 +1,3 @@ +# Upgrade from 1.0.4 to 1.0.5 + +Updating from 1.0.4 to 1.0.5 doesn't have any impacts. Please read the apropriate UPGRADE-1.x.x files for updates from older versions to 1.0.4. diff --git a/resources/schema/appserver.xsd b/resources/schema/appserver.xsd index 0f884dd31..69cc7d5f8 100644 --- a/resources/schema/appserver.xsd +++ b/resources/schema/appserver.xsd @@ -4,7 +4,7 @@ targetNamespace="http://www.appserver.io/appserver" elementFormDefault="unqualified" attributeFormDefault="unqualified"> - + diff --git a/resources/schema/xml.xsd b/resources/schema/xml.xsd new file mode 100644 index 000000000..d662b4236 --- /dev/null +++ b/resources/schema/xml.xsd @@ -0,0 +1,117 @@ + + + + + + + See http://www.w3.org/XML/1998/namespace.html and + http://www.w3.org/TR/REC-xml for information about this namespace. + + This schema document describes the XML namespace, in a form + suitable for import by other schema documents. + + Note that local names in this namespace are intended to be defined + only by the World Wide Web Consortium or its subgroups. The + following names are currently defined in this namespace and should + not be used with conflicting semantics by any Working Group, + specification, or document instance: + + base (as an attribute name): denotes an attribute whose value + provides a URI to be used as the base for interpreting any + relative URIs in the scope of the element on which it + appears; its value is inherited. This name is reserved + by virtue of its definition in the XML Base specification. + + lang (as an attribute name): denotes an attribute whose value + is a language code for the natural language of the content of + any element; its value is inherited. This name is reserved + by virtue of its definition in the XML specification. + + space (as an attribute name): denotes an attribute whose + value is a keyword indicating what whitespace processing + discipline is intended for the content of the element; its + value is inherited. This name is reserved by virtue of its + definition in the XML specification. + + Father (in any context at all): denotes Jon Bosak, the chair of + the original XML Working Group. This name is reserved by + the following decision of the W3C XML Plenary and + XML Coordination groups: + + In appreciation for his vision, leadership and dedication + the W3C XML Plenary on this 10th day of February, 2000 + reserves for Jon Bosak in perpetuity the XML name + xml:Father + + + + + This schema defines attributes and an attribute group + suitable for use by + schemas wishing to allow xml:base, xml:lang or xml:space attributes + on elements they define. + + To enable this, such a schema must import this schema + for the XML namespace, e.g. as follows: + <schema . . .> + . . . + <import namespace="http://www.w3.org/XML/1998/namespace" + schemaLocation="http://www.w3.org/2001/03/xml.xsd"/> + + Subsequently, qualified reference to any of the attributes + or the group defined below will have the desired effect, e.g. + + <type . . .> + . . . + <attributeGroup ref="xml:specialAttrs"/> + + will define a type which will schema-validate an instance + element with any of those attributes + + + + In keeping with the XML Schema WG's standard versioning + policy, this schema document will persist at + http://www.w3.org/2001/03/xml.xsd. + At the date of issue it can also be found at + http://www.w3.org/2001/xml.xsd. + The schema document at that URI may however change in the future, + in order to remain compatible with the latest version of XML Schema + itself. In other words, if the XML Schema namespace changes, the version + of this document at + http://www.w3.org/2001/xml.xsd will change + accordingly; the version at + http://www.w3.org/2001/03/xml.xsd will not change. + + + + + + In due course, we should install the relevant ISO 2- and 3-letter + codes as the enumerated possible values . . . + + + + + + + + + + + + + + + See http://www.w3.org/TR/xmlbase/ for + information about this attribute. + + + + + + + + + + diff --git a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php index b3a035b8a..9f4b9f51e 100644 --- a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php +++ b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php @@ -43,6 +43,7 @@ * @link https://github.com/appserver-io/appserver * @link http://www.appserver.io * + * @property boolean $run Flag to start/stop the worker * @property \AppserverIo\Psr\Application\ApplicationInterface $application The application instance with the queue manager/locator * @property \AppserverIo\Storage\GenericStackable $jobsExecuting The storage for the jobs currently executing * @property \AppserverIo\Storage\GenericStackable $jobsToExecute The storage for the jobs to be executed @@ -54,14 +55,11 @@ class QueueWorker extends \Thread { /** - * Initializes the message queue with the necessary data. + * Sets the workers start flag. */ public function __construct() { - - // initialize the flags for start/stop handling $this->run = true; - $this->running = false; } /** @@ -167,147 +165,15 @@ public function attach(\stdClass $jobWrapper) } /** - * Removes the message from the queue. - * - * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be removed from the queue - * - * @return void - */ - public function remove(MessageInterface $message) - { - unset($this->messages[$message->getMessageId()]); - unset($this->messageStates[$message->getMessageId()]); - } - - /** - * Process a message with the state 'StateActive'. - * - * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed - * - * @return void - */ - public function processActive(MessageInterface $message) - { - $this->messageStates[$message->getMessageId()] = StateToProcess::KEY; - } - - /** - * Process a message with the state 'StateInProgress'. - * - * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed - * - * @return void - */ - public function processInProgress(MessageInterface $message) - { - - // make sure the job has been finished - if (isset($this->jobsExecuting[$message->getMessageId()]) && - $this->jobsExecuting[$message->getMessageId()] instanceof JobInterface && - $this->jobsExecuting[$message->getMessageId()]->isFinished() - ) { - // log a message that the job is still in progress - $this->getApplication()->getInitialContext()->getSystemLogger()->info( - sprintf('Job %s has been finished, remove it from job queue now', $message->getMessageId()) - ); - - // we also remove the job - unset($this->jobsExecuting[$message->getMessageId()]); - - // set new state - $this->messageStates[$message->getMessageId()] = StateProcessed::KEY; - - } else { - // log a message that the job is still in progress - $this->getApplication()->getInitialContext()->getSystemLogger()->debug( - sprintf('Job %s is still in progress', $message->getMessageId()) - ); - } - } - - /** - * Process a message with the state 'StateProcessed'. - * - * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed - * - * @return void - */ - public function processProcessed(MessageInterface $message) - { - // remove the job from the queue with jobs that has to be executed - unset($this->jobsToExecute[$message->getMessageId()]); - // remove the message from the queue - $this->remove($message); - } - - /** - * Process a message with the state 'StateToProcess'. - * - * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed - * - * @return void - */ - public function processToProcess(MessageInterface $message) - { - - // count messages in queue - $inQueue = sizeof($this->jobsExecuting); - - // we only process 50 jobs in parallel - if ($inQueue < 50) { - // load application - $application = $this->getApplication(); - - // start the job and add it to the internal array - $this->jobsExecuting[$message->getMessageId()] = new Job($message, $application); - - // set new state - $this->messageStates[$message->getMessageId()] = StateInProgress::KEY; - - } else { - // log a message that queue is actually full - $this->getApplication()->getInitialContext()->getSystemLogger()->debug( - sprintf('Job queue full - (%d jobs/%d msg wait)', $inQueue, sizeof($this->messages)) - ); - } - } - - /** - * Process a message with the state 'StateUnknown'. - * - * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed - * - * @return void - */ - public function processUnknown(MessageInterface $message) - { - - // set new state - $this->messageStates[$message->getMessageId()] = StateFailed::KEY; - - // log a message that we've a message with a unknown state - $this->getApplication()->getInitialContext()->getSystemLogger()->critical( - sprintf('Message %s has state %s', $message->getMessageId(), StateFailed::KEY) - ); - } - - /** - * Process a message with an invalid state. - * - * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed + * Stops the worker instance. * * @return void */ - public function processInvalid(MessageInterface $message) + public function stop() { - - // set new state - $this->messageStates[$message->getMessageId()] = StateFailed::KEY; - - // log a message that we've a message with an invalid state - $this->getApplication()->getInitialContext()->getSystemLogger()->critical( - sprintf('Message %s has an invalid state', $message->getMessageId()) - ); + $this->synchronized(function ($self) { + $self->run = false; + }, $this); } /** @@ -328,7 +194,7 @@ public function shutdown() extract($lastError); // query whether we've a fatal/user error if ($type === E_ERROR || $type === E_USER_ERROR) { - $this->getApplication()->getInitialContex()->getSystemLogger()->error($message); + $this->getApplication()->getInitialContext()->getSystemLogger()->error($message); } } } @@ -347,12 +213,19 @@ public function run() // create a local instance of application and storage $application = $this->application; + // create local instances of the storages + $messages = $this->messages; + $priorityKey = $this->priorityKey; + $messageStates = $this->messageStates; + $jobsToExecute = $this->jobsToExecute; + $jobsExecuting = $this->jobsExecuting; + // register the class loader again, because each thread has its own context $application->registerClassLoaders(); // try to load the profile logger if ($profileLogger = $application->getInitialContext()->getLogger(LoggerUtils::PROFILE)) { - $profileLogger->appendThreadContext(sprintf('queue-worker-%s', $this->priorityKey)); + $profileLogger->appendThreadContext(sprintf('queue-worker-%s', $priorityKey)); } /* @@ -363,57 +236,119 @@ public function run() * PriorityMedium: 10.000 === 0.01 s * PriorityLow: 1.000.000 === 1 s */ - $sleepFor = pow(10, $this->priorityKey->getPriority() * 2); + $sleepFor = pow(10, $priorityKey->getPriority() * 2); // run forever while (true) { // iterate over all job wrappers - foreach ($this->jobsToExecute as $jobWrapper) { + foreach ($jobsToExecute as $jobWrapper) { try { // load the message - $message = $this->messages[$jobWrapper->jobId]; + $message = $messages[$jobWrapper->jobId]; // check if we've a message found if ($message instanceof MessageInterface) { // check the message state - switch ($this->messageStates[$jobWrapper->jobId]) { + switch ($messageStates[$jobWrapper->jobId]) { // message is active and ready to be processed case StateActive::KEY: - $this->processActive($message); + // set the new state now + $messageStates[$message->getMessageId()] = StateToProcess::KEY; + break; // message is paused or in progress case StatePaused::KEY: case StateInProgress::KEY: - $this->processInProgress($message); + // make sure the job has been finished + if (isset($jobsExecuting[$message->getMessageId()]) && + $jobsExecuting[$message->getMessageId()] instanceof JobInterface && + $jobsExecuting[$message->getMessageId()]->isFinished() + ) { + // log a message that the job is still in progress + $this->getApplication()->getInitialContext()->getSystemLogger()->info( + sprintf('Job %s has been finished, remove it from job queue now', $message->getMessageId()) + ); + + // we also remove the job + unset($jobsExecuting[$message->getMessageId()]); + + // set the new state now + $messageStates[$message->getMessageId()] = StateProcessed::KEY; + + } else { + // log a message that the job is still in progress + $this->getApplication()->getInitialContext()->getSystemLogger()->debug( + sprintf('Job %s is still in progress', $message->getMessageId()) + ); + } + break; // message processing failed or has been successfully processed case StateFailed::KEY: case StateProcessed::KEY: - $this->processProcessed($message); + // remove the job from the queue with jobs that has to be executed + unset($jobsToExecute[$message->getMessageId()]); + + // remove the message from the queue + unset($messages[$message->getMessageId()]); + unset($messageStates[$message->getMessageId()]); + break; // message has to be processed now case StateToProcess::KEY: - $this->processToProcess($message); + // count messages in queue + $inQueue = sizeof($jobsExecuting); + + // we only process 50 jobs in parallel + if ($inQueue < 50) { + + // start the job and add it to the internal array + $jobsExecuting[$message->getMessageId()] = new Job($message, $application); + + // set the new state now + $messageStates[$message->getMessageId()] = StateInProgress::KEY; + + } else { + // log a message that queue is actually full + $application->getInitialContext()->getSystemLogger()->debug( + sprintf('Job queue full - (%d jobs/%d msg wait)', $inQueue, sizeof($messages)) + ); + } + break; // message is in an unknown state -> this is weired and should never happen! case StateUnknown::KEY: - $this->processUnknown($message); + // set new state now + $messageStates[$message->getMessageId()] = StateFailed::KEY; + + // log a message that we've a message with a unknown state + $this->getApplication()->getInitialContext()->getSystemLogger()->critical( + sprintf('Message %s has state %s', $message->getMessageId(), StateFailed::KEY) + ); + break; // we don't know the message state -> this is weired and should never happen! default: - $this->processInvalid($message); + // set new state + $messageStates[$message->getMessageId()] = StateFailed::KEY; + + // log a message that we've a message with an invalid state + $this->getApplication()->getInitialContext()->getSystemLogger()->critical( + sprintf('Message %s has an invalid state', $message->getMessageId()) + ); + break; } } @@ -430,7 +365,7 @@ public function run() // profile the size of the session pool if ($profileLogger) { $profileLogger->debug( - sprintf('Processed queue worker with priority %s, size of queue size is: %d', $this->priorityKey, sizeof($this->storage)) + sprintf('Processed queue worker with priority %s, size of queue size is: %d', $priorityKey, sizeof($jobsToExecute)) ); } From 905a8df852ede64cb28b39f63ebb7eedcf2f348a Mon Sep 17 00:00:00 2001 From: Tim Wagner Date: Thu, 7 May 2015 14:37:51 +0200 Subject: [PATCH 2/4] Query run flag in queue workers while loop --- .../Appserver/MessageQueue/MessageQueue.php | 26 +++++++++++++++++++ .../Appserver/MessageQueue/QueueWorker.php | 6 ++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php b/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php index 3371670d3..34d619e4c 100755 --- a/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php +++ b/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php @@ -230,6 +230,29 @@ protected function uniqueWorkerName(PriorityKeyInterface $priorityKey) return sprintf('%s-%s', $this->getName(), $priorityKey); } + /** + * Does shutdown logic for request handler if something went wrong and + * produces a fatal error for example. + * + * @return void + */ + public function shutdown() + { + + // check if there was a fatal error caused shutdown + if ($lastError = error_get_last()) { + // initialize type + message + $type = 0; + $message = ''; + // extract the last error values + extract($lastError); + // query whether we've a fatal/user error + if ($type === E_ERROR || $type === E_USER_ERROR) { + $this->getApplication()->getInitialContex()->getSystemLogger()->error($message); + } + } + } + /** * We process the messages/jobs here. * @@ -238,6 +261,9 @@ protected function uniqueWorkerName(PriorityKeyInterface $priorityKey) public function run() { + // register shutdown handler + register_shutdown_function(array(&$this, "shutdown")); + // create a local instance of application and storage $application = $this->application; diff --git a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php index 9f4b9f51e..d8233f567 100644 --- a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php +++ b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php @@ -238,8 +238,8 @@ public function run() */ $sleepFor = pow(10, $priorityKey->getPriority() * 2); - // run forever - while (true) { + // run until we've to stop + while ($this->run) { // iterate over all job wrappers foreach ($jobsToExecute as $jobWrapper) { try { @@ -296,8 +296,8 @@ public function run() unset($jobsToExecute[$message->getMessageId()]); // remove the message from the queue - unset($messages[$message->getMessageId()]); unset($messageStates[$message->getMessageId()]); + unset($messages[$message->getMessageId()]); break; From be758d09ba7c088943a9cbd8e4dbdf2e5a9a5494 Mon Sep 17 00:00:00 2001 From: Tim Wagner Date: Thu, 7 May 2015 14:37:51 +0200 Subject: [PATCH 3/4] Query run flag in queue workers while loop Fixed some possible segfault risks --- .../Appserver/MessageQueue/MessageQueue.php | 34 ++++++-- .../MessageQueue/MessageQueueValve.php | 3 - .../Appserver/MessageQueue/QueueWorker.php | 82 ++++++++----------- 3 files changed, 60 insertions(+), 59 deletions(-) diff --git a/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php b/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php index 3371670d3..3b175225c 100755 --- a/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php +++ b/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php @@ -230,6 +230,29 @@ protected function uniqueWorkerName(PriorityKeyInterface $priorityKey) return sprintf('%s-%s', $this->getName(), $priorityKey); } + /** + * Does shutdown logic for request handler if something went wrong and + * produces a fatal error for example. + * + * @return void + */ + public function shutdown() + { + + // check if there was a fatal error caused shutdown + if ($lastError = error_get_last()) { + // initialize type + message + $type = 0; + $message = ''; + // extract the last error values + extract($lastError); + // query whether we've a fatal/user error + if ($type === E_ERROR || $type === E_USER_ERROR) { + $this->getApplication()->getInitialContext()->getSystemLogger()->error($message); + } + } + } + /** * We process the messages/jobs here. * @@ -238,6 +261,9 @@ protected function uniqueWorkerName(PriorityKeyInterface $priorityKey) public function run() { + // register shutdown handler + register_shutdown_function(array(&$this, "shutdown")); + // create a local instance of application and storage $application = $this->application; @@ -254,22 +280,16 @@ public function run() $messages = $this->messages; // prepare the storages - $jobsExecuting = array(); $jobsToExceute = array(); - $messageStates = array(); // initialize the counter for the storages $counter = 0; // create a separate queue for each priority foreach (PriorityKeys::getAll() as $priorityKey) { - // ATTENTION: We use an array for the jobs (threads) that are executing acutually. - // Using stackables leads to random segfaults on Windows! - $jobsExecuting[$counter] = array(); // create the containers for the worker $jobsToExceute[$counter] = new GenericStackable(); - $messageStates[$counter] = new GenericStackable(); // initialize and start the queue worker $queueWorker = new QueueWorker(); @@ -278,9 +298,7 @@ public function run() $queueWorker->injectApplication($application); // attach the storages - $queueWorker->injectJobsExecuting($jobsExecuting[$counter]); $queueWorker->injectJobsToExecute($jobsToExceute[$counter]); - $queueWorker->injectMessageStates($messageStates[$counter]); // start the worker instance $queueWorker->start(); diff --git a/src/AppserverIo/Appserver/MessageQueue/MessageQueueValve.php b/src/AppserverIo/Appserver/MessageQueue/MessageQueueValve.php index 25dca2012..c54c3f255 100644 --- a/src/AppserverIo/Appserver/MessageQueue/MessageQueueValve.php +++ b/src/AppserverIo/Appserver/MessageQueue/MessageQueueValve.php @@ -59,9 +59,6 @@ public function invoke(HttpServletRequestInterface $servletRequest, HttpServletR // unpack the message $message = MessageQueueProtocol::unpack($servletRequest->getBodyContent()); - // @todo Not sure if we need that!!!!!! - $message->setState(StateActive::get()); - // load message queue name and priority key $queueName = $message->getDestination()->getName(); $priorityKey = $message->getPriority(); diff --git a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php index 9f4b9f51e..600e16ecf 100644 --- a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php +++ b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php @@ -45,10 +45,8 @@ * * @property boolean $run Flag to start/stop the worker * @property \AppserverIo\Psr\Application\ApplicationInterface $application The application instance with the queue manager/locator - * @property \AppserverIo\Storage\GenericStackable $jobsExecuting The storage for the jobs currently executing * @property \AppserverIo\Storage\GenericStackable $jobsToExecute The storage for the jobs to be executed * @property \AppserverIo\Storage\GenericStackable $messages The storage for the messages - * @property \AppserverIo\Storage\GenericStackable $messageStates The storage for the messages' states * @property \AppserverIo\Psr\Pms\PriorityKeyInterface $priorityKey The priority of this queue worker */ class QueueWorker extends \Thread @@ -86,18 +84,6 @@ public function injectMessages(GenericStackable $messages) $this->messages = $messages; } - /** - * Inject the application instance the worker is bound to. - * - * @param \AppserverIo\Psr\Application\ApplicationInterface $application The application instance - * - * @return void - */ - public function injectApplication(ApplicationInterface $application) - { - $this->application = $application; - } - /** * Inject the storage for the jobs to be executed. * @@ -111,27 +97,15 @@ public function injectJobsToExecute(GenericStackable $jobsToExecute) } /** - * Inject the storage for the message states. - * - * @param \AppserverIo\Storage\GenericStackable $messageStates The storage for the message states - * - * @return void - */ - public function injectMessageStates(GenericStackable $messageStates) - { - $this->messageStates = $messageStates; - } - - /** - * Inject the storage for the executing jobs. + * Inject the application instance the worker is bound to. * - * @param array $jobsExecuting The storage for the executing jobs + * @param \AppserverIo\Psr\Application\ApplicationInterface $application The application instance * * @return void */ - public function injectJobsExecuting(array $jobsExecuting) + public function injectApplication(ApplicationInterface $application) { - $this->jobsExecuting = $jobsExecuting; + $this->application = $application; } /** @@ -154,13 +128,9 @@ public function getApplication() public function attach(\stdClass $jobWrapper) { - // force handling the timer tasks now + // attach the job wrapper $this->synchronized(function (QueueWorker $self, \stdClass $jw) { - - // attach the job wrapper $self->jobsToExecute[$jw->jobId] = $jw; - $self->messageStates[$jw->jobId] = StateActive::KEY; - }, $this, $jobWrapper); } @@ -216,9 +186,11 @@ public function run() // create local instances of the storages $messages = $this->messages; $priorityKey = $this->priorityKey; - $messageStates = $this->messageStates; $jobsToExecute = $this->jobsToExecute; - $jobsExecuting = $this->jobsExecuting; + + // initialize the arrays for the message states and the jobs executing + $messageStates = array(); + $jobsExecuting = array(); // register the class loader again, because each thread has its own context $application->registerClassLoaders(); @@ -238,8 +210,8 @@ public function run() */ $sleepFor = pow(10, $priorityKey->getPriority() * 2); - // run forever - while (true) { + // run until we've to stop + while ($this->run) { // iterate over all job wrappers foreach ($jobsToExecute as $jobWrapper) { try { @@ -248,6 +220,17 @@ public function run() // check if we've a message found if ($message instanceof MessageInterface) { + // set the inital message state if not done + if (isset($messageStates[$jobWrapper->jobId]) === false) { + + // initialize the default message state + if ($state = $message->getState()) { + $messageStates[$jobWrapper->jobId] = $state->getState(); + } else { + $messageStates[$jobWrapper->jobId] = StateUnknown::KEY; + } + } + // check the message state switch ($messageStates[$jobWrapper->jobId]) { @@ -273,9 +256,6 @@ public function run() sprintf('Job %s has been finished, remove it from job queue now', $message->getMessageId()) ); - // we also remove the job - unset($jobsExecuting[$message->getMessageId()]); - // set the new state now $messageStates[$message->getMessageId()] = StateProcessed::KEY; @@ -292,12 +272,18 @@ public function run() case StateFailed::KEY: case StateProcessed::KEY: + // load the unique message-ID + $messageId = $message->getMessageId(); + // remove the job from the queue with jobs that has to be executed - unset($jobsToExecute[$message->getMessageId()]); + unset($jobsToExecute[$messageId]); + + // also remove the job + unset($jobsExecuting[$messageId]); - // remove the message from the queue - unset($messages[$message->getMessageId()]); - unset($messageStates[$message->getMessageId()]); + // finally, remove the message states and the message from the queue + unset($messageStates[$messageId]); + unset($messages[$messageId]); break; @@ -311,7 +297,7 @@ public function run() if ($inQueue < 50) { // start the job and add it to the internal array - $jobsExecuting[$message->getMessageId()] = new Job($message, $application); + $jobsExecuting[$message->getMessageId()] = new Job(clone $message, $application); // set the new state now $messageStates[$message->getMessageId()] = StateInProgress::KEY; @@ -341,7 +327,7 @@ public function run() // we don't know the message state -> this is weired and should never happen! default: - // set new state + // set the failed message state $messageStates[$message->getMessageId()] = StateFailed::KEY; // log a message that we've a message with an invalid state From 1f404c2a5442c7405fe6612ff4ddb235500904cc Mon Sep 17 00:00:00 2001 From: Tim Wagner Date: Thu, 7 May 2015 16:51:58 +0200 Subject: [PATCH 4/4] Make PSR-2 compatible --- src/AppserverIo/Appserver/MessageQueue/MessageQueue.php | 1 - src/AppserverIo/Appserver/MessageQueue/QueueWorker.php | 2 -- 2 files changed, 3 deletions(-) diff --git a/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php b/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php index 3b175225c..a368ce75d 100755 --- a/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php +++ b/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php @@ -287,7 +287,6 @@ public function run() // create a separate queue for each priority foreach (PriorityKeys::getAll() as $priorityKey) { - // create the containers for the worker $jobsToExceute[$counter] = new GenericStackable(); diff --git a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php index 600e16ecf..5cffaedbc 100644 --- a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php +++ b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php @@ -222,7 +222,6 @@ public function run() if ($message instanceof MessageInterface) { // set the inital message state if not done if (isset($messageStates[$jobWrapper->jobId]) === false) { - // initialize the default message state if ($state = $message->getState()) { $messageStates[$jobWrapper->jobId] = $state->getState(); @@ -295,7 +294,6 @@ public function run() // we only process 50 jobs in parallel if ($inQueue < 50) { - // start the job and add it to the internal array $jobsExecuting[$message->getMessageId()] = new Job(clone $message, $application);