summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRaphaël Barrois <raphael.barrois@polytechnique.org>2012-02-20 09:03:11 +0100
committerRaphaël Barrois <raphael.barrois@polytechnique.org>2012-02-20 09:03:11 +0100
commit3e69761bcea465dd5a8645dd917fdc7a2a1111a1 (patch)
treee6426e439a6bfcb8c1c90f6b89d9fedef842c008
parentRetab (diff)
downloadyubico-yubiserve-3e69761bcea465dd5a8645dd917fdc7a2a1111a1.tar.gz
yubico-yubiserve-3e69761bcea465dd5a8645dd917fdc7a2a1111a1.tar.bz2
yubico-yubiserve-3e69761bcea465dd5a8645dd917fdc7a2a1111a1.zip
Move OATH validation code to the 'crypto' module.
Signed-off-by: Raphaël Barrois <raphael.barrois@polytechnique.org>
-rw-r--r--crypto.py37
-rwxr-xr-xyubiserve.py58
2 files changed, 62 insertions, 33 deletions
diff --git a/crypto.py b/crypto.py
new file mode 100644
index 0000000..3f22ccc
--- /dev/null
+++ b/crypto.py
@@ -0,0 +1,37 @@
+# coding: utf-8
+
+
+class OATHValidator(object):
+ STATUS_OK = 'OK'
+ STATUS_BAD = 'BAD'
+ STATUS_NO_AUTH = 'NO_AUTH'
+ STATUS_NO_CLIENT = 'NO_CLIENT'
+
+ def __init__(self, dbread_callback, dbwrite_callback):
+ self.dbread_callback = dbread_callback
+ self.dbwrite_callback = dbwrite_callback
+
+ def test_HOTP(self, K, C, digits=6):
+ counter = ('%s' % C).rjust(16, '0').decode('hex')
+ HS = hmac.new(K, counter, hashlib.sha1).digest()
+ offset = ord(HS[19]) & 0xF
+ bin_code = int((chr(ord(HS[offset]) & 0x7F) + HS[offset+1:offset+4]).encode('hex'), 16)
+ return str(bin_code)[-digits:]
+
+ def validate_OATH(self, OATH, publicID):
+ if len(OATH) % 2 != 0:
+ return self.STATUS_BAD
+
+ token_data = self.dbread_callback(publicID=publicID)
+ if token_data.rowcount != 1:
+ return self.STATUS_BAD
+
+ (actualcounter, key) = token_data.fetchone()
+
+ K = key.decode('hex')
+ for C in range(actualcounter + 1, actualcounter + 256):
+ if OATH == self.test_HOTP(K, C, len(OATH)):
+ self.dbwrite_callback(counter=str(C), publicID=publicID)
+ return self.STATUS_OK
+
+ return self.STATUS_NO_AUTH
diff --git a/yubiserve.py b/yubiserve.py
index 5a107c3..704ed4c 100755
--- a/yubiserve.py
+++ b/yubiserve.py
@@ -16,6 +16,8 @@ from threading import Thread
from Crypto.Cipher import AES
from OpenSSL import SSL
+import crypto
+
try:
import MySQLdb
except ImportError:
@@ -42,38 +44,27 @@ def parseConfigFile(): # Originally I wrote this function to parse PHP configur
config = parseConfigFile()
-class OATHValidation():
- def __init__(self, connection):
- self.status = {'OK': 1, 'BAD_OTP': 2, 'NO_AUTH': 3, 'NO_CLIENT': 5}
- self.validationResult = 0
- self.con = connection
+class OATHValidator(crypto.OATHValidator):
- def testHOTP(self, K, C, digits=6):
- counter = ("%x"%C).rjust(16,'0').decode('hex') # Convert it into 8 bytes hex
- HS = hmac.new(K, counter, hashlib.sha1).digest()
- offset = ord(HS[19]) & 0xF
- # It doesn't look pretty, but it is optimized! :D
- bin_code = int((chr(ord(HS[offset]) & 0x7F) + HS[offset+1:offset+4]).encode('hex'),16)
- return str(bin_code)[-digits:]
-
- def validateOATH(self, OATH, publicID):
- cur = self.con.cursor()
- cur.execute("SELECT counter, secret FROM oathtokens WHERE publicname = '" + publicID + "' AND active = '1'")
- if (cur.rowcount != 1):
- validationResult = self.status['BAD_OTP']
- return validationResult
- (actualcounter, key) = cur.fetchone()
-
- if len(OATH) % 2 != 0:
- self.validationResult = self.status['BAD_OTP']
- return self.validationResult
- K = key.decode('hex') # key
- for C in range(actualcounter+1, actualcounter+256):
- if OATH == self.testHOTP(K, C, len(OATH)):
- cur.execute("UPDATE oathtokens SET counter = " + str(C) + " WHERE publicname = '" + publicID + "' AND active = '1'")
- self.con.commit()
- return self.status['OK']
- return self.status['NO_AUTH']
+ def __init__(self, connection):
+ cur = connection.cursor()
+ def dbread(publicID):
+ cur.execute("""
+ SELECT counter, secret
+ FROM oathtokens
+ WHERE publicname = %s AND active = '1'
+ """, (publicID,))
+ return cur
+
+ def dbwrite(counter, publicID):
+ cur.execute("""
+ UPDATE oathtokens
+ SET counter = %s
+ WHERE publicname = %s AND active = '1'
+ """, (counter, publicID))
+ connection.commit()
+
+ return super(OATHValidator, self).__init__(dbread, dbwrite)
class OTPValidation():
@@ -275,7 +266,8 @@ class YubiServeHandler (BaseHTTPServer.BaseHTTPRequestHandler):
try:
getData = self.getToDict(query)
if (len(query) > 0) and ((len(getData['otp']) == 6) or (len(getData['otp']) == 8) or (len(getData['otp']) == 18) or (len(getData['otp']) == 20)):
- oathvalidation = OATHValidation(self.con)
+
+ oathvalidation = OATHValidator(self.con)
OTP = getData['otp']
if (len(OTP) == 18) or (len(OTP) == 20):
publicID = OTP[0:12]
@@ -291,7 +283,7 @@ class YubiServeHandler (BaseHTTPServer.BaseHTTPRequestHandler):
self.send_header('Content-type', 'text/plain')
self.end_headers()
iso_time = time.strftime("%Y-%m-%dT%H:%M:%S")
- result = 'otp=' + getData['otp'] + '\r\nstatus=' + [k for k, v in oathvalidation.status.iteritems() if v == validation][0] + '\r\nt=' + iso_time
+ result = 'otp=' + getData['otp'] + '\r\nstatus=' + validation + '\r\nt=' + iso_time
otp_hmac = ''
try:
if (getData['id'] != None):