LCOV - code coverage report
Current view: top level - co - rspConnection.h (source / functions) Hit Total Coverage
Test: Collage Lines: 0 19 0.0 %
Date: 2018-01-09 16:37:03 Functions: 0 12 0.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2009-2017, Cedric Stalder <cedric.stalder@gmail.com>
       3             :  *                          Stefan Eilemann <eile@equalizergraphics.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #ifndef CO_RSPCONNECTION_H
      23             : #define CO_RSPCONNECTION_H
      24             : 
      25             : #include <co/connection.h>      // base class
      26             : #include <co/eventConnection.h> // member
      27             : 
      28             : #include <lunchbox/buffer.h>  // member
      29             : #include <lunchbox/clock.h>   // member
      30             : #include <lunchbox/lfQueue.h> // member
      31             : #include <lunchbox/mtQueue.h> // member
      32             : 
      33             : #pragma warning(push)
      34             : #pragma warning(disable : 4267)
      35             : #include <boost/asio.hpp>
      36             : #pragma warning(pop)
      37             : 
      38             : namespace co
      39             : {
      40             : class RSPConnection;
      41             : typedef lunchbox::RefPtr<RSPConnection> RSPConnectionPtr;
      42             : 
      43             : /**
      44             :  * A reliable multicast connection.
      45             :  *
      46             :  * This connection implements a reliable stream protocol (RSP) over IP V4
      47             :  * UDP multicast. The <a
      48             :  * href="http://www.equalizergraphics.com/documents/design/multicast.html#RSP">RSP
      49             :  * design document</a> describes the high-level protocol.
      50             :  */
      51             : class RSPConnection : public Connection
      52             : {
      53             : public:
      54             :     /** Create a new RSP-based connection. */
      55             :     RSPConnection();
      56             : 
      57             :     bool listen() override;
      58             :     void close() override;
      59             : 
      60             :     /** Identical to listen() for multicast connections. */
      61           0 :     bool connect() override { return listen(); }
      62           0 :     void acceptNB() override { LBASSERT(isListening()); }
      63             :     ConnectionPtr acceptSync() override;
      64           0 :     void readNB(void*, const uint64_t) override { /* NOP */}
      65             :     int64_t readSync(void* buffer, const uint64_t bytes,
      66             :                      const bool ignored) override;
      67             :     int64_t write(const void* buffer, const uint64_t bytes) override;
      68             : 
      69             :     /** @internal Finish all pending send operations. */
      70             :     void finish() override;
      71             : 
      72             :     /** @internal @return current send speed in kilobyte per second. */
      73           0 :     int64_t getSendRate() const { return _sendRate; }
      74             :     /**
      75             :      * @internal
      76             :      * @return the unique identifier of this connection within the multicast
      77             :      *         group.
      78             :      */
      79           0 :     uint16_t getID() const { return _id; }
      80           0 :     Notifier getNotifier() const override { return _event->getNotifier(); }
      81             : protected:
      82             :     virtual ~RSPConnection();
      83             : 
      84             : private:
      85             :     /** Thread managing network IO and RSP protocol. */
      86             :     class Thread : public lunchbox::Thread
      87             :     {
      88             :     public:
      89           0 :         explicit Thread(RSPConnectionPtr connection)
      90           0 :             : _connection(connection)
      91             :         {
      92           0 :         }
      93           0 :         virtual ~Thread() { _connection = 0; }
      94             :     protected:
      95             :         void run() override;
      96           0 :         bool init() override { return _connection->_initThread(); }
      97             :     private:
      98             :         RSPConnectionPtr _connection;
      99             :     };
     100             : 
     101             :     /** The type of each UDP packet */
     102             :     enum DatagramType
     103             :     {
     104             :         DATA,           //!< the datagram contains data
     105             :         ACKREQ,         //!< ask for ack from all readers
     106             :         NACK,           //!< negative ack, request missing packets
     107             :         ACK,            //!< positive ack all data
     108             :         ID_HELLO,       //!< announce a new id
     109             :         ID_HELLO_REPLY, //!< reply to hello, transmit cur packet number
     110             :         ID_DENY,        //!< deny the id, already used
     111             :         ID_CONFIRM,     //!< a new node is connected
     112             :         ID_EXIT,        //!< a node is disconnected
     113             :         COUNTNODE //!< send to other the number of nodes which I have found
     114             :         // NOTE: Do not use more than 255 types here, since the endianness
     115             :         // detection magic relies on only using the LSB.
     116             :     };
     117             : 
     118             :     /** ID_HELLO, ID_DENY, ID_CONFIRM, ID_EXIT or COUNTNODE packet */
     119             :     struct DatagramNode
     120             :     {
     121             :         uint16_t type;
     122             :         uint16_t protocolVersion;
     123             :         uint16_t connectionID; // clientID for type COUNTNODE
     124             :         uint16_t data;
     125             :     };
     126             : 
     127             :     /** Request receive confirmation of all packets up to sequence. */
     128             :     struct DatagramAckRequest
     129             :     {
     130             :         uint16_t type;
     131             :         uint16_t writerID;
     132             :         uint16_t sequence;
     133             :     };
     134             : 
     135             :     /** Missing packets from start..end sequence */
     136             :     struct Nack
     137             :     {
     138             :         uint16_t start;
     139             :         uint16_t end;
     140             :     };
     141             : 
     142             : #define CO_RSP_MAX_NACKS 300 // fits in a single IP frame
     143             :     /** Request resend of lost packets */
     144             :     struct DatagramNack
     145             :     {
     146           0 :         void set(uint16_t rID, uint16_t wID, uint16_t n)
     147             :         {
     148           0 :             type = NACK;
     149           0 :             readerID = rID;
     150           0 :             writerID = wID;
     151           0 :             count = n;
     152           0 :         }
     153             : 
     154             :         uint16_t type;
     155             :         uint16_t readerID;
     156             :         uint16_t writerID;
     157             :         uint16_t count; //!< number of NACK requests used
     158             :         Nack nacks[CO_RSP_MAX_NACKS];
     159             :     };
     160             : 
     161             :     /** Acknowledge reception of all packets including sequence .*/
     162             :     struct DatagramAck
     163             :     {
     164             :         uint16_t type;
     165             :         uint16_t readerID;
     166             :         uint16_t writerID;
     167             :         uint16_t sequence;
     168             :     };
     169             : 
     170             :     /** Data packet */
     171             :     struct DatagramData
     172             :     {
     173             :         uint16_t type;
     174             :         uint16_t size;
     175             :         uint16_t writerID;
     176             :         uint16_t sequence;
     177             :     };
     178             : 
     179             :     typedef std::vector<RSPConnectionPtr> RSPConnections;
     180             :     typedef RSPConnections::iterator RSPConnectionsIter;
     181             :     typedef RSPConnections::const_iterator RSPConnectionsCIter;
     182             : 
     183             :     RSPConnectionPtr _parent;
     184             :     RSPConnections _children;
     185             : 
     186             :     // a link for all connection in the connecting state
     187             :     RSPConnections _newChildren;
     188             : 
     189             :     uint16_t _id; //!< The identifier used to demultiplex multipe writers
     190             :     bool _idAccepted;
     191             :     int32_t _mtu;
     192             :     int32_t _ackFreq;
     193             :     uint32_t _payloadSize;
     194             :     int32_t _timeouts;
     195             : 
     196             :     typedef lunchbox::RefPtr<EventConnection> EventConnectionPtr;
     197             :     EventConnectionPtr _event;
     198             : 
     199             :     boost::asio::io_service _ioService;
     200             :     boost::asio::ip::udp::socket* _read;
     201             :     boost::asio::ip::udp::socket* _write;
     202             :     boost::asio::ip::udp::endpoint _readAddr;
     203             :     boost::asio::deadline_timer _timeout;
     204             :     boost::asio::deadline_timer _wakeup;
     205             : 
     206             :     lunchbox::Clock _clock;
     207             :     uint64_t _maxBucketSize;
     208             :     size_t _bucketSize;
     209             :     int64_t _sendRate;
     210             : 
     211             :     Thread* _thread;
     212             :     std::mutex _mutexConnection;
     213             :     std::mutex _mutexEvent;
     214             :     uint16_t _acked; // sequence ID of last confirmed ack
     215             : 
     216             :     typedef lunchbox::Bufferb Buffer;
     217             :     typedef std::vector<Buffer*> Buffers;
     218             :     typedef Buffers::iterator BuffersIter;
     219             :     typedef Buffers::const_iterator BuffersCIter;
     220             : 
     221             :     Buffers _buffers; //!< Data buffers
     222             :     /** Empty read buffers (connected) or write buffers (listening) */
     223             :     lunchbox::LFQueue<Buffer*> _threadBuffers;
     224             :     /** Ready data buffers (connected) or empty write buffers (listening) */
     225             :     lunchbox::MTQueue<Buffer*> _appBuffers;
     226             : 
     227             :     Buffer _recvBuffer;               //!< Receive (thread) buffer
     228             :     std::deque<Buffer*> _recvBuffers; //!< out-of-order buffers
     229             : 
     230             :     Buffer* _readBuffer;     //!< Read (app) buffer
     231             :     uint64_t _readBufferPos; //!< Current read index
     232             : 
     233             :     uint16_t _sequence; //!< the next usable (write) or expected (read)
     234             :     std::deque<Buffer*> _writeBuffers; //!< Written buffers, not acked
     235             : 
     236             :     typedef std::deque<Nack> RepeatQueue;
     237             :     RepeatQueue _repeatQueue; //!< nacks to repeat
     238             : 
     239             :     const unsigned _writeTimeOut;
     240             : 
     241             :     uint16_t _buildNewID();
     242             : 
     243             :     void _processOutgoing();
     244             :     void _writeData();
     245             :     void _repeatData();
     246             :     void _finishWriteQueue(const uint16_t sequence);
     247             : 
     248             :     bool _handleData(const size_t bytes);
     249             :     bool _handleAck(const size_t bytes);
     250             :     bool _handleNack();
     251             :     bool _handleAckRequest(const size_t bytes);
     252             : 
     253             :     Buffer* _newDataBuffer(Buffer& inBuffer);
     254             :     void _pushDataBuffer(Buffer* buffer);
     255             : 
     256             :     /* Run the reader thread */
     257             :     void _runThread();
     258             : 
     259             :     /* init the reader thread */
     260             :     bool _initThread();
     261             :     /* Make all buffers available for reading */
     262             :     void initBuffers();
     263             :     /* handle data about the comunication state */
     264             :     void _handlePacket(const boost::system::error_code& error,
     265             :                        const size_t bytes);
     266             :     void _handleConnectedData(const size_t bytes);
     267             :     void _handleInitData(const size_t bytes, const bool connected);
     268             :     void _handleAcceptIDData(const size_t bytes);
     269             : 
     270             :     /* handle timeout about the comunication state */
     271             :     void _handleTimeout(const boost::system::error_code& error);
     272             :     void _handleConnectedTimeout();
     273             :     void _handleInitTimeout();
     274             :     void _handleAcceptIDTimeout();
     275             : 
     276             :     void _clearWriteQueues();
     277             : 
     278             :     DatagramNode* _getDatagramNode(const size_t bytes);
     279             : 
     280             :     /** find the connection corresponding to the identifier */
     281             :     RSPConnectionPtr _findConnection(const uint16_t id);
     282             : 
     283             :     /** Sleep until allowed to send according to send rate */
     284             :     void _waitWritable(const uint64_t bytes);
     285             : 
     286             :     /** format and send a datagram count node */
     287             :     void _sendCountNode();
     288             : 
     289             :     void _addRepeat(const Nack* nacks, const uint16_t num);
     290             : 
     291             :     /** format and send an simple request which use only type and id field*/
     292             :     void _sendSimpleDatagram(const DatagramType type, const uint16_t id);
     293             : 
     294             :     /** format and send an ack request for the current sequence */
     295             :     void _sendAckRequest();
     296             : 
     297             :     /** format and send a positive ack */
     298             :     void _sendAck(const uint16_t writerID, const uint16_t sequence);
     299             : 
     300             :     /** format and send a negative ack */
     301             :     void _sendNack(const uint16_t toWriterID, const Nack* nacks,
     302             :                    const uint16_t num);
     303             : 
     304             :     void _checkNewID(const uint16_t id);
     305             : 
     306             :     /* add a new connection detected in the multicast network */
     307             :     bool _addConnection(const uint16_t id, const uint16_t sequence);
     308             :     void _removeConnection(const uint16_t id);
     309             : 
     310             :     void _setTimeout(const int32_t timeOut);
     311             :     void _postWakeup();
     312             :     void _asyncReceiveFrom();
     313           0 :     bool _isWriting() const
     314             :     {
     315           0 :         return !_threadBuffers.isEmpty() || !_writeBuffers.empty();
     316             :     }
     317             : 
     318             :     friend std::ostream& operator<<(std::ostream&, const RSPConnection&);
     319             : };
     320             : 
     321             : std::ostream& operator<<(std::ostream&, const RSPConnection&);
     322             : }
     323             : 
     324             : #endif // CO_RSPCONNECTION_H

Generated by: LCOV version 1.11