"""Tests for the ``gitosis.ssh`` helpers that read public keys from a key
directory and generate, filter and rewrite ``authorized_keys`` content."""

from nose.tools import eq_ as eq, assert_raises

import os
from cStringIO import StringIO

from gitosis import ssh
from gitosis.test.util import mkdir, maketemp, writeFile, readFile


def _key(s):
    """Collapse a multi-line key literal into a single line.

    All newlines are removed (the pieces are concatenated without a
    separator) and surrounding whitespace is stripped, so a long key can be
    wrapped across several source lines.
    """
    return ''.join(s.split('\n')).strip()


# Fake public keys, wrapped for readability; _key() joins them back into the
# single-line form. Spaces inside a line are significant, line breaks are not.
KEY_1 = _key("""
ssh-rsa +v5XLsUrLsHOKy7Stob1lHZM17YCCNXplcKfbpIztS2PujyixOaBev1ku6H6ny
gUXfuYVzY+PmfTLviSwD3UETxEkR/jlBURACDQARJdUxpgt9XG2Lbs8bhOjonAPapxrH0o
9O8R0Y6Pm1Vh+H2U0B4UBhPgEframpeJYedijBxBV5aq3yUvHkXpcjM/P0gsKqr036k= j
unk@gunk
""")

KEY_2 = _key("""
ssh-rsa 4BX2TxZoD3Og2zNjHwaMhVEa5/NLnPcw+Z02TDR0IGJrrqXk7YlfR3oz+Wb/Eb
Ctli20SoWY0Ur8kBEF/xR4hRslZ2U8t0PAJhr8cq5mifhok/gAdckmSzjD67QJ68uZbga8
ZwIAo7y/BU7cD3Y9UdVZykG34NiijHZLlCBo/TnobXjFIPXvFbfgQ3y8g+akwocFVcQ= f
roop@snoop
""")


class ReadKeys_Test(object):
    """Tests for ``ssh.readKeys``, which yields ``(user, key)`` tuples for
    the key files found in a directory."""

    def test_empty(self):
        # An empty key directory yields nothing.
        tmp = maketemp()
        empty = os.path.join(tmp, 'empty')
        mkdir(empty)
        gen = ssh.readKeys(keydir=empty)
        assert_raises(StopIteration, gen.next)

    def test_ignore_dot(self):
        # Files whose name starts with a dot are skipped.
        tmp = maketemp()
        keydir = os.path.join(tmp, 'ignore_dot')
        mkdir(keydir)
        writeFile(os.path.join(keydir, '.jdoe.pub'), KEY_1+'\n')
        gen = ssh.readKeys(keydir=keydir)
        assert_raises(StopIteration, gen.next)

    def test_ignore_nonpub(self):
        # Files without the ".pub" extension are skipped.
        tmp = maketemp()
        # NOTE(review): directory name looks copy-pasted from
        # test_ignore_dot; kept as-is since only the file name matters here.
        keydir = os.path.join(tmp, 'ignore_dot')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.xub'), KEY_1+'\n')
        gen = ssh.readKeys(keydir=keydir)
        assert_raises(StopIteration, gen.next)

    def test_one(self):
        # One key file yields one (user, key) tuple; the user name is the
        # file name without ".pub" and the key has no trailing newline.
        tmp = maketemp()
        keydir = os.path.join(tmp, 'one')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n')

        gen = ssh.readKeys(keydir=keydir)
        eq(gen.next(), ('jdoe', KEY_1))
        assert_raises(StopIteration, gen.next)

    def test_two(self):
        # Two key files yield two tuples; order is not asserted.
        tmp = maketemp()
        keydir = os.path.join(tmp, 'two')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n')
        writeFile(os.path.join(keydir, 'wsmith.pub'), KEY_2+'\n')

        gen = ssh.readKeys(keydir=keydir)
        got = frozenset(gen)

        eq(got,
           frozenset([
            ('jdoe', KEY_1),
            ('wsmith', KEY_2),
            ]))

    def test_bad_filename(self):
        # A key file whose user part contains a double quote yields nothing.
        tmp = maketemp()
        keydir = os.path.join(tmp, 'keys')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jd"oe.pub'), KEY_1+'\n')

        gen = ssh.readKeys(keydir=keydir)
        got = frozenset(gen)

        eq(got, frozenset([]))

    def test_multiple_lines(self):
        # A single key file holding two keys (one per line) yields one tuple
        # per key, both attributed to the same user.
        tmp = maketemp()
        keydir = os.path.join(tmp, 'two')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n'+KEY_2+'\n')

        gen = ssh.readKeys(keydir=keydir)
        got = frozenset(gen)

        eq(got,
           frozenset([
            ('jdoe', KEY_1),
            ('jdoe', KEY_2),
            ]))


class GenerateAuthorizedKeys_Test(object):
    """Tests for ``ssh.generateAuthorizedKeys``."""

    def test_simple(self):
        # The generator emits the marker comment first, then one
        # restricted "command=..." line per (user, key) pair, in order.
        def k():
            yield ('jdoe', KEY_1)
            yield ('wsmith', KEY_2)
        gen = ssh.generateAuthorizedKeys(k())
        eq(gen.next(), ssh.COMMENT)
        eq(gen.next(), (
                'command="gitosis-serve jdoe",no-port-forwarding,no-X11-f'
                +'orwarding,no-agent-forwarding,no-pty %s' % KEY_1))
        eq(gen.next(), (
                'command="gitosis-serve wsmith",no-port-forwarding,no-X11'
                +'-forwarding,no-agent-forwarding,no-pty %s' % KEY_2))
        assert_raises(StopIteration, gen.next)


class FilterAuthorizedKeys_Test(object):
    """Tests for ``ssh.filterAuthorizedKeys``, which drops gitosis-generated
    lines from existing ``authorized_keys`` content and keeps the rest."""

    def run(self, s):
        """Feed *s* through ``ssh.filterAuthorizedKeys`` and return the
        surviving lines joined back together, each terminated by a newline."""
        f = StringIO(s)
        lines = ssh.filterAuthorizedKeys(f)
        got = ''.join(['%s\n' % line for line in lines])
        return got

    def check_no_change(self, s):
        """Assert that filtering *s* returns it unchanged."""
        got = self.run(s)
        eq(got, s)

    def test_notFiltered_comment(self):
        # Ordinary comments are kept.
        self.check_no_change('#comment\n')

    def test_notFiltered_junk(self):
        # Unrecognized content is kept.
        self.check_no_change('junk\n')

    def test_notFiltered_key(self):
        # A plain key without options is kept.
        self.check_no_change('%s\n' % KEY_1)

    def test_notFiltered_keyWithCommand(self):
        # A forced command that is not gitosis-serve is kept.
        s = '''\
command="faketosis-serve wsmith",no-port-forwarding,no-X11-forwardin\
g,no-agent-forwarding,no-pty %(key_1)s
''' % dict(key_1=KEY_1)
        self.check_no_change(s)

    def test_filter_autogeneratedComment_backwardsCompat(self):
        # The literal legacy marker comment is removed.
        got = self.run('### autogenerated by gitosis, DO NOT EDIT\n')
        eq(got, '')

    def test_filter_autogeneratedComment_current(self):
        # The current marker comment (ssh.COMMENT) is removed.
        got = self.run(ssh.COMMENT+'\n')
        eq(got, '')

    def test_filter_simple(self):
        # A gitosis-serve forced-command line is removed.
        s = '''\
command="gitosis-serve wsmith",no-port-forwarding,no-X11-forwardin\
g,no-agent-forwarding,no-pty %(key_1)s
''' % dict(key_1=KEY_1)
        got = self.run(s)
        eq(got, '')

    def test_filter_withPath(self):
        # A gitosis-serve line is removed even when the command has a path.
        s = '''\
command="/foo/bar/baz/gitosis-serve wsmith",no-port-forwarding,no-X11-forwardin\
g,no-agent-forwarding,no-pty %(key_1)s
''' % dict(key_1=KEY_1)
        got = self.run(s)
        eq(got, '')


class WriteAuthorizedKeys_Test(object):
    """Tests for ``ssh.writeAuthorizedKeys``."""

    def test_simple(self):
        # Start from an authorized_keys file mixing user content with a
        # stale gitosis-generated section.
        tmp = maketemp()
        path = os.path.join(tmp, 'authorized_keys')
        f = file(path, 'w')
        try:
            f.write('''\
# foo
bar
### autogenerated by gitosis, DO NOT EDIT
command="/foo/bar/baz/gitosis-serve wsmith",no-port-forwarding,\
no-X11-forwarding,no-agent-forwarding,no-pty %(key_2)s
baz
''' % dict(key_2=KEY_2))
        finally:
            f.close()
        keydir = os.path.join(tmp, 'one')
        mkdir(keydir)
        writeFile(os.path.join(keydir, 'jdoe.pub'), KEY_1+'\n')

        ssh.writeAuthorizedKeys(
            path=path, keydir=keydir)

        # The user content survives, the stale generated lines are gone, and
        # a fresh generated section for the keys in keydir is appended.
        got = readFile(path)
        eq(got, '''\
# foo
bar
baz
### autogenerated by gitosis, DO NOT EDIT
command="gitosis-serve jdoe",no-port-forwarding,\
no-X11-forwarding,no-agent-forwarding,no-pty %(key_1)s
''' % dict(key_1=KEY_1))