Skip to content

Commit

Permalink
INTG-TESTS: Add Tests for service by name and by port lookups
Browse files Browse the repository at this point in the history
Signed-off-by: Samuel Cabrero <[email protected]>
  • Loading branch information
scabrero committed Jan 15, 2025
1 parent 3f32b62 commit 48c2de6
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 0 deletions.
23 changes: 23 additions & 0 deletions src/tests/intg/ldap_ent.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,24 @@ def ip_net(base_dn, name, address, aliases=()):
return ("cn=" + name + ",ou=Networks," + base_dn, attr_list)


def ip_service(base_dn, name, proto, port, aliases=()):
"""
Generate an RFC2307 ipService add-modlist for passing to ldap.add*.
"""
attr_list = [
('objectClass', [b'top', b'ipService']),
('ipServicePort', [str(port).encode('utf-8')]),
('ipServiceProtocol', [proto.encode('utf-8')]),
]
if (len(aliases)) > 0:
alias_list = [alias.encode('utf-8') for alias in aliases]
alias_list.insert(0, name.encode('utf-8'))
attr_list.append(('cn', alias_list))
else:
attr_list.append(('cn', [name.encode('utf-8')]))
return ("cn=" + name + ",ou=Services," + base_dn, attr_list)


class List(list):
"""LDAP add-modlist list"""

Expand Down Expand Up @@ -233,3 +251,8 @@ def add_ipnet(self, name, address, aliases=[], base_dn=None):
"""Add an RFC2307 ipNetwork add-modlist."""
self.append(ip_net(base_dn or self.base_dn,
name, address, aliases))

def add_service(self, name, proto, port, aliases=[], base_dn=None):
"""Add an RFC2307 ipService add-modlist."""
self.append(ip_service(base_dn or self.base_dn,
name, proto, port, aliases))
163 changes: 163 additions & 0 deletions src/tests/intg/sssd_services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#
# Module for simulation of utility "getent services -s sss" from coreutils
#
# Authors:
# Samuel Cabrero <[email protected]>
#
# Copyright (C) 2025 SUSE LINUX GmbH, Nuernberg, Germany.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from ctypes import (
c_int,
c_char_p,
c_ulong,
POINTER,
Structure,
create_string_buffer,
)
from sssd_nss import NssReturnCode, SssdNssError, nss_sss_ctypes_loader
import socket

SERVICE_BUFLEN = 1024


# struct servent from netdb.h
class Servent(Structure):
_fields_ = [
("s_name", c_char_p),
("s_aliases", POINTER(c_char_p)),
("s_port", c_int),
("s_proto", c_char_p),
]


def getservbyname_r(name, proto, result_p, buffer_p, buflen):
"""
ctypes wrapper for:
enum nss_status _nss_sss_getservbyname_r(const char *name,
const char *protocol,
struct servent *result,
char *buffer, size_t buflen,
int *errnop)
"""
func = nss_sss_ctypes_loader("_nss_sss_getservbyname_r")
func.restype = c_int
func.argtypes = [
c_char_p,
c_char_p,
POINTER(Servent),
c_char_p,
c_ulong,
POINTER(c_int),
]

errno = POINTER(c_int)(c_int(0))

name = name.encode("utf-8")
proto = proto.encode("utf-8")
res = func(c_char_p(name), c_char_p(proto), result_p, buffer_p, buflen, errno)

return (int(res), int(errno[0]), result_p)


def getservbyport_r(port, proto, result_p, buffer_p, buflen):
"""
ctypes wrapper for:
enum nss_status _nss_sss_getservbyport_r(int port, const char *protocol,
struct servent *result,
char *buffer, size_t buflen,
int *errnop)
"""
func = nss_sss_ctypes_loader("_nss_sss_getservbyport_r")
func.restype = c_int
func.argtypes = [
c_int,
c_char_p,
POINTER(Servent),
c_char_p,
c_ulong,
POINTER(c_int),
]

errno = POINTER(c_int)(c_int(0))

port = socket.htons(port)
proto = proto.encode("utf-8")
res = func(port, c_char_p(proto), result_p, buffer_p, buflen, errno)

return (int(res), int(errno[0]), result_p)


def set_servent_dict(res, result_p):
if res != NssReturnCode.SUCCESS:
return dict()

servent_dict = dict()
servent_dict["name"] = result_p[0].s_name.decode("utf-8")
servent_dict["aliases"] = list()
servent_dict["port"] = result_p[0].s_port
servent_dict["proto"] = result_p[0].s_proto

i = 0
while result_p[0].s_aliases[i] is not None:
alias = result_p[0].s_aliases[i].decode("utf-8")
servent_dict["aliases"].append(alias)
i = i + 1

return servent_dict


def call_sssd_getservbyname(name, proto):
"""
A Python wrapper to retrieve a service by name and protocol. Returns:
(res, servent_dict)
if res is NssReturnCode.SUCCESS, then servent_dict contains the keys
corresponding to the C servent structure fields. Otherwise, the dictionary
is empty and errno indicates the error code
"""
result = Servent()
result_p = POINTER(Servent)(result)
buff = create_string_buffer(SERVICE_BUFLEN)

(res, errno, result_p) = getservbyname_r(
name, proto, result_p, buff, SERVICE_BUFLEN
)
if errno != 0:
raise SssdNssError(errno, "getservbyname_r")

servent_dict = set_servent_dict(res, result_p)
return (res, servent_dict)


def call_sssd_getservbyport(port, proto):
"""
A Python wrapper to retrieve a service by port and protocol. Returns:
(res, servent_dict)
if res is NssReturnCode.SUCCESS, then servent_dict contains the keys
corresponding to the C servent structure fields. Otherwise, the dictionary
is empty and errno indicates the error code
"""
result = Servent()
result_p = POINTER(Servent)(result)
buff = create_string_buffer(SERVICE_BUFLEN)

(res, errno, result_p) = getservbyport_r(
port, proto, result_p, buff, SERVICE_BUFLEN
)
if errno != 0:
raise SssdNssError(errno, "getservbyport_r")

servent_dict = set_servent_dict(res, result_p)
return (res, servent_dict)
63 changes: 63 additions & 0 deletions src/tests/intg/test_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from sssd_nss import NssReturnCode, HostError
from sssd_hosts import call_sssd_gethostbyname
from sssd_nets import call_sssd_getnetbyname, call_sssd_getnetbyaddr
from sssd_services import call_sssd_getservbyname, call_sssd_getservbyport

LDAP_BASE_DN = "dc=example,dc=com"

Expand Down Expand Up @@ -240,6 +241,25 @@ def add_nets(request, ldap_conn):
return None


@pytest.fixture
def add_services(request, ldap_conn):
ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn)

ent_list.add_service("svc1", "tcp", 11111, aliases=["svc1_alias1", "svc1_alias2"])
ent_list.add_service(
"svc2_1", "tcp", 22222, aliases=["svc2_1_alias1", "svc2_1_alias2"]
)
ent_list.add_service(
"svc2_2", "tcp", 22222, aliases=["svc2_2_alias1", "svc2_2_alias2"]
)

create_ldap_fixture(request, ldap_conn, ent_list)
conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307)
create_conf_fixture(request, conf)
create_sssd_fixture(request)
return None


def test_hostbyname(add_hosts):
(res, hres, _) = call_sssd_gethostbyname("invalid")

Expand Down Expand Up @@ -325,3 +345,46 @@ def test_netbyaddr(add_nets):
(res, hres, _) = call_sssd_getnetbyaddr("10.2.2.2", socket.AF_UNSPEC)
assert res == NssReturnCode.SUCCESS
assert hres == 0


def test_servbyname(add_services):
(res, _) = call_sssd_getservbyname("svcX", "tcp")
assert res == NssReturnCode.NOTFOUND

(res, _) = call_sssd_getservbyname("svcX", "udp")
assert res == NssReturnCode.NOTFOUND

(res, _) = call_sssd_getservbyname("svc1", "tcp")
assert res == NssReturnCode.SUCCESS

(res, _) = call_sssd_getservbyname("svc1", "udp")
assert res == NssReturnCode.NOTFOUND

(res, _) = call_sssd_getservbyname("svc1_alias2", "tcp")
assert res == NssReturnCode.SUCCESS

(res, _) = call_sssd_getservbyname("svc2_1", "tcp")
assert res == NssReturnCode.SUCCESS

(res, _) = call_sssd_getservbyname("svc2_2", "tcp")
assert res == NssReturnCode.SUCCESS


def test_servbyport(add_services):
(res, _) = call_sssd_getservbyport(1234, "tcp")
assert res == NssReturnCode.NOTFOUND

(res, _) = call_sssd_getservbyport(1234, "udp")
assert res == NssReturnCode.NOTFOUND

(res, _) = call_sssd_getservbyport(11111, "tcp")
assert res == NssReturnCode.SUCCESS

(res, _) = call_sssd_getservbyport(11111, "udp")
assert res == NssReturnCode.NOTFOUND

(res, _) = call_sssd_getservbyport(22222, "tcp")
assert res == NssReturnCode.SUCCESS

(res, _) = call_sssd_getservbyport(22222, "udp")
assert res == NssReturnCode.NOTFOUND

0 comments on commit 48c2de6

Please sign in to comment.