timemory 3.3.0
Modular C++ Toolkit for Performance Analysis and Logging. Profiling API and Tools for C, C++, CUDA, Fortran, and Python. The C++ template API is essentially a framework for creating tools: it is designed to provide a unifying interface for recording various performance measurements alongside data logging and interfaces to other tools.
socket.hpp
Go to the documentation of this file.
1// MIT License
2//
3// Copyright (c) 2020, The Regents of the University of California,
4// through Lawrence Berkeley National Laboratory (subject to receipt of any
5// required approvals from the U.S. Dept. of Energy). All rights reserved.
6//
7// Permission is hereby granted, free of charge, to any person obtaining a copy
8// of this software and associated documentation files (the "Software"), to deal
9// in the Software without restriction, including without limitation the rights
10// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11// copies of the Software, and to permit persons to whom the Software is
12// furnished to do so, subject to the following conditions:
13//
14// The above copyright notice and this permission notice shall be included in all
15// copies or substantial portions of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23// SOFTWARE.
24
25#pragma once
26
28
29#if defined(TIMEMORY_WINDOWS)
30# include <WinSock2.h>
31# include <Ws2tcpip.h>
32#else
33# include <arpa/inet.h>
34# include <netdb.h>
35# include <sys/socket.h>
36# include <unistd.h>
37#endif
38
39#include <atomic>
40#include <cstring>
41#include <functional>
42#include <iostream>
43#include <string>
44#include <unordered_map>
45
46namespace tim
47{
48namespace socket
49{
50//
#if defined(TIMEMORY_WINDOWS)
using socket_t                      = SOCKET;
static constexpr int invalid_socket = INVALID_SOCKET;
static constexpr int socket_error   = SOCKET_ERROR;
#else
using socket_t = int;
// POSIX socket()/accept() report failure with -1; 0 is a perfectly valid descriptor,
// so it must not be used as the "invalid" sentinel.
static constexpr int invalid_socket = -1;
static constexpr int socket_error   = -1;
#endif
//
/// \class manager
/// \brief Keeps track of named, blocking TCP (AF_INET / SOCK_STREAM) sockets.
///
/// Client side: connect() opens a socket and registers it under a channel name,
/// send() writes to it and close() releases it. Server side: listen() accepts one
/// connection on a port and hands every received chunk to a callback.
///
/// The maps store raw socket handles and the copy operations are defaulted, so a
/// copied manager refers to the same handles as the original.
class manager
{
public:
    /// size (in bytes) of the receive buffer used by listen()
    static constexpr int buffer_size = 4096;
    using socket_map_t               = std::unordered_map<std::string, socket_t>;
    /// {number of packets received, number of bytes received}
    using listen_info_t = std::pair<int64_t, int64_t>;

public:
    manager()               = default;
    ~manager()              = default;
    manager(const manager&) = default;
    manager(manager&&)      = default;

    manager& operator=(const manager&) = default;
    manager& operator=(manager&&) = default;

public:
    /// listen for data on a socket. Returns a pair of the number of packets received and
    /// the number of bytes received. If both are -1, setting up the socket or accepting
    /// the connection failed.
    ///
    /// \param _channel_name name under which the accepted socket is tracked while open
    /// \param _port         TCP port to bind to (on any local address)
    /// \param callback      invoked with a std::string for every chunk that is received
    /// \param _max_packets  stop after this many chunks; values <= 0 mean "no limit"
    template <typename CallbackT>
    auto listen(const std::string& _channel_name, int _port, CallbackT&& callback,
                int64_t _max_packets = 0)
        -> decltype(callback(_channel_name), listen_info_t{})
    {
        if(manager::init() != 0)
        {
            std::cerr << "Can't start socket!" << std::endl;
            return { -1, -1 };
        }

        socket_t _listening = ::socket(AF_INET, SOCK_STREAM, 0);
        if(_listening == invalid_socket)
        {
            std::cerr << "Can't create a socket!" << std::endl;
            manager::quit();  // balance the successful init()
            return { -1, -1 };
        }

        // value-initialize so that padding members (e.g. sin_zero) are zeroed
        sockaddr_in _hint{};
        _hint.sin_family = AF_INET;
        _hint.sin_port   = htons(_port);
#if defined(TIMEMORY_WINDOWS)
        _hint.sin_addr.S_un.S_addr = INADDR_ANY;
#else
        _hint.sin_addr.s_addr = INADDR_ANY;
#endif

        // without these checks a failed bind()/listen() would only surface later as a
        // confusing accept()/recv() error
        if(::bind(_listening, reinterpret_cast<sockaddr*>(&_hint), sizeof(_hint)) ==
               socket_error ||
           ::listen(_listening, SOMAXCONN) == socket_error)
        {
            std::cerr << "Can't bind/listen on port " << _port << "!" << std::endl;
            manager::close(_listening);
            manager::quit();
            return { -1, -1 };
        }

        sockaddr_in _client{};
#if defined(TIMEMORY_WINDOWS)
        int _client_size = sizeof(_client);
#else
        socklen_t _client_size = sizeof(_client);
#endif
        socket_t _client_socket =
            ::accept(_listening, reinterpret_cast<sockaddr*>(&_client), &_client_size);

        // only a single connection is served, the listening socket is no longer needed
        manager::close(_listening);

        if(_client_socket == invalid_socket)
        {
            std::cerr << "Can't accept a connection!" << std::endl;
            manager::quit();
            return { -1, -1 };
        }
        m_server_sockets[_channel_name] = _client_socket;

        char _host[NI_MAXHOST]    = {};
        char _service[NI_MAXSERV] = {};
        if(getnameinfo(reinterpret_cast<sockaddr*>(&_client), sizeof(_client), _host,
                       NI_MAXHOST, _service, NI_MAXSERV, 0) == 0)
        {
            std::cout << _host << " connected on port " << _service << std::endl;
        }
        else
        {
            // name lookup failed: fall back to the numeric address and port
            inet_ntop(AF_INET, &_client.sin_addr, _host, NI_MAXHOST);
            std::cout << _host << " connected on port " << ntohs(_client.sin_port)
                      << std::endl;
        }

        char          _buff[buffer_size];
        listen_info_t _nrecv = { 0, 0 };
        while(true)
        {
            const int _bytes_recv =
                static_cast<int>(::recv(_client_socket, _buff, buffer_size, 0));

            // exit out of receiving
            if(_bytes_recv == socket_error)
            {
                std::cerr << "Error in recv(). Quitting" << std::endl;
                break;
            }

            // nothing received: the peer closed the connection
            if(_bytes_recv <= 0)
                break;

            _nrecv.first += 1;
            _nrecv.second += _bytes_recv;

            // The callback receives the text up to the first NUL inside the received
            // bytes (or all of them if there is none). The search is bounded by the
            // number of bytes received so a completely filled buffer is never read
            // past its end.
            const auto  _nbytes = static_cast<size_t>(_bytes_recv);
            const void* _nul    = std::memchr(_buff, 0, _nbytes);
            const auto  _len =
                (_nul != nullptr)
                    ? static_cast<size_t>(static_cast<const char*>(_nul) - _buff)
                    : _nbytes;
            callback(std::string(_buff, _len));

            if(_max_packets > 0 && _nrecv.first >= _max_packets)
            {
                std::cerr << "Maximum number of packages received: " << _max_packets
                          << ". Quitting" << std::endl;
                break;
            }
        }

        manager::close(_client_socket);
        // do not keep a closed handle around
        m_server_sockets.erase(_channel_name);
        manager::quit();
        return _nrecv;
    }

    /// send a string over the socket registered for a channel via connect().
    /// The string's terminating NUL character is transmitted as well.
    ///
    /// \return true if the data was handed to the socket, false if the channel is
    ///         unknown or sending failed
    bool send(const std::string& _channel_name, const std::string& _data)
    {
        const auto itr = m_client_sockets.find(_channel_name);
        if(itr == m_client_sockets.end())
            return false;

        const auto _send_result =
            ::send(itr->second, _data.c_str(), _data.size() + 1, 0);
        if(_send_result == socket_error)
        {
            std::cerr << "Can't send data to server!" << std::endl;
            return false;
        }
        return true;
    }

    /// connect to a server and register the socket under a channel name.
    ///
    /// \param _channel_name key used by send() and close() to find the socket
    /// \param _ip           IPv4 address of the server in dotted-decimal notation
    /// \param _port         TCP port of the server
    /// \return true if the connection was established
    bool connect(const std::string& _channel_name, const std::string& _ip, int _port)
    {
        if(manager::init() != 0)
        {
            std::cerr << "Can't start socket!" << std::endl;
            return false;
        }

        socket_t _sock = ::socket(AF_INET, SOCK_STREAM, 0);
        if(_sock == invalid_socket)
        {
            std::cerr << "Can't create socket!" << std::endl;
            manager::quit();
            return false;
        }

        // value-initialize so that padding members (e.g. sin_zero) are zeroed
        sockaddr_in _hint{};
        _hint.sin_family = AF_INET;
        _hint.sin_port   = htons(_port);
        // inet_pton returns 1 only if the text was a valid address
        if(inet_pton(AF_INET, _ip.c_str(), &_hint.sin_addr) != 1)
        {
            std::cerr << "Invalid IPv4 address: " << _ip << std::endl;
            manager::close(_sock);
            manager::quit();
            return false;
        }

        const int _conn_result =
            ::connect(_sock, reinterpret_cast<sockaddr*>(&_hint), sizeof(_hint));
        if(_conn_result == socket_error)
        {
            std::cerr << "Can't connect to server!" << std::endl;
            manager::close(_sock);  // do not leak the descriptor
            manager::quit();
            return false;
        }
        m_client_sockets[_channel_name] = _sock;
        return true;
    }

    /// close the socket registered for a channel via connect() and forget the channel.
    ///
    /// \return true if the channel was known, false otherwise
    bool close(const std::string& _channel_name)
    {
        const auto itr = m_client_sockets.find(_channel_name);
        if(itr == m_client_sockets.end())
            return false;

        manager::close(itr->second);
        // drop the entry so that send() cannot use the closed handle
        m_client_sockets.erase(itr);
        manager::quit();
        return true;
    }

private:
    /// start up the socket library (WSAStartup on Windows, nothing elsewhere).
    /// \return 0 on success
    static int init()
    {
#if defined(TIMEMORY_WINDOWS)
        WSADATA _wsa_data;
        return WSAStartup(MAKEWORD(1, 1), &_wsa_data);
#else
        return 0;
#endif
    }

    /// counterpart of init() (WSACleanup on Windows, nothing elsewhere).
    /// \return 0 on success
    static int quit()
    {
#if defined(TIMEMORY_WINDOWS)
        return WSACleanup();
#else
        return 0;
#endif
    }

    /// shut down both directions of a socket and release its handle.
    ///
    /// The handle is released even if the shutdown fails (e.g. because the socket was
    /// never connected), otherwise it would leak.
    ///
    /// \return the shutdown status if it is non-zero, else the status of the close call
    static int close(socket_t _sock)
    {
#if defined(TIMEMORY_WINDOWS)
        const int _shutdown_status = ::shutdown(_sock, SD_BOTH);
        const int _close_status    = ::closesocket(_sock);
#else
        const int _shutdown_status = ::shutdown(_sock, SHUT_RDWR);
        const int _close_status    = ::close(_sock);
#endif
        return (_shutdown_status != 0) ? _shutdown_status : _close_status;
    }

private:
    socket_map_t m_client_sockets = {};
    socket_map_t m_server_sockets = {};
};
271//
272} // namespace socket
273} // namespace tim
auto listen(const std::string &_channel_name, int _port, CallbackT &&callback, int64_t _max_packets=0) -> decltype(callback(_channel_name), listen_info_t{})
listen for data on a socket. Returns a pair of the number of packets received and the number of bytes...
Definition: socket.hpp:81
std::pair< int64_t, int64_t > listen_info_t
Definition: socket.hpp:66
bool close(const std::string &_channel_name)
Definition: socket.hpp:222
static constexpr int buffer_size
Definition: socket.hpp:64
bool connect(const std::string &_channel_name, const std::string &_ip, int _port)
Definition: socket.hpp:189
manager & operator=(const manager &)=default
std::unordered_map< std::string, socket_t > socket_map_t
Definition: socket.hpp:65
manager & operator=(manager &&)=default
bool send(const std::string &_channel_name, const std::string &_data)
Definition: socket.hpp:173
manager(manager &&)=default
manager(const manager &)=default
int socket_t
Definition: socket.hpp:56
Definition: kokkosp.cpp:39
tim::mpl::apply< std::string > string
Definition: macros.hpp:53