Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

319 allow get_bulk_asn_whois to return a dictionary #320

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions EXPERIMENTAL.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ Arguments supported:
| timeout | int | The default timeout for socket connections in |
| | | seconds. Defaults to 120. |
+--------------------+--------+-----------------------------------------------+
| as_dict | bool | Return the results as a dictionary instead of |
| | | a string. Defaults to False. |
+--------------------+--------+-----------------------------------------------+

.. _get_bulk_asn_whois-output:

Expand All @@ -47,6 +50,8 @@ Output

Outputs a string of the raw ASN bulk data, new line separated. The first line
is obsolete.
If ``as_dict`` is set to True, a dictionary is returned instead, with each IP
address as a key and its parsed ASN details (a dictionary) as the value.

.. _get_bulk_asn_whois-examples:

Expand Down
70 changes: 30 additions & 40 deletions ipwhois/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@

log = logging.getLogger(__name__)

_IPASN = IPASN(Net('1.2.3.4'))

def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120):

def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120, as_dict=False):
"""
The function for retrieving ASN information for multiple IP addresses from
Cymru via port 43/tcp (WHOIS).
Expand All @@ -49,6 +51,8 @@ def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120):
Defaults to 3.
timeout (:obj:`int`): The default timeout for socket connections in
seconds. Defaults to 120.
        as_dict (:obj:`bool`): Return the results as a dictionary instead
            of a string. Defaults to False.

Returns:
str: The raw ASN bulk data, new line separated.
Expand Down Expand Up @@ -90,6 +94,9 @@ def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120):

conn.close()

if as_dict:
return _raw_results_to_dict(data)

return str(data)

except (socket.timeout, socket.error) as e: # pragma: no cover
Expand Down Expand Up @@ -225,46 +232,10 @@ def bulk_lookup_rdap(addresses=None, inc_raw=False, retry_count=3, depth=0,
rir_keys_ordered = ['lacnic', 'ripencc', 'apnic', 'afrinic', 'arin']

# First query the ASN data for all IPs, can raise ASNLookupError, no catch
bulk_asn = get_bulk_asn_whois(unique_ip_list, timeout=asn_timeout)

# ASN results are returned as string, parse lines to list and remove first
asn_result_list = bulk_asn.split('\n')
del asn_result_list[0]

# We need to instantiate IPASN, which currently needs a Net object,
# IP doesn't matter here
net = Net('1.2.3.4')
ipasn = IPASN(net)

# Iterate each IP ASN result, and add valid RIR results to
# asn_parsed_results for RDAP lookups
for asn_result in asn_result_list:

temp = asn_result.split('|')

# Not a valid entry, move on to next
if len(temp) == 1:

continue

ip = temp[1].strip()
asn_parsed_results = get_bulk_asn_whois(unique_ip_list, timeout=asn_timeout, as_dict=True)

# We need this since ASN bulk lookup is returning duplicates
# This is an issue on the Cymru end
if ip in asn_parsed_results.keys(): # pragma: no cover

continue

try:

asn_parsed = ipasn.parse_fields_whois(asn_result)

except ASNRegistryError: # pragma: no cover

continue

# Add valid IP ASN result to asn_parsed_results for RDAP lookup
asn_parsed_results[ip] = asn_parsed
# Gather ASN Registry stats from data
for asn_parsed in asn_parsed_results.values():
stats[asn_parsed['asn_registry']]['total'] += 1

# Set the list of IPs that are not allocated/failed ASN lookup
Expand Down Expand Up @@ -462,3 +433,22 @@ def bulk_lookup_rdap(addresses=None, inc_raw=False, retry_count=3, depth=0,

return_tuple = namedtuple('return_tuple', ['results', 'stats'])
return return_tuple(results, stats)


def _raw_results_to_dict(data: str) -> dict:
    """Parse raw Cymru bulk ASN whois output into a per-IP dictionary.

    Args:
        data (:obj:`str`): The raw, newline separated bulk ASN data as
            returned by get_bulk_asn_whois().

    Returns:
        dict: IP address (str) mapped to its parsed ASN fields (dict).
            Header/blank/malformed lines and entries that raise
            ASNRegistryError are skipped. Duplicate IPs keep the first
            parsed result, since the Cymru bulk service can return
            duplicates.
    """
    asn_results = {}
    for result in data.split("\n"):
        # Discard the "Bulk mode" header line and any blank lines.
        if result.startswith("Bulk mode") or not result:
            continue
        fields = result.split("|")
        # No field separators means this is not a valid entry (e.g. an
        # error/footer line); skip it instead of raising IndexError.
        if len(fields) == 1:
            continue
        ip_addr = fields[1].strip()
        try:
            asn_data = _IPASN.parse_fields_whois(result)
        except ASNRegistryError:  # pragma: no cover
            continue
        # Keep only the first result per IP (Cymru-side duplicate issue).
        if ip_addr not in asn_results:
            asn_results[ip_addr] = asn_data
    return asn_results
1 change: 1 addition & 0 deletions ipwhois/tests/online/test_experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def test_get_bulk_asn_whois(self):

try:
self.assertIsInstance(get_bulk_asn_whois(addresses=ips), str)
self.assertIsInstance(get_bulk_asn_whois(addresses=ips, as_dict=True), dict)
except ASNLookupError:
pass
except AssertionError as e:
Expand Down
3 changes: 1 addition & 2 deletions ipwhois/tests/stress/test_experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def test_get_bulk_asn_whois(self):
list(ipv6_generate_random(500)))
try:
self.assertIsInstance(get_bulk_asn_whois(addresses=ips), str)
except ASNLookupError:
pass
self.assertIsInstance(get_bulk_asn_whois(addresses=ips, as_dict=True), dict)
except AssertionError as e:
raise e
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion ipwhois/tests/stress/test_net.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_get_http_json(self):
# Test for HTTPRateLimitError for up to 20 requests. Exits when raised.
url = RIR_RDAP['lacnic']['ip_url'].format('200.57.141.161')
result = Net('200.57.141.161')
count = 20
count = 100
http_lookup_errors = 0
while count > 0:
log.debug('Attempts left: {0}'.format(str(count)))
Expand Down