-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparfu_main_0_6_test.cc
284 lines (230 loc) · 9.5 KB
/
parfu_main_0_6_test.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
////////////////////////////////////////////////////////////////////////////////
//
// University of Illinois/NCSA Open Source License
// http://otm.illinois.edu/disclose-protect/illinois-open-source-license
//
// Parfu is copyright (c) 2017-2022,
// by The Trustees of the University of Illinois.
// All rights reserved.
//
// Parfu was developed by:
// The University of Illinois
// The National Center For Supercomputing Applications (NCSA)
// Blue Waters Science and Engineering Applications Support Team (SEAS)
// Craig P Steffen <[email protected]>
// Roland Haas <[email protected]>
//
// https://github.com/ncsa/parfu_archive_tool
// http://www.ncsa.illinois.edu/People/csteffen/parfu/
//
// For full license text see the LICENSE file provided with the source
// distribution.
//
////////////////////////////////////////////////////////////////////////////////
#include "parfu_main.hh"
//#define DEFAULT_BUCKET_SIZE (200000000)
// must be a multiple of 512??
#define DEFAULT_BUCKET_SIZE (4992000)
int main(int argc, char *argv[]){
char run_mode='C';
Parfu_directory *my_target_directory;
// string base_path;
Parfu_target_collection *my_target_collec;
vector <string> *transfer_orders=nullptr;
vector <string> *target_paths=nullptr;
// Parfu_rank_order_set *my_orders=nullptr;
int my_rank,total_ranks;
string initial_order;
int mpi_return_val;
// int *length_buffer=nullptr;
unsigned max_orders_per_bucket;
long unsigned bucket_size=DEFAULT_BUCKET_SIZE;
string *archive_file_name_from_command_line;
int *archive_file_multiplier;
string archive_file_name;
if(argc<2){
// no arguments
parfu_usage();
exit(-42);
}
MPI_File *file_handle=nullptr;
// MPI_Info file_info;
// char *word_buffer=nullptr;
// void *order_buffer=nullptr;
// length_buffer = new int;
// Do argument parsing and "exit with the usage() or help() message stuff
// above this point, so that if run in a scalar context (outside of
// an MPI launch infrastructure) all that stuff gets done properly
// and the code exist cleanly. Users appreciate that.
// MPI_Init(NULL,NULL);
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&total_ranks);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
cout << "I am rank " << my_rank << " out of a total of " << total_ranks << "\n";
if(my_rank == 0){
archive_file_name_from_command_line = new string;
archive_file_multiplier = new int;
*archive_file_multiplier = 1;
// parse command line
// cerr << "checking: max orders per bucket initialized to: ";
// cerr << max_orders_per_bucket << "\n";
if((target_paths =
parfu_parse_args(argc,argv,&bucket_size,&max_orders_per_bucket,
archive_file_name_from_command_line,
archive_file_multiplier))
== nullptr){
cerr << "Error from command line parsing! Exiting.\n";
parfu_broadcast_order(string("X"),string("abort"));
exit(1);
}
// cerr << "checking: max orders per bucket after argument parsing: ";
// cerr << max_orders_per_bucket << "\n";
archive_file_name = *archive_file_name_from_command_line;
if(archive_file_name.size()<1){
cerr << "No archive file specified. Aborting.\n";
exit(5);
}
cerr << "We got " << target_paths->size() << " target paths from command line.\n";
for(unsigned i=0;i<target_paths->size();i++){
cerr << "path " << i << " :" << target_paths->at(i) << "\n";
}
if(run_mode == 'C' && (target_paths->size() < 1)){
cerr << "We are in \"create\" mode and have no targets to archive. Aborting.\n";
exit(3);
}
// this is for the testing stage, where we assume we have one and only one target
if(run_mode == 'C' && (target_paths->size() > 1)){
cerr << "Parfu test-mode create only allows one target. Aborting.\n";
exit(4);
}
// TODO change for multiple targets
my_target_directory = new Parfu_directory(target_paths->front());
// cout << "parfu test build\n";
// if(argc > 1){
// cout << "We will scan directory:";
// cout << argv[1];
// cout << "\n";
// }
// else{
// cout << "You must input a directory to scan!\n";
// return 1;
// }
// if(argc>2){
// archive_file_name = string(argv[2]);
// cout << "archive file: " << archive_file_name << "\n";
// }
// else{
// cerr << "ERROR! No archive file specified! \n";
// exit(1);
// }
// base_path = string(argv[1]);
// cout << "Have we spidered directory? " << my_target_directory->is_directory_spidered() << "\n";
my_target_directory->spider_directory();
// cout << "Have we spidered directory? " << my_target_directory->is_directory_spidered() << "\n";
// cout << "First build the target collection\n";
my_target_collec = new Parfu_target_collection(my_target_directory);
cout << "Target collection built. ";
// cout << "Now dump it, unsorted.\n";
// my_target_collec->dump();
cout << "now sort the files...\n";
my_target_collec->order_files();
// cout << "and dump it again.\n";
// my_target_collec->dump();
cout << "set offsets.\n";
my_target_collec->set_offsets();
// cout << "dump offsets\n";
// my_target_collec->dump_offsets();
cout << "generate rank orders\n";
transfer_orders = my_target_collec->create_transfer_orders(0,bucket_size,max_orders_per_bucket);
cout << "there are " << transfer_orders->size() << " orders.\n";
// cout << "\n\n\nFirst order:\n\n";
// cout << transfer_orders->front();
// cout << "\n\n end first order.\n\n";
// cout << "transfer orders:\n\n";
// for(unsigned int i=0;i<transfer_orders->size();i++){
// cout << "transfer order " << i << "\n";
// cout << transfer_orders->at(i);
// cout << "\n";
//}
// send initial broadcast orders
/*
initial_order = string("");
initial_order.append("A");
initial_order.append(archive_file_name);
*length_buffer = initial_order.size()+1;
// mpi_return_val = MPI_Send(length_buffer,1,MPI_INT,MPI_Bcast,0,MPI_COMM_WORLD);
cout << "about to send initial order length\n";
mpi_return_val = MPI_Bcast(length_buffer,1,MPI_INT,0,MPI_COMM_WORLD);
cout << "Sent order length. Now send order itself.\n";
mpi_return_val =
MPI_Bcast(((void *)(initial_order.data())),initial_order.size()+1,MPI_CHAR,0,MPI_COMM_WORLD);
cout << "set up buffer for collective open\n";
word_buffer = (char*)malloc(archive_file_name.size()+1);
memcpy(word_buffer,base_path.c_str(),base_path.size()+1);
file_handle = (MPI_File*)malloc(sizeof(MPI_File));
*/
string bucket_size_string = to_string(bucket_size);
parfu_broadcast_order(string("U"),
bucket_size_string);
cout << "Now we try collective file open.\n";
parfu_broadcast_order(string("A"),
archive_file_name);
// mpi_return_val =
// MPI_File_open(MPI_COMM_WORLD,word_buffer,
// MPI_MODE_WRONLY|MPI_MODE_CREATE,
// MPI_INFO_NULL,file_handle);
file_handle = new MPI_File;
// MPI_Barrier(MPI_COMM_WORLD);
if((mpi_return_val =
MPI_File_open(MPI_COMM_WORLD,archive_file_name.c_str(),
MPI_MODE_WRONLY|MPI_MODE_CREATE|MPI_MODE_EXCL,
MPI_INFO_NULL,file_handle)) != MPI_SUCCESS){
cerr << "\n\nmain MPI_File_open returned " << mpi_return_val << "!\n";
cerr << "WARNING! Attempted to open file:>" << archive_file_name << "<\n";
cerr << "but it failed. This is likely because the file already exists,\n";
cerr << "or the directory doesn't exist or you don't have permission\n";
cerr << "to write there. Exiting program.\n";
parfu_broadcast_order(string("X"),
string("bye"));
MPI_Finalize();
exit(1);
}
cout << "Successfully opened archive file >" << archive_file_name << "< for writing.\n";
// Now send out a set of orders.
cout << "We have " << transfer_orders->size();
cout << " transfer orders available; return val=" << mpi_return_val <<".\n";
parfu_broadcast_order(string("N"),
string("individual"));
// we're now in individual mode.
// all commands must be sent to each rank individually including the shutdown
// until we've either sent each of them a shutdown, or sent each of them a command
// to go back to broadcast mode.
for(int i=1; i<total_ranks; i++){
parfu_send_order_to_rank(i,0,string("P"),target_paths->front());
}
/*
for(int i=1; i<total_ranks; i++){
parfu_send_order_to_rank(i,0,string("C"),transfer_orders->at(i-1));
}
*/
cout << "About to call push_out_all_orders\n";
push_out_all_orders(transfer_orders,total_ranks);
cout << "push_out_all_orders has returned.\n";
// cerr << "\ndump order zero: \n\n";
// cerr << transfer_orders->at(0);
// cerr << "\n\nend order zero\n\n";
for(int i=1; i<total_ranks; i++){
parfu_send_order_to_rank(i,0,string("X"),string("shutdown"));
}
cerr << "sent shutdown orders; now we're done.\n";
// This is the shutdown, but only if we're in broadcast mode.
// parfu_broadcast_order(string("X"),
// string("bye"));
} // if(my_rank == 0)
else{
parfu_worker_node(my_rank,total_ranks);
}
cout << "rank " << my_rank << " done, about to call MPI_Finalize()\n";
MPI_Finalize();
return 0;
}