Skip to content

Commit

Permalink
Use retry when database is locked
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-wangxu committed Nov 6, 2017
1 parent 8a83bab commit 540e0e6
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 24 deletions.
20 changes: 20 additions & 0 deletions persistqueue/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,23 @@ class Empty(Exception):

class Full(Exception):
pass


class MaxRetriesReached(Exception):
def __init__(self, msg, orig_ex=None, *args, **kwargs):
self.msg = msg
self.orig_ex = orig_ex
self.args = args
self.kwargs = kwargs

@property
def message(self):
msg = self.msg
if self.args:
msg = msg % self.args
if self.kwargs:
msg = msg.format(self.kwargs)

if self.orig_ex:
msg = msg + " Original exception: " + self.orig_ex
return msg
59 changes: 59 additions & 0 deletions persistqueue/lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging
import re
import time as _time

from persistqueue.exceptions import MaxRetriesReached

log = logging.getLogger(__name__)


def retrying(error_clz, msg=None, max=3):
"""Retry decorator for error.
:param error_clz: exception for retry.
:param msg: optional exception message for retry.
:param max: max retry count.
"""
# TODO support both @retrying and @retry() as valid syntax
def retry(func):
def wrapper(*args, **kwargs):
i = 0
while i <= max:
i += 1 # remember execution count
try:
return func(*args, **kwargs)
except Exception as ex:
retry = _retry_on_exception(ex, error_clz, msg)
if retry:
_raise_if_max_reached(i, max, ex)
log.warning("Retrying on error '%s', left retries: "
"%d", ex, max-i)
_time.sleep(i**2 * 0.1)
else:
raise
return wrapper
return retry


def _raise_if_max_reached(i, max, ex):
if i >= max:
raise MaxRetriesReached('Max retries reached.', orig_ex=ex)


def _retry_on_exception(error, error_clz, msg):
retry = False
if isinstance(error, error_clz):
if msg:
if re.search(msg, str(error)):
retry = True
else:
log.debug("exception '%s' occurred, "
"but message did not match, "
"found: '%s'", error_clz, str(error))
retry = False
else:
retry = True
else:
log.debug("exception '%s' occurred, but type did not match, "
"found: '%s'", type(error), error_clz)
return retry
45 changes: 28 additions & 17 deletions persistqueue/sqlbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@
import sqlite3
import threading
import uuid

from persistqueue.lib import retrying

sqlite3.enable_callback_tracebacks(True)

log = logging.getLogger(__name__)


def with_conditional_transaction(func):
def _execute(obj, *args, **kwargs):
with obj.tran_lock:
if obj.auto_commit:
with obj._putter as tran:
with obj.tran_lock:
if obj.auto_commit:
with obj._putter as tran:
stat, param = func(obj, *args, **kwargs)
tran.execute(stat, param)
else:
stat, param = func(obj, *args, **kwargs)
tran.execute(stat, param)
else:
stat, param = func(obj, *args, **kwargs)
obj._putter.execute(stat, param)
# commit_ignore_error(obj._putter)
obj._putter.execute(stat, param)
# commit_ignore_error(obj._putter)

return _execute

Expand Down Expand Up @@ -82,6 +85,7 @@ def _init(self):

if self.path == self._MEMORY:
self.memory_sql = True
self.memory_id = uuid.uuid1()
log.debug("Initializing Sqlite3 Queue in memory.")
elif not os.path.exists(self.path):
os.makedirs(self.path)
Expand All @@ -100,6 +104,8 @@ def _init(self):
if not self.memory_sql:
self._putter = self._new_db_connection(
self.path, self.multithreading, self.timeout)
# self._putter.isolation_level = None
# self._putter.isolation_level = ""
if self.auto_commit is False:
log.warning('auto_commit=False is still experimental,'
'only use it with care.')
Expand All @@ -111,13 +117,14 @@ def _init(self):

def _new_db_connection(self, path, multithreading, timeout):
if path == self._MEMORY:
try:
conn = sqlite3.connect(
database='file:{0}?cache=shared&mode=memory'.format(uuid.uuid1()),
timeout=timeout,
check_same_thread=not multithreading,
uri=True)
except TypeError:
# try:
# conn = sqlite3.connect(
# database='file:{0}?cache=shared&mode=memory'.format(
# self.memory_id),
# timeout=1000,
# check_same_thread=not multithreading,
# uri=True)
# except TypeError:
conn = sqlite3.connect(
database=':memory:',
timeout=timeout,
Expand All @@ -126,10 +133,12 @@ def _new_db_connection(self, path, multithreading, timeout):
conn = sqlite3.connect('{}/data.db'.format(path),
timeout=timeout,
check_same_thread=not multithreading)
conn.execute('PRAGMA journal_mode=WAL;')
conn.set_trace_callback(print)
conn.execute('PRAGMA journal_mode=WAL;')
# conn.set_trace_callback(print)
return conn

@retrying(error_clz=sqlite3.OperationalError,
msg='database .* is locked:', max=3)
@with_conditional_transaction
def _insert_into(self, *record):
return self._sql_insert, record
Expand All @@ -145,6 +154,8 @@ def _delete(self, key):
self._key_column)
return sql, (key,)

@retrying(error_clz=sqlite3.OperationalError,
msg='database .* is locked:', max=3)
def _select(self, *args):
return self._getter.execute(self._sql_select, args).fetchone()

Expand Down
19 changes: 13 additions & 6 deletions persistqueue/sqlqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@ def _pop(self):
row = self._select()
# Perhaps a sqlite3 bug, sometimes (None, None) is returned
# by select, below can avoid these invalid records.
if row and row[0] is not None:
self._delete(row[0])
if not self.auto_commit:
# Need to commit if not automatic done by _delete
sqlbase.commit_ignore_error(self._putter)
return row[1] # pickled data
if row is not None:
print("_pop: ", row)
if row and row[0] is not None and row[0] != 0:
self._delete(row[0])
if not self.auto_commit:
# Need to commit if not automatic done by _delete
sqlbase.commit_ignore_error(self._putter)
return row[1] # pickled data
else:
# self._delete(row[0])
print ("warning: row is ", row)
return None
# return b'\x80\x03X\x06\x00\x00\x00var999q\x00.'
return None

def get(self, block=True, timeout=None):
Expand Down
4 changes: 3 additions & 1 deletion tests/test_sqlqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def test_multiple_consumers(self):
def producer():
for x in range(1000):
queue.put('var%d' % x)
print('put: var%d' % x)
task_done_if_required(queue)

counter = []
Expand All @@ -162,7 +163,8 @@ def producer():

def consumer(index):
for i in range(200):
data = queue.get(block=True)
data = queue.get(block=True, timeout=2)
print('get: index=%d, i=%d, data=%s' % (index, i, data))
self.assertTrue('var' in data)
counter[index * 200 + i] = data

Expand Down

0 comments on commit 540e0e6

Please sign in to comment.