Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Match DNs in a case-insensitive way, always use the object's DN as found in the mock directory. #19

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 34 additions & 13 deletions fakeldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def simple_bind_s(self, who='', cred=''):
return value

def search_s(self, base, scope, filterstr='(objectClass=*)', attrlist=None, attrsonly=0):

# Hack, cause attributes as a list can't be hashed for storing it
if isinstance(attrlist, list):
attrlist = ', '.join(attrlist)
Expand All @@ -198,6 +199,7 @@ def search_s(self, base, scope, filterstr='(objectClass=*)', attrlist=None, attr
value = self._get_return_value('search_s',
(base, scope, filterstr, attrlist, attrsonly))
if value is None:
base = self._use_dn_capitalization_from_directory(base)
value = self._search_s(base, scope, filterstr, attrlist, attrsonly)

return value
Expand Down Expand Up @@ -286,17 +288,18 @@ def _simple_bind_s(self, who='', cred=''):
raise ldap.INVALID_CREDENTIALS('%s:%s' % (who, cred))

def _compare_s(self, dn, attr, value):

try:
found = (value in self.directory[dn][attr])
found = (value in self.directory[self._use_dn_capitalization_from_directory(dn)][attr])
except KeyError:
found = False

return found and 1 or 0

def _modify_s(self, dn, mod_attrs):
try:
entry = self.directory[dn]
except KeyError:
dn = self._use_dn_capitalization_from_directory(dn)
entry = self.directory[dn]
if entry == {}:
raise ldap.NO_SUCH_OBJECT

for item in mod_attrs:
Expand Down Expand Up @@ -325,9 +328,9 @@ def _modify_s(self, dn, mod_attrs):
return (103, [])

def _rename_s(self, dn, newdn):
try:
entry = self.directory[dn]
except KeyError:
dn = self._use_dn_capitalization_from_directory(dn)
entry = self.directory[dn]
if entry == {}:
raise ldap.NO_SUCH_OBJECT

changes = newdn.split('=')
Expand All @@ -341,8 +344,9 @@ def _rename_s(self, dn, newdn):
return (109, [])

def _delete_s(self, dn):

try:
del self.directory[dn]
del self.directory[self._use_dn_capitalization_from_directory(dn)]
except KeyError:
raise ldap.NO_SUCH_OBJECT

Expand All @@ -360,7 +364,8 @@ def _search_s(self, base, scope, filterstr, attrlist, attrsonly):
if filterstr != '(objectClass=*)':
raise self.PresetReturnRequiredError('search_s("%s", %d, "%s", "%s", %d)' %
(base, scope, filterstr, attrlist, attrsonly))
attrs = self.directory.get(base)

attrs = self.directory.get(self._use_dn_capitalization_from_directory(base))
logger.debug("attrs: %s".format(attrs))
if attrs is None:
raise ldap.NO_SUCH_OBJECT
Expand All @@ -373,7 +378,7 @@ def _search_s(self, base, scope, filterstr, attrlist, attrsonly):
raise self.PresetReturnRequiredError('search_s("%s", %d, "%s", "%s", %d)' %
(base, scope, filterstr, attrlist, attrsonly))

return self._simple_onelevel_search(base, filterstr)
return self._simple_onelevel_search(self._use_dn_capitalization_from_directory(base), filterstr)
else:
raise self.PresetReturnRequiredError('search_s("%s", %d, "%s", "%s", %d)' %
(base, scope, filterstr, attrlist, attrsonly))
Expand All @@ -384,10 +389,10 @@ def _add_s(self, dn, record):
for item in record:
entry[item[0]] = item[1]
logger.debug("entry: %s".format(entry))
try:
self.directory[dn]

if self.directory[self._use_dn_capitalization_from_directory(dn)] != {}:
raise ldap.ALREADY_EXISTS
except KeyError:
else:
self.directory[dn] = entry
return (105,[], len(self.calls), [])

Expand Down Expand Up @@ -444,3 +449,19 @@ def _get_return_value(self, api_name, arguments):

return value


def _use_dn_capitalization_from_directory(self, dn):

# LDAP DNs in LDAP servers are normally matched in a case-insensitive way.
# So for MockLDAP, we search our mock directory and try to find a
# case-insensitive match of the given DN.
#
# Saying this, we always have to return the requested DN with the capitalization as
# stored in our mock LDAP directory. Otherwise, MockLDAP would behave differently from
# live LDAP server.

for _dn in self.directory:
if _dn.lower() == dn.lower():
dn = _dn

return dn