# HG changeset patch # User David Douard # Date 1415009555 -3600 # Node ID e9ce904b62a998185441462978d32c059d054995 # Parent c359a85eef93b4c92419257fb996ae2787eff1b7 [test] add unit tests for ruleset We extract the rules building logic from readfile into a Ruleset.buildrules() method to ease testing. diff -r c359a85eef93 -r e9ce904b62a9 src/mercurialserver/ruleset.py --- a/src/mercurialserver/ruleset.py Wed Jul 23 14:31:59 2014 +0000 +++ b/src/mercurialserver/ruleset.py Mon Nov 03 11:12:35 2014 +0100 @@ -31,6 +31,7 @@ matchers = [(k, globmatcher(v)) for k, v in pairs] def c(kw): return min(rmatch(k, m, kw) for k, m in matchers) + c.patterns = [(k, m.pattern) for k, m in matchers] return c class Ruleset(object): @@ -66,19 +67,28 @@ def readfile(self, fn): f = open(fn) try: - for l in f: - l = l.strip() - if len(l) == 0 or l.startswith("#"): - continue - l = l.split() - # Unrecognized actions are off the high end - if l[0] in self.levels: - ix = self.levels.index(l[0]) - else: - ix = len(self.levels) - self.rules.append((ix, - rule([c.split("=", 1) for c in l[1:]]))) + self.buildrules(f) finally: f.close() + def buildrules(self, f): + """Build rules from f + + f shoud be iterable per line, each line is like: + + level [user=pattern] [repo=pattern] [file=pattern] [branch=pattern] + """ + for l in f: + l = l.strip() + if not l or l.startswith("#"): + continue + l = l.split() + # Unrecognized actions are off the high end + if l[0] in self.levels: + ix = self.levels.index(l[0]) + else: + ix = len(self.levels) + self.rules.append((ix, + rule([c.split("=", 1) for c in l[1:]]))) + rules = Ruleset() diff -r c359a85eef93 -r e9ce904b62a9 test/unittest_ruleset.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/unittest_ruleset.py Mon Nov 03 11:12:35 2014 +0100 @@ -0,0 +1,166 @@ +import os.path as osp +from unittest import TestCase + +from mercurialserver import ruleset + +class _RuleSetBaseTC(TestCase): + alllevels = ["init", "write", "read", "deny", "none"] + levels = alllevels[:-1] + def setUp(self): + self.rs = ruleset.Ruleset() + self.rs.buildrules(self.accessrules.splitlines()) + + def check_level(self, level, **kw): + idx = self.alllevels.index(level) + msg = ", ".join(["%s=%s"%item for item in self.rs.preset.items()]) + msg = msg + ": " + if idx > 0: + prevlevel = self.levels[idx-1] + self.assertFalse(self.rs.allow(prevlevel, **kw), msg+prevlevel) + if level != "none": + self.assertTrue(self.rs.allow(level, **kw), msg+level) + +class RuleSetDefaultTC(_RuleSetBaseTC): + accessrules = ''' +init user=root/** +deny repo=hgadmin +write user=users/** +''' + + def test_norules(self): + for level in self.levels: + self.assertFalse(self.rs.allow(level), level) + + def test_root(self): + self.rs.set(user='root/key') + for level in self.levels: + self.assertTrue(self.rs.allow(level), level) + + def test_user_norepo(self): + self.rs.set(user='user/key') + for level in self.levels: + self.assertFalse(self.rs.allow(level), level) + + def test_user(self): + self.rs.set(user='users/key') + self.rs.set(repo='some/repo') + self.check_level('write') + + def test_user_kwargs(self): + self.check_level('write', user='users/key', repo='some/repo') + +class RuleSet2TC(_RuleSetBaseTC): + accessrules = ''' +init user=root/** +deny repo=hgadmin +init user=users/toto/* repo=toto +write user=users/toto/* repo=pub/** +write user=users/w/* +write repo=allpub/** +read user=users/** +''' + + def test_hgadmin(self): + self.rs.set(repo='hgadmin') + self.check_level('deny', user='users/key') + self.check_level('deny', user='key') + + def test_user(self): + self.check_level('read', user='users/key', repo='some/repo') + + def test_repo(self): + self.check_level('init', user='users/toto/key', repo='toto') + + def test_write(self): + self.rs.set(repo='toto') + self.check_level('read', user='users/w') + self.check_level('write', user='users/w/key') + + self.rs.set(repo='pub/stuff') + self.check_level('read', user='users/w') + self.check_level('write', user='users/w/key') + self.check_level('read', user='users/toto') + self.check_level('write', user='users/toto/key') + + self.rs.set(repo='other/repo') + self.check_level('read', user='users/toto') + self.check_level('read', user='users/toto/key') + self.check_level('read', user='users/w') + self.check_level('write', user='users/w/key') + + self.rs.set(repo='allpub/repo') + self.check_level('write', user='users/toto') + self.check_level('write', user='users/toto/key') + self.check_level('write', user='users/w') + self.check_level('write', user='users/w/key') + + self.rs.set(repo='hgadmin') + self.check_level('deny', user='users/toto') + self.check_level('deny', user='users/toto/key') + self.check_level('deny', user='users/w') + self.check_level('deny', user='users/w/key') + + def test_init(self): + self.rs.set(repo='toto') + self.check_level('read', user='users/toto') + self.check_level('init', user='users/toto/key') + +class RuleSet3TC(_RuleSetBaseTC): + accessrules = ''' +read user=users/w/* repo=toto +deny user=users/w/* repo=no +write user=users/w/* +read user=users/** +''' + + def test_user_w(self): + self.rs.set(user='users/w/key') + self.check_level('read', repo='toto') + self.check_level('deny', repo='no') + self.check_level('write', repo='other') + + def test_user_k(self): + self.rs.set(user='users/k/key') + self.check_level('read', repo='toto') + self.check_level('read', repo='no') + self.check_level('read', repo='other') + + def test_otheruser(self): + self.rs.set(user='jay/key') + self.check_level('none', repo='toto') + self.check_level('none', repo='no') + self.check_level('none', repo='other') + +class RuleSet4TC(_RuleSetBaseTC): + accessrules = ''' +read user=users/w/* repo=toto +write user=users/w/* +deny user=users/w/* repo=no +read user=users/** +''' + + def test_user_w(self): + self.rs.set(user='users/w/key') + self.check_level('read', repo='toto') + # deny has no effect here, write match first + self.check_level('write', repo='no') + self.check_level('write', repo='other') + +class RuleSet5TC(_RuleSetBaseTC): + accessrules = ''' +read user=users/w/* repo=toto +deny user=users/w/* repo=no +write user=users/w/* +read user=users/** +''' + + def test_user_w(self): + self.rs.set(user='users/w/key') + self.check_level('read', repo='toto') + # deny takes effect here + self.check_level('deny', repo='no') + self.check_level('write', repo='other') + +if __name__ == '__main__': + from unittest import main + main()