LCOV - code coverage report
Current view: top level - co - rspConnection.h (source / functions) Hit Total Coverage
Test: Collage Lines: 0 30 0.0 %
Date: 2016-12-14 01:26:48 Functions: 0 18 0.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c)      2009, Cedric Stalder <cedric.stalder@gmail.com>
       3             :  *               2009-2013, Stefan Eilemann <eile@equalizergraphics.com>
       4             :  *                    2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #ifndef CO_RSPCONNECTION_H
      23             : #define CO_RSPCONNECTION_H
      24             : 
      25             : #include <co/connection.h>      // base class
      26             : #include <co/eventConnection.h> // member
      27             : 
      28             : #include <lunchbox/buffer.h>  // member
      29             : #include <lunchbox/clock.h>   // member
      30             : #include <lunchbox/lfQueue.h> // member
      31             : #include <lunchbox/mtQueue.h> // member
      32             : 
      33             : #pragma warning(push)
      34             : #pragma warning(disable: 4267)
      35             : #include <boost/asio.hpp>
      36             : #pragma warning(pop)
      37             : 
      38             : namespace co
      39             : {
      40             : class RSPConnection;
      41             : typedef lunchbox::RefPtr< RSPConnection > RSPConnectionPtr;
      42             : 
      43             : /**
      44             :  * A reliable multicast connection.
      45             :  *
      46             :  * This connection implements a reliable stream protocol (RSP) over IP V4
      47             :  * UDP multicast. The <a href="http://www.equalizergraphics.com/documents/design/multicast.html#RSP">RSP
      48             :  * design document</a> describes the high-level protocol.
      49             :  */
      50             : class RSPConnection : public Connection
      51             : {
      52             : public:
      53             :     /** Create a new RSP-based connection. */
      54             :     RSPConnection();
      55             : 
      56             :     bool listen() override;
      57           0 :     void close() override { _close(); }
      58             : 
      59             :     /** Identical to listen() for multicast connections. */
      60           0 :     bool connect() override { return listen(); }
      61             : 
      62           0 :     void acceptNB() override { LBASSERT( isListening( )); }
      63             : 
      64             :     ConnectionPtr acceptSync() override;
      65           0 :     void readNB( void*, const uint64_t ) override {/* NOP */}
      66             :     int64_t readSync( void* buffer, const uint64_t bytes,
      67             :                       const bool ignored ) override;
      68             :     int64_t write( const void* buffer,
      69             :                    const uint64_t bytes ) override;
      70             : 
      71             :     /** @internal Finish all pending send operations. */
      72             :     void finish() override;
      73             : 
      74             :     /** @internal @return current send speed in kilobyte per second. */
      75           0 :     int64_t getSendRate() const { return _sendRate; }
      76             : 
      77             :     /**
      78             :      * @internal
      79             :      * @return the unique identifier of this connection within the multicast
      80             :      *         group.
      81             :      */
      82           0 :     uint16_t getID() const { return _id; }
      83             : 
      84           0 :     Notifier getNotifier() const override
      85           0 :         { return _event->getNotifier(); }
      86             : 
      87             : protected:
      88             :     virtual ~RSPConnection();
      89             : 
      90             : private:
      91             :     /** Thread managing network IO and RSP protocol. */
      92             :     class Thread : public lunchbox::Thread
      93             :     {
      94             :     public:
      95           0 :         explicit Thread( RSPConnectionPtr connection )
      96           0 :             : _connection( connection ) {}
      97           0 :         virtual ~Thread(){ _connection = 0; }
      98             : 
      99             :     protected:
     100             :         void run() override;
     101           0 :         bool init() override { return _connection->_initThread(); }
     102             : 
     103             :     private:
     104             :         RSPConnectionPtr _connection;
     105             :     };
     106             : 
     107             :     /** The type of each UDP packet */
     108             :     enum DatagramType
     109             :     {
     110             :         DATA,      //!< the datagram contains data
     111             :         ACKREQ,    //!< ask for ack from all readers
     112             :         NACK,      //!< negative ack, request missing packets
     113             :         ACK,       //!< positive ack all data
     114             :         ID_HELLO,  //!< announce a new id
     115             :         ID_HELLO_REPLY, //!< reply to hello, transmit cur packet number
     116             :         ID_DENY,   //!< deny the id, already used
     117             :         ID_CONFIRM,//!< a new node is connected
     118             :         ID_EXIT,   //!< a node is disconnected
     119             :         COUNTNODE  //!< send to other the number of nodes which I have found
     120             :         // NOTE: Do not use more than 255 types here, since the endianness
     121             :         // detection magic relies on only using the LSB.
     122             :     };
     123             : 
     124             :     /** ID_HELLO, ID_DENY, ID_CONFIRM, ID_EXIT or COUNTNODE packet */
     125             :     struct DatagramNode
     126             :     {
     127             :         uint16_t type;
     128             :         uint16_t protocolVersion;
     129             :         uint16_t connectionID;  // clientID for type COUNTNODE
     130             :         uint16_t data;
     131             : 
     132           0 :         void byteswap()
     133             :             {
     134             : #ifdef COLLAGE_BIGENDIAN
     135             :                 lunchbox::byteswap( type );
     136             :                 lunchbox::byteswap( protocolVersion );
     137             :                 lunchbox::byteswap( connectionID );
     138             :                 lunchbox::byteswap( data );
     139             : #endif
     140           0 :             }
     141             :     };
     142             : 
     143             :     /** Request receive confirmation of all packets up to sequence. */
     144             :     struct DatagramAckRequest
     145             :     {
     146             :         uint16_t type;
     147             :         uint16_t writerID;
     148             :         uint16_t sequence;
     149             : 
     150           0 :         void byteswap()
     151             :             {
     152             : #ifdef COLLAGE_BIGENDIAN
     153             :                 lunchbox::byteswap( type );
     154             :                 lunchbox::byteswap( writerID );
     155             :                 lunchbox::byteswap( sequence );
     156             : #endif
     157           0 :             }
     158             :     };
     159             : 
     160             :     /** Missing packets from start..end sequence */
     161             :     struct Nack
     162             :     {
     163             :         uint16_t start;
     164             :         uint16_t end;
     165             :     };
     166             : 
     167             : #       define CO_RSP_MAX_NACKS 300 // fits in a single IP frame
     168             :     /** Request resend of lost packets */
     169             :     struct DatagramNack
     170             :     {
     171           0 :         void set( uint16_t rID, uint16_t wID, uint16_t n )
     172             :         {
     173           0 :             type       = NACK;
     174           0 :             readerID   = rID;
     175           0 :             writerID   = wID;
     176           0 :             count      = n;
     177           0 :         }
     178             : 
     179             :         uint16_t       type;
     180             :         uint16_t       readerID;
     181             :         uint16_t       writerID;
     182             :         uint16_t       count;       //!< number of NACK requests used
     183             :         Nack           nacks[ CO_RSP_MAX_NACKS ];
     184             : 
     185           0 :         void byteswap()
     186             :             {
     187             : #ifdef COLLAGE_BIGENDIAN
     188             :                 lunchbox::byteswap( type );
     189             :                 lunchbox::byteswap( readerID );
     190             :                 lunchbox::byteswap( writerID );
     191             :                 lunchbox::byteswap( count );
     192             :                 for( uint16_t i = 0; i < count; ++i )
     193             :                 {
     194             :                     lunchbox::byteswap( nacks[i].start );
     195             :                     lunchbox::byteswap( nacks[i].end );
     196             :                 }
     197             : #endif
     198           0 :             }
     199             :     };
     200             : 
     201             :     /** Acknowledge reception of all packets including sequence .*/
     202             :     struct DatagramAck
     203             :     {
     204             :         uint16_t        type;
     205             :         uint16_t        readerID;
     206             :         uint16_t        writerID;
     207             :         uint16_t        sequence;
     208             : 
     209           0 :         void byteswap()
     210             :             {
     211             : #ifdef COLLAGE_BIGENDIAN
     212             :                 lunchbox::byteswap( type );
     213             :                 lunchbox::byteswap( readerID );
     214             :                 lunchbox::byteswap( writerID );
     215             :                 lunchbox::byteswap( sequence );
     216             : #endif
     217           0 :             }
     218             :     };
     219             : 
     220             :     /** Data packet */
     221             :     struct DatagramData
     222             :     {
     223             :         uint16_t    type;
     224             :         uint16_t    size;
     225             :         uint16_t    writerID;
     226             :         uint16_t    sequence;
     227             : 
     228           0 :         void byteswap()
     229             :             {
     230             : #ifdef COLLAGE_BIGENDIAN
     231             :                 lunchbox::byteswap( type );
     232             :                 lunchbox::byteswap( size );
     233             :                 lunchbox::byteswap( writerID );
     234             :                 lunchbox::byteswap( sequence );
     235             : #endif
     236           0 :             }
     237             :     };
     238             : 
     239             :     typedef std::vector< RSPConnectionPtr > RSPConnections;
     240             :     typedef RSPConnections::iterator RSPConnectionsIter;
     241             :     typedef RSPConnections::const_iterator RSPConnectionsCIter;
     242             : 
     243             :     RSPConnectionPtr _parent;
     244             :     RSPConnections _children;
     245             : 
     246             :     // a link for all connection in the connecting state
     247             :     RSPConnections _newChildren;
     248             : 
     249             :     uint16_t _id; //!< The identifier used to demultiplex multipe writers
     250             :     bool     _idAccepted;
     251             :     int32_t  _mtu;
     252             :     int32_t  _ackFreq;
     253             :     uint32_t _payloadSize;
     254             :     int32_t  _timeouts;
     255             : 
     256             :     typedef lunchbox::RefPtr< EventConnection > EventConnectionPtr;
     257             :     EventConnectionPtr _event;
     258             : 
     259             :     boost::asio::io_service        _ioService;
     260             :     boost::asio::ip::udp::socket*  _read;
     261             :     boost::asio::ip::udp::socket*  _write;
     262             :     boost::asio::ip::udp::endpoint _readAddr;
     263             :     boost::asio::deadline_timer    _timeout;
     264             :     boost::asio::deadline_timer    _wakeup;
     265             : 
     266             :     lunchbox::Clock _clock;
     267             :     uint64_t        _maxBucketSize;
     268             :     size_t          _bucketSize;
     269             :     int64_t         _sendRate;
     270             : 
     271             :     Thread*      _thread;
     272             :     lunchbox::Lock   _mutexConnection;
     273             :     lunchbox::Lock   _mutexEvent;
     274             :     uint16_t     _acked;        // sequence ID of last confirmed ack
     275             : 
     276             :     typedef lunchbox::Bufferb Buffer;
     277             :     typedef std::vector< Buffer* > Buffers;
     278             :     typedef Buffers::iterator BuffersIter;
     279             :     typedef Buffers::const_iterator BuffersCIter;
     280             : 
     281             :     Buffers _buffers;                   //!< Data buffers
     282             :     /** Empty read buffers (connected) or write buffers (listening) */
     283             :     lunchbox::LFQueue< Buffer* > _threadBuffers;
     284             :     /** Ready data buffers (connected) or empty write buffers (listening) */
     285             :     lunchbox::MTQueue< Buffer* > _appBuffers;
     286             : 
     287             :     Buffer _recvBuffer;                      //!< Receive (thread) buffer
     288             :     std::deque< Buffer* > _recvBuffers;      //!< out-of-order buffers
     289             : 
     290             :     Buffer* _readBuffer;                     //!< Read (app) buffer
     291             :     uint64_t _readBufferPos;                 //!< Current read index
     292             : 
     293             :     uint16_t _sequence; //!< the next usable (write) or expected (read)
     294             :     std::deque< Buffer* > _writeBuffers;    //!< Written buffers, not acked
     295             : 
     296             :     typedef std::deque< Nack > RepeatQueue;
     297             :     RepeatQueue _repeatQueue; //!< nacks to repeat
     298             : 
     299             :     const unsigned _writeTimeOut;
     300             : 
     301             :     void _close();
     302             :     uint16_t _buildNewID();
     303             : 
     304             :     void _processOutgoing();
     305             :     void _writeData();
     306             :     void _repeatData();
     307             :     void _finishWriteQueue( const uint16_t sequence );
     308             : 
     309             :     bool _handleData( const size_t bytes );
     310             :     bool _handleAck( const size_t bytes );
     311             :     bool _handleNack();
     312             :     bool _handleAckRequest( const size_t bytes );
     313             : 
     314             :     Buffer* _newDataBuffer( Buffer& inBuffer );
     315             :     void _pushDataBuffer( Buffer* buffer );
     316             : 
     317             :     /* Run the reader thread */
     318             :     void _runThread();
     319             : 
     320             :     /* init the reader thread */
     321             :     bool _initThread();
     322             :     /* Make all buffers available for reading */
     323             :     void initBuffers();
     324             :     /* handle data about the comunication state */
     325             :     void _handlePacket( const boost::system::error_code& error,
     326             :                         const size_t bytes );
     327             :     void _handleConnectedData( const size_t bytes );
     328             :     void _handleInitData( const size_t bytes, const bool connected );
     329             :     void _handleAcceptIDData( const size_t bytes );
     330             : 
     331             :     /* handle timeout about the comunication state */
     332             :     void _handleTimeout( const boost::system::error_code& error );
     333             :     void _handleConnectedTimeout();
     334             :     void _handleInitTimeout();
     335             :     void _handleAcceptIDTimeout();
     336             : 
     337             :     void _clearWriteQueues();
     338             : 
     339             :     DatagramNode* _getDatagramNode( const size_t bytes );
     340             : 
     341             :     /** find the connection corresponding to the identifier */
     342             :     RSPConnectionPtr _findConnection( const uint16_t id );
     343             : 
     344             :     /** Sleep until allowed to send according to send rate */
     345             :     void _waitWritable( const uint64_t bytes );
     346             : 
     347             :     /** format and send a datagram count node */
     348             :     void _sendCountNode();
     349             : 
     350             :     void _addRepeat( const Nack* nacks, const uint16_t num );
     351             : 
     352             :     /** format and send an simple request which use only type and id field*/
     353             :     void _sendSimpleDatagram( const DatagramType type, const uint16_t id );
     354             : 
     355             :     /** format and send an ack request for the current sequence */
     356             :     void _sendAckRequest();
     357             : 
     358             :     /** format and send a positive ack */
     359             :     void _sendAck( const uint16_t writerID, const uint16_t sequence );
     360             : 
     361             :     /** format and send a negative ack */
     362             :     void _sendNack( const uint16_t toWriterID, const Nack* nacks,
     363             :                     const uint16_t num );
     364             : 
     365             :     void _checkNewID( const uint16_t id );
     366             : 
     367             :     /* add a new connection detected in the multicast network */
     368             :     bool _addConnection( const uint16_t id, const uint16_t sequence);
     369             :     void _removeConnection( const uint16_t id );
     370             : 
     371             :     void _setTimeout( const int32_t timeOut );
     372             :     void _postWakeup();
     373             :     void _asyncReceiveFrom();
     374           0 :     bool _isWriting() const
     375           0 :         { return !_threadBuffers.isEmpty() || !_writeBuffers.empty(); }
     376             : };
     377             : 
     378             : std::ostream& operator << ( std::ostream&, const RSPConnection& );
     379             : }
     380             : 
     381             : #endif //CO_RSPCONNECTION_H

Generated by: LCOV version 1.11