Skip to content

Commit

Permalink
Update code for Swift 6
Browse files Browse the repository at this point in the history
  • Loading branch information
saagarjha committed Sep 30, 2024
1 parent 65f0833 commit 7b6d791
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 71 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
test:
strategy:
matrix:
os: [macos-14, ubuntu-22.04]
os: [macos-15, ubuntu-22.04]
name: Test
runs-on: ${{ matrix.os }}
environment: ci
Expand Down
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.5
// swift-tools-version:6.0
import PackageDescription

#if os(macOS)
Expand Down
154 changes: 85 additions & 69 deletions unxip.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#if !canImport(Glibc)
@preconcurrency import unistd // optind
#endif
import Foundation

#if canImport(Compression)
Expand Down Expand Up @@ -112,13 +115,52 @@ struct Queue<Element> {
}
}

extension AsyncThrowingStream where Failure == Error {
actor PermissiveActionLink<S: AsyncSequence> where S.Element == Element {
protocol ErasedIterator<Element>: AsyncIteratorProtocol, Sendable {
}

public struct ErasedSequence<Element>: AsyncSequence {
struct ErasedButBarelyLikeWithThosePinkPearlThingsSequence<S: AsyncSequence>: AsyncSequence where S.AsyncIterator: Sendable {
struct Iterator: ErasedIterator {
var iterator: S.AsyncIterator

mutating func next() async throws -> S.Element? {
try await iterator.next()
}
}

let sequence: S

func makeAsyncIterator() -> Iterator {
.init(iterator: sequence.makeAsyncIterator())
}
}

public struct Iterator<T>: AsyncIteratorProtocol, Sendable {
var iterator: any ErasedIterator<Element>

public mutating func next() async throws -> Element? {
try await iterator.next()
}
}

let iterator: any ErasedIterator<Element>

init<S: AsyncSequence>(sequence: S) where S.Element == Element, S.AsyncIterator: Sendable {
iterator = ErasedButBarelyLikeWithThosePinkPearlThingsSequence(sequence: sequence).makeAsyncIterator()
}

public func makeAsyncIterator() -> Iterator<Element> {
.init(iterator: iterator)
}
}

extension AsyncThrowingStream where Element: Sendable, Failure == Error {
actor PermissiveActionLink<S: AsyncSequence> where S.Element == Element, S.AsyncIterator: Sendable {
var iterator: S.AsyncIterator
let count: Int
var queued = [CheckedContinuation<Element?, Error>]()

init(iterator: S.AsyncIterator, count: Int) {
init(iterator: sending S.AsyncIterator, count: Int) {
self.iterator = iterator
self.count = count
}
Expand Down Expand Up @@ -150,13 +192,6 @@ extension AsyncThrowingStream where Failure == Error {
queued.removeAll()
}
}

init<S: AsyncSequence>(erasing sequence: S) where S.Element == Element {
var iterator = sequence.makeAsyncIterator()
self.init {
try await iterator.next()
}
}
}

protocol BackpressureProvider {
Expand Down Expand Up @@ -210,7 +245,7 @@ final class FileBackpressure: BackpressureProvider {
}
}

actor BackpressureStream<Element, Backpressure: BackpressureProvider>: AsyncSequence where Backpressure.Element == Element {
actor BackpressureStream<Element: Sendable, Backpressure: BackpressureProvider>: AsyncSequence where Backpressure.Element == Element {
struct Iterator: AsyncIteratorProtocol {
let stream: BackpressureStream

Expand Down Expand Up @@ -309,30 +344,11 @@ actor BackpressureStream<Element, Backpressure: BackpressureProvider>: AsyncSequ
}
}

actor ConcurrentStream<Element> {
class Wrapper {
var stream: AsyncThrowingStream<Element, Error>!
var continuation: AsyncThrowingStream<Element, Error>.Continuation!
}
actor ConcurrentStream<Element: Sendable> {
let results: AsyncThrowingStream<Element, Error>
let continuation: AsyncThrowingStream<Element, Error>.Continuation

let wrapper = Wrapper()
let batchSize: Int
nonisolated var results: AsyncThrowingStream<Element, Error> {
get {
wrapper.stream
}
set {
wrapper.stream = newValue
}
}
nonisolated var continuation: AsyncThrowingStream<Element, Error>.Continuation {
get {
wrapper.continuation
}
set {
wrapper.continuation = newValue
}
}
var index = -1
var finishedIndex = Int?.none
var completedIndex = -1
Expand All @@ -341,9 +357,7 @@ actor ConcurrentStream<Element> {

init(batchSize: Int = 2 * ProcessInfo.processInfo.activeProcessorCount, consumeResults: Bool = false) {
self.batchSize = batchSize
results = AsyncThrowingStream<Element, Error> {
continuation = $0
}
(results, continuation) = AsyncThrowingStream.makeStream(of: Element.self, throwing: Error.self)
if consumeResults {
Task {
for try await _ in results {
Expand Down Expand Up @@ -531,7 +545,7 @@ public struct DataReader<S: AsyncSequence> where S.Element: RandomAccessCollecti
}
}

extension DataReader where S == AsyncThrowingStream<[UInt8], Error> {
extension DataReader where S == ErasedSequence<[UInt8]> {
public init(descriptor: CInt) {
self.init(data: Self.data(readingFrom: descriptor))
}
Expand Down Expand Up @@ -580,13 +594,12 @@ extension DataReader where S == AsyncThrowingStream<[UInt8], Error> {
#if PROFILING
os_signpost(.end, log: readLog, name: "Read", signpostID: id, "Ended read")
#endif
let chunk = chunk
let chunk = [UInt8](unsafeUninitializedCapacity: chunk.count) { buffer, count in
_ = chunk.copyBytes(to: buffer, from: nil)
count = chunk.count
}
Task {
await stream.yield(
[UInt8](unsafeUninitializedCapacity: chunk.count) { buffer, count in
_ = chunk.copyBytes(to: buffer, from: nil)
count = chunk.count
})
await stream.yield(chunk)
continuation.resume(returning: true)
}
}
Expand All @@ -596,7 +609,7 @@ extension DataReader where S == AsyncThrowingStream<[UInt8], Error> {
}
}

return .init(erasing: stream)
return .init(sequence: stream)
}
}

Expand All @@ -615,7 +628,7 @@ public struct Chunk: Sendable {
}
}

public struct File {
public struct File: Sendable {
public let dev: Int
public let ino: Int
public let mode: Int
Expand Down Expand Up @@ -819,9 +832,9 @@ public struct File {
public protocol StreamAperture {
associatedtype Input
associatedtype Next: StreamAperture
associatedtype Options
associatedtype Options: Sendable

static func transform(_: Input, options: Options?) -> Next.Input
static func transform(_: sending Input, options: Options?) -> Next.Input
}

protocol Decompressor {
Expand Down Expand Up @@ -871,9 +884,9 @@ public enum XIP<S: AsyncSequence>: StreamAperture where S.Element: RandomAccessC
public typealias Input = DataReader<S>
public typealias Next = Chunks

public struct Options {
let zlibDecompressor: ([UInt8], Int) throws -> [UInt8]
let lzmaDecompressor: ([UInt8], Int) throws -> [UInt8]
public struct Options: Sendable {
let zlibDecompressor: @Sendable ([UInt8], Int) throws -> [UInt8]
let lzmaDecompressor: @Sendable ([UInt8], Int) throws -> [UInt8]

init<Zlib: Decompressor, LZMA: Decompressor>(zlibDecompressor: Zlib.Type, lzmaDecompressor: LZMA.Type) {
self.zlibDecompressor = Zlib.decompress
Expand Down Expand Up @@ -953,7 +966,7 @@ public enum XIP<S: AsyncSequence>: StreamAperture where S.Element: RandomAccessC
file.cap = file.position + contentSize
}

public static func transform(_ data: Input, options: Options?) -> Next.Input {
public static func transform(_ data: sending Input, options: Options?) -> Next.Input {
let options = options ?? Self.defaultOptions

let decompressionStream = ConcurrentStream<Void>(consumeResults: true)
Expand Down Expand Up @@ -1007,17 +1020,17 @@ public enum XIP<S: AsyncSequence>: StreamAperture where S.Element: RandomAccessC
await decompressionStream.finish()
}

return .init(erasing: chunkStream)
return .init(sequence: chunkStream)
}
}

public enum Chunks: StreamAperture {
public typealias Input = AsyncThrowingStream<Chunk, Error>
public typealias Input = ErasedSequence<Chunk>
public typealias Next = Files

public typealias Options = Never

public static func transform(_ chunks: Input, options: Options?) -> Next.Input {
public static func transform(_ chunks: sending Input, options: Options?) -> Next.Input {
let fileStream = BackpressureStream(backpressure: FileBackpressure(maxSize: 1_000_000_000), of: File.self)
Task {
var iterator = chunks.makeAsyncIterator()
Expand All @@ -1038,9 +1051,11 @@ public enum Chunks: StreamAperture {
}

func readOctal(from bytes: [UInt8]) throws -> Int {
try UnxipError.throw(.invalid, ifNil: String(data: Data(bytes), encoding: .utf8).map {
Int($0, radix: 8)
} ?? nil)
try UnxipError.throw(
.invalid,
ifNil: String(data: Data(bytes), encoding: .utf8).map {
Int($0, radix: 8)
} ?? nil)
}

while true {
Expand All @@ -1059,7 +1074,7 @@ public enum Chunks: StreamAperture {
var filesize = try readOctal(from: await read(size: 11))
let _name = try await read(size: namesize)
try UnxipError.throw(.invalid, if: _name.last != 0)
let name = String(cString: _name)
let name = String(decoding: _name.dropLast(), as: UTF8.self)
var file = File(dev: dev, ino: ino, mode: mode, name: name)

while filesize > 0 {
Expand Down Expand Up @@ -1095,15 +1110,15 @@ public enum Chunks: StreamAperture {
await fileStream.yield(file)
}
}
return .init(erasing: fileStream)
return .init(sequence: fileStream)
}
}

public enum Files: StreamAperture {
public typealias Input = AsyncThrowingStream<File, Error>
public typealias Input = ErasedSequence<File>
public typealias Next = Disk

public struct Options {
public struct Options: Sendable {
public let compress: Bool
public let dryRun: Bool

Expand All @@ -1117,7 +1132,7 @@ public enum Files: StreamAperture {
.init(compress: true, dryRun: false)
}

public static func transform(_ files: Input, options: Options?) -> Next.Input {
public static func transform(_ files: sending Input, options: Options?) -> Next.Input {
let options = options ?? Self.defaultOptions
let taskStream = ConcurrentStream<Void>()

Expand Down Expand Up @@ -1315,12 +1330,12 @@ public enum Files: StreamAperture {
completion.completionStream.finish()
}

return .init(erasing: completion.completionStream)
return .init(sequence: completion.completionStream)
}
}

public enum Disk: StreamAperture {
public typealias Input = AsyncThrowingStream<File, Error>
public typealias Input = ErasedSequence<File>
public typealias Next = Disk // Irrelevant because this is never used

public typealias Options = Never
Expand Down Expand Up @@ -1353,15 +1368,16 @@ public struct UnxipStream<T: StreamAperture> {
}

public struct Unxip {
public static func makeStream<Start: StreamAperture, End: StreamAperture>(from start: UnxipStream<Start>, to end: UnxipStream<End>, input: Start.Input, _ option1: Start.Options? = nil, _ option2: Start.Next.Options? = nil, _ option3: Start.Next.Next.Options? = nil) -> End.Input where Start.Next.Next.Next == End {
public static func makeStream<Start: StreamAperture, End: StreamAperture>(from start: UnxipStream<Start>, to end: UnxipStream<End>, input: sending Start.Input, _ option1: Start.Options? = nil, _ option2: Start.Next.Options? = nil, _ option3: Start.Next.Next.Options? = nil) -> End.Input where Start.Next.Next.Next == End {
Start.Next.Next.transform(Start.Next.transform(Start.transform(input, options: option1), options: option2), options: option3)
}

public static func makeStream<Start: StreamAperture, End: StreamAperture>(from start: UnxipStream<Start>, to end: UnxipStream<End>, input: Start.Input, _ option1: Start.Options? = nil, _ option2: Start.Next.Options? = nil) -> End.Input where Start.Next.Next == End {
Start.Next.transform(Start.transform(input, options: option1), options: option2)
public static func makeStream<Start: StreamAperture, End: StreamAperture>(from start: UnxipStream<Start>, to end: UnxipStream<End>, input: sending Start.Input, _ option1: Start.Options? = nil, _ option2: Start.Next.Options? = nil) -> End.Input where Start.Next.Next == End {
let input = Start.transform(input, options: option1)
return Start.Next.transform(input, options: option2)
}

public static func makeStream<Start: StreamAperture, End: StreamAperture>(from start: UnxipStream<Start>, to end: UnxipStream<End>, input: Start.Input, _ option1: Start.Options? = nil) -> End.Input where Start.Next == End {
public static func makeStream<Start: StreamAperture, End: StreamAperture>(from start: UnxipStream<Start>, to end: UnxipStream<End>, input: sending Start.Input, _ option1: Start.Options? = nil) -> End.Input where Start.Next == End {
Start.transform(input, options: option1)
}

Expand All @@ -1371,7 +1387,7 @@ public struct Unxip {
}
}

extension AsyncSequence {
extension AsyncSequence where Element: Sendable, AsyncIterator: Sendable {
public func lockstepSplit() -> (AsyncThrowingStream<Element, Error>, AsyncThrowingStream<Element, Error>) {
let pal = AsyncThrowingStream.PermissiveActionLink<Self>(iterator: makeAsyncIterator(), count: 2)

Expand Down

0 comments on commit 7b6d791

Please sign in to comment.