#!/usr/bin/python
# coding: utf-8
import BaseHTTPServer
import SocketServer
import hashlib
import hmac
import os
import re
import socket
import time
import urllib
import urlparse
from threading import Thread
from Crypto.Cipher import AES
from OpenSSL import SSL
import crypto
try:
import MySQLdb
except ImportError:
pass
try:
import sqlite
except ImportError:
pass
def parseConfigFile():
    """Parse ``yubiserve.cfg`` (stored next to this file) into a dict.

    The format is PHP-like (the parser was originally written for PHP
    configuration files): each useful line looks like ``name = value;``.

    * A value starting with a single or double quote is stored as a string,
      with surrounding quotes and spaces stripped.
    * Any other value is converted with ``int()``.
    * Lines that do not match, have an empty value, or hold an unquoted
      value that is not an integer are ignored.

    Returns:
        dict mapping setting names to ``str`` or ``int`` values.

    Raises:
        IOError: if the configuration file cannot be opened.
    """
    path = os.path.dirname(os.path.realpath(__file__)) + '/yubiserve.cfg'
    # Read everything up front so the handle is closed before parsing.
    with open(path, 'r') as cfg_file:
        lines = cfg_file.read().splitlines()

    keys = {}
    for line in lines:
        match = re.search(r'(.*?)=(.*);', line)
        if match is None:
            continue  # comment, blank line or anything without "name=value;"
        name = match.group(1).strip()
        value = match.group(2).strip()
        if not value:
            continue  # "name = ;" carries no usable value
        if value[0] in '"\'':
            # Quoted string: drop the quotes and any padding spaces.
            keys[name] = match.group(2).strip('"\' ')
        else:
            try:
                keys[name] = int(value)
            except ValueError:
                continue  # unquoted and not a number: ignore the line
    return keys
# Module-level settings dict, loaded once at import time from yubiserve.cfg.
config = parseConfigFile()
class OATHValidator(crypto.OATHValidator):
    """``crypto.OATHValidator`` backed by the ``oathtokens`` table."""

    def __init__(self, connection):
        """Build the read/write callbacks around *connection*.

        A single cursor is created up front and shared by both callbacks,
        which are then handed to the base-class constructor.
        """
        cursor = connection.cursor()

        def fetch_token(publicID):
            # Runs the lookup and returns the cursor itself; the caller is
            # expected to fetch the (counter, secret) row from it.
            cursor.execute("""
SELECT counter, secret
FROM oathtokens
WHERE publicname = %s AND active = '1'
""", (publicID,))
            return cursor

        def store_counter(counter, publicID):
            # Persists the new counter for an active token and commits.
            cursor.execute("""
UPDATE oathtokens
SET counter = %s
WHERE publicname = %s AND active = '1'
""", (counter, publicID))
            connection.commit()

        return super(OATHValidator, self).__init__(fetch_token, store_counter)
class OTPValidation(object):
    """Validate Yubikey OTPs against the ``yubikeys`` database table.

    ``validateOTP`` decodes and decrypts the token, checks it against the
    stored key data and, on success, records the new counter/timestamp.
    The outcome is one of the integer codes in ``self.status`` and is also
    available afterwards through ``getResult``.
    """

    _HEX = "0123456789abcdef"
    _MODHEX = "cbdefghijklnrtuv"
    # 1-16 modhex characters of public id followed by exactly 32 characters
    # of token.  Anchored at both ends so input containing any other
    # character is rejected instead of being partially matched.
    _OTP_PATTERN = re.compile(
        r'^([cbdefghijklnrtuv]{1,16})([cbdefghijklnrtuv]{32})\Z')

    def __init__(self, connection):
        """Remember the DB-API *connection* used for lookups and updates."""
        self.status = {'OK': 1, 'BAD_OTP': 2, 'REPLAYED_OTP': 3, 'DELAYED_OTP': 4, 'NO_CLIENT': 5}
        self.validationResult = 0
        self.con = connection

    def _finish(self, statusName):
        """Record the named status as the validation result and return it."""
        self.validationResult = self.status[statusName]
        return self.validationResult

    def hexdec(self, hex):
        """Return the integer value of the hexadecimal string *hex*."""
        return int(hex, 16)

    def modhex2hex(self, string):
        """Translate a modhex string into the equivalent hex string.

        Raises:
            ValueError: if *string* contains a character outside the modhex
                alphabet (``ValueError`` is still caught by ``except
                Exception`` handlers written for the previous behaviour).
        """
        digits = []
        for char in string:
            pos = self._MODHEX.find(char)
            if pos < 0:
                raise ValueError('"' + char + '": Character is not a valid hex string')
            digits.append(self._HEX[pos])
        return ''.join(digits)

    def CRC(self):
        """Compute the CRC-16 of the 16 decrypted bytes in ``self.plaintext``.

        Uses initial value 0xffff and the reflected polynomial 0x8408.  The
        result is stored in ``self.OTPcrc`` and returned as a one-element
        list (kept for backward compatibility).

        Raises:
            IndexError: if ``self.plaintext`` holds fewer than 32 hex digits.
        """
        crc = 0xffff
        for i in range(16):
            byte = self.hexdec(self.plaintext[i * 2] + self.plaintext[i * 2 + 1])
            # Fold the data byte into the register before shifting; without
            # this step the checksum would not depend on the plaintext.
            crc ^= byte & 0xff
            for _ in range(8):
                lsb = crc & 1
                crc >>= 1
                if lsb:
                    crc ^= 0x8408
        self.OTPcrc = crc
        return [crc]

    def isCRCValid(self):
        """Return True if the last ``CRC()`` run left the residual 0xf0b8."""
        return self.OTPcrc == 0xf0b8

    def aes128ecb_decrypt(self, aeskey, aesdata):
        """Decrypt hex-encoded *aesdata* with hex-encoded *aeskey* (AES-ECB).

        Returns the plaintext as a hex string.  Relies on the Python 2
        ``'hex'`` string codec.
        """
        return AES.new(aeskey.decode('hex'), AES.MODE_ECB).decrypt(aesdata.decode('hex')).encode('hex')

    def getResult(self):
        """Return the status code of the most recent validation (0 if none)."""
        return self.validationResult

    def getResponse(self):
        """Return ``self.validationResponse``.

        NOTE(review): nothing in this class assigns ``validationResponse``;
        unless it is set from outside, this raises AttributeError.
        """
        return self.validationResponse

    def validateOTP(self, OTP):
        """Validate *OTP* and return one of the codes in ``self.status``.

        Returns:
            ``BAD_OTP`` for malformed input, an unknown or inactive key, a
            private-id mismatch or a CRC failure; ``REPLAYED_OTP`` when the
            token counter is not greater than the stored one;
            ``DELAYED_OTP`` when the timestamp did not advance within the
            same power-up session; ``OK`` otherwise, after storing the new
            counter and timestamp.
        """
        self.OTP = re.escape(OTP)
        self.validationResult = 0
        if (len(OTP) <= 32) or (len(OTP) > 48):
            return self._finish('BAD_OTP')
        match = self._OTP_PATTERN.match(OTP)
        if match is None:
            # Non-modhex input: reject explicitly instead of crashing on
            # match.group() or falling through to the UPDATE below.
            return self._finish('BAD_OTP')
        self.userid = match.group(1)
        self.token = self.modhex2hex(match.group(2))
        try:
            cur = self.con.cursor()
            cur.execute(
                "SELECT aeskey, internalname FROM yubikeys "
                "WHERE publicname = %s AND active = '1'",
                (self.userid,))
            if cur.rowcount != 1:
                return self._finish('BAD_OTP')
            (self.aeskey, self.internalname) = cur.fetchone()
            self.plaintext = self.aes128ecb_decrypt(self.aeskey, self.token)
            # The first 6 decrypted bytes must equal the stored private id.
            if self.internalname != self.plaintext[:12]:
                return self._finish('BAD_OTP')
            self.CRC()
            if not self.isCRCValid():
                return self._finish('BAD_OTP')
            # Bytes 7,6 (little-endian 16-bit counter) followed by byte 11
            # form one 24-bit value; its upper 16 bits identify the
            # power-up session (see the ">> 8" comparison below).
            self.internalcounter = self.hexdec(
                self.plaintext[14:16] + self.plaintext[12:14] + self.plaintext[22:24])
            # Bytes 10,9,8: little-endian 24-bit timestamp.
            self.timestamp = self.hexdec(
                self.plaintext[20:22] + self.plaintext[18:20] + self.plaintext[16:18])
            cur.execute(
                "SELECT counter, time FROM yubikeys "
                "WHERE publicname = %s AND active = '1'",
                (self.userid,))
            if cur.rowcount != 1:
                return self._finish('BAD_OTP')
            (self.counter, self.time) = cur.fetchone()
            if self.counter >= self.internalcounter:
                return self._finish('REPLAYED_OTP')
            if (self.time >= self.timestamp) and ((self.counter >> 8) == (self.internalcounter >> 8)):
                return self._finish('DELAYED_OTP')
        except IndexError:
            return self._finish('BAD_OTP')
        self.validationResult = self.status['OK']
        cur.execute(
            "UPDATE yubikeys SET counter = %s, time = %s WHERE publicname = %s",
            (self.internalcounter, self.timestamp, self.userid))
        self.con.commit()
        return self.validationResult
class YubiServeHandler (BaseHTTPServer.BaseHTTPRequestHandler):
__base = BaseHTTPServer.BaseHTTPRequestHandler
__base_handle = __base.handle
server_version = 'Yubiserve/3.0'
global config
#try:
if config['yubiDB'] == 'sqlite':
con = sqlite.connect(os.path.dirname(os.path.realpath(__file__)) + '/yubikeys.sqlite')
elif config['yubiDB'] == 'mysql':
con = MySQLdb.connect(host=config['yubiMySQLHost'], user=config['yubiMySQLUser'], passwd=config['yubiMySQLPass'], db=config['yubiMySQLName'])
#except:
# print "There's a problem with the database!\n"
# quit()
def getToDict(self, qs):
    """Decode the URL query string *qs* into a ``{name: value}`` dict.

    Pairs are separated by ``&``; each pair is split at its first ``=`` so
    values that themselves contain ``=`` (e.g. base64 padding) stay intact.
    Names and values are decoded with ``urllib.unquote_plus``.  A pair
    without ``=`` maps to an empty string, empty segments are skipped, and
    an empty query string yields an empty dict.  When a name repeats, the
    last occurrence wins.
    """
    params = {}
    for pair in qs.split('&'):
        if not pair:
            continue  # '' from an empty query or a doubled '&'
        name, _, value = pair.partition('=')
        params[urllib.unquote_plus(name)] = urllib.unquote_plus(value)
    return params
def setup(self):
    """Bind the handler's connection and buffered streams to the request.

    The read and write streams are built explicitly with
    ``socket._fileobject`` around the request object.
    NOTE(review): presumably because the request is an OpenSSL connection
    rather than a plain socket - confirm against the server setup.
    """
    sock = self.request
    self.connection = sock
    # Read side first, then write side, each with its own buffer size.
    self.rfile = socket._fileobject(sock, "rb", self.rbufsize)
    self.wfile = socket._fileobject(sock, "wb", self.wbufsize)
def log_message(self, format, *args):
    """Discard the log message: this override deliberately does nothing."""
    return None
def do_GET(self):
(scm, netloc, path, params, query, fragment) = urlparse.urlparse(self.path, 'http')
if scm != 'http':
self.send_error(501, "The server does not support the facility required.")
return
if (path != '/wsapi/2.0/verify') and (path != '/wsapi/2.0/oathverify'):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('')
# Yubico Yubikey
self.wfile.write('Yubico Yubikeys: