Skip to content

Commit

Permalink
Replace DispatchSource in URLSession on Windows with custom event lis…
Browse files Browse the repository at this point in the history
…tener (swiftlang#4791)
  • Loading branch information
lxbndr committed Apr 23, 2024
1 parent 6254ea0 commit a14fcfe
Showing 1 changed file with 175 additions and 1 deletion.
176 changes: 175 additions & 1 deletion Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ fileprivate extension URLSession._MultiHandle {
if let opaque = socketSourcePtr {
Unmanaged<_SocketSources>.fromOpaque(opaque).release()
}
socketSources?.tearDown()
socketSources = nil
}
if let ss = socketSources {
Expand Down Expand Up @@ -416,7 +417,7 @@ fileprivate extension URLSession._MultiHandle._Timeout {
}
}


#if !os(Windows)
/// Read and write libdispatch sources for a specific socket.
///
/// A simple helper that combines two sources -- both being optional.
Expand Down Expand Up @@ -474,6 +475,179 @@ extension _SocketSources {
}
}
}

#else

private let threadpoolWaitCallback: PTP_WAIT_CALLBACK = { (inst, context, pwa, res) in
guard let sources = _SocketSources.from(socketSourcePtr: context) else {
fatalError("Context is not set in socket callback")
}

sources.socketCallback()
}

private class _SocketSources {
struct SocketEvents: OptionSet {
let rawValue: CLong

static let read = SocketEvents(rawValue: FD_READ)
static let write = SocketEvents(rawValue: FD_WRITE)
}

private var socket: SOCKET = INVALID_SOCKET
private var queue: DispatchQueue?
private var handler: DispatchWorkItem?

// Only the handlerCallout and callback properties are
// accessed concurrently (from queue thread and ThreadpoolWait thread).
// While callback property should not be raced due to specific
// disarm logic, it is still guarded with lock for safety.
private var handlerCallout: DispatchWorkItem?
private var callback: (event: HANDLE, threadpoolWait: PTP_WAIT)?
private let lock = NSLock()

private var networkEvents: CLong = 0
private var events: SocketEvents = [] {
didSet {
guard oldValue != events else {
return
}
triggerIO()
}
}

func triggerIO() {
// Decide which network events we're interested in,
// initialize callback lazily.
let (networkEvents, event) = { () -> (CLong, HANDLE?) in
guard !events.isEmpty else {
return (0, nil)
}
let event = {
if let callback = callback {
return callback.event
}
guard let event = CreateEventW(nil, /* bManualReset */ false, /* bInitialState */ false, nil) else {
fatalError("CreateEventW \(GetLastError())")
}
guard let threadpoolWait = CreateThreadpoolWait(threadpoolWaitCallback, Unmanaged.passUnretained(self).toOpaque(), /* PTP_CALLBACK_ENVIRON */ nil) else {
fatalError("CreateThreadpoolWait \(GetLastError())")
}
SetThreadpoolWait(threadpoolWait, event, /* pftTimeout */ nil)
callback = (event, threadpoolWait)
return event
}()
return (FD_CLOSE | events.rawValue, event)
}()

if self.networkEvents != networkEvents {
guard WSAEventSelect(socket, event, networkEvents) == 0 else {
fatalError("WSAEventSelect \(WSAGetLastError())")
}
self.networkEvents = networkEvents
}

if events.contains(.write) {
// FD_WRITE will only be signaled if the socket becomes writable after
// a send() fails with WSAEWOULDBLOCK. If shis zero-byte send() doesn't fail,
// we could immediately schedule the handler callout.
if send(socket, "", 0, 0) == 0 {
performHandler()
}
} else if events.isEmpty, let callback = callback {
SetThreadpoolWait(callback.threadpoolWait, nil, nil)
WaitForThreadpoolWaitCallbacks(callback.threadpoolWait, /* fCancelPendingCallbacks */ true)
CloseThreadpoolWait(callback.threadpoolWait)
CloseHandle(callback.event)

lock.lock()
self.callback = nil
handlerCallout?.cancel()
handlerCallout = nil
lock.unlock()

handler = nil
}
}

func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
precondition(self.socket == INVALID_SOCKET || self.socket == socket, "Socket value changed")
precondition(self.queue == nil || self.queue === queue, "Queue changed")

self.socket = socket
self.queue = queue
self.handler = handler

events = action.socketEvents
}

func tearDown() {
events = []
}

func socketCallback() {
// Note: this called on ThreadpoolWait thread.
lock.lock()
if let callback = callback {
ResetEvent(callback.event)
SetThreadpoolWait(callback.threadpoolWait, callback.event, /* pftTimeout */ nil)
}
lock.unlock()

performHandler()
}

private func performHandler() {
guard let queue = queue else {
fatalError("Attempting callout without queue set")
}

let handlerCallout = DispatchWorkItem {
self.lock.lock()
self.handlerCallout = nil
self.lock.unlock()

if let handler = self.handler, !handler.isCancelled {
handler.perform()
}

// Check if new callout was scheduled while we were performing the handler.
self.lock.lock()
let hasCallout = self.handlerCallout != nil
self.lock.unlock()
guard !hasCallout, !self.events.isEmpty else {
return
}

self.triggerIO()
}

// Simple callout merge implementation.
// Just do not schedule additional work if there is pending item.
lock.lock()
if self.handlerCallout == nil {
self.handlerCallout = handlerCallout
queue.async(execute: handlerCallout)
}
lock.unlock()
}

}

private extension URLSession._MultiHandle._SocketRegisterAction {
var socketEvents: _SocketSources.SocketEvents {
switch self {
case .none: return []
case .registerRead: return [.read]
case .registerWrite: return [.write]
case .registerReadAndWrite: return [.read, .write]
case .unregister: return []
}
}
}

#endif

extension _SocketSources {
/// Unwraps the `SocketSources`
///
Expand Down

0 comments on commit a14fcfe

Please sign in to comment.