forked from IdentityPython/pyFF
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_mdsl.py
More file actions
124 lines (104 loc) · 4.04 KB
/
test_mdsl.py
File metadata and controls
124 lines (104 loc) · 4.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import json
import os
import shutil
import sys
import tempfile
import pytest
import six
import yaml
from mako.lookup import TemplateLookup
from mock import patch
from pyff import builtins
from pyff.exceptions import MetadataException
from pyff.parse import ParserException
from pyff.pipes import PipeException, Plumbing, plumbing
from pyff.repo import MDRepository
from pyff.resource import ResourceException
from pyff.test import ExitException, SignerTestCase
from pyff.utils import hash_id, parse_xml, resource_filename, root, dumptree
from pyff.constants import NS
__author__ = 'leifj'

# The 'builtins' import appears unused to static analysers; referencing it in this
# assert keeps linters and import-cleanup tools from removing it.
# NOTE(review): presumably importing pyff.builtins is what registers the pipeline
# steps used by the tests below - confirm against pyff.builtins.
assert builtins is not None
class PipeLineTest(SignerTestCase):
    """Shared base for pipeline tests.

    Stores pytest's ``capsys``/``caplog`` fixtures on the test instance and
    offers two helpers for running a pipeline: :meth:`run_pipeline` (from a
    Mako template on disk) and :meth:`exec_pipeline` (from a YAML string).
    """

    @pytest.fixture(autouse=True)
    def _capsys(self, capsys):
        """Store pytest's ``capsys`` fixture on the instance for the properties below."""
        # The instance attribute intentionally shadows this fixture method.
        self._capsys = capsys

    @property
    def captured_stdout(self) -> str:
        """Return anything written to STDOUT during this test.

        ``readouterr()`` hands back both streams at once, so reading this
        property also consumes whatever was captured on STDERR so far.
        """
        out, _err = self._capsys.readouterr()  # type: ignore
        return out

    @property
    def captured_stderr(self) -> str:
        """Return anything written to STDERR during this test.

        ``readouterr()`` hands back both streams at once, so reading this
        property also consumes whatever was captured on STDOUT so far.
        """
        _out, err = self._capsys.readouterr()  # type: ignore
        return err

    @pytest.fixture(autouse=True)
    def _caplog(self, caplog):
        """Store pytest's ``caplog`` fixture on the instance for ``captured_log_text``."""
        # The instance attribute intentionally shadows this fixture method.
        self._caplog = caplog

    @property
    def captured_log_text(self) -> str:
        """Return anything written to the logging system during this test."""
        return self._caplog.text  # type: ignore

    def run_pipeline(self, pl_name, ctx=None, md=None):
        """Render a pipeline template, run it, and return the outcome.

        :param pl_name: name of a template in ``<datadir>/simple-pipeline``
        :param ctx: dict handed to the template as ``ctx`` (a new dict if None)
        :param md: repository to process against (a new MDRepository if None)
        :return: tuple ``(res, md, ctx)`` where ``res`` is the value returned
                 by processing the pipeline
        """
        if ctx is None:
            ctx = {}

        if md is None:
            md = MDRepository()

        templates = TemplateLookup(directories=[os.path.join(self.datadir, 'simple-pipeline')])
        rendered = templates.get_template(pl_name).render(ctx=ctx)

        # delete=False keeps the file on disk after the handle is closed so that
        # plumbing() can reopen it by name; we remove it ourselves below.
        with tempfile.NamedTemporaryFile('w', delete=False) as fd:
            fd.write(rendered)
            pipeline = fd.name

        try:
            res = plumbing(pipeline).process(md, state={'batch': True, 'stats': {}})
        finally:
            # Remove the rendered pipeline even when processing raises.
            os.unlink(pipeline)

        return res, md, ctx

    def exec_pipeline(self, pstr):
        """Run a pipeline given as a YAML string against a fresh repository.

        The parsed pipeline is echoed to STDOUT before it is executed.

        :param pstr: YAML text describing the pipeline
        :return: tuple ``(res, md)`` with the processing result and the
                 MDRepository the pipeline ran against
        """
        md = MDRepository()
        p = yaml.safe_load(six.StringIO(pstr))
        print("\n{}".format(yaml.dump(p)))
        pl = Plumbing(p, pid="test")
        res = pl.process(md, state={'batch': True, 'stats': {}})
        return res, md

    @classmethod
    def setUpClass(cls):
        SignerTestCase.setUpClass()

    def setUp(self):
        # NOTE(review): this re-runs the *class-level* setup of SignerTestCase
        # before every test rather than calling a per-test setUp; left as-is
        # because SignerTestCase is not visible here - confirm it is intended.
        SignerTestCase.setUpClass()
        self.templates = TemplateLookup(directories=[os.path.join(self.datadir, 'simple-pipeline')])
class ParseTest(PipeLineTest):
    """Pipeline tests that publish metadata to a file and inspect the XML."""

    def test_eidas_country(self):
        """Load the eIDAS fixture through the ``eidas`` cleanup pipeline and publish it.

        The published document must contain both expected entities; the first
        must carry an ``http://macedir.org/entity-category`` attribute in its
        EntityAttributes extension and the second must not.
        """
        # Publish into a private directory: the target path is not guessable
        # or reusable by another process, and cleanup is a single rmtree.
        tmpdir = tempfile.mkdtemp()
        tmpfile = os.path.join(tmpdir, "eidas.xml")
        try:
            self.exec_pipeline(f"""
- when eidas:
    - xslt:
        stylesheet: eidas-cleanup.xsl
    - break
- load:
    - file://{self.datadir}/eidas/eidas.xml cleanup eidas
- select
- publish: {tmpfile}
"""
            )
            xml = parse_xml(tmpfile)
            assert xml is not None

            search = "{%s}Extensions/{%s}EntityAttributes/{%s}Attribute[@Name='%s']" % (
                NS['md'],
                NS['mdattr'],
                NS['saml'],
                'http://macedir.org/entity-category',
            )

            entityID = "https://pre.eidas.gov.gr/EidasNode/ServiceMetadata"
            with_hide_from_discovery = xml.find("{%s}EntityDescriptor[@entityID='%s']" % (NS['md'], entityID))
            assert with_hide_from_discovery is not None
            ecs = with_hide_from_discovery.find(search)
            assert ecs is not None

            entityID2 = "https://eidas.pp.dev-franceconnect.fr/EidasNode/ServiceMetadata"
            without_hide_from_discovery = xml.find("{%s}EntityDescriptor[@entityID='%s']" % (NS['md'], entityID2))
            # Fail with a clear assertion rather than an AttributeError on None.
            assert without_hide_from_discovery is not None
            ecs2 = without_hide_from_discovery.find(search)
            assert ecs2 is None
        except IOError:
            # NOTE(review): I/O errors (e.g. the published file being
            # unreadable) are swallowed, so the test passes silently in that
            # case. Behaviour kept from the original - confirm it is wanted.
            pass
        finally:
            # Always remove the published output; ignore_errors covers the
            # case where the pipeline failed before anything was written.
            shutil.rmtree(tmpdir, ignore_errors=True)