LCOV - code coverage report
Current view: top level - co - rspConnection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 1 894 0.1 %
Date: 2018-01-09 16:37:03 Functions: 2 50 4.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2009-2017, Cedric Stalder <cedric.stalder@gmail.com>
       3             :  *                          Stefan Eilemann <eile@equalizergraphics.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "rspConnection.h"
      23             : 
      24             : #include "connection.h"
      25             : #include "connectionDescription.h"
      26             : #include "global.h"
      27             : #include "log.h"
      28             : 
      29             : #include <lunchbox/rng.h>
      30             : #include <lunchbox/scopedMutex.h>
      31             : #include <lunchbox/sleep.h>
      32             : 
      33             : #include <boost/bind.hpp>
      34             : 
      35             : //#define CO_INSTRUMENT_RSP
      36             : #define CO_RSP_MERGE_WRITES
      37             : #define CO_RSP_MAX_TIMEOUTS 1000
      38             : #ifdef _WIN32
      39             : #define CO_RSP_DEFAULT_PORT (4242)
      40             : #else
      41             : #define CO_RSP_DEFAULT_PORT ((getuid() % 64511) + 1024)
      42             : #endif
      43             : 
      44             : // Note: Do not use version > 255, endianness detection magic relies on this.
      45             : const uint16_t CO_RSP_PROTOCOL_VERSION = 0;
      46             : 
      47             : namespace ip = boost::asio::ip;
      48             : 
      49             : namespace co
      50             : {
      51             : namespace
      52             : {
      53             : #ifdef CO_INSTRUMENT_RSP
      54             : lunchbox::a_int32_t nReadData;
      55             : lunchbox::a_int32_t nBytesRead;
      56             : lunchbox::a_int32_t nBytesWritten;
      57             : lunchbox::a_int32_t nDatagrams;
      58             : lunchbox::a_int32_t nRepeated;
      59             : lunchbox::a_int32_t nMergedDatagrams;
      60             : lunchbox::a_int32_t nAckRequests;
      61             : lunchbox::a_int32_t nAcksSend;
      62             : lunchbox::a_int32_t nAcksSendTotal;
      63             : lunchbox::a_int32_t nAcksRead;
      64             : lunchbox::a_int32_t nAcksAccepted;
      65             : lunchbox::a_int32_t nNAcksSend;
      66             : lunchbox::a_int32_t nNAcksRead;
      67             : lunchbox::a_int32_t nNAcksResend;
      68             : 
      69             : float writeWaitTime = 0.f;
      70             : lunchbox::Clock instrumentClock;
      71             : #endif
      72             : 
      73             : static uint16_t _numBuffers = 0;
      74             : }
      75             : 
      76           0 : RSPConnection::RSPConnection()
      77             :     : _id(0)
      78             :     , _idAccepted(false)
      79           0 :     , _mtu(Global::getIAttribute(Global::IATTR_UDP_MTU))
      80           0 :     , _ackFreq(Global::getIAttribute(Global::IATTR_RSP_ACK_FREQUENCY))
      81           0 :     , _payloadSize(_mtu - sizeof(DatagramData))
      82             :     , _timeouts(0)
      83           0 :     , _event(new EventConnection)
      84             :     , _read(0)
      85             :     , _write(0)
      86             :     , _timeout(_ioService)
      87             :     , _wakeup(_ioService)
      88           0 :     , _maxBucketSize((_mtu * _ackFreq) >> 1)
      89             :     , _bucketSize(0)
      90             :     , _sendRate(0)
      91             :     , _thread(0)
      92           0 :     , _acked(std::numeric_limits<uint16_t>::max())
      93             :     , _threadBuffers(Global::getIAttribute(Global::IATTR_RSP_NUM_BUFFERS))
      94           0 :     , _recvBuffer(_mtu)
      95             :     , _readBuffer(0)
      96             :     , _readBufferPos(0)
      97             :     , _sequence(0)
      98             :     // ensure we have a handleConnectedTimeout before the write pop
      99           0 :     , _writeTimeOut(Global::IATTR_RSP_ACK_TIMEOUT * CO_RSP_MAX_TIMEOUTS * 2)
     100             : {
     101           0 :     _buildNewID();
     102           0 :     ConnectionDescriptionPtr description = _getDescription();
     103           0 :     description->type = CONNECTIONTYPE_RSP;
     104           0 :     description->bandwidth = 1024 * 1024; // KB/s
     105             : 
     106           0 :     LBCHECK(_event->connect());
     107             : 
     108           0 :     _buffers.reserve(Global::getIAttribute(Global::IATTR_RSP_NUM_BUFFERS));
     109           0 :     while (static_cast<int32_t>(_buffers.size()) <
     110           0 :            Global::getIAttribute(Global::IATTR_RSP_NUM_BUFFERS))
     111             :     {
     112           0 :         _buffers.push_back(new Buffer(_mtu));
     113             :     }
     114             : 
     115           0 :     LBASSERT(sizeof(DatagramNack) <= size_t(_mtu));
     116           0 :     LBLOG(LOG_RSP) << "New RSP connection, " << _buffers.size()
     117           0 :                    << " buffers of " << _mtu << " bytes" << std::endl;
     118           0 : }
     119             : 
     120           0 : RSPConnection::~RSPConnection()
     121             : {
     122           0 :     close();
     123           0 :     while (!_buffers.empty())
     124             :     {
     125           0 :         delete _buffers.back();
     126           0 :         _buffers.pop_back();
     127             :     }
     128             : #ifdef CO_INSTRUMENT_RSP
     129             :     LBWARN << *this << std::endl;
     130             :     instrumentClock.reset();
     131             : #endif
     132           0 : }
     133             : 
     134           0 : void RSPConnection::close()
     135             : {
     136           0 :     if (_parent.isValid() && _parent->_id == _id)
     137           0 :         _parent->close();
     138             : 
     139           0 :     while (!_parent && _isWriting())
     140           0 :         lunchbox::sleep(10 /*ms*/);
     141             : 
     142           0 :     if (isClosed())
     143           0 :         return;
     144             :     {
     145           0 :         lunchbox::ScopedWrite mutex(_mutexEvent);
     146           0 :         if (_thread)
     147             :         {
     148           0 :             LBASSERT(!_thread->isCurrent());
     149           0 :             _sendSimpleDatagram(ID_EXIT, _id);
     150           0 :             _ioService.stop();
     151           0 :             _thread->join();
     152           0 :             delete _thread;
     153             :         }
     154             : 
     155           0 :         _setState(STATE_CLOSING);
     156           0 :         if (_thread)
     157             :         {
     158           0 :             _thread = 0;
     159             : 
     160             :             // notify children to close
     161           0 :             for (RSPConnectionsCIter i = _children.begin();
     162           0 :                  i != _children.end(); ++i)
     163             :             {
     164           0 :                 RSPConnectionPtr child = *i;
     165           0 :                 lunchbox::ScopedWrite mutexChild(child->_mutexEvent);
     166           0 :                 child->_appBuffers.push(0);
     167           0 :                 child->_event->set();
     168             :             }
     169             : 
     170           0 :             _children.clear();
     171           0 :             _newChildren.clear();
     172             :         }
     173             : 
     174           0 :         _parent = 0;
     175             : 
     176           0 :         if (_read)
     177           0 :             _read->close();
     178           0 :         delete _read;
     179           0 :         _read = 0;
     180             : 
     181           0 :         if (_write)
     182           0 :             _write->close();
     183           0 :         delete _write;
     184           0 :         _write = 0;
     185             : 
     186           0 :         _threadBuffers.clear();
     187           0 :         _appBuffers.push(0); // unlock any other read/write threads
     188             : 
     189           0 :         _setState(STATE_CLOSED);
     190             :     }
     191           0 :     _event->close();
     192             : }
     193             : 
     194             : //----------------------------------------------------------------------
     195             : // Async IO handles
     196             : //----------------------------------------------------------------------
     197           0 : uint16_t RSPConnection::_buildNewID()
     198             : {
     199           0 :     lunchbox::RNG rng;
     200           0 :     _id = rng.get<uint16_t>();
     201           0 :     return _id;
     202             : }
     203             : 
     204           0 : bool RSPConnection::listen()
     205             : {
     206           0 :     ConnectionDescriptionPtr description = _getDescription();
     207           0 :     LBASSERT(description->type == CONNECTIONTYPE_RSP);
     208             : 
     209           0 :     if (!isClosed())
     210           0 :         return false;
     211             : 
     212           0 :     _setState(STATE_CONNECTING);
     213           0 :     _numBuffers = Global::getIAttribute(Global::IATTR_RSP_NUM_BUFFERS);
     214             : 
     215             :     // init udp connection
     216           0 :     if (description->port == ConnectionDescription::RANDOM_MULTICAST_PORT)
     217           0 :         description->port = 0; // Let OS choose
     218           0 :     else if (description->port == 0)
     219           0 :         description->port = CO_RSP_DEFAULT_PORT;
     220           0 :     if (description->getHostname().empty())
     221           0 :         description->setHostname("239.255.42.43");
     222           0 :     if (description->getInterface().empty())
     223           0 :         description->setInterface("0.0.0.0");
     224             : 
     225             :     try
     226             :     {
     227           0 :         const ip::address readAddress(ip::address::from_string("0.0.0.0"));
     228           0 :         const ip::udp::endpoint readEndpoint(readAddress, description->port);
     229           0 :         const std::string& port = std::to_string(unsigned(description->port));
     230           0 :         ip::udp::resolver resolver(_ioService);
     231           0 :         const ip::udp::resolver::query queryHN(ip::udp::v4(),
     232             :                                                description->getHostname(),
     233           0 :                                                port);
     234           0 :         const ip::udp::resolver::iterator hostname = resolver.resolve(queryHN);
     235             : 
     236           0 :         if (hostname == ip::udp::resolver::iterator())
     237           0 :             return false;
     238             : 
     239           0 :         const ip::udp::endpoint writeEndpoint = *hostname;
     240           0 :         const ip::address mcAddr(writeEndpoint.address());
     241             : 
     242           0 :         _read = new ip::udp::socket(_ioService);
     243           0 :         _write = new ip::udp::socket(_ioService);
     244           0 :         _read->open(readEndpoint.protocol());
     245           0 :         _write->open(writeEndpoint.protocol());
     246             : 
     247           0 :         _read->set_option(ip::udp::socket::reuse_address(true));
     248           0 :         _write->set_option(ip::udp::socket::reuse_address(true));
     249           0 :         _read->set_option(ip::udp::socket::receive_buffer_size(
     250           0 :             Global::getIAttribute(Global::IATTR_UDP_BUFFER_SIZE)));
     251           0 :         _write->set_option(ip::udp::socket::send_buffer_size(
     252           0 :             Global::getIAttribute(Global::IATTR_UDP_BUFFER_SIZE)));
     253             : 
     254           0 :         _read->bind(readEndpoint);
     255           0 :         description->port = _read->local_endpoint().port();
     256             : 
     257           0 :         const ip::udp::resolver::query queryIF(ip::udp::v4(),
     258             :                                                description->getInterface(),
     259           0 :                                                "0");
     260             :         const ip::udp::resolver::iterator interfaceIP =
     261           0 :             resolver.resolve(queryIF);
     262             : 
     263           0 :         if (interfaceIP == ip::udp::resolver::iterator())
     264           0 :             return false;
     265             : 
     266           0 :         const ip::address ifAddr(ip::udp::endpoint(*interfaceIP).address());
     267           0 :         LBDEBUG << "Joining " << mcAddr << " on " << ifAddr << std::endl;
     268             : 
     269           0 :         _read->set_option(
     270           0 :             ip::multicast::join_group(mcAddr.to_v4(), ifAddr.to_v4()));
     271           0 :         _write->set_option(ip::multicast::outbound_interface(ifAddr.to_v4()));
     272             : #ifdef SO_BINDTODEVICE // https://github.com/Eyescale/Collage/issues/16
     273           0 :         const std::string& ifIP = ifAddr.to_string();
     274           0 :         ::setsockopt(_write->native(), SOL_SOCKET, SO_BINDTODEVICE,
     275           0 :                      ifIP.c_str(), ifIP.size() + 1);
     276           0 :         ::setsockopt(_read->native(), SOL_SOCKET, SO_BINDTODEVICE, ifIP.c_str(),
     277           0 :                      ifIP.size() + 1);
     278             : #endif
     279             : 
     280           0 :         _write->connect(writeEndpoint);
     281             : 
     282           0 :         _read->set_option(ip::multicast::enable_loopback(false));
     283           0 :         _write->set_option(ip::multicast::enable_loopback(false));
     284             :     }
     285           0 :     catch (const boost::system::system_error& e)
     286             :     {
     287           0 :         LBWARN << "can't setup underlying UDP connection: " << e.what()
     288           0 :                << std::endl;
     289           0 :         delete _read;
     290           0 :         delete _write;
     291           0 :         _read = 0;
     292           0 :         _write = 0;
     293           0 :         return false;
     294             :     }
     295             : 
     296             :     // init communication protocol thread
     297           0 :     _thread = new Thread(this);
     298           0 :     _bucketSize = 0;
     299           0 :     _sendRate = description->bandwidth;
     300             : 
     301             :     // waits until RSP protocol establishes connection to the multicast network
     302           0 :     if (!_thread->start())
     303             :     {
     304           0 :         close();
     305           0 :         return false;
     306             :     }
     307             : 
     308             :     // Make all buffers available for writing
     309           0 :     LBASSERT(_appBuffers.isEmpty());
     310           0 :     _appBuffers.push(_buffers);
     311             : 
     312           0 :     LBDEBUG << "Listening on " << description->getHostname() << ":"
     313           0 :             << description->port << " (" << description->toString() << " @"
     314           0 :             << (void*)this << ")" << std::endl;
     315           0 :     return true;
     316             : }
     317             : 
     318           0 : ConnectionPtr RSPConnection::acceptSync()
     319             : {
     320           0 :     if (!isListening())
     321           0 :         return 0;
     322             : 
     323           0 :     lunchbox::ScopedWrite mutex(_mutexConnection);
     324           0 :     LBASSERT(!_newChildren.empty());
     325           0 :     if (_newChildren.empty())
     326           0 :         return 0;
     327             : 
     328           0 :     RSPConnectionPtr newConnection = _newChildren.back();
     329           0 :     _newChildren.pop_back();
     330             : 
     331           0 :     LBDEBUG << _id << " accepted RSP connection " << newConnection->_id
     332           0 :             << std::endl;
     333             : 
     334           0 :     lunchbox::ScopedWrite mutex2(_mutexEvent);
     335           0 :     if (_newChildren.empty())
     336           0 :         _event->reset();
     337             :     else
     338           0 :         _event->set();
     339             : 
     340           0 :     return newConnection;
     341             : }
     342             : 
     343           0 : int64_t RSPConnection::readSync(void* buffer, const uint64_t bytes, const bool)
     344             : {
     345           0 :     LBASSERT(bytes > 0);
     346           0 :     if (!isConnected())
     347           0 :         return -1;
     348             : 
     349           0 :     uint64_t bytesLeft = bytes;
     350           0 :     uint8_t* ptr = reinterpret_cast<uint8_t*>(buffer);
     351             : 
     352             :     // redundant (done by the caller already), but saves some lock ops
     353           0 :     while (bytesLeft)
     354             :     {
     355           0 :         if (!_readBuffer)
     356             :         {
     357           0 :             LBASSERT(_readBufferPos == 0);
     358           0 :             _readBuffer = _appBuffers.pop();
     359           0 :             if (!_readBuffer)
     360             :             {
     361           0 :                 close();
     362             :                 return (bytes == bytesLeft)
     363           0 :                            ? -1
     364           0 :                            : static_cast<int64_t>(bytes - bytesLeft);
     365             :             }
     366             :         }
     367             : 
     368             :         const DatagramData* header =
     369           0 :             reinterpret_cast<const DatagramData*>(_readBuffer->getData());
     370           0 :         const uint8_t* payload = reinterpret_cast<const uint8_t*>(header + 1);
     371           0 :         const size_t dataLeft = header->size - _readBufferPos;
     372           0 :         const size_t size = LB_MIN(static_cast<size_t>(bytesLeft), dataLeft);
     373             : 
     374           0 :         memcpy(ptr, payload + _readBufferPos, size);
     375           0 :         _readBufferPos += size;
     376           0 :         ptr += size;
     377           0 :         bytesLeft -= size;
     378             : 
     379             :         // if all data in the buffer has been taken
     380           0 :         if (_readBufferPos >= header->size)
     381             :         {
     382           0 :             LBASSERT(_readBufferPos == header->size);
     383             :             // LBLOG( LOG_RSP ) << "reset read buffer  " << header->sequence
     384             :             //                 << std::endl;
     385             : 
     386           0 :             LBCHECK(_threadBuffers.push(_readBuffer));
     387           0 :             _readBuffer = 0;
     388           0 :             _readBufferPos = 0;
     389             :         }
     390             :         else
     391             :         {
     392           0 :             LBASSERT(_readBufferPos < header->size);
     393             :         }
     394             :     }
     395             : 
     396           0 :     if (_readBuffer || !_appBuffers.isEmpty())
     397           0 :         _event->set();
     398             :     else
     399             :     {
     400           0 :         lunchbox::ScopedWrite mutex(_mutexEvent);
     401           0 :         if (_appBuffers.isEmpty())
     402           0 :             _event->reset();
     403             :     }
     404             : 
     405             : #ifdef CO_INSTRUMENT_RSP
     406             :     nBytesRead += bytes;
     407             : #endif
     408           0 :     return bytes;
     409             : }
     410             : 
     411           0 : void RSPConnection::Thread::run()
     412             : {
     413           0 :     _connection->_runThread();
     414           0 :     _connection = 0;
     415           0 :     LBDEBUG << "Left RSP protocol thread" << std::endl;
     416           0 : }
     417             : 
     418           0 : void RSPConnection::_handleTimeout(const boost::system::error_code& error)
     419             : {
     420           0 :     if (error == boost::asio::error::operation_aborted)
     421           0 :         return;
     422             : 
     423           0 :     if (isListening())
     424           0 :         _handleConnectedTimeout();
     425           0 :     else if (_idAccepted)
     426           0 :         _handleInitTimeout();
     427             :     else
     428           0 :         _handleAcceptIDTimeout();
     429             : }
     430             : 
     431           0 : void RSPConnection::_handleAcceptIDTimeout()
     432             : {
     433           0 :     ++_timeouts;
     434           0 :     if (_timeouts < 20)
     435             :     {
     436           0 :         LBLOG(LOG_RSP) << "Announce " << _id << " " << _timeouts << std::endl;
     437           0 :         _sendSimpleDatagram(ID_HELLO, _id);
     438             :     }
     439             :     else
     440             :     {
     441           0 :         LBLOG(LOG_RSP) << "Confirm " << _id << std::endl;
     442           0 :         _sendSimpleDatagram(ID_CONFIRM, _id);
     443           0 :         _addConnection(_id, _sequence);
     444           0 :         _idAccepted = true;
     445           0 :         _timeouts = 0;
     446             :         // send a first datagram to announce me and discover all other
     447             :         // connections
     448           0 :         _sendCountNode();
     449             :     }
     450           0 :     _setTimeout(10);
     451           0 : }
     452             : 
     453           0 : void RSPConnection::_handleInitTimeout()
     454             : {
     455           0 :     LBASSERT(!isListening())
     456           0 :     ++_timeouts;
     457           0 :     if (_timeouts < 20)
     458           0 :         _sendCountNode();
     459             :     else
     460             :     {
     461           0 :         _setState(STATE_LISTENING);
     462           0 :         LBDEBUG << "RSP connection " << _id << " listening" << std::endl;
     463           0 :         _timeouts = 0;
     464           0 :         _ioService.stop(); // thread initialized, run restarts
     465             :     }
     466           0 :     _setTimeout(10);
     467           0 : }
     468             : 
     469           0 : void RSPConnection::_clearWriteQueues()
     470             : {
     471           0 :     while (!_threadBuffers.isEmpty())
     472             :     {
     473           0 :         Buffer* buffer = 0;
     474           0 :         _threadBuffers.pop(buffer);
     475           0 :         _writeBuffers.push_back(buffer);
     476             :     }
     477             : 
     478           0 :     _finishWriteQueue(_sequence - 1);
     479           0 :     LBASSERT(_threadBuffers.isEmpty() && _writeBuffers.empty());
     480           0 : }
     481             : 
     482           0 : void RSPConnection::_handleConnectedTimeout()
     483             : {
     484           0 :     if (!isListening())
     485             :     {
     486           0 :         _clearWriteQueues();
     487           0 :         _ioService.stop();
     488           0 :         return;
     489             :     }
     490             : 
     491           0 :     _processOutgoing();
     492             : 
     493           0 :     if (_timeouts >= CO_RSP_MAX_TIMEOUTS)
     494             :     {
     495           0 :         LBERROR << "Too many timeouts during send: " << _timeouts << std::endl;
     496           0 :         bool all = true;
     497           0 :         for (RSPConnectionsCIter i = _children.begin(); i != _children.end();
     498             :              ++i)
     499             :         {
     500           0 :             RSPConnectionPtr child = *i;
     501           0 :             if (child->_acked >= _sequence - _numBuffers && child->_id != _id)
     502             :             {
     503           0 :                 all = false;
     504           0 :                 break;
     505             :             }
     506             :         }
     507             : 
     508             :         // if all connections failed we probably got disconnected -> close and
     509             :         // exit else close all failed child connections
     510           0 :         if (all)
     511             :         {
     512           0 :             _sendSimpleDatagram(ID_EXIT, _id);
     513           0 :             _appBuffers.pushFront(0); // unlock write function
     514             : 
     515           0 :             for (RSPConnectionsCIter i = _children.begin();
     516           0 :                  i != _children.end(); ++i)
     517             :             {
     518           0 :                 RSPConnectionPtr child = *i;
     519           0 :                 child->_setState(STATE_CLOSING);
     520           0 :                 child->_appBuffers.push(0); // unlock read func
     521             :             }
     522             : 
     523           0 :             _clearWriteQueues();
     524           0 :             _ioService.stop();
     525           0 :             return;
     526             :         }
     527             : 
     528           0 :         RSPConnectionsCIter i = _children.begin();
     529           0 :         while (i != _children.end())
     530             :         {
     531           0 :             RSPConnectionPtr child = *i;
     532           0 :             if (child->_acked < _sequence - 1 && _id != child->_id)
     533             :             {
     534           0 :                 _sendSimpleDatagram(ID_EXIT, child->_id);
     535           0 :                 _removeConnection(child->_id);
     536             :             }
     537             :             else
     538             :             {
     539           0 :                 uint16_t wb = static_cast<uint16_t>(_writeBuffers.size());
     540           0 :                 child->_acked = _sequence - wb;
     541           0 :                 ++i;
     542             :             }
     543             :         }
     544             : 
     545           0 :         _timeouts = 0;
     546             :     }
     547             : }
     548             : 
     549           0 : RSPConnection::DatagramNode* RSPConnection::_getDatagramNode(const size_t bytes)
     550             : {
     551           0 :     if (bytes < sizeof(DatagramNode))
     552             :     {
     553           0 :         LBERROR << "DatagramNode size mismatch, got " << bytes << " instead of "
     554           0 :                 << sizeof(DatagramNode) << " bytes" << std::endl;
     555             :         // close();
     556           0 :         return 0;
     557             :     }
     558             :     DatagramNode& node =
     559           0 :         *reinterpret_cast<DatagramNode*>(_recvBuffer.getData());
     560           0 :     if (node.protocolVersion != CO_RSP_PROTOCOL_VERSION)
     561             :     {
     562           0 :         LBERROR << "Protocol version mismatch, got " << node.protocolVersion
     563           0 :                 << " instead of " << CO_RSP_PROTOCOL_VERSION << std::endl;
     564             :         // close();
     565           0 :         return 0;
     566             :     }
     567           0 :     return &node;
     568             : }
     569             : 
     570           0 : bool RSPConnection::_initThread()
     571             : {
     572           0 :     LBLOG(LOG_RSP) << "Started RSP protocol thread" << std::endl;
     573           0 :     _timeouts = 0;
     574             : 
     575             :     // send a first datagram to announce me and discover other connections
     576           0 :     LBLOG(LOG_RSP) << "Announce " << _id << std::endl;
     577           0 :     _sendSimpleDatagram(ID_HELLO, _id);
     578           0 :     _setTimeout(10);
     579           0 :     _asyncReceiveFrom();
     580           0 :     _ioService.run();
     581           0 :     return isListening();
     582             : }
     583             : 
     584           0 : void RSPConnection::_runThread()
     585             : {
     586             :     //__debugbreak();
     587           0 :     _ioService.reset();
     588           0 :     _ioService.run();
     589           0 : }
     590             : 
     591           0 : void RSPConnection::_setTimeout(const int32_t timeOut)
     592             : {
     593           0 :     LBASSERT(timeOut >= 0);
     594           0 :     _timeout.expires_from_now(boost::posix_time::milliseconds(timeOut));
     595           0 :     _timeout.async_wait(boost::bind(&RSPConnection::_handleTimeout, this,
     596           0 :                                     boost::asio::placeholders::error));
     597           0 : }
     598             : 
     599           0 : void RSPConnection::_postWakeup()
     600             : {
     601           0 :     _wakeup.expires_from_now(boost::posix_time::milliseconds(0));
     602           0 :     _wakeup.async_wait(boost::bind(&RSPConnection::_handleTimeout, this,
     603           0 :                                    boost::asio::placeholders::error));
     604           0 : }
     605             : 
     606           0 : void RSPConnection::_processOutgoing()
     607             : {
     608           0 :     if (!_repeatQueue.empty())
     609           0 :         _repeatData();
     610             :     else
     611           0 :         _writeData();
     612             : 
     613           0 :     if (!_threadBuffers.isEmpty() || !_repeatQueue.empty())
     614             :     {
     615           0 :         _setTimeout(0); // call again to send remaining
     616           0 :         return;
     617             :     }
     618             :     // no more data to write, check/send ack request, reset timeout
     619             : 
     620           0 :     if (_writeBuffers.empty()) // got all acks
     621             :     {
     622           0 :         _timeouts = 0;
     623           0 :         _timeout.cancel();
     624           0 :         return;
     625             :     }
     626             : 
     627             :     const int64_t timeout =
     628           0 :         Global::getIAttribute(Global::IATTR_RSP_ACK_TIMEOUT);
     629           0 :     const int64_t left = timeout - _clock.getTime64();
     630             : 
     631           0 :     if (left > 0)
     632             :     {
     633           0 :         _setTimeout(left);
     634           0 :         return;
     635             :     }
     636             : 
     637             :     // (repeat) ack request
     638           0 :     _clock.reset();
     639           0 :     ++_timeouts;
     640           0 :     if (_timeouts < CO_RSP_MAX_TIMEOUTS)
     641           0 :         _sendAckRequest();
     642           0 :     _setTimeout(timeout);
     643             : }
     644             : 
     645           0 : void RSPConnection::_writeData()
     646             : {
     647           0 :     Buffer* buffer = 0;
     648           0 :     if (!_threadBuffers.pop(buffer)) // nothing to write
     649           0 :         return;
     650             : 
     651           0 :     _timeouts = 0;
     652           0 :     LBASSERT(buffer);
     653             : 
     654             :     // write buffer
     655           0 :     DatagramData* header = reinterpret_cast<DatagramData*>(buffer->getData());
     656           0 :     header->sequence = _sequence++;
     657             : 
     658             : #ifdef CO_RSP_MERGE_WRITES
     659           0 :     if (header->size < _payloadSize && !_threadBuffers.isEmpty())
     660             :     {
     661           0 :         std::vector<Buffer*> appBuffers;
     662           0 :         while (header->size < _payloadSize && !_threadBuffers.isEmpty())
     663             :         {
     664           0 :             Buffer* buffer2 = 0;
     665           0 :             LBCHECK(_threadBuffers.getFront(buffer2));
     666           0 :             LBASSERT(buffer2);
     667             :             DatagramData* header2 =
     668           0 :                 reinterpret_cast<DatagramData*>(buffer2->getData());
     669             : 
     670           0 :             if (uint32_t(header->size + header2->size) > _payloadSize)
     671           0 :                 break;
     672             : 
     673           0 :             memcpy(reinterpret_cast<uint8_t*>(header + 1) + header->size,
     674           0 :                    header2 + 1, header2->size);
     675           0 :             header->size += header2->size;
     676           0 :             LBCHECK(_threadBuffers.pop(buffer2));
     677           0 :             appBuffers.push_back(buffer2);
     678             : #ifdef CO_INSTRUMENT_RSP
     679             :             ++nMergedDatagrams;
     680             : #endif
     681             :         }
     682             : 
     683           0 :         if (!appBuffers.empty())
     684           0 :             _appBuffers.push(appBuffers);
     685             :     }
     686             : #endif
     687             : 
     688             :     // send data
     689             :     //  Note 1: We could optimize the send away if we're all alone, but this is
     690             :     //          not a use case for RSP, so we don't care.
     691             :     //  Note 2: Data to myself will be 'written' in _finishWriteQueue once we
     692             :     //          got all acks for the packet
     693           0 :     const uint32_t size = header->size + sizeof(DatagramData);
     694             : 
     695           0 :     _waitWritable(size); // OPT: process incoming in between
     696           0 :     _write->send(boost::asio::buffer(header, size));
     697             : 
     698             : #ifdef CO_INSTRUMENT_RSP
     699             :     ++nDatagrams;
     700             :     nBytesWritten += header->size;
     701             : #endif
     702             : 
     703             :     // save datagram for repeats (and self)
     704           0 :     _writeBuffers.push_back(buffer);
     705             : 
     706           0 :     if (_children.size() == 1) // We're all alone
     707             :     {
     708           0 :         LBASSERT(_children.front()->_id == _id);
     709           0 :         _finishWriteQueue(_sequence - 1);
     710             :     }
     711             : }
     712             : 
     713           0 : void RSPConnection::_waitWritable(const uint64_t bytes)
     714             : {
     715             : #ifdef CO_INSTRUMENT_RSP
     716             :     lunchbox::Clock clock;
     717             : #endif
     718             : 
     719           0 :     _bucketSize += static_cast<uint64_t>(_clock.resetTimef() * _sendRate);
     720             :     // opt omit: * 1024 / 1000;
     721           0 :     _bucketSize = LB_MIN(_bucketSize, _maxBucketSize);
     722             : 
     723           0 :     const uint64_t size = LB_MIN(bytes, static_cast<uint64_t>(_mtu));
     724           0 :     while (_bucketSize < size)
     725             :     {
     726           0 :         lunchbox::Thread::yield();
     727           0 :         float time = _clock.resetTimef();
     728             : 
     729           0 :         while (time == 0.f)
     730             :         {
     731           0 :             lunchbox::Thread::yield();
     732           0 :             time = _clock.resetTimef();
     733             :         }
     734             : 
     735           0 :         _bucketSize += static_cast<int64_t>(time * _sendRate);
     736           0 :         _bucketSize = LB_MIN(_bucketSize, _maxBucketSize);
     737             :     }
     738           0 :     _bucketSize -= size;
     739             : 
     740             : #ifdef CO_INSTRUMENT_RSP
     741             :     writeWaitTime += clock.getTimef();
     742             : #endif
     743             : 
     744           0 :     ConstConnectionDescriptionPtr description = getDescription();
     745           0 :     if (_sendRate < description->bandwidth)
     746             :     {
     747           0 :         _sendRate += int64_t(
     748           0 :             float(Global::getIAttribute(Global::IATTR_RSP_ERROR_UPSCALE)) *
     749           0 :             float(description->bandwidth) * .001f);
     750           0 :         LBLOG(LOG_RSP) << "speeding up to " << _sendRate << " KB/s"
     751           0 :                        << std::endl;
     752             :     }
     753           0 : }
     754             : 
     755           0 : void RSPConnection::_repeatData()
     756             : {
     757           0 :     _timeouts = 0;
     758             : 
     759           0 :     while (!_repeatQueue.empty())
     760             :     {
     761           0 :         Nack& request = _repeatQueue.front();
     762           0 :         const uint16_t distance = _sequence - request.start;
     763             : 
     764           0 :         if (distance == 0)
     765             :         {
     766           0 :             LBWARN << "ignoring invalid nack (" << request.start << ".."
     767           0 :                    << request.end << ")" << std::endl;
     768           0 :             _repeatQueue.pop_front();
     769           0 :             continue;
     770             :         }
     771             : 
     772           0 :         if (distance <= _writeBuffers.size()) // not already acked
     773             :         {
     774             :             //          LBLOG( LOG_RSP ) << "Repeat " << request.start << ", "
     775             :             //          << _sendRate
     776             :             //                           << "KB/s"<< std::endl;
     777             : 
     778           0 :             const size_t i = _writeBuffers.size() - distance;
     779           0 :             Buffer* buffer = _writeBuffers[i];
     780           0 :             LBASSERT(buffer);
     781             : 
     782             :             DatagramData* header =
     783           0 :                 reinterpret_cast<DatagramData*>(buffer->getData());
     784           0 :             const uint32_t size = header->size + sizeof(DatagramData);
     785           0 :             LBASSERT(header->sequence == request.start);
     786             : 
     787             :             // send data
     788           0 :             _waitWritable(size); // OPT: process incoming in between
     789           0 :             _write->send(boost::asio::buffer(header, size));
     790             : #ifdef CO_INSTRUMENT_RSP
     791             :             ++nRepeated;
     792             : #endif
     793             :         }
     794             : 
     795           0 :         if (request.start == request.end)
     796           0 :             _repeatQueue.pop_front(); // done with request
     797             :         else
     798           0 :             ++request.start;
     799             : 
     800           0 :         if (distance <= _writeBuffers.size()) // send something
     801           0 :             return;
     802             :     }
     803             : }
     804             : 
     805           0 : void RSPConnection::_finishWriteQueue(const uint16_t sequence)
     806             : {
     807           0 :     LBASSERT(!_writeBuffers.empty());
     808             : 
     809           0 :     RSPConnectionPtr connection = _findConnection(_id);
     810           0 :     LBASSERT(connection.isValid());
     811           0 :     LBASSERT(connection->_recvBuffers.empty());
     812             : 
     813             :     // Bundle pushing the buffers to the app to avoid excessive lock ops
     814           0 :     Buffers readBuffers;
     815           0 :     Buffers freeBuffers;
     816             : 
     817           0 :     const uint16_t size = _sequence - sequence - 1;
     818           0 :     LBASSERTINFO(size <= uint16_t(_writeBuffers.size()),
     819             :                  size << " > " << _writeBuffers.size());
     820           0 :     LBLOG(LOG_RSP) << "Got all remote acks for " << sequence << " current "
     821           0 :                    << _sequence << " advance " << _writeBuffers.size() - size
     822           0 :                    << " buffers" << std::endl;
     823             : 
     824           0 :     while (_writeBuffers.size() > size_t(size))
     825             :     {
     826           0 :         Buffer* buffer = _writeBuffers.front();
     827           0 :         _writeBuffers.pop_front();
     828             : 
     829             : #ifndef NDEBUG
     830             :         DatagramData* datagram =
     831           0 :             reinterpret_cast<DatagramData*>(buffer->getData());
     832           0 :         LBASSERT(datagram->writerID == _id);
     833           0 :         LBASSERTINFO(datagram->sequence ==
     834             :                          uint16_t(connection->_sequence + readBuffers.size()),
     835             :                      datagram->sequence << ", " << connection->_sequence << ", "
     836             :                                         << readBuffers.size());
     837             : // LBLOG( LOG_RSP ) << "self receive " << datagram->sequence << std::endl;
     838             : #endif
     839             : 
     840           0 :         Buffer* newBuffer = connection->_newDataBuffer(*buffer);
     841           0 :         if (!newBuffer && !readBuffers.empty()) // push prepared app buffers
     842             :         {
     843           0 :             lunchbox::ScopedWrite mutex(connection->_mutexEvent);
     844           0 :             LBLOG(LOG_RSP) << "post " << readBuffers.size()
     845           0 :                            << " buffers starting with sequence "
     846           0 :                            << connection->_sequence << std::endl;
     847             : 
     848           0 :             connection->_appBuffers.push(readBuffers);
     849           0 :             connection->_sequence += uint16_t(readBuffers.size());
     850           0 :             readBuffers.clear();
     851           0 :             connection->_event->set();
     852             :         }
     853             : 
     854           0 :         while (!newBuffer) // no more data buffers, wait for app to drain
     855             :         {
     856           0 :             newBuffer = connection->_newDataBuffer(*buffer);
     857           0 :             lunchbox::Thread::yield();
     858             :         }
     859             : 
     860           0 :         freeBuffers.push_back(buffer);
     861           0 :         readBuffers.push_back(newBuffer);
     862             :     }
     863             : 
     864           0 :     _appBuffers.push(freeBuffers);
     865           0 :     if (!readBuffers.empty())
     866             :     {
     867           0 :         lunchbox::ScopedWrite mutex(connection->_mutexEvent);
     868             : #if 0
     869             :         LBLOG( LOG_RSP )
     870             :             << "post " << readBuffers.size() << " buffers starting at "
     871             :             << connection->_sequence << std::endl;
     872             : #endif
     873             : 
     874           0 :         connection->_appBuffers.push(readBuffers);
     875           0 :         connection->_sequence += uint16_t(readBuffers.size());
     876           0 :         connection->_event->set();
     877             :     }
     878             : 
     879           0 :     connection->_acked = uint16_t(connection->_sequence - 1);
     880           0 :     LBASSERT(connection->_acked == sequence);
     881             : 
     882           0 :     _timeouts = 0;
     883           0 : }
     884             : 
     885           0 : void RSPConnection::_handlePacket(const boost::system::error_code& /* error */,
     886             :                                   const size_t bytes)
     887             : {
     888           0 :     if (isListening())
     889             :     {
     890           0 :         _handleConnectedData(bytes);
     891             : 
     892           0 :         if (isListening())
     893           0 :             _processOutgoing();
     894             :         else
     895             :         {
     896           0 :             _ioService.stop();
     897           0 :             return;
     898             :         }
     899             :     }
     900           0 :     else if (bytes >= sizeof(DatagramNode))
     901             :     {
     902           0 :         if (_idAccepted)
     903           0 :             _handleInitData(bytes, false);
     904             :         else
     905           0 :             _handleAcceptIDData(bytes);
     906             :     }
     907             : 
     908             :     // LBLOG( LOG_RSP ) << "_handlePacket timeout " << timeout << std::endl;
     909           0 :     _asyncReceiveFrom();
     910             : }
     911             : 
     912           0 : void RSPConnection::_handleAcceptIDData(const size_t bytes)
     913             : {
     914           0 :     DatagramNode* pNode = _getDatagramNode(bytes);
     915           0 :     if (!pNode)
     916           0 :         return;
     917             : 
     918           0 :     DatagramNode& node = *pNode;
     919             : 
     920           0 :     switch (node.type)
     921             :     {
     922             :     case ID_HELLO:
     923           0 :         _checkNewID(node.connectionID);
     924           0 :         break;
     925             : 
     926             :     case ID_HELLO_REPLY:
     927           0 :         _addConnection(node.connectionID, node.data);
     928           0 :         break;
     929             : 
     930             :     case ID_DENY:
     931             :         // a connection refused my ID, try another ID
     932           0 :         if (node.connectionID == _id)
     933             :         {
     934           0 :             _timeouts = 0;
     935           0 :             _sendSimpleDatagram(ID_HELLO, _buildNewID());
     936           0 :             LBLOG(LOG_RSP) << "Announce " << _id << std::endl;
     937             :         }
     938           0 :         break;
     939             : 
     940             :     case ID_EXIT:
     941           0 :         _removeConnection(node.connectionID);
     942           0 :         break;
     943             : 
     944             :     default:
     945           0 :         LBDEBUG << "Got unexpected datagram type " << node.type << std::endl;
     946           0 :         LBUNIMPLEMENTED;
     947           0 :         break;
     948             :     }
     949             : }
     950             : 
     951           0 : void RSPConnection::_handleInitData(const size_t bytes, const bool connected)
     952             : {
     953           0 :     DatagramNode* pNode = _getDatagramNode(bytes);
     954           0 :     if (!pNode)
     955           0 :         return;
     956             : 
     957           0 :     DatagramNode& node = *pNode;
     958             : 
     959           0 :     switch (node.type)
     960             :     {
     961             :     case ID_HELLO:
     962           0 :         if (!connected)
     963           0 :             _timeouts = 0;
     964           0 :         _checkNewID(node.connectionID);
     965           0 :         return;
     966             : 
     967             :     case ID_CONFIRM:
     968           0 :         if (!connected)
     969           0 :             _timeouts = 0;
     970           0 :         _addConnection(node.connectionID, node.data);
     971           0 :         return;
     972             : 
     973             :     case COUNTNODE:
     974           0 :         LBLOG(LOG_RSP) << "Got " << node.data << " nodes from "
     975           0 :                        << node.connectionID << std::endl;
     976           0 :         return;
     977             : 
     978             :     case ID_HELLO_REPLY:
     979           0 :         _addConnection(node.connectionID, node.data);
     980           0 :         return;
     981             : 
     982             :     case ID_EXIT:
     983           0 :         _removeConnection(node.connectionID);
     984           0 :         return;
     985             : 
     986             :     default:
     987           0 :         LBDEBUG << "Got unexpected datagram type " << node.type << std::endl;
     988           0 :         break;
     989             :     }
     990             : }
     991             : 
     992           0 : void RSPConnection::_handleConnectedData(const size_t bytes)
     993             : {
     994           0 :     if (bytes < sizeof(uint16_t))
     995           0 :         return;
     996             : 
     997           0 :     void* data = _recvBuffer.getData();
     998           0 :     uint16_t type = *reinterpret_cast<uint16_t*>(data);
     999           0 :     switch (type)
    1000             :     {
    1001             :     case DATA:
    1002           0 :         LBCHECK(_handleData(bytes));
    1003           0 :         break;
    1004             : 
    1005             :     case ACK:
    1006           0 :         LBCHECK(_handleAck(bytes));
    1007           0 :         break;
    1008             : 
    1009             :     case NACK:
    1010           0 :         LBCHECK(_handleNack());
    1011           0 :         break;
    1012             : 
    1013             :     case ACKREQ: // The writer asks for an ack/nack
    1014           0 :         LBCHECK(_handleAckRequest(bytes));
    1015           0 :         break;
    1016             : 
    1017             :     case ID_HELLO:
    1018             :     case ID_HELLO_REPLY:
    1019             :     case ID_CONFIRM:
    1020             :     case ID_EXIT:
    1021             :     case ID_DENY:
    1022             :     case COUNTNODE:
    1023           0 :         _handleInitData(bytes, true);
    1024           0 :         break;
    1025             : 
    1026             :     default:
    1027           0 :         LBASSERTINFO(false, "Don't know how to handle packet of type " << type);
    1028             :     }
    1029             : }
    1030             : 
    1031           0 : void RSPConnection::_asyncReceiveFrom()
    1032             : {
    1033           0 :     _read->async_receive_from(
    1034           0 :         boost::asio::buffer(_recvBuffer.getData(), _mtu), _readAddr,
    1035           0 :         boost::bind(&RSPConnection::_handlePacket, this,
    1036             :                     boost::asio::placeholders::error,
    1037           0 :                     boost::asio::placeholders::bytes_transferred));
    1038           0 : }
    1039             : 
    1040           0 : bool RSPConnection::_handleData(const size_t bytes)
    1041             : {
    1042           0 :     if (bytes < sizeof(DatagramData))
    1043           0 :         return false;
    1044             :     DatagramData& datagram =
    1045           0 :         *reinterpret_cast<DatagramData*>(_recvBuffer.getData());
    1046             : 
    1047             : #ifdef CO_INSTRUMENT_RSP
    1048             :     ++nReadData;
    1049             : #endif
    1050           0 :     const uint16_t writerID = datagram.writerID;
    1051             : #ifdef Darwin
    1052             :     // There is occasionally a packet from ourselves, even though multicast loop
    1053             :     // is not set?!
    1054             :     if (writerID == _id)
    1055             :         return true;
    1056             : #else
    1057           0 :     LBASSERT(writerID != _id);
    1058             : #endif
    1059             : 
    1060           0 :     RSPConnectionPtr connection = _findConnection(writerID);
    1061             : 
    1062           0 :     if (!connection) // unknown connection ?
    1063             :     {
    1064           0 :         LBASSERTINFO(false, "Can't find connection with id " << writerID);
    1065           0 :         return false;
    1066             :     }
    1067           0 :     LBASSERT(connection->_id == writerID);
    1068             : 
    1069           0 :     const uint16_t sequence = datagram.sequence;
    1070             :     //  LBLOG( LOG_RSP ) << "rcvd " << sequence << " from " << writerID
    1071             :     //  <<std::endl;
    1072             : 
    1073           0 :     if (connection->_sequence == sequence) // in-order packet
    1074             :     {
    1075           0 :         Buffer* newBuffer = connection->_newDataBuffer(_recvBuffer);
    1076           0 :         if (!newBuffer) // no more data buffers, drop packet
    1077           0 :             return true;
    1078             : 
    1079           0 :         lunchbox::ScopedWrite mutex(connection->_mutexEvent);
    1080           0 :         connection->_pushDataBuffer(newBuffer);
    1081             : 
    1082           0 :         while (!connection->_recvBuffers.empty()) // enqueue ready pending data
    1083             :         {
    1084           0 :             newBuffer = connection->_recvBuffers.front();
    1085           0 :             if (!newBuffer)
    1086           0 :                 break;
    1087             : 
    1088           0 :             connection->_recvBuffers.pop_front();
    1089           0 :             connection->_pushDataBuffer(newBuffer);
    1090             :         }
    1091             : 
    1092           0 :         if (!connection->_recvBuffers.empty() &&
    1093           0 :             !connection->_recvBuffers.front()) // update for new _sequence
    1094             :         {
    1095           0 :             connection->_recvBuffers.pop_front();
    1096             :         }
    1097             : 
    1098           0 :         connection->_event->set();
    1099           0 :         return true;
    1100             :     }
    1101             : 
    1102           0 :     const uint16_t max = std::numeric_limits<uint16_t>::max();
    1103           0 :     if ((connection->_sequence > sequence &&
    1104           0 :          max - connection->_sequence + sequence > _numBuffers) ||
    1105           0 :         (connection->_sequence < sequence &&
    1106           0 :          sequence - connection->_sequence > _numBuffers))
    1107             :     {
    1108             :         // ignore it if it's a repetition for another reader
    1109           0 :         return true;
    1110             :     }
    1111             : 
    1112             :     // else out of order
    1113             : 
    1114           0 :     const uint16_t size = sequence - connection->_sequence;
    1115           0 :     LBASSERT(size != 0);
    1116           0 :     LBASSERTINFO(size <= _numBuffers, size << " > " << _numBuffers);
    1117             : 
    1118           0 :     ssize_t i = ssize_t(size) - 1;
    1119           0 :     const bool gotPacket = (connection->_recvBuffers.size() >= size &&
    1120           0 :                             connection->_recvBuffers[i]);
    1121           0 :     if (gotPacket)
    1122           0 :         return true;
    1123             : 
    1124           0 :     Buffer* newBuffer = connection->_newDataBuffer(_recvBuffer);
    1125           0 :     if (!newBuffer) // no more data buffers, drop packet
    1126           0 :         return true;
    1127             : 
    1128           0 :     if (connection->_recvBuffers.size() < size)
    1129           0 :         connection->_recvBuffers.resize(size, 0);
    1130             : 
    1131           0 :     LBASSERT(!connection->_recvBuffers[i]);
    1132           0 :     connection->_recvBuffers[i] = newBuffer;
    1133             : 
    1134             :     // early nack: request missing packets before current
    1135           0 :     --i;
    1136           0 :     Nack nack = {connection->_sequence, uint16_t(sequence - 1)};
    1137           0 :     if (i > 0)
    1138             :     {
    1139           0 :         if (connection->_recvBuffers[i]) // got previous packet
    1140           0 :             return true;
    1141             : 
    1142           0 :         while (i >= 0 && !connection->_recvBuffers[i])
    1143           0 :             --i;
    1144             : 
    1145           0 :         const Buffer* lastBuffer = i >= 0 ? connection->_recvBuffers[i] : 0;
    1146           0 :         if (lastBuffer)
    1147             :         {
    1148           0 :             nack.start = connection->_sequence + i;
    1149             :         }
    1150             :     }
    1151             : 
    1152           0 :     LBLOG(LOG_RSP) << "send early nack " << nack.start << ".." << nack.end
    1153           0 :                    << " current " << connection->_sequence << " ooo "
    1154           0 :                    << connection->_recvBuffers.size() << std::endl;
    1155             : 
    1156           0 :     if (nack.end < nack.start)
    1157             :         // OPT: don't drop nack 0..nack.end, but it doesn't happen often
    1158           0 :         nack.end = std::numeric_limits<uint16_t>::max();
    1159             : 
    1160           0 :     _sendNack(writerID, &nack, 1);
    1161           0 :     return true;
    1162             : }
    1163             : 
    1164           0 : RSPConnection::Buffer* RSPConnection::_newDataBuffer(Buffer& inBuffer)
    1165             : {
    1166           0 :     LBASSERT(static_cast<int32_t>(inBuffer.getMaxSize()) == _mtu);
    1167             : 
    1168           0 :     Buffer* buffer = 0;
    1169           0 :     if (_threadBuffers.pop(buffer))
    1170             :     {
    1171           0 :         buffer->swap(inBuffer);
    1172           0 :         return buffer;
    1173             :     }
    1174             : 
    1175             :     // we do not have a free buffer, which means that the receiver is slower
    1176             :     // then our read thread. This is bad, because now we'll drop the data and
    1177             :     // will send a NAck packet upon the ack request, causing retransmission even
    1178             :     // though we'll probably drop it again
    1179           0 :     LBLOG(LOG_RSP) << "Reader too slow, dropping data" << std::endl;
    1180             : 
    1181             :     // Set the event if there is data to read. This shouldn't be needed since
    1182             :     // the event should be set in this case, but it'll increase the robustness
    1183           0 :     lunchbox::ScopedWrite mutex(_mutexEvent);
    1184           0 :     if (!_appBuffers.isEmpty())
    1185           0 :         _event->set();
    1186           0 :     return 0;
    1187             : }
    1188             : 
    1189           0 : void RSPConnection::_pushDataBuffer(Buffer* buffer)
    1190             : {
    1191           0 :     LBASSERT(_parent);
    1192             : #ifndef NDEBUG
    1193           0 :     DatagramData* dgram = reinterpret_cast<DatagramData*>(buffer->getData());
    1194           0 :     LBASSERTINFO(dgram->sequence == _sequence, dgram->sequence << " != "
    1195             :                                                                << _sequence);
    1196             : #endif
    1197             : 
    1198           0 :     if (((_sequence + _parent->_id) % _ackFreq) == 0)
    1199           0 :         _parent->_sendAck(_id, _sequence);
    1200             : 
    1201           0 :     LBLOG(LOG_RSP) << "post buffer " << _sequence << std::endl;
    1202           0 :     ++_sequence;
    1203           0 :     _appBuffers.push(buffer);
    1204           0 : }
    1205             : 
    1206           0 : bool RSPConnection::_handleAck(const size_t bytes)
    1207             : {
    1208           0 :     if (bytes < sizeof(DatagramAck))
    1209           0 :         return false;
    1210           0 :     DatagramAck& ack = *reinterpret_cast<DatagramAck*>(_recvBuffer.getData());
    1211             : 
    1212             : #ifdef CO_INSTRUMENT_RSP
    1213             :     ++nAcksRead;
    1214             : #endif
    1215             : 
    1216           0 :     if (ack.writerID != _id)
    1217           0 :         return true;
    1218             : 
    1219           0 :     LBLOG(LOG_RSP) << "got ack from " << ack.readerID << " for " << ack.writerID
    1220           0 :                    << " sequence " << ack.sequence << " current " << _sequence
    1221           0 :                    << std::endl;
    1222             : 
    1223             :     // find destination connection, update ack data if needed
    1224           0 :     RSPConnectionPtr connection = _findConnection(ack.readerID);
    1225           0 :     if (!connection)
    1226             :     {
    1227           0 :         LBUNREACHABLE;
    1228           0 :         return false;
    1229             :     }
    1230             : 
    1231           0 :     if (connection->_acked >= ack.sequence &&
    1232           0 :         connection->_acked - ack.sequence <= _numBuffers)
    1233             :     {
    1234             :         // I have received a later ack previously from the reader
    1235           0 :         LBLOG(LOG_RSP) << "Late ack" << std::endl;
    1236           0 :         return true;
    1237             :     }
    1238             : 
    1239             : #ifdef CO_INSTRUMENT_RSP
    1240             :     ++nAcksAccepted;
    1241             : #endif
    1242           0 :     connection->_acked = ack.sequence;
    1243           0 :     _timeouts = 0; // reset timeout counter
    1244             : 
    1245             :     // Check if we can advance _acked
    1246           0 :     uint16_t acked = ack.sequence;
    1247             : 
    1248           0 :     for (RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i)
    1249             :     {
    1250           0 :         RSPConnectionPtr child = *i;
    1251           0 :         if (child->_id == _id)
    1252           0 :             continue;
    1253             : 
    1254           0 :         const uint16_t distance = child->_acked - acked;
    1255           0 :         if (distance > _numBuffers)
    1256           0 :             acked = child->_acked;
    1257             :     }
    1258             : 
    1259           0 :     RSPConnectionPtr selfChild = _findConnection(_id);
    1260           0 :     const uint16_t distance = acked - selfChild->_acked;
    1261           0 :     if (distance <= _numBuffers)
    1262           0 :         _finishWriteQueue(acked);
    1263           0 :     return true;
    1264             : }
    1265             : 
    1266           0 : bool RSPConnection::_handleNack()
    1267             : {
    1268             :     DatagramNack& nack =
    1269           0 :         *reinterpret_cast<DatagramNack*>(_recvBuffer.getData());
    1270             : 
    1271             : #ifdef CO_INSTRUMENT_RSP
    1272             :     ++nNAcksRead;
    1273             : #endif
    1274             : 
    1275           0 :     if (_id != nack.writerID)
    1276             :     {
    1277           0 :         LBLOG(LOG_RSP) << "ignore " << nack.count << " nacks from "
    1278           0 :                        << nack.readerID << " for " << nack.writerID
    1279           0 :                        << " (not me)" << std::endl;
    1280           0 :         return true;
    1281             :     }
    1282             : 
    1283           0 :     LBLOG(LOG_RSP) << "handle " << nack.count << " nacks from " << nack.readerID
    1284           0 :                    << " for " << nack.writerID << std::endl;
    1285             : 
    1286           0 :     RSPConnectionPtr connection = _findConnection(nack.readerID);
    1287           0 :     if (!connection)
    1288             :     {
    1289           0 :         LBUNREACHABLE;
    1290           0 :         return false;
    1291             :         // it's an unknown connection, TODO add this connection?
    1292             :     }
    1293             : 
    1294           0 :     _timeouts = 0;
    1295           0 :     _addRepeat(nack.nacks, nack.count);
    1296           0 :     return true;
    1297             : }
    1298             : 
    1299           0 : void RSPConnection::_addRepeat(const Nack* nacks, uint16_t num)
    1300             : {
    1301           0 :     LBLOG(LOG_RSP) << lunchbox::disableFlush << "Queue repeat requests ";
    1302           0 :     size_t lost = 0;
    1303             : 
    1304           0 :     for (size_t i = 0; i < num; ++i)
    1305             :     {
    1306           0 :         const Nack& nack = nacks[i];
    1307           0 :         LBASSERT(nack.start <= nack.end);
    1308             : 
    1309           0 :         LBLOG(LOG_RSP) << nack.start << ".." << nack.end << " ";
    1310             : 
    1311           0 :         bool merged = false;
    1312           0 :         for (RepeatQueue::iterator j = _repeatQueue.begin();
    1313           0 :              j != _repeatQueue.end() && !merged; ++j)
    1314             :         {
    1315           0 :             Nack& old = *j;
    1316           0 :             if (old.start <= nack.end && old.end >= nack.start)
    1317             :             {
    1318           0 :                 if (old.start > nack.start)
    1319             :                 {
    1320           0 :                     lost += old.start - nack.start;
    1321           0 :                     old.start = nack.start;
    1322           0 :                     merged = true;
    1323             :                 }
    1324           0 :                 if (old.end < nack.end)
    1325             :                 {
    1326           0 :                     lost += nack.end - old.end;
    1327           0 :                     old.end = nack.end;
    1328           0 :                     merged = true;
    1329             :                 }
    1330           0 :                 LBASSERT(lost < _numBuffers);
    1331             :             }
    1332             :         }
    1333             : 
    1334           0 :         if (!merged)
    1335             :         {
    1336           0 :             lost += uint16_t(nack.end - nack.start) + 1;
    1337           0 :             LBASSERT(lost <= _numBuffers);
    1338           0 :             _repeatQueue.push_back(nack);
    1339             :         }
    1340             :     }
    1341             : 
    1342           0 :     ConstConnectionDescriptionPtr description = getDescription();
    1343           0 :     if (_sendRate >
    1344           0 :         (description->bandwidth >>
    1345           0 :          Global::getIAttribute(Global::IATTR_RSP_MIN_SENDRATE_SHIFT)))
    1346             :     {
    1347             :         const float delta =
    1348           0 :             float(lost) * .001f *
    1349           0 :             Global::getIAttribute(Global::IATTR_RSP_ERROR_DOWNSCALE);
    1350             :         const float maxDelta =
    1351             :             .01f *
    1352           0 :             float(Global::getIAttribute(Global::IATTR_RSP_ERROR_MAXSCALE));
    1353           0 :         const float downScale = LB_MIN(delta, maxDelta);
    1354           0 :         _sendRate -= 1 + int64_t(_sendRate * downScale);
    1355           0 :         LBLOG(LOG_RSP) << ", lost " << lost << " slowing down "
    1356           0 :                        << downScale * 100.f << "% to " << _sendRate << " KB/s"
    1357           0 :                        << std::endl
    1358           0 :                        << lunchbox::enableFlush;
    1359             :     }
    1360             :     else
    1361           0 :         LBLOG(LOG_RSP) << std::endl << lunchbox::enableFlush;
    1362           0 : }
    1363             : 
    1364           0 : bool RSPConnection::_handleAckRequest(const size_t bytes)
    1365             : {
    1366           0 :     if (bytes < sizeof(DatagramAckRequest))
    1367           0 :         return false;
    1368             :     DatagramAckRequest& ackRequest =
    1369           0 :         *reinterpret_cast<DatagramAckRequest*>(_recvBuffer.getData());
    1370             : 
    1371           0 :     const uint16_t writerID = ackRequest.writerID;
    1372             : #ifdef Darwin
    1373             :     // There is occasionally a packet from ourselves, even though multicast loop
    1374             :     // is not set?!
    1375             :     if (writerID == _id)
    1376             :         return true;
    1377             : #else
    1378           0 :     LBASSERT(writerID != _id);
    1379             : #endif
    1380           0 :     RSPConnectionPtr connection = _findConnection(writerID);
    1381           0 :     if (!connection)
    1382             :     {
    1383           0 :         LBUNREACHABLE;
    1384           0 :         return false;
    1385             :     }
    1386             : 
    1387           0 :     const uint16_t reqID = ackRequest.sequence;
    1388           0 :     const uint16_t gotID = connection->_sequence - 1;
    1389           0 :     const uint16_t distance = reqID - gotID;
    1390             : 
    1391           0 :     LBLOG(LOG_RSP) << "ack request " << reqID << " from " << writerID << " got "
    1392           0 :                    << gotID << " missing " << distance << std::endl;
    1393             : 
    1394           0 :     if ((reqID == gotID) || (gotID > reqID && gotID - reqID <= _numBuffers) ||
    1395           0 :         (gotID < reqID && distance > _numBuffers))
    1396             :     {
    1397           0 :         _sendAck(connection->_id, gotID);
    1398           0 :         return true;
    1399             :     }
    1400             :     // else find all missing datagrams
    1401             : 
    1402           0 :     const uint16_t max = CO_RSP_MAX_NACKS - 2;
    1403             :     Nack nacks[CO_RSP_MAX_NACKS];
    1404           0 :     uint16_t i = 0;
    1405             : 
    1406           0 :     nacks[i].start = connection->_sequence;
    1407           0 :     LBLOG(LOG_RSP) << lunchbox::disableFlush << "nacks: " << nacks[i].start
    1408           0 :                    << "..";
    1409             : 
    1410           0 :     std::deque<Buffer*>::const_iterator j = connection->_recvBuffers.begin();
    1411           0 :     std::deque<Buffer*>::const_iterator first = j;
    1412           0 :     for (; j != connection->_recvBuffers.end() && i < max; ++j)
    1413             :     {
    1414           0 :         if (*j) // got buffer
    1415             :         {
    1416           0 :             nacks[i].end = connection->_sequence + std::distance(first, j);
    1417           0 :             LBLOG(LOG_RSP) << nacks[i].end << ", ";
    1418           0 :             if (nacks[i].end < nacks[i].start)
    1419             :             {
    1420           0 :                 LBASSERT(nacks[i].end < _numBuffers);
    1421           0 :                 nacks[i + 1].start = 0;
    1422           0 :                 nacks[i + 1].end = nacks[i].end;
    1423           0 :                 nacks[i].end = std::numeric_limits<uint16_t>::max();
    1424           0 :                 ++i;
    1425             :             }
    1426           0 :             ++i;
    1427             : 
    1428             :             // find next hole
    1429           0 :             for (++j; j != connection->_recvBuffers.end() && (*j); ++j)
    1430             :                 /* nop */;
    1431             : 
    1432           0 :             if (j == connection->_recvBuffers.end())
    1433           0 :                 break;
    1434             : 
    1435           0 :             nacks[i].start =
    1436           0 :                 connection->_sequence + std::distance(first, j) + 1;
    1437           0 :             LBLOG(LOG_RSP) << nacks[i].start << "..";
    1438             :         }
    1439             :     }
    1440             : 
    1441           0 :     if (j != connection->_recvBuffers.end() || i == 0)
    1442             :     {
    1443           0 :         nacks[i].end = reqID;
    1444           0 :         LBLOG(LOG_RSP) << nacks[i].end;
    1445           0 :         ++i;
    1446             :     }
    1447           0 :     else if (uint16_t(reqID - nacks[i - 1].end) < _numBuffers)
    1448             :     {
    1449           0 :         nacks[i].start = nacks[i - 1].end + 1;
    1450           0 :         nacks[i].end = reqID;
    1451           0 :         LBLOG(LOG_RSP) << nacks[i].start << ".." << nacks[i].end;
    1452           0 :         ++i;
    1453             :     }
    1454           0 :     if (nacks[i - 1].end < nacks[i - 1].start)
    1455             :     {
    1456           0 :         LBASSERT(nacks[i - 1].end < _numBuffers);
    1457           0 :         nacks[i].start = 0;
    1458           0 :         nacks[i].end = nacks[i - 1].end;
    1459           0 :         nacks[i - 1].end = std::numeric_limits<uint16_t>::max();
    1460           0 :         ++i;
    1461             :     }
    1462             : 
    1463           0 :     LBLOG(LOG_RSP) << std::endl
    1464           0 :                    << lunchbox::enableFlush << "send " << i << " nacks to "
    1465           0 :                    << connection->_id << std::endl;
    1466             : 
    1467           0 :     LBASSERT(i > 0);
    1468           0 :     _sendNack(connection->_id, nacks, i);
    1469           0 :     return true;
    1470             : }
    1471             : 
    1472           0 : void RSPConnection::_checkNewID(uint16_t id)
    1473             : {
    1474             :     // look if the new ID exist in another connection
    1475           0 :     if (id == _id || _findConnection(id))
    1476             :     {
    1477           0 :         LBLOG(LOG_RSP) << "Deny " << id << std::endl;
    1478           0 :         _sendSimpleDatagram(ID_DENY, _id);
    1479             :     }
    1480             :     else
    1481           0 :         _sendSimpleDatagram(ID_HELLO_REPLY, _id);
    1482           0 : }
    1483             : 
    1484           0 : RSPConnectionPtr RSPConnection::_findConnection(const uint16_t id)
    1485             : {
    1486           0 :     for (RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i)
    1487             :     {
    1488           0 :         if ((*i)->_id == id)
    1489           0 :             return *i;
    1490             :     }
    1491           0 :     return 0;
    1492             : }
    1493             : 
    1494           0 : bool RSPConnection::_addConnection(const uint16_t id, const uint16_t sequence)
    1495             : {
    1496           0 :     if (_findConnection(id))
    1497           0 :         return false;
    1498             : 
    1499           0 :     LBDEBUG << "add connection " << id << std::endl;
    1500           0 :     RSPConnectionPtr connection = new RSPConnection();
    1501           0 :     connection->_id = id;
    1502           0 :     connection->_parent = this;
    1503           0 :     connection->_setState(STATE_CONNECTED);
    1504           0 :     connection->_setDescription(_getDescription());
    1505           0 :     connection->_sequence = sequence;
    1506           0 :     LBASSERT(connection->_appBuffers.isEmpty());
    1507             : 
    1508             :     // Make all buffers available for reading
    1509           0 :     for (BuffersCIter i = connection->_buffers.begin();
    1510           0 :          i != connection->_buffers.end(); ++i)
    1511             :     {
    1512           0 :         Buffer* buffer = *i;
    1513           0 :         LBCHECK(connection->_threadBuffers.push(buffer));
    1514             :     }
    1515             : 
    1516           0 :     _children.push_back(connection);
    1517           0 :     _sendCountNode();
    1518             : 
    1519           0 :     lunchbox::ScopedWrite mutex(_mutexConnection);
    1520           0 :     _newChildren.push_back(connection);
    1521             : 
    1522           0 :     lunchbox::ScopedWrite mutex2(_mutexEvent);
    1523           0 :     _event->set();
    1524           0 :     return true;
    1525             : }
    1526             : 
    1527           0 : void RSPConnection::_removeConnection(const uint16_t id)
    1528             : {
    1529           0 :     LBDEBUG << "remove connection " << id << std::endl;
    1530           0 :     if (id == _id)
    1531           0 :         return;
    1532             : 
    1533           0 :     for (RSPConnectionsIter i = _children.begin(); i != _children.end(); ++i)
    1534             :     {
    1535           0 :         RSPConnectionPtr child = *i;
    1536           0 :         if (child->_id == id)
    1537             :         {
    1538           0 :             _children.erase(i);
    1539             : 
    1540           0 :             lunchbox::ScopedWrite mutex(child->_mutexEvent);
    1541           0 :             child->_appBuffers.push(0);
    1542           0 :             child->_event->set();
    1543           0 :             break;
    1544             :         }
    1545             :     }
    1546             : 
    1547           0 :     _sendCountNode();
    1548             : }
    1549             : 
    1550           0 : int64_t RSPConnection::write(const void* inData, const uint64_t bytes)
    1551             : {
    1552           0 :     if (_parent)
    1553           0 :         return _parent->write(inData, bytes);
    1554             : 
    1555           0 :     LBASSERT(isListening());
    1556           0 :     if (!_write)
    1557           0 :         return -1;
    1558             : 
    1559             :     // compute number of datagrams
    1560           0 :     uint64_t nDatagrams = bytes / _payloadSize;
    1561           0 :     if (nDatagrams * _payloadSize != bytes)
    1562           0 :         ++nDatagrams;
    1563             : 
    1564             :     // queue each datagram (might block if buffers are exhausted)
    1565           0 :     const uint8_t* data = reinterpret_cast<const uint8_t*>(inData);
    1566           0 :     const uint8_t* end = data + bytes;
    1567           0 :     for (uint64_t i = 0; i < nDatagrams; ++i)
    1568             :     {
    1569           0 :         size_t packetSize = end - data;
    1570           0 :         packetSize = LB_MIN(packetSize, _payloadSize);
    1571             : 
    1572           0 :         if (_appBuffers.isEmpty())
    1573             :             // trigger processing
    1574           0 :             _postWakeup();
    1575             : 
    1576             :         Buffer* buffer;
    1577           0 :         if (!_appBuffers.timedPop(_writeTimeOut, buffer))
    1578             :         {
    1579           0 :             LBERROR << "Timeout while writing" << std::endl;
    1580           0 :             buffer = 0;
    1581             :         }
    1582             : 
    1583           0 :         if (!buffer)
    1584             :         {
    1585           0 :             close();
    1586           0 :             return -1;
    1587             :         }
    1588             : 
    1589             :         // prepare packet header (sequence is done by thread)
    1590             :         DatagramData* header =
    1591           0 :             reinterpret_cast<DatagramData*>(buffer->getData());
    1592           0 :         header->type = DATA;
    1593           0 :         header->size = uint16_t(packetSize);
    1594           0 :         header->writerID = _id;
    1595             : 
    1596           0 :         memcpy(header + 1, data, packetSize);
    1597           0 :         data += packetSize;
    1598             : 
    1599           0 :         LBCHECK(_threadBuffers.push(buffer));
    1600             :     }
    1601           0 :     _postWakeup();
    1602           0 :     LBLOG(LOG_RSP) << "queued " << nDatagrams << " datagrams, " << bytes
    1603           0 :                    << " bytes" << std::endl;
    1604           0 :     return bytes;
    1605             : }
    1606             : 
    1607           0 : void RSPConnection::finish()
    1608             : {
    1609           0 :     if (_parent.isValid())
    1610             :     {
    1611           0 :         LBASSERTINFO(!_parent, "Writes are only allowed on RSP listeners");
    1612           0 :         return;
    1613             :     }
    1614           0 :     LBASSERT(isListening());
    1615           0 :     _appBuffers.waitSize(_buffers.size());
    1616             : }
    1617             : 
    1618           0 : void RSPConnection::_sendCountNode()
    1619             : {
    1620           0 :     if (!_findConnection(_id))
    1621           0 :         return;
    1622             : 
    1623           0 :     LBLOG(LOG_RSP) << _children.size() << " nodes" << std::endl;
    1624           0 :     DatagramNode count = {COUNTNODE, CO_RSP_PROTOCOL_VERSION, _id,
    1625           0 :                           uint16_t(_children.size())};
    1626           0 :     _write->send(boost::asio::buffer(&count, sizeof(count)));
    1627             : }
    1628             : 
    1629           0 : void RSPConnection::_sendSimpleDatagram(const DatagramType type,
    1630             :                                         const uint16_t id)
    1631             : {
    1632             :     DatagramNode simple = {uint16_t(type), CO_RSP_PROTOCOL_VERSION, id,
    1633           0 :                            _sequence};
    1634           0 :     _write->send(boost::asio::buffer(&simple, sizeof(simple)));
    1635           0 : }
    1636             : 
    1637           0 : void RSPConnection::_sendAck(const uint16_t writerID, const uint16_t sequence)
    1638             : {
    1639           0 :     LBASSERT(_id != writerID);
    1640             : #ifdef CO_INSTRUMENT_RSP
    1641             :     ++nAcksSend;
    1642             : #endif
    1643             : 
    1644           0 :     LBLOG(LOG_RSP) << "send ack " << sequence << std::endl;
    1645           0 :     DatagramAck ack = {ACK, _id, writerID, sequence};
    1646           0 :     _write->send(boost::asio::buffer(&ack, sizeof(ack)));
    1647           0 : }
    1648             : 
    1649           0 : void RSPConnection::_sendNack(const uint16_t writerID, const Nack* nacks,
    1650             :                               const uint16_t count)
    1651             : {
    1652           0 :     LBASSERT(count > 0);
    1653           0 :     LBASSERT(count <= CO_RSP_MAX_NACKS);
    1654             : #ifdef CO_INSTRUMENT_RSP
    1655             :     ++nNAcksSend;
    1656             : #endif
    1657             :     /* optimization: use the direct access to the reader. */
    1658           0 :     if (writerID == _id)
    1659             :     {
    1660           0 :         _addRepeat(nacks, count);
    1661           0 :         return;
    1662             :     }
    1663             : 
    1664             :     const size_t size =
    1665           0 :         sizeof(DatagramNack) - (CO_RSP_MAX_NACKS - count) * sizeof(Nack);
    1666             : 
    1667             :     // set the header
    1668             :     DatagramNack packet;
    1669           0 :     packet.set(_id, writerID, count);
    1670           0 :     memcpy(packet.nacks, nacks, count * sizeof(Nack));
    1671           0 :     _write->send(boost::asio::buffer(&packet, size));
    1672             : }
    1673             : 
    1674           0 : void RSPConnection::_sendAckRequest()
    1675             : {
    1676             : #ifdef CO_INSTRUMENT_RSP
    1677             :     ++nAckRequests;
    1678             : #endif
    1679           0 :     LBLOG(LOG_RSP) << "send ack request for " << uint16_t(_sequence - 1)
    1680           0 :                    << std::endl;
    1681           0 :     DatagramAckRequest ackRequest = {ACKREQ, _id, uint16_t(_sequence - 1)};
    1682           0 :     _write->send(boost::asio::buffer(&ackRequest, sizeof(DatagramAckRequest)));
    1683           0 : }
    1684             : 
    1685           0 : std::ostream& operator<<(std::ostream& os, const RSPConnection& connection)
    1686             : {
    1687           0 :     os << lunchbox::disableFlush << lunchbox::disableHeader
    1688           0 :        << "RSPConnection id " << connection.getID() << " send rate "
    1689           0 :        << connection.getSendRate();
    1690             : 
    1691             : #ifdef CO_INSTRUMENT_RSP
    1692             :     const int prec = os.precision();
    1693             :     os.precision(3);
    1694             : 
    1695             :     const float time = instrumentClock.getTimef();
    1696             :     const float mbps = 1048.576f * time;
    1697             :     os << ": " << lunchbox::indent << std::endl
    1698             :        << float(nBytesRead) / mbps << " / " << float(nBytesWritten) / mbps
    1699             :        << " MB/s r/w using " << nDatagrams << " dgrams " << nRepeated
    1700             :        << " repeats " << nMergedDatagrams << " merged" << std::endl;
    1701             : 
    1702             :     os.precision(prec);
    1703             :     os << "sender: " << nAckRequests << " ack requests " << nAcksAccepted << "/"
    1704             :        << nAcksRead << " acks " << nNAcksRead << " nacks, throttle "
    1705             :        << writeWaitTime << " ms" << std::endl
    1706             :        << "receiver: " << nAcksSend << " acks " << nNAcksSend << " nacks"
    1707             :        << lunchbox::exdent;
    1708             : 
    1709             :     nReadData = 0;
    1710             :     nBytesRead = 0;
    1711             :     nBytesWritten = 0;
    1712             :     nDatagrams = 0;
    1713             :     nRepeated = 0;
    1714             :     nMergedDatagrams = 0;
    1715             :     nAckRequests = 0;
    1716             :     nAcksSend = 0;
    1717             :     nAcksRead = 0;
    1718             :     nAcksAccepted = 0;
    1719             :     nNAcksSend = 0;
    1720             :     nNAcksRead = 0;
    1721             :     writeWaitTime = 0.f;
    1722             : 
    1723             :     for (auto child : connection._children)
    1724             :         os << *child << std::endl;
    1725             : #endif
    1726           0 :     os << std::endl << lunchbox::enableHeader << lunchbox::enableFlush;
    1727             : 
    1728           0 :     return os;
    1729             : }
    1730          63 : }

Generated by: LCOV version 1.11