Line data Source code
1 :
2 : /* Copyright (c) 2009-2017, Cedric Stalder <cedric.stalder@gmail.com>
3 : * Stefan Eilemann <eile@equalizergraphics.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "rspConnection.h"
23 :
24 : #include "connection.h"
25 : #include "connectionDescription.h"
26 : #include "global.h"
27 : #include "log.h"
28 :
29 : #include <lunchbox/rng.h>
30 : #include <lunchbox/scopedMutex.h>
31 : #include <lunchbox/sleep.h>
32 :
33 : #include <boost/bind.hpp>
34 :
35 : //#define CO_INSTRUMENT_RSP
36 : #define CO_RSP_MERGE_WRITES
37 : #define CO_RSP_MAX_TIMEOUTS 1000
38 : #ifdef _WIN32
39 : #define CO_RSP_DEFAULT_PORT (4242)
40 : #else
41 : #define CO_RSP_DEFAULT_PORT ((getuid() % 64511) + 1024)
42 : #endif
43 :
44 : // Note: Do not use version > 255, endianness detection magic relies on this.
45 : const uint16_t CO_RSP_PROTOCOL_VERSION = 0;
46 :
47 : namespace ip = boost::asio::ip;
48 :
49 : namespace co
50 : {
51 : namespace
52 : {
53 : #ifdef CO_INSTRUMENT_RSP
54 : lunchbox::a_int32_t nReadData;
55 : lunchbox::a_int32_t nBytesRead;
56 : lunchbox::a_int32_t nBytesWritten;
57 : lunchbox::a_int32_t nDatagrams;
58 : lunchbox::a_int32_t nRepeated;
59 : lunchbox::a_int32_t nMergedDatagrams;
60 : lunchbox::a_int32_t nAckRequests;
61 : lunchbox::a_int32_t nAcksSend;
62 : lunchbox::a_int32_t nAcksSendTotal;
63 : lunchbox::a_int32_t nAcksRead;
64 : lunchbox::a_int32_t nAcksAccepted;
65 : lunchbox::a_int32_t nNAcksSend;
66 : lunchbox::a_int32_t nNAcksRead;
67 : lunchbox::a_int32_t nNAcksResend;
68 :
69 : float writeWaitTime = 0.f;
70 : lunchbox::Clock instrumentClock;
71 : #endif
72 :
73 : static uint16_t _numBuffers = 0;
74 : }
75 :
76 0 : RSPConnection::RSPConnection()
77 : : _id(0)
78 : , _idAccepted(false)
79 0 : , _mtu(Global::getIAttribute(Global::IATTR_UDP_MTU))
80 0 : , _ackFreq(Global::getIAttribute(Global::IATTR_RSP_ACK_FREQUENCY))
81 0 : , _payloadSize(_mtu - sizeof(DatagramData))
82 : , _timeouts(0)
83 0 : , _event(new EventConnection)
84 : , _read(0)
85 : , _write(0)
86 : , _timeout(_ioService)
87 : , _wakeup(_ioService)
88 0 : , _maxBucketSize((_mtu * _ackFreq) >> 1)
89 : , _bucketSize(0)
90 : , _sendRate(0)
91 : , _thread(0)
92 0 : , _acked(std::numeric_limits<uint16_t>::max())
93 : , _threadBuffers(Global::getIAttribute(Global::IATTR_RSP_NUM_BUFFERS))
94 0 : , _recvBuffer(_mtu)
95 : , _readBuffer(0)
96 : , _readBufferPos(0)
97 : , _sequence(0)
98 : // ensure we have a handleConnectedTimeout before the write pop
99 0 : , _writeTimeOut(Global::IATTR_RSP_ACK_TIMEOUT * CO_RSP_MAX_TIMEOUTS * 2)
100 : {
101 0 : _buildNewID();
102 0 : ConnectionDescriptionPtr description = _getDescription();
103 0 : description->type = CONNECTIONTYPE_RSP;
104 0 : description->bandwidth = 1024 * 1024; // KB/s
105 :
106 0 : LBCHECK(_event->connect());
107 :
108 0 : _buffers.reserve(Global::getIAttribute(Global::IATTR_RSP_NUM_BUFFERS));
109 0 : while (static_cast<int32_t>(_buffers.size()) <
110 0 : Global::getIAttribute(Global::IATTR_RSP_NUM_BUFFERS))
111 : {
112 0 : _buffers.push_back(new Buffer(_mtu));
113 : }
114 :
115 0 : LBASSERT(sizeof(DatagramNack) <= size_t(_mtu));
116 0 : LBLOG(LOG_RSP) << "New RSP connection, " << _buffers.size()
117 0 : << " buffers of " << _mtu << " bytes" << std::endl;
118 0 : }
119 :
120 0 : RSPConnection::~RSPConnection()
121 : {
122 0 : close();
123 0 : while (!_buffers.empty())
124 : {
125 0 : delete _buffers.back();
126 0 : _buffers.pop_back();
127 : }
128 : #ifdef CO_INSTRUMENT_RSP
129 : LBWARN << *this << std::endl;
130 : instrumentClock.reset();
131 : #endif
132 0 : }
133 :
134 0 : void RSPConnection::close()
135 : {
136 0 : if (_parent.isValid() && _parent->_id == _id)
137 0 : _parent->close();
138 :
139 0 : while (!_parent && _isWriting())
140 0 : lunchbox::sleep(10 /*ms*/);
141 :
142 0 : if (isClosed())
143 0 : return;
144 : {
145 0 : lunchbox::ScopedWrite mutex(_mutexEvent);
146 0 : if (_thread)
147 : {
148 0 : LBASSERT(!_thread->isCurrent());
149 0 : _sendSimpleDatagram(ID_EXIT, _id);
150 0 : _ioService.stop();
151 0 : _thread->join();
152 0 : delete _thread;
153 : }
154 :
155 0 : _setState(STATE_CLOSING);
156 0 : if (_thread)
157 : {
158 0 : _thread = 0;
159 :
160 : // notify children to close
161 0 : for (RSPConnectionsCIter i = _children.begin();
162 0 : i != _children.end(); ++i)
163 : {
164 0 : RSPConnectionPtr child = *i;
165 0 : lunchbox::ScopedWrite mutexChild(child->_mutexEvent);
166 0 : child->_appBuffers.push(0);
167 0 : child->_event->set();
168 : }
169 :
170 0 : _children.clear();
171 0 : _newChildren.clear();
172 : }
173 :
174 0 : _parent = 0;
175 :
176 0 : if (_read)
177 0 : _read->close();
178 0 : delete _read;
179 0 : _read = 0;
180 :
181 0 : if (_write)
182 0 : _write->close();
183 0 : delete _write;
184 0 : _write = 0;
185 :
186 0 : _threadBuffers.clear();
187 0 : _appBuffers.push(0); // unlock any other read/write threads
188 :
189 0 : _setState(STATE_CLOSED);
190 : }
191 0 : _event->close();
192 : }
193 :
194 : //----------------------------------------------------------------------
195 : // Async IO handles
196 : //----------------------------------------------------------------------
197 0 : uint16_t RSPConnection::_buildNewID()
198 : {
199 0 : lunchbox::RNG rng;
200 0 : _id = rng.get<uint16_t>();
201 0 : return _id;
202 : }
203 :
204 0 : bool RSPConnection::listen()
205 : {
206 0 : ConnectionDescriptionPtr description = _getDescription();
207 0 : LBASSERT(description->type == CONNECTIONTYPE_RSP);
208 :
209 0 : if (!isClosed())
210 0 : return false;
211 :
212 0 : _setState(STATE_CONNECTING);
213 0 : _numBuffers = Global::getIAttribute(Global::IATTR_RSP_NUM_BUFFERS);
214 :
215 : // init udp connection
216 0 : if (description->port == ConnectionDescription::RANDOM_MULTICAST_PORT)
217 0 : description->port = 0; // Let OS choose
218 0 : else if (description->port == 0)
219 0 : description->port = CO_RSP_DEFAULT_PORT;
220 0 : if (description->getHostname().empty())
221 0 : description->setHostname("239.255.42.43");
222 0 : if (description->getInterface().empty())
223 0 : description->setInterface("0.0.0.0");
224 :
225 : try
226 : {
227 0 : const ip::address readAddress(ip::address::from_string("0.0.0.0"));
228 0 : const ip::udp::endpoint readEndpoint(readAddress, description->port);
229 0 : const std::string& port = std::to_string(unsigned(description->port));
230 0 : ip::udp::resolver resolver(_ioService);
231 0 : const ip::udp::resolver::query queryHN(ip::udp::v4(),
232 : description->getHostname(),
233 0 : port);
234 0 : const ip::udp::resolver::iterator hostname = resolver.resolve(queryHN);
235 :
236 0 : if (hostname == ip::udp::resolver::iterator())
237 0 : return false;
238 :
239 0 : const ip::udp::endpoint writeEndpoint = *hostname;
240 0 : const ip::address mcAddr(writeEndpoint.address());
241 :
242 0 : _read = new ip::udp::socket(_ioService);
243 0 : _write = new ip::udp::socket(_ioService);
244 0 : _read->open(readEndpoint.protocol());
245 0 : _write->open(writeEndpoint.protocol());
246 :
247 0 : _read->set_option(ip::udp::socket::reuse_address(true));
248 0 : _write->set_option(ip::udp::socket::reuse_address(true));
249 0 : _read->set_option(ip::udp::socket::receive_buffer_size(
250 0 : Global::getIAttribute(Global::IATTR_UDP_BUFFER_SIZE)));
251 0 : _write->set_option(ip::udp::socket::send_buffer_size(
252 0 : Global::getIAttribute(Global::IATTR_UDP_BUFFER_SIZE)));
253 :
254 0 : _read->bind(readEndpoint);
255 0 : description->port = _read->local_endpoint().port();
256 :
257 0 : const ip::udp::resolver::query queryIF(ip::udp::v4(),
258 : description->getInterface(),
259 0 : "0");
260 : const ip::udp::resolver::iterator interfaceIP =
261 0 : resolver.resolve(queryIF);
262 :
263 0 : if (interfaceIP == ip::udp::resolver::iterator())
264 0 : return false;
265 :
266 0 : const ip::address ifAddr(ip::udp::endpoint(*interfaceIP).address());
267 0 : LBDEBUG << "Joining " << mcAddr << " on " << ifAddr << std::endl;
268 :
269 0 : _read->set_option(
270 0 : ip::multicast::join_group(mcAddr.to_v4(), ifAddr.to_v4()));
271 0 : _write->set_option(ip::multicast::outbound_interface(ifAddr.to_v4()));
272 : #ifdef SO_BINDTODEVICE // https://github.com/Eyescale/Collage/issues/16
273 0 : const std::string& ifIP = ifAddr.to_string();
274 0 : ::setsockopt(_write->native(), SOL_SOCKET, SO_BINDTODEVICE,
275 0 : ifIP.c_str(), ifIP.size() + 1);
276 0 : ::setsockopt(_read->native(), SOL_SOCKET, SO_BINDTODEVICE, ifIP.c_str(),
277 0 : ifIP.size() + 1);
278 : #endif
279 :
280 0 : _write->connect(writeEndpoint);
281 :
282 0 : _read->set_option(ip::multicast::enable_loopback(false));
283 0 : _write->set_option(ip::multicast::enable_loopback(false));
284 : }
285 0 : catch (const boost::system::system_error& e)
286 : {
287 0 : LBWARN << "can't setup underlying UDP connection: " << e.what()
288 0 : << std::endl;
289 0 : delete _read;
290 0 : delete _write;
291 0 : _read = 0;
292 0 : _write = 0;
293 0 : return false;
294 : }
295 :
296 : // init communication protocol thread
297 0 : _thread = new Thread(this);
298 0 : _bucketSize = 0;
299 0 : _sendRate = description->bandwidth;
300 :
301 : // waits until RSP protocol establishes connection to the multicast network
302 0 : if (!_thread->start())
303 : {
304 0 : close();
305 0 : return false;
306 : }
307 :
308 : // Make all buffers available for writing
309 0 : LBASSERT(_appBuffers.isEmpty());
310 0 : _appBuffers.push(_buffers);
311 :
312 0 : LBDEBUG << "Listening on " << description->getHostname() << ":"
313 0 : << description->port << " (" << description->toString() << " @"
314 0 : << (void*)this << ")" << std::endl;
315 0 : return true;
316 : }
317 :
318 0 : ConnectionPtr RSPConnection::acceptSync()
319 : {
320 0 : if (!isListening())
321 0 : return 0;
322 :
323 0 : lunchbox::ScopedWrite mutex(_mutexConnection);
324 0 : LBASSERT(!_newChildren.empty());
325 0 : if (_newChildren.empty())
326 0 : return 0;
327 :
328 0 : RSPConnectionPtr newConnection = _newChildren.back();
329 0 : _newChildren.pop_back();
330 :
331 0 : LBDEBUG << _id << " accepted RSP connection " << newConnection->_id
332 0 : << std::endl;
333 :
334 0 : lunchbox::ScopedWrite mutex2(_mutexEvent);
335 0 : if (_newChildren.empty())
336 0 : _event->reset();
337 : else
338 0 : _event->set();
339 :
340 0 : return newConnection;
341 : }
342 :
343 0 : int64_t RSPConnection::readSync(void* buffer, const uint64_t bytes, const bool)
344 : {
345 0 : LBASSERT(bytes > 0);
346 0 : if (!isConnected())
347 0 : return -1;
348 :
349 0 : uint64_t bytesLeft = bytes;
350 0 : uint8_t* ptr = reinterpret_cast<uint8_t*>(buffer);
351 :
352 : // redundant (done by the caller already), but saves some lock ops
353 0 : while (bytesLeft)
354 : {
355 0 : if (!_readBuffer)
356 : {
357 0 : LBASSERT(_readBufferPos == 0);
358 0 : _readBuffer = _appBuffers.pop();
359 0 : if (!_readBuffer)
360 : {
361 0 : close();
362 : return (bytes == bytesLeft)
363 0 : ? -1
364 0 : : static_cast<int64_t>(bytes - bytesLeft);
365 : }
366 : }
367 :
368 : const DatagramData* header =
369 0 : reinterpret_cast<const DatagramData*>(_readBuffer->getData());
370 0 : const uint8_t* payload = reinterpret_cast<const uint8_t*>(header + 1);
371 0 : const size_t dataLeft = header->size - _readBufferPos;
372 0 : const size_t size = LB_MIN(static_cast<size_t>(bytesLeft), dataLeft);
373 :
374 0 : memcpy(ptr, payload + _readBufferPos, size);
375 0 : _readBufferPos += size;
376 0 : ptr += size;
377 0 : bytesLeft -= size;
378 :
379 : // if all data in the buffer has been taken
380 0 : if (_readBufferPos >= header->size)
381 : {
382 0 : LBASSERT(_readBufferPos == header->size);
383 : // LBLOG( LOG_RSP ) << "reset read buffer " << header->sequence
384 : // << std::endl;
385 :
386 0 : LBCHECK(_threadBuffers.push(_readBuffer));
387 0 : _readBuffer = 0;
388 0 : _readBufferPos = 0;
389 : }
390 : else
391 : {
392 0 : LBASSERT(_readBufferPos < header->size);
393 : }
394 : }
395 :
396 0 : if (_readBuffer || !_appBuffers.isEmpty())
397 0 : _event->set();
398 : else
399 : {
400 0 : lunchbox::ScopedWrite mutex(_mutexEvent);
401 0 : if (_appBuffers.isEmpty())
402 0 : _event->reset();
403 : }
404 :
405 : #ifdef CO_INSTRUMENT_RSP
406 : nBytesRead += bytes;
407 : #endif
408 0 : return bytes;
409 : }
410 :
411 0 : void RSPConnection::Thread::run()
412 : {
413 0 : _connection->_runThread();
414 0 : _connection = 0;
415 0 : LBDEBUG << "Left RSP protocol thread" << std::endl;
416 0 : }
417 :
418 0 : void RSPConnection::_handleTimeout(const boost::system::error_code& error)
419 : {
420 0 : if (error == boost::asio::error::operation_aborted)
421 0 : return;
422 :
423 0 : if (isListening())
424 0 : _handleConnectedTimeout();
425 0 : else if (_idAccepted)
426 0 : _handleInitTimeout();
427 : else
428 0 : _handleAcceptIDTimeout();
429 : }
430 :
431 0 : void RSPConnection::_handleAcceptIDTimeout()
432 : {
433 0 : ++_timeouts;
434 0 : if (_timeouts < 20)
435 : {
436 0 : LBLOG(LOG_RSP) << "Announce " << _id << " " << _timeouts << std::endl;
437 0 : _sendSimpleDatagram(ID_HELLO, _id);
438 : }
439 : else
440 : {
441 0 : LBLOG(LOG_RSP) << "Confirm " << _id << std::endl;
442 0 : _sendSimpleDatagram(ID_CONFIRM, _id);
443 0 : _addConnection(_id, _sequence);
444 0 : _idAccepted = true;
445 0 : _timeouts = 0;
446 : // send a first datagram to announce me and discover all other
447 : // connections
448 0 : _sendCountNode();
449 : }
450 0 : _setTimeout(10);
451 0 : }
452 :
453 0 : void RSPConnection::_handleInitTimeout()
454 : {
455 0 : LBASSERT(!isListening())
456 0 : ++_timeouts;
457 0 : if (_timeouts < 20)
458 0 : _sendCountNode();
459 : else
460 : {
461 0 : _setState(STATE_LISTENING);
462 0 : LBDEBUG << "RSP connection " << _id << " listening" << std::endl;
463 0 : _timeouts = 0;
464 0 : _ioService.stop(); // thread initialized, run restarts
465 : }
466 0 : _setTimeout(10);
467 0 : }
468 :
469 0 : void RSPConnection::_clearWriteQueues()
470 : {
471 0 : while (!_threadBuffers.isEmpty())
472 : {
473 0 : Buffer* buffer = 0;
474 0 : _threadBuffers.pop(buffer);
475 0 : _writeBuffers.push_back(buffer);
476 : }
477 :
478 0 : _finishWriteQueue(_sequence - 1);
479 0 : LBASSERT(_threadBuffers.isEmpty() && _writeBuffers.empty());
480 0 : }
481 :
482 0 : void RSPConnection::_handleConnectedTimeout()
483 : {
484 0 : if (!isListening())
485 : {
486 0 : _clearWriteQueues();
487 0 : _ioService.stop();
488 0 : return;
489 : }
490 :
491 0 : _processOutgoing();
492 :
493 0 : if (_timeouts >= CO_RSP_MAX_TIMEOUTS)
494 : {
495 0 : LBERROR << "Too many timeouts during send: " << _timeouts << std::endl;
496 0 : bool all = true;
497 0 : for (RSPConnectionsCIter i = _children.begin(); i != _children.end();
498 : ++i)
499 : {
500 0 : RSPConnectionPtr child = *i;
501 0 : if (child->_acked >= _sequence - _numBuffers && child->_id != _id)
502 : {
503 0 : all = false;
504 0 : break;
505 : }
506 : }
507 :
508 : // if all connections failed we probably got disconnected -> close and
509 : // exit else close all failed child connections
510 0 : if (all)
511 : {
512 0 : _sendSimpleDatagram(ID_EXIT, _id);
513 0 : _appBuffers.pushFront(0); // unlock write function
514 :
515 0 : for (RSPConnectionsCIter i = _children.begin();
516 0 : i != _children.end(); ++i)
517 : {
518 0 : RSPConnectionPtr child = *i;
519 0 : child->_setState(STATE_CLOSING);
520 0 : child->_appBuffers.push(0); // unlock read func
521 : }
522 :
523 0 : _clearWriteQueues();
524 0 : _ioService.stop();
525 0 : return;
526 : }
527 :
528 0 : RSPConnectionsCIter i = _children.begin();
529 0 : while (i != _children.end())
530 : {
531 0 : RSPConnectionPtr child = *i;
532 0 : if (child->_acked < _sequence - 1 && _id != child->_id)
533 : {
534 0 : _sendSimpleDatagram(ID_EXIT, child->_id);
535 0 : _removeConnection(child->_id);
536 : }
537 : else
538 : {
539 0 : uint16_t wb = static_cast<uint16_t>(_writeBuffers.size());
540 0 : child->_acked = _sequence - wb;
541 0 : ++i;
542 : }
543 : }
544 :
545 0 : _timeouts = 0;
546 : }
547 : }
548 :
549 0 : RSPConnection::DatagramNode* RSPConnection::_getDatagramNode(const size_t bytes)
550 : {
551 0 : if (bytes < sizeof(DatagramNode))
552 : {
553 0 : LBERROR << "DatagramNode size mismatch, got " << bytes << " instead of "
554 0 : << sizeof(DatagramNode) << " bytes" << std::endl;
555 : // close();
556 0 : return 0;
557 : }
558 : DatagramNode& node =
559 0 : *reinterpret_cast<DatagramNode*>(_recvBuffer.getData());
560 0 : if (node.protocolVersion != CO_RSP_PROTOCOL_VERSION)
561 : {
562 0 : LBERROR << "Protocol version mismatch, got " << node.protocolVersion
563 0 : << " instead of " << CO_RSP_PROTOCOL_VERSION << std::endl;
564 : // close();
565 0 : return 0;
566 : }
567 0 : return &node;
568 : }
569 :
570 0 : bool RSPConnection::_initThread()
571 : {
572 0 : LBLOG(LOG_RSP) << "Started RSP protocol thread" << std::endl;
573 0 : _timeouts = 0;
574 :
575 : // send a first datagram to announce me and discover other connections
576 0 : LBLOG(LOG_RSP) << "Announce " << _id << std::endl;
577 0 : _sendSimpleDatagram(ID_HELLO, _id);
578 0 : _setTimeout(10);
579 0 : _asyncReceiveFrom();
580 0 : _ioService.run();
581 0 : return isListening();
582 : }
583 :
584 0 : void RSPConnection::_runThread()
585 : {
586 : //__debugbreak();
587 0 : _ioService.reset();
588 0 : _ioService.run();
589 0 : }
590 :
591 0 : void RSPConnection::_setTimeout(const int32_t timeOut)
592 : {
593 0 : LBASSERT(timeOut >= 0);
594 0 : _timeout.expires_from_now(boost::posix_time::milliseconds(timeOut));
595 0 : _timeout.async_wait(boost::bind(&RSPConnection::_handleTimeout, this,
596 0 : boost::asio::placeholders::error));
597 0 : }
598 :
599 0 : void RSPConnection::_postWakeup()
600 : {
601 0 : _wakeup.expires_from_now(boost::posix_time::milliseconds(0));
602 0 : _wakeup.async_wait(boost::bind(&RSPConnection::_handleTimeout, this,
603 0 : boost::asio::placeholders::error));
604 0 : }
605 :
606 0 : void RSPConnection::_processOutgoing()
607 : {
608 0 : if (!_repeatQueue.empty())
609 0 : _repeatData();
610 : else
611 0 : _writeData();
612 :
613 0 : if (!_threadBuffers.isEmpty() || !_repeatQueue.empty())
614 : {
615 0 : _setTimeout(0); // call again to send remaining
616 0 : return;
617 : }
618 : // no more data to write, check/send ack request, reset timeout
619 :
620 0 : if (_writeBuffers.empty()) // got all acks
621 : {
622 0 : _timeouts = 0;
623 0 : _timeout.cancel();
624 0 : return;
625 : }
626 :
627 : const int64_t timeout =
628 0 : Global::getIAttribute(Global::IATTR_RSP_ACK_TIMEOUT);
629 0 : const int64_t left = timeout - _clock.getTime64();
630 :
631 0 : if (left > 0)
632 : {
633 0 : _setTimeout(left);
634 0 : return;
635 : }
636 :
637 : // (repeat) ack request
638 0 : _clock.reset();
639 0 : ++_timeouts;
640 0 : if (_timeouts < CO_RSP_MAX_TIMEOUTS)
641 0 : _sendAckRequest();
642 0 : _setTimeout(timeout);
643 : }
644 :
645 0 : void RSPConnection::_writeData()
646 : {
647 0 : Buffer* buffer = 0;
648 0 : if (!_threadBuffers.pop(buffer)) // nothing to write
649 0 : return;
650 :
651 0 : _timeouts = 0;
652 0 : LBASSERT(buffer);
653 :
654 : // write buffer
655 0 : DatagramData* header = reinterpret_cast<DatagramData*>(buffer->getData());
656 0 : header->sequence = _sequence++;
657 :
658 : #ifdef CO_RSP_MERGE_WRITES
659 0 : if (header->size < _payloadSize && !_threadBuffers.isEmpty())
660 : {
661 0 : std::vector<Buffer*> appBuffers;
662 0 : while (header->size < _payloadSize && !_threadBuffers.isEmpty())
663 : {
664 0 : Buffer* buffer2 = 0;
665 0 : LBCHECK(_threadBuffers.getFront(buffer2));
666 0 : LBASSERT(buffer2);
667 : DatagramData* header2 =
668 0 : reinterpret_cast<DatagramData*>(buffer2->getData());
669 :
670 0 : if (uint32_t(header->size + header2->size) > _payloadSize)
671 0 : break;
672 :
673 0 : memcpy(reinterpret_cast<uint8_t*>(header + 1) + header->size,
674 0 : header2 + 1, header2->size);
675 0 : header->size += header2->size;
676 0 : LBCHECK(_threadBuffers.pop(buffer2));
677 0 : appBuffers.push_back(buffer2);
678 : #ifdef CO_INSTRUMENT_RSP
679 : ++nMergedDatagrams;
680 : #endif
681 : }
682 :
683 0 : if (!appBuffers.empty())
684 0 : _appBuffers.push(appBuffers);
685 : }
686 : #endif
687 :
688 : // send data
689 : // Note 1: We could optimize the send away if we're all alone, but this is
690 : // not a use case for RSP, so we don't care.
691 : // Note 2: Data to myself will be 'written' in _finishWriteQueue once we
692 : // got all acks for the packet
693 0 : const uint32_t size = header->size + sizeof(DatagramData);
694 :
695 0 : _waitWritable(size); // OPT: process incoming in between
696 0 : _write->send(boost::asio::buffer(header, size));
697 :
698 : #ifdef CO_INSTRUMENT_RSP
699 : ++nDatagrams;
700 : nBytesWritten += header->size;
701 : #endif
702 :
703 : // save datagram for repeats (and self)
704 0 : _writeBuffers.push_back(buffer);
705 :
706 0 : if (_children.size() == 1) // We're all alone
707 : {
708 0 : LBASSERT(_children.front()->_id == _id);
709 0 : _finishWriteQueue(_sequence - 1);
710 : }
711 : }
712 :
713 0 : void RSPConnection::_waitWritable(const uint64_t bytes)
714 : {
715 : #ifdef CO_INSTRUMENT_RSP
716 : lunchbox::Clock clock;
717 : #endif
718 :
719 0 : _bucketSize += static_cast<uint64_t>(_clock.resetTimef() * _sendRate);
720 : // opt omit: * 1024 / 1000;
721 0 : _bucketSize = LB_MIN(_bucketSize, _maxBucketSize);
722 :
723 0 : const uint64_t size = LB_MIN(bytes, static_cast<uint64_t>(_mtu));
724 0 : while (_bucketSize < size)
725 : {
726 0 : lunchbox::Thread::yield();
727 0 : float time = _clock.resetTimef();
728 :
729 0 : while (time == 0.f)
730 : {
731 0 : lunchbox::Thread::yield();
732 0 : time = _clock.resetTimef();
733 : }
734 :
735 0 : _bucketSize += static_cast<int64_t>(time * _sendRate);
736 0 : _bucketSize = LB_MIN(_bucketSize, _maxBucketSize);
737 : }
738 0 : _bucketSize -= size;
739 :
740 : #ifdef CO_INSTRUMENT_RSP
741 : writeWaitTime += clock.getTimef();
742 : #endif
743 :
744 0 : ConstConnectionDescriptionPtr description = getDescription();
745 0 : if (_sendRate < description->bandwidth)
746 : {
747 0 : _sendRate += int64_t(
748 0 : float(Global::getIAttribute(Global::IATTR_RSP_ERROR_UPSCALE)) *
749 0 : float(description->bandwidth) * .001f);
750 0 : LBLOG(LOG_RSP) << "speeding up to " << _sendRate << " KB/s"
751 0 : << std::endl;
752 : }
753 0 : }
754 :
755 0 : void RSPConnection::_repeatData()
756 : {
757 0 : _timeouts = 0;
758 :
759 0 : while (!_repeatQueue.empty())
760 : {
761 0 : Nack& request = _repeatQueue.front();
762 0 : const uint16_t distance = _sequence - request.start;
763 :
764 0 : if (distance == 0)
765 : {
766 0 : LBWARN << "ignoring invalid nack (" << request.start << ".."
767 0 : << request.end << ")" << std::endl;
768 0 : _repeatQueue.pop_front();
769 0 : continue;
770 : }
771 :
772 0 : if (distance <= _writeBuffers.size()) // not already acked
773 : {
774 : // LBLOG( LOG_RSP ) << "Repeat " << request.start << ", "
775 : // << _sendRate
776 : // << "KB/s"<< std::endl;
777 :
778 0 : const size_t i = _writeBuffers.size() - distance;
779 0 : Buffer* buffer = _writeBuffers[i];
780 0 : LBASSERT(buffer);
781 :
782 : DatagramData* header =
783 0 : reinterpret_cast<DatagramData*>(buffer->getData());
784 0 : const uint32_t size = header->size + sizeof(DatagramData);
785 0 : LBASSERT(header->sequence == request.start);
786 :
787 : // send data
788 0 : _waitWritable(size); // OPT: process incoming in between
789 0 : _write->send(boost::asio::buffer(header, size));
790 : #ifdef CO_INSTRUMENT_RSP
791 : ++nRepeated;
792 : #endif
793 : }
794 :
795 0 : if (request.start == request.end)
796 0 : _repeatQueue.pop_front(); // done with request
797 : else
798 0 : ++request.start;
799 :
800 0 : if (distance <= _writeBuffers.size()) // send something
801 0 : return;
802 : }
803 : }
804 :
805 0 : void RSPConnection::_finishWriteQueue(const uint16_t sequence)
806 : {
807 0 : LBASSERT(!_writeBuffers.empty());
808 :
809 0 : RSPConnectionPtr connection = _findConnection(_id);
810 0 : LBASSERT(connection.isValid());
811 0 : LBASSERT(connection->_recvBuffers.empty());
812 :
813 : // Bundle pushing the buffers to the app to avoid excessive lock ops
814 0 : Buffers readBuffers;
815 0 : Buffers freeBuffers;
816 :
817 0 : const uint16_t size = _sequence - sequence - 1;
818 0 : LBASSERTINFO(size <= uint16_t(_writeBuffers.size()),
819 : size << " > " << _writeBuffers.size());
820 0 : LBLOG(LOG_RSP) << "Got all remote acks for " << sequence << " current "
821 0 : << _sequence << " advance " << _writeBuffers.size() - size
822 0 : << " buffers" << std::endl;
823 :
824 0 : while (_writeBuffers.size() > size_t(size))
825 : {
826 0 : Buffer* buffer = _writeBuffers.front();
827 0 : _writeBuffers.pop_front();
828 :
829 : #ifndef NDEBUG
830 : DatagramData* datagram =
831 0 : reinterpret_cast<DatagramData*>(buffer->getData());
832 0 : LBASSERT(datagram->writerID == _id);
833 0 : LBASSERTINFO(datagram->sequence ==
834 : uint16_t(connection->_sequence + readBuffers.size()),
835 : datagram->sequence << ", " << connection->_sequence << ", "
836 : << readBuffers.size());
837 : // LBLOG( LOG_RSP ) << "self receive " << datagram->sequence << std::endl;
838 : #endif
839 :
840 0 : Buffer* newBuffer = connection->_newDataBuffer(*buffer);
841 0 : if (!newBuffer && !readBuffers.empty()) // push prepared app buffers
842 : {
843 0 : lunchbox::ScopedWrite mutex(connection->_mutexEvent);
844 0 : LBLOG(LOG_RSP) << "post " << readBuffers.size()
845 0 : << " buffers starting with sequence "
846 0 : << connection->_sequence << std::endl;
847 :
848 0 : connection->_appBuffers.push(readBuffers);
849 0 : connection->_sequence += uint16_t(readBuffers.size());
850 0 : readBuffers.clear();
851 0 : connection->_event->set();
852 : }
853 :
854 0 : while (!newBuffer) // no more data buffers, wait for app to drain
855 : {
856 0 : newBuffer = connection->_newDataBuffer(*buffer);
857 0 : lunchbox::Thread::yield();
858 : }
859 :
860 0 : freeBuffers.push_back(buffer);
861 0 : readBuffers.push_back(newBuffer);
862 : }
863 :
864 0 : _appBuffers.push(freeBuffers);
865 0 : if (!readBuffers.empty())
866 : {
867 0 : lunchbox::ScopedWrite mutex(connection->_mutexEvent);
868 : #if 0
869 : LBLOG( LOG_RSP )
870 : << "post " << readBuffers.size() << " buffers starting at "
871 : << connection->_sequence << std::endl;
872 : #endif
873 :
874 0 : connection->_appBuffers.push(readBuffers);
875 0 : connection->_sequence += uint16_t(readBuffers.size());
876 0 : connection->_event->set();
877 : }
878 :
879 0 : connection->_acked = uint16_t(connection->_sequence - 1);
880 0 : LBASSERT(connection->_acked == sequence);
881 :
882 0 : _timeouts = 0;
883 0 : }
884 :
885 0 : void RSPConnection::_handlePacket(const boost::system::error_code& /* error */,
886 : const size_t bytes)
887 : {
888 0 : if (isListening())
889 : {
890 0 : _handleConnectedData(bytes);
891 :
892 0 : if (isListening())
893 0 : _processOutgoing();
894 : else
895 : {
896 0 : _ioService.stop();
897 0 : return;
898 : }
899 : }
900 0 : else if (bytes >= sizeof(DatagramNode))
901 : {
902 0 : if (_idAccepted)
903 0 : _handleInitData(bytes, false);
904 : else
905 0 : _handleAcceptIDData(bytes);
906 : }
907 :
908 : // LBLOG( LOG_RSP ) << "_handlePacket timeout " << timeout << std::endl;
909 0 : _asyncReceiveFrom();
910 : }
911 :
912 0 : void RSPConnection::_handleAcceptIDData(const size_t bytes)
913 : {
914 0 : DatagramNode* pNode = _getDatagramNode(bytes);
915 0 : if (!pNode)
916 0 : return;
917 :
918 0 : DatagramNode& node = *pNode;
919 :
920 0 : switch (node.type)
921 : {
922 : case ID_HELLO:
923 0 : _checkNewID(node.connectionID);
924 0 : break;
925 :
926 : case ID_HELLO_REPLY:
927 0 : _addConnection(node.connectionID, node.data);
928 0 : break;
929 :
930 : case ID_DENY:
931 : // a connection refused my ID, try another ID
932 0 : if (node.connectionID == _id)
933 : {
934 0 : _timeouts = 0;
935 0 : _sendSimpleDatagram(ID_HELLO, _buildNewID());
936 0 : LBLOG(LOG_RSP) << "Announce " << _id << std::endl;
937 : }
938 0 : break;
939 :
940 : case ID_EXIT:
941 0 : _removeConnection(node.connectionID);
942 0 : break;
943 :
944 : default:
945 0 : LBDEBUG << "Got unexpected datagram type " << node.type << std::endl;
946 0 : LBUNIMPLEMENTED;
947 0 : break;
948 : }
949 : }
950 :
951 0 : void RSPConnection::_handleInitData(const size_t bytes, const bool connected)
952 : {
953 0 : DatagramNode* pNode = _getDatagramNode(bytes);
954 0 : if (!pNode)
955 0 : return;
956 :
957 0 : DatagramNode& node = *pNode;
958 :
959 0 : switch (node.type)
960 : {
961 : case ID_HELLO:
962 0 : if (!connected)
963 0 : _timeouts = 0;
964 0 : _checkNewID(node.connectionID);
965 0 : return;
966 :
967 : case ID_CONFIRM:
968 0 : if (!connected)
969 0 : _timeouts = 0;
970 0 : _addConnection(node.connectionID, node.data);
971 0 : return;
972 :
973 : case COUNTNODE:
974 0 : LBLOG(LOG_RSP) << "Got " << node.data << " nodes from "
975 0 : << node.connectionID << std::endl;
976 0 : return;
977 :
978 : case ID_HELLO_REPLY:
979 0 : _addConnection(node.connectionID, node.data);
980 0 : return;
981 :
982 : case ID_EXIT:
983 0 : _removeConnection(node.connectionID);
984 0 : return;
985 :
986 : default:
987 0 : LBDEBUG << "Got unexpected datagram type " << node.type << std::endl;
988 0 : break;
989 : }
990 : }
991 :
992 0 : void RSPConnection::_handleConnectedData(const size_t bytes)
993 : {
994 0 : if (bytes < sizeof(uint16_t))
995 0 : return;
996 :
997 0 : void* data = _recvBuffer.getData();
998 0 : uint16_t type = *reinterpret_cast<uint16_t*>(data);
999 0 : switch (type)
1000 : {
1001 : case DATA:
1002 0 : LBCHECK(_handleData(bytes));
1003 0 : break;
1004 :
1005 : case ACK:
1006 0 : LBCHECK(_handleAck(bytes));
1007 0 : break;
1008 :
1009 : case NACK:
1010 0 : LBCHECK(_handleNack());
1011 0 : break;
1012 :
1013 : case ACKREQ: // The writer asks for an ack/nack
1014 0 : LBCHECK(_handleAckRequest(bytes));
1015 0 : break;
1016 :
1017 : case ID_HELLO:
1018 : case ID_HELLO_REPLY:
1019 : case ID_CONFIRM:
1020 : case ID_EXIT:
1021 : case ID_DENY:
1022 : case COUNTNODE:
1023 0 : _handleInitData(bytes, true);
1024 0 : break;
1025 :
1026 : default:
1027 0 : LBASSERTINFO(false, "Don't know how to handle packet of type " << type);
1028 : }
1029 : }
1030 :
1031 0 : void RSPConnection::_asyncReceiveFrom()
1032 : {
1033 0 : _read->async_receive_from(
1034 0 : boost::asio::buffer(_recvBuffer.getData(), _mtu), _readAddr,
1035 0 : boost::bind(&RSPConnection::_handlePacket, this,
1036 : boost::asio::placeholders::error,
1037 0 : boost::asio::placeholders::bytes_transferred));
1038 0 : }
1039 :
1040 0 : bool RSPConnection::_handleData(const size_t bytes)
1041 : {
1042 0 : if (bytes < sizeof(DatagramData))
1043 0 : return false;
1044 : DatagramData& datagram =
1045 0 : *reinterpret_cast<DatagramData*>(_recvBuffer.getData());
1046 :
1047 : #ifdef CO_INSTRUMENT_RSP
1048 : ++nReadData;
1049 : #endif
1050 0 : const uint16_t writerID = datagram.writerID;
1051 : #ifdef Darwin
1052 : // There is occasionally a packet from ourselves, even though multicast loop
1053 : // is not set?!
1054 : if (writerID == _id)
1055 : return true;
1056 : #else
1057 0 : LBASSERT(writerID != _id);
1058 : #endif
1059 :
1060 0 : RSPConnectionPtr connection = _findConnection(writerID);
1061 :
1062 0 : if (!connection) // unknown connection ?
1063 : {
1064 0 : LBASSERTINFO(false, "Can't find connection with id " << writerID);
1065 0 : return false;
1066 : }
1067 0 : LBASSERT(connection->_id == writerID);
1068 :
1069 0 : const uint16_t sequence = datagram.sequence;
1070 : // LBLOG( LOG_RSP ) << "rcvd " << sequence << " from " << writerID
1071 : // <<std::endl;
1072 :
1073 0 : if (connection->_sequence == sequence) // in-order packet
1074 : {
1075 0 : Buffer* newBuffer = connection->_newDataBuffer(_recvBuffer);
1076 0 : if (!newBuffer) // no more data buffers, drop packet
1077 0 : return true;
1078 :
1079 0 : lunchbox::ScopedWrite mutex(connection->_mutexEvent);
1080 0 : connection->_pushDataBuffer(newBuffer);
1081 :
1082 0 : while (!connection->_recvBuffers.empty()) // enqueue ready pending data
1083 : {
1084 0 : newBuffer = connection->_recvBuffers.front();
1085 0 : if (!newBuffer)
1086 0 : break;
1087 :
1088 0 : connection->_recvBuffers.pop_front();
1089 0 : connection->_pushDataBuffer(newBuffer);
1090 : }
1091 :
1092 0 : if (!connection->_recvBuffers.empty() &&
1093 0 : !connection->_recvBuffers.front()) // update for new _sequence
1094 : {
1095 0 : connection->_recvBuffers.pop_front();
1096 : }
1097 :
1098 0 : connection->_event->set();
1099 0 : return true;
1100 : }
1101 :
1102 0 : const uint16_t max = std::numeric_limits<uint16_t>::max();
1103 0 : if ((connection->_sequence > sequence &&
1104 0 : max - connection->_sequence + sequence > _numBuffers) ||
1105 0 : (connection->_sequence < sequence &&
1106 0 : sequence - connection->_sequence > _numBuffers))
1107 : {
1108 : // ignore it if it's a repetition for another reader
1109 0 : return true;
1110 : }
1111 :
1112 : // else out of order
1113 :
1114 0 : const uint16_t size = sequence - connection->_sequence;
1115 0 : LBASSERT(size != 0);
1116 0 : LBASSERTINFO(size <= _numBuffers, size << " > " << _numBuffers);
1117 :
1118 0 : ssize_t i = ssize_t(size) - 1;
1119 0 : const bool gotPacket = (connection->_recvBuffers.size() >= size &&
1120 0 : connection->_recvBuffers[i]);
1121 0 : if (gotPacket)
1122 0 : return true;
1123 :
1124 0 : Buffer* newBuffer = connection->_newDataBuffer(_recvBuffer);
1125 0 : if (!newBuffer) // no more data buffers, drop packet
1126 0 : return true;
1127 :
1128 0 : if (connection->_recvBuffers.size() < size)
1129 0 : connection->_recvBuffers.resize(size, 0);
1130 :
1131 0 : LBASSERT(!connection->_recvBuffers[i]);
1132 0 : connection->_recvBuffers[i] = newBuffer;
1133 :
1134 : // early nack: request missing packets before current
1135 0 : --i;
1136 0 : Nack nack = {connection->_sequence, uint16_t(sequence - 1)};
1137 0 : if (i > 0)
1138 : {
1139 0 : if (connection->_recvBuffers[i]) // got previous packet
1140 0 : return true;
1141 :
1142 0 : while (i >= 0 && !connection->_recvBuffers[i])
1143 0 : --i;
1144 :
1145 0 : const Buffer* lastBuffer = i >= 0 ? connection->_recvBuffers[i] : 0;
1146 0 : if (lastBuffer)
1147 : {
1148 0 : nack.start = connection->_sequence + i;
1149 : }
1150 : }
1151 :
1152 0 : LBLOG(LOG_RSP) << "send early nack " << nack.start << ".." << nack.end
1153 0 : << " current " << connection->_sequence << " ooo "
1154 0 : << connection->_recvBuffers.size() << std::endl;
1155 :
1156 0 : if (nack.end < nack.start)
1157 : // OPT: don't drop nack 0..nack.end, but it doesn't happen often
1158 0 : nack.end = std::numeric_limits<uint16_t>::max();
1159 :
1160 0 : _sendNack(writerID, &nack, 1);
1161 0 : return true;
1162 : }
1163 :
1164 0 : RSPConnection::Buffer* RSPConnection::_newDataBuffer(Buffer& inBuffer)
1165 : {
1166 0 : LBASSERT(static_cast<int32_t>(inBuffer.getMaxSize()) == _mtu);
1167 :
1168 0 : Buffer* buffer = 0;
1169 0 : if (_threadBuffers.pop(buffer))
1170 : {
1171 0 : buffer->swap(inBuffer);
1172 0 : return buffer;
1173 : }
1174 :
1175 : // we do not have a free buffer, which means that the receiver is slower
1176 : // then our read thread. This is bad, because now we'll drop the data and
1177 : // will send a NAck packet upon the ack request, causing retransmission even
1178 : // though we'll probably drop it again
1179 0 : LBLOG(LOG_RSP) << "Reader too slow, dropping data" << std::endl;
1180 :
1181 : // Set the event if there is data to read. This shouldn't be needed since
1182 : // the event should be set in this case, but it'll increase the robustness
1183 0 : lunchbox::ScopedWrite mutex(_mutexEvent);
1184 0 : if (!_appBuffers.isEmpty())
1185 0 : _event->set();
1186 0 : return 0;
1187 : }
1188 :
1189 0 : void RSPConnection::_pushDataBuffer(Buffer* buffer)
1190 : {
1191 0 : LBASSERT(_parent);
1192 : #ifndef NDEBUG
1193 0 : DatagramData* dgram = reinterpret_cast<DatagramData*>(buffer->getData());
1194 0 : LBASSERTINFO(dgram->sequence == _sequence, dgram->sequence << " != "
1195 : << _sequence);
1196 : #endif
1197 :
1198 0 : if (((_sequence + _parent->_id) % _ackFreq) == 0)
1199 0 : _parent->_sendAck(_id, _sequence);
1200 :
1201 0 : LBLOG(LOG_RSP) << "post buffer " << _sequence << std::endl;
1202 0 : ++_sequence;
1203 0 : _appBuffers.push(buffer);
1204 0 : }
1205 :
1206 0 : bool RSPConnection::_handleAck(const size_t bytes)
1207 : {
1208 0 : if (bytes < sizeof(DatagramAck))
1209 0 : return false;
1210 0 : DatagramAck& ack = *reinterpret_cast<DatagramAck*>(_recvBuffer.getData());
1211 :
1212 : #ifdef CO_INSTRUMENT_RSP
1213 : ++nAcksRead;
1214 : #endif
1215 :
1216 0 : if (ack.writerID != _id)
1217 0 : return true;
1218 :
1219 0 : LBLOG(LOG_RSP) << "got ack from " << ack.readerID << " for " << ack.writerID
1220 0 : << " sequence " << ack.sequence << " current " << _sequence
1221 0 : << std::endl;
1222 :
1223 : // find destination connection, update ack data if needed
1224 0 : RSPConnectionPtr connection = _findConnection(ack.readerID);
1225 0 : if (!connection)
1226 : {
1227 0 : LBUNREACHABLE;
1228 0 : return false;
1229 : }
1230 :
1231 0 : if (connection->_acked >= ack.sequence &&
1232 0 : connection->_acked - ack.sequence <= _numBuffers)
1233 : {
1234 : // I have received a later ack previously from the reader
1235 0 : LBLOG(LOG_RSP) << "Late ack" << std::endl;
1236 0 : return true;
1237 : }
1238 :
1239 : #ifdef CO_INSTRUMENT_RSP
1240 : ++nAcksAccepted;
1241 : #endif
1242 0 : connection->_acked = ack.sequence;
1243 0 : _timeouts = 0; // reset timeout counter
1244 :
1245 : // Check if we can advance _acked
1246 0 : uint16_t acked = ack.sequence;
1247 :
1248 0 : for (RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i)
1249 : {
1250 0 : RSPConnectionPtr child = *i;
1251 0 : if (child->_id == _id)
1252 0 : continue;
1253 :
1254 0 : const uint16_t distance = child->_acked - acked;
1255 0 : if (distance > _numBuffers)
1256 0 : acked = child->_acked;
1257 : }
1258 :
1259 0 : RSPConnectionPtr selfChild = _findConnection(_id);
1260 0 : const uint16_t distance = acked - selfChild->_acked;
1261 0 : if (distance <= _numBuffers)
1262 0 : _finishWriteQueue(acked);
1263 0 : return true;
1264 : }
1265 :
1266 0 : bool RSPConnection::_handleNack()
1267 : {
1268 : DatagramNack& nack =
1269 0 : *reinterpret_cast<DatagramNack*>(_recvBuffer.getData());
1270 :
1271 : #ifdef CO_INSTRUMENT_RSP
1272 : ++nNAcksRead;
1273 : #endif
1274 :
1275 0 : if (_id != nack.writerID)
1276 : {
1277 0 : LBLOG(LOG_RSP) << "ignore " << nack.count << " nacks from "
1278 0 : << nack.readerID << " for " << nack.writerID
1279 0 : << " (not me)" << std::endl;
1280 0 : return true;
1281 : }
1282 :
1283 0 : LBLOG(LOG_RSP) << "handle " << nack.count << " nacks from " << nack.readerID
1284 0 : << " for " << nack.writerID << std::endl;
1285 :
1286 0 : RSPConnectionPtr connection = _findConnection(nack.readerID);
1287 0 : if (!connection)
1288 : {
1289 0 : LBUNREACHABLE;
1290 0 : return false;
1291 : // it's an unknown connection, TODO add this connection?
1292 : }
1293 :
1294 0 : _timeouts = 0;
1295 0 : _addRepeat(nack.nacks, nack.count);
1296 0 : return true;
1297 : }
1298 :
1299 0 : void RSPConnection::_addRepeat(const Nack* nacks, uint16_t num)
1300 : {
1301 0 : LBLOG(LOG_RSP) << lunchbox::disableFlush << "Queue repeat requests ";
1302 0 : size_t lost = 0;
1303 :
1304 0 : for (size_t i = 0; i < num; ++i)
1305 : {
1306 0 : const Nack& nack = nacks[i];
1307 0 : LBASSERT(nack.start <= nack.end);
1308 :
1309 0 : LBLOG(LOG_RSP) << nack.start << ".." << nack.end << " ";
1310 :
1311 0 : bool merged = false;
1312 0 : for (RepeatQueue::iterator j = _repeatQueue.begin();
1313 0 : j != _repeatQueue.end() && !merged; ++j)
1314 : {
1315 0 : Nack& old = *j;
1316 0 : if (old.start <= nack.end && old.end >= nack.start)
1317 : {
1318 0 : if (old.start > nack.start)
1319 : {
1320 0 : lost += old.start - nack.start;
1321 0 : old.start = nack.start;
1322 0 : merged = true;
1323 : }
1324 0 : if (old.end < nack.end)
1325 : {
1326 0 : lost += nack.end - old.end;
1327 0 : old.end = nack.end;
1328 0 : merged = true;
1329 : }
1330 0 : LBASSERT(lost < _numBuffers);
1331 : }
1332 : }
1333 :
1334 0 : if (!merged)
1335 : {
1336 0 : lost += uint16_t(nack.end - nack.start) + 1;
1337 0 : LBASSERT(lost <= _numBuffers);
1338 0 : _repeatQueue.push_back(nack);
1339 : }
1340 : }
1341 :
1342 0 : ConstConnectionDescriptionPtr description = getDescription();
1343 0 : if (_sendRate >
1344 0 : (description->bandwidth >>
1345 0 : Global::getIAttribute(Global::IATTR_RSP_MIN_SENDRATE_SHIFT)))
1346 : {
1347 : const float delta =
1348 0 : float(lost) * .001f *
1349 0 : Global::getIAttribute(Global::IATTR_RSP_ERROR_DOWNSCALE);
1350 : const float maxDelta =
1351 : .01f *
1352 0 : float(Global::getIAttribute(Global::IATTR_RSP_ERROR_MAXSCALE));
1353 0 : const float downScale = LB_MIN(delta, maxDelta);
1354 0 : _sendRate -= 1 + int64_t(_sendRate * downScale);
1355 0 : LBLOG(LOG_RSP) << ", lost " << lost << " slowing down "
1356 0 : << downScale * 100.f << "% to " << _sendRate << " KB/s"
1357 0 : << std::endl
1358 0 : << lunchbox::enableFlush;
1359 : }
1360 : else
1361 0 : LBLOG(LOG_RSP) << std::endl << lunchbox::enableFlush;
1362 0 : }
1363 :
1364 0 : bool RSPConnection::_handleAckRequest(const size_t bytes)
1365 : {
1366 0 : if (bytes < sizeof(DatagramAckRequest))
1367 0 : return false;
1368 : DatagramAckRequest& ackRequest =
1369 0 : *reinterpret_cast<DatagramAckRequest*>(_recvBuffer.getData());
1370 :
1371 0 : const uint16_t writerID = ackRequest.writerID;
1372 : #ifdef Darwin
1373 : // There is occasionally a packet from ourselves, even though multicast loop
1374 : // is not set?!
1375 : if (writerID == _id)
1376 : return true;
1377 : #else
1378 0 : LBASSERT(writerID != _id);
1379 : #endif
1380 0 : RSPConnectionPtr connection = _findConnection(writerID);
1381 0 : if (!connection)
1382 : {
1383 0 : LBUNREACHABLE;
1384 0 : return false;
1385 : }
1386 :
1387 0 : const uint16_t reqID = ackRequest.sequence;
1388 0 : const uint16_t gotID = connection->_sequence - 1;
1389 0 : const uint16_t distance = reqID - gotID;
1390 :
1391 0 : LBLOG(LOG_RSP) << "ack request " << reqID << " from " << writerID << " got "
1392 0 : << gotID << " missing " << distance << std::endl;
1393 :
1394 0 : if ((reqID == gotID) || (gotID > reqID && gotID - reqID <= _numBuffers) ||
1395 0 : (gotID < reqID && distance > _numBuffers))
1396 : {
1397 0 : _sendAck(connection->_id, gotID);
1398 0 : return true;
1399 : }
1400 : // else find all missing datagrams
1401 :
1402 0 : const uint16_t max = CO_RSP_MAX_NACKS - 2;
1403 : Nack nacks[CO_RSP_MAX_NACKS];
1404 0 : uint16_t i = 0;
1405 :
1406 0 : nacks[i].start = connection->_sequence;
1407 0 : LBLOG(LOG_RSP) << lunchbox::disableFlush << "nacks: " << nacks[i].start
1408 0 : << "..";
1409 :
1410 0 : std::deque<Buffer*>::const_iterator j = connection->_recvBuffers.begin();
1411 0 : std::deque<Buffer*>::const_iterator first = j;
1412 0 : for (; j != connection->_recvBuffers.end() && i < max; ++j)
1413 : {
1414 0 : if (*j) // got buffer
1415 : {
1416 0 : nacks[i].end = connection->_sequence + std::distance(first, j);
1417 0 : LBLOG(LOG_RSP) << nacks[i].end << ", ";
1418 0 : if (nacks[i].end < nacks[i].start)
1419 : {
1420 0 : LBASSERT(nacks[i].end < _numBuffers);
1421 0 : nacks[i + 1].start = 0;
1422 0 : nacks[i + 1].end = nacks[i].end;
1423 0 : nacks[i].end = std::numeric_limits<uint16_t>::max();
1424 0 : ++i;
1425 : }
1426 0 : ++i;
1427 :
1428 : // find next hole
1429 0 : for (++j; j != connection->_recvBuffers.end() && (*j); ++j)
1430 : /* nop */;
1431 :
1432 0 : if (j == connection->_recvBuffers.end())
1433 0 : break;
1434 :
1435 0 : nacks[i].start =
1436 0 : connection->_sequence + std::distance(first, j) + 1;
1437 0 : LBLOG(LOG_RSP) << nacks[i].start << "..";
1438 : }
1439 : }
1440 :
1441 0 : if (j != connection->_recvBuffers.end() || i == 0)
1442 : {
1443 0 : nacks[i].end = reqID;
1444 0 : LBLOG(LOG_RSP) << nacks[i].end;
1445 0 : ++i;
1446 : }
1447 0 : else if (uint16_t(reqID - nacks[i - 1].end) < _numBuffers)
1448 : {
1449 0 : nacks[i].start = nacks[i - 1].end + 1;
1450 0 : nacks[i].end = reqID;
1451 0 : LBLOG(LOG_RSP) << nacks[i].start << ".." << nacks[i].end;
1452 0 : ++i;
1453 : }
1454 0 : if (nacks[i - 1].end < nacks[i - 1].start)
1455 : {
1456 0 : LBASSERT(nacks[i - 1].end < _numBuffers);
1457 0 : nacks[i].start = 0;
1458 0 : nacks[i].end = nacks[i - 1].end;
1459 0 : nacks[i - 1].end = std::numeric_limits<uint16_t>::max();
1460 0 : ++i;
1461 : }
1462 :
1463 0 : LBLOG(LOG_RSP) << std::endl
1464 0 : << lunchbox::enableFlush << "send " << i << " nacks to "
1465 0 : << connection->_id << std::endl;
1466 :
1467 0 : LBASSERT(i > 0);
1468 0 : _sendNack(connection->_id, nacks, i);
1469 0 : return true;
1470 : }
1471 :
1472 0 : void RSPConnection::_checkNewID(uint16_t id)
1473 : {
1474 : // look if the new ID exist in another connection
1475 0 : if (id == _id || _findConnection(id))
1476 : {
1477 0 : LBLOG(LOG_RSP) << "Deny " << id << std::endl;
1478 0 : _sendSimpleDatagram(ID_DENY, _id);
1479 : }
1480 : else
1481 0 : _sendSimpleDatagram(ID_HELLO_REPLY, _id);
1482 0 : }
1483 :
1484 0 : RSPConnectionPtr RSPConnection::_findConnection(const uint16_t id)
1485 : {
1486 0 : for (RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i)
1487 : {
1488 0 : if ((*i)->_id == id)
1489 0 : return *i;
1490 : }
1491 0 : return 0;
1492 : }
1493 :
1494 0 : bool RSPConnection::_addConnection(const uint16_t id, const uint16_t sequence)
1495 : {
1496 0 : if (_findConnection(id))
1497 0 : return false;
1498 :
1499 0 : LBDEBUG << "add connection " << id << std::endl;
1500 0 : RSPConnectionPtr connection = new RSPConnection();
1501 0 : connection->_id = id;
1502 0 : connection->_parent = this;
1503 0 : connection->_setState(STATE_CONNECTED);
1504 0 : connection->_setDescription(_getDescription());
1505 0 : connection->_sequence = sequence;
1506 0 : LBASSERT(connection->_appBuffers.isEmpty());
1507 :
1508 : // Make all buffers available for reading
1509 0 : for (BuffersCIter i = connection->_buffers.begin();
1510 0 : i != connection->_buffers.end(); ++i)
1511 : {
1512 0 : Buffer* buffer = *i;
1513 0 : LBCHECK(connection->_threadBuffers.push(buffer));
1514 : }
1515 :
1516 0 : _children.push_back(connection);
1517 0 : _sendCountNode();
1518 :
1519 0 : lunchbox::ScopedWrite mutex(_mutexConnection);
1520 0 : _newChildren.push_back(connection);
1521 :
1522 0 : lunchbox::ScopedWrite mutex2(_mutexEvent);
1523 0 : _event->set();
1524 0 : return true;
1525 : }
1526 :
1527 0 : void RSPConnection::_removeConnection(const uint16_t id)
1528 : {
1529 0 : LBDEBUG << "remove connection " << id << std::endl;
1530 0 : if (id == _id)
1531 0 : return;
1532 :
1533 0 : for (RSPConnectionsIter i = _children.begin(); i != _children.end(); ++i)
1534 : {
1535 0 : RSPConnectionPtr child = *i;
1536 0 : if (child->_id == id)
1537 : {
1538 0 : _children.erase(i);
1539 :
1540 0 : lunchbox::ScopedWrite mutex(child->_mutexEvent);
1541 0 : child->_appBuffers.push(0);
1542 0 : child->_event->set();
1543 0 : break;
1544 : }
1545 : }
1546 :
1547 0 : _sendCountNode();
1548 : }
1549 :
1550 0 : int64_t RSPConnection::write(const void* inData, const uint64_t bytes)
1551 : {
1552 0 : if (_parent)
1553 0 : return _parent->write(inData, bytes);
1554 :
1555 0 : LBASSERT(isListening());
1556 0 : if (!_write)
1557 0 : return -1;
1558 :
1559 : // compute number of datagrams
1560 0 : uint64_t nDatagrams = bytes / _payloadSize;
1561 0 : if (nDatagrams * _payloadSize != bytes)
1562 0 : ++nDatagrams;
1563 :
1564 : // queue each datagram (might block if buffers are exhausted)
1565 0 : const uint8_t* data = reinterpret_cast<const uint8_t*>(inData);
1566 0 : const uint8_t* end = data + bytes;
1567 0 : for (uint64_t i = 0; i < nDatagrams; ++i)
1568 : {
1569 0 : size_t packetSize = end - data;
1570 0 : packetSize = LB_MIN(packetSize, _payloadSize);
1571 :
1572 0 : if (_appBuffers.isEmpty())
1573 : // trigger processing
1574 0 : _postWakeup();
1575 :
1576 : Buffer* buffer;
1577 0 : if (!_appBuffers.timedPop(_writeTimeOut, buffer))
1578 : {
1579 0 : LBERROR << "Timeout while writing" << std::endl;
1580 0 : buffer = 0;
1581 : }
1582 :
1583 0 : if (!buffer)
1584 : {
1585 0 : close();
1586 0 : return -1;
1587 : }
1588 :
1589 : // prepare packet header (sequence is done by thread)
1590 : DatagramData* header =
1591 0 : reinterpret_cast<DatagramData*>(buffer->getData());
1592 0 : header->type = DATA;
1593 0 : header->size = uint16_t(packetSize);
1594 0 : header->writerID = _id;
1595 :
1596 0 : memcpy(header + 1, data, packetSize);
1597 0 : data += packetSize;
1598 :
1599 0 : LBCHECK(_threadBuffers.push(buffer));
1600 : }
1601 0 : _postWakeup();
1602 0 : LBLOG(LOG_RSP) << "queued " << nDatagrams << " datagrams, " << bytes
1603 0 : << " bytes" << std::endl;
1604 0 : return bytes;
1605 : }
1606 :
1607 0 : void RSPConnection::finish()
1608 : {
1609 0 : if (_parent.isValid())
1610 : {
1611 0 : LBASSERTINFO(!_parent, "Writes are only allowed on RSP listeners");
1612 0 : return;
1613 : }
1614 0 : LBASSERT(isListening());
1615 0 : _appBuffers.waitSize(_buffers.size());
1616 : }
1617 :
1618 0 : void RSPConnection::_sendCountNode()
1619 : {
1620 0 : if (!_findConnection(_id))
1621 0 : return;
1622 :
1623 0 : LBLOG(LOG_RSP) << _children.size() << " nodes" << std::endl;
1624 0 : DatagramNode count = {COUNTNODE, CO_RSP_PROTOCOL_VERSION, _id,
1625 0 : uint16_t(_children.size())};
1626 0 : _write->send(boost::asio::buffer(&count, sizeof(count)));
1627 : }
1628 :
1629 0 : void RSPConnection::_sendSimpleDatagram(const DatagramType type,
1630 : const uint16_t id)
1631 : {
1632 : DatagramNode simple = {uint16_t(type), CO_RSP_PROTOCOL_VERSION, id,
1633 0 : _sequence};
1634 0 : _write->send(boost::asio::buffer(&simple, sizeof(simple)));
1635 0 : }
1636 :
1637 0 : void RSPConnection::_sendAck(const uint16_t writerID, const uint16_t sequence)
1638 : {
1639 0 : LBASSERT(_id != writerID);
1640 : #ifdef CO_INSTRUMENT_RSP
1641 : ++nAcksSend;
1642 : #endif
1643 :
1644 0 : LBLOG(LOG_RSP) << "send ack " << sequence << std::endl;
1645 0 : DatagramAck ack = {ACK, _id, writerID, sequence};
1646 0 : _write->send(boost::asio::buffer(&ack, sizeof(ack)));
1647 0 : }
1648 :
1649 0 : void RSPConnection::_sendNack(const uint16_t writerID, const Nack* nacks,
1650 : const uint16_t count)
1651 : {
1652 0 : LBASSERT(count > 0);
1653 0 : LBASSERT(count <= CO_RSP_MAX_NACKS);
1654 : #ifdef CO_INSTRUMENT_RSP
1655 : ++nNAcksSend;
1656 : #endif
1657 : /* optimization: use the direct access to the reader. */
1658 0 : if (writerID == _id)
1659 : {
1660 0 : _addRepeat(nacks, count);
1661 0 : return;
1662 : }
1663 :
1664 : const size_t size =
1665 0 : sizeof(DatagramNack) - (CO_RSP_MAX_NACKS - count) * sizeof(Nack);
1666 :
1667 : // set the header
1668 : DatagramNack packet;
1669 0 : packet.set(_id, writerID, count);
1670 0 : memcpy(packet.nacks, nacks, count * sizeof(Nack));
1671 0 : _write->send(boost::asio::buffer(&packet, size));
1672 : }
1673 :
1674 0 : void RSPConnection::_sendAckRequest()
1675 : {
1676 : #ifdef CO_INSTRUMENT_RSP
1677 : ++nAckRequests;
1678 : #endif
1679 0 : LBLOG(LOG_RSP) << "send ack request for " << uint16_t(_sequence - 1)
1680 0 : << std::endl;
1681 0 : DatagramAckRequest ackRequest = {ACKREQ, _id, uint16_t(_sequence - 1)};
1682 0 : _write->send(boost::asio::buffer(&ackRequest, sizeof(DatagramAckRequest)));
1683 0 : }
1684 :
1685 0 : std::ostream& operator<<(std::ostream& os, const RSPConnection& connection)
1686 : {
1687 0 : os << lunchbox::disableFlush << lunchbox::disableHeader
1688 0 : << "RSPConnection id " << connection.getID() << " send rate "
1689 0 : << connection.getSendRate();
1690 :
1691 : #ifdef CO_INSTRUMENT_RSP
1692 : const int prec = os.precision();
1693 : os.precision(3);
1694 :
1695 : const float time = instrumentClock.getTimef();
1696 : const float mbps = 1048.576f * time;
1697 : os << ": " << lunchbox::indent << std::endl
1698 : << float(nBytesRead) / mbps << " / " << float(nBytesWritten) / mbps
1699 : << " MB/s r/w using " << nDatagrams << " dgrams " << nRepeated
1700 : << " repeats " << nMergedDatagrams << " merged" << std::endl;
1701 :
1702 : os.precision(prec);
1703 : os << "sender: " << nAckRequests << " ack requests " << nAcksAccepted << "/"
1704 : << nAcksRead << " acks " << nNAcksRead << " nacks, throttle "
1705 : << writeWaitTime << " ms" << std::endl
1706 : << "receiver: " << nAcksSend << " acks " << nNAcksSend << " nacks"
1707 : << lunchbox::exdent;
1708 :
1709 : nReadData = 0;
1710 : nBytesRead = 0;
1711 : nBytesWritten = 0;
1712 : nDatagrams = 0;
1713 : nRepeated = 0;
1714 : nMergedDatagrams = 0;
1715 : nAckRequests = 0;
1716 : nAcksSend = 0;
1717 : nAcksRead = 0;
1718 : nAcksAccepted = 0;
1719 : nNAcksSend = 0;
1720 : nNAcksRead = 0;
1721 : writeWaitTime = 0.f;
1722 :
1723 : for (auto child : connection._children)
1724 : os << *child << std::endl;
1725 : #endif
1726 0 : os << std::endl << lunchbox::enableHeader << lunchbox::enableFlush;
1727 :
1728 0 : return os;
1729 : }
1730 63 : }
|