Splitting utils in half to allow easier worker environment management
bendavidsteel committed Sep 26, 2022
1 parent 8fcfbe8 commit d7338cf
Showing 6 changed files with 75 additions and 73 deletions.
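
The point of the split appears to be dependency weight: the new worker_utils.py imports only newspaper, while utils.py keeps the heavier driver-side imports (requests, pyspark.ml, sparknlp), so code shipped to Spark workers can pull in the light module alone. A minimal sketch of that worker-side usage, assuming records arrive as (url, html) pairs; process_partition is a hypothetical name, not something added by this commit:

# Hypothetical worker-side helper: it imports only worker_utils,
# so executors do not need sparknlp or pyspark.ml installed.
from seldonite.helpers import worker_utils

def process_partition(records):
    # records is assumed to be an iterable of (url, html) pairs
    for url, html in records:
        article = worker_utils.html_to_article(url, html)
        yield article.title, article.text
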
4 changes: 2 additions & 2 deletions seldonite/commoncrawl/cc_index_fetch_news.py
@@ -1,7 +1,7 @@

from seldonite.commoncrawl.sparkcc import CCIndexWarcSparkJob
from seldonite.commoncrawl.fetch_news import FetchNewsJob
-from seldonite.helpers import heuristics, utils
+from seldonite.helpers import worker_utils


class CCIndexFetchNewsJob(CCIndexWarcSparkJob, FetchNewsJob):
@@ -43,7 +43,7 @@ def set_query_options(self, urls=[], sites=[], crawls=[], lang=None, limit=None,
        else:
            three_lang = None

-        self.query = utils.construct_query(urls, sites, limit, crawls=crawls, lang=three_lang, url_black_list=url_black_list)
+        self.query = worker_utils.construct_query(urls, sites, limit, crawls=crawls, lang=three_lang, url_black_list=url_black_list)

    def init_accumulators(self, spark_manager):
        super().init_accumulators(spark_manager)
4 changes: 2 additions & 2 deletions seldonite/commoncrawl/fetch_news.py
@@ -4,7 +4,7 @@

from seldonite import filters
from seldonite.commoncrawl.sparkcc import CCSparkJob
-from seldonite.helpers import utils, heuristics
+from seldonite.helpers import heuristics, worker_utils


class FetchNewsJob(CCSparkJob):
@@ -44,7 +44,7 @@ def _process_record(self, url, record):
        page = record.content_stream().read()

        try:
-            article = utils.html_to_article(url, page)
+            article = worker_utils.html_to_article(url, page)
        except Exception as e:
            self.get_logger().error("Error converting HTML to article for {}: {}",
                                    record.rec_headers['WARC-Target-URI'], e)
65 changes: 0 additions & 65 deletions seldonite/helpers/utils.py
@@ -12,31 +12,8 @@
import requests
import pyspark.ml as sparkml
import sparknlp
-from newspaper import Article

-def link_to_article(link):
-    article = Article(link)
-    article.download()
-    article.parse()

-    return article

-def html_to_article(url, html, title=None):
-    article = Article(url)
-    article.download(input_html=html)
-    article.parse()

-    if title is not None:
-        article.set_title(title)

-    return article

-def dict_to_article(dict):
-    article = Article(dict['url'])
-    article.set_title(dict['title'])
-    article.set_text(dict['text'])
-    article.publish_date = dict['publish_date']
-    return article

def get_crawl_listing(crawl, data_type="wet"):
    url = f"https://commoncrawl.s3.amazonaws.com/crawl-data/{crawl}/{data_type}.paths.gz"
@@ -124,48 +101,6 @@ def get_cc_crawls_since(date):

    return crawl_ids

-def construct_query(urls, sites, limit, crawls=None, lang='eng', url_black_list=[]):
-    #TODO automatically get most recent crawl
-    query = "SELECT url, warc_filename, warc_record_offset, warc_record_length, content_charset FROM ccindex WHERE subset = 'warc'"

-    if crawls:
-        #
-        if crawls == 'all':
-            pass
-        elif len(crawls) == 1:
-            query += f" AND crawl = '{crawls[0]}'"
-        else:
-            crawl_list = ', '.join([f"'{crawl}'" for crawl in crawls])
-            query += f" AND crawl IN ({crawl_list})"

-    # site restrict
-    if not all("." in domain for domain in sites):
-        raise ValueError("Sites should be the full registered domain, i.e. cbc.ca instead of just cbc")

-    if sites:
-        site_list = ', '.join([f"'{site}'" for site in sites])
-        query += f" AND url_host_registered_domain IN ({site_list})"

-    if urls:
-        url_list = ', '.join([f"'{url}'" for url in urls])
-        query += f" AND url IN ({url_list})"

-    # Language filter
-    if lang:
-        query += f" AND (content_languages IS NULL OR (content_languages IS NOT NULL AND content_languages = '{lang}'))"

-    if url_black_list:
-        # replace wildcards with %
-        url_black_list = [url_wildcard.replace('*', '%') for url_wildcard in url_black_list]
-        clause = " OR ".join((f"url_path LIKE '{url_wildcard}'" for url_wildcard in url_black_list))
-        query += f" AND NOT ({clause})"

-    # set limit to sites if needed
-    if limit:
-        query += f" LIMIT {str(limit)}"

-    return query


def map_col_with_index(iter, index_name, col_name, mapped_name, func, **kwargs):
    index = []
67 changes: 67 additions & 0 deletions seldonite/helpers/worker_utils.py
@@ -0,0 +1,67 @@
+from newspaper import Article
+
+def link_to_article(link):
+    article = Article(link)
+    article.download()
+    article.parse()
+
+    return article
+
+def html_to_article(url, html, title=None):
+    article = Article(url)
+    article.download(input_html=html)
+    article.parse()
+
+    if title is not None:
+        article.set_title(title)
+
+    return article
+
+def dict_to_article(dict):
+    article = Article(dict['url'])
+    article.set_title(dict['title'])
+    article.set_text(dict['text'])
+    article.publish_date = dict['publish_date']
+    return article
+
+def construct_query(urls, sites, limit, crawls=None, lang='eng', url_black_list=[]):
+    #TODO automatically get most recent crawl
+    query = "SELECT url, warc_filename, warc_record_offset, warc_record_length, content_charset FROM ccindex WHERE subset = 'warc'"
+
+    if crawls:
+        #
+        if crawls == 'all':
+            pass
+        elif len(crawls) == 1:
+            query += f" AND crawl = '{crawls[0]}'"
+        else:
+            crawl_list = ', '.join([f"'{crawl}'" for crawl in crawls])
+            query += f" AND crawl IN ({crawl_list})"
+
+    # site restrict
+    if not all("." in domain for domain in sites):
+        raise ValueError("Sites should be the full registered domain, i.e. cbc.ca instead of just cbc")
+
+    if sites:
+        site_list = ', '.join([f"'{site}'" for site in sites])
+        query += f" AND url_host_registered_domain IN ({site_list})"
+
+    if urls:
+        url_list = ', '.join([f"'{url}'" for url in urls])
+        query += f" AND url IN ({url_list})"
+
+    # Language filter
+    if lang:
+        query += f" AND (content_languages IS NULL OR (content_languages IS NOT NULL AND content_languages = '{lang}'))"
+
+    if url_black_list:
+        # replace wildcards with %
+        url_black_list = [url_wildcard.replace('*', '%') for url_wildcard in url_black_list]
+        clause = " OR ".join((f"url_path LIKE '{url_wildcard}'" for url_wildcard in url_black_list))
+        query += f" AND NOT ({clause})"
+
+    # set limit to sites if needed
+    if limit:
+        query += f" LIMIT {str(limit)}"
+
+    return query
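
As a quick illustration of the construct_query helper added above, here is a call with made-up sample arguments (the crawl ID, sites, and blacklist pattern are placeholders) and the query it builds, wrapped over several lines for readability:

from seldonite.helpers import worker_utils

query = worker_utils.construct_query(
    urls=[],
    sites=['cbc.ca', 'reuters.com'],
    limit=100,
    crawls=['CC-MAIN-2022-33'],
    lang='eng',
    url_black_list=['*/sports/*'],
)
print(query)
# SELECT url, warc_filename, warc_record_offset, warc_record_length, content_charset
# FROM ccindex WHERE subset = 'warc'
# AND crawl = 'CC-MAIN-2022-33'
# AND url_host_registered_domain IN ('cbc.ca', 'reuters.com')
# AND (content_languages IS NULL OR (content_languages IS NOT NULL AND content_languages = 'eng'))
# AND NOT (url_path LIKE '%/sports/%')
# LIMIT 100
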
4 changes: 2 additions & 2 deletions seldonite/sources/news.py
@@ -4,7 +4,7 @@
from seldonite.commoncrawl.cc_index_fetch_news import CCIndexFetchNewsJob
from seldonite.commoncrawl.fetch_news import FetchNewsJob
from seldonite.commoncrawl.sparkcc import CCIndexSparkJob
-from seldonite.helpers import utils
+from seldonite.helpers import utils, worker_utils
from seldonite.spark import spark_tools

from googleapiclient.discovery import build as gbuild
@@ -335,7 +335,7 @@ def fetch(self, spark_manager, max_articles: int = 100, url_only=False):
            if url_only:
                articles.append(psql.Row(url=url))
            else:
-                article = utils.link_to_article(url)
+                article = worker_utils.link_to_article(url)
                row_values = collections.OrderedDict()
                for feature in self.features:
                    if feature == 'url':
4 changes: 2 additions & 2 deletions tests/helpers/test_utils.py
@@ -1,6 +1,6 @@
import datetime

-import seldonite.helpers.utils as utils
+from seldonite.helpers import utils, worker_utils

import pytest

@@ -72,5 +72,5 @@ def test_construct_db_uri():
    ("https://www.reuters.com/markets/commodities/ukraine-says-it-can-export-3-million-tonnes-grain-ports-next-month-2022-08-16/"),
    ("https://www.reuters.com/business/palladium-sheds-nearly-13-worries-over-china-demand-hit-2022-04-25/")])
def test_link_to_article(url):
-    article = utils.link_to_article(url)
+    article = worker_utils.link_to_article(url)
    assert article.meta_data is not None
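
The same call path can also be checked standalone, equivalent to one case of the parametrized test above (network access required, since newspaper downloads the live Reuters page):

from seldonite.helpers import worker_utils

url = "https://www.reuters.com/business/palladium-sheds-nearly-13-worries-over-china-demand-hit-2022-04-25/"
article = worker_utils.link_to_article(url)  # downloads and parses the page via newspaper
assert article.meta_data is not None         # same check the test makes
print(article.title)
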
