diff --git a/CHANGELOG.md b/CHANGELOG.md
index 79302c813..fc23335e0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,9 +3,11 @@
## Bugfixes
* Fixed [#784](https://github.com/appserver-io/appserver/issues/784) - Application Deployment after switching to safe user
+* Fixed [#790](https://github.com/appserver-io/appserver/issues/790) - Long running messages in Message Queue blocks other messages
## Features
+* Remove [#777](https://github.com/appserver-io/appserver/issues/777) - Remove remote http://www.w3.org/2001/03/xml.xsd from schemas and configurations
* Closed [#758](https://github.com/appserver-io/appserver/issues/758) - Update to latest PHP 5.5.24
# Version 1.0.4
diff --git a/UPGRADE-1.0.5.md b/UPGRADE-1.0.5.md
new file mode 100644
index 000000000..0545fdcec
--- /dev/null
+++ b/UPGRADE-1.0.5.md
@@ -0,0 +1,3 @@
+# Upgrade from 1.0.4 to 1.0.5
+
+Updating from 1.0.4 to 1.0.5 doesn't have any impacts. Please read the apropriate UPGRADE-1.x.x files for updates from older versions to 1.0.4.
diff --git a/resources/schema/appserver.xsd b/resources/schema/appserver.xsd
index 0f884dd31..69cc7d5f8 100644
--- a/resources/schema/appserver.xsd
+++ b/resources/schema/appserver.xsd
@@ -4,7 +4,7 @@
targetNamespace="http://www.appserver.io/appserver"
elementFormDefault="unqualified"
attributeFormDefault="unqualified">
-
+
diff --git a/resources/schema/xml.xsd b/resources/schema/xml.xsd
new file mode 100644
index 000000000..d662b4236
--- /dev/null
+++ b/resources/schema/xml.xsd
@@ -0,0 +1,117 @@
+
+
+
+
+
+
+ See http://www.w3.org/XML/1998/namespace.html and
+ http://www.w3.org/TR/REC-xml for information about this namespace.
+
+ This schema document describes the XML namespace, in a form
+ suitable for import by other schema documents.
+
+ Note that local names in this namespace are intended to be defined
+ only by the World Wide Web Consortium or its subgroups. The
+ following names are currently defined in this namespace and should
+ not be used with conflicting semantics by any Working Group,
+ specification, or document instance:
+
+ base (as an attribute name): denotes an attribute whose value
+ provides a URI to be used as the base for interpreting any
+ relative URIs in the scope of the element on which it
+ appears; its value is inherited. This name is reserved
+ by virtue of its definition in the XML Base specification.
+
+ lang (as an attribute name): denotes an attribute whose value
+ is a language code for the natural language of the content of
+ any element; its value is inherited. This name is reserved
+ by virtue of its definition in the XML specification.
+
+ space (as an attribute name): denotes an attribute whose
+ value is a keyword indicating what whitespace processing
+ discipline is intended for the content of the element; its
+ value is inherited. This name is reserved by virtue of its
+ definition in the XML specification.
+
+ Father (in any context at all): denotes Jon Bosak, the chair of
+ the original XML Working Group. This name is reserved by
+ the following decision of the W3C XML Plenary and
+ XML Coordination groups:
+
+ In appreciation for his vision, leadership and dedication
+ the W3C XML Plenary on this 10th day of February, 2000
+ reserves for Jon Bosak in perpetuity the XML name
+ xml:Father
+
+
+
+
+ This schema defines attributes and an attribute group
+ suitable for use by
+ schemas wishing to allow xml:base, xml:lang or xml:space attributes
+ on elements they define.
+
+ To enable this, such a schema must import this schema
+ for the XML namespace, e.g. as follows:
+ <schema . . .>
+ . . .
+ <import namespace="http://www.w3.org/XML/1998/namespace"
+ schemaLocation="http://www.w3.org/2001/03/xml.xsd"/>
+
+ Subsequently, qualified reference to any of the attributes
+ or the group defined below will have the desired effect, e.g.
+
+ <type . . .>
+ . . .
+ <attributeGroup ref="xml:specialAttrs"/>
+
+ will define a type which will schema-validate an instance
+ element with any of those attributes
+
+
+
+ In keeping with the XML Schema WG's standard versioning
+ policy, this schema document will persist at
+ http://www.w3.org/2001/03/xml.xsd.
+ At the date of issue it can also be found at
+ http://www.w3.org/2001/xml.xsd.
+ The schema document at that URI may however change in the future,
+ in order to remain compatible with the latest version of XML Schema
+ itself. In other words, if the XML Schema namespace changes, the version
+ of this document at
+ http://www.w3.org/2001/xml.xsd will change
+ accordingly; the version at
+ http://www.w3.org/2001/03/xml.xsd will not change.
+
+
+
+
+
+ In due course, we should install the relevant ISO 2- and 3-letter
+ codes as the enumerated possible values . . .
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ See http://www.w3.org/TR/xmlbase/ for
+ information about this attribute.
+
+
+
+
+
+
+
+
+
+
diff --git a/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php b/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php
index 3371670d3..a368ce75d 100755
--- a/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php
+++ b/src/AppserverIo/Appserver/MessageQueue/MessageQueue.php
@@ -230,6 +230,29 @@ protected function uniqueWorkerName(PriorityKeyInterface $priorityKey)
return sprintf('%s-%s', $this->getName(), $priorityKey);
}
+ /**
+ * Does shutdown logic for request handler if something went wrong and
+ * produces a fatal error for example.
+ *
+ * @return void
+ */
+ public function shutdown()
+ {
+
+ // check if there was a fatal error caused shutdown
+ if ($lastError = error_get_last()) {
+ // initialize type + message
+ $type = 0;
+ $message = '';
+ // extract the last error values
+ extract($lastError);
+ // query whether we've a fatal/user error
+ if ($type === E_ERROR || $type === E_USER_ERROR) {
+ $this->getApplication()->getInitialContext()->getSystemLogger()->error($message);
+ }
+ }
+ }
+
/**
* We process the messages/jobs here.
*
@@ -238,6 +261,9 @@ protected function uniqueWorkerName(PriorityKeyInterface $priorityKey)
public function run()
{
+ // register shutdown handler
+ register_shutdown_function(array(&$this, "shutdown"));
+
// create a local instance of application and storage
$application = $this->application;
@@ -254,22 +280,15 @@ public function run()
$messages = $this->messages;
// prepare the storages
- $jobsExecuting = array();
$jobsToExceute = array();
- $messageStates = array();
// initialize the counter for the storages
$counter = 0;
// create a separate queue for each priority
foreach (PriorityKeys::getAll() as $priorityKey) {
- // ATTENTION: We use an array for the jobs (threads) that are executing acutually.
- // Using stackables leads to random segfaults on Windows!
- $jobsExecuting[$counter] = array();
-
// create the containers for the worker
$jobsToExceute[$counter] = new GenericStackable();
- $messageStates[$counter] = new GenericStackable();
// initialize and start the queue worker
$queueWorker = new QueueWorker();
@@ -278,9 +297,7 @@ public function run()
$queueWorker->injectApplication($application);
// attach the storages
- $queueWorker->injectJobsExecuting($jobsExecuting[$counter]);
$queueWorker->injectJobsToExecute($jobsToExceute[$counter]);
- $queueWorker->injectMessageStates($messageStates[$counter]);
// start the worker instance
$queueWorker->start();
diff --git a/src/AppserverIo/Appserver/MessageQueue/MessageQueueValve.php b/src/AppserverIo/Appserver/MessageQueue/MessageQueueValve.php
index 25dca2012..c54c3f255 100644
--- a/src/AppserverIo/Appserver/MessageQueue/MessageQueueValve.php
+++ b/src/AppserverIo/Appserver/MessageQueue/MessageQueueValve.php
@@ -59,9 +59,6 @@ public function invoke(HttpServletRequestInterface $servletRequest, HttpServletR
// unpack the message
$message = MessageQueueProtocol::unpack($servletRequest->getBodyContent());
- // @todo Not sure if we need that!!!!!!
- $message->setState(StateActive::get());
-
// load message queue name and priority key
$queueName = $message->getDestination()->getName();
$priorityKey = $message->getPriority();
diff --git a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php
index b3a035b8a..5cffaedbc 100644
--- a/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php
+++ b/src/AppserverIo/Appserver/MessageQueue/QueueWorker.php
@@ -43,25 +43,21 @@
* @link https://github.com/appserver-io/appserver
* @link http://www.appserver.io
*
+ * @property boolean $run Flag to start/stop the worker
* @property \AppserverIo\Psr\Application\ApplicationInterface $application The application instance with the queue manager/locator
- * @property \AppserverIo\Storage\GenericStackable $jobsExecuting The storage for the jobs currently executing
* @property \AppserverIo\Storage\GenericStackable $jobsToExecute The storage for the jobs to be executed
* @property \AppserverIo\Storage\GenericStackable $messages The storage for the messages
- * @property \AppserverIo\Storage\GenericStackable $messageStates The storage for the messages' states
* @property \AppserverIo\Psr\Pms\PriorityKeyInterface $priorityKey The priority of this queue worker
*/
class QueueWorker extends \Thread
{
/**
- * Initializes the message queue with the necessary data.
+ * Sets the workers start flag.
*/
public function __construct()
{
-
- // initialize the flags for start/stop handling
$this->run = true;
- $this->running = false;
}
/**
@@ -88,18 +84,6 @@ public function injectMessages(GenericStackable $messages)
$this->messages = $messages;
}
- /**
- * Inject the application instance the worker is bound to.
- *
- * @param \AppserverIo\Psr\Application\ApplicationInterface $application The application instance
- *
- * @return void
- */
- public function injectApplication(ApplicationInterface $application)
- {
- $this->application = $application;
- }
-
/**
* Inject the storage for the jobs to be executed.
*
@@ -113,27 +97,15 @@ public function injectJobsToExecute(GenericStackable $jobsToExecute)
}
/**
- * Inject the storage for the message states.
- *
- * @param \AppserverIo\Storage\GenericStackable $messageStates The storage for the message states
- *
- * @return void
- */
- public function injectMessageStates(GenericStackable $messageStates)
- {
- $this->messageStates = $messageStates;
- }
-
- /**
- * Inject the storage for the executing jobs.
+ * Inject the application instance the worker is bound to.
*
- * @param array $jobsExecuting The storage for the executing jobs
+ * @param \AppserverIo\Psr\Application\ApplicationInterface $application The application instance
*
* @return void
*/
- public function injectJobsExecuting(array $jobsExecuting)
+ public function injectApplication(ApplicationInterface $application)
{
- $this->jobsExecuting = $jobsExecuting;
+ $this->application = $application;
}
/**
@@ -156,158 +128,22 @@ public function getApplication()
public function attach(\stdClass $jobWrapper)
{
- // force handling the timer tasks now
+ // attach the job wrapper
$this->synchronized(function (QueueWorker $self, \stdClass $jw) {
-
- // attach the job wrapper
$self->jobsToExecute[$jw->jobId] = $jw;
- $self->messageStates[$jw->jobId] = StateActive::KEY;
-
}, $this, $jobWrapper);
}
/**
- * Removes the message from the queue.
- *
- * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be removed from the queue
- *
- * @return void
- */
- public function remove(MessageInterface $message)
- {
- unset($this->messages[$message->getMessageId()]);
- unset($this->messageStates[$message->getMessageId()]);
- }
-
- /**
- * Process a message with the state 'StateActive'.
- *
- * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed
- *
- * @return void
- */
- public function processActive(MessageInterface $message)
- {
- $this->messageStates[$message->getMessageId()] = StateToProcess::KEY;
- }
-
- /**
- * Process a message with the state 'StateInProgress'.
- *
- * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed
- *
- * @return void
- */
- public function processInProgress(MessageInterface $message)
- {
-
- // make sure the job has been finished
- if (isset($this->jobsExecuting[$message->getMessageId()]) &&
- $this->jobsExecuting[$message->getMessageId()] instanceof JobInterface &&
- $this->jobsExecuting[$message->getMessageId()]->isFinished()
- ) {
- // log a message that the job is still in progress
- $this->getApplication()->getInitialContext()->getSystemLogger()->info(
- sprintf('Job %s has been finished, remove it from job queue now', $message->getMessageId())
- );
-
- // we also remove the job
- unset($this->jobsExecuting[$message->getMessageId()]);
-
- // set new state
- $this->messageStates[$message->getMessageId()] = StateProcessed::KEY;
-
- } else {
- // log a message that the job is still in progress
- $this->getApplication()->getInitialContext()->getSystemLogger()->debug(
- sprintf('Job %s is still in progress', $message->getMessageId())
- );
- }
- }
-
- /**
- * Process a message with the state 'StateProcessed'.
- *
- * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed
+ * Stops the worker instance.
*
* @return void
*/
- public function processProcessed(MessageInterface $message)
+ public function stop()
{
- // remove the job from the queue with jobs that has to be executed
- unset($this->jobsToExecute[$message->getMessageId()]);
- // remove the message from the queue
- $this->remove($message);
- }
-
- /**
- * Process a message with the state 'StateToProcess'.
- *
- * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed
- *
- * @return void
- */
- public function processToProcess(MessageInterface $message)
- {
-
- // count messages in queue
- $inQueue = sizeof($this->jobsExecuting);
-
- // we only process 50 jobs in parallel
- if ($inQueue < 50) {
- // load application
- $application = $this->getApplication();
-
- // start the job and add it to the internal array
- $this->jobsExecuting[$message->getMessageId()] = new Job($message, $application);
-
- // set new state
- $this->messageStates[$message->getMessageId()] = StateInProgress::KEY;
-
- } else {
- // log a message that queue is actually full
- $this->getApplication()->getInitialContext()->getSystemLogger()->debug(
- sprintf('Job queue full - (%d jobs/%d msg wait)', $inQueue, sizeof($this->messages))
- );
- }
- }
-
- /**
- * Process a message with the state 'StateUnknown'.
- *
- * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed
- *
- * @return void
- */
- public function processUnknown(MessageInterface $message)
- {
-
- // set new state
- $this->messageStates[$message->getMessageId()] = StateFailed::KEY;
-
- // log a message that we've a message with a unknown state
- $this->getApplication()->getInitialContext()->getSystemLogger()->critical(
- sprintf('Message %s has state %s', $message->getMessageId(), StateFailed::KEY)
- );
- }
-
- /**
- * Process a message with an invalid state.
- *
- * @param \AppserverIo\Psr\Pms\MessageInterface $message The message to be processed
- *
- * @return void
- */
- public function processInvalid(MessageInterface $message)
- {
-
- // set new state
- $this->messageStates[$message->getMessageId()] = StateFailed::KEY;
-
- // log a message that we've a message with an invalid state
- $this->getApplication()->getInitialContext()->getSystemLogger()->critical(
- sprintf('Message %s has an invalid state', $message->getMessageId())
- );
+ $this->synchronized(function ($self) {
+ $self->run = false;
+ }, $this);
}
/**
@@ -328,7 +164,7 @@ public function shutdown()
extract($lastError);
// query whether we've a fatal/user error
if ($type === E_ERROR || $type === E_USER_ERROR) {
- $this->getApplication()->getInitialContex()->getSystemLogger()->error($message);
+ $this->getApplication()->getInitialContext()->getSystemLogger()->error($message);
}
}
}
@@ -347,12 +183,21 @@ public function run()
// create a local instance of application and storage
$application = $this->application;
+ // create local instances of the storages
+ $messages = $this->messages;
+ $priorityKey = $this->priorityKey;
+ $jobsToExecute = $this->jobsToExecute;
+
+ // initialize the arrays for the message states and the jobs executing
+ $messageStates = array();
+ $jobsExecuting = array();
+
// register the class loader again, because each thread has its own context
$application->registerClassLoaders();
// try to load the profile logger
if ($profileLogger = $application->getInitialContext()->getLogger(LoggerUtils::PROFILE)) {
- $profileLogger->appendThreadContext(sprintf('queue-worker-%s', $this->priorityKey));
+ $profileLogger->appendThreadContext(sprintf('queue-worker-%s', $priorityKey));
}
/*
@@ -363,57 +208,131 @@ public function run()
* PriorityMedium: 10.000 === 0.01 s
* PriorityLow: 1.000.000 === 1 s
*/
- $sleepFor = pow(10, $this->priorityKey->getPriority() * 2);
+ $sleepFor = pow(10, $priorityKey->getPriority() * 2);
- // run forever
- while (true) {
+ // run until we've to stop
+ while ($this->run) {
// iterate over all job wrappers
- foreach ($this->jobsToExecute as $jobWrapper) {
+ foreach ($jobsToExecute as $jobWrapper) {
try {
// load the message
- $message = $this->messages[$jobWrapper->jobId];
+ $message = $messages[$jobWrapper->jobId];
// check if we've a message found
if ($message instanceof MessageInterface) {
+ // set the inital message state if not done
+ if (isset($messageStates[$jobWrapper->jobId]) === false) {
+ // initialize the default message state
+ if ($state = $message->getState()) {
+ $messageStates[$jobWrapper->jobId] = $state->getState();
+ } else {
+ $messageStates[$jobWrapper->jobId] = StateUnknown::KEY;
+ }
+ }
+
// check the message state
- switch ($this->messageStates[$jobWrapper->jobId]) {
+ switch ($messageStates[$jobWrapper->jobId]) {
// message is active and ready to be processed
case StateActive::KEY:
- $this->processActive($message);
+ // set the new state now
+ $messageStates[$message->getMessageId()] = StateToProcess::KEY;
+
break;
// message is paused or in progress
case StatePaused::KEY:
case StateInProgress::KEY:
- $this->processInProgress($message);
+ // make sure the job has been finished
+ if (isset($jobsExecuting[$message->getMessageId()]) &&
+ $jobsExecuting[$message->getMessageId()] instanceof JobInterface &&
+ $jobsExecuting[$message->getMessageId()]->isFinished()
+ ) {
+ // log a message that the job is still in progress
+ $this->getApplication()->getInitialContext()->getSystemLogger()->info(
+ sprintf('Job %s has been finished, remove it from job queue now', $message->getMessageId())
+ );
+
+ // set the new state now
+ $messageStates[$message->getMessageId()] = StateProcessed::KEY;
+
+ } else {
+ // log a message that the job is still in progress
+ $this->getApplication()->getInitialContext()->getSystemLogger()->debug(
+ sprintf('Job %s is still in progress', $message->getMessageId())
+ );
+ }
+
break;
// message processing failed or has been successfully processed
case StateFailed::KEY:
case StateProcessed::KEY:
- $this->processProcessed($message);
+ // load the unique message-ID
+ $messageId = $message->getMessageId();
+
+ // remove the job from the queue with jobs that has to be executed
+ unset($jobsToExecute[$messageId]);
+
+ // also remove the job
+ unset($jobsExecuting[$messageId]);
+
+ // finally, remove the message states and the message from the queue
+ unset($messageStates[$messageId]);
+ unset($messages[$messageId]);
+
break;
// message has to be processed now
case StateToProcess::KEY:
- $this->processToProcess($message);
+ // count messages in queue
+ $inQueue = sizeof($jobsExecuting);
+
+ // we only process 50 jobs in parallel
+ if ($inQueue < 50) {
+ // start the job and add it to the internal array
+ $jobsExecuting[$message->getMessageId()] = new Job(clone $message, $application);
+
+ // set the new state now
+ $messageStates[$message->getMessageId()] = StateInProgress::KEY;
+
+ } else {
+ // log a message that queue is actually full
+ $application->getInitialContext()->getSystemLogger()->debug(
+ sprintf('Job queue full - (%d jobs/%d msg wait)', $inQueue, sizeof($messages))
+ );
+ }
+
break;
// message is in an unknown state -> this is weired and should never happen!
case StateUnknown::KEY:
- $this->processUnknown($message);
+ // set new state now
+ $messageStates[$message->getMessageId()] = StateFailed::KEY;
+
+ // log a message that we've a message with a unknown state
+ $this->getApplication()->getInitialContext()->getSystemLogger()->critical(
+ sprintf('Message %s has state %s', $message->getMessageId(), StateFailed::KEY)
+ );
+
break;
// we don't know the message state -> this is weired and should never happen!
default:
- $this->processInvalid($message);
+ // set the failed message state
+ $messageStates[$message->getMessageId()] = StateFailed::KEY;
+
+ // log a message that we've a message with an invalid state
+ $this->getApplication()->getInitialContext()->getSystemLogger()->critical(
+ sprintf('Message %s has an invalid state', $message->getMessageId())
+ );
+
break;
}
}
@@ -430,7 +349,7 @@ public function run()
// profile the size of the session pool
if ($profileLogger) {
$profileLogger->debug(
- sprintf('Processed queue worker with priority %s, size of queue size is: %d', $this->priorityKey, sizeof($this->storage))
+ sprintf('Processed queue worker with priority %s, size of queue size is: %d', $priorityKey, sizeof($jobsToExecute))
);
}