Line data Source code
1 :
2 : /* Copyright (c) 2009-2017, Cedric Stalder <cedric.stalder@gmail.com>
3 : * Stefan Eilemann <eile@equalizergraphics.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_RSPCONNECTION_H
23 : #define CO_RSPCONNECTION_H
24 :
25 : #include <co/connection.h> // base class
26 : #include <co/eventConnection.h> // member
27 :
28 : #include <lunchbox/buffer.h> // member
29 : #include <lunchbox/clock.h> // member
30 : #include <lunchbox/lfQueue.h> // member
31 : #include <lunchbox/mtQueue.h> // member
32 :
33 : #pragma warning(push)
34 : #pragma warning(disable : 4267)
35 : #include <boost/asio.hpp>
36 : #pragma warning(pop)
37 :
38 : namespace co
39 : {
40 : class RSPConnection;
41 : typedef lunchbox::RefPtr<RSPConnection> RSPConnectionPtr;
42 :
43 : /**
44 : * A reliable multicast connection.
45 : *
46 : * This connection implements a reliable stream protocol (RSP) over IP V4
47 : * UDP multicast. The <a
48 : * href="http://www.equalizergraphics.com/documents/design/multicast.html#RSP">RSP
49 : * design document</a> describes the high-level protocol.
50 : */
51 : class RSPConnection : public Connection
52 : {
53 : public:
54 : /** Create a new RSP-based connection. */
55 : RSPConnection();
56 :
57 : bool listen() override;
58 : void close() override;
59 :
60 : /** Identical to listen() for multicast connections. */
61 0 : bool connect() override { return listen(); }
62 0 : void acceptNB() override { LBASSERT(isListening()); }
63 : ConnectionPtr acceptSync() override;
64 0 : void readNB(void*, const uint64_t) override { /* NOP */}
65 : int64_t readSync(void* buffer, const uint64_t bytes,
66 : const bool ignored) override;
67 : int64_t write(const void* buffer, const uint64_t bytes) override;
68 :
69 : /** @internal Finish all pending send operations. */
70 : void finish() override;
71 :
72 : /** @internal @return current send speed in kilobyte per second. */
73 0 : int64_t getSendRate() const { return _sendRate; }
74 : /**
75 : * @internal
76 : * @return the unique identifier of this connection within the multicast
77 : * group.
78 : */
79 0 : uint16_t getID() const { return _id; }
80 0 : Notifier getNotifier() const override { return _event->getNotifier(); }
81 : protected:
82 : virtual ~RSPConnection();
83 :
84 : private:
85 : /** Thread managing network IO and RSP protocol. */
86 : class Thread : public lunchbox::Thread
87 : {
88 : public:
89 0 : explicit Thread(RSPConnectionPtr connection)
90 0 : : _connection(connection)
91 : {
92 0 : }
93 0 : virtual ~Thread() { _connection = 0; }
94 : protected:
95 : void run() override;
96 0 : bool init() override { return _connection->_initThread(); }
97 : private:
98 : RSPConnectionPtr _connection;
99 : };
100 :
101 : /** The type of each UDP packet */
102 : enum DatagramType
103 : {
104 : DATA, //!< the datagram contains data
105 : ACKREQ, //!< ask for ack from all readers
106 : NACK, //!< negative ack, request missing packets
107 : ACK, //!< positive ack all data
108 : ID_HELLO, //!< announce a new id
109 : ID_HELLO_REPLY, //!< reply to hello, transmit cur packet number
110 : ID_DENY, //!< deny the id, already used
111 : ID_CONFIRM, //!< a new node is connected
112 : ID_EXIT, //!< a node is disconnected
113 : COUNTNODE //!< send to other the number of nodes which I have found
114 : // NOTE: Do not use more than 255 types here, since the endianness
115 : // detection magic relies on only using the LSB.
116 : };
117 :
118 : /** ID_HELLO, ID_DENY, ID_CONFIRM, ID_EXIT or COUNTNODE packet */
119 : struct DatagramNode
120 : {
121 : uint16_t type;
122 : uint16_t protocolVersion;
123 : uint16_t connectionID; // clientID for type COUNTNODE
124 : uint16_t data;
125 : };
126 :
127 : /** Request receive confirmation of all packets up to sequence. */
128 : struct DatagramAckRequest
129 : {
130 : uint16_t type;
131 : uint16_t writerID;
132 : uint16_t sequence;
133 : };
134 :
135 : /** Missing packets from start..end sequence */
136 : struct Nack
137 : {
138 : uint16_t start;
139 : uint16_t end;
140 : };
141 :
142 : #define CO_RSP_MAX_NACKS 300 // fits in a single IP frame
143 : /** Request resend of lost packets */
144 : struct DatagramNack
145 : {
146 0 : void set(uint16_t rID, uint16_t wID, uint16_t n)
147 : {
148 0 : type = NACK;
149 0 : readerID = rID;
150 0 : writerID = wID;
151 0 : count = n;
152 0 : }
153 :
154 : uint16_t type;
155 : uint16_t readerID;
156 : uint16_t writerID;
157 : uint16_t count; //!< number of NACK requests used
158 : Nack nacks[CO_RSP_MAX_NACKS];
159 : };
160 :
161 : /** Acknowledge reception of all packets including sequence .*/
162 : struct DatagramAck
163 : {
164 : uint16_t type;
165 : uint16_t readerID;
166 : uint16_t writerID;
167 : uint16_t sequence;
168 : };
169 :
170 : /** Data packet */
171 : struct DatagramData
172 : {
173 : uint16_t type;
174 : uint16_t size;
175 : uint16_t writerID;
176 : uint16_t sequence;
177 : };
178 :
179 : typedef std::vector<RSPConnectionPtr> RSPConnections;
180 : typedef RSPConnections::iterator RSPConnectionsIter;
181 : typedef RSPConnections::const_iterator RSPConnectionsCIter;
182 :
183 : RSPConnectionPtr _parent;
184 : RSPConnections _children;
185 :
186 : // a link for all connection in the connecting state
187 : RSPConnections _newChildren;
188 :
189 : uint16_t _id; //!< The identifier used to demultiplex multipe writers
190 : bool _idAccepted;
191 : int32_t _mtu;
192 : int32_t _ackFreq;
193 : uint32_t _payloadSize;
194 : int32_t _timeouts;
195 :
196 : typedef lunchbox::RefPtr<EventConnection> EventConnectionPtr;
197 : EventConnectionPtr _event;
198 :
199 : boost::asio::io_service _ioService;
200 : boost::asio::ip::udp::socket* _read;
201 : boost::asio::ip::udp::socket* _write;
202 : boost::asio::ip::udp::endpoint _readAddr;
203 : boost::asio::deadline_timer _timeout;
204 : boost::asio::deadline_timer _wakeup;
205 :
206 : lunchbox::Clock _clock;
207 : uint64_t _maxBucketSize;
208 : size_t _bucketSize;
209 : int64_t _sendRate;
210 :
211 : Thread* _thread;
212 : std::mutex _mutexConnection;
213 : std::mutex _mutexEvent;
214 : uint16_t _acked; // sequence ID of last confirmed ack
215 :
216 : typedef lunchbox::Bufferb Buffer;
217 : typedef std::vector<Buffer*> Buffers;
218 : typedef Buffers::iterator BuffersIter;
219 : typedef Buffers::const_iterator BuffersCIter;
220 :
221 : Buffers _buffers; //!< Data buffers
222 : /** Empty read buffers (connected) or write buffers (listening) */
223 : lunchbox::LFQueue<Buffer*> _threadBuffers;
224 : /** Ready data buffers (connected) or empty write buffers (listening) */
225 : lunchbox::MTQueue<Buffer*> _appBuffers;
226 :
227 : Buffer _recvBuffer; //!< Receive (thread) buffer
228 : std::deque<Buffer*> _recvBuffers; //!< out-of-order buffers
229 :
230 : Buffer* _readBuffer; //!< Read (app) buffer
231 : uint64_t _readBufferPos; //!< Current read index
232 :
233 : uint16_t _sequence; //!< the next usable (write) or expected (read)
234 : std::deque<Buffer*> _writeBuffers; //!< Written buffers, not acked
235 :
236 : typedef std::deque<Nack> RepeatQueue;
237 : RepeatQueue _repeatQueue; //!< nacks to repeat
238 :
239 : const unsigned _writeTimeOut;
240 :
241 : uint16_t _buildNewID();
242 :
243 : void _processOutgoing();
244 : void _writeData();
245 : void _repeatData();
246 : void _finishWriteQueue(const uint16_t sequence);
247 :
248 : bool _handleData(const size_t bytes);
249 : bool _handleAck(const size_t bytes);
250 : bool _handleNack();
251 : bool _handleAckRequest(const size_t bytes);
252 :
253 : Buffer* _newDataBuffer(Buffer& inBuffer);
254 : void _pushDataBuffer(Buffer* buffer);
255 :
256 : /* Run the reader thread */
257 : void _runThread();
258 :
259 : /* init the reader thread */
260 : bool _initThread();
261 : /* Make all buffers available for reading */
262 : void initBuffers();
263 : /* handle data about the comunication state */
264 : void _handlePacket(const boost::system::error_code& error,
265 : const size_t bytes);
266 : void _handleConnectedData(const size_t bytes);
267 : void _handleInitData(const size_t bytes, const bool connected);
268 : void _handleAcceptIDData(const size_t bytes);
269 :
270 : /* handle timeout about the comunication state */
271 : void _handleTimeout(const boost::system::error_code& error);
272 : void _handleConnectedTimeout();
273 : void _handleInitTimeout();
274 : void _handleAcceptIDTimeout();
275 :
276 : void _clearWriteQueues();
277 :
278 : DatagramNode* _getDatagramNode(const size_t bytes);
279 :
280 : /** find the connection corresponding to the identifier */
281 : RSPConnectionPtr _findConnection(const uint16_t id);
282 :
283 : /** Sleep until allowed to send according to send rate */
284 : void _waitWritable(const uint64_t bytes);
285 :
286 : /** format and send a datagram count node */
287 : void _sendCountNode();
288 :
289 : void _addRepeat(const Nack* nacks, const uint16_t num);
290 :
291 : /** format and send an simple request which use only type and id field*/
292 : void _sendSimpleDatagram(const DatagramType type, const uint16_t id);
293 :
294 : /** format and send an ack request for the current sequence */
295 : void _sendAckRequest();
296 :
297 : /** format and send a positive ack */
298 : void _sendAck(const uint16_t writerID, const uint16_t sequence);
299 :
300 : /** format and send a negative ack */
301 : void _sendNack(const uint16_t toWriterID, const Nack* nacks,
302 : const uint16_t num);
303 :
304 : void _checkNewID(const uint16_t id);
305 :
306 : /* add a new connection detected in the multicast network */
307 : bool _addConnection(const uint16_t id, const uint16_t sequence);
308 : void _removeConnection(const uint16_t id);
309 :
310 : void _setTimeout(const int32_t timeOut);
311 : void _postWakeup();
312 : void _asyncReceiveFrom();
313 0 : bool _isWriting() const
314 : {
315 0 : return !_threadBuffers.isEmpty() || !_writeBuffers.empty();
316 : }
317 :
318 : friend std::ostream& operator<<(std::ostream&, const RSPConnection&);
319 : };
320 :
321 : std::ostream& operator<<(std::ostream&, const RSPConnection&);
322 : }
323 :
324 : #endif // CO_RSPCONNECTION_H
|