LCOV - code coverage report
Current view: top level - co - rspConnection.h (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 13 30 43.3 %
Date: 2014-10-06 Functions: 10 18 55.6 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c)      2009, Cedric Stalder <cedric.stalder@gmail.com>
       3             :  *               2009-2013, Stefan Eilemann <eile@equalizergraphics.com>
       4             :  *                    2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #ifndef CO_RSPCONNECTION_H
      23             : #define CO_RSPCONNECTION_H
      24             : 
      25             : #include <co/connection.h>      // base class
      26             : #include <co/eventConnection.h> // member
      27             : 
      28             : #include <lunchbox/buffer.h>  // member
      29             : #include <lunchbox/clock.h>   // member
      30             : #include <lunchbox/lfQueue.h> // member
      31             : #include <lunchbox/mtQueue.h> // member
      32             : 
      33             : #pragma warning(push)
      34             : #pragma warning(disable: 4267)
      35             : #include <boost/asio.hpp>
      36             : #pragma warning(pop)
      37             : 
      38             : namespace co
      39             : {
      40             : class RSPConnection;
      41             : typedef lunchbox::RefPtr< RSPConnection > RSPConnectionPtr;
      42             : 
      43             : /**
      44             :  * A reliable multicast connection.
      45             :  *
      46             :  * This connection implements a reliable stream protocol (RSP) over IP V4
      47             :  * UDP multicast. The <a href="http://www.equalizergraphics.com/documents/design/multicast.html#RSP">RSP
      48             :  * design document</a> describes the high-level protocol.
      49             :  */
      50             : class RSPConnection : public Connection
      51             : {
      52             : public:
      53             :     /** Create a new RSP-based connection. */
      54             :     RSPConnection();
      55             : 
      56             :     bool listen() override;
      57           8 :     void close() override { _close(); }
      58             : 
      59             :     /** Identical to listen() for multicast connections. */
      60           0 :     bool connect() override { return listen(); }
      61             : 
      62           2 :     void acceptNB() override { LBASSERT( isListening( )); }
      63             : 
      64             :     ConnectionPtr acceptSync() override;
      65       67598 :     void readNB( void*, const uint64_t ) override {/* NOP */}
      66             :     int64_t readSync( void* buffer, const uint64_t bytes,
      67             :                       const bool ignored ) override;
      68             :     int64_t write( const void* buffer,
      69             :                    const uint64_t bytes ) override;
      70             : 
      71             :     /** @internal Finish all pending send operations. */
      72             :     void finish() override;
      73             : 
      74             :     /** @internal @return current send speed in kilobyte per second. */
      75           0 :     int64_t getSendRate() const { return _sendRate; }
      76             : 
      77             :     /**
      78             :      * @internal
      79             :      * @return the unique identifier of this connection within the multicast
      80             :      *         group.
      81             :      */
      82           0 :     uint16_t getID() const { return _id; }
      83             : 
      84           0 :     Notifier getNotifier() const override
      85           0 :         { return _event->getNotifier(); }
      86             : 
      87             : protected:
      88             :     virtual ~RSPConnection();
      89             : 
      90             : private:
      91             :     /** Thread managing network IO and RSP protocol. */
      92             :     class Thread : public lunchbox::Thread
      93             :     {
      94             :     public:
      95           2 :         Thread( RSPConnectionPtr connection )
      96           2 :         : _connection( connection ){}
      97           4 :         virtual ~Thread(){ _connection = 0; }
      98             :     protected:
      99             :         void run() override;
     100           2 :         bool init() override { return _connection->_initThread(); }
     101             : 
     102             :     private:
     103             :         RSPConnectionPtr _connection;
     104             :     };
     105             : 
     106             :     /** The type of each UDP packet */
     107             :     enum DatagramType
     108             :     {
     109             :         DATA,      //!< the datagram contains data
     110             :         ACKREQ,    //!< ask for ack from all readers
     111             :         NACK,      //!< negative ack, request missing packets
     112             :         ACK,       //!< positive ack all data
     113             :         ID_HELLO,  //!< announce a new id
     114             :         ID_HELLO_REPLY, //!< reply to hello, transmit cur packet number
     115             :         ID_DENY,   //!< deny the id, already used
     116             :         ID_CONFIRM,//!< a new node is connected
     117             :         ID_EXIT,   //!< a node is disconnected
     118             :         COUNTNODE  //!< send to other the number of nodes which I have found
     119             :         // NOTE: Do not use more than 255 types here, since the endianness
     120             :         // detection magic relies on only using the LSB.
     121             :     };
     122             : 
     123             :     /** ID_HELLO, ID_DENY, ID_CONFIRM, ID_EXIT or COUNTNODE packet */
     124             :     struct DatagramNode
     125             :     {
     126             :         uint16_t type;
     127             :         uint16_t protocolVersion;
     128             :         uint16_t connectionID;  // clientID for type COUNTNODE
     129             :         uint16_t data;
     130             : 
     131          86 :         void byteswap()
     132             :             {
     133             : #ifdef COLLAGE_BIGENDIAN
     134             :                 lunchbox::byteswap( type );
     135             :                 lunchbox::byteswap( protocolVersion );
     136             :                 lunchbox::byteswap( connectionID );
     137             :                 lunchbox::byteswap( data );
     138             : #endif
     139          86 :             }
     140             :     };
     141             : 
     142             :     /** Request receive confirmation of all packets up to sequence. */
     143             :     struct DatagramAckRequest
     144             :     {
     145             :         uint16_t type;
     146             :         uint16_t writerID;
     147             :         uint16_t sequence;
     148             : 
     149           0 :         void byteswap()
     150             :             {
     151             : #ifdef COLLAGE_BIGENDIAN
     152             :                 lunchbox::byteswap( type );
     153             :                 lunchbox::byteswap( writerID );
     154             :                 lunchbox::byteswap( sequence );
     155             : #endif
     156           0 :             }
     157             :     };
     158             : 
     159             :     /** Missing packets from start..end sequence */
     160             :     struct Nack
     161             :     {
     162             :         uint16_t start;
     163             :         uint16_t end;
     164             :     };
     165             : 
     166             : #       define CO_RSP_MAX_NACKS 300 // fits in a single IP frame
     167             :     /** Request resend of lost packets */
     168             :     struct DatagramNack
     169             :     {
     170           0 :         void set( uint16_t rID, uint16_t wID, uint16_t n )
     171             :             {
     172           0 :                 type       = NACK;
     173           0 :                 readerID   = rID;
     174           0 :                 writerID   = wID;
     175           0 :                 count      = n;
     176           0 :             }
     177             : 
     178             :         uint16_t       type;
     179             :         uint16_t       readerID;
     180             :         uint16_t       writerID;
     181             :         uint16_t       count;       //!< number of NACK requests used
     182             :         Nack           nacks[ CO_RSP_MAX_NACKS ];
     183             : 
     184           0 :         void byteswap()
     185             :             {
     186             : #ifdef COLLAGE_BIGENDIAN
     187             :                 lunchbox::byteswap( type );
     188             :                 lunchbox::byteswap( readerID );
     189             :                 lunchbox::byteswap( writerID );
     190             :                 lunchbox::byteswap( count );
     191             :                 for( uint16_t i = 0; i < count; ++i )
     192             :                 {
     193             :                     lunchbox::byteswap( nacks[i].start );
     194             :                     lunchbox::byteswap( nacks[i].end );
     195             :                 }
     196             : #endif
     197           0 :             }
     198             :     };
     199             : 
     200             :     /** Acknowledge reception of all packets including sequence .*/
     201             :     struct DatagramAck
     202             :     {
     203             :         uint16_t        type;
     204             :         uint16_t        readerID;
     205             :         uint16_t        writerID;
     206             :         uint16_t        sequence;
     207             : 
     208           0 :         void byteswap()
     209             :             {
     210             : #ifdef COLLAGE_BIGENDIAN
     211             :                 lunchbox::byteswap( type );
     212             :                 lunchbox::byteswap( readerID );
     213             :                 lunchbox::byteswap( writerID );
     214             :                 lunchbox::byteswap( sequence );
     215             : #endif
     216           0 :             }
     217             :     };
     218             : 
     219             :     /** Data packet */
     220             :     struct DatagramData
     221             :     {
     222             :         uint16_t    type;
     223             :         uint16_t    size;
     224             :         uint16_t    writerID;
     225             :         uint16_t    sequence;
     226             : 
     227       29028 :         void byteswap()
     228             :             {
     229             : #ifdef COLLAGE_BIGENDIAN
     230             :                 lunchbox::byteswap( type );
     231             :                 lunchbox::byteswap( size );
     232             :                 lunchbox::byteswap( writerID );
     233             :                 lunchbox::byteswap( sequence );
     234             : #endif
     235       29028 :             }
     236             :     };
     237             : 
     238             :     typedef std::vector< RSPConnectionPtr > RSPConnections;
     239             :     typedef RSPConnections::iterator RSPConnectionsIter;
     240             :     typedef RSPConnections::const_iterator RSPConnectionsCIter;
     241             : 
     242             :     RSPConnectionPtr _parent;
     243             :     RSPConnections _children;
     244             : 
     245             :     // a link for all connection in the connecting state
     246             :     RSPConnections _newChildren;
     247             : 
     248             :     uint16_t _id; //!< The identifier used to demultiplex multipe writers
     249             :     bool     _idAccepted;
     250             :     int32_t  _mtu;
     251             :     int32_t  _ackFreq;
     252             :     uint32_t _payloadSize;
     253             :     int32_t  _timeouts;
     254             : 
     255             :     typedef lunchbox::RefPtr< EventConnection > EventConnectionPtr;
     256             :     EventConnectionPtr _event;
     257             : 
     258             :     boost::asio::io_service        _ioService;
     259             :     boost::asio::ip::udp::socket*  _read;
     260             :     boost::asio::ip::udp::socket*  _write;
     261             :     boost::asio::ip::udp::endpoint _readAddr;
     262             :     boost::asio::deadline_timer    _timeout;
     263             :     boost::asio::deadline_timer    _wakeup;
     264             : 
     265             :     lunchbox::Clock _clock;
     266             :     uint64_t        _maxBucketSize;
     267             :     size_t          _bucketSize;
     268             :     int64_t         _sendRate;
     269             : 
     270             :     Thread*      _thread;
     271             :     lunchbox::Lock   _mutexConnection;
     272             :     lunchbox::Lock   _mutexEvent;
     273             :     uint16_t     _acked;        // sequence ID of last confirmed ack
     274             : 
     275             :     typedef lunchbox::Bufferb Buffer;
     276             :     typedef std::vector< Buffer* > Buffers;
     277             :     typedef Buffers::iterator BuffersIter;
     278             :     typedef Buffers::const_iterator BuffersCIter;
     279             : 
     280             :     Buffers _buffers;                   //!< Data buffers
     281             :     /** Empty read buffers (connected) or write buffers (listening) */
     282             :     lunchbox::LFQueue< Buffer* > _threadBuffers;
     283             :     /** Ready data buffers (connected) or empty write buffers (listening) */
     284             :     lunchbox::MTQueue< Buffer* > _appBuffers;
     285             : 
     286             :     Buffer _recvBuffer;                      //!< Receive (thread) buffer
     287             :     std::deque< Buffer* > _recvBuffers;      //!< out-of-order buffers
     288             : 
     289             :     Buffer* _readBuffer;                     //!< Read (app) buffer
     290             :     uint64_t _readBufferPos;                 //!< Current read index
     291             : 
     292             :     uint16_t _sequence; //!< the next usable (write) or expected (read)
     293             :     std::deque< Buffer* > _writeBuffers;    //!< Written buffers, not acked
     294             : 
     295             :     typedef std::deque< Nack > RepeatQueue;
     296             :     RepeatQueue _repeatQueue; //!< nacks to repeat
     297             : 
     298             :     const unsigned _writeTimeOut;
     299             : 
     300             :     void _close();
     301             :     uint16_t _buildNewID();
     302             : 
     303             :     void _processOutgoing();
     304             :     void _writeData();
     305             :     void _repeatData();
     306             :     void _finishWriteQueue( const uint16_t sequence );
     307             : 
     308             :     bool _handleData( const size_t bytes );
     309             :     bool _handleAck( const size_t bytes );
     310             :     bool _handleNack();
     311             :     bool _handleAckRequest( const size_t bytes );
     312             : 
     313             :     Buffer* _newDataBuffer( Buffer& inBuffer );
     314             :     void _pushDataBuffer( Buffer* buffer );
     315             : 
     316             :     /* Run the reader thread */
     317             :     void _runThread();
     318             : 
     319             :     /* init the reader thread */
     320             :     bool _initThread();
     321             :     /* Make all buffers available for reading */
     322             :     void initBuffers();
     323             :     /* handle data about the comunication state */
     324             :     void _handlePacket( const boost::system::error_code& error,
     325             :                         const size_t bytes );
     326             :     void _handleConnectedData( const size_t bytes );
     327             :     void _handleInitData( const size_t bytes, const bool connected );
     328             :     void _handleAcceptIDData( const size_t bytes );
     329             : 
     330             :     /* handle timeout about the comunication state */
     331             :     void _handleTimeout( const boost::system::error_code& error );
     332             :     void _handleConnectedTimeout();
     333             :     void _handleInitTimeout();
     334             :     void _handleAcceptIDTimeout();
     335             : 
     336             :     void _clearWriteQueues();
     337             : 
     338             :     DatagramNode* _getDatagramNode( const size_t bytes );
     339             : 
     340             :     /** find the connection corresponding to the identifier */
     341             :     RSPConnectionPtr _findConnection( const uint16_t id );
     342             : 
     343             :     /** Sleep until allowed to send according to send rate */
     344             :     void _waitWritable( const uint64_t bytes );
     345             : 
     346             :     /** format and send a datagram count node */
     347             :     void _sendCountNode();
     348             : 
     349             :     void _addRepeat( const Nack* nacks, const uint16_t num );
     350             : 
     351             :     /** format and send an simple request which use only type and id field*/
     352             :     void _sendSimpleDatagram( const DatagramType type, const uint16_t id );
     353             : 
     354             :     /** format and send an ack request for the current sequence */
     355             :     void _sendAckRequest();
     356             : 
     357             :     /** format and send a positive ack */
     358             :     void _sendAck( const uint16_t writerID, const uint16_t sequence );
     359             : 
     360             :     /** format and send a negative ack */
     361             :     void _sendNack( const uint16_t toWriterID, const Nack* nacks,
     362             :                     const uint16_t num );
     363             : 
     364             :     void _checkNewID( const uint16_t id );
     365             : 
     366             :     /* add a new connection detected in the multicast network */
     367             :     bool _addConnection( const uint16_t id, const uint16_t sequence);
     368             :     void _removeConnection( const uint16_t id );
     369             : 
     370             :     void _setTimeout( const int32_t timeOut );
     371             :     void _postWakeup();
     372             :     void _asyncReceiveFrom();
     373          10 :     bool _isWriting() const
     374          10 :         { return !_threadBuffers.isEmpty() || !_writeBuffers.empty(); }
     375             : };
     376             : 
     377             : // cppcheck-suppress passedByValue
     378             : std::ostream& operator << ( std::ostream&, const RSPConnection& );
     379             : }
     380             : 
     381             : #endif //CO_RSPCONNECTION_H

Generated by: LCOV version 1.10