Skip to content

Commit

Permalink
Add syncSend and syncReceive
Browse files Browse the repository at this point in the history
  • Loading branch information
gh123man committed Oct 18, 2024
1 parent 50bec3f commit e2655cf
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 2 deletions.
38 changes: 36 additions & 2 deletions Sources/AsyncChannels/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public final class Channel<T: Sendable>: @unchecked Sendable {
return false
}

/// Sends data on the channel. This function will suspend until a reciever is ready or buffer space is avalible.
/// - Parameter value: The data to send.
@inline(__always)
public func send(_ value: T) async {
mutex.lock()
Expand All @@ -117,6 +119,19 @@ public final class Channel<T: Sendable>: @unchecked Sendable {
}
}

/// Sends data synchonosly. Returns true if the data was sent.
/// A fatal error will be triggered if you attpend to send on a closed channel.
/// - Parameter value: The input data.
@inline(__always)
public func syncSend(_ value: T) -> Bool {
mutex.lock()
if nonBlockingSend(value) {
return true
}
mutex.unlock()
return false
}

@inline(__always)
private func nonBlockingReceive() -> T? {
if buffer.isEmpty {
Expand All @@ -142,7 +157,10 @@ public final class Channel<T: Sendable>: @unchecked Sendable {
}
return val
}


/// Receive data from the channel. This function will suspend until a sender is ready or there is data in the buffer.
/// This functionw will return `nil` when the channel is closed after all buffered data is read.
/// - Returns: data or nil.
@inline(__always)
public func receive() async -> T? {
mutex.lock()
Expand All @@ -164,13 +182,29 @@ public final class Channel<T: Sendable>: @unchecked Sendable {
}
}


/// Receive data synchronosly. Returns nil if there is no data or the channel is closed.
/// This function will never block or suspend.
/// - Returns: The data or nil
@inline(__always)
public func syncReceive() -> T? {
mutex.lock()
if let val = nonBlockingReceive() {
return val
}
mutex.unlock()
return nil
}


/// Closes the channel. A channel cannot be reopened.
/// Once a channel is closed, no more data can be writeen. The remaining data can be read until the buffer is empty.
public func close() {
mutex.lock()
defer { mutex.unlock() }
closed = true
selectWaiter?.signal()


while let recvW = recvQueue.pop() {
recvW.resume(returning: nil)
}
Expand Down
21 changes: 21 additions & 0 deletions Tests/AsyncChannelsTests/AsyncChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -806,4 +806,25 @@ final class AsyncTest: XCTestCase {
await data <- "bar"
await signal <- .done
}

func testSyncSendRecieve() {
let data = Channel<String>(capacity: 3)

XCTAssertTrue(data.syncSend("1"))
XCTAssertTrue(data.syncSend("2"))
XCTAssertTrue(data.syncSend("3"))
XCTAssertFalse(data.syncSend("4"))

XCTAssertEqual(data.syncReceive(), "1")
XCTAssertEqual(data.syncReceive(), "2")
XCTAssertEqual(data.syncReceive(), "3")
XCTAssertNil(data.syncReceive())
XCTAssertNil(data.syncReceive())

XCTAssertTrue(data.syncSend("4"))
XCTAssertEqual(data.syncReceive(), "4")

data.close()
XCTAssertNil(data.syncReceive())
}
}

0 comments on commit e2655cf

Please sign in to comment.