diff --git a/persistqueue/exceptions.py b/persistqueue/exceptions.py index a47806c..5c85446 100644 --- a/persistqueue/exceptions.py +++ b/persistqueue/exceptions.py @@ -7,3 +7,23 @@ class Empty(Exception): class Full(Exception): pass + + +class MaxRetriesReached(Exception): + def __init__(self, msg, orig_ex=None, *args, **kwargs): + self.msg = msg + self.orig_ex = orig_ex + self.args = args + self.kwargs = kwargs + + @property + def message(self): + msg = self.msg + if self.args: + msg = msg % self.args + if self.kwargs: + msg = msg.format(**self.kwargs) + + if self.orig_ex: + msg = msg + " Original exception: " + str(self.orig_ex) + return msg diff --git a/persistqueue/lib.py b/persistqueue/lib.py new file mode 100644 index 0000000..89893cf --- /dev/null +++ b/persistqueue/lib.py @@ -0,0 +1,59 @@ +import logging +import re +import time as _time + +from persistqueue.exceptions import MaxRetriesReached + +log = logging.getLogger(__name__) + + +def retrying(error_clz, msg=None, max=3): + """Retry decorator for error. + + :param error_clz: exception for retry. + :param msg: optional exception message for retry. + :param max: max retry count. 
+ """ + # TODO support both @retrying and @retry() as valid syntax + def retry(func): + def wrapper(*args, **kwargs): + i = 0 + while i <= max: + i += 1 # remember execution count + try: + return func(*args, **kwargs) + except Exception as ex: + retry = _retry_on_exception(ex, error_clz, msg) + if retry: + _raise_if_max_reached(i, max, ex) + log.warning("Retrying on error '%s', left retries: " + "%d", ex, max-i) + _time.sleep(i**2 * 0.1) + else: + raise + return wrapper + return retry + + +def _raise_if_max_reached(i, max, ex): + if i >= max: + raise MaxRetriesReached('Max retries reached.', orig_ex=ex) + + +def _retry_on_exception(error, error_clz, msg): + retry = False + if isinstance(error, error_clz): + if msg: + if re.search(msg, str(error)): + retry = True + else: + log.debug("exception '%s' occurred, " + "but message did not match, " + "found: '%s'", error_clz, str(error)) + retry = False + else: + retry = True + else: + log.debug("exception '%s' occurred, but type did not match, " + "found: '%s'", type(error), error_clz) + return retry diff --git a/persistqueue/sqlbase.py b/persistqueue/sqlbase.py index e0a7672..47c6c66 100644 --- a/persistqueue/sqlbase.py +++ b/persistqueue/sqlbase.py @@ -2,6 +2,9 @@ import os import sqlite3 import threading +import uuid + +from persistqueue.lib import retrying sqlite3.enable_callback_tracebacks(True) @@ -10,15 +13,15 @@ def with_conditional_transaction(func): def _execute(obj, *args, **kwargs): - with obj.tran_lock: - if obj.auto_commit: - with obj._putter as tran: + with obj.tran_lock: + if obj.auto_commit: + with obj._putter as tran: + stat, param = func(obj, *args, **kwargs) + tran.execute(stat, param) + else: stat, param = func(obj, *args, **kwargs) - tran.execute(stat, param) - else: - stat, param = func(obj, *args, **kwargs) - obj._putter.execute(stat, param) - # commit_ignore_error(obj._putter) + obj._putter.execute(stat, param) + # commit_ignore_error(obj._putter) return _execute @@ -82,6 +85,7 @@ def _init(self): 
if self.path == self._MEMORY: self.memory_sql = True + self.memory_id = uuid.uuid1() log.debug("Initializing Sqlite3 Queue in memory.") elif not os.path.exists(self.path): os.makedirs(self.path) @@ -100,6 +104,8 @@ def _init(self): if not self.memory_sql: self._putter = self._new_db_connection( self.path, self.multithreading, self.timeout) + # self._putter.isolation_level = None + # self._putter.isolation_level = "" if self.auto_commit is False: log.warning('auto_commit=False is still experimental,' 'only use it with care.') @@ -110,17 +116,29 @@ def _init(self): self.put_event = threading.Event() def _new_db_connection(self, path, multithreading, timeout): - conn = None if path == self._MEMORY: - conn = sqlite3.connect(path, - check_same_thread=not multithreading) + # try: + # conn = sqlite3.connect( + # database='file:{0}?cache=shared&mode=memory'.format( + # self.memory_id), + # timeout=1000, + # check_same_thread=not multithreading, + # uri=True) + # except TypeError: + conn = sqlite3.connect( + database=':memory:', + timeout=timeout, + check_same_thread=not multithreading) else: conn = sqlite3.connect('{}/data.db'.format(path), timeout=timeout, check_same_thread=not multithreading) - conn.execute('PRAGMA journal_mode=WAL;') + conn.execute('PRAGMA journal_mode=WAL;') + # conn.set_trace_callback(print) return conn + @retrying(error_clz=sqlite3.OperationalError, + msg='database .* is locked', max=3) @with_conditional_transaction def _insert_into(self, *record): return self._sql_insert, record @@ -136,6 +154,8 @@ def _delete(self, key): self._key_column) return sql, (key,) + @retrying(error_clz=sqlite3.OperationalError, + msg='database .* is locked', max=3) def _select(self, *args): return self._getter.execute(self._sql_select, args).fetchone() diff --git a/persistqueue/sqlqueue.py b/persistqueue/sqlqueue.py index e73d217..4a13e98 100644 --- a/persistqueue/sqlqueue.py +++ b/persistqueue/sqlqueue.py @@ -49,12 +49,19 @@ def _pop(self): row = self._select() # 
Perhaps a sqlite3 bug, sometimes (None, None) is returned # by select, below can avoid these invalid records. - if row and row[0] is not None: - self._delete(row[0]) - if not self.auto_commit: - # Need to commit if not automatic done by _delete - sqlbase.commit_ignore_error(self._putter) - return row[1] # pickled data + if row is not None: + print("_pop: ", row) + if row and row[0] is not None and row[0] != 0: + self._delete(row[0]) + if not self.auto_commit: + # Need to commit if not automatic done by _delete + sqlbase.commit_ignore_error(self._putter) + return row[1] # pickled data + else: + # self._delete(row[0]) + print ("warning: row is ", row) + return None + # return b'\x80\x03X\x06\x00\x00\x00var999q\x00.' return None def get(self, block=True, timeout=None): diff --git a/tests/test_sqlqueue.py b/tests/test_sqlqueue.py index fe00f42..ed71f76 100644 --- a/tests/test_sqlqueue.py +++ b/tests/test_sqlqueue.py @@ -153,6 +153,7 @@ def test_multiple_consumers(self): def producer(): for x in range(1000): queue.put('var%d' % x) + print('put: var%d' % x) task_done_if_required(queue) counter = [] @@ -162,7 +163,8 @@ def producer(): def consumer(index): for i in range(200): - data = queue.get(block=True) + data = queue.get(block=True, timeout=2) + print('get: index=%d, i=%d, data=%s' % (index, i, data)) self.assertTrue('var' in data) counter[index * 200 + i] = data @@ -216,18 +218,6 @@ def test_open_close_1000(self): def test_open_close_single(self): self.skipTest('Memory based sqlite is not persistent.') - def test_multiple_consumers(self): - self.skipTest('Skipped due to occasional crash during ' - 'multithreading mode.') - - def test_multi_threaded_multi_producer(self): - self.skipTest('Skipped due to occasional crash during ' - 'multithreading mode.') - - def test_multi_threaded_parallel(self): - self.skipTest('Skipped due to occasional crash during ' - 'multithreading mode.') - class FILOSQLite3QueueTest(unittest.TestCase): def setUp(self):