-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pam: adding support to manage pam modules; pam_access and pam_faillock
- Loading branch information
Dan Lavu
committed
Nov 1, 2023
1 parent
c09aee2
commit 2053820
Showing
7 changed files
with
367 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
Pluggable Authentication Modules / PAM | ||
###################################### | ||
|
||
Class :class:`sssd_test_framework.utils.pam.PAMUtils` provides | ||
an API to manage PAM module configuration; Currently pam_access and pam_faillock is supported. | ||
|
||
pam_access | ||
========== | ||
A module for logdaemon style login access control. This is managed by /etc/security/access.conf. | ||
|
||
.. code-block:: python | ||
:caption: Example PAM Access usage | ||
@pytest.mark.topology(KnownTopologyGroup.AnyProvider) | ||
def test_example(client: Client, provider: GenericProvider): | ||
# Add users | ||
provider.user("user-1").add() | ||
provider.user("user-2").add() | ||
# Add access rules | ||
access = client.pam.access() | ||
access.config_set([{"access": "+", "user": "user-1", "origin": "ALL"}, | ||
{"access": "-", "user": "user-2", "origin": "ALL"}]) | ||
access.config_write() | ||
client.sssd.authselect.enable_feature(["with-pamaccess"]) | ||
client.sssd.domain["use_fully_qualified_names"] = "False" | ||
client.sssd.start() | ||
assert client.auth.ssh.password("user-1", "Secret123") | ||
assert not client.auth.ssh.password("user-2", "Secret123") | ||
pam_faillock | ||
============ | ||
A module that counts authentication failures. This is configured in /etc/security/faillock.conf. | ||
|
||
.. code-block:: python | ||
:caption: Example PAM Faillock usage | ||
@pytest.mark.topology(KnownTopologyGroup.AnyProvider) | ||
def test_example(client: Client, provider: GenericProvider): | ||
# Add user | ||
provider.user("user-1").add() | ||
faillock = client.pam.faillock() | ||
faillock.config_set({"deny": "3", "unlock_time": "300"}) | ||
faillock.config_write() | ||
client.sssd.common.pam(["with-faillock"]) | ||
client.sssd.start() | ||
assert client.auth.ssh.password("user-1", "Secret123") | ||
for i in range(3): | ||
client.auth.ssh.password("user-1", "BadSecret123") | ||
assert not client.auth.ssh.password("user-1", "Secret123") | ||
# Reset user lockout | ||
client.tools.faillock(["--user", "user-1", "--reset"]) | ||
assert client.auth.ssh.password("user-1", "Secret123") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
""""PAM Tools.""" | ||
|
||
from __future__ import annotations | ||
|
||
from pytest_mh import MultihostHost, MultihostUtility | ||
from pytest_mh.ssh import SSHLog | ||
from pytest_mh.utils.fs import LinuxFileSystem | ||
|
||
__all__ = [ | ||
"PAMUtils", | ||
"PAMAccess", | ||
"PAMFaillock", | ||
] | ||
|
||
|
||
class PAMUtils(MultihostUtility[MultihostHost]): | ||
""" | ||
Configuring various PAM modules | ||
""" | ||
|
||
def __init__(self, host: MultihostHost, fs: LinuxFileSystem) -> None: | ||
""" | ||
:param host: Remote host instance | ||
:type host: MultihostHost | ||
:param fs: Linux file system instance | ||
:type fs: LinuxFileSystem | ||
""" | ||
super().__init__(host) | ||
|
||
self.fs: LinuxFileSystem = fs | ||
|
||
def access(self, file: str = "/etc/security/access.conf") -> PAMAccess: | ||
""" | ||
:param file: PAM Access file name. | ||
:type file: str | ||
:return: PAM Access object | ||
:rtype: PAMAccess | ||
""" | ||
return PAMAccess(self, file) | ||
|
||
def faillock(self, file: str = "/etc/security/faillock.conf") -> PAMFaillock: | ||
""" | ||
:param file: PAM Faillock file name. | ||
:type file: str | ||
:return: PAM Faillock object | ||
:rtype: PAMFaillock | ||
""" | ||
return PAMFaillock(self, file) | ||
|
||
|
||
class PAMAccess: | ||
""" | ||
Management of PAM Access on the client host. | ||
.. code-block:: python | ||
:caption: Example usage | ||
@pytest.mark.topology(KnownTopologyGroup.AnyProvider) | ||
def test_example(client: Client, provider: GenericProvider): | ||
# Add users | ||
provider.user("user-1").add() | ||
provider.user("user-2").add() | ||
# Add rule to permit "user-1" and deny "user-2" | ||
access = client.pam.access() | ||
access.config_set([{"access": "+", "user": "user-1", "origin": "ALL"}, | ||
{"access": "-", "user": "user-2", "origin": "ALL"}]) | ||
access.config_write() | ||
client.sssd.authselect.enable_feature(["with-pamaccess"]) | ||
client.sssd.start() | ||
# Check the results | ||
assert client.auth.ssh.password("user-1", "Secret123") | ||
assert not client.auth.ssh.password("user-2", "Secret123") | ||
""" | ||
|
||
def __init__(self, util: PAMUtils, file: str) -> None: | ||
""" | ||
:param util: PAMUtils utility object | ||
:type util: PAMUtils | ||
:param file: File name of access file | ||
:type file: str, optional | ||
""" | ||
self.util: PAMUtils = util | ||
self.file: str = file | ||
self.path: str = "/files" + self.file | ||
self.args: str = f'--noautoload --transform "Access.lns incl {self.file}"' | ||
self.cmd: str = "" | ||
|
||
def config_write(self): | ||
""" | ||
Write access configuration. | ||
:return: None | ||
""" | ||
self.util.fs.backup(self.file) | ||
self.util.logger.info(f"Saving Augeas changes to {self.file} on {self.util.host.hostname}") | ||
self.util.host.ssh.run(f'echo -e "{self.cmd} save\n" | augtool --echo {self.args}') | ||
|
||
def config_read(self) -> str: | ||
""" | ||
Read access file as Augeas tree. | ||
:return: PAM access configuration | ||
:rtype: str | ||
""" | ||
self.util.logger.info(f"Reading Augeas tree {self.file} on {self.util.host.hostname}") | ||
result = self.util.host.ssh.run(f"augtool {self.args} print {self.path}") | ||
|
||
return result.stdout | ||
|
||
def config_delete(self, value: dict[str, str]) -> None: | ||
""" | ||
Delete access configuration. | ||
TODO: Needs testing | ||
Sample output: | ||
/files/etc/security/access.conf/access[1] = "+" | ||
/files/etc/security/access.conf/access[1]/user = "test" | ||
/files/etc/security/access.conf/access[1]/origin = "ALL" | ||
/files/etc/security/access.conf/access[2] = "+" | ||
/files/etc/security/access.conf/access[2]/user = "dan" | ||
/files/etc/security/access.conf/access[2]/origin = "ALL" | ||
/files/etc/security/access.conf/access[3] = "+" | ||
/files/etc/security/access.conf/access[3]/user = "dan" | ||
/files/etc/security/access.conf/access[3]/origin = "ALL" | ||
/files/etc/security/access.conf/access[4] = "+" | ||
/files/etc/security/access.conf/access[4]/user = "dan" | ||
/files/etc/security/access.conf/access[4]/origin = "ALL" | ||
/files/etc/security/access.conf/access[5] = "+" | ||
/files/etc/security/access.conf/access[5]/user = "dan" | ||
/files/etc/security/access.conf/access[5]/origin = "ALL" | ||
Sample command: | ||
augtool rm /files/etc/security/access.conf/access[1] | ||
:param value: Configuration. | ||
:type value: dict[str, str] | ||
:return: None | ||
""" | ||
if value is None: | ||
raise ValueError("No data!") | ||
|
||
match = self.util.host.ssh.run(f"augtool {self.args} match {self.path}/access").stdout.split("\n") | ||
|
||
self.util.logger.info(f"Deleting node in Augeas tree on {self.util.host.hostname}") | ||
for i in range(1, 1 + len(match)): | ||
for k, v in value.items(): | ||
if k and v in match[i]: | ||
self.util.host.ssh.run(f"augtool {self.args} --autosave rm {self.path}/access[{i}]") | ||
|
||
def config_set(self, value: list[dict[str, str]]) -> None: | ||
""" | ||
Configure access configuration file. | ||
:param value: Access rule | ||
:type value: list[list[str]] | ||
:return: None | ||
""" | ||
if value is None: | ||
raise ValueError("No data!") | ||
|
||
count = 1 | ||
for i in value: | ||
self.cmd = self.cmd + f"set {self.path}/access[{count}] " + i["access"] + "\n" | ||
self.cmd = self.cmd + f"set {self.path}/access[{count}]/user " + i["user"] + "\n" | ||
self.cmd = self.cmd + f"set {self.path}/access[{count}]/origin " + i["origin"] + "\n" | ||
count = count + 1 | ||
|
||
|
||
class PAMFaillock: | ||
""" | ||
Management of PAM Faillock on the client host. | ||
.. code-block:: python | ||
:caption: Example usage | ||
@pytest.mark.topology(KnownTopologyGroup.AnyProvider) | ||
def test_example(client: Client, provider: GenericProvider): | ||
# Add user | ||
provider.user("user-1").add() | ||
# Setup faillock | ||
faillock = client.pam.faillock() | ||
faillock.config_set({"deny": "3", "unlock_time": "300"}) | ||
faillock.config_write() | ||
client.sssd.common.pam(["with-faillock"]) | ||
# Start SSSD | ||
client.sssd.start() | ||
# Check the results | ||
assert client.auth.ssh.password("user-1", "Secret123") | ||
# Three failed login attempts | ||
for i in range(3): | ||
assert not client.auth.ssh.password("user-1", "bad_password") | ||
assert not client.auth.ssh.password("user-1", "Secret123") | ||
# Reset user lockout | ||
client.pam.faillock("user-1").reset | ||
assert client.auth.ssh.password("user-1", "Secret123") | ||
""" | ||
|
||
def __init__(self, util: PAMUtils, file: str) -> None: | ||
""" | ||
:param util: PAMUtils object | ||
:type util: PAMUtils | ||
:param file: Faillock configuration file | ||
:type file: str, optional | ||
""" | ||
self.util: PAMUtils = util | ||
self.file: str = file | ||
self.path: str = "/files" + self.file | ||
self.args: str = f'--noautoload --transform "Simplevars.lns incl {self.file}"' | ||
self.cmd: str = "" | ||
|
||
def config_write(self) -> None: | ||
""" | ||
Write faillock configuration. | ||
:return: Self | ||
:rtype: PAMFaillock | ||
""" | ||
self.util.fs.backup(self.file) | ||
self.util.logger.info(f"Saving Augeas changes to {self.file} on {self.util.host.hostname}") | ||
self.util.host.ssh.run(f'echo -e "{self.cmd}save\n" | augtool --echo {self.args}') | ||
|
||
def config_read(self) -> str: | ||
""" | ||
Read faillock configuration as augeas tree. | ||
:return: PAM access configuration | ||
:rtype: str | ||
""" | ||
self.util.logger.info(f"Reading Augeas tree {self.file} on {self.util.host.hostname}") | ||
result = self.util.host.ssh.run(f"augtool {self.args} print {self.path}").stdout | ||
|
||
return result | ||
|
||
def config_delete(self, value: dict[str, str]) -> None: | ||
""" | ||
Delete faillock configuration. | ||
:param value: Configuration. | ||
:type value: dict[str, str] | ||
:return: None | ||
""" | ||
if value is None: | ||
raise ValueError("No data!") | ||
|
||
self.util.logger.info(f"Deleting node in Augeas tree on {self.util.host.hostname}") | ||
for k, v in value.items(): | ||
self.util.host.ssh.run(f"augtool {self.args} --autosave rm {self.path}/{k} {v}") | ||
|
||
def config_set(self, value: dict[str, str]) -> None: | ||
""" | ||
Set faillock configuration. | ||
:param value: Configuration parameter(s) and value(s). | ||
:type value: dict[str, str] | ||
:return: None | ||
""" | ||
if value is None: | ||
raise ValueError("No data!") | ||
|
||
for k, v in value.items(): | ||
self.cmd = self.cmd + f"set {self.path}/{k} {v}\n" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.