class OATHValidator(object):
STATUS_OK = 'OK'
STATUS_BAD = 'BAD'
STATUS_NO_AUTH = 'NO_AUTH'
STATUS_NO_CLIENT = 'NO_CLIENT'
def __init__(self, dbread_callback, dbwrite_callback):
self.dbread_callback = dbread_callback
self.dbwrite_callback = dbwrite_callback
def test_HOTP(self, K, C, digits=6):
counter = ('%s' % C).rjust(16, '0').decode('hex')
HS = hmac.new(K, counter, hashlib.sha1).digest()
offset = ord(HS[19]) & 0xF
bin_code = int((chr(ord(HS[offset]) & 0x7F) + HS[offset+1:offset+4]).encode('hex'), 16)
return str(bin_code)[-digits:]
def validate_OATH(self, OATH, publicID):
if len(OATH) % 2 != 0:
return self.STATUS_BAD
token_data = self.dbread_callback(publicID=publicID)
if token_data.rowcount != 1:
return self.STATUS_BAD
(actualcounter, key) = token_data.fetchone()
K = key.decode('hex')
for C in range(actualcounter + 1, actualcounter + 256):
if OATH == self.test_HOTP(K, C, len(OATH)):
self.dbwrite_callback(counter=str(C), publicID=publicID)
return self.STATUS_OK
return self.STATUS_NO_AUTH