root/core/socket_wrapper_glibc_impl.hpp

/* [<][>][^][v][top][bottom][index][help] */

INCLUDED FROM


DEFINITIONS

This source file includes the following definitions.
  1. set_options
  2. close
  3. write
  4. write
  5. read_msg
  6. read
  7. read
  8. read
  9. is_open
  10. to_string
  11. connect
  12. fn
  13. stopped
  14. stop
  15. run
  16. set_options
  17. async_accept

   1 /******************************************************************************
   2 ** $Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/wrapper_impl.hpp 1893 2016-02-28 00:16:27Z jmmcg $
   3 **
   4 ** Copyright (C) 2015 by J.M.McGuiness, coder@hussar.me.uk
   5 **
   6 ** This library is free software; you can redistribute it and/or
   7 ** modify it under the terms of the GNU Lesser General Public
   8 ** License as published by the Free Software Foundation; either
   9 ** version 2.1 of the License, or (at your option) any later version.
  10 **
  11 ** This library is distributed in the hope that it will be useful,
  12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14 ** Lesser General Public License for more details.
  15 **
  16 ** You should have received a copy of the GNU Lesser General Public
  17 ** License along with this library; if not, write to the Free Software
  18 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  19 */
  20 
  21 namespace jmmcg { namespace socket { namespace glibc {
  22 
  23 namespace basic {
  24 
  25 inline void
  26 wrapper::set_options(std::size_t min_message_size, std::size_t max_message_size) noexcept(true) {
  27         int val=1;
  28         ::setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
  29         ::setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
  30         ::setsockopt(socket_, SOL_SOCKET, SO_SNDBUF, &max_message_size, sizeof(max_message_size));
  31         ::setsockopt(socket_, SOL_SOCKET, SO_RCVBUF, &max_message_size, sizeof(max_message_size));
  32         ::setsockopt(socket_, SOL_SOCKET, SO_RCVLOWAT, &min_message_size, sizeof(min_message_size));
  33         int priority=socket_priority;
  34         ::setsockopt(socket_, SOL_SOCKET, SO_PRIORITY, &priority, sizeof(priority));
  35 }
  36 
  37 inline void
  38 wrapper::close() noexcept(true) {
  39         ::shutdown(socket_, SHUT_RDWR);
  40         ::close(socket_);
  41 }
  42 
  43 inline
  44 wrapper::wrapper(type_t type, domain_t domain) noexcept(false)
  45 : socket_(::socket(domain, type|SOCK_CLOEXEC, 0)) {
  46         if (socket_==-1) {
  47                 const int err=errno;
  48                 info::function fun(
  49                         __LINE__,
  50                         __FUNCTION__,
  51                         typeid(*this),
  52                         info::function::argument(_T("domain_t"), domain)
  53                 );
  54                 fun.add_arg(info::function::argument(_T("type_t"), type));
  55                 throw exception_t(_T("Unable to create "), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
  56         }
  57         // https://stackoverflow.com/questions/8829238/how-can-i-trap-a-signal-sigpipe-for-a-socket-that-closes#8829311
  58         const auto err=::signal(SIGPIPE, SIG_IGN);
  59         if (err==SIG_ERR) {
  60                 const int err=errno;
  61                 close();
  62                 info::function fun(
  63                         __LINE__,
  64                         __FUNCTION__,
  65                         typeid(*this),
  66                         info::function::argument(_T("domain_t"), domain)
  67                 );
  68                 fun.add_arg(info::function::argument(_T("type_t"), type));
  69                 fun.add_arg(info::function::argument(_T("socket_type"), socket_));
  70                 throw exception_t(_T("Could not ignore SIGPIPE "), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
  71         }
  72 }
  73 
  74 inline
  75 wrapper::~wrapper() noexcept(true) {
  76         close();
  77 }
  78 
  79 template<class MsgT> inline void
  80 wrapper::write(MsgT const &message) noexcept(true) {
  81         const typename thread_traits::cancellability set;
  82         if (MsgT::has_static_size) {
  83                 assert(message.length()==sizeof(MsgT));
  84                 const ssize_t poss_err=::write(socket_, &message, 0);
  85                 if (poss_err==0) {
  86                         [[maybe_unused]] const ssize_t bytes_written=::write(socket_, &message, sizeof(MsgT));
  87                         assert(bytes_written>=0);
  88                         assert(static_cast<std::size_t>(bytes_written)>=sizeof(MsgT));
  89                 } else {
  90                         assert(poss_err==-1);
  91                         assert(!"TODO Error in writing to the socket!");
  92                 }
  93         } else {
  94                 const ssize_t poss_err=::write(socket_, &message, 0);
  95                 if (poss_err==0) {
  96                         [[maybe_unused]] const ssize_t bytes_written=::write(socket_, &message, message.length());
  97                         assert(bytes_written>=0);
  98                         assert(static_cast<std::size_t>(bytes_written)<=sizeof(MsgT));
  99                         assert(static_cast<std::size_t>(bytes_written)>=message.length());
 100                 } else {
 101                         assert(poss_err==-1);
 102                         assert(!"TODO Error in writing to the socket!");
 103                 }
 104         }
 105 }
 106 
 107 template<class V, std::size_t N> inline void
 108 wrapper::write(std::array<V, N> const &message) noexcept(true) {
 109         const typename thread_traits::cancellability set;
 110         const ssize_t poss_err=::write(socket_, &message, 0);
 111         if (poss_err==0) {
 112                 [[maybe_unused]] const ssize_t bytes_written=::write(socket_, message.data(), sizeof(V)*N);
 113                 assert(bytes_written>=0);
 114                 assert(static_cast<std::size_t>(bytes_written)>=sizeof(V)*N);
 115         } else {
 116                 assert(poss_err==-1);
 117                 assert(!"TODO Error in writing to the socket!");
 118         }
 119 }
 120 
 121 template<class MsgT> inline bool
 122 wrapper::read_msg(MsgT &dest, std::size_t msg_size) noexcept(true) {
 123         const typename thread_traits::cancellability set;
 124         std::size_t bytes_read=0;
 125         while (bytes_read<msg_size) {
 126                 const ssize_t ret_code=::read(socket_, &dest+bytes_read, msg_size-bytes_read);
 127                 if (LIKELY(ret_code>0)) {
 128                         assert((bytes_read+ret_code)<=msg_size);
 129                         bytes_read+=ret_code;
 130                 } else if (ret_code<0) {
 131                         const int err=errno;
 132                         switch (err) {
 133                                 case EAGAIN:
 134                                 case EINTR:
 135                                         continue;
 136                                 default:
 137                                         return true;
 138                         }
 139                 } else {
 140                         return true;
 141                 }
 142         }
 143         assert(bytes_read==msg_size);
 144         return false;
 145 }
 146 
 147 template<class MsgT> inline bool
 148 wrapper::read(MsgT &dest) noexcept(true) {
 149         if (MsgT::has_static_size) {
 150                 assert(dest.length()==sizeof(MsgT));
 151                 BOOST_MPL_ASSERT_RELATION(sizeof(MsgT), <=, SSIZE_MAX);
 152                 return read_msg(dest, sizeof(MsgT));
 153         } else {
 154                 const bool eof=read_msg(dest, MsgT::header_t_size);
 155                 if (!eof) {
 156                         typename MsgT::Header_t const *hdr=reinterpret_cast<typename MsgT::Header_t const *>(&dest);
 157                         const std::size_t length=hdr->length();
 158                         const std::size_t header_t_sz=MsgT::header_t_size;
 159                         assert(length>=header_t_sz);
 160                         const std::size_t body_size=length-header_t_sz;
 161                         assert(body_size<=SSIZE_MAX);
 162                         return read_msg(dest, body_size);
 163                 } else {
 164                         return true;
 165                 }
 166         }
 167 }
 168 
 169 template<class V, std::size_t SrcSz> inline bool
 170 wrapper::read(V (& dest)[SrcSz]) noexcept(true) {
 171         BOOST_MPL_ASSERT_RELATION(sizeof(V)*SrcSz, <=, SSIZE_MAX);
 172         return read_msg(dest, sizeof(V)*SrcSz);
 173 }
 174 
 175 template<class MsgDetails, class V, std::size_t N> inline bool
 176 wrapper::read(std::array<V, N> &buff) noexcept(false) {
 177         using msg_details_t=MsgDetails;
 178 
 179         BOOST_MPL_ASSERT_RELATION(msg_details_t::max_msg_size, >=, msg_details_t::header_t_size);
 180 
 181         if (!read_msg(*buff.data(), msg_details_t::header_t_size)) {
 182                 typename msg_details_t::Header_t const *hdr=reinterpret_cast<typename msg_details_t::Header_t const *>(buff.data());
 183                 const std::size_t length=hdr->length();
 184                 assert(length>=msg_details_t::header_t_size);
 185                 const std::size_t body_size=length-msg_details_t::header_t_size;
 186                 assert(body_size<=SSIZE_MAX);
 187                 return read_msg(*std::next(buff.begin(), msg_details_t::header_t_size), body_size);
 188         } else {
 189                 return true;
 190         }
 191 }
 192 
 193 inline bool
 194 wrapper::is_open() const noexcept(true) {
 195         int error_code;
 196         socklen_t error_code_size = sizeof(error_code);
 197         return ::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &error_code, &error_code_size)==0;
 198 }
 199 
 200 inline std::string
 201 wrapper::to_string() const noexcept(false) {
 202         std::ostringstream ss;
 203         ss
 204                 <<"socket_="<<socket_;
 205         return ss.str();
 206 }
 207 
 208 inline std::ostream &
 209 operator<<(std::ostream &os, wrapper const &ec) noexcept(false) {
 210         os<<ec.to_string();
 211         return os;
 212 }
 213 
 214 }
 215 
 216 namespace client {
 217 
 218 inline
 219 wrapper::wrapper(type_t type, domain_t domain) noexcept(false)
 220 : basic::wrapper(type, domain) {
 221 }
 222 
 223 inline void
 224 wrapper::connect(char const *addr, uint16_t port) noexcept(false) {
 225         ::sockaddr_in addr_in;
 226         addr_in.sin_family=AF_INET;
 227         ::inet_pton(AF_INET, addr, &addr_in.sin_addr);
 228         addr_in.sin_port=htons(port);
 229         std::fill_n(addr_in.sin_zero, sizeof(addr_in.sin_zero), 0);
 230         const int err=::connect(socket_, reinterpret_cast<::sockaddr const *>(&addr_in), sizeof(::sockaddr_in));
 231         if (err==-1) {
 232                 const int err=errno;
 233                 close();
 234                 info::function fun(
 235                         __LINE__,
 236                         __FUNCTION__,
 237                         typeid(*this),
 238                         info::function::argument(_T("IPv4 address"), addr)
 239                 );
 240                 fun.add_arg(info::function::argument(_T("port"), port));
 241                 throw exception_t(_T("Unable to connect socket."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
 242         }
 243 }
 244 
 245 }
 246 
 247 namespace server {
 248 
 249 class wrapper::recv_msg_ops_t {
 250 public:
 251         socket_t &skt_;
 252         std::function<void()> fn;
 253 
 254         template<class RecvProcMsgs> constexpr
 255         recv_msg_ops_t(socket_t &skt, RecvProcMsgs const &f) noexcept(true)
 256         : skt_(skt), fn(f) {
 257         }
 258 };
 259 
 260 inline
 261 wrapper::wrapper(char const *addr, uint16_t port, std::size_t min_message_size, std::size_t max_message_size, type_t type, domain_t domain) noexcept(false)
 262 : basic::wrapper(type, domain) {
 263         ::sockaddr_in addr_in;
 264         addr_in.sin_family=AF_INET;
 265         ::inet_pton(AF_INET, addr, &addr_in.sin_addr);
 266         addr_in.sin_port=htons(port);
 267         std::fill_n(addr_in.sin_zero, sizeof(addr_in.sin_zero), 0);
 268         const int bind_err=::bind(socket_, reinterpret_cast<::sockaddr const *>(&addr_in), sizeof(::sockaddr_in));
 269         if (bind_err==-1) {
 270                 const int err=errno;
 271                 close();
 272                 info::function fun(
 273                         __LINE__,
 274                         __FUNCTION__,
 275                         typeid(*this),
 276                         info::function::argument(_T("IPv4 address"), addr)
 277                 );
 278                 fun.add_arg(info::function::argument(_T("port"), port));
 279                 fun.add_arg(info::function::argument(_T("domain_t"), domain));
 280                 fun.add_arg(info::function::argument(_T("type_t"), type));
 281                 throw exception_t(_T("Unable to bind socket."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
 282         }
 283         const int listen_err=::listen(socket_, max_backlog_connections);
 284         if (listen_err==-1) {
 285                 const int err=errno;
 286                 close();
 287                 info::function fun(
 288                         __LINE__,
 289                         __FUNCTION__,
 290                         typeid(*this),
 291                         info::function::argument(_T("IPv4 address"), addr)
 292                 );
 293                 fun.add_arg(info::function::argument(_T("port"), port));
 294                 fun.add_arg(info::function::argument(_T("domain_t"), domain));
 295                 fun.add_arg(info::function::argument(_T("type_t"), type));
 296                 throw exception_t(_T("Unable to bind socket."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
 297         }
 298         base_t::set_options(min_message_size, max_message_size);
 299 }
 300 
 301 inline bool
 302 wrapper::stopped() const noexcept(true) {
 303         return static_cast<bool>(exited_);;
 304 }
 305 
 306 inline void
 307 wrapper::stop() noexcept(true) {
 308         exit_=true;
 309         while(!stopped()) {
 310                 thread::thread_traits::sleep(0);
 311         }
 312 }
 313 
 314 inline
 315 wrapper::~wrapper() noexcept(true) {
 316         stop();
 317 }
 318 
 319 inline void
 320 wrapper::run() noexcept(true) {
 321         class set_exited {
 322         public:
 323                 explicit constexpr set_exited(std::atomic<bool> &exited) noexcept(true)
 324                 : exited_(exited) {
 325                 }
 326                 ~set_exited() noexcept(true) {
 327                         exited_=true;
 328                 }
 329 
 330         private:
 331                 std::atomic<bool> &exited_;
 332         };
 333 
 334         const set_exited exited(exited_);
 335         try {
 336                 while (!static_cast<bool>(exit_)) {
 337                         try {
 338                                 ::pollfd fds={
 339                                         socket_,
 340                                         POLLIN|POLLPRI,
 341                                         0
 342                                 };
 343                                 const int poll_ret=::poll(&fds, 1, 0);
 344                                 if (poll_ret==-1) {
 345                                         const int err=errno;
 346                                         info::function fun(
 347                                                 __LINE__,
 348                                                 __FUNCTION__,
 349                                                 typeid(*this),
 350                                                 info::function::argument(_T("socket"), socket_)
 351                                         );
 352                                         throw exception_t(_T("Unable to accept a client connection, re-trying."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
 353                                 } else if (poll_ret==0) {
 354                                         thread::thread_traits::sleep(0);
 355                                         continue;
 356                                 } else {
 357                                         assert(poll_ret==1);
 358                                         ::sockaddr_in client_addr;
 359                                         ::socklen_t len=sizeof(::sockaddr_in);
 360                                         const int accept_skt=::accept(socket_, reinterpret_cast<::sockaddr *>(&client_addr), &len);
 361                                         if (accept_skt==-1) {
 362                                                 const int err=errno;
 363                                                 info::function fun(
 364                                                         __LINE__,
 365                                                         __FUNCTION__,
 366                                                         typeid(*this),
 367                                                         info::function::argument(_T("socket"), socket_)
 368                                                 );
 369                                                 throw exception_t(_T("Unable to accept a client connection, re-trying."), fun, info::revision(_T("$Header: svn+ssh://jmmcg@svn.code.sf.net/p/libjmmcg/code/trunk/libjmmcg/core/dynamic_cast.hpp 1626 2015-06-06 13:46:22Z jmmcg $"), __FILE__), err);
 370                                         }
 371                                         assert(len==sizeof(sockaddr_in));
 372                                         assert(client_addr.sin_port!=0);
 373                                         const std::shared_ptr<recv_msg_ops_t> recv_msg_ops_cpy(recv_msg_ops);
 374                                         if (recv_msg_ops_cpy.get()) {
 375                                                 recv_msg_ops_cpy->skt_.socket()=accept_skt;
 376                                                 set_options(recv_msg_ops_cpy->skt_);
 377                                                 recv_msg_ops_cpy->fn();
 378                                         }
 379                                 }
 380                         } catch (std::exception const &e) {
 381                                 // TODO oops: could overwrite an error... Do we care?
 382                                 ex=std::make_exception_ptr(e);
 383                         }
 384                 }
 385         } catch (std::exception const &e) {
 386                 // TODO oops: could overwrite an error... Do we care?
 387                 ex=std::make_exception_ptr(e);
 388         }
 389 }
 390 
 391 inline void
 392 wrapper::set_options(basic::wrapper &skt) const noexcept(true) {
 393         std::size_t snd_msg_sz=0;
 394         socklen_t buff_sz=sizeof(snd_msg_sz);
 395         ::getsockopt(socket_, SOL_SOCKET, SO_SNDBUF, &snd_msg_sz, &buff_sz);
 396         std::size_t rcv_msg_sz=0;
 397         buff_sz=sizeof(rcv_msg_sz);
 398         ::getsockopt(socket_, SOL_SOCKET, SO_RCVBUF, &rcv_msg_sz, &buff_sz);
 399         std::size_t rcv_msg_lowat=0;
 400         buff_sz=sizeof(rcv_msg_lowat);
 401         ::getsockopt(socket_, SOL_SOCKET, SO_RCVLOWAT, &rcv_msg_lowat, &buff_sz);
 402         skt.set_options(rcv_msg_lowat, std::max(snd_msg_sz, rcv_msg_sz));
 403 }
 404 
 405 template<class RecvProcMsgs> inline void
 406 wrapper::async_accept(socket_t &skt, RecvProcMsgs const &f) noexcept(false) {
 407         recv_msg_ops.reset(new recv_msg_ops_t(skt, f));
 408 }
 409 
 410 }
 411 
 412 } } }

/* [<][>][^][v][top][bottom][index][help] */