plexpy/lib/ipwhois/utils.py
2016-07-31 11:05:42 -07:00

554 lines
16 KiB
Python

# Copyright (c) 2013, 2014, 2015, 2016 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import sys
from xml.dom.minidom import parseString
from os import path
import re
import copy
import io
import csv
import logging
if sys.version_info >= (3, 3): # pragma: no cover
from ipaddress import (ip_address,
ip_network,
IPv4Address,
IPv4Network,
IPv6Address,
summarize_address_range,
collapse_addresses)
else: # pragma: no cover
from ipaddr import (IPAddress as ip_address,
IPNetwork as ip_network,
IPv4Address,
IPv4Network,
IPv6Address,
summarize_address_range,
collapse_address_list as collapse_addresses)
try: # pragma: no cover
from itertools import filterfalse
except ImportError: # pragma: no cover
from itertools import ifilterfalse as filterfalse
log = logging.getLogger(__name__)
IETF_RFC_REFERENCES = {
# IPv4
'RFC 1122, Section 3.2.1.3':
'http://tools.ietf.org/html/rfc1122#section-3.2.1.3',
'RFC 1918': 'http://tools.ietf.org/html/rfc1918',
'RFC 3927': 'http://tools.ietf.org/html/rfc3927',
'RFC 5736': 'http://tools.ietf.org/html/rfc5736',
'RFC 5737': 'http://tools.ietf.org/html/rfc5737',
'RFC 3068': 'http://tools.ietf.org/html/rfc3068',
'RFC 2544': 'http://tools.ietf.org/html/rfc2544',
'RFC 3171': 'http://tools.ietf.org/html/rfc3171',
'RFC 919, Section 7': 'http://tools.ietf.org/html/rfc919#section-7',
# IPv6
'RFC 4291, Section 2.7': 'http://tools.ietf.org/html/rfc4291#section-2.7',
'RFC 4291': 'http://tools.ietf.org/html/rfc4291',
'RFC 4291, Section 2.5.2':
'http://tools.ietf.org/html/rfc4291#section-2.5.2',
'RFC 4291, Section 2.5.3':
'http://tools.ietf.org/html/rfc4291#section-2.5.3',
'RFC 4291, Section 2.5.6':
'http://tools.ietf.org/html/rfc4291#section-2.5.6',
'RFC 4291, Section 2.5.7':
'http://tools.ietf.org/html/rfc4291#section-2.5.7',
'RFC 4193': 'https://tools.ietf.org/html/rfc4193'
}
IP_REGEX = (
r'(?P<ip>'
# IPv4
'(((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.)){3}'
'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)'
# IPv6
'|\[?(((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:)'
'{6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|'
'2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]'
'{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d'
'\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|'
'((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|'
'2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]'
'{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)'
'(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(('
'(:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1'
'\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(('
'[0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4})'
'{0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]'
'?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:(('
'25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})'
')|:)))(%.+)?))\]?'
# Optional IPv4 Port
'((:(6553[0-5]|655[0-2]\d|65[0-4]\d{2}|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}'
# Optional CIDR block
'))|(\/(?:[012]\d?|3[012]?|[4-9])))?'
')'
)
def ipv4_lstrip_zeros(address):
"""
The function to strip leading zeros in each octet of an IPv4 address.
Args:
address: An IPv4 address in string format.
Returns:
String: The modified IPv4 address string.
"""
# Split the octets.
obj = address.strip().split('.')
for x, y in enumerate(obj):
# Strip leading zeros. Split / here in case CIDR is attached.
obj[x] = y.split('/')[0].lstrip('0')
if obj[x] in ['', None]:
obj[x] = '0'
return '.'.join(obj)
def calculate_cidr(start_address, end_address):
"""
The function to calculate a CIDR range(s) from a start and end IP address.
Args:
start_address: The starting IP address in string format.
end_address: The ending IP address in string format.
Returns:
List: A list of calculated CIDR ranges.
"""
tmp_addrs = []
try:
tmp_addrs.extend(summarize_address_range(
ip_address(start_address),
ip_address(end_address)))
except (KeyError, ValueError, TypeError): # pragma: no cover
try:
tmp_addrs.extend(summarize_address_range(
ip_network(start_address).network_address,
ip_network(end_address).network_address))
except AttributeError: # pragma: no cover
tmp_addrs.extend(summarize_address_range(
ip_network(start_address).ip,
ip_network(end_address).ip))
return [i.__str__() for i in collapse_addresses(tmp_addrs)]
def get_countries(is_legacy_xml=False):
"""
The function to generate a dictionary containing ISO_3166-1 country codes
to names.
Args:
is_legacy_xml: Boolean for whether to use the older country code
list (iso_3166-1_list_en.xml).
Returns:
Dictionary: A dictionary with the country codes as the keys and the
country names as the values.
"""
# Initialize the countries dictionary.
countries = {}
# Set the data directory based on if the script is a frozen executable.
if sys.platform == 'win32' and getattr(sys, 'frozen', False):
data_dir = path.dirname(sys.executable) # pragma: no cover
else:
data_dir = path.dirname(__file__)
if is_legacy_xml:
log.debug('Opening country code legacy XML: {0}'.format(
str(data_dir) + '/data/iso_3166-1_list_en.xml'))
# Create the country codes file object.
f = io.open(str(data_dir) + '/data/iso_3166-1_list_en.xml', 'r',
encoding='ISO-8859-1')
# Read the file.
data = f.read()
# Check if there is data.
if not data: # pragma: no cover
return {}
# Parse the data to get the DOM.
dom = parseString(data)
# Retrieve the country entries.
entries = dom.getElementsByTagName('ISO_3166-1_Entry')
# Iterate through the entries and add to the countries dictionary.
for entry in entries:
# Retrieve the country code and name from the DOM.
code = entry.getElementsByTagName(
'ISO_3166-1_Alpha-2_Code_element')[0].firstChild.data
name = entry.getElementsByTagName(
'ISO_3166-1_Country_name')[0].firstChild.data
# Add to the countries dictionary.
countries[code] = name.title()
else:
log.debug('Opening country code CSV: {0}'.format(
str(data_dir) + '/data/iso_3166-1_list_en.xml'))
# Create the country codes file object.
f = io.open(str(data_dir) + '/data/iso_3166-1.csv', 'r',
encoding='utf-8')
# Create csv reader object.
csv_reader = csv.reader(f, delimiter=',', quotechar='"')
# Iterate through the rows and add to the countries dictionary.
for row in csv_reader:
# Retrieve the country code and name columns.
code = row[0]
name = row[1]
# Add to the countries dictionary.
countries[code] = name
return countries
def ipv4_is_defined(address):
"""
The function for checking if an IPv4 address is defined (does not need to
be resolved).
Args:
address: An IPv4 address in string format.
Returns:
Tuple:
:Boolean: True if given address is defined, otherwise False
:String: IETF assignment name if given address is defined, otherwise ''
:String: IETF assignment RFC if given address is defined, otherwise ''
"""
# Initialize the IP address object.
query_ip = IPv4Address(str(address))
# This Network
if query_ip in IPv4Network('0.0.0.0/8'):
return True, 'This Network', 'RFC 1122, Section 3.2.1.3'
# Loopback
elif query_ip.is_loopback:
return True, 'Loopback', 'RFC 1122, Section 3.2.1.3'
# Link Local
elif query_ip.is_link_local:
return True, 'Link Local', 'RFC 3927'
# IETF Protocol Assignments
elif query_ip in IPv4Network('192.0.0.0/24'):
return True, 'IETF Protocol Assignments', 'RFC 5736'
# TEST-NET-1
elif query_ip in IPv4Network('192.0.2.0/24'):
return True, 'TEST-NET-1', 'RFC 5737'
# 6to4 Relay Anycast
elif query_ip in IPv4Network('192.88.99.0/24'):
return True, '6to4 Relay Anycast', 'RFC 3068'
# Network Interconnect Device Benchmark Testing
elif query_ip in IPv4Network('198.18.0.0/15'):
return (True,
'Network Interconnect Device Benchmark Testing',
'RFC 2544')
# TEST-NET-2
elif query_ip in IPv4Network('198.51.100.0/24'):
return True, 'TEST-NET-2', 'RFC 5737'
# TEST-NET-3
elif query_ip in IPv4Network('203.0.113.0/24'):
return True, 'TEST-NET-3', 'RFC 5737'
# Multicast
elif query_ip.is_multicast:
return True, 'Multicast', 'RFC 3171'
# Limited Broadcast
elif query_ip in IPv4Network('255.255.255.255/32'):
return True, 'Limited Broadcast', 'RFC 919, Section 7'
# Private-Use Networks
elif query_ip.is_private:
return True, 'Private-Use Networks', 'RFC 1918'
return False, '', ''
def ipv6_is_defined(address):
"""
The function for checking if an IPv6 address is defined (does not need to
be resolved).
Args:
address: An IPv6 address in string format.
Returns:
Tuple:
:Boolean: True if address is defined, otherwise False
:String: IETF assignment name if address is defined, otherwise ''
:String: IETF assignment RFC if address is defined, otherwise ''
"""
# Initialize the IP address object.
query_ip = IPv6Address(str(address))
# Multicast
if query_ip.is_multicast:
return True, 'Multicast', 'RFC 4291, Section 2.7'
# Unspecified
elif query_ip.is_unspecified:
return True, 'Unspecified', 'RFC 4291, Section 2.5.2'
# Loopback.
elif query_ip.is_loopback:
return True, 'Loopback', 'RFC 4291, Section 2.5.3'
# Reserved
elif query_ip.is_reserved:
return True, 'Reserved', 'RFC 4291'
# Link-Local
elif query_ip.is_link_local:
return True, 'Link-Local', 'RFC 4291, Section 2.5.6'
# Site-Local
elif query_ip.is_site_local:
return True, 'Site-Local', 'RFC 4291, Section 2.5.7'
# Unique Local Unicast
elif query_ip.is_private:
return True, 'Unique Local Unicast', 'RFC 4193'
return False, '', ''
def unique_everseen(iterable, key=None):
"""
The generator to list unique elements, preserving the order. Remember all
elements ever seen. This was taken from the itertools recipes.
Args:
iterable: An iterable to process.
key: Optional function to run when checking elements (e.g., str.lower)
Returns:
Generator: Yields a generator object.
"""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def unique_addresses(data=None, file_path=None):
"""
The function to search an input string and/or file, extracting and
counting IPv4/IPv6 addresses/networks. Summarizes ports with sub-counts.
If both a string and file_path are provided, it will process them both.
Args:
data: A string to process.
file_path: An optional file path to process.
Returns:
Dictionary:
:ip address/network: Each address or network found is a dictionary w/\:
:count: Total number of times seen (Integer)
:ports: Dictionary with port numbers as keys and the number of
times seen for this ip as values (Dictionary)
Raises:
ValueError: Arguments provided are invalid.
"""
if not data and not file_path:
raise ValueError('No data or file path provided.')
ret = {}
base = {
'count': 0,
'ports': {}
}
file_data = None
if file_path:
log.debug('Opening file for unique address analysis: {0}'.format(
str(file_path)))
f = open(str(file_path), 'r')
# Read the file.
file_data = f.read()
pattern = re.compile(
str(IP_REGEX),
re.DOTALL
)
# Check if there is data.
log.debug('Analyzing input/file data'.format(
str(file_path)))
for input_data in [data, file_data]:
if input_data:
# Search for IPs.
for match in pattern.finditer(input_data):
is_net = False
port = None
try:
found = match.group('ip')
if '.' in found and ':' in found:
split = found.split(':')
ip_or_net = split[0]
port = split[1]
elif '[' in found:
split = found.split(']:')
ip_or_net = split[0][1:]
port = split[1]
elif '/' in found:
is_net = True
ip_or_net = found
else:
ip_or_net = found
if is_net:
ip_obj = ip_network(ip_or_net)
else:
ip_obj = ip_address(ip_or_net)
obj_str = ip_obj.__str__()
if obj_str not in ret.keys():
ret[obj_str] = copy.deepcopy(base)
ret[obj_str]['count'] += 1
if port:
try:
ret[obj_str]['ports'][str(port)] += 1
except KeyError:
ret[obj_str]['ports'][str(port)] = 1
except (KeyError, ValueError):
continue
return ret