#!/usr/bin/env python

"""
hg-ssh - limit access to hg repositories reached via ssh.  Part of
mercurial-server.

It is called by ssh due to an entry in the authorized_keys file,
with the name for the key passed on the command line.

It uses SSH_ORIGINAL_COMMAND to determine what the user was trying to
do and to what repository, and then checks each rule in the rule file
in turn for a matching rule which decides what to do, defaulting to
disallowing the action.
"""

# Enable importing on demand to reduce startup time.  This must run
# before the remaining imports so that they are loaded lazily.
from mercurial import demandimport
demandimport.enable()

from mercurial import dispatch

# Use dispatch.request when this Mercurial provides it; otherwise fall
# back to `list`, so that request([...]) just yields the argument list
# that is then handed to dispatch.dispatch().
try:
    request = dispatch.request
except AttributeError:
    request = list

import sys
import os
import os.path
import base64

from mercurialserver import config, ruleset


def fail(message):
    """Write *message* to stderr with a "mercurial-server: " prefix and exit.

    Never returns: the process is terminated via sys.exit(-1).
    """
    sys.stderr.write("mercurial-server: %s\n" % message)
    sys.exit(-1)


config.initExe()

# Export the configured environment entries, upper-casing each name.
for k, v in config.getEnv():
    os.environ[k.upper()] = v

# The key name arrives on the command line, either verbatim as the only
# argument or base64-encoded after a "--base64" flag.
if len(sys.argv) == 3 and sys.argv[1] == "--base64":
    ruleset.rules.set(user=base64.b64decode(sys.argv[2]))
elif len(sys.argv) == 2:
    ruleset.rules.set(user=sys.argv[1])
else:
    fail("hg-ssh wrongly called, is authorized_keys corrupt? (%s)"
         % sys.argv)

# Repository paths taken from the client are relative, so everything
# below runs from the repository root.
os.chdir(config.getReposPath())

# Load every access rule file that actually exists; missing ones are
# skipped silently.
for f in config.getAccessPaths():
    if os.path.isfile(f):
        ruleset.rules.readfile(f)

alloweddots = config.getAllowedDots()


def dotException(pathtail):
    """Return True if *pathtail* starts with an allowed dot path.

    *pathtail* is a list of path components.  Each entry of the
    module-level ``alloweddots`` is split on "/" and compared against
    the leading components of *pathtail*.
    """
    return any(
        len(pathtail) >= len(splex) and pathtail[:len(splex)] == splex
        for splex in (ex.split("/") for ex in alloweddots)
    )


def checkDots(path, pathtail=None):
    """Reject *path* if any component starts with "." and is not allowed.

    The path is walked from its last component back towards the first.
    *pathtail* carries the components already visited (those to the
    right of the current one), so dotException() always sees the
    current component followed by the rest of the path.  Callers
    normally pass only *path*.

    Calls fail(), which exits the process, on a disallowed component.
    """
    head, tail = os.path.split(path)
    # Build a new list rather than sharing a mutable default between calls.
    pathtail = [tail] + (pathtail or [])
    if tail.startswith(".") and not dotException(pathtail):
        fail("paths cannot contain dot file components")
    if head:
        checkDots(head, pathtail)


def getrepo(op, repo):
    """Validate the requested repository path and return its cleaned form.

    *op* is the operation name passed to the rule check ("read" or
    "init" in this script); *repo* is the raw path text taken from the
    client's command.  Any failed check calls fail(), which exits the
    process, so a return value means the request was permitted.
    """
    # First canonicalise, then check the string, then the rules
    repo = repo.strip().rstrip("/")
    if not repo:
        fail("path to repository seems to be empty")
    if repo.startswith("/"):
        fail("absolute paths are not supported")
    checkDots(repo)
    ruleset.rules.set(repo=repo)
    if not ruleset.rules.allow(op, branch=None, file=None):
        fail("access denied")
    return repo


# Only two command shapes are accepted from the client; anything else,
# including an interactive login with no command at all, is refused.
cmd = os.environ.get('SSH_ORIGINAL_COMMAND', None)
if cmd is None:
    fail("direct logins on the hg account prohibited")
elif cmd.startswith('hg -R ') and cmd.endswith(' serve --stdio'):
    # Strip the 6-character prefix and the 14-character suffix to leave
    # just the repository path.
    repo = getrepo("read", cmd[6:-14])
    if not os.path.isdir(repo + "/.hg"):
        fail("no such repository %s" % repo)
    dispatch.dispatch(request(['-R', repo, 'serve', '--stdio']))
elif cmd.startswith('hg init '):
    repo = getrepo("init", cmd[8:])
    if os.path.exists(repo):
        fail("%s exists" % repo)
    # Create any missing parent directories before handing over to hg.
    d = os.path.dirname(repo)
    if d != "" and not os.path.isdir(d):
        os.makedirs(d)
    dispatch.dispatch(request(['init', repo]))
else:
    fail("illegal command %r" % cmd)