From 59a681dcf9fc31b3e8d82602499808d17dfaca13 Mon Sep 17 00:00:00 2001 From: Pete Bryan Date: Sun, 1 May 2022 21:42:20 -0600 Subject: [PATCH] Added a parameter to allow get_bulk_asn_whois to return a dictionary of data. Made some changes to reduce code reuse. Added tests and docs. Updated failing tests. --- EXPERIMENTAL.rst | 5 ++ ipwhois/experimental.py | 70 ++++++++++------------- ipwhois/tests/online/test_experimental.py | 1 + ipwhois/tests/stress/test_experimental.py | 3 +- ipwhois/tests/stress/test_net.py | 2 +- 5 files changed, 38 insertions(+), 43 deletions(-) diff --git a/EXPERIMENTAL.rst b/EXPERIMENTAL.rst index 0c6179fb..c2f22103 100644 --- a/EXPERIMENTAL.rst +++ b/EXPERIMENTAL.rst @@ -39,6 +39,9 @@ Arguments supported: | timeout | int | The default timeout for socket connections in | | | | seconds. Defaults to 120. | +--------------------+--------+-----------------------------------------------+ +| as_dict | bool | Return the results as a dictionary instead of | +| | | a string. Defaults to False. | ++--------------------+--------+-----------------------------------------------+ .. _get_bulk_asn_whois-output: @@ -47,6 +50,8 @@ Output Outputs a string of the raw ASN bulk data, new line separated. The first line is obsolete. +If as_dict set to True returns a dictionary with the IP address as the key and +ASN details in a dictionary as the value. .. _get_bulk_asn_whois-examples: diff --git a/ipwhois/experimental.py b/ipwhois/experimental.py index 8f6ce230..c6d09018 100644 --- a/ipwhois/experimental.py +++ b/ipwhois/experimental.py @@ -36,8 +36,10 @@ log = logging.getLogger(__name__) +_IPASN = IPASN(Net('1.2.3.4')) -def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120): + +def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120, as_dict=False): """ The function for retrieving ASN information for multiple IP addresses from Cymru via port 43/tcp (WHOIS). 
@@ -49,6 +51,8 @@ def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120): Defaults to 3. timeout (:obj:`int`): The default timeout for socket connections in seconds. Defaults to 120. + as_dict (:obj:`bool`): Return the results as a dictionary instead of a + string. Defaults to False. Returns: str: The raw ASN bulk data, new line separated. @@ -90,6 +94,9 @@ def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120): conn.close() + if as_dict: + return _raw_results_to_dict(data) + return str(data) except (socket.timeout, socket.error) as e: # pragma: no cover @@ -225,46 +232,10 @@ def bulk_lookup_rdap(addresses=None, inc_raw=False, retry_count=3, depth=0, rir_keys_ordered = ['lacnic', 'ripencc', 'apnic', 'afrinic', 'arin'] # First query the ASN data for all IPs, can raise ASNLookupError, no catch - bulk_asn = get_bulk_asn_whois(unique_ip_list, timeout=asn_timeout) - - # ASN results are returned as string, parse lines to list and remove first - asn_result_list = bulk_asn.split('\n') - del asn_result_list[0] - - # We need to instantiate IPASN, which currently needs a Net object, - # IP doesn't matter here - net = Net('1.2.3.4') - ipasn = IPASN(net) - - # Iterate each IP ASN result, and add valid RIR results to - asn_parsed_results for RDAP lookups - for asn_result in asn_result_list: - - temp = asn_result.split('|') - - # Not a valid entry, move on to next - if len(temp) == 1: - - continue - - ip = temp[1].strip() + asn_parsed_results = get_bulk_asn_whois(unique_ip_list, timeout=asn_timeout, as_dict=True) - # We need this since ASN bulk lookup is returning duplicates - This is an issue on the Cymru end - if ip in asn_parsed_results.keys(): # pragma: no cover - - continue - - try: - - asn_parsed = ipasn.parse_fields_whois(asn_result) - - except ASNRegistryError: # pragma: no cover - - continue - - # Add valid IP ASN result to asn_parsed_results for RDAP lookup - asn_parsed_results[ip] = asn_parsed + # Gather ASN Registry stats from data + for 
asn_parsed in asn_parsed_results.values(): stats[asn_parsed['asn_registry']]['total'] += 1 # Set the list of IPs that are not allocated/failed ASN lookup @@ -462,3 +433,22 @@ def bulk_lookup_rdap(addresses=None, inc_raw=False, retry_count=3, depth=0, return_tuple = namedtuple('return_tuple', ['results', 'stats']) return return_tuple(results, stats) + + +def _raw_results_to_dict(data: str) -> dict: + """Take a raw results set and return it as a dict.""" + data_list = data.split("\n") + asn_results = {} + # Use created ipasn instance + ipasn = _IPASN + for result in data_list: + # Discard the header line and any blank lines in the results + if not result.startswith("Bulk mode") and len(result) > 0: + ip_addr = result.split("|")[1].strip() + try: + asn_data = ipasn.parse_fields_whois(result) + except ASNRegistryError: # pragma: no cover + continue + if ip_addr not in asn_results: + asn_results[ip_addr] = asn_data + return asn_results diff --git a/ipwhois/tests/online/test_experimental.py b/ipwhois/tests/online/test_experimental.py index 6ea9b473..b9163b7e 100644 --- a/ipwhois/tests/online/test_experimental.py +++ b/ipwhois/tests/online/test_experimental.py @@ -30,6 +30,7 @@ def test_get_bulk_asn_whois(self): try: self.assertIsInstance(get_bulk_asn_whois(addresses=ips), str) + self.assertIsInstance(get_bulk_asn_whois(addresses=ips, as_dict=True), dict) except ASNLookupError: pass except AssertionError as e: diff --git a/ipwhois/tests/stress/test_experimental.py b/ipwhois/tests/stress/test_experimental.py index edd9377e..3d750962 100644 --- a/ipwhois/tests/stress/test_experimental.py +++ b/ipwhois/tests/stress/test_experimental.py @@ -18,8 +18,7 @@ def test_get_bulk_asn_whois(self): list(ipv6_generate_random(500))) try: self.assertIsInstance(get_bulk_asn_whois(addresses=ips), str) - except ASNLookupError: - pass + self.assertIsInstance(get_bulk_asn_whois(addresses=ips, as_dict=True), dict) except AssertionError as e: raise e except Exception as e: diff --git 
a/ipwhois/tests/stress/test_net.py b/ipwhois/tests/stress/test_net.py index 7e905787..66bda0be 100644 --- a/ipwhois/tests/stress/test_net.py +++ b/ipwhois/tests/stress/test_net.py @@ -17,7 +17,7 @@ def test_get_http_json(self): # Test for HTTPRateLimitError for up to 20 requests. Exits when raised. url = RIR_RDAP['lacnic']['ip_url'].format('200.57.141.161') result = Net('200.57.141.161') - count = 20 + count = 100 http_lookup_errors = 0 while count > 0: log.debug('Attempts left: {0}'.format(str(count)))