This source file includes the following definitions.
- set_options
- close
- write
- write
- read_msg
- read
- read
- read
- is_open
- to_string
- connect
- fn
- stopped
- stop
- run
- set_options
- async_accept
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 namespace jmmcg { namespace socket { namespace glibc {
22
23 namespace basic {
24
25 inline void
26 wrapper::set_options(std::size_t min_message_size, std::size_t max_message_size) noexcept(true) {
27 int val=1;
28 ::setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
29 ::setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
30 ::setsockopt(socket_, SOL_SOCKET, SO_SNDBUF, &max_message_size, sizeof(max_message_size));
31 ::setsockopt(socket_, SOL_SOCKET, SO_RCVBUF, &max_message_size, sizeof(max_message_size));
32 ::setsockopt(socket_, SOL_SOCKET, SO_RCVLOWAT, &min_message_size, sizeof(min_message_size));
33 int priority=socket_priority;
34 ::setsockopt(socket_, SOL_SOCKET, SO_PRIORITY, &priority, sizeof(priority));
35 }
36
37 inline void
38 wrapper::close() noexcept(true) {
39 ::shutdown(socket_, SHUT_RDWR);
40 ::close(socket_);
41 }
42
43 inline
44 wrapper::wrapper(type_t type, domain_t domain) noexcept(false)
45 : socket_(::socket(domain, type|SOCK_CLOEXEC, 0)) {
46 if (socket_==-1) {
47 const int err=errno;
48 info::function fun(
49 __LINE__,
50 __FUNCTION__,
51 typeid(*this),
52 info::function::argument(_T("domain_t"), domain)
53 );
54 fun.add_arg(info::function::argument(_T("type_t"), type));
55 throw exception_t(_T("Unable to create "), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
56 }
57
58 const auto err=::signal(SIGPIPE, SIG_IGN);
59 if (err==SIG_ERR) {
60 const int err=errno;
61 close();
62 info::function fun(
63 __LINE__,
64 __FUNCTION__,
65 typeid(*this),
66 info::function::argument(_T("domain_t"), domain)
67 );
68 fun.add_arg(info::function::argument(_T("type_t"), type));
69 fun.add_arg(info::function::argument(_T("socket_type"), socket_));
70 throw exception_t(_T("Could not ignore SIGPIPE "), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
71 }
72 }
73
74 inline
75 wrapper::~wrapper() noexcept(true) {
76 close();
77 }
78
79 template<class MsgT> inline void
80 wrapper::write(MsgT const &message) noexcept(true) {
81 const typename thread_traits::cancellability set;
82 if (MsgT::has_static_size) {
83 assert(message.length()==sizeof(MsgT));
84 const ssize_t poss_err=::write(socket_, &message, 0);
85 if (poss_err==0) {
86 [[maybe_unused]] const ssize_t bytes_written=::write(socket_, &message, sizeof(MsgT));
87 assert(bytes_written>=0);
88 assert(static_cast<std::size_t>(bytes_written)>=sizeof(MsgT));
89 } else {
90 assert(poss_err==-1);
91 assert(!"TODO Error in writing to the socket!");
92 }
93 } else {
94 const ssize_t poss_err=::write(socket_, &message, 0);
95 if (poss_err==0) {
96 [[maybe_unused]] const ssize_t bytes_written=::write(socket_, &message, message.length());
97 assert(bytes_written>=0);
98 assert(static_cast<std::size_t>(bytes_written)<=sizeof(MsgT));
99 assert(static_cast<std::size_t>(bytes_written)>=message.length());
100 } else {
101 assert(poss_err==-1);
102 assert(!"TODO Error in writing to the socket!");
103 }
104 }
105 }
106
107 template<class V, std::size_t N> inline void
108 wrapper::write(std::array<V, N> const &message) noexcept(true) {
109 const typename thread_traits::cancellability set;
110 const ssize_t poss_err=::write(socket_, &message, 0);
111 if (poss_err==0) {
112 [[maybe_unused]] const ssize_t bytes_written=::write(socket_, message.data(), sizeof(V)*N);
113 assert(bytes_written>=0);
114 assert(static_cast<std::size_t>(bytes_written)>=sizeof(V)*N);
115 } else {
116 assert(poss_err==-1);
117 assert(!"TODO Error in writing to the socket!");
118 }
119 }
120
121 template<class MsgT> inline bool
122 wrapper::read_msg(MsgT &dest, std::size_t msg_size) noexcept(true) {
123 const typename thread_traits::cancellability set;
124 std::size_t bytes_read=0;
125 while (bytes_read<msg_size) {
126 const ssize_t ret_code=::read(socket_, &dest+bytes_read, msg_size-bytes_read);
127 if (LIKELY(ret_code>0)) {
128 assert((bytes_read+ret_code)<=msg_size);
129 bytes_read+=ret_code;
130 } else if (ret_code<0) {
131 const int err=errno;
132 switch (err) {
133 case EAGAIN:
134 case EINTR:
135 continue;
136 default:
137 return true;
138 }
139 } else {
140 return true;
141 }
142 }
143 assert(bytes_read==msg_size);
144 return false;
145 }
146
147 template<class MsgT> inline bool
148 wrapper::read(MsgT &dest) noexcept(true) {
149 if (MsgT::has_static_size) {
150 assert(dest.length()==sizeof(MsgT));
151 BOOST_MPL_ASSERT_RELATION(sizeof(MsgT), <=, SSIZE_MAX);
152 return read_msg(dest, sizeof(MsgT));
153 } else {
154 const bool eof=read_msg(dest, MsgT::header_t_size);
155 if (!eof) {
156 typename MsgT::Header_t const *hdr=reinterpret_cast<typename MsgT::Header_t const *>(&dest);
157 const std::size_t length=hdr->length();
158 const std::size_t header_t_sz=MsgT::header_t_size;
159 assert(length>=header_t_sz);
160 const std::size_t body_size=length-header_t_sz;
161 assert(body_size<=SSIZE_MAX);
162 return read_msg(dest, body_size);
163 } else {
164 return true;
165 }
166 }
167 }
168
169 template<class V, std::size_t SrcSz> inline bool
170 wrapper::read(V (& dest)[SrcSz]) noexcept(true) {
171 BOOST_MPL_ASSERT_RELATION(sizeof(V)*SrcSz, <=, SSIZE_MAX);
172 return read_msg(dest, sizeof(V)*SrcSz);
173 }
174
175 template<class MsgDetails, class V, std::size_t N> inline bool
176 wrapper::read(std::array<V, N> &buff) noexcept(false) {
177 using msg_details_t=MsgDetails;
178
179 BOOST_MPL_ASSERT_RELATION(msg_details_t::max_msg_size, >=, msg_details_t::header_t_size);
180
181 if (!read_msg(*buff.data(), msg_details_t::header_t_size)) {
182 typename msg_details_t::Header_t const *hdr=reinterpret_cast<typename msg_details_t::Header_t const *>(buff.data());
183 const std::size_t length=hdr->length();
184 assert(length>=msg_details_t::header_t_size);
185 const std::size_t body_size=length-msg_details_t::header_t_size;
186 assert(body_size<=SSIZE_MAX);
187 return read_msg(*std::next(buff.begin(), msg_details_t::header_t_size), body_size);
188 } else {
189 return true;
190 }
191 }
192
193 inline bool
194 wrapper::is_open() const noexcept(true) {
195 int error_code;
196 socklen_t error_code_size = sizeof(error_code);
197 return ::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &error_code, &error_code_size)==0;
198 }
199
200 inline std::string
201 wrapper::to_string() const noexcept(false) {
202 std::ostringstream ss;
203 ss
204 <<"socket_="<<socket_;
205 return ss.str();
206 }
207
208 inline std::ostream &
209 operator<<(std::ostream &os, wrapper const &ec) noexcept(false) {
210 os<<ec.to_string();
211 return os;
212 }
213
214 }
215
216 namespace client {
217
218 inline
219 wrapper::wrapper(type_t type, domain_t domain) noexcept(false)
220 : basic::wrapper(type, domain) {
221 }
222
223 inline void
224 wrapper::connect(char const *addr, uint16_t port) noexcept(false) {
225 ::sockaddr_in addr_in;
226 addr_in.sin_family=AF_INET;
227 ::inet_pton(AF_INET, addr, &addr_in.sin_addr);
228 addr_in.sin_port=htons(port);
229 std::fill_n(addr_in.sin_zero, sizeof(addr_in.sin_zero), 0);
230 const int err=::connect(socket_, reinterpret_cast<::sockaddr const *>(&addr_in), sizeof(::sockaddr_in));
231 if (err==-1) {
232 const int err=errno;
233 close();
234 info::function fun(
235 __LINE__,
236 __FUNCTION__,
237 typeid(*this),
238 info::function::argument(_T("IPv4 address"), addr)
239 );
240 fun.add_arg(info::function::argument(_T("port"), port));
241 throw exception_t(_T("Unable to connect socket."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
242 }
243 }
244
245 }
246
247 namespace server {
248
249 class wrapper::recv_msg_ops_t {
250 public:
251 socket_t &skt_;
252 std::function<void()> fn;
253
254 template<class RecvProcMsgs> constexpr
255 recv_msg_ops_t(socket_t &skt, RecvProcMsgs const &f) noexcept(true)
256 : skt_(skt), fn(f) {
257 }
258 };
259
260 inline
261 wrapper::wrapper(char const *addr, uint16_t port, std::size_t min_message_size, std::size_t max_message_size, type_t type, domain_t domain) noexcept(false)
262 : basic::wrapper(type, domain) {
263 ::sockaddr_in addr_in;
264 addr_in.sin_family=AF_INET;
265 ::inet_pton(AF_INET, addr, &addr_in.sin_addr);
266 addr_in.sin_port=htons(port);
267 std::fill_n(addr_in.sin_zero, sizeof(addr_in.sin_zero), 0);
268 const int bind_err=::bind(socket_, reinterpret_cast<::sockaddr const *>(&addr_in), sizeof(::sockaddr_in));
269 if (bind_err==-1) {
270 const int err=errno;
271 close();
272 info::function fun(
273 __LINE__,
274 __FUNCTION__,
275 typeid(*this),
276 info::function::argument(_T("IPv4 address"), addr)
277 );
278 fun.add_arg(info::function::argument(_T("port"), port));
279 fun.add_arg(info::function::argument(_T("domain_t"), domain));
280 fun.add_arg(info::function::argument(_T("type_t"), type));
281 throw exception_t(_T("Unable to bind socket."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
282 }
283 const int listen_err=::listen(socket_, max_backlog_connections);
284 if (listen_err==-1) {
285 const int err=errno;
286 close();
287 info::function fun(
288 __LINE__,
289 __FUNCTION__,
290 typeid(*this),
291 info::function::argument(_T("IPv4 address"), addr)
292 );
293 fun.add_arg(info::function::argument(_T("port"), port));
294 fun.add_arg(info::function::argument(_T("domain_t"), domain));
295 fun.add_arg(info::function::argument(_T("type_t"), type));
296 throw exception_t(_T("Unable to bind socket."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
297 }
298 base_t::set_options(min_message_size, max_message_size);
299 }
300
301 inline bool
302 wrapper::stopped() const noexcept(true) {
303 return static_cast<bool>(exited_);;
304 }
305
306 inline void
307 wrapper::stop() noexcept(true) {
308 exit_=true;
309 while(!stopped()) {
310 thread::thread_traits::sleep(0);
311 }
312 }
313
314 inline
315 wrapper::~wrapper() noexcept(true) {
316 stop();
317 }
318
319 inline void
320 wrapper::run() noexcept(true) {
321 class set_exited {
322 public:
323 explicit constexpr set_exited(std::atomic<bool> &exited) noexcept(true)
324 : exited_(exited) {
325 }
326 ~set_exited() noexcept(true) {
327 exited_=true;
328 }
329
330 private:
331 std::atomic<bool> &exited_;
332 };
333
334 const set_exited exited(exited_);
335 try {
336 while (!static_cast<bool>(exit_)) {
337 try {
338 ::pollfd fds={
339 socket_,
340 POLLIN|POLLPRI,
341 0
342 };
343 const int poll_ret=::poll(&fds, 1, 0);
344 if (poll_ret==-1) {
345 const int err=errno;
346 info::function fun(
347 __LINE__,
348 __FUNCTION__,
349 typeid(*this),
350 info::function::argument(_T("socket"), socket_)
351 );
352 throw exception_t(_T("Unable to accept a client connection, re-trying."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
353 } else if (poll_ret==0) {
354 thread::thread_traits::sleep(0);
355 continue;
356 } else {
357 assert(poll_ret==1);
358 ::sockaddr_in client_addr;
359 ::socklen_t len=sizeof(::sockaddr_in);
360 const int accept_skt=::accept(socket_, reinterpret_cast<::sockaddr *>(&client_addr), &len);
361 if (accept_skt==-1) {
362 const int err=errno;
363 info::function fun(
364 __LINE__,
365 __FUNCTION__,
366 typeid(*this),
367 info::function::argument(_T("socket"), socket_)
368 );
369 throw exception_t(_T("Unable to accept a client connection, re-trying."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
370 }
371 assert(len==sizeof(sockaddr_in));
372 assert(client_addr.sin_port!=0);
373 const std::shared_ptr<recv_msg_ops_t> recv_msg_ops_cpy(recv_msg_ops);
374 if (recv_msg_ops_cpy.get()) {
375 recv_msg_ops_cpy->skt_.socket()=accept_skt;
376 set_options(recv_msg_ops_cpy->skt_);
377 recv_msg_ops_cpy->fn();
378 }
379 }
380 } catch (std::exception const &e) {
381
382 ex=std::make_exception_ptr(e);
383 }
384 }
385 } catch (std::exception const &e) {
386
387 ex=std::make_exception_ptr(e);
388 }
389 }
390
391 inline void
392 wrapper::set_options(basic::wrapper &skt) const noexcept(true) {
393 std::size_t snd_msg_sz=0;
394 socklen_t buff_sz=sizeof(snd_msg_sz);
395 ::getsockopt(socket_, SOL_SOCKET, SO_SNDBUF, &snd_msg_sz, &buff_sz);
396 std::size_t rcv_msg_sz=0;
397 buff_sz=sizeof(rcv_msg_sz);
398 ::getsockopt(socket_, SOL_SOCKET, SO_RCVBUF, &rcv_msg_sz, &buff_sz);
399 std::size_t rcv_msg_lowat=0;
400 buff_sz=sizeof(rcv_msg_lowat);
401 ::getsockopt(socket_, SOL_SOCKET, SO_RCVLOWAT, &rcv_msg_lowat, &buff_sz);
402 skt.set_options(rcv_msg_lowat, std::max(snd_msg_sz, rcv_msg_sz));
403 }
404
405 template<class RecvProcMsgs> inline void
406 wrapper::async_accept(socket_t &skt, RecvProcMsgs const &f) noexcept(false) {
407 recv_msg_ops.reset(new recv_msg_ops_t(skt, f));
408 }
409
410 }
411
412 } } }