diff --git a/anyway/parsers/cbs/executor.py b/anyway/parsers/cbs/executor.py
index df5040e90..d5fc2d095 100644
--- a/anyway/parsers/cbs/executor.py
+++ b/anyway/parsers/cbs/executor.py
@@ -11,7 +11,7 @@ import math
 import pandas as pd
 from sqlalchemy import or_, event
 
-from typing import Tuple, Dict, List, Any
+from typing import Dict, List
 
 from anyway.parsers.cbs import preprocessing_cbs_files
 from anyway import field_names, localization
@@ -76,7 +76,6 @@
     LocationAccuracy,
     ProviderCode,
     VehicleDamage,
-    Streets,
     AccidentMarkerView,
     InvolvedView,
     InvolvedMarkerView,
@@ -716,9 +715,7 @@ def get_files(directory):
     return output_files_dict
 
 
-def import_to_datastore(
-    directory, provider_code, year, batch_size
-) -> Tuple[int, Dict[int, List[dict]]]:
+def import_to_datastore(directory, provider_code, year, batch_size) -> int:
     """
     goes through all the files in a given directory, parses and commits them
-    Returns number of new items, and new streets dict.
+    Returns the number of new items.
@@ -744,100 +741,14 @@ def import_to_datastore(
             new_items += vehicles_count
         logging.debug("\t{0} items in {1}".format(new_items, time_delta(started)))
-        return new_items, files_from_cbs[STREETS]
+        return new_items
     except ValueError as e:
         failed_dirs[directory] = str(e)
         if "Not found" in str(e):
-            return 0, {}
+            return 0
         raise e
 
 
-def import_streets_into_db():
-    items = []
-    max_name_len = 0
-    for k, street_hebrew in yishuv_street_dict.items():
-        yishuv_symbol, street = k
-        yishuv_name_street_num = yishuv_name_dict.get((yishuv_symbol, street_hebrew), None)
-        if yishuv_name_street_num is None or yishuv_name_street_num != street:
-            logging.error(
-                f"streets data mismatch:"
-                f"yishuv_street_dict entry: {k}->{street_hebrew}"
-                f",yishuv_name_dict entry: {(yishuv_symbol, street_hebrew)}->{yishuv_name_street_num}"
-            )
-            continue
-        name_len = len(street_hebrew)
-        if name_len > max_name_len:
-            max_name_len = name_len
-        street_entry = {
-            "yishuv_symbol": yishuv_symbol,
-            "street": street,
-            "street_hebrew": street_hebrew[: min(name_len, Streets.MAX_NAME_LEN)],
-        }
-        items.append(street_entry)
-    logging.debug(
-        f"Writing to db: {len(yishuv_street_dict)}:{len(yishuv_name_dict)} -> {len(items)} rows"
-    )
-    db.session.query(Streets).delete()
-    db.session.bulk_insert_mappings(Streets, items)
-    db.session.commit()
-    if max_name_len > Streets.MAX_NAME_LEN:
-        logging.error(
-            f"Importing streets table: Street hebrew name length exceeded: max name: {max_name_len}"
-        )
-    else:
-        logging.debug(f"Max street name len:{max_name_len}")
-    logging.debug(f"Done. {len(yishuv_street_dict)}:{len(yishuv_name_dict)}")
-
-
-yishuv_street_dict: Dict[Tuple[int, int], str] = {}
-yishuv_name_dict: Dict[Tuple[int, str], int] = {}
-
-
-def load_existing_streets():
-    streets = db.session.query(Streets).all()
-    for s in streets:
-        s_dict = {
-            "yishuv_symbol": s.yishuv_symbol,
-            "street": s.street,
-            "street_hebrew": s.street_hebrew,
-        }
-        add_street_remove_name_duplicates(s_dict)
-        add_street_remove_num_duplicates(s_dict)
-    logging.debug(f"Loaded streets: {len(yishuv_street_dict)}:{len(yishuv_name_dict)}")
-
-
-def add_to_streets(streets_map: Dict[int, List[dict]]):
-    for yishuv_symbol, streets_list in streets_map.items():
-        for street in streets_list:
-            my_street = {
-                "yishuv_symbol": yishuv_symbol,
-                "street": street[field_names.street_sign],
-                "street_hebrew": street[field_names.street_name],
-            }
-            add_street_remove_name_duplicates(my_street)
-            add_street_remove_num_duplicates(my_street)
-
-
-def add_street_remove_num_duplicates(street: Dict[str, Any]):
-    k = (street["yishuv_symbol"], street["street"])
-    v = yishuv_street_dict.get(k, None)
-    if v is not None and v != street["street_hebrew"]:
-        logging.error(f"Duplicate street code: {k}-> {v} and {street['street_hebrew']}")
-        yishuv_street_dict[k] = street["street_hebrew"]
-    if v is None:
-        yishuv_street_dict[k] = street["street_hebrew"]
-
-
-def add_street_remove_name_duplicates(street: Dict[str, Any]):
-    k = (street["yishuv_symbol"], street["street_hebrew"])
-    v = yishuv_name_dict.get(k, None)
-    if v is not None and v != street["street"]:
-        logging.error(f"Duplicate street name: {k}-> {v} and {street['street']}")
-        yishuv_name_dict[k] = street["street"]
-    if v is None:
-        yishuv_name_dict[k] = street["street"]
-
-
 def delete_invalid_entries(batch_size):
     """
     deletes all markers in the database with null latitude or longitude
@@ -1112,7 +1023,7 @@ def recreate_table_for_location_extraction():
     db.session.execute("""TRUNCATE cbs_locations""")
     db.session.execute("""INSERT INTO cbs_locations
     (SELECT ROW_NUMBER() OVER (ORDER BY road1) as id, LOCATIONS.*
-    FROM 
+    FROM
     (SELECT DISTINCT road1,
            road2,
            non_urban_intersection_hebrew,
@@ -1135,7 +1046,6 @@ def recreate_table_for_location_extraction():
 
 def main(batch_size, source, load_start_year=None):
     try:
-        load_existing_streets()
         total = 0
         started = datetime.now()
         if source == "s3":
@@ -1161,11 +1071,10 @@ def main(batch_size, source, load_start_year=None):
                 )
                 logging.debug("Importing Directory " + cbs_files_dir)
                 preprocessing_cbs_files.update_cbs_files_names(cbs_files_dir)
-                num_new, streets = import_to_datastore(
+                num_new = import_to_datastore(
                     cbs_files_dir, provider_code, year, batch_size
                 )
                 total += num_new
-                add_to_streets(streets)
             shutil.rmtree(s3_data_retriever.local_temp_directory)
         elif source == "local_dir_for_tests_only":
             path = "static/data/cbs"
@@ -1184,13 +1093,10 @@ def main(batch_size, source, load_start_year=None):
                 )
                 provider_code = get_provider_code(parent_directory)
                 logging.debug("Importing Directory " + directory)
-                num_new, streets = import_to_datastore(
+                num_new = import_to_datastore(
                     directory, provider_code, int(year), batch_size
                 )
                 total += num_new
-                add_to_streets(streets)
-
-        import_streets_into_db()
         fill_db_geo_data()
 
         failed = [
diff --git a/tests/parsers/cbs/test_executor.py b/tests/parsers/cbs/test_executor.py
index bd096e0d6..5938ebf9e 100644
--- a/tests/parsers/cbs/test_executor.py
+++ b/tests/parsers/cbs/test_executor.py
@@ -15,10 +15,8 @@ def mock_shutil(monkeypatch):
 
 
-def test_import_streets_is_called_once_when_source_is_s3(monkeypatch, mock_s3_data_retriever, mock_shutil):
+def test_delete_cbs_entries_is_called_once_when_source_is_s3(monkeypatch, mock_s3_data_retriever, mock_shutil):
     # Arrange
-    import_streets_mock = MagicMock()
-    monkeypatch.setattr('anyway.parsers.cbs.executor.import_streets_into_db', import_streets_mock)
-    monkeypatch.setattr('anyway.parsers.cbs.executor.load_existing_streets', MagicMock())
-    monkeypatch.setattr('anyway.parsers.cbs.executor.delete_cbs_entries', MagicMock())
+    delete_cbs_entries = MagicMock()
+    monkeypatch.setattr('anyway.parsers.cbs.executor.delete_cbs_entries', delete_cbs_entries)
     monkeypatch.setattr('anyway.parsers.cbs.executor.fill_db_geo_data', MagicMock())
     monkeypatch.setattr('anyway.parsers.cbs.executor.create_tables', MagicMock())
@@ -26,11 +24,11 @@ def test_import_streets_is_called_once_when_source_is_s3(monkeypatch, mock_s3_da
     # Act
     main(batch_size=MagicMock(), source='s3')
 
     # Assert
-    import_streets_mock.assert_called_once()
+    delete_cbs_entries.assert_called_once()
 
 
 def test_cbs_parsing_failed_is_raised_when_something_bad_happens(monkeypatch):
-    monkeypatch.setattr('anyway.parsers.cbs.executor.load_existing_streets',
+    monkeypatch.setattr('anyway.parsers.cbs.executor.create_tables',
                         MagicMock(side_effect=Exception('something bad')))
     with pytest.raises(CBSParsingFailed, match='Exception occurred while loading the cbs data: something bad'):
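
Reviewer note (not part of the patch): import_to_datastore now returns a plain int instead of a (count, streets-dict) tuple, and the street bookkeeping (load_existing_streets, add_to_streets, import_streets_into_db) is gone from the executor. A minimal sketch of how a call site adapts to the narrowed return type; the enclosing function and variable names below are hypothetical placeholders, not code from this repository:

    # Hypothetical caller, for illustration only.
    from anyway.parsers.cbs.executor import import_to_datastore

    def load_one_directory(directory: str, provider_code: int, year: int, batch_size: int) -> int:
        # Before this patch the call returned a tuple and fed the streets cache:
        #   num_new, streets = import_to_datastore(directory, provider_code, year, batch_size)
        #   add_to_streets(streets)   # street bookkeeping, removed by this patch
        # After this patch the call returns a single integer count:
        num_new = import_to_datastore(directory, provider_code, year, batch_size)
        return num_new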