LCOV - code coverage report
Current view: top level - co - rspConnection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 1 908 0.1 %
Date: 2016-12-14 01:26:48 Functions: 2 50 4.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2009-2016, Cedric Stalder <cedric.stalder@gmail.com>
       3             :  *                          Stefan Eilemann <eile@equalizergraphics.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "rspConnection.h"
      23             : 
      24             : #include "connection.h"
      25             : #include "connectionDescription.h"
      26             : #include "global.h"
      27             : #include "log.h"
      28             : 
      29             : #include <lunchbox/rng.h>
      30             : #include <lunchbox/scopedMutex.h>
      31             : #include <lunchbox/sleep.h>
      32             : 
      33             : #include <boost/bind.hpp>
      34             : 
      // Build-time switches and constants for the RSP connection.
      // CO_INSTRUMENT_RSP (disabled here) compiles in the statistics counters
      // declared in the anonymous namespace below and the logging guarded by it.
      35             : //#define CO_INSTRUMENT_RSP
                       // NOTE(review): the code guarded by CO_RSP_MERGE_WRITES is not visible in
                       // this part of the file.
      36             : #define CO_RSP_MERGE_WRITES
                       // Timeout count at which _handleConnectedTimeout() treats peers as failed.
      37             : #define CO_RSP_MAX_TIMEOUTS 1000
                       // Port substituted by listen() when the description's port is 0. On
                       // non-Windows builds it is derived from the user id and lies in the range
                       // [1024, 65534].
      38             : #ifdef _WIN32
      39             : #  define CO_RSP_DEFAULT_PORT (4242)
      40             : #else
      41             : #  define CO_RSP_DEFAULT_PORT ( (getuid() % 64511) + 1024 )
      42             : #endif
      43             : 
      44             : 
      45             : // Note: Do not use version > 255, endianness detection magic relies on this.
                       // Compared against DatagramNode::protocolVersion in _getDatagramNode().
      46             : const uint16_t CO_RSP_PROTOCOL_VERSION = 0;
      47             : 
      48             : namespace bp = boost::posix_time;
      49             : namespace ip = boost::asio::ip;
      50             : 
      51             : namespace co
      52             : {
      53             : 
      // File-local state: optional instrumentation counters and the buffer count
      // cached by listen().
      54             : namespace
      55             : {
      56             : #ifdef CO_INSTRUMENT_RSP
                       // Statistics counters, only compiled when CO_INSTRUMENT_RSP is defined.
      57             : lunchbox::a_int32_t nReadData;
      58             : lunchbox::a_int32_t nBytesRead;
      59             : lunchbox::a_int32_t nBytesWritten;
      60             : lunchbox::a_int32_t nDatagrams;
      61             : lunchbox::a_int32_t nRepeated;
      62             : lunchbox::a_int32_t nMergedDatagrams;
      63             : lunchbox::a_int32_t nAckRequests;
      64             : lunchbox::a_int32_t nAcksSend;
      65             : lunchbox::a_int32_t nAcksSendTotal;
      66             : lunchbox::a_int32_t nAcksRead;
      67             : lunchbox::a_int32_t nAcksAccepted;
      68             : lunchbox::a_int32_t nNAcksSend;
      69             : lunchbox::a_int32_t nNAcksRead;
      70             : lunchbox::a_int32_t nNAcksResend;
      71             : 
      72             : float writeWaitTime = 0.f;
                       // Reset in the destructor and, once it exceeds 1000, in _processOutgoing().
      73             : lunchbox::Clock instrumentClock;
      74             : #endif
      75             : 
                       // Written by listen() from Global::IATTR_RSP_NUM_BUFFERS and read by
                       // _handleConnectedTimeout(). Being a file-scope variable, it is shared by
                       // all RSPConnection instances of this translation unit.
      76             : static uint16_t _numBuffers = 0;
      77             : }
      78             : 
      // Constructs a closed RSP connection: reads MTU, ack frequency and buffer
      // count from the Global attributes, picks a random id, marks the connection
      // description as CONNECTIONTYPE_RSP, connects the notification event and
      // preallocates the datagram buffers. No socket or thread is created here;
      // that happens in listen().
      79           0 : RSPConnection::RSPConnection()
      80             :     : _id( 0 )
      81             :     , _idAccepted( false )
      82           0 :     , _mtu( Global::getIAttribute( Global::IATTR_UDP_MTU ))
      83           0 :     , _ackFreq( Global::getIAttribute( Global::IATTR_RSP_ACK_FREQUENCY ))
                           // NOTE(review): this and the other initializers using _mtu/_ackFreq rely
                           // on those members being declared earlier in the class (header not
                           // visible here) - confirm the declaration order.
      84           0 :     , _payloadSize( _mtu - sizeof( DatagramData ))
      85             :     , _timeouts( 0 )
      86           0 :     , _event( new EventConnection )
      87             :     , _read( 0 )
      88             :     , _write( 0 )
      89             :     , _timeout( _ioService )
      90             :     , _wakeup( _ioService )
      91           0 :     , _maxBucketSize( ( _mtu * _ackFreq) >> 1 )
      92             :     , _bucketSize( 0 )
      93             :     , _sendRate( 0 )
      94             :     , _thread( 0 )
      95           0 :     , _acked( std::numeric_limits< uint16_t >::max( ))
      96             :     , _threadBuffers( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS))
      97           0 :     , _recvBuffer( _mtu )
      98             :     , _readBuffer( 0 )
      99             :     , _readBufferPos( 0 )
     100             :     , _sequence( 0 )
     101             :     // ensure we have a handleConnectedTimeout before the write pop
                           // NOTE(review): this multiplies the enumerator IATTR_RSP_ACK_TIMEOUT
                           // itself, not the attribute value; _processOutgoing() reads the timeout
                           // through Global::getIAttribute(). Confirm which one is intended.
     102           0 :     , _writeTimeOut( Global::IATTR_RSP_ACK_TIMEOUT * CO_RSP_MAX_TIMEOUTS * 2 )
     103             : {
     104           0 :     _buildNewID();
     105           0 :     ConnectionDescriptionPtr description = _getDescription();
     106           0 :     description->type = CONNECTIONTYPE_RSP;
     107           0 :     description->bandwidth = 102400;
     108             : 
     109           0 :     LBCHECK( _event->connect( ));
     110             : 
                           // Allocate IATTR_RSP_NUM_BUFFERS buffers of _mtu bytes each; they are
                           // owned by _buffers and deleted in the destructor.
     111           0 :     _buffers.reserve( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ));
     112           0 :     while( static_cast< int32_t >( _buffers.size( )) <
     113           0 :            Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ))
     114             :     {
     115           0 :         _buffers.push_back( new Buffer( _mtu ));
     116             :     }
     117             : 
                           // A NACK datagram has to fit into a single MTU-sized packet.
     118           0 :     LBASSERT( sizeof( DatagramNack ) <= size_t( _mtu ));
     119           0 :     LBLOG( LOG_RSP ) << "New RSP connection, " << _buffers.size()
     120           0 :                      << " buffers of " << _mtu << " bytes" << std::endl;
     121             : 
     122           0 : }
     123             : 
     // Closes the connection via _close() and deletes the buffers allocated by
     // the constructor. With CO_INSTRUMENT_RSP defined it also logs this
     // connection and resets the instrumentation clock.
     124           0 : RSPConnection::~RSPConnection()
     125             : {
     126           0 :     _close();
     127           0 :     while( !_buffers.empty( ))
     128             :     {
     129           0 :         delete _buffers.back();
     130           0 :         _buffers.pop_back();
     131             :     }
     132             : #ifdef CO_INSTRUMENT_RSP
     133             :     LBWARN << *this << std::endl;
     134             :     instrumentClock.reset();
     135             : #endif
     136           0 : }
     137             : 
     // Shuts the connection down: stops and joins the protocol thread, wakes up
     // the child connections, closes and deletes both UDP sockets, and moves the
     // state to STATE_CLOSED. Returns early without doing any of this if the
     // connection is already closed.
     138           0 : void RSPConnection::_close()
     139             : {
                           // A connection whose parent carries the same id closes that parent.
     140           0 :     if( _parent.isValid() && _parent->_id == _id )
     141           0 :         _parent->close();
     142             : 
                           // Without a parent, poll in 10 ms steps until _isWriting() is false.
     143           0 :     while( !_parent && _isWriting( ))
     144           0 :         lunchbox::sleep( 10 /*ms*/ );
     145             : 
     146           0 :     if( isClosed( ))
     147           0 :         return;
     148             : 
     149           0 :     lunchbox::ScopedWrite mutex( _mutexEvent );
     150           0 :     if( _thread )
     151             :     {
                               // Must not be called from the protocol thread: it is joined below.
     152           0 :         LBASSERT( !_thread->isCurrent( ));
     153           0 :         _sendSimpleDatagram( ID_EXIT, _id );
     154           0 :         _ioService.stop();
     155           0 :         _thread->join();
     156           0 :         delete _thread;
     157             :     }
     158             : 
     159           0 :     _setState( STATE_CLOSING );
                           // NOTE(review): _thread was deleted above but not yet reset, so this
                           // tests a dangling pointer value (it is not dereferenced). Consider
                           // resetting it right after the delete.
     160           0 :     if( _thread )
     161             :     {
     162           0 :          _thread = 0;
     163             : 
     164             :         // notify children to close
     165           0 :         for( RSPConnectionsCIter i=_children.begin(); i !=_children.end(); ++i )
     166             :         {
     167           0 :             RSPConnectionPtr child = *i;
     168           0 :             lunchbox::ScopedWrite mutexChild( child->_mutexEvent );
                                   // A null buffer is what readSync() interprets as "closed".
     169           0 :             child->_appBuffers.push( 0 );
     170           0 :             child->_event->set();
     171             :         }
     172             : 
     173           0 :         _children.clear();
     174           0 :         _newChildren.clear();
     175             :     }
     176             : 
     177           0 :     _parent = 0;
     178             : 
     179           0 :     if( _read )
     180           0 :         _read->close();
     181           0 :     delete _read;
     182           0 :     _read = 0;
     183             : 
     184           0 :     if( _write )
     185           0 :         _write->close();
     186           0 :     delete _write;
     187           0 :     _write = 0;
     188             : 
     189           0 :     _threadBuffers.clear();
     190           0 :     _appBuffers.push( 0 ); // unlock any other read/write threads
     191             : 
     192           0 :     _setState( STATE_CLOSED );
     193             : 
                           // Release _mutexEvent before closing the event connection.
     194           0 :     mutex.leave();
     195           0 :     _event->close();
     196             : }
     197             : 
     198             : //----------------------------------------------------------------------
     199             : // Async IO handles
     200             : //----------------------------------------------------------------------
     // Draws a random 16-bit value from lunchbox::RNG, stores it in _id and
     // returns it.
     201           0 : uint16_t RSPConnection::_buildNewID()
     202             : {
     203           0 :     lunchbox::RNG rng;
     204           0 :     _id = rng.get< uint16_t >();
     205           0 :     return _id;
     206             : }
     207             : 
     // Opens the UDP read/write sockets, joins the multicast group named by the
     // connection description and starts the protocol thread.
     //
     // Returns true once the thread has started and all buffers were handed to
     // _appBuffers. Returns false if the connection is not closed, if hostname
     // or interface resolution yields no result, if a Boost.Asio call throws, or
     // if the thread fails to start.
     //
     // NOTE(review): none of the failure paths resets the STATE_CONNECTING state
     // set at the beginning; only the thread-start failure calls close().
     208           0 : bool RSPConnection::listen()
     209             : {
     210           0 :     ConnectionDescriptionPtr description = _getDescription();
     211           0 :     LBASSERT( description->type == CONNECTIONTYPE_RSP );
     212             : 
     213           0 :     if( !isClosed( ))
     214           0 :         return false;
     215             : 
     216           0 :     _setState( STATE_CONNECTING );
     217           0 :     _numBuffers =  Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS );
     218             : 
     219             :     // init udp connection
                           // Fill in defaults for unset description fields: port, multicast group
                           // address and local interface.
     220           0 :     if( description->port == ConnectionDescription::RANDOM_MULTICAST_PORT )
     221           0 :         description->port = 0; // Let OS choose
     222           0 :     else if( description->port == 0 )
     223           0 :         description->port = CO_RSP_DEFAULT_PORT;
     224           0 :     if( description->getHostname().empty( ))
     225           0 :         description->setHostname( "239.255.42.43" );
     226           0 :     if( description->getInterface().empty( ))
     227           0 :         description->setInterface( "0.0.0.0" );
     228             : 
     229             :     try
     230             :     {
                               // The read socket binds to any address on the requested port; the
                               // write endpoint is the resolved multicast hostname.
     231           0 :         const ip::address readAddress( ip::address::from_string( "0.0.0.0" ));
     232             :         const ip::udp::endpoint readEndpoint( readAddress,
     233           0 :                                               description->port );
     234           0 :         const std::string& port = std::to_string( unsigned( description->port));
     235           0 :         ip::udp::resolver resolver( _ioService );
     236           0 :         const ip::udp::resolver::query queryHN( ip::udp::v4(),
     237             :                                                 description->getHostname(),
     238           0 :                                                 port );
     239           0 :         const ip::udp::resolver::iterator hostname = resolver.resolve( queryHN);
     240             : 
     241           0 :         if( hostname == ip::udp::resolver::iterator( ))
     242           0 :             return false;
     243             : 
     244           0 :         const ip::udp::endpoint writeEndpoint = *hostname;
     245           0 :         const ip::address mcAddr( writeEndpoint.address( ));
     246             : 
     247           0 :         _read = new ip::udp::socket( _ioService );
     248           0 :         _write = new ip::udp::socket( _ioService );
     249           0 :         _read->open( readEndpoint.protocol( ));
     250           0 :         _write->open( writeEndpoint.protocol( ));
     251             : 
     252           0 :         _read->set_option( ip::udp::socket::reuse_address( true ));
     253           0 :         _write->set_option( ip::udp::socket::reuse_address( true ));
     254           0 :         _read->set_option( ip::udp::socket::receive_buffer_size(
     255           0 :                        Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
     256           0 :         _write->set_option( ip::udp::socket::send_buffer_size(
     257           0 :                        Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
     258             : 
     259           0 :         _read->bind( readEndpoint );
                               // Publish the port actually bound (relevant when port 0 was used).
     260           0 :         description->port = _read->local_endpoint().port();
     261             : 
     262           0 :         const ip::udp::resolver::query queryIF( ip::udp::v4(),
     263             :                                                 description->getInterface(),
     264           0 :                                                 "0" );
     265             :         const ip::udp::resolver::iterator interfaceIP =
     266           0 :             resolver.resolve( queryIF );
     267             : 
                               // NOTE(review): this early return leaves _read and _write allocated
                               // and open; the catch block below is the only place that deletes
                               // them on failure.
     268           0 :         if( interfaceIP == ip::udp::resolver::iterator( ))
     269           0 :             return false;
     270             : 
     271           0 :         const ip::address ifAddr( ip::udp::endpoint( *interfaceIP ).address( ));
     272           0 :         LBDEBUG << "Joining " << mcAddr << " on " << ifAddr << std::endl;
     273             : 
     274           0 :         _read->set_option( ip::multicast::join_group( mcAddr.to_v4(),
     275           0 :                                                       ifAddr.to_v4( )));
     276           0 :         _write->set_option( ip::multicast::outbound_interface( ifAddr.to_v4()));
     277             : #ifdef SO_BINDTODEVICE // https://github.com/Eyescale/Collage/issues/16
                               // NOTE(review): the option value passed here is the interface's IP
                               // address string and the setsockopt() results are ignored.
                               // SO_BINDTODEVICE is documented to take an interface name - verify
                               // that this call has the intended effect.
     278           0 :         const std::string& ifIP = ifAddr.to_string();
     279           0 :         ::setsockopt( _write->native(), SOL_SOCKET, SO_BINDTODEVICE,
     280           0 :                       ifIP.c_str(), ifIP.size() + 1 );
     281           0 :         ::setsockopt( _read->native(), SOL_SOCKET, SO_BINDTODEVICE,
     282           0 :                       ifIP.c_str(), ifIP.size() + 1 );
     283             : #endif
     284             : 
     285           0 :         _write->connect( writeEndpoint );
     286             : 
     287           0 :         _read->set_option( ip::multicast::enable_loopback( false ));
     288           0 :         _write->set_option( ip::multicast::enable_loopback( false ));
     289             :     }
     290           0 :     catch( const boost::system::system_error& e )
     291             :     {
     292           0 :         LBWARN << "can't setup underlying UDP connection: " << e.what()
     293           0 :                << std::endl;
     294           0 :         delete _read;
     295           0 :         delete _write;
     296           0 :         _read = 0;
     297           0 :         _write = 0;
     298           0 :         return false;
     299             :     }
     300             : 
     301             :     // init communication protocol thread
     302           0 :     _thread = new Thread( this );
     303           0 :     _bucketSize = 0;
     304           0 :     _sendRate = description->bandwidth;
     305             : 
     306             :     // waits until RSP protocol establishes connection to the multicast network
     307           0 :     if( !_thread->start( ) )
     308             :     {
     309           0 :         close();
     310           0 :         return false;
     311             :     }
     312             : 
     313             :     // Make all buffers available for writing
     314           0 :     LBASSERT( _appBuffers.isEmpty( ));
     315           0 :     _appBuffers.push( _buffers );
     316             : 
     317           0 :     LBDEBUG << "Listening on " << description->getHostname() << ":"
     318           0 :             << description->port << " (" << description->toString() << " @"
     319           0 :             << (void*)this << ")" << std::endl;
     320           0 :     return true;
     321             : }
     322             : 
     // Hands out the most recently queued entry of _newChildren.
     //
     // Returns 0 if the connection is not listening or no child is pending.
     // Afterwards the event is reset when no further child is pending and set
     // otherwise.
     323           0 : ConnectionPtr RSPConnection::acceptSync()
     324             : {
     325           0 :     if( !isListening( ))
     326           0 :         return 0;
     327             : 
     328           0 :     lunchbox::ScopedWrite mutex( _mutexConnection );
                           // Asserts in debug builds; the check below also covers builds where the
                           // assertion is compiled out.
     329           0 :     LBASSERT( !_newChildren.empty( ));
     330           0 :     if( _newChildren.empty( ))
     331           0 :         return 0;
     332             : 
     333           0 :     RSPConnectionPtr newConnection = _newChildren.back();
     334           0 :     _newChildren.pop_back();
     335             : 
     336           0 :     LBDEBUG << _id << " accepted RSP connection " << newConnection->_id
     337           0 :            << std::endl;
     338             : 
                           // _mutexEvent is taken while _mutexConnection is still held.
     339           0 :     lunchbox::ScopedWrite mutex2( _mutexEvent );
     340           0 :     if( _newChildren.empty() )
     341           0 :         _event->reset();
     342             :     else
     343           0 :         _event->set();
     344             : 
     345           0 :     return newConnection;
     346             : }
     347             : 
     // Copies exactly 'bytes' bytes of received payload into 'buffer', consuming
     // datagram buffers from _appBuffers one after the other. The unnamed bool
     // parameter is ignored.
     //
     // Returns 'bytes' on success and -1 if the connection is not connected. If
     // a null buffer is popped, the connection is closed and the number of bytes
     // copied so far is returned, or -1 if nothing was copied.
     348           0 : int64_t RSPConnection::readSync( void* buffer, const uint64_t bytes, const bool)
     349             : {
     350           0 :     LBASSERT( bytes > 0 );
     351           0 :     if( !isConnected ( ))
     352           0 :         return -1;
     353             : 
     354           0 :     uint64_t bytesLeft = bytes;
     355           0 :     uint8_t* ptr = reinterpret_cast< uint8_t* >( buffer );
     356             : 
     357             :     // redundant (done by the caller already), but saves some lock ops
     358           0 :     while( bytesLeft )
     359             :     {
     360           0 :         if( !_readBuffer )
     361             :         {
     362           0 :             LBASSERT( _readBufferPos == 0 );
                                   // NOTE(review): presumably blocks until a buffer is queued; a
                                   // null entry is pushed by _close() to release readers - confirm.
     363           0 :             _readBuffer = _appBuffers.pop();
     364           0 :             if( !_readBuffer )
     365             :             {
     366           0 :                 close();
     367           0 :                 return (bytes == bytesLeft) ?
     368           0 :                     -1 : static_cast< int64_t >( bytes - bytesLeft );
     369             :             }
     370             :         }
     371             : 
                               // Each buffer holds a DatagramData header directly followed by its
                               // payload; _readBufferPos is the offset already consumed from it.
     372             :         const DatagramData* header = reinterpret_cast< const DatagramData* >(
     373           0 :             _readBuffer->getData( ));
     374           0 :         const uint8_t* payload = reinterpret_cast< const uint8_t* >( header+1 );
     375           0 :         const size_t dataLeft = header->size - _readBufferPos;
     376           0 :         const size_t size = LB_MIN( static_cast< size_t >( bytesLeft ),
     377             :                                     dataLeft );
     378             : 
     379           0 :         memcpy( ptr, payload + _readBufferPos, size );
     380           0 :         _readBufferPos += size;
     381           0 :         ptr += size;
     382           0 :         bytesLeft -= size;
     383             : 
     384             :         // if all data in the buffer has been taken
     385           0 :         if( _readBufferPos >= header->size )
     386             :         {
     387           0 :             LBASSERT( _readBufferPos == header->size );
     388             :             //LBLOG( LOG_RSP ) << "reset read buffer  " << header->sequence
     389             :             //                 << std::endl;
     390             : 
                                   // Return the fully consumed buffer to the protocol thread's queue.
     391           0 :             LBCHECK( _threadBuffers.push( _readBuffer ));
     392           0 :             _readBuffer = 0;
     393           0 :             _readBufferPos = 0;
     394             :         }
     395             :         else
     396             :         {
     397           0 :             LBASSERT( _readBufferPos < header->size );
     398             :         }
     399             :     }
     400             : 
                           // Keep the event set while unread data remains; otherwise reset it, but
                           // re-check emptiness under _mutexEvent first.
     401           0 :     if( _readBuffer || !_appBuffers.isEmpty( ))
     402           0 :         _event->set();
     403             :     else
     404             :     {
     405           0 :         lunchbox::ScopedWrite mutex( _mutexEvent );
     406           0 :         if( _appBuffers.isEmpty( ))
     407           0 :             _event->reset();
     408             :     }
     409             : 
     410             : #ifdef CO_INSTRUMENT_RSP
     411             :     nBytesRead += bytes;
     412             : #endif
     413           0 :     return bytes;
     414             : }
     415             : 
     // Thread body: runs the owning connection's _runThread() and clears the
     // thread's _connection pointer once it returns.
     416           0 : void RSPConnection::Thread::run()
     417             : {
     418           0 :     _connection->_runThread();
     419           0 :     _connection = 0;
     420           0 :     LBDEBUG << "Left RSP protocol thread" << std::endl;
     421           0 : }
     422             : 
     // Completion handler of the _timeout and _wakeup timers. Ignores aborted
     // waits and otherwise dispatches on the connection phase: listening ->
     // _handleConnectedTimeout(), id accepted but not yet listening ->
     // _handleInitTimeout(), id not yet accepted -> _handleAcceptIDTimeout().
     423           0 : void RSPConnection::_handleTimeout( const boost::system::error_code& error )
     424             : {
     425           0 :     if( error == boost::asio::error::operation_aborted )
     426           0 :         return;
     427             : 
     428           0 :     if( isListening( ))
     429           0 :         _handleConnectedTimeout();
     430           0 :     else if( _idAccepted )
     431           0 :         _handleInitTimeout();
     432             :     else
     433           0 :         _handleAcceptIDTimeout();
     434             : }
     435             : 
     // Timeout handler for the id negotiation phase. The first 19 timeouts each
     // re-announce _id with ID_HELLO. The 20th confirms the id with ID_CONFIRM,
     // registers this connection through _addConnection(), marks the id as
     // accepted and sends the first count-node datagram. The 10 ms timer is
     // re-armed in both cases.
     436           0 : void RSPConnection::_handleAcceptIDTimeout()
     437             : {
     438           0 :     ++_timeouts;
     439           0 :     if( _timeouts < 20 )
     440             :     {
     441           0 :         LBLOG( LOG_RSP ) << "Announce " << _id << " " << _timeouts << std::endl;
     442           0 :         _sendSimpleDatagram( ID_HELLO, _id );
     443             :     }
     444             :     else
     445             :     {
     446           0 :         LBLOG( LOG_RSP ) << "Confirm " << _id << std::endl;
     447           0 :         _sendSimpleDatagram( ID_CONFIRM, _id );
     448           0 :         _addConnection( _id, _sequence );
     449           0 :         _idAccepted = true;
     450           0 :         _timeouts = 0;
     451             :         // send a first datagram to announce me and discover all other
     452             :         // connections
     453           0 :         _sendCountNode();
     454             :     }
     455           0 :     _setTimeout( 10 );
     456           0 : }
     457             : 
     // Timeout handler for the phase after the id was accepted. The first 19
     // timeouts each send a count-node datagram. The 20th switches the state to
     // STATE_LISTENING and stops the io service, which lets _initThread()'s
     // run() call return. The 10 ms timer is re-armed in both cases.
     458           0 : void RSPConnection::_handleInitTimeout( )
     459             : {
     460           0 :     LBASSERT( !isListening( ))
     461           0 :     ++_timeouts;
     462           0 :     if( _timeouts < 20 )
     463           0 :         _sendCountNode();
     464             :     else
     465             :     {
     466           0 :         _setState( STATE_LISTENING );
     467           0 :         LBDEBUG << "RSP connection " << _id << " listening" << std::endl;
     468           0 :         _timeouts = 0;
     469           0 :         _ioService.stop(); // thread initialized, run restarts
     470             :     }
     471           0 :     _setTimeout( 10 );
     472           0 : }
     473             : 
     // Drains _threadBuffers into _writeBuffers and then calls
     // _finishWriteQueue() with the last used sequence number (_sequence - 1).
     // Both queues are asserted to be empty afterwards.
     474           0 : void RSPConnection::_clearWriteQueues()
     475             : {
     476           0 :     while( !_threadBuffers.isEmpty( ))
     477             :     {
     478           0 :         Buffer* buffer = 0;
     479           0 :         _threadBuffers.pop( buffer );
     480           0 :         _writeBuffers.push_back( buffer );
     481             :     }
     482             : 
     483           0 :     _finishWriteQueue( _sequence - 1 );
     484           0 :     LBASSERT( _threadBuffers.isEmpty() && _writeBuffers.empty() );
     485           0 : }
     486             : 
     // Timeout handler while the connection is in use. Processes outgoing data.
     // Once _timeouts reaches CO_RSP_MAX_TIMEOUTS it either shuts everything
     // down (when no other child has acknowledged recently) or drops only the
     // children that lag behind.
     487           0 : void RSPConnection::_handleConnectedTimeout()
     488             : {
                           // No longer listening: flush the write queues and end the io loop.
     489           0 :     if( !isListening( ))
     490             :     {
     491           0 :         _clearWriteQueues();
     492           0 :         _ioService.stop();
     493           0 :         return;
     494             :     }
     495             : 
     496           0 :     _processOutgoing();
     497             : 
     498           0 :     if( _timeouts >= CO_RSP_MAX_TIMEOUTS )
     499             :     {
     500           0 :         LBERROR << "Too many timeouts during send: " << _timeouts << std::endl;
                               // 'all' stays true unless some child other than this connection
                               // has acknowledged at least sequence (_sequence - _numBuffers).
                               // NOTE(review): if _sequence is a 16-bit type, the subtractions in
                               // this function are evaluated as int after promotion and do not
                               // wrap around - confirm this is intended.
     501           0 :         bool all = true;
     502           0 :         for( RSPConnectionsCIter i =_children.begin(); i !=_children.end(); ++i)
     503             :         {
     504           0 :             RSPConnectionPtr child = *i;
     505           0 :             if ( child->_acked >= _sequence - _numBuffers && child->_id != _id )
     506             :             {
     507           0 :                 all = false;
     508           0 :                 break;
     509             :             }
     510             :         }
     511             : 
     512             :         // if all connections failed we probably got disconnected -> close and
     513             :         // exit else close all failed child connections
     514           0 :         if ( all )
     515             :         {
     516           0 :             _sendSimpleDatagram( ID_EXIT, _id );
     517           0 :             _appBuffers.pushFront( 0 ); // unlock write function
     518             : 
     519           0 :             for( RSPConnectionsCIter i =_children.begin();
     520           0 :                  i !=_children.end(); ++i)
     521             :             {
     522           0 :                 RSPConnectionPtr child = *i;
     523           0 :                 child->_setState( STATE_CLOSING );
     524           0 :                 child->_appBuffers.push( 0 ); // unlock read func
     525             :             }
     526             : 
     527           0 :             _clearWriteQueues();
     528           0 :             _ioService.stop();
     529           0 :             return;
     530             :         }
     531             : 
                               // NOTE(review): in the removal branch 'i' is neither advanced nor
                               // reassigned. If _removeConnection() erases the entry from
                               // _children (its body is not visible here), 'i' may be invalidated
                               // - confirm that continuing the loop with it is safe.
     532           0 :         RSPConnectionsCIter i =_children.begin();
     533           0 :         while ( i !=_children.end() )
     534             :         {
     535           0 :             RSPConnectionPtr child = *i;
     536           0 :             if ( child->_acked < _sequence - 1 && _id != child->_id )
     537             :             {
     538           0 :                 _sendSimpleDatagram( ID_EXIT, child->_id );
     539           0 :                 _removeConnection( child->_id );
     540             :             }
     541             :             else
     542             :             {
                                       // Surviving child: set its acked sequence to _sequence minus
                                       // the number of buffers still in _writeBuffers.
     543           0 :                 uint16_t wb = static_cast<uint16_t>( _writeBuffers.size( ));
     544           0 :                 child->_acked = _sequence - wb;
     545           0 :                 ++i;
     546             :             }
     547             :         }
     548             : 
     549           0 :         _timeouts = 0;
     550             :     }
     551             : }
     552             : 
     // Interprets the start of _recvBuffer as a DatagramNode.
     //
     // 'bytes' is the size to validate against sizeof( DatagramNode ).
     // Returns a pointer into _recvBuffer (byte-swapped in place by
     // node.byteswap()), or 0 if 'bytes' is too small or the protocol version
     // differs from CO_RSP_PROTOCOL_VERSION. Both failures are logged; the
     // connection is not closed.
     553             : RSPConnection::DatagramNode*
     554           0 : RSPConnection::_getDatagramNode( const size_t bytes )
     555             : {
     556           0 :     if( bytes < sizeof( DatagramNode ))
     557             :     {
     558           0 :         LBERROR << "DatagramNode size mismatch, got " << bytes << " instead of "
     559           0 :                 << sizeof( DatagramNode ) << " bytes" << std::endl;
     560             :         //close();
     561           0 :         return 0;
     562             :     }
     563             :     DatagramNode& node =
     564           0 :                     *reinterpret_cast< DatagramNode* >( _recvBuffer.getData( ));
                           // The swap modifies the receive buffer itself, also on the failure path.
     565           0 :     node.byteswap();
     566           0 :     if( node.protocolVersion != CO_RSP_PROTOCOL_VERSION )
     567             :     {
     568           0 :         LBERROR << "Protocol version mismatch, got " << node.protocolVersion
     569           0 :                 << " instead of " << CO_RSP_PROTOCOL_VERSION << std::endl;
     570             :         //close();
     571           0 :         return 0;
     572             :     }
     573           0 :     return &node;
     574             : }
     575             : 
     // Runs the start-up handshake: announces _id with ID_HELLO, arms the 10 ms
     // timer, starts the asynchronous receive and runs the io service until it
     // is stopped.
     //
     // Returns whether the connection is listening once run() has returned.
     576           0 : bool RSPConnection::_initThread()
     577             : {
     578           0 :     LBLOG( LOG_RSP ) << "Started RSP protocol thread" << std::endl;
     579           0 :     _timeouts = 0;
     580             : 
     581             :    // send a first datagram to announce me and discover other connections
     582           0 :     LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
     583           0 :     _sendSimpleDatagram( ID_HELLO, _id );
     584           0 :     _setTimeout( 10 );
     585           0 :     _asyncReceiveFrom();
                           // Returns after _ioService.stop(), e.g. from _handleInitTimeout().
     586           0 :     _ioService.run();
     587           0 :     return isListening();
     588             : }
     589             : 
     // Main loop of the protocol thread: resets the io service and runs it
     // again. The reset precedes run() because the io service was stopped at
     // the end of the start-up phase (see _handleInitTimeout()).
     590           0 : void RSPConnection::_runThread()
     591             : {
     592             :     //__debugbreak();
     593           0 :     _ioService.reset();
     594           0 :     _ioService.run();
     595           0 : }
     596             : 
     // Arms the _timeout timer so that _handleTimeout() is invoked after
     // 'timeOut' milliseconds. 'timeOut' must not be negative.
     597           0 : void RSPConnection::_setTimeout( const int32_t timeOut )
     598             : {
     599           0 :     LBASSERT( timeOut >= 0 );
     600           0 :     _timeout.expires_from_now( boost::posix_time::milliseconds( timeOut ));
     601           0 :     _timeout.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
     602           0 :                                       boost::asio::placeholders::error ));
     603           0 : }
     604             : 
     605           0 : void RSPConnection::_postWakeup()
     606             : {
     607           0 :     _wakeup.expires_from_now( boost::posix_time::milliseconds( 0 ));
     608           0 :     _wakeup.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
     609           0 :                                      boost::asio::placeholders::error ));
     610           0 : }
     611             : 
     612           0 : void RSPConnection::_processOutgoing()
     613             : {
     614             : #ifdef CO_INSTRUMENT_RSP
     615             :     if( instrumentClock.getTime64() > 1000 )
     616             :     {
     617             :         LBWARN << *this << std::endl;
     618             :         instrumentClock.reset();
     619             :     }
     620             : #endif
     621             : 
     622           0 :     if( !_repeatQueue.empty( ))
     623           0 :         _repeatData();
     624             :     else
     625           0 :         _writeData();
     626             : 
     627           0 :     if( !_threadBuffers.isEmpty() || !_repeatQueue.empty( ))
     628             :     {
     629           0 :         _setTimeout( 0 ); // call again to send remaining
     630           0 :         return;
     631             :     }
     632             :     // no more data to write, check/send ack request, reset timeout
     633             : 
     634           0 :     if( _writeBuffers.empty( )) // got all acks
     635             :     {
     636           0 :         _timeouts = 0;
     637           0 :         _timeout.cancel();
     638           0 :         return;
     639             :     }
     640             : 
     641             :     const int64_t timeout =
     642           0 :         Global::getIAttribute( Global::IATTR_RSP_ACK_TIMEOUT );
     643           0 :     const int64_t left = timeout - _clock.getTime64();
     644             : 
     645           0 :     if( left > 0 )
     646             :     {
     647           0 :         _setTimeout( left );
     648           0 :         return;
     649             :     }
     650             : 
     651             :     // (repeat) ack request
     652           0 :     _clock.reset();
     653           0 :     ++_timeouts;
     654           0 :     if ( _timeouts < CO_RSP_MAX_TIMEOUTS )
     655           0 :         _sendAckRequest();
     656           0 :     _setTimeout( timeout );
     657             : }
     658             : 
     659           0 : void RSPConnection::_writeData()
     660             : {
     661           0 :     Buffer* buffer = 0;
     662           0 :     if( !_threadBuffers.pop( buffer )) // nothing to write
     663           0 :         return;
     664             : 
     665           0 :     _timeouts = 0;
     666           0 :     LBASSERT( buffer );
     667             : 
     668             :     // write buffer
     669           0 :     DatagramData* header = reinterpret_cast<DatagramData*>( buffer->getData( ));
     670           0 :     header->sequence = _sequence++;
     671             : 
     672             : #ifdef CO_RSP_MERGE_WRITES
     673           0 :     if( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
     674             :     {
     675           0 :         std::vector< Buffer* > appBuffers;
     676           0 :         while( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
     677             :         {
     678           0 :             Buffer* buffer2 = 0;
     679           0 :             LBCHECK( _threadBuffers.getFront( buffer2 ));
     680           0 :             LBASSERT( buffer2 );
     681             :             DatagramData* header2 =
     682           0 :                 reinterpret_cast<DatagramData*>( buffer2->getData( ));
     683             : 
     684           0 :             if( uint32_t( header->size + header2->size ) > _payloadSize )
     685           0 :                 break;
     686             : 
     687           0 :             memcpy( reinterpret_cast<uint8_t*>( header + 1 ) + header->size,
     688           0 :                     header2 + 1, header2->size );
     689           0 :             header->size += header2->size;
     690           0 :             LBCHECK( _threadBuffers.pop( buffer2 ));
     691           0 :             appBuffers.push_back( buffer2 );
     692             : #ifdef CO_INSTRUMENT_RSP
     693             :             ++nMergedDatagrams;
     694             : #endif
     695             :         }
     696             : 
     697           0 :         if( !appBuffers.empty( ))
     698           0 :             _appBuffers.push( appBuffers );
     699             :     }
     700             : #endif
     701             : 
     702             :     // send data
     703             :     //  Note 1: We could optimize the send away if we're all alone, but this is
     704             :     //          not a use case for RSP, so we don't care.
     705             :     //  Note 2: Data to myself will be 'written' in _finishWriteQueue once we
     706             :     //          got all acks for the packet
     707           0 :     const uint32_t size = header->size + sizeof( DatagramData );
     708             : 
     709           0 :     _waitWritable( size ); // OPT: process incoming in between
     710           0 :     header->byteswap();
     711           0 :     _write->send( boost::asio::buffer( header, size ));
     712             : 
     713             : #ifdef CO_INSTRUMENT_RSP
     714             :     ++nDatagrams;
     715             :     nBytesWritten += header->size;
     716             : #endif
     717             : 
     718             :     // save datagram for repeats (and self)
     719           0 :     _writeBuffers.push_back( buffer );
     720             : 
     721           0 :     if( _children.size() == 1 ) // We're all alone
     722             :     {
     723           0 :         LBASSERT( _children.front()->_id == _id );
     724           0 :         _finishWriteQueue( _sequence - 1 );
     725             :     }
     726             : }
     727             : 
     728           0 : void RSPConnection::_waitWritable( const uint64_t bytes )
     729             : {
     730             : #ifdef CO_INSTRUMENT_RSP
     731             :     lunchbox::Clock clock;
     732             : #endif
     733             : 
     734           0 :     _bucketSize += static_cast< uint64_t >( _clock.resetTimef() * _sendRate );
     735             :                                                      // opt omit: * 1024 / 1000;
     736           0 :     _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
     737             : 
     738           0 :     const uint64_t size = LB_MIN( bytes, static_cast< uint64_t >( _mtu ));
     739           0 :     while( _bucketSize < size )
     740             :     {
     741           0 :         lunchbox::Thread::yield();
     742           0 :         float time = _clock.resetTimef();
     743             : 
     744           0 :         while( time == 0.f )
     745             :         {
     746           0 :             lunchbox::Thread::yield();
     747           0 :             time = _clock.resetTimef();
     748             :         }
     749             : 
     750           0 :         _bucketSize += static_cast< int64_t >( time * _sendRate );
     751           0 :         _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
     752             :     }
     753           0 :     _bucketSize -= size;
     754             : 
     755             : #ifdef CO_INSTRUMENT_RSP
     756             :     writeWaitTime += clock.getTimef();
     757             : #endif
     758             : 
     759           0 :     ConstConnectionDescriptionPtr description = getDescription();
     760           0 :     if( _sendRate < description->bandwidth )
     761             :     {
     762           0 :         _sendRate += int64_t(
     763           0 :             float( Global::getIAttribute( Global::IATTR_RSP_ERROR_UPSCALE )) *
     764           0 :             float( description->bandwidth ) * .001f );
     765           0 :         LBLOG( LOG_RSP ) << "speeding up to " << _sendRate << " KB/s"
     766           0 :                          << std::endl;
     767             :     }
     768           0 : }
     769             : 
     770           0 : void RSPConnection::_repeatData()
     771             : {
     772           0 :     _timeouts = 0;
     773             : 
     774           0 :     while( !_repeatQueue.empty( ))
     775             :     {
     776           0 :         Nack& request = _repeatQueue.front();
     777           0 :         const uint16_t distance = _sequence - request.start;
     778             : 
     779           0 :         if ( distance == 0 )
     780             :         {
     781           0 :             LBWARN << "ignoring invalid nack (" << request.start
     782           0 :                    << ".." << request.end << ")" << std::endl;
     783           0 :             _repeatQueue.pop_front();
     784           0 :             continue;
     785             :         }
     786             : 
     787           0 :         if( distance <= _writeBuffers.size( )) // not already acked
     788             :         {
     789             : //          LBLOG( LOG_RSP ) << "Repeat " << request.start << ", " << _sendRate
     790             : //                           << "KB/s"<< std::endl;
     791             : 
     792           0 :             const size_t i = _writeBuffers.size() - distance;
     793           0 :             Buffer* buffer = _writeBuffers[i];
     794           0 :             LBASSERT( buffer );
     795             : 
     796             :             DatagramData* header =
     797           0 :                 reinterpret_cast<DatagramData*>( buffer->getData( ));
     798           0 :             const uint32_t size = header->size + sizeof( DatagramData );
     799           0 :             LBASSERT( header->sequence == request.start );
     800             : 
     801             :             // send data
     802           0 :             _waitWritable( size ); // OPT: process incoming in between
     803             :             // already done by _writeData: header->byteswap();
     804           0 :             _write->send( boost::asio::buffer( header, size ) );
     805             : #ifdef CO_INSTRUMENT_RSP
     806             :             ++nRepeated;
     807             : #endif
     808             :         }
     809             : 
     810           0 :         if( request.start == request.end )
     811           0 :             _repeatQueue.pop_front();    // done with request
     812             :         else
     813           0 :             ++request.start;
     814             : 
     815           0 :         if( distance <= _writeBuffers.size( )) // send something
     816           0 :             return;
     817             :     }
     818             : }
     819             : 
     820           0 : void RSPConnection::_finishWriteQueue( const uint16_t sequence )
     821             : {
     822           0 :     LBASSERT( !_writeBuffers.empty( ));
     823             : 
     824           0 :     RSPConnectionPtr connection = _findConnection( _id );
     825           0 :     LBASSERT( connection.isValid( ));
     826           0 :     LBASSERT( connection->_recvBuffers.empty( ));
     827             : 
     828             :     // Bundle pushing the buffers to the app to avoid excessive lock ops
     829           0 :     Buffers readBuffers;
     830           0 :     Buffers freeBuffers;
     831             : 
     832           0 :     const uint16_t size = _sequence - sequence - 1;
     833           0 :     LBASSERTINFO( size <= uint16_t( _writeBuffers.size( )),
     834             :                   size << " > " << _writeBuffers.size( ));
     835           0 :     LBLOG( LOG_RSP ) << "Got all remote acks for " << sequence << " current "
     836           0 :                      << _sequence << " advance " << _writeBuffers.size() - size
     837           0 :                      << " buffers" << std::endl;
     838             : 
     839           0 :     while( _writeBuffers.size() > size_t( size ))
     840             :     {
     841           0 :         Buffer* buffer = _writeBuffers.front();
     842           0 :         _writeBuffers.pop_front();
     843             : 
     844             : #ifndef NDEBUG
     845             :         DatagramData* datagram =
     846           0 :                          reinterpret_cast< DatagramData* >( buffer->getData( ));
     847           0 :         datagram->byteswap();
     848           0 :         LBASSERT( datagram->writerID == _id );
     849           0 :         LBASSERTINFO( datagram->sequence ==
     850             :                       uint16_t( connection->_sequence + readBuffers.size( )),
     851             :                       datagram->sequence << ", " << connection->_sequence <<
     852             :                       ", " << readBuffers.size( ));
     853             :       //LBLOG( LOG_RSP ) << "self receive " << datagram->sequence << std::endl;
     854             : #endif
     855             : 
     856           0 :         Buffer* newBuffer = connection->_newDataBuffer( *buffer );
     857           0 :         if( !newBuffer && !readBuffers.empty( )) // push prepared app buffers
     858             :         {
     859           0 :             lunchbox::ScopedWrite mutex( connection->_mutexEvent );
     860           0 :             LBLOG( LOG_RSP ) << "post " << readBuffers.size()
     861           0 :                              << " buffers starting with sequence "
     862           0 :                              << connection->_sequence << std::endl;
     863             : 
     864           0 :             connection->_appBuffers.push( readBuffers );
     865           0 :             connection->_sequence += uint16_t( readBuffers.size( ));
     866           0 :             readBuffers.clear();
     867           0 :             connection->_event->set();
     868             :         }
     869             : 
     870           0 :         while( !newBuffer ) // no more data buffers, wait for app to drain
     871             :         {
     872           0 :             newBuffer = connection->_newDataBuffer( *buffer );
     873           0 :             lunchbox::Thread::yield();
     874             :         }
     875             : 
     876           0 :         freeBuffers.push_back( buffer );
     877           0 :         readBuffers.push_back( newBuffer );
     878             :     }
     879             : 
     880           0 :     _appBuffers.push( freeBuffers );
     881           0 :     if( !readBuffers.empty( ))
     882             :     {
     883           0 :         lunchbox::ScopedWrite mutex( connection->_mutexEvent );
     884             : #if 0
     885             :         LBLOG( LOG_RSP )
     886             :             << "post " << readBuffers.size() << " buffers starting at "
     887             :             << connection->_sequence << std::endl;
     888             : #endif
     889             : 
     890           0 :         connection->_appBuffers.push( readBuffers );
     891           0 :         connection->_sequence += uint16_t( readBuffers.size( ));
     892           0 :         connection->_event->set();
     893             :     }
     894             : 
     895           0 :     connection->_acked = uint16_t( connection->_sequence - 1 );
     896           0 :     LBASSERT( connection->_acked == sequence );
     897             : 
     898           0 :     _timeouts = 0;
     899           0 : }
     900             : 
     901           0 : void RSPConnection::_handlePacket( const boost::system::error_code& /* error */,
     902             :                                    const size_t bytes )
     903             : {
     904           0 :     if( isListening( ))
     905             :     {
     906           0 :         _handleConnectedData( bytes );
     907             : 
     908           0 :         if( isListening( ))
     909           0 :             _processOutgoing();
     910             :         else
     911             :         {
     912           0 :             _ioService.stop();
     913           0 :             return;
     914             :         }
     915             :     }
     916           0 :     else if( bytes >= sizeof( DatagramNode ))
     917             :     {
     918           0 :         if( _idAccepted )
     919           0 :             _handleInitData( bytes, false );
     920             :         else
     921           0 :             _handleAcceptIDData( bytes );
     922             :     }
     923             : 
     924             :     //LBLOG( LOG_RSP ) << "_handlePacket timeout " << timeout << std::endl;
     925           0 :     _asyncReceiveFrom();
     926             : }
     927             : 
     928           0 : void RSPConnection::_handleAcceptIDData( const size_t bytes )
     929             : {
     930           0 :     DatagramNode* pNode = _getDatagramNode( bytes );
     931           0 :     if( !pNode )
     932           0 :         return;
     933             : 
     934           0 :     DatagramNode& node = *pNode;
     935             : 
     936           0 :     switch( node.type )
     937             :     {
     938             :         case ID_HELLO:
     939           0 :             _checkNewID( node.connectionID );
     940           0 :             break;
     941             : 
     942             :         case ID_HELLO_REPLY:
     943           0 :             _addConnection( node.connectionID, node.data );
     944           0 :             break;
     945             : 
     946             :         case ID_DENY:
     947             :             // a connection refused my ID, try another ID
     948           0 :             if( node.connectionID == _id )
     949             :             {
     950           0 :                 _timeouts = 0;
     951           0 :                 _sendSimpleDatagram( ID_HELLO, _buildNewID( ));
     952           0 :                 LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
     953             :             }
     954           0 :             break;
     955             : 
     956             :         case ID_EXIT:
     957           0 :             _removeConnection( node.connectionID );
     958           0 :             break;
     959             : 
     960             :         default:
     961           0 :             LBERROR << "Got unexpected datagram type " << node.type <<std::endl;
     962           0 :             LBUNIMPLEMENTED;
     963           0 :             break;
     964             :     }
     965             : }
     966             : 
     967           0 : void RSPConnection::_handleInitData( const size_t bytes, const bool connected )
     968             : {
     969           0 :     DatagramNode* pNode = _getDatagramNode( bytes );
     970           0 :     if( !pNode )
     971           0 :         return;
     972             : 
     973           0 :     DatagramNode& node = *pNode;
     974             : 
     975           0 :     switch( node.type )
     976             :     {
     977             :         case ID_HELLO:
     978           0 :             if( !connected )
     979           0 :                 _timeouts = 0;
     980           0 :             _checkNewID( node.connectionID ) ;
     981           0 :             return;
     982             : 
     983             :         case ID_CONFIRM:
     984           0 :             if( !connected )
     985           0 :                 _timeouts = 0;
     986           0 :             _addConnection( node.connectionID, node.data );
     987           0 :             return;
     988             : 
     989             :         case COUNTNODE:
     990           0 :             LBLOG( LOG_RSP ) << "Got " << node.data << " nodes from "
     991           0 :                              << node.connectionID << std::endl;
     992           0 :             return;
     993             : 
     994             :         case ID_HELLO_REPLY:
     995           0 :             _addConnection( node.connectionID, node.data );
     996           0 :             return;
     997             : 
     998             :         case ID_EXIT:
     999           0 :             _removeConnection( node.connectionID );
    1000           0 :             return;
    1001             : 
    1002             :         default:
    1003           0 :             LBERROR << "Got unexpected datagram type " << node.type <<std::endl;
    1004           0 :             LBUNIMPLEMENTED;
    1005           0 :             break;
    1006             :     }
    1007             : }
    1008             : 
    1009           0 : void RSPConnection::_handleConnectedData( const size_t bytes )
    1010             : {
    1011           0 :     if( bytes < sizeof( uint16_t ))
    1012           0 :         return;
    1013             : 
    1014           0 :     void* data = _recvBuffer.getData();
    1015           0 :     uint16_t type = *reinterpret_cast< uint16_t* >( data );
    1016             : #ifdef COLLAGE_BIGENDIAN
    1017             :     lunchbox::byteswap( type );
    1018             : #endif
    1019           0 :     switch( type )
    1020             :     {
    1021             :         case DATA:
    1022           0 :             LBCHECK( _handleData( bytes ));
    1023           0 :             break;
    1024             : 
    1025             :         case ACK:
    1026           0 :             LBCHECK( _handleAck( bytes ));
    1027           0 :             break;
    1028             : 
    1029             :         case NACK:
    1030           0 :             LBCHECK( _handleNack( ));
    1031           0 :             break;
    1032             : 
    1033             :         case ACKREQ: // The writer asks for an ack/nack
    1034           0 :             LBCHECK( _handleAckRequest( bytes ));
    1035           0 :             break;
    1036             : 
    1037             :         case ID_HELLO:
    1038             :         case ID_HELLO_REPLY:
    1039             :         case ID_CONFIRM:
    1040             :         case ID_EXIT:
    1041             :         case ID_DENY:
    1042             :         case COUNTNODE:
    1043           0 :             _handleInitData( bytes, true );
    1044           0 :             break;
    1045             : 
    1046             :         default:
    1047           0 :             LBASSERTINFO( false,
    1048             :                           "Don't know how to handle packet of type " << type );
    1049             :     }
    1050             : 
    1051             : }
    1052             : 
    1053           0 : void RSPConnection::_asyncReceiveFrom()
    1054             : {
    1055           0 :     _read->async_receive_from(
    1056           0 :         boost::asio::buffer( _recvBuffer.getData(), _mtu ), _readAddr,
    1057           0 :         boost::bind( &RSPConnection::_handlePacket, this,
    1058             :                      boost::asio::placeholders::error,
    1059           0 :                      boost::asio::placeholders::bytes_transferred ));
    1060           0 : }
    1061             : 
    1062           0 : bool RSPConnection::_handleData( const size_t bytes )
    1063             : {
    1064           0 :     if( bytes < sizeof( DatagramData ))
    1065           0 :         return false;
    1066             :     DatagramData& datagram =
    1067           0 :                     *reinterpret_cast< DatagramData* >( _recvBuffer.getData( ));
    1068           0 :     datagram.byteswap();
    1069             : 
    1070             : #ifdef CO_INSTRUMENT_RSP
    1071             :     ++nReadData;
    1072             : #endif
    1073           0 :     const uint16_t writerID = datagram.writerID;
    1074             : #ifdef Darwin
    1075             :     // There is occasionally a packet from ourselves, even though multicast loop
    1076             :     // is not set?!
    1077             :     if( writerID == _id )
    1078             :         return true;
    1079             : #else
    1080           0 :     LBASSERT( writerID != _id );
    1081             : #endif
    1082             : 
    1083           0 :     RSPConnectionPtr connection = _findConnection( writerID );
    1084             : 
    1085           0 :     if( !connection )  // unknown connection ?
    1086             :     {
    1087           0 :         LBASSERTINFO( false, "Can't find connection with id " << writerID );
    1088           0 :         return false;
    1089             :     }
    1090           0 :     LBASSERT( connection->_id == writerID );
    1091             : 
    1092           0 :     const uint16_t sequence = datagram.sequence;
    1093             : //  LBLOG( LOG_RSP ) << "rcvd " << sequence << " from " << writerID <<std::endl;
    1094             : 
    1095           0 :     if( connection->_sequence == sequence ) // in-order packet
    1096             :     {
    1097           0 :         Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
    1098           0 :         if( !newBuffer ) // no more data buffers, drop packet
    1099           0 :             return true;
    1100             : 
    1101           0 :         lunchbox::ScopedWrite mutex( connection->_mutexEvent );
    1102           0 :         connection->_pushDataBuffer( newBuffer );
    1103             : 
    1104           0 :         while( !connection->_recvBuffers.empty( )) // enqueue ready pending data
    1105             :         {
    1106           0 :             newBuffer = connection->_recvBuffers.front();
    1107           0 :             if( !newBuffer )
    1108           0 :                 break;
    1109             : 
    1110           0 :             connection->_recvBuffers.pop_front();
    1111           0 :             connection->_pushDataBuffer( newBuffer );
    1112             :         }
    1113             : 
    1114           0 :         if( !connection->_recvBuffers.empty() &&
    1115           0 :             !connection->_recvBuffers.front( )) // update for new _sequence
    1116             :         {
    1117           0 :             connection->_recvBuffers.pop_front();
    1118             :         }
    1119             : 
    1120           0 :         connection->_event->set();
    1121           0 :         return true;
    1122             :     }
    1123             : 
    1124           0 :     const uint16_t max = std::numeric_limits< uint16_t >::max();
    1125           0 :     if(( connection->_sequence > sequence &&
    1126           0 :          max - connection->_sequence + sequence > _numBuffers ) ||
    1127           0 :        ( connection->_sequence < sequence &&
    1128           0 :          sequence - connection->_sequence > _numBuffers ))
    1129             :     {
    1130             :         // ignore it if it's a repetition for another reader
    1131           0 :         return true;
    1132             :     }
    1133             : 
    1134             :     // else out of order
    1135             : 
    1136           0 :     const uint16_t size = sequence - connection->_sequence;
    1137           0 :     LBASSERT( size != 0 );
    1138           0 :     LBASSERTINFO( size <= _numBuffers, size << " > " << _numBuffers );
    1139             : 
    1140           0 :     ssize_t i = ssize_t( size ) - 1;
    1141           0 :     const bool gotPacket = ( connection->_recvBuffers.size() >= size &&
    1142           0 :                              connection->_recvBuffers[ i ] );
    1143           0 :     if( gotPacket )
    1144           0 :         return true;
    1145             : 
    1146           0 :     Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
    1147           0 :     if( !newBuffer ) // no more data buffers, drop packet
    1148           0 :         return true;
    1149             : 
    1150           0 :     if( connection->_recvBuffers.size() < size )
    1151           0 :         connection->_recvBuffers.resize( size, 0 );
    1152             : 
    1153           0 :     LBASSERT( !connection->_recvBuffers[ i ] );
    1154           0 :     connection->_recvBuffers[ i ] = newBuffer;
    1155             : 
    1156             :     // early nack: request missing packets before current
    1157           0 :     --i;
    1158           0 :     Nack nack = { connection->_sequence, uint16_t( sequence - 1 ) };
    1159           0 :     if( i > 0 )
    1160             :     {
    1161           0 :         if( connection->_recvBuffers[i] ) // got previous packet
    1162           0 :             return true;
    1163             : 
    1164           0 :         while( i >= 0 && !connection->_recvBuffers[i] )
    1165           0 :             --i;
    1166             : 
    1167           0 :         const Buffer* lastBuffer = i>=0 ? connection->_recvBuffers[i] : 0;
    1168           0 :         if( lastBuffer )
    1169             :         {
    1170           0 :             nack.start = connection->_sequence + i;
    1171             :         }
    1172             :     }
    1173             : 
    1174           0 :     LBLOG( LOG_RSP ) << "send early nack " << nack.start << ".." << nack.end
    1175           0 :                      << " current " << connection->_sequence << " ooo "
    1176           0 :                      << connection->_recvBuffers.size() << std::endl;
    1177             : 
    1178           0 :     if( nack.end < nack.start )
    1179             :         // OPT: don't drop nack 0..nack.end, but it doesn't happen often
    1180           0 :         nack.end = std::numeric_limits< uint16_t >::max();
    1181             : 
    1182           0 :     _sendNack( writerID, &nack, 1 );
    1183           0 :     return true;
    1184             : }
    1185             : /** Move a received datagram into a free buffer taken from _threadBuffers.
                     :  *
                     :  * On success the popped buffer and inBuffer exchange their content
                     :  * (swap), so inBuffer can be reused by the caller.
                     :  *
                     :  * @param inBuffer the buffer holding the datagram; asserted to have a
                     :  *                 maximum size equal to _mtu.
                     :  * @return the buffer now holding the datagram, or 0 if no free buffer
                     :  *         could be popped and the datagram is dropped.
                     :  */
    1186           0 : RSPConnection::Buffer* RSPConnection::_newDataBuffer( Buffer& inBuffer )
    1187             : {
    1188           0 :     LBASSERT( static_cast< int32_t >( inBuffer.getMaxSize( )) == _mtu );
    1189             : 
    1190           0 :     Buffer* buffer = 0;
    1191           0 :     if( _threadBuffers.pop( buffer ))
    1192             :     {
    1193           0 :         buffer->swap( inBuffer );
    1194           0 :         return buffer;
    1195             :     }
    1196             : 
    1197             :     // we do not have a free buffer, which means that the receiver is slower
    1198             :     // than our read thread. This is bad, because now we'll drop the data and
    1199             :     // will send a NAck packet upon the ack request, causing retransmission even
    1200             :     // though we'll probably drop it again
    1201           0 :     LBLOG( LOG_RSP ) << "Reader too slow, dropping data" << std::endl;
    1202             : 
    1203             :     // Set the event if there is data to read. This shouldn't be needed since
    1204             :     // the event should be set in this case, but it'll increase the robustness
    1205           0 :     lunchbox::ScopedWrite mutex( _mutexEvent );
    1206           0 :     if( !_appBuffers.isEmpty( ))
    1207           0 :         _event->set();
    1208           0 :     return 0;
    1209             : }
    1210             : /** Post the next in-sequence data buffer to _appBuffers.
                     :  *
                     :  * Only valid on a connection with a _parent (asserted). In debug builds
                     :  * the datagram's sequence is asserted to equal _sequence. An ack for
                     :  * _sequence is sent through the parent whenever
                     :  * (_sequence + _parent->_id) is a multiple of _ackFreq; afterwards
                     :  * _sequence is incremented.
                     :  *
                     :  * @param buffer the buffer holding the datagram to post.
                     :  */
    1211           0 : void RSPConnection::_pushDataBuffer( Buffer* buffer )
    1212             : {
    1213           0 :     LBASSERT( _parent );
    1214             : #ifndef NDEBUG
    1215           0 :     DatagramData* dgram = reinterpret_cast< DatagramData* >(buffer->getData( ));
    1216           0 :     LBASSERTINFO( dgram->sequence == _sequence,
    1217             :                   dgram->sequence << " != " << _sequence );
    1218             : #endif
    1219             :     // Adding the parent's id offsets the ack point per receiver; presumably
                     :     // this spreads the acks of different readers - TODO confirm
    1220           0 :     if( (( _sequence + _parent->_id ) % _ackFreq ) == 0 )
    1221           0 :         _parent->_sendAck( _id, _sequence );
    1222             : 
    1223           0 :     LBLOG( LOG_RSP ) << "post buffer " << _sequence << std::endl;
    1224           0 :     ++_sequence;
    1225           0 :     _appBuffers.push( buffer );
    1226           0 : }
    1227             : /** Process the ACK datagram currently stored in _recvBuffer.
                     :  *
                     :  * The datagram is byteswapped in place. Acks for another writer and
                     :  * late acks are ignored. Otherwise the reader's _acked is updated, the
                     :  * timeout counter is reset and, if the resulting sequence is within
                     :  * _numBuffers of our own child's _acked, _finishWriteQueue() is called.
                     :  *
                     :  * @param bytes the number of bytes received for this datagram.
                     :  * @return false if the datagram is shorter than DatagramAck or the
                     :  *         reader is unknown, true otherwise.
                     :  */
    1228           0 : bool RSPConnection::_handleAck( const size_t bytes )
    1229             : {
    1230           0 :     if( bytes < sizeof( DatagramAck ))
    1231           0 :         return false;
    1232             :     DatagramAck& ack =
    1233           0 :                      *reinterpret_cast< DatagramAck* >( _recvBuffer.getData( ));
    1234           0 :     ack.byteswap();
    1235             : 
    1236             : #ifdef CO_INSTRUMENT_RSP
    1237             :     ++nAcksRead;
    1238             : #endif
    1239             :     // Acks addressed to another writer are not an error, just not for us
    1240           0 :     if( ack.writerID != _id )
    1241           0 :         return true;
    1242             : 
    1243           0 :     LBLOG( LOG_RSP ) << "got ack from " << ack.readerID << " for "
    1244           0 :                      << ack.writerID << " sequence " << ack.sequence
    1245           0 :                      << " current " << _sequence << std::endl;
    1246             : 
    1247             :     // find destination connection, update ack data if needed
    1248           0 :     RSPConnectionPtr connection = _findConnection( ack.readerID );
    1249           0 :     if( !connection )
    1250             :     {
    1251           0 :         LBUNREACHABLE;
    1252           0 :         return false;
    1253             :     }
    1254             : 
    1255           0 :     if( connection->_acked >= ack.sequence &&
    1256           0 :         connection->_acked - ack.sequence <= _numBuffers )
    1257             :     {
    1258             :         // I have received a later ack previously from the reader
    1259           0 :         LBLOG( LOG_RSP ) << "Late ack" << std::endl;
    1260           0 :         return true;
    1261             :     }
    1262             : 
    1263             : #ifdef CO_INSTRUMENT_RSP
    1264             :     ++nAcksAccepted;
    1265             : #endif
    1266           0 :     connection->_acked = ack.sequence;
    1267           0 :     _timeouts = 0; // reset timeout counter
    1268             : 
    1269             :     // Check if we can advance _acked
    1270           0 :     uint16_t acked = ack.sequence;
    1271             :     // The difference below is truncated to uint16_t, i.e. it wraps around.
                     :     // A child replaces the candidate when that difference exceeds
                     :     // _numBuffers; presumably this selects the oldest ack of all readers
                     :     // - TODO confirm
    1272           0 :     for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
    1273             :     {
    1274           0 :         RSPConnectionPtr child = *i;
    1275           0 :         if( child->_id == _id )
    1276           0 :             continue;
    1277             : 
    1278           0 :         const uint16_t distance = child->_acked - acked;
    1279           0 :         if( distance > _numBuffers )
    1280           0 :             acked = child->_acked;
    1281             :     }
    1282             :     // NOTE(review): _findConnection() returns 0 when no child has our _id;
                     :     // selfChild is dereferenced below without a check - confirm that our
                     :     // own connection is always part of _children at this point
    1283           0 :     RSPConnectionPtr selfChild = _findConnection( _id );
    1284           0 :     const uint16_t distance = acked - selfChild->_acked;
    1285           0 :     if( distance <= _numBuffers )
    1286           0 :         _finishWriteQueue( acked );
    1287           0 :     return true;
    1288             : }
    1289             : /** Process the NACK datagram currently stored in _recvBuffer.
                     :  *
                     :  * The datagram is byteswapped in place. Nacks addressed to another
                     :  * writer are only logged. Otherwise the timeout counter is reset and the
                     :  * contained ranges are queued for retransmission using _addRepeat().
                     :  *
                     :  * NOTE(review): unlike _handleAck() and _handleAckRequest(), this
                     :  * function has no received-size parameter and performs no size check,
                     :  * and nack.count is forwarded to _addRepeat() without validation -
                     :  * confirm that the caller validates the datagram.
                     :  *
                     :  * @return false if the reader is unknown, true otherwise.
                     :  */
    1290           0 : bool RSPConnection::_handleNack()
    1291             : {
    1292             :     DatagramNack& nack =
    1293           0 :                     *reinterpret_cast< DatagramNack* >( _recvBuffer.getData( ));
    1294           0 :     nack.byteswap();
    1295             : 
    1296             : #ifdef CO_INSTRUMENT_RSP
    1297             :     ++nNAcksRead;
    1298             : #endif
    1299             : 
    1300           0 :     if( _id != nack.writerID )
    1301             :     {
    1302           0 :         LBLOG( LOG_RSP )
    1303           0 :             << "ignore " << nack.count << " nacks from " << nack.readerID
    1304           0 :             << " for " << nack.writerID << " (not me)"<< std::endl;
    1305           0 :         return true;
    1306             :     }
    1307             : 
    1308           0 :     LBLOG( LOG_RSP )
    1309           0 :         << "handle " << nack.count << " nacks from " << nack.readerID
    1310           0 :         << " for " << nack.writerID << std::endl;
    1311             : 
    1312           0 :     RSPConnectionPtr connection = _findConnection( nack.readerID );
    1313           0 :     if( !connection )
    1314             :     {
    1315           0 :         LBUNREACHABLE;
    1316           0 :         return false;
    1317             :         // it's an unknown connection, TODO add this connection?
    1318             :     }
    1319             :     // The connection is only looked up to verify that the reader is known
    1320           0 :     _timeouts = 0;
    1321           0 :     _addRepeat( nack.nacks, nack.count );
    1322           0 :     return true;
    1323             : }
    1324             : /** Queue nack ranges for retransmission and slow down the send rate.
                     :  *
                     :  * Each range is merged into the first entry of _repeatQueue which it
                     :  * overlaps and extends; otherwise it is appended as a new entry. The
                     :  * number of newly requested datagrams is accumulated in 'lost'. If
                     :  * _sendRate is above the minimum (description bandwidth shifted right by
                     :  * IATTR_RSP_MIN_SENDRATE_SHIFT), it is reduced by
                     :  * 1 + _sendRate * min( lost * .001 * IATTR_RSP_ERROR_DOWNSCALE,
                     :  *                      .01 * IATTR_RSP_ERROR_MAXSCALE ).
                     :  *
                     :  * NOTE(review): a range which overlaps a queued entry without extending
                     :  * it (it is fully contained) leaves 'merged' false, so it is appended
                     :  * again and counted as lost in full - confirm whether that is intended.
                     :  *
                     :  * @param nacks array of ranges with start <= end (asserted).
                     :  * @param num the number of entries in nacks.
                     :  */
    1325           0 : void RSPConnection::_addRepeat( const Nack* nacks, uint16_t num )
    1326             : {
    1327           0 :     LBLOG( LOG_RSP ) << lunchbox::disableFlush << "Queue repeat requests ";
    1328           0 :     size_t lost = 0;
    1329             : 
    1330           0 :     for( size_t i = 0; i < num; ++i )
    1331             :     {
    1332           0 :         const Nack& nack = nacks[ i ];
    1333           0 :         LBASSERT( nack.start <= nack.end );
    1334             : 
    1335           0 :         LBLOG( LOG_RSP ) << nack.start << ".." << nack.end << " ";
    1336             :         // The scan stops at the first queued range that was extended
    1337           0 :         bool merged = false;
    1338           0 :         for( RepeatQueue::iterator j = _repeatQueue.begin();
    1339           0 :              j != _repeatQueue.end() && !merged; ++j )
    1340             :         {
    1341           0 :             Nack& old = *j;
    1342           0 :             if( old.start <= nack.end && old.end >= nack.start )
    1343             :             {
    1344           0 :                 if( old.start > nack.start )
    1345             :                 {
    1346           0 :                     lost += old.start - nack.start;
    1347           0 :                     old.start = nack.start;
    1348           0 :                     merged = true;
    1349             :                 }
    1350           0 :                 if( old.end < nack.end )
    1351             :                 {
    1352           0 :                     lost += nack.end - old.end;
    1353           0 :                     old.end = nack.end;
    1354           0 :                     merged = true;
    1355             :                 }
    1356           0 :                 LBASSERT( lost < _numBuffers );
    1357             :             }
    1358             :         }
    1359             : 
    1360           0 :         if( !merged )
    1361             :         {
    1362           0 :             lost += uint16_t( nack.end - nack.start ) + 1;
    1363           0 :             LBASSERT( lost <= _numBuffers );
    1364           0 :             _repeatQueue.push_back( nack );
    1365             :         }
    1366             :     }
    1367             :     // Only throttle while the send rate is above the configured minimum
    1368           0 :     ConstConnectionDescriptionPtr description = getDescription();
    1369           0 :     if( _sendRate >
    1370           0 :         ( description->bandwidth >>
    1371           0 :           Global::getIAttribute( Global::IATTR_RSP_MIN_SENDRATE_SHIFT )))
    1372             :     {
    1373           0 :         const float delta = float( lost ) * .001f *
    1374           0 :                      Global::getIAttribute( Global::IATTR_RSP_ERROR_DOWNSCALE );
    1375             :         const float maxDelta = .01f *
    1376           0 :             float( Global::getIAttribute( Global::IATTR_RSP_ERROR_MAXSCALE ));
    1377           0 :         const float downScale = LB_MIN( delta, maxDelta );
    1378           0 :         _sendRate -= 1 + int64_t( _sendRate * downScale );
    1379           0 :         LBLOG( LOG_RSP )
    1380           0 :             << ", lost " << lost << " slowing down " << downScale * 100.f
    1381           0 :             << "% to " << _sendRate << " KB/s" << std::endl
    1382           0 :             << lunchbox::enableFlush;
    1383             :     }
    1384             :     else
    1385           0 :         LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush;
    1386           0 : }
    1387             : /** Answer the ack request datagram currently stored in _recvBuffer.
                     :  *
                     :  * The datagram is byteswapped in place. If nothing is missing relative
                     :  * to the requested sequence, or the request lies outside of the
                     :  * _numBuffers window, an ack for the last in-order sequence is sent.
                     :  * Otherwise the missing ranges are collected from the writer
                     :  * connection's _recvBuffers and sent back as nacks. Ranges which wrap
                     :  * around the uint16_t sequence space are split into two nacks.
                     :  *
                     :  * @param bytes the number of bytes received for this datagram.
                     :  * @return false if the datagram is shorter than DatagramAckRequest or
                     :  *         the writer is unknown, true otherwise.
                     :  */
    1388           0 : bool RSPConnection::_handleAckRequest( const size_t bytes )
    1389             : {
    1390           0 :     if( bytes < sizeof( DatagramAckRequest ))
    1391           0 :         return false;
    1392             :     DatagramAckRequest& ackRequest =
    1393           0 :               *reinterpret_cast< DatagramAckRequest* >( _recvBuffer.getData( ));
    1394           0 :     ackRequest.byteswap();
    1395             : 
    1396           0 :     const uint16_t writerID = ackRequest.writerID;
    1397             : #ifdef Darwin
    1398             :     // There is occasionally a packet from ourselves, even though multicast loop
    1399             :     // is not set?!
    1400             :     if( writerID == _id )
    1401             :         return true;
    1402             : #else
    1403           0 :     LBASSERT( writerID != _id );
    1404             : #endif
    1405           0 :     RSPConnectionPtr connection = _findConnection( writerID );
    1406           0 :     if( !connection )
    1407             :     {
    1408           0 :         LBUNREACHABLE;
    1409           0 :         return false;
    1410             :     }
    1411             :     // gotID is the sequence before the connection's current _sequence;
                     :     // distance wraps around since it is truncated to uint16_t
    1412           0 :     const uint16_t reqID = ackRequest.sequence;
    1413           0 :     const uint16_t gotID = connection->_sequence - 1;
    1414           0 :     const uint16_t distance = reqID - gotID;
    1415             : 
    1416           0 :     LBLOG( LOG_RSP ) << "ack request "  << reqID << " from " << writerID
    1417           0 :                      << " got " << gotID << " missing " << distance
    1418           0 :                      << std::endl;
    1419             :     // Ack if we are at the request, ahead of it within the window, or if
                     :     // the request is numerically ahead by more than the window
    1420           0 :     if( (reqID == gotID) ||
    1421           0 :         (gotID > reqID && gotID - reqID <= _numBuffers) ||
    1422           0 :         (gotID < reqID && distance > _numBuffers) )
    1423             :     {
    1424           0 :         _sendAck( connection->_id, gotID );
    1425           0 :         return true;
    1426             :     }
    1427             :     // else find all missing datagrams
    1428             :     // The loop below stops two entries early; presumably to leave room for
                     :     // the trailing range and one wrap-around split - TODO confirm
    1429           0 :     const uint16_t max = CO_RSP_MAX_NACKS - 2;
    1430             :     Nack nacks[ CO_RSP_MAX_NACKS ];
    1431           0 :     uint16_t i = 0;
    1432             : 
    1433           0 :     nacks[ i ].start = connection->_sequence;
    1434           0 :     LBLOG( LOG_RSP ) << lunchbox::disableFlush << "nacks: "
    1435           0 :                      << nacks[i].start << "..";
    1436             :     // A null entry in _recvBuffers is a datagram which was not yet received
    1437           0 :     std::deque<Buffer*>::const_iterator j = connection->_recvBuffers.begin();
    1438           0 :     std::deque<Buffer*>::const_iterator first = j;
    1439           0 :     for( ; j != connection->_recvBuffers.end() && i < max; ++j )
    1440             :     {
    1441           0 :         if( *j ) // got buffer
    1442             :         {
    1443           0 :             nacks[ i ].end = connection->_sequence + std::distance( first, j);
    1444           0 :             LBLOG( LOG_RSP ) << nacks[i].end << ", ";
    1445           0 :             if( nacks[ i ].end < nacks[ i ].start )
    1446             :             {
    1447           0 :                 LBASSERT( nacks[ i ].end < _numBuffers );
    1448           0 :                 nacks[ i + 1 ].start = 0;
    1449           0 :                 nacks[ i + 1 ].end = nacks[ i ].end;
    1450           0 :                 nacks[ i ].end = std::numeric_limits< uint16_t >::max();
    1451           0 :                 ++i;
    1452             :             }
    1453           0 :             ++i;
    1454             : 
    1455             :             // find next hole
    1456           0 :             for( ++j; j != connection->_recvBuffers.end() && (*j); ++j )
    1457             :                 /* nop */;
    1458             : 
    1459           0 :             if( j == connection->_recvBuffers.end( ))
    1460           0 :                 break;
    1461             : 
    1462           0 :             nacks[i].start = connection->_sequence + std::distance(first, j) +1;
    1463           0 :             LBLOG( LOG_RSP ) << nacks[i].start << "..";
    1464             :         }
    1465             :     }
    1466             :     // Close the open range with reqID. If no range is open (i > 0 here),
                     :     // append the range after the last one when reqID is within the window
    1467           0 :     if( j != connection->_recvBuffers.end() || i == 0 )
    1468             :     {
    1469           0 :         nacks[ i ].end = reqID;
    1470           0 :         LBLOG( LOG_RSP ) << nacks[i].end;
    1471           0 :         ++i;
    1472             :     }
    1473           0 :     else if( uint16_t( reqID - nacks[i-1].end ) < _numBuffers )
    1474             :     {
    1475           0 :         nacks[i].start = nacks[i-1].end + 1;
    1476           0 :         nacks[i].end = reqID;
    1477           0 :         LBLOG( LOG_RSP ) << nacks[i].start << ".." << nacks[i].end;
    1478           0 :         ++i;
    1479             :     }
    1480           0 :     if( nacks[ i -1 ].end < nacks[ i - 1 ].start )
    1481             :     {
    1482           0 :         LBASSERT( nacks[ i - 1 ].end < _numBuffers );
    1483           0 :         nacks[ i ].start = 0;
    1484           0 :         nacks[ i ].end = nacks[ i - 1 ].end;
    1485           0 :         nacks[ i - 1 ].end = std::numeric_limits< uint16_t >::max();
    1486           0 :         ++i;
    1487             :     }
    1488             : 
    1489           0 :     LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush << "send " << i
    1490           0 :                      << " nacks to " << connection->_id << std::endl;
    1491             : 
    1492           0 :     LBASSERT( i > 0 );
    1493           0 :     _sendNack( connection->_id, nacks, i );
    1494           0 :     return true;
    1495             : }
    1496             : /** Answer the announcement of a new connection identifier.
                     :  *
                     :  * Sends ID_DENY if the identifier equals our own _id or belongs to a
                     :  * known child connection, ID_HELLO_REPLY otherwise. Both datagrams are
                     :  * sent with our own _id, not with the checked one.
                     :  *
                     :  * @param id the announced identifier.
                     :  */
    1497           0 : void RSPConnection::_checkNewID( uint16_t id )
    1498             : {
    1499             :     // look if the new ID exist in another connection
    1500           0 :     if( id == _id || _findConnection( id ))
    1501             :     {
    1502           0 :         LBLOG( LOG_RSP ) << "Deny " << id << std::endl;
    1503           0 :         _sendSimpleDatagram( ID_DENY, _id );
    1504             :     }
    1505             :     else
    1506           0 :         _sendSimpleDatagram( ID_HELLO_REPLY, _id );
    1507           0 : }
    1508             : /** Search _children linearly for the connection with the given identifier.
                     :  *
                     :  * @param id the identifier to look for.
                     :  * @return the first child whose _id matches, or 0 if there is none.
                     :  */
    1509           0 : RSPConnectionPtr RSPConnection::_findConnection( const uint16_t id )
    1510             : {
    1511           0 :     for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
    1512             :     {
    1513           0 :         if( (*i)->_id == id )
    1514           0 :             return *i;
    1515             :     }
    1516           0 :     return 0;
    1517             : }
    1518             : /** Create and register a child connection for a newly seen identifier.
                     :  *
                     :  * The child is set up in STATE_CONNECTED with this connection as its
                     :  * parent and a description obtained from _getDescription(), and all of
                     :  * its buffers are pushed to its _threadBuffers. It is appended to
                     :  * _children, the new node count is sent, and the child is appended to
                     :  * _newChildren (under _mutexConnection) before _event is set (under
                     :  * _mutexEvent).
                     :  *
                     :  * @param id the identifier of the new connection.
                     :  * @param sequence the initial value of the child's _sequence.
                     :  * @return false if a child with this identifier already exists, true
                     :  *         otherwise.
                     :  */
    1519           0 : bool RSPConnection::_addConnection( const uint16_t id, const uint16_t sequence )
    1520             : {
    1521           0 :     if( _findConnection( id ))
    1522           0 :         return false;
    1523             : 
    1524           0 :     LBDEBUG << "add connection " << id << std::endl;
    1525           0 :     RSPConnectionPtr connection = new RSPConnection();
    1526           0 :     connection->_id = id;
    1527           0 :     connection->_parent = this;
    1528           0 :     connection->_setState( STATE_CONNECTED );
    1529           0 :     connection->_setDescription( _getDescription( ));
    1530           0 :     connection->_sequence = sequence;
    1531           0 :     LBASSERT( connection->_appBuffers.isEmpty( ));
    1532             : 
    1533             :     // Make all buffers available for reading
    1534           0 :     for( BuffersCIter i = connection->_buffers.begin();
    1535           0 :          i != connection->_buffers.end(); ++i )
    1536             :     {
    1537           0 :         Buffer* buffer = *i;
    1538           0 :         LBCHECK( connection->_threadBuffers.push( buffer ));
    1539             :     }
    1540             : 
    1541           0 :     _children.push_back( connection );
    1542           0 :     _sendCountNode();
    1543             :     // Both locks are held until the function returns
    1544           0 :     lunchbox::ScopedWrite mutex( _mutexConnection );
    1545           0 :     _newChildren.push_back( connection );
    1546             : 
    1547           0 :     lunchbox::ScopedWrite mutex2( _mutexEvent );
    1548           0 :     _event->set();
    1549           0 :     return true;
    1550             : }
    1551             : /** Remove the child connection with the given identifier.
                     :  *
                     :  * Does nothing except logging if id is our own _id. Otherwise the first
                     :  * matching child is erased from _children, a 0 buffer is pushed to its
                     :  * _appBuffers and its event is set while holding the child's
                     :  * _mutexEvent (presumably the 0 buffer tells the reader that the
                     :  * connection is gone - TODO confirm). The node count is sent afterwards,
                     :  * also when no child matched.
                     :  *
                     :  * @param id the identifier of the connection to remove.
                     :  */
    1552           0 : void RSPConnection::_removeConnection( const uint16_t id )
    1553             : {
    1554           0 :     LBDEBUG << "remove connection " << id << std::endl;
    1555           0 :     if( id == _id )
    1556           0 :         return;
    1557             : 
    1558           0 :     for( RSPConnectionsIter i = _children.begin(); i != _children.end(); ++i )
    1559             :     {
    1560           0 :         RSPConnectionPtr child = *i;
    1561           0 :         if( child->_id == id )
    1562             :         {
    1563           0 :             _children.erase( i ); // i is not used again, the loop is left below
    1564             : 
    1565           0 :             lunchbox::ScopedWrite mutex( child->_mutexEvent );
    1566           0 :             child->_appBuffers.push( 0 );
    1567           0 :             child->_event->set();
    1568           0 :             break;
    1569             :         }
    1570             :     }
    1571             : 
    1572           0 :     _sendCountNode();
    1573             : }
    1574             : /** Queue data for sending, split into datagrams of up to _payloadSize bytes.
                     :  *
                     :  * A connection with a parent forwards the call to the parent. Otherwise
                     :  * one free buffer per datagram is popped from _appBuffers (waiting up
                     :  * to _writeTimeOut each), filled with a DATA header and the payload, and
                     :  * pushed to _threadBuffers. The sequence number is not set here.
                     :  *
                     :  * @param inData the data to send.
                     :  * @param bytes the number of bytes in inData; for 0 no datagram is
                     :  *              queued and 0 is returned.
                     :  * @return bytes on success, -1 if there is no _write socket, or if
                     :  *         popping a buffer timed out or yielded a 0 buffer, in which
                     :  *         case the connection is closed.
                     :  */
    1575           0 : int64_t RSPConnection::write( const void* inData, const uint64_t bytes )
    1576             : {
    1577           0 :     if( _parent )
    1578           0 :         return _parent->write( inData, bytes );
    1579             : 
    1580           0 :     LBASSERT( isListening( ));
    1581           0 :     if( !_write )
    1582           0 :         return -1;
    1583             : 
    1584             :     // compute number of datagrams
    1585           0 :     uint64_t nDatagrams = bytes  / _payloadSize;
    1586           0 :     if( nDatagrams * _payloadSize != bytes )
    1587           0 :         ++nDatagrams;
    1588             : 
    1589             :     // queue each datagram (might block if buffers are exhausted)
    1590           0 :     const uint8_t* data = reinterpret_cast< const uint8_t* >( inData );
    1591           0 :     const uint8_t* end = data + bytes;
    1592           0 :     for( uint64_t i = 0; i < nDatagrams; ++i )
    1593             :     {
    1594           0 :         size_t packetSize = end - data;
    1595           0 :         packetSize = LB_MIN( packetSize, _payloadSize );
    1596             : 
    1597           0 :         if( _appBuffers.isEmpty( ))
    1598             :             // trigger processing
    1599           0 :             _postWakeup();
    1600             : 
    1601             :         Buffer* buffer;
    1602           0 :         if ( !_appBuffers.timedPop( _writeTimeOut, buffer ) )
    1603             :         {
    1604           0 :             LBERROR << "Timeout while writing" << std::endl;
    1605           0 :             buffer = 0;
    1606             :         }
    1607             :         // A timeout and a popped 0 buffer are handled the same way
    1608           0 :         if( !buffer )
    1609             :         {
    1610           0 :             close();
    1611           0 :             return -1;
    1612             :         }
    1613             : 
    1614             :         // prepare packet header (sequence is done by thread)
    1615             :         DatagramData* header =
    1616           0 :             reinterpret_cast< DatagramData* >( buffer->getData( ));
    1617           0 :         header->type = DATA;
    1618           0 :         header->size = uint16_t( packetSize );
    1619           0 :         header->writerID = _id;
    1620             :         // The payload is stored directly behind the header
    1621           0 :         memcpy( header + 1, data, packetSize );
    1622           0 :         data += packetSize;
    1623             : 
    1624           0 :         LBCHECK( _threadBuffers.push( buffer ));
    1625             :     }
    1626           0 :     _postWakeup();
    1627           0 :     LBLOG( LOG_RSP ) << "queued " << nDatagrams << " datagrams, "
    1628           0 :                      << bytes << " bytes" << std::endl;
    1629           0 :     return bytes;
    1630             : }
    1631             : /** Wait on _appBuffers using waitSize() with the total number of buffers.
                     :  *
                     :  * Presumably this blocks until all write buffers were returned by the
                     :  * thread, i.e. all queued data was sent and acknowledged - TODO confirm
                     :  * against the waitSize() semantics.
                     :  *
                     :  * On a connection with a valid parent nothing is waited for.
                     :  * NOTE(review): the assertion in that branch tests !_parent and
                     :  * therefore looks like it always fails when reached in debug builds -
                     :  * confirm that this is the intended way to flag the misuse.
                     :  */
    1632           0 : void RSPConnection::finish()
    1633             : {
    1634           0 :     if( _parent.isValid( ))
    1635             :     {
    1636           0 :         LBASSERTINFO( !_parent, "Writes are only allowed on RSP listeners" );
    1637           0 :         return;
    1638             :     }
    1639           0 :     LBASSERT( isListening( ));
    1640           0 :     _appBuffers.waitSize( _buffers.size( ));
    1641             : }
    1642             : /** Send a COUNTNODE datagram carrying the current number of children.
                     :  *
                     :  * Nothing is sent while no child with our own _id exists in _children.
                     :  * The datagram is byteswapped before it is sent on _write.
                     :  */
    1643           0 : void RSPConnection::_sendCountNode()
    1644             : {
    1645           0 :     if( !_findConnection( _id ))
    1646           0 :         return;
    1647             : 
    1648           0 :     LBLOG( LOG_RSP ) << _children.size() << " nodes" << std::endl;
    1649           0 :     DatagramNode count = { COUNTNODE, CO_RSP_PROTOCOL_VERSION, _id,
    1650           0 :                            uint16_t( _children.size( )) };
    1651           0 :     count.byteswap();
    1652           0 :     _write->send( boost::asio::buffer( &count, sizeof( count )) );
    1653             : }
    1654             : /** Send a DatagramNode of the given type on _write.
                     :  *
                     :  * The datagram carries the type, CO_RSP_PROTOCOL_VERSION, the given
                     :  * identifier and our current _sequence, and is byteswapped before it is
                     :  * sent.
                     :  *
                     :  * @param type the datagram type to send.
                     :  * @param id the connection identifier to put into the datagram.
                     :  */
    1655           0 : void RSPConnection::_sendSimpleDatagram( const DatagramType type,
    1656             :                                          const uint16_t id )
    1657             : {
    1658             :     DatagramNode simple = { uint16_t( type ), CO_RSP_PROTOCOL_VERSION, id,
    1659           0 :                             _sequence };
    1660           0 :     simple.byteswap();
    1661           0 :     _write->send( boost::asio::buffer( &simple, sizeof( simple )) );
    1662           0 : }
    1663             : /** Send an ACK datagram for a sequence received from another writer.
                     :  *
                     :  * The datagram is built from ACK, our _id, writerID and sequence, in
                     :  * this order, and byteswapped before it is sent on _write.
                     :  *
                     :  * @param writerID the identifier of the acknowledged writer; asserted to
                     :  *                 differ from our own _id.
                     :  * @param sequence the acknowledged sequence number.
                     :  */
    1664           0 : void RSPConnection::_sendAck( const uint16_t writerID,
    1665             :                               const uint16_t sequence )
    1666             : {
    1667           0 :     LBASSERT( _id != writerID );
    1668             : #ifdef CO_INSTRUMENT_RSP
    1669             :     ++nAcksSend;
    1670             : #endif
    1671             : 
    1672           0 :     LBLOG( LOG_RSP ) << "send ack " << sequence << std::endl;
    1673           0 :     DatagramAck ack = { ACK, _id, writerID, sequence };
    1674           0 :     ack.byteswap();
    1675           0 :     _write->send( boost::asio::buffer( &ack, sizeof( ack )) );
    1676           0 : }
    1677             : /** Send negative acknowledgements for missing ranges to a writer.
                     :  *
                     :  * If the writer is this connection, the ranges are queued directly with
                     :  * _addRepeat() and nothing is sent. Otherwise a DatagramNack is filled,
                     :  * byteswapped and sent on _write, truncated by the size of the unused
                     :  * Nack entries.
                     :  *
                     :  * @param writerID the identifier of the writer to request data from.
                     :  * @param nacks the missing ranges.
                     :  * @param count the number of ranges; asserted to be in
                     :  *              1..CO_RSP_MAX_NACKS.
                     :  */
    1678           0 : void RSPConnection::_sendNack( const uint16_t writerID, const Nack* nacks,
    1679             :                                const uint16_t count )
    1680             : {
    1681           0 :     LBASSERT( count > 0 );
    1682           0 :     LBASSERT( count <= CO_RSP_MAX_NACKS );
    1683             : #ifdef CO_INSTRUMENT_RSP
    1684             :     ++nNAcksSend;
    1685             : #endif
    1686             :     /* optimization: use the direct access to the reader. */
    1687           0 :     if( writerID == _id )
    1688             :     {
    1689           0 :          _addRepeat( nacks, count );
    1690           0 :          return;
    1691             :     }
    1692             :     // assumes the nacks array is the trailing member of DatagramNack, so
                     :     // that cutting the size drops only unused entries - TODO confirm
    1693             :     const size_t size = sizeof( DatagramNack ) -
    1694           0 :                         (CO_RSP_MAX_NACKS - count) * sizeof( Nack );
    1695             : 
    1696             :     // set the header
    1697             :     DatagramNack packet;
    1698           0 :     packet.set( _id, writerID, count );
    1699           0 :     memcpy( packet.nacks, nacks, count * sizeof( Nack ));
    1700           0 :     packet.byteswap();
    1701           0 :     _write->send( boost::asio::buffer( &packet, size ));
    1702             : }
    1703             : /** Send an ACKREQ datagram for the sequence before our current _sequence.
                     :  *
                     :  * The datagram is built from ACKREQ, our _id and uint16_t( _sequence - 1 )
                     :  * and byteswapped before it is sent on _write.
                     :  */
    1704           0 : void RSPConnection::_sendAckRequest()
    1705             : {
    1706             : #ifdef CO_INSTRUMENT_RSP
    1707             :     ++nAckRequests;
    1708             : #endif
    1709           0 :     LBLOG( LOG_RSP ) << "send ack request for " << uint16_t( _sequence -1 )
    1710           0 :                      << std::endl;
    1711           0 :     DatagramAckRequest ackRequest = { ACKREQ, _id, uint16_t( _sequence - 1 ) };
    1712           0 :     ackRequest.byteswap();
    1713           0 :     _write->send( boost::asio::buffer( &ackRequest, sizeof( DatagramAckRequest )) );
    1714           0 : }
    1715             : /** Print the identifier and send rate of an RSP connection.
                     :  *
                     :  * Flushing and the log header are disabled while printing and enabled
                     :  * again at the end. When CO_INSTRUMENT_RSP is defined, the
                     :  * instrumentation counters are printed as well.
                     :  *
                     :  * NOTE(review): with CO_INSTRUMENT_RSP, printing has a side effect: all
                     :  * instrumentation counters are reset to zero after being printed.
                     :  *
                     :  * @param os the output stream.
                     :  * @param connection the connection to print.
                     :  * @return the output stream.
                     :  */
    1716           0 : std::ostream& operator << ( std::ostream& os,
    1717             :                             const RSPConnection& connection )
    1718             : {
    1719           0 :     os << lunchbox::disableFlush << lunchbox::disableHeader
    1720           0 :        << "RSPConnection id " << connection.getID() << " send rate "
    1721           0 :        << connection.getSendRate();
    1722             : 
    1723             : #ifdef CO_INSTRUMENT_RSP
    1724             :     const int prec = os.precision();
    1725             :     os.precision( 3 );
    1726             : 
    1727             :     const float time = instrumentClock.getTimef();
    1728             :     const float mbps = 1048.576f * time;
    1729             :     os << ": " << lunchbox::indent << std::endl
    1730             :        << float( nBytesRead ) / mbps << " / " << float( nBytesWritten ) / mbps
    1731             :        <<  " MB/s r/w using " << nDatagrams << " dgrams " << nRepeated
    1732             :        << " repeats " << nMergedDatagrams
    1733             :        << " merged"
    1734             :        << std::endl;
    1735             : 
    1736             :     os.precision( prec );
    1737             :     os << "sender: " << nAckRequests << " ack requests " << nAcksAccepted << "/"
    1738             :        << nAcksRead << " acks " << nNAcksRead << " nacks, throttle "
    1739             :        << writeWaitTime << " ms"
    1740             :        << std::endl
    1741             :        << "receiver: " << nAcksSend << " acks " << nNAcksSend << " nacks"
    1742             :        << lunchbox::exdent;
    1743             :     // Reset the counters after they were printed
    1744             :     nReadData = 0;
    1745             :     nBytesRead = 0;
    1746             :     nBytesWritten = 0;
    1747             :     nDatagrams = 0;
    1748             :     nRepeated = 0;
    1749             :     nMergedDatagrams = 0;
    1750             :     nAckRequests = 0;
    1751             :     nAcksSend = 0;
    1752             :     nAcksRead = 0;
    1753             :     nAcksAccepted = 0;
    1754             :     nNAcksSend = 0;
    1755             :     nNAcksRead = 0;
    1756             :     writeWaitTime = 0.f;
    1757             : #endif
    1758           0 :     os << std::endl << lunchbox::enableHeader << lunchbox::enableFlush;
    1759             : 
    1760           0 :     return os;
    1761             : }
    1762             : 
    1763          66 : }

Generated by: LCOV version 1.11