-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFiles.py
126 lines (95 loc) · 3.93 KB
/
Files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
import zlib
from hashlib import sha1
encoding = "utf-8"
def create_file_tree(tree, prefix):
for name in tree:
path = os.path.join(prefix, name)
if isinstance(tree[name], str):
with open(path, "w") as f:
f.write(tree[name])
else:
if not os.path.exists(path):
os.makedirs(path)
create_file_tree(tree[name], path)
def match_files(path):
if not os.path.exists(path):
return []
elif os.path.isfile(path):
return [path]
elif os.path.isdir(path):
files = []
for child in os.listdir(path):
if child == ".gym":
continue
files += match_files(os.path.join(path, child))
return files
def write_tree(tree):
tree_string = ""
for key in tree:
if isinstance(tree[key], str):
tree_string += f'blob {tree[key]} {key}\n'
else:
subtree = write_tree(tree[key])
tree_string += f'tree {subtree} {key}\n'
return tree_string
def unflatten_tree(flat_tree):
tree = {}
for path in flat_tree:
path_list = path.split("/")
node_value = flat_tree[path]
current_dict = tree
for key in path_list[:-1]:
if key not in current_dict:
current_dict[key] = {}
current_dict = current_dict[key]
current_dict[path_list[-1]] = node_value
return tree
def add_to_filename(filepath, addition):
filename_current = os.path.split(filepath)
temp = filename_current[1].split('.')
temp[-2 if len(temp) > 1 else 0] += addition
temp = '.'.join(temp)
return os.path.join(filename_current[0], temp)
def blobify(data, target_directory):
"""Creates a blob object from data, creates child directory
in target_directory and a file in it, where in writes the data"""
# создаем объектный хэш из данных
obj_hash = sha1(data).hexdigest()
# разбиваем хэш на каталоги и имя файла
directory = obj_hash[:2]
filename = obj_hash[2:]
# создаем каталог, если он не существует
object_directory = os.path.join(target_directory, directory)
os.makedirs(object_directory, exist_ok=True)
# создаем новый файл в объектном каталоге
object_filename = os.path.join(object_directory, filename)
with open(object_filename, 'wb') as f:
# сжимаем данные zlib
compressed_data = zlib.compress(data)
# записываем заголовок объека
header = f'blob {len(data)}\0'.encode(encoding)
# записываем заголовок, данные и сжатый размер
f.write(header)
f.write(compressed_data)
# возвращаем хэш созданного объекта
return obj_hash
def unblobify(obj_hash, target_directory):
"""From a hash of the object gets the names of directory and file
inside target_directory, where it searches for file"""
# разбиваем хэш на каталоги и имя файла
directory = obj_hash[:2]
filename = obj_hash[2:]
# находим путь к файлу объекта
object_filename = os.path.join(target_directory, directory, filename)
# считываем данные из файла объекта
with open(object_filename, 'rb') as f:
header, compressed_data = f.read().split(b"\x00", 1)
header = header.strip().decode(encoding)
# распаковываем сжатые данные
data = zlib.decompress(compressed_data)
# проверяем, что распакованные данные имеют нужный размер
expected_size = int(header.split()[1])
if len(data) != expected_size:
raise ValueError(f'Error: expected {expected_size} bytes, got {len(data)} bytes')
return data