LCOV - code coverage report
Current view: top level - co - rspConnection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 397 900 44.1 %
Date: 2015-11-03 13:48:53 Functions: 30 50 60.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c)      2009, Cedric Stalder <cedric.stalder@gmail.com>
       3             :  *               2009-2014, Stefan Eilemann <eile@equalizergraphics.com>
       4             :  *                    2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "rspConnection.h"
      23             : 
      24             : #include "connection.h"
      25             : #include "connectionDescription.h"
      26             : #include "global.h"
      27             : #include "log.h"
      28             : 
      29             : #include <lunchbox/rng.h>
      30             : #include <lunchbox/scopedMutex.h>
      31             : #include <lunchbox/sleep.h>
      32             : 
      33             : #include <boost/bind.hpp>
      34             : 
      35             : //#define CO_INSTRUMENT_RSP
                       // Defining CO_INSTRUMENT_RSP (line above) compiles in the code guarded by
                       // '#ifdef CO_INSTRUMENT_RSP' in this file.
                       //
                       // CO_RSP_MERGE_WRITES is not referenced in the part of the file visible
                       // here; presumably it enables merging of pending writes - TODO confirm.
      36             : #define CO_RSP_MERGE_WRITES
                       // Number of timeouts after which _handleConnectedTimeout() logs an error
                       // and starts closing connections; also a factor of _writeTimeOut.
      37             : #define CO_RSP_MAX_TIMEOUTS 1000
                       // Port used by listen() when the description's port is 0: fixed on
                       // Windows, otherwise derived from the user id, giving 1024..65534.
      38             : #ifdef _WIN32
      39             : #  define CO_RSP_DEFAULT_PORT (4242)
      40             : #else
      41             : #  define CO_RSP_DEFAULT_PORT ( (getuid() % 64511) + 1024 )
      42             : #endif
      43             : 
      44             : 
      45             : // Note: Do not use version > 255, endianness detection magic relies on this.
      46             : const uint16_t CO_RSP_PROTOCOL_VERSION = 0;
      47             : 
      48             : namespace bp = boost::posix_time;
      49             : namespace ip = boost::asio::ip;
      50             : 
      51             : namespace co
      52             : {
      53             : 
      54             : namespace
      55             : {
                       // Optional instrumentation state, only compiled when CO_INSTRUMENT_RSP is
                       // defined: atomic event/byte counters, an accumulated write wait time and
                       // a clock.
      56             : #ifdef CO_INSTRUMENT_RSP
      57             : lunchbox::a_int32_t nReadData;
      58             : lunchbox::a_int32_t nBytesRead;
      59             : lunchbox::a_int32_t nBytesWritten;
      60             : lunchbox::a_int32_t nDatagrams;
      61             : lunchbox::a_int32_t nRepeated;
      62             : lunchbox::a_int32_t nMergedDatagrams;
      63             : lunchbox::a_int32_t nAckRequests;
      64             : lunchbox::a_int32_t nAcksSend;
      65             : lunchbox::a_int32_t nAcksSendTotal;
      66             : lunchbox::a_int32_t nAcksRead;
      67             : lunchbox::a_int32_t nAcksAccepted;
      68             : lunchbox::a_int32_t nNAcksSend;
      69             : lunchbox::a_int32_t nNAcksRead;
      70             : lunchbox::a_int32_t nNAcksResend;
      71             : 
      72             : float writeWaitTime = 0.f;
      73             : lunchbox::Clock instrumentClock;
      74             : #endif
      75             : 
                       // Written by listen() from Global::IATTR_RSP_NUM_BUFFERS and read by
                       // _handleConnectedTimeout().
                       // NOTE(review): this is a file-level static, so one value is shared by
                       // every RSPConnection in the process - confirm this is intended.
      76             : static uint16_t _numBuffers = 0;
      77             : }
      78             : 
      79           4 : RSPConnection::RSPConnection()
      80             :     : _id( 0 )
      81             :     , _idAccepted( false )
      82           4 :     , _mtu( Global::getIAttribute( Global::IATTR_UDP_MTU ))
      83           4 :     , _ackFreq( Global::getIAttribute( Global::IATTR_RSP_ACK_FREQUENCY ))
      84             :     , _payloadSize( _mtu - sizeof( DatagramData ))
      85             :     , _timeouts( 0 )
      86           4 :     , _event( new EventConnection )
      87             :     , _read( 0 )
      88             :     , _write( 0 )
      89             :     , _timeout( _ioService )
      90             :     , _wakeup( _ioService )
      91           4 :     , _maxBucketSize( ( _mtu * _ackFreq) >> 1 )
      92             :     , _bucketSize( 0 )
      93             :     , _sendRate( 0 )
      94             :     , _thread( 0 )
      95           4 :     , _acked( std::numeric_limits< uint16_t >::max( ))
      96             :     , _threadBuffers( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS))
      97             :     , _recvBuffer( _mtu )
      98             :     , _readBuffer( 0 )
      99             :     , _readBufferPos( 0 )
     100             :     , _sequence( 0 )
     101             :     // ensure we have a handleConnectedTimeout before the write pop
                           // NOTE(review): the expression below multiplies the enumerator
                           // Global::IATTR_RSP_ACK_TIMEOUT itself, whereas the other attributes
                           // in this list are read through Global::getIAttribute() - confirm
                           // whether getIAttribute( IATTR_RSP_ACK_TIMEOUT ) was intended.
     102          24 :     , _writeTimeOut( Global::IATTR_RSP_ACK_TIMEOUT * CO_RSP_MAX_TIMEOUTS * 2 )
     103             : {
                           // Creates a closed RSP connection: picks a random id, marks the
                           // connection description as RSP with a bandwidth of 102400, connects
                           // the event connection and pre-allocates IATTR_RSP_NUM_BUFFERS buffers
                           // of _mtu bytes each. Sockets and the protocol thread are only
                           // created later, by listen().
     104           4 :     _buildNewID();
     105           4 :     ConnectionDescriptionPtr description = _getDescription();
     106           4 :     description->type = CONNECTIONTYPE_RSP;
     107           4 :     description->bandwidth = 102400;
     108             : 
     109           4 :     LBCHECK( _event->connect( ));
     110             : 
                           // The buffers are owned by _buffers and deleted by the destructor.
     111           4 :     _buffers.reserve( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ));
     112         524 :     while( static_cast< int32_t >( _buffers.size( )) <
     113         260 :            Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ))
     114             :     {
     115         256 :         _buffers.push_back( new Buffer( _mtu ));
     116             :     }
     117             : 
                           // A nack datagram has to fit into a single MTU-sized packet.
     118           4 :     LBASSERT( sizeof( DatagramNack ) <= size_t( _mtu ));
     119           4 :     LBLOG( LOG_RSP ) << "New RSP connection, " << _buffers.size()
     120           4 :                      << " buffers of " << _mtu << " bytes" << std::endl;
     121             : 
     122           4 : }
     123             : 
     124          12 : RSPConnection::~RSPConnection()
     125             : {
                           // Closes the connection, then deletes every Buffer that the
                           // constructor allocated into _buffers.
     126           4 :     _close();
     127         264 :     while( !_buffers.empty( ))
     128             :     {
     129         256 :         delete _buffers.back();
     130         256 :         _buffers.pop_back();
     131             :     }
     132           8 : }
     133             : 
     134          12 : void RSPConnection::_close()
     135             : {
                           // Shuts the connection down: stops and joins the protocol thread if
                           // there is one, wakes up all children, closes and deletes both UDP
                           // sockets, moves the state to STATE_CLOSED and closes the event
                           // connection. Does nothing more if the connection is already closed.

                           // A connection whose parent carries the same id forwards the close to
                           // that parent first.
     136          12 :     if( _parent.isValid() && _parent->_id == _id )
     137           2 :         _parent->close();
     138             : 
                           // Without a parent, poll every 10 ms until _isWriting() reports false.
     139          24 :     while( !_parent && _isWriting( ))
     140           0 :         lunchbox::sleep( 10 /*ms*/ );
     141             : 
     142          12 :     if( isClosed( ))
     143          18 :         return;
     144             : 
     145           6 :     lunchbox::ScopedWrite mutex( _mutexEvent );
     146           6 :     if( _thread )
     147             :     {
                               // Must not be called from the protocol thread itself, since the
                               // thread is joined below.
     148           2 :         LBASSERT( !_thread->isCurrent( ));
     149           2 :         _sendSimpleDatagram( ID_EXIT, _id );
     150           2 :         _ioService.stop();
     151           2 :         _thread->join();
     152           2 :         delete _thread;
     153             :     }
     154             : 
     155           6 :     _setState( STATE_CLOSING );
                           // NOTE(review): _thread was deleted above but is only reset to 0 inside
                           // this branch; the test below inspects the stale pointer value (it is
                           // not dereferenced) - confirm this ordering is intended.
     156           6 :     if( _thread )
     157             :     {
     158           2 :          _thread = 0;
     159             : 
     160             :         // notify children to close
                               // A 0 entry in a child's _appBuffers makes its readSync() close
                               // the child; the event is set so a waiting reader wakes up.
     161           4 :         for( RSPConnectionsCIter i=_children.begin(); i !=_children.end(); ++i )
     162             :         {
     163           2 :             RSPConnectionPtr child = *i;
     164           4 :             lunchbox::ScopedWrite mutexChild( child->_mutexEvent );
     165           2 :             child->_appBuffers.push( 0 );
     166           2 :             child->_event->set();
     167           2 :         }
     168             : 
     169           2 :         _children.clear();
     170           2 :         _newChildren.clear();
     171             :     }
     172             : 
     173           6 :     _parent = 0;
     174             : 
                           // Close and free both sockets; 'delete' on a null pointer is a no-op.
     175           6 :     if( _read )
     176           2 :         _read->close();
     177           6 :     delete _read;
     178           6 :     _read = 0;
     179             : 
     180           6 :     if( _write )
     181           2 :         _write->close();
     182           6 :     delete _write;
     183           6 :     _write = 0;
     184             : 
     185           6 :     _threadBuffers.clear();
     186           6 :     _appBuffers.push( 0 ); // unlock any other read/write threads
     187             : 
     188           6 :     _setState( STATE_CLOSED );
     189             : 
                           // _mutexEvent is released before the event connection is closed.
     190           6 :     mutex.leave();
     191           6 :     _event->close();
     192             : }
     193             : 
     194             : //----------------------------------------------------------------------
     195             : // Async IO handles
     196             : //----------------------------------------------------------------------
     197           4 : uint16_t RSPConnection::_buildNewID()
     198             : {
                           // Draws a random 16-bit value, stores it in _id and returns it.
     199           4 :     lunchbox::RNG rng;
     200           4 :     _id = rng.get< uint16_t >();
     201           4 :     return _id;
     202             : }
     203             : 
     204           2 : bool RSPConnection::listen()
     205             : {
                           // Sets up the multicast read and write UDP sockets described by the
                           // connection description, starts the protocol thread and makes all
                           // buffers available for writing.
                           //
                           // Returns true on success. Returns false if the connection is not
                           // closed, a host or interface name does not resolve, the socket setup
                           // throws a boost::system::system_error, or the thread fails to start.
                           //
                           // NOTE(review): the state is set to STATE_CONNECTING below, and the
                           // 'return false' paths inside the try block as well as the catch
                           // handler do not visibly reset it. The second 'return false' inside
                           // the try block also returns after _read and _write were allocated,
                           // without deleting them - confirm whether cleanup is needed there.
     206           2 :     ConnectionDescriptionPtr description = _getDescription();
     207           2 :     LBASSERT( description->type == CONNECTIONTYPE_RSP );
     208             : 
     209           2 :     if( !isClosed( ))
     210           0 :         return false;
     211             : 
     212           2 :     _setState( STATE_CONNECTING );
     213           2 :     _numBuffers =  Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS );
     214             : 
     215             :     // init udp connection
                           // Fill in defaults: port (OS-chosen or CO_RSP_DEFAULT_PORT), multicast
                           // group 239.255.42.43 and interface 0.0.0.0.
     216           2 :     if( description->port == ConnectionDescription::RANDOM_MULTICAST_PORT )
     217           2 :         description->port = 0; // Let OS choose
     218           0 :     else if( description->port == 0 )
     219           0 :         description->port = CO_RSP_DEFAULT_PORT;
     220           2 :     if( description->getHostname().empty( ))
     221           0 :         description->setHostname( "239.255.42.43" );
     222           2 :     if( description->getInterface().empty( ))
     223           1 :         description->setInterface( "0.0.0.0" );
     224             : 
     225             :     try
     226             :     {
                               // The read socket is bound to 0.0.0.0:<port>; the write endpoint is
                               // the resolved multicast address with the same port.
     227           2 :         const ip::address readAddress( ip::address::from_string( "0.0.0.0" ));
     228             :         const ip::udp::endpoint readEndpoint( readAddress,
     229           2 :                                               description->port );
     230           2 :         const std::string& port = std::to_string( unsigned( description->port));
     231           4 :         ip::udp::resolver resolver( _ioService );
     232             :         const ip::udp::resolver::query queryHN( ip::udp::v4(),
     233           2 :                                                 description->getHostname(),
     234           4 :                                                 port );
     235           4 :         const ip::udp::resolver::iterator hostname = resolver.resolve( queryHN);
     236             : 
     237           2 :         if( hostname == ip::udp::resolver::iterator( ))
     238           0 :             return false;
     239             : 
     240           2 :         const ip::udp::endpoint writeEndpoint = *hostname;
     241           2 :         const ip::address mcAddr( writeEndpoint.address( ));
     242             : 
     243           2 :         _read = new ip::udp::socket( _ioService );
     244           2 :         _write = new ip::udp::socket( _ioService );
     245           2 :         _read->open( readEndpoint.protocol( ));
     246           2 :         _write->open( writeEndpoint.protocol( ));
     247             : 
     248           2 :         _read->set_option( ip::udp::socket::reuse_address( true ));
     249           2 :         _write->set_option( ip::udp::socket::reuse_address( true ));
     250             :         _read->set_option( ip::udp::socket::receive_buffer_size(
     251           2 :                        Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
     252             :         _write->set_option( ip::udp::socket::send_buffer_size(
     253           2 :                        Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
     254             : 
                               // Store the port that was actually bound back into the description
                               // (relevant when the requested port was 0).
     255           2 :         _read->bind( readEndpoint );
     256           2 :         description->port = _read->local_endpoint().port();
     257             : 
     258             :         const ip::udp::resolver::query queryIF( ip::udp::v4(),
     259           2 :                                                 description->getInterface(),
     260           6 :                                                 "0" );
     261             :         const ip::udp::resolver::iterator interfaceIP =
     262           4 :             resolver.resolve( queryIF );
     263             : 
     264           2 :         if( interfaceIP == ip::udp::resolver::iterator( ))
     265           0 :             return false;
     266             : 
     267           2 :         const ip::address ifAddr( ip::udp::endpoint( *interfaceIP ).address( ));
     268           2 :         LBINFO << "Joining " << mcAddr << " on " << ifAddr << std::endl;
     269             : 
                               // Join the multicast group on the chosen interface for reading and
                               // send outgoing multicast through the same interface.
     270             :         _read->set_option( ip::multicast::join_group( mcAddr.to_v4(),
     271           2 :                                                       ifAddr.to_v4( )));
     272           2 :         _write->set_option( ip::multicast::outbound_interface( ifAddr.to_v4()));
                               // NOTE(review): the option value passed below is the textual IP
                               // address, and the setsockopt() results are ignored. Linux documents
                               // SO_BINDTODEVICE as taking an interface name - verify this has the
                               // intended effect.
     273             : #ifdef SO_BINDTODEVICE // https://github.com/Eyescale/Collage/issues/16
     274           4 :         const std::string& ifIP = ifAddr.to_string();
     275             :         ::setsockopt( _write->native(), SOL_SOCKET, SO_BINDTODEVICE,
     276           2 :                       ifIP.c_str(), ifIP.size() + 1 );
     277             :         ::setsockopt( _read->native(), SOL_SOCKET, SO_BINDTODEVICE,
     278           2 :                       ifIP.c_str(), ifIP.size() + 1 );
     279             : #endif
     280             : 
     281           2 :         _write->connect( writeEndpoint );
     282             : 
                               // Multicast loopback is switched off on both sockets.
     283           2 :         _read->set_option( ip::multicast::enable_loopback( false ));
     284           4 :         _write->set_option( ip::multicast::enable_loopback( false ));
     285             :     }
     286           0 :     catch( const boost::system::system_error& e )
     287             :     {
                               // Any Boost.Asio failure above ends up here: free both sockets.
     288           0 :         LBWARN << "can't setup underlying UDP connection: " << e.what()
     289           0 :                << std::endl;
     290           0 :         delete _read;
     291           0 :         delete _write;
     292           0 :         _read = 0;
     293           0 :         _write = 0;
     294           0 :         return false;
     295             :     }
     296             : 
     297             :     // init communication protocol thread
     298           2 :     _thread = new Thread( this );
     299           2 :     _bucketSize = 0;
     300           2 :     _sendRate = description->bandwidth;
     301             : 
     302             :     // waits until RSP protocol establishes connection to the multicast network
     303           2 :     if( !_thread->start( ) )
     304             :     {
     305           0 :         close();
     306           0 :         return false;
     307             :     }
     308             : 
     309             :     // Make all buffers available for writing
     310           2 :     LBASSERT( _appBuffers.isEmpty( ));
     311           2 :     _appBuffers.push( _buffers );
     312             : 
     313           8 :     LBDEBUG << "Listening on " << description->getHostname() << ":"
     314          10 :             << description->port << " (" << description->toString() << " @"
     315           8 :             << (void*)this << ")" << std::endl;
     316           2 :     return true;
     317             : }
     318             : 
     319           2 : ConnectionPtr RSPConnection::acceptSync()
     320             : {
                           // Hands out one pending child connection: the last entry of
                           // _newChildren is removed and returned. Returns 0 if this connection
                           // is not listening or no child is pending.
     321           2 :     if( !isListening( ))
     322           0 :         return 0;
     323             : 
     324           2 :     lunchbox::ScopedWrite mutex( _mutexConnection );
     325           2 :     LBASSERT( !_newChildren.empty( ));
     326           2 :     if( _newChildren.empty( ))
     327           0 :         return 0;
     328             : 
     329           4 :     RSPConnectionPtr newConnection = _newChildren.back();
     330           2 :     _newChildren.pop_back();
     331             : 
     332           6 :     LBINFO << _id << " accepted RSP connection " << newConnection->_id
     333           6 :            << std::endl;
     334             : 
                           // Keep the event set only while further children are waiting.
     335           4 :     lunchbox::ScopedWrite mutex2( _mutexEvent );
     336           2 :     if( _newChildren.empty() )
     337           2 :         _event->reset();
     338             :     else
     339           0 :         _event->set();
     340             : 
     341           4 :     return newConnection;
     342             : }
     343             : 
     344       66395 : int64_t RSPConnection::readSync( void* buffer, const uint64_t bytes, const bool)
     345             : {
                           // Copies 'bytes' bytes of received payload into 'buffer', consuming
                           // datagram buffers popped from _appBuffers. The unnamed bool parameter
                           // is not used.
                           //
                           // Returns 'bytes' on success and -1 if the connection is not connected.
                           // When a 0 buffer is popped, the connection is closed and the number
                           // of bytes copied so far is returned, or -1 if nothing was copied.
     346       66395 :     LBASSERT( bytes > 0 );
     347       66395 :     if( !isConnected ( ))
     348           0 :         return -1;
     349             : 
     350       66395 :     uint64_t bytesLeft = bytes;
     351       66395 :     uint8_t* ptr = reinterpret_cast< uint8_t* >( buffer );
     352             : 
     353             :     // redundant (done by the caller already), but saves some lock ops
     354      200035 :     while( bytesLeft )
     355             :     {
                               // No partially consumed buffer: fetch the next one.
     356       67247 :         if( !_readBuffer )
     357             :         {
     358       16681 :             LBASSERT( _readBufferPos == 0 );
     359       16681 :             _readBuffer = _appBuffers.pop();
     360       16681 :             if( !_readBuffer )
     361             :             {
     362           2 :                 close();
     363             :                 return (bytes == bytesLeft) ?
     364           2 :                     -1 : static_cast< int64_t >( bytes - bytesLeft );
     365             :             }
     366             :         }
     367             : 
                               // The buffer starts with a DatagramData header; the payload follows
                               // directly after it and header->size is the payload length.
     368             :         const DatagramData* header = reinterpret_cast< const DatagramData* >(
     369       67245 :             _readBuffer->getData( ));
     370       67245 :         const uint8_t* payload = reinterpret_cast< const uint8_t* >( header+1 );
     371       67245 :         const size_t dataLeft = header->size - _readBufferPos;
     372       67245 :         const size_t size = LB_MIN( static_cast< size_t >( bytesLeft ),
     373             :                                     dataLeft );
     374             : 
     375       67245 :         memcpy( ptr, payload + _readBufferPos, size );
     376       67245 :         _readBufferPos += size;
     377       67245 :         ptr += size;
     378       67245 :         bytesLeft -= size;
     379             : 
     380             :         // if all data in the buffer has been taken
     381       67245 :         if( _readBufferPos >= header->size )
     382             :         {
     383       16679 :             LBASSERT( _readBufferPos == header->size );
     384             :             //LBLOG( LOG_RSP ) << "reset read buffer  " << header->sequence
     385             :             //                 << std::endl;
     386             : 
                                   // Hand the fully consumed buffer back via _threadBuffers.
     387       16679 :             LBCHECK( _threadBuffers.push( _readBuffer ));
     388       16679 :             _readBuffer = 0;
     389       16679 :             _readBufferPos = 0;
     390             :         }
     391             :         else
     392             :         {
     393       50566 :             LBASSERT( _readBufferPos < header->size );
     394             :         }
     395             :     }
     396             : 
                           // Keep the event set while unread data remains; otherwise reset it
                           // under _mutexEvent after re-checking that _appBuffers is still empty.
     397       66393 :     if( _readBuffer || !_appBuffers.isEmpty( ))
     398       50661 :         _event->set();
     399             :     else
     400             :     {
     401       15732 :         lunchbox::ScopedWrite mutex( _mutexEvent );
     402       15732 :         if( _appBuffers.isEmpty( ))
     403       15731 :             _event->reset();
     404             :     }
     405             : 
     406             : #ifdef CO_INSTRUMENT_RSP
     407             :     nBytesRead += bytes;
     408             : #endif
     409       66393 :     return bytes;
     410             : }
     411             : 
     412           2 : void RSPConnection::Thread::run()
     413             : {
                           // Thread body: runs the connection's protocol loop until it returns,
                           // then drops the reference to the connection.
     414           2 :     _connection->_runThread();
     415           2 :     _connection = 0;
     416           2 :     LBINFO << "Left RSP protocol thread" << std::endl;
     417           2 : }
     418             : 
     419       84758 : void RSPConnection::_handleTimeout( const boost::system::error_code& error )
     420             : {
                           // Completion handler for both the _timeout and the _wakeup timer (see
                           // _setTimeout() and _postWakeup()). Dispatches on the connection phase:
                           // listening -> _handleConnectedTimeout, id accepted ->
                           // _handleInitTimeout, otherwise -> _handleAcceptIDTimeout.

                           // Aborted waits (e.g. a cancelled timer) are ignored.
     421       84758 :     if( error == boost::asio::error::operation_aborted )
     422      152753 :         return;
     423             : 
     424       16763 :     if( isListening( ))
     425       16683 :         _handleConnectedTimeout();
     426          80 :     else if( _idAccepted )
     427          40 :         _handleInitTimeout();
     428             :     else
     429          40 :         _handleAcceptIDTimeout();
     430             : }
     431             : 
     432          40 : void RSPConnection::_handleAcceptIDTimeout()
     433             : {
                           // Id negotiation step, run every 10 ms until the id is accepted.
                           // While fewer than 20 timeouts have elapsed the id is announced with
                           // ID_HELLO; on the 20th the id is confirmed with ID_CONFIRM, this
                           // connection is added under its own id and _idAccepted is set.
     434          40 :     ++_timeouts;
     435          40 :     if( _timeouts < 20 )
     436             :     {
     437          38 :         LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
     438          38 :         _sendSimpleDatagram( ID_HELLO, _id );
     439             :     }
     440             :     else
     441             :     {
     442           2 :         LBLOG( LOG_RSP ) << "Confirm " << _id << std::endl;
     443           2 :         _sendSimpleDatagram( ID_CONFIRM, _id );
     444           2 :         _addConnection( _id, _sequence );
     445           2 :         _idAccepted = true;
     446           2 :         _timeouts = 0;
     447             :         // send a first datagram to announce me and discover all other
     448             :         // connections
     449           2 :         _sendCountNode();
     450             :     }
                           // The timer is re-armed in both cases.
     451          40 :     _setTimeout( 10 );
     452          40 : }
     453             : 
     454          40 : void RSPConnection::_handleInitTimeout( )
     455             : {
                           // Discovery step after the id was accepted, run every 10 ms. While
                           // fewer than 20 timeouts have elapsed a count-node datagram is sent;
                           // on the 20th the state becomes STATE_LISTENING and the io service is
                           // stopped, which ends the run() call in _initThread().
                           //
                           // The LBASSERT line below has no trailing semicolon; this presumably
                           // relies on how the macro expands - TODO confirm.
     456          40 :     LBASSERT( !isListening( ))
     457          40 :     ++_timeouts;
     458          40 :     if( _timeouts < 20 )
     459          38 :         _sendCountNode();
     460             :     else
     461             :     {
     462           2 :         _setState( STATE_LISTENING );
     463           2 :         LBINFO << "RSP connection " << _id << " listening" << std::endl;
     464           2 :         _timeouts = 0;
     465           2 :         _ioService.stop(); // thread initialized, run restarts
     466             :     }
                           // The timer is re-armed in both cases, also after the stop() above.
     467          40 :     _setTimeout( 10 );
     468          40 : }
     469             : 
     470           0 : void RSPConnection::_clearWriteQueues()
     471             : {
                           // Drains _threadBuffers into _writeBuffers and then finishes the write
                           // queue up to the last used sequence number (_sequence - 1). Both
                           // containers are expected to be empty afterwards.
     472           0 :     while( !_threadBuffers.isEmpty() )
     473             :     {
     474           0 :         Buffer* buffer = 0;
     475           0 :         _threadBuffers.pop( buffer );
     476           0 :         _writeBuffers.push_back( buffer );
     477             :     }
     478             : 
     479           0 :     _finishWriteQueue( _sequence - 1 );
     480           0 :     LBASSERT( _threadBuffers.isEmpty() && _writeBuffers.empty() );
     481           0 : }
     482             : 
     483       16683 : void RSPConnection::_handleConnectedTimeout()
     484             : {
                           // Timer handler while listening: processes outgoing data and, once
                           // _timeouts reaches CO_RSP_MAX_TIMEOUTS, either shuts everything down
                           // (no child is acknowledging any more) or removes only the children
                           // that lag behind.

                           // No longer listening: flush the write queues and stop the io service.
     485       16683 :     if( !isListening( ))
     486             :     {
     487           0 :         _clearWriteQueues();
     488           0 :         _ioService.stop();
     489           0 :         return;
     490             :     }
     491             : 
     492       16683 :     _processOutgoing();
     493             : 
     494       16683 :     if( _timeouts >= CO_RSP_MAX_TIMEOUTS )
     495             :     {
     496           0 :         LBERROR << "Too many timeouts during send: " << _timeouts << std::endl;
                               // 'all' stays true unless some child other than this connection
                               // has acknowledged at least _sequence - _numBuffers.
                               // NOTE(review): if _sequence and _acked are 16-bit, the
                               // subtractions in this function are evaluated as int and do not
                               // wrap around - confirm the comparisons behave as intended near
                               // sequence wrap-around.
     497           0 :         bool all = true;
     498           0 :         for( RSPConnectionsCIter i =_children.begin(); i !=_children.end(); ++i)
     499             :         {
     500           0 :             RSPConnectionPtr child = *i;
     501           0 :             if ( child->_acked >= _sequence - _numBuffers && child->_id != _id )
     502             :             {
     503           0 :                 all = false;
     504           0 :                 break;
     505             :             }
     506           0 :         }
     507             : 
     508             :         // if all connections failed we probably got disconnected -> close and
     509             :         // exit else close all failed child connections
     510           0 :         if ( all )
     511             :         {
     512           0 :             _sendSimpleDatagram( ID_EXIT, _id );
     513           0 :             _appBuffers.pushFront( 0 ); // unlock write function
     514             : 
     515           0 :             for( RSPConnectionsCIter i =_children.begin();
     516           0 :                  i !=_children.end(); ++i)
     517             :             {
     518           0 :                 RSPConnectionPtr child = *i;
     519           0 :                 child->_setState( STATE_CLOSING );
     520           0 :                 child->_appBuffers.push( 0 ); // unlock read func
     521           0 :             }
     522             : 
     523           0 :             _clearWriteQueues();
     524           0 :             _ioService.stop();
     525           0 :             return;
     526             :         }
     527             : 
                               // Some children are still alive: remove every other child that has
                               // not acknowledged the latest sequence; for the rest, set _acked to
                               // _sequence minus the number of pending write buffers.
                               // NOTE(review): 'i' is not advanced after _removeConnection();
                               // presumably that call erases the child from _children - confirm
                               // that the iterator remains valid afterwards.
     528           0 :         RSPConnectionsCIter i =_children.begin();
     529           0 :         while ( i !=_children.end() )
     530             :         {
     531           0 :             RSPConnectionPtr child = *i;
     532           0 :             if ( child->_acked < _sequence - 1 && _id != child->_id )
     533             :             {
     534           0 :                 _sendSimpleDatagram( ID_EXIT, child->_id );
     535           0 :                 _removeConnection( child->_id );
     536             :             }
     537             :             else
     538             :             {
     539           0 :                 uint16_t wb = static_cast<uint16_t>( _writeBuffers.size( ));
     540           0 :                 child->_acked = _sequence - wb;
     541           0 :                 ++i;
     542             :             }
     543           0 :         }
     544             : 
     545           0 :         _timeouts = 0;
     546             :     }
     547             : }
     548             : 
     549             : RSPConnection::DatagramNode*
     550           0 : RSPConnection::_getDatagramNode( const size_t bytes )
     551             : {
                           // Interprets the start of _recvBuffer as a DatagramNode.
                           // 'bytes' is the number of bytes to validate against
                           // sizeof( DatagramNode ). Returns a pointer into _recvBuffer, or 0
                           // (after logging an error) if too few bytes are given or the
                           // protocol version differs from CO_RSP_PROTOCOL_VERSION.
     552           0 :     if( bytes < sizeof( DatagramNode ))
     553             :     {
     554           0 :         LBERROR << "DatagramNode size mismatch, got " << bytes << " instead of "
     555           0 :                 << sizeof( DatagramNode ) << " bytes" << std::endl;
     556             :         //close();
     557           0 :         return 0;
     558             :     }
     559             :     DatagramNode& node =
     560           0 :                     *reinterpret_cast< DatagramNode* >( _recvBuffer.getData( ));
                           // byteswap() is applied in place, before the version check.
     561           0 :     node.byteswap();
     562           0 :     if( node.protocolVersion != CO_RSP_PROTOCOL_VERSION )
     563             :     {
     564           0 :         LBERROR << "Protocol version mismatch, got " << node.protocolVersion
     565           0 :                 << " instead of " << CO_RSP_PROTOCOL_VERSION << std::endl;
     566             :         //close();
     567           0 :         return 0;
     568             :     }
     569           0 :     return &node;
     570             : }
     571             : 
     572           2 : bool RSPConnection::_initThread()
     573             : {
                           // Runs the startup phase of the protocol: announces the id, arms the
                           // 10 ms timer, starts receiving and runs the io service until it is
                           // stopped (done by _handleInitTimeout() once the state is
                           // STATE_LISTENING). Returns whether the connection is listening.
                           // Presumably invoked from Thread's initialisation - the caller is not
                           // part of this excerpt.
     574           2 :     LBLOG( LOG_RSP ) << "Started RSP protocol thread" << std::endl;
     575           2 :     _timeouts = 0;
     576             : 
     577             :    // send a first datagram to announce me and discover other connections
     578           2 :     LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
     579           2 :     _sendSimpleDatagram( ID_HELLO, _id );
     580           2 :     _setTimeout( 10 );
     581           2 :     _asyncReceiveFrom();
     582           2 :     _ioService.run();
     583           2 :     return isListening();
     584             : }
     585             : 
     586           2 : void RSPConnection::_runThread()
     587             : {
                           // Main loop of the protocol thread (called from Thread::run()). The io
                           // service was stopped at the end of the init phase, so it is reset
                           // before run() is invoked again.
     588             :     //__debugbreak();
     589           2 :     _ioService.reset();
     590           2 :     _ioService.run();
     591           2 : }
     592             : 
     593       16733 : void RSPConnection::_setTimeout( const int32_t timeOut )
     594             : {
                           // (Re-)arms the _timeout timer to expire in 'timeOut' milliseconds
                           // (must be >= 0) and registers _handleTimeout() as its handler.
     595       16733 :     LBASSERT( timeOut >= 0 );
     596       16733 :     _timeout.expires_from_now( boost::posix_time::milliseconds( timeOut ));
     597             :     _timeout.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
     598       16733 :                                       boost::asio::placeholders::error ));
     599       16733 : }
     600             : 
     601       68025 : void RSPConnection::_postWakeup()
     602             : {
                           // Arms the separate _wakeup timer with a 0 ms expiry so that
                           // _handleTimeout() is invoked through the io service; the _timeout
                           // timer is left untouched.
     603       68025 :     _wakeup.expires_from_now( boost::posix_time::milliseconds( 0 ));
     604             :     _wakeup.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
     605       68025 :                                      boost::asio::placeholders::error ));
     606       68025 : }
     607             : 
     608       16683 : void RSPConnection::_processOutgoing()
     609             : {
                           // One step of the send side: performs a single repeat or write, then
                           // decides how the _timeout timer is re-armed:
                           //  - more data or repeats pending: fire again immediately (0 ms)
                           //  - nothing left unacknowledged: reset the counter, cancel the timer
                           //  - ack timeout not yet elapsed: wait for the remaining time
                           //  - otherwise: count a timeout, (re)send an ack request, wait again
     610             : #ifdef CO_INSTRUMENT_RSP
     611             :     if( instrumentClock.getTime64() > 1000 )
     612             :     {
     613             :         LBWARN << *this << std::endl;
     614             :         instrumentClock.reset();
     615             :     }
     616             : #endif
     617             : 
                           // Pending repeat requests take precedence over new data.
     618       16683 :     if( !_repeatQueue.empty( ))
     619           0 :         _repeatData();
     620             :     else
     621       16683 :         _writeData();
     622             : 
     623       16683 :     if( !_threadBuffers.isEmpty() || !_repeatQueue.empty( ))
     624             :     {
     625       16651 :         _setTimeout( 0 ); // call again to send remaining
     626       16651 :         return;
     627             :     }
     628             :     // no more data to write, check/send ack request, reset timeout
     629             : 
     630          32 :     if( _writeBuffers.empty( )) // got all acks
     631             :     {
     632          32 :         _timeouts = 0;
     633          32 :         _timeout.cancel();
     634          32 :         return;
     635             :     }
     636             : 
     637             :     const int64_t timeout =
     638           0 :         Global::getIAttribute( Global::IATTR_RSP_ACK_TIMEOUT );
     639           0 :     const int64_t left = timeout - _clock.getTime64();
     640             : 
                           // NOTE(review): 'left' and 'timeout' are int64_t while _setTimeout()
                           // takes an int32_t, so both calls below narrow implicitly.
     641           0 :     if( left > 0 )
     642             :     {
     643           0 :         _setTimeout( left );
     644           0 :         return;
     645             :     }
     646             : 
     647             :     // (repeat) ack request
     648           0 :     _clock.reset();
     649           0 :     ++_timeouts;
                           // Once CO_RSP_MAX_TIMEOUTS is reached no further ack request is sent
                           // here, but the timer is still re-armed below.
     650           0 :     if ( _timeouts < CO_RSP_MAX_TIMEOUTS )
     651           0 :         _sendAckRequest();
     652           0 :     _setTimeout( timeout );
     653             : }
     654             : 
     655       16683 : void RSPConnection::_writeData()
     656             : {
                           // Takes the next buffer from _threadBuffers, stamps it with the next
                           // sequence number, optionally merges following buffers into it
                           // (CO_RSP_MERGE_WRITES), sends it and keeps it in _writeBuffers.
                           // Returns without doing anything if no buffer is queued.
     657       16683 :     Buffer* buffer = 0;
     658       16683 :     if( !_threadBuffers.pop( buffer )) // nothing to write
     659       16687 :         return;
     660             : 
     661       16679 :     _timeouts = 0;
     662       16679 :     LBASSERT( buffer );
     663             : 
     664             :     // write buffer
                           // The buffer starts with a DatagramData header; the payload follows
                           // directly behind it (addressed as 'header + 1' below).
     665       16679 :     DatagramData* header = reinterpret_cast<DatagramData*>( buffer->getData( ));
     666       16679 :     header->sequence = _sequence++;
     667             : 
     668             : #ifdef CO_RSP_MERGE_WRITES
                           // Append the payload of subsequent queued buffers for as long as the
                           // combined size does not exceed _payloadSize.
     669       16679 :     if( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
     670             :     {
     671       15798 :         std::vector< Buffer* > appBuffers;
     672       82162 :         while( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
     673             :         {
     674       51417 :             Buffer* buffer2 = 0;
                                   // Peek first; the buffer is only popped once it is known to fit.
     675       51417 :             LBCHECK( _threadBuffers.getFront( buffer2 ));
     676       51417 :             LBASSERT( buffer2 );
     677             :             DatagramData* header2 =
     678       51417 :                 reinterpret_cast<DatagramData*>( buffer2->getData( ));
     679             : 
     680       51417 :             if( uint32_t( header->size + header2->size ) > _payloadSize )
     681         851 :                 break;
     682             : 
     683       50566 :             memcpy( reinterpret_cast<uint8_t*>( header + 1 ) + header->size,
     684      101132 :                     header2 + 1, header2->size );
     685       50566 :             header->size += header2->size;
     686       50566 :             LBCHECK( _threadBuffers.pop( buffer2 ));
     687       50566 :             appBuffers.push_back( buffer2 );
     688             : #ifdef CO_INSTRUMENT_RSP
     689             :             ++nMergedDatagrams;
     690             : #endif
     691             :         }
     692             : 
                               // The merged-away buffers are pushed to _appBuffers in one batch.
                               // NOTE(review): presumably this returns them for reuse - confirm.
     693       15798 :         if( !appBuffers.empty( ))
     694       14947 :             _appBuffers.push( appBuffers );
     695             :     }
     696             : #endif
     697             : 
     698             :     // send data
     699             :     //  Note 1: We could optimize the send away if we're all alone, but this is
     700             :     //          not a use case for RSP, so we don't care.
     701             :     //  Note 2: Data to myself will be 'written' in _finishWriteQueue once we
     702             :     //          got all acks for the packet
     703       16679 :     const uint32_t size = header->size + sizeof( DatagramData );
     704             : 
     705       16679 :     _waitWritable( size ); // OPT: process incoming in between
                           // 'size' was computed above, before the header is byte-swapped.
     706       16679 :     header->byteswap();
     707       16679 :     _write->send( boost::asio::buffer( header, size ));
     708             : 
     709             : #ifdef CO_INSTRUMENT_RSP
                           // NOTE(review): header->size is read after byteswap() here - confirm
                           // the statistic is still correct where byteswap() is not a no-op.
     710             :     ++nDatagrams;
     711             :     nBytesWritten += header->size;
     712             : #endif
     713             : 
     714             :     // save datagram for repeats (and self)
     715       16679 :     _writeBuffers.push_back( buffer );
     716             : 
     717       16679 :     if( _children.size() == 1 ) // We're all alone
     718             :     {
                               // The only child is asserted to be ourselves, so the datagram just
                               // sent (sequence _sequence - 1) is finished immediately.
     719       16679 :         LBASSERT( _children.front()->_id == _id );
     720       16679 :         _finishWriteQueue( _sequence - 1 );
     721             :     }
     722             : }
     723             : 
     724       16679 : void RSPConnection::_waitWritable( const uint64_t bytes )
     725             : {
                           // Send-rate limiter: _bucketSize is refilled by elapsed _clock time
                           // multiplied by _sendRate and capped at _maxBucketSize. The call
                           // returns once the bucket holds min( bytes, _mtu ) and that amount
                           // has been deducted. Afterwards _sendRate is raised by one step if
                           // it is below the bandwidth of the connection description.
                           // NOTE(review): _clock is also read and reset by the ack-timeout
                           // logic in _processOutgoing(); resetTimef() here restarts that
                           // clock as well - confirm this sharing is intended.
     726             : #ifdef CO_INSTRUMENT_RSP
     727             :     lunchbox::Clock clock;
     728             : #endif
     729             : 
     730       16679 :     _bucketSize += static_cast< uint64_t >( _clock.resetTimef() * _sendRate );
     731             :                                                      // opt omit: * 1024 / 1000;
     732       16679 :     _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
     733             : 
     734       16679 :     const uint64_t size = LB_MIN( bytes, static_cast< uint64_t >( _mtu ));
                           // Waits by yielding in a loop rather than sleeping; the coverage
                           // counts in this listing show millions of iterations of this loop.
     735     2756595 :     while( _bucketSize < size )
     736             :     {
     737     2723237 :         lunchbox::Thread::yield();
     738     2723237 :         float time = _clock.resetTimef();
     739             : 
                               // Keep yielding until the clock reports a non-zero elapsed time.
     740     5446474 :         while( time == 0.f )
     741             :         {
     742           0 :             lunchbox::Thread::yield();
     743           0 :             time = _clock.resetTimef();
     744             :         }
     745             : 
                               // NOTE(review): this refill casts to int64_t while the initial
                               // refill above casts to uint64_t - confirm which one is intended.
     746     2723237 :         _bucketSize += static_cast< int64_t >( time * _sendRate );
     747     2723237 :         _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
     748             :     }
     749       16679 :     _bucketSize -= size;
     750             : 
     751             : #ifdef CO_INSTRUMENT_RSP
     752             :     writeWaitTime += clock.getTimef();
     753             : #endif
     754             : 
     755       16679 :     ConstConnectionDescriptionPtr description = getDescription();
     756       16679 :     if( _sendRate < description->bandwidth )
     757             :     {
                               // Step size: IATTR_RSP_ERROR_UPSCALE * bandwidth * 0.001.
     758             :         _sendRate += int64_t(
     759           0 :             float( Global::getIAttribute( Global::IATTR_RSP_ERROR_UPSCALE )) *
     760           0 :             float( description->bandwidth ) * .001f );
     761           0 :         LBLOG( LOG_RSP ) << "speeding up to " << _sendRate << " KB/s"
     762           0 :                          << std::endl;
     763       16679 :     }
     764       16679 : }
     765             : 
     766           0 : void RSPConnection::_repeatData()
     767             : {
                           // Serves the repeat (nack) queue: walks the queued requests until one
                           // datagram has actually been re-sent or the queue is empty. Requests
                           // for datagrams that are no longer held in _writeBuffers are consumed
                           // without sending anything.
     768           0 :     _timeouts = 0;
     769             : 
     770           0 :     while( !_repeatQueue.empty( ))
     771             :     {
     772           0 :         Nack& request = _repeatQueue.front();
                               // How far request.start lies behind the next sequence to be
                               // written, computed in uint16_t arithmetic.
     773           0 :         const uint16_t distance = _sequence - request.start;
     774             : 
     775           0 :         if ( distance == 0 )
     776             :         {
     777           0 :             LBWARN << "ignoring invalid nack (" << request.start
     778           0 :                    << ".." << request.end << ")" << std::endl;
     779           0 :             _repeatQueue.pop_front();
     780           0 :             continue;
     781             :         }
     782             : 
     783           0 :         if( distance <= _writeBuffers.size( )) // not already acked
     784             :         {
     785             : //          LBLOG( LOG_RSP ) << "Repeat " << request.start << ", " << _sendRate
     786             : //                           << "KB/s"<< std::endl;
     787             : 
                                   // _writeBuffers is indexed from the end: the newest datagram has
                                   // distance 1 and sits at the back.
     788           0 :             const size_t i = _writeBuffers.size() - distance;
     789           0 :             Buffer* buffer = _writeBuffers[i];
     790           0 :             LBASSERT( buffer );
     791             : 
     792             :             DatagramData* header =
     793           0 :                 reinterpret_cast<DatagramData*>( buffer->getData( ));
     794           0 :             const uint32_t size = header->size + sizeof( DatagramData );
     795           0 :             LBASSERT( header->sequence == request.start );
     796             : 
     797             :             // send data
     798           0 :             _waitWritable( size ); // OPT: process incoming in between
     799             :             // already done by _writeData: header->byteswap();
     800           0 :             _write->send( boost::asio::buffer( header, size ) );
     801             : #ifdef CO_INSTRUMENT_RSP
     802             :             ++nRepeated;
     803             : #endif
     804             :         }
     805             : 
                               // Advance the request by one sequence, dropping it once its last
                               // sequence has been handled. 'request' is not used after this.
     806           0 :         if( request.start == request.end )
     807           0 :             _repeatQueue.pop_front();    // done with request
     808             :         else
     809           0 :             ++request.start;
     810             : 
     811           0 :         if( distance <= _writeBuffers.size( )) // send something
     812           0 :             return;
     813             :     }
     814             : }
     815             : 
     816       16679 : void RSPConnection::_finishWriteQueue( const uint16_t sequence )
     817             : {
                           // Retires all datagrams up to and including 'sequence' from
                           // _writeBuffers. Each retired datagram is handed to the connection
                           // found under our own id via its _newDataBuffer() and posted to that
                           // connection's _appBuffers; the retired write buffers themselves are
                           // pushed to this connection's _appBuffers. Finally the own
                           // connection's _acked is updated and the timeout counter is cleared.
     818       16679 :     LBASSERT( !_writeBuffers.empty( ));
     819             : 
     820       16679 :     RSPConnectionPtr connection = _findConnection( _id );
     821       16679 :     LBASSERT( connection.isValid( ));
     822       16679 :     LBASSERT( connection->_recvBuffers.empty( ));
     823             : 
     824             :     // Bundle pushing the buffers to the app to avoid excessive lock ops
     825       33358 :     Buffers readBuffers;
     826       33358 :     Buffers freeBuffers;
     827             : 
                           // Number of datagrams written after 'sequence'; these stay queued.
     828       16679 :     const uint16_t size = _sequence - sequence - 1;
     829       16679 :     LBASSERTINFO( size <= uint16_t( _writeBuffers.size( )),
     830             :                   size << " > " << _writeBuffers.size( ));
     831       16679 :     LBLOG( LOG_RSP ) << "Got all remote acks for " << sequence << " current "
     832           0 :                      << _sequence << " advance " << _writeBuffers.size() - size
     833       16679 :                      << " buffers" << std::endl;
     834             : 
     835       50037 :     while( _writeBuffers.size() > size_t( size ))
     836             :     {
     837       16679 :         Buffer* buffer = _writeBuffers.front();
     838       16679 :         _writeBuffers.pop_front();
     839             : 
     840             : #ifndef NDEBUG
                               // NOTE(review): this byteswap() of the header exists only in debug
                               // builds, so the buffer content handed on below differs between
                               // debug and release wherever byteswap() is not a no-op - confirm.
     841             :         DatagramData* datagram =
     842       16679 :                          reinterpret_cast< DatagramData* >( buffer->getData( ));
     843       16679 :         datagram->byteswap();
     844       16679 :         LBASSERT( datagram->writerID == _id );
     845       16679 :         LBASSERTINFO( datagram->sequence ==
     846             :                       uint16_t( connection->_sequence + readBuffers.size( )),
     847             :                       datagram->sequence << ", " << connection->_sequence <<
     848             :                       ", " << readBuffers.size( ));
     849             :       //LBLOG( LOG_RSP ) << "self receive " << datagram->sequence << std::endl;
     850             : #endif
     851             : 
     852       16679 :         Buffer* newBuffer = connection->_newDataBuffer( *buffer );
     853       16679 :         if( !newBuffer && !readBuffers.empty( )) // push prepared app buffers
     854             :         {
     855           0 :             lunchbox::ScopedWrite mutex( connection->_mutexEvent );
     856           0 :             LBLOG( LOG_RSP ) << "post " << readBuffers.size()
     857           0 :                              << " buffers starting with sequence "
     858           0 :                              << connection->_sequence << std::endl;
     859             : 
     860           0 :             connection->_appBuffers.push( readBuffers );
     861           0 :             connection->_sequence += uint16_t( readBuffers.size( ));
     862           0 :             readBuffers.clear();
     863           0 :             connection->_event->set();
     864             :         }
     865             : 
                               // Retries in a yield loop, with no timeout, until _newDataBuffer()
                               // hands out a buffer.
     866       33358 :         while( !newBuffer ) // no more data buffers, wait for app to drain
     867             :         {
     868           0 :             newBuffer = connection->_newDataBuffer( *buffer );
     869           0 :             lunchbox::Thread::yield();
     870             :         }
     871             : 
     872       16679 :         freeBuffers.push_back( buffer );
     873       16679 :         readBuffers.push_back( newBuffer );
     874             :     }
     875             : 
     876       16679 :     _appBuffers.push( freeBuffers );
     877       16679 :     if( !readBuffers.empty( ))
     878             :     {
     879       16679 :         lunchbox::ScopedWrite mutex( connection->_mutexEvent );
     880             : #if 0
     881             :         LBLOG( LOG_RSP )
     882             :             << "post " << readBuffers.size() << " buffers starting at "
     883             :             << connection->_sequence << std::endl;
     884             : #endif
     885             : 
     886       16679 :         connection->_appBuffers.push( readBuffers );
     887       16679 :         connection->_sequence += uint16_t( readBuffers.size( ));
     888       16679 :         connection->_event->set();
     889             :     }
     890             : 
     891       16679 :     connection->_acked = uint16_t( connection->_sequence - 1 );
     892       16679 :     LBASSERT( connection->_acked == sequence );
     893             : 
     894       33358 :     _timeouts = 0;
     895       16679 : }
     896             : 
     897           0 : void RSPConnection::_handlePacket( const boost::system::error_code& /* error */,
     898             :                                    const size_t bytes )
     899             : {
                           // Completion handler of the asynchronous receive started by
                           // _asyncReceiveFrom(). While listening, the datagram is handled as
                           // connected data and the outgoing side is processed; otherwise a
                           // datagram of at least sizeof( DatagramNode ) is handled as part of
                           // the id negotiation. A new receive is queued afterwards unless the
                           // connection stopped listening while handling the data.
                           // NOTE(review): the error code is ignored (parameter name commented
                           // out), so 'bytes' is used without checking for a failed receive.
     900           0 :     if( isListening( ))
     901             :     {
     902           0 :         _handleConnectedData( bytes );
     903             : 
     904           0 :         if( isListening( ))
     905           0 :             _processOutgoing();
     906             :         else
     907             :         {
                                   // Handling the data ended the listening state: stop the service
                                   // and do not queue another receive.
     908           0 :             _ioService.stop();
     909           0 :             return;
     910             :         }
     911             :     }
     912           0 :     else if( bytes >= sizeof( DatagramNode ))
     913             :     {
     914           0 :         if( _idAccepted )
     915           0 :             _handleInitData( bytes, false );
     916             :         else
     917           0 :             _handleAcceptIDData( bytes );
     918             :     }
     919             : 
     920             :     //LBLOG( LOG_RSP ) << "_handlePacket timeout " << timeout << std::endl;
     921           0 :     _asyncReceiveFrom();
     922             : }
     923             : 
     924           0 : void RSPConnection::_handleAcceptIDData( const size_t bytes )
     925             : {
                           // Handles a node datagram received while our own id has not been
                           // accepted yet (called from _handlePacket() when _idAccepted is
                           // false). Datagrams rejected by _getDatagramNode() are ignored.
     926           0 :     DatagramNode* pNode = _getDatagramNode( bytes );
     927           0 :     if( !pNode )
     928           0 :         return;
     929             : 
     930           0 :     DatagramNode& node = *pNode;
     931             : 
     932           0 :     switch( node.type )
     933             :     {
     934             :         case ID_HELLO:
     935           0 :             _checkNewID( node.connectionID );
     936           0 :             break;
     937             : 
     938             :         case ID_HELLO_REPLY:
     939           0 :             _addConnection( node.connectionID, node.data );
     940           0 :             break;
     941             : 
     942             :         case ID_DENY:
     943             :             // a connection refused my ID, try another ID
     944           0 :             if( node.connectionID == _id )
     945             :             {
     946           0 :                 _timeouts = 0;
     947           0 :                 _sendSimpleDatagram( ID_HELLO, _buildNewID( ));
     948           0 :                 LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
     949             :             }
     950           0 :             break;
     951             : 
     952             :         case ID_EXIT:
     953           0 :             _removeConnection( node.connectionID );
     954           0 :             break;
     955             : 
     956             :         default:
     957           0 :             LBUNIMPLEMENTED;
     958           0 :             break;
     959             :     }
     960             : }
     961             : 
     962           0 : void RSPConnection::_handleInitData( const size_t bytes, const bool connected )
     963             : {
                           // Handles a node (membership) datagram. 'connected' is false when
                           // called from _handlePacket() before the connection is listening and
                           // true when called from _handleConnectedData(); the timeout counter
                           // is only cleared in the not-connected case.
                           // NOTE(review): _handleConnectedData() also forwards ID_DENY to this
                           // function, but there is no ID_DENY case here, so it ends up in
                           // LBUNIMPLEMENTED - confirm this is intended.
     964           0 :     DatagramNode* pNode = _getDatagramNode( bytes );
     965           0 :     if( !pNode )
     966           0 :         return;
     967             : 
     968           0 :     DatagramNode& node = *pNode;
     969             : 
     970           0 :     switch( node.type )
     971             :     {
     972             :         case ID_HELLO:
     973           0 :             if( !connected )
     974           0 :                 _timeouts = 0;
     975           0 :             _checkNewID( node.connectionID ) ;
     976           0 :             return;
     977             : 
     978             :         case ID_CONFIRM:
     979           0 :             if( !connected )
     980           0 :                 _timeouts = 0;
     981           0 :             _addConnection( node.connectionID, node.data );
     982           0 :             return;
     983             : 
     984             :         case COUNTNODE:
     985           0 :             LBLOG( LOG_RSP ) << "Got " << node.data << " nodes from "
     986           0 :                              << node.connectionID << std::endl;
     987           0 :             return;
     988             : 
     989             :         case ID_HELLO_REPLY:
     990           0 :             _addConnection( node.connectionID, node.data );
     991           0 :             return;
     992             : 
     993             :         case ID_EXIT:
     994           0 :             _removeConnection( node.connectionID );
     995           0 :             return;
     996             : 
     997             :         default:
     998           0 :             LBUNIMPLEMENTED;
     999           0 :             break;
    1000             :     }
    1001             : }
    1002             : 
    1003           0 : void RSPConnection::_handleConnectedData( const size_t bytes )
    1004             : {
                           // Dispatches a datagram received while listening. The first 16 bits
                           // of _recvBuffer are read as the datagram type (byte-swapped when
                           // COMMON_BIGENDIAN is defined) and select the handler. Datagrams
                           // shorter than those 16 bits are ignored.
    1005           0 :     if( bytes < sizeof( uint16_t ))
    1006           0 :         return;
    1007             : 
    1008           0 :     void* data = _recvBuffer.getData();
    1009           0 :     uint16_t type = *reinterpret_cast< uint16_t* >( data );
    1010             : #ifdef COMMON_BIGENDIAN
    1011             :     lunchbox::byteswap( type );
    1012             : #endif
    1013           0 :     switch( type )
    1014             :     {
    1015             :         case DATA:
    1016           0 :             LBCHECK( _handleData( bytes ));
    1017           0 :             break;
    1018             : 
    1019             :         case ACK:
    1020           0 :             LBCHECK( _handleAck( bytes ));
    1021           0 :             break;
    1022             : 
    1023             :         case NACK:
    1024           0 :             LBCHECK( _handleNack( ));
    1025           0 :             break;
    1026             : 
    1027             :         case ACKREQ: // The writer asks for an ack/nack
    1028           0 :             LBCHECK( _handleAckRequest( bytes ));
    1029           0 :             break;
    1030             : 
                               // Membership datagrams share the init-phase handler.
    1031             :         case ID_HELLO:
    1032             :         case ID_HELLO_REPLY:
    1033             :         case ID_CONFIRM:
    1034             :         case ID_EXIT:
    1035             :         case ID_DENY:
    1036             :         case COUNTNODE:
    1037           0 :             _handleInitData( bytes, true );
    1038           0 :             break;
    1039             : 
    1040             :         default:
    1041           0 :             LBASSERTINFO( false,
    1042             :                           "Don't know how to handle packet of type " << type );
    1043             :     }
    1044             : 
    1045             : }
    1046             : 
    1047           2 : void RSPConnection::_asyncReceiveFrom()
    1048             : {
                           // Queues one asynchronous receive of up to _mtu bytes into
                           // _recvBuffer; the sender address is stored in _readAddr and
                           // _handlePacket is invoked on completion.
    1049             :     _read->async_receive_from(
    1050           2 :         boost::asio::buffer( _recvBuffer.getData(), _mtu ), _readAddr,
    1051             :         boost::bind( &RSPConnection::_handlePacket, this,
    1052             :                      boost::asio::placeholders::error,
    1053           4 :                      boost::asio::placeholders::bytes_transferred ));
    1054           2 : }
    1055             : 
    1056           0 : bool RSPConnection::_handleData( const size_t bytes )
    1057             : {
                           // Handles a received DATA datagram held in _recvBuffer.
                           //  - in-order datagram: it is posted to the writer's connection,
                           //    followed by any directly following datagrams already parked in
                           //    that connection's _recvBuffers
                           //  - datagram further than _numBuffers away from the expected
                           //    sequence: ignored
                           //  - other out-of-order datagram: parked in _recvBuffers and an
                           //    early nack for the gap is sent to the writer
                           // Returns false if the datagram is shorter than a DatagramData header
                           // or the writer is unknown, true otherwise (including when the
                           // datagram is dropped because no free buffer is available).
    1058           0 :     if( bytes < sizeof( DatagramData ))
    1059           0 :         return false;
    1060             :     DatagramData& datagram =
    1061           0 :                     *reinterpret_cast< DatagramData* >( _recvBuffer.getData( ));
    1062           0 :     datagram.byteswap();
    1063             : 
    1064             : #ifdef CO_INSTRUMENT_RSP
    1065             :     ++nReadData;
    1066             : #endif
    1067           0 :     const uint16_t writerID = datagram.writerID;
    1068             : #ifdef Darwin
    1069             :     // There is occasionally a packet from ourselves, even though multicast loop
    1070             :     // is not set?!
    1071             :     if( writerID == _id )
    1072             :         return true;
    1073             : #else
    1074           0 :     LBASSERT( writerID != _id );
    1075             : #endif
    1076             : 
    1077           0 :     RSPConnectionPtr connection = _findConnection( writerID );
    1078             : 
    1079           0 :     if( !connection )  // unknown connection ?
    1080             :     {
    1081           0 :         LBASSERTINFO( false, "Can't find connection with id " << writerID );
    1082           0 :         return false;
    1083             :     }
    1084           0 :     LBASSERT( connection->_id == writerID );
    1085             : 
    1086           0 :     const uint16_t sequence = datagram.sequence;
    1087             : //  LBLOG( LOG_RSP ) << "rcvd " << sequence << " from " << writerID <<std::endl;
    1088             : 
    1089           0 :     if( connection->_sequence == sequence ) // in-order packet
    1090             :     {
    1091           0 :         Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
    1092           0 :         if( !newBuffer ) // no more data buffers, drop packet
    1093           0 :             return true;
    1094             : 
    1095           0 :         lunchbox::ScopedWrite mutex( connection->_mutexEvent );
    1096           0 :         connection->_pushDataBuffer( newBuffer );
    1097             : 
                               // A null entry in _recvBuffers marks a datagram that is still
                               // missing; draining stops at the first such gap.
    1098           0 :         while( !connection->_recvBuffers.empty( )) // enqueue ready pending data
    1099             :         {
    1100           0 :             newBuffer = connection->_recvBuffers.front();
    1101           0 :             if( !newBuffer )
    1102           0 :                 break;
    1103             : 
    1104           0 :             connection->_recvBuffers.pop_front();
    1105           0 :             connection->_pushDataBuffer( newBuffer );
    1106             :         }
    1107             : 
    1108           0 :         if( !connection->_recvBuffers.empty() &&
    1109           0 :             !connection->_recvBuffers.front( )) // update for new _sequence
    1110             :         {
    1111           0 :             connection->_recvBuffers.pop_front();
    1112             :         }
    1113             : 
    1114           0 :         connection->_event->set();
    1115           0 :         return true;
    1116             :     }
    1117             : 
                           // Distance test in both directions, allowing for uint16_t wrap-around.
    1118           0 :     const uint16_t max = std::numeric_limits< uint16_t >::max();
    1119           0 :     if(( connection->_sequence > sequence &&
    1120           0 :          max - connection->_sequence + sequence > _numBuffers ) ||
    1121           0 :        ( connection->_sequence < sequence &&
    1122           0 :          sequence - connection->_sequence > _numBuffers ))
    1123             :     {
    1124             :         // ignore it if it's a repetition for another reader
    1125           0 :         return true;
    1126             :     }
    1127             : 
    1128             :     // else out of order
    1129             : 
                           // Slot k of _recvBuffers holds sequence _sequence + 1 + k, which
                           // follows from size = sequence - _sequence and i = size - 1 below.
    1130           0 :     const uint16_t size = sequence - connection->_sequence;
    1131           0 :     LBASSERT( size != 0 );
    1132           0 :     LBASSERTINFO( size <= _numBuffers, size << " > " << _numBuffers );
    1133             : 
    1134           0 :     ssize_t i = ssize_t( size ) - 1;
    1135           0 :     const bool gotPacket = ( connection->_recvBuffers.size() >= size &&
    1136           0 :                              connection->_recvBuffers[ i ] );
                           // Duplicate of a datagram that is already parked.
    1137           0 :     if( gotPacket )
    1138           0 :         return true;
    1139             : 
    1140           0 :     Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
    1141           0 :     if( !newBuffer ) // no more data buffers, drop packet
    1142           0 :         return true;
    1143             : 
    1144           0 :     if( connection->_recvBuffers.size() < size )
    1145           0 :         connection->_recvBuffers.resize( size, 0 );
    1146             : 
    1147           0 :     LBASSERT( !connection->_recvBuffers[ i ] );
    1148           0 :     connection->_recvBuffers[ i ] = newBuffer;
    1149             : 
    1150             :     // early nack: request missing packets before current
    1151           0 :     --i;
                           // By default the nack covers everything from the expected sequence up
                           // to the datagram before this one.
    1152           0 :     Nack nack = { connection->_sequence, uint16_t( sequence - 1 ) };
                           // NOTE(review): with 'i > 0', slot 0 is never inspected when the new
                           // datagram landed in slot 1 - confirm this is intended.
    1153           0 :     if( i > 0 )
    1154             :     {
    1155           0 :         if( connection->_recvBuffers[i] ) // got previous packet
    1156           0 :             return true;
    1157             : 
                               // Search downwards for the nearest datagram already received.
    1158           0 :         while( i >= 0 && !connection->_recvBuffers[i] )
    1159           0 :             --i;
    1160             : 
    1161           0 :         const Buffer* lastBuffer = i>=0 ? connection->_recvBuffers[i] : 0;
    1162           0 :         if( lastBuffer )
    1163             :         {
                                   // NOTE(review): by the slot mapping above, slot i holds sequence
                                   // _sequence + 1 + i, so the first missing one would be
                                   // _sequence + 2 + i; the start below lies before that and looks
                                   // like it re-requests received data - confirm intended.
    1164           0 :             nack.start = connection->_sequence + i;
    1165             :         }
    1166             :     }
    1167             : 
    1168           0 :     LBLOG( LOG_RSP ) << "send early nack " << nack.start << ".." << nack.end
    1169           0 :                      << " current " << connection->_sequence << " ooo "
    1170           0 :                      << connection->_recvBuffers.size() << std::endl;
    1171             : 
                           // A range that wraps around the uint16_t maximum is truncated at the
                           // maximum; the wrapped part is not requested here.
    1172           0 :     if( nack.end < nack.start )
    1173             :         // OPT: don't drop nack 0..nack.end, but it doesn't happen often
    1174           0 :         nack.end = std::numeric_limits< uint16_t >::max();
    1175             : 
    1176           0 :     _sendNack( writerID, &nack, 1 );
    1177           0 :     return true;
    1178             : }
    1179             : 
    1180       16679 : RSPConnection::Buffer* RSPConnection::_newDataBuffer( Buffer& inBuffer )
    1181             : {
                           // Pops a buffer from _threadBuffers and swaps its content with
                           // inBuffer, so the returned buffer carries the data that was in
                           // inBuffer. Returns 0 if no buffer is available; in that case the
                           // event is set when _appBuffers still holds data.
                           // inBuffer must have a maximum size of _mtu (asserted).
    1182       16679 :     LBASSERT( static_cast< int32_t >( inBuffer.getMaxSize( )) == _mtu );
    1183             : 
    1184       16679 :     Buffer* buffer = 0;
    1185       16679 :     if( _threadBuffers.pop( buffer ))
    1186             :     {
    1187       16679 :         buffer->swap( inBuffer );
    1188       16679 :         return buffer;
    1189             :     }
    1190             : 
    1191             :     // we do not have a free buffer, which means that the receiver is slower
    1192             :     // then our read thread. This is bad, because now we'll drop the data and
    1193             :     // will send a NAck packet upon the ack request, causing retransmission even
    1194             :     // though we'll probably drop it again
    1195           0 :     LBLOG( LOG_RSP ) << "Reader too slow, dropping data" << std::endl;
    1196             : 
    1197             :     // Set the event if there is data to read. This shouldn't be needed since
    1198             :     // the event should be set in this case, but it'll increase the robustness
    1199           0 :     lunchbox::ScopedWrite mutex( _mutexEvent );
    1200           0 :     if( !_appBuffers.isEmpty( ))
    1201           0 :         _event->set();
    1202           0 :     return 0;
    1203             : }
    1204             : 
    1205           0 : void RSPConnection::_pushDataBuffer( Buffer* buffer )
    1206             : {
                           // Posts an in-order datagram buffer to _appBuffers and advances the
                           // expected sequence. Requires a parent connection (asserted).
                           // NOTE(review): the caller in _handleData() holds _mutexEvent while
                           // calling this; presumably that is a requirement - confirm.
    1207           0 :     LBASSERT( _parent );
    1208             : #ifndef NDEBUG
    1209           0 :     DatagramData* dgram = reinterpret_cast< DatagramData* >(buffer->getData( ));
    1210           0 :     LBASSERTINFO( dgram->sequence == _sequence,
    1211             :                   dgram->sequence << " != " << _sequence );
    1212             : #endif
    1213             : 
                           // An ack is sent through the parent for every _ackFreq-th sequence;
                           // the parent's id is added in as an offset.
                           // NOTE(review): presumably the offset staggers the acks of different
                           // readers - confirm.
    1214           0 :     if( (( _sequence + _parent->_id ) % _ackFreq ) == 0 )
    1215           0 :         _parent->_sendAck( _id, _sequence );
    1216             : 
    1217           0 :     LBLOG( LOG_RSP ) << "post buffer " << _sequence << std::endl;
    1218           0 :     ++_sequence;
    1219           0 :     _appBuffers.push( buffer );
    1220           0 : }
    1221             : 
    1222           0 : bool RSPConnection::_handleAck( const size_t bytes )
    1223             : {
    1224           0 :     if( bytes < sizeof( DatagramAck ))
    1225           0 :         return false;
    1226             :     DatagramAck& ack =
    1227           0 :                      *reinterpret_cast< DatagramAck* >( _recvBuffer.getData( ));
    1228           0 :     ack.byteswap();
    1229             : 
    1230             : #ifdef CO_INSTRUMENT_RSP
    1231             :     ++nAcksRead;
    1232             : #endif
    1233             : 
    1234           0 :     if( ack.writerID != _id )
    1235           0 :         return true;
    1236             : 
    1237           0 :     LBLOG( LOG_RSP ) << "got ack from " << ack.readerID << " for "
    1238           0 :                      << ack.writerID << " sequence " << ack.sequence
    1239           0 :                      << " current " << _sequence << std::endl;
    1240             : 
    1241             :     // find destination connection, update ack data if needed
    1242           0 :     RSPConnectionPtr connection = _findConnection( ack.readerID );
    1243           0 :     if( !connection )
    1244             :     {
    1245           0 :         LBUNREACHABLE;
    1246           0 :         return false;
    1247             :     }
    1248             : 
    1249           0 :     if( connection->_acked >= ack.sequence &&
    1250           0 :         connection->_acked - ack.sequence <= _numBuffers )
    1251             :     {
    1252             :         // I have received a later ack previously from the reader
    1253           0 :         LBLOG( LOG_RSP ) << "Late ack" << std::endl;
    1254           0 :         return true;
    1255             :     }
    1256             : 
    1257             : #ifdef CO_INSTRUMENT_RSP
    1258             :     ++nAcksAccepted;
    1259             : #endif
    1260           0 :     connection->_acked = ack.sequence;
    1261           0 :     _timeouts = 0; // reset timeout counter
    1262             : 
    1263             :     // Check if we can advance _acked
    1264           0 :     uint16_t acked = ack.sequence;
    1265             : 
    1266           0 :     for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
    1267             :     {
    1268           0 :         RSPConnectionPtr child = *i;
    1269           0 :         if( child->_id == _id )
    1270           0 :             continue;
    1271             : 
    1272           0 :         const uint16_t distance = child->_acked - acked;
    1273           0 :         if( distance > _numBuffers )
    1274           0 :             acked = child->_acked;
    1275           0 :     }
    1276             : 
    1277           0 :     RSPConnectionPtr selfChild = _findConnection( _id );
    1278           0 :     const uint16_t distance = acked - selfChild->_acked;
    1279           0 :     if( distance <= _numBuffers )
    1280           0 :         _finishWriteQueue( acked );
    1281           0 :     return true;
    1282             : }
    1283             : 
    1284           0 : bool RSPConnection::_handleNack()
    1285             : {
    1286             :     DatagramNack& nack =
    1287           0 :                     *reinterpret_cast< DatagramNack* >( _recvBuffer.getData( ));
    1288           0 :     nack.byteswap();
    1289             : 
    1290             : #ifdef CO_INSTRUMENT_RSP
    1291             :     ++nNAcksRead;
    1292             : #endif
    1293             : 
    1294           0 :     if( _id != nack.writerID )
    1295             :     {
    1296           0 :         LBLOG( LOG_RSP )
    1297           0 :             << "ignore " << nack.count << " nacks from " << nack.readerID
    1298           0 :             << " for " << nack.writerID << " (not me)"<< std::endl;
    1299           0 :         return true;
    1300             :     }
    1301             : 
    1302           0 :     LBLOG( LOG_RSP )
    1303           0 :         << "handle " << nack.count << " nacks from " << nack.readerID
    1304           0 :         << " for " << nack.writerID << std::endl;
    1305             : 
    1306           0 :     RSPConnectionPtr connection = _findConnection( nack.readerID );
    1307           0 :     if( !connection )
    1308             :     {
    1309           0 :         LBUNREACHABLE;
    1310           0 :         return false;
    1311             :         // it's an unknown connection, TODO add this connection?
    1312             :     }
    1313             : 
    1314           0 :     _timeouts = 0;
    1315           0 :     _addRepeat( nack.nacks, nack.count );
    1316           0 :     return true;
    1317             : }
    1318             : 
    1319           0 : void RSPConnection::_addRepeat( const Nack* nacks, uint16_t num )
    1320             : {
    1321           0 :     LBLOG( LOG_RSP ) << lunchbox::disableFlush << "Queue repeat requests ";
    1322           0 :     size_t lost = 0;
    1323             : 
    1324           0 :     for( size_t i = 0; i < num; ++i )
    1325             :     {
    1326           0 :         const Nack& nack = nacks[ i ];
    1327           0 :         LBASSERT( nack.start <= nack.end );
    1328             : 
    1329           0 :         LBLOG( LOG_RSP ) << nack.start << ".." << nack.end << " ";
    1330             : 
    1331           0 :         bool merged = false;
    1332           0 :         for( RepeatQueue::iterator j = _repeatQueue.begin();
    1333           0 :              j != _repeatQueue.end() && !merged; ++j )
    1334             :         {
    1335           0 :             Nack& old = *j;
    1336           0 :             if( old.start <= nack.end && old.end >= nack.start )
    1337             :             {
    1338           0 :                 if( old.start > nack.start )
    1339             :                 {
    1340           0 :                     lost += old.start - nack.start;
    1341           0 :                     old.start = nack.start;
    1342           0 :                     merged = true;
    1343             :                 }
    1344           0 :                 if( old.end < nack.end )
    1345             :                 {
    1346           0 :                     lost += nack.end - old.end;
    1347           0 :                     old.end = nack.end;
    1348           0 :                     merged = true;
    1349             :                 }
    1350           0 :                 LBASSERT( lost < _numBuffers );
    1351             :             }
    1352             :         }
    1353             : 
    1354           0 :         if( !merged )
    1355             :         {
    1356           0 :             lost += uint16_t( nack.end - nack.start ) + 1;
    1357           0 :             LBASSERT( lost <= _numBuffers );
    1358           0 :             _repeatQueue.push_back( nack );
    1359             :         }
    1360             :     }
    1361             : 
    1362           0 :     ConstConnectionDescriptionPtr description = getDescription();
    1363           0 :     if( _sendRate >
    1364           0 :         ( description->bandwidth >>
    1365           0 :           Global::getIAttribute( Global::IATTR_RSP_MIN_SENDRATE_SHIFT )))
    1366             :     {
    1367           0 :         const float delta = float( lost ) * .001f *
    1368           0 :                      Global::getIAttribute( Global::IATTR_RSP_ERROR_DOWNSCALE );
    1369           0 :         const float maxDelta = .01f *
    1370           0 :             float( Global::getIAttribute( Global::IATTR_RSP_ERROR_MAXSCALE ));
    1371           0 :         const float downScale = LB_MIN( delta, maxDelta );
    1372           0 :         _sendRate -= 1 + int64_t( _sendRate * downScale );
    1373           0 :         LBLOG( LOG_RSP )
    1374           0 :             << ", lost " << lost << " slowing down " << downScale * 100.f
    1375           0 :             << "% to " << _sendRate << " KB/s" << std::endl
    1376           0 :             << lunchbox::enableFlush;
    1377             :     }
    1378             :     else
    1379           0 :         LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush;
    1380           0 : }
    1381             : 
    1382           0 : bool RSPConnection::_handleAckRequest( const size_t bytes )
    1383             : {
    1384           0 :     if( bytes < sizeof( DatagramAckRequest ))
    1385           0 :         return false;
    1386             :     DatagramAckRequest& ackRequest =
    1387           0 :               *reinterpret_cast< DatagramAckRequest* >( _recvBuffer.getData( ));
    1388           0 :     ackRequest.byteswap();
    1389             : 
    1390           0 :     const uint16_t writerID = ackRequest.writerID;
    1391             : #ifdef Darwin
    1392             :     // There is occasionally a packet from ourselves, even though multicast loop
    1393             :     // is not set?!
    1394             :     if( writerID == _id )
    1395             :         return true;
    1396             : #else
    1397           0 :     LBASSERT( writerID != _id );
    1398             : #endif
    1399           0 :     RSPConnectionPtr connection = _findConnection( writerID );
    1400           0 :     if( !connection )
    1401             :     {
    1402           0 :         LBUNREACHABLE;
    1403           0 :         return false;
    1404             :     }
    1405             : 
    1406           0 :     const uint16_t reqID = ackRequest.sequence;
    1407           0 :     const uint16_t gotID = connection->_sequence - 1;
    1408           0 :     const uint16_t distance = reqID - gotID;
    1409             : 
    1410           0 :     LBLOG( LOG_RSP ) << "ack request "  << reqID << " from " << writerID
    1411           0 :                      << " got " << gotID << " missing " << distance
    1412           0 :                      << std::endl;
    1413             : 
    1414           0 :     if( (reqID == gotID) ||
    1415           0 :         (gotID > reqID && gotID - reqID <= _numBuffers) ||
    1416           0 :         (gotID < reqID && distance > _numBuffers) )
    1417             :     {
    1418           0 :         _sendAck( connection->_id, gotID );
    1419           0 :         return true;
    1420             :     }
    1421             :     // else find all missing datagrams
    1422             : 
    1423           0 :     const uint16_t max = CO_RSP_MAX_NACKS - 2;
    1424             :     Nack nacks[ CO_RSP_MAX_NACKS ];
    1425           0 :     uint16_t i = 0;
    1426             : 
    1427           0 :     nacks[ i ].start = connection->_sequence;
    1428           0 :     LBLOG( LOG_RSP ) << lunchbox::disableFlush << "nacks: "
    1429           0 :                      << nacks[i].start << "..";
    1430             : 
    1431           0 :     std::deque<Buffer*>::const_iterator j = connection->_recvBuffers.begin();
    1432           0 :     std::deque<Buffer*>::const_iterator first = j;
    1433           0 :     for( ; j != connection->_recvBuffers.end() && i < max; ++j )
    1434             :     {
    1435           0 :         if( *j ) // got buffer
    1436             :         {
    1437           0 :             nacks[ i ].end = connection->_sequence + std::distance( first, j);
    1438           0 :             LBLOG( LOG_RSP ) << nacks[i].end << ", ";
    1439           0 :             if( nacks[ i ].end < nacks[ i ].start )
    1440             :             {
    1441           0 :                 LBASSERT( nacks[ i ].end < _numBuffers );
    1442           0 :                 nacks[ i + 1 ].start = 0;
    1443           0 :                 nacks[ i + 1 ].end = nacks[ i ].end;
    1444           0 :                 nacks[ i ].end = std::numeric_limits< uint16_t >::max();
    1445           0 :                 ++i;
    1446             :             }
    1447           0 :             ++i;
    1448             : 
    1449             :             // find next hole
    1450           0 :             for( ++j; j != connection->_recvBuffers.end() && (*j); ++j )
    1451             :                 /* nop */;
    1452             : 
    1453           0 :             if( j == connection->_recvBuffers.end( ))
    1454           0 :                 break;
    1455             : 
    1456           0 :             nacks[i].start = connection->_sequence + std::distance(first, j) +1;
    1457           0 :             LBLOG( LOG_RSP ) << nacks[i].start << "..";
    1458             :         }
    1459             :     }
    1460             : 
    1461           0 :     if( j != connection->_recvBuffers.end() || i == 0 )
    1462             :     {
    1463           0 :         nacks[ i ].end = reqID;
    1464           0 :         LBLOG( LOG_RSP ) << nacks[i].end;
    1465           0 :         ++i;
    1466             :     }
    1467           0 :     else if( uint16_t( reqID - nacks[i-1].end ) < _numBuffers )
    1468             :     {
    1469           0 :         nacks[i].start = nacks[i-1].end + 1;
    1470           0 :         nacks[i].end = reqID;
    1471           0 :         LBLOG( LOG_RSP ) << nacks[i].start << ".." << nacks[i].end;
    1472           0 :         ++i;
    1473             :     }
    1474           0 :     if( nacks[ i -1 ].end < nacks[ i - 1 ].start )
    1475             :     {
    1476           0 :         LBASSERT( nacks[ i - 1 ].end < _numBuffers );
    1477           0 :         nacks[ i ].start = 0;
    1478           0 :         nacks[ i ].end = nacks[ i - 1 ].end;
    1479           0 :         nacks[ i - 1 ].end = std::numeric_limits< uint16_t >::max();
    1480           0 :         ++i;
    1481             :     }
    1482             : 
    1483           0 :     LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush << "send " << i
    1484           0 :                      << " nacks to " << connection->_id << std::endl;
    1485             : 
    1486           0 :     LBASSERT( i > 0 );
    1487           0 :     _sendNack( connection->_id, nacks, i );
    1488           0 :     return true;
    1489             : }
    1490             : 
    1491           0 : void RSPConnection::_checkNewID( uint16_t id )
    1492             : {
    1493             :     // look if the new ID exist in another connection
    1494           0 :     if( id == _id || _findConnection( id ))
    1495             :     {
    1496           0 :         LBLOG( LOG_RSP ) << "Deny " << id << std::endl;
    1497           0 :         _sendSimpleDatagram( ID_DENY, _id );
    1498             :     }
    1499             :     else
    1500           0 :         _sendSimpleDatagram( ID_HELLO_REPLY, _id );
    1501           0 : }
    1502             : 
    1503       16723 : RSPConnectionPtr RSPConnection::_findConnection( const uint16_t id )
    1504             : {
    1505       16723 :     for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
    1506             :     {
    1507       16721 :         if( (*i)->_id == id )
    1508       16721 :             return *i;
    1509             :     }
    1510           2 :     return 0;
    1511             : }
    1512             : 
    1513           2 : bool RSPConnection::_addConnection( const uint16_t id, const uint16_t sequence )
    1514             : {
    1515           2 :     if( _findConnection( id ))
    1516           0 :         return false;
    1517             : 
    1518           2 :     LBINFO << "add connection " << id << std::endl;
    1519           2 :     RSPConnectionPtr connection = new RSPConnection();
    1520           2 :     connection->_id = id;
    1521           2 :     connection->_parent = this;
    1522           2 :     connection->_setState( STATE_CONNECTED );
    1523           2 :     connection->_setDescription( _getDescription( ));
    1524           2 :     connection->_sequence = sequence;
    1525           2 :     LBASSERT( connection->_appBuffers.isEmpty( ));
    1526             : 
    1527             :     // Make all buffers available for reading
    1528         390 :     for( BuffersCIter i = connection->_buffers.begin();
    1529         260 :          i != connection->_buffers.end(); ++i )
    1530             :     {
    1531         128 :         Buffer* buffer = *i;
    1532         128 :         LBCHECK( connection->_threadBuffers.push( buffer ));
    1533             :     }
    1534             : 
    1535           2 :     _children.push_back( connection );
    1536           2 :     _sendCountNode();
    1537             : 
    1538           4 :     lunchbox::ScopedWrite mutex( _mutexConnection );
    1539           2 :     _newChildren.push_back( connection );
    1540             : 
    1541           4 :     lunchbox::ScopedWrite mutex2( _mutexEvent );
    1542           2 :     _event->set();
    1543           4 :     return true;
    1544             : }
    1545             : 
    1546           0 : void RSPConnection::_removeConnection( const uint16_t id )
    1547             : {
    1548           0 :     LBINFO << "remove connection " << id << std::endl;
    1549           0 :     if( id == _id )
    1550           0 :         return;
    1551             : 
    1552           0 :     for( RSPConnectionsIter i = _children.begin(); i != _children.end(); ++i )
    1553             :     {
    1554           0 :         RSPConnectionPtr child = *i;
    1555           0 :         if( child->_id == id )
    1556             :         {
    1557           0 :             _children.erase( i );
    1558             : 
    1559           0 :             lunchbox::ScopedWrite mutex( child->_mutexEvent );
    1560           0 :             child->_appBuffers.push( 0 );
    1561           0 :             child->_event->set();
    1562           0 :             break;
    1563             :         }
    1564           0 :     }
    1565             : 
    1566           0 :     _sendCountNode();
    1567             : }
    1568             : 
    1569       66393 : int64_t RSPConnection::write( const void* inData, const uint64_t bytes )
    1570             : {
    1571       66393 :     if( _parent )
    1572           0 :         return _parent->write( inData, bytes );
    1573             : 
    1574       66393 :     LBASSERT( isListening( ));
    1575       66393 :     if( !_write )
    1576           0 :         return -1;
    1577             : 
    1578             :     // compute number of datagrams
    1579       66393 :     uint64_t nDatagrams = bytes  / _payloadSize;
    1580       66393 :     if( nDatagrams * _payloadSize != bytes )
    1581       66393 :         ++nDatagrams;
    1582             : 
    1583             :     // queue each datagram (might block if buffers are exhausted)
    1584       66393 :     const uint8_t* data = reinterpret_cast< const uint8_t* >( inData );
    1585       66393 :     const uint8_t* end = data + bytes;
    1586      133638 :     for( uint64_t i = 0; i < nDatagrams; ++i )
    1587             :     {
    1588       67245 :         size_t packetSize = end - data;
    1589       67245 :         packetSize = LB_MIN( packetSize, _payloadSize );
    1590             : 
    1591       67245 :         if( _appBuffers.isEmpty( ))
    1592             :             // trigger processing
    1593        1632 :             _postWakeup();
    1594             : 
    1595             :         Buffer* buffer;
    1596       67245 :         if ( !_appBuffers.timedPop( _writeTimeOut, buffer ) )
    1597             :         {
    1598           0 :             LBERROR << "Timeout while writing" << std::endl;
    1599           0 :             buffer = 0;
    1600             :         }
    1601             : 
    1602       67245 :         if( !buffer )
    1603             :         {
    1604           0 :             close();
    1605           0 :             return -1;
    1606             :         }
    1607             : 
    1608             :         // prepare packet header (sequence is done by thread)
    1609             :         DatagramData* header =
    1610       67245 :             reinterpret_cast< DatagramData* >( buffer->getData( ));
    1611       67245 :         header->type = DATA;
    1612       67245 :         header->size = uint16_t( packetSize );
    1613       67245 :         header->writerID = _id;
    1614             : 
    1615       67245 :         memcpy( header + 1, data, packetSize );
    1616       67245 :         data += packetSize;
    1617             : 
    1618       67245 :         LBCHECK( _threadBuffers.push( buffer ));
    1619             :     }
    1620       66393 :     _postWakeup();
    1621       66393 :     LBLOG( LOG_RSP ) << "queued " << nDatagrams << " datagrams, "
    1622       66393 :                      << bytes << " bytes" << std::endl;
    1623       66393 :     return bytes;
    1624             : }
    1625             : 
    1626           0 : void RSPConnection::finish()
    1627             : {
    1628           0 :     if( _parent.isValid( ))
    1629             :     {
    1630           0 :         LBASSERTINFO( !_parent, "Writes are only allowed on RSP listeners" );
    1631           0 :         return;
    1632             :     }
    1633           0 :     LBASSERT( isListening( ));
    1634           0 :     _appBuffers.waitSize( _buffers.size( ));
    1635             : }
    1636             : 
    1637          42 : void RSPConnection::_sendCountNode()
    1638             : {
    1639          42 :     if( !_findConnection( _id ))
    1640          42 :         return;
    1641             : 
    1642          42 :     LBLOG( LOG_RSP ) << _children.size() << " nodes" << std::endl;
    1643             :     DatagramNode count = { COUNTNODE, CO_RSP_PROTOCOL_VERSION, _id,
    1644          42 :                            uint16_t( _children.size( )) };
    1645          42 :     count.byteswap();
    1646          42 :     _write->send( boost::asio::buffer( &count, sizeof( count )) );
    1647             : }
    1648             : 
    1649          44 : void RSPConnection::_sendSimpleDatagram( const DatagramType type,
    1650             :                                          const uint16_t id )
    1651             : {
    1652             :     DatagramNode simple = { uint16_t( type ), CO_RSP_PROTOCOL_VERSION, id,
    1653          44 :                             _sequence };
    1654          44 :     simple.byteswap();
    1655          44 :     _write->send( boost::asio::buffer( &simple, sizeof( simple )) );
    1656          44 : }
    1657             : 
    1658           0 : void RSPConnection::_sendAck( const uint16_t writerID,
    1659             :                               const uint16_t sequence )
    1660             : {
    1661           0 :     LBASSERT( _id != writerID );
    1662             : #ifdef CO_INSTRUMENT_RSP
    1663             :     ++nAcksSend;
    1664             : #endif
    1665             : 
    1666           0 :     LBLOG( LOG_RSP ) << "send ack " << sequence << std::endl;
    1667           0 :     DatagramAck ack = { ACK, _id, writerID, sequence };
    1668           0 :     ack.byteswap();
    1669           0 :     _write->send( boost::asio::buffer( &ack, sizeof( ack )) );
    1670           0 : }
    1671             : 
    1672           0 : void RSPConnection::_sendNack( const uint16_t writerID, const Nack* nacks,
    1673             :                                const uint16_t count )
    1674             : {
    1675           0 :     LBASSERT( count > 0 );
    1676           0 :     LBASSERT( count <= CO_RSP_MAX_NACKS );
    1677             : #ifdef CO_INSTRUMENT_RSP
    1678             :     ++nNAcksSend;
    1679             : #endif
    1680             :     /* optimization: use the direct access to the reader. */
    1681           0 :     if( writerID == _id )
    1682             :     {
    1683           0 :          _addRepeat( nacks, count );
    1684           0 :          return;
    1685             :     }
    1686             : 
    1687             :     const size_t size = sizeof( DatagramNack ) -
    1688           0 :                         (CO_RSP_MAX_NACKS - count) * sizeof( Nack );
    1689             : 
    1690             :     // set the header
    1691             :     DatagramNack packet;
    1692           0 :     packet.set( _id, writerID, count );
    1693           0 :     memcpy( packet.nacks, nacks, count * sizeof( Nack ));
    1694           0 :     packet.byteswap();
    1695           0 :     _write->send( boost::asio::buffer( &packet, size ));
    1696             : }
    1697             : 
    1698           0 : void RSPConnection::_sendAckRequest()
    1699             : {
    1700             : #ifdef CO_INSTRUMENT_RSP
    1701             :     ++nAckRequests;
    1702             : #endif
    1703           0 :     LBLOG( LOG_RSP ) << "send ack request for " << uint16_t( _sequence -1 )
    1704           0 :                      << std::endl;
    1705           0 :     DatagramAckRequest ackRequest = { ACKREQ, _id, uint16_t( _sequence - 1 ) };
    1706           0 :     ackRequest.byteswap();
    1707           0 :     _write->send( boost::asio::buffer( &ackRequest, sizeof( DatagramAckRequest )) );
    1708           0 : }
    1709             : 
    1710           0 : std::ostream& operator << ( std::ostream& os,
    1711             :                             const RSPConnection& connection )
    1712             : {
    1713           0 :     os << lunchbox::disableFlush << lunchbox::disableHeader
    1714           0 :        << "RSPConnection id " << connection.getID() << " send rate "
    1715           0 :        << connection.getSendRate();
    1716             : 
    1717             : #ifdef CO_INSTRUMENT_RSP
    1718             :     const int prec = os.precision();
    1719             :     os.precision( 3 );
    1720             : 
    1721             :     const float time = instrumentClock.getTimef();
    1722             :     const float mbps = 1048.576f * time;
    1723             :     os << ": " << lunchbox::indent << std::endl
    1724             :        << float( nBytesRead ) / mbps << " / " << float( nBytesWritten ) / mbps
    1725             :        <<  " MB/s r/w using " << nDatagrams << " dgrams " << nRepeated
    1726             :        << " repeats " << nMergedDatagrams
    1727             :        << " merged"
    1728             :        << std::endl;
    1729             : 
    1730             :     os.precision( prec );
    1731             :     os << "sender: " << nAckRequests << " ack requests " << nAcksAccepted << "/"
    1732             :        << nAcksRead << " acks " << nNAcksRead << " nacks, throttle "
    1733             :        << writeWaitTime << " ms"
    1734             :        << std::endl
    1735             :        << "receiver: " << nAcksSend << " acks " << nNAcksSend << " nacks"
    1736             :        << lunchbox::exdent;
    1737             : 
    1738             :     nReadData = 0;
    1739             :     nBytesRead = 0;
    1740             :     nBytesWritten = 0;
    1741             :     nDatagrams = 0;
    1742             :     nRepeated = 0;
    1743             :     nMergedDatagrams = 0;
    1744             :     nAckRequests = 0;
    1745             :     nAcksSend = 0;
    1746             :     nAcksRead = 0;
    1747             :     nAcksAccepted = 0;
    1748             :     nNAcksSend = 0;
    1749             :     nNAcksRead = 0;
    1750             :     writeWaitTime = 0.f;
    1751             : #endif
    1752           0 :     os << std::endl << lunchbox::enableHeader << lunchbox::enableFlush;
    1753             : 
    1754           0 :     return os;
    1755             : }
    1756             : 
    1757          63 : }

Generated by: LCOV version 1.11