Skip to content

Commit

Permalink
Fix in-memory queue using shared-cache
Browse files Browse the repository at this point in the history
When share in-memory queue in multi-thread, the queue is not
properly synced between threads, this commit uses shared-cache
to achieve the synchronization.
fixes #31
  • Loading branch information
peter-wangxu committed Nov 1, 2017
1 parent 71ded7c commit 8a83bab
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
17 changes: 13 additions & 4 deletions persistqueue/sqlbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import sqlite3
import threading

import uuid
sqlite3.enable_callback_tracebacks(True)

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -110,15 +110,24 @@ def _init(self):
self.put_event = threading.Event()

def _new_db_connection(self, path, multithreading, timeout):
conn = None
if path == self._MEMORY:
conn = sqlite3.connect(path,
check_same_thread=not multithreading)
try:
conn = sqlite3.connect(
database='file:{0}?cache=shared&mode=memory'.format(uuid.uuid1()),
timeout=timeout,
check_same_thread=not multithreading,
uri=True)
except TypeError:
conn = sqlite3.connect(
database=':memory:',
timeout=timeout,
check_same_thread=not multithreading)
else:
conn = sqlite3.connect('{}/data.db'.format(path),
timeout=timeout,
check_same_thread=not multithreading)
conn.execute('PRAGMA journal_mode=WAL;')
conn.set_trace_callback(print)
return conn

@with_conditional_transaction
Expand Down
12 changes: 0 additions & 12 deletions tests/test_sqlqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,6 @@ def test_open_close_1000(self):
def test_open_close_single(self):
self.skipTest('Memory based sqlite is not persistent.')

def test_multiple_consumers(self):
self.skipTest('Skipped due to occasional crash during '
'multithreading mode.')

def test_multi_threaded_multi_producer(self):
self.skipTest('Skipped due to occasional crash during '
'multithreading mode.')

def test_multi_threaded_parallel(self):
self.skipTest('Skipped due to occasional crash during '
'multithreading mode.')


class FILOSQLite3QueueTest(unittest.TestCase):
def setUp(self):
Expand Down

0 comments on commit 8a83bab

Please sign in to comment.