# utils.py
import re
import time
import os
from typing import Callable, List

import pandas as pd
from deep_translator.exceptions import BaseError, RequestError
from pandas import DataFrame
from termcolor import colored

from errors import CannotSplitIntoChunksError, EmptyContentError, MaxChunkSizeExceededError, \
    DelimiterAlreadyExistsError, TranslateIOMismatchError, \
    DatasetParquetNameError, TranslationError
from multi_thread_handler import mth


def read_integer_from_file(file_path: str) -> int:
    """Read a single integer from the first line of a file; return 0 on any failure."""
    try:
        with open(file_path, 'r') as file:
            content = file.readline().strip()
            if content:
                return int(content)
            return 0
    except FileNotFoundError:
        mth.safe_print(f"File not found: {file_path}")
        return 0
    except ValueError:
        mth.safe_print(f"Invalid integer value in file: {file_path}")
        return 0


def update_integer_in_file(file_path: str, new_value: int) -> None:
    """Overwrite the file with a single integer value."""
    try:
        with open(file_path, 'w') as file:
            file.write(str(new_value))
    except Exception as e:
        mth.safe_print(f"Error updating file: {e}")


def get_current_time() -> int:
    """Return the current Unix timestamp in whole seconds."""
    return int(time.time())
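# Together these form a simple resume checkpoint. A usage sketch
# ('progress.txt' is a hypothetical path, not one the repo mandates):
#
#     update_integer_in_file('progress.txt', 42)
#     read_integer_from_file('progress.txt')   # -> 42
#     read_integer_from_file('missing.txt')    # prints a warning, -> 0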


def load_dataset(folder_path: str, start: int = None, end: int = None) -> DataFrame:
    """Load every 'part.N.parquet' shard in a folder, in numeric order, as one DataFrame."""
    all_files = os.listdir(folder_path)
    # Keep only Parquet files, and insist they follow the part.N.parquet naming scheme
    parquet_files = [f for f in all_files if f.endswith('.parquet')]
    for file in parquet_files:
        if re.search(r'part\.(\d+)\.parquet', file) is None:
            raise DatasetParquetNameError()
    sorted_parquet_files = sorted(parquet_files, key=lambda x: int(re.search(r'part\.(\d+)\.parquet', x).group(1)))
    # Read each shard and concatenate into a single frame
    dataframes = []
    for file in sorted_parquet_files:
        file_path = os.path.join(folder_path, file)
        dataframes.append(pd.read_parquet(file_path))
    combined_df = pd.concat(dataframes).reset_index(drop=True)
    # iloc treats None bounds as open-ended, so one slice covers all four start/end cases
    return combined_df.iloc[start:end]
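# Usage sketch, assuming shards on disk named part.0.parquet, part.1.parquet, ...
# (the folder name below is illustrative):
#
#     df = load_dataset('data/my_dataset', start=0, end=1000)  # first 1000 rows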


default_split_delimiters = ['\n', '.']


def split_text_into_chunks(text: str, split_delimiters=default_split_delimiters, chunk_size=4000) -> List[str]:
    """Greedily split text into chunks of at most chunk_size characters,
    cutting at the rightmost delimiter inside each chunk-sized window."""
    def find_best_delimiter(text_slice):
        # Pick the delimiter whose last occurrence is furthest to the right
        best_delimiter = None
        best_index = -1
        for delimiter in split_delimiters:
            index = text_slice.rfind(delimiter)
            if index > best_index:
                best_index = index
                best_delimiter = delimiter
        return best_delimiter, best_index

    chunks = []
    while text:
        if len(text) <= chunk_size:
            chunks.append(text)
            break
        text_slice = text[:chunk_size]
        best_delimiter, split_index = find_best_delimiter(text_slice)
        if split_index == -1:
            # No delimiter anywhere in the window: the text cannot be split safely
            raise CannotSplitIntoChunksError()
        # Keep the delimiter at the end of the chunk so joining restores the original text
        index_with_delimiter = split_index + len(best_delimiter)
        chunks.append(text[:index_with_delimiter])
        text = text[index_with_delimiter:]
    return chunks
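# Example with a tiny chunk_size so the behaviour is visible:
#
#     split_text_into_chunks("alpha.\nbeta. gamma.", chunk_size=8)
#     # -> ['alpha.\n', 'beta.', ' gamma.']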


def connect_back_chunks(chunks: List[str]) -> str:
    # Delimiters were kept on each chunk, so a plain join restores the text
    return ''.join(chunks)


def escape_delimiters(text: str) -> str:
    # repr escapes control characters; strip the surrounding quotes
    return repr(text)[1:-1]
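# escape_delimiters makes whitespace delimiters printable, e.g. for logging:
#
#     escape_delimiters('a\nb')  # -> 'a\\nb'
#     connect_back_chunks(split_text_into_chunks(text)) == text  # round-trip property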


def translate_by_chunk(translate_fn: Callable[[str], str], text: str, chunk_size=4000,
                       split_delimiters=default_split_delimiters) -> str:
    """Translate a single text, splitting it into chunks first if it exceeds chunk_size."""
    if not text.strip():
        return text
    if len(text) <= chunk_size:
        return translate_fn(text)
    chunks = split_text_into_chunks(text, split_delimiters, chunk_size)
    translated_chunks = [translate_fn(chunk) for chunk in chunks]
    translated_content = connect_back_chunks(translated_chunks)
    mth.safe_print(f"Translated {len(chunks)} chunks after splitting.")
    return translated_content
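# Sketch with a stand-in translator; `fake_translate` is illustrative only, where a
# real caller would pass a deep_translator translate method:
#
#     fake_translate = lambda s: s.upper()
#     translate_by_chunk(fake_translate, 'hola mundo')  # -> 'HOLA MUNDO'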


combine_delimiters = ['\n<###>\n']


def combine_text_into_blob(content: List[str], combine_delimiter=combine_delimiters[0], max_chunk_size=4000) -> str:
    """Join several texts into one blob, separated by a sentinel the translator should leave intact."""
    combined_text = combine_delimiter.join(content)
    if len(combined_text) >= max_chunk_size:
        raise MaxChunkSizeExceededError()
    return combined_text


def split_blob_into_text(blob: str, combine_delimiter=combine_delimiters[0]) -> List[str]:
    # Inverse of combine_text_into_blob
    return blob.split(combine_delimiter)
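# The blob round-trips as long as the translator preserves the sentinel:
#
#     blob = combine_text_into_blob(['hello', 'world'])
#     # -> 'hello\n<###>\nworld'
#     split_blob_into_text(blob)  # -> ['hello', 'world']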


def translate_by_blob(translate_fn: Callable[[str], str], content: List[str], max_chunk_size=4000) -> List[str]:
    """Translate a list of texts in one request by joining them with a sentinel delimiter."""
    # Reject empty texts and texts that already contain the delimiter,
    # since either would corrupt the split on the way back
    for text in content:
        if not text.strip():
            raise EmptyContentError()
        if combine_delimiters[0] in text:
            raise DelimiterAlreadyExistsError()
    combined_text = combine_text_into_blob(content, combine_delimiters[0], max_chunk_size)
    translated_text = translate_fn(combined_text)
    split_content = split_blob_into_text(translated_text, combine_delimiters[0])
    if len(split_content) != len(content):
        # The translator mangled the sentinel, so results can no longer be paired with inputs
        raise TranslateIOMismatchError()
    return split_content
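# With the illustrative fake_translate from above:
#
#     translate_by_blob(fake_translate, ['hola', 'mundo'])  # one request -> ['HOLA', 'MUNDO']
#     translate_by_blob(fake_translate, [''])               # raises EmptyContentError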


def translate_by_sdk(translate_sdk_fn: Callable[[List[str]], List[str]], content: List[str],
                     max_chunk_size=30000) -> List[str]:
    """Translate via a batch SDK call, splitting any single text that exceeds max_chunk_size."""
    # If the total content fits within the limit, translate it in a single batch
    total_content_length = sum(len(text) for text in content)
    if total_content_length <= max_chunk_size:
        return translate_sdk_fn(content)
    translated_content = []
    split_delimiters = ['\n', '.', ' ']
    for text in content:
        if len(text) >= max_chunk_size:
            chunks = split_text_into_chunks(text, split_delimiters=split_delimiters, chunk_size=max_chunk_size)
            translated_chunks = []
            for chunk in chunks:
                translated_chunks.extend(translate_sdk_fn([chunk]))
            translated_content.append(connect_back_chunks(translated_chunks))
        else:
            translated_content.extend(translate_sdk_fn([text]))
    return translated_content
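# `fake_sdk` below stands in for a batch translation API and is illustrative only:
#
#     fake_sdk = lambda texts: [t.upper() for t in texts]
#     translate_by_sdk(fake_sdk, ['one', 'two'])  # -> ['ONE', 'TWO']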


def choose_translation_method_and_translate(translate_fn: Callable[[str], str],
                                            translate_sdk_fn: Callable[[List[str]], List[str]],
                                            index: int, content: List[str],
                                            max_chunk_size=4000) -> List[str]:
    """Try the cheapest strategy first and fall back: blob -> per-text chunks -> SDK."""
    try:
        try:
            translated_content = translate_by_blob(translate_fn, content, max_chunk_size)
            mth.safe_print(f"Translated by blob for index {index}, Time: {get_current_time()}")
            return translated_content
        except (TranslationError, RequestError) as e:
            try:
                mth.safe_print(colored(f"Translate by blob for index {index} failed, trying by chunks. Error: {e}", "yellow"))
                translated_content = [translate_by_chunk(translate_fn, text, chunk_size=max_chunk_size) for text in content]
                mth.safe_print(f"Translated by chunk for index {index}, Time: {get_current_time()}")
                return translated_content
            except (TranslationError, RequestError) as e:
                mth.safe_print(colored(f"Translate by chunk for index {index} failed, trying with SDK. Error: {e}", "yellow"))
                translated_content = translate_by_sdk(translate_sdk_fn, content)
                mth.safe_print(f"Translated by SDK for index {index}, Time: {get_current_time()}")
                return translated_content
    except BaseError as e:
        mth.safe_print(colored(f"Deep Translator Error: {e.message} at {index}", 'red'))
        raise e
    except Exception as e:
        mth.safe_print(colored(f"Error: {e} at {index}", 'red'))
        raise e
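# A usage sketch with the illustrative stand-ins defined earlier:
#
#     choose_translation_method_and_translate(
#         fake_translate, fake_sdk, index=0, content=['hola', 'mundo'])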


def get_estimated_time(content_len, i, start_time, current_time):
    """Estimate remaining time in hours from the average time per item processed so far."""
    if i == 0:
        return 0
    elapsed_time = current_time - start_time
    avg_time_per_chunk = elapsed_time / i
    remaining_chunks = content_len - i
    estimated_time = (avg_time_per_chunk * remaining_chunks) / 3600  # seconds -> hours
    return round(estimated_time, 2)
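# Worked example: 100 of 1000 items done in 360 s -> 3.6 s/item * 900 items / 3600:
#
#     get_estimated_time(1000, 100, 0, 360)  # -> 0.9 (hours)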


def get_speed(content_len, start_time, current_time):
    """Return throughput in items per second, rounded to two decimals."""
    elapsed_time = current_time - start_time
    if elapsed_time == 0:
        return 0
    return round(content_len / elapsed_time, 2)


def get_output_csv_path(folder: str, index: int, suffix: str, ext: str = 'csv') -> str:
    """Build the output path for a result file as '<folder>/<index>-<suffix>.<ext>'."""
    return f'{folder}/{index}-{suffix}.{ext}'
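# For example:
#
#     get_output_csv_path('out', 3, 'translated')  # -> 'out/3-translated.csv'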