diff --git a/error-10-ogc-service-url b/error-10-ogc-service-url new file mode 100644 index 0000000..7286cce --- /dev/null +++ b/error-10-ogc-service-url @@ -0,0 +1,377 @@ +from bs4 import BeautifulSoup +from dotenv import load_dotenv +from urllib.parse import urlparse, parse_qs, urlencode +import subprocess +import psycopg2 +import psycopg2.extras +import requests +import math +import time +import re +import os + +# When a URL reaches MAX_FAILURES consecutive failures it's marked +# as deprecated and excluded from future insertions in database +MAX_FAILURES = 10 + +# Load environment variables from .env file +load_dotenv() + +specific_urs = [ + "https://demo.pycsw.org/gisdata/collections/metadata:main/items?offset=100&f=json", + "https://www.dov.vlaanderen.be/geoserver/erosie/wfs", + "https://demo.pycsw.org/gisdata/collections/metadata:main/items/http://capita.wustl.edu/DataspaceMetadata_ISO/FZ Juelich.MACC.vmr_n2o.xml", + "https://demo.pycsw.org/gisdata/collections/metadata:main/items/urn:uuid:dcc3a538-932a-11ea-ad6f-823cf448c401" +] + +# base catalog +base = os.environ.get("OGCAPI_URL") or "https://demo.pycsw.org/gisdata" +collection = os.environ.get("OGCAPI_COLLECTION") or "metadata:main" + +# format catalogue path with f-string +catalogue_json_url= f"{base}/collections/{collection}/items?f=json" + +def setup_database(): + conn = psycopg2.connect( + host=os.environ.get("POSTGRES_HOST"), + port=os.environ.get("POSTGRES_PORT"), + dbname=os.environ.get("POSTGRES_DB"), + user=os.environ.get("POSTGRES_USER"), + password=os.environ.get("POSTGRES_PASSWORD") + ) + cur = conn.cursor() + + # Drop tables (only for development purposes) + cur.execute("DROP TABLE IF EXISTS validation_history CASCADE") + cur.execute("DROP TABLE IF EXISTS parent CASCADE") + cur.execute("DROP TABLE IF EXISTS links CASCADE") + + # Create or truncate linkchecker_output table + create_table_query = """ + CREATE TABLE IF NOT EXISTS links ( + id_link SERIAL PRIMARY KEY, + urlname TEXT UNIQUE, + status TEXT, + result TEXT, + info TEXT, + warning TEXT, + deprecated BOOLEAN DEFAULT FALSE, + consecutive_failures INTEGER DEFAULT 0 + ) + """ + cur.execute(create_table_query) + + # Create validation_history table if it doesn't exist + cur.execute(""" + CREATE TABLE IF NOT EXISTS parent ( + id SERIAL PRIMARY KEY, + parentname TEXT NULL, + baseref TEXT NULL, + fk_link INTEGER REFERENCES links(id_link), + UNIQUE (parentname, baseref, fk_link) + ) + """) + + # Create url_status table if it doesn't exist + cur.execute(""" + CREATE TABLE IF NOT EXISTS validation_history ( + id SERIAL PRIMARY KEY, + fk_link INTEGER REFERENCES links(id_link), + validation_result TEXT NOT NULL, + timestamp TIMESTAMP NOT NULL + ) + """) + + conn.commit() + return conn, cur + +def get_pagination_info(url): + try: + # Fetch catalogue JSON + response = requests.get(url) + response.raise_for_status() # Raise exception for HHTP errors + data = response.json() + + # Extract relevant fields + number_matched = data.get('numberMatched', 0) + number_returned = data.get('numberReturned', 0) + + # Calculate total pages + total_pages = math.ceil(number_matched / number_returned) + return total_pages, number_returned + except requests.exceptions.RequestException as e: + print(f"Error fetching or parsing JSON data from {url}: {e}") + return None + except Exception as e: + print(f"Error calculating total pages from JSON data: {e}") + return None + +def extract_links(url): + try: + # Skip if URL is an email address + if url.startswith("mailto:"): + return [] + # Fetch the HTML content of the webpage + response = requests.get(url) + response.raise_for_status() # Raise an exception for HTTP errors + html_content = response.text + + # Parse the HTML content using BeautifulSoup + soup = BeautifulSoup(html_content, 'html.parser') + + # Find all anchor tags and extract their href attributes + links = [link.get('href') for link in soup.find_all('a')] + + return links + except Exception as e: + print(f"Error extracting links from {url}: {e}") + return [] + +# def check_single_url(url): +# process = subprocess.Popen([ +# "docker", "run", "--rm", "-i", "-u", "1000:1000", "ghcr.io/linkchecker/linkchecker:latest", +# "--verbose", "--check-extern", "--recursion-level=0", "--output=csv", +# url +# ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + +# stdout, _ = process.communicate() + +# return stdout.decode('utf-8').strip().split('\n') + +# def run_linkchecker(url): +# process = subprocess.Popen([ +# "docker", "run", "--rm", "-i", "-u", "1000:1000", "ghcr.io/linkchecker/linkchecker:latest", +# "--verbose", "--check-extern", "--recursion-level=1", "--output=csv", +# url +# ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + +# for line in process.stdout: +# yield line.decode('utf-8').strip() + +# process.wait() + +# def check_single_url(url): +# process = subprocess.Popen([ +# "linkchecker", +# "--verbose", +# "--check-extern", +# "--recursion-level=0", +# "--timeout=5", +# "--output=csv", +# url + "?f=html" +# ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + +# stdout, _ = process.communicate() + +# return stdout.decode('utf-8').strip().split('\n') + +# def run_linkchecker(url): +# # Run LinkChecker Docker command with specified user and group IDs for each URL +# process = subprocess.Popen([ +# "linkchecker", +# "--verbose", +# "--check-extern", +# "--recursion-level=1", +# "--timeout=5", +# "--output=csv", +# url + "?f=html" +# ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + # Process the output line by line and yield each line + for line in process.stdout: + yield line.decode('utf-8').strip() # Decode bytes to string and strip newline characters + # Wait for the process to finish + process.wait() + +def insert_or_update_link(conn, urlname, status, result, info, warning, is_valid): + + with conn.cursor() as cur: + # Get current status + cur.execute("SELECT id_link, consecutive_failures, deprecated FROM links WHERE urlname = %s", (urlname,)) + existing_link = cur.fetchone() + + if existing_link: + link_id, consecutive_failures, deprecated = existing_link + + if existing_link[2]: + # Ignore deprecated URL's + # Deprecated URL's are these urls that consecutive have failed for MAX_FAILURES times + return None + + if not is_valid: + consecutive_failures += 1 + else: + consecutive_failures = 0 + + deprecated = deprecated or (consecutive_failures >= MAX_FAILURES) + + # Updade existing link + cur.execute(""" + UPDATE links SET + status = %s, + result = %s, + info = %s, + warning = %s, + deprecated = %s, + consecutive_failures = %s + WHERE id_link = %s + """,(status, result, info, warning, deprecated, consecutive_failures, link_id)) + else: + # Insert new link (not deprecated on the first insertion) + cur.execute(""" + INSERT INTO links (urlname, status, result, info, warning, deprecated, consecutive_failures) + VALUES (%s, %s, %s, %s, %s, %s, %s) + RETURNING id_link + """, (urlname, status, result, info, warning, False, 0 if is_valid else 1)) + + link_id = cur.fetchone()[0] + + # Insert new record in validation history + cur.execute(""" + INSERT INTO validation_history(fk_link, validation_result, timestamp) + VALUES(%s, %s, CURRENT_TIMESTAMP) + """,(link_id, status)) + conn.commit() + + return link_id + +def insert_parent(conn, parentname, baseref, link_id): + with conn.cursor() as cur: + # Convert empty strings to None + parentname = parentname if parentname else None + baseref = baseref if baseref else None + + cur.execute(""" + INSERT INTO parent (parentname, baseref, fk_link) + VALUES (%s, %s, %s) + ON CONFLICT (parentname, baseref, fk_link) DO NOTHING + """, (parentname, baseref, link_id)) + + # Commit the transaction + conn.commit() + +def is_valid_status(valid_string): + # Return if status is valid or not + parts = valid_string.split() + if parts[0].isdigit(): + if 200 <= int(parts[0]) < 400: # Valid HTTP status codes range + return True + return False + +def get_active_urls(conn): + with conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM validation_history") + count = cur.fetchone()[0] + + if count == 0: + return None # The table is empty + else: + cur.execute("SELECT url FROM validation_history WHERE NOT deprecated") + return [row[0] for row in cur.fetchall()] + +def determine_service_type(url): + ogc_patterns = ['/wms', '/wfs', '/csw', '/wcs', 'service='] + + if any(pattern in url.lower() for pattern in ogc_patterns): + parsed_url = urlparse(url) + query_params = parse_qs(parsed_url.query) + + query_params.pop('service', None) + query_params.pop('request', None) + + query_params['request'] = ['GetCapabilities'] + + if 'service' not in query_params: + if '/wms' in parsed_url.path.lower(): + query_params['service'] = ['WMS'] + elif '/wfs' in parsed_url.path.lower(): + query_params['service'] = ['WFS'] + elif '/csw' in parsed_url.path.lower(): + query_params['service'] = ['CSW'] + elif '/wcs' in parsed_url.path.lower(): + query_params['service'] = ['WCS'] + + new_query = urlencode(query_params, doseq=True) + new_url = parsed_url._replace(query=new_query).geturl() + + return new_url + + return url + +def main(): + start_time = time.time() # Start timing + # Set up the database and create the table + print("Setting PostgreSQL db") + conn, cur = setup_database() + + print('Time started processing links.') + print(f'Loading {catalogue_json_url} links...') + total_pages, numbers_returned = get_pagination_info(catalogue_json_url) + + # Base URL + base_url = base + 'collections/' + collection + '/items?offset=' + + # Generate URLs for each page + urls = [base_url + str(i * numbers_returned) + "&f=html" for i in range(total_pages)] + + # Initialize an empty set to store all unique links + all_links = set() + # Iterate through the list of URLs and extract links from each one + for url in urls: + extracted_links = extract_links(url) + all_links.update(extracted_links) # Add new links to the set of all links + + # Specify the fields to include in the CSV file + fields_to_include = ['urlname', 'parentname', 'baseref', 'valid', 'result', 'warning', 'info'] + + print("Checking Links...") + + # Run LinkChecker and process the output + urls_to_recheck = set() + all_links = specific_urs + print("Initial Link Checking...") + for url in all_links: + for line in run_linkchecker(url): + if re.match(r'^http', line): + values = line.rstrip(';').split(';') + urlname = values[0] + + # Parse initial check results + filtered_values = [str(values[i]) if i < len(values) else "" for i in range(len(fields_to_include))] + urlname, parentname, baseref, valid, result, warning, info = filtered_values + + # Determine if URL needs to be rechecked + processed_url = determine_service_type(urlname) + if processed_url != urlname: + urls_to_recheck.add(processed_url) + else: + # If URL doesn't need reprocessing, insert results directly + is_valid = is_valid_status(valid) + link_id = insert_or_update_link(conn, urlname, valid, result, info, warning, is_valid) + insert_parent(conn, parentname, baseref, link_id) + + print("Rechecking OGC processed URLs...") + for url in urls_to_recheck: + results = check_single_url(url) + for line in results: + if re.match(r'^http', line): + values = line.rstrip(';').split(';') + filtered_values = [str(values[i]) if i < len(values) else "" for i in range(len(fields_to_include))] + urlname, parentname, baseref, valid, result, warning, info = filtered_values + is_valid = is_valid_status(valid) + link_id = insert_or_update_link(conn, urlname, valid, result, info, warning, is_valid) + insert_parent(conn, parentname, baseref, link_id) + + # conn.commit() + print("LinkChecker output written to PostgreSQL database") + + # Close the connection and cursor + cur.close() + conn.close() + + end_time = time.time() + elapsed_time = end_time - start_time + print(f"Time elapsed: {elapsed_time:.2f} seconds") + +if __name__ == "__main__": + main() diff --git a/linkcheck/linkchecker.py b/linkcheck/linkchecker.py index bad82dd..a055d37 100644 --- a/linkcheck/linkchecker.py +++ b/linkcheck/linkchecker.py @@ -1,7 +1,8 @@ from bs4 import BeautifulSoup from dotenv import load_dotenv +from lxml import html from urllib.parse import urlparse, parse_qs, urlencode -import subprocess +from concurrent.futures import ThreadPoolExecutor import psycopg2 import psycopg2.extras import requests @@ -10,9 +11,11 @@ import re import os -# When a URL reaches MAX_FAILURES consecutive failures it's marked -# as deprecated and excluded from future insertions in database -MAX_FAILURES = 10 +# Configuration constants + +MAX_FAILURES = 10 # Used to mark deprecated url's +TIMEOUT = 5 # Url timeout +MAX_WORKERS = 5 # Threads used for url checking # Load environment variables from .env file load_dotenv() @@ -24,6 +27,34 @@ # format catalogue path with f-string catalogue_json_url= f"{base}/collections/{collection}/items?f=json" +class URLChecker: + def __init__(self, timeout=TIMEOUT): + self.timeout = timeout + + def check_url(self, url): + try: + response = requests.head(url, timeout=self.timeout, allow_redirects=True) + + return { + 'url': url, + 'status_code': response.status_code, + 'is_redirect': response.url != url, + 'valid': 200 <= response.status_code < 400 + } + except requests.RequestException as e: + return { + 'url': url, + 'error': str(e), + 'status_code': None, + 'is_redirect': None, + 'valid': False + } + + def check_urls(self, urls): + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + results = list(executor.map(self.check_url, urls)) + return results + def setup_database(): conn = psycopg2.connect( host=os.environ.get("POSTGRES_HOST"), @@ -34,46 +65,34 @@ def setup_database(): ) cur = conn.cursor() - # Drop tables (only for development purposes) - # cur.execute("DROP TABLE IF EXISTS validation_history CASCADE") - # cur.execute("DROP TABLE IF EXISTS parent CASCADE") - # cur.execute("DROP TABLE IF EXISTS links CASCADE") - - # Create or truncate linkchecker_output table - create_table_query = """ - CREATE TABLE IF NOT EXISTS links ( - id_link SERIAL PRIMARY KEY, - urlname TEXT UNIQUE, - status TEXT, - result TEXT, - info TEXT, - warning TEXT, - deprecated BOOLEAN DEFAULT FALSE, - consecutive_failures INTEGER DEFAULT 0 - ) - """ - cur.execute(create_table_query) - - # Create validation_history table if it doesn't exist - cur.execute(""" - CREATE TABLE IF NOT EXISTS parent ( - id SERIAL PRIMARY KEY, - parentname TEXT NULL, - baseref TEXT NULL, - fk_link INTEGER REFERENCES links(id_link), - UNIQUE (parentname, baseref, fk_link) - ) - """) + # Drop existing tables + cur.execute("DROP TABLE IF EXISTS links CASCADE") + cur.execute("DROP TABLE IF EXISTS validation_history CASCADE") + + # Create tables + tables = [ + """ + CREATE TABLE IF NOT EXISTS links ( + id_link SERIAL PRIMARY KEY, + urlname TEXT UNIQUE, + deprecated BOOLEAN DEFAULT FALSE, + consecutive_failures INTEGER DEFAULT 0 + ) + """, + """ + CREATE TABLE IF NOT EXISTS validation_history ( + id SERIAL PRIMARY KEY, + fk_link INTEGER REFERENCES links(id_link), + status_code INTEGER, + is_redirect BOOLEAN, + error_message TEXT, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """ + ] - # Create url_status table if it doesn't exist - cur.execute(""" - CREATE TABLE IF NOT EXISTS validation_history ( - id SERIAL PRIMARY KEY, - fk_link INTEGER REFERENCES links(id_link), - validation_result TEXT NOT NULL, - timestamp TIMESTAMP NOT NULL - ) - """) + for table in tables: + cur.execute(table) conn.commit() return conn, cur @@ -98,260 +117,138 @@ def get_pagination_info(url): except Exception as e: print(f"Error calculating total pages from JSON data: {e}") return None - -def extract_links(url): - try: - # Skip if URL is an email address - if url.startswith("mailto:"): - return [] - # Fetch the HTML content of the webpage - response = requests.get(url) - response.raise_for_status() # Raise an exception for HTTP errors - html_content = response.text - - # Parse the HTML content using BeautifulSoup - soup = BeautifulSoup(html_content, 'html.parser') - - # Find all anchor tags and extract their href attributes - links = [link.get('href') for link in soup.find_all('a')] - - return links - except Exception as e: - print(f"Error extracting links from {url}: {e}") - return [] - -def check_single_url(url): - process = subprocess.Popen([ - "linkchecker", - "--verbose", - "--check-extern", - "--recursion-level=0", - "--timeout=5", - "--output=csv", - url + "?f=html" - ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - # Process.communicate is good for shorter-running processes - stdout, _ = process.communicate() - - return stdout.decode('utf-8').strip().split('\n') - -def run_linkchecker(url): - print(f'check {url}') - # Run LinkChecker Docker command with specified user and group IDs for each URL - process = subprocess.Popen([ - "linkchecker", - "--verbose", - "--check-extern", - "--recursion-level=1", - "--timeout=5", - "--output=csv", - url + "?f=html" - ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - # Process the output line by line and yield each line - # Memory efficient for large outputs - for line in process.stdout: - yield line.decode('utf-8').strip() # Decode bytes to string and strip newline characters - # Wait for the process to finish - process.wait() - -def insert_or_update_link(conn, urlname, status, result, info, warning, is_valid): - + +def insert_or_update_link(conn, url_result): with conn.cursor() as cur: - # Get current status - cur.execute("SELECT id_link, consecutive_failures, deprecated FROM links WHERE urlname = %s", (urlname,)) - existing_link = cur.fetchone() + urlname = url_result['url'] - if existing_link: - link_id, consecutive_failures, deprecated = existing_link - - if existing_link[2]: - # Ignore deprecated URL's - # Deprecated URL's are these urls that consecutive have failed for MAX_FAILURES times - return None - - if not is_valid: - consecutive_failures += 1 - else: - consecutive_failures = 0 - - deprecated = deprecated or (consecutive_failures >= MAX_FAILURES) - - # Updade existing link - cur.execute(""" - UPDATE links SET - status = %s, - result = %s, - info = %s, - warning = %s, - deprecated = %s, - consecutive_failures = %s - WHERE id_link = %s - """,(status, result, info, warning, deprecated, consecutive_failures, link_id)) - else: - # Insert new link (not deprecated on the first insertion) - cur.execute(""" - INSERT INTO links (urlname, status, result, info, warning, deprecated, consecutive_failures) - VALUES (%s, %s, %s, %s, %s, %s, %s) - RETURNING id_link - """, (urlname, status, result, info, warning, False, 0 if is_valid else 1)) - - link_id = cur.fetchone()[0] - - # Insert new record in validation history + # Get or create link cur.execute(""" - INSERT INTO validation_history(fk_link, validation_result, timestamp) - VALUES(%s, %s, CURRENT_TIMESTAMP) - """,(link_id, status)) - conn.commit() + INSERT INTO links (urlname, consecutive_failures) + VALUES (%s, %s) + ON CONFLICT (urlname) DO UPDATE + SET consecutive_failures = + CASE + WHEN %s THEN 0 + ELSE links.consecutive_failures + 1 + END, + deprecated = + CASE + WHEN %s THEN false + WHEN links.consecutive_failures + 1 >= %s THEN true + ELSE links.deprecated + END + RETURNING id_link, deprecated + """, (urlname, 0 if url_result['valid'] else 1, url_result['valid'], url_result['valid'], MAX_FAILURES)) - return link_id - -def insert_parent(conn, parentname, baseref, link_id): - with conn.cursor() as cur: - # Convert empty strings to None - parentname = parentname if parentname else None - baseref = baseref if baseref else None - - cur.execute(""" - INSERT INTO parent (parentname, baseref, fk_link) - VALUES (%s, %s, %s) - ON CONFLICT (parentname, baseref, fk_link) DO NOTHING - """, (parentname, baseref, link_id)) + link_id, deprecated = cur.fetchone() - # Commit the transaction - conn.commit() - -def insert_validation_history(conn, url, validation_result): - with conn.cursor() as cur: - cur.execute( - "INSERT INTO validation_history (url, validation_result) VALUES (%s, %s)", - (url, validation_result) - ) - conn.commit() - -def is_valid_status(valid_string): - # Return if status is valid or not - parts = valid_string.split() - if parts[0].isdigit(): - if 200 <= int(parts[0]) < 400: # Valid HTTP status codes range - return True - return False - -def get_active_urls(conn): - with conn.cursor() as cur: - cur.execute("SELECT COUNT(*) FROM validation_history") - count = cur.fetchone()[0] + if not deprecated: + # Insert validation history + cur.execute(""" + INSERT INTO validation_history( + fk_link, status_code, + is_redirect, error_message + ) + VALUES(%s, %s, %s, %s) + """, ( + link_id, + url_result['status_code'], + url_result['is_redirect'], + url_result.get('error') + )) - if count == 0: - return None # The table is empty - else: - cur.execute("SELECT url FROM validation_history WHERE NOT deprecated") - return [row[0] for row in cur.fetchall()] - -def determine_service_type(url): - ogc_patterns = ['/wms', '/wfs', '/csw', '/wcs', 'service='] - - if any(pattern in url.lower() for pattern in ogc_patterns): - parsed_url = urlparse(url) - query_params = parse_qs(parsed_url.query) - - query_params.pop('service', None) - query_params.pop('request', None) - - query_params['request'] = ['GetCapabilities'] - - if 'service' not in query_params: - if '/wms' in parsed_url.path.lower(): - query_params['service'] = ['WMS'] - elif '/wfs' in parsed_url.path.lower(): - query_params['service'] = ['WFS'] - elif '/csw' in parsed_url.path.lower(): - query_params['service'] = ['CSW'] - elif '/wcs' in parsed_url.path.lower(): - query_params['service'] = ['WCS'] + conn.commit() + return link_id if not deprecated else None + +def process_item(item, relevant_links): + if isinstance(item, dict): + if 'href' in item and 'rel' not in item: + relevant_links.add(item['href']) + elif 'href' in item and item.get('rel') not in ['self', 'collection']: + relevant_links.add(item['href']) - new_query = urlencode(query_params, doseq=True) - new_url = parsed_url._replace(query=new_query).geturl() + for value in item.values(): + process_item(value, relevant_links) + elif isinstance(item, list): + for element in item: + process_item(element, relevant_links) + +def extract_relevant_links_from_json(json_url): + try: + response = requests.get(json_url) + response.raise_for_status() + data = response.json() + relevant_links = set() - return new_url - - return url + process_item(data, relevant_links) + return relevant_links + except Exception as e: + # print(f"Error extracting links from JSON at {json_url}: {e}") + return set() + +def extract_links(url): + try: + response = requests.get(url) + response.raise_for_status() + tree = html.fromstring(response.content) + return tree.xpath('//a/@href') + except Exception as e: + print(f"Error extracting links from {url}: {e}") + return [] def main(): - start_time = time.time() # Start timing - # Set up the database and create the table - print("Setting PostgreSQL db") + start_time = time.time() conn, cur = setup_database() - - print('Time started processing links.') - print(f'Loading {catalogue_json_url} links...') - total_pages, numbers_returned = get_pagination_info(catalogue_json_url) + url_checker = URLChecker() - # Base URL base_url = base + 'collections/' + collection + '/items?offset=' + total_pages, items_per_page = get_pagination_info(catalogue_json_url) # Generate URLs for each page - urls = [base_url + str(i * numbers_returned) + "&f=html" for i in range(total_pages)] - - # Initialize an empty set to store all unique links - all_links = set() - # Iterate through the list of URLs and extract links from each one - for url in urls: - extracted_links = extract_links(url) - all_links.update(extracted_links) # Add new links to the set of all links + print('Extracting links from catalogue...') + all_relevant_links = set() - # Specify the fields to include in the CSV file - fields_to_include = ['urlname', 'parentname', 'baseref', 'valid', 'result', 'warning', 'info'] + # Process catalogue page + for page in range(total_pages): + print(f"Processing page {page + 1} of {total_pages}") + + extracted_links = extract_links(f"{base_url}{page * items_per_page}&f=html") + + for link in extracted_links: + json_url = f"{link}?f=json" if "?f=json" not in link else link + relevant_links = extract_relevant_links_from_json(json_url) + all_relevant_links.update(relevant_links) - print("Checking Links...") - - # Run LinkChecker and process the output - urls_to_recheck = set() - print("Initial Link Checking...") - for url in all_links: - for line in run_linkchecker(url): - if re.match(r'^http', line): - values = line.rstrip(';').split(';') - urlname = values[0] - - # Parse initial check results - filtered_values = [str(values[i]) if i < len(values) else "" for i in range(len(fields_to_include))] - urlname, parentname, baseref, valid, result, warning, info = filtered_values - - # Determine if URL needs to be rechecked - processed_url = determine_service_type(urlname) - if processed_url != urlname: - urls_to_recheck.add(processed_url) - else: - # If URL doesn't need reprocessing, insert results directly - is_valid = is_valid_status(valid) - link_id = insert_or_update_link(conn, urlname, valid, result, info, warning, is_valid) - insert_parent(conn, parentname, baseref, link_id) - - print("Rechecking OGC processed URLs...") - for url in urls_to_recheck: - results = check_single_url(url) - for line in results: - if re.match(r'^http', line): - values = line.rstrip(';').split(';') - filtered_values = [str(values[i]) if i < len(values) else "" for i in range(len(fields_to_include))] - urlname, parentname, baseref, valid, result, warning, info = filtered_values - is_valid = is_valid_status(valid) - link_id = insert_or_update_link(conn, urlname, valid, result, info, warning, is_valid) - insert_parent(conn, parentname, baseref, link_id) + print(f"Found {len(all_relevant_links)} unique links to check") + + # Check all URLs concurrently + # print("Checking URLs...") + results = url_checker.check_urls(all_relevant_links) + + # Process results + print(f"Update database...") + processed_links = 0 + for result in results: + if insert_or_update_link(conn, result) is not None: + processed_links += 1 + + cur.execute(""" + SELECT + COUNT(*) as total_checks, + SUM(CASE WHEN status_code BETWEEN 200 AND 399 THEN 1 ELSE 0 END) as successful_checks + FROM validation_history + """) + total_checks, successful_checks = cur.fetchone() - # conn.commit() - print("LinkChecker output written to PostgreSQL database") + end_time = time.time() + print("\nSummary:") + print(f"Time elapsed: {end_time - start_time:.2f} seconds") + print(f"Total checks performed: {total_checks}") + print(f"Successful checks: {successful_checks}") - # Close the connection and cursor + # Close the database connection cur.close() conn.close() - - end_time = time.time() - elapsed_time = end_time - start_time - print(f"Time elapsed: {elapsed_time:.2f} seconds") - if __name__ == "__main__": main() diff --git a/linkcheck/requirements.txt b/linkcheck/requirements.txt index 2770f08..a5fe8aa 100644 --- a/linkcheck/requirements.txt +++ b/linkcheck/requirements.txt @@ -4,4 +4,5 @@ psycopg2-binary asyncpg linkchecker databases -python-dotenv \ No newline at end of file +python-dotenv +lxml \ No newline at end of file diff --git a/src/api.py b/src/api.py index 35c32ea..00737b2 100644 --- a/src/api.py +++ b/src/api.py @@ -3,7 +3,6 @@ from databases import Database from typing import List, Optional from pydantic import BaseModel -from urllib.parse import unquote from datetime import datetime import asyncpg import logging @@ -13,7 +12,6 @@ load_dotenv() # Database connection setup -# Load environment variables securely (replace with your actual variable names) DATABASE_URL = "postgresql://" + os.environ.get("POSTGRES_USER") + ":" +\ os.environ.get("POSTGRES_PASSWORD") + "@" + os.environ.get("POSTGRES_HOST") + ":" +\ os.environ.get("POSTGRES_PORT") + "/" + os.environ.get("POSTGRES_DB") @@ -29,52 +27,30 @@ ) logger = logging.getLogger(__name__) - -# Define response model -class StatusResponse(BaseModel): - id: int - urlname: Optional[str] - parent_urls: Optional[List[str]] - status: Optional[str] - result: Optional[str] - info: Optional[str] - warning: Optional[str] +# Define response models +class LinkResponse(BaseModel): + id_link: int + urlname: str deprecated: Optional[bool] = None - -# Model to get the availability history of a specific url -class URLAvailabilityResponse(BaseModel): - urlname: Optional[str] = None - status: Optional[str] = None - result: Optional[str] = None - info: Optional[str] = None - warning: Optional[str] = None - validation_valid: Optional[str] = None - last_checked: datetime - + consecutive_failures: Optional[int] = None + +class StatusResponse(LinkResponse): + status_code: Optional[int] = None + is_redirect: Optional[bool] = None + error_message: Optional[str] = None + timestamp: datetime + +class TimeoutResponse(LinkResponse): + status_code: Optional[int] = None # Make status_code optional for timeout cases + final_url: Optional[str] = None + is_redirect: Optional[bool] = None + error_message: Optional[str] = None + timestamp: datetime + # Define status lists -REDIRECTION_STATUSES = [ - "301 Moved Permanently", - "302 Found (Moved Temporarily)", - "304 Not Modified", - "307 Temporary Redirect", - "308 Permanent Redirect" -] - -CLIENT_ERROR_STATUSES = [ - "400 Bad Request", - "401 Unauthorized", - "403 Forbidden", - "404 Not Found", - "405 Method Not Allowed", - "409 Conflict" -] - -SERVER_ERROR_STATUSES = [ - "500 Internal Server Error", - "501 Not Implemented", - "503 Service Unavailable", - "504 Gateway Timeout" -] +REDIRECTION_STATUSES = [301, 302, 304, 307, 308] +CLIENT_ERROR_STATUSES = [400, 401, 403, 404, 405, 409] +SERVER_ERROR_STATUSES = [500, 501, 503, 504] # Helper function to execute SQL query and fetch results async def fetch_data(query: str, values: dict = {}): @@ -86,27 +62,21 @@ async def fetch_data(query: str, values: dict = {}): except Exception as e: logging.error(f"Database query failed: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Database query failed") - + # Endpoint to retrieve data with redirection statuses @app.get('/Redirection_URLs/3xx', response_model=List[StatusResponse]) async def get_redirection_statuses(): query = """ - SELECT - l.id_link AS id, - l.urlname, - l.status, - l.warning, - l.result, - l.info, - array_remove(array_agg(DISTINCT p.parentname), NULL) AS parent_urls - FROM - links l - LEFT JOIN - parent p ON l.id_link = p.fk_link - WHERE - l.status ILIKE ANY (:statuses) - GROUP BY - l.id_link, l.urlname, l.status, l.warning, result, info + SELECT l.id_link, l.urlname, l.deprecated, l.consecutive_failures, + vh.status_code, vh.is_redirect, vh.error_message, vh.timestamp + FROM links l + JOIN validation_history vh ON l.id_link = vh.fk_link + WHERE vh.status_code = ANY(:statuses) + AND vh.timestamp = ( + SELECT MAX(timestamp) + FROM validation_history + WHERE fk_link = l.id_link + ) """ data = await fetch_data(query=query, values={'statuses': REDIRECTION_STATUSES}) return data @@ -115,22 +85,16 @@ async def get_redirection_statuses(): @app.get('/Client_Error_URLs/4xx', response_model=List[StatusResponse]) async def get_client_error_statuses(): query = """ - SELECT - l.id_link AS id, - l.urlname, - l.status, - l.warning, - l.result, - l.info, - array_remove(array_agg(DISTINCT p.parentname), NULL) AS parent_urls - FROM - links l - LEFT JOIN - parent p ON l.id_link = p.fk_link - WHERE - l.status ILIKE ANY (:statuses) - GROUP BY - l.id_link, l.urlname, l.status, l.warning, result, info + SELECT l.id_link, l.urlname, l.deprecated, l.consecutive_failures, + vh.status_code, vh.is_redirect, vh.error_message, vh.timestamp + FROM links l + JOIN validation_history vh ON l.id_link = vh.fk_link + WHERE vh.status_code = ANY(:statuses) + AND vh.timestamp = ( + SELECT MAX(timestamp) + FROM validation_history + WHERE fk_link = l.id_link + ) """ data = await fetch_data(query=query, values={'statuses': CLIENT_ERROR_STATUSES}) return data @@ -139,122 +103,87 @@ async def get_client_error_statuses(): @app.get('/Server_Errors_URLs/5xx', response_model=List[StatusResponse]) async def get_server_error_statuses(): query = """ - SELECT - l.id_link AS id, - l.urlname, - l.status, - l.warning, - l.result, - l.info, - array_remove(array_agg(DISTINCT p.parentname), NULL) AS parent_urls - FROM - links l - LEFT JOIN - parent p ON l.id_link = p.fk_link - WHERE - l.status ILIKE ANY (:statuses) - GROUP BY - l.id_link, l.urlname, l.status, l.warning, result, info + SELECT l.id_link, l.urlname, l.deprecated, l.consecutive_failures, + vh.status_code, vh.is_redirect, vh.error_message, vh.timestamp + FROM links l + JOIN validation_history vh ON l.id_link = vh.fk_link + WHERE vh.status_code = ANY(:statuses) + AND vh.timestamp = ( + SELECT MAX(timestamp) + FROM validation_history + WHERE fk_link = l.id_link + ) """ data = await fetch_data(query=query, values={'statuses': SERVER_ERROR_STATUSES}) return data -# Endpoint to retrieve data with client error statuses +# Endpoint to retrieve data for a specific URL @app.get('/status/{item:path}', response_model=List[StatusResponse]) async def get_status_for_url(item): query = """ - SELECT - l.id_link AS id, - l.urlname, - l.status, - l.warning, - l.result, - l.info, - array_remove(array_agg(DISTINCT p.parentname), NULL) AS parent_urls - FROM - links l - LEFT JOIN - parent p ON l.id_link = p.fk_link - WHERE - l.urlname = :item - GROUP BY - l.id_link, l.urlname, l.status, l.warning, result, info + SELECT l.id_link, l.urlname, l.deprecated, l.consecutive_failures, + vh.status_code, vh.is_redirect, vh.error_message, vh.timestamp + FROM links l + JOIN validation_history vh ON l.id_link = vh.fk_link + WHERE l.urlname = :item + AND vh.timestamp = ( + SELECT MAX(timestamp) + FROM validation_history + WHERE fk_link = l.id_link + ) """ data = await fetch_data(query=query, values={'item': item}) return data -# Endpoint to retrieve URLs that that timed out. Timeout is set to 5 seconds currently -@app.get('/Timeout_URLs', response_model=List[StatusResponse]) +# Update the timeout endpoint to match other query structures +@app.get('/Timeout_URLs', response_model=List[TimeoutResponse]) async def get_timeout_urls(): query = """ - SELECT - l.id_link AS id, - l.urlname, - l.status, - l.warning, - l.result, - l.info, - array_remove(array_agg(DISTINCT p.parentname), NULL) AS parent_urls - FROM - links l - LEFT JOIN - parent p ON l.id_link = p.fk_link - WHERE - l.status LIKE '%ReadTimeout%' OR l.status LIKE '%ConnectTimeout%' - GROUP BY - l.id_link, l.urlname, l.status, l.warning, result, info + SELECT l.id_link, l.urlname, l.deprecated, l.consecutive_failures, + vh.status_code, vh.is_redirect, vh.error_message, vh.timestamp + FROM links l + JOIN validation_history vh ON l.id_link = vh.fk_link + WHERE (vh.error_message LIKE '%ReadTimeout%' OR vh.error_message LIKE '%ConnectTimeout%') + AND vh.timestamp = ( + SELECT MAX(timestamp) + FROM validation_history + WHERE fk_link = l.id_link + ) """ data = await fetch_data(query=query) return data -@app.get('/Deprecated URLs', response_model=List[StatusResponse]) +@app.get('/Deprecated_URLs', response_model=List[LinkResponse]) async def get_deprecated_urls(): query = """ - SELECT - l.id_link AS id, - l.urlname, - l.status, - l.warning, - l.result, - l.info, - l.deprecated, - array_remove(array_agg(DISTINCT p.parentname), NULL) AS parent_urls - FROM - links l - LEFT JOIN - parent p ON l.id_link = p.fk_link - WHERE l.deprecated IS TRUE - GROUP BY - l.id_link, l.urlname, l.status, l.warning, result, info + SELECT id_link, urlname, deprecated, consecutive_failures + FROM links + WHERE deprecated IS TRUE """ data = await fetch_data(query=query) return data -@app.get("/URL_status_history", response_model=List[URLAvailabilityResponse]) +@app.get("/URL_status_history", response_model=List[StatusResponse]) async def get_url_status_history( url: str = Query(..., description="URL to get availability history"), limit: int = Query(100, ge=1, le=1000, description="Maximum number of results (default: 100, min: 1, max: 1000)") -) -> List[URLAvailabilityResponse]: +) -> List[StatusResponse]: query = """ SELECT + l.id_link, l.urlname, - l.status, - l.result, - l.info, - l.warning, - vh.validation_result AS validation_valid, - vh.timestamp AS last_checked, - array_agg(DISTINCT p.parentname) AS parent_urls + l.deprecated, + l.consecutive_failures, + vh.status_code, + vh.is_redirect, + vh.error_message, + vh.timestamp FROM links l - LEFT JOIN - parent p ON l.id_link = p.fk_link - LEFT JOIN + JOIN validation_history vh ON l.id_link = vh.fk_link WHERE l.urlname = :url - GROUP BY - l.urlname, l.status, l.result, l.info, l.warning, vh.validation_result, vh.timestamp ORDER BY vh.timestamp DESC LIMIT :limit @@ -264,7 +193,7 @@ async def get_url_status_history( results = await fetch_data(query=query, values={'url': url, 'limit': limit}) logger.info(f"Query returned {len(results)} results for URL: {url}") - response_data = [URLAvailabilityResponse(**dict(row)) for row in results] + response_data = [StatusResponse(**dict(row)) for row in results] return response_data except Exception as e: @@ -284,4 +213,4 @@ async def shutdown(): try: await database.disconnect() except Exception as e: - raise HTTPException(status_code=500, detail="Database disconnection failed") from e + raise HTTPException(status_code=500, detail="Database disconnection failed") from e \ No newline at end of file