Skip to content
This repository has been archived by the owner on Dec 14, 2023. It is now read-only.

Fix solr collections #842

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
815042a
Make no_dedup_sentences the default for extract-and-vector
philbudne Mar 6, 2022
c77dd18
common/Dockerfile: skip jieba.cache creation; makes empty root owned …
philbudne Mar 18, 2022
f37b7f9
solr-base/Dockerfile: try cloning mediacloud config as mediacloud64
philbudne Mar 18, 2022
e1be9a4
common/Dockerfile: reenable jieba.cache creation & chown it
philbudne Mar 23, 2022
1868296
apps/common/src/python/mediawords/solr/request.py: add/use SOLR_COLLE…
philbudne Mar 23, 2022
4ab3729
apps/import-solr-data/src/perl/MediaWords/Solr/Dump.pm: speedups for …
philbudne Mar 23, 2022
791248a
add apps/solr-base/src/solr/aliases.json with "mediacloud2" solr alias
philbudne Mar 23, 2022
c4fb3d6
apps/common/src/requirements.txt: force MarkupSafe==2.0.1
philbudne Mar 25, 2022
5845a61
solr-zookeeper: preload aliases.json into zookeeper
philbudne Mar 27, 2022
40391ed
apps/postgresql-server/bin/apply_migrations.sh: increase PGCTL_START_…
philbudne Apr 27, 2022
f395116
postgresql-pgbouncer/conf/pgbounder.init:
philbudne May 7, 2022
76be844
pgbouncer.ini: use postgresql server ip
philbudne May 7, 2022
bf74554
apps/webapp-api/src/perl/MediaWords/Controller/Api/V2/Timespans.pm
philbudne May 18, 2022
6654fbb
fix solr query on multiple collections
thepsalmist Nov 7, 2022
e6bc74d
refactor merge solr function
thepsalmist Nov 10, 2022
bc3c626
fix solr response schema
thepsalmist Feb 9, 2023
e8bb5a8
update solr merge function
thepsalmist Mar 7, 2023
cb168c9
fix solr merge buckets duplicate date values
thepsalmist Mar 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/common/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ COPY bin/build_jieba_dict_cache.py /
RUN \
/build_jieba_dict_cache.py && \
rm /build_jieba_dict_cache.py && \
chown mediacloud:mediacloud /var/tmp/jieba.cache && \
ls -l /var/tmp/jieba.cache && \
true

# Symlink Log::Log4perl configuration to where it's going to be found
Expand Down
133 changes: 114 additions & 19 deletions apps/common/src/python/mediawords/solr/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import abc
import time
import json
from typing import Union, Optional
from urllib.parse import urlencode

Expand All @@ -24,6 +25,10 @@
__QUERY_HTTP_TIMEOUT = 15 * 60
"""Timeout of a single HTTP query."""

# Testing alias!!
SOLR_COLLECTION = 'mediacloud2'
MEDIACLOUD_32 = 'mediacloud'
MEDIACLOUD_64 = 'mediacloud64'

class _AbstractSolrRequestException(Exception, metaclass=abc.ABCMeta):
"""Abstract .solr.request exception."""
Expand Down Expand Up @@ -59,7 +64,7 @@ def __wait_for_solr_to_start(config: Optional[CommonConfig]) -> None:
"""Wait for Solr to start and collections to become available, if needed."""

# search for an empty or rare term here because searching for *:* sometimes causes a timeout for some reason
sample_select_url = f"{config.solr_url()}/mediacloud/select?q=BOGUSQUERYTHATRETURNSNOTHINGNADA&rows=1&wt=json"
sample_select_url = f"{config.solr_url()}/{SOLR_COLLECTION}/select?q=BOGUSQUERYTHATRETURNSNOTHINGNADA&rows=1&wt=json"

connected = False

Expand Down Expand Up @@ -152,6 +157,81 @@ def __solr_error_message_from_response(response: Response) -> str:
return error_message


def merge_responses(mc_32_bit_collection: dict, mc_64_bit_collection: dict) -> dict:
    """
    Merge the decoded Solr responses of the two collections into a single response.

    Documents are concatenated (32-bit collection first), "numFound" values are summed,
    and facets -- when at least one response carries a top-level "facets" object -- are
    combined: "count" is summed, and either the "categories" buckets are merged by their
    "val" key or, if there are no categories, the "x" statistic is summed.

    Neither input dictionary is modified.

    :param mc_32_bit_collection: Decoded JSON response from the 32-bit collection.
    :param mc_64_bit_collection: Decoded JSON response from the 64-bit collection.
    :return: New dictionary with the merged response.
    """
    new_response = {}

    # NOTE(review): the header *fields* of the 32-bit response are copied to the top level of
    # the merged response (not nested under "responseHeader"); kept as is because callers may
    # already depend on this shape -- confirm whether nesting was intended.
    new_response.update(mc_32_bit_collection.get("responseHeader", {}))

    mc_32_bit_response = mc_32_bit_collection.get("response", {})
    mc_64_bit_response = mc_64_bit_collection.get("response", {})

    num_found = mc_32_bit_response.get("numFound", 0) + mc_64_bit_response.get("numFound", 0)

    # NOTE(review): start offsets are summed, as before; verify that this is what callers
    # paging through results expect.
    start_index = mc_32_bit_response.get("start", 0) + mc_64_bit_response.get("start", 0)

    docs = list(mc_32_bit_response.get("docs", []))
    docs.extend(mc_64_bit_response.get("docs", []))

    new_response["response"] = {
        "numFound": num_found,
        "start": start_index,
        "docs": docs,
    }

    if "facets" in mc_32_bit_collection or "facets" in mc_64_bit_collection:
        # "facets" is a sibling of "response" (that is where the check above finds it), so it
        # has to be read from the collection dictionaries and not from the "response" objects.
        mc_32_bit_facets = mc_32_bit_collection.get("facets") or {}
        mc_64_bit_facets = mc_64_bit_collection.get("facets") or {}

        count = mc_32_bit_facets.get("count", 0) + mc_64_bit_facets.get("count", 0)

        if "categories" in mc_32_bit_facets or "categories" in mc_64_bit_facets:
            mc_32_buckets = mc_32_bit_facets.get("categories", {}).get("buckets", [])
            mc_64_buckets = mc_64_bit_facets.get("categories", {}).get("buckets", [])

            # Buckets sharing the same "val" are collapsed into one; first-seen order is kept.
            merged = {}
            for item in mc_32_buckets + mc_64_buckets:
                val = item['val']
                if val not in merged:
                    # Copy so that the caller's bucket is not modified by the sums below
                    merged[val] = item.copy()
                    continue

                target = merged[val]
                # A bucket might carry only some of the statistics, so sum what is present
                for key in ('count', 'x'):
                    if key in item:
                        target[key] = target.get(key, 0) + item[key]

            new_response["facets"] = {
                "count": count,
                "categories": {"buckets": list(merged.values())},
            }
        else:
            x = mc_32_bit_facets.get("x", 0) + mc_64_bit_facets.get("x", 0)
            new_response["facets"] = {
                "count": count,
                "x": x,
            }

    return new_response


def solr_request(path: str,
params: SolrParams = None,
content: Union[str, SolrParams] = None,
Expand Down Expand Up @@ -191,10 +271,8 @@ def solr_request(path: str,
if not params:
params = {}

abs_uri = furl(f"{solr_url}/mediacloud/{path}")
abs_uri = abs_uri.set(params)
abs_url = str(abs_uri)

collections = [MEDIACLOUD_32, MEDIACLOUD_64]

ua = UserAgent()
ua.set_timeout(__QUERY_HTTP_TIMEOUT)
ua.set_max_size(None)
Expand All @@ -219,21 +297,38 @@ def solr_request(path: str,

content_encoded = content.encode('utf-8', errors='replace')

request = Request(method='POST', url=abs_url)
request.set_header(name='Content-Type', value=content_type)
request.set_header(name='Content-Length', value=str(len(content_encoded)))
request.set_content(content_encoded)

results = []
for collection in collections:
abs_uri = furl(f"{solr_url}/{collection}/{path}")
abs_uri = abs_uri.set(params)
abs_url = str(abs_uri)
request = Request(method='POST', url=abs_url)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can drop the intermediate `abs_url = str(abs_uri)` assignment unless abs_url is used somewhere else:

 request = Request(method='POST', url=str(abs_uri))

request.set_header(name='Content-Type', value=content_type)
Copy link

@esirK esirK Nov 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following on from this, I think we can pass all the headers when creating the Request, something like:

headers = {
    'Content-Type': content_type,
    'Content-Length': str(len(content_encoded)),
}

request = Request(method='POST', url=abs_url, headers=headers)

request.set_header(name='Content-Length', value=str(len(content_encoded)))
request.set_content(content_encoded)
results.append(request)

else:

request = Request(method='GET', url=abs_url)
log.debug(f"Sending Solr request: {request}")

responses = []
if len(results) > 1:
for r in results:
response = ua.request(r)
if response.is_success():
responses.append(response.decoded_content())
else:
error_message = __solr_error_message_from_response(response=response)
raise McSolrRequestQueryErrorException(f"Error fetching Solr response: {error_message}")

response = merge_responses(json.loads(responses[0]),json.loads(responses[1]))
return json.dumps(response)

log.debug(f"Sending Solr request: {request}")

response = ua.request(request)

if not response.is_success():
error_message = __solr_error_message_from_response(response=response)
raise McSolrRequestQueryErrorException(f"Error fetching Solr response: {error_message}")
else:
response = ua.request(request)
if not response.is_success():
error_message = __solr_error_message_from_response(response=response)
raise McSolrRequestQueryErrorException(f"Error fetching Solr response: {error_message}")

return response.decoded_content()
return response.decoded_content()
10 changes: 10 additions & 0 deletions apps/common/src/python/mediawords/util/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ def env_value(name: str, required: bool = True, allow_empty_string: bool = False

return value

def env_bool(name: str, default: bool = False) -> bool:
    """
    Retrieve boolean from environment variable; should be 0 or 1.

    A variable that is unset, empty or made up only of whitespace counts as "no value found"
    and yields the default.

    :param name: Environment variable name.
    :param default: default value, if no value found.
    :return: False if the variable's integer value is 0, True for any other integer.
    :raises ValueError: if the variable is set to something that is not an integer.
    """

    value = os.environ.get(name)

    # "NAME=" (set but empty) is treated the same as the variable not being set at all
    if value is None or not value.strip():
        return bool(int(default))

    try:
        return bool(int(value))
    except ValueError as ex:
        # Same exception type as a bare int() would raise, but naming the offending variable
        raise ValueError(
            f"Environment variable '{name}' should be 0 or 1, got {value!r}"
        ) from ex

def file_with_env_value(name: str, allow_empty_string: bool = False, encoded_with_base64: bool = False) -> str:
"""
Expand Down
4 changes: 4 additions & 0 deletions apps/common/src/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ furl==2.1.0
# Chinese language tokenizer, stemmer, etc.
jieba==0.42.1

# For Jinja2 2.11.3, which requests MarkupSafe>=0.23 and is now
# getting version 2.1.1, which removed a deprecated function.
MarkupSafe==2.0.1

# Parsing email templates
Jinja2==2.11.3

Expand Down
5 changes: 4 additions & 1 deletion apps/extract-and-vector/bin/extract_and_vector_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from mediawords.db import connect_to_db
from mediawords.job import JobBroker
from mediawords.util.config import env_bool
from mediawords.util.log import create_logger
from mediawords.util.perl import decode_object_from_bytes_if_needed
from extract_and_vector.dbi.stories.extractor_arguments import PyExtractorArguments
Expand Down Expand Up @@ -69,8 +70,10 @@ def run_extract_and_vector(stories_id: int, use_cache: bool = False, use_existin

log.info("Extracting story {}...".format(stories_id))

no_dedup_sentences = env_bool('MC_NO_DEDUP_SENTENCES', True)
try:
extractor_args = PyExtractorArguments(use_cache=use_cache, use_existing=use_existing)
extractor_args = PyExtractorArguments(use_cache=use_cache, use_existing=use_existing,
no_dedup_sentences=no_dedup_sentences)
extract_and_process_story(db=db, story=story, extractor_args=extractor_args)

except Exception as ex:
Expand Down
6 changes: 4 additions & 2 deletions apps/import-solr-data/src/perl/MediaWords/Solr/Dump.pm
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Readonly my @SOLR_FIELDS => qw/stories_id media_id publish_date publish_day publ
text title language processed_stories_id tags_id_stories timespans_id/;

# how many sentences to fetch at a time from the postgres query
Readonly my $FETCH_BLOCK_SIZE => 100;
Readonly my $FETCH_BLOCK_SIZE => 200;

# default time sleep when there are less than MIN_STORIES_TO_PROCESS:
Readonly my $DEFAULT_THROTTLE => 60;
Expand Down Expand Up @@ -601,6 +601,7 @@ Options:
* throttle -- sleep this number of seconds between each block of stories (default 60)
* full -- shortcut for: update=false, empty_queue=true, throttle=1; assume and optimize for static queue
* skip_logging -- skip logging the import into the solr_import_stories or solr_imports tables (default=false)
* skip_update_snapshot -- skip setting snapshots.searchable=true (default=true)

The import will run in blocks of "max_queued_stories" at a time. The function
will keep trying to find stories to import. If there are less than
Expand All @@ -627,6 +628,7 @@ sub import_data($;$)
my $empty_queue = $options->{ empty_queue } // 0;
my $throttle = $options->{ throttle } // $DEFAULT_THROTTLE;
my $skip_logging = $options->{ skip_logging } // 0;
my $skip_update_snapshot = $options->{ skip_update_snapshot } // 1;
my $daemon = $options->{ daemon } // 0;

$_last_max_queue_stories_id = 0;
Expand Down Expand Up @@ -669,7 +671,7 @@ sub import_data($;$)
_save_import_log( $db, $stories_ids );
}

if ( !$skip_logging )
if ( !$skip_logging && !$skip_update_snapshot )
{
_update_snapshot_solr_status( $db );
}
Expand Down
3 changes: 2 additions & 1 deletion apps/postgresql-pgbouncer/conf/pgbouncer.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[databases]
* = host=postgresql-server port=5432 user=mediacloud
; PhilB 5/6/22: PG server running on postgresql EC2 server w/o docker
* = host=172.30.0.58 port=5432 user=mediacloud

[pgbouncer]

Expand Down
3 changes: 2 additions & 1 deletion apps/postgresql-server/bin/apply_migrations.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ MIGRATIONS_DIR="/opt/postgresql-server/pgmigrate/migrations"
TEMP_PORT=12345

# In case the database is in recovery, wait for up to 1 hour for it to complete
PGCTL_START_TIMEOUT=3600
# PLB: increased to three hours
PGCTL_START_TIMEOUT=10800

if [ ! -d "${MIGRATIONS_DIR}" ]; then
echo "Migrations directory ${MIGRATIONS_DIR} does not exist."
Expand Down
13 changes: 13 additions & 0 deletions apps/solr-base/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,18 @@ RUN \
RUN mkdir -p /usr/src/
COPY src/solr/ /usr/src/solr/

# Try to create 64-bit enabled mediacloud64 collection by cloning config
# NOTE: collections/mediacloud/conf/solrconfig.xml uses
# ${mediacloud.luceneMatchVersion} ${mediacloud.solr_webapp_dir} ${mediacloud.solr_dist_dir}
# which reference JVM properties set in solr-shard/bin/solr-shard.sh
# ALSO: core.properties has "instanceDir=/var/lib/solr/mediacloud" (dir does not exist?!),
# which the sed below rewrites to .../mediacloud64 (also does not exist)
RUN \
mkdir -p /usr/src/solr/collections/mediacloud64 && \
cp -rp /usr/src/solr/collections/mediacloud/* /usr/src/solr/collections/mediacloud64/ && \
sed -i.32 's/mediacloud/mediacloud64/' /usr/src/solr/collections/mediacloud64/core.properties && \
sed -i.32 '/<field name=.*type="int"/s/"int"/"long"/' /usr/src/solr/collections/mediacloud64/conf/schema.xml && \
true

# Add user that Solr will run as
RUN useradd -ms /bin/bash solr
1 change: 1 addition & 0 deletions apps/solr-base/src/solr/aliases.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"collection":{"mediacloud2":"mediacloud64,mediacloud"}}
7 changes: 7 additions & 0 deletions apps/solr-zookeeper/bin/init_solr_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,12 @@ for collection_path in /usr/src/solr/collections/*; do
fi
done

ALIASES=/usr/src/solr/aliases.json
if [ -f $ALIASES ]; then
/opt/solr/server/scripts/cloud-scripts/zkcli.sh \
-zkhost 127.0.0.1:2181 \
-cmd putfile /aliases.json $ALIASES
fi

# Stop after initial configuration
pkill java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ SQL
snapshots_id
FROM timespans AS t
where
topics_id = ? AND
topics_id = ?
$snapshot_clause
$focus_clause
$timespan_clause
Expand Down