111 auto& data = *m_storage;
112#if !defined(TIMEMORY_USE_MPI)
117 results.emplace_back(std::move(data.get()));
125 auto comm = mpi::comm_world_v;
128 int comm_rank = mpi::rank(comm);
129 int comm_size = mpi::size(comm);
134 auto send_serialize = [&](
const result_type& src) {
135 std::stringstream ss;
137 auto oa = policy::output_archive<cereal::MinimalJSONOutputArchive,
139 (*oa)(cereal::make_nvp(
"data", src));
147 auto recv_serialize = [&](
const std::string& src) {
149 std::stringstream ss;
154 (*ia)(cereal::make_nvp(
"data", ret));
157 printf(
"[RECV: %i]> data size: %lli\n", comm_rank,
158 (
long long int) ret.size());
167 auto get_num_records = [&](
const auto& _inp) {
169 for(
const auto& itr : _inp)
176 auto ret = data.get();
177 auto str_ret = send_serialize(ret);
184 for(
int i = 1; i < comm_size; ++i)
188 printf(
"[RECV: %i]> starting %i\n", comm_rank, i);
189 mpi::recv(str, i, 0, comm);
191 printf(
"[RECV: %i]> completed %i\n", comm_rank, i);
192 results[i] = recv_serialize(str);
194 results[comm_rank] = std::move(ret);
202 printf(
"[SEND: %i]> starting\n", comm_rank);
203 mpi::send(str_ret, 0, 0, comm);
205 printf(
"[SEND: %i]> completed\n", comm_rank);
207 results.emplace_back(std::move(ret));
213 auto init_size = get_num_records(results);
216 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks",
217 demangle<mpi_get<Type, true>>().c_str(), (
int) process::get_id(),
218 comm_rank, init_size, comm_size);
223 std::reverse(results.begin(), results.end());
224 while(!results.empty())
226 if(_collapsed.empty())
228 _collapsed.emplace_back(std::move(results.back()));
232 operation::finalize::merge<Type, true>(_collapsed.front(),
239 results = std::move(_collapsed);
243 auto fini_size = get_num_records(results);
244 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
246 demangle<mpi_get<Type, true>>().c_str(), (
int) process::get_id(),
247 comm_rank, init_size, fini_size, comm_size);
256 int32_t bins = comm_size / bsize;
260 PRINT_HERE(
"[%s][pid=%i][rank=%i]> node_count = %i, comm_size = %i, bins = "
262 demangle<mpi_get<Type, true>>().c_str(), (
int) process::get_id(),
269 std::map<int32_t, std::set<int32_t>> binmap;
270 for(int32_t i = 0; i < comm_size; ++i)
274 PRINT_HERE(
"[%s][pid=%i][rank=%i]> adding rank %i to bin %i",
275 demangle<mpi_get<Type, true>>().c_str(),
276 (
int) process::get_id(), comm_rank, i, midx);
279 binmap[midx].insert(i);
289 auto init_size = get_num_records(results);
292 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks into "
294 demangle<mpi_get<Type, true>>().c_str(), (
int) process::get_id(),
295 comm_rank, init_size, comm_size, (
int) binmap.size());
303 for(
const auto& itr : binmap)
306 auto& _dst = _collapsed.at(itr.first);
307 for(
const auto& bitr : itr.second)
310 auto& _src = results.at(bitr);
311 operation::finalize::merge<Type, true>(_dst, _src);
316 results = std::move(_collapsed);
320 auto fini_size = get_num_records(results);
321 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
323 demangle<mpi_get<Type, true>>().c_str(), (
int) process::get_id(),
324 comm_rank, init_size, fini_size, (
int) results.size());
330 auto ret_size = get_num_records(results);
331 PRINT_HERE(
"[%s][pid=%i]> %i total records on rank %i of %i",
332 demangle<mpi_get<Type, true>>().c_str(), (
int) process::get_id(),
333 ret_size, comm_rank, comm_size);
typename storage_type::result_array_t result_type
typename storage_type::dmp_result_t distrib_type