Skip to content

Commit

Permalink
Propagate request ID for channel pool (#1328)
Browse files Browse the repository at this point in the history
Motivation:

The channel pool currently ignores the request ID call options, meaning
that request IDs are not correctly propagated.

Modifications:

- Apply request ID to call options.

Result:

- Request ID from call options is propagated to the server in request
  headers when using a connection pool.
  • Loading branch information
glbrntt authored Dec 15, 2021
1 parent c43be58 commit df6a583
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 0 deletions.
1 change: 1 addition & 0 deletions Sources/GRPC/CallOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ extension CallOptions {
self.source = source
}

@usableFromInline
internal func requestID() -> String? {
switch self.source {
case .none:
Expand Down
21 changes: 21 additions & 0 deletions Sources/GRPC/ConnectionPool/PooledChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ internal final class PooledChannel: GRPCChannel {
callOptions: CallOptions,
interceptors: [ClientInterceptor<Request, Response>]
) -> Call<Request, Response> where Request: Message, Response: Message {
var callOptions = callOptions
if let requestID = callOptions.requestIDProvider.requestID() {
callOptions.applyRequestID(requestID)
}

let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions)

return Call(
Expand All @@ -157,6 +162,11 @@ internal final class PooledChannel: GRPCChannel {
callOptions: CallOptions,
interceptors: [ClientInterceptor<Request, Response>]
) -> Call<Request, Response> where Request: GRPCPayload, Response: GRPCPayload {
var callOptions = callOptions
if let requestID = callOptions.requestIDProvider.requestID() {
callOptions.applyRequestID(requestID)
}

let (stream, eventLoop) = self._makeStreamChannel(callOptions: callOptions)

return Call(
Expand Down Expand Up @@ -192,3 +202,14 @@ internal final class PooledChannel: GRPCChannel {
self._pool.shutdown(mode: .graceful(deadline), promise: promise)
}
}

extension CallOptions {
@usableFromInline
mutating func applyRequestID(_ requestID: String) {
self.logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
// Add the request ID header too.
if let requestIDHeader = self.requestIDHeader {
self.customMetadata.add(name: requestIDHeader, value: requestID)
}
}
}
54 changes: 54 additions & 0 deletions Tests/GRPCTests/EchoHelpers/Providers/MetadataEchoProvider.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import EchoModel
import GRPC
import NIOCore

internal final class MetadataEchoProvider: Echo_EchoProvider {
let interceptors: Echo_EchoServerInterceptorFactoryProtocol? = nil

func get(
request: Echo_EchoRequest,
context: StatusOnlyCallContext
) -> EventLoopFuture<Echo_EchoResponse> {
let response = Echo_EchoResponse.with {
$0.text = context.headers.sorted(by: { $0.name < $1.name }).map {
$0.name + ": " + $0.value
}.joined(separator: "\n")
}

return context.eventLoop.makeSucceededFuture(response)
}

func expand(
request: Echo_EchoRequest,
context: StreamingResponseCallContext<Echo_EchoResponse>
) -> EventLoopFuture<GRPCStatus> {
return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unimplemented))
}

func collect(
context: UnaryResponseCallContext<Echo_EchoResponse>
) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unimplemented))
}

func update(
context: StreamingResponseCallContext<Echo_EchoResponse>
) -> EventLoopFuture<(StreamEvent<Echo_EchoRequest>) -> Void> {
return context.eventLoop.makeFailedFuture(GRPCStatus(code: .unimplemented))
}
}
85 changes: 85 additions & 0 deletions Tests/GRPCTests/RequestIDTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import EchoModel
import GRPC
import NIOCore
import NIOPosix
import XCTest

internal final class RequestIDTests: GRPCTestCase {
private var server: Server!
private var group: EventLoopGroup!

override func setUp() {
super.setUp()

self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.server = try! Server.insecure(group: self.group)
.withServiceProviders([MetadataEchoProvider()])
.withLogger(self.serverLogger)
.bind(host: "127.0.0.1", port: 0)
.wait()
}

override func tearDown() {
XCTAssertNoThrow(try self.server.close().wait())
XCTAssertNoThrow(try self.group.syncShutdownGracefully())
super.tearDown()
}

func testRequestIDIsPopulatedClientConnection() throws {
let channel = ClientConnection.insecure(group: self.group)
.connect(host: "127.0.0.1", port: self.server.channel.localAddress!.port!)

defer {
let loop = group.next()
let promise = loop.makePromise(of: Void.self)
channel.closeGracefully(deadline: .now() + .seconds(30), promise: promise)
XCTAssertNoThrow(try promise.futureResult.wait())
}

try self._testRequestIDIsPopulated(channel: channel)
}

func testRequestIDIsPopulatedChannelPool() throws {
let channel = try! GRPCChannelPool.with(
target: .host("127.0.0.1", port: self.server.channel.localAddress!.port!),
transportSecurity: .plaintext,
eventLoopGroup: self.group
)

defer {
let loop = group.next()
let promise = loop.makePromise(of: Void.self)
channel.closeGracefully(deadline: .now() + .seconds(30), promise: promise)
XCTAssertNoThrow(try promise.futureResult.wait())
}

try self._testRequestIDIsPopulated(channel: channel)
}

func _testRequestIDIsPopulated(channel: GRPCChannel) throws {
let echo = Echo_EchoClient(channel: channel)
let options = CallOptions(
requestIDProvider: .userDefined("foo"),
requestIDHeader: "request-id-header"
)

let get = echo.get(.with { $0.text = "ignored" }, callOptions: options)
let response = try get.response.wait()
XCTAssert(response.text.contains("request-id-header: foo"))
}
}

0 comments on commit df6a583

Please sign in to comment.