aboutsummaryrefslogtreecommitdiff
path: root/gitosis/serve.py
blob: 5c02437c8416bdb6a1fa244281427677800952d8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""
Enforce git-shell to only serve allowed by access control policy.
directory. The client should refer to them without any extra directory
prefix. Repository names are forced to match ALLOW_RE.
"""
 
import logging
 
import sysosre
 
from gitosis import access
from gitosis import configutil
from gitosis import repository
from gitosis import app
from gitosis import util
from gitosis import run_hook
 
ALLOW_RE = re.compile(
    "^'(?P<path>[a-zA-Z0-9][a-zA-Z0-9@._-]*(/[a-zA-Z0-9][a-zA-Z0-9@._-]*)*)'$"
    )
 
COMMANDS_READONLY = [ 
    'git-upload-pack', 
    ]
 
COMMANDS_WRITE = [ 
    'git-receive-pack', 
    ]
 
class ServingError(Exception):
    """Serving error"""
 
    def __str__(self):
        return '%s' % self.__doc__
 
class CommandMayNotContainNewlineError(ServingError):
    """Command may not contain newline"""
 
class UnknownCommandError(ServingError):
    """Unknown command denied"""
 
class UnsafeArgumentsError(ServingError):
    """Arguments to command look dangerous"""
 
class AccessDenied(ServingError):
    """Access denied to repository"""
 
class WriteAccessDenied(AccessDenied):
    """Repository write access denied"""
 
class ReadAccessDenied(AccessDenied):
    """Repository read access denied"""
 
def serve(cfg, user, command):
    """Check the git command for sanity, and then run the git command."""
        
    log = logging.getLogger('gitosis.serve.serve')
 
    if '\n' in command:
        raise CommandMayNotContainNewlineError()
 
    try:
        verbargs = command.split(None1)
    except ValueError:
        # all known commands take one argument; improve if/when needed 
        raise UnknownCommandError()
 
    if (verb not in COMMANDS_WRITE
        and verb not in COMMANDS_READONLY):
        raise UnknownCommandError()
        
    log.debug('Got command %(cmd)r and args %(args)r' % dict(
                cmd=verb,
                args=args,
                ))
 
    if args.startswith("'/") and args.endswith("'"):
        args = args[1:-1]
        repos = util.getRepositoryDir(cfg)
        reposreal = os.path.realpath(repos)
        if args.startswith(repos):
            args = os.path.realpath(args)[len(repos)+1:]
        elif args.startswith(reposreal):
            args = os.path.realpath(args)[len(reposreal)+1:]
        else:
            args = args[1:]
        args = "'%s'" % (args)
 
    match = ALLOW_RE.match(args)
    if match is None:
        raise UnsafeArgumentsError()
 
    path = match.group('path')
 
    # write access is always sufficient 
    newpath = access.haveAccess(
        config=cfg,
        user=user,
        mode='writable',
        path=path)
 
    if newpath is None:
        # didn't have write access 
 
        newpath = access.haveAccess(
            config=cfg,
            user=user,
            mode='readonly',
            path=path)
 
        if newpath is None:
            raise ReadAccessDenied()
 
        if verb in COMMANDS_WRITE:
            # didn't have write access and tried to write 
            raise WriteAccessDenied()
 
    (topdirrelpath) = newpath
    assert not relpath.endswith('.git')\ 
           'git extension should have been stripped: %r' % relpath
    repopath = '%s.git' % relpath
    fullpath = os.path.join(topdirrepopath)
    if (not os.path.exists(fullpath)
        and verb in COMMANDS_WRITE):
        # it doesn't exist on the filesystem, but the configuration 
        # refers to it, we're serving a write request, and the user is 
        # authorized to do that: create the repository on the fly 
 
        # create leading directories 
        path = topdir
        newdirmode = configutil.get_default(cfg'repo %s' % (relpath)'dirmode'None)
        if newdirmode is None:
                newdirmode = configutil.get_default(cfg'gitosis''dirmode''0750')
 
        # Convert string as octal to a number 
        newdirmode = int(newdirmode8)
 
        for segment in repopath.split(os.sep)[:-1]:
            path = os.path.join(pathsegment)
            util.mkdir(pathnewdirmode)
 
        repository.init(path=fullpath, mode=newdirmode)
        run_hook.build_reposistory_data(cfg)
 
    # put the verb back together with the new path 
    newcmd = "%(verb)s '%(path)s'" % dict(
        verb=verb,
        path=fullpath,
        )
    return newcmd
 
class Main(app.App):
    """gitosis-serve program."""
    # W0613 - They also might ignore arguments here, where the descendant 
    # methods won't. 
    # pylint: disable-msg=W0613 
 
    def create_parser(self):
        """Declare the input for this program."""
        parser = super(Mainself).create_parser()
        parser.set_usage('%prog [OPTS] USER')
        parser.set_description(
            'Allow restricted git operations under DIR')
        return parser
 
    def handle_args(self, parser, cfg, options, args): #pragma: no cover 
        """Parse the input for this program."""
        try:
            (user,) = args
        except ValueError:
            parser.error('Missing argument USER.')
 
        log = logging.getLogger('gitosis.serve.main')
        os.umask(0022)
 
        cmd = os.environ.get('SSH_ORIGINAL_COMMAND'None)
        if cmd is None:
            log.error('Need SSH_ORIGINAL_COMMAND in environment.')
            sys.exit(1)
 
        log.debug('Got command %(cmd)r' % dict(
            cmd=cmd,
            ))
 
        os.chdir(os.path.expanduser('~'))
 
        try:
            newcmd = serve(
                cfg=cfg,
                user=user,
                command=cmd,
                )
        except ServingErrorex:
            log.error('%s'ex)
            sys.exit(1)
 
        log.debug('Serving %s'newcmd)
        os.execvp('git-shell'['git-shell', '-c', newcmd])
        log.error('Cannot execute git-shell.')
        sys.exit(1)