Skip to content

Commit

Permalink
Added multithreading for multiple loaders
Browse files Browse the repository at this point in the history
  • Loading branch information
GLivshits committed Sep 29, 2021
1 parent 0b90699 commit 7bebdb9
Show file tree
Hide file tree
Showing 16 changed files with 322 additions and 33 deletions.
6 changes: 6 additions & 0 deletions .ipynb_checkpoints/Untitled-checkpoint.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"cells": [],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 4
}
181 changes: 181 additions & 0 deletions Untitled.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from instaloader.instaloader import Instaloader, Post\n",
"from instaloader.nodeiterator import NodeIterator\n",
"import pickle"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"L = Instaloader(download_pictures=False, download_videos=False, download_video_thumbnails=False, download_geotags = False, download_comments = False, compress_json=True, save_metadata = True, post_metadata_txt_pattern=\"\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"LOC = '309580832976358'\n",
"post_iterator = NodeIterator(L.context, \n",
" \"ac38b90f0f3981c42092016a37c59bf7\", \n",
" lambda d: d['data']['location']['edge_location_to_media'], \n",
"    lambda n: Post(L.context, n),\n",
" {'id': LOC},\n",
" f\"https://www.instagram.com/explore/locations/{LOC}/\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"HASHTAG = 'bodypositive'\n",
"post_iterator = NodeIterator(\n",
" L.context, \"9b498c08113f1e09617a1703c22b2f32\",\n",
" lambda d: d['data']['hashtag']['edge_hashtag_to_media'],\n",
" lambda n: Post(L.context, n),\n",
" {'tag_name': HASHTAG},\n",
" f\"https://www.instagram.com/explore/tags/{HASHTAG}/\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" for post in post_iterator:\n",
" L.download_post(post, target = HASHTAG)\n",
"except Exception as err:\n",
" print(err)\n",
" iteratorfreeze = post_iterator.freeze()\n",
" with open(f'{HASHTAG}.pkl', 'wb') as resumefile:\n",
" pickle.dump(iteratorfreeze, resumefile)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with open(f'{HASHTAG}.pkl', 'rb') as resumefile:\n",
" iteratorfreeze = pickle.load(resumefile)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"iteratorfreeze"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"a = [1,2,3]"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"ename": "TypeError",
"evalue": "'int' object is not iterable",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m/tmp/ipykernel_2027462/3346660367.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0ma\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mextend\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;31mTypeError\u001b[0m: 'int' object is not iterable"
]
}
],
"source": [
"a.extend(1)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import torch"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"ename": "FileNotFoundError",
"evalue": "[Errno 2] No such file or directory: 'stylegan2/models/finger_masks_asd/model_4.pt'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mFileNotFoundError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m/tmp/ipykernel_853173/2129044928.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mweights\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtorch\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mload\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'stylegan2/models/finger_masks_asd/model_4.pt'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m~/anaconda3/lib/python3.8/site-packages/torch/serialization.py\u001b[0m in \u001b[0;36mload\u001b[0;34m(f, map_location, pickle_module, **pickle_load_args)\u001b[0m\n\u001b[1;32m 577\u001b[0m \u001b[0mpickle_load_args\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'encoding'\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m'utf-8'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 578\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 579\u001b[0;31m \u001b[0;32mwith\u001b[0m \u001b[0m_open_file_like\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'rb'\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mopened_file\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 580\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0m_is_zipfile\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mopened_file\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 581\u001b[0m \u001b[0;31m# The zipfile reader is going to advance the current file position.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/anaconda3/lib/python3.8/site-packages/torch/serialization.py\u001b[0m in \u001b[0;36m_open_file_like\u001b[0;34m(name_or_buffer, mode)\u001b[0m\n\u001b[1;32m 228\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m_open_file_like\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mname_or_buffer\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmode\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 229\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0m_is_path\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mname_or_buffer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 230\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0m_open_file\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mname_or_buffer\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmode\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 231\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 232\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;34m'w'\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mmode\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/anaconda3/lib/python3.8/site-packages/torch/serialization.py\u001b[0m in \u001b[0;36m__init__\u001b[0;34m(self, name, mode)\u001b[0m\n\u001b[1;32m 209\u001b[0m \u001b[0;32mclass\u001b[0m \u001b[0m_open_file\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_opener\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 210\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__init__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mname\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmode\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 211\u001b[0;31m \u001b[0msuper\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_open_file\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__init__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mopen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mname\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmode\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 212\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 213\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__exit__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mFileNotFoundError\u001b[0m: [Errno 2] No such file or directory: 'stylegan2/models/finger_masks_asd/model_4.pt'"
]
}
],
"source": [
"weights = torch.load('stylegan2/models/finger_masks_asd/model_4.pt')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
20 changes: 20 additions & 0 deletions app.log
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,23 @@ IndexError: list index out of range
2021-08-27 14:23:17,347 - JSON Query to graphql/query: Redirected to login page. Use --login. [retrying; skip with ^C]
2021-08-27 14:23:45,522 - [skipped by user]
2021-08-27 14:23:45,523 - im_vladapetrova: JSON Query to graphql/query: Redirected to login page. Use --login.
2021-09-14 11:32:16,910 - JSON Query to graphql/query: Redirected to login page. Use --login. [retrying; skip with ^C]
2021-09-14 11:35:21,538 - JSON Query to graphql/query: Redirected to login page. Use --login. [retrying; skip with ^C]
2021-09-14 11:37:19,106 - [skipped by user]
2021-09-14 16:39:53,918 - JSON Query to graphql/query: Redirected to login page. Use --login. [retrying; skip with ^C]
2021-09-14 16:40:11,029 - [skipped by user]
2021-09-22 10:43:38,102 - JSON Query to explore/tags/bodypositive/: Redirected to login page. Use --login. [retrying; skip with ^C]
2021-09-22 10:43:43,013 - [skipped by user]
2021-09-23 12:19:07,896 - JSON Query to graphql/query: HTTP error code 500. [retrying; skip with ^C]
2021-09-23 12:19:08,828 - JSON Query to graphql/query: HTTP error code 500. [retrying; skip with ^C]
2021-09-28 15:12:45,171 - Error occurred during loading data. Trying to use cache server https://fake-useragent.herokuapp.com/browsers/0.1.11
Traceback (most recent call last):
File "/home/grisha/anaconda3/lib/python3.8/site-packages/fake_useragent/utils.py", line 154, in load
for item in get_browsers(verify_ssl=verify_ssl):
File "/home/grisha/anaconda3/lib/python3.8/site-packages/fake_useragent/utils.py", line 99, in get_browsers
html = html.split('<table class="w3-table-all notranslate">')[1]
IndexError: list index out of range
2021-09-28 15:13:51,075 - JSON Query to im_vladapetrova/feed/: Expecting value: line 1 column 1 (char 0) [retrying; skip with ^C]
2021-09-28 15:13:53,448 - JSON Query to im_vladapetrova/feed/: Expecting value: line 1 column 1 (char 0) [retrying; skip with ^C]
2021-09-28 15:15:13,242 - JSON Query to im_vladapetrova/feed/: Expecting value: line 1 column 1 (char 0) [retrying; skip with ^C]
2021-09-28 15:15:15,011 - JSON Query to im_vladapetrova/feed/: Expecting value: line 1 column 1 (char 0) [retrying; skip with ^C]
107 changes: 92 additions & 15 deletions instaloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from instaloader.instaloader import Instaloader
from instaloader.instaloadercontext import default_user_agent
import pandas as pd
from tqdm import tqdm
from multiprocessing import Process
from queue import Empty, Queue
from threading import Thread
import os
from instaloader.utils import get_profile_struct

Expand Down Expand Up @@ -72,7 +72,7 @@ def delete_row(df, row_idx):
for i in range(args.proxy_index + 1):
proxy_objects.append(ProxyRotator(api_key = api_key, idx = i))
else:
proxy_objects.append(None)
proxy_objects = [None]

loaders = []
for proxy in proxy_objects:
Expand Down Expand Up @@ -122,19 +122,34 @@ def delete_row(df, row_idx):
else:
ids.append({'id': 'nan', 'username': item})

print(len(ids))
ids_container = iter(ids)

flag = True
total_index = 0
done = False

def produce(queue):
    """Producer thread body: feed scrape targets into the work queue.

    Drains the module-level ``ids_container`` iterator and puts every
    target onto *queue* for the consumer threads. Returns normally once
    the iterator is exhausted; a KeyboardInterrupt just stops feeding.
    """
    global ids_container
    try:
        # A plain for-loop is equivalent to the manual next()/StopIteration
        # pattern and terminates when the shared iterator is exhausted.
        for item in ids_container:
            print('Producer thread target: {}'.format(item))
            queue.put(item)
    except KeyboardInterrupt:
        # Stop feeding; consumers drain what is already queued and then
        # time out on an empty queue.
        pass


def subprocess(loader, target):
global total_index, df
total_index += 1
print('Current total index is {}.'.format(total_index))
try:
if args.task not in ['scrape_hashtag', 'scrape_location']:
target = get_profile_struct(loader, target)

func(loader, target)
func(loader, target, max_count = 3)
total_index += 1
if not (df is None):
df.loc[target['idx'], 'downloaded'] = True
Expand All @@ -153,18 +168,80 @@ def subprocess(loader, target):
raise err


def subprocess(loader, queue):
    """Consumer thread body: download targets pulled from *queue*.

    Loops until the module-level ``done`` flag is set, which happens when a
    ``queue.get`` times out (i.e. the producer has stopped feeding). When a
    CSV dataframe is in use, progress is checkpointed back to
    ``args.csv_path`` every 100 completed targets.

    NOTE(review): ``total_index``, ``df`` and ``done`` are mutated from
    several threads without a lock — this relies on the GIL making the
    updates "safe enough"; verify if exact counts matter.
    """
    global total_index, df, done
    total_index += 1
    print('Current total index is {}.'.format(total_index))
    while not done:
        try:
            target = queue.get(timeout = 1.)
            print('Consumer thread target: {}'.format(target))
            if args.task not in ['scrape_hashtag', 'scrape_location']:
                # Plain usernames/ids must be resolved into a profile
                # struct before the download function can use them.
                target = get_profile_struct(loader, target)
            func(loader, target, max_count = 3)
            total_index += 1
            if df is not None:
                df.loc[target['idx'], 'downloaded'] = True
                # Periodic checkpoint so an interrupted run loses little.
                if total_index % 100 == 0:
                    df.to_csv(args.csv_path, sep = ';', index = None)
            queue.task_done()
        except (QueryReturnedNotFoundException, ProfileNotExistsException):
            # Target no longer exists: drop its row and keep consuming.
            if df is not None:
                df = delete_row(df, target['idx'])
        except KeyboardInterrupt:
            # Persist progress before propagating the interrupt.
            if df is not None:
                df.to_csv(args.csv_path, sep = ';', index = None)
            raise
        except Empty:
            # No work arrived within the timeout: signal all consumers
            # (shared flag) that the producer is finished.
            done = True
        except Exception as err:
            print(err)
            # Bare raise re-raises with the original traceback intact.
            raise

# num_processes = 1 if (not args.use_proxy) else (args.proxy_index + 1)
num_processes = len(loaders)

q = Queue(len(loaders))
producer = Thread(target = produce, args = (q,))
producer.start()

# if num_processes == 1:
# loader = loaders[0]
# del loaders
# for item in ids_container:
# subprocess(loader, item)
# else:
# processes = [None]*len(loaders)
processes = []
flag = True
for i in range(len(loaders)):
consumer = Thread(target = subprocess, args = (loaders[i], q))
consumer.start()
processes.append(consumer)
producer.join()
for consumer in processes:
consumer.join()
# processes[i].join()
# while flag:
# try:
# for i, process in enumerate(processes):
# time.sleep(0.1)
# if process.is_alive():
# continue
# else:
# process.kill()
# processes[i] = Thread(subprocess, args = (item, next(ids_container)))
# processes[i].start()
# # processes[i].join()



# except KeyboardInterrupt:
# print('KeyboardInterrupt! breaking the loop')
# flag = False

num_processes = 1 if (not args.use_proxy) else (args.proxy_index + 1)
if num_processes == 1:
loader = loaders[0]
del loaders
for item in ids_container:
subprocess(loader, item)
else:
processes = []
for item in loaders:
processes.append(Process(subprocess, args = (item, next(ids_container))))

print('Ready!')

Expand Down
Binary file modified instaloader/__pycache__/__init__.cpython-38.pyc
Binary file not shown.
Binary file modified instaloader/__pycache__/exceptions.cpython-38.pyc
Binary file not shown.
Binary file modified instaloader/__pycache__/instaloader.cpython-38.pyc
Binary file not shown.
Binary file modified instaloader/__pycache__/instaloadercontext.cpython-38.pyc
Binary file not shown.
Binary file modified instaloader/__pycache__/nodeiterator.cpython-38.pyc
Binary file not shown.
Binary file modified instaloader/__pycache__/proxyrotator.cpython-38.pyc
Binary file not shown.
Binary file modified instaloader/__pycache__/structures.cpython-38.pyc
Binary file not shown.
6 changes: 6 additions & 0 deletions instaloader/instaloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,13 +944,19 @@ def get_location_posts(self, location: str) -> Iterator[Post]:
"""
has_next_page = True
end_cursor = None
# location_hash = "ac38b90f0f3981c42092016a37c59bf7"
while has_next_page:
if end_cursor:
params = {'__a': 1, 'max_id': end_cursor}
else:
params = {'__a': 1}
# params['query_hash'] = location_hash
# params['id'] = str(location)
location_data = self.context.get_json('explore/locations/{0}/'.format(location),
params)['graphql']['location']['edge_location_to_media']
# location_data = self.context.get_json('graphql/query',
# params)['graphql']['location']['edge_location_to_media']

yield from (Post(self.context, edge['node']) for edge in location_data['edges'])
has_next_page = location_data['page_info']['has_next_page']
end_cursor = location_data['page_info']['end_cursor']
Expand Down
Loading

0 comments on commit 7bebdb9

Please sign in to comment.