summaryrefslogtreecommitdiff
path: root/passlib
diff options
context:
space:
mode:
authorEli Collins <elic@assurancetechnologies.com>2011-03-17 00:49:02 -0400
committerEli Collins <elic@assurancetechnologies.com>2011-03-17 00:49:02 -0400
commitcf096c6dd2d97b154aaa91ea6d0281daa59081d4 (patch)
treeed68e2868a1026ffc893e7b0a1f84642bdab3928 /passlib
parent03a8965b77e822c987ce736cf2733036ab28cf3c (diff)
downloadpasslib-cf096c6dd2d97b154aaa91ea6d0281daa59081d4.tar.gz
passlib.apache: improved interface; added docs & UTs (all passlib.apache uts pass)
Diffstat (limited to 'passlib')
-rw-r--r--passlib/apache.py284
-rw-r--r--passlib/tests/test_apache.py314
2 files changed, 548 insertions, 50 deletions
diff --git a/passlib/apache.py b/passlib/apache.py
index 3187612..745655b 100644
--- a/passlib/apache.py
+++ b/passlib/apache.py
@@ -43,50 +43,83 @@ __all__ = [
class _CommonFile(object):
"helper for HtpasswdFile / HtdigestFile"
- def __init__(self, path, create=False):
+ #NOTE: 'path' is a property so that mtime is wiped if path is changed.
+ _path = None
+ def _get_path(self):
+ return self._path
+ def _set_path(self, path):
+ if path != self._path:
+ self.mtime = 0
+ self._path = path
+ path = property(_get_path, _set_path)
+
+ def __init__(self, path=None, autoload=True):
self.path = path
- if create:
+ ##if autoload == "exists":
+ ## autoload = bool(path and os.path.exists(path))
+ if autoload and path:
+ self.load()
+ ##elif raw:
+ ## self._load_lines(raw.split("\n"))
+ else:
self._entry_order = []
self._entry_map = {}
- self.mtime = 0
- else:
- self.load()
- def reload(self):
- "load only if file has changed; throw error if file not found"
- if self.mtime and self.mtime == os.path.getmtime(self.path):
+ def load(self, force=True):
+ """load entries from file
+
+ :param force:
+ if ``True`` (the default), always loads state from file.
+ if ``False``, only loads state if file has been modified since last load.
+
+ :raises IOError: if file not found
+
+ :returns: ``False`` if ``force=False`` and no load performed; otherwise ``True``.
+ """
+ path = self.path
+ if not path:
+ raise RuntimeError, "no load path specified"
+ if not force and self.mtime and self.mtime == os.path.getmtime(path):
return False
- self.load()
+ with file(path, "rU") as fh:
+ self.mtime = os.path.getmtime(path)
+ self._load_lines(fh)
return True
- def load(self):
- "load entries from file; throw error if file not found or malformed"
+ def _load_lines(self, lines):
pl = self._parse_line
- with file(self.path, "rU") as fh:
- self.mtime = os.path.getmtime(self.path)
- entry_order = self._entry_order = []
- entry_map = self._entry_map = {}
- for line in fh:
- key, value = pl(line)
- if key in entry_map:
- #XXX: should we use data from first entry, or last entry?
- # going w/ first entry for now.
- continue
- entry_order.append(key)
- entry_map[key] = value
- return True
+ entry_order = self._entry_order = []
+ entry_map = self._entry_map = {}
+ for line in lines:
+ key, value = pl(line)
+ if key in entry_map:
+ #XXX: should we use data from first entry, or last entry?
+ # going w/ first entry for now.
+ continue
+ entry_order.append(key)
+ entry_map[key] = value
#subclass: _parse_line(line) -> (key, hash)
def save(self):
"save entries to file"
+ if not self.path:
+ raise RuntimeError, "no save path specified"
rl = self._render_line
entry_order = self._entry_order
entry_map = self._entry_map
assert len(entry_order) == len(entry_map), "internal error in entry list"
with file(self.path, "wb") as fh:
fh.writelines(rl(key, entry_map[key]) for key in entry_order)
- self.mtime = os.path.getmtime(self.path)
+ self.mtime = os.path.getmtime(self.path)
+
+ def to_string(self):
+ "export whole database as a string"
+ rl = self._render_line
+ entry_order = self._entry_order
+ entry_map = self._entry_map
+ assert len(entry_order) == len(entry_map), "internal error in entry list"
+ return "".join(rl(key, entry_map[key]) for key in entry_order)
#subclass: _render_line(entry) -> line
@@ -109,17 +142,83 @@ class _CommonFile(object):
else:
return False
+ invalid_chars = ":\n\r\t\x00"
+
+ def _validate_user(self, user):
+ if len(user) > 255:
+ raise ValueError, "user must be at most 255 characters: %r" % (user,)
+ ic = self.invalid_chars
+ if any(c in ic for c in user):
+ raise ValueError, "user contains invalid characters: %r" % (user,)
+ return True
+
+ def _validate_realm(self, realm):
+ if len(realm) > 255:
+ raise ValueError, "realm must be at most 255 characters: %r" % (realm,)
+ ic = self.invalid_chars
+ if any(c in ic for c in realm):
+ raise ValueError, "realm contains invalid characters: %r" % (realm,)
+ return True
+
+ #FIXME: htpasswd doc sez passwords limited to 255 chars under Windows & MPE,
+ # longer ones are truncated.
+
#=========================================================
#htpasswd editing
#=========================================================
#FIXME: apr_md5_crypt technically the default only for windows, netware and tpf.
#TODO: find out if htpasswd's "crypt" mode is crypt *call* or des_crypt implementation.
-htpasswd_context = CryptContext(["apr_md5_crypt", "des_crypt", "ldap_sha1", "plaintext" ])
+htpasswd_context = CryptContext([
+ "apr_md5_crypt", #man page notes supported everywhere, default on Windows, Netware, TPF
+ "des_crypt", #man page notes server does NOT support this on Windows, Netware, TPF
+ "ldap_sha1", #man page notes only for transitioning <-> ldap
+ "plaintext" # man page notes server ONLY supports this on Windows, Netware, TPF
+ ])
class HtpasswdFile(_CommonFile):
- "class for reading & writing Htpasswd files"
+ """class for reading & writing Htpasswd files.
+
+ :arg path: path to htpasswd file to load from / save to (required)
+
+ :param default:
+ optionally specify default scheme to use when encoding new passwords.
+
+ Must be one of ``None``, ``"apr_md5_crypt"``, ``"des_crypt"``, ``"ldap_sha1"``, ``"plaintext"``.
+
+ If no value is specified, this class currently uses ``apr_md5_crypt`` when creating new passwords.
+
+ :param autoload:
+ if ``True`` (the default), :meth:`load` will be automatically called
+ by constructor.
- def __init__(self, path, default=None, **kwds):
+ Set to ``False`` to disable automatic loading (primarily used when
+ creating new htdigest file).
+
+ Loading & Saving
+ ================
+ .. automethod:: load
+ .. automethod:: save
+ .. automethod:: to_string
+
+ Inspection
+ ================
+ .. automethod:: users
+ .. automethod:: verify
+
+ Modification
+ ================
+ .. automethod:: update
+ .. automethod:: delete
+
+ .. note::
+
+ All of the methods in this class enforce some data validation
+ on the ``user`` parameter:
+ they will raise a :exc:`ValueError` if the string
+ contains one of the forbidden characters ``:\\r\\n\\t\\x00``,
+ or is longer than 255 characters.
+ """
+ def __init__(self, path=None, default=None, **kwds):
self.context = htpasswd_context
if default:
self.context = self.context.replace(default=default)
@@ -132,30 +231,87 @@ class HtpasswdFile(_CommonFile):
def _render_line(self, user, hash):
return "%s:%s\n" % (user, hash)
+ def users(self):
+ "return list of all users in file"
+ return list(self._entry_order)
+
def update(self, user, password):
- "update entry for user; added user if needed"
+ """update password for user; adds user if needed.
+
+ :returns: ``True`` if existing user was updated, ``False`` if user added.
+ """
+ self._validate_user(user)
hash = self.context.encrypt(password)
return self._update_key(user, hash)
def delete(self, user):
- "delete any entries for specified user"
+ """delete user's entry.
+
+ :returns: ``True`` if user deleted, ``False`` if user not found.
+ """
+ self._validate_user(user)
return self._delete_key(user)
def verify(self, user, password):
- "verify password for specified user"
+ """verify password for specified user.
+
+ :returns:
+ * ``None`` if user not found
+ * ``False`` if password does not match
+ * ``True`` if password matches.
+ """
+ self._validate_user(user)
hash = self._entry_map.get(user)
if hash is None:
return None
else:
return self.context.verify(password, hash)
+ #TODO: support migration from deprecated hashes
#=========================================================
#htdigest editing
#=========================================================
class HtdigestFile(_CommonFile):
- "class for reading & writing Htdigest files"
+ """class for reading & writing Htdigest files
+
+ :arg path: path to htpasswd file to load from / save to (required)
+
+ :param autoload:
+ if ``True`` (the default), :meth:`load` will be automatically called
+ by constructor.
+
+ Set to ``False`` to disable automatic loading (primarily used when
+ creating new htdigest file).
+
+ Loading & Saving
+ ================
+ .. automethod:: load
+ .. automethod:: save
+ .. automethod:: to_string
+
+ Inspection
+ ==========
+ .. automethod:: realms
+ .. automethod:: users
+ .. automethod:: find
+ .. automethod:: verify
+
+ Modification
+ ============
+ .. automethod:: update
+ .. automethod:: delete
+ .. automethod:: delete_realm
+ .. note::
+
+ All of the methods in this class enforce some data validation
+ on the ``user`` and ``realm`` parameters:
+ they will raise a :exc:`ValueError` if either string
+ contains one of the forbidden characters ``:\\r\\n\\t\\x00``,
+ or is longer than 255 characters.
+
+ """
def _parse_line(self, line):
user, realm, hash = line.rstrip().split(":")
return (user, realm), hash
@@ -163,37 +319,65 @@ class HtdigestFile(_CommonFile):
def _render_line(self, key, hash):
return "%s:%s:%s\n" % (key[0], key[1], hash)
+ def realms(self):
+ "return all realms listed in file"
+ return list(set(key[1] for key in self._entry_order))
+
+ def users(self, realm):
+ "return list of all users within specified realm"
+ return [ key[0] for key in self._entry_order if key[1] == realm ]
+
def update(self, user, realm, password):
- "update entry for user+realm; added entry if needed"
+ """update password for user under specified realm; adding user if needed
+
+ :returns: ``True`` if existing user was updated, ``False`` if user added.
+ """
+ self._validate_user(user)
+ self._validate_realm(realm)
key = (user,realm)
hash = md5("%s:%s:%s" % (user,realm,password)).hexdigest()
return self._update_key(key, hash)
def delete(self, user, realm):
- "delete any entries for specified user+realm"
- key = (user,realm)
- return self._delete_key(key)
+ """delete user's entry for specified realm.
+
+ :returns: ``True`` if user deleted, ``False`` if user not found in realm.
+ """
+ self._validate_user(user)
+ self._validate_realm(realm)
+ return self._delete_key((user,realm))
def delete_realm(self, realm):
- "delete all entries for specified realm"
- entry_order = self._entry_order
- entry_map = self._entry_map
+ """delete all users for specified realm
+
+ :returns: number of users deleted
+ """
+ self._validate_realm(realm)
keys = [
- key for key in entry_map
+ key for key in self._entry_map
if key[1] == realm
]
- if keys:
- for key in keys:
- del entry_map[key]
- entry_order.remove(key)
- return True
- else:
- return False
+ for key in keys:
+ self._delete_key(key)
+ return len(keys)
+
+ def find(self, user, realm):
+ """return digest hash for specified user+realm; returns ``None`` if not found"""
+ self._validate_user(user)
+ self._validate_realm(realm)
+ return self._entry_map.get((user,realm))
def verify(self, user, realm, password):
- "verify password for specified user+realm"
- key = (user, realm)
- hash = self._entry_map.get(key)
+ """verify password for specified user + realm.
+
+ :returns:
+ * ``None`` if user not found
+ * ``False`` if password does not match
+ * ``True`` if password matches.
+ """
+ self._validate_user(user)
+ self._validate_realm(realm)
+ hash = self._entry_map.get((user,realm))
if hash is None:
return None
return hash == md5("%s:%s:%s" % (user,realm,password)).hexdigest()
diff --git a/passlib/tests/test_apache.py b/passlib/tests/test_apache.py
new file mode 100644
index 0000000..0d4019b
--- /dev/null
+++ b/passlib/tests/test_apache.py
@@ -0,0 +1,314 @@
+"""tests for passlib.apache -- (c) Assurance Technologies 2008-2011"""
+#=========================================================
+#imports
+#=========================================================
+from __future__ import with_statement
+#core
+import hashlib
+from logging import getLogger
+import os
+import time
+#site
+#pkg
+from passlib import apache
+from passlib.tests.utils import TestCase, mktemp
+#module
+log = getLogger(__name__)
+
+def set_file(path, content):
+ with file(path, "wb") as fh:
+ fh.write(content)
+
+def get_file(path):
+ with file(path, "rb") as fh:
+ return fh.read()
+
+#=========================================================
+#htpasswd
+#=========================================================
+class HtpasswdFileTest(TestCase):
+ "test HtpasswdFile class"
+ case_prefix = "HtpasswdFile"
+
+ sample_01 = 'user2:2CHkkwa2AtqGs\nuser3:{SHA}3ipNV1GrBtxPmHFC21fCbVCSXIo=\nuser4:pass4\nuser1:$apr1$t4tc7jTh$GPIWVUo8sQKJlUdV8V5vu0\n'
+ sample_02 = 'user3:{SHA}3ipNV1GrBtxPmHFC21fCbVCSXIo=\nuser4:pass4\n'
+ sample_03 = 'user2:pass2x\nuser3:{SHA}3ipNV1GrBtxPmHFC21fCbVCSXIo=\nuser4:pass4\nuser1:$apr1$t4tc7jTh$GPIWVUo8sQKJlUdV8V5vu0\nuser5:pass5\n'
+
+ def test_00_constructor(self):
+ "test constructor & to_string()"
+ #check with existing file
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtpasswdFile(path)
+ self.assertEqual(ht.to_string(), self.sample_01)
+
+ #check autoload=False
+ ht = apache.HtpasswdFile(path, autoload=False)
+ self.assertEqual(ht.to_string(), "")
+
+ #check missing file
+ os.remove(path)
+ self.assertRaises(IOError, apache.HtpasswdFile, path)
+
+ #check no path
+ ht = apache.HtpasswdFile()
+ self.assertEqual(ht.to_string(), "")
+
+ #NOTE: "default" option checked via update() test, among others
+
+ def test_01_delete(self):
+ "test delete()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtpasswdFile(path)
+ self.assert_(ht.delete("user1"))
+ self.assert_(ht.delete("user2"))
+ self.assert_(not ht.delete("user5"))
+ self.assertEqual(ht.to_string(), self.sample_02)
+
+ self.assertRaises(ValueError, ht.delete, "user:")
+
+ def test_02_update(self):
+ "test update()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtpasswdFile(path, default="plaintext")
+ self.assert_(ht.update("user2", "pass2x"))
+ self.assert_(not ht.update("user5", "pass5"))
+ self.assertEqual(ht.to_string(), self.sample_03)
+
+ self.assertRaises(ValueError, ht.update, "user:", "pass")
+
+ def test_03_users(self):
+ "test users()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtpasswdFile(path)
+ ht.update("user5", "pass5")
+ ht.delete("user3")
+ ht.update("user3", "pass3")
+ self.assertEqual(ht.users(), ["user2", "user4", "user1", "user5", "user3"])
+
+ def test_03_verify(self):
+ "test verify()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtpasswdFile(path)
+ self.assert_(ht.verify("user5","pass5") is None)
+ for i in xrange(1,5):
+ i = str(i)
+ self.assert_(ht.verify("user"+i, "pass"+i))
+ self.assert_(ht.verify("user"+i, "pass5") is False)
+
+ self.assertRaises(ValueError, ht.verify, "user:", "pass")
+
+ def test_04_load(self):
+ "test load()"
+
+ #setup empty file
+ path = mktemp()
+ set_file(path, "")
+ ha = apache.HtpasswdFile(path, default="plaintext")
+ self.assertEqual(ha.to_string(), "")
+ time.sleep(.1) #so file modify time can advance
+
+ #make changes, check force=False does nothing
+ ha.update("user1", "pass1")
+ ha.load(force=False)
+ self.assertEqual(ha.to_string(), "user1:pass1\n")
+
+ #change file
+ set_file(path, self.sample_01)
+ ha.load(force=False)
+ self.assertEqual(ha.to_string(), self.sample_01)
+ #FIXME: this may fail if os mtime resolution is
+ #worse than the time.sleep delay above.
+
+ #make changes, check force=True overwrites them
+ ha.update("user5", "pass5")
+ ha.load()
+ self.assertEqual(ha.to_string(), self.sample_01)
+
+ #test load w/ no path
+ hb = apache.HtpasswdFile()
+ self.assertRaises(RuntimeError, hb.load)
+ self.assertRaises(RuntimeError, hb.load, force=False)
+
+ def test_05_save(self):
+ "test save()"
+ #load from file
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtpasswdFile(path)
+
+ #make changes, check they saved
+ ht.delete("user1")
+ ht.delete("user2")
+ ht.save()
+ self.assertEquals(get_file(path), self.sample_02)
+
+ #test save w/ no path
+ hb = apache.HtpasswdFile()
+ hb.update("user1", "pass1")
+ self.assertRaises(RuntimeError, hb.save)
+
+ #=========================================================
+ #eoc
+ #=========================================================
+
+#=========================================================
+#htdigest
+#=========================================================
+class HtdigestFileTest(TestCase):
+ "test HtdigestFile class"
+ case_prefix = "HtdigestFile"
+
+ sample_01 = 'user2:realm:549d2a5f4659ab39a80dac99e159ab19\nuser3:realm:a500bb8c02f6a9170ae46af10c898744\nuser4:realm:ab7b5d5f28ccc7666315f508c7358519\nuser1:realm:2a6cf53e7d8f8cf39d946dc880b14128\n'
+ sample_02 = 'user3:realm:a500bb8c02f6a9170ae46af10c898744\nuser4:realm:ab7b5d5f28ccc7666315f508c7358519\n'
+ sample_03 = 'user2:realm:5ba6d8328943c23c64b50f8b29566059\nuser3:realm:a500bb8c02f6a9170ae46af10c898744\nuser4:realm:ab7b5d5f28ccc7666315f508c7358519\nuser1:realm:2a6cf53e7d8f8cf39d946dc880b14128\nuser5:realm:03c55fdc6bf71552356ad401bdb9af19\n'
+
+ def test_00_constructor(self):
+ "test constructor & to_string()"
+ #check with existing file
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtdigestFile(path)
+ self.assertEqual(ht.to_string(), self.sample_01)
+
+ #check autoload=False
+ ht = apache.HtdigestFile(path, autoload=False)
+ self.assertEqual(ht.to_string(), "")
+
+ #check missing file
+ os.remove(path)
+ self.assertRaises(IOError, apache.HtdigestFile, path)
+
+ #check no path
+ ht = apache.HtdigestFile()
+ self.assertEqual(ht.to_string(), "")
+
+ def test_01_delete(self):
+ "test delete()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtdigestFile(path)
+ self.assert_(ht.delete("user1", "realm"))
+ self.assert_(ht.delete("user2", "realm"))
+ self.assert_(not ht.delete("user5", "realm"))
+ self.assertEqual(ht.to_string(), self.sample_02)
+
+ self.assertRaises(ValueError, ht.delete, "user:", "realm")
+
+ def test_02_update(self):
+ "test update()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtdigestFile(path)
+ self.assert_(ht.update("user2", "realm", "pass2x"))
+ self.assert_(not ht.update("user5", "realm", "pass5"))
+ self.assertEqual(ht.to_string(), self.sample_03)
+
+ self.assertRaises(ValueError, ht.update, "user:", "realm", "pass")
+
+ def test_03_users(self):
+ "test users()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtdigestFile(path)
+ ht.update("user5", "realm", "pass5")
+ ht.delete("user3", "realm")
+ ht.update("user3", "realm", "pass3")
+ self.assertEqual(ht.users("realm"), ["user2", "user4", "user1", "user5", "user3"])
+
+ def test_03_verify(self):
+ "test verify()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtdigestFile(path)
+ self.assert_(ht.verify("user5", "realm","pass5") is None)
+ for i in xrange(1,5):
+ i = str(i)
+ self.assert_(ht.verify("user"+i, "realm", "pass"+i))
+ self.assert_(ht.verify("user"+i, "realm", "pass5") is False)
+
+ self.assertRaises(ValueError, ht.verify, "user:", "realm", "pass")
+
+ def test_04_load(self):
+ "test load()"
+
+ #setup empty file
+ path = mktemp()
+ set_file(path, "")
+ ha = apache.HtdigestFile(path)
+ self.assertEqual(ha.to_string(), "")
+ time.sleep(.1) #so file modify time can advance
+
+ #make changes, check force=False does nothing
+ ha.update("user1", "realm", "pass1")
+ ha.load(force=False)
+ self.assertEqual(ha.to_string(), 'user1:realm:2a6cf53e7d8f8cf39d946dc880b14128\n')
+
+ #change file
+ set_file(path, self.sample_01)
+ ha.load(force=False)
+ self.assertEqual(ha.to_string(), self.sample_01)
+ #FIXME: this may fail if os mtime resolution is
+ #worse than the time.sleep delay above.
+
+ #make changes, check force=True overwrites them
+ ha.update("user5", "realm", "pass5")
+ ha.load()
+ self.assertEqual(ha.to_string(), self.sample_01)
+
+ #test load w/ no path
+ hb = apache.HtdigestFile()
+ self.assertRaises(RuntimeError, hb.load)
+ self.assertRaises(RuntimeError, hb.load, force=False)
+
+ def test_05_save(self):
+ "test save()"
+ #load from file
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtdigestFile(path)
+
+ #make changes, check they saved
+ ht.delete("user1", "realm")
+ ht.delete("user2", "realm")
+ ht.save()
+ self.assertEquals(get_file(path), self.sample_02)
+
+ #test save w/ no path
+ hb = apache.HtdigestFile()
+ hb.update("user1", "realm", "pass1")
+ self.assertRaises(RuntimeError, hb.save)
+
+ def test_06_realms(self):
+ "test realms() & delete_realm()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtdigestFile(path)
+
+ self.assertEquals(ht.delete_realm("x"), 0)
+ self.assertEquals(ht.realms(), ['realm'])
+
+ self.assertEquals(ht.delete_realm("realm"), 4)
+ self.assertEquals(ht.realms(), [])
+ self.assertEquals(ht.to_string(), "")
+
+ def test_07_find(self):
+ "test find()"
+ path = mktemp()
+ set_file(path, self.sample_01)
+ ht = apache.HtdigestFile(path)
+ self.assertEquals(ht.find("user3", "realm"), "a500bb8c02f6a9170ae46af10c898744")
+ self.assertEquals(ht.find("user4", "realm"), "ab7b5d5f28ccc7666315f508c7358519")
+ self.assertEquals(ht.find("user5", "realm"), None)
+
+ #=========================================================
+ #eoc
+ #=========================================================
+
+#=========================================================
+#EOF
+#=========================================================