#!/usr/bin/env python
#
# Copyright 2008-2009 LShift Ltd
# Copyright 2005-2007 by Intevation GmbH <intevation@intevation.de>
# Authors:
# Paul Crowley <paul@lshift.net>
# Thomas Arendsen Hein <thomas@intevation.de>
# with ideas from Mathieu PASQUET <kiorky@cryptelium.net>
#
# This software may be used and distributed according to the terms
# of the GNU General Public License, incorporated herein by reference.
"""
hg-ssh - limit access to hg repositories reached via ssh. Part of
mercurial-server.
It is called by ssh due to an entry in the authorized_keys file,
with the name for the key passed on the command line.
It uses SSH_ORIGINAL_COMMAND to determine what the user was trying to
do and to what repository, and then checks each rule in the rule file
in turn for a matching rule which decides what to do, defaulting to
disallowing the action.
"""
# enable importing on demand to reduce startup time
from mercurial import demandimport; demandimport.enable()
from mercurial import dispatch
import sys, os, os.path
import base64
from mercurialserver import config, ruleset
def fail(message):
    """Report *message* on stderr and terminate the process.

    The text is written with a ``mercurial-server: `` prefix and a trailing
    newline. This function never returns: it raises SystemExit with
    status -1.
    """
    report = "mercurial-server: %s\n" % message
    sys.stderr.write(report)
    raise SystemExit(-1)
def checkDots(path):
    """Reject *path* if any of its components starts with a dot.

    The path is peeled apart from the right with os.path.split and every
    component is inspected, so ".", ".." and hidden names such as ".hg"
    are all refused via fail(), which exits the process.

    Returns None when no component starts with a dot.
    """
    # Walk iteratively: the path comes from the client, so its depth must
    # not be limited by (or be able to exhaust) the interpreter recursion
    # limit.
    while True:
        head, tail = os.path.split(path)
        if tail.startswith("."):
            fail("paths cannot contain dot file components")
        # Stop when nothing is left, or when split() has reached a fixed
        # point (os.path.split("/") returns "/" as head again).
        if not head or head == path:
            return
        path = head
def checkParents(path):
    """Refuse *path* if any ancestor directory already holds a repository.

    Each proper ancestor of *path* (its dirname, that directory's dirname,
    and so on) is checked for a ".hg" entry; if one exists, fail() is
    called, which exits the process. *path* itself is not checked.

    Returns None when no ancestor contains ".hg".
    """
    # Walk iteratively: the path comes from the client, so its depth must
    # not be limited by (or be able to exhaust) the interpreter recursion
    # limit.
    parent = os.path.dirname(path)
    while parent:
        if os.path.exists(os.path.join(parent, ".hg")):
            fail("Cannot create repo under existing repo")
        grandparent = os.path.dirname(parent)
        # os.path.dirname("/") is "/" again; stop at that fixed point
        # instead of looping forever.
        if grandparent == parent:
            return
        parent = grandparent
def getrepo(op, repo):
    """Validate the repository path *repo* for operation *op*.

    Trailing slashes are stripped, then the path is vetted in this order:
    the string itself (non-empty, relative, no dot components), the
    access rules (ruleset.rules.allow for *op*), and lastly the
    filesystem (no enclosing repository). Any rejection goes through
    fail(), which exits the process.

    Returns the path with trailing slashes removed.
    """
    # Canonicalise before looking at anything else.
    repo = repo.rstrip("/")

    # String-level checks.
    if not repo:
        fail("path to repository seems to be empty")
    if repo[:1] == "/":
        fail("absolute paths are not supported")
    checkDots(repo)

    # Rule check: the rules are told which repository is being accessed
    # before being asked whether the operation is permitted.
    ruleset.rules.set(repo=repo)
    permitted = ruleset.rules.allow(op, branch=None, file=None)
    if not permitted:
        fail("access denied")

    # Filesystem check comes last.
    checkParents(repo)
    return repo
config.setExePath()

# The key's name arrives on the command line, either as a single plain
# argument or as "--base64 <encoded name>".
args = sys.argv[1:]
if len(args) == 2 and args[0] == "--base64":
    ruleset.rules.set(user=base64.b64decode(args[1]))
elif len(args) == 1:
    ruleset.rules.set(user=args[0])
else:
    fail("hg-ssh wrongly called, is authorized_keys corrupt? (%s)"
         % sys.argv)

# Copy the configured environment variables into this process.
for name, value in config.getEnv().iteritems():
    os.environ[name] = value

# Repository paths are handled relative to the configured repos path.
os.chdir(config.getReposPath())

# Read every access rule file that actually exists.
for rulefile in config.getAccessPaths():
    if os.path.isfile(rulefile):
        ruleset.rules.readfile(rulefile)

# Only two command shapes are accepted:
#   hg -R <repo> serve --stdio
#   hg init <repo>
serve_prefix = 'hg -R '
serve_suffix = ' serve --stdio'
init_prefix = 'hg init '

cmd = os.environ.get('SSH_ORIGINAL_COMMAND')
if cmd is None:
    fail("direct logins on the hg account prohibited")
elif cmd.startswith(serve_prefix) and cmd.endswith(serve_suffix):
    repo = getrepo("read", cmd[len(serve_prefix):-len(serve_suffix)])
    if not os.path.isdir(repo + "/.hg"):
        fail("no such repository %s" % repo)
    dispatch.dispatch(['-R', repo, 'serve', '--stdio'])
elif cmd.startswith(init_prefix):
    repo = getrepo("init", cmd[len(init_prefix):])
    if os.path.exists(repo):
        fail("%s exists" % repo)
    # Create any missing intermediate directories before running init.
    parent = os.path.dirname(repo)
    if parent != "" and not os.path.isdir(parent):
        os.makedirs(parent)
    dispatch.dispatch(['init', repo])
else:
    fail("illegal command %r" % cmd)