diff options
author | Marc Abramowitz <marc@marc-abramowitz.com> | 2016-03-07 14:05:52 -0800 |
---|---|---|
committer | Marc Abramowitz <marc@marc-abramowitz.com> | 2016-03-07 14:05:52 -0800 |
commit | 42b22881290e00e06b840dee1e42f0f5ef044d47 (patch) | |
tree | b4fef928625acd3e8ee45ccaa8c7a6c9810b3601 /paste/util/ip4.py | |
download | paste-git-tox_add_py35.tar.gz |
tox.ini: Add py35 to envlisttox_add_py35
Diffstat (limited to 'paste/util/ip4.py')
-rw-r--r-- | paste/util/ip4.py | 274 |
1 files changed, 274 insertions, 0 deletions
diff --git a/paste/util/ip4.py b/paste/util/ip4.py new file mode 100644 index 0000000..9ce17b8 --- /dev/null +++ b/paste/util/ip4.py @@ -0,0 +1,274 @@ +# -*- coding: iso-8859-15 -*- +"""IP4 address range set implementation. + +Implements an IPv4-range type. + +Copyright (C) 2006, Heiko Wundram. +Released under the MIT-license. +""" + +# Version information +# ------------------- + +__author__ = "Heiko Wundram <me@modelnine.org>" +__version__ = "0.2" +__revision__ = "3" +__date__ = "2006-01-20" + + +# Imports +# ------- + +from paste.util import intset +import socket +import six + + +# IP4Range class +# -------------- + +class IP4Range(intset.IntSet): + """IP4 address range class with efficient storage of address ranges. + Supports all set operations.""" + + _MINIP4 = 0 + _MAXIP4 = (1<<32) - 1 + _UNITYTRANS = "".join([chr(n) for n in range(256)]) + _IPREMOVE = "0123456789." + + def __init__(self,*args): + """Initialize an ip4range class. The constructor accepts an unlimited + number of arguments that may either be tuples in the form (start,stop), + integers, longs or strings, where start and stop in a tuple may + also be of the form integer, long or string. + + Passing an integer or long means passing an IPv4-address that's already + been converted to integer notation, whereas passing a string specifies + an address where this conversion still has to be done. A string + address may be in the following formats: + + - 1.2.3.4 - a plain address, interpreted as a single address + - 1.2.3 - a set of addresses, interpreted as 1.2.3.0-1.2.3.255 + - localhost - hostname to look up, interpreted as single address + - 1.2.3<->5 - a set of addresses, interpreted as 1.2.3.0-1.2.5.255 + - 1.2.0.0/16 - a set of addresses, interpreted as 1.2.0.0-1.2.255.255 + + Only the first three notations are valid if you use a string address in + a tuple, whereby notation 2 is interpreted as 1.2.3.0 if specified as + lower bound and 1.2.3.255 if specified as upper bound, not as a range + of addresses. + + Specifying a range is done with the <-> operator. This is necessary + because '-' might be present in a hostname. '<->' shouldn't be, ever. + """ + + # Special case copy constructor. + if len(args) == 1 and isinstance(args[0],IP4Range): + super(IP4Range,self).__init__(args[0]) + return + + # Convert arguments to tuple syntax. + args = list(args) + for i in range(len(args)): + argval = args[i] + if isinstance(argval,str): + if "<->" in argval: + # Type 4 address. + args[i] = self._parseRange(*argval.split("<->",1)) + continue + elif "/" in argval: + # Type 5 address. + args[i] = self._parseMask(*argval.split("/",1)) + else: + # Type 1, 2 or 3. + args[i] = self._parseAddrRange(argval) + elif isinstance(argval,tuple): + if len(tuple) != 2: + raise ValueError("Tuple is of invalid length.") + addr1, addr2 = argval + if isinstance(addr1,str): + addr1 = self._parseAddrRange(addr1)[0] + elif not isinstance(addr1, six.integer_types): + raise TypeError("Invalid argument.") + if isinstance(addr2,str): + addr2 = self._parseAddrRange(addr2)[1] + elif not isinstance(addr2, six.integer_types): + raise TypeError("Invalid argument.") + args[i] = (addr1,addr2) + elif not isinstance(argval, six.integer_types): + raise TypeError("Invalid argument.") + + # Initialize the integer set. + super(IP4Range,self).__init__(min=self._MINIP4,max=self._MAXIP4,*args) + + # Parsing functions + # ----------------- + + def _parseRange(self,addr1,addr2): + naddr1, naddr1len = _parseAddr(addr1) + naddr2, naddr2len = _parseAddr(addr2) + if naddr2len < naddr1len: + naddr2 += naddr1&(((1<<((naddr1len-naddr2len)*8))-1)<< + (naddr2len*8)) + naddr2len = naddr1len + elif naddr2len > naddr1len: + raise ValueError("Range has more dots than address.") + naddr1 <<= (4-naddr1len)*8 + naddr2 <<= (4-naddr2len)*8 + naddr2 += (1<<((4-naddr2len)*8))-1 + return (naddr1,naddr2) + + def _parseMask(self,addr,mask): + naddr, naddrlen = _parseAddr(addr) + naddr <<= (4-naddrlen)*8 + try: + if not mask: + masklen = 0 + else: + masklen = int(mask) + if not 0 <= masklen <= 32: + raise ValueError + except ValueError: + try: + mask = _parseAddr(mask,False) + except ValueError: + raise ValueError("Mask isn't parseable.") + remaining = 0 + masklen = 0 + if not mask: + masklen = 0 + else: + while not (mask&1): + remaining += 1 + while (mask&1): + mask >>= 1 + masklen += 1 + if remaining+masklen != 32: + raise ValueError("Mask isn't a proper host mask.") + naddr1 = naddr & (((1<<masklen)-1)<<(32-masklen)) + naddr2 = naddr1 + (1<<(32-masklen)) - 1 + return (naddr1,naddr2) + + def _parseAddrRange(self,addr): + naddr, naddrlen = _parseAddr(addr) + naddr1 = naddr<<((4-naddrlen)*8) + naddr2 = ( (naddr<<((4-naddrlen)*8)) + + (1<<((4-naddrlen)*8)) - 1 ) + return (naddr1,naddr2) + + # Utility functions + # ----------------- + + def _int2ip(self,num): + rv = [] + for i in range(4): + rv.append(str(num&255)) + num >>= 8 + return ".".join(reversed(rv)) + + # Iterating + # --------- + + def iteraddresses(self): + """Returns an iterator which iterates over ips in this iprange. An + IP is returned in string form (e.g. '1.2.3.4').""" + + for v in super(IP4Range,self).__iter__(): + yield self._int2ip(v) + + def iterranges(self): + """Returns an iterator which iterates over ip-ip ranges which build + this iprange if combined. An ip-ip pair is returned in string form + (e.g. '1.2.3.4-2.3.4.5').""" + + for r in self._ranges: + if r[1]-r[0] == 1: + yield self._int2ip(r[0]) + else: + yield '%s-%s' % (self._int2ip(r[0]),self._int2ip(r[1]-1)) + + def itermasks(self): + """Returns an iterator which iterates over ip/mask pairs which build + this iprange if combined. An IP/Mask pair is returned in string form + (e.g. '1.2.3.0/24').""" + + for r in self._ranges: + for v in self._itermasks(r): + yield v + + def _itermasks(self,r): + ranges = [r] + while ranges: + cur = ranges.pop() + curmask = 0 + while True: + curmasklen = 1<<(32-curmask) + start = (cur[0]+curmasklen-1)&(((1<<curmask)-1)<<(32-curmask)) + if start >= cur[0] and start+curmasklen <= cur[1]: + break + else: + curmask += 1 + yield "%s/%s" % (self._int2ip(start),curmask) + if cur[0] < start: + ranges.append((cur[0],start)) + if cur[1] > start+curmasklen: + ranges.append((start+curmasklen,cur[1])) + + __iter__ = iteraddresses + + # Printing + # -------- + + def __repr__(self): + """Returns a string which can be used to reconstruct this iprange.""" + + rv = [] + for start, stop in self._ranges: + if stop-start == 1: + rv.append("%r" % (self._int2ip(start),)) + else: + rv.append("(%r,%r)" % (self._int2ip(start), + self._int2ip(stop-1))) + return "%s(%s)" % (self.__class__.__name__,",".join(rv)) + +def _parseAddr(addr,lookup=True): + if lookup and any(ch not in IP4Range._IPREMOVE for ch in addr): + try: + addr = socket.gethostbyname(addr) + except socket.error: + raise ValueError("Invalid Hostname as argument.") + naddr = 0 + for naddrpos, part in enumerate(addr.split(".")): + if naddrpos >= 4: + raise ValueError("Address contains more than four parts.") + try: + if not part: + part = 0 + else: + part = int(part) + if not 0 <= part < 256: + raise ValueError + except ValueError: + raise ValueError("Address part out of range.") + naddr <<= 8 + naddr += part + return naddr, naddrpos+1 + +def ip2int(addr, lookup=True): + return _parseAddr(addr, lookup=lookup)[0] + +if __name__ == "__main__": + # Little test script. + x = IP4Range("172.22.162.250/24") + y = IP4Range("172.22.162.250","172.22.163.250","172.22.163.253<->255") + print(x) + for val in x.itermasks(): + print(val) + for val in y.itermasks(): + print(val) + for val in (x|y).itermasks(): + print(val) + for val in (x^y).iterranges(): + print(val) + for val in x: + print(val) |