-
Notifications
You must be signed in to change notification settings - Fork 1
Deferred Chunk Generation
Almost everything rendered to a user in Contentment is built up from one or more Page Block hierarchies. In some instances it may be worthwhile to allow certain blocks to be deferred and pushed to the page upon final generation.
There are several components to this idea.
A broker is, well, just that, a message broker. It contributes to the context a callable that can be used to deliver JSON-encoded or HTML fragment messages over a named channel. Brokers offer the flexibility of very simple, routable message delivery to one or more back-ends.
An example Broker implementation for Nginx's HTTP Push Stream Module:
from concurrent.futures import Future

from requests import Request, Session
class NHPSMBroker:
    """Message broker that publishes over Nginx's HTTP Push Stream Module.

    Instances are callable: ``broker(context, message, namespace)`` POSTs the
    message to the publisher URL for the given namespace.  A ``str`` message
    is sent as a UTF-8 HTML fragment, a ``Future`` is published once it
    resolves, and anything else is sent as JSON.
    """

    def __init__(
        self,
        global_="__global",
        publisher="/_publish/{namespace}",
        subscriber="/_subscribe/{namespace}",
    ):
        """Configure the namespace fallback and the URL templates.

        ``global_`` is the namespace used when a caller passes none.  It
        carries a trailing underscore because ``global`` is a reserved word
        and cannot be used as a parameter name.  ``publisher`` and
        ``subscriber`` are ``str.format`` templates taking a ``namespace``
        field.
        """
        self._global = global_
        self._publisher = publisher
        self._subscriber = subscriber
        self.session = None  # Created by start(), released by stop().

    def start(self, context):
        """Perform any startup work such as opening connections."""
        self.session = Session()

    def stop(self, context):
        """Perform any shutdown work such as closing connections.

        Safe to call when the broker was never started or is already stopped.
        """
        session, self.session = self.session, None
        if session is not None:
            session.close()

    def _namespace(self, namespace):
        """Return ``namespace``, falling back to the configured global one."""
        return self._global if namespace is None else namespace

    def publisher(self, context, namespace=None):
        """Return the publisher URI for the given (or global) namespace."""
        return self._publisher.format(namespace=self._namespace(namespace))

    def subscriber(self, context, namespace=None):
        """Return the subscriber URI for the given (or global) namespace."""
        return self._subscriber.format(namespace=self._namespace(namespace))

    def handle_future(self, context, namespace=None):
        """Return a done-callback that publishes a future's result."""

        # add_done_callback() invokes the callback with the future as its only
        # argument; the broker itself is reached through the closure.
        def callback(future):
            self(context, future.result(), namespace)

        return callback

    def __call__(self, context, message, namespace=None):
        """Publish ``message`` on ``namespace`` and return the broker.

        A ``Future`` is not sent immediately; its result is published when it
        completes.  Raises whatever ``raise_for_status()`` raises when the
        publish request is rejected.
        """
        if isinstance(message, Future):
            message.add_done_callback(self.handle_future(context, namespace))
            return self

        url = self.publisher(context, namespace)

        if isinstance(message, str):
            # Encode explicitly so the bytes on the wire match the declared charset.
            result = self.session.post(
                url,
                data=message.encode("utf-8"),
                headers={"Content-Type": "text/html; charset=utf-8"},
            )
        else:
            result = self.session.post(url, json=message)

        result.raise_for_status()
        return self
A minor extension to the cinje chunked response protocol allows templates to yield Future instances. Chunks that follow a pending Future are buffered while waiting for the accumulated futures to complete.
Once the timeout is hit, everything buffered is flushed, and each Future that is still pending is replaced by a placeholder element referencing the endpoint to listen on and a placeholder ID.
from collections import deque
from concurrent.futures import as_completed, Future, TimeoutError
def deferred_stream(context, timeout, input, encoding=None, errors='strict'):
    """Stream template chunks, deferring any ``Future`` that is not ready in time.

    ``input`` is an iterable of text chunks and ``Future`` instances resolving
    to text; falsy chunks (``None`` and empty strings) are skipped.  The name
    shadows the builtin but is kept so keyword callers continue to work.

    Chunks are yielded immediately until the first future is seen.  From then
    on, everything is buffered to preserve order while waiting up to
    ``timeout`` seconds for the futures to complete.  Whatever is still
    pending afterwards is replaced by a placeholder ``<div>`` naming the
    subscriber endpoint, and the rendered fragment is published through
    ``context.broker`` once the future finally resolves.

    When ``encoding`` is given, every yielded chunk is encoded with it, using
    the ``errors`` policy.  An exception raised by a future propagates from
    the generator when its result is read.
    """

    def encode(text):
        return text if encoding is None else text.encode(encoding, errors=errors)

    def ready(item):
        return not isinstance(item, Future) or item.done()

    def resolve(item):
        return item.result() if isinstance(item, Future) else item

    # Raw (unencoded) chunks and futures, in order, from the first future onward.
    buffer = deque()

    for chunk in input:
        if not chunk:  # Omits `None` (empty wrappers) and empty chunks.
            continue

        if isinstance(chunk, Future) or buffer:
            buffer.append(chunk)
        else:
            yield encode(chunk)

    pending = [item for item in buffer if isinstance(item, Future)]

    if pending:  # Wait for our timeout period for the various chunks to complete.
        try:
            for _ in as_completed(pending, timeout=timeout):
                # Each completion may unblock the head of the buffer; flush
                # every leading item that is now available, in order.
                while buffer and ready(buffer[0]):
                    value = resolve(buffer[0])
                    buffer.popleft()
                    yield encode(value)
        except TimeoutError:
            pass  # Out of time; the remainder is handled with placeholders below.

    if not buffer:
        return

    namespace = 'cinje-{context}'.format(context=id(context))
    endpoint = context.broker.subscriber(context, namespace)

    def deferred(future):
        # Kept as text: the broker is handed a `str` so it can treat the
        # message as an HTML fragment.
        return '<div data-placeholder="{placeholder}">{result}</div>'.format(
            placeholder=id(future),
            result=future.result(),
        )

    def publish(future):
        context.broker(context, deferred(future), namespace)

    for chunk in buffer:  # Now process any remaining chunks, replacing with placeholders where needed.
        if not isinstance(chunk, Future):
            yield encode(chunk)  # Interstitial static element.
        elif chunk.done():  # Finished after the deadline; use the actual value.
            yield encode(chunk.result())
        else:
            # Runs as soon as the future resolves (immediately, if it
            # completed since the check above).
            chunk.add_done_callback(publish)
            yield encode('<div data-endpoint="{endpoint}" data-placeholder="{placeholder}"></div>'.format(
                endpoint=endpoint,
                placeholder=id(chunk),
            ))