summaryrefslogtreecommitdiff
path: root/bps/misc.py
diff options
context:
space:
mode:
Diffstat (limited to 'bps/misc.py')
-rw-r--r--bps/misc.py487
1 files changed, 487 insertions, 0 deletions
diff --git a/bps/misc.py b/bps/misc.py
new file mode 100644
index 0000000..e81e44c
--- /dev/null
+++ b/bps/misc.py
@@ -0,0 +1,487 @@
+"""bps.misc -- assorted functions that dont fit in another category"""
+#===================================================
+#imports
+#===================================================
+#core
+from functools import update_wrapper
+import time
+import re
+#pkg
+from bps.types import Undef
+from bps.meta import find_attribute
+#local
+__all__ = [
+ #property constructors
+ 'indirect_property',
+ 'constructor_property',
+
+ #http
+ 'parse_agent_string',
+ 'agent_string_has_product',
+
+ #other
+ 'stepped_delay',
+]
+
+#=========================================================
+#property constructors
+#=========================================================
+class indirect_property(object):
+ """descriptor which acts like property(), but resolves methods at instance time.
+
+ One of the drawbacks of the builtin :func:``property`` is that it stored
+ the functions directly. Thus, if a subclass overrides the method
+ which is also being used by a property's fget,
+ the property object will still use the original function.
+
+ This is a drop-in replacement for property which takes in
+ attribute names instead of actual functions. It does
+ runtime resolution of the attributes, so that the named
+ methods can be safely overridden (even on a per-instance basis)
+ and still have the properties use the correct code.
+
+ .. note::
+ Due to the repeated additional lookup, this is slower
+ than a normal property, so use it only if you have to.
+ """
+ #TODO: need to make this work right for various border cases (missing fget/fset)
+ #TODO: default doc based on attr names
+
+ def __init__(self, fget=None, fset=None, fdel=None, doc=None):
+ self.fget = fget
+ self.fset = fset
+ self.fdel = fdel
+ if doc:
+ self.__doc__ = doc
+
+ def __get__(self, obj, cls):
+ if obj is None:
+ return self
+ else:
+ return getattr(obj, self.fget)()
+
+ def __set__(self, obj, value):
+ if self.fset:
+ getattr(obj, self.fset)(value)
+ else:
+ raise AttributeError("readonly attribute")
+
+ def __delete__(self, obj):
+ if self.fdel:
+ getattr(obj, self.fdel)(value)
+ else:
+ raise AttributeError("can't delete attribute")
+
+class constructor_property(object):
+ """lazy-initialized attribute.
+
+ This is a class property,
+ which takes in a constructor func, and uses that function
+ to fill in the instance attribute when it's first accessed.
+
+ usage::
+ >>> from bps.misc import constructor_property
+ >>> #create a custom class
+ >>> class Example(object):
+ >>> a = constructor_property(dict)
+ >>> e = Example()
+ >>> #initially nothing is stored in 'a'
+ >>> e.__dict__.get("a")
+ None
+ >>> #but when it's first accessed, dict() is called, and the value is stored/returned
+ >>> e.a
+ {}
+ >>> #from then on, that's the value that will be returned for .a, until ``del e.a`` is called
+ >>> e.__dict__.get("a")
+ {}
+
+ :arg func:
+ function / class to call when attribute is first accessed for an instance
+ :arg name:
+ optionally let object know which attribute it's stored under
+ (will be autodiscovered later)
+ :param passref:
+ if True, func will be called with instance as first argument (eg ``func(self)``)
+ rather than without arguments (eg ``func()``)
+ """
+ def __init__(self, func, name=None, passref=False):
+ self.func = func
+ self.name = name
+ self.passref = passref
+
+ def __get__(self, obj, cls):
+ if obj is None:
+ return self
+ if self.name is None:
+ self.name = find_attribute(cls, self, required=True)
+ assert self.name not in obj.__dict__
+ if self.passref:
+ value = self.func(obj)
+ else:
+ value = self.func()
+ obj.__dict__[self.name] = value
+ #we should never get called again for this object
+ return value
+
+class class_property(object):
+ "classmethod+readonly property"
+ #TODO: document this
+ def __init__(self, fget):
+ self.fget = fget
+ def __get__(self, owner, cls):
+ return self.fget(cls)
+
+#=========================================================
+#
+#=========================================================
+def _iter_decay(lower, upper, half):
+ "helper for stepped_delay"
+ #default delay loop using "lower" "upper" and "half"
+ #equation: delay[idx] = upper - (upper-lower) * (decay ** idx)
+ #such that:
+ # delay[0] == lower
+ # delay[half] = (upper+lower)/2
+ # delay[idx] < upper
+ #
+ #this means decay = (1/2)**(1/half)
+ #
+ if half:
+ decay = .5**(1.0/half)
+ else:
+ decay = .9 ## approx ~ half=7
+ value = upper-lower
+ while True:
+ yield upper-value
+ value *= decay
+
+def stepped_delay(timeout=None, count=None, steps=None, lower=.1, upper=90, half=None):
+ """generate a stepped delay loop; useful when polling a resource repeatedly.
+
+ This function provides a delay loop
+ for such things as polling a filesystem for changes, etc.
+ It provides an initially short delay which slowly backs off.
+ It's designed to be used an iterator, so that all logic
+ stays within your application.
+
+ You can either specify a custom sequence of delay values via *steps*,
+ or use the default exponential decay algorithm, which
+ begans with a delay of *lower*, and slowly increases,
+ approaching a delay time of *upper*.
+
+ :param timeout:
+ If specified, the loop will stop after *timeout* seconds
+ have passed, no matter how many repetitions have been run.
+
+ :param count:
+ If specified, the loop will stop after *count* repetitions.
+
+ :param steps:
+ If specified, this should be a sequence
+ of delay values to use. When the sequence runs
+ out, the last delay value will be repeated.
+ If *steps* is not used, a default exponential
+ decay algorithm will be used.
+
+ :param lower:
+ [ignored if *steps* is specified]
+ This specifies the starting delay.
+ The first delay will be this length,
+ the next a little more, and so on.
+
+ :param upper:
+ [ignored if *steps* is specified]
+ This specifies the upper bound on the delay.
+ Each time the iterator sleeps, the delay
+ will increase, asymptotically approaching
+ the *upper* bound.
+
+ :param half:
+ [optional, ignored if *steps* is specified]
+ If specified, adjusts the rate of the exponential delay
+ increase such that it will take exactly *half*
+ rounds through the iterator before the delay
+ is at the half-way mark between *lower* and *upper*.
+
+ :Returns:
+ This loop yields tuples of ``(index,delay)``,
+ where *index* is the number of passes that have been made,
+ and *delay* is the amount of time it slept before
+ yielding the last tuple. It will increase the delay
+ used each time before it yeilds a new tuple,
+ in accordance with the configuration above.
+ If the loop ends due to *timeout* or *count*,
+ the iterator will raise :exc:`StopIteration`.
+
+ Usage Example::
+
+ >>> import time
+ >>> from bps.misc import stepped_delay
+ >>> for i,d in stepped_delay(count=10, lower=.1, upper=10):
+ >>> print i,d,time.time()
+ >>> #... do stuff, calling break if done with loop
+ >>> else:
+ >>> print "loop exit w/o success"
+ 0 0 1244648293.01
+ 1 0.1 1244648293.11
+ 2 1.09 1244648294.2
+ 3 1.981 1244648296.19
+ 4 2.7829 1244648298.97
+ 5 3.50461 1244648302.48
+ 6 4.154149 1244648306.64
+ 7 4.7387341 1244648311.38
+ 8 5.26486069 1244648316.65
+ 9 5.738374621 1244648322.39
+ loop exit w/o success
+
+ .. todo::
+ Could allow delay to be reset to initial value
+ by sending ``"reset"`` back to the yield statement.
+ """
+
+ #run first round without any delay
+ yield 0, 0
+
+ #prepare delay value generator
+ if steps:
+ #ignore 'lower', 'upper', and 'half'
+ def loopgen():
+ for value in steps:
+ yield value
+ while True: #repeat last value
+ yield value
+ loop = loopgen()
+ else:
+ if upper <= lower: #allow us to set really small 'upper' and auto-scale lower
+ lower = .1 * upper
+ loop = _iter_decay(lower, upper, half)
+
+ #run main delay loop
+ if timeout:
+ end = time.time() + timeout
+ for idx, delay in enumerate(loop):
+ time.sleep(delay)
+ yield idx+1, delay
+ #check if it's time to abort
+ if count and idx+2 >= count:
+ return
+ if timeout and time.time() >= end:
+ return
+
+#=========================================================
+#http agent string
+#=========================================================
+_clean_re = re.compile(r"\s+")
+
+_agent_re = re.compile(
+ r"""
+ ^
+ \s*
+ (
+ (?P<product>
+ (?P<name>
+ [^\s()/]+ # technically this should only be TOKEN chars
+ )
+ (
+ /
+ (?P<version>
+ [^\s()]+ #XXX: what _is_ allowed here? TOKEN?
+ )
+ )?
+ )
+ |
+ (
+ \(
+ (?P<comment>
+ [^)]+ #technically this should only be TOKEN chars
+ )
+ \)
+ )
+ )
+ \s*
+ """, re.I|re.X)
+
+def parse_agent_string(value, normalize=True):
+ """parse a HTTP user agent string.
+
+ This parses an HTTP User Agent string,
+ returning a list of agents identified in the string, in order.
+
+
+ :type value: str
+ :param value:
+ The agent string to parse
+
+ :type normalize: bool
+ :param normalize:
+ This flag (enabled by default)
+ turns on any special-case heuristics for known
+ atypical user agent strings, as well
+ as converting the string to lower case.
+ It can be disabled to get the unmangled results.
+
+ :returns:
+ A list of dictionaries, one for each product found.
+ The first dictionary is usually considered the primary.
+ This code assumes comments will always *follow* the product description
+ they are attached to, but if this rule is violated,
+ a "blank" product entry will be inserted, where all relevant keys
+ except "comment" will be ``None``. Other than that case,
+ the following keys should be filled out for each dictionary:
+
+ product
+ This will contain the raw product name, eg "Mozilla/5.0".
+ name
+ This will contain just the name of the product
+ (assuming it has the format "name/version").
+ If the product couldn't be parsed this way, name's contents are undefined.
+ version
+ This will contain the version of the product,
+ (assuming it has the format "name/version").
+ If the product couldn't be parsed this way, version's contents are undefined.
+ comment
+ This is present if a comment stanza followed
+ the product definition. This will be a list of strings,
+ as read from the comment and separated by semicolons.
+ If no comment is present, the key will not be included.
+
+ Usage Example::
+
+ >>> from bps.misc import parse_agent_string
+ >>> parse_agent_string("Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.0.11) Gecko/2009060309 Ubuntu/9.04 (jaunty) Firefox/3.0.11")
+ [
+ { 'name': 'Mozilla', 'version': '5.0',
+ 'product': 'Mozilla/5.0',
+ 'comments': ['X11', 'U', 'Linux x86_64',
+ 'en-US', 'rv:1.9.0.11'],
+ },
+ { 'name': 'Gecko', 'product': 'Gecko/2009060309',
+ 'version': '2009060309'
+ },
+ { 'name': 'Ubuntu', 'version': '9.04',
+ 'product': 'Ubuntu/9.04',
+ 'comments': ['jaunty'],
+ },
+ { 'name': 'Firefox', 'version': '3.0.11',
+ 'product': 'Firefox/3.0.11',
+ }
+ ]
+
+ .. seealso:
+
+ :rfc:`2068` is the authoritative agent string format spec.
+ """
+ #NOTE: this code makes the assumption
+ #that a comment will always be FOLLOWING (and is associated with) the preceding product.
+ #this goes against the grammar of RFC2068, but is the de facto case.
+ #thus, if a unexpected comment is encountered, a empty product entry will be created.
+ orig = value
+ value = _clean_re.sub(" ", value).strip()
+ if normalize:
+ value = value.lower()
+ out = []
+ while value:
+ m = _agent_re.match(value)
+ if m:
+ comment = m.group("comment")
+ if comment:
+ comments = [ elem.strip() for elem in comment.split(";") ]
+ if out and isinstance(out[-1], dict) and 'comments' not in out[-1]:
+ out[-1]['comments'] = comments
+ else:
+ log.warning("unexpected comment segment in agent: %r %r", comment, orig)
+ out.append(dict(product=None, name=None, version=None, comments=comments))
+ else:
+ product, name, version = m.group("product", "name", "version")
+ out.append(dict(product=product, name=name, version=version))
+ value = value[m.end():]
+ else:
+ #can this _ever_ happen?
+ log.warning("failed to parse agent segment: %r of %r", value, orig)
+ value = ''
+## if not normalize:
+## return out
+ #TODO: detect the "+http://homepage" elements add end of comment list,
+ # move out to "url" kwd
+ #TODO: detect platform info
+ #TODO: detect firefox, opera, konq, safari, chrome,
+ # and move their products to the front
+## #now we apply various bits of UA-specific knowledge to normalize things
+## #TODO: could pull out 'MSIE' etc
+## for entry in out:
+## if not entry['product'] or not entry['comments']:
+## continue
+## #could parse out site urls
+ return out
+
+def _parse_agent_version(value):
+ if value is None:
+ return None
+ #XXX: use a real version parser here.
+ if isinstance(value, str):
+ try:
+ return tuple(int(x) for x in value.split("."))
+ except ValueError:
+ return None
+ elif isinstance(value, int):
+ return tuple(value)
+ #should be tuple of ints.
+ return value
+
+def agent_string_has_product(agent, name, min_version=None):
+ """tests if agent string references a product name.
+
+ This wrapper for :func:`parse_agent_string`
+ checks if a given product is found in the provided string.
+ This is a simple function, more complex cases may require
+ rolling your own test function.
+
+ :param agent:
+ The raw agent string, OR the output of parse_agent_string.
+ :param name:
+ The name of the product to check for.
+ :param min_version:
+ Optional minimum version.
+ For this to work, min_version must be an integer,
+ tuple of integers, or a period-separated string.
+
+ :returns:
+ Returns ``True`` if a match is found,
+ ``False`` if a match is not found.
+ """
+ name = name.lower()
+ min_version = _parse_agent_version(min_version)
+ if isinstance(agent, str):
+ agent = parse_agent_string(agent)
+ for entry in agent:
+ if entry['name'] == name:
+ if not min_version or min_version <= _parse_agent_version(entry['version']):
+ return True
+ #TODO: IE detect here or in extended?
+ return False
+
+#=========================================================
+#code scraps
+#=========================================================
+
+#need to clean this up a little, but might be useful
+##def formatFuncStr(fname, *args, **kwds):
+## if isinstance(fname, str):
+## pass## elif callable(fname):
+## fname = fname.__name__
+## else:
+## fname = str(fname)
+##
+## body = ""
+## if args:
+## for a in args:
+## if body != "": body += ","
+## body += repr(a)
+## if kwds:
+## for k,v in kwds.items():
+## if body != "": body += ","
+## body += "%s=%r" % (k,v)
+## return "%s(%s)" % (fname,body)
+
+#=========================================================
+#
+#=========================================================