-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathWorkbenchConfig.py
346 lines (324 loc) · 14.6 KB
/
WorkbenchConfig.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
"""Class to encapsulate Workbench configuration definitions.
"""
import logging
from ruamel.yaml import YAML, YAMLError
import os
import sys
import tempfile
from getpass import getpass
from workbench_utils import *
from rich.console import Console
from rich.table import Table
class WorkbenchConfig:
    """Build, validate and expose the Workbench configuration dictionary.

    The final configuration is the set of built-in defaults overlaid with the
    values from the user's YAML configuration file, followed by a handful of
    task-dependent adjustments. It is available as ``self.config`` once the
    constructor returns.
    """

    def __init__(self, args):
        """Load the configuration, validate it, then configure logging.

        Args:
            args: Parsed command-line arguments. This class reads the
                ``config``, ``check`` and ``get_csv_template`` attributes.

        The process exits (via ``sys.exit``) if the configuration file is
        missing, cannot be parsed, or fails validation.
        """
        self.args = args
        self.path_check()
        self.config = self.get_config()
        self.validate()
        # The log file location comes from the merged config, so the main
        # logger can only be configured after the config has been built.
        logging.basicConfig(
            filename=self.config["log_file_path"],
            level=logging.INFO,
            filemode="a",
            format="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%d-%b-%y %H:%M:%S",
        )

    def get_config(self):
        """Return the fully constructed configuration dictionary.

        Defaults are loaded first, user settings are layered over them, and
        then values that depend on the task or on other settings are filled in.

        Raises:
            KeyError: If the user configuration lacks ``task``, ``host``, or
                (when a password prompt is needed) ``username``.
        """
        config = self.get_default_config()
        user_mods = self.get_user_config()

        # If the password is not set in the config file, fall back to the
        # environment variable, and finally to prompting the user.
        if "password" not in user_mods:
            if "ISLANDORA_WORKBENCH_PASSWORD" in os.environ:
                config["password"] = os.environ["ISLANDORA_WORKBENCH_PASSWORD"]
            else:
                config["password"] = getpass(
                    f"Password for Drupal user {user_mods['username']}:"
                )

        # Blend defaults with user mods; user values win.
        config.update(user_mods)

        # Modify some conditional values.
        if "temp_dir" not in user_mods:
            config["temp_dir"] = tempfile.gettempdir()
        self._apply_task_overrides(config, user_mods["task"])
        if "paged_content_page_content_type" not in user_mods:
            config["paged_content_page_content_type"] = config["content_type"]

        # Flatten the user's list of single-entry preprocessor mappings into
        # one dictionary.
        if "preprocessors" in user_mods:
            config["preprocessors"] = {}
            for preprocessor in user_mods["preprocessors"]:
                for key, value in preprocessor.items():
                    config["preprocessors"][key] = value

        config["host"] = config["host"].rstrip("/")
        config["current_config_file_path"] = os.path.abspath(self.args.config)
        # Reuse the already-parsed user config rather than re-reading the file.
        config["field_text_format_ids"] = self.get_field_level_text_output_formats(
            user_mods
        )

        default_map_path = os.path.join(
            config["temp_dir"], "csv_id_to_node_id_map.db"
        )
        if "csv_id_to_node_id_map_path" not in user_mods:
            config["csv_id_to_node_id_map_path"] = default_map_path
        elif user_mods["csv_id_to_node_id_map_path"] is False:
            config["csv_id_to_node_id_map_path"] = False
        elif not os.path.isabs(config["csv_id_to_node_id_map_path"]):
            # NOTE(review): a relative user-supplied path is replaced by the
            # default filename inside temp_dir rather than being joined to
            # temp_dir. Preserved as-is; confirm this is intended.
            config["csv_id_to_node_id_map_path"] = default_map_path

        # A user-supplied value is already present from the blend above; only
        # the fallback to the ID field needs to be filled in.
        if "page_files_source_dir_field" not in user_mods:
            config["page_files_source_dir_field"] = config["id_field"]

        config["config_file"] = self.args.config
        return config

    @staticmethod
    def _apply_task_overrides(config, task):
        """Set the task-specific ``id_field`` (and related flags) on ``config``.

        The original code compared the literal string ``"task"`` with
        ``"delete_media"`` and ``"update_terms"``, which is always False, so
        those two overrides never took effect. The configured task is now
        compared instead.

        Args:
            config: Configuration dictionary, modified in place.
            task: Value of the user's ``task`` setting.
        """
        if task in ("add_media", "update", "delete", "export_csv"):
            config["id_field"] = "node_id"
        elif task == "delete_media":
            config["id_field"] = "media_id"
        elif task == "create_terms":
            config["id_field"] = "term_name"
            config["allow_adding_terms"] = True
        elif task == "update_terms":
            config["id_field"] = "term_id"

    def get_user_config(self):
        """Return the user's YAML configuration file as a dictionary.

        The legacy ``media_file_fields`` setting is translated into
        ``media_fields`` and ``media_type_file_fields``, and
        ``config_file_path`` is added (made absolute relative to the current
        working directory when needed).

        The process exits if the file contains a YAML syntax error or is empty.
        """
        yaml = YAML()
        with open(self.args.config, "r") as stream:
            try:
                loaded = yaml.load(stream)
            except YAMLError:
                print(
                    f"There appears to be a YAML syntax error in your configuration file, {self.args.config}. Remove the username and\npassword, and run the file through https://codebeautify.org/yaml-validator/ or your YAML validator of choice."
                )
                sys.exit()

        # An empty YAML document loads as None; fail with a clear message
        # instead of a TypeError on the membership tests below.
        if loaded is None:
            sys.exit(
                'Error: Configuration file "'
                + self.args.config
                + '" does not contain any settings.'
            )

        # 'media_file_fields' has been replaced with 'media_fields' and 'media_type_file_fields'.
        # This is aliasing code that can be removed at some point in the future.
        if "media_file_fields" in loaded:
            media_fields = self.get_media_fields()
            for type_to_field in loaded["media_file_fields"]:
                for media_type, field_name in type_to_field.items():
                    media_fields[media_type] = field_name
            loaded["media_fields"] = media_fields
            loaded["media_type_file_fields"] = media_fields

        if os.path.isabs(self.args.config):
            loaded["config_file_path"] = self.args.config
        else:
            loaded["config_file_path"] = os.path.join(os.getcwd(), self.args.config)
        return loaded

    def get_media_fields(self):
        """Return a new dict mapping each standard media type to its file field."""
        return {
            "file": "field_media_file",
            "document": "field_media_document",
            "image": "field_media_image",
            "audio": "field_media_audio_file",
            "video": "field_media_video_file",
            "extracted_text": "field_media_file",
            "fits_technical_metadata": "field_media_file",
            "remote_video": "field_media_oembed_video",
        }

    def get_media_types(self):
        """Return the standard file extensions for each media type.

        The result is a list of single-entry dicts, each mapping a media type
        to its list of extensions.
        """
        return [
            {"image": ["png", "gif", "jpg", "jpeg"]},
            {"document": ["pdf", "doc", "docx", "ppt", "pptx"]},
            {"file": ["tif", "tiff", "jp2", "zip", "tar"]},
            {"audio": ["mp3", "wav", "aac"]},
            {
                "video": [
                    "mp4",
                    "mov",
                    "wmv",
                    "avi",
                    "mts",
                    "flv",
                    "f4v",
                    "swf",
                    "mkv",
                    "webm",
                    "ogv",
                    "mpeg",
                ]
            },
            {"extracted_text": ["txt"]},
        ]

    def get_media_track_file_fields(self):
        """Return the standard media track file field name per media type."""
        return {"audio": "field_track", "video": "field_track"}

    def get_field_level_text_output_formats(self, user_config=None):
        """Return the field -> text format ID mapping from the user config.

        The optional ``field_text_format_ids`` setting is a list of mappings;
        they are merged into a single dict. If the setting is absent, an empty
        dict is returned.

        Args:
            user_config: Already-loaded user configuration. When omitted, the
                configuration file is read again via ``get_user_config()``.
        """
        if user_config is None:
            user_config = self.get_user_config()
        field_text_output_map = {}
        if "field_text_format_ids" in user_config:
            for map_entry in user_config["field_text_format_ids"]:
                for fieldname, text_format_id in map_entry.items():
                    field_text_output_map[fieldname] = text_format_id
        return field_text_output_map

    def get_oembed_media_types(self):
        """Return the standard allowed oEmbed provider URLs per media type.

        These are used to identify URLs in the 'file' CSV column as being
        remote media.
        """
        return [{"remote_video": ["https://www.youtube.com/", "https://youtu.be"]}]

    def get_default_config(self):
        """Return the default configuration, to be updated by user-supplied config.

        Reads ``self.args.check`` and ``self.args.get_csv_template``.
        """
        return {
            "input_dir": "input_data",
            "input_csv": "metadata.csv",
            "media_use_tid": "http://pcdm.org/use#OriginalFile",
            # 'drupal_filesystem' is used only in Drupal 8.x - 9.1; after that,
            # the filesystem is automatically detected from the media's configuration.
            "drupal_filesystem": "fedora://",
            "id_field": "id",
            "content_type": "islandora_object",
            "delimiter": ",",
            "subdelimiter": "|",
            "log_file_path": "workbench.log",
            "log_file_mode": "a",
            "allow_missing_files": False,
            # See Issue 620. Allows the "--check" function to keep running past errors that can be handled
            # well with batch operations, rather than having to fix it in real-time when one error is hit.
            "perform_soft_checks": False,
            "update_mode": "replace",
            "max_node_title_length": 255,
            "paged_content_from_directories": False,
            "delete_media_with_nodes": True,
            "allow_adding_terms": False,
            "columns_with_term_names": [],
            "protected_vocabularies": [],
            "nodes_only": False,
            "log_response_time": False,
            "adaptive_pause_threshold": 2,
            "log_response_time_sample": False,
            "log_request_url": False,
            "log_json": False,
            "log_response_body": False,
            "log_response_status_code": False,
            "log_headers": False,
            "log_term_creation": True,
            "log_file_name_and_line_number": False,
            "progress_bar": False,
            "user_agent": "Islandora Workbench",
            "allow_redirects": True,
            "secure_ssl_only": True,
            "google_sheets_csv_filename": "google_sheet.csv",
            "google_sheets_gid": "0",
            "excel_worksheet": "Sheet1",
            "excel_csv_filename": "excel.csv",
            "ignore_csv_columns": [],
            "use_node_title_for_media": False,
            "use_nid_in_media_title": False,
            "use_node_title_for_media_title": False,
            "field_for_remote_filename": False,
            "field_for_media_title": False,
            "delete_tmp_upload": False,
            "input_data_zip_archives": [],
            "delete_zip_archive_after_extraction": True,
            "list_missing_drupal_fields": False,
            "secondary_tasks": None,
            "sqlite_db_filename": "workbench_temp_data.db",
            "fixity_algorithm": None,
            "validate_fixity_during_check": False,
            "output_csv_include_input_csv": False,
            "timestamp_rollback": False,
            "rollback_dir": None,
            "rollback_file_include_node_info": False,
            "enable_http_cache": True,
            "http_cache_storage": "memory",
            "http_cache_storage_expire_after": 1200,
            "validate_terms_exist": True,
            "validate_parent_node_exists": True,
            "media_types": self.get_media_types(),
            "preprocessors": {},
            "check": self.args.check,
            "get_csv_template": self.args.get_csv_template,
            "paged_content_sequence_separator": "-",
            "media_type_file_fields": self.get_media_fields(),
            "media_track_file_fields": self.get_media_track_file_fields(),
            "media_fields": self.get_media_fields(),
            "delete_media_by_node_media_use_tids": [],
            "export_csv_term_mode": "tid",
            "export_csv_file_path": None,
            "export_csv_field_list": [],
            "export_file_directory": None,
            "export_file_media_use_term_id": "http://pcdm.org/use#OriginalFile",
            "standalone_media_url": False,
            "require_entity_reference_views": True,
            "csv_start_row": 0,
            "csv_stop_row": None,
            "path_to_python": "python",
            "path_to_workbench_script": os.path.join(os.getcwd(), "workbench"),
            "oembed_providers": self.get_oembed_media_types(),
            "contact_sheet_output_dir": "contact_sheet_output",
            "contact_sheet_css_path": os.path.join(
                "assets", "contact_sheet", "contact-sheet.css"
            ),
            "page_title_template": "$parent_title, page $weight",
            "csv_headers": "names",
            "clean_csv_values_skip": [],
            "text_format_id": "basic_html",
            "ignore_existing_parent_ids": True,
            "query_csv_id_to_node_id_map_for_parents": False,
            "ignore_duplicate_parent_ids": True,
            "redirect_status_code": 301,
            "csv_value_templates": [],
            "csv_value_templates_for_paged_content": [],
            "csv_value_templates_rand_length": 5,
            "allow_csv_value_templates_if_field_empty": [],
            "remind_user_to_run_check": False,
            "media_type_by_media_use": False,
            "paged_content_ignore_files": ["Thumbs.db"],
            "include_password_in_rollback_config_file": False,
        }

    def path_check(self):
        """Exit with an error if the configuration file does not exist."""
        if not os.path.exists(self.args.config):
            # Since the main logger gets its log file location from this file, we
            # need to define a local logger to write to the default log file location,
            # 'workbench.log'.
            logging.basicConfig(
                filename="workbench.log",
                format="%(asctime)s - %(levelname)s - %(message)s",
                datefmt="%d-%b-%y %H:%M:%S",
            )
            message = 'Error: Configuration file "' + self.args.config + '" not found.'
            logging.error(message)
            sys.exit(message)

    def validate(self):
        """Validate the merged configuration, exiting if any check fails.

        Checks performed:
          * a GET request for the content type's default form display must not
            return HTTP 404;
          * at most one of the media-title options may be enabled.

        All failures are collected and reported together via ``sys.exit``.
        """
        error_messages = []

        type_check = issue_request(
            self.config,
            "GET",
            f"{self.config['host']}/entity/entity_form_display/node.{self.config['content_type']}.default?_format=json",
        )
        if type_check.status_code == 404:
            message = f"Content type {self.config['content_type']} does not exist on {self.config['host']}."
            error_messages.append(message)

        # These options are mutually exclusive.
        mutators = [
            "use_node_title_for_media",
            "use_nid_in_media_title",
            "field_for_media_title",
        ]
        selected = [mutator for mutator in mutators if self.config[mutator]]
        if len(selected) > 1:
            message = f"You may only select one of {mutators}.\n - This config has selected {selected}."
            error_messages.append(message)

        if error_messages:
            output = "".join(f"{error_message}\n" for error_message in error_messages)
            sys.exit("Error: " + output)

    def print_config(self):
        """Print the configuration to the console as a table (debugging aid)."""
        table = Table(title="Workbench Configuration")
        table.add_column("Parameter", justify="left")
        table.add_column("Value", justify="left")
        for key, value in self.config.items():
            table.add_row(key, str(value))
        console = Console()
        console.print(table)