47template <
typename Type>
50 static constexpr bool value =
true;
64 static auto&
plus(Type& lhs,
const Type& rhs) {
return (lhs += rhs); }
67 : m_storage(&_storage)
70 TIMEMORY_COLD distrib_type& operator()(distrib_type&);
71 TIMEMORY_COLD basic_tree_vector_type& operator()(basic_tree_vector_type&);
73 template <
typename Archive>
80 mpi_get(std::vector<Type>& dst,
const Type& src,
81 std::function<Type&(Type& lhs,
const Type& rhs)>&& adder =
this_type::plus);
84 storage_type* m_storage =
nullptr;
89template <
typename Type>
92 static constexpr bool value =
false;
97 template <
typename Tp>
104template <
typename Type>
111 auto& data = *m_storage;
112#if !defined(TIMEMORY_USE_MPI)
117 results.emplace_back(std::move(data.get()));
125 auto comm = mpi::comm_world_v;
128 int comm_rank = mpi::rank(comm);
129 int comm_size = mpi::size(comm);
134 auto send_serialize = [&](
const result_type& src) {
135 std::stringstream ss;
139 (*oa)(cereal::make_nvp(
"data", src));
147 auto recv_serialize = [&](
const std::string& src) {
149 std::stringstream ss;
154 (*ia)(cereal::make_nvp(
"data", ret));
157 printf(
"[RECV: %i]> data size: %lli\n", comm_rank,
158 (
long long int) ret.size());
167 auto get_num_records = [&](
const auto& _inp) {
169 for(
const auto& itr : _inp)
176 auto ret = data.get();
177 auto str_ret = send_serialize(ret);
184 for(
int i = 1; i < comm_size; ++i)
188 printf(
"[RECV: %i]> starting %i\n", comm_rank, i);
189 mpi::recv(str, i, 0, comm);
191 printf(
"[RECV: %i]> completed %i\n", comm_rank, i);
192 results[i] = recv_serialize(str);
194 results[comm_rank] = std::move(ret);
202 printf(
"[SEND: %i]> starting\n", comm_rank);
203 mpi::send(str_ret, 0, 0, comm);
205 printf(
"[SEND: %i]> completed\n", comm_rank);
207 results.emplace_back(std::move(ret));
213 auto init_size = get_num_records(results);
216 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks",
218 comm_rank, init_size, comm_size);
223 std::reverse(results.begin(), results.end());
224 while(!results.empty())
226 if(_collapsed.empty())
228 _collapsed.emplace_back(std::move(results.back()));
239 results = std::move(_collapsed);
243 auto fini_size = get_num_records(results);
244 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
247 comm_rank, init_size, fini_size, comm_size);
256 int32_t bins = comm_size / bsize;
260 PRINT_HERE(
"[%s][pid=%i][rank=%i]> node_count = %i, comm_size = %i, bins = "
269 std::map<int32_t, std::set<int32_t>> binmap;
270 for(int32_t i = 0; i < comm_size; ++i)
274 PRINT_HERE(
"[%s][pid=%i][rank=%i]> adding rank %i to bin %i",
276 (
int) process::get_id(), comm_rank, i, midx);
279 binmap[midx].insert(i);
289 auto init_size = get_num_records(results);
292 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks into "
295 comm_rank, init_size, comm_size, (
int) binmap.size());
303 for(
const auto& itr : binmap)
306 auto& _dst = _collapsed.at(itr.first);
307 for(
const auto& bitr : itr.second)
310 auto& _src = results.at(bitr);
316 results = std::move(_collapsed);
320 auto fini_size = get_num_records(results);
321 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
324 comm_rank, init_size, fini_size, (
int) results.size());
330 auto ret_size = get_num_records(results);
331 PRINT_HERE(
"[%s][pid=%i]> %i total records on rank %i of %i",
333 ret_size, comm_rank, comm_size);
343template <
typename Type>
350 auto& data = *m_storage;
353 using mpi_data_t =
typename serialization_t::mpi_data;
356 bt = serialization_t{}(mpi_data_t{}, mpi::comm_world_v, data.get(_entry));
362template <
typename Type>
363template <
typename Archive>
370 if(!mpi::is_initialized())
385template <
typename Type>
387 std::function<Type&(Type& lhs,
const Type& rhs)>&& functor)
389#if !defined(TIMEMORY_USE_MPI)
396 auto comm = mpi::comm_world_v;
399 int comm_rank = mpi::rank(comm);
400 int comm_size = mpi::size(comm);
403 comm_rank, comm_size);
408 auto send_serialize = [&](
const Type& src) {
409 std::stringstream ss;
413 (*oa)(cereal::make_nvp(
"data", src));
416 comm_rank, ss.str().length());
423 auto recv_serialize = [&](
const std::string& src) {
425 comm_rank, src.length());
427 std::stringstream ss;
432 (*ia)(cereal::make_nvp(
"data", ret));
440 auto get_num_records = [&](
const auto& _inp) {
return _inp.size(); };
442 dst.resize(comm_size);
443 auto str_ret = send_serialize(inp);
452 for(
int i = 1; i < comm_size; ++i)
457 mpi::recv(str, i, 0, comm);
460 dst.at(i) = recv_serialize(str);
470 mpi::send(str_ret, 0, 0, comm);
480 auto init_size = get_num_records(dst);
483 "[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks",
485 (
int) comm_rank, (
int) init_size, (
int) comm_size);
487 auto _dst = std::vector<Type>{};
492 _dst.emplace_back(std::move(itr));
496 _dst.front() = functor(_dst.front(), itr);
505 "[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
508 (
int) comm_rank, (
int) init_size, (
int) get_num_records(dst),
516 int32_t bins = comm_size / bsize;
520 "[%s][pid=%i][rank=%i]> node_count = %i, comm_size = %i, bins = "
528 std::map<int32_t, std::set<int32_t>> binmap;
529 for(int32_t i = 0; i < comm_size; ++i)
532 "[%s][pid=%i][rank=%i]> adding rank %i to bin %i",
534 (
int) process::get_id(), comm_rank, i, midx);
536 binmap[midx].insert(i);
546 auto init_size = get_num_records(dst);
549 "[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks into %i bins",
551 (
int) comm_rank, (
int) init_size, (
int) comm_size, (
int) binmap.size());
556 auto _dst = std::vector<Type>(binmap.size());
558 for(
const auto& itr : binmap)
561 auto& _targ = _dst.at(itr.first);
562 for(
const auto& bitr : itr.second)
565 auto& _src = dst.at(bitr);
566 _targ = functor(_targ, _src);
575 "[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
578 (
int) comm_rank, (
int) init_size, (
int) get_num_records(dst),
583 "[%s][pid=%i]> %i total records on rank %i of %i",
585 (
int) process::get_id(), (
int) get_num_records(dst),
586 (
int) comm_rank, (
int) comm_size);
Tp & plus(Tp &, const Up &)
typename std::enable_if< B, T >::type enable_if_t
Alias template for enable_if.
std::string demangle(const char *_mangled_name, int *_status=nullptr)
tim::mpl::apply< std::string > string
auto get(const auto_bundle< Tag, Types... > &_obj)
void consume_parameters(ArgsT &&...)
The declaration for the types for operations without definitions.
Include the macros for operations.
Declare the operations types.
impl::storage< Type, value > storage_type
typename get_type::metadata metadata_t
typename storage_type::uintvector_t hierarchy_type
typename storage_type::graph_node graph_node
static auto & plus(Type &lhs, const Type &rhs)
impl::storage< Type, value > storage_type
typename storage_type::graph_t graph_type
typename storage_type::result_array_t result_type
typename get_type::basic_tree_vector_type basic_tree_type
mpi_get(storage_type &_storage)
typename storage_type::result_node result_node
std::vector< basic_tree_type > basic_tree_vector_type
typename storage_type::dmp_result_t distrib_type
Provides a static get() function which return a shared pointer to an instance of the given archive fo...
#define CONDITIONAL_PRINT_HERE(CONDITION,...)