forked from jahanxb/flcode
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfile_grpc_lib.py
78 lines (59 loc) · 2.33 KB
/
file_grpc_lib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import os
from concurrent import futures
import grpc
import time
import filetrans_pb2 as pb2
import filetrans_pb2_grpc as pb2_grpc
import random,string
# CHUNK_SIZE = 1024 * 1024 # 1MB
CHUNK_SIZE = 2154387
def get_file_chunks(filename):
with open(filename, 'rb') as f:
while True:
piece = f.read(CHUNK_SIZE);
if len(piece) == 0:
return
yield pb2.Chunk(buffer=piece)
def save_chunks_to_file(chunks, filename):
with open(filename, 'wb') as f:
for chunk in chunks:
f.write(chunk.buffer)
class FileClient:
def __init__(self, address):
channel = grpc.insecure_channel("10.10.1.3:9991")
self.stub = pb2_grpc.FileServerStub(channel)
def upload(self, in_file_name):
chunks_generator = get_file_chunks(in_file_name)
response = self.stub.upload(chunks_generator)
assert response.length == os.path.getsize(in_file_name)
def download(self, target_name, out_file_name):
response = self.stub.download(pb2.Request(name=target_name))
save_chunks_to_file(response, out_file_name)
class FileServer(pb2_grpc.FileServerServicer):
def __init__(self):
class Servicer(pb2_grpc.FileServerServicer):
def __init__(self):
self.tmp_file_name = ''
# letters = string.ascii_lowercase
# ''.join(random.choice(letters) for i in range(10))
def upload(self, request_iterator, context):
save_chunks_to_file(request_iterator, self.tmp_file_name)
return pb2.Reply(length=os.path.getsize(self.tmp_file_name))
def download(self, request, context):
if request.name:
return get_file_chunks(self.tmp_file_name)
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
pb2_grpc.add_FileServerServicer_to_server(Servicer(), self.server)
def start(self):
self.tmp_file_name = ''
# self.server.add_insecure_port(f'[::]:{port}')
self.server.add_insecure_port("10.10.1.3:9991")
self.server.start()
try:
while True:
time.sleep(10)
# time.sleep(60*60*24)
except KeyboardInterrupt:
self.server.stop(0)
def stop_me(self):
self.server.stop(0)