Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(discovery): support endpointslices in kubernetes discovery #10916

Merged
merged 8 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apisix/discovery/kubernetes/informer_factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ function _M.new(group, version, kind, plural, namespace)
end

if namespace and namespace ~= "" then
path = path .. "/namespace/" .. namespace
path = path .. "/namespaces/" .. namespace
dongjiang1989 marked this conversation as resolved.
Show resolved Hide resolved
end
path = path .. "/" .. plural

Expand Down
96 changes: 89 additions & 7 deletions apisix/discovery/kubernetes/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,69 @@ local function sort_nodes_cmp(left, right)
return left.port < right.port
end

local function on_endpoint_slices_modified(handle, endpoint)
if handle.namespace_selector and
not handle:namespace_selector(endpoint.metadata.namespace) then
return
end

core.log.debug(core.json.delay_encode(endpoint))
core.table.clear(endpoint_buffer)

local endpointslices = endpoint.endpoints
for _, endpointslice in ipairs(endpointslices or {}) do
if endpointslice.addresses then
local addresses = endpointslices.addresses
for _, port in ipairs(endpointslice.ports or {}) do
local port_name
if port.name then
port_name = port.name
elseif port.targetPort then
port_name = tostring(port.targetPort)
else
port_name = tostring(port.port)
end

if endpointslice.conditions and endpointslice.condition.ready then
local nodes = endpoint_buffer[port_name]
if nodes == nil then
nodes = core.table.new(0, #endpointslices * #addresses)
endpoint_buffer[port_name] = nodes
end

for _, address in ipairs(endpointslices.addresses) do
core.table.insert(nodes, {
host = address.ip,
port = port.port,
weight = handle.default_weight
})
end
end
end
end
end

for _, ports in pairs(endpoint_buffer) do
for _, nodes in pairs(ports) do
core.table.sort(nodes, sort_nodes_cmp)
end
end
local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
local endpoint_content = core.json.encode(endpoint_buffer, true)
local endpoint_version = ngx.crc32_long(endpoint_content)

local _, err
_, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
if err then
core.log.error("set endpoint version into discovery DICT failed, ", err)
return
end
_, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
if err then
core.log.error("set endpoint into discovery DICT failed, ", err)
handle.endpoint_dict:delete(endpoint_key .. "#version")
end
end

local function on_endpoint_modified(handle, endpoint)
if handle.namespace_selector and
Expand Down Expand Up @@ -367,8 +430,12 @@ local function single_mode_init(conf)
end

local default_weight = conf.default_weight

local endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
local endpoints_informer, err
if conf.watch_endpoint_slices_schema then
dongjiang1989 marked this conversation as resolved.
Show resolved Hide resolved
endpoints_informer, err = informer_factory.new("discovery.k8s.io", "v1", "EndpointSlice", "endpointslices", "")
else
endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
end
if err then
error(err)
return
Expand All @@ -377,8 +444,13 @@ local function single_mode_init(conf)
setup_namespace_selector(conf, endpoints_informer)
setup_label_selector(conf, endpoints_informer)

endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
if conf.watch_endpoint_slices_schema then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
end
endpoints_informer.on_deleted = on_endpoint_deleted
endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list
Expand Down Expand Up @@ -463,7 +535,12 @@ local function multiple_mode_init(confs)

local default_weight = conf.default_weight

local endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
local endpoints_informer, err
if conf.watch_endpoint_slices_schema then
endpoints_informer, err = informer_factory.new("discovery.k8s.io", "v1", "EndpointSlice", "endpointslices", "")
else
endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
end
if err then
error(err)
return
Expand All @@ -472,8 +549,13 @@ local function multiple_mode_init(confs)
setup_namespace_selector(conf, endpoints_informer)
setup_label_selector(conf, endpoints_informer)

endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
if conf.watch_endpoint_slices_schema then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
end
endpoints_informer.on_deleted = on_endpoint_deleted
endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list
Expand Down
7 changes: 7 additions & 0 deletions apisix/discovery/kubernetes/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ local shared_size_schema = {
default = "1m",
}

local watch_endpoint_slices_schema = {
type = "boolean",
default = false,
}

return {
anyOf = {
{
Expand Down Expand Up @@ -160,6 +165,7 @@ return {
label_selector = label_selector_schema,
default_weight = default_weight_schema,
shared_size = shared_size_schema,
watch_endpoint_slices = watch_endpoint_slices_schema,
},
},
{
Expand Down Expand Up @@ -202,6 +208,7 @@ return {
label_selector = label_selector_schema,
default_weight = default_weight_schema,
shared_size = shared_size_schema,
watch_endpoint_slices = watch_endpoint_slices_schema,
},
required = { "id", "service", "client" }
},
Expand Down
Loading