Line data Source code
1 :
2 : /* Copyright (c) 2009, Cedric Stalder <cedric.stalder@gmail.com>
3 : * 2009-2013, Stefan Eilemann <eile@equalizergraphics.com>
4 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_RSPCONNECTION_H
23 : #define CO_RSPCONNECTION_H
24 :
25 : #include <co/connection.h> // base class
26 : #include <co/eventConnection.h> // member
27 :
28 : #include <lunchbox/buffer.h> // member
29 : #include <lunchbox/clock.h> // member
30 : #include <lunchbox/lfQueue.h> // member
31 : #include <lunchbox/mtQueue.h> // member
32 :
33 : #pragma warning(push)
34 : #pragma warning(disable: 4267)
35 : #include <boost/asio.hpp>
36 : #pragma warning(pop)
37 :
38 : namespace co
39 : {
40 : class RSPConnection;
41 : typedef lunchbox::RefPtr< RSPConnection > RSPConnectionPtr;
42 :
43 : /**
44 : * A reliable multicast connection.
45 : *
46 : * This connection implements a reliable stream protocol (RSP) over IP V4
47 : * UDP multicast. The <a href="http://www.equalizergraphics.com/documents/design/multicast.html#RSP">RSP
48 : * design document</a> describes the high-level protocol.
49 : */
50 : class RSPConnection : public Connection
51 : {
52 : public:
53 : /** Create a new RSP-based connection. */
54 : RSPConnection();
55 :
56 : bool listen() override;
57 8 : void close() override { _close(); }
58 :
59 : /** Identical to listen() for multicast connections. */
60 0 : bool connect() override { return listen(); }
61 :
62 2 : void acceptNB() override { LBASSERT( isListening( )); }
63 :
64 : ConnectionPtr acceptSync() override;
65 67598 : void readNB( void*, const uint64_t ) override {/* NOP */}
66 : int64_t readSync( void* buffer, const uint64_t bytes,
67 : const bool ignored ) override;
68 : int64_t write( const void* buffer,
69 : const uint64_t bytes ) override;
70 :
71 : /** @internal Finish all pending send operations. */
72 : void finish() override;
73 :
74 : /** @internal @return current send speed in kilobyte per second. */
75 0 : int64_t getSendRate() const { return _sendRate; }
76 :
77 : /**
78 : * @internal
79 : * @return the unique identifier of this connection within the multicast
80 : * group.
81 : */
82 0 : uint16_t getID() const { return _id; }
83 :
84 0 : Notifier getNotifier() const override
85 0 : { return _event->getNotifier(); }
86 :
87 : protected:
88 : virtual ~RSPConnection();
89 :
90 : private:
91 : /** Thread managing network IO and RSP protocol. */
92 : class Thread : public lunchbox::Thread
93 : {
94 : public:
95 2 : Thread( RSPConnectionPtr connection )
96 2 : : _connection( connection ){}
97 4 : virtual ~Thread(){ _connection = 0; }
98 : protected:
99 : void run() override;
100 2 : bool init() override { return _connection->_initThread(); }
101 :
102 : private:
103 : RSPConnectionPtr _connection;
104 : };
105 :
106 : /** The type of each UDP packet */
107 : enum DatagramType
108 : {
109 : DATA, //!< the datagram contains data
110 : ACKREQ, //!< ask for ack from all readers
111 : NACK, //!< negative ack, request missing packets
112 : ACK, //!< positive ack all data
113 : ID_HELLO, //!< announce a new id
114 : ID_HELLO_REPLY, //!< reply to hello, transmit cur packet number
115 : ID_DENY, //!< deny the id, already used
116 : ID_CONFIRM,//!< a new node is connected
117 : ID_EXIT, //!< a node is disconnected
118 : COUNTNODE //!< send to other the number of nodes which I have found
119 : // NOTE: Do not use more than 255 types here, since the endianness
120 : // detection magic relies on only using the LSB.
121 : };
122 :
123 : /** ID_HELLO, ID_DENY, ID_CONFIRM, ID_EXIT or COUNTNODE packet */
124 : struct DatagramNode
125 : {
126 : uint16_t type;
127 : uint16_t protocolVersion;
128 : uint16_t connectionID; // clientID for type COUNTNODE
129 : uint16_t data;
130 :
131 86 : void byteswap()
132 : {
133 : #ifdef COLLAGE_BIGENDIAN
134 : lunchbox::byteswap( type );
135 : lunchbox::byteswap( protocolVersion );
136 : lunchbox::byteswap( connectionID );
137 : lunchbox::byteswap( data );
138 : #endif
139 86 : }
140 : };
141 :
142 : /** Request receive confirmation of all packets up to sequence. */
143 : struct DatagramAckRequest
144 : {
145 : uint16_t type;
146 : uint16_t writerID;
147 : uint16_t sequence;
148 :
149 0 : void byteswap()
150 : {
151 : #ifdef COLLAGE_BIGENDIAN
152 : lunchbox::byteswap( type );
153 : lunchbox::byteswap( writerID );
154 : lunchbox::byteswap( sequence );
155 : #endif
156 0 : }
157 : };
158 :
159 : /** Missing packets from start..end sequence */
160 : struct Nack
161 : {
162 : uint16_t start;
163 : uint16_t end;
164 : };
165 :
166 : # define CO_RSP_MAX_NACKS 300 // fits in a single IP frame
167 : /** Request resend of lost packets */
168 : struct DatagramNack
169 : {
170 0 : void set( uint16_t rID, uint16_t wID, uint16_t n )
171 : {
172 0 : type = NACK;
173 0 : readerID = rID;
174 0 : writerID = wID;
175 0 : count = n;
176 0 : }
177 :
178 : uint16_t type;
179 : uint16_t readerID;
180 : uint16_t writerID;
181 : uint16_t count; //!< number of NACK requests used
182 : Nack nacks[ CO_RSP_MAX_NACKS ];
183 :
184 0 : void byteswap()
185 : {
186 : #ifdef COLLAGE_BIGENDIAN
187 : lunchbox::byteswap( type );
188 : lunchbox::byteswap( readerID );
189 : lunchbox::byteswap( writerID );
190 : lunchbox::byteswap( count );
191 : for( uint16_t i = 0; i < count; ++i )
192 : {
193 : lunchbox::byteswap( nacks[i].start );
194 : lunchbox::byteswap( nacks[i].end );
195 : }
196 : #endif
197 0 : }
198 : };
199 :
200 : /** Acknowledge reception of all packets including sequence .*/
201 : struct DatagramAck
202 : {
203 : uint16_t type;
204 : uint16_t readerID;
205 : uint16_t writerID;
206 : uint16_t sequence;
207 :
208 0 : void byteswap()
209 : {
210 : #ifdef COLLAGE_BIGENDIAN
211 : lunchbox::byteswap( type );
212 : lunchbox::byteswap( readerID );
213 : lunchbox::byteswap( writerID );
214 : lunchbox::byteswap( sequence );
215 : #endif
216 0 : }
217 : };
218 :
219 : /** Data packet */
220 : struct DatagramData
221 : {
222 : uint16_t type;
223 : uint16_t size;
224 : uint16_t writerID;
225 : uint16_t sequence;
226 :
227 29028 : void byteswap()
228 : {
229 : #ifdef COLLAGE_BIGENDIAN
230 : lunchbox::byteswap( type );
231 : lunchbox::byteswap( size );
232 : lunchbox::byteswap( writerID );
233 : lunchbox::byteswap( sequence );
234 : #endif
235 29028 : }
236 : };
237 :
238 : typedef std::vector< RSPConnectionPtr > RSPConnections;
239 : typedef RSPConnections::iterator RSPConnectionsIter;
240 : typedef RSPConnections::const_iterator RSPConnectionsCIter;
241 :
242 : RSPConnectionPtr _parent;
243 : RSPConnections _children;
244 :
245 : // a link for all connection in the connecting state
246 : RSPConnections _newChildren;
247 :
248 : uint16_t _id; //!< The identifier used to demultiplex multipe writers
249 : bool _idAccepted;
250 : int32_t _mtu;
251 : int32_t _ackFreq;
252 : uint32_t _payloadSize;
253 : int32_t _timeouts;
254 :
255 : typedef lunchbox::RefPtr< EventConnection > EventConnectionPtr;
256 : EventConnectionPtr _event;
257 :
258 : boost::asio::io_service _ioService;
259 : boost::asio::ip::udp::socket* _read;
260 : boost::asio::ip::udp::socket* _write;
261 : boost::asio::ip::udp::endpoint _readAddr;
262 : boost::asio::deadline_timer _timeout;
263 : boost::asio::deadline_timer _wakeup;
264 :
265 : lunchbox::Clock _clock;
266 : uint64_t _maxBucketSize;
267 : size_t _bucketSize;
268 : int64_t _sendRate;
269 :
270 : Thread* _thread;
271 : lunchbox::Lock _mutexConnection;
272 : lunchbox::Lock _mutexEvent;
273 : uint16_t _acked; // sequence ID of last confirmed ack
274 :
275 : typedef lunchbox::Bufferb Buffer;
276 : typedef std::vector< Buffer* > Buffers;
277 : typedef Buffers::iterator BuffersIter;
278 : typedef Buffers::const_iterator BuffersCIter;
279 :
280 : Buffers _buffers; //!< Data buffers
281 : /** Empty read buffers (connected) or write buffers (listening) */
282 : lunchbox::LFQueue< Buffer* > _threadBuffers;
283 : /** Ready data buffers (connected) or empty write buffers (listening) */
284 : lunchbox::MTQueue< Buffer* > _appBuffers;
285 :
286 : Buffer _recvBuffer; //!< Receive (thread) buffer
287 : std::deque< Buffer* > _recvBuffers; //!< out-of-order buffers
288 :
289 : Buffer* _readBuffer; //!< Read (app) buffer
290 : uint64_t _readBufferPos; //!< Current read index
291 :
292 : uint16_t _sequence; //!< the next usable (write) or expected (read)
293 : std::deque< Buffer* > _writeBuffers; //!< Written buffers, not acked
294 :
295 : typedef std::deque< Nack > RepeatQueue;
296 : RepeatQueue _repeatQueue; //!< nacks to repeat
297 :
298 : const unsigned _writeTimeOut;
299 :
300 : void _close();
301 : uint16_t _buildNewID();
302 :
303 : void _processOutgoing();
304 : void _writeData();
305 : void _repeatData();
306 : void _finishWriteQueue( const uint16_t sequence );
307 :
308 : bool _handleData( const size_t bytes );
309 : bool _handleAck( const size_t bytes );
310 : bool _handleNack();
311 : bool _handleAckRequest( const size_t bytes );
312 :
313 : Buffer* _newDataBuffer( Buffer& inBuffer );
314 : void _pushDataBuffer( Buffer* buffer );
315 :
316 : /* Run the reader thread */
317 : void _runThread();
318 :
319 : /* init the reader thread */
320 : bool _initThread();
321 : /* Make all buffers available for reading */
322 : void initBuffers();
323 : /* handle data about the comunication state */
324 : void _handlePacket( const boost::system::error_code& error,
325 : const size_t bytes );
326 : void _handleConnectedData( const size_t bytes );
327 : void _handleInitData( const size_t bytes, const bool connected );
328 : void _handleAcceptIDData( const size_t bytes );
329 :
330 : /* handle timeout about the comunication state */
331 : void _handleTimeout( const boost::system::error_code& error );
332 : void _handleConnectedTimeout();
333 : void _handleInitTimeout();
334 : void _handleAcceptIDTimeout();
335 :
336 : void _clearWriteQueues();
337 :
338 : DatagramNode* _getDatagramNode( const size_t bytes );
339 :
340 : /** find the connection corresponding to the identifier */
341 : RSPConnectionPtr _findConnection( const uint16_t id );
342 :
343 : /** Sleep until allowed to send according to send rate */
344 : void _waitWritable( const uint64_t bytes );
345 :
346 : /** format and send a datagram count node */
347 : void _sendCountNode();
348 :
349 : void _addRepeat( const Nack* nacks, const uint16_t num );
350 :
351 : /** format and send an simple request which use only type and id field*/
352 : void _sendSimpleDatagram( const DatagramType type, const uint16_t id );
353 :
354 : /** format and send an ack request for the current sequence */
355 : void _sendAckRequest();
356 :
357 : /** format and send a positive ack */
358 : void _sendAck( const uint16_t writerID, const uint16_t sequence );
359 :
360 : /** format and send a negative ack */
361 : void _sendNack( const uint16_t toWriterID, const Nack* nacks,
362 : const uint16_t num );
363 :
364 : void _checkNewID( const uint16_t id );
365 :
366 : /* add a new connection detected in the multicast network */
367 : bool _addConnection( const uint16_t id, const uint16_t sequence);
368 : void _removeConnection( const uint16_t id );
369 :
370 : void _setTimeout( const int32_t timeOut );
371 : void _postWakeup();
372 : void _asyncReceiveFrom();
373 10 : bool _isWriting() const
374 10 : { return !_threadBuffers.isEmpty() || !_writeBuffers.empty(); }
375 : };
376 :
377 : // cppcheck-suppress passedByValue
378 : std::ostream& operator << ( std::ostream&, const RSPConnection& );
379 : }
380 :
381 : #endif //CO_RSPCONNECTION_H
|