47template <
typename Type>
50 static constexpr bool value =
true;
64 : m_storage(&_storage)
67 TIMEMORY_COLD distrib_type& operator()(distrib_type&);
68 TIMEMORY_COLD basic_tree_vector_type& operator()(basic_tree_vector_type&);
70 template <
typename Archive>
75 storage_type* m_storage =
nullptr;
80template <
typename Type>
83 static constexpr bool value =
false;
88 template <
typename Tp>
95template <
typename Type>
102 auto& data = *m_storage;
103#if !defined(TIMEMORY_USE_UPCXX)
108 results.emplace_back(std::move(data.get()));
115 int comm_rank = upc::rank();
116 int comm_size = upc::size();
121 auto send_serialize = [=](
const result_type& src) {
122 std::stringstream ss;
126 (*oa)(cereal::make_nvp(
"data", src));
134 auto recv_serialize = [=](
const std::string& src) {
136 std::stringstream ss;
141 (*ia)(cereal::make_nvp(
"data", ret));
143 printf(
"[RECV: %i]> data size: %lli\n", comm_rank,
144 (
long long int) ret.size());
152 auto remote_serialize = [=]() {
153 return send_serialize(storage_type::master_instance()->
get());
156 results.resize(comm_size);
163 for(
int i = 1; i < comm_size; ++i)
165 upcxx::future<std::string> fut = upcxx::rpc(i, remote_serialize);
169 results[i] = recv_serialize(fut.result());
171 results[comm_rank] = data.get();
177 auto get_num_records = [&](
const auto& _inp) {
179 for(
const auto& itr : _inp)
184 upcxx::barrier(upcxx::world());
189 results.emplace_back(std::move(data.get()));
195 auto init_size = get_num_records(results);
198 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks",
200 comm_rank, init_size, comm_size);
205 std::reverse(results.begin(), results.end());
206 while(!results.empty())
208 if(_collapsed.empty())
209 _collapsed.emplace_back(std::move(results.back()));
217 results = std::move(_collapsed);
221 auto fini_size = get_num_records(results);
222 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
225 comm_rank, init_size, fini_size, comm_size);
234 int32_t bins = comm_size / bsize;
237 PRINT_HERE(
"[%s][pid=%i][rank=%i]> node_count = %i, comm_size = %i, bins = "
245 std::map<int32_t, std::set<int32_t>> binmap;
246 for(int32_t i = 0; i < comm_size; ++i)
249 PRINT_HERE(
"[%s][pid=%i][rank=%i]> adding rank %i to bin %i",
251 (int) process::get_id(), comm_rank, i, midx);
253 binmap[midx].insert(i);
263 auto init_size = get_num_records(results);
265 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsing %i records from %i ranks into "
268 comm_rank, init_size, comm_size, (
int) binmap.size());
275 for(
const auto& itr : binmap)
278 auto& _dst = _collapsed.at(itr.first);
279 for(
const auto& bitr : itr.second)
282 auto& _src = results.at(bitr);
288 results = std::move(_collapsed);
292 auto fini_size = get_num_records(results);
293 PRINT_HERE(
"[%s][pid=%i][rank=%i]> collapsed %i records into %i records "
296 comm_rank, init_size, fini_size, (
int) results.size());
302 auto ret_size = get_num_records(results);
303 PRINT_HERE(
"[%s][pid=%i]> %i total records on rank %i of %i",
305 ret_size, comm_rank, comm_size);
315template <
typename Type>
322 auto& data = *m_storage;
323#if !defined(TIMEMORY_USE_UPCXX)
333 upc::barrier(upc::world());
335 int comm_rank = upc::rank(upc::world());
336 int comm_size = upc::size(upc::world());
342 std::stringstream ss;
346 (*oa)(cereal::make_nvp(
"data", src));
354 auto recv_serialize = [&](
const std::string& src) {
356 std::stringstream ss;
361 (*ia)(cereal::make_nvp(
"data", ret));
363 printf(
"[RECV: %i]> data size: %lli\n", comm_rank,
364 (
long long int) ret.size());
372 auto remote_serialize = [=]() {
374 return send_serialize(storage_type::master_instance()->
get(ret));
385 for(
int i = 1; i < comm_size; ++i)
387 upc::future_t<std::string> fut = upc::rpc(i, remote_serialize);
391 bt[i] = recv_serialize(fut.result());
393 bt[comm_rank] = data.get(ret);
396 upc::barrier(upc::world());
407template <
typename Type>
408template <
typename Archive>
415 if(!upc::is_initialized())
421 auto idstr = get_type::get_identifier();
422 ar.setNextName(idstr.c_str());
427 ar(cereal::make_nvp(
"upcxx", bt));
typename std::enable_if< B, T >::type enable_if_t
Alias template for enable_if.
std::string demangle(const char *_mangled_name, int *_status=nullptr)
tim::mpl::apply< std::string > string
auto get(const auto_bundle< Tag, Types... > &_obj)
The declaration for the types for operations without definitions.
Include the macros for operations.
Declare the operations types.
impl::storage< Type, value > storage_type
std::vector< basic_tree_type > basic_tree_vector_type
typename get_type::basic_tree_vector_type basic_tree_type
typename storage_type::graph_node graph_node
typename storage_type::result_array_t result_type
typename storage_type::result_node result_node
impl::storage< Type, value > storage_type
typename storage_type::uintvector_t hierarchy_type
typename storage_type::dmp_result_t distrib_type
typename storage_type::graph_t graph_type
upc_get(storage_type &_storage)
typename get_type::metadata metadata_t
Provides a static get() function which returns a shared pointer to an instance of the given archive fo...