plexpy/lib/ipwhois/experimental.py
2021-10-14 23:18:51 -07:00

465 lines
18 KiB
Python

# Copyright (c) 2017-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import socket
import logging
import time
from collections import namedtuple
from .exceptions import (ASNLookupError, HTTPLookupError, HTTPRateLimitError,
ASNRegistryError)
from .asn import IPASN
from .net import (CYMRU_WHOIS, Net)
from .rdap import RDAP
from .utils import unique_everseen
log = logging.getLogger(__name__)
def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120):
    """
    The function for retrieving ASN information for multiple IP addresses from
    Cymru via port 43/tcp (WHOIS).

    Args:
        addresses (:obj:`list` of :obj:`str`): IP addresses to lookup.
        retry_count (:obj:`int`): The number of times to retry in case socket
            errors, timeouts, connection resets, etc. are encountered.
            Defaults to 3.
        timeout (:obj:`int`): The default timeout for socket connections in
            seconds. Defaults to 120.

    Returns:
        str: The raw ASN bulk data, new line separated.

    Raises:
        ValueError: addresses argument must be a list of IPv4/v6 address
            strings.
        ASNLookupError: The ASN bulk lookup failed.
    """

    if not isinstance(addresses, list):

        raise ValueError('addresses argument must be a list of IPv4/v6 '
                         'address strings.')

    retries_left = retry_count

    # Each pass of this loop is one connection attempt. The loop only repeats
    # after a socket-level error while retries remain.
    while True:

        conn = None

        try:

            # Create the connection for the Cymru whois query.
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            conn.settimeout(timeout)
            log.debug('ASN bulk query initiated.')
            conn.connect((CYMRU_WHOIS, 43))

            # Query the Cymru whois server, and store the results.
            conn.sendall((
                ' -r -a -c -p -f begin\n{0}\nend'.format(
                    '\n'.join(addresses))
            ).encode())

            # Collect the raw bytes and decode once at the end. Decoding each
            # chunk on its own breaks when a multi-byte character straddles
            # two recv() calls.
            chunks = []
            while True:

                chunk = conn.recv(4096)

                # An empty read means the server closed the connection.
                if not chunk:

                    break

                chunks.append(chunk)

            return str(b''.join(chunks).decode())

        except (socket.timeout, socket.error) as e:  # pragma: no cover

            log.debug('ASN bulk query socket error: {0}'.format(e))

            if retries_left <= 0:

                raise ASNLookupError('ASN bulk lookup failed.')

            log.debug('ASN bulk query retrying (count: {0})'.format(
                str(retries_left)))
            retries_left -= 1

        except Exception:  # pragma: no cover

            # Any other failure (e.g. non-string entries in addresses, bad
            # response encoding) is reported as a lookup failure. Unlike a
            # bare except, KeyboardInterrupt/SystemExit still propagate.
            raise ASNLookupError('ASN bulk lookup failed.')

        finally:

            # Always release the socket, including before a retry, so failed
            # attempts do not leak file descriptors.
            if conn is not None:

                conn.close()
def bulk_lookup_rdap(addresses=None, inc_raw=False, retry_count=3, depth=0,
                     excluded_entities=None, rate_limit_timeout=60,
                     socket_timeout=10, asn_timeout=240, proxy_openers=None):
    """
    The function for bulk retrieving and parsing whois information for a list
    of IP addresses via HTTP (RDAP). This bulk lookup method uses bulk
    ASN Whois lookups first to retrieve the ASN for each IP. It then optimizes
    RDAP queries to achieve the fastest overall time, accounting for
    rate-limiting RIRs.

    Args:
        addresses (:obj:`list` of :obj:`str`): IP addresses to lookup.
        inc_raw (:obj:`bool`, optional): Whether to include the raw whois
            results in the returned dictionary. Defaults to False.
        retry_count (:obj:`int`): The number of failed RDAP lookups
            (HTTPLookupError) tolerated per address before it is given up on
            and recorded as failed. Every address is attempted at least
            once, so values below 1 behave like 1. Defaults to 3.
        depth (:obj:`int`): How many levels deep to run queries when additional
            referenced objects are found. Defaults to 0.
        excluded_entities (:obj:`list` of :obj:`str`): Entity handles to not
            perform lookups. Defaults to None.
        rate_limit_timeout (:obj:`int`): The number of seconds to wait before
            retrying when a rate limit notice is returned via rdap+json.
            Defaults to 60.
        socket_timeout (:obj:`int`): The default timeout for socket
            connections in seconds. Defaults to 10.
        asn_timeout (:obj:`int`): The default timeout for bulk ASN lookups in
            seconds. Defaults to 240.
        proxy_openers (:obj:`list` of :obj:`OpenerDirector`): Proxy openers
            for single/rotating proxy support. An empty list is treated the
            same as None. Defaults to None.

    Returns:
        namedtuple:

        :results (dict): IP address keys with the values as dictionaries
            returned by IPWhois.lookup_rdap().
        :stats (dict): Stats for the lookups:

        ::

            {
                'ip_input_total' (int) - The total number of addresses
                    originally provided for lookup via the addresses argument.
                'ip_unique_total' (int) - The total number of unique addresses
                    found in the addresses argument.
                'ip_lookup_total' (int) - The total number of addresses that
                    lookups were attempted for, excluding any that failed ASN
                    registry checks.
                'ip_failed_total' (int) - The total number of addresses that
                    lookups failed for. Excludes any that failed initially, but
                    succeeded after further retries.
                'lacnic' (dict) -
                {
                    'failed' (list) - The addresses that failed to lookup.
                        Excludes any that failed initially, but succeeded after
                        further retries.
                    'rate_limited' (list) - The addresses that encountered
                        rate-limiting. Unless an address is also in 'failed',
                        it eventually succeeded.
                    'total' (int) - The total number of addresses belonging to
                        this RIR that lookups were attempted for.
                }
                'ripencc' (dict) - Same as 'lacnic' above.
                'apnic' (dict) - Same as 'lacnic' above.
                'afrinic' (dict) - Same as 'lacnic' above.
                'arin' (dict) - Same as 'lacnic' above.
                'unallocated_addresses' (list) - The addresses that are
                    unallocated/failed ASN lookups. These can be addresses that
                    are not listed for one of the 5 RIRs (other). No attempt
                    was made to perform an RDAP lookup for these.
            }

    Raises:
        ValueError: addresses must be a list of IP address strings.
        ASNLookupError: The ASN bulk lookup failed, cannot proceed with bulk
            RDAP lookup.
    """

    if not isinstance(addresses, list):

        raise ValueError('addresses must be a list of IP address strings')

    # Initialize the dicts/lists
    results = {}
    failed_lookups_dict = {}
    rated_lookups = []
    stats = {
        'ip_input_total': len(addresses),
        'ip_unique_total': 0,
        'ip_lookup_total': 0,
        'ip_failed_total': 0,
        'lacnic': {'failed': [], 'rate_limited': [], 'total': 0},
        'ripencc': {'failed': [], 'rate_limited': [], 'total': 0},
        'apnic': {'failed': [], 'rate_limited': [], 'total': 0},
        'afrinic': {'failed': [], 'rate_limited': [], 'total': 0},
        'arin': {'failed': [], 'rate_limited': [], 'total': 0},
        'unallocated_addresses': []
    }
    asn_parsed_results = {}

    # An empty opener list would make the rotation below raise StopIteration,
    # so treat it the same as "no proxies".
    if not proxy_openers:

        proxy_openers = [None]

    proxy_openers_copy = iter(proxy_openers)

    # Make sure addresses is unique
    unique_ip_list = list(unique_everseen(addresses))

    # Get the unique count to return
    stats['ip_unique_total'] = len(unique_ip_list)

    # This is needed for iteration order
    rir_keys_ordered = ['lacnic', 'ripencc', 'apnic', 'afrinic', 'arin']

    # First query the ASN data for all IPs, can raise ASNLookupError, no catch
    bulk_asn = get_bulk_asn_whois(unique_ip_list, timeout=asn_timeout)

    # ASN results are returned as string, parse lines to list and remove first
    asn_result_list = bulk_asn.split('\n')
    del asn_result_list[0]

    # We need to instantiate IPASN, which currently needs a Net object,
    # IP doesn't matter here
    net = Net('1.2.3.4')
    ipasn = IPASN(net)

    # Iterate each IP ASN result, and add valid RIR results to
    # asn_parsed_results for RDAP lookups
    for asn_result in asn_result_list:

        temp = asn_result.split('|')

        # Not a valid entry, move on to next
        if len(temp) == 1:

            continue

        ip = temp[1].strip()

        # We need this since ASN bulk lookup is returning duplicates
        # This is an issue on the Cymru end
        if ip in asn_parsed_results:  # pragma: no cover

            continue

        try:

            asn_parsed = ipasn.parse_fields_whois(asn_result)

        except ASNRegistryError:  # pragma: no cover

            continue

        # Add valid IP ASN result to asn_parsed_results for RDAP lookup
        asn_parsed_results[ip] = asn_parsed
        stats[asn_parsed['asn_registry']]['total'] += 1

    # Set the list of IPs that are not allocated/failed ASN lookup
    stats['unallocated_addresses'] = [
        k for k in addresses if k not in asn_parsed_results
    ]

    # Set the total lookup count after unique IP and ASN result filtering
    stats['ip_lookup_total'] = len(asn_parsed_results)

    # Track the total number of LACNIC queries left. This is tracked in order
    # to ensure the 9 priority LACNIC queries/min don't go into infinite loop
    lacnic_total_left = stats['lacnic']['total']

    # Set the start time, this value is updated when the rate limit is reset
    old_time = time.time()

    # Rate limit tracking dict for all RIRs
    rate_tracker = {
        'lacnic': {'time': old_time, 'count': 0},
        'ripencc': {'time': old_time, 'count': 0},
        'apnic': {'time': old_time, 'count': 0},
        'afrinic': {'time': old_time, 'count': 0},
        'arin': {'time': old_time, 'count': 0}
    }

    # Iterate all of the IPs to perform RDAP lookups until none are left.
    # NOTE(review): while every remaining RIR is rate limited this loop spins
    # without sleeping until rate_limit_timeout elapses.
    while len(asn_parsed_results) > 0:

        # Sequentially run through each RIR to minimize lookups in a row to
        # the same RIR.
        for rir in rir_keys_ordered:

            # If there are still LACNIC IPs left to lookup and the rate limit
            # hasn't been reached, skip to find a LACNIC IP to lookup
            if (
                rir != 'lacnic' and lacnic_total_left > 0 and
                (rate_tracker['lacnic']['count'] != 9 or
                 (time.time() - rate_tracker['lacnic']['time']
                  ) >= rate_limit_timeout
                 )
            ):  # pragma: no cover

                continue

            # If the RIR rate limit has been reached and hasn't expired,
            # move on to the next RIR
            if (
                rate_tracker[rir]['count'] == 9 and (
                    (time.time() - rate_tracker[rir]['time']
                     ) < rate_limit_timeout)
            ):  # pragma: no cover

                continue

            # If the RIR rate limit has expired, reset the count/timer
            # and perform the lookup
            elif ((time.time() - rate_tracker[rir]['time']
                   ) >= rate_limit_timeout):  # pragma: no cover

                rate_tracker[rir]['count'] = 0
                rate_tracker[rir]['time'] = time.time()

            # Create a copy of the lookup IP dict so we can modify on
            # successful/failed queries. Loop each IP until it matches the
            # correct RIR in the parent loop, and attempt lookup
            tmp_dict = asn_parsed_results.copy()

            for ip, asn_data in tmp_dict.items():

                # Only IPs belonging to the parent loop RIR are looked up
                if asn_data['asn_registry'] != rir:

                    continue

                log.debug('Starting lookup for IP: {0} '
                          'RIR: {1}'.format(ip, rir))

                # Add to count for rate-limit tracking only for LACNIC,
                # since we have not seen aggressive rate-limiting from the
                # other RIRs yet
                if rir == 'lacnic':

                    rate_tracker[rir]['count'] += 1

                # Get the next proxy opener to use, or None
                try:

                    opener = next(proxy_openers_copy)

                # Start at the beginning if all have been used
                except StopIteration:

                    proxy_openers_copy = iter(proxy_openers)
                    opener = next(proxy_openers_copy)

                # Instantiate the objects needed for the RDAP lookup
                net = Net(ip, timeout=socket_timeout, proxy_opener=opener)
                rdap = RDAP(net)

                try:

                    # Perform the RDAP lookup. retry_count is set to 0
                    # here since we handle that in this function
                    rdap_result = rdap.lookup(
                        inc_raw=inc_raw, retry_count=0, asn_data=asn_data,
                        depth=depth, excluded_entities=excluded_entities
                    )

                    log.debug('Successful lookup for IP: {0} '
                              'RIR: {1}'.format(ip, rir))

                    # Lookup was successful, add to result. Set the nir
                    # key to None as this is not supported
                    # (yet - requires more queries)
                    results[ip] = asn_data
                    results[ip].update(rdap_result)
                    results[ip]['nir'] = None

                    # Remove the IP from the lookup queue
                    del asn_parsed_results[ip]

                    # If this was LACNIC IP, reduce the total left count
                    if rir == 'lacnic':

                        lacnic_total_left -= 1

                    log.debug(
                        '{0} total lookups left, {1} LACNIC lookups left'
                        ''.format(str(len(asn_parsed_results)),
                                  str(lacnic_total_left))
                    )

                    # If this IP failed previously, remove it from the
                    # failed return dict
                    if ip in failed_lookups_dict:  # pragma: no cover

                        del failed_lookups_dict[ip]

                    # Break out of the IP list loop, we need to change to
                    # the next RIR
                    break

                except HTTPLookupError:  # pragma: no cover

                    log.debug('Failed lookup for IP: {0} '
                              'RIR: {1}'.format(ip, rir))

                    # Record this failure for the IP
                    failed_lookups_dict[ip] = (
                        failed_lookups_dict.get(ip, 0) + 1
                    )

                    # Stop trying once the failure count reaches retry_count.
                    # The comparison is >= (not ==) and runs on the first
                    # failure too, so retry_count values of 0 or 1 cannot
                    # leave a permanently failing IP in the queue forever.
                    if failed_lookups_dict[ip] >= retry_count:

                        del asn_parsed_results[ip]
                        stats[rir]['failed'].append(ip)
                        stats['ip_failed_total'] += 1

                        if rir == 'lacnic':

                            lacnic_total_left -= 1

                    # Since this IP failed, we don't break to move to next
                    # RIR, we check the next IP for this RIR
                    continue

                except HTTPRateLimitError:  # pragma: no cover

                    # Add the IP to the rate-limited lookups list if not
                    # there
                    if ip not in rated_lookups:

                        rated_lookups.append(ip)
                        stats[rir]['rate_limited'].append(ip)

                    log.debug('Rate limiting triggered for IP: {0} '
                              'RIR: {1}'.format(ip, rir))

                    # Since rate-limit was reached, reset the timer and
                    # max out the count
                    rate_tracker[rir]['time'] = time.time()
                    rate_tracker[rir]['count'] = 9

                    # Break out of the IP list loop, we need to change to
                    # the next RIR
                    break

    return_tuple = namedtuple('return_tuple', ['results', 'stats'])
    return return_tuple(results, stats)