from nose.tools import eq_ as eq, assert_raises
import os
from cStringIO import StringIO
from gitosis import ssh
from gitosis.test.util import mkdir, maketemp, writeFile, readFile
def _key(s):
return ''.join(s.split('\n')).strip()
# Sample ssh-rsa public key lines used as fixtures by the tests in this file.
# Each literal is wrapped across several source lines for readability;
# _key() removes the newlines, so the wrap points (including the one inside
# the trailing comment field) are not part of the resulting value.
KEY_1 = _key("""
ssh-rsa +v5XLsUrLsHOKy7Stob1lHZM17YCCNXplcKfbpIztS2PujyixOaBev1ku6H6ny
gUXfuYVzY+PmfTLviSwD3UETxEkR/jlBURACDQARJdUxpgt9XG2Lbs8bhOjonAPapxrH0o
9O8R0Y6Pm1Vh+H2U0B4UBhPgEframpeJYedijBxBV5aq3yUvHkXpcjM/P0gsKqr036k= j
unk@gunk
""")
# Second fixture key, distinct from KEY_1 so tests can tell entries apart.
KEY_2 = _key("""
ssh-rsa 4BX2TxZoD3Og2zNjHwaMhVEa5/NLnPcw+Z02TDR0IGJrrqXk7YlfR3oz+Wb/Eb
Ctli20SoWY0Ur8kBEF/xR4hRslZ2U8t0PAJhr8cq5mifhok/gAdckmSzjD67QJ68uZbga8
ZwIAo7y/BU7cD3Y9UdVZykG34NiijHZLlCBo/TnobXjFIPXvFbfgQ3y8g+akwocFVcQ= f
roop@snoop
""")
class ReadKeys_Test(object):
    """Tests for ``ssh.readKeys`` scanning a key directory.

    Each test creates a scratch directory under ``maketemp()``, populates
    it with files, and checks which ``(user, key)`` pairs the iterator
    returned by ``ssh.readKeys(keydir=...)`` produces.
    """

    def test_empty(self):
        """An empty key directory produces no entries."""
        tmp = maketemp()
        empty = os.path.join(tmp, 'empty')
        mkdir(empty)
        gen = ssh.readKeys(keydir=empty)
        assert_raises(StopIteration, gen.next)

    def test_ignore_dot(self):
        """A ``.pub`` file whose name starts with a dot is skipped."""
        tmp = maketemp()
        keydir = os.path.join(tmp, 'ignore_dot')
        mkdir(keydir)
        writeFile(os.path.join(keydir, '.jdoe.pub'), KEY_1+'\n')
        gen = ssh.readKeys(keydir=keydir)
        assert_raises(StopIteration, gen.next)

    def test_ignore_nonpub(self):
        """A file without the ``.pub`` extension is skipped."""
        tmp = maketemp()
        # Directory named after this test (it previously reused the
        # copy-pasted 'ignore_dot' name from the test above).
        keydir = os.path.join(tmp, 'ignore_nonpub')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.xub'), KEY_1+'\n')
        gen = ssh.readKeys(keydir=keydir)
        assert_raises(StopIteration, gen.next)

    def test_one(self):
        """A single ``jdoe.pub`` file produces exactly ``('jdoe', key)``."""
        tmp = maketemp()
        keydir = os.path.join(tmp, 'one')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n')
        gen = ssh.readKeys(keydir=keydir)
        eq(gen.next(), ('jdoe', KEY_1))
        assert_raises(StopIteration, gen.next)

    def test_two(self):
        """Two key files produce one entry each; order is not asserted."""
        tmp = maketemp()
        keydir = os.path.join(tmp, 'two')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n')
        writeFile(os.path.join(keydir, 'wsmith.pub'), KEY_2+'\n')
        gen = ssh.readKeys(keydir=keydir)
        got = frozenset(gen)
        eq(got,
           frozenset([
               ('jdoe', KEY_1),
               ('wsmith', KEY_2),
           ]))

    def test_bad_filename(self):
        """A key file with a double quote in its name produces no entries."""
        tmp = maketemp()
        keydir = os.path.join(tmp, 'keys')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jd"oe.pub'), KEY_1+'\n')
        gen = ssh.readKeys(keydir=keydir)
        got = frozenset(gen)
        eq(got, frozenset([]))

    def test_multiple_lines(self):
        """Each line of a multi-key file is produced under the same user."""
        tmp = maketemp()
        keydir = os.path.join(tmp, 'two')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n'+KEY_2+'\n')
        gen = ssh.readKeys(keydir=keydir)
        got = frozenset(gen)
        eq(got,
           frozenset([
               ('jdoe', KEY_1),
               ('jdoe', KEY_2),
           ]))
class GenerateAuthorizedKeys_Test(object):
    """Checks the lines produced by ``ssh.generateAuthorizedKeys``."""

    def test_simple(self):
        """The header comment comes first, then one entry per key, in order.

        After the expected lines the iterator must be exhausted.
        """
        def sample_keys():
            yield ('jdoe', KEY_1)
            yield ('wsmith', KEY_2)

        lines = ssh.generateAuthorizedKeys(sample_keys())
        expected_lines = (
            ssh.COMMENT,
            ('command="gitosis-serve jdoe",no-port-forwarding,no-X11-f'
             +'orwarding,no-agent-forwarding,no-pty %s' % KEY_1),
            ('command="gitosis-serve wsmith",no-port-forwarding,no-X11'
             +'-forwarding,no-agent-forwarding,no-pty %s' % KEY_2),
        )
        for expected in expected_lines:
            eq(lines.next(), expected)
        assert_raises(StopIteration, lines.next)
class FilterAuthorizedKeys_Test(object):
    """Tests for ``ssh.filterAuthorizedKeys``.

    The ``test_notFiltered_*`` cases expect their input back unchanged; the
    ``test_filter_*`` cases expect the input line to be dropped entirely.
    """

    def run(self, s):
        """Pass the text *s* through the filter and return what survives.

        *s* is wrapped in a ``StringIO``; every line the filter yields is
        terminated with a newline and the pieces are concatenated.
        """
        source = StringIO(s)
        kept = ssh.filterAuthorizedKeys(source)
        return ''.join('%s\n' % line for line in kept)

    def check_no_change(self, s):
        """Assert that filtering *s* returns it unchanged."""
        eq(self.run(s), s)

    def test_notFiltered_comment(self):
        """An ordinary comment line is kept."""
        self.check_no_change('#comment\n')

    def test_notFiltered_junk(self):
        """An unrecognised line is kept."""
        self.check_no_change('junk\n')

    def test_notFiltered_key(self):
        """A bare key line without options is kept."""
        self.check_no_change('%s\n' % KEY_1)

    def test_notFiltered_keyWithCommand(self):
        """An entry whose command is not gitosis-serve is kept."""
        entry = (
            'command="faketosis-serve wsmith",no-port-forwarding,'
            'no-X11-forwarding,no-agent-forwarding,no-pty %(key_1)s\n'
        ) % dict(key_1=KEY_1)
        self.check_no_change(entry)

    def test_filter_autogeneratedComment_backwardsCompat(self):
        """The literal autogenerated-header comment is removed."""
        filtered = self.run('### autogenerated by gitosis, DO NOT EDIT\n')
        eq(filtered, '')

    def test_filter_autogeneratedComment_current(self):
        """The header stored in ``ssh.COMMENT`` is removed."""
        filtered = self.run(ssh.COMMENT+'\n')
        eq(filtered, '')

    def test_filter_simple(self):
        """A gitosis-serve entry is removed."""
        entry = (
            'command="gitosis-serve wsmith",no-port-forwarding,'
            'no-X11-forwarding,no-agent-forwarding,no-pty %(key_1)s\n'
        ) % dict(key_1=KEY_1)
        filtered = self.run(entry)
        eq(filtered, '')

    def test_filter_withPath(self):
        """A gitosis-serve entry invoked via an absolute path is removed."""
        entry = (
            'command="/foo/bar/baz/gitosis-serve wsmith",no-port-forwarding,'
            'no-X11-forwarding,no-agent-forwarding,no-pty %(key_1)s\n'
        ) % dict(key_1=KEY_1)
        filtered = self.run(entry)
        eq(filtered, '')
class WriteAuthorizedKeys_Test(object):
    """Test for ``ssh.writeAuthorizedKeys`` rewriting an existing file."""

    def test_simple(self):
        """Rewrite an authorized_keys file that mixes foreign and old lines.

        The seeded file holds three unrelated lines plus an old autogenerated
        header and a gitosis-serve entry for ``wsmith``. After the call the
        file is expected to keep the unrelated lines in their original
        order, drop the old header and entry, and end with a fresh header
        followed by an entry for the single key in the key directory.
        """
        tmp = maketemp()
        path = os.path.join(tmp, 'authorized_keys')
        # open() is the documented way to open files; the file() constructor
        # is discouraged for this purpose and does not exist in Python 3.
        f = open(path, 'w')
        try:
            f.write('''\
# foo
bar
### autogenerated by gitosis, DO NOT EDIT
command="/foo/bar/baz/gitosis-serve wsmith",no-port-forwarding,\
no-X11-forwarding,no-agent-forwarding,no-pty %(key_2)s
baz
''' % dict(key_2=KEY_2))
        finally:
            f.close()
        keydir = os.path.join(tmp, 'one')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n')

        ssh.writeAuthorizedKeys(
            path=path, keydir=keydir)

        got = readFile(path)
        eq(got, '''\
# foo
bar
baz
### autogenerated by gitosis, DO NOT EDIT
command="gitosis-serve jdoe",no-port-forwarding,\
no-X11-forwarding,no-agent-forwarding,no-pty %(key_1)s
''' % dict(key_1=KEY_1))