LCOV - code coverage report
Current view: top level - co - connection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 148 185 80.0 %
Date: 2018-01-09 16:37:03 Functions: 23 25 92.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "connection.h"
      22             : 
      23             : #include "buffer.h"
      24             : #include "connectionDescription.h"
      25             : #include "connectionListener.h"
      26             : #include "log.h"
      27             : #include "pipeConnection.h"
      28             : #include "rspConnection.h"
      29             : #include "socketConnection.h"
      30             : 
      31             : #ifdef _WIN32
      32             : #include "namedPipeConnection.h"
      33             : #endif
      34             : 
      35             : #include <co/exception.h>
      36             : 
      37             : #ifdef COLLAGE_USE_OFED
      38             : #include "rdmaConnection.h"
      39             : #endif
      40             : #ifdef COLLAGE_USE_UDT
      41             : #include "udtConnection.h"
      42             : #endif
      43             : 
      44             : #include <lunchbox/scopedMutex.h>
      45             : 
      46             : #define STATISTICS
      47             : namespace co
      48             : {
      49             : namespace detail
      50             : {
      51             : class Connection
      52             : {
      53             : public:
      54             :     co::Connection::State state;          //!< The connection state
      55             :     ConnectionDescriptionPtr description; //!< The connection parameters
      56             : 
      57             :     /** The lock used to protect concurrent write calls. */
      58             :     mutable std::mutex sendLock;
      59             : 
      60             :     BufferPtr buffer; //!< Current async read buffer
      61             :     uint64_t bytes;   //!< Current read request size
      62             : 
      63             :     /** The listeners on state changes */
      64             :     ConnectionListeners listeners;
      65             : 
      66             :     uint64_t outBytes; //!< Statistic: written bytes
      67             :     uint64_t inBytes;  //!< Statistic: read bytes
      68             : 
      69         409 :     Connection()
      70         409 :         : state(co::Connection::STATE_CLOSED)
      71         409 :         , description(new ConnectionDescription)
      72             :         , bytes(0)
      73             :         , outBytes(0)
      74         818 :         , inBytes(0)
      75             :     {
      76         409 :         description->type = CONNECTIONTYPE_NONE;
      77         409 :     }
      78             : 
      79         401 :     ~Connection()
      80         401 :     {
      81         401 :         LBASSERT(state == co::Connection::STATE_CLOSED);
      82         401 :         state = co::Connection::STATE_CLOSED;
      83         401 :         description = 0;
      84             : 
      85         401 :         LBASSERTINFO(!buffer,
      86             :                      "Pending read operation during connection destruction");
      87         401 :     }
      88             : 
      89        1065 :     void fireStateChanged(co::Connection* connection)
      90             :     {
      91        3322 :         for (ConnectionListeners::const_iterator i = listeners.begin();
      92        2214 :              i != listeners.end(); ++i)
      93             :         {
      94          43 :             (*i)->notifyStateChanged(connection);
      95             :         }
      96        1064 :     }
      97             : };
      98             : }
      99             : 
     100         409 : Connection::Connection()
     101         409 :     : _impl(new detail::Connection)
     102             : {
     103         409 :     LBVERB << "New Connection @" << (void*)this << std::endl;
     104         409 : }
     105             : 
     106         802 : Connection::~Connection()
     107             : {
     108         401 :     LBVERB << "Delete Connection @" << (void*)this << std::endl;
     109             : #ifdef STATISTICS
     110         401 :     if (_impl->outBytes > LB_1MB || _impl->inBytes > LB_1MB)
     111         102 :         LBINFO << *this << ": " << (_impl->outBytes >> 20) << " MB out, "
     112         102 :                << (_impl->inBytes >> 20) << " MB in" << std::endl;
     113             : #endif
     114         401 :     delete _impl;
     115         401 : }
     116             : 
     117           0 : bool Connection::operator==(const Connection& rhs) const
     118             : {
     119           0 :     if (this == &rhs)
     120           0 :         return true;
     121           0 :     if (_impl->description->type != CONNECTIONTYPE_PIPE)
     122           0 :         return false;
     123           0 :     Connection* pipe = const_cast<Connection*>(this);
     124           0 :     return pipe->acceptSync().get() == &rhs;
     125             : }
     126             : 
     127          99 : ConnectionPtr Connection::create(ConnectionDescriptionPtr description)
     128             : {
     129         198 :     ConnectionPtr connection;
     130          99 :     switch (description->type)
     131             :     {
     132             :     case CONNECTIONTYPE_TCPIP:
     133          95 :         connection = new SocketConnection;
     134          95 :         break;
     135             : 
     136             :     case CONNECTIONTYPE_PIPE:
     137           3 :         connection = new PipeConnection;
     138           3 :         break;
     139             : 
     140             : #ifdef _WIN32
     141             :     case CONNECTIONTYPE_NAMEDPIPE:
     142             :         connection = new NamedPipeConnection;
     143             :         break;
     144             : #endif
     145             : 
     146             :     case CONNECTIONTYPE_RSP:
     147           0 :         connection = new RSPConnection;
     148           0 :         break;
     149             : 
     150             : #ifdef COLLAGE_USE_OFED
     151             :     case CONNECTIONTYPE_RDMA:
     152           0 :         connection = new RDMAConnection;
     153           0 :         break;
     154             : #endif
     155             : #ifdef COLLAGE_USE_UDT
     156             :     case CONNECTIONTYPE_UDT:
     157             :         connection = new UDTConnection;
     158             :         break;
     159             : #endif
     160             : 
     161             :     default:
     162           3 :         LBWARN << "Connection type " << description->type << " not supported"
     163           3 :                << std::endl;
     164           1 :         return 0;
     165             :     }
     166             : 
     167          98 :     if (description->bandwidth == 0)
     168          75 :         description->bandwidth = connection->getDescription()->bandwidth;
     169             : 
     170          98 :     connection->_setDescription(description);
     171          98 :     return connection;
     172             : }
     173             : 
     174     3271994 : Connection::State Connection::getState() const
     175             : {
     176     3271994 :     return _impl->state;
     177             : }
     178             : 
     179          98 : void Connection::_setDescription(ConnectionDescriptionPtr description)
     180             : {
     181          98 :     LBASSERT(description.isValid());
     182          98 :     LBASSERTINFO(_impl->description->type == description->type,
     183             :                  "Wrong connection type in description");
     184          98 :     _impl->description = description;
     185          98 :     LBASSERT(description->bandwidth > 0);
     186          98 : }
     187             : 
     188        1117 : void Connection::_setState(const State state)
     189             : {
     190        1117 :     if (_impl->state == state)
     191          52 :         return;
     192        1065 :     _impl->state = state;
     193        1065 :     _impl->fireStateChanged(this);
     194             : }
     195             : 
     196         177 : void Connection::lockSend() const
     197             : {
     198         177 :     _impl->sendLock.lock();
     199         177 : }
     200             : 
     201         177 : void Connection::unlockSend() const
     202             : {
     203         177 :     _impl->sendLock.unlock();
     204         177 : }
     205             : 
     206         168 : void Connection::addListener(ConnectionListener* listener)
     207             : {
     208         168 :     _impl->listeners.push_back(listener);
     209         169 : }
     210             : 
     211         168 : void Connection::removeListener(ConnectionListener* listener)
     212             : {
     213             :     ConnectionListeners::iterator i =
     214         168 :         find(_impl->listeners.begin(), _impl->listeners.end(), listener);
     215         168 :     if (i != _impl->listeners.end())
     216         168 :         _impl->listeners.erase(i);
     217         168 : }
     218             : 
     219             : //----------------------------------------------------------------------
     220             : // read
     221             : //----------------------------------------------------------------------
     222     2719507 : void Connection::recvNB(BufferPtr buffer, const uint64_t bytes)
     223             : {
     224     2719507 :     LBASSERT(!_impl->buffer);
     225     2719507 :     LBASSERT(_impl->bytes == 0);
     226     2719507 :     LBASSERT(buffer);
     227     2719507 :     LBASSERT(bytes > 0);
     228     2719507 :     LBASSERTINFO(bytes < LB_BIT48, "Out-of-sync network stream: read size "
     229             :                                        << bytes << "?");
     230             : 
     231     2719507 :     _impl->buffer = buffer;
     232     2719507 :     _impl->bytes = bytes;
     233     2719507 :     buffer->reserve(buffer->getSize() + bytes);
     234     2719507 :     readNB(buffer->getData() + buffer->getSize(), bytes);
     235     2719507 : }
     236             : 
     237     2719393 : bool Connection::recvSync(BufferPtr& outBuffer, const bool block)
     238             : {
     239     2719393 :     LBASSERTINFO(_impl->buffer, "No pending receive on "
     240             :                                     << getDescription()->toString());
     241             : 
     242             :     // reset async IO data
     243     2719393 :     outBuffer = _impl->buffer;
     244     2719393 :     const uint64_t bytes = _impl->bytes;
     245     2719393 :     _impl->buffer = 0;
     246     2719393 :     _impl->bytes = 0;
     247             : 
     248     2719393 :     if (_impl->state != STATE_CONNECTED || !outBuffer || bytes == 0)
     249          33 :         return false;
     250     2719360 :     LBASSERTINFO(bytes < LB_BIT48, "Out-of-sync network stream: read size "
     251             :                                        << bytes << "?");
     252             : #ifdef STATISTICS
     253     2719360 :     _impl->inBytes += bytes;
     254             : #endif
     255             : 
     256             :     // 'Iterators' for receive loop
     257     2719360 :     uint8_t* ptr = outBuffer->getData() + outBuffer->getSize();
     258     2719360 :     uint64_t bytesLeft = bytes;
     259     2719360 :     int64_t got = readSync(ptr, bytesLeft, block);
     260             : 
     261             :     // WAR: fluke notification: On Win32, we get occasionally a data
     262             :     // notification and then deadlock when reading from the connection. The
     263             :     // callee (Node::handleData) will flag the first read, the underlying
     264             :     // SocketConnection will not block and we will restore the AIO operation if
     265             :     // no data was present.
     266     2719360 :     if (got == READ_TIMEOUT)
     267             :     {
     268           0 :         _impl->buffer = outBuffer;
     269           0 :         _impl->bytes = bytes;
     270           0 :         outBuffer = 0;
     271           0 :         return true;
     272             :     }
     273             : 
     274             :     // From here on, receive loop until all data read or error
     275             :     while (true)
     276             :     {
     277     3016576 :         if (got < 0) // error
     278             :         {
     279          37 :             const uint64_t read = bytes - bytesLeft;
     280          37 :             outBuffer->resize(outBuffer->getSize() + read);
     281          37 :             if (bytes == bytesLeft)
     282          37 :                 LBDEBUG << "Read on dead connection" << std::endl;
     283             :             else
     284           0 :                 LBERROR << "Error during read after " << read << " bytes on "
     285           0 :                         << _impl->description << std::endl;
     286          37 :             return false;
     287             :         }
     288     3016539 :         else if (got == 0)
     289             :         {
     290             :             // ConnectionSet::select may report data on an 'empty' connection.
     291             :             // If we have nothing read so far, we have hit this case.
     292           0 :             if (bytes == bytesLeft)
     293           0 :                 return false;
     294           0 :             LBVERB << "Zero bytes read" << std::endl;
     295             :         }
     296     3016539 :         if (bytesLeft > static_cast<uint64_t>(got)) // partial read
     297             :         {
     298      297216 :             ptr += got;
     299      297216 :             bytesLeft -= got;
     300             : 
     301      297216 :             readNB(ptr, bytesLeft);
     302      297216 :             got = readSync(ptr, bytesLeft, true);
     303      297216 :             continue;
     304             :         }
     305             : 
     306             :         // read done
     307     2719323 :         LBASSERTINFO(static_cast<uint64_t>(got) == bytesLeft, got << " != "
     308             :                                                                   << bytesLeft);
     309             : 
     310     2719323 :         outBuffer->resize(outBuffer->getSize() + bytes);
     311             : #ifndef NDEBUG
     312     2719323 :         if (bytes <= 1024 && (lunchbox::Log::topics & LOG_PACKETS))
     313             :         {
     314           0 :             ptr -= (bytes - bytesLeft); // rewind
     315           0 :             LBINFO << "recv:" << lunchbox::format(ptr, bytes) << std::endl;
     316             :         }
     317             : #endif
     318     2719323 :         return true;
     319      297216 :     }
     320             : 
     321             :     LBUNREACHABLE;
     322             :     return true;
     323             : }
     324             : 
     325         217 : BufferPtr Connection::resetRecvData()
     326             : {
     327         217 :     BufferPtr buffer = _impl->buffer;
     328         217 :     _impl->buffer = 0;
     329         217 :     _impl->bytes = 0;
     330         217 :     return buffer;
     331             : }
     332             : 
     333             : //----------------------------------------------------------------------
     334             : // write
     335             : //----------------------------------------------------------------------
     336     2968123 : bool Connection::send(const void* buffer, const uint64_t bytes,
     337             :                       const bool isLocked)
     338             : {
     339             : #ifdef STATISTICS
     340     2968123 :     _impl->outBytes += bytes;
     341             : #endif
     342     2968123 :     LBASSERT(bytes > 0);
     343     2964517 :     if (bytes == 0)
     344           0 :         return true;
     345             : 
     346     2964517 :     const uint8_t* ptr = static_cast<const uint8_t*>(buffer);
     347             : 
     348             :     // possible OPT: We need to lock here to guarantee an atomic transmission of
     349             :     // the buffer. Possible improvements are:
     350             :     // 1) Disassemble buffer into 'small enough' pieces and use a header to
     351             :     //    reassemble correctly on the other side (aka reliable UDP)
     352             :     // 2) Introduce a send thread with a thread-safe task queue
     353     5930227 :     lunchbox::ScopedWrite mutex(isLocked ? 0 : &_impl->sendLock);
     354             : 
     355             : #ifndef NDEBUG
     356     2970818 :     if (bytes <= 1024 && (lunchbox::Log::topics & LOG_PACKETS))
     357           0 :         LBINFO << "send:" << lunchbox::format(ptr, bytes) << std::endl;
     358             : #endif
     359             : 
     360     2970818 :     uint64_t bytesLeft = bytes;
     361     8902188 :     while (bytesLeft)
     362             :     {
     363             :         try
     364             :         {
     365     2970793 :             const int64_t wrote = this->write(ptr, bytesLeft);
     366     2965685 :             if (wrote == -1) // error
     367             :             {
     368           0 :                 LBERROR << "Error during write after " << bytes - bytesLeft
     369           0 :                         << " bytes, closing " << *this << std::endl;
     370           0 :                 close();
     371           0 :                 return false;
     372             :             }
     373     2965685 :             else if (wrote == 0)
     374           0 :                 LBINFO << "Zero bytes write" << std::endl;
     375             : 
     376     2965685 :             bytesLeft -= wrote;
     377     2965685 :             ptr += wrote;
     378             :         }
     379           0 :         catch (const co::Exception& e)
     380             :         {
     381           0 :             LBERROR << e.what() << " after " << bytes - bytesLeft
     382           0 :                     << " bytes, closing connection" << std::endl;
     383           0 :             close();
     384           0 :             return false;
     385             :         }
     386             :     }
     387     2965710 :     return true;
     388             : }
     389             : 
     390         164 : bool Connection::isMulticast() const
     391             : {
     392         164 :     return getDescription()->type >= CONNECTIONTYPE_MULTICAST;
     393             : }
     394             : 
     395         684 : ConstConnectionDescriptionPtr Connection::getDescription() const
     396             : {
     397         684 :     return _impl->description;
     398             : }
     399             : 
     400         496 : ConnectionDescriptionPtr Connection::_getDescription()
     401             : {
     402         496 :     return _impl->description;
     403             : }
     404             : 
     405         112 : std::ostream& operator<<(std::ostream& os, const Connection& connection)
     406             : {
     407         112 :     const Connection::State state = connection.getState();
     408         224 :     ConstConnectionDescriptionPtr desc = connection.getDescription();
     409             : 
     410         224 :     os << lunchbox::className(connection) << " " << (void*)&connection
     411             :        << " state "
     412             :        << (state == Connection::STATE_CLOSED
     413             :                ? "closed"
     414             :                : state == Connection::STATE_CONNECTING
     415           0 :                      ? "connecting"
     416             :                      : state == Connection::STATE_CONNECTED
     417           0 :                            ? "connected"
     418             :                            : state == Connection::STATE_LISTENING
     419           0 :                                  ? "listening"
     420             :                                  : state == Connection::STATE_CLOSING
     421             :                                        ? "closing"
     422         224 :                                        : "UNKNOWN");
     423         112 :     if (desc.isValid())
     424         112 :         os << " description " << desc->toString();
     425             : 
     426         224 :     return os;
     427             : }
     428          63 : }

Generated by: LCOV version 1.11