timemory 3.3.0
Modular C++ Toolkit for Performance Analysis and Logging. Profiling API and Tools for C, C++, CUDA, Fortran, and Python. The C++ template API is essentially a framework for creating tools: it provides a unifying interface for recording various performance measurements alongside data logging and interfaces to other tools.
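As a minimal sketch of that interface (illustrative, assumed names; not taken from this file), a bundle of components can be composed and used to record a code region:

#include "timemory/timemory.hpp"

// hypothetical bundle combining wall-clock time and peak resident-set size
using bundle_t = tim::auto_tuple<tim::component::wall_clock, tim::component::peak_rss>;

void do_work()
{
    bundle_t obj{ "do_work" };  // example label; measurement starts on construction
    // ... region being measured ...
}                               // measurement stops and is recorded when obj is destroyed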
mpi_get.hpp
1// MIT License
2//
3// Copyright (c) 2020, The Regents of the University of California,
4// through Lawrence Berkeley National Laboratory (subject to receipt of any
5// required approvals from the U.S. Dept. of Energy). All rights reserved.
6//
7// Permission is hereby granted, free of charge, to any person obtaining a copy
8// of this software and associated documentation files (the "Software"), to deal
9// in the Software without restriction, including without limitation the rights
10// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11// copies of the Software, and to permit persons to whom the Software is
12// furnished to do so, subject to the following conditions:
13//
14// The above copyright notice and this permission notice shall be included in all
15// copies or substantial portions of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23// SOFTWARE.
24
25/**
26 * \file timemory/operations/types/finalize/mpi_get.hpp
27 * \brief Definition for various functions for finalizing MPI data
28 */
29
30#pragma once
31
32#include "timemory/operations/declaration.hpp"
33#include "timemory/operations/macros.hpp"
34#include "timemory/operations/types.hpp"
37
38namespace tim
39{
40namespace operation
41{
42namespace finalize
43{
44//
45//--------------------------------------------------------------------------------------//
46//
47template <typename Type>
48struct mpi_get<Type, true>
49{
50 static constexpr bool value = true;
51 using this_type = mpi_get<Type, value>;
52 using storage_type = impl::storage<Type, value>;
53 using result_type = typename storage_type::result_array_t;
54 using distrib_type = typename storage_type::dmp_result_t;
55 using result_node = typename storage_type::result_node;
56 using graph_type = typename storage_type::graph_t;
57 using graph_node = typename storage_type::graph_node;
58 using hierarchy_type = typename storage_type::uintvector_t;
59 using get_type = get<Type, value>;
60 using metadata_t = typename get_type::metadata;
61 using basic_tree_type = typename get_type::basic_tree_vector_type;
62 using basic_tree_vector_type = std::vector<basic_tree_type>;
63
64 static auto& plus(Type& lhs, const Type& rhs) { return (lhs += rhs); }
65
66 explicit TIMEMORY_COLD mpi_get(storage_type& _storage)
67 : m_storage(&_storage)
68 {}
69
70 TIMEMORY_COLD distrib_type& operator()(distrib_type&);
71 TIMEMORY_COLD basic_tree_vector_type& operator()(basic_tree_vector_type&);
72
73 template <typename Archive>
74 TIMEMORY_COLD enable_if_t<concepts::is_output_archive<Archive>::value, Archive&>
75 operator()(Archive&);
76
77 // serializes a type (src) on each rank and gathers the results into dst on rank 0;
78 // when collapse_processes or node_count is enabled, the adder combines the entries
79 TIMEMORY_COLD
80 mpi_get(std::vector<Type>& dst, const Type& src,
81 std::function<Type&(Type& lhs, const Type& rhs)>&& adder = this_type::plus);
82
83private:
84 storage_type* m_storage = nullptr;
85};
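// Usage sketch (hypothetical, not part of this header): gather one Type instance per
// rank into a vector on rank 0, combining with operator+= (this_type::plus) by default:
//
//     std::vector<Type> gathered{};
//     mpi_get<Type, true>{ gathered, local_value };  // 'local_value' is an assumed name
//
// Afterwards rank 0 holds the gathered entries (possibly collapsed per process or per
// node, depending on settings::collapse_processes() / settings::node_count()), while
// non-root ranks are left with an empty vector.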
86//
87//--------------------------------------------------------------------------------------//
88//
89template <typename Type>
90struct mpi_get<Type, false>
91{
92 static constexpr bool value = false;
93 using storage_type = impl::storage<Type, value>;
94
96
97 template <typename Tp>
98 Tp& operator()(Tp& _val)
99 { return _val; }
100};
101//
102//--------------------------------------------------------------------------------------//
103//
104template <typename Type>
105typename mpi_get<Type, true>::distrib_type&
106mpi_get<Type, true>::operator()(distrib_type& results)
107{
108 if(!m_storage)
109 return results;
110
111 auto& data = *m_storage;
112#if !defined(TIMEMORY_USE_MPI)
113 if(settings::debug())
114 PRINT_HERE("%s", "timemory not using MPI");
115
116 results = distrib_type{};
117 results.emplace_back(std::move(data.get()));
118#else
119 if(settings::debug())
120 PRINT_HERE("%s", "timemory using MPI");
121
122 // not yet implemented
123 // auto comm =
124 // (settings::mpi_output_per_node()) ? mpi::get_node_comm() : mpi::comm_world_v;
125 auto comm = mpi::comm_world_v;
126 mpi::barrier(comm);
127
128 int comm_rank = mpi::rank(comm);
129 int comm_size = mpi::size(comm);
130
131 //------------------------------------------------------------------------------//
132 // Used to convert a result to a serialization
133 //
134 auto send_serialize = [&](const result_type& src) {
135 std::stringstream ss;
136 {
137 auto oa = policy::output_archive<cereal::MinimalJSONOutputArchive,
138 TIMEMORY_API>::get(ss);
139 (*oa)(cereal::make_nvp("data", src));
140 }
141 return ss.str();
142 };
143
144 //------------------------------------------------------------------------------//
145 // Used to convert the serialization to a result
146 //
147 auto recv_serialize = [&](const std::string& src) {
148 result_type ret;
149 std::stringstream ss;
150 ss << src;
151 {
152 auto ia =
153 policy::input_archive<cereal::JSONInputArchive, TIMEMORY_API>::get(ss);
154 (*ia)(cereal::make_nvp("data", ret));
155 if(settings::debug())
156 {
157 printf("[RECV: %i]> data size: %lli\n", comm_rank,
158 (long long int) ret.size());
159 }
160 }
161 return ret;
162 };
163
164 //------------------------------------------------------------------------------//
165 // Calculate the total number of measurement records
166 //
167 auto get_num_records = [&](const auto& _inp) {
168 int _sz = 0;
169 for(const auto& itr : _inp)
170 _sz += itr.size();
171 return _sz;
172 };
173
174 results = distrib_type(comm_size);
175
176 auto ret = data.get();
177 auto str_ret = send_serialize(ret);
178
179 if(comm_rank == 0)
180 {
181 //
182 // The root rank receives data from all non-root ranks and reports all data
183 //
184 for(int i = 1; i < comm_size; ++i)
185 {
186 std::string str;
187 if(settings::debug())
188 printf("[RECV: %i]> starting %i\n", comm_rank, i);
189 mpi::recv(str, i, 0, comm);
190 if(settings::debug())
191 printf("[RECV: %i]> completed %i\n", comm_rank, i);
192 results[i] = recv_serialize(str);
193 }
194 results[comm_rank] = std::move(ret);
195 }
196 else
197 {
198 //
199 // The non-root rank sends its data to the root rank and only reports its own data
200 //
201 if(settings::debug())
202 printf("[SEND: %i]> starting\n", comm_rank);
203 mpi::send(str_ret, 0, 0, comm);
204 if(settings::debug())
205 printf("[SEND: %i]> completed\n", comm_rank);
206 results = distrib_type{};
207 results.emplace_back(std::move(ret));
208 }
209
210 // collapse into a single result
211 if(comm_rank == 0 && settings::collapse_processes() && settings::node_count() <= 1)
212 {
213 auto init_size = get_num_records(results);
214 if(settings::debug())
215 {
216 PRINT_HERE("[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks",
217 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
218 comm_rank, init_size, comm_size);
219 }
220
221 auto _collapsed = distrib_type{};
222 // reverse so we can pop entries off the back
223 std::reverse(results.begin(), results.end());
224 while(!results.empty())
225 {
226 if(_collapsed.empty())
227 {
228 _collapsed.emplace_back(std::move(results.back()));
229 }
230 else
231 {
233 results.back());
234 }
235 results.pop_back();
236 }
237
238 // assign results to collapsed entry
239 results = std::move(_collapsed);
240
241 if(settings::debug())
242 {
243 auto fini_size = get_num_records(results);
244 PRINT_HERE("[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
245 "from %i ranks",
246 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
247 comm_rank, init_size, fini_size, comm_size);
248 }
249 }
250 else if(comm_rank == 0 && settings::collapse_processes() &&
251 settings::node_count() > 1)
252 {
253 // calculate some size parameters
254 int32_t nmod = comm_size % settings::node_count();
255 int32_t bsize = comm_size / settings::node_count() + ((nmod == 0) ? 0 : 1);
256 int32_t bins = comm_size / bsize;
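// Illustrative arithmetic (assumed values, not from the source): with comm_size = 8 and
// settings::node_count() = 3, nmod = 8 % 3 = 2, so the bin size rounds up to
// bsize = 8 / 3 + 1 = 3 and bins = 8 / 3 = 2. Note that 'bins' is only used in the
// debug print below; the binmap generated next determines the actual grouping.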
257
258 if(settings::debug())
259 {
260 PRINT_HERE("[%s][pid=%i][rank=%i]> node_count = %i, comm_size = %i, bins = "
261 "%i, bin size = %i",
262 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
263 comm_rank, settings::node_count(), comm_size, bins, bsize);
264 }
265
266 // generate a map of the ranks to the node ids
267 int32_t ncnt = 0; // current count
268 int32_t midx = 0; // current bin map index
269 std::map<int32_t, std::set<int32_t>> binmap;
270 for(int32_t i = 0; i < comm_size; ++i)
271 {
272 if(settings::debug())
273 {
274 PRINT_HERE("[%s][pid=%i][rank=%i]> adding rank %i to bin %i",
275 demangle<mpi_get<Type, true>>().c_str(),
276 (int) process::get_id(), comm_rank, i, midx);
277 }
278
279 binmap[midx].insert(i);
280 // check to see if we reached the bin size
281 if(++ncnt == bsize)
282 {
283 // set counter to zero and advance the node
284 ncnt = 0;
285 ++midx;
286 }
287 }
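// Continuing the illustrative comm_size = 8, node_count() = 3 example: the loop above
// yields binmap = { 0: {0,1,2}, 1: {3,4,5}, 2: {6,7} }, i.e. at most bsize ranks per
// bin and no more than node_count() bins (enforced by the assert below).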
288
289 auto init_size = get_num_records(results);
290 if(settings::debug())
291 {
292 PRINT_HERE("[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks into "
293 "%i bins",
294 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
295 comm_rank, init_size, comm_size, (int) binmap.size());
296 }
297
298 assert((int32_t) binmap.size() <= (int32_t) settings::node_count());
299
300 // the collapsed data
301 auto _collapsed = distrib_type(binmap.size());
302 // loop over the node indexes
303 for(const auto& itr : binmap)
304 {
305 // target the node index
306 auto& _dst = _collapsed.at(itr.first);
307 for(const auto& bitr : itr.second)
308 {
309 // combine the node index entry with all of the ranks in that node
310 auto& _src = results.at(bitr);
312 }
313 }
314
315 // assign results to collapsed entry
316 results = std::move(_collapsed);
317
318 if(settings::debug())
319 {
320 auto fini_size = get_num_records(results);
321 PRINT_HERE("[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
322 "and %i bins",
323 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
324 comm_rank, init_size, fini_size, (int) results.size());
325 }
326 }
327
328 if(settings::debug())
329 {
330 auto ret_size = get_num_records(results);
331 PRINT_HERE("[%s][pid=%i]> %i total records on rank %i of %i",
332 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
333 ret_size, comm_rank, comm_size);
334 }
335
336#endif
337
338 return results;
339}
340//
341//--------------------------------------------------------------------------------------//
342//
343template <typename Type>
344typename mpi_get<Type, true>::basic_tree_vector_type&
345mpi_get<Type, true>::operator()(basic_tree_vector_type& bt)
346{
347 if(!m_storage)
348 return bt;
349
350 auto& data = *m_storage;
351
352 using serialization_t = serialization<Type>;
353 using mpi_data_t = typename serialization_t::mpi_data;
354
355 basic_tree_type _entry{};
356 bt = serialization_t{}(mpi_data_t{}, mpi::comm_world_v, data.get(_entry));
357 return bt;
358}
359//
360//--------------------------------------------------------------------------------------//
361//
362template <typename Type>
363template <typename Archive>
364enable_if_t<concepts::is_output_archive<Archive>::value, Archive&>
365mpi_get<Type, true>::operator()(Archive& ar)
366{
367 if(!m_storage)
368 return ar;
369
370 if(!mpi::is_initialized())
371 {
372 get_type{ m_storage }(ar);
373 }
374 else
375 {
376 auto bt = basic_tree_vector_type{};
377 (*this)(bt);
378 serialization<Type>{}(ar, bt);
379 }
380 return ar;
381}
382//
383//--------------------------------------------------------------------------------------//
384//
385template <typename Type>
386mpi_get<Type, true>::mpi_get(std::vector<Type>& dst, const Type& inp,
387 std::function<Type&(Type& lhs, const Type& rhs)>&& functor)
388{
389#if !defined(TIMEMORY_USE_MPI)
390 if(settings::debug())
391 PRINT_HERE("%s", "timemory not using MPI");
392 consume_parameters(dst, inp, functor);
393#else
394 CONDITIONAL_PRINT_HERE(settings::debug(), "%s", "timemory using MPI");
395
396 auto comm = mpi::comm_world_v;
397 mpi::barrier(comm);
398
399 int comm_rank = mpi::rank(comm);
400 int comm_size = mpi::size(comm);
401
402 CONDITIONAL_PRINT_HERE(settings::debug(), "timemory using MPI [rank: %i, size: %i]",
403 comm_rank, comm_size);
404
405 //------------------------------------------------------------------------------//
406 // Used to convert a result to a serialization
407 //
408 auto send_serialize = [&](const Type& src) {
409 std::stringstream ss;
410 {
411 auto oa = policy::output_archive<cereal::MinimalJSONOutputArchive,
412 TIMEMORY_API>::get(ss);
413 (*oa)(cereal::make_nvp("data", src));
414 }
415 CONDITIONAL_PRINT_HERE(settings::debug(), "sent data [rank: %i] :: %lu",
416 comm_rank, ss.str().length());
417 return ss.str();
418 };
419
420 //------------------------------------------------------------------------------//
421 // Used to convert the serialization to a result
422 //
423 auto recv_serialize = [&](const std::string& src) {
424 CONDITIONAL_PRINT_HERE(settings::debug(), "recv data [rank: %i] :: %lu",
425 comm_rank, src.length());
426 Type ret;
427 std::stringstream ss;
428 ss << src;
429 {
430 auto ia =
431 policy::input_archive<cereal::JSONInputArchive, TIMEMORY_API>::get(ss);
432 (*ia)(cereal::make_nvp("data", ret));
433 }
434 return ret;
435 };
436
437 //------------------------------------------------------------------------------//
438 // Calculate the total number of measurement records
439 //
440 auto get_num_records = [&](const auto& _inp) { return _inp.size(); };
441
442 dst.resize(comm_size);
443 auto str_ret = send_serialize(inp);
444
445 mpi::barrier(comm);
446
447 if(comm_rank == 0)
448 {
449 //
450 // The root rank receives data from all non-root ranks and reports all data
451 //
452 for(int i = 1; i < comm_size; ++i)
453 {
454 std::string str;
455 CONDITIONAL_PRINT_HERE(settings::debug(), "[RECV: %i]> starting %i",
456 comm_rank, i);
457 mpi::recv(str, i, 0, comm);
458 CONDITIONAL_PRINT_HERE(settings::debug(), "[RECV: %i]> completed %i",
459 comm_rank, i);
460 dst.at(i) = recv_serialize(str);
461 }
462 dst.at(0) = inp;
463 }
464 else
465 {
466 //
467 // The non-root rank sends its data to the root rank
468 //
469 CONDITIONAL_PRINT_HERE(settings::debug(), "[SEND: %i]> starting", comm_rank);
470 mpi::send(str_ret, 0, 0, comm);
471 CONDITIONAL_PRINT_HERE(settings::debug(), "[SEND: %i]> completed", comm_rank);
472 dst.clear();
473 }
474
475 mpi::barrier(comm);
476
477 // collapse into a single result
478 if(settings::collapse_processes() && comm_rank == 0)
479 {
480 auto init_size = get_num_records(dst);
483 "[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks",
484 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
485 (int) comm_rank, (int) init_size, (int) comm_size);
486
487 auto _dst = std::vector<Type>{};
488 for(auto& itr : dst)
489 {
490 if(_dst.empty())
491 {
492 _dst.emplace_back(std::move(itr));
493 }
494 else
495 {
496 _dst.front() = functor(_dst.front(), itr);
497 }
498 }
499
500 // assign dst to collapsed entry
501 dst = _dst;
502
505 "[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
506 "from %i ranks",
507 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
508 (int) comm_rank, (int) init_size, (int) get_num_records(dst),
509 (int) comm_size);
510 }
511 else if(settings::node_count() > 0 && comm_rank == 0)
512 {
513 // calculate some size parameters
514 int32_t nmod = comm_size % settings::node_count();
515 int32_t bsize = comm_size / settings::node_count() + ((nmod == 0) ? 0 : 1);
516 int32_t bins = comm_size / bsize;
517
520 "[%s][pid=%i][rank=%i]> node_count = %i, comm_size = %i, bins = "
521 "%i, bin size = %i",
522 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(), comm_rank,
523 settings::node_count(), comm_size, bins, bsize);
524
525 // generate a map of the ranks to the node ids
526 int32_t ncnt = 0; // current count
527 int32_t midx = 0; // current bin map index
528 std::map<int32_t, std::set<int32_t>> binmap;
529 for(int32_t i = 0; i < comm_size; ++i)
530 {
532 "[%s][pid=%i][rank=%i]> adding rank %i to bin %i",
533 demangle<mpi_get<Type, true>>().c_str(),
534 (int) process::get_id(), comm_rank, i, midx);
535
536 binmap[midx].insert(i);
537 // check to see if we reached the bin size
538 if(++ncnt == bsize)
539 {
540 // set counter to zero and advance the node
541 ncnt = 0;
542 ++midx;
543 }
544 }
545
546 auto init_size = get_num_records(dst);
549 "[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks into %i bins",
550 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
551 (int) comm_rank, (int) init_size, (int) comm_size, (int) binmap.size());
552
553 assert((int32_t) binmap.size() <= (int32_t) settings::node_count());
554
555 // the collapsed data
556 auto _dst = std::vector<Type>(binmap.size());
557 // loop over the node indexes
558 for(const auto& itr : binmap)
559 {
560 // target the node index
561 auto& _targ = _dst.at(itr.first);
562 for(const auto& bitr : itr.second)
563 {
564 // combine the node index entry with all of the ranks in that node
565 auto& _src = dst.at(bitr);
566 _targ = functor(_targ, _src);
567 }
568 }
569
570 // assign dst to collapsed entry
571 dst = _dst;
572
575 "[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
576 "and %i bins",
577 demangle<mpi_get<Type, true>>().c_str(), (int) process::get_id(),
578 (int) comm_rank, (int) init_size, (int) get_num_records(dst),
579 (int) dst.size());
580 }
581
583 "[%s][pid=%i]> %i total records on rank %i of %i",
584 demangle<mpi_get<Type, true>>().c_str(),
585 (int) process::get_id(), (int) get_num_records(dst),
586 (int) comm_rank, (int) comm_size);
587
588#endif
589}
590//
591//--------------------------------------------------------------------------------------//
592//
593} // namespace finalize
594} // namespace operation
595} // namespace tim