From 0348f523cf633a92dc9e5b23c6a1fe701f6f8098 Mon Sep 17 00:00:00 2001 From: Saagar Jha Date: Mon, 7 Oct 2024 01:22:56 -0700 Subject: [PATCH] Plumb thrown errors through all streams --- unxip.swift | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/unxip.swift b/unxip.swift index 7de647a..605cbbb 100644 --- a/unxip.swift +++ b/unxip.swift @@ -344,6 +344,18 @@ actor BackpressureStream: results = .error(error) nextCondition?.signal() } + + @discardableResult + nonisolated func task(body: sending @escaping () async throws -> Success) -> Task { + Task { + do { + return try await body() + } catch { + finish(throwing: error) + throw error + } + } + } } actor ConcurrentStream { @@ -945,14 +957,10 @@ public enum XIP: StreamAperture where S.Element: RandomAccessC let decompressionStream = ConcurrentStream(consumeResults: true) let chunkStream = BackpressureStream(backpressure: CountedBackpressure(max: 16), of: Chunk.self) - Task { + chunkStream.task { var content = data - do { - try await locateContent(in: &content, options: options) - } catch { - chunkStream.finish(throwing: error) - } + try await locateContent(in: &content, options: options) let magic = "pbzx".utf8 try await UnxipError.throw(.invalid, if: await !content.read(magic.count).elementsEqual(magic)) @@ -1005,7 +1013,7 @@ public enum Chunks: StreamAperture { public static func transform(_ chunks: sending Input, options: Options?) -> Next.Input { let fileStream = BackpressureStream(backpressure: FileBackpressure(maxSize: 1_000_000_000), of: File.self) - Task { + fileStream.task { var iterator = chunks.makeAsyncIterator() var chunk = try await UnxipError.throw(.truncated, ifNil: await iterator.next()) var position = chunk.buffer.startIndex @@ -1125,7 +1133,7 @@ public enum Files: StreamAperture { } let completion = Completion() - Task { + completion.completionStream.task { let compressionStream = ConcurrentStream<[UInt8]?>(consumeResults: true) var hardlinks = [File.Identifier: (String, Task)]()