Use repo vfs to join the server log
The join method on repo was removed in changeset edb7f628ef8b of
Mercurial.
import os.path as osp
from unittest import TestCase
from mercurialserver import ruleset
class _RuleSetBaseTC(TestCase):
    """Shared fixture and assertion helper for the ruleset test cases.

    Subclasses must define ``accessrules``, a string holding one rule per
    line; ``setUp`` splits it into lines and hands them to a fresh
    ``ruleset.Ruleset`` through ``buildrules``.
    """

    # Level names in the order check_level relies on: the entry just before
    # an expected level is the one that must be refused.  "none" is only
    # ever used as an expected level; it is never passed to allow().
    alllevels = ["init", "publish", "write", "read", "deny", "none"]
    # Every level except the trailing "none" marker.
    levels = alllevels[:-1]

    def setUp(self):
        """Build ``self.rs`` from the subclass's ``accessrules`` text."""
        self.rs = ruleset.Ruleset()
        self.rs.buildrules(self.accessrules.splitlines())

    def check_level(self, level, **kw):
        """Assert that ``level`` is the first entry of ``alllevels`` granted.

        The entry preceding ``level`` (if there is one) must be refused by
        ``self.rs.allow`` and, unless ``level`` is ``"none"``, ``level``
        itself must be accepted.  ``kw`` is forwarded unchanged to ``allow``.

        Raises ``ValueError`` if ``level`` is not listed in ``alllevels``.
        """
        idx = self.alllevels.index(level)
        # Report both the preset values and this call's keywords: with the
        # preset alone, a failure of check_level(level, user=..., repo=...)
        # gave no hint about which user/repo combination was being tested.
        preset = ", ".join(
            ["%s=%s" % item for item in self.rs.preset.items()])
        call = ", ".join(["%s=%s" % item for item in sorted(kw.items())])
        msg = "preset[%s] call[%s]: " % (preset, call)
        if idx > 0:
            prevlevel = self.levels[idx - 1]
            self.assertFalse(self.rs.allow(prevlevel, **kw), msg + prevlevel)
        if level != "none":
            self.assertTrue(self.rs.allow(level, **kw), msg + level)
class RuleSetDefaultTC(_RuleSetBaseTC):
    """Checks against a small rule file: root keys, hgadmin, user keys."""

    accessrules = '''
init user=root/**
deny repo=hgadmin
write user=users/**
'''

    # Nothing preset: every level is expected to be refused.
    def test_norules(self):
        allow = self.rs.allow
        for name in self.levels:
            self.assertFalse(allow(name), name)

    # A key under root/ is expected to be granted every level.
    def test_root(self):
        self.rs.set(user='root/key')
        allow = self.rs.allow
        for name in self.levels:
            self.assertTrue(allow(name), name)

    # 'user/key' is not under users/, so every level is expected to be
    # refused.
    def test_user_norepo(self):
        self.rs.set(user='user/key')
        allow = self.rs.allow
        for name in self.levels:
            self.assertFalse(allow(name), name)

    # User and repo supplied through the preset, one call each.
    def test_user(self):
        self.rs.set(user='users/key')
        self.rs.set(repo='some/repo')
        self.check_level('write')

    # Same expectation as test_user, but user and repo are passed as
    # keyword arguments of the check instead of being preset.
    def test_user_kwargs(self):
        context = dict(user='users/key', repo='some/repo')
        self.check_level('write', **context)
class RuleSet2TC(_RuleSetBaseTC):
    """Checks against a rule file mixing user-only, repo-only and
    user+repo conditions."""

    accessrules = '''
init user=root/**
deny repo=hgadmin
init user=users/toto/* repo=toto
write user=users/toto/* repo=pub/**
write user=users/w/*
write repo=allpub/**
read user=users/**
'''

    def test_hgadmin(self):
        self.rs.set(repo='hgadmin')
        for user in ('users/key', 'key'):
            self.check_level('deny', user=user)

    def test_user(self):
        context = dict(user='users/key', repo='some/repo')
        self.check_level('read', **context)

    def test_repo(self):
        context = dict(user='users/toto/key', repo='toto')
        self.check_level('init', **context)

    def test_write(self):
        # Each entry: the repo to preset, followed by the (level, user)
        # pairs to verify against it, in the order they are checked.
        plan = (
            ('toto', (
                ('read', 'users/w'),
                ('write', 'users/w/key'),
            )),
            ('pub/stuff', (
                ('read', 'users/w'),
                ('write', 'users/w/key'),
                ('read', 'users/toto'),
                ('write', 'users/toto/key'),
            )),
            ('other/repo', (
                ('read', 'users/toto'),
                ('read', 'users/toto/key'),
                ('read', 'users/w'),
                ('write', 'users/w/key'),
            )),
            ('allpub/repo', (
                ('write', 'users/toto'),
                ('write', 'users/toto/key'),
                ('write', 'users/w'),
                ('write', 'users/w/key'),
            )),
            ('hgadmin', (
                ('deny', 'users/toto'),
                ('deny', 'users/toto/key'),
                ('deny', 'users/w'),
                ('deny', 'users/w/key'),
            )),
        )
        for repo, expectations in plan:
            self.rs.set(repo=repo)
            for expected, user in expectations:
                self.check_level(expected, user=user)

    def test_init(self):
        self.rs.set(repo='toto')
        expectations = (
            ('read', 'users/toto'),
            ('init', 'users/toto/key'),
        )
        for expected, user in expectations:
            self.check_level(expected, user=user)
class RuleSet3TC(_RuleSetBaseTC):
    """Checks a rule file where a repo-specific deny is listed before the
    general write rule for the same users."""

    accessrules = '''
read user=users/w/* repo=toto
deny user=users/w/* repo=no
write user=users/w/*
read user=users/**
'''

    def test_user_w(self):
        self.rs.set(user='users/w/key')
        expectations = (
            ('read', 'toto'),
            ('deny', 'no'),
            ('write', 'other'),
        )
        for expected, repo in expectations:
            self.check_level(expected, repo=repo)

    def test_user_k(self):
        self.rs.set(user='users/k/key')
        # The same level is expected whatever the repo.
        for repo in ('toto', 'no', 'other'):
            self.check_level('read', repo=repo)

    def test_otheruser(self):
        self.rs.set(user='jay/key')
        # 'none': check_level only verifies that 'deny' is refused.
        for repo in ('toto', 'no', 'other'):
            self.check_level('none', repo=repo)
class RuleSet4TC(_RuleSetBaseTC):
    """Checks a rule file where the repo-specific deny is listed after the
    general write rule for the same users."""

    accessrules = '''
read user=users/w/* repo=toto
write user=users/w/*
deny user=users/w/* repo=no
read user=users/**
'''

    def test_user_w(self):
        self.rs.set(user='users/w/key')
        expectations = (
            ('read', 'toto'),
            # deny has no effect here, write match first
            ('write', 'no'),
            ('write', 'other'),
        )
        for expected, repo in expectations:
            self.check_level(expected, repo=repo)
class RuleSet5TC(_RuleSetBaseTC):
    """Checks a rule file where the repo-specific deny is listed before the
    general write rule, so the deny is the level that is reported."""

    accessrules = '''
read user=users/w/* repo=toto
deny user=users/w/* repo=no
write user=users/w/*
read user=users/**
'''

    def test_user_w(self):
        self.rs.set(user='users/w/key')
        expectations = (
            ('read', 'toto'),
            # deny takes effect here
            ('deny', 'no'),
            ('write', 'other'),
        )
        for expected, repo in expectations:
            self.check_level(expected, repo=repo)
class RuleSetPublishTC(_RuleSetBaseTC):
    """Checks a rule file that uses the 'publish' level."""

    accessrules = '''
init user=root/**
deny repo=hgadmin
init user=users/toto/* repo=toto
publish user=users/toto/* repo=pub/**
publish repo=allpub/**
write user=users/w/*
read user=users/**
'''

    def test_publish(self):
        # First pass: the user is preset and the repo varies per check.
        self.rs.set(user='users/w/key')
        by_repo = (
            ('publish', 'allpub/stuff'),
            ('write', 'toto'),
            ('write', 'other/stuff'),
        )
        for expected, repo in by_repo:
            self.check_level(expected, repo=repo)

        # Second pass: the repo is preset too and the user is passed with
        # each check.
        self.rs.set(repo='pub/stuff')
        by_user = (
            ('write', 'users/w/key'),
            ('publish', 'users/toto/key'),
        )
        for expected, user in by_user:
            self.check_level(expected, user=user)
if __name__ == '__main__':
    # Run as a script: hand control to unittest's command-line runner,
    # which collects the test cases defined in this module.
    from unittest import main as run_tests
    run_tests()