Skip to content

Commit

Permalink
Merge pull request #56 from PhilippMDoerner/bugfix/#55-refactor-to-at…
Browse files Browse the repository at this point in the history
…omics

Bugfix/#55 refactor to atomics
  • Loading branch information
Araq authored Jan 13, 2024
2 parents 836ff67 + 1bf7eb3 commit dd68ccd
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 26 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
nimcache/
nimblecache/
htmldocs/
tests/tsmartptrsleak
tests/tchannels_simple
11 changes: 11 additions & 0 deletions tests/nim.cfg
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
--path:"../"
--gc:arc
--threads:on
--define:useMalloc
--cc:clang
@if not windows:
--debugger:native
--passc:"-fsanitize=thread -fno-omit-frame-pointer -mno-omit-leaf-frame-pointer"
--passl:"-fsanitize=thread -fno-omit-frame-pointer -mno-omit-leaf-frame-pointer"
@end
@if windows and i386:
--passc:"-m32"
--passl:"-m32"
@end
66 changes: 42 additions & 24 deletions threading/channels.nim
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ runnableExamples("--threads:on --gc:orc"):
when not defined(gcArc) and not defined(gcOrc) and not defined(nimdoc):
{.error: "This channel implementation requires --gc:arc or --gc:orc".}

import std/[locks, atomics, isolation]
import std/[locks, isolation]
import ./atomics
import system/ansi_c

# Channel
Expand All @@ -111,25 +112,43 @@ type
lock: Lock
spaceAvailableCV, dataAvailableCV: Cond
slots: int ## Number of item slots in the buffer
head: int ## Write/enqueue/send index
tail: int ## Read/dequeue/receive index
head: Atomic[int] ## Write/enqueue/send index
tail: Atomic[int] ## Read/dequeue/receive index
buffer: ptr UncheckedArray[byte]
atomicCounter: Atomic[int]

# ------------------------------------------------------------------------------

func getTail(chan: ChannelRaw, order: Ordering = Relaxed): int {.inline.} =
chan.tail.load(order)

func getHead(chan: ChannelRaw, order: Ordering = Relaxed): int {.inline.} =
chan.head.load(order)

proc setTail(chan: ChannelRaw, value: int, order: Ordering = Relaxed) {.inline.} =
chan.tail.store(value, order)

proc setHead(chan: ChannelRaw, value: int, order: Ordering = Relaxed) {.inline.} =
chan.head.store(value, order)

func getAtomicCounter(chan: ChannelRaw, order: Ordering = Relaxed): int {.inline.} =
chan.atomicCounter.load(order)

proc setAtomicCounter(chan: ChannelRaw, value: int, order: Ordering = Relaxed) {.inline.} =
chan.atomicCounter.store(value, order)

func numItems(chan: ChannelRaw): int {.inline.} =
result = chan.head - chan.tail
result = chan.getHead() - chan.getTail()
if result < 0:
inc(result, 2 * chan.slots)

assert result <= chan.slots

template isFull(chan: ChannelRaw): bool =
abs(chan.head - chan.tail) == chan.slots
abs(chan.getHead() - chan.getTail()) == chan.slots

template isEmpty(chan: ChannelRaw): bool =
chan.head == chan.tail
chan.getHead() == chan.getTail()

# Channels memory ops
# ------------------------------------------------------------------------------
Expand All @@ -145,9 +164,9 @@ proc allocChannel(size, n: int): ChannelRaw =
initCond(result.dataAvailableCV)

result.slots = n
result.head = 0
result.tail = 0
result.atomicCounter.store(0, moRelaxed)
result.setHead(0)
result.setTail(0)
result.setAtomicCounter(0)


proc freeChannel(chan: ChannelRaw) =
Expand Down Expand Up @@ -186,16 +205,15 @@ proc channelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bo

assert not chan.isFull()

let writeIdx = if chan.head < chan.slots:
chan.head
let writeIdx = if chan.getHead() < chan.slots:
chan.getHead()
else:
chan.head - chan.slots
chan.getHead() - chan.slots

copyMem(chan.buffer[writeIdx * size].addr, data, size)

inc(chan.head)
if chan.head == 2 * chan.slots:
chan.head = 0
atomicInc(chan.head)
if chan.getHead() == 2 * chan.slots:
chan.setHead(0)

signal(chan.dataAvailableCV)
release(chan.lock)
Expand All @@ -221,16 +239,16 @@ proc channelReceive(chan: ChannelRaw, data: pointer, size: int, blocking: static

assert not chan.isEmpty()

let readIdx = if chan.tail < chan.slots:
chan.tail
let readIdx = if chan.getTail() < chan.slots:
chan.getTail()
else:
chan.tail - chan.slots
chan.getTail() - chan.slots

copyMem(data, chan.buffer[readIdx * size].addr, size)

inc(chan.tail)
if chan.tail == 2 * chan.slots:
chan.tail = 0
atomicInc(chan.tail)
if chan.getTail() == 2 * chan.slots:
chan.setTail(0)

signal(chan.spaceAvailableCV)
release(chan.lock)
Expand All @@ -246,15 +264,15 @@ type
when defined(nimAllowNonVarDestructor):
proc `=destroy`*[T](c: Chan[T]) =
if c.d != nil:
if load(c.d.atomicCounter, moAcquire) == 0:
if c.d.getAtomicCounter(Acquire) == 0:
if c.d.buffer != nil:
freeChannel(c.d)
else:
atomicDec(c.d.atomicCounter)
else:
proc `=destroy`*[T](c: var Chan[T]) =
if c.d != nil:
if load(c.d.atomicCounter, moAcquire) == 0:
if c.d.getAtomicCounter(Acquire) == 0:
if c.d.buffer != nil:
freeChannel(c.d)
else:
Expand Down
4 changes: 2 additions & 2 deletions threading/smartptrs.nim
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ proc decr[T](p: SharedPtr[T]) {.inline.} =
if p.val != nil:
# this `fetchSub` returns current val then subs
# so count == 0 means we're the last
if p.val.counter.fetchSub(1, Release) == 0:
if p.val.counter.fetchSub(1, AcqRel) == 0:
`=destroy`(p.val.value)
deallocShared(p.val)

Expand All @@ -119,7 +119,7 @@ proc newSharedPtr*[T](val: sink Isolated[T]): SharedPtr[T] {.nodestroy.} =
## Returns a shared pointer which shares
## ownership of the object by reference counting.
result.val = cast[typeof(result.val)](allocShared(sizeof(result.val[])))
int(result.val.counter) = 0
result.val.counter.store(0)
result.val.value = extract val

template newSharedPtr*[T](val: T): SharedPtr[T] =
Expand Down

0 comments on commit dd68ccd

Please sign in to comment.