diff --git a/osc/_private/api.py b/osc/_private/api.py
index 09d6e964a..ebb64899e 100644
--- a/osc/_private/api.py
+++ b/osc/_private/api.py
@@ -10,6 +10,7 @@
from ..util.xml import xml_escape
from ..util.xml import xml_indent
from ..util.xml import xml_unescape
+from ..util.xml import xml_parse
def get(apiurl, path, query=None):
@@ -36,7 +37,7 @@ def get(apiurl, path, query=None):
url = osc_core.makeurl(apiurl, path, query)
with osc_connection.http_GET(url) as f:
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root
@@ -64,7 +65,7 @@ def post(apiurl, path, query=None):
url = osc_core.makeurl(apiurl, path, query)
with osc_connection.http_POST(url) as f:
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root
@@ -92,7 +93,7 @@ def put(apiurl, path, query=None, data=None):
url = osc_core.makeurl(apiurl, path, query)
with osc_connection.http_PUT(url, data=data) as f:
- root = osc_core.ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root
diff --git a/osc/build.py b/osc/build.py
index 6a1a85067..46c89209f 100644
--- a/osc/build.py
+++ b/osc/build.py
@@ -30,6 +30,7 @@
from .util import archquery, debquery, packagequery, rpmquery
from .util import repodata
from .util.helper import decode_it
+from .util.xml import xml_parse
change_personality = {
@@ -79,7 +80,7 @@ class Buildinfo:
def __init__(self, filename, apiurl, buildtype='spec', localpkgs=None, binarytype='rpm'):
localpkgs = localpkgs or []
try:
- tree = ET.parse(filename)
+ tree = xml_parse(filename)
except ET.ParseError:
print('could not parse the buildinfo:', file=sys.stderr)
print(open(filename).read(), file=sys.stderr)
@@ -1351,7 +1352,7 @@ def __str__(self):
if build_type == 'kiwi':
# Is a obsrepositories tag used?
try:
- tree = ET.parse(build_descr)
+ tree = xml_parse(build_descr)
except:
print('could not parse the kiwi file:', file=sys.stderr)
print(open(build_descr).read(), file=sys.stderr)
diff --git a/osc/commandline.py b/osc/commandline.py
index 89ce4744f..ce3f6a2d8 100644
--- a/osc/commandline.py
+++ b/osc/commandline.py
@@ -30,6 +30,8 @@
from . import commands as osc_commands
from . import oscerr
from .commandline_common import *
+from .util.xml import xml_fromstring
+from .util.xml import xml_parse
class OscCommand(Command):
@@ -1165,7 +1167,7 @@ def do_list(self, subcmd, opts, *args):
break
m = show_files_meta(apiurl, project, package)
li = Linkinfo()
- root = ET.fromstring(m)
+ root = xml_fromstring(m)
li.read(root.find('linkinfo'))
if li.haserror():
raise oscerr.LinkExpandError(project, package, li.error)
@@ -2095,7 +2097,7 @@ def _check_service(root):
# get _link info from server, that knows about the local state ...
u = makeurl(apiurl, ['source', project, p])
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
_check_service(root)
linkinfo = root.find('linkinfo')
if linkinfo is None:
@@ -2140,7 +2142,7 @@ def _check_service(root):
u = makeurl(apiurl, ['request'], query='cmd=create&addrevision=1')
f = http_POST(u, data=xml)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
sr_ids.append(root.get('id'))
print("Request(s) created: ", end=' ')
@@ -2150,7 +2152,7 @@ def _check_service(root):
# was this project created by clone request ?
u = makeurl(apiurl, ['source', project, '_attribute', 'OBS:RequestCloned'])
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
value = root.findtext('attribute/value')
if value and not opts.yes:
repl = ''
@@ -2214,7 +2216,7 @@ def _check_service(root):
# check for failed source service
u = makeurl(apiurl, ['source', src_project, src_package])
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
_check_service(root)
if not opts.nodevelproject:
@@ -2242,7 +2244,7 @@ def _check_service(root):
# get _link info from server, that knows about the local state ...
u = makeurl(apiurl, ['source', src_project, src_package], query="expand=1")
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
rev = root.get('rev')
@@ -2358,7 +2360,7 @@ def _submit_request(self, args, opts, options_block):
# get _link info from server, that knows about the local state ...
u = makeurl(apiurl, ['source', project, p])
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
print("Package ", p, " is not a source link.")
@@ -2751,7 +2753,7 @@ def do_createrequest(self, subcmd, opts, *args):
u = makeurl(apiurl, ['request'], query='cmd=create')
f = http_POST(u, data=xml)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
rid = root.get('id')
print(f"Request {rid} created")
for srid in supersede:
@@ -3249,21 +3251,21 @@ def do_request(self, subcmd, opts, *args):
query = {'cmd': cmd}
url = makeurl(apiurl, ['request', reqid], query)
r = http_POST(url, data=opts.message)
- print(ET.parse(r).getroot().get('code'))
+ print(xml_parse(r).getroot().get('code'))
# change incidents
elif cmd == 'setincident':
query = {'cmd': 'setincident', 'incident': incident}
url = makeurl(apiurl, ['request', reqid], query)
r = http_POST(url, data=opts.message)
- print(ET.parse(r).getroot().get('code'))
+ print(xml_parse(r).getroot().get('code'))
# change priority
elif cmd in ['prioritize', 'priorize']:
query = {'cmd': 'setpriority', 'priority': priority}
url = makeurl(apiurl, ['request', reqid], query)
r = http_POST(url, data=opts.message)
- print(ET.parse(r).getroot().get('code'))
+ print(xml_parse(r).getroot().get('code'))
# add new reviewer to existing request
elif cmd in ['add'] and subcmd == 'review':
@@ -3280,7 +3282,7 @@ def do_request(self, subcmd, opts, *args):
if not opts.message:
opts.message = edit_message()
r = http_POST(url, data=opts.message)
- print(ET.parse(r).getroot().get('code'))
+ print(xml_parse(r).getroot().get('code'))
# list and approvenew
elif cmd == 'list' or cmd == 'approvenew':
@@ -3436,7 +3438,7 @@ def do_request(self, subcmd, opts, *args):
except HTTPError as e:
if e.code == 404:
# Any referenced object does not exist, eg. the superseded request
- root = ET.fromstring(e.read())
+ root = xml_fromstring(e.read())
summary = root.find('summary')
print(summary.text, file=sys.stderr)
raise oscerr.WrongOptions("Object does not exist")
@@ -3521,7 +3523,7 @@ def do_request(self, subcmd, opts, *args):
details = e.hdrs.get('X-Opensuse-Errorcode')
if details:
print(details, file=sys.stderr)
- root = ET.fromstring(e.read())
+ root = xml_fromstring(e.read())
summary = root.find('summary')
if summary is not None:
print(summary.text)
@@ -3544,7 +3546,7 @@ def do_request(self, subcmd, opts, *args):
'match': f"([devel[@project='{action.tgt_project}' and @package='{action.tgt_package}']])"
})
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
if root.findall('package') and not opts.no_devel:
for node in root.findall('package'):
project = node.get('project')
@@ -3554,7 +3556,7 @@ def do_request(self, subcmd, opts, *args):
links_to_project = links_to_package = None
try:
file = http_GET(link_url)
- root = ET.parse(file).getroot()
+ root = xml_parse(file).getroot()
link_node = root.find('linkinfo')
if link_node is not None:
links_to_project = link_node.get('project') or project
@@ -3713,7 +3715,7 @@ def do_detachbranch(self, subcmd, opts, *args):
try:
copy_pac(apiurl, project, package, apiurl, project, package, expand=True, comment=opts.message)
except HTTPError as e:
- root = ET.fromstring(show_files_meta(apiurl, project, package, 'latest', expand=False))
+ root = xml_fromstring(show_files_meta(apiurl, project, package, 'latest', expand=False))
li = Linkinfo()
li.read(root.find('linkinfo'))
if li.islink() and li.haserror():
@@ -4040,7 +4042,7 @@ def do_releaserequest(self, subcmd, opts, *args):
source_project = self._process_project_name(args[0])
f = show_project_meta(apiurl, source_project)
- root = ET.fromstring(b''.join(f))
+ root = xml_fromstring(b''.join(f))
if not opts.message:
opts.message = edit_message()
@@ -4116,14 +4118,14 @@ def do_createincident(self, subcmd, opts, *args):
url = makeurl(apiurl, ['source', target_project], query=query)
r = http_POST(url, data=opts.message)
project = None
- for i in ET.fromstring(r.read()).findall('data'):
+ for i in xml_fromstring(r.read()).findall('data'):
if i.get('name') == 'targetproject':
project = i.text.strip()
if project:
print("Incident project created: ", project)
else:
- print(ET.parse(r).getroot().get('code'))
- print(ET.parse(r).getroot().get('error'))
+ print(xml_parse(r).getroot().get('code'))
+ print(xml_parse(r).getroot().get('error'))
@cmdln.option('-a', '--attribute', metavar='ATTRIBUTE',
help='Use this attribute to find default maintenance project (default is OBS:MaintenanceProject)')
@@ -4524,7 +4526,7 @@ def do_branch(self, subcmd, opts, *args):
devloc = None
if not exists and (srcprj != self._process_project_name(args[0]) or srcpkg != args[1]):
try:
- root = ET.fromstring(b''.join(show_attribute_meta(apiurl, args[0], None, None,
+ root = xml_fromstring(b''.join(show_attribute_meta(apiurl, args[0], None, None,
conf.config['maintained_update_project_attribute'], None, None)))
# this might raise an AttributeError
uproject = root.find('attribute').find('value').text
@@ -4781,7 +4783,7 @@ def do_diff(self, subcmd, opts, *args):
u = makeurl(apiurl, ['source', project, package], query=query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
raise oscerr.APIError('package is not a source link')
@@ -4973,7 +4975,7 @@ def _pdiff_get_parent_from_link(self, apiurl, project, package):
try:
file = http_GET(link_url)
- root = ET.parse(file).getroot()
+ root = xml_parse(file).getroot()
except HTTPError as e:
return (None, None)
except SyntaxError as e:
@@ -4996,7 +4998,7 @@ def _pdiff_get_exists_and_parent(self, apiurl, project, package):
link_url = makeurl(apiurl, ['source', project, package])
try:
file = http_GET(link_url)
- root = ET.parse(file).getroot()
+ root = xml_parse(file).getroot()
except HTTPError as e:
if e.code != 404:
print(f'Cannot get list of files for {project}/{package}: {e}', file=sys.stderr)
@@ -5527,7 +5529,7 @@ def do_checkout(self, subcmd, opts, *args):
try:
m = show_files_meta(apiurl, project, package)
li = Linkinfo()
- li.read(ET.fromstring(''.join(m)).find('linkinfo'))
+ li.read(xml_fromstring(''.join(m)).find('linkinfo'))
if not li.haserror():
if li.project == project:
print(statfrmt('S', package + " link to package " + li.package))
@@ -6028,7 +6030,7 @@ def do_update(self, subcmd, opts, *args):
pacs[0].name, revision=rev,
linkrev=opts.linkrev,
expand=opts.server_side_source_service_files)
- directory = ET.fromstring(meta)
+ directory = xml_fromstring(meta)
li_node = directory.find('linkinfo')
if li_node is None:
print(f'Revision \'{rev}\' is no link', file=sys.stderr)
@@ -6609,7 +6611,7 @@ def do_buildlog(self, subcmd, opts, *args):
query['lastsucceeded'] = 1
u = makeurl(self.get_api_url(), ['build', project, repository, arch, package, '_log'], query=query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
offset = int(root.find('entry').get('size'))
if opts.offset:
offset = offset - int(opts.offset)
@@ -6714,7 +6716,7 @@ def do_remotebuildlog(self, subcmd, opts, *args):
query['lastsucceeded'] = 1
u = makeurl(self.get_api_url(), ['build', project, repository, arch, package, '_log'], query=query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
offset = int(root.find('entry').get('size'))
if opts.offset:
offset = offset - int(opts.offset)
@@ -6744,7 +6746,7 @@ def _find_last_repo_arch(self, repo=None, fatal=True):
for f in files[1:]:
if os.stat(f).st_atime > os.stat(cfg).st_atime:
cfg = f
- root = ET.parse(cfg).getroot()
+ root = xml_parse(cfg).getroot()
repo = root.get("repository")
arch = root.findtext("arch")
return repo, arch
@@ -6870,7 +6872,7 @@ def do_triggerreason(self, subcmd, opts, *args):
print(apiurl, project, package, repository, arch)
xml = show_package_trigger_reason(apiurl, project, package, repository, arch)
- root = ET.fromstring(xml)
+ root = xml_fromstring(xml)
if root.find('explain') is None:
reason = "No triggerreason found"
print(reason)
@@ -6984,7 +6986,7 @@ def _dependson(self, reverse, opts, *args):
project_packages = meta_get_packagelist(apiurl, project, deleted=False, expand=False)
xml = get_dependson(apiurl, project, repository, arch, packages, reverse)
- root = ET.fromstring(xml)
+ root = xml_fromstring(xml)
for package in root.findall('package'):
print(package.get('name'), ":")
for dep in package.findall('pkgdep'):
@@ -8829,7 +8831,7 @@ def do_my(self, subcmd, opts, *args):
'match': f"([kind='patchinfo' and issue[@state='OPEN' and owner/@login='{user}']])"
})
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
if root.findall('package'):
print("Patchinfos with open bugs assigned to you:\n")
for node in root.findall('package'):
@@ -8838,7 +8840,7 @@ def do_my(self, subcmd, opts, *args):
print(project, "/", package, '\n')
p = makeurl(apiurl, ['source', project, package], {'view': 'issues'})
fp = http_GET(p)
- issues = ET.parse(fp).findall('issue')
+ issues = xml_parse(fp).findall('issue')
for issue in issues:
if issue.find('state') is None or issue.find('state').text != "OPEN":
continue
@@ -8865,7 +8867,7 @@ def do_my(self, subcmd, opts, *args):
'user': user,
})
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
if root.findall('request'):
print("Requests which request a review by you:\n")
for node in root.findall('request'):
@@ -8881,7 +8883,7 @@ def do_my(self, subcmd, opts, *args):
'user': user,
})
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
if root.findall('request'):
print("Requests for your packages:\n")
for node in root.findall('request'):
@@ -8897,7 +8899,7 @@ def do_my(self, subcmd, opts, *args):
'user': user,
})
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
if root.findall('request'):
print("Declined requests created by you (revoke, reopen or supersede):\n")
for node in root.findall('request'):
@@ -9326,7 +9328,7 @@ def do_importsrcpkg(self, subcmd, opts):
'name': pac,
'user': user}), apiurl=apiurl)
if data:
- data = ET.fromstring(parse_meta_to_string(data))
+ data = xml_fromstring(parse_meta_to_string(data))
data.find('title').text = ''.join(title)
data.find('description').text = ''.join(descr)
data.find('url').text = url
@@ -9657,7 +9659,7 @@ def setBugownerHelper(apiurl, project, package, bugowner):
u = makeurl(apiurl, ['request'], query='cmd=create')
f = http_POST(u, data=xml)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
print("Request ID:", root.get('id'))
elif opts.delete:
@@ -9678,7 +9680,7 @@ def setBugownerHelper(apiurl, project, package, bugowner):
else:
if pac:
m = show_package_meta(apiurl, prj, pac)
- metaroot = ET.fromstring(b''.join(m))
+ metaroot = xml_fromstring(b''.join(m))
if not opts.nodevelproject:
while metaroot.findall('devel'):
d = metaroot.find('devel')
@@ -9687,18 +9689,18 @@ def setBugownerHelper(apiurl, project, package, bugowner):
if opts.verbose:
print(f"Following to the development space: {prj}/{pac}")
m = show_package_meta(apiurl, prj, pac)
- metaroot = ET.fromstring(b''.join(m))
+ metaroot = xml_fromstring(b''.join(m))
if not metaroot.findall('person') and not metaroot.findall('group'):
if opts.verbose:
print("No dedicated persons in package defined, showing the project persons.")
pac = None
m = show_project_meta(apiurl, prj)
- metaroot = ET.fromstring(b''.join(m))
+ metaroot = xml_fromstring(b''.join(m))
else:
# fallback to project lookup for old servers
if prj and not searchresult:
m = show_project_meta(apiurl, prj)
- metaroot = ET.fromstring(b''.join(m))
+ metaroot = xml_fromstring(b''.join(m))
# extract the maintainers
projects = []
@@ -9993,7 +9995,7 @@ def do_repairlink(self, subcmd, opts, *args):
query = {'rev': 'latest'}
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
raise oscerr.APIError('package is not a source link')
@@ -10005,7 +10007,7 @@ def do_repairlink(self, subcmd, opts, *args):
query = {'rev': 'latest', 'linkrev': 'base'}
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo.get('error') is None:
workingrev = linkinfo.get('xsrcmd5')
@@ -10014,7 +10016,7 @@ def do_repairlink(self, subcmd, opts, *args):
query = {'lastworking': 1}
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
linkinfo = root.find('linkinfo')
if linkinfo is None:
raise oscerr.APIError('package is not a source link')
@@ -10031,7 +10033,7 @@ def do_repairlink(self, subcmd, opts, *args):
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
meta = f.readlines()
- root_new = ET.fromstring(b''.join(meta))
+ root_new = xml_fromstring(b''.join(meta))
dir_new = {'apiurl': apiurl, 'project': prj, 'package': package}
dir_new['srcmd5'] = root_new.get('srcmd5')
dir_new['entries'] = [[n.get('name'), n.get('md5')] for n in root_new.findall('entry')]
@@ -10039,7 +10041,7 @@ def do_repairlink(self, subcmd, opts, *args):
query = {'rev': workingrev}
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
- root_oldpatched = ET.parse(f).getroot()
+ root_oldpatched = xml_parse(f).getroot()
linkinfo_oldpatched = root_oldpatched.find('linkinfo')
if linkinfo_oldpatched is None:
raise oscerr.APIError('working rev is not a source link?')
@@ -10053,7 +10055,7 @@ def do_repairlink(self, subcmd, opts, *args):
query['rev'] = linkinfo_oldpatched.get('srcmd5')
u = makeurl(apiurl, ['source', linkinfo_oldpatched.get('project'), linkinfo_oldpatched.get('package')], query=query)
f = http_GET(u)
- root_old = ET.parse(f).getroot()
+ root_old = xml_parse(f).getroot()
dir_old = {'apiurl': apiurl}
dir_old['project'] = linkinfo_oldpatched.get('project')
dir_old['package'] = linkinfo_oldpatched.get('package')
@@ -10187,7 +10189,7 @@ def do_pull(self, subcmd, opts, *args):
u = makeurl(p.apiurl, ['source', p.prjname, p.name], query=query)
f = http_GET(u)
meta = f.readlines()
- root_new = ET.fromstring(b''.join(meta))
+ root_new = xml_fromstring(b''.join(meta))
linkinfo_new = root_new.find('linkinfo')
if linkinfo_new is None:
raise oscerr.APIError('link is not a really a link?')
@@ -10207,7 +10209,7 @@ def do_pull(self, subcmd, opts, *args):
query = {'rev': linkinfo.srcmd5}
u = makeurl(p.apiurl, ['source', linkinfo.project, linkinfo.package], query=query)
f = http_GET(u)
- root_old = ET.parse(f).getroot()
+ root_old = xml_parse(f).getroot()
dir_old = {'apiurl': p.apiurl, 'project': linkinfo.project, 'package': linkinfo.package, 'srcmd5': linkinfo.srcmd5}
dir_old['entries'] = [[n.get('name'), n.get('md5')] for n in root_old.findall('entry')]
diff --git a/osc/core.py b/osc/core.py
index 2b8938efa..3949936a9 100644
--- a/osc/core.py
+++ b/osc/core.py
@@ -83,7 +83,9 @@
from .output import sanitize_text
from .util import xdg
from .util.helper import decode_list, decode_it, raw_input, _html_escape
+from .util.xml import xml_fromstring
from .util.xml import xml_indent_compat as xmlindent
+from .util.xml import xml_parse
ET_ENCODING = "unicode"
@@ -993,7 +995,7 @@ def create(self, apiurl: str, addrevision=False, enforce_branching=False):
query['enforce_branching'] = "1"
u = makeurl(apiurl, ['request'], query=query)
f = http_POST(u, data=self.to_str())
- root = ET.fromstring(f.read())
+ root = xml_fromstring(f.read())
self.read(root)
@@ -1220,7 +1222,7 @@ def meta_get_packagelist(apiurl: str, prj, deleted=None, expand=False):
u = makeurl(apiurl, ['source', prj], query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return [node.get('name') for node in root.findall('entry')]
@@ -1244,7 +1246,7 @@ def meta_get_filelist(
u = makeurl(apiurl, ['source', prj, package], query=query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
if not verbose:
return [node.get('name') for node in root.findall('entry')]
@@ -1270,7 +1272,7 @@ def meta_get_project_list(apiurl: str, deleted=False):
u = makeurl(apiurl, ['source'], query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return sorted(node.get('name') for node in root if node.get('name'))
@@ -1443,7 +1445,7 @@ def show_pattern_metalist(apiurl: str, prj: str):
url = makeurl(apiurl, ['source', prj, '_pattern'])
try:
f = http_GET(url)
- tree = ET.parse(f)
+ tree = xml_parse(f)
except HTTPError as e:
e.osc_msg = f'show_pattern_metalist: Error getting pattern list for project \'{prj}\''
raise
@@ -1543,7 +1545,7 @@ def edit(self):
print('BuildService API error:', error_help, file=sys.stderr)
# examine the error - we can't raise an exception because we might want
# to try again
- root = ET.fromstring(e.read())
+ root = xml_fromstring(e.read())
summary = root.find('summary')
if summary is not None:
print(summary.text, file=sys.stderr)
@@ -1695,7 +1697,7 @@ def edit_meta(
if metatype == 'pkg':
# check if the package is a link to a different project
project, package = path_args
- orgprj = ET.fromstring(parse_meta_to_string(data)).get('project')
+ orgprj = xml_fromstring(parse_meta_to_string(data)).get('project')
if orgprj is not None and unquote(project) != orgprj:
print('The package is linked from a different project.')
@@ -1752,7 +1754,7 @@ def show_upstream_srcmd5(
apiurl: str, prj: str, pac: str, expand=False, revision=None, meta=False, include_service_files=False, deleted=False
):
m = show_files_meta(apiurl, prj, pac, expand=expand, revision=revision, meta=meta, deleted=deleted)
- et = ET.fromstring(m)
+ et = xml_fromstring(m)
if include_service_files:
try:
sinfo = et.find('serviceinfo')
@@ -1776,7 +1778,7 @@ def show_upstream_xsrcmd5(
meta=meta,
expand=include_service_files,
)
- et = ET.fromstring(m)
+ et = xml_fromstring(m)
if include_service_files:
return et.get('srcmd5')
@@ -1820,7 +1822,7 @@ def get_project_sourceinfo(apiurl: str, project: str, nofilename: bool, *package
pkgs = packages[n:]
res.update(get_project_sourceinfo(apiurl, project, nofilename, *pkgs))
return res
- root = ET.fromstring(si)
+ root = xml_fromstring(si)
res = {}
for sinfo in root.findall('sourceinfo'):
res[sinfo.get('package')] = sinfo
@@ -1829,7 +1831,7 @@ def get_project_sourceinfo(apiurl: str, project: str, nofilename: bool, *package
def show_upstream_rev_vrev(apiurl: str, prj, pac, revision=None, expand=False, meta=False):
m = show_files_meta(apiurl, prj, pac, revision=revision, expand=expand, meta=meta)
- et = ET.fromstring(m)
+ et = xml_fromstring(m)
rev = et.get("rev") or None
vrev = et.get("vrev") or None
return rev, vrev
@@ -1839,7 +1841,7 @@ def show_upstream_rev(
apiurl: str, prj, pac, revision=None, expand=False, linkrev=None, meta=False, include_service_files=False
):
m = show_files_meta(apiurl, prj, pac, revision=revision, expand=expand, linkrev=linkrev, meta=meta)
- et = ET.fromstring(m)
+ et = xml_fromstring(m)
if include_service_files:
try:
sinfo = et.find('serviceinfo')
@@ -2071,7 +2073,7 @@ def clone_request(apiurl: str, reqid, msg=None):
query = {'cmd': 'branch', 'request': reqid}
url = makeurl(apiurl, ['source'], query)
r = http_POST(url, data=msg)
- root = ET.fromstring(r.read())
+ root = xml_fromstring(r.read())
project = None
for i in root.findall('data'):
if i.get('name') == 'targetproject':
@@ -2182,7 +2184,7 @@ def create_submit_request(
def get_request(apiurl: str, reqid):
u = makeurl(apiurl, ['request', reqid], {'withfullhistory': '1'})
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
r = Request()
r.read(root, apiurl=apiurl)
@@ -2205,7 +2207,7 @@ def change_review_state(
query['superseded_by'] = supersed
u = makeurl(apiurl, ['request', reqid], query=query)
f = http_POST(u, data=message)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root.get('code')
@@ -2221,7 +2223,7 @@ def change_request_state(apiurl: str, reqid, newstate, message="", supersed=None
['request', reqid], query=query)
f = http_POST(u, data=message)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root.get('code', 'unknown')
@@ -2370,7 +2372,7 @@ def get_request_collection(
u = makeurl(apiurl, ['request'], query)
f = http_GET(u)
- res = ET.parse(f).getroot()
+ res = xml_parse(f).getroot()
requests = []
for root in res.findall('request'):
@@ -2648,7 +2650,7 @@ def get_user_meta(apiurl: str, user: str):
def _get_xml_data(meta, *tags):
data = []
if meta is not None:
- root = ET.fromstring(meta)
+ root = xml_fromstring(meta)
for tag in tags:
elm = root.find(tag)
if elm is None or elm.text is None:
@@ -2911,7 +2913,7 @@ def server_diff(
del_issue_list = []
add_issue_list = []
chn_issue_list = []
- root = ET.fromstring(f.read())
+ root = xml_fromstring(f.read())
node = root.find('issues')
for issuenode in node.findall('issue'):
if issuenode.get('state') == 'deleted':
@@ -2966,7 +2968,7 @@ def server_diff_noex(
new_project, new_package, new_revision,
unified, missingok, meta, False, files=files)
except:
- elm = ET.fromstring(body).find('summary')
+ elm = xml_fromstring(body).find('summary')
summary = ''
if elm is not None and elm.text is not None:
summary = elm.text
@@ -2992,14 +2994,14 @@ def get_request_issues(apiurl: str, reqid):
"""
u = makeurl(apiurl, ['request', reqid], query={'cmd': 'diff', 'view': 'xml', 'withissues': '1'})
f = http_POST(u)
- request_tree = ET.parse(f).getroot()
+ request_tree = xml_parse(f).getroot()
issue_list = []
for elem in request_tree.iterfind('action/sourcediff/issues/issue'):
issue_id = elem.get('name')
encode_search = f'@name=\'{issue_id}\''
u = makeurl(apiurl, ['search/issue'], query={'match': encode_search})
f = http_GET(u)
- collection = ET.parse(f).getroot()
+ collection = xml_parse(f).getroot()
for cissue in collection:
issue = {}
for issue_detail in cissue.iter():
@@ -3023,10 +3025,10 @@ def submit_action_diff(apiurl: str, action: Action):
except HTTPError as e:
if e.code != 404:
raise e
- root = ET.fromstring(e.read())
+ root = xml_fromstring(e.read())
return b'error: \'%s\' does not exist' % root.findtext("summary").encode()
elif e.code == 404:
- root = ET.fromstring(e.read())
+ root = xml_fromstring(e.read())
return b'error: \'%s\' does not exist' % root.findtext("summary").encode()
raise e
@@ -3158,7 +3160,7 @@ def checkout_package(
# before we create directories and stuff, check if the package actually
# exists
meta_data = b''.join(show_package_meta(apiurl, project, package))
- root = ET.fromstring(meta_data)
+ root = xml_fromstring(meta_data)
scmsync_element = root.find("scmsync")
if not native_obs_package and scmsync_element is not None and scmsync_element.text is not None:
directory = make_dir(apiurl, project, package, pathname, prj_dir, conf.config['do_package_tracking'], outdir)
@@ -3222,7 +3224,7 @@ def replace_pkg_meta(
only maintainer (unless keep_maintainers is set). Additionally remove the
develproject entry () unless keep_develproject is true.
"""
- root = ET.fromstring(b''.join(pkgmeta))
+ root = xml_fromstring(b''.join(pkgmeta))
root.set('name', new_name)
root.set('project', new_prj)
# never take releasename, it needs to be explicit
@@ -3411,7 +3413,7 @@ def aggregate_pac(
path_args=(dst_project, dst_package_meta),
template_args=None,
create_new=False, apiurl=apiurl)
- root = ET.fromstring(parse_meta_to_string(dst_meta))
+ root = xml_fromstring(parse_meta_to_string(dst_meta))
if root.get('project') != dst_project:
# The source comes from a different project via a project link, we need to create this instance
meta_change = True
@@ -3455,7 +3457,7 @@ def aggregate_pac(
if disable_publish:
meta_change = True
- root = ET.fromstring(''.join(dst_meta))
+ root = xml_fromstring(''.join(dst_meta))
elm = root.find('publish')
if not elm:
elm = ET.SubElement(root, 'publish')
@@ -3543,7 +3545,7 @@ def attribute_branch_pkg(
try:
f = http_POST(u)
except HTTPError as e:
- root = ET.fromstring(e.read())
+ root = xml_fromstring(e.read())
summary = root.find('summary')
if summary is not None and summary.text is not None:
raise oscerr.APIError(summary.text)
@@ -3552,7 +3554,7 @@ def attribute_branch_pkg(
r = None
- root = ET.fromstring(f.read())
+ root = xml_fromstring(f.read())
if dryrun:
return root
# TODO: change api here and return parsed XML as class
@@ -3597,7 +3599,7 @@ def branch_pkg(
# read src_package meta
try:
m = b"".join(show_package_meta(apiurl, src_project, src_package))
- root = ET.fromstring(m)
+ root = xml_fromstring(m)
except HTTPError as e:
if e.code == 404 and missingok:
root = None
@@ -3614,7 +3616,7 @@ def branch_pkg(
if devel_project:
# replace src_package meta with devel_package meta because we're about branch from devel
m = b"".join(show_package_meta(apiurl, devel_project, devel_package))
- root = ET.fromstring(m)
+ root = xml_fromstring(m)
# error out if we're branching a scmsync package (we'd end up with garbage anyway)
if root is not None and root.find("scmsync") is not None:
@@ -3660,7 +3662,7 @@ def branch_pkg(
try:
f = http_POST(u)
except HTTPError as e:
- root = ET.fromstring(e.read())
+ root = xml_fromstring(e.read())
if missingok:
if root and root.get('code') == "not_missing":
raise oscerr.NotMissing("Package exists already via project link, but link will point to given project")
@@ -3675,7 +3677,7 @@ def branch_pkg(
raise
return (True, m.group(1), m.group(2), None, None)
- root = ET.fromstring(f.read())
+ root = xml_fromstring(f.read())
if conf.config['http_debug']:
print(ET.tostring(root, encoding=ET_ENCODING), file=sys.stderr)
data = {}
@@ -3684,7 +3686,7 @@ def branch_pkg(
if disable_build:
target_meta = show_package_meta(apiurl, data["targetproject"], data["targetpackage"])
- root = ET.fromstring(b''.join(target_meta))
+ root = xml_fromstring(b''.join(target_meta))
elm = root.find('build')
if not elm:
@@ -3751,7 +3753,7 @@ def copy_pac(
if meta is None:
meta = show_files_meta(dst_apiurl, dst_project, dst_package)
- root = ET.fromstring(meta)
+ root = xml_fromstring(meta)
if root.find("scmsync") is not None:
print("Note: package source is managed via SCM")
return
@@ -3776,7 +3778,7 @@ def copy_pac(
query = {'rev': 'upload'}
xml = show_files_meta(src_apiurl, src_project, src_package,
expand=expand, revision=revision)
- filelist = ET.fromstring(xml)
+ filelist = xml_fromstring(xml)
revision = filelist.get('srcmd5')
# filter out _service: files
for entry in filelist.findall('entry'):
@@ -3905,7 +3907,7 @@ def get_platforms(apiurl: str):
def get_repositories(apiurl: str):
f = http_GET(makeurl(apiurl, ['platform']))
- tree = ET.parse(f)
+ tree = xml_parse(f)
r = sorted(node.get('name') for node in tree.getroot())
return r
@@ -3915,7 +3917,7 @@ def get_distributions(apiurl: str):
'distribution', 'project', 'repository', 'reponame'"""
f = http_GET(makeurl(apiurl, ['distributions']))
- root = ET.fromstring(b''.join(f))
+ root = xml_fromstring(b''.join(f))
distlist = []
for node in root.findall('distribution'):
@@ -3994,7 +3996,7 @@ def get_binarylist(
query['withccache'] = 1
u = makeurl(apiurl, ['build', prj, repo, arch, what], query=query)
f = http_GET(u)
- tree = ET.parse(f)
+ tree = xml_parse(f)
if not verbose:
return [node.get('filename') for node in tree.findall('binary')]
else:
@@ -4011,7 +4013,7 @@ def get_binarylist(
def get_binarylist_published(apiurl: str, prj: str, repo: str, arch: str):
u = makeurl(apiurl, ['published', prj, repo, arch])
f = http_GET(u)
- tree = ET.parse(f)
+ tree = xml_parse(f)
r = [node.get('name') for node in tree.findall('entry')]
return r
@@ -4058,7 +4060,7 @@ def show_prj_results_meta(
def result_xml_to_dicts(xml):
# assumption: xml contains at most one status element (maybe we should
# generalize this to arbitrary status element)
- root = ET.fromstring(xml)
+ root = xml_fromstring(xml)
for node in root.findall('result'):
rmap = {}
rmap['project'] = rmap['prj'] = node.get('project')
@@ -4196,13 +4198,13 @@ def get_package_results(apiurl: str, project: str, package: Optional[str] = None
if e.code == 502 or e.code == 504:
# re-try result request
continue
- root = ET.fromstring(e.read())
+ root = xml_fromstring(e.read())
if e.code == 400 and kwargs.get('multibuild') and re.search('multibuild', getattr(root.find('summary'), 'text', '')):
kwargs['multibuild'] = None
kwargs['locallink'] = None
continue
raise
- root = ET.fromstring(xml)
+ root = xml_fromstring(xml)
kwargs['oldstate'] = root.get('state')
for result in root.findall('result'):
if result.get('dirty') is not None:
@@ -4270,7 +4272,7 @@ def get_prj_results(
r = []
f = show_prj_results_meta(apiurl, prj)
- root = ET.fromstring(b''.join(f))
+ root = xml_fromstring(b''.join(f))
if name_filter is not None:
name_filter = re.compile(name_filter)
@@ -4632,11 +4634,11 @@ def create_pbuild_config(apiurl: str, project: str, repository: str, arch: str,
f.write(decode_it(bc))
# create the _pbuild file based on expanded repository path informations
- pb = ET.fromstring('')
+ pb = xml_fromstring('')
tree = ET.ElementTree(pb)
preset = ET.SubElement(pb, 'preset', name=repository, default="") # default should be empty, but ET crashes
bi_text = decode_it(get_buildinfo(apiurl, project, '_repository', repository, arch, specfile="Name: dummy"))
- root = ET.fromstring(bi_text)
+ root = xml_fromstring(bi_text)
# cross compile setups are not yet supported
# for path in root.findall('hostsystem'):
@@ -4661,7 +4663,7 @@ def check_constraints(apiurl: str, prj: str, repository: str, arch: str, package
query = {"cmd": "checkconstraints", "project": prj, "package": package, "repository": repository, "arch": arch}
u = makeurl(apiurl, ["worker"], query)
f = http_POST(u, data=constraintsfile)
- root = ET.fromstring(b''.join(f))
+ root = xml_fromstring(b''.join(f))
return [node.get('name') for node in root.findall('entry')]
@@ -4675,7 +4677,7 @@ def get_source_rev(apiurl: str, project: str, package: str, revision=None):
else:
url = makeurl(apiurl, ['source', project, package, '_history'])
f = http_GET(url)
- xml = ET.parse(f)
+ xml = xml_parse(f)
ent = None
for new in xml.findall('revision'):
# remember the newest one.
@@ -4701,7 +4703,7 @@ def print_jobhistory(apiurl: str, prj: str, current_package: str, repository: st
query['limit'] = int(limit)
u = makeurl(apiurl, ['build', prj, repository, arch, '_jobhistory'], query)
f = http_GET(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
if format == 'text':
print("time package reason code build time worker")
@@ -4833,7 +4835,7 @@ def runservice(apiurl: str, prj: str, package: str):
e.osc_msg = f'could not trigger service run for project \'{prj}\' package \'{package}\''
raise
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root.get('code')
@@ -4846,7 +4848,7 @@ def waitservice(apiurl: str, prj: str, package: str):
e.osc_msg = f'The service for project \'{prj}\' package \'{package}\' failed'
raise
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root.get('code')
@@ -4863,7 +4865,7 @@ def mergeservice(apiurl: str, prj: str, package: str):
e.osc_msg = f'could not merge service files in project \'{prj}\' package \'{package}\''
raise
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root.get('code')
@@ -4885,7 +4887,7 @@ def rebuild(apiurl: str, prj: str, package: str, repo: str, arch: str, code=None
e.osc_msg = f'could not trigger rebuild for project \'{prj}\' package \'{package}\''
raise
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root.get('code')
@@ -4941,7 +4943,7 @@ def cmdbuild(
e.osc_msg += f' sysrq={code}'
raise
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root.get('code')
@@ -5106,7 +5108,7 @@ def search(apiurl: str, queries=None, **kwargs):
query['match'] = xpath
u = makeurl(apiurl, path, query)
f = http_GET(u)
- res[urlpath] = ET.parse(f).getroot()
+ res[urlpath] = xml_parse(f).getroot()
return res
@@ -5148,7 +5150,7 @@ def owner(
res = None
try:
f = http_GET(u)
- res = ET.parse(f).getroot()
+ res = xml_parse(f).getroot()
except HTTPError as e:
# old server not supporting this search
pass
@@ -5159,7 +5161,7 @@ def set_link_rev(apiurl: str, project: str, package: str, revision="", expand=Fa
url = makeurl(apiurl, ["source", project, package, "_link"])
try:
f = http_GET(url)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
except HTTPError as e:
e.osc_msg = f'Unable to get _link file in package \'{package}\' for project \'{project}\''
raise
@@ -5307,7 +5309,7 @@ def addPerson(apiurl: str, prj: str, pac: str, user: str, role="maintainer"):
create_new=False)
if data and get_user_meta(apiurl, user) is not None:
- root = ET.fromstring(parse_meta_to_string(data))
+ root = xml_fromstring(parse_meta_to_string(data))
found = False
for person in root.iter('person'):
if person.get('userid') == user and person.get('role') == role:
@@ -5342,7 +5344,7 @@ def delPerson(apiurl: str, prj: str, pac: str, user: str, role="maintainer"):
template_args=None,
create_new=False)
if data and get_user_meta(apiurl, user) is not None:
- root = ET.fromstring(parse_meta_to_string(data))
+ root = xml_fromstring(parse_meta_to_string(data))
found = False
for person in root.iter('person'):
if person.get('userid') == user and person.get('role') == role:
@@ -5374,7 +5376,7 @@ def setBugowner(apiurl: str, prj: str, pac: str, user=None, group=None):
group = user.replace('group:', '')
user = None
if data:
- root = ET.fromstring(parse_meta_to_string(data))
+ root = xml_fromstring(parse_meta_to_string(data))
for group_element in root.iter('group'):
if group_element.get('role') == "bugowner":
root.remove(group_element)
@@ -5426,9 +5428,9 @@ def addGitSource(url):
service_file = os.path.join(os.getcwd(), '_service')
addfile = False
if os.path.exists(service_file):
- services = ET.parse(os.path.join(os.getcwd(), '_service')).getroot()
+ services = xml_parse(os.path.join(os.getcwd(), '_service')).getroot()
else:
- services = ET.fromstring("")
+ services = xml_fromstring("")
addfile = True
stripETxml(services)
si = Serviceinfo()
@@ -5451,9 +5453,9 @@ def addDownloadUrlService(url):
service_file = os.path.join(os.getcwd(), '_service')
addfile = False
if os.path.exists(service_file):
- services = ET.parse(os.path.join(os.getcwd(), '_service')).getroot()
+ services = xml_parse(os.path.join(os.getcwd(), '_service')).getroot()
else:
- services = ET.fromstring("")
+ services = xml_fromstring("")
addfile = True
stripETxml(services)
si = Serviceinfo()
@@ -5708,7 +5710,7 @@ def safe_change_request_state(*args, **kwargs):
details = e.hdrs.get('X-Opensuse-Errorcode')
if details:
print(details, file=sys.stderr)
- root = ET.fromstring(e.read())
+ root = xml_fromstring(e.read())
summary = root.find('summary')
if summary is not None:
print(summary.text, file=sys.stderr)
@@ -6230,7 +6232,7 @@ def which(name: str):
def get_comments(apiurl: str, kind, *args):
url = makeurl(apiurl, ["comments", kind] + list(args))
f = http_GET(url)
- return ET.parse(f).getroot()
+ return xml_parse(f).getroot()
def print_comments(apiurl: str, kind, *args):
@@ -6254,7 +6256,7 @@ def create_comment(apiurl: str, kind, comment, *args, **kwargs) -> Optional[str]
query["parent_id"] = kwargs.get("parent", None)
u = makeurl(apiurl, ["comments", kind] + list(args), query=query)
f = http_POST(u, data=comment)
- ret = ET.fromstring(f.read()).find('summary')
+ ret = xml_fromstring(f.read()).find('summary')
if ret is None:
return None
return ret.text
@@ -6263,7 +6265,7 @@ def create_comment(apiurl: str, kind, comment, *args, **kwargs) -> Optional[str]
def delete_comment(apiurl: str, cid: str) -> Optional[str]:
u = makeurl(apiurl, ['comment', cid])
f = http_DELETE(u)
- ret = ET.fromstring(f.read()).find('summary')
+ ret = xml_fromstring(f.read()).find('summary')
if ret is None:
return None
return ret.text
@@ -6378,7 +6380,7 @@ def parse_multibuild_data(s: str):
if not s:
return result
- root = ET.fromstring(s)
+ root = xml_fromstring(s)
for node in root.findall("flavor"):
result.add(node.text)
return result
diff --git a/osc/obs_api/package.py b/osc/obs_api/package.py
index ac596b49d..6a9d53250 100644
--- a/osc/obs_api/package.py
+++ b/osc/obs_api/package.py
@@ -159,7 +159,7 @@ def cmd_release(
@classmethod
def get_revision_list(cls, apiurl: str, project: str, package: str, deleted: Optional[bool] = None, meta: Optional[bool] = None):
- from xml.etree import ElementTree as ET
+ from ..util.xml import xml_parse
url_path = ["source", project, package, "_history"]
url_query = {
@@ -167,7 +167,7 @@ def get_revision_list(cls, apiurl: str, project: str, package: str, deleted: Opt
"deleted": deleted,
}
response = cls.xml_request("GET", apiurl, url_path, url_query)
- root = ET.parse(response).getroot()
+ root = xml_parse(response).getroot()
assert root.tag == "revisionlist"
result = []
for node in root:
diff --git a/osc/obs_api/person.py b/osc/obs_api/person.py
index ae4b6c51e..340951352 100644
--- a/osc/obs_api/person.py
+++ b/osc/obs_api/person.py
@@ -63,7 +63,7 @@ def search(
state: Optional[str] = None,
**kwargs,
) -> List["Person"]:
- from xml.etree import ElementTree as ET
+ from ..util.xml import xml_parse
from ..util.xpath import XPathQuery as Q
url_path = ["search", "person"]
@@ -77,7 +77,7 @@ def search(
),
}
response = cls.xml_request("GET", apiurl, url_path, url_query)
- root = ET.parse(response).getroot()
+ root = xml_parse(response).getroot()
assert root.tag == "collection"
result = []
for node in root:
diff --git a/osc/obs_api/token.py b/osc/obs_api/token.py
index b0dc52a6d..12c4451b1 100644
--- a/osc/obs_api/token.py
+++ b/osc/obs_api/token.py
@@ -108,12 +108,12 @@ def to_human_readable_string(self) -> str:
@classmethod
def do_list(cls, apiurl: str, user: str):
- from ..util.xml import ET
+ from ..util.xml import xml_parse
url_path = ["person", user, "token"]
url_query = {}
response = cls.xml_request("GET", apiurl, url_path, url_query)
- root = ET.parse(response).getroot()
+ root = xml_parse(response).getroot()
assert root.tag == "directory"
result = []
for node in root:
diff --git a/osc/obs_scm/package.py b/osc/obs_scm/package.py
index 721f17f15..95bb2c470 100644
--- a/osc/obs_scm/package.py
+++ b/osc/obs_scm/package.py
@@ -11,6 +11,8 @@
from .. import conf
from .. import oscerr
from ..util.xml import ET
+from ..util.xml import xml_fromstring
+from ..util.xml import xml_parse
from .file import File
from .linkinfo import Linkinfo
from .serviceinfo import Serviceinfo
@@ -413,7 +415,7 @@ def commit_filelist(apiurl: str, project: str, package: str, filelist, msg="", u
query.update({'cmd': 'commitfilelist', 'user': user, 'comment': msg})
u = makeurl(apiurl, ['source', project, package], query=query)
f = http_POST(u, data=ET.tostring(filelist, encoding=ET_ENCODING))
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
return root
@staticmethod
@@ -616,7 +618,7 @@ def commit(self, msg='', verbose=False, skip_local_service_run=False, can_branch
li.read(sfilelist.find('linkinfo'))
if li.xsrcmd5 is None:
raise oscerr.APIError(f'linkinfo has no xsrcmd5 attr:\n{ET.tostring(sfilelist, encoding=ET_ENCODING)}\n')
- sfilelist = ET.fromstring(self.get_files_meta(revision=li.xsrcmd5))
+ sfilelist = xml_fromstring(self.get_files_meta(revision=li.xsrcmd5))
for i in sfilelist.findall('entry'):
if i.get('name') in self.skipped:
i.set('skipped', 'true')
@@ -639,7 +641,7 @@ def commit(self, msg='', verbose=False, skip_local_service_run=False, can_branch
sys.stdout.write('.')
sys.stdout.flush()
# does it make sense to add some delay?
- sfilelist = ET.fromstring(http_GET(u).read())
+ sfilelist = xml_fromstring(http_GET(u).read())
# if sinfo is None another commit might have occured in the "meantime"
sinfo = sfilelist.find('serviceinfo')
print('')
@@ -754,7 +756,7 @@ def get_files_meta(self, revision='latest', skip_service=True):
fm = show_files_meta(self.apiurl, self.prjname, self.name, revision=revision, meta=self.meta)
# look for "too large" files according to size limit and mark them
- root = ET.fromstring(fm)
+ root = xml_fromstring(fm)
for e in root.findall('entry'):
size = e.get('size')
if size and self.size_limit and int(size) > self.size_limit \
@@ -797,7 +799,7 @@ def get_local_origin_project(self):
meta = self.get_local_meta()
if meta is None:
return self.prjname
- root = ET.fromstring(meta)
+ root = xml_fromstring(meta)
return root.get('project')
def is_link_to_different_project(self):
@@ -1125,7 +1127,7 @@ def diff_add_delete(fname, add, revision):
raise oscerr.OscIOError(None, f'file \'{fname}\' is not under version control')
else:
fm = self.get_files_meta(revision=revision)
- root = ET.fromstring(fm)
+ root = xml_fromstring(fm)
rfiles = self.__get_files(root)
# swap added and deleted
kept, deleted, added, services = self.__get_rev_changes(rfiles)
@@ -1391,7 +1393,7 @@ def update(self, rev=None, service_files=False, size_limit=None):
in_update_files_path = os.path.join(self.storedir, "_in_update", "_files")
if os.path.isfile(in_update_files_path) and os.path.getsize(in_update_files_path) != 0:
print('resuming broken update...')
- root = ET.parse(os.path.join(self.storedir, '_in_update', '_files')).getroot()
+ root = xml_parse(os.path.join(self.storedir, '_in_update', '_files')).getroot()
rfiles = self.__get_files(root)
kept, added, deleted, services = self.__get_rev_changes(rfiles)
# check if we aborted in the middle of a file update
@@ -1445,7 +1447,7 @@ def update(self, rev=None, service_files=False, size_limit=None):
os.rmdir(os.path.join(self.storedir, '_in_update'))
# ok everything is ok (hopefully)...
fm = self.get_files_meta(revision=rev)
- root = ET.fromstring(fm)
+ root = xml_fromstring(fm)
rfiles = self.__get_files(root)
store_write_string(self.absdir, '_files', fm + '\n', subdir='_in_update')
kept, added, deleted, services = self.__get_rev_changes(rfiles)
@@ -1546,7 +1548,7 @@ def run_source_services(self, mode=None, singleservice=None, verbose=None):
si = Serviceinfo()
if os.path.exists('_service'):
try:
- service = ET.parse(os.path.join(self.absdir, '_service')).getroot()
+ service = xml_parse(os.path.join(self.absdir, '_service')).getroot()
except ET.ParseError as v:
line, column = v.position
print(f'XML error in _service file on line {line}, column {column}')
diff --git a/osc/obs_scm/project.py b/osc/obs_scm/project.py
index d12f3ef1a..a8fd2af9e 100644
--- a/osc/obs_scm/project.py
+++ b/osc/obs_scm/project.py
@@ -6,6 +6,7 @@
from .. import conf
from .. import oscerr
from ..util.xml import ET
+from ..util.xml import xml_parse
from .store import Store
from .store import delete_storedir
from .store import store
@@ -277,7 +278,7 @@ def read_packages(self):
packages_file = os.path.join(self.absdir, store, '_packages')
if os.path.isfile(packages_file) and os.path.getsize(packages_file):
try:
- result = ET.parse(packages_file)
+ result = xml_parse(packages_file)
except:
msg = f'Cannot read package file \'{packages_file}\'. '
msg += 'You can try to remove it and then run osc repairwc.'
@@ -294,7 +295,7 @@ def read_packages(self):
and Package(pac_dir).name == data:
cur_pacs.append(ET.Element('package', name=data, state=' '))
store_write_initial_packages(self.absdir, self.name, cur_pacs)
- return ET.parse(os.path.join(self.absdir, store, '_packages'))
+ return xml_parse(os.path.join(self.absdir, store, '_packages'))
def write_packages(self):
from ..core import ET_ENCODING
diff --git a/osc/obs_scm/serviceinfo.py b/osc/obs_scm/serviceinfo.py
index 020f5a860..2c6c75fe7 100644
--- a/osc/obs_scm/serviceinfo.py
+++ b/osc/obs_scm/serviceinfo.py
@@ -64,13 +64,14 @@ def error(msg, xml):
def getProjectGlobalServices(self, apiurl: str, project: str, package: str):
from ..core import http_POST
from ..core import makeurl
+ from ..util.xml import xml_parse
self.apiurl = apiurl
# get all project wide services in one file, we don't store it yet
u = makeurl(apiurl, ["source", project, package], query={"cmd": "getprojectservices"})
try:
f = http_POST(u)
- root = ET.parse(f).getroot()
+ root = xml_parse(f).getroot()
self.read(root, True)
self.project = project
self.package = package
diff --git a/osc/obs_scm/store.py b/osc/obs_scm/store.py
index 595b84ec9..2d70e2809 100644
--- a/osc/obs_scm/store.py
+++ b/osc/obs_scm/store.py
@@ -172,9 +172,11 @@ def write_int(self, fn, value, subdir=None):
self.write_string(fn, value, subdir=subdir)
def read_xml_node(self, fn, node_name, subdir=None):
+ from ..util.xml import xml_parse
+
path = self.get_path(fn, subdir=subdir)
try:
- tree = ET.parse(path)
+ tree = xml_parse(path)
except SyntaxError as e:
msg = f"Unable to parse '{path}': {e}"
raise oscerr.NoWorkingCopy(msg)
@@ -463,6 +465,8 @@ def is_package_dir(d):
def read_filemeta(dir):
+ from ..util.xml import xml_parse
+
global store
msg = f'\'{dir}\' is not a valid working copy.'
@@ -475,7 +479,7 @@ def read_filemeta(dir):
raise oscerr.NoWorkingCopy(f'{msg} ({filesmeta} does not exist)')
try:
- r = ET.parse(filesmeta)
+ r = xml_parse(filesmeta)
except SyntaxError as e:
raise oscerr.NoWorkingCopy(f'{msg}\nWhen parsing .osc/_files, the following error was encountered:\n{e}')
return r
diff --git a/osc/util/models.py b/osc/util/models.py
index caff6abd0..061374cbb 100644
--- a/osc/util/models.py
+++ b/osc/util/models.py
@@ -567,7 +567,7 @@ def from_string(cls, string: str, *, apiurl: Optional[str] = None) -> "XmlModel"
"""
Instantiate model from string.
"""
- root = ET.fromstring(string)
+ root = xml.xml_fromstring(string)
return cls.from_xml(root, apiurl=apiurl)
@classmethod
@@ -575,7 +575,7 @@ def from_file(cls, file: Union[str, typing.IO], *, apiurl: Optional[str] = None)
"""
Instantiate model from file.
"""
- root = ET.parse(file).getroot()
+ root = xml.xml_parse(file).getroot()
return cls.from_xml(root, apiurl=apiurl)
def to_bytes(self, *, with_comments: bool = False) -> bytes:
diff --git a/osc/util/repodata.py b/osc/util/repodata.py
index e99972bfe..f44ab9bd1 100644
--- a/osc/util/repodata.py
+++ b/osc/util/repodata.py
@@ -31,8 +31,10 @@ def primaryPath(directory):
:rtype: str
:raise IOError: if repomd.xml contains no primary location
"""
+ from .xml import xml_parse
+
metaDataPath = os.path.join(directory, "repodata", "repomd.xml")
- elementTree = ET.parse(metaDataPath)
+ elementTree = xml_parse(metaDataPath)
root = elementTree.getroot()
for dataElement in root:
@@ -56,10 +58,12 @@ def queries(directory):
:return: list of RepoDataQueryResult instances
:raise IOError: if repomd.xml contains no primary location
"""
+ from .xml import xml_parse
+
path = primaryPath(directory)
gunzippedPrimary = gzip.GzipFile(path)
- elementTree = ET.parse(gunzippedPrimary)
+ elementTree = xml_parse(gunzippedPrimary)
root = elementTree.getroot()
packageQueries = []
diff --git a/osc/util/xml.py b/osc/util/xml.py
index 166a5c185..15cea2f8a 100644
--- a/osc/util/xml.py
+++ b/osc/util/xml.py
@@ -2,8 +2,9 @@
Functions that manipulate with XML.
"""
-
+import io
import xml.sax.saxutils
+from typing import Union
from xml.etree import ElementTree as ET
@@ -79,3 +80,61 @@ def xml_indent(root):
ET.indent(root)
else:
xml_indent_compat(root)
+
+
+def _extend_parser_error_msg(e: ET.ParseError, text: Union[str, bytes]):
+    from ..output import tty
+
+    y, x = e.position  # 1-based line, column of the parse error
+    text = text.splitlines()[y - 1][max(x - 1, 0):]  # clamped: column 0 must not wrap to a negative index
+
+    if isinstance(text, bytes):
+        text = text.decode("utf-8")
+
+    new_text = ""
+    for char in text:  # keep printable chars; render control chars as highlighted hex
+        if char >= " ":
+            new_text += char
+            continue
+        byte = ord(char)
+        char = f"0x{byte:0>2X}"
+        char = tty.colorize(char, "bg_red")
+        new_text += char
+    e.msg += ": " + new_text
+
+
+def xml_fromstring(text: str):
+    """
+    xml.etree.ElementTree.fromstring() wrapper that extends the error message in
+    ParseError exceptions with a snippet of the broken XML, then re-raises.
+    """
+    try:
+        return ET.fromstring(text)
+    except ET.ParseError as e:
+        _extend_parser_error_msg(e, text)
+        raise
+
+
+def xml_parse(source):
+    """
+    xml.etree.ElementTree.parse() wrapper that extends the error message in
+    ParseError exceptions with a snippet of the broken XML, then re-raises.
+    """
+    if isinstance(source, str):
+        # source is a file name
+        with open(source, "rb") as f:
+            data = f.read()
+    else:
+        # source is an IO object
+        data = source.read()
+
+    if isinstance(data, bytes):
+        f = io.BytesIO(data)
+    else:
+        f = io.StringIO(data)
+
+    try:
+        return ET.parse(f)
+    except ET.ParseError as e:
+        _extend_parser_error_msg(e, data)  # fixed: was undefined name 'text' (NameError on parse failure)
+        raise
diff --git a/tests/common.py b/tests/common.py
index db826a85b..6f267d804 100644
--- a/tests/common.py
+++ b/tests/common.py
@@ -13,6 +13,7 @@
import osc.conf
import osc.core
+from osc.util.xml import xml_fromstring
def urlcompare(url, *args):
@@ -41,8 +42,8 @@ def urlcompare(url, *args):
def xml_equal(actual, exp):
try:
- actual_xml = ET.fromstring(actual)
- exp_xml = ET.fromstring(exp)
+ actual_xml = xml_fromstring(actual)
+ exp_xml = xml_fromstring(exp)
except ET.ParseError:
return False
todo = [(actual_xml, exp_xml)]
@@ -257,7 +258,7 @@ def _check_digests(self, fname, *skipfiles):
with open(fname) as f:
files_exp = f.read()
self.assertXMLEqual(files_act, files_exp)
- root = ET.fromstring(files_act)
+ root = xml_fromstring(files_act)
for i in root.findall('entry'):
if i.get('name') in skipfiles:
continue
diff --git a/tests/test_request.py b/tests/test_request.py
index df5180185..f0d0a1d63 100644
--- a/tests/test_request.py
+++ b/tests/test_request.py
@@ -4,6 +4,7 @@
import osc.core
import osc.oscerr
+from osc.util.xml import xml_fromstring
from .common import OscTestCase
@@ -263,7 +264,7 @@ def test_action_from_xml1(self):
"""
- action = osc.core.Action.from_xml(ET.fromstring(xml))
+ action = osc.core.Action.from_xml(xml_fromstring(xml))
self.assertEqual(action.type, 'add_role')
self.assertEqual(action.tgt_project, 'foo')
self.assertEqual(action.tgt_package, 'bar')
@@ -283,7 +284,7 @@ def test_action_from_xml2(self):
1
"""
- action = osc.core.Action.from_xml(ET.fromstring(xml))
+ action = osc.core.Action.from_xml(xml_fromstring(xml))
self.assertEqual(action.type, 'submit')
self.assertEqual(action.src_project, 'foo')
self.assertEqual(action.src_package, 'bar')
@@ -301,7 +302,7 @@ def test_action_from_xml3(self):
"""
- action = osc.core.Action.from_xml(ET.fromstring(xml))
+ action = osc.core.Action.from_xml(xml_fromstring(xml))
self.assertEqual(action.type, 'submit')
self.assertEqual(action.src_project, 'testprj')
self.assertEqual(action.src_package, 'bar')
@@ -320,13 +321,13 @@ def test_action_from_xml3(self):
def test_action_from_xml_unknown_type(self):
"""try to create action from xml with unknown type"""
xml = ''
- self.assertRaises(osc.oscerr.WrongArgs, osc.core.Action.from_xml, ET.fromstring(xml))
+ self.assertRaises(osc.oscerr.WrongArgs, osc.core.Action.from_xml, xml_fromstring(xml))
def test_read_request1(self):
"""read in a request"""
xml = self._get_fixture('test_read_request1.xml')
r = osc.core.Request()
- r.read(ET.fromstring(xml))
+ r.read(xml_fromstring(xml))
self.assertEqual(r.reqid, '42')
self.assertEqual(r.actions[0].type, 'submit')
self.assertEqual(r.actions[0].src_project, 'foo')
@@ -357,7 +358,7 @@ def test_read_request2(self):
"""read in a request (with reviews)"""
xml = self._get_fixture('test_read_request2.xml')
r = osc.core.Request()
- r.read(ET.fromstring(xml))
+ r.read(xml_fromstring(xml))
self.assertEqual(r.reqid, '123')
self.assertEqual(r.actions[0].type, 'submit')
self.assertEqual(r.actions[0].src_project, 'xyz')
@@ -404,7 +405,7 @@ def test_read_request3(self):
"""
r = osc.core.Request()
- r.read(ET.fromstring(xml))
+ r.read(xml_fromstring(xml))
self.assertEqual(r.reqid, '2')
self.assertEqual(r.actions[0].type, 'set_bugowner')
self.assertEqual(r.actions[0].tgt_project, 'foo')
@@ -442,14 +443,14 @@ def test_request_list_view1(self):
delete: deleteme
delete: foo/bar\n"""
r = osc.core.Request()
- r.read(ET.fromstring(xml))
+ r.read(xml_fromstring(xml))
self.assertEqual(exp, r.list_view())
def test_request_list_view2(self):
"""test the list_view method (with history elements and description)"""
xml = self._get_fixture('test_request_list_view2.xml')
r = osc.core.Request()
- r.read(ET.fromstring(xml))
+ r.read(xml_fromstring(xml))
exp = """\
21 State:accepted By:foobar When:2010-12-29T16:37:45
Created by: foobar
@@ -465,7 +466,7 @@ def test_request_str1(self):
xml = self._get_fixture('test_request_str1.xml')
r = osc.core.Request()
r = osc.core.Request()
- r.read(ET.fromstring(xml))
+ r.read(xml_fromstring(xml))
self.assertEqual(r.creator, 'creator')
exp = """\
Request: 123
@@ -510,7 +511,7 @@ def test_request_str2(self):
"""
r = osc.core.Request()
- r.read(ET.fromstring(xml))
+ r.read(xml_fromstring(xml))
self.assertEqual(r.creator, 'creator')
exp = """\
Request: 98765
@@ -538,7 +539,7 @@ def test_legacy_request(self):
"""
r = osc.core.Request()
- r.read(ET.fromstring(xml))
+ r.read(xml_fromstring(xml))
self.assertEqual(r.reqid, '1234')
self.assertEqual(r.actions[0].type, 'submit')
self.assertEqual(r.actions[0].src_project, 'foobar')
@@ -566,7 +567,7 @@ def test_get_actions(self):
"""test get_actions method"""
xml = self._get_fixture('test_request_list_view1.xml')
r = osc.core.Request()
- r.read(ET.fromstring(xml))
+ r.read(xml_fromstring(xml))
sr_actions = r.get_actions('submit')
self.assertTrue(len(sr_actions) == 2)
for i in sr_actions: