From 3e69761bcea465dd5a8645dd917fdc7a2a1111a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Barrois?= Date: Mon, 20 Feb 2012 09:03:11 +0100 Subject: Move OATH validation code to the 'crypto' module. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Raphaƫl Barrois --- crypto.py | 37 +++++++++++++++++++++++++++++++++++++ yubiserve.py | 58 +++++++++++++++++++++++++--------------------------------- 2 files changed, 62 insertions(+), 33 deletions(-) create mode 100644 crypto.py diff --git a/crypto.py b/crypto.py new file mode 100644 index 0000000..3f22ccc --- /dev/null +++ b/crypto.py @@ -0,0 +1,37 @@ +# coding: utf-8 + + +class OATHValidator(object): + STATUS_OK = 'OK' + STATUS_BAD = 'BAD' + STATUS_NO_AUTH = 'NO_AUTH' + STATUS_NO_CLIENT = 'NO_CLIENT' + + def __init__(self, dbread_callback, dbwrite_callback): + self.dbread_callback = dbread_callback + self.dbwrite_callback = dbwrite_callback + + def test_HOTP(self, K, C, digits=6): + counter = ('%s' % C).rjust(16, '0').decode('hex') + HS = hmac.new(K, counter, hashlib.sha1).digest() + offset = ord(HS[19]) & 0xF + bin_code = int((chr(ord(HS[offset]) & 0x7F) + HS[offset+1:offset+4]).encode('hex'), 16) + return str(bin_code)[-digits:] + + def validate_OATH(self, OATH, publicID): + if len(OATH) % 2 != 0: + return self.STATUS_BAD + + token_data = self.dbread_callback(publicID=publicID) + if token_data.rowcount != 1: + return self.STATUS_BAD + + (actualcounter, key) = token_data.fetchone() + + K = key.decode('hex') + for C in range(actualcounter + 1, actualcounter + 256): + if OATH == self.test_HOTP(K, C, len(OATH)): + self.dbwrite_callback(counter=str(C), publicID=publicID) + return self.STATUS_OK + + return self.STATUS_NO_AUTH diff --git a/yubiserve.py b/yubiserve.py index 5a107c3..704ed4c 100755 --- a/yubiserve.py +++ b/yubiserve.py @@ -16,6 +16,8 @@ from threading import Thread from Crypto.Cipher import AES from OpenSSL import SSL +import crypto + try: import MySQLdb except ImportError: @@ -42,38 +44,27 @@ def parseConfigFile(): # Originally I wrote this function to parse PHP configur config = parseConfigFile() -class OATHValidation(): - def __init__(self, connection): - self.status = {'OK': 1, 'BAD_OTP': 2, 'NO_AUTH': 3, 'NO_CLIENT': 5} - self.validationResult = 0 - self.con = connection +class OATHValidator(crypto.OATHValidator): - def testHOTP(self, K, C, digits=6): - counter = ("%x"%C).rjust(16,'0').decode('hex') # Convert it into 8 bytes hex - HS = hmac.new(K, counter, hashlib.sha1).digest() - offset = ord(HS[19]) & 0xF - # It doesn't look pretty, but it is optimized! :D - bin_code = int((chr(ord(HS[offset]) & 0x7F) + HS[offset+1:offset+4]).encode('hex'),16) - return str(bin_code)[-digits:] - - def validateOATH(self, OATH, publicID): - cur = self.con.cursor() - cur.execute("SELECT counter, secret FROM oathtokens WHERE publicname = '" + publicID + "' AND active = '1'") - if (cur.rowcount != 1): - validationResult = self.status['BAD_OTP'] - return validationResult - (actualcounter, key) = cur.fetchone() - - if len(OATH) % 2 != 0: - self.validationResult = self.status['BAD_OTP'] - return self.validationResult - K = key.decode('hex') # key - for C in range(actualcounter+1, actualcounter+256): - if OATH == self.testHOTP(K, C, len(OATH)): - cur.execute("UPDATE oathtokens SET counter = " + str(C) + " WHERE publicname = '" + publicID + "' AND active = '1'") - self.con.commit() - return self.status['OK'] - return self.status['NO_AUTH'] + def __init__(self, connection): + cur = connection.cursor() + def dbread(publicID): + cur.execute(""" + SELECT counter, secret + FROM oathtokens + WHERE publicname = %s AND active = '1' + """, (publicID,)) + return cur + + def dbwrite(counter, publicID): + cur.execute(""" + UPDATE oathtokens + SET counter = %s + WHERE publicname = %s AND active = '1' + """, (counter, publicID)) + connection.commit() + + return super(OATHValidator, self).__init__(dbread, dbwrite) class OTPValidation(): @@ -275,7 +266,8 @@ class YubiServeHandler (BaseHTTPServer.BaseHTTPRequestHandler): try: getData = self.getToDict(query) if (len(query) > 0) and ((len(getData['otp']) == 6) or (len(getData['otp']) == 8) or (len(getData['otp']) == 18) or (len(getData['otp']) == 20)): - oathvalidation = OATHValidation(self.con) + + oathvalidation = OATHValidator(self.con) OTP = getData['otp'] if (len(OTP) == 18) or (len(OTP) == 20): publicID = OTP[0:12] @@ -291,7 +283,7 @@ class YubiServeHandler (BaseHTTPServer.BaseHTTPRequestHandler): self.send_header('Content-type', 'text/plain') self.end_headers() iso_time = time.strftime("%Y-%m-%dT%H:%M:%S") - result = 'otp=' + getData['otp'] + '\r\nstatus=' + [k for k, v in oathvalidation.status.iteritems() if v == validation][0] + '\r\nt=' + iso_time + result = 'otp=' + getData['otp'] + '\r\nstatus=' + validation + '\r\nt=' + iso_time otp_hmac = '' try: if (getData['id'] != None): -- cgit v1.2.3