summaryrefslogtreecommitdiff
path: root/passlib/apache.py
diff options
context:
space:
mode:
authorEli Collins <elic@assurancetechnologies.com>2011-03-17 00:49:02 -0400
committerEli Collins <elic@assurancetechnologies.com>2011-03-17 00:49:02 -0400
commitcf096c6dd2d97b154aaa91ea6d0281daa59081d4 (patch)
treeed68e2868a1026ffc893e7b0a1f84642bdab3928 /passlib/apache.py
parent03a8965b77e822c987ce736cf2733036ab28cf3c (diff)
downloadpasslib-cf096c6dd2d97b154aaa91ea6d0281daa59081d4.tar.gz
passlib.apache: improved interface; added docs & UTs (all passlib.apache uts pass)
Diffstat (limited to 'passlib/apache.py')
-rw-r--r--passlib/apache.py284
1 files changed, 234 insertions, 50 deletions
diff --git a/passlib/apache.py b/passlib/apache.py
index 3187612..745655b 100644
--- a/passlib/apache.py
+++ b/passlib/apache.py
@@ -43,50 +43,83 @@ __all__ = [
class _CommonFile(object):
"helper for HtpasswdFile / HtdigestFile"
- def __init__(self, path, create=False):
+ #NOTE: 'path' is a property so that mtime is wiped if path is changed.
+ _path = None
+ def _get_path(self):
+ return self._path
+ def _set_path(self, path):
+ if path != self._path:
+ self.mtime = 0
+ self._path = path
+ path = property(_get_path, _set_path)
+
+ def __init__(self, path=None, autoload=True):
self.path = path
- if create:
+ ##if autoload == "exists":
+ ## autoload = bool(path and os.path.exists(path))
+ if autoload and path:
+ self.load()
+ ##elif raw:
+ ## self._load_lines(raw.split("\n"))
+ else:
self._entry_order = []
self._entry_map = {}
- self.mtime = 0
- else:
- self.load()
- def reload(self):
- "load only if file has changed; throw error if file not found"
- if self.mtime and self.mtime == os.path.getmtime(self.path):
+ def load(self, force=True):
+ """load entries from file
+
+ :param force:
+ if ``True`` (the default), always loads state from file.
+ if ``False``, only loads state if file has been modified since last load.
+
+ :raises IOError: if file not found
+
+ :returns: ``False`` if ``force=False`` and no load performed; otherwise ``True``.
+ """
+ path = self.path
+ if not path:
+ raise RuntimeError, "no load path specified"
+ if not force and self.mtime and self.mtime == os.path.getmtime(path):
return False
- self.load()
+ with file(path, "rU") as fh:
+ self.mtime = os.path.getmtime(path)
+ self._load_lines(fh)
return True
- def load(self):
- "load entries from file; throw error if file not found or malformed"
+ def _load_lines(self, lines):
pl = self._parse_line
- with file(self.path, "rU") as fh:
- self.mtime = os.path.getmtime(self.path)
- entry_order = self._entry_order = []
- entry_map = self._entry_map = {}
- for line in fh:
- key, value = pl(line)
- if key in entry_map:
- #XXX: should we use data from first entry, or last entry?
- # going w/ first entry for now.
- continue
- entry_order.append(key)
- entry_map[key] = value
- return True
+ entry_order = self._entry_order = []
+ entry_map = self._entry_map = {}
+ for line in lines:
+ key, value = pl(line)
+ if key in entry_map:
+ #XXX: should we use data from first entry, or last entry?
+ # going w/ first entry for now.
+ continue
+ entry_order.append(key)
+ entry_map[key] = value
#subclass: _parse_line(line) -> (key, hash)
def save(self):
"save entries to file"
+ if not self.path:
+ raise RuntimeError, "no save path specified"
rl = self._render_line
entry_order = self._entry_order
entry_map = self._entry_map
assert len(entry_order) == len(entry_map), "internal error in entry list"
with file(self.path, "wb") as fh:
fh.writelines(rl(key, entry_map[key]) for key in entry_order)
- self.mtime = os.path.getmtime(self.path)
+ self.mtime = os.path.getmtime(self.path)
+
+ def to_string(self):
+ "export whole database as a string"
+ rl = self._render_line
+ entry_order = self._entry_order
+ entry_map = self._entry_map
+ assert len(entry_order) == len(entry_map), "internal error in entry list"
+ return "".join(rl(key, entry_map[key]) for key in entry_order)
#subclass: _render_line(entry) -> line
@@ -109,17 +142,83 @@ class _CommonFile(object):
else:
return False
+ invalid_chars = ":\n\r\t\x00"
+
+ def _validate_user(self, user):
+ if len(user) > 255:
+ raise ValueError, "user must be at most 255 characters: %r" % (user,)
+ ic = self.invalid_chars
+ if any(c in ic for c in user):
+ raise ValueError, "user contains invalid characters: %r" % (user,)
+ return True
+
+ def _validate_realm(self, realm):
+ if len(realm) > 255:
+ raise ValueError, "realm must be at most 255 characters: %r" % (realm,)
+ ic = self.invalid_chars
+ if any(c in ic for c in realm):
+ raise ValueError, "realm contains invalid characters: %r" % (realm,)
+ return True
+
+ #FIXME: htpasswd doc sez passwords limited to 255 chars under Windows & MPE,
+ # longer ones are truncated.
+
#=========================================================
#htpasswd editing
#=========================================================
#FIXME: apr_md5_crypt technically the default only for windows, netware and tpf.
#TODO: find out if htpasswd's "crypt" mode is crypt *call* or des_crypt implementation.
-htpasswd_context = CryptContext(["apr_md5_crypt", "des_crypt", "ldap_sha1", "plaintext" ])
+htpasswd_context = CryptContext([
+ "apr_md5_crypt", #man page notes supported everywhere, default on Windows, Netware, TPF
+ "des_crypt", #man page notes server does NOT support this on Windows, Netware, TPF
+ "ldap_sha1", #man page notes only for transitioning <-> ldap
+ "plaintext" # man page notes server ONLY supports this on Windows, Netware, TPF
+ ])
class HtpasswdFile(_CommonFile):
- "class for reading & writing Htpasswd files"
+ """class for reading & writing Htpasswd files.
+
+ :arg path: path to htpasswd file to load from / save to (required)
+
+ :param default:
+ optionally specify default scheme to use when encoding new passwords.
+
+ Must be one of ``None``, ``"apr_md5_crypt"``, ``"des_crypt"``, ``"ldap_sha1"``, ``"plaintext"``.
+
+ If no value is specified, this class currently uses ``apr_md5_crypt`` when creating new passwords.
+
+ :param autoload:
+ if ``True`` (the default), :meth:`load` will be automatically called
+ by constructor.
- def __init__(self, path, default=None, **kwds):
+ Set to ``False`` to disable automatic loading (primarily used when
+ creating new htdigest file).
+
+ Loading & Saving
+ ================
+ .. automethod:: load
+ .. automethod:: save
+ .. automethod:: to_string
+
+ Inspection
+ ================
+ .. automethod:: users
+ .. automethod:: verify
+
+ Modification
+ ================
+ .. automethod:: update
+ .. automethod:: delete
+
+ .. note::
+
+ All of the methods in this class enforce some data validation
+ on the ``user`` parameter:
+ they will raise a :exc:`ValueError` if the string
+ contains one of the forbidden characters ``:\\r\\n\\t\\x00``,
+ or is longer than 255 characters.
+ """
+ def __init__(self, path=None, default=None, **kwds):
self.context = htpasswd_context
if default:
self.context = self.context.replace(default=default)
@@ -132,30 +231,87 @@ class HtpasswdFile(_CommonFile):
def _render_line(self, user, hash):
return "%s:%s\n" % (user, hash)
+ def users(self):
+ "return list of all users in file"
+ return list(self._entry_order)
+
def update(self, user, password):
- "update entry for user; added user if needed"
+ """update password for user; adds user if needed.
+
+ :returns: ``True`` if existing user was updated, ``False`` if user added.
+ """
+ self._validate_user(user)
hash = self.context.encrypt(password)
return self._update_key(user, hash)
def delete(self, user):
- "delete any entries for specified user"
+ """delete user's entry.
+
+ :returns: ``True`` if user deleted, ``False`` if user not found.
+ """
+ self._validate_user(user)
return self._delete_key(user)
def verify(self, user, password):
- "verify password for specified user"
+ """verify password for specified user.
+
+ :returns:
+ * ``None`` if user not found
+ * ``False`` if password does not match
+ * ``True`` if password matches.
+ """
+ self._validate_user(user)
hash = self._entry_map.get(user)
if hash is None:
return None
else:
return self.context.verify(password, hash)
+ #TODO: support migration from deprecated hashes
#=========================================================
#htdigest editing
#=========================================================
class HtdigestFile(_CommonFile):
- "class for reading & writing Htdigest files"
+ """class for reading & writing Htdigest files
+
+ :arg path: path to htpasswd file to load from / save to (required)
+
+ :param autoload:
+ if ``True`` (the default), :meth:`load` will be automatically called
+ by constructor.
+
+ Set to ``False`` to disable automatic loading (primarily used when
+ creating new htdigest file).
+
+ Loading & Saving
+ ================
+ .. automethod:: load
+ .. automethod:: save
+ .. automethod:: to_string
+
+ Inspection
+ ==========
+ .. automethod:: realms
+ .. automethod:: users
+ .. automethod:: find
+ .. automethod:: verify
+
+ Modification
+ ============
+ .. automethod:: update
+ .. automethod:: delete
+ .. automethod:: delete_realm
+ .. note::
+
+ All of the methods in this class enforce some data validation
+ on the ``user`` and ``realm`` parameters:
+ they will raise a :exc:`ValueError` if either string
+ contains one of the forbidden characters ``:\\r\\n\\t\\x00``,
+ or is longer than 255 characters.
+
+ """
def _parse_line(self, line):
user, realm, hash = line.rstrip().split(":")
return (user, realm), hash
@@ -163,37 +319,65 @@ class HtdigestFile(_CommonFile):
def _render_line(self, key, hash):
return "%s:%s:%s\n" % (key[0], key[1], hash)
+ def realms(self):
+ "return all realms listed in file"
+ return list(set(key[1] for key in self._entry_order))
+
+ def users(self, realm):
+ "return list of all users within specified realm"
+ return [ key[0] for key in self._entry_order if key[1] == realm ]
+
def update(self, user, realm, password):
- "update entry for user+realm; added entry if needed"
+ """update password for user under specified realm; adding user if needed
+
+ :returns: ``True`` if existing user was updated, ``False`` if user added.
+ """
+ self._validate_user(user)
+ self._validate_realm(realm)
key = (user,realm)
hash = md5("%s:%s:%s" % (user,realm,password)).hexdigest()
return self._update_key(key, hash)
def delete(self, user, realm):
- "delete any entries for specified user+realm"
- key = (user,realm)
- return self._delete_key(key)
+ """delete user's entry for specified realm.
+
+ :returns: ``True`` if user deleted, ``False`` if user not found in realm.
+ """
+ self._validate_user(user)
+ self._validate_realm(realm)
+ return self._delete_key((user,realm))
def delete_realm(self, realm):
- "delete all entries for specified realm"
- entry_order = self._entry_order
- entry_map = self._entry_map
+ """delete all users for specified realm
+
+ :returns: number of users deleted
+ """
+ self._validate_realm(realm)
keys = [
- key for key in entry_map
+ key for key in self._entry_map
if key[1] == realm
]
- if keys:
- for key in keys:
- del entry_map[key]
- entry_order.remove(key)
- return True
- else:
- return False
+ for key in keys:
+ self._delete_key(key)
+ return len(keys)
+
+ def find(self, user, realm):
+ """return digest hash for specified user+realm; returns ``None`` if not found"""
+ self._validate_user(user)
+ self._validate_realm(realm)
+ return self._entry_map.get((user,realm))
def verify(self, user, realm, password):
- "verify password for specified user+realm"
- key = (user, realm)
- hash = self._entry_map.get(key)
+ """verify password for specified user + realm.
+
+ :returns:
+ * ``None`` if user not found
+ * ``False`` if password does not match
+ * ``True`` if password matches.
+ """
+ self._validate_user(user)
+ self._validate_realm(realm)
+ hash = self._entry_map.get((user,realm))
if hash is None:
return None
return hash == md5("%s:%s:%s" % (user,realm,password)).hexdigest()