plexpy/lib/ipwhois/whois.py
2021-10-14 23:18:51 -07:00

770 lines
25 KiB
Python

# Copyright (c) 2013-2020 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import sys
import re
import copy
from datetime import datetime
import logging
from .utils import unique_everseen
from . import (BlacklistError, WhoisLookupError, NetError)
if sys.version_info >= (3, 3): # pragma: no cover
from ipaddress import (ip_address,
ip_network,
summarize_address_range,
collapse_addresses)
else: # pragma: no cover
from ipaddr import (IPAddress as ip_address,
IPNetwork as ip_network,
summarize_address_range,
collapse_address_list as collapse_addresses)
log = logging.getLogger(__name__)

# Legacy base whois output dictionary. Every network entry produced by the
# get_nets_* parsers starts as a deep copy of this template, so each key is
# present (with a None value) even when the corresponding field is not found.
BASE_NET = dict.fromkeys((
    'cidr',
    'name',
    'handle',
    'range',
    'description',
    'country',
    'state',
    'city',
    'address',
    'postal_code',
    'emails',
    'created',
    'updated',
))
# Per-registry whois parsing configuration, keyed by the registry identifier
# found in asn_data['asn_registry'] (see Whois.lookup).
#
# Each entry holds:
#   'server'    - the registry's whois host name. Not read in this module;
#                 presumably consumed by the network layer (verify).
#   'fields'    - output field name -> regular expression. Whois.parse_fields
#                 compiles each pattern with re.DOTALL and reads the value
#                 from the named group 'val'.
#   'dt_format' - strptime() format for the 'created'/'updated' values.
#                 Optional: 'afrinic' defines neither a 'dt_format' nor any
#                 date patterns.
#
# NOTE: several patterns are built from a raw literal followed by a non-raw
# literal. The non-raw pieces embed a real newline character where the raw
# pieces carry the regex escape \n; both match a newline, so the pieces must
# be kept exactly as written.
RIR_WHOIS = {
    'arin': {
        'server': 'whois.arin.net',
        'fields': {
            'name': r'(NetName):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(NetHandle):[^\S\n]+(?P<val>.+?)\n',
            # The lookahead stops the value just before a newline that is
            # followed by a non-whitespace character.
            'description': r'(OrgName|CustName):[^\S\n]+(?P<val>.+?)'
                           '(?=(\n\\S):?)',
            'country': r'(Country):[^\S\n]+(?P<val>.+?)\n',
            'state': r'(StateProv):[^\S\n]+(?P<val>.+?)\n',
            'city': r'(City):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(Address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'postal_code': r'(PostalCode):[^\S\n]+(?P<val>.+?)\n',
            # Any "key: ... user@host.tld ..." line; 'val' is the address.
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
            'created': r'(RegDate):[^\S\n]+(?P<val>.+?)\n',
            'updated': r'(Updated):[^\S\n]+(?P<val>.+?)\n',
        },
        'dt_format': '%Y-%m-%d'
    },
    'ripencc': {
        'server': 'whois.ripe.net',
        'fields': {
            'name': r'(netname):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(descr):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
            # Only full YYYY-MM-DDTHH:MM:SSZ timestamps are captured, which
            # is the shape 'dt_format' below expects.
            'created': (
                r'(created):[^\S\n]+(?P<val>[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]'
                '{2}:[0-9]{2}:[0-9]{2}Z).*?\n'
            ),
            'updated': (
                r'(last-modified):[^\S\n]+(?P<val>[0-9]{4}-[0-9]{2}-[0-9]{2}T'
                '[0-9]{2}:[0-9]{2}:[0-9]{2}Z).*?\n'
            )
        },
        'dt_format': '%Y-%m-%dT%H:%M:%SZ'
    },
    'apnic': {
        'server': 'whois.apnic.net',
        'fields': {
            'name': r'(netname):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(descr):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
            # No 'created' pattern is defined for this registry; 'val' is an
            # 8-digit run (matches the %Y%m%d 'dt_format' below).
            'updated': r'(changed):[^\S\n]+.*(?P<val>[0-9]{8}).*?\n'
        },
        'dt_format': '%Y%m%d'
    },
    'lacnic': {
        'server': 'whois.lacnic.net',
        'fields': {
            # No 'name' or 'address' patterns are defined for this registry.
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(owner):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
            'created': r'(created):[^\S\n]+(?P<val>[0-9]{8}).*?\n',
            'updated': r'(changed):[^\S\n]+(?P<val>[0-9]{8}).*?\n'
        },
        'dt_format': '%Y%m%d'
    },
    'afrinic': {
        'server': 'whois.afrinic.net',
        'fields': {
            'name': r'(netname):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(descr):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
        }
        # No 'dt_format' key: Whois.lookup falls back to None for it.
    }
}
# Field patterns for referral whois (rwhois) responses. Whois.lookup passes
# RWHOIS['fields'] to Whois.parse_fields, which reads each value from the
# named group 'val'.
#
# NOTE: 'cidr' is not part of parse_fields' default field list, so it is only
# extracted when the caller supplies a field_list that contains it.
RWHOIS = {
    'fields': {
        'cidr': r'(network:IP-Network):(?P<val>.+?)\n',
        'name': r'(network:ID):(?P<val>.+?)\n',
        'description': (
            r'(network:(Org-Name|Organization(;I)?)):(?P<val>.+?)\n'
        ),
        'country': r'(network:(Country|Country-Code)):(?P<val>.+?)\n',
        'state': r'(network:State):(?P<val>.+?)\n',
        'city': r'(network:City):(?P<val>.+?)\n',
        'address': r'(network:Street-Address):(?P<val>.+?)\n',
        'postal_code': r'(network:Postal-Code):(?P<val>.+?)\n',
        # Same e-mail pattern as the RIR_WHOIS entries: the non-raw second
        # literal embeds a real newline character and must stay as written.
        'emails': (
            r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
            '[^\\S\n]+.*?)*?\n'
        ),
        'created': r'(network:Created):(?P<val>.+?)\n',
        'updated': r'(network:Updated):(?P<val>.+?)\n'
    }
}
# Whois referral URL -> registry key (the same keys used by RIR_WHOIS).
# Not referenced anywhere in this module; presumably imported by other parts
# of the package (verify before removing).
ASN_REFERRALS = dict((
    ('whois://whois.ripe.net', 'ripencc'),
    ('whois://whois.apnic.net', 'apnic'),
    ('whois://whois.lacnic.net', 'lacnic'),
    ('whois://whois.afrinic.net', 'afrinic'),
))
class Whois:
    """
    The class for parsing via whois

    Args:
        net (:obj:`ipwhois.net.Net`): The network object.

    Raises:
        NetError: The parameter provided is not an instance of
            ipwhois.net.Net
        IPDefinedError: The address provided is defined (does not need to be
            resolved). Not raised by the code in this class; presumably
            raised while the Net object itself is constructed (verify).
    """

    def __init__(self, net):

        # Function-scope import kept from the original (presumably to avoid
        # a circular import between the package modules - verify).
        from .net import Net

        # ipwhois.net.Net validation
        if not isinstance(net, Net):

            raise NetError('The provided net parameter is not an instance of '
                           'ipwhois.net.Net')

        self._net = net

    @staticmethod
    def _format_range(net_range):
        """
        Private helper: render a network range for the 'range' output field.

        Args:
            net_range (:obj:`str`): The range text taken from the whois
                response.

        Returns:
            str: '<first address> - <last address>' when net_range contains
            a '/' and parses as a network; otherwise net_range unchanged.
        """

        if '/' not in net_range:

            return net_range

        try:

            network = ip_network(net_range)

        except ValueError:  # pragma: no cover

            # Not a valid network (e.g. host bits set): keep the raw text.
            return net_range

        return '{0} - {1}'.format(str(network[0]), str(network[-1]))

    def parse_fields(self, response, fields_dict, net_start=None,
                     net_end=None, dt_format=None, field_list=None):
        """
        The function for parsing whois fields from a data input.

        Args:
            response (:obj:`str`): The response from the whois/rwhois server.
            fields_dict (:obj:`dict`): The mapping of fields to regex search
                values (required). Each pattern is compiled with re.DOTALL
                and the value is read from its named group 'val'.
            net_start (:obj:`int`): Offset in response at which searching
                stops. Whois.lookup passes the start of the *next* network
                block here. Defaults to None (search to the end).
            net_end (:obj:`int`): Offset in response at which searching
                begins. Whois.lookup passes the end of the current network's
                CIDR/inetnum line here. Defaults to None (search from the
                beginning).
            dt_format (:obj:`str`): The strptime format of the 'created' and
                'updated' fields if known. Defaults to None, in which case
                those fields are returned as plain text.
            field_list (:obj:`list` of :obj:`str`): If provided, fields to
                parse. Defaults to:

                ::

                    ['name', 'handle', 'description', 'country', 'state',
                    'city', 'address', 'postal_code', 'emails', 'created',
                    'updated']

        Returns:
            dict: A dictionary of fields provided in fields_dict, mapping to
            the results of the regex searches. Fields with no match are
            omitted. 'country' is upper-cased, 'created'/'updated' are
            converted to ISO 8601 when dt_format is given (None if the value
            does not fit the format), 'emails' is a de-duplicated list, and
            every other field is a newline-joined, de-duplicated string.
        """

        ret = {}

        if not field_list:

            field_list = ['name', 'handle', 'description', 'country', 'state',
                          'city', 'address', 'postal_code', 'emails',
                          'created', 'updated']

        # Search window. Equivalent to omitting pos/endpos when a bound is
        # None, and it also covers the case where only net_start is given
        # (which previously passed None as pos and raised TypeError).
        pos = 0 if net_end is None else net_end
        endpos = len(response) if net_start is None else net_start

        for field, pattern in fields_dict.items():

            if field not in field_list:

                continue

            pattern = re.compile(
                str(pattern),
                re.DOTALL
            )

            values = []
            sub_section_end = None

            for m in pattern.finditer(response, pos, endpos):

                # Apart from 'emails', a further match is only accepted as a
                # continuation when it starts exactly one character after
                # the previous match ended; anything else ends the field.
                if (sub_section_end and field != 'emails' and
                        sub_section_end != (m.start() - 1)):

                    break

                try:

                    values.append(m.group('val').strip())

                except IndexError:

                    # The pattern defines no 'val' group; nothing to record.
                    pass

                sub_section_end = m.end()

            if values:

                value = None
                try:

                    if field == 'country':

                        value = values[0].upper()

                    elif field in ('created', 'updated') and dt_format:

                        value = datetime.strptime(
                            values[0],
                            str(dt_format)).isoformat('T')

                    elif field == 'emails':

                        value = list(unique_everseen(values))

                    else:

                        value = '\n'.join(unique_everseen(values)).strip()

                except ValueError as e:

                    # Best effort: an unparsable value is reported as None.
                    log.debug('Whois field parsing failed for {0}: {1}'.format(
                              field, e))

                ret[field] = value

        return ret

    def get_nets_arin(self, response):
        """
        The function for parsing network blocks from ARIN whois data.

        Args:
            response (:obj:`str`): The response from the ARIN whois server.

        Returns:
            list of dict: Mapping of networks with start and end positions.
            Each dict is a copy of BASE_NET with these keys filled in:

            ::

                [{
                    'cidr' (str) - The network routing block
                    'range' (str) - The NetRange value, when one was found
                    'start' (int) - The starting point of the network
                    'end' (int) - The endpoint point of the network
                }]
        """

        nets = []

        # Find the first NetRange value.
        pattern = re.compile(
            r'^NetRange:[^\S\n]+(.+)$',
            re.MULTILINE
        )
        temp = pattern.search(response)
        net_range = None
        net_range_start = None
        if temp is not None:
            net_range = temp.group(1).strip()
            net_range_start = temp.start()

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions.
        for match in re.finditer(
            r'^CIDR:[^\S\n]+(.+?,[^\S\n].+|.+)$',
            response,
            re.MULTILINE
        ):

            try:

                net = copy.deepcopy(BASE_NET)

                if nets:

                    # NOTE(review): for every network after the first, this
                    # takes the first NetRange line found at or after the
                    # current CIDR line. Behavior kept from the original;
                    # confirm against real ARIN output that this is the
                    # range belonging to this block.
                    temp = pattern.search(response, match.start())
                    net_range = None
                    net_range_start = None
                    if temp is not None:
                        net_range = temp.group(1).strip()
                        net_range_start = temp.start()

                # For the first network the NetRange line must precede the
                # CIDR line to be used.
                if net_range is not None and (
                        nets or net_range_start < match.start()):

                    net['range'] = self._format_range(net_range)

                net['cidr'] = ', '.join(
                    [str(ip_network(c.strip()))
                     for c in match.group(1).split(', ')]
                )
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)

            except ValueError:

                # The CIDR text is not a valid network; skip this block.
                pass

        return nets

    def get_nets_lacnic(self, response):
        """
        The function for parsing network blocks from LACNIC whois data.

        Args:
            response (:obj:`str`): The response from the LACNIC whois server.

        Returns:
            list of dict: Mapping of networks with start and end positions.
            Each dict is a copy of BASE_NET with these keys filled in:

            ::

                [{
                    'cidr' (str) - The network routing block
                    'range' (str) - The matched inetnum/inet6num/route value
                    'start' (int) - The starting point of the network
                    'end' (int) - The endpoint point of the network
                }]
        """

        nets = []

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions.
        for match in re.finditer(
            r'^(inetnum|inet6num|route):[^\S\n]+(.+?,[^\S\n].+|.+)$',
            response,
            re.MULTILINE
        ):

            try:

                net = copy.deepcopy(BASE_NET)
                net_range = match.group(2).strip()
                net['range'] = self._format_range(net_range)

                cidrs = []
                for addr in net_range.split(', '):

                    # Dotted addresses with fewer than three dots (e.g.
                    # '200.3.12/22') are padded with '.0' groups before
                    # being parsed as a network.
                    count = addr.count('.')
                    if count != 0 and count < 4:

                        addr_split = addr.strip().split('/')
                        addr_split[0] += '.0' * (3 - count)
                        addr = '/'.join(addr_split)

                    cidrs.append(str(ip_network(addr.strip())))

                net['cidr'] = ', '.join(cidrs)
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)

            except ValueError:

                # The value is not a valid network; skip this block.
                pass

        return nets

    def get_nets_other(self, response):
        """
        The function for parsing network blocks from generic whois data.

        Args:
            response (:obj:`str`): The response from the whois/rwhois server.

        Returns:
            list of dict: Mapping of networks with start and end positions.
            Each dict is a copy of BASE_NET with these keys filled in:

            ::

                [{
                    'cidr' (str) - The network routing block
                    'range' (str) - The matched inetnum/inet6num/route value
                    'start' (int) - The starting point of the network
                    'end' (int) - The endpoint point of the network
                }]
        """

        nets = []

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions. Groups 3 and 4 are only set for
        # the '<first address> - <last address>' form.
        for match in re.finditer(
            r'^(inetnum|inet6num|route):[^\S\n]+((.+?)[^\S\n]-[^\S\n](.+)|'
            '.+)$',
            response,
            re.MULTILINE
        ):

            try:

                net = copy.deepcopy(BASE_NET)
                net_range = match.group(2).strip()
                net['range'] = self._format_range(net_range)

                if match.group(3) and match.group(4):

                    # Address range: reduce it to the covering CIDR blocks.
                    addrs = list(summarize_address_range(
                        ip_address(match.group(3).strip()),
                        ip_address(match.group(4).strip())))

                    cidr = ', '.join(
                        [str(i) for i in collapse_addresses(addrs)]
                    )

                else:

                    cidr = str(ip_network(net_range))

                net['cidr'] = cidr
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)

            except (ValueError, TypeError):

                # Unparsable addresses/networks; skip this block.
                pass

        return nets

    def lookup(self, inc_raw=False, retry_count=3, response=None,
               get_referral=False, extra_blacklist=None,
               ignore_referral_errors=False, asn_data=None,
               field_list=None, is_offline=False):
        """
        The function for retrieving and parsing whois information for an IP
        address via port 43/tcp (WHOIS).

        Args:
            inc_raw (:obj:`bool`, optional): Whether to include the raw
                results in the returned dictionary. Defaults to False.
            retry_count (:obj:`int`): The number of times to retry in case
                socket errors, timeouts, connection resets, etc. are
                encountered. Defaults to 3.
            response (:obj:`str`): Optional whois response to parse instead
                of querying the server. It is only reused when is_offline is
                True or the registry is 'arin'; otherwise a fresh lookup is
                performed. Required when is_offline=True.
            get_referral (:obj:`bool`): Whether to retrieve referral whois
                information, if available. Defaults to False.
            extra_blacklist (:obj:`list`): Blacklisted whois servers in
                addition to the global BLACKLIST. Defaults to None.
            ignore_referral_errors (:obj:`bool`): Whether to ignore and
                continue when an exception is encountered on referral whois
                lookups. Defaults to False.
            asn_data (:obj:`dict`): Result from
                :obj:`ipwhois.asn.IPASN.lookup` (required). Only its
                'asn_registry' value is read here.
            field_list (:obj:`list` of :obj:`str`): If provided, fields to
                parse. Defaults to:

                ::

                    ['name', 'handle', 'description', 'country', 'state',
                    'city', 'address', 'postal_code', 'emails', 'created',
                    'updated']

            is_offline (:obj:`bool`): Whether to perform lookups offline. If
                True, response and asn_data must be provided. Primarily used
                for testing. Defaults to False.

        Returns:
            dict: The IP whois lookup results

            ::

                {
                    'query' (str) - The IP address
                    'nets' (list) - Dictionaries containing network
                        information which consists of the fields listed in the
                        ipwhois.whois.RIR_WHOIS dictionary.
                    'raw' (str) - Raw whois results if the inc_raw parameter
                        is True.
                    'referral' (dict) - Referral whois information if
                        get_referral is True and the server is not blacklisted.
                        Consists of fields listed in the ipwhois.whois.RWHOIS
                        dictionary.
                    'raw_referral' (str) - Raw referral whois results if the
                        inc_raw parameter is True.
                }

        Raises:
            BlacklistError, WhoisLookupError: Re-raised from the referral
                lookup unless ignore_referral_errors is True.
            KeyError: The registry in asn_data has no RIR_WHOIS entry and at
                least one network block was found.
        """

        # Create the return dictionary.
        results = {
            'query': self._net.address_str,
            'nets': [],
            'raw': None,
            'referral': None,
            'raw_referral': None
        }

        # The referral server and port. Only used if get_referral is True.
        referral_server = None
        referral_port = 0

        registry = asn_data['asn_registry']

        # Only fetch the response if we haven't already.
        if response is None or (not is_offline and registry != 'arin'):

            log.debug('Response not given, perform WHOIS lookup for {0}'
                      .format(self._net.address_str))

            # Retrieve the whois data.
            response = self._net.get_whois(
                asn_registry=registry, retry_count=retry_count,
                extra_blacklist=extra_blacklist
            )

            # NOTE(review): the referral search is nested in the fetch
            # branch, i.e. it only runs on a freshly fetched response. The
            # reviewed copy had lost its indentation, so confirm this
            # nesting against the packaged source.
            if get_referral:

                # Search for a referral server; the first well-formed
                # rwhois://host:port entry wins.
                for match in re.finditer(
                    r'^ReferralServer:[^\S\n]+(.+:[0-9]+)$',
                    response,
                    re.MULTILINE
                ):

                    try:

                        temp = match.group(1)
                        if 'rwhois://' not in temp:  # pragma: no cover
                            raise ValueError

                        temp = temp.replace('rwhois://', '').split(':')

                        port = int(temp[1])
                        if port > 65535:  # pragma: no cover
                            raise ValueError

                        referral_server = temp[0]
                        referral_port = port

                    except (ValueError, KeyError,
                            IndexError):  # pragma: no cover

                        # Malformed entry: try the next ReferralServer line.
                        continue

                    break

        # Retrieve the referral whois data.
        if get_referral and referral_server:

            log.debug('Perform referral WHOIS lookup')

            response_ref = None

            try:

                response_ref = self._net.get_whois(
                    asn_registry='', retry_count=retry_count,
                    server=referral_server, port=referral_port,
                    extra_blacklist=extra_blacklist
                )

            except (BlacklistError, WhoisLookupError):

                if not ignore_referral_errors:

                    raise

            if response_ref:

                log.debug('Parsing referral WHOIS data')

                if inc_raw:

                    results['raw_referral'] = response_ref

                # Add the referral fields to the return dictionary.
                results['referral'] = self.parse_fields(
                    response_ref,
                    RWHOIS['fields'],
                    field_list=field_list
                )

        # If inc_raw parameter is True, add the response to return dictionary.
        if inc_raw:

            results['raw'] = response

        if registry == 'arin':

            nets = list(self.get_nets_arin(response))

        elif registry == 'lacnic':

            nets = list(self.get_nets_lacnic(response))

        else:

            nets = list(self.get_nets_other(response))

        # Date format for the registry, if it defines one. This used to be
        # looked up through results['asn_registry'], a key the results dict
        # never contains, so the KeyError fallback always left dt_format as
        # None and 'created'/'updated' were never converted to ISO 8601.
        try:

            dt_format = RIR_WHOIS[registry]['dt_format']

        except KeyError:

            dt_format = None

        # Iterate through all of the network sections and parse out the
        # appropriate fields for each.
        log.debug('Parsing WHOIS data')
        for index, net in enumerate(nets):

            # A section runs from the end of this network's CIDR/inetnum
            # line to the start of the next network (or the end of the
            # response for the last one).
            section_end = None
            if index + 1 < len(nets):

                section_end = nets[index + 1]['start']

            temp_net = self.parse_fields(
                response,
                RIR_WHOIS[registry]['fields'],
                section_end,
                net['end'],
                dt_format,
                field_list
            )

            # Merge the net dictionaries.
            net.update(temp_net)

            # The start and end values are no longer needed.
            del net['start'], net['end']

        # Add the networks to the return dictionary.
        results['nets'] = nets

        return results