Skip to content

Commit

Permalink
Plumb thrown errors through all streams
Browse files Browse the repository at this point in the history
  • Loading branch information
saagarjha committed Oct 7, 2024
1 parent 25665aa commit 0348f52
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions unxip.swift
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,18 @@ actor BackpressureStream<Element: Sendable, Backpressure: BackpressureProvider>:
results = .error(error)
nextCondition?.signal()
}

@discardableResult
nonisolated func task<Success>(body: sending @escaping () async throws -> Success) -> Task<Success, Error> {
Task {
do {
return try await body()
} catch {
finish(throwing: error)
throw error
}
}
}
}

actor ConcurrentStream<Element: Sendable> {
Expand Down Expand Up @@ -945,14 +957,10 @@ public enum XIP<S: AsyncSequence>: StreamAperture where S.Element: RandomAccessC
let decompressionStream = ConcurrentStream<Void>(consumeResults: true)
let chunkStream = BackpressureStream(backpressure: CountedBackpressure(max: 16), of: Chunk.self)

Task {
chunkStream.task {
var content = data

do {
try await locateContent(in: &content, options: options)
} catch {
chunkStream.finish(throwing: error)
}
try await locateContent(in: &content, options: options)

let magic = "pbzx".utf8
try await UnxipError.throw(.invalid, if: await !content.read(magic.count).elementsEqual(magic))
Expand Down Expand Up @@ -1005,7 +1013,7 @@ public enum Chunks: StreamAperture {

public static func transform(_ chunks: sending Input, options: Options?) -> Next.Input {
let fileStream = BackpressureStream(backpressure: FileBackpressure(maxSize: 1_000_000_000), of: File.self)
Task {
fileStream.task {
var iterator = chunks.makeAsyncIterator()
var chunk = try await UnxipError.throw(.truncated, ifNil: await iterator.next())
var position = chunk.buffer.startIndex
Expand Down Expand Up @@ -1125,7 +1133,7 @@ public enum Files: StreamAperture {
}
let completion = Completion()

Task {
completion.completionStream.task {
let compressionStream = ConcurrentStream<[UInt8]?>(consumeResults: true)

var hardlinks = [File.Identifier: (String, Task<Void, Error>)]()
Expand Down

0 comments on commit 0348f52

Please sign in to comment.