-
Notifications
You must be signed in to change notification settings - Fork 3
/
MastodonApi.py
75 lines (64 loc) · 2.68 KB
/
MastodonApi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from mastodon import Mastodon
from bs4 import BeautifulSoup as bs
import logging
import re
import os
from types import SimpleNamespace
class MastodonApi:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
api = None
def get_api(Self, client_id, client_secret, access_token, access_token_secret):
Self.api=Mastodon(client_id, client_secret, access_token, os.getenv('MASTODON_SERVER') )
return Self.api
def media_upload(Self,filename):
media = SimpleNamespace()
mastodon_media = Self.api.media_post(filename,synchronous=True)
media.media_id=mastodon_media.id
return media
def update_status(Self,text, media ,id):
status= Self.api.status_post(text,in_reply_to_id=id, media_ids=[media.media_id])
return status
def reply(Self, toot, text):
msg=" "
for line in text.split("\n"):
if "ERROR" in line or "error:" in line:
msg=msg+line+"\n"
Self.logger.info(f"MSG: {msg}")
status = {}
try:
status = Self.api.status_post(msg,in_reply_to_id=toot.id)
except:
Self.logger.error(f"Unable to post message: {status}")
def get_replies(Self, since_id):
replies={}
#result=api.search("#atari8bitbot", result_type="statuses",exclude_unreviewed=False, min_id=since_id)
result=Self.api.timeline_hashtag("atari8bitbot", since_id=since_id)
Self.logger.debug(f"result: {result}")
for toot in result:
#parse the message to extract entities
message=Self.extract_entities(toot.content)
status=SimpleNamespace()
status.id = toot['id']
status.entities={}
if 'urls' in message.keys():
if 'urls' not in status.entities.keys():
status.entities['urls']=[]
status.entities['urls']=message['urls']
status.user=SimpleNamespace()
status.user.screen_name=toot.account.display_name
status.user.name=toot.account.acct
status.full_text=message['text'].strip()
replies[status.id]=status
Self.logger.debug(f"status: {status.id}")
Self.logger.info(f"replies: {replies}")
return replies.values()
def extract_entities(Self,html_doc):
message={}
html_doc=re.sub(r'<br\s*/?>', '\n', html_doc)
html_doc=re.sub(r'</p>', '\n', html_doc)
html_doc=re.sub('<[^<]+?>', '', html_doc)
html_doc=re.sub('#atari8bitbot\s?', '', html_doc, flags=re.IGNORECASE)
soup = bs(html_doc, 'html.parser')
message['text'] = soup.get_text(separator="\n")
return message