diff --git a/threading/channels.nim b/threading/channels.nim index 1c9222e..7904ad7 100644 --- a/threading/channels.nim +++ b/threading/channels.nim @@ -6,7 +6,7 @@ # See the file "copying.txt", included in this # distribution, for details about the copyright. # -# This Channel implementation is a shared memory concurrent queue using +# This Channel implementation is a shared memory, fixed-size, concurrent queue using # a circular buffer for data. Based on channels implementation[1]_ by # Mamy André-Ratsimbazafy (@mratsim), which is a C to Nim translation of the # original[2]_ by Andreas Prell (@aprell) @@ -20,14 +20,18 @@ ## ## This module implements multi-producer multi-consumer channels - a concurrency ## primitive with a high-level interface intended for communication and -## synchronization between threads. It allows sending and receiving typed data, -## enabling safe and efficient concurrency. +## synchronization between threads. It allows sending and receiving typed, isolated +## data, enabling safe and efficient concurrency. ## -## The `Chan` type represents a generic channel object that internally manages +## The `Chan` type represents a generic fixed-size channel object that internally manages ## the underlying resources and synchronization. It has to be initialized using ## the `newChan` proc. Sending and receiving operations are provided by the ## blocking `send` and `recv` procs, and non-blocking `trySend` and `tryRecv` -## procs. +## procs. Send operations add messages to the channel, receiving operations +## remove them. +## +## See also: +## * [std/isolation](https://nim-lang.org/docs/isolation.html) ## ## The following is a simple example of two different ways to use channels: ## blocking and non-blocking. @@ -266,22 +270,37 @@ proc `=copy`*[T](dest: var Chan[T], src: Chan[T]) = dest.d = src.d proc trySend*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} = - ## Sends item to the channel (non-blocking). + ## Tries to send a message to a channel. + ## + ## The memory `src` is moved, not copied. Doesn't block. + ## + ## Returns `false` if the message was not sent because the number of pending + ## items in the channel exceeded its capacity. var data = src.extract result = channelSend(c.d, data.unsafeAddr, sizeof(T), false) if result: wasMoved(data) template trySend*[T](c: Chan[T], src: T): bool = - ## Helper template for `trySend`. + ## Helper template for `trySend <#trySend,Chan[T],sinkIsolated[T]>`_. trySend(c, isolate(src)) proc tryRecv*[T](c: Chan[T], dst: var T): bool {.inline.} = - ## Receives item from the channel (non-blocking). + ## Tries to receive a message from the channel `c` and fill `dst` with its value. + ## This returns immediately even if no message is found. Doesn't block. + ## + ## This can fail for all sort of reasons, including a lack of messages in the channel + ## to receive or contention. + ## + ## If it fails it returns `false`. Otherwise it returns `true`. channelReceive(c.d, dst.addr, sizeof(T), false) proc send*[T](c: Chan[T], src: sink Isolated[T]) {.inline.} = - ## Sends item to the channel (blocking). + ## Sends item to the channel. + ## This blocks the sending thread until the item was successfully sent. + ## + ## If the channel is already full with items this will block the thread until + ## items from the channel are removed. var data = src.extract when defined(gcOrc) and defined(nimSafeOrcSend): GC_runOrc() @@ -293,24 +312,35 @@ template send*[T](c: Chan[T]; src: T) = send(c, isolate(src)) proc recv*[T](c: Chan[T], dst: var T) {.inline.} = - ## Receives item from the channel (blocking). + ## Receives an item from the channel. + ## Fills `dist` with the item. + ## This blocks the receiving thread until an item was successfully received. + ## + ## If the channel does not contain any items this will block the thread until + ## items get sent to the channel. discard channelReceive(c.d, dst.addr, sizeof(T), true) proc recv*[T](c: Chan[T]): T {.inline.} = - ## Receives item from the channel (blocking). + ## Receives an item from the channel. + ## A version of `recv`_ that returns the item. discard channelReceive(c.d, result.addr, sizeof(result), true) proc recvIso*[T](c: Chan[T]): Isolated[T] {.inline.} = + ## Receives an item from the channel. + ## A version of `recv`_ that returns the item and isolates it. var dst: T discard channelReceive(c.d, dst.addr, sizeof(T), true) result = isolate(dst) func peek*[T](c: Chan[T]): int {.inline.} = - ## Returns an estimation of current number of items held by the channel. + ## Returns an estimation of the current number of items held by the channel. numItems(c.d) proc newChan*[T](elements: Positive = 30): Chan[T] = ## An initialization procedure, necessary for acquiring resources and - ## initializing internal state of the channel. + ## initializing internal state of the channel. + ## + ## `elements` is the capacity of the channel and thus how many items it can hold + ## before it refuses to accept any further items. assert elements >= 1, "Elements must be positive!" result = Chan[T](d: allocChannel(sizeof(T), elements))