# Started work on acl.py replacement - currently broken.
# Copyright 2008 Paul Crowley <paul@lshift.net>
# Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
#
# This software may be used and distributed according to the terms
# of the GNU General Public License, incorporated herein by reference.
from mercurial.i18n import _
from mercurial.node import *
from mercurial import util
import os
import re
# Body of a regexp character class listing the characters a rule glob may
# contain literally.
allowedchars = "A-Za-z0-9_-"
# Rule globs may be made up only of "*", "/" and the characters above;
# a glob containing anything else will match nothing.  No regexp
# metachars are accepted, not even "."
# We may fix this later.
goodglobre = re.compile("[*/" + allowedchars + "]+$")
def globmatcher(pattern):
    '''Return a predicate that tests a string against the glob pattern.

    A single "*" matches any run (possibly empty) of the characters in
    allowedchars; "**" matches the same but may also cross "/".  All
    other characters in the pattern match themselves.

    A pattern rejected by goodglobre yields a predicate that is always
    False.
    '''
    if not goodglobre.match(pattern):
        # FIXME: report the bad pattern somehow instead of silently
        # producing a matcher that never matches.
        return lambda x: False
    # goodglobre guarantees the pattern has no "[" or "]", so "[]" is a
    # safe temporary placeholder for "*" while expanding "**" before "*".
    pattern = pattern.replace("*", "[]")
    pattern = pattern.replace("[][]", "[/%s]*" % allowedchars)
    pattern = pattern.replace("[]", "[%s]*" % allowedchars)
    # Anchor with \Z rather than $: "$" would also accept a value that
    # has a trailing newline after the matched text.
    rex = re.compile(pattern + r"\Z")
    return lambda x: rex.match(x) is not None
def rule(pairs):
    '''Build a rule predicate from an iterable of (key, glob) pairs.

    The returned function takes keyword arguments and returns True only
    if, for every pair, the key was supplied and its value is accepted
    by the matcher built from the glob.  With no pairs it always returns
    True.
    '''
    matchers = [(key, globmatcher(glob)) for key, glob in pairs]

    def c(**kw):
        # A key that was not supplied makes the rule fail.
        return all(key in kw and matches(kw[key])
                   for key, matches in matchers)

    return c
class Rulefile(object):
    '''Class representing the rules in a rule file'''

    # Known actions.  allow() compares positions in this list: a matched
    # action grants a requested level when it does not come later in the
    # list than that level.
    levels = ["init", "write", "read", "deny"]

    def __init__(self):
        # List of (action, condition) tuples in the order they were added.
        self.rules = []

    def add(self, action, conditions):
        '''Append a rule: action applies when conditions(**kw) is true.'''
        self.rules.append((action, conditions))

    def matchrule(self, **kw):
        '''Return the action of the first rule whose condition accepts
        the keyword arguments, or None if no rule matches.'''
        for a, c in self.rules:
            if c(**kw):
                return a
        return None

    def allow(self, level, **kw):
        '''Return True if the first matching rule grants level.

        Returns False when no rule matches or when the matched action is
        not one of self.levels.  level itself is looked up with
        list.index, so an unknown level raises ValueError once a rule
        with a known action has matched.
        '''
        a = self.matchrule(**kw)
        return (a in self.levels and
                self.levels.index(a) <= self.levels.index(level))
def read_rules(fn):
    '''Parse the rule file named fn and return it as a Rulefile.

    Blank lines and lines starting with "#" are skipped.  Every other
    line is split on whitespace: the first field is the action and each
    remaining field is a "key=glob" condition.  Rules are added in file
    order.

    Raises ValueError if a condition field contains no "=".
    '''
    res = Rulefile()
    f = open(fn)
    try:
        for l in f:
            l = l.strip()
            if not l or l.startswith("#"):
                continue
            # NOTE(review): whitespace-separated fields are inferred from
            # how the line is indexed below -- confirm the file format.
            fields = l.split()
            conditions = []
            for c in fields[1:]:
                if "=" not in c:
                    raise ValueError("bad condition %r in rule: %s" % (c, l))
                conditions.append(c.split("=", 1))
            res.add(fields[0], rule(conditions))
    finally:
        f.close()
    return res
class Checker(object):
    '''acl checker.

    Holds the ui and repo it was given plus the rules read from the file
    named by the HG_ACCESS_RULES_FILE environment variable.
    '''

    def getuser(self):
        '''return name of hg-ssh user (the REMOTE_USER environment
        variable; KeyError if it is unset)'''
        return os.environ['REMOTE_USER']

    def __init__(self, ui, repo):
        self.ui = ui
        self.repo = repo
        self.rules = read_rules(os.environ['HG_ACCESS_RULES_FILE'])

    def check(self, node):
        '''return if access allowed, raise exception if not.

        Every file touched by the changeset must be granted "write" for
        the current user; the first refusal raises util.Abort.
        '''
        user = self.getuser()
        files = self.repo.changectx(node).files()
        for f in files:
            # NOTE(review): the "file" key is assumed to be what rule
            # files use to refer to the touched path -- confirm.
            if not self.rules.allow("write", user=user, file=f):
                self.ui.debug(_('%s: user %s not allowed on %s\n') %
                              (__name__, user, f))
                raise util.Abort(_('%s: access denied for changeset %s') %
                                 (__name__, short(node)))
        self.ui.debug(_('%s: allowing changeset %s\n') %
                      (__name__, short(node)))
def hook(ui, repo, hooktype, node=None, source=None, **kwargs):
    '''Hook entry point: check every incoming changeset against the rules.

    Only the 'pretxnchangegroup' hook type is accepted; any other type
    raises util.Abort.  Each revision from the one identified by node up
    to the end of the changelog is passed to Checker.check, which raises
    util.Abort on the first changeset that is refused.
    '''
    if hooktype != 'pretxnchangegroup':
        raise util.Abort(_('config error - hook type "%s" cannot stop '
                           'incoming changesets') % hooktype)
    c = Checker(ui, repo)
    # node is the hex id of the first changeset to check; bin() converts it
    # to the binary form the changelog expects.
    start = repo.changelog.rev(bin(node))
    end = repo.changelog.count()
    for rev in xrange(start, end):
        c.check(repo.changelog.node(rev))