diff --git a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift index 830dfaf6c4c..388ce8e59c8 100644 --- a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift +++ b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift @@ -127,6 +127,7 @@ fileprivate extension URLSession._MultiHandle { if let opaque = socketSourcePtr { Unmanaged<_SocketSources>.fromOpaque(opaque).release() } + socketSources?.tearDown() socketSources = nil } if let ss = socketSources { @@ -416,7 +417,7 @@ fileprivate extension URLSession._MultiHandle._Timeout { } } - +#if !os(Windows) /// Read and write libdispatch sources for a specific socket. /// /// A simple helper that combines two sources -- both being optional. @@ -474,6 +475,179 @@ extension _SocketSources { } } } + +#else + +private let threadpoolWaitCallback: PTP_WAIT_CALLBACK = { (inst, context, pwa, res) in + guard let sources = _SocketSources.from(socketSourcePtr: context) else { + fatalError("Context is not set in socket callback") + } + + sources.socketCallback() +} + +private class _SocketSources { + struct SocketEvents: OptionSet { + let rawValue: CLong + + static let read = SocketEvents(rawValue: FD_READ) + static let write = SocketEvents(rawValue: FD_WRITE) + } + + private var socket: SOCKET = INVALID_SOCKET + private var queue: DispatchQueue? + private var handler: DispatchWorkItem? + + // Only the handlerCallout and callback properties are + // accessed concurrently (from queue thread and ThreadpoolWait thread). + // While callback property should not be raced due to specific + // disarm logic, it is still guarded with lock for safety. + private var handlerCallout: DispatchWorkItem? + private var callback: (event: HANDLE, threadpoolWait: PTP_WAIT)? + private let lock = NSLock() + + private var networkEvents: CLong = 0 + private var events: SocketEvents = [] { + didSet { + guard oldValue != events else { + return + } + triggerIO() + } + } + + func triggerIO() { + // Decide which network events we're interested in, + // initialize callback lazily. + let (networkEvents, event) = { () -> (CLong, HANDLE?) in + guard !events.isEmpty else { + return (0, nil) + } + let event = { + if let callback = callback { + return callback.event + } + guard let event = CreateEventW(nil, /* bManualReset */ false, /* bInitialState */ false, nil) else { + fatalError("CreateEventW \(GetLastError())") + } + guard let threadpoolWait = CreateThreadpoolWait(threadpoolWaitCallback, Unmanaged.passUnretained(self).toOpaque(), /* PTP_CALLBACK_ENVIRON */ nil) else { + fatalError("CreateThreadpoolWait \(GetLastError())") + } + SetThreadpoolWait(threadpoolWait, event, /* pftTimeout */ nil) + callback = (event, threadpoolWait) + return event + }() + return (FD_CLOSE | events.rawValue, event) + }() + + if self.networkEvents != networkEvents { + guard WSAEventSelect(socket, event, networkEvents) == 0 else { + fatalError("WSAEventSelect \(WSAGetLastError())") + } + self.networkEvents = networkEvents + } + + if events.contains(.write) { + // FD_WRITE will only be signaled if the socket becomes writable after + // a send() fails with WSAEWOULDBLOCK. If shis zero-byte send() doesn't fail, + // we could immediately schedule the handler callout. + if send(socket, "", 0, 0) == 0 { + queue!.async(execute: handler!) + } + } else if events.isEmpty, let callback = callback { + SetThreadpoolWait(callback.threadpoolWait, nil, nil) + WaitForThreadpoolWaitCallbacks(callback.threadpoolWait, /* fCancelPendingCallbacks */ true) + CloseThreadpoolWait(callback.threadpoolWait) + CloseHandle(callback.event) + + lock.lock() + self.callback = nil + handlerCallout?.cancel() + handlerCallout = nil + lock.unlock() + + handler = nil + } + } + + func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { + precondition(self.socket == INVALID_SOCKET || self.socket == socket, "Socket value changed") + precondition(self.queue == nil || self.queue === queue, "Queue changed") + + self.socket = socket + self.queue = queue + self.handler = handler + + events = action.socketEvents + } + + func tearDown() { + events = [] + } + + func socketCallback() { + // Note: this called on ThreadpoolWait thread. + lock.lock() + if let callback = callback { + ResetEvent(callback.event) + SetThreadpoolWait(callback.threadpoolWait, callback.event, /* pftTimeout */ nil) + } + lock.unlock() + + performHandler() + } + + private func performHandler() { + guard let queue = queue else { + fatalError("Attempting callout without queue set") + } + + let handlerCallout = DispatchWorkItem { + self.lock.lock() + self.handlerCallout = nil + self.lock.unlock() + + if let handler = self.handler, !handler.isCancelled { + handler.perform() + } + + // Check if new callout was scheduled while we were performing the handler. + self.lock.lock() + let hasCallout = self.handlerCallout != nil + self.lock.unlock() + guard !hasCallout, !self.events.isEmpty else { + return + } + + self.triggerIO() + } + + // Simple callout merge implementation. + // Just do not schedule additional work if there is pending item. + lock.lock() + if self.handlerCallout == nil { + self.handlerCallout = handlerCallout + queue.async(execute: handlerCallout) + } + lock.unlock() + } + +} + +private extension URLSession._MultiHandle._SocketRegisterAction { + var socketEvents: _SocketSources.SocketEvents { + switch self { + case .none: return [] + case .registerRead: return [.read] + case .registerWrite: return [.write] + case .registerReadAndWrite: return [.read, .write] + case .unregister: return [] + } + } +} + +#endif + extension _SocketSources { /// Unwraps the `SocketSources` ///