Line data Source code
1 :
2 : /* Copyright (c) 2009, Cedric Stalder <cedric.stalder@gmail.com>
3 : * 2009-2013, Stefan Eilemann <eile@equalizergraphics.com>
4 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_RSPCONNECTION_H
23 : #define CO_RSPCONNECTION_H
24 :
25 : #include <co/connection.h> // base class
26 : #include <co/eventConnection.h> // member
27 :
28 : #include <lunchbox/buffer.h> // member
29 : #include <lunchbox/clock.h> // member
30 : #include <lunchbox/lfQueue.h> // member
31 : #include <lunchbox/mtQueue.h> // member
32 :
33 : #pragma warning(push)
34 : #pragma warning(disable: 4267)
35 : #include <boost/asio.hpp>
36 : #pragma warning(pop)
37 :
38 : namespace co
39 : {
40 : class RSPConnection; // forward declaration, needed by the typedef below
41 : typedef lunchbox::RefPtr< RSPConnection > RSPConnectionPtr; //!< lunchbox::RefPtr handle to an RSPConnection
42 :
43 : /**
44 : * A reliable multicast connection.
45 : *
46 : * This connection implements a reliable stream protocol (RSP) over IPv4
47 : * UDP multicast. The <a href="http://www.equalizergraphics.com/documents/design/multicast.html#RSP">RSP
48 : * design document</a> describes the high-level protocol.
49 : */
50 : class RSPConnection : public Connection
51 : {
52 : public:
53 : /** Create a new RSP-based connection. */
54 : RSPConnection();
55 :
56 : bool listen() override;
57 0 : void close() override { _close(); } // forwards to the private _close()
58 :
59 : /** Identical to listen() for multicast connections. */
60 0 : bool connect() override { return listen(); }
61 :
62 0 : void acceptNB() override { LBASSERT( isListening( )); } // only asserts the listening state
63 :
64 : ConnectionPtr acceptSync() override;
65 0 : void readNB( void*, const uint64_t ) override {/* NOP */}
66 : int64_t readSync( void* buffer, const uint64_t bytes,
67 : const bool ignored ) override;
68 : int64_t write( const void* buffer,
69 : const uint64_t bytes ) override;
70 :
71 : /** @internal Finish all pending send operations. */
72 : void finish() override;
73 :
74 : /** @internal @return the current send speed in kilobytes per second. */
75 0 : int64_t getSendRate() const { return _sendRate; }
76 :
77 : /**
78 : * @internal
79 : * @return the unique identifier of this connection within the multicast
80 : * group.
81 : */
82 0 : uint16_t getID() const { return _id; }
83 :
84 0 : Notifier getNotifier() const override
85 0 : { return _event->getNotifier(); } // delegates to the _event connection
86 :
87 : protected:
88 : virtual ~RSPConnection();
89 :
90 : private:
91 : /** Thread managing network IO and RSP protocol. */
92 : class Thread : public lunchbox::Thread
93 : {
94 : public:
95 0 : explicit Thread( RSPConnectionPtr connection )
96 0 : : _connection( connection ) {}
97 0 : virtual ~Thread(){ _connection = 0; } // clears the connection pointer
98 :
99 : protected:
100 : void run() override;
101 0 : bool init() override { return _connection->_initThread(); } // forwards to the connection
102 :
103 : private:
104 : RSPConnectionPtr _connection; //!< the connection passed to the constructor
105 : };
106 :
107 : /** The type of each UDP packet */
108 : enum DatagramType
109 : {
110 : DATA, //!< the datagram contains data
111 : ACKREQ, //!< ask for ack from all readers
112 : NACK, //!< negative ack, request missing packets
113 : ACK, //!< positive ack all data
114 : ID_HELLO, //!< announce a new id
115 : ID_HELLO_REPLY, //!< reply to hello, transmit cur packet number
116 : ID_DENY, //!< deny the id, already used
117 : ID_CONFIRM,//!< a new node is connected
118 : ID_EXIT, //!< a node is disconnected
119 : COUNTNODE //!< send to the others the number of nodes which I have found
120 : // NOTE: Do not use more than 255 types here, since the endianness
121 : // detection magic relies on only using the LSB.
122 : };
123 :
124 : /** ID_HELLO, ID_DENY, ID_CONFIRM, ID_EXIT or COUNTNODE packet */
125 : struct DatagramNode
126 : {
127 : uint16_t type;
128 : uint16_t protocolVersion;
129 : uint16_t connectionID; // clientID for type COUNTNODE
130 : uint16_t data;
131 :
132 0 : void byteswap() // swaps every field, only if COLLAGE_BIGENDIAN is defined
133 : {
134 : #ifdef COLLAGE_BIGENDIAN
135 : lunchbox::byteswap( type );
136 : lunchbox::byteswap( protocolVersion );
137 : lunchbox::byteswap( connectionID );
138 : lunchbox::byteswap( data );
139 : #endif
140 0 : }
141 : };
142 :
143 : /** Request receive confirmation of all packets up to sequence. */
144 : struct DatagramAckRequest
145 : {
146 : uint16_t type;
147 : uint16_t writerID;
148 : uint16_t sequence;
149 :
150 0 : void byteswap() // swaps every field, only if COLLAGE_BIGENDIAN is defined
151 : {
152 : #ifdef COLLAGE_BIGENDIAN
153 : lunchbox::byteswap( type );
154 : lunchbox::byteswap( writerID );
155 : lunchbox::byteswap( sequence );
156 : #endif
157 0 : }
158 : };
159 :
160 : /** Missing packets from start..end sequence */
161 : struct Nack
162 : {
163 : uint16_t start;
164 : uint16_t end;
165 : };
166 :
167 : # define CO_RSP_MAX_NACKS 300 // fits in a single IP frame
168 : /** Request resend of lost packets */
169 : struct DatagramNack
170 : {
171 0 : void set( uint16_t rID, uint16_t wID, uint16_t n ) // fills the header fields; nacks[] is not touched
172 : {
173 0 : type = NACK;
174 0 : readerID = rID;
175 0 : writerID = wID;
176 0 : count = n;
177 0 : }
178 :
179 : uint16_t type;
180 : uint16_t readerID;
181 : uint16_t writerID;
182 : uint16_t count; //!< number of NACK requests used
183 : Nack nacks[ CO_RSP_MAX_NACKS ];
184 :
185 0 : void byteswap() // swaps the header and the first count nacks, only if COLLAGE_BIGENDIAN is defined
186 : {
187 : #ifdef COLLAGE_BIGENDIAN
188 : lunchbox::byteswap( type );
189 : lunchbox::byteswap( readerID );
190 : lunchbox::byteswap( writerID );
191 : lunchbox::byteswap( count ); // NOTE(review): count is swapped before it bounds the loop below, which looks correct only for wire-to-host conversion - confirm for the send path
192 : for( uint16_t i = 0; i < count; ++i )
193 : {
194 : lunchbox::byteswap( nacks[i].start );
195 : lunchbox::byteswap( nacks[i].end );
196 : }
197 : #endif
198 0 : }
199 : };
200 :
201 : /** Acknowledge reception of all packets including sequence. */
202 : struct DatagramAck
203 : {
204 : uint16_t type;
205 : uint16_t readerID;
206 : uint16_t writerID;
207 : uint16_t sequence;
208 :
209 0 : void byteswap() // swaps every field, only if COLLAGE_BIGENDIAN is defined
210 : {
211 : #ifdef COLLAGE_BIGENDIAN
212 : lunchbox::byteswap( type );
213 : lunchbox::byteswap( readerID );
214 : lunchbox::byteswap( writerID );
215 : lunchbox::byteswap( sequence );
216 : #endif
217 0 : }
218 : };
219 :
220 : /** Data packet */
221 : struct DatagramData
222 : {
223 : uint16_t type;
224 : uint16_t size;
225 : uint16_t writerID;
226 : uint16_t sequence;
227 :
228 0 : void byteswap() // swaps every field, only if COLLAGE_BIGENDIAN is defined
229 : {
230 : #ifdef COLLAGE_BIGENDIAN
231 : lunchbox::byteswap( type );
232 : lunchbox::byteswap( size );
233 : lunchbox::byteswap( writerID );
234 : lunchbox::byteswap( sequence );
235 : #endif
236 0 : }
237 : };
238 :
239 : typedef std::vector< RSPConnectionPtr > RSPConnections;
240 : typedef RSPConnections::iterator RSPConnectionsIter;
241 : typedef RSPConnections::const_iterator RSPConnectionsCIter;
242 :
243 : RSPConnectionPtr _parent;
244 : RSPConnections _children;
245 :
246 : // links to all connections which are in the connecting state
247 : RSPConnections _newChildren;
248 :
249 : uint16_t _id; //!< The identifier used to demultiplex multiple writers
250 : bool _idAccepted;
251 : int32_t _mtu;
252 : int32_t _ackFreq;
253 : uint32_t _payloadSize;
254 : int32_t _timeouts;
255 :
256 : typedef lunchbox::RefPtr< EventConnection > EventConnectionPtr;
257 : EventConnectionPtr _event; //!< provides the notifier returned by getNotifier()
258 :
259 : boost::asio::io_service _ioService;
260 : boost::asio::ip::udp::socket* _read;
261 : boost::asio::ip::udp::socket* _write;
262 : boost::asio::ip::udp::endpoint _readAddr;
263 : boost::asio::deadline_timer _timeout;
264 : boost::asio::deadline_timer _wakeup;
265 :
266 : lunchbox::Clock _clock;
267 : uint64_t _maxBucketSize;
268 : size_t _bucketSize;
269 : int64_t _sendRate; //!< value reported by getSendRate()
270 :
271 : Thread* _thread;
272 : lunchbox::Lock _mutexConnection;
273 : lunchbox::Lock _mutexEvent;
274 : uint16_t _acked; // sequence ID of last confirmed ack
275 :
276 : typedef lunchbox::Bufferb Buffer;
277 : typedef std::vector< Buffer* > Buffers;
278 : typedef Buffers::iterator BuffersIter;
279 : typedef Buffers::const_iterator BuffersCIter;
280 :
281 : Buffers _buffers; //!< Data buffers
282 : /** Empty read buffers (connected) or write buffers (listening) */
283 : lunchbox::LFQueue< Buffer* > _threadBuffers;
284 : /** Ready data buffers (connected) or empty write buffers (listening) */
285 : lunchbox::MTQueue< Buffer* > _appBuffers;
286 :
287 : Buffer _recvBuffer; //!< Receive (thread) buffer
288 : std::deque< Buffer* > _recvBuffers; //!< out-of-order buffers
289 :
290 : Buffer* _readBuffer; //!< Read (app) buffer
291 : uint64_t _readBufferPos; //!< Current read index
292 :
293 : uint16_t _sequence; //!< the next usable (write) or expected (read) sequence
294 : std::deque< Buffer* > _writeBuffers; //!< Written buffers, not acked
295 :
296 : typedef std::deque< Nack > RepeatQueue;
297 : RepeatQueue _repeatQueue; //!< nacks to repeat
298 :
299 : const unsigned _writeTimeOut;
300 :
301 : void _close();
302 : uint16_t _buildNewID();
303 :
304 : void _processOutgoing();
305 : void _writeData();
306 : void _repeatData();
307 : void _finishWriteQueue( const uint16_t sequence );
308 :
309 : bool _handleData( const size_t bytes );
310 : bool _handleAck( const size_t bytes );
311 : bool _handleNack();
312 : bool _handleAckRequest( const size_t bytes );
313 :
314 : Buffer* _newDataBuffer( Buffer& inBuffer );
315 : void _pushDataBuffer( Buffer* buffer );
316 :
317 : /* Run the reader thread */
318 : void _runThread();
319 :
320 : /* Initialize the reader thread; called from Thread::init() */
321 : bool _initThread();
322 : /* Make all buffers available for reading */
323 : void initBuffers();
324 : /* handle data depending on the communication state */
325 : void _handlePacket( const boost::system::error_code& error,
326 : const size_t bytes );
327 : void _handleConnectedData( const size_t bytes );
328 : void _handleInitData( const size_t bytes, const bool connected );
329 : void _handleAcceptIDData( const size_t bytes );
330 :
331 : /* handle a timeout depending on the communication state */
332 : void _handleTimeout( const boost::system::error_code& error );
333 : void _handleConnectedTimeout();
334 : void _handleInitTimeout();
335 : void _handleAcceptIDTimeout();
336 :
337 : void _clearWriteQueues();
338 :
339 : DatagramNode* _getDatagramNode( const size_t bytes );
340 :
341 : /** find the connection corresponding to the identifier */
342 : RSPConnectionPtr _findConnection( const uint16_t id );
343 :
344 : /** Sleep until allowed to send according to send rate */
345 : void _waitWritable( const uint64_t bytes );
346 :
347 : /** format and send a datagram count node */
348 : void _sendCountNode();
349 :
350 : void _addRepeat( const Nack* nacks, const uint16_t num );
351 :
352 : /** format and send a simple request which uses only the type and id fields */
353 : void _sendSimpleDatagram( const DatagramType type, const uint16_t id );
354 :
355 : /** format and send an ack request for the current sequence */
356 : void _sendAckRequest();
357 :
358 : /** format and send a positive ack */
359 : void _sendAck( const uint16_t writerID, const uint16_t sequence );
360 :
361 : /** format and send a negative ack */
362 : void _sendNack( const uint16_t toWriterID, const Nack* nacks,
363 : const uint16_t num );
364 :
365 : void _checkNewID( const uint16_t id );
366 :
367 : /* add a new connection detected in the multicast network */
368 : bool _addConnection( const uint16_t id, const uint16_t sequence);
369 : void _removeConnection( const uint16_t id );
370 :
371 : void _setTimeout( const int32_t timeOut );
372 : void _postWakeup();
373 : void _asyncReceiveFrom();
374 0 : bool _isWriting() const // true if _threadBuffers or _writeBuffers is non-empty
375 0 : { return !_threadBuffers.isEmpty() || !_writeBuffers.empty(); }
376 : };
377 :
378 : std::ostream& operator << ( std::ostream&, const RSPConnection& ); // stream output for an RSPConnection; declaration only in this header
379 : }
380 :
381 : #endif //CO_RSPCONNECTION_H
|