Fix in-memory queue using shared-cache #32

Status: Closed (wants to merge 2 commits)
20 changes: 20 additions & 0 deletions persistqueue/exceptions.py
@@ -7,3 +7,23 @@ class Empty(Exception):

class Full(Exception):
    pass


class MaxRetriesReached(Exception):
    def __init__(self, msg, orig_ex=None, *args, **kwargs):
        self.msg = msg
        self.orig_ex = orig_ex
        self.args = args
        self.kwargs = kwargs

    @property
    def message(self):
        msg = self.msg
        if self.args:
            msg = msg % self.args
        if self.kwargs:
            # unpack the dict so str.format sees keyword placeholders
            msg = msg.format(**self.kwargs)

        if self.orig_ex:
            # str() is required; concatenating the exception object
            # itself would raise TypeError
            msg = msg + " Original exception: " + str(self.orig_ex)
        return msg
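
For illustration, a hypothetical round trip through the message property (the error text and retry count are made up; only the class above is assumed):

import sqlite3

from persistqueue.exceptions import MaxRetriesReached

try:
    raise sqlite3.OperationalError('database is locked')
except sqlite3.OperationalError as ex:
    err = MaxRetriesReached('Gave up after %d tries.', ex, 3)
    print(err.message)
    # -> Gave up after 3 tries. Original exception: database is locked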
59 changes: 59 additions & 0 deletions persistqueue/lib.py
@@ -0,0 +1,59 @@
import functools
import logging
import re
import time as _time

from persistqueue.exceptions import MaxRetriesReached

log = logging.getLogger(__name__)


def retrying(error_clz, msg=None, max=3):
    """Retry decorator for a given error type.

    :param error_clz: exception class that triggers a retry.
    :param msg: optional regex the exception message must match to retry.
    :param max: maximum number of attempts.
    """
    # TODO support both @retrying and @retrying() as valid syntax
    def retry(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            i = 0
            while i <= max:
                i += 1  # remember execution count
                try:
                    return func(*args, **kwargs)
                except Exception as ex:
                    should_retry = _retry_on_exception(ex, error_clz, msg)
                    if should_retry:
                        _raise_if_max_reached(i, max, ex)
                        log.warning("Retrying on error '%s', retries left: "
                                    "%d", ex, max - i)
                        # quadratic backoff: 0.1s, 0.4s, 0.9s, ...
                        _time.sleep(i ** 2 * 0.1)
                    else:
                        raise
        return wrapper
    return retry


def _raise_if_max_reached(i, max, ex):
    if i >= max:
        raise MaxRetriesReached('Max retries reached.', orig_ex=ex)


def _retry_on_exception(error, error_clz, msg):
    retry = False
    if isinstance(error, error_clz):
        if msg:
            if re.search(msg, str(error)):
                retry = True
            else:
                log.debug("exception '%s' occurred, "
                          "but message did not match, "
                          "found: '%s'", error_clz, str(error))
                retry = False
        else:
            retry = True
    else:
        log.debug("exception '%s' occurred, but type did not match, "
                  "expected: '%s'", type(error), error_clz)
    return retry
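
As a usage sketch (the flaky_call function and its failure pattern are hypothetical; only the decorator above is assumed):

import sqlite3

from persistqueue.lib import retrying

state = {'attempts': 0}


@retrying(error_clz=sqlite3.OperationalError,
          msg='database is locked', max=3)
def flaky_call():
    # Hypothetical call that fails twice before succeeding.
    state['attempts'] += 1
    if state['attempts'] < 3:
        raise sqlite3.OperationalError('database is locked')
    return 'ok'


print(flaky_call())  # sleeps 0.1s, then 0.4s, then returns 'ok'

A third consecutive failure would instead surface as MaxRetriesReached with the original OperationalError attached.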
44 changes: 32 additions & 12 deletions persistqueue/sqlbase.py
@@ -2,6 +2,9 @@
import os
import sqlite3
import threading
import uuid

from persistqueue.lib import retrying

sqlite3.enable_callback_tracebacks(True)

@@ -10,15 +13,15 @@

def with_conditional_transaction(func):
    def _execute(obj, *args, **kwargs):
        with obj.tran_lock:
            if obj.auto_commit:
                with obj._putter as tran:
                    stat, param = func(obj, *args, **kwargs)
                    tran.execute(stat, param)
            else:
                stat, param = func(obj, *args, **kwargs)
                obj._putter.execute(stat, param)
                # commit_ignore_error(obj._putter)

    return _execute
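
To make the decorator's contract concrete, here is a hypothetical method in the same style as _insert_into and _delete below: the decorated body only builds the statement and its parameters, while _execute takes tran_lock and performs the execute (and the commit, when auto_commit is on).

    # Hypothetical sketch only; _table_name and _key_column mirror the
    # attributes used by the real methods in this class.
    @with_conditional_transaction
    def _mark_stale(self, key):
        sql = 'UPDATE {} SET timestamp = 0 WHERE {} = ?'.format(
            self._table_name, self._key_column)
        return sql, (key,)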

@@ -82,6 +85,7 @@ def _init(self):

if self.path == self._MEMORY:
self.memory_sql = True
self.memory_id = uuid.uuid1()
log.debug("Initializing Sqlite3 Queue in memory.")
elif not os.path.exists(self.path):
os.makedirs(self.path)
@@ -100,6 +104,8 @@ def _init(self):
        if not self.memory_sql:
            self._putter = self._new_db_connection(
                self.path, self.multithreading, self.timeout)
            # self._putter.isolation_level = None
            # self._putter.isolation_level = ""
        if self.auto_commit is False:
            log.warning('auto_commit=False is still experimental, '
                        'only use it with care.')
@@ -110,17 +116,29 @@
self.put_event = threading.Event()

    def _new_db_connection(self, path, multithreading, timeout):
        conn = None
        if path == self._MEMORY:
            # try:
            #     conn = sqlite3.connect(
            #         database='file:{0}?cache=shared&mode=memory'.format(
            #             self.memory_id),
            #         timeout=1000,
            #         check_same_thread=not multithreading,
            #         uri=True)
            # except TypeError:
            conn = sqlite3.connect(
                database=':memory:',
                timeout=timeout,
                check_same_thread=not multithreading)
        else:
            conn = sqlite3.connect('{}/data.db'.format(path),
                                   timeout=timeout,
                                   check_same_thread=not multithreading)
        conn.execute('PRAGMA journal_mode=WAL;')
        # conn.set_trace_callback(print)
        return conn
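
As a standalone sketch of what the commented-out branch above is aiming for (a minimal sketch, assuming Python 3's sqlite3 URI support; the queue_mem name is arbitrary): two connections opened against the same shared-cache URI see a single in-memory database, which is what lets a separate _getter and _putter work on one queue.

import sqlite3

uri = 'file:queue_mem?mode=memory&cache=shared'
putter = sqlite3.connect(uri, uri=True, check_same_thread=False)
getter = sqlite3.connect(uri, uri=True, check_same_thread=False)

putter.execute('CREATE TABLE q (id INTEGER PRIMARY KEY, data BLOB)')
putter.execute('INSERT INTO q (data) VALUES (?)', (b'payload',))
putter.commit()
# The second connection sees the row because the cache is shared.
print(getter.execute('SELECT data FROM q').fetchone())  # (b'payload',)

The shared database lives only while at least one connection stays open, which is why both connections must be created from the same URI and held for the queue's lifetime.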

@retrying(error_clz=sqlite3.OperationalError,
msg='database .* is locked:', max=3)
@with_conditional_transaction
def _insert_into(self, *record):
return self._sql_insert, record
@@ -136,6 +154,8 @@ def _delete(self, key):
self._key_column)
return sql, (key,)

@retrying(error_clz=sqlite3.OperationalError,
msg='database .* is locked:', max=3)
def _select(self, *args):
return self._getter.execute(self._sql_select, args).fetchone()

19 changes: 13 additions & 6 deletions persistqueue/sqlqueue.py
@@ -49,12 +49,19 @@ def _pop(self):
        row = self._select()
        # Perhaps a sqlite3 bug: sometimes (None, None) is returned
        # by select; the check below skips such invalid records.
        if row is not None:
            print("_pop: ", row)
            if row and row[0] is not None and row[0] != 0:
                self._delete(row[0])
                if not self.auto_commit:
                    # Need to commit if not automatically done by _delete
                    sqlbase.commit_ignore_error(self._putter)
                return row[1]  # pickled data
            else:
                # self._delete(row[0])
                print("warning: row is", row)
                return None
        # return b'\x80\x03X\x06\x00\x00\x00var999q\x00.'
        return None

def get(self, block=True, timeout=None):
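For context, a round-trip sketch of the path these changes sit on (assuming SQLiteQueue accepts the ':memory:' path handled in sqlbase above; get ultimately calls _pop, which calls _select and _delete):

from persistqueue import SQLiteQueue

q = SQLiteQueue(path=':memory:', multithreading=True, auto_commit=True)
q.put('var1')
print(q.get(block=True, timeout=2))  # 'var1', served by _pop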
16 changes: 3 additions & 13 deletions tests/test_sqlqueue.py
@@ -153,6 +153,7 @@ def test_multiple_consumers(self):
def producer():
for x in range(1000):
queue.put('var%d' % x)
print('put: var%d' % x)
task_done_if_required(queue)

counter = []
@@ -162,7 +163,8 @@ def producer():

        def consumer(index):
            for i in range(200):
                data = queue.get(block=True, timeout=2)
                print('get: index=%d, i=%d, data=%s' % (index, i, data))
                self.assertTrue('var' in data)
                counter[index * 200 + i] = data

@@ -216,18 +218,6 @@ def test_open_close_1000(self):
def test_open_close_single(self):
self.skipTest('Memory based sqlite is not persistent.')

def test_multiple_consumers(self):
self.skipTest('Skipped due to occasional crash during '
'multithreading mode.')

def test_multi_threaded_multi_producer(self):
self.skipTest('Skipped due to occasional crash during '
'multithreading mode.')

def test_multi_threaded_parallel(self):
self.skipTest('Skipped due to occasional crash during '
'multithreading mode.')


class FILOSQLite3QueueTest(unittest.TestCase):
def setUp(self):