LCOV - code coverage report
Current view: top level - co - connection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 147 177 83.1 %
Date: 2015-11-03 13:48:53 Functions: 23 25 92.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "connection.h"
      22             : 
      23             : #include "buffer.h"
      24             : #include "connectionDescription.h"
      25             : #include "connectionListener.h"
      26             : #include "log.h"
      27             : #include "pipeConnection.h"
      28             : #include "socketConnection.h"
      29             : #include "rspConnection.h"
      30             : 
      31             : #ifdef _WIN32
      32             : #  include "namedPipeConnection.h"
      33             : #endif
      34             : 
      35             : #include <co/exception.h>
      36             : 
      37             : #ifdef COLLAGE_USE_OFED
      38             : #  include "rdmaConnection.h"
      39             : #endif
      40             : #ifdef COLLAGE_USE_UDT
      41             : #  include "udtConnection.h"
      42             : #endif
      43             : 
      44             : #include <lunchbox/scopedMutex.h>
      45             : #include <lunchbox/stdExt.h>
      46             : 
      47             : //#define STATISTICS
      48             : #ifdef STATISTICS
      49             : typedef std::map< uint64_t, size_t > Histogram;
      50             : typedef Histogram::const_iterator HistogramCIter;
      51             : lunchbox::Lockable< Histogram, lunchbox::SpinLock > _histogram;
      52             : 
      53             : #  define ADD_STATISTIC( x )                            \
      54             :     {                                                   \
      55             :         lunchbox::ScopedFastWrite mutex( _histogram );  \
      56             :         ++(*_histogram)[ x ];                           \
      57             :     }
      58             : #  define DUMP_STATISTIC                                                \
      59             :     {                                                                   \
      60             :         lunchbox::ScopedFastRead mutex( _histogram );                   \
      61             :         LBINFO << lunchbox::disableFlush << lunchbox::disableHeader;    \
      62             :         for( HistogramCIter i=_histogram->begin(); i!=_histogram->end(); ++i) \
      63             :             LBINFO << i->first << ", " << i->second << std::endl;       \
      64             :         LBINFO << lunchbox::enableHeader << lunchbox::enableFlush <<std::endl; \
      65             :     }
      66             : #else
      67             : #  define ADD_STATISTIC( x )
      68             : #  define DUMP_STATISTIC
      69             : #endif
      70             : 
      71             : namespace co
      72             : {
      73             : namespace detail
      74             : {
      75             : class Connection
      76             : {
      77             : public:
      78             :     co::Connection::State state; //!< The connection state
      79             :     ConnectionDescriptionPtr description; //!< The connection parameters
      80             : 
      81             :     /** The lock used to protect concurrent write calls. */
      82             :     mutable lunchbox::Lock sendLock;
      83             : 
      84             :     BufferPtr buffer; //!< Current async read buffer
      85             :     uint64_t bytes; //!< Current read request size
      86             : 
      87             :     /** The listeners on state changes */
      88             :     ConnectionListeners listeners;
      89             : 
      90         420 :     Connection()
      91             :             : state( co::Connection::STATE_CLOSED )
      92         420 :             , description( new ConnectionDescription )
      93         840 :             , bytes( 0 )
      94             :     {
      95         420 :         description->type = CONNECTIONTYPE_NONE;
      96         420 :     }
      97             : 
      98         412 :     ~Connection()
      99         412 :     {
     100         412 :         LBASSERT( state == co::Connection::STATE_CLOSED );
     101         412 :         state = co::Connection::STATE_CLOSED;
     102         412 :         description = 0;
     103             : 
     104         412 :         LBASSERTINFO( !buffer,
     105             :                       "Pending read operation during connection destruction" );
     106         412 :     }
     107             : 
     108        1110 :     void fireStateChanged( co::Connection* connection )
     109             :     {
     110        3459 :         for( ConnectionListeners::const_iterator i= listeners.begin();
     111        2306 :              i != listeners.end(); ++i )
     112             :         {
     113          43 :             (*i)->notifyStateChanged( connection );
     114             :         }
     115        1110 :     }
     116             : };
     117             : }
     118             : 
     119         420 : Connection::Connection()
     120         420 :         : _impl( new detail::Connection )
     121             : {
     122         420 :     LBVERB << "New Connection @" << (void*)this << std::endl;
     123         420 : }
     124             : 
     125         824 : Connection::~Connection()
     126             : {
     127         412 :     delete _impl;
     128         412 :     LBVERB << "Delete Connection @" << (void*)this << std::endl;
     129             :     DUMP_STATISTIC;
     130         412 : }
     131             : 
     132           0 : bool Connection::operator == ( const Connection& rhs ) const
     133             : {
     134           0 :     if( this == &rhs )
     135           0 :         return true;
     136           0 :     if( _impl->description->type != CONNECTIONTYPE_PIPE )
     137           0 :         return false;
     138           0 :     Connection* pipe = const_cast< Connection* >( this );
     139           0 :     return pipe->acceptSync().get() == &rhs;
     140             : }
     141             : 
     142         104 : ConnectionPtr Connection::create( ConnectionDescriptionPtr description )
     143             : {
     144         104 :     ConnectionPtr connection;
     145         104 :     switch( description->type )
     146             :     {
     147             :         case CONNECTIONTYPE_TCPIP:
     148             :         case CONNECTIONTYPE_SDP:
     149          94 :             connection = new SocketConnection( description->type );
     150          94 :             break;
     151             : 
     152             :         case CONNECTIONTYPE_PIPE:
     153           3 :             connection = new PipeConnection;
     154           3 :             break;
     155             : 
     156             : #ifdef _WIN32
     157             :         case CONNECTIONTYPE_NAMEDPIPE:
     158             :             connection = new NamedPipeConnection;
     159             :             break;
     160             : #endif
     161             : 
     162             :         case CONNECTIONTYPE_RSP:
     163           2 :             connection = new RSPConnection;
     164           2 :             break;
     165             : 
     166             : #ifdef COLLAGE_USE_OFED
     167             :         case CONNECTIONTYPE_RDMA:
     168           4 :             connection = new RDMAConnection;
     169           4 :             break;
     170             : #endif
     171             : #ifdef COLLAGE_USE_UDT
     172             :         case CONNECTIONTYPE_UDT:
     173             :             connection = new UDTConnection;
     174             :             break;
     175             : #endif
     176             : 
     177             :         default:
     178           3 :             LBWARN << "Connection type " << description->type
     179           3 :                    << " not supported" << std::endl;
     180           1 :             return 0;
     181             :     }
     182             : 
     183         103 :     if( description->bandwidth == 0 )
     184          76 :         description->bandwidth = connection->getDescription()->bandwidth;
     185             : 
     186         103 :     connection->_setDescription( description );
     187         103 :     return connection;
     188             : }
     189             : 
     190     4110906 : Connection::State Connection::getState() const
     191             : {
     192     4110906 :     return _impl->state;
     193             : }
     194             : 
     195         105 : void Connection::_setDescription( ConnectionDescriptionPtr description )
     196             : {
     197         105 :     LBASSERT( description.isValid( ));
     198         105 :     LBASSERTINFO( _impl->description->type == description->type,
     199             :                   "Wrong connection type in description" );
     200         105 :     _impl->description = description;
     201         105 :     LBASSERT( description->bandwidth > 0 );
     202         105 : }
     203             : 
     204        1167 : void Connection::_setState( const State state )
     205             : {
     206        1167 :     if( _impl->state == state )
     207        1222 :         return;
     208        1111 :     _impl->state = state;
     209        1111 :     _impl->fireStateChanged( this );
     210             : }
     211             : 
     212         183 : void Connection::lockSend() const
     213             : {
     214         183 :     _impl->sendLock.set();
     215         183 : }
     216             : 
     217         183 : void Connection::unlockSend() const
     218             : {
     219         183 :     _impl->sendLock.unset();
     220         183 : }
     221             : 
     222         166 : void Connection::addListener( ConnectionListener* listener )
     223             : {
     224         166 :     _impl->listeners.push_back( listener );
     225         166 : }
     226             : 
     227         165 : void Connection::removeListener( ConnectionListener* listener )
     228             : {
     229             :     ConnectionListeners::iterator i = find( _impl->listeners.begin(),
     230         165 :                                             _impl->listeners.end(), listener );
     231         165 :     if( i != _impl->listeners.end( ))
     232         165 :         _impl->listeners.erase( i );
     233         165 : }
     234             : 
     235             : //----------------------------------------------------------------------
     236             : // read
     237             : //----------------------------------------------------------------------
     238     3044737 : void Connection::recvNB( BufferPtr buffer, const uint64_t bytes )
     239             : {
     240     3044737 :     LBASSERT( !_impl->buffer );
     241     3044737 :     LBASSERT( _impl->bytes == 0 );
     242     3044737 :     LBASSERT( buffer );
     243     3044737 :     LBASSERT( bytes > 0 );
     244     3044737 :     LBASSERTINFO( bytes < LB_BIT48,
     245             :                   "Out-of-sync network stream: read size " << bytes << "?" );
     246             : 
     247     3044737 :     _impl->buffer = buffer;
     248     3044737 :     _impl->bytes = bytes;
     249     3044737 :     buffer->reserve( buffer->getSize() + bytes );
     250     3044737 :     readNB( buffer->getData() + buffer->getSize(), bytes );
     251     3044737 : }
     252             : 
     253     3044625 : bool Connection::recvSync( BufferPtr& outBuffer, const bool block )
     254             : {
     255     3044625 :     LBASSERTINFO( _impl->buffer,
     256             :                   "No pending receive on " << getDescription()->toString( ));
     257             : 
     258             :     // reset async IO data
     259     3044625 :     outBuffer = _impl->buffer;
     260     3044625 :     const uint64_t bytes = _impl->bytes;
     261     3044625 :     _impl->buffer = 0;
     262     3044625 :     _impl->bytes = 0;
     263             : 
     264     3044625 :     if( _impl->state != STATE_CONNECTED || !outBuffer || bytes == 0 )
     265          33 :         return false;
     266     3044592 :     LBASSERTINFO( bytes < LB_BIT48,
     267             :                   "Out-of-sync network stream: read size " << bytes << "?" );
     268             : 
     269             :     // 'Iterators' for receive loop
     270     3044592 :     uint8_t* ptr = outBuffer->getData() + outBuffer->getSize();
     271     3044592 :     uint64_t bytesLeft = bytes;
     272     3044592 :     int64_t got = readSync( ptr, bytesLeft, block );
     273             : 
     274             :     // WAR: fluke notification: On Win32, we get occasionally a data
     275             :     // notification and then deadlock when reading from the connection. The
     276             :     // callee (Node::handleData) will flag the first read, the underlying
     277             :     // SocketConnection will not block and we will restore the AIO operation if
     278             :     // no data was present.
     279     3044592 :     if( got == READ_TIMEOUT )
     280             :     {
     281           0 :         _impl->buffer = outBuffer;
     282           0 :         _impl->bytes = bytes;
     283           0 :         outBuffer = 0;
     284           0 :         return true;
     285             :     }
     286             : 
     287             :     // From here on, receive loop until all data read or error
     288             :     while( true )
     289             :     {
     290     3272991 :         if( got < 0 ) // error
     291             :         {
     292          41 :             const uint64_t read = bytes - bytesLeft;
     293          41 :             outBuffer->resize( outBuffer->getSize() + read );
     294          41 :             if( bytes == bytesLeft )
     295          41 :                 LBDEBUG << "Read on dead connection" << std::endl;
     296             :             else
     297           0 :                 LBERROR << "Error during read after " << read << " bytes on "
     298           0 :                         << _impl->description << std::endl;
     299          41 :             return false;
     300             :         }
     301     3272950 :         else if( got == 0 )
     302             :         {
     303             :             // ConnectionSet::select may report data on an 'empty' connection.
     304             :             // If we have nothing read so far, we have hit this case.
     305           0 :             if( bytes == bytesLeft )
     306           0 :                 return false;
     307           0 :             LBVERB << "Zero bytes read" << std::endl;
     308             :         }
     309     3272950 :         if( bytesLeft > static_cast< uint64_t >( got )) // partial read
     310             :         {
     311      228399 :             ptr += got;
     312      228399 :             bytesLeft -= got;
     313             : 
     314      228399 :             readNB( ptr, bytesLeft );
     315      228399 :             got = readSync( ptr, bytesLeft, true );
     316      228399 :             continue;
     317             :         }
     318             : 
     319             :         // read done
     320     3044551 :         LBASSERTINFO( static_cast< uint64_t >( got ) == bytesLeft,
     321             :                       got << " != " << bytesLeft );
     322             : 
     323     3044551 :         outBuffer->resize( outBuffer->getSize() + bytes );
     324             : #ifndef NDEBUG
     325     3044551 :         if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
     326             :         {
     327           0 :             ptr -= (bytes - bytesLeft); // rewind
     328           0 :             LBINFO << "recv:" << lunchbox::format( ptr, bytes ) << std::endl;
     329             :         }
     330             : #endif
     331     3044551 :         return true;
     332             :     }
     333             : 
     334             :     LBUNREACHABLE;
     335      228399 :     return true;
     336             : }
     337             : 
     338         214 : BufferPtr Connection::resetRecvData()
     339             : {
     340         214 :     BufferPtr buffer = _impl->buffer;
     341         214 :     _impl->buffer = 0;
     342         214 :     _impl->bytes = 0;
     343         214 :     return buffer;
     344             : }
     345             : 
     346             : //----------------------------------------------------------------------
     347             : // write
     348             : //----------------------------------------------------------------------
     349     3295715 : bool Connection::send( const void* buffer, const uint64_t bytes,
     350             :                        const bool isLocked )
     351             : {
     352             :     ADD_STATISTIC( bytes );
     353     3295715 :     LBASSERT( bytes > 0 );
     354     3295114 :     if( bytes == 0 )
     355           0 :         return true;
     356             : 
     357     3295114 :     const uint8_t* ptr = static_cast< const uint8_t* >( buffer );
     358             : 
     359             :     // possible OPT: We need to lock here to guarantee an atomic transmission of
     360             :     // the buffer. Possible improvements are:
     361             :     // 1) Disassemble buffer into 'small enough' pieces and use a header to
     362             :     //    reassemble correctly on the other side (aka reliable UDP)
     363             :     // 2) Introduce a send thread with a thread-safe task queue
     364     3295114 :     lunchbox::ScopedMutex<> mutex( isLocked ? 0 : &_impl->sendLock );
     365             : 
     366             : #ifndef NDEBUG
     367     3296579 :     if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
     368           0 :         LBINFO << "send:" << lunchbox::format( ptr, bytes ) << std::endl;
     369             : #endif
     370             : 
     371     3296579 :     uint64_t bytesLeft = bytes;
     372     9890094 :     while( bytesLeft )
     373             :     {
     374             :         try
     375             :         {
     376     3299163 :             const int64_t wrote = this->write( ptr, bytesLeft );
     377     3296936 :             if( wrote == -1 ) // error
     378             :             {
     379           0 :                 LBERROR << "Error during write after " << bytes - bytesLeft
     380           0 :                         << " bytes, closing connection" << std::endl;
     381           0 :                 close();
     382           0 :                 return false;
     383             :             }
     384     3296936 :             else if( wrote == 0 )
     385           0 :                 LBINFO << "Zero bytes write" << std::endl;
     386             : 
     387     3296936 :             bytesLeft -= wrote;
     388     3296936 :             ptr += wrote;
     389             :         }
     390           0 :         catch( const co::Exception& e )
     391             :         {
     392           0 :             LBERROR << e.what() << " after " << bytes - bytesLeft
     393           0 :                     << " bytes, closing connection" << std::endl;
     394           0 :             close();
     395           0 :             return false;
     396             :         }
     397             : 
     398             :     }
     399     3294352 :     return true;
     400             : }
     401             : 
     402         164 : bool Connection::isMulticast() const
     403             : {
     404         164 :     return getDescription()->type >= CONNECTIONTYPE_MULTICAST;
     405             : }
     406             : 
     407       17338 : ConstConnectionDescriptionPtr Connection::getDescription() const
     408             : {
     409       17338 :     return _impl->description;
     410             : }
     411             : 
     412         518 : ConnectionDescriptionPtr Connection::_getDescription()
     413             : {
     414         518 :     return _impl->description;
     415             : }
     416             : 
     417          77 : std::ostream& operator << ( std::ostream& os, const Connection& connection )
     418             : {
     419          77 :     const Connection::State state = connection.getState();
     420          77 :     ConstConnectionDescriptionPtr desc = connection.getDescription();
     421             : 
     422         154 :     os << lunchbox::className( connection ) << " " << (void*)&connection
     423          77 :        << " state " << ( state == Connection::STATE_CLOSED     ? "closed" :
     424             :                          state == Connection::STATE_CONNECTING ? "connecting" :
     425             :                          state == Connection::STATE_CONNECTED  ? "connected" :
     426             :                          state == Connection::STATE_LISTENING  ? "listening" :
     427             :                          state == Connection::STATE_CLOSING    ? "closing" :
     428         154 :                          "UNKNOWN" );
     429          77 :     if( desc.isValid( ))
     430          77 :         os << " description " << desc->toString();
     431             : 
     432          77 :     return os;
     433             : }
     434          63 : }

Generated by: LCOV version 1.11