src/hg-ssh
author Paul Crowley <paul@lshift.net>
Wed, 07 Sep 2011 11:05:13 +0100 (2011-09-07)
changeset 327 6f2a3f251d3b
parent 306 f832a8aeef44
child 374 7a1d6b228af6
permissions -rwxr-xr-x
Copy in and use environment vars to control test
#!/usr/bin/env python

"""
hg-ssh - limit access to hg repositories reached via ssh.  Part of
mercurial-server.

It is called by ssh due to an entry in the authorized_keys file,
with the name for the key passed on the command line.

It uses SSH_ORIGINAL_COMMAND to determine what the user was trying to
do and to what repository, and then checks each rule in the rule file
in turn for a matching rule which decides what to do, defaulting to
disallowing the action.

"""

# enable importing on demand to reduce startup time
from mercurial import demandimport; demandimport.enable()

from mercurial import dispatch

try:
    request = dispatch.request
except AttributeError:
    request = list

import sys, os, os.path
import base64
from mercurialserver import config, ruleset

def fail(message):
    sys.stderr.write("mercurial-server: %s\n" % message)
    sys.exit(-1)

config.initExe()

for k,v in config.getEnv():
    os.environ[k.upper()] = v

if len(sys.argv) == 3 and sys.argv[1] == "--base64":
    ruleset.rules.set(user = base64.b64decode(sys.argv[2]))
elif len(sys.argv) == 2:
    ruleset.rules.set(user = sys.argv[1])
else:
    fail("hg-ssh wrongly called, is authorized_keys corrupt? (%s)"
        % sys.argv)

os.chdir(config.getReposPath())

for f in config.getAccessPaths():
    if os.path.isfile(f):
        ruleset.rules.readfile(f)

alloweddots = config.getAllowedDots()

def dotException(pathtail):
    for ex in alloweddots:
        splex = ex.split("/")
        if len(pathtail) >= len(splex) and pathtail[:len(splex)] == splex:
            return True
    return False

def checkDots(path, pathtail = []):
    head, tail = os.path.split(path)
    pathtail = [tail] + pathtail
    if tail.startswith(".") and not dotException(pathtail):
            fail("paths cannot contain dot file components")
    if head:
        checkDots(head, pathtail)

def getrepo(op, repo):
    # First canonicalise, then check the string, then the rules
    repo = repo.strip().rstrip("/")
    if len(repo) == 0:
        fail("path to repository seems to be empty")
    if repo.startswith("/"):
        fail("absolute paths are not supported")
    checkDots(repo)
    ruleset.rules.set(repo=repo)
    if not ruleset.rules.allow(op, branch=None, file=None):
        fail("access denied")
    return repo

cmd = os.environ.get('SSH_ORIGINAL_COMMAND', None)
if cmd is None:
    fail("direct logins on the hg account prohibited")
elif cmd.startswith('hg -R ') and cmd.endswith(' serve --stdio'):
    repo = getrepo("read", cmd[6:-14])
    if not os.path.isdir(repo + "/.hg"):
        fail("no such repository %s" % repo)
    dispatch.dispatch(request(['-R', repo, 'serve', '--stdio']))
elif cmd.startswith('hg init '):
    repo = getrepo("init", cmd[8:])
    if os.path.exists(repo):
        fail("%s exists" % repo)
    d = os.path.dirname(repo)
    if d != "" and not os.path.isdir(d):
        os.makedirs(d)
    dispatch.dispatch(request(['init', repo]))
else:
    fail("illegal command %r" % cmd)