Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Email collector improvements #402

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 85 additions & 51 deletions src/collectors/collectors/email_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class EmailCollector(BaseCollector):
description = "Collector for gathering data from emails"

parameters = [
Parameter(0, "EMAIL_SERVER_TYPE", "Email server type", "Server type parameter means IMAP or POP3 email server", ParameterType.STRING),
Parameter(0, "EMAIL_SERVER_TYPE", "Email server type", "IMAP or POP3 protocol", ParameterType.STRING),
Parameter(0, "EMAIL_SERVER_HOSTNAME", "Email server hostname", "Hostname of email server", ParameterType.STRING),
Parameter(0, "EMAIL_SERVER_PORT", "Email server port", "Port of email server", ParameterType.NUMBER),
Parameter(0, "EMAIL_USERNAME", "Username", "Username of email account", ParameterType.STRING),
Expand All @@ -49,6 +49,8 @@ def collect(self, source):
Parameters:
source -- Source object.
"""
BaseCollector.update_last_attempt(source)
self.collector_source = f"{self.name} '{source.name}':"
news_items = []
email_server_type = source.parameter_values["EMAIL_SERVER_TYPE"]
email_server_hostname = source.parameter_values["EMAIL_SERVER_HOSTNAME"]
Expand All @@ -58,7 +60,8 @@ def collect(self, source):
proxy_server = source.parameter_values["PROXY_SERVER"]

def proxy_tunnel():
server = f"{email_server_type.lower()}.{email_server_hostname.lower()}"
logger.debug(f"{self.collector_source} Establishing proxy tunnel")
server = f"{email_server_hostname.lower()}"
port = email_server_port

server_proxy = proxy_server.rsplit(":", 1)[0]
Expand All @@ -72,68 +75,97 @@ def proxy_tunnel():
s.send(str.encode(con))
s.recv(4096)

def get_data():
def process_email(email_message):
email_string = email_message.as_string()
if len(email_string) > 3000:
email_string = f"{email_string[:3000]}\n..."
logger.debug(f"{self.collector_source} Processing email: {email_string}")
review = ""
content = ""
url = ""
address = ""
link = ""
key = ""
value = ""
news_item = None

date_tuple = email.utils.parsedate_tz(email_message["Date"])
local_date = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
published = f'{str(local_date.strftime("%a, %d %b %Y %H:%M:%S"))}'

author = str(email.header.make_header(email.header.decode_header(email_message["From"])))
title = str(email.header.make_header(email.header.decode_header(email_message["Subject"])))
logger.debug(f"{self.collector_source} Processing email: {title}")
author = str(email.header.make_header(email.header.decode_header(email_message["From"])))
address = email.utils.parseaddr(email_message["From"])[1]
message_id = str(email.header.make_header(email.header.decode_header(email_message["Message-ID"])))
date_tuple = email.utils.parsedate_tz(email_message["Date"])
published = datetime.datetime.fromtimestamp(email.utils.mktime_tz(date_tuple)).strftime("%d.%m.%Y - %H:%M")

for part in email_message.walk():
if part.get_content_type() == "text/plain":
content = part.get_payload(decode=True)
review = content[:500].decode("utf-8")
content = content.decode("utf-8")

for_hash = author + title + message_id

news_item = NewsItemData(
uuid.uuid4(),
hashlib.sha256(for_hash.encode()).hexdigest(),
title,
review,
url,
link,
published,
author,
datetime.datetime.now(),
content,
source.id,
attributes,
)

if part.get_content_maintype() == "multipart":
pass
if part.get("Content-Disposition") is None:
pass

file_name = part.get_filename()

if file_name:

charset = part.get_content_charset()
logger.debug(f"{self.collector_source} Detected encoding of email '{title}': {charset}")
text_data = part.get_payload(decode=True)
if charset is None:
charset = "utf-8"
content = text_data.decode(charset)
review = content[:500]

for_hash = author + title + message_id

news_item = NewsItemData(
uuid.uuid4(),
hashlib.sha256(for_hash.encode()).hexdigest(),
title,
review,
address,
link,
published,
author,
datetime.datetime.now(),
content,
source.id,
attributes,
)
break

if news_item:
for part in email_message.walk():
file_name = part.get_filename()
binary_mime_type = part.get_content_type()
binary_value = part.get_payload()

news_attribute = NewsItemAttribute(uuid.uuid4(), key, value, binary_mime_type, binary_value)
news_item.attributes.append(news_attribute)
if binary_mime_type == "message/rfc822":
logger.debug(f"{self.collector_source} Found an attached email")
attached = part.get_payload()
if isinstance(attached, list):
attached_email = attached[0]
else:
attached_email = attached
# Process .eml file as an email
process_email(attached_email)

elif binary_mime_type == "application/pkcs7-signature" or binary_mime_type == "application/x-pkcs7-signature":
logger.debug(f"{self.collector_source} Found a X.509 signature attachment")
# Skip signature attachments
continue

elif binary_mime_type == "application/pgp-signature":
logger.debug(f"{self.collector_source} Found a PGP signature attachment")
binary_value = part.get_payload()
# Skip signature attachments
continue

elif file_name:
# Handle other binary attachments
logger.debug(f"{self.collector_source} Found an attachment '{file_name}' with MIME type '{binary_mime_type}'")
binary_value = part.get_payload()
if binary_value:
news_attribute = NewsItemAttribute(binary_mime_type, file_name, binary_mime_type, binary_value)
news_item.attributes.append(news_attribute)
else:
logger.error(f"{self.collector_source} Attachment is empty or could not be decoded: {file_name}")

news_items.append(news_item)

if email_server_type.lower() == "imap":
logger.debug(f"{self.collector_source} Fetching emails using IMAP")
try:
if proxy_server:
proxy_tunnel()

connection = imaplib.IMAP4_SSL(f"{email_server_type.lower()}.{email_server_hostname.lower()}", email_server_port)
connection = imaplib.IMAP4_SSL(email_server_hostname.lower(), email_server_port)
connection.login(email_username, email_password)
connection.select("inbox")

Expand All @@ -148,18 +180,20 @@ def get_data():
raw_email_string = raw_email.decode("utf-8")
email_message = email.message_from_string(raw_email_string, policy=policy.default)

get_data()
process_email(email_message)

connection.close()
connection.logout()
except Exception as error:
logger.exception(f"{self.collector_source} Fetch emails using IMAP failed: {error}")
logger.exception(f"{self.collector_source} Failed to fetch emails using IMAP: {error}")

elif email_server_type.lower() == "pop3":
logger.debug(f"{self.collector_source} Fetching emails using POP3")
try:
if proxy_server:
proxy_tunnel()

connection = poplib.POP3_SSL(f"{email_server_type.lower()}.{email_server_hostname.lower()}", email_server_port)
connection = poplib.POP3_SSL(email_server_hostname.lower(), email_server_port)
connection.user(email_username)
connection.pass_(email_password)

Expand All @@ -171,11 +205,11 @@ def get_data():
raw_email = b"\n".join(connection.retr(i + 1)[1])
email_message = email.message_from_bytes(raw_email)

get_data()
process_email(email_message)

connection.quit()
except Exception as error:
logger.exception(f"{self.collector_source} Fetch emails using POP3 failed: {error}")
logger.exception(f"{self.collector_source} Failed to fetch emails using POP3: {error}")
else:
logger.error(f"{self.collector_source} Email server connection type is not supported: {email_server_type}")

Expand Down
10 changes: 7 additions & 3 deletions src/collectors/collectors/web_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,8 +788,12 @@ def __process_title_page_articles(self, browser, title_page_handle, index_url):
news_item = self.__process_article_page(index_url, browser)
if news_item:
logger.debug(f"{self.collector_source} ... Title : {news_item.title}")
logger.debug(f"{self.collector_source} ... Review : {news_item.review.replace('\r', '').replace('\n', ' ').strip()[:100]}")
logger.debug(f"{self.collector_source} ... Content : {news_item.content.replace('\r', '').replace('\n', ' ').strip()[:100]}")
logger.debug(
f"{self.collector_source} ... Review : {news_item.review.replace('\r', '').replace('\n', ' ').strip()[:100]}"
)
logger.debug(
f"{self.collector_source} ... Content : {news_item.content.replace('\r', '').replace('\n', ' ').strip()[:100]}"
)
logger.debug(f"{self.collector_source} ... Published: {news_item.published}")
self.news_items.append(news_item)
else:
Expand Down Expand Up @@ -871,6 +875,6 @@ def __process_article_page(self, index_url, browser):
key = "Additional_ID"
binary_mime_type = ""
binary_value = ""
attribute = NewsItemAttribute(uuid.uuid4(), key, value, binary_mime_type, binary_value)
attribute = NewsItemAttribute(key, value, binary_mime_type, binary_value)
news_item.attributes.append(attribute)
return news_item
5 changes: 4 additions & 1 deletion src/publishers/publishers/email_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ def publish(self, publisher_input):
logger.info(f"Encrypting email with file {encrypt}")
envelope.encryption(key=open(encrypt))

logger.debug(f"=== COMPOSED FOLLOWING EMAIL ===\n{envelope}")
email_string = str(envelope)
if len(email_string) > 3000:
email_string = f"{email_string[:3000]}\n..."
logger.debug(f"=== COMPOSED FOLLOWING EMAIL ===\n{email_string}")

envelope.smtp(smtp)
try:
Expand Down