test/unittest_ruleset.py
author Cédric Krier <ced@b2ck.com>
Mon, 11 Sep 2017 21:38:10 +0200
changeset 375 a41e4382ea6e
parent 372 80f78674c56e
permissions -rw-r--r--
Use repo vfs to join the server log. The method join on repo has been removed in changeset edb7f628ef8b of Mercurial.

import os.path as osp
from unittest import TestCase

from mercurialserver import ruleset

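class _RuleSetBaseTC(TestCase):
    """Shared fixture for the access-rule tests.

    Subclasses supply ``accessrules``, the text of a rule file.  ``setUp``
    splits it into lines and loads them into a fresh ``ruleset.Ruleset``.
    """

    # Levels in the order the tests rely on, most privileged first.
    # "none" is a test-only marker meaning "not even deny is granted";
    # it is never passed to allow().
    alllevels = ["init", "publish", "write", "read", "deny", "none"]
    levels = alllevels[:-1]

    def setUp(self):
        """Build a new rule set from the subclass's ``accessrules`` text."""
        self.rs = ruleset.Ruleset()
        self.rs.buildrules(self.accessrules.splitlines())

    def check_level(self, level, **kw):
        """Assert that *level* is the access granted for the current context.

        Two probes are made: the next more privileged level must be
        refused, and *level* itself must be allowed.  For ``"none"`` only
        the first probe runs, i.e. even ``"deny"`` must not be granted.

        ``kw`` is forwarded unchanged to ``allow()`` and is also included
        in the failure message, so a failing check identifies the user and
        repo it ran with even when they were not preset via ``rs.set()``.
        """
        # NOTE(review): only the adjacent more privileged level is probed;
        # levels further up are assumed to be refused too -- confirm that
        # allow() is monotonic in the level.
        idx = self.alllevels.index(level)
        # Preset attributes first, then the per-call overrides.
        pairs = list(self.rs.preset.items()) + list(kw.items())
        msg = ", ".join("%s=%s" % item for item in pairs) + ": "
        if idx > 0:
            prevlevel = self.levels[idx - 1]
            self.assertFalse(self.rs.allow(prevlevel, **kw), msg + prevlevel)
        if level != "none":
            self.assertTrue(self.rs.allow(level, **kw), msg + level)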

class RuleSetDefaultTC(_RuleSetBaseTC):
    """Exercise a minimal rule file: root, the hgadmin repo, ordinary users."""

    accessrules = '''
init user=root/**
deny repo=hgadmin
write user=users/**
'''

    def test_norules(self):
        # Neither user nor repo is preset: every level must be refused.
        refuse = self.assertFalse
        for lvl in self.levels:
            refuse(self.rs.allow(lvl), lvl)

    def test_root(self):
        # A key under root/ is expected to pass at every enumerated level.
        self.rs.set(user='root/key')
        accept = self.assertTrue
        for lvl in self.levels:
            accept(self.rs.allow(lvl), lvl)

    def test_user_norepo(self):
        # NOTE(review): the user here is 'user/key', not 'users/...', so the
        # refusal may come from the user pattern alone rather than from the
        # missing repo -- confirm which case this is meant to cover.
        self.rs.set(user='user/key')
        refuse = self.assertFalse
        for lvl in self.levels:
            refuse(self.rs.allow(lvl), lvl)

    def test_user(self):
        # Context supplied through two separate presets.
        for preset in ({'user': 'users/key'}, {'repo': 'some/repo'}):
            self.rs.set(**preset)
        self.check_level('write')

    def test_user_kwargs(self):
        # Same context as test_user, passed per call instead of preset.
        context = {'user': 'users/key', 'repo': 'some/repo'}
        self.check_level('write', **context)

class RuleSet2TC(_RuleSetBaseTC):
    """Exercise a rule file that mixes per-user and per-repo grants.

    The expectations below encode that ``users/w/*`` does not cover the
    bare ``users/w`` while ``users/**`` does, and that the hgadmin deny
    applies to every non-root user tried here.
    """

    accessrules = '''
init user=root/**
deny repo=hgadmin
init user=users/toto/* repo=toto
write user=users/toto/* repo=pub/**
write user=users/w/*
write repo=allpub/**
read user=users/**
'''

    def test_hgadmin(self):
        self.rs.set(repo='hgadmin')
        for who in ('users/key', 'key'):
            self.check_level('deny', user=who)

    def test_user(self):
        context = {'user': 'users/key', 'repo': 'some/repo'}
        self.check_level('read', **context)

    def test_repo(self):
        context = {'user': 'users/toto/key', 'repo': 'toto'}
        self.check_level('init', **context)

    def test_write(self):
        # One row per repo: the repo to preset, then (level, user) pairs
        # checked in order against it.
        matrix = (
            ('toto', (
                ('read', 'users/w'),
                ('write', 'users/w/key'),
            )),
            ('pub/stuff', (
                ('read', 'users/w'),
                ('write', 'users/w/key'),
                ('read', 'users/toto'),
                ('write', 'users/toto/key'),
            )),
            ('other/repo', (
                ('read', 'users/toto'),
                ('read', 'users/toto/key'),
                ('read', 'users/w'),
                ('write', 'users/w/key'),
            )),
            ('allpub/repo', (
                ('write', 'users/toto'),
                ('write', 'users/toto/key'),
                ('write', 'users/w'),
                ('write', 'users/w/key'),
            )),
            ('hgadmin', (
                ('deny', 'users/toto'),
                ('deny', 'users/toto/key'),
                ('deny', 'users/w'),
                ('deny', 'users/w/key'),
            )),
        )
        for repo, cases in matrix:
            self.rs.set(repo=repo)
            for expected, who in cases:
                self.check_level(expected, user=who)

    def test_init(self):
        self.rs.set(repo='toto')
        for expected, who in (('read', 'users/toto'),
                              ('init', 'users/toto/key')):
            self.check_level(expected, user=who)

class RuleSet3TC(_RuleSetBaseTC):
    """Exercise per-repo exceptions listed ahead of a broader write grant."""

    accessrules = '''
read  user=users/w/* repo=toto
deny  user=users/w/* repo=no
write user=users/w/*
read  user=users/**
'''

    def _check_repos(self, expectations):
        # Run check_level once per (level, repo) pair, in the given order.
        for expected, repo in expectations:
            self.check_level(expected, repo=repo)

    def test_user_w(self):
        self.rs.set(user='users/w/key')
        self._check_repos((
            ('read', 'toto'),
            ('deny', 'no'),
            ('write', 'other'),
        ))

    def test_user_k(self):
        # Outside users/w/, only the trailing read rule is expected to apply.
        self.rs.set(user='users/k/key')
        self._check_repos((
            ('read', 'toto'),
            ('read', 'no'),
            ('read', 'other'),
        ))

    def test_otheruser(self):
        # A user outside users/ is expected to get nothing, not even deny.
        self.rs.set(user='jay/key')
        self._check_repos((
            ('none', 'toto'),
            ('none', 'no'),
            ('none', 'other'),
        ))

class RuleSet4TC(_RuleSetBaseTC):
    """Pin down rule ordering: a deny listed after a broader write is shadowed.

    The deny for repo ``no`` comes after ``write user=users/w/*``, so the
    expected result for that repo is write, not deny.
    """

    accessrules = '''
read  user=users/w/* repo=toto
write user=users/w/*
deny  user=users/w/* repo=no
read  user=users/**
'''

    def test_user_w(self):
        self.rs.set(user='users/w/key')
        expectations = (
            ('read', 'toto'),
            # the write rule matches first, so the later deny never applies
            ('write', 'no'),
            ('write', 'other'),
        )
        for expected, repo in expectations:
            self.check_level(expected, repo=repo)

class RuleSet5TC(_RuleSetBaseTC):
    """Counterpart to RuleSet4TC: the deny precedes the write, so it applies.

    The rule file is the same text as in RuleSet3TC.
    """

    accessrules = '''
read  user=users/w/* repo=toto
deny  user=users/w/* repo=no
write user=users/w/*
read  user=users/**
'''

    def test_user_w(self):
        self.rs.set(user='users/w/key')
        expectations = (
            ('read', 'toto'),
            # the deny is listed before the write rule, so it wins here
            ('deny', 'no'),
            ('write', 'other'),
        )
        for expected, repo in expectations:
            self.check_level(expected, repo=repo)

class RuleSetPublishTC(_RuleSetBaseTC):
    """Exercise the publish level, which the tests rank above write."""

    accessrules = '''
init user=root/**
deny repo=hgadmin
init user=users/toto/* repo=toto
publish user=users/toto/* repo=pub/**
publish repo=allpub/**
write user=users/w/*
read user=users/**
'''

    def test_publish(self):
        # Phase 1: fixed user, repo varies per check.
        self.rs.set(user='users/w/key')
        for expected, repo in (('publish', 'allpub/stuff'),
                               ('write', 'toto'),
                               ('write', 'other/stuff')):
            self.check_level(expected, repo=repo)

        # Phase 2: fixed repo, user passed per check.  The preset user from
        # phase 1 is still in place while these run.
        self.rs.set(repo='pub/stuff')
        for expected, who in (('write', 'users/w/key'),
                              ('publish', 'users/toto/key')):
            self.check_level(expected, user=who)


if __name__ == '__main__':
    # Allow running this file directly as a script.
    import unittest
    unittest.main()