LCOV - code coverage report
Current view: top level - co - rspConnection.cpp (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 399 892 44.7 %
Date: 2014-10-06 Functions: 30 50 60.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c)      2009, Cedric Stalder <cedric.stalder@gmail.com>
       3             :  *               2009-2014, Stefan Eilemann <eile@equalizergraphics.com>
       4             :  *                    2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "rspConnection.h"
      23             : 
      24             : #include "connection.h"
      25             : #include "connectionDescription.h"
      26             : #include "global.h"
      27             : #include "log.h"
      28             : 
      29             : #include <lunchbox/rng.h>
      30             : #include <lunchbox/scopedMutex.h>
      31             : #include <lunchbox/sleep.h>
      32             : 
      33             : #include <boost/bind.hpp>
      34             : 
      35             : //#define CO_INSTRUMENT_RSP
      36             : #define CO_RSP_MERGE_WRITES
      37             : #define CO_RSP_MAX_TIMEOUTS 1000
      38             : #ifdef _WIN32
      39             : #  define CO_RSP_DEFAULT_PORT (4242)
      40             : #else
      41             : #  define CO_RSP_DEFAULT_PORT ( (getuid() % 64511) + 1024 )
      42             : #endif
      43             : 
      44             : 
      45             : // Note: Do not use version > 255, endianness detection magic relies on this.
      46             : const uint16_t CO_RSP_PROTOCOL_VERSION = 0;
      47             : 
      48             : namespace bp = boost::posix_time;
      49             : namespace ip = boost::asio::ip;
      50             : 
      51             : namespace co
      52             : {
      53             : 
      54             : namespace
      55             : {
      56             : #ifdef CO_INSTRUMENT_RSP
      57             : lunchbox::a_int32_t nReadData;
      58             : lunchbox::a_int32_t nBytesRead;
      59             : lunchbox::a_int32_t nBytesWritten;
      60             : lunchbox::a_int32_t nDatagrams;
      61             : lunchbox::a_int32_t nRepeated;
      62             : lunchbox::a_int32_t nMergedDatagrams;
      63             : lunchbox::a_int32_t nAckRequests;
      64             : lunchbox::a_int32_t nAcksSend;
      65             : lunchbox::a_int32_t nAcksSendTotal;
      66             : lunchbox::a_int32_t nAcksRead;
      67             : lunchbox::a_int32_t nAcksAccepted;
      68             : lunchbox::a_int32_t nNAcksSend;
      69             : lunchbox::a_int32_t nNAcksRead;
      70             : lunchbox::a_int32_t nNAcksResend;
      71             : 
      72             : float writeWaitTime = 0.f;
      73             : lunchbox::Clock instrumentClock;
      74             : #endif
      75             : 
      76             : static uint16_t _numBuffers = 0;
      77             : }
      78             : 
      79           4 : RSPConnection::RSPConnection()
      80             :     : _id( 0 )
      81             :     , _idAccepted( false )
      82           4 :     , _mtu( Global::getIAttribute( Global::IATTR_UDP_MTU ))
      83           4 :     , _ackFreq( Global::getIAttribute( Global::IATTR_RSP_ACK_FREQUENCY ))
      84             :     , _payloadSize( _mtu - sizeof( DatagramData ))
      85             :     , _timeouts( 0 )
      86           4 :     , _event( new EventConnection )
      87             :     , _read( 0 )
      88             :     , _write( 0 )
      89             :     , _timeout( _ioService )
      90             :     , _wakeup( _ioService )
      91           4 :     , _maxBucketSize( ( _mtu * _ackFreq) >> 1 )
      92             :     , _bucketSize( 0 )
      93             :     , _sendRate( 0 )
      94             :     , _thread( 0 )
      95           4 :     , _acked( std::numeric_limits< uint16_t >::max( ))
      96             :     , _threadBuffers( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS))
      97             :     , _recvBuffer( _mtu )
      98             :     , _readBuffer( 0 )
      99             :     , _readBufferPos( 0 )
     100             :     , _sequence( 0 )
     101             :     // ensure we have a handleConnectedTimeout before the write pop
     102          24 :     , _writeTimeOut( Global::IATTR_RSP_ACK_TIMEOUT * CO_RSP_MAX_TIMEOUTS * 2 )
     103             : {
     104           4 :     _buildNewID();
     105           4 :     ConnectionDescriptionPtr description = _getDescription();
     106           4 :     description->type = CONNECTIONTYPE_RSP;
     107           4 :     description->bandwidth = 102400;
     108             : 
     109           4 :     LBCHECK( _event->connect( ));
     110             : 
     111           4 :     _buffers.reserve( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ));
     112         524 :     while( static_cast< int32_t >( _buffers.size( )) <
     113         260 :            Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ))
     114             :     {
     115         256 :         _buffers.push_back( new Buffer( _mtu ));
     116             :     }
     117             : 
     118           4 :     LBASSERT( sizeof( DatagramNack ) <= size_t( _mtu ));
     119           4 :     LBLOG( LOG_RSP ) << "New RSP connection, " << _buffers.size()
     120           4 :                      << " buffers of " << _mtu << " bytes" << std::endl;
     121             : 
     122           4 : }
     123             : 
     124          12 : RSPConnection::~RSPConnection()
     125             : {
     126           4 :     _close();
     127         264 :     while( !_buffers.empty( ))
     128             :     {
     129         256 :         delete _buffers.back();
     130         256 :         _buffers.pop_back();
     131             :     }
     132           8 : }
     133             : 
     134          12 : void RSPConnection::_close()
     135             : {
     136          12 :     if( _parent.isValid() && _parent->_id == _id )
     137           2 :         _parent->close();
     138             : 
     139          24 :     while( !_parent && _isWriting( ))
     140           0 :         lunchbox::sleep( 10 /*ms*/ );
     141             : 
     142          12 :     if( isClosed( ))
     143          20 :         return;
     144             : 
     145           4 :     lunchbox::ScopedWrite mutex( _mutexEvent );
     146           4 :     if( _thread )
     147             :     {
     148           2 :         LBASSERT( !_thread->isCurrent( ));
     149           2 :         _sendSimpleDatagram( ID_EXIT, _id );
     150           2 :         _ioService.stop();
     151           2 :         _thread->join();
     152           2 :         delete _thread;
     153             :     }
     154             : 
     155           4 :     _setState( STATE_CLOSING );
     156           4 :     if( _thread )
     157             :     {
     158           2 :          _thread = 0;
     159             : 
     160             :         // notify children to close
     161           4 :         for( RSPConnectionsCIter i=_children.begin(); i !=_children.end(); ++i )
     162             :         {
     163           2 :             RSPConnectionPtr child = *i;
     164           4 :             lunchbox::ScopedWrite mutexChild( child->_mutexEvent );
     165           2 :             child->_appBuffers.push( 0 );
     166           2 :             child->_event->set();
     167           2 :         }
     168             : 
     169           2 :         _children.clear();
     170           2 :         _newChildren.clear();
     171             :     }
     172             : 
     173           4 :     _parent = 0;
     174             : 
     175           4 :     if( _read )
     176           2 :         _read->close();
     177           4 :     delete _read;
     178           4 :     _read = 0;
     179             : 
     180           4 :     if( _write )
     181           2 :         _write->close();
     182           4 :     delete _write;
     183           4 :     _write = 0;
     184             : 
     185           4 :     _threadBuffers.clear();
     186           4 :     _appBuffers.push( 0 ); // unlock any other read/write threads
     187             : 
     188           4 :     _setState( STATE_CLOSED );
     189             : 
     190           4 :     mutex.leave();
     191           4 :     _event->close();
     192             : }
     193             : 
     194             : //----------------------------------------------------------------------
     195             : // Async IO handles
     196             : //----------------------------------------------------------------------
     197           4 : uint16_t RSPConnection::_buildNewID()
     198             : {
     199           4 :     lunchbox::RNG rng;
     200           4 :     _id = rng.get< uint16_t >();
     201           4 :     return _id;
     202             : }
     203             : 
     204           2 : bool RSPConnection::listen()
     205             : {
     206           2 :     ConnectionDescriptionPtr description = _getDescription();
     207           2 :     LBASSERT( description->type == CONNECTIONTYPE_RSP );
     208             : 
     209           2 :     if( !isClosed( ))
     210           0 :         return false;
     211             : 
     212           2 :     _setState( STATE_CONNECTING );
     213           2 :     _numBuffers =  Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS );
     214             : 
     215             :     // init udp connection
     216           2 :     if( description->port == 0 )
     217           1 :         description->port = CO_RSP_DEFAULT_PORT;
     218           2 :     if( description->getHostname().empty( ))
     219           0 :         description->setHostname( "239.255.42.43" );
     220           2 :     if( description->getInterface().empty( ))
     221           1 :         description->setInterface( "0.0.0.0" );
     222             : 
     223             :     try
     224             :     {
     225           2 :         const ip::address readAddress( ip::address::from_string( "0.0.0.0" ));
     226             :         const ip::udp::endpoint readEndpoint( readAddress,
     227           2 :                                               description->port );
     228             : 
     229           2 :         std::stringstream portStr;
     230           2 :         portStr << description->port;
     231           4 :         const std::string& port = portStr.str();
     232           4 :         ip::udp::resolver resolver( _ioService );
     233             :         const ip::udp::resolver::query queryHN( ip::udp::v4(),
     234           2 :                                                 description->getHostname(),
     235           4 :                                                 port );
     236           4 :         const ip::udp::resolver::iterator end;
     237             :         const ip::udp::resolver::iterator hostnameIP =
     238           4 :             resolver.resolve( queryHN );
     239             : 
     240           2 :         if( hostnameIP == end )
     241           0 :             return false;
     242             : 
     243           2 :         const ip::udp::endpoint writeEndpoint = *hostnameIP;
     244           2 :         const ip::address mcAddr( writeEndpoint.address() );
     245             : 
     246           2 :         _read = new ip::udp::socket( _ioService );
     247           2 :         _write = new ip::udp::socket( _ioService );
     248           2 :         _read->open( readEndpoint.protocol( ));
     249           2 :         _write->open( writeEndpoint.protocol( ));
     250             : 
     251           2 :         _read->set_option( ip::udp::socket::reuse_address( true ));
     252           2 :         _write->set_option( ip::udp::socket::reuse_address( true ));
     253             :         _read->set_option( ip::udp::socket::receive_buffer_size(
     254           2 :                        Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
     255             :         _write->set_option( ip::udp::socket::send_buffer_size(
     256           2 :                        Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
     257             : 
     258           2 :         _read->bind( readEndpoint );
     259             : 
     260             :         const ip::udp::resolver::query queryIF( ip::udp::v4(),
     261           2 :                                                 description->getInterface(),
     262           6 :                                                 "0" );
     263             :         const ip::udp::resolver::iterator interfaceIP =
     264           4 :             resolver.resolve( queryIF );
     265             : 
     266           2 :         if( interfaceIP == end )
     267           0 :             return false;
     268             : 
     269           2 :         const ip::address ifAddr( ip::udp::endpoint( *interfaceIP ).address( ));
     270           2 :         LBINFO << "Joining " << mcAddr << " on " << ifAddr << std::endl;
     271             : 
     272             :         _read->set_option( ip::multicast::join_group( mcAddr.to_v4(),
     273           2 :                                                       ifAddr.to_v4( )));
     274           2 :         _write->set_option( ip::multicast::outbound_interface( ifAddr.to_v4()));
     275             : #ifdef SO_BINDTODEVICE // https://github.com/Eyescale/Collage/issues/16
     276           4 :         const std::string& ifIP = ifAddr.to_string();
     277             :         ::setsockopt( _write->native(), SOL_SOCKET, SO_BINDTODEVICE,
     278           2 :                       ifIP.c_str(), ifIP.size() + 1 );
     279             :         ::setsockopt( _read->native(), SOL_SOCKET, SO_BINDTODEVICE,
     280           2 :                       ifIP.c_str(), ifIP.size() + 1 );
     281             : #endif
     282             : 
     283           2 :         _write->connect( writeEndpoint );
     284             : 
     285           2 :         _read->set_option( ip::multicast::enable_loopback( false ));
     286           4 :         _write->set_option( ip::multicast::enable_loopback( false ));
     287             :     }
     288             :     catch( const boost::system::system_error& e )
     289             :     {
     290             :         LBWARN << "can't setup underlying UDP connection: " << e.what()
     291             :                << std::endl;
     292             :         delete _read;
     293             :         delete _write;
     294             :         _read = 0;
     295             :         _write = 0;
     296             :         return false;
     297             :     }
     298             : 
     299             :     // init communication protocol thread
     300           2 :     _thread = new Thread( this );
     301           2 :     _bucketSize = 0;
     302           2 :     _sendRate = description->bandwidth;
     303             : 
     304             :     // waits until RSP protocol establishes connection to the multicast network
     305           2 :     if( !_thread->start( ) )
     306             :     {
     307           0 :         close();
     308           0 :         return false;
     309             :     }
     310             : 
     311             :     // Make all buffers available for writing
     312           2 :     LBASSERT( _appBuffers.isEmpty( ));
     313           2 :     _appBuffers.push( _buffers );
     314             : 
     315           8 :     LBINFO << "Listening on " << description->getHostname() << ":"
     316          10 :            << description->port << " (" << description->toString() << " @"
     317           8 :            << (void*)this << ")" << std::endl;
     318           2 :     return true;
     319             : }
     320             : 
     321           2 : ConnectionPtr RSPConnection::acceptSync()
     322             : {
     323           2 :     if( !isListening( ))
     324           0 :         return 0;
     325             : 
     326           2 :     lunchbox::ScopedWrite mutex( _mutexConnection );
     327           2 :     LBASSERT( !_newChildren.empty( ));
     328           2 :     if( _newChildren.empty( ))
     329           0 :         return 0;
     330             : 
     331           4 :     RSPConnectionPtr newConnection = _newChildren.back();
     332           2 :     _newChildren.pop_back();
     333             : 
     334           6 :     LBINFO << _id << " accepted RSP connection " << newConnection->_id
     335           6 :            << std::endl;
     336             : 
     337           4 :     lunchbox::ScopedWrite mutex2( _mutexEvent );
     338           2 :     if( _newChildren.empty() )
     339           2 :         _event->reset();
     340             :     else
     341           0 :         _event->set();
     342             : 
     343           4 :     return newConnection;
     344             : }
     345             : 
     346       67598 : int64_t RSPConnection::readSync( void* buffer, const uint64_t bytes, const bool)
     347             : {
     348       67598 :     LBASSERT( bytes > 0 );
     349       67598 :     if( !isConnected ( ))
     350           0 :         return -1;
     351             : 
     352       67598 :     uint64_t bytesLeft = bytes;
     353       67598 :     uint8_t* ptr = reinterpret_cast< uint8_t* >( buffer );
     354             : 
     355             :     // redundant (done by the caller already), but saves some lock ops
     356      203643 :     while( bytesLeft )
     357             :     {
     358       68449 :         if( !_readBuffer )
     359             :         {
     360       14516 :             LBASSERT( _readBufferPos == 0 );
     361       14516 :             _readBuffer = _appBuffers.pop();
     362       14516 :             if( !_readBuffer )
     363             :             {
     364           2 :                 close();
     365             :                 return (bytes == bytesLeft) ?
     366           2 :                     -1 : static_cast< int64_t >( bytes - bytesLeft );
     367             :             }
     368             :         }
     369             : 
     370             :         const DatagramData* header = reinterpret_cast< const DatagramData* >(
     371       68447 :             _readBuffer->getData( ));
     372       68447 :         const uint8_t* payload = reinterpret_cast< const uint8_t* >( header+1 );
     373       68447 :         const size_t dataLeft = header->size - _readBufferPos;
     374       68447 :         const size_t size = LB_MIN( static_cast< size_t >( bytesLeft ),
     375             :                                     dataLeft );
     376             : 
     377       68447 :         memcpy( ptr, payload + _readBufferPos, size );
     378       68447 :         _readBufferPos += size;
     379       68447 :         ptr += size;
     380       68447 :         bytesLeft -= size;
     381             : 
     382             :         // if all data in the buffer has been taken
     383       68447 :         if( _readBufferPos >= header->size )
     384             :         {
     385       14514 :             LBASSERT( _readBufferPos == header->size );
     386             :             //LBLOG( LOG_RSP ) << "reset read buffer  " << header->sequence
     387             :             //                 << std::endl;
     388             : 
     389       14514 :             LBCHECK( _threadBuffers.push( _readBuffer ));
     390       14514 :             _readBuffer = 0;
     391       14514 :             _readBufferPos = 0;
     392             :         }
     393             :         else
     394             :         {
     395       53933 :             LBASSERT( _readBufferPos < header->size );
     396             :         }
     397             :     }
     398             : 
     399       67596 :     if( _readBuffer || !_appBuffers.isEmpty( ))
     400       54236 :         _event->set();
     401             :     else
     402             :     {
     403       13360 :         lunchbox::ScopedWrite mutex( _mutexEvent );
     404       13360 :         if( _appBuffers.isEmpty( ))
     405       13325 :             _event->reset();
     406             :     }
     407             : 
     408             : #ifdef CO_INSTRUMENT_RSP
     409             :     nBytesRead += bytes;
     410             : #endif
     411       67596 :     return bytes;
     412             : }
     413             : 
     414           2 : void RSPConnection::Thread::run()
     415             : {
     416           2 :     _connection->_runThread();
     417           2 :     _connection = 0;
     418           2 :     LBINFO << "Left RSP protocol thread" << std::endl;
     419           2 : }
     420             : 
     421       83806 : void RSPConnection::_handleTimeout( const boost::system::error_code& error )
     422             : {
     423       83806 :     if( error == boost::asio::error::operation_aborted )
     424      153015 :         return;
     425             : 
     426       14597 :     if( isListening( ))
     427       14517 :         _handleConnectedTimeout();
     428          80 :     else if( _idAccepted )
     429          40 :         _handleInitTimeout();
     430             :     else
     431          40 :         _handleAcceptIDTimeout();
     432             : }
     433             : 
     434          40 : void RSPConnection::_handleAcceptIDTimeout()
     435             : {
     436          40 :     ++_timeouts;
     437          40 :     if( _timeouts < 20 )
     438             :     {
     439          38 :         LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
     440          38 :         _sendSimpleDatagram( ID_HELLO, _id );
     441             :     }
     442             :     else
     443             :     {
     444           2 :         LBLOG( LOG_RSP ) << "Confirm " << _id << std::endl;
     445           2 :         _sendSimpleDatagram( ID_CONFIRM, _id );
     446           2 :         _addConnection( _id, _sequence );
     447           2 :         _idAccepted = true;
     448           2 :         _timeouts = 0;
     449             :         // send a first datagram to announce me and discover all other
     450             :         // connections
     451           2 :         _sendCountNode();
     452             :     }
     453          40 :     _setTimeout( 10 );
     454          40 : }
     455             : 
     456          40 : void RSPConnection::_handleInitTimeout( )
     457             : {
     458          40 :     LBASSERT( !isListening( ))
     459          40 :     ++_timeouts;
     460          40 :     if( _timeouts < 20 )
     461          38 :         _sendCountNode();
     462             :     else
     463             :     {
     464           2 :         _setState( STATE_LISTENING );
     465           2 :         LBINFO << "RSP connection " << _id << " listening" << std::endl;
     466           2 :         _timeouts = 0;
     467           2 :         _ioService.stop(); // thread initialized, run restarts
     468             :     }
     469          40 :     _setTimeout( 10 );
     470          40 : }
     471             : 
     472           0 : void RSPConnection::_clearWriteQueues()
     473             : {
     474           0 :     while( !_threadBuffers.isEmpty() )
     475             :     {
     476           0 :         Buffer* buffer = 0;
     477           0 :         _threadBuffers.pop( buffer );
     478           0 :         _writeBuffers.push_back( buffer );
     479             :     }
     480             : 
     481           0 :     _finishWriteQueue( _sequence - 1 );
     482           0 :     LBASSERT( _threadBuffers.isEmpty() && _writeBuffers.empty() );
     483           0 : }
     484             : 
     485       14517 : void RSPConnection::_handleConnectedTimeout()
     486             : {
     487       14517 :     if( !isListening( ))
     488             :     {
     489           0 :         _clearWriteQueues();
     490           0 :         _ioService.stop();
     491           0 :         return;
     492             :     }
     493             : 
     494       14517 :     _processOutgoing();
     495             : 
     496       14517 :     if( _timeouts >= CO_RSP_MAX_TIMEOUTS )
     497             :     {
     498           0 :         LBERROR << "Too many timeouts during send: " << _timeouts << std::endl;
     499           0 :         bool all = true;
     500           0 :         for( RSPConnectionsCIter i =_children.begin(); i !=_children.end(); ++i)
     501             :         {
     502           0 :             RSPConnectionPtr child = *i;
     503           0 :             if ( child->_acked >= _sequence - _numBuffers && child->_id != _id )
     504             :             {
     505           0 :                 all = false;
     506           0 :                 break;
     507             :             }
     508           0 :         }
     509             : 
     510             :         // if all connections failed we probably got disconnected -> close and
     511             :         // exit else close all failed child connections
     512           0 :         if ( all )
     513             :         {
     514           0 :             _sendSimpleDatagram( ID_EXIT, _id );
     515           0 :             _appBuffers.pushFront( 0 ); // unlock write function
     516             : 
     517           0 :             for( RSPConnectionsCIter i =_children.begin();
     518           0 :                  i !=_children.end(); ++i)
     519             :             {
     520           0 :                 RSPConnectionPtr child = *i;
     521           0 :                 child->_setState( STATE_CLOSING );
     522           0 :                 child->_appBuffers.push( 0 ); // unlock read func
     523           0 :             }
     524             : 
     525           0 :             _clearWriteQueues();
     526           0 :             _ioService.stop();
     527           0 :             return;
     528             :         }
     529             : 
     530           0 :         RSPConnectionsCIter i =_children.begin();
     531           0 :         while ( i !=_children.end() )
     532             :         {
     533           0 :             RSPConnectionPtr child = *i;
     534           0 :             if ( child->_acked < _sequence - 1 && _id != child->_id )
     535             :             {
     536           0 :                 _sendSimpleDatagram( ID_EXIT, child->_id );
     537           0 :                 _removeConnection( child->_id );
     538             :             }
     539             :             else
     540             :             {
     541           0 :                 uint16_t wb = static_cast<uint16_t>( _writeBuffers.size( ));
     542           0 :                 child->_acked = _sequence - wb;
     543           0 :                 ++i;
     544             :             }
     545           0 :         }
     546             : 
     547           0 :         _timeouts = 0;
     548             :     }
     549             : }
     550             : 
     551             : RSPConnection::DatagramNode*
     552           0 : RSPConnection::_getDatagramNode( const size_t bytes )
     553             : {
     554           0 :     if( bytes < sizeof( DatagramNode ))
     555             :     {
     556           0 :         LBERROR << "DatagramNode size mismatch, got " << bytes << " instead of "
     557           0 :                 << sizeof( DatagramNode ) << " bytes" << std::endl;
     558             :         //close();
     559           0 :         return 0;
     560             :     }
     561             :     DatagramNode& node =
     562           0 :                     *reinterpret_cast< DatagramNode* >( _recvBuffer.getData( ));
     563           0 :     node.byteswap();
     564           0 :     if( node.protocolVersion != CO_RSP_PROTOCOL_VERSION )
     565             :     {
     566           0 :         LBERROR << "Protocol version mismatch, got " << node.protocolVersion
     567           0 :                 << " instead of " << CO_RSP_PROTOCOL_VERSION << std::endl;
     568             :         //close();
     569           0 :         return 0;
     570             :     }
     571           0 :     return &node;
     572             : }
     573             : 
     574           2 : bool RSPConnection::_initThread()
     575             : {
     576           2 :     LBLOG( LOG_RSP ) << "Started RSP protocol thread" << std::endl;
     577           2 :     _timeouts = 0;
     578             : 
     579             :    // send a first datagram to announce me and discover other connections
     580           2 :     LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
     581           2 :     _sendSimpleDatagram( ID_HELLO, _id );
     582           2 :     _setTimeout( 10 );
     583           2 :     _asyncReceiveFrom();
     584           2 :     _ioService.run();
     585           2 :     return isListening();
     586             : }
     587             : 
     588           2 : void RSPConnection::_runThread()
     589             : {
     590             :     //__debugbreak();
     591           2 :     _ioService.reset();
     592           2 :     _ioService.run();
     593           2 : }
     594             : 
     595       14579 : void RSPConnection::_setTimeout( const int32_t timeOut )
     596             : {
     597       14579 :     LBASSERT( timeOut >= 0 );
     598       14579 :     _timeout.expires_from_now( boost::posix_time::milliseconds( timeOut ));
     599             :     _timeout.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
     600       14579 :                                       boost::asio::placeholders::error ));
     601       14579 : }
     602             : 
     603       69227 : void RSPConnection::_postWakeup()
     604             : {
                           // Schedules _handleTimeout with a zero delay on the separate
                           // _wakeup timer, i.e. without touching the expiry of _timeout.
                           // NOTE(review): presumably called from application threads to kick
                           // the protocol thread after queuing data - confirm against callers.
     605       69227 :     _wakeup.expires_from_now( boost::posix_time::milliseconds( 0 ));
     606             :     _wakeup.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
     607       69227 :                                      boost::asio::placeholders::error ));
     608       69227 : }
     609             : 
     610       14517 : void RSPConnection::_processOutgoing()
     611             : {
                           // Performs one step of outgoing work and decides how the timeout
                           // is re-armed:
                           //  1. send one datagram (a requested repeat takes priority over
                           //     new data),
                           //  2. if more work is queued, reschedule immediately,
                           //  3. if nothing is awaiting an ack, cancel the timeout,
                           //  4. otherwise wait for the ack timeout and (re)send an ack
                           //     request once it elapsed.
     612             : #ifdef CO_INSTRUMENT_RSP
                           // Dump the instrumentation counters once instrumentClock exceeds 1000
     613             :     if( instrumentClock.getTime64() > 1000 )
     614             :     {
     615             :         LBWARN << *this << std::endl;
     616             :         instrumentClock.reset();
     617             :     }
     618             : #endif
     619             : 
     620       14517 :     if( !_repeatQueue.empty( ))
     621           0 :         _repeatData();
     622             :     else
     623       14517 :         _writeData();
     624             : 
     625       14517 :     if( !_threadBuffers.isEmpty() || !_repeatQueue.empty( ))
     626             :     {
     627       14497 :         _setTimeout( 0 ); // call again to send remaining
     628       14497 :         return;
     629             :     }
     630             :     // no more data to write, check/send ack request, reset timeout
     631             : 
     632          20 :     if( _writeBuffers.empty( )) // got all acks
     633             :     {
     634          20 :         _timeouts = 0;
     635          20 :         _timeout.cancel();
     636          20 :         return;
     637             :     }
     638             : 
                           // Datagrams are still unacknowledged: wait until the configured
                           // ack timeout has elapsed on _clock before asking for acks.
                           // NOTE(review): _clock is also reset by _waitWritable(), so the
                           // elapsed time is measured from the last send - confirm intended.
     639             :     const int64_t timeout =
     640           0 :         Global::getIAttribute( Global::IATTR_RSP_ACK_TIMEOUT );
     641           0 :     const int64_t left = timeout - _clock.getTime64();
     642             : 
     643           0 :     if( left > 0 )
     644             :     {
     645           0 :         _setTimeout( left );
     646           0 :         return;
     647             :     }
     648             : 
     649             :     // (repeat) ack request
     650           0 :     _clock.reset();
     651           0 :     ++_timeouts;
                           // Once CO_RSP_MAX_TIMEOUTS is reached no further ack request is
                           // sent, but the timeout is still re-armed; the reaction to the
                           // exceeded count is not visible in this function.
     652           0 :     if ( _timeouts < CO_RSP_MAX_TIMEOUTS )
     653           0 :         _sendAckRequest();
     654           0 :     _setTimeout( timeout );
     655             : }
     656             : 
     657       14517 : void RSPConnection::_writeData()
     658             : {
                           // Sends at most one new datagram: pops the next buffer queued in
                           // _threadBuffers, stamps it with the next sequence number,
                           // optionally merges following small buffers into it, rate-limits,
                           // sends it and keeps it in _writeBuffers until it is acknowledged.
     659       14517 :     Buffer* buffer = 0;
     660       14517 :     if( !_threadBuffers.pop( buffer )) // nothing to write
     661       14520 :         return;
     662             : 
     663       14514 :     _timeouts = 0;
     664       14514 :     LBASSERT( buffer );
     665             : 
     666             :     // write buffer
     667       14514 :     DatagramData* header = reinterpret_cast<DatagramData*>( buffer->getData( ));
     668       14514 :     header->sequence = _sequence++;
     669             : 
     670             : #ifdef CO_RSP_MERGE_WRITES
                           // Append the payload of following queued buffers to this datagram
                           // as long as the combined payload does not exceed _payloadSize.
                           // Only the first buffer consumes a sequence number.
     671       14514 :     if( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
     672             :     {
     673       13645 :         std::vector< Buffer* > appBuffers;
     674       81223 :         while( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
     675             :         {
                                   // Peek first: the buffer is only removed if it fits
     676       54783 :             Buffer* buffer2 = 0;
     677       54783 :             LBCHECK( _threadBuffers.getFront( buffer2 ));
     678       54783 :             LBASSERT( buffer2 );
     679             :             DatagramData* header2 =
     680       54783 :                 reinterpret_cast<DatagramData*>( buffer2->getData( ));
     681             : 
     682       54783 :             if( uint32_t( header->size + header2->size ) > _payloadSize )
     683         850 :                 break;
     684             : 
                                   // Payload starts directly behind the DatagramData header
     685       53933 :             memcpy( reinterpret_cast<uint8_t*>( header + 1 ) + header->size,
     686      107866 :                     header2 + 1, header2->size );
     687       53933 :             header->size += header2->size;
     688       53933 :             LBCHECK( _threadBuffers.pop( buffer2 ));
     689       53933 :             appBuffers.push_back( buffer2 );
     690             : #ifdef CO_INSTRUMENT_RSP
     691             :             ++nMergedDatagrams;
     692             : #endif
     693             :         }
     694             : 
                               // Merged buffers are pushed to _appBuffers in one operation
                               // (presumably handing them back to the application for reuse -
                               // TODO confirm).
     695       13645 :         if( !appBuffers.empty( ))
     696       12795 :             _appBuffers.push( appBuffers );
     697             :     }
     698             : #endif
     699             : 
     700             :     // send data
     701             :     //  Note 1: We could optimize the send away if we're all alone, but this is
     702             :     //          not a use case for RSP, so we don't care.
     703             :     //  Note 2: Data to myself will be 'written' in _finishWriteQueue once we
     704             :     //          got all acks for the packet
                           // The wire size is computed before byteswap() touches the header
     705       14514 :     const uint32_t size = header->size + sizeof( DatagramData );
     706             : 
     707       14514 :     _waitWritable( size ); // OPT: process incoming in between
     708       14514 :     header->byteswap();
     709       14514 :     _write->send( boost::asio::buffer( header, size ));
     710             : 
     711             : #ifdef CO_INSTRUMENT_RSP
                           // NOTE(review): header->size is read after byteswap(); if
                           // byteswap() changes the field on this platform the byte counter
                           // is wrong - confirm.
     712             :     ++nDatagrams;
     713             :     nBytesWritten += header->size;
     714             : #endif
     715             : 
     716             :     // save datagram for repeats (and self)
     717       14514 :     _writeBuffers.push_back( buffer );
     718             : 
     719       14514 :     if( _children.size() == 1 ) // We're all alone
     720             :     {
                               // Nobody else will ack: treat the datagram just sent
                               // (_sequence - 1) as acknowledged right away.
     721       14514 :         LBASSERT( _children.front()->_id == _id );
     722       14514 :         _finishWriteQueue( _sequence - 1 );
     723             :     }
     724             : }
     725             : 
     726       14514 : void RSPConnection::_waitWritable( const uint64_t bytes )
     727             : {
                           // Send-rate limiter: blocks (yielding the thread) until the byte
                           // budget in _bucketSize covers the datagram, then deducts it.
                           // The budget is refilled with elapsed-time * _sendRate and capped
                           // at _maxBucketSize.
                           // @param bytes size of the datagram about to be sent; at most
                           //              _mtu bytes are charged.
                           // Afterwards _sendRate is raised by one step if it is below the
                           // bandwidth of the connection description.
     728             : #ifdef CO_INSTRUMENT_RSP
     729             :     lunchbox::Clock clock;
     730             : #endif
     731             : 
                           // Refill for the time passed since _clock was last reset
     732       14514 :     _bucketSize += static_cast< uint64_t >( _clock.resetTimef() * _sendRate );
     733             :                                                      // opt omit: * 1024 / 1000;
     734       14514 :     _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
     735             : 
     736       14514 :     const uint64_t size = LB_MIN( bytes, static_cast< uint64_t >( _mtu ));
     737     2381648 :     while( _bucketSize < size )
     738             :     {
     739     2352620 :         lunchbox::Thread::yield();
     740     2352620 :         float time = _clock.resetTimef();
     741             : 
                               // Keep yielding until the clock reports a non-zero interval,
                               // otherwise the refill below would add nothing.
     742     4705240 :         while( time == 0.f )
     743             :         {
     744           0 :             lunchbox::Thread::yield();
     745           0 :             time = _clock.resetTimef();
     746             :         }
     747             : 
                               // NOTE(review): this refill casts to int64_t while the one
                               // above casts to uint64_t - looks like an inconsistency, confirm.
     748     2352620 :         _bucketSize += static_cast< int64_t >( time * _sendRate );
     749     2352620 :         _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
     750             :     }
     751       14514 :     _bucketSize -= size;
     752             : 
     753             : #ifdef CO_INSTRUMENT_RSP
     754             :     writeWaitTime += clock.getTimef();
     755             : #endif
     756             : 
                           // Increase the send rate by
                           //   IATTR_RSP_ERROR_UPSCALE * bandwidth * 0.001
                           // per call until it reaches the configured bandwidth.
     757       14514 :     ConstConnectionDescriptionPtr description = getDescription();
     758       14514 :     if( _sendRate < description->bandwidth )
     759             :     {
     760             :         _sendRate += int64_t(
     761           0 :             float( Global::getIAttribute( Global::IATTR_RSP_ERROR_UPSCALE )) *
     762           0 :             float( description->bandwidth ) * .001f );
     763           0 :         LBLOG( LOG_RSP ) << "speeding up to " << _sendRate << " KB/s"
     764           0 :                          << std::endl;
     765       14514 :     }
     766       14514 : }
     767             : 
     768           0 : void RSPConnection::_repeatData()
     769             : {
                           // Re-sends at most one datagram requested by the nack at the front
                           // of _repeatQueue. The request is advanced by one sequence number
                           // per iteration and removed once start == end; requests for
                           // datagrams no longer held in _writeBuffers are skipped.
     770           0 :     _timeouts = 0;
     771             : 
     772           0 :     while( !_repeatQueue.empty( ))
     773             :     {
     774           0 :         Nack& request = _repeatQueue.front();
                               // How far request.start lies behind the next sequence to be
                               // assigned; computed in 16 bit so sequence wrap-around works.
     775           0 :         const uint16_t distance = _sequence - request.start;
     776             : 
                               // distance 0 refers to a sequence number not handed out yet
     777           0 :         if ( distance == 0 )
     778             :         {
     779           0 :             LBWARN << "ignoring invalid nack (" << request.start
     780           0 :                    << ".." << request.end << ")" << std::endl;
     781           0 :             _repeatQueue.pop_front();
     782           0 :             continue;
     783             :         }
     784             : 
     785           0 :         if( distance <= _writeBuffers.size( )) // not already acked
     786             :         {
     787             : //          LBLOG( LOG_RSP ) << "Repeat " << request.start << ", " << _sendRate
     788             : //                           << "KB/s"<< std::endl;
     789             : 
                                   // _writeBuffers.back() holds sequence _sequence - 1, so
                                   // the requested datagram sits 'distance' from the end.
     790           0 :             const size_t i = _writeBuffers.size() - distance;
     791           0 :             Buffer* buffer = _writeBuffers[i];
     792           0 :             LBASSERT( buffer );
     793             : 
     794             :             DatagramData* header =
     795           0 :                 reinterpret_cast<DatagramData*>( buffer->getData( ));
     796           0 :             const uint32_t size = header->size + sizeof( DatagramData );
     797           0 :             LBASSERT( header->sequence == request.start );
     798             : 
     799             :             // send data
     800           0 :             _waitWritable( size ); // OPT: process incoming in between
     801             :             // already done by _writeData: header->byteswap();
     802           0 :             _write->send( boost::asio::buffer( header, size ) );
     803             : #ifdef CO_INSTRUMENT_RSP
     804             :             ++nRepeated;
     805             : #endif
     806             :         }
     807             : 
     808           0 :         if( request.start == request.end )
     809           0 :             _repeatQueue.pop_front();    // done with request
     810             :         else
     811           0 :             ++request.start;
     812             : 
                               // Same condition as above: stop after one datagram was sent
                               // so the caller can interleave other work.
     813           0 :         if( distance <= _writeBuffers.size( )) // send something
     814           0 :             return;
     815             :     }
     816             : }
     817             : 
     818       14514 : void RSPConnection::_finishWriteQueue( const uint16_t sequence )
     819             : {
                           // Called once every datagram up to and including 'sequence' is
                           // acknowledged. Removes those datagrams from _writeBuffers and
                           // delivers them to the local connection with our own id (data
                           // sent to ourselves), then recycles the write buffers.
                           // @param sequence last acknowledged sequence number.
     820       14514 :     LBASSERT( !_writeBuffers.empty( ));
     821             : 
     822       14514 :     RSPConnectionPtr connection = _findConnection( _id );
     823       14514 :     LBASSERT( connection.isValid( ));
     824       14514 :     LBASSERT( connection->_recvBuffers.empty( ));
     825             : 
     826             :     // Bundle pushing the buffers to the app to avoid excessive lock ops
     827       29028 :     Buffers readBuffers;
     828       29028 :     Buffers freeBuffers;
     829             : 
                           // Number of datagrams sent after 'sequence'; these stay queued.
                           // _sequence is the next number to assign, hence the extra -1.
     830       14514 :     const uint16_t size = _sequence - sequence - 1;
     831       14514 :     LBASSERTINFO( size <= uint16_t( _writeBuffers.size( )),
     832             :                   size << " > " << _writeBuffers.size( ));
     833       14514 :     LBLOG( LOG_RSP ) << "Got all remote acks for " << sequence << " current "
     834           0 :                      << _sequence << " advance " << _writeBuffers.size() - size
     835       14514 :                      << " buffers" << std::endl;
     836             : 
     837       43542 :     while( _writeBuffers.size() > size_t( size ))
     838             :     {
     839       14514 :         Buffer* buffer = _writeBuffers.front();
     840       14514 :         _writeBuffers.pop_front();
     841             : 
     842             : #ifndef NDEBUG
                               // NOTE(review): the header is swapped back only in debug
                               // builds; if byteswap() is not a no-op on this platform the
                               // data handed to the reader differs between debug and
                               // release - confirm.
     843             :         DatagramData* datagram =
     844       14514 :                          reinterpret_cast< DatagramData* >( buffer->getData( ));
     845       14514 :         datagram->byteswap();
     846       14514 :         LBASSERT( datagram->writerID == _id );
     847       14514 :         LBASSERTINFO( datagram->sequence ==
     848             :                       uint16_t( connection->_sequence + readBuffers.size( )),
     849             :                       datagram->sequence << ", " << connection->_sequence <<
     850             :                       ", " << readBuffers.size( ));
     851             :       //LBLOG( LOG_RSP ) << "self receive " << datagram->sequence << std::endl;
     852             : #endif
     853             : 
                               // Moves the datagram into a free read buffer of the local
                               // connection; returns 0 if the reader has none available.
     854       14514 :         Buffer* newBuffer = connection->_newDataBuffer( *buffer );
     855       14514 :         if( !newBuffer && !readBuffers.empty( )) // push prepared app buffers
     856             :         {
                                   // Hand over what is ready so the application can drain
                                   // it and free buffers for the retry loop below.
     857           0 :             lunchbox::ScopedWrite mutex( connection->_mutexEvent );
     858           0 :             LBLOG( LOG_RSP ) << "post " << readBuffers.size()
     859           0 :                              << " buffers starting with sequence "
     860           0 :                              << connection->_sequence << std::endl;
     861             : 
     862           0 :             connection->_appBuffers.push( readBuffers );
     863           0 :             connection->_sequence += uint16_t( readBuffers.size( ));
     864           0 :             readBuffers.clear();
     865           0 :             connection->_event->set();
     866             :         }
     867             : 
                               // Busy-wait (with yield) until a read buffer becomes free
     868       29028 :         while( !newBuffer ) // no more data buffers, wait for app to drain
     869             :         {
     870           0 :             newBuffer = connection->_newDataBuffer( *buffer );
     871           0 :             lunchbox::Thread::yield();
     872             :         }
     873             : 
     874       14514 :         freeBuffers.push_back( buffer );
     875       14514 :         readBuffers.push_back( newBuffer );
     876             :     }
     877             : 
                           // Recycle the write buffers on this connection, then publish the
                           // remaining read buffers on the local reader under its event lock.
     878       14514 :     _appBuffers.push( freeBuffers );
     879       14514 :     if( !readBuffers.empty( ))
     880             :     {
     881       14514 :         lunchbox::ScopedWrite mutex( connection->_mutexEvent );
     882             : #if 0
     883             :         LBLOG( LOG_RSP )
     884             :             << "post " << readBuffers.size() << " buffers starting at "
     885             :             << connection->_sequence << std::endl;
     886             : #endif
     887             : 
     888       14514 :         connection->_appBuffers.push( readBuffers );
     889       14514 :         connection->_sequence += uint16_t( readBuffers.size( ));
     890       14514 :         connection->_event->set();
     891             :     }
     892             : 
                           // The local reader has now consumed everything up to 'sequence'
     893       14514 :     connection->_acked = uint16_t( connection->_sequence - 1 );
     894       14514 :     LBASSERT( connection->_acked == sequence );
     895             : 
     896       29028 :     _timeouts = 0;
     897       14514 : }
     898             : 
     899           0 : void RSPConnection::_handlePacket( const boost::system::error_code& /* error */,
     900             :                                    const size_t bytes )
     901             : {
                           // Completion handler of the asynchronous receive posted by
                           // _asyncReceiveFrom(). Dispatches the datagram in _recvBuffer
                           // according to the connection state and posts the next receive.
                           // @param bytes number of bytes received.
                           // NOTE(review): the error code is ignored, so a failed or aborted
                           // receive is processed like a successful one - confirm intended.
     902           0 :     if( isListening( ))
     903             :     {
     904           0 :         _handleConnectedData( bytes );
     905             : 
                               // Handling the datagram may have ended the listening state
     906           0 :         if( isListening( ))
     907           0 :             _processOutgoing();
     908             :         else
     909             :         {
                                   // Stop the io service; no further receive is posted
     910           0 :             _ioService.stop();
     911           0 :             return;
     912             :         }
     913             :     }
                           // Not listening yet: only node (id negotiation) datagrams are
                           // considered, shorter packets are dropped silently.
     914           0 :     else if( bytes >= sizeof( DatagramNode ))
     915             :     {
     916           0 :         if( _idAccepted )
     917           0 :             _handleInitData( bytes, false );
     918             :         else
     919           0 :             _handleAcceptIDData( bytes );
     920             :     }
     921             : 
     922             :     //LBLOG( LOG_RSP ) << "_handlePacket timeout " << timeout << std::endl;
     923           0 :     _asyncReceiveFrom();
     924             : }
     925             : 
     926           0 : void RSPConnection::_handleAcceptIDData( const size_t bytes )
     927             : {
                           // Handles a node datagram received while our own id has not been
                           // accepted yet (_handlePacket calls this when _idAccepted is
                           // false).
                           // @param bytes number of bytes received into _recvBuffer.
                           // Datagrams for which _getDatagramNode() returns 0 are ignored.
     928           0 :     DatagramNode* pNode = _getDatagramNode( bytes );
     929           0 :     if( !pNode )
     930           0 :         return;
     931             : 
     932           0 :     DatagramNode& node = *pNode;
     933             : 
     934           0 :     switch( node.type )
     935             :     {
     936             :         case ID_HELLO:
     937           0 :             _checkNewID( node.connectionID );
     938           0 :             break;
     939             : 
     940             :         case ID_HELLO_REPLY:
     941           0 :             _addConnection( node.connectionID, node.data );
     942           0 :             break;
     943             : 
     944             :         case ID_DENY:
     945             :             // a connection refused my ID, try another ID
     946           0 :             if( node.connectionID == _id )
     947             :             {
                                       // Restart the announcement with a freshly built id
     948           0 :                 _timeouts = 0;
     949           0 :                 _sendSimpleDatagram( ID_HELLO, _buildNewID( ));
     950           0 :                 LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
     951             :             }
     952           0 :             break;
     953             : 
     954             :         case ID_EXIT:
     955           0 :             _removeConnection( node.connectionID );
     956           0 :             break;
     957             : 
     958             :         default:
     959           0 :             LBUNIMPLEMENTED;
     960           0 :             break;
     961             :     }
     962             : }
     963             : 
     964           0 : void RSPConnection::_handleInitData( const size_t bytes, const bool connected )
     965             : {
                           // Handles a node (id negotiation / membership) datagram once our
                           // own id is accepted.
                           // @param bytes     number of bytes received into _recvBuffer.
                           // @param connected true when called from _handleConnectedData(),
                           //                  false when called from _handlePacket() before
                           //                  the connection is listening. When false,
                           //                  ID_HELLO and ID_CONFIRM reset _timeouts.
                           // NOTE(review): there is no case for ID_DENY, although
                           // _handleConnectedData() forwards that type here; it ends in the
                           // default branch (LBUNIMPLEMENTED) - confirm intended.
     966           0 :     DatagramNode* pNode = _getDatagramNode( bytes );
     967           0 :     if( !pNode )
     968           0 :         return;
     969             : 
     970           0 :     DatagramNode& node = *pNode;
     971             : 
     972           0 :     switch( node.type )
     973             :     {
     974             :         case ID_HELLO:
     975           0 :             if( !connected )
     976           0 :                 _timeouts = 0;
     977           0 :             _checkNewID( node.connectionID ) ;
     978           0 :             return;
     979             : 
     980             :         case ID_CONFIRM:
     981           0 :             if( !connected )
     982           0 :                 _timeouts = 0;
     983           0 :             _addConnection( node.connectionID, node.data );
     984           0 :             return;
     985             : 
     986             :         case COUNTNODE:
                                   // Informational only: logged, no state change here
     987           0 :             LBLOG( LOG_RSP ) << "Got " << node.data << " nodes from "
     988           0 :                              << node.connectionID << std::endl;
     989           0 :             return;
     990             : 
     991             :         case ID_HELLO_REPLY:
     992           0 :             _addConnection( node.connectionID, node.data );
     993           0 :             return;
     994             : 
     995             :         case ID_EXIT:
     996           0 :             _removeConnection( node.connectionID );
     997           0 :             return;
     998             : 
     999             :         default:
    1000           0 :             LBUNIMPLEMENTED;
    1001           0 :             break;
    1002             :     }
    1003             : }
    1004             : 
    1005           0 : void RSPConnection::_handleConnectedData( const size_t bytes )
    1006             : {
                           // Dispatches a datagram received while listening. The first 16
                           // bits of _recvBuffer select the handler.
                           // @param bytes number of bytes received; packets too short to
                           //              contain the type field are dropped.
    1007           0 :     if( bytes < sizeof( uint16_t ))
    1008           0 :         return;
    1009             : 
    1010           0 :     void* data = _recvBuffer.getData();
    1011           0 :     uint16_t type = *reinterpret_cast< uint16_t* >( data );
    1012             : #ifdef COLLAGE_BIGENDIAN
                           // Only the local copy is swapped here; the handlers swap the
                           // datagram in the buffer themselves.
    1013             :     lunchbox::byteswap( type );
    1014             : #endif
    1015           0 :     switch( type )
    1016             :     {
    1017             :         case DATA:
    1018           0 :             LBCHECK( _handleData( bytes ));
    1019           0 :             break;
    1020             : 
    1021             :         case ACK:
    1022           0 :             LBCHECK( _handleAck( bytes ));
    1023           0 :             break;
    1024             : 
    1025             :         case NACK:
    1026           0 :             LBCHECK( _handleNack( ));
    1027           0 :             break;
    1028             : 
    1029             :         case ACKREQ: // The writer asks for an ack/nack
    1030           0 :             LBCHECK( _handleAckRequest( bytes ));
    1031           0 :             break;
    1032             : 
                               // Membership datagrams share the handler used during set-up
    1033             :         case ID_HELLO:
    1034             :         case ID_HELLO_REPLY:
    1035             :         case ID_CONFIRM:
    1036             :         case ID_EXIT:
    1037             :         case ID_DENY:
    1038             :         case COUNTNODE:
    1039           0 :             _handleInitData( bytes, true );
    1040           0 :             break;
    1041             : 
    1042             :         default:
    1043           0 :             LBASSERTINFO( false,
    1044             :                           "Don't know how to handle packet of type " << type );
    1045             :     }
    1046             : 
    1047             : }
    1048             : 
    1049           2 : void RSPConnection::_asyncReceiveFrom()
    1050             : {
                           // Posts one asynchronous receive on the _read socket: up to _mtu
                           // bytes go into _recvBuffer, the sender endpoint into _readAddr,
                           // and _handlePacket is invoked with the error code and byte count.
    1051             :     _read->async_receive_from(
    1052           2 :         boost::asio::buffer( _recvBuffer.getData(), _mtu ), _readAddr,
    1053             :         boost::bind( &RSPConnection::_handlePacket, this,
    1054             :                      boost::asio::placeholders::error,
    1055           4 :                      boost::asio::placeholders::bytes_transferred ));
    1056           2 : }
    1057             : 
    1058           0 : bool RSPConnection::_handleData( const size_t bytes )
    1059             : {
                           // Handles a DATA datagram in _recvBuffer.
                           //  - in-order packet: queued for the application together with
                           //    any directly following packets stored earlier,
                           //  - packet outside the receive window: ignored,
                           //  - out-of-order packet inside the window: stored in the
                           //    connection's _recvBuffers and an early nack for the gap
                           //    before it is sent.
                           // @param bytes number of bytes received.
                           // @return false if the packet is shorter than a DatagramData
                           //         header or the writer is unknown, true otherwise
                           //         (including when the packet is dropped).
    1060           0 :     if( bytes < sizeof( DatagramData ))
    1061           0 :         return false;
    1062             :     DatagramData& datagram =
    1063           0 :                     *reinterpret_cast< DatagramData* >( _recvBuffer.getData( ));
    1064           0 :     datagram.byteswap();
    1065             : 
    1066             : #ifdef CO_INSTRUMENT_RSP
    1067             :     ++nReadData;
    1068             : #endif
    1069           0 :     const uint16_t writerID = datagram.writerID;
    1070             : #ifdef Darwin
    1071             :     // There is occasionally a packet from ourselves, even though multicast loop
    1072             :     // is not set?!
    1073             :     if( writerID == _id )
    1074             :         return true;
    1075             : #else
    1076           0 :     LBASSERT( writerID != _id );
    1077             : #endif
    1078             : 
    1079           0 :     RSPConnectionPtr connection = _findConnection( writerID );
    1080             : 
    1081           0 :     if( !connection )  // unknown connection ?
    1082             :     {
    1083           0 :         LBASSERTINFO( false, "Can't find connection with id " << writerID );
    1084           0 :         return false;
    1085             :     }
    1086           0 :     LBASSERT( connection->_id == writerID );
    1087             : 
    1088           0 :     const uint16_t sequence = datagram.sequence;
    1089             : //  LBLOG( LOG_RSP ) << "rcvd " << sequence << " from " << writerID <<std::endl;
    1090             : 
                           // connection->_sequence is the next sequence expected from this
                           // writer (_pushDataBuffer() increments it per delivered packet).
    1091           0 :     if( connection->_sequence == sequence ) // in-order packet
    1092             :     {
    1093           0 :         Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
    1094           0 :         if( !newBuffer ) // no more data buffers, drop packet
    1095           0 :             return true;
    1096             : 
    1097           0 :         lunchbox::ScopedWrite mutex( connection->_mutexEvent );
    1098           0 :         connection->_pushDataBuffer( newBuffer );
    1099             : 
                               // Slot k of _recvBuffers holds the packet k + 1 ahead of the
                               // expected sequence; a 0 entry marks a packet still missing.
    1100           0 :         while( !connection->_recvBuffers.empty( )) // enqueue ready pending data
    1101             :         {
    1102           0 :             newBuffer = connection->_recvBuffers.front();
    1103           0 :             if( !newBuffer )
    1104           0 :                 break;
    1105             : 
    1106           0 :             connection->_recvBuffers.pop_front();
    1107           0 :             connection->_pushDataBuffer( newBuffer );
    1108             :         }
    1109             : 
                               // The empty front slot now corresponds to the expected
                               // sequence itself; drop it to keep the k + 1 indexing.
    1110           0 :         if( !connection->_recvBuffers.empty() &&
    1111           0 :             !connection->_recvBuffers.front( )) // update for new _sequence
    1112             :         {
    1113           0 :             connection->_recvBuffers.pop_front();
    1114             :         }
    1115             : 
    1116           0 :         connection->_event->set();
    1117           0 :         return true;
    1118             :     }
    1119             : 
                           // Window check: ignore packets more than _numBuffers ahead of the
                           // expected sequence, taking 16 bit wrap-around into account.
    1120           0 :     const uint16_t max = std::numeric_limits< uint16_t >::max();
    1121           0 :     if(( connection->_sequence > sequence &&
    1122           0 :          max - connection->_sequence + sequence > _numBuffers ) ||
    1123           0 :        ( connection->_sequence < sequence &&
    1124           0 :          sequence - connection->_sequence > _numBuffers ))
    1125             :     {
    1126             :         // ignore it if it's a repetition for another reader
    1127           0 :         return true;
    1128             :     }
    1129             : 
    1130             :     // else out of order
    1131             : 
                           // Distance ahead of the expected sequence (wraps in 16 bit)
    1132           0 :     const uint16_t size = sequence - connection->_sequence;
    1133           0 :     LBASSERT( size != 0 );
    1134           0 :     LBASSERTINFO( size <= _numBuffers, size << " > " << _numBuffers );
    1135             : 
                           // Slot for this packet; nothing to do if it was stored before
    1136           0 :     ssize_t i = ssize_t( size ) - 1;
    1137           0 :     const bool gotPacket = ( connection->_recvBuffers.size() >= size &&
    1138           0 :                              connection->_recvBuffers[ i ] );
    1139           0 :     if( gotPacket )
    1140           0 :         return true;
    1141             : 
    1142           0 :     Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
    1143           0 :     if( !newBuffer ) // no more data buffers, drop packet
    1144           0 :         return true;
    1145             : 
                           // Grow with empty (0) slots for the packets in between
    1146           0 :     if( connection->_recvBuffers.size() < size )
    1147           0 :         connection->_recvBuffers.resize( size, 0 );
    1148             : 
    1149           0 :     LBASSERT( !connection->_recvBuffers[ i ] );
    1150           0 :     connection->_recvBuffers[ i ] = newBuffer;
    1151             : 
    1152             :     // early nack: request missing packets before current
                           // Default range: everything from the expected sequence up to the
                           // packet before this one. If the directly preceding slot is
                           // filled nothing is sent; otherwise the range start is moved up
                           // to the last stored packet found when scanning backwards.
                           // NOTE(review): the guard is i > 0, so slot 0 is never inspected
                           // here, and nack.start = _sequence + i names a sequence at or
                           // before the stored slot i - confirm the intended range.
    1153           0 :     --i;
    1154           0 :     Nack nack = { connection->_sequence, uint16_t( sequence - 1 ) };
    1155           0 :     if( i > 0 )
    1156             :     {
    1157           0 :         if( connection->_recvBuffers[i] ) // got previous packet
    1158           0 :             return true;
    1159             : 
    1160           0 :         while( i >= 0 && !connection->_recvBuffers[i] )
    1161           0 :             --i;
    1162             : 
    1163           0 :         const Buffer* lastBuffer = i>=0 ? connection->_recvBuffers[i] : 0;
    1164           0 :         if( lastBuffer )
    1165             :         {
    1166           0 :             nack.start = connection->_sequence + i;
    1167             :         }
    1168             :     }
    1169             : 
    1170           0 :     LBLOG( LOG_RSP ) << "send early nack " << nack.start << ".." << nack.end
    1171           0 :                      << " current " << connection->_sequence << " ooo "
    1172           0 :                      << connection->_recvBuffers.size() << std::endl;
    1173             : 
                           // Range crosses the 16 bit wrap: only request up to the maximum
    1174           0 :     if( nack.end < nack.start )
    1175             :         // OPT: don't drop nack 0..nack.end, but it doesn't happen often
    1176           0 :         nack.end = std::numeric_limits< uint16_t >::max();
    1177             : 
    1178           0 :     _sendNack( writerID, &nack, 1 );
    1179           0 :     return true;
    1180             : }
    1181             : 
    1182       14514 : RSPConnection::Buffer* RSPConnection::_newDataBuffer( Buffer& inBuffer )
    1183             : {
                           // Takes a free buffer from _threadBuffers and swaps its contents
                           // with inBuffer, so the returned buffer carries the received data
                           // and inBuffer can be reused by the caller.
                           // @param inBuffer buffer holding the datagram; its maximum size
                           //                 must equal _mtu.
                           // @return the buffer now holding the data, or 0 if no free buffer
                           //         is available (the data is not taken over in that case).
    1184       14514 :     LBASSERT( static_cast< int32_t >( inBuffer.getMaxSize( )) == _mtu );
    1185             : 
    1186       14514 :     Buffer* buffer = 0;
    1187       14514 :     if( _threadBuffers.pop( buffer ))
    1188             :     {
    1189       14514 :         buffer->swap( inBuffer );
    1190       14514 :         return buffer;
    1191             :     }
    1192             : 
    1193             :     // we do not have a free buffer, which means that the receiver is slower
    1194             :     // than our read thread. This is bad, because now we'll drop the data and
    1195             :     // will send a NAck packet upon the ack request, causing retransmission even
    1196             :     // though we'll probably drop it again
    1197           0 :     LBLOG( LOG_RSP ) << "Reader too slow, dropping data" << std::endl;
    1198             : 
    1199             :     // Set the event if there is data to read. This shouldn't be needed since
    1200             :     // the event should be set in this case, but it'll increase the robustness
    1201           0 :     lunchbox::ScopedWrite mutex( _mutexEvent );
    1202           0 :     if( !_appBuffers.isEmpty( ))
    1203           0 :         _event->set();
    1204           0 :     return 0;
    1205             : }
    1206             : 
    1207           0 : void RSPConnection::_pushDataBuffer( Buffer* buffer )
    1208             : {
                           // Queues an in-order data buffer for the application and advances
                           // the expected sequence number of this (child) connection.
                           // @param buffer datagram whose sequence must equal _sequence.
                           // _handleData() calls this while holding _mutexEvent.
    1209           0 :     LBASSERT( _parent );
    1210             : #ifndef NDEBUG
    1211           0 :     DatagramData* dgram = reinterpret_cast< DatagramData* >(buffer->getData( ));
    1212           0 :     LBASSERTINFO( dgram->sequence == _sequence,
    1213             :                   dgram->sequence << " != " << _sequence );
    1214             : #endif
    1215             : 
                           // Acknowledge every _ackFreq-th packet; adding the parent's id
                           // shifts which sequence numbers trigger the ack per receiver
                           // (presumably to spread acks of different readers - TODO confirm).
    1216           0 :     if( (( _sequence + _parent->_id ) % _ackFreq ) == 0 )
    1217           0 :         _parent->_sendAck( _id, _sequence );
    1218             : 
    1219           0 :     LBLOG( LOG_RSP ) << "post buffer " << _sequence << std::endl;
    1220           0 :     ++_sequence;
    1221           0 :     _appBuffers.push( buffer );
    1222           0 : }
    1223             : 
    // Processes the DatagramAck stored in _recvBuffer (byte-swapped in place).
    // bytes is compared against sizeof( DatagramAck ); presumably it is the
    // number of bytes received for this datagram - confirm at the call site.
    // Returns false if the datagram is too short or the acking reader is not
    // a known connection. Returns true otherwise, including for acks addressed
    // to another writer and for late acks, both of which are ignored.
    1224           0 : bool RSPConnection::_handleAck( const size_t bytes )
    1225             : {
    1226           0 :     if( bytes < sizeof( DatagramAck ))
    1227           0 :         return false;
    1228             :     DatagramAck& ack =
    1229           0 :                      *reinterpret_cast< DatagramAck* >( _recvBuffer.getData( ));
    1230           0 :     ack.byteswap();
    1231             : 
    1232             : #ifdef CO_INSTRUMENT_RSP
    1233             :     ++nAcksRead;
    1234             : #endif
    1235             : 
    1236           0 :     if( ack.writerID != _id )
    1237           0 :         return true;
    1238             : 
    1239           0 :     LBLOG( LOG_RSP ) << "got ack from " << ack.readerID << " for "
    1240           0 :                      << ack.writerID << " sequence " << ack.sequence
    1241           0 :                      << " current " << _sequence << std::endl;
    1242             : 
    1243             :     // find destination connection, update ack data if needed
    1244           0 :     RSPConnectionPtr connection = _findConnection( ack.readerID );
    1245           0 :     if( !connection )
    1246             :     {
    1247           0 :         LBUNREACHABLE;
    1248           0 :         return false;
    1249             :     }
    1250             : 
    1251           0 :     if( connection->_acked >= ack.sequence &&
    1252           0 :         connection->_acked - ack.sequence <= _numBuffers )
    1253             :     {
    1254             :         // I have received a later ack previously from the reader
    1255           0 :         LBLOG( LOG_RSP ) << "Late ack" << std::endl;
    1256           0 :         return true;
    1257             :     }
    1258             : 
    1259             : #ifdef CO_INSTRUMENT_RSP
    1260             :     ++nAcksAccepted;
    1261             : #endif
    1262           0 :     connection->_acked = ack.sequence;
    1263           0 :     _timeouts = 0; // reset timeout counter
    1264             : 
    1265             :     // Check if we can advance _acked
    1266           0 :     uint16_t acked = ack.sequence;
    1267             : 
    1268           0 :     for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
    1269             :     {
    1270           0 :         RSPConnectionPtr child = *i;
    1271           0 :         if( child->_id == _id )
    1272           0 :             continue;
    1273             : 
                               // The uint16_t subtraction wraps around. A distance larger
                               // than _numBuffers replaces acked by the child's value,
                               // presumably because that child is further behind.
    1274           0 :         const uint16_t distance = child->_acked - acked;
    1275           0 :         if( distance > _numBuffers )
    1276           0 :             acked = child->_acked;
    1277           0 :     }
    1278             : 
                           // NOTE(review): _findConnection() returns 0 when no child has
                           // this id, and selfChild is dereferenced without a check -
                           // confirm that the own id is always present in _children here.
    1279           0 :     RSPConnectionPtr selfChild = _findConnection( _id );
    1280           0 :     const uint16_t distance = acked - selfChild->_acked;
    1281           0 :     if( distance <= _numBuffers )
    1282           0 :         _finishWriteQueue( acked );
    1283           0 :     return true;
    1284             : }
    1285             : 
    // Processes the DatagramNack stored in _recvBuffer (byte-swapped in place).
    // Nacks addressed to another writer are logged and ignored. For nacks
    // addressed to this writer, _timeouts is reset and the requested ranges
    // are queued through _addRepeat().
    // Returns false only if the nacking reader is not a known connection.
    // NOTE(review): unlike _handleAck() and _handleAckRequest() there is no
    // check of the received size here, and nack.count is forwarded unchecked -
    // confirm that the caller validates the datagram length.
    1286           0 : bool RSPConnection::_handleNack()
    1287             : {
    1288             :     DatagramNack& nack =
    1289           0 :                     *reinterpret_cast< DatagramNack* >( _recvBuffer.getData( ));
    1290           0 :     nack.byteswap();
    1291             : 
    1292             : #ifdef CO_INSTRUMENT_RSP
    1293             :     ++nNAcksRead;
    1294             : #endif
    1295             : 
    1296           0 :     if( _id != nack.writerID )
    1297             :     {
    1298           0 :         LBLOG( LOG_RSP )
    1299           0 :             << "ignore " << nack.count << " nacks from " << nack.readerID
    1300           0 :             << " for " << nack.writerID << " (not me)"<< std::endl;
    1301           0 :         return true;
    1302             :     }
    1303             : 
    1304           0 :     LBLOG( LOG_RSP )
    1305           0 :         << "handle " << nack.count << " nacks from " << nack.readerID
    1306           0 :         << " for " << nack.writerID << std::endl;
    1307             : 
    1308           0 :     RSPConnectionPtr connection = _findConnection( nack.readerID );
    1309           0 :     if( !connection )
    1310             :     {
    1311           0 :         LBUNREACHABLE;
    1312           0 :         return false;
    1313             :         // it's an unknown connection, TODO add this connection?
    1314             :     }
    1315             : 
    1316           0 :     _timeouts = 0;
    1317           0 :     _addRepeat( nack.nacks, nack.count );
    1318           0 :     return true;
    1319             : }
    1320             : 
    // Adds num repeat requests from nacks to _repeatQueue and lowers _sendRate.
    // A request overlapping an existing queue entry extends that entry;
    // otherwise it is appended. 'lost' accumulates the number of newly
    // requested sequences. If _sendRate is above the connection bandwidth
    // shifted right by IATTR_RSP_MIN_SENDRATE_SHIFT, it is reduced by
    // min( lost * 0.001 * IATTR_RSP_ERROR_DOWNSCALE,
    //      0.01 * IATTR_RSP_ERROR_MAXSCALE ) of its value, plus one.
    1321           0 : void RSPConnection::_addRepeat( const Nack* nacks, uint16_t num )
    1322             : {
    1323           0 :     LBLOG( LOG_RSP ) << lunchbox::disableFlush << "Queue repeat requests ";
    1324           0 :     size_t lost = 0;
    1325             : 
    1326           0 :     for( size_t i = 0; i < num; ++i )
    1327             :     {
    1328           0 :         const Nack& nack = nacks[ i ];
    1329           0 :         LBASSERT( nack.start <= nack.end );
    1330             : 
    1331           0 :         LBLOG( LOG_RSP ) << nack.start << ".." << nack.end << " ";
    1332             : 
                               // NOTE(review): merged is only set when an existing entry
                               // is extended. A request fully contained in an existing
                               // entry leaves it false and is appended again below -
                               // confirm this is intended.
    1333           0 :         bool merged = false;
    1334           0 :         for( RepeatQueue::iterator j = _repeatQueue.begin();
    1335           0 :              j != _repeatQueue.end() && !merged; ++j )
    1336             :         {
    1337           0 :             Nack& old = *j;
    1338           0 :             if( old.start <= nack.end && old.end >= nack.start )
    1339             :             {
    1340           0 :                 if( old.start > nack.start )
    1341             :                 {
    1342           0 :                     lost += old.start - nack.start;
    1343           0 :                     old.start = nack.start;
    1344           0 :                     merged = true;
    1345             :                 }
    1346           0 :                 if( old.end < nack.end )
    1347             :                 {
    1348           0 :                     lost += nack.end - old.end;
    1349           0 :                     old.end = nack.end;
    1350           0 :                     merged = true;
    1351             :                 }
    1352           0 :                 LBASSERT( lost < _numBuffers );
    1353             :             }
    1354             :         }
    1355             : 
    1356           0 :         if( !merged )
    1357             :         {
    1358           0 :             lost += uint16_t( nack.end - nack.start ) + 1;
    1359           0 :             LBASSERT( lost <= _numBuffers );
    1360           0 :             _repeatQueue.push_back( nack );
    1361             :         }
    1362             :     }
    1363             : 
    1364           0 :     ConstConnectionDescriptionPtr description = getDescription();
    1365           0 :     if( _sendRate >
    1366           0 :         ( description->bandwidth >>
    1367           0 :           Global::getIAttribute( Global::IATTR_RSP_MIN_SENDRATE_SHIFT )))
    1368             :     {
    1369           0 :         const float delta = float( lost ) * .001f *
    1370           0 :                      Global::getIAttribute( Global::IATTR_RSP_ERROR_DOWNSCALE );
    1371           0 :         const float maxDelta = .01f *
    1372           0 :             float( Global::getIAttribute( Global::IATTR_RSP_ERROR_MAXSCALE ));
    1373           0 :         const float downScale = LB_MIN( delta, maxDelta );
    1374           0 :         _sendRate -= 1 + int64_t( _sendRate * downScale );
    1375           0 :         LBLOG( LOG_RSP )
    1376           0 :             << ", lost " << lost << " slowing down " << downScale * 100.f
    1377           0 :             << "% to " << _sendRate << " KB/s" << std::endl
    1378           0 :             << lunchbox::enableFlush;
    1379             :     }
    1380             :     else
    1381           0 :         LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush;
    1382           0 : }
    1383             : 
    // Answers the DatagramAckRequest stored in _recvBuffer (byte-swapped in
    // place) with either an ack or a nack.
    // bytes is compared against sizeof( DatagramAckRequest ); presumably it is
    // the number of bytes received - confirm at the call site.
    // Returns false if the datagram is too short or the writer is not a known
    // connection, true otherwise.
    // An ack for the last in-order sequence (connection->_sequence - 1) is sent
    // when the condition below finds nothing to request. Otherwise the holes in
    // connection->_recvBuffers up to the requested sequence are collected into
    // a Nack array and sent with _sendNack().
    1384           0 : bool RSPConnection::_handleAckRequest( const size_t bytes )
    1385             : {
    1386           0 :     if( bytes < sizeof( DatagramAckRequest ))
    1387           0 :         return false;
    1388             :     DatagramAckRequest& ackRequest =
    1389           0 :               *reinterpret_cast< DatagramAckRequest* >( _recvBuffer.getData( ));
    1390           0 :     ackRequest.byteswap();
    1391             : 
    1392           0 :     const uint16_t writerID = ackRequest.writerID;
    1393             : #ifdef Darwin
    1394             :     // There is occasionally a packet from ourselves, even though multicast loop
    1395             :     // is not set?!
    1396             :     if( writerID == _id )
    1397             :         return true;
    1398             : #else
    1399           0 :     LBASSERT( writerID != _id );
    1400             : #endif
    1401           0 :     RSPConnectionPtr connection = _findConnection( writerID );
    1402           0 :     if( !connection )
    1403             :     {
    1404           0 :         LBUNREACHABLE;
    1405           0 :         return false;
    1406             :     }
    1407             : 
                           // All three values are uint16_t, so distance wraps around.
    1408           0 :     const uint16_t reqID = ackRequest.sequence;
    1409           0 :     const uint16_t gotID = connection->_sequence - 1;
    1410           0 :     const uint16_t distance = reqID - gotID;
    1411             : 
    1412           0 :     LBLOG( LOG_RSP ) << "ack request "  << reqID << " from " << writerID
    1413           0 :                      << " got " << gotID << " missing " << distance
    1414           0 :                      << std::endl;
    1415             : 
                           // Ack instead of nack if the request equals gotID, lies at most
                           // _numBuffers behind it, or is numerically larger by more than
                           // _numBuffers (presumably a wrapped-around older sequence).
    1416           0 :     if( (reqID == gotID) ||
    1417           0 :         (gotID > reqID && gotID - reqID <= _numBuffers) ||
    1418           0 :         (gotID < reqID && distance > _numBuffers) )
    1419             :     {
    1420           0 :         _sendAck( connection->_id, gotID );
    1421           0 :         return true;
    1422             :     }
    1423             :     // else find all missing datagrams
    1424             : 
                           // The loop below stops at CO_RSP_MAX_NACKS - 2 entries;
                           // presumably the two spare slots cover the entries that may
                           // still be appended after the loop - TODO confirm.
    1425           0 :     const uint16_t max = CO_RSP_MAX_NACKS - 2;
    1426             :     Nack nacks[ CO_RSP_MAX_NACKS ];
    1427           0 :     uint16_t i = 0;
    1428             : 
    1429           0 :     nacks[ i ].start = connection->_sequence;
    1430           0 :     LBLOG( LOG_RSP ) << lunchbox::disableFlush << "nacks: "
    1431           0 :                      << nacks[i].start << "..";
    1432             : 
    1433           0 :     std::deque<Buffer*>::const_iterator j = connection->_recvBuffers.begin();
    1434           0 :     std::deque<Buffer*>::const_iterator first = j;
    1435           0 :     for( ; j != connection->_recvBuffers.end() && i < max; ++j )
    1436             :     {
    1437           0 :         if( *j ) // got buffer
    1438             :         {
    1439           0 :             nacks[ i ].end = connection->_sequence + std::distance( first, j);
    1440           0 :             LBLOG( LOG_RSP ) << nacks[i].end << ", ";
                                   // end wrapped around the uint16_t range: split the entry
                                   // into [start..max] and [0..end]
    1441           0 :             if( nacks[ i ].end < nacks[ i ].start )
    1442             :             {
    1443           0 :                 LBASSERT( nacks[ i ].end < _numBuffers );
    1444           0 :                 nacks[ i + 1 ].start = 0;
    1445           0 :                 nacks[ i + 1 ].end = nacks[ i ].end;
    1446           0 :                 nacks[ i ].end = std::numeric_limits< uint16_t >::max();
    1447           0 :                 ++i;
    1448             :             }
    1449           0 :             ++i;
    1450             : 
    1451             :             // find next hole
    1452           0 :             for( ++j; j != connection->_recvBuffers.end() && (*j); ++j )
    1453             :                 /* nop */;
    1454             : 
    1455           0 :             if( j == connection->_recvBuffers.end( ))
    1456           0 :                 break;
    1457             : 
    1458           0 :             nacks[i].start = connection->_sequence + std::distance(first, j) +1;
    1459           0 :             LBLOG( LOG_RSP ) << nacks[i].start << "..";
    1460             :         }
    1461             :     }
    1462             : 
                           // Close the open entry with reqID, or append a final entry
                           // from the last recorded end up to reqID.
    1463           0 :     if( j != connection->_recvBuffers.end() || i == 0 )
    1464             :     {
    1465           0 :         nacks[ i ].end = reqID;
    1466           0 :         LBLOG( LOG_RSP ) << nacks[i].end;
    1467           0 :         ++i;
    1468             :     }
    1469           0 :     else if( uint16_t( reqID - nacks[i-1].end ) < _numBuffers )
    1470             :     {
    1471           0 :         nacks[i].start = nacks[i-1].end + 1;
    1472           0 :         nacks[i].end = reqID;
    1473           0 :         LBLOG( LOG_RSP ) << nacks[i].start << ".." << nacks[i].end;
    1474           0 :         ++i;
    1475             :     }
                           // Same wrap-around split as in the loop, for the last entry.
    1476           0 :     if( nacks[ i -1 ].end < nacks[ i - 1 ].start )
    1477             :     {
    1478           0 :         LBASSERT( nacks[ i - 1 ].end < _numBuffers );
    1479           0 :         nacks[ i ].start = 0;
    1480           0 :         nacks[ i ].end = nacks[ i - 1 ].end;
    1481           0 :         nacks[ i - 1 ].end = std::numeric_limits< uint16_t >::max();
    1482           0 :         ++i;
    1483             :     }
    1484             : 
    1485           0 :     LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush << "send " << i
    1486           0 :                      << " nacks to " << connection->_id << std::endl;
    1487             : 
    1488           0 :     LBASSERT( i > 0 );
    1489           0 :     _sendNack( connection->_id, nacks, i );
    1490           0 :     return true;
    1491             : }
    1492             : 
    // Replies to an announced connection id.
    // Sends ID_DENY if id equals the own _id or is already used by a known
    // connection, ID_HELLO_REPLY otherwise. Both replies carry the own _id.
    1493           0 : void RSPConnection::_checkNewID( uint16_t id )
    1494             : {
    1495             :     // look if the new ID exist in another connection
    1496           0 :     if( id == _id || _findConnection( id ))
    1497             :     {
    1498           0 :         LBLOG( LOG_RSP ) << "Deny " << id << std::endl;
    1499           0 :         _sendSimpleDatagram( ID_DENY, _id );
    1500             :     }
    1501             :     else
    1502           0 :         _sendSimpleDatagram( ID_HELLO_REPLY, _id );
    1503           0 : }
    1504             : 
    // Linear search of _children for the connection with the given id.
    // Returns the first match, or 0 if no child has this id.
    1505       14558 : RSPConnectionPtr RSPConnection::_findConnection( const uint16_t id )
    1506             : {
    1507       14558 :     for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
    1508             :     {
    1509       14556 :         if( (*i)->_id == id )
    1510       14556 :             return *i;
    1511             :     }
    1512           2 :     return 0;
    1513             : }
    1514             : 
    // Creates a child connection for the given id unless one already exists.
    // The child is set to STATE_CONNECTED, shares this connection's
    // description, starts at the given sequence and gets all of its buffers
    // pushed onto its _threadBuffers. It is appended to _children and, under
    // _mutexConnection, to _newChildren; _event is then set under _mutexEvent.
    // Returns false if the id is already known, true otherwise.
    1515           2 : bool RSPConnection::_addConnection( const uint16_t id, const uint16_t sequence )
    1516             : {
    1517           2 :     if( _findConnection( id ))
    1518           0 :         return false;
    1519             : 
    1520           2 :     LBINFO << "add connection " << id << std::endl;
    1521           2 :     RSPConnectionPtr connection = new RSPConnection();
    1522           2 :     connection->_id = id;
    1523           2 :     connection->_parent = this;
    1524           2 :     connection->_setState( STATE_CONNECTED );
    1525           2 :     connection->_setDescription( _getDescription( ));
    1526           2 :     connection->_sequence = sequence;
    1527           2 :     LBASSERT( connection->_appBuffers.isEmpty( ));
    1528             : 
    1529             :     // Make all buffers available for reading
    1530         390 :     for( BuffersCIter i = connection->_buffers.begin();
    1531         260 :          i != connection->_buffers.end(); ++i )
    1532             :     {
    1533         128 :         Buffer* buffer = *i;
    1534         128 :         LBCHECK( connection->_threadBuffers.push( buffer ));
    1535             :     }
    1536             : 
    1537           2 :     _children.push_back( connection );
    1538           2 :     _sendCountNode();
    1539             : 
    1540           4 :     lunchbox::ScopedWrite mutex( _mutexConnection );
    1541           2 :     _newChildren.push_back( connection );
    1542             : 
    1543           4 :     lunchbox::ScopedWrite mutex2( _mutexEvent );
    1544           2 :     _event->set();
    1545           4 :     return true;
    1546             : }
    1547             : 
    // Removes the child connection with the given id from _children.
    // Does nothing but log when id is the own _id. For a removed child, a 0
    // buffer is pushed onto its _appBuffers and its _event is set under the
    // child's _mutexEvent (presumably to wake up and terminate a pending
    // reader - confirm in the read path). _sendCountNode() is called
    // afterwards whether or not a child was found.
    1548           0 : void RSPConnection::_removeConnection( const uint16_t id )
    1549             : {
    1550           0 :     LBINFO << "remove connection " << id << std::endl;
    1551           0 :     if( id == _id )
    1552           0 :         return;
    1553             : 
    1554           0 :     for( RSPConnectionsIter i = _children.begin(); i != _children.end(); ++i )
    1555             :     {
    1556           0 :         RSPConnectionPtr child = *i;
    1557           0 :         if( child->_id == id )
    1558             :         {
                                   // i is not used after the erase; the loop is left below
    1559           0 :             _children.erase( i );
    1560             : 
    1561           0 :             lunchbox::ScopedWrite mutex( child->_mutexEvent );
    1562           0 :             child->_appBuffers.push( 0 );
    1563           0 :             child->_event->set();
    1564           0 :             break;
    1565             :         }
    1566           0 :     }
    1567             : 
    1568           0 :     _sendCountNode();
    1569             : }
    1570             : 
    // Queues 'bytes' bytes from inData for sending, split into datagrams of at
    // most _payloadSize payload bytes each.
    // A child connection forwards the call to its _parent. For each datagram a
    // buffer is popped from _appBuffers (waiting up to _writeTimeOut), filled
    // with a DatagramData header followed by the payload, and pushed onto
    // _threadBuffers. The sequence field is not set here.
    // Returns bytes on success. Returns -1 if _write is not set, or, after
    // calling close(), if popping a buffer timed out or yielded a 0 buffer.
    1571       67596 : int64_t RSPConnection::write( const void* inData, const uint64_t bytes )
    1572             : {
    1573       67596 :     if( _parent )
    1574           0 :         return _parent->write( inData, bytes );
    1575             : 
    1576       67596 :     LBASSERT( isListening( ));
    1577       67596 :     if( !_write )
    1578           0 :         return -1;
    1579             : 
    1580             :     // compute number of datagrams
                           // (bytes / _payloadSize, rounded up)
    1581       67596 :     uint64_t nDatagrams = bytes  / _payloadSize;
    1582       67596 :     if( nDatagrams * _payloadSize != bytes )
    1583       67596 :         ++nDatagrams;
    1584             : 
    1585             :     // queue each datagram (might block if buffers are exhausted)
    1586       67596 :     const uint8_t* data = reinterpret_cast< const uint8_t* >( inData );
    1587       67596 :     const uint8_t* end = data + bytes;
    1588      136043 :     for( uint64_t i = 0; i < nDatagrams; ++i )
    1589             :     {
    1590       68447 :         size_t packetSize = end - data;
    1591       68447 :         packetSize = LB_MIN( packetSize, _payloadSize );
    1592             : 
    1593       68447 :         if( _appBuffers.isEmpty( ))
    1594             :             // trigger processing
    1595        1631 :             _postWakeup();
    1596             : 
    1597             :         Buffer* buffer;
    1598       68447 :         if ( !_appBuffers.timedPop( _writeTimeOut, buffer ) )
    1599             :         {
    1600           0 :             LBERROR << "Timeout while writing" << std::endl;
    1601           0 :             buffer = 0;
    1602             :         }
    1603             : 
    1604       68447 :         if( !buffer )
    1605             :         {
    1606           0 :             close();
    1607           0 :             return -1;
    1608             :         }
    1609             : 
    1610             :         // prepare packet header (sequence is done by thread)
    1611             :         DatagramData* header =
    1612       68447 :             reinterpret_cast< DatagramData* >( buffer->getData( ));
    1613       68447 :         header->type = DATA;
    1614       68447 :         header->size = uint16_t( packetSize );
    1615       68447 :         header->writerID = _id;
    1616             : 
                               // the payload is stored directly behind the header
    1617       68447 :         memcpy( header + 1, data, packetSize );
    1618       68447 :         data += packetSize;
    1619             : 
    1620       68447 :         LBCHECK( _threadBuffers.push( buffer ));
    1621             :     }
    1622       67596 :     _postWakeup();
    1623       67596 :     LBLOG( LOG_RSP ) << "queued " << nDatagrams << " datagrams, "
    1624       67596 :                      << bytes << " bytes" << std::endl;
    1625       67596 :     return bytes;
    1626             : }
    1627             : 
    // Waits on _appBuffers until its size reaches _buffers.size()
    // (presumably meaning all queued datagrams were sent and their buffers
    // handed back - confirm against the send thread).
    // On a child connection (valid _parent) the assertion below always fails
    // in builds where LBASSERTINFO is active; the call then returns without
    // waiting.
    1628           0 : void RSPConnection::finish()
    1629             : {
    1630           0 :     if( _parent.isValid( ))
    1631             :     {
    1632           0 :         LBASSERTINFO( !_parent, "Writes are only allowed on RSP listeners" );
    1633           0 :         return;
    1634             :     }
    1635           0 :     LBASSERT( isListening( ));
    1636           0 :     _appBuffers.waitSize( _buffers.size( ));
    1637             : }
    1638             : 
    // Sends a COUNTNODE datagram carrying the own _id and the current number of
    // entries in _children. Nothing is sent while no child with the own _id
    // exists. The datagram is byte-swapped before it is sent through _write.
    1639          42 : void RSPConnection::_sendCountNode()
    1640             : {
    1641          42 :     if( !_findConnection( _id ))
    1642          42 :         return;
    1643             : 
    1644          42 :     LBLOG( LOG_RSP ) << _children.size() << " nodes" << std::endl;
    1645             :     DatagramNode count = { COUNTNODE, CO_RSP_PROTOCOL_VERSION, _id,
    1646          42 :                            uint16_t( _children.size( )) };
    1647          42 :     count.byteswap();
    1648          42 :     _write->send( boost::asio::buffer( &count, sizeof( count )) );
    1649             : }
    1650             : 
    // Sends a DatagramNode of the given type through _write.
    // The datagram carries the protocol version, the given id and the current
    // _sequence, and is byte-swapped before sending.
    1651          44 : void RSPConnection::_sendSimpleDatagram( const DatagramType type,
    1652             :                                          const uint16_t id )
    1653             : {
    1654             :     DatagramNode simple = { uint16_t( type ), CO_RSP_PROTOCOL_VERSION, id,
    1655          44 :                             _sequence };
    1656          44 :     simple.byteswap();
    1657          44 :     _write->send( boost::asio::buffer( &simple, sizeof( simple )) );
    1658          44 : }
    1659             : 
    // Sends an ACK datagram for the given sequence to the writer writerID.
    // The own _id is the second initializer of the DatagramAck (read back as
    // readerID in _handleAck - NOTE(review): field order not visible here).
    // Must not be called with the own id as writerID (asserted).
    1660           0 : void RSPConnection::_sendAck( const uint16_t writerID,
    1661             :                               const uint16_t sequence )
    1662             : {
    1663           0 :     LBASSERT( _id != writerID );
    1664             : #ifdef CO_INSTRUMENT_RSP
    1665             :     ++nAcksSend;
    1666             : #endif
    1667             : 
    1668           0 :     LBLOG( LOG_RSP ) << "send ack " << sequence << std::endl;
    1669           0 :     DatagramAck ack = { ACK, _id, writerID, sequence };
    1670           0 :     ack.byteswap();
    1671           0 :     _write->send( boost::asio::buffer( &ack, sizeof( ack )) );
    1672           0 : }
    1673             : 
    // Sends 'count' nack ranges to the writer writerID.
    // count must be in 1..CO_RSP_MAX_NACKS (asserted). If the writer is this
    // connection, the ranges are queued directly with _addRepeat() and nothing
    // is sent. Otherwise a DatagramNack is filled, byte-swapped and sent.
    1674           0 : void RSPConnection::_sendNack( const uint16_t writerID, const Nack* nacks,
    1675             :                                const uint16_t count )
    1676             : {
    1677           0 :     LBASSERT( count > 0 );
    1678           0 :     LBASSERT( count <= CO_RSP_MAX_NACKS );
    1679             : #ifdef CO_INSTRUMENT_RSP
    1680             :     ++nNAcksSend;
    1681             : #endif
    1682             :     /* optimization: use the direct access to the reader. */
    1683           0 :     if( writerID == _id )
    1684             :     {
    1685           0 :          _addRepeat( nacks, count );
    1686           0 :          return;
    1687             :     }
    1688             : 
                           // only send the used part of the nack array
    1689             :     const size_t size = sizeof( DatagramNack ) -
    1690           0 :                         (CO_RSP_MAX_NACKS - count) * sizeof( Nack );
    1691             : 
    1692             :     // set the header
    1693             :     DatagramNack packet;
    1694           0 :     packet.set( _id, writerID, count );
    1695           0 :     memcpy( packet.nacks, nacks, count * sizeof( Nack ));
    1696           0 :     packet.byteswap();
    1697           0 :     _write->send( boost::asio::buffer( &packet, size ));
    1698             : }
    1699             : 
    // Sends an ACKREQ datagram carrying the own _id and the sequence
    // _sequence - 1 (wrapped to uint16_t). The datagram is byte-swapped before
    // it is sent through _write.
    1700           0 : void RSPConnection::_sendAckRequest()
    1701             : {
    1702             : #ifdef CO_INSTRUMENT_RSP
    1703             :     ++nAckRequests;
    1704             : #endif
    1705           0 :     LBLOG( LOG_RSP ) << "send ack request for " << uint16_t( _sequence -1 )
    1706           0 :                      << std::endl;
    1707           0 :     DatagramAckRequest ackRequest = { ACKREQ, _id, uint16_t( _sequence - 1 ) };
    1708           0 :     ackRequest.byteswap();
    1709           0 :     _write->send( boost::asio::buffer( &ackRequest, sizeof( DatagramAckRequest )) );
    1710           0 : }
    1711             : 
    // Writes the id and send rate of the connection to os and returns os.
    // With CO_INSTRUMENT_RSP defined, the instrumentation counters are also
    // written (throughput is computed from instrumentClock) and then reset to
    // zero, so streaming a connection has a side effect on those counters.
    1712           0 : std::ostream& operator << ( std::ostream& os,
    1713             :                             const RSPConnection& connection )
    1714             : {
    1715           0 :     os << lunchbox::disableFlush << lunchbox::disableHeader
    1716           0 :        << "RSPConnection id " << connection.getID() << " send rate "
    1717           0 :        << connection.getSendRate();
    1718             : 
    1719             : #ifdef CO_INSTRUMENT_RSP
    1720             :     const int prec = os.precision();
    1721             :     os.precision( 3 );
    1722             : 
    1723             :     const float time = instrumentClock.getTimef();
    1724             :     const float mbps = 1048.576f * time;
    1725             :     os << ": " << lunchbox::indent << std::endl
    1726             :        << float( nBytesRead ) / mbps << " / " << float( nBytesWritten ) / mbps
    1727             :        <<  " MB/s r/w using " << nDatagrams << " dgrams " << nRepeated
    1728             :        << " repeats " << nMergedDatagrams
    1729             :        << " merged"
    1730             :        << std::endl;
    1731             : 
    1732             :     os.precision( prec );
    1733             :     os << "sender: " << nAckRequests << " ack requests " << nAcksAccepted << "/"
    1734             :        << nAcksRead << " acks " << nNAcksRead << " nacks, throttle "
    1735             :        << writeWaitTime << " ms"
    1736             :        << std::endl
    1737             :        << "receiver: " << nAcksSend << " acks " << nNAcksSend << " nacks"
    1738             :        << lunchbox::exdent;
    1739             : 
                           // reset all counters for the next report
    1740             :     nReadData = 0;
    1741             :     nBytesRead = 0;
    1742             :     nBytesWritten = 0;
    1743             :     nDatagrams = 0;
    1744             :     nRepeated = 0;
    1745             :     nMergedDatagrams = 0;
    1746             :     nAckRequests = 0;
    1747             :     nAcksSend = 0;
    1748             :     nAcksRead = 0;
    1749             :     nAcksAccepted = 0;
    1750             :     nNAcksSend = 0;
    1751             :     nNAcksRead = 0;
    1752             :     writeWaitTime = 0.f;
    1753             : #endif
    1754           0 :     os << std::endl << lunchbox::enableHeader << lunchbox::enableFlush;
    1755             : 
    1756           0 :     return os;
    1757             : }
    1758             : 
    1759          60 : }

Generated by: LCOV version 1.10