Version 1.2.2 (#11)
jkoestner authored Sep 17, 2024
1 parent 63c08c3 commit 3da1581
Showing 10 changed files with 292 additions and 72 deletions.
17 changes: 14 additions & 3 deletions Dockerfile
@@ -5,18 +5,29 @@
# from where the dockerfile is located.
FROM python:3.9-slim

# Install git and chromium (lighter version of Chrome for seleniumbase)
# Install git, chromium, and x11 display (chromium is a lighter version of Chrome for seleniumbase)
# the x11 packages are only needed when debugging
RUN apt-get update && \
apt-get install -y --no-install-recommends git chromium && \
apt-get install -y --no-install-recommends \
chromium \
git \
xvfb \
x11vnc \
xfonts-base \
xauth \
x11-apps && \
ln -s /usr/bin/chromium /usr/bin/google-chrome && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Copy uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

# Set work directory
WORKDIR /code

# Install requirements (without copying the whole directory)
RUN pip install --no-cache-dir "git+https://github.com/jkoestner/folioflex.git@main"
RUN uv pip install --no-cache-dir --system "git+https://github.com/jkoestner/folioflex.git@main"

# Create new user
RUN adduser --disabled-password --gecos '' ffx && \
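
The x11 packages above support seleniumbase's xvfb option, which the scraper changes below lean on when running uc (undetected-chromedriver) mode on linux. A minimal sketch of the pattern, assuming an image built from this Dockerfile (example.com is a placeholder):

from seleniumbase import SB

# xvfb=True starts a virtual X display so uc mode can run "headed"
# inside a display-less linux container (headless2 stays False)
with SB(uc=True, xvfb=True, headless2=False) as sb:
    sb.open("https://example.com")
    print(sb.get_title())
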
260 changes: 200 additions & 60 deletions folioflex/chatbot/scraper.py
@@ -6,15 +6,16 @@
"""

import datetime
import http.client
import os
import re

import requests
from bs4 import BeautifulSoup
from seleniumbase import SB
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.remote.webdriver import WebDriver
from seleniumbase import SB, Driver

from folioflex.portfolio import helper
from folioflex.utils import config_helper, custom_logger

logger = custom_logger.setup_logging(__name__)
@@ -44,28 +44,9 @@ def scrape_html(
"""
scrape_results = {"url": url, "text": None}
if url.startswith("https://www.wsj.com/finance"):
# get today's date and make sure it's a valid trading day to use in url
now = datetime.datetime.now()
start_hour = 8
if now.hour < start_hour:
today = datetime.date.today() - datetime.timedelta(days=1)
else:
today = datetime.date.today()
today = helper.check_stock_dates(today, fix=True, warning=False)["fix_tx_df"][
"date"
][0]
formatted_today = today.strftime("%m-%d-%Y")

# go to url
url = (
"https://www.wsj.com/livecoverage/stock-market-today-dow-jones-earnings-"
+ formatted_today
)

logger.info(f"scraping '{url}' with '{scraper}'")
if scraper == "selenium":
soup = scrape_selenium(url, **kwargs)
soup, url = scrape_selenium(url, **kwargs)
elif scraper == "bee":
soup = scrape_bee(url, **kwargs)

@@ -84,7 +66,7 @@
except AttributeError:
logger.info("no match found")

scrape_results = {"url": url, "text": scrape_text}
scrape_results = {"url": url, "text": scrape_text}

return scrape_results
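
# usage sketch: scrape_html returns a dict with the final url and the
# extracted text (text stays None when no selector matches the page), e.g.
#
# results = scrape_html("https://www.wsj.com/finance", scraper="selenium")
# print(results["url"], results["text"])
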

Expand All @@ -102,41 +84,69 @@ def close_windows(sb, url):
"""
open_windows = sb.driver.window_handles
logger.info(f"close {len(open_windows)-1} open windows")
nbr_windows = len(open_windows)
logger.info(f"close {nbr_windows-1} open windows")
for window in open_windows:
sb.driver.switch_to.window(window)
if url not in sb.get_current_url():
nbr_windows = len(sb.driver.window_handles)
if url not in sb.get_current_url() and nbr_windows > 1:
sb.driver.close()
open_windows = sb.driver.window_handles
sb.driver.switch_to.window(open_windows[0])
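
# usage sketch, assuming an active SB context named sb:
#
# close_windows(sb, "https://www.wsj.com/finance")
# # only windows whose url contains the target remain, and focus
# # returns to the first remaining handle
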


def scrape_selenium(
url,
xvfb=None,
screenshot=False,
port=None,
**kwargs,
):
"""
Scrape the html of a website using seleniumbase.

seleniumbase has a lot of methods that can be used in kwargs shown here:
https://github.com/seleniumbase/SeleniumBase/blob/af3d9545473e55b2a25cdbab8be0b1ed5e1f6afa/seleniumbase/plugins/sb_manager.py

Parameters
----------
url : str
url of the website to scrape
xvfb : bool (optional)
when using uc=True it's recommended not to run headless2=True and to
set xvfb=True if running on linux
screenshot : bool (optional)
take a screenshot of the website
port : int (optional)
port to use for debugging
**kwargs : dict (optional)
keyword arguments for the options of the driver

Returns
-------
soup : bs4.BeautifulSoup
beautiful soup object with the html of the website
url : str
url of the website
"""
# kwargs defaults
binary_location = kwargs.pop("binary_location", None)
extension_dir = kwargs.pop("extension_dir", None)
headless2 = kwargs.pop("headless2", True)
headless2 = kwargs.pop("headless2", False)
wait_time = kwargs.pop("wait_time", 10)
proxy = kwargs.pop("proxy", None)
chromium_arg = None

# use xvfb if running a linux os and xvfb is not specified
if xvfb is None and os.name == "posix" and not headless2:
logger.info("using xvfb for browser")
xvfb = True
if port:
logger.info(f"using port {port} for browser debugging")
chromium_arg = f"--remote-debugging-port={port}"

if binary_location or config_helper.BROWSER_LOCATION:
logger.info("using binary location for browser")
Expand All @@ -147,46 +157,75 @@ def scrape_selenium(
if proxy:
logger.info("using proxy for browser")

with SB(
uc=True,
headless2=headless2,
binary_location=binary_location,
extension_dir=extension_dir,
proxy=proxy,
**kwargs,
) as sb:
logger.info("initializing the driver")
# wsj has a specific landing page
if url.startswith("https://www.wsj.com/livecoverage"):
url = "https://www.wsj.com/finance"
sb.driver.uc_open_with_reconnect(url, reconnect_time=wait_time)
# specific landing page
#
# this breaks frequently; the settings below were added after past breaks.
# a good issue describing the detection:
# https://github.com/seleniumbase/SeleniumBase/issues/2842
#
# incognito=True, to avoid detection
# xvfb=True, uc works better when a display is shown and linux usually needs xvfb
# headless2=False, uc works better when a display is shown
if re.match(r"https://www\.wsj\.com/finance", url, re.IGNORECASE):
with SB(
uc=True,
incognito=True,
xvfb=xvfb,
headless2=headless2,
binary_location=binary_location,
extension_dir=extension_dir,
proxy=proxy,
chromium_arg=chromium_arg,
**kwargs,
) as sb:
sb.sleep(2)
close_windows(sb, url)
logger.info("obtaining the landing page")
sb.driver.uc_open_with_reconnect(url, reconnect_time=wait_time + 5)
try:
logger.info("wsj has specific landing page")
selector = "//p[contains(text(), 'View All')]/ancestor::a[1]"
url = sb.get_attribute(
selector=selector,
attribute="href",
by="xpath",
timeout=6,
hard_fail=True,
)
sb.driver.uc_click(
"(//p[contains(text(), 'View All')])[1]", reconnect_time=wait_time
)
sb.driver.uc_open_with_reconnect(url, reconnect_time=wait_time)
close_windows(sb, url)
url = sb.get_current_url()
logger.info(f"scraping {url}")
soup = sb.get_beautiful_soup()
if screenshot:
logger.info("screenshot saved to 'screenshot.png'")
sb.driver.save_screenshot("screenshot.png")
except Exception:
logger.error("WSJ probably flagged bot: returning None")
html_content = "<html><body><p>could not scrape wsj</p></body></html>"
logger.error("probably flagged bot: returning None")
html_content = "<html><body><p>could not scrape</p></body></html>"
soup = BeautifulSoup(html_content, "html.parser")
return soup
# all other
else:
sb.driver.uc_open_with_reconnect(url, reconnect_time=wait_time)
if screenshot:
logger.info("screenshot saved to 'screenshot.png'")
sb.driver.save_screenshot("screenshot.png")
return soup, url

else:
with SB(
uc=True,
incognito=True,
xvfb=xvfb,
headless2=headless2,
binary_location=binary_location,
extension_dir=extension_dir,
proxy=proxy,
chromium_arg=chromium_arg,
**kwargs,
) as sb:
sb.sleep(2)
close_windows(sb, url)
logger.info(f"scraping {sb.get_current_url()}")
sb.sleep(wait_time) # wait for page to load
soup = sb.get_beautiful_soup()
logger.info("initializing the driver")
sb.driver.uc_open_with_reconnect(url, reconnect_time=wait_time)
logger.info(f"scraping {url}")
soup = sb.get_beautiful_soup()
if screenshot:
logger.info("screenshot saved to 'screenshot.png'")
sb.driver.save_screenshot("screenshot.png")

return soup
logger.info("scraped")

return soup, url
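
# usage sketch for scrape_selenium (the port value is illustrative):
#
# soup, final_url = scrape_selenium(
#     "https://www.wsj.com/finance",
#     xvfb=True,        # pairs with uc mode on linux
#     screenshot=True,  # writes screenshot.png
#     port=9222,        # chrome remote-debugging port for attach_to_session
# )
# print(final_url, soup.get_text()[:100])
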


def scrape_bee(url, **kwargs):
@@ -221,3 +260,104 @@
soup = BeautifulSoup(response.content, "html.parser")

return soup
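
# usage sketch (scrape_bee's body is collapsed above; only the signature and
# return are shown, so kwargs are assumed to configure the request):
#
# soup = scrape_bee("https://example.com")
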


def scrape_test(url, **kwargs):
"""
Test basic functionality of seleniumbase scraper.

When debugging a website it is useful to be able to find the cause of
an error from the function or from another source. This function creates
a pause when connecting to the website that will temporarily stop the selenium
driver.

Here are some common test sites:
- pixelscan.net
- fingerprint.com/products/bot-detection/
- nowsecure.nl

Parameters
----------
url : str
url of the website to scrape
**kwargs : dict (optional)
keyword arguments for the options of the driver

Returns
-------
None
the driver is paused for inspection and then quit
"""
driver = Driver(
uc=True,
headless2=False,
)
driver.sleep(2)
close_windows(driver, url)
logger.info("connecting to website")
driver.uc_open_with_reconnect(url, reconnect_time="breakpoint")
logger.info("exit site")
driver.quit()
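
# usage sketch: open a bot-detection test page and inspect the browser while
# the "breakpoint" reconnect_time holds the driver open, e.g.
#
# scrape_test("https://nowsecure.nl")
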


def attach_to_session(executor_url, session_id, options=None):
"""
Attach to an existing browser session.

Parameters
----------
executor_url : str
The URL of the WebDriver server to connect to.
session_id : str
The ID of the session to attach to.
options : Options, optional
The options to use when attaching to the session.

Returns
-------
WebDriver
A WebDriver instance that is attached to the existing session.
"""
# save the original execute method of driver
original_execute = WebDriver.execute

def override_execute_method(self, command, params=None):
"""
Override for the newSession command.

Parameters
----------
self : WebDriver
The WebDriver instance to execute the command on.
command : str
The name of the command to execute.
params : dict, optional
The parameters for the command.

Returns
-------
dict
The result of the command execution.
"""
# point to the existing session
if command == "newSession":
return {"value": {"sessionId": session_id, "capabilities": {}}}
else:
return original_execute(self, command, params)

# override the execute method so newSession reuses the existing session
WebDriver.execute = override_execute_method

# create the driver
if options is None:
options = Options()
driver = WebDriver(command_executor=executor_url, options=options)
driver.session_id = session_id

# restore the method
WebDriver.execute = original_execute

return driver
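
# usage sketch: executor_url and session_id come from the driver being
# attached to; command_executor._url is a private selenium attribute, so
# treat this as an assumption that may vary across selenium versions.
#
# executor_url = existing_driver.command_executor._url
# session_id = existing_driver.session_id
# driver = attach_to_session(executor_url, session_id)
# driver.get("https://example.com")
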