timemory 3.3.0
Modular C++ Toolkit for Performance Analysis and Logging. Profiling API and Tools for C, C++, CUDA, Fortran, and Python. The C++ template API is essentially a framework for creating tools: it is designed to provide a unifying interface for recording various performance measurements alongside data logging and interfaces to other tools.
upc_get.hpp
Go to the documentation of this file.
1// MIT License
2//
3// Copyright (c) 2020, The Regents of the University of California,
4// through Lawrence Berkeley National Laboratory (subject to receipt of any
5// required approvals from the U.S. Dept. of Energy). All rights reserved.
6//
7// Permission is hereby granted, free of charge, to any person obtaining a copy
8// of this software and associated documentation files (the "Software"), to deal
9// in the Software without restriction, including without limitation the rights
10// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11// copies of the Software, and to permit persons to whom the Software is
12// furnished to do so, subject to the following conditions:
13//
14// The above copyright notice and this permission notice shall be included in all
15// copies or substantial portions of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23// SOFTWARE.
24
25/**
26 * \file timemory/operations/types/finalize/upc_get.hpp
27 * \brief Definition for various functions for upc_get in operations
28 */
29
30#pragma once
31
37
38namespace tim
39{
40namespace operation
41{
42namespace finalize
43{
44//
45//--------------------------------------------------------------------------------------//
46//
// Partial specialization of upc_get for the case where its second (bool) template
// argument is true. It stores a non-owning pointer to an impl::storage<Type, true>
// and declares three operator() overloads whose definitions appear later in this file.
//
// NOTE(review): listing lines 58 and 71 are absent from this page (the numbering jumps
// 57 -> 59 and 70 -> 72). `get_type`, used by the aliases below, and the return type of
// the Archive overload are presumably declared on those lines -- confirm against the
// real header before relying on this listing.
47template <typename Type>
48struct upc_get<Type, true>
49{
50 static constexpr bool value = true;
// Aliases re-exported from the storage type.
51 using storage_type = impl::storage<Type, value>;
52 using result_type = typename storage_type::result_array_t;
53 using distrib_type = typename storage_type::dmp_result_t;
54 using result_node = typename storage_type::result_node;
55 using graph_type = typename storage_type::graph_t;
56 using graph_node = typename storage_type::graph_node;
57 using hierarchy_type = typename storage_type::uintvector_t;
// Aliases taken from get_type (its declaration, listing line 58, is not shown here).
59 using metadata_t = typename get_type::metadata;
60 using basic_tree_type = typename get_type::basic_tree_vector_type;
61 using basic_tree_vector_type = std::vector<basic_tree_type>;
62
// Keeps the address of the caller's storage object; nothing is copied.
63 explicit TIMEMORY_COLD upc_get(storage_type& _storage)
64 : m_storage(&_storage)
65 {}
66
// Fill the argument and return a reference to it (see the out-of-line definitions).
67 TIMEMORY_COLD distrib_type& operator()(distrib_type&);
68 TIMEMORY_COLD basic_tree_vector_type& operator()(basic_tree_vector_type&);
69
// Archive overload; its return-type line (listing line 71) is missing from this page.
70 template <typename Archive>
72 operator()(Archive&);
73
74private:
// Checked against null at the top of every operator() definition.
75 storage_type* m_storage = nullptr;
76};
77//
78//--------------------------------------------------------------------------------------//
79//
// Partial specialization of upc_get for the case where its second (bool) template
// argument is false. Its operator() accepts any object and hands it back unchanged.
//
// Fix: operator() is declared to return Tp& but its body was empty (`{}`), so every
// call flowed off the end of a non-void function, which is undefined behaviour. The
// parameter is now named and returned, so callers receive the object they passed in.
//
// NOTE(review): listing line 86 is absent from this page (the numbering jumps
// 85 -> 87) -- confirm against the real header that nothing declared there is lost.
80template <typename Type>
81struct upc_get<Type, false>
82{
83 static constexpr bool value = false;
84 using storage_type = impl::storage<Type, value>;
85

87

// Pass-through: returns a reference to the argument without touching it.
88 template <typename Tp>
89 Tp& operator()(Tp& _v)
90 { return _v; }
91};
92//
93//--------------------------------------------------------------------------------------//
94//
// Out-of-line definition of an operator() that fills `results` and returns it.
//
// NOTE(review): listing lines 96-97 (the return type and signature) are absent from
// this page. From the body (`results`, `distrib_type`) this is presumably
// upc_get<Type, true>::operator()(distrib_type& results) -- confirm.
//
// Visible behaviour:
//  - returns `results` untouched when m_storage is null;
//  - without TIMEMORY_USE_UPCXX, `results` becomes a one-entry container built from
//    data.get();
//  - with TIMEMORY_USE_UPCXX, rank 0 collects one entry per rank (remote entries
//    arrive as serialized strings through an RPC), every other rank keeps only its
//    own entry, and rank 0 may then collapse the entries depending on
//    settings::collapse_processes() and settings::node_count().
95template <typename Type>
98{
99 if(!m_storage)
100 return results;
101
102 auto& data = *m_storage;
103#if !defined(TIMEMORY_USE_UPCXX)
104 if(settings::debug())
105 PRINT_HERE("%s", "timemory not using UPC++");
106
// Single-process path: the result is exactly one entry holding this process's data.
107 results = distrib_type{};
108 results.emplace_back(std::move(data.get()));
109#else
110 if(settings::debug())
111 PRINT_HERE("%s", "timemory using UPC++");
112
// NOTE(review): this function spells some calls upc:: (barrier/rank/size, without a
// world argument) and others upcxx:: (future/rpc/progress/barrier), while the
// basic-tree overload later in the file uses upc:: with upc::world() throughout --
// confirm both spellings resolve as intended.
113 upc::barrier();
114
115 int comm_rank = upc::rank();
116 int comm_size = upc::size();
117
118 //------------------------------------------------------------------------------//
119 // Used to convert a result to a serialization
120 //
121 auto send_serialize = [=](const result_type& src) {
122 std::stringstream ss;
123 {
124 auto oa = policy::output_archive<cereal::MinimalJSONOutputArchive,
// NOTE(review): listing line 125 (the rest of the statement above) is missing here.
126 (*oa)(cereal::make_nvp("data", src));
127 }
// The archive object lives in the inner scope above; the string is read after that
// scope has closed.
128 return ss.str();
129 };
130
131 //------------------------------------------------------------------------------//
132 // Used to convert the serialization to a result
133 //
134 auto recv_serialize = [=](const std::string& src) {
135 result_type ret;
136 std::stringstream ss;
137 ss << src;
138 {
139 auto ia =
// NOTE(review): listing line 140 (the initializer of `ia`) is missing here.
141 (*ia)(cereal::make_nvp("data", ret));
142 if(settings::debug())
143 printf("[RECV: %i]> data size: %lli\n", comm_rank,
144 (long long int) ret.size());
145 }
146 return ret;
147 };
148
149 //------------------------------------------------------------------------------//
150 // Function executed on remote node
151 //
// Serializes whatever storage_type::master_instance() holds on the rank that runs it.
152 auto remote_serialize = [=]() {
153 return send_serialize(storage_type::master_instance()->get());
154 };
155
156 results.resize(comm_size);
157
158 //------------------------------------------------------------------------------//
159 // Combine on master rank
160 //
161 if(comm_rank == 0)
162 {
// Ranks are queried one at a time: issue the RPC, spin on progress() until the
// future is ready, then decode the returned string into slot i.
163 for(int i = 1; i < comm_size; ++i)
164 {
165 upcxx::future<std::string> fut = upcxx::rpc(i, remote_serialize);
166 while(!fut.ready())
167 upcxx::progress();
168 fut.wait();
169 results[i] = recv_serialize(fut.result());
170 }
// Rank 0's own slot is filled directly, without serialization.
171 results[comm_rank] = data.get();
172 }
173
174 //------------------------------------------------------------------------------//
175 // Calculate the total number of measurement records
176 //
// Sums itr.size() over every entry; the accumulator is a plain int.
177 auto get_num_records = [&](const auto& _inp) {
178 int _sz = 0;
179 for(const auto& itr : _inp)
180 _sz += itr.size();
181 return _sz;
182 };
183
184 upcxx::barrier(upcxx::world());
185
// Non-root ranks discard the comm_size-sized container and keep only their own data.
186 if(comm_rank != 0)
187 {
188 results = distrib_type{};
189 results.emplace_back(std::move(data.get()));
190 }
191
192 // collapse into a single result
193 if(comm_rank == 0 && settings::collapse_processes() && settings::node_count() <= 1)
194 {
195 auto init_size = get_num_records(results);
// NOTE(review): listing line 196 (presumably the condition guarding the message
// below) is missing here.
197 {
198 PRINT_HERE("[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks",
199 demangle<upc_get<Type, true>>().c_str(), (int) process::get_id(),
200 comm_rank, init_size, comm_size);
201 }
202
203 auto _collapsed = distrib_type{};
204 // so we can pop off back
205 std::reverse(results.begin(), results.end());
// After the reverse, popping from the back visits the entries in their original
// order. The first entry seeds _collapsed; later entries go through the else branch.
206 while(!results.empty())
207 {
208 if(_collapsed.empty())
209 _collapsed.emplace_back(std::move(results.back()));
210 else
// NOTE(review): listing line 211 (the start of the statement that consumes
// results.back()) is missing here.
212 results.back());
213 results.pop_back();
214 }
215
216 // assign results to collapsed entry
217 results = std::move(_collapsed);
218
// NOTE(review): listing line 219 (presumably the condition guarding this block) is
// missing here.
220 {
221 auto fini_size = get_num_records(results);
222 PRINT_HERE("[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
223 "from %i ranks",
224 demangle<upc_get<Type, true>>().c_str(), (int) process::get_id(),
225 comm_rank, init_size, fini_size, comm_size);
226 }
227 }
228 else if(comm_rank == 0 && settings::collapse_processes() &&
// NOTE(review): listing line 229 (the remainder of this condition) is missing here.
230 {
231 // calculate some size parameters
// bsize is comm_size / node_count rounded up; bins is only used in the message below.
232 int32_t nmod = comm_size % settings::node_count();
233 int32_t bsize = comm_size / settings::node_count() + ((nmod == 0) ? 0 : 1);
234 int32_t bins = comm_size / bsize;
235
// NOTE(review): listing line 236 (presumably a guard for the message below) is
// missing here.
237 PRINT_HERE("[%s][pid=%i][rank=%i]> node_count = %i, comm_size = %i, bins = "
238 "%i, bin size = %i",
239 demangle<upc_get<Type, true>>().c_str(), (int) process::get_id(),
240 comm_rank, settings::node_count(), comm_size, bins, bsize);
241
242 // generate a map of the ranks to the node ids
243 int32_t ncnt = 0; // current count
244 int32_t midx = 0; // current bin map index
245 std::map<int32_t, std::set<int32_t>> binmap;
// Consecutive ranks are assigned to the same bin until it holds bsize ranks.
246 for(int32_t i = 0; i < comm_size; ++i)
247 {
248 if(settings::debug())
249 PRINT_HERE("[%s][pid=%i][rank=%i]> adding rank %i to bin %i",
250 demangle<upc_get<Type, true>>().c_str(),
251 (int) process::get_id(), comm_rank, i, midx);
252
253 binmap[midx].insert(i);
254 // check to see if we reached the bin size
255 if(++ncnt == bsize)
256 {
257 // set counter to zero and advance the node
258 ncnt = 0;
259 ++midx;
260 }
261 }
262
263 auto init_size = get_num_records(results);
// NOTE(review): listing line 264 (presumably a guard for the message below) is
// missing here.
265 PRINT_HERE("[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks into "
266 "%i bins",
267 demangle<upc_get<Type, true>>().c_str(), (int) process::get_id(),
268 comm_rank, init_size, comm_size, (int) binmap.size());
269
270 assert((int32_t) binmap.size() <= (int32_t) settings::node_count());
271
272 // the collapsed data
273 auto _collapsed = distrib_type(binmap.size());
274 // loop over the node indexes
275 for(const auto& itr : binmap)
276 {
277 // target the node index
278 auto& _dst = _collapsed.at(itr.first);
279 for(const auto& bitr : itr.second)
280 {
281 // combine the node index entry with all of the ranks in that node
282 auto& _src = results.at(bitr);
// NOTE(review): listing line 283 (the statement that uses _dst and _src) is
// missing here.
284 }
285 }
286
287 // assign results to collapsed entry
288 results = std::move(_collapsed);
289
// NOTE(review): listing line 290 (presumably the condition guarding this block) is
// missing here.
291 {
292 auto fini_size = get_num_records(results);
293 PRINT_HERE("[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
294 "and %i bins",
295 demangle<upc_get<Type, true>>().c_str(), (int) process::get_id(),
296 comm_rank, init_size, fini_size, (int) results.size());
297 }
298 }
299
// NOTE(review): listing line 300 (presumably the condition guarding this block) is
// missing here.
301 {
302 auto ret_size = get_num_records(results);
303 PRINT_HERE("[%s][pid=%i]> %i total records on rank %i of %i",
304 demangle<upc_get<Type, true>>().c_str(), (int) process::get_id(),
305 ret_size, comm_rank, comm_size);
306 }
307
308#endif
309
310 return results;
311}
312//
313//--------------------------------------------------------------------------------------//
314//
// Out-of-line definition of an operator() that fills `bt` and returns it.
//
// NOTE(review): listing lines 316-317 (the return type and signature) are absent
// from this page. From the body (`bt`, `basic_tree_vector_type`) this is presumably
// upc_get<Type, true>::operator()(basic_tree_vector_type& bt) -- confirm.
//
// Visible behaviour:
//  - returns `bt` untouched when m_storage is null;
//  - without TIMEMORY_USE_UPCXX, `bt` becomes a one-element vector holding
//    data.get(entry);
//  - with TIMEMORY_USE_UPCXX, rank 0 ends with comm_size elements (remote ones decoded
//    from strings returned by an RPC) and every other rank ends with one element
//    holding its own data.
315template <typename Type>
318{
319 if(!m_storage)
320 return bt;
321
322 auto& data = *m_storage;
323#if !defined(TIMEMORY_USE_UPCXX)
324 if(settings::debug())
325 PRINT_HERE("%s", "timemory not using UPC++");
326
327 auto entry = basic_tree_type{};
328 bt = basic_tree_vector_type(1, data.get(entry));
329#else
330 if(settings::debug())
331 PRINT_HERE("%s", "timemory using UPC++");
332
333 upc::barrier(upc::world());
334
335 int comm_rank = upc::rank(upc::world());
336 int comm_size = upc::size(upc::world());
337
338 //------------------------------------------------------------------------------//
339 // Used to convert a result to a serialization
340 //
341 auto send_serialize = [&](const basic_tree_type& src) {
342 std::stringstream ss;
343 {
344 auto oa = policy::output_archive<cereal::MinimalJSONOutputArchive,
// NOTE(review): listing line 345 (the rest of the statement above) is missing here.
346 (*oa)(cereal::make_nvp("data", src));
347 }
// The archive object lives in the inner scope above; the string is read after that
// scope has closed.
348 return ss.str();
349 };
350
351 //------------------------------------------------------------------------------//
352 // Used to convert the serialization to a result
353 //
354 auto recv_serialize = [&](const std::string& src) {
355 basic_tree_type ret;
356 std::stringstream ss;
357 ss << src;
358 {
359 auto ia =
// NOTE(review): listing line 360 (the initializer of `ia`) is missing here.
361 (*ia)(cereal::make_nvp("data", ret));
362 if(settings::debug())
363 printf("[RECV: %i]> data size: %lli\n", comm_rank,
364 (long long int) ret.size());
365 }
366 return ret;
367 };
368
369 //------------------------------------------------------------------------------//
370 // Function executed on remote node
371 //
// Captures by copy, unlike the two helpers above; it serializes the tree obtained
// from storage_type::master_instance() on the rank that runs it.
372 auto remote_serialize = [=]() {
373 basic_tree_type ret;
374 return send_serialize(storage_type::master_instance()->get(ret));
375 };
376
377 bt = basic_tree_vector_type(comm_size);
378 auto ret = basic_tree_type{};
379
380 if(comm_rank == 0)
381 {
382 //
383 // The root rank receives data from all non-root ranks and reports all data
384 //
// Ranks are queried one at a time: issue the RPC, spin on progress() until the
// future is ready, then decode the returned string into slot i.
385 for(int i = 1; i < comm_size; ++i)
386 {
387 upc::future_t<std::string> fut = upc::rpc(i, remote_serialize);
388 while(!fut.ready())
389 upc::progress();
390 fut.wait();
391 bt[i] = recv_serialize(fut.result());
392 }
// Rank 0's own slot is filled directly, without serialization.
393 bt[comm_rank] = data.get(ret);
394 }
395
396 upc::barrier(upc::world());
397
// Non-root ranks replace the comm_size-sized vector with a single element of their own.
398 if(comm_rank != 0)
399 bt = basic_tree_vector_type(1, data.get(ret));
400
401#endif
402 return bt;
403}
404//
405//--------------------------------------------------------------------------------------//
406//
// Out-of-line definition of the Archive overload: writes to `ar` and returns it.
//
// NOTE(review): listing lines 409-410 (the return type and signature) are absent
// from this page. From the body this is presumably
// upc_get<Type, true>::operator()(Archive& ar) -- confirm.
//
// Visible behaviour:
//  - returns `ar` untouched when m_storage is null;
//  - when upc::is_initialized() is false, delegates to a get_type built from m_storage;
//  - otherwise opens a node named by get_type::get_identifier(), invokes
//    get_type{}(ar, metadata_t{}), gathers the per-rank trees through the
//    basic_tree_vector_type overload of this class, stores them under the key
//    "upcxx", and closes the node.
407template <typename Type>
408template <typename Archive>
411{
412 if(!m_storage)
413 return ar;
414
415 if(!upc::is_initialized())
416 {
417 get_type{ m_storage }(ar);
418 }
419 else
420 {
421 auto idstr = get_type::get_identifier();
// setNextName() then startNode() open a named node; finishNode() below closes it.
422 ar.setNextName(idstr.c_str());
423 ar.startNode();
424 get_type{}(ar, metadata_t{});
425 auto bt = basic_tree_vector_type{};
// Calls the basic_tree_vector_type overload on this object to populate bt.
426 (*this)(bt);
427 ar(cereal::make_nvp("upcxx", bt));
428 ar.finishNode();
429 }
430 return ar;
431}
432//
433//--------------------------------------------------------------------------------------//
434//
435} // namespace finalize
436} // namespace operation
437} // namespace tim
return false
Definition: definition.hpp:326
data::entry entry
Definition: stream.hpp:980
Definition: kokkosp.cpp:39
node_count
Definition: settings.cpp:1780
typename std::enable_if< B, T >::type enable_if_t
Alias template for enable_if.
Definition: types.hpp:190
std::string demangle(const char *_mangled_name, int *_status=nullptr)
Definition: demangle.hpp:47
tim::mpl::apply< std::string > string
Definition: macros.hpp:53
collapse_processes
Definition: settings.cpp:1639
void finalize()
Definition: types.hpp:119
auto get(const auto_bundle< Tag, Types... > &_obj)
The declaration for the types for operations without definitions.
Include the macros for operations.
Declare the operations types.
impl::storage< Type, value > storage_type
Definition: upc_get.hpp:84
std::vector< basic_tree_type > basic_tree_vector_type
Definition: upc_get.hpp:61
typename get_type::basic_tree_vector_type basic_tree_type
Definition: upc_get.hpp:60
typename storage_type::graph_node graph_node
Definition: upc_get.hpp:56
typename storage_type::result_array_t result_type
Definition: upc_get.hpp:52
typename storage_type::result_node result_node
Definition: upc_get.hpp:54
impl::storage< Type, value > storage_type
Definition: upc_get.hpp:51
typename storage_type::uintvector_t hierarchy_type
Definition: upc_get.hpp:57
typename storage_type::dmp_result_t distrib_type
Definition: upc_get.hpp:53
typename storage_type::graph_t graph_type
Definition: upc_get.hpp:55
typename get_type::metadata metadata_t
Definition: upc_get.hpp:59
static pointer get(std::istream &is)
Definition: policy.hpp:96
Provides a static get() function which return a shared pointer to an instance of the given archive fo...
Definition: policy.hpp:136
#define PRINT_HERE(...)
Definition: macros.hpp:152