Skip to content

Commit

Permalink
Fix in-memory queue using shared-cache
Browse files Browse the repository at this point in the history
When share in-memory queue in multi-thread, the queue is not
properly synced between threads, this commit uses shared-cache
to achive the synchronization.
fixes #31
  • Loading branch information
peter-wangxu committed Nov 1, 2017
1 parent 71ded7c commit 6cd79ad
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 18 deletions.
19 changes: 13 additions & 6 deletions persistqueue/sqlbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ def _init(self):
self._conn.commit()
# Setup another session only for disk-based queue.
if self.multithreading:
if not self.memory_sql:
self._putter = self._new_db_connection(
self.path, self.multithreading, self.timeout)
self._putter = self._new_db_connection(
self.path, self.multithreading, self.timeout)
if self.auto_commit is False:
log.warning('auto_commit=False is still experimental,'
'only use it with care.')
Expand All @@ -110,10 +109,18 @@ def _init(self):
self.put_event = threading.Event()

def _new_db_connection(self, path, multithreading, timeout):
conn = None
if path == self._MEMORY:
conn = sqlite3.connect(path,
check_same_thread=not multithreading)
try:
conn = sqlite3.connect(
database='file::memory:?cache=shared&model=memory',
timeout=timeout,
check_same_thread=not multithreading,
uri=True)
except TypeError:
conn = sqlite3.connect(
database='file::memory:?cache=shared&model=memory',
timeout=timeout,
check_same_thread=not multithreading)
else:
conn = sqlite3.connect('{}/data.db'.format(path),
timeout=timeout,
Expand Down
12 changes: 0 additions & 12 deletions tests/test_sqlqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,6 @@ def test_open_close_1000(self):
def test_open_close_single(self):
self.skipTest('Memory based sqlite is not persistent.')

def test_multiple_consumers(self):
self.skipTest('Skipped due to occasional crash during '
'multithreading mode.')

def test_multi_threaded_multi_producer(self):
self.skipTest('Skipped due to occasional crash during '
'multithreading mode.')

def test_multi_threaded_parallel(self):
self.skipTest('Skipped due to occasional crash during '
'multithreading mode.')


class FILOSQLite3QueueTest(unittest.TestCase):
def setUp(self):
Expand Down

0 comments on commit 6cd79ad

Please sign in to comment.