forked from armcknight/twitterwipe
-
Notifications
You must be signed in to change notification settings - Fork 0
/
twitterwipe.py
130 lines (89 loc) · 3.52 KB
/
twitterwipe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import os
import tweepy
import json
import yaml
import logging
import concurrent.futures
from datetime import timedelta, datetime
# Directory containing this script. The log, config and key files are all
# addressed relative to it rather than to the current working directory.
full_path = os.path.dirname(os.path.realpath(__file__))

# Root logging goes to log.log beside the script: INFO and above, with each
# record prefixed by its timestamp.
logging.basicConfig(
    filename=full_path + '/log.log',
    format='%(asctime)s %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def main():
    """Run one wipe pass.

    Reads ``config.yaml`` from the script directory, turns its retention
    settings into cutoff timestamps and hands those to ``purge_activity``.
    Start and completion are recorded in the log.
    """
    logger.info('starting twitterwipe')

    config_path = full_path + '/config.yaml'
    with open(config_path, 'r') as config_file:
        settings = yaml.load(config_file, Loader=yaml.FullLoader)

    purge_activity(get_delete_timestamps(settings))

    logger.info('done')
def get_delete_timestamps(config):
    """Compute the cutoff before which each kind of activity is deleted.

    Args:
        config: Mapping with a ``days_to_save`` entry that itself maps
            ``likes``, ``retweets`` and ``tweets`` to a number of days to
            keep.

    Returns:
        A ``(likes_time, retweets_time, tweets_time)`` tuple of naive
        datetimes, each equal to the current UTC time minus the matching
        number of days.

    Raises:
        KeyError: If ``days_to_save`` or any of its three entries is missing.
    """
    # A single reference instant keeps the three cutoffs consistent with
    # each other.
    # NOTE(review): deliberately naive (utcnow) because the delete_* functions
    # compare these values directly against status.created_at; presumably
    # that is a naive UTC datetime in the tweepy version in use - confirm
    # before switching to timezone-aware datetimes.
    now_utc = datetime.utcnow()

    days = config['days_to_save']
    likes_days = days['likes']
    retweets_days = days['retweets']
    tweets_days = days['tweets']

    likes_time = now_utc - timedelta(days=likes_days)
    # Retweets use their own retention period. Previously this was computed
    # from the tweets setting, so the retweets setting had no effect.
    retweets_time = now_utc - timedelta(days=retweets_days)
    tweets_time = now_utc - timedelta(days=tweets_days)

    return (likes_time, retweets_time, tweets_time)
def purge_activity(delete_timestamps):
    """Run the three purge jobs concurrently and log any that fail.

    Args:
        delete_timestamps: Sequence indexed as ``[0]`` likes cutoff,
            ``[1]`` retweets cutoff, ``[2]`` tweets cutoff (the order
            returned by ``get_delete_timestamps``).

    Each job runs on its own worker thread and shares one API client. An
    exception that escapes a job is logged here; previously the futures were
    discarded, so such failures disappeared without a trace.
    """
    api = get_api()
    jobs = (
        (delete_tweets, delete_timestamps[2]),
        (delete_retweets, delete_timestamps[1]),
        (delete_favorites, delete_timestamps[0]),
    )
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        pending = {
            executor.submit(job, api, cutoff): job.__name__
            for job, cutoff in jobs
        }
        for future in concurrent.futures.as_completed(pending):
            error = future.exception()
            if error is not None:
                # Best effort: report the failure but let the other jobs
                # finish instead of re-raising.
                logger.error('{} failed'.format(pending[future]),
                             exc_info=error)
def delete_tweets(api, ts):
    """Delete the user's own tweets that were created before ``ts``.

    Args:
        api: tweepy API client; ``user_timeline`` is paged through a
            ``tweepy.Cursor`` and ``destroy_status`` performs the deletion.
        ts: Cutoff datetime; only statuses with ``created_at < ts`` are
            deleted.

    Retweets found in the timeline are skipped so that they are governed by
    the retweets cutoff (see ``delete_retweets``) instead of being removed
    here at the tweets cutoff. A failed deletion is logged and does not stop
    the run. The number of deleted tweets is logged at the end.
    """
    logger.info('deleting tweets before {}'.format(str(ts)))
    count = 0
    for status in tweepy.Cursor(api.user_timeline).items():
        if status.created_at >= ts:
            continue
        # tweepy exposes the original tweet as `retweeted_status` on
        # retweets only; those are handled by delete_retweets.
        if hasattr(status, 'retweeted_status'):
            continue
        try:
            api.destroy_status(status.id)
        except Exception:
            logger.exception("failed to delete {}".format(status.id))
        else:
            count += 1
    logger.info('{} tweets deleted'.format(count))
def delete_retweets(api, ts):
    """Undo the user's retweets that were created before ``ts``.

    Args:
        api: tweepy API client; ``user_timeline`` is paged through a
            ``tweepy.Cursor`` and ``unretweet`` undoes each retweet.
        ts: Cutoff datetime; only statuses with ``created_at < ts`` are
            considered.

    Only timeline entries that are retweets are processed. Previously
    ``unretweet`` was attempted on every old status, spending rate-limited
    calls on (and counting) ordinary tweets. A failed call is logged and does
    not stop the run. The number of removed retweets is logged at the end.
    """
    logger.info('deleting retweets before {}'.format(str(ts)))
    count = 0
    for status in tweepy.Cursor(api.user_timeline).items():
        if status.created_at >= ts:
            continue
        # tweepy exposes the original tweet as `retweeted_status` on
        # retweets only; anything without it is an ordinary tweet.
        if not hasattr(status, 'retweeted_status'):
            continue
        try:
            api.unretweet(status.id)
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed.
            logger.exception('failed to unretweet {}'.format(status.id))
        else:
            count += 1
    logger.info('{} retweets deleted'.format(count))
def delete_favorites(api, ts):
    """Remove the user's favorites whose status was created before ``ts``.

    Args:
        api: tweepy API client; ``favorites`` is paged through a
            ``tweepy.Cursor`` and ``destroy_favorite`` removes each one.
        ts: Cutoff datetime, compared against ``status.created_at``.

    A failed removal is logged together with the status id and does not stop
    the run. The number of removed favorites is logged at the end.
    """
    # NOTE(review): the comparison uses the favorited status's created_at,
    # which presumably is when the tweet was posted rather than when it was
    # liked - confirm this is the intended retention rule.
    logger.info('deleting favorites before {}'.format(str(ts)))
    count = 0
    for status in tweepy.Cursor(api.favorites).items():
        if status.created_at >= ts:
            continue
        try:
            api.destroy_favorite(status.id)
        except Exception:
            # The message now has a placeholder; before, .format() had
            # nothing to fill in and the status id was never logged.
            logger.exception(
                'failed to delete favorite {}'.format(status.id))
        else:
            count += 1
    logger.info('{} favorites deleted'.format(count))
def get_api():
    """Build the tweepy API client from stored credentials.

    Credentials come from the CONSUMER_KEY, CONSUMER_SECRET, APP_KEY and
    APP_SECRET environment variables. If any of them is missing, the whole
    set is read from ``keys.json`` in the script directory instead.

    Returns:
        A ``tweepy.API`` created with ``wait_on_rate_limit`` and
        ``wait_on_rate_limit_notify`` enabled.
    """
    # (credential field, environment variable) pairs, in lookup order.
    env_sources = (
        ('consumer_key', 'CONSUMER_KEY'),
        ('consumer_secret', 'CONSUMER_SECRET'),
        ('app_key', 'APP_KEY'),
        ('app_secret', 'APP_SECRET'),
    )
    try:
        creds = {field: os.environ[var] for field, var in env_sources}
    except KeyError:
        # At least one variable is not set: use the JSON key file instead.
        with open(full_path + '/keys.json', 'r') as keys_file:
            creds = json.load(keys_file)

    handler = tweepy.OAuthHandler(
        creds['consumer_key'], creds['consumer_secret'])
    handler.set_access_token(creds['app_key'], creds['app_secret'])
    return tweepy.API(
        handler,
        wait_on_rate_limit=True,
        wait_on_rate_limit_notify=True,
    )
# Run the wipe only when executed as a script, not when imported.
if __name__ == '__main__':
    main()