"""
Enforce git-shell to only serve allowed by access control policy.
directory. The client should refer to them without any extra directory
prefix. Repository names are forced to match ALLOW_RE.
"""
import logging
import sys, os, re
from gitosis import access
from gitosis import configutil
from gitosis import repository
from gitosis import app
from gitosis import util
from gitosis import run_hook
ALLOW_RE = re.compile(
"^'(?P<path>[a-zA-Z0-9][a-zA-Z0-9@._-]*(/[a-zA-Z0-9][a-zA-Z0-9@._-]*)*)'$"
)
COMMANDS_READONLY = [
'git-upload-pack',
]
COMMANDS_WRITE = [
'git-receive-pack',
]
class ServingError(Exception):
"""Serving error"""
def __str__(self):
return '%s' % self.__doc__
class CommandMayNotContainNewlineError(ServingError):
"""Command may not contain newline"""
class UnknownCommandError(ServingError):
"""Unknown command denied"""
class UnsafeArgumentsError(ServingError):
"""Arguments to command look dangerous"""
class AccessDenied(ServingError):
"""Access denied to repository"""
class WriteAccessDenied(AccessDenied):
"""Repository write access denied"""
class ReadAccessDenied(AccessDenied):
"""Repository read access denied"""
def serve(cfg, user, command):
"""Check the git command for sanity, and then run the git command."""
log = logging.getLogger('gitosis.serve.serve')
if '\n' in command:
raise CommandMayNotContainNewlineError()
try:
verb, args = command.split(None, 1)
except ValueError:
raise UnknownCommandError()
if (verb not in COMMANDS_WRITE
and verb not in COMMANDS_READONLY):
raise UnknownCommandError()
log.debug('Got command %(cmd)r and args %(args)r' % dict(
cmd=verb,
args=args,
))
if args.startswith("'/") and args.endswith("'"):
args = args[1:-1]
repos = util.getRepositoryDir(cfg)
reposreal = os.path.realpath(repos)
if args.startswith(repos):
args = os.path.realpath(args)[len(repos)+1:]
elif args.startswith(reposreal):
args = os.path.realpath(args)[len(reposreal)+1:]
else:
args = args[1:]
args = "'%s'" % (args, )
match = ALLOW_RE.match(args)
if match is None:
raise UnsafeArgumentsError()
path = match.group('path')
newpath = access.haveAccess(
config=cfg,
user=user,
mode='writable',
path=path)
if newpath is None:
newpath = access.haveAccess(
config=cfg,
user=user,
mode='readonly',
path=path)
if newpath is None:
raise ReadAccessDenied()
if verb in COMMANDS_WRITE:
raise WriteAccessDenied()
(topdir, relpath) = newpath
assert not relpath.endswith('.git'), \
'git extension should have been stripped: %r' % relpath
repopath = '%s.git' % relpath
fullpath = os.path.join(topdir, repopath)
if (not os.path.exists(fullpath)
and verb in COMMANDS_WRITE):
path = topdir
newdirmode = configutil.get_default(cfg, 'repo %s' % (relpath, ), 'dirmode', None)
if newdirmode is None:
newdirmode = configutil.get_default(cfg, 'gitosis', 'dirmode', '0750')
newdirmode = int(newdirmode, 8)
for segment in repopath.split(os.sep)[:-1]:
path = os.path.join(path, segment)
util.mkdir(path, newdirmode)
repository.init(path=fullpath, mode=newdirmode)
run_hook.build_reposistory_data(cfg)
newcmd = "%(verb)s '%(path)s'" % dict(
verb=verb,
path=fullpath,
)
return newcmd
class Main(app.App):
"""gitosis-serve program."""
def create_parser(self):
"""Declare the input for this program."""
parser = super(Main, self).create_parser()
parser.set_usage('%prog [OPTS] USER')
parser.set_description(
'Allow restricted git operations under DIR')
return parser
def handle_args(self, parser, cfg, options, args):
"""Parse the input for this program."""
try:
(user,) = args
except ValueError:
parser.error('Missing argument USER.')
log = logging.getLogger('gitosis.serve.main')
os.umask(0022)
cmd = os.environ.get('SSH_ORIGINAL_COMMAND', None)
if cmd is None:
log.error('Need SSH_ORIGINAL_COMMAND in environment.')
sys.exit(1)
log.debug('Got command %(cmd)r' % dict(
cmd=cmd,
))
os.chdir(os.path.expanduser('~'))
try:
newcmd = serve(
cfg=cfg,
user=user,
command=cmd,
)
except ServingError, ex:
log.error('%s', ex)
sys.exit(1)
log.debug('Serving %s', newcmd)
os.execvp('git-shell', ['git-shell', '-c', newcmd])
log.error('Cannot execute git-shell.')
sys.exit(1)