Line data Source code
1 :
2 : /* Copyright (c) 2009-2016, Cedric Stalder <cedric.stalder@gmail.com>
3 : * Stefan Eilemann <eile@equalizergraphics.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "rspConnection.h"
23 :
24 : #include "connection.h"
25 : #include "connectionDescription.h"
26 : #include "global.h"
27 : #include "log.h"
28 :
29 : #include <lunchbox/rng.h>
30 : #include <lunchbox/scopedMutex.h>
31 : #include <lunchbox/sleep.h>
32 :
33 : #include <boost/bind.hpp>
34 :
// Build-time switches and protocol constants of the RSP implementation.
// Uncomment to compile in the instrumentation counters and their reporting.
35 : //#define CO_INSTRUMENT_RSP
// Enables merging of consecutive small datagrams in _writeData().
36 : #define CO_RSP_MERGE_WRITES
// Limit for the _timeouts counter; compared against in _processOutgoing() and
// _handleConnectedTimeout().
37 : #define CO_RSP_MAX_TIMEOUTS 1000
// Port substituted by listen() when the description's port is 0. On non-Windows
// platforms it is derived from the user id and falls in [1024, 65534].
38 : #ifdef _WIN32
39 : # define CO_RSP_DEFAULT_PORT (4242)
40 : #else
41 : # define CO_RSP_DEFAULT_PORT ( (getuid() % 64511) + 1024 )
42 : #endif
43 :
44 :
45 : // Note: Do not use version > 255, endianness detection magic relies on this.
// Compared against DatagramNode::protocolVersion in _getDatagramNode().
46 : const uint16_t CO_RSP_PROTOCOL_VERSION = 0;
47 :
// Shorthands for the Boost namespaces used throughout this file.
48 : namespace bp = boost::posix_time;
49 : namespace ip = boost::asio::ip;
50 :
51 : namespace co
52 : {
53 :
namespace
{
#ifdef CO_INSTRUMENT_RSP
// Statistics counters, compiled in only when CO_INSTRUMENT_RSP is defined.
lunchbox::a_int32_t nReadData;
lunchbox::a_int32_t nBytesRead;
lunchbox::a_int32_t nBytesWritten;
lunchbox::a_int32_t nDatagrams;
lunchbox::a_int32_t nRepeated;
lunchbox::a_int32_t nMergedDatagrams;
lunchbox::a_int32_t nAckRequests;
lunchbox::a_int32_t nAcksSend;
lunchbox::a_int32_t nAcksSendTotal;
lunchbox::a_int32_t nAcksRead;
lunchbox::a_int32_t nAcksAccepted;
lunchbox::a_int32_t nNAcksSend;
lunchbox::a_int32_t nNAcksRead;
lunchbox::a_int32_t nNAcksResend;

// Accumulated time spent in _waitWritable() and the clock that paces the
// periodic statistics output.
float writeWaitTime = 0.f;
lunchbox::Clock instrumentClock;
#endif

// Number of RSP buffers, assigned by listen() from IATTR_RSP_NUM_BUFFERS.
// Internal linkage is already provided by the unnamed namespace.
uint16_t _numBuffers = 0;
}
78 :
79 0 : RSPConnection::RSPConnection()
80 : : _id( 0 )
81 : , _idAccepted( false )
82 0 : , _mtu( Global::getIAttribute( Global::IATTR_UDP_MTU ))
83 0 : , _ackFreq( Global::getIAttribute( Global::IATTR_RSP_ACK_FREQUENCY ))
84 0 : , _payloadSize( _mtu - sizeof( DatagramData ))
85 : , _timeouts( 0 )
86 0 : , _event( new EventConnection )
87 : , _read( 0 )
88 : , _write( 0 )
89 : , _timeout( _ioService )
90 : , _wakeup( _ioService )
91 0 : , _maxBucketSize( ( _mtu * _ackFreq) >> 1 )
92 : , _bucketSize( 0 )
93 : , _sendRate( 0 )
94 : , _thread( 0 )
95 0 : , _acked( std::numeric_limits< uint16_t >::max( ))
96 : , _threadBuffers( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS))
97 0 : , _recvBuffer( _mtu )
98 : , _readBuffer( 0 )
99 : , _readBufferPos( 0 )
100 : , _sequence( 0 )
101 : // ensure we have a handleConnectedTimeout before the write pop
102 0 : , _writeTimeOut( Global::IATTR_RSP_ACK_TIMEOUT * CO_RSP_MAX_TIMEOUTS * 2 )
103 : {
104 0 : _buildNewID();
105 0 : ConnectionDescriptionPtr description = _getDescription();
106 0 : description->type = CONNECTIONTYPE_RSP;
107 0 : description->bandwidth = 102400;
108 :
109 0 : LBCHECK( _event->connect( ));
110 :
111 0 : _buffers.reserve( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ));
112 0 : while( static_cast< int32_t >( _buffers.size( )) <
113 0 : Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ))
114 : {
115 0 : _buffers.push_back( new Buffer( _mtu ));
116 : }
117 :
118 0 : LBASSERT( sizeof( DatagramNack ) <= size_t( _mtu ));
119 0 : LBLOG( LOG_RSP ) << "New RSP connection, " << _buffers.size()
120 0 : << " buffers of " << _mtu << " bytes" << std::endl;
121 :
122 0 : }
123 :
124 0 : RSPConnection::~RSPConnection()
125 : {
126 0 : _close();
127 0 : while( !_buffers.empty( ))
128 : {
129 0 : delete _buffers.back();
130 0 : _buffers.pop_back();
131 : }
132 : #ifdef CO_INSTRUMENT_RSP
133 : LBWARN << *this << std::endl;
134 : instrumentClock.reset();
135 : #endif
136 0 : }
137 :
138 0 : void RSPConnection::_close()
139 : {
140 0 : if( _parent.isValid() && _parent->_id == _id )
141 0 : _parent->close();
142 :
143 0 : while( !_parent && _isWriting( ))
144 0 : lunchbox::sleep( 10 /*ms*/ );
145 :
146 0 : if( isClosed( ))
147 0 : return;
148 :
149 0 : lunchbox::ScopedWrite mutex( _mutexEvent );
150 0 : if( _thread )
151 : {
152 0 : LBASSERT( !_thread->isCurrent( ));
153 0 : _sendSimpleDatagram( ID_EXIT, _id );
154 0 : _ioService.stop();
155 0 : _thread->join();
156 0 : delete _thread;
157 : }
158 :
159 0 : _setState( STATE_CLOSING );
160 0 : if( _thread )
161 : {
162 0 : _thread = 0;
163 :
164 : // notify children to close
165 0 : for( RSPConnectionsCIter i=_children.begin(); i !=_children.end(); ++i )
166 : {
167 0 : RSPConnectionPtr child = *i;
168 0 : lunchbox::ScopedWrite mutexChild( child->_mutexEvent );
169 0 : child->_appBuffers.push( 0 );
170 0 : child->_event->set();
171 : }
172 :
173 0 : _children.clear();
174 0 : _newChildren.clear();
175 : }
176 :
177 0 : _parent = 0;
178 :
179 0 : if( _read )
180 0 : _read->close();
181 0 : delete _read;
182 0 : _read = 0;
183 :
184 0 : if( _write )
185 0 : _write->close();
186 0 : delete _write;
187 0 : _write = 0;
188 :
189 0 : _threadBuffers.clear();
190 0 : _appBuffers.push( 0 ); // unlock any other read/write threads
191 :
192 0 : _setState( STATE_CLOSED );
193 :
194 0 : mutex.leave();
195 0 : _event->close();
196 : }
197 :
198 : //----------------------------------------------------------------------
199 : // Async IO handles
200 : //----------------------------------------------------------------------
201 0 : uint16_t RSPConnection::_buildNewID()
202 : {
203 0 : lunchbox::RNG rng;
204 0 : _id = rng.get< uint16_t >();
205 0 : return _id;
206 : }
207 :
208 0 : bool RSPConnection::listen()
209 : {
210 0 : ConnectionDescriptionPtr description = _getDescription();
211 0 : LBASSERT( description->type == CONNECTIONTYPE_RSP );
212 :
213 0 : if( !isClosed( ))
214 0 : return false;
215 :
216 0 : _setState( STATE_CONNECTING );
217 0 : _numBuffers = Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS );
218 :
219 : // init udp connection
220 0 : if( description->port == ConnectionDescription::RANDOM_MULTICAST_PORT )
221 0 : description->port = 0; // Let OS choose
222 0 : else if( description->port == 0 )
223 0 : description->port = CO_RSP_DEFAULT_PORT;
224 0 : if( description->getHostname().empty( ))
225 0 : description->setHostname( "239.255.42.43" );
226 0 : if( description->getInterface().empty( ))
227 0 : description->setInterface( "0.0.0.0" );
228 :
229 : try
230 : {
231 0 : const ip::address readAddress( ip::address::from_string( "0.0.0.0" ));
232 : const ip::udp::endpoint readEndpoint( readAddress,
233 0 : description->port );
234 0 : const std::string& port = std::to_string( unsigned( description->port));
235 0 : ip::udp::resolver resolver( _ioService );
236 0 : const ip::udp::resolver::query queryHN( ip::udp::v4(),
237 : description->getHostname(),
238 0 : port );
239 0 : const ip::udp::resolver::iterator hostname = resolver.resolve( queryHN);
240 :
241 0 : if( hostname == ip::udp::resolver::iterator( ))
242 0 : return false;
243 :
244 0 : const ip::udp::endpoint writeEndpoint = *hostname;
245 0 : const ip::address mcAddr( writeEndpoint.address( ));
246 :
247 0 : _read = new ip::udp::socket( _ioService );
248 0 : _write = new ip::udp::socket( _ioService );
249 0 : _read->open( readEndpoint.protocol( ));
250 0 : _write->open( writeEndpoint.protocol( ));
251 :
252 0 : _read->set_option( ip::udp::socket::reuse_address( true ));
253 0 : _write->set_option( ip::udp::socket::reuse_address( true ));
254 0 : _read->set_option( ip::udp::socket::receive_buffer_size(
255 0 : Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
256 0 : _write->set_option( ip::udp::socket::send_buffer_size(
257 0 : Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
258 :
259 0 : _read->bind( readEndpoint );
260 0 : description->port = _read->local_endpoint().port();
261 :
262 0 : const ip::udp::resolver::query queryIF( ip::udp::v4(),
263 : description->getInterface(),
264 0 : "0" );
265 : const ip::udp::resolver::iterator interfaceIP =
266 0 : resolver.resolve( queryIF );
267 :
268 0 : if( interfaceIP == ip::udp::resolver::iterator( ))
269 0 : return false;
270 :
271 0 : const ip::address ifAddr( ip::udp::endpoint( *interfaceIP ).address( ));
272 0 : LBDEBUG << "Joining " << mcAddr << " on " << ifAddr << std::endl;
273 :
274 0 : _read->set_option( ip::multicast::join_group( mcAddr.to_v4(),
275 0 : ifAddr.to_v4( )));
276 0 : _write->set_option( ip::multicast::outbound_interface( ifAddr.to_v4()));
277 : #ifdef SO_BINDTODEVICE // https://github.com/Eyescale/Collage/issues/16
278 0 : const std::string& ifIP = ifAddr.to_string();
279 0 : ::setsockopt( _write->native(), SOL_SOCKET, SO_BINDTODEVICE,
280 0 : ifIP.c_str(), ifIP.size() + 1 );
281 0 : ::setsockopt( _read->native(), SOL_SOCKET, SO_BINDTODEVICE,
282 0 : ifIP.c_str(), ifIP.size() + 1 );
283 : #endif
284 :
285 0 : _write->connect( writeEndpoint );
286 :
287 0 : _read->set_option( ip::multicast::enable_loopback( false ));
288 0 : _write->set_option( ip::multicast::enable_loopback( false ));
289 : }
290 0 : catch( const boost::system::system_error& e )
291 : {
292 0 : LBWARN << "can't setup underlying UDP connection: " << e.what()
293 0 : << std::endl;
294 0 : delete _read;
295 0 : delete _write;
296 0 : _read = 0;
297 0 : _write = 0;
298 0 : return false;
299 : }
300 :
301 : // init communication protocol thread
302 0 : _thread = new Thread( this );
303 0 : _bucketSize = 0;
304 0 : _sendRate = description->bandwidth;
305 :
306 : // waits until RSP protocol establishes connection to the multicast network
307 0 : if( !_thread->start( ) )
308 : {
309 0 : close();
310 0 : return false;
311 : }
312 :
313 : // Make all buffers available for writing
314 0 : LBASSERT( _appBuffers.isEmpty( ));
315 0 : _appBuffers.push( _buffers );
316 :
317 0 : LBDEBUG << "Listening on " << description->getHostname() << ":"
318 0 : << description->port << " (" << description->toString() << " @"
319 0 : << (void*)this << ")" << std::endl;
320 0 : return true;
321 : }
322 :
323 0 : ConnectionPtr RSPConnection::acceptSync()
324 : {
325 0 : if( !isListening( ))
326 0 : return 0;
327 :
328 0 : lunchbox::ScopedWrite mutex( _mutexConnection );
329 0 : LBASSERT( !_newChildren.empty( ));
330 0 : if( _newChildren.empty( ))
331 0 : return 0;
332 :
333 0 : RSPConnectionPtr newConnection = _newChildren.back();
334 0 : _newChildren.pop_back();
335 :
336 0 : LBDEBUG << _id << " accepted RSP connection " << newConnection->_id
337 0 : << std::endl;
338 :
339 0 : lunchbox::ScopedWrite mutex2( _mutexEvent );
340 0 : if( _newChildren.empty() )
341 0 : _event->reset();
342 : else
343 0 : _event->set();
344 :
345 0 : return newConnection;
346 : }
347 :
348 0 : int64_t RSPConnection::readSync( void* buffer, const uint64_t bytes, const bool)
349 : {
350 0 : LBASSERT( bytes > 0 );
351 0 : if( !isConnected ( ))
352 0 : return -1;
353 :
354 0 : uint64_t bytesLeft = bytes;
355 0 : uint8_t* ptr = reinterpret_cast< uint8_t* >( buffer );
356 :
357 : // redundant (done by the caller already), but saves some lock ops
358 0 : while( bytesLeft )
359 : {
360 0 : if( !_readBuffer )
361 : {
362 0 : LBASSERT( _readBufferPos == 0 );
363 0 : _readBuffer = _appBuffers.pop();
364 0 : if( !_readBuffer )
365 : {
366 0 : close();
367 0 : return (bytes == bytesLeft) ?
368 0 : -1 : static_cast< int64_t >( bytes - bytesLeft );
369 : }
370 : }
371 :
372 : const DatagramData* header = reinterpret_cast< const DatagramData* >(
373 0 : _readBuffer->getData( ));
374 0 : const uint8_t* payload = reinterpret_cast< const uint8_t* >( header+1 );
375 0 : const size_t dataLeft = header->size - _readBufferPos;
376 0 : const size_t size = LB_MIN( static_cast< size_t >( bytesLeft ),
377 : dataLeft );
378 :
379 0 : memcpy( ptr, payload + _readBufferPos, size );
380 0 : _readBufferPos += size;
381 0 : ptr += size;
382 0 : bytesLeft -= size;
383 :
384 : // if all data in the buffer has been taken
385 0 : if( _readBufferPos >= header->size )
386 : {
387 0 : LBASSERT( _readBufferPos == header->size );
388 : //LBLOG( LOG_RSP ) << "reset read buffer " << header->sequence
389 : // << std::endl;
390 :
391 0 : LBCHECK( _threadBuffers.push( _readBuffer ));
392 0 : _readBuffer = 0;
393 0 : _readBufferPos = 0;
394 : }
395 : else
396 : {
397 0 : LBASSERT( _readBufferPos < header->size );
398 : }
399 : }
400 :
401 0 : if( _readBuffer || !_appBuffers.isEmpty( ))
402 0 : _event->set();
403 : else
404 : {
405 0 : lunchbox::ScopedWrite mutex( _mutexEvent );
406 0 : if( _appBuffers.isEmpty( ))
407 0 : _event->reset();
408 : }
409 :
410 : #ifdef CO_INSTRUMENT_RSP
411 : nBytesRead += bytes;
412 : #endif
413 0 : return bytes;
414 : }
415 :
416 0 : void RSPConnection::Thread::run()
417 : {
418 0 : _connection->_runThread();
419 0 : _connection = 0;
420 0 : LBDEBUG << "Left RSP protocol thread" << std::endl;
421 0 : }
422 :
423 0 : void RSPConnection::_handleTimeout( const boost::system::error_code& error )
424 : {
425 0 : if( error == boost::asio::error::operation_aborted )
426 0 : return;
427 :
428 0 : if( isListening( ))
429 0 : _handleConnectedTimeout();
430 0 : else if( _idAccepted )
431 0 : _handleInitTimeout();
432 : else
433 0 : _handleAcceptIDTimeout();
434 : }
435 :
436 0 : void RSPConnection::_handleAcceptIDTimeout()
437 : {
438 0 : ++_timeouts;
439 0 : if( _timeouts < 20 )
440 : {
441 0 : LBLOG( LOG_RSP ) << "Announce " << _id << " " << _timeouts << std::endl;
442 0 : _sendSimpleDatagram( ID_HELLO, _id );
443 : }
444 : else
445 : {
446 0 : LBLOG( LOG_RSP ) << "Confirm " << _id << std::endl;
447 0 : _sendSimpleDatagram( ID_CONFIRM, _id );
448 0 : _addConnection( _id, _sequence );
449 0 : _idAccepted = true;
450 0 : _timeouts = 0;
451 : // send a first datagram to announce me and discover all other
452 : // connections
453 0 : _sendCountNode();
454 : }
455 0 : _setTimeout( 10 );
456 0 : }
457 :
458 0 : void RSPConnection::_handleInitTimeout( )
459 : {
460 0 : LBASSERT( !isListening( ))
461 0 : ++_timeouts;
462 0 : if( _timeouts < 20 )
463 0 : _sendCountNode();
464 : else
465 : {
466 0 : _setState( STATE_LISTENING );
467 0 : LBDEBUG << "RSP connection " << _id << " listening" << std::endl;
468 0 : _timeouts = 0;
469 0 : _ioService.stop(); // thread initialized, run restarts
470 : }
471 0 : _setTimeout( 10 );
472 0 : }
473 :
474 0 : void RSPConnection::_clearWriteQueues()
475 : {
476 0 : while( !_threadBuffers.isEmpty( ))
477 : {
478 0 : Buffer* buffer = 0;
479 0 : _threadBuffers.pop( buffer );
480 0 : _writeBuffers.push_back( buffer );
481 : }
482 :
483 0 : _finishWriteQueue( _sequence - 1 );
484 0 : LBASSERT( _threadBuffers.isEmpty() && _writeBuffers.empty() );
485 0 : }
486 :
// Timeout handler used while the connection is listening.
//
// Processes outgoing data first. Once _timeouts has reached
// CO_RSP_MAX_TIMEOUTS it either shuts the whole connection down (no child
// passes the ack check below) or sends ID_EXIT to, and removes, the children
// that have not acknowledged the latest datagram.
487 0 : void RSPConnection::_handleConnectedTimeout()
488 : {
// No longer listening: flush the write queues and stop the IO loop.
489 0 : if( !isListening( ))
490 : {
491 0 : _clearWriteQueues();
492 0 : _ioService.stop();
493 0 : return;
494 : }
495 :
496 0 : _processOutgoing();
497 :
498 0 : if( _timeouts >= CO_RSP_MAX_TIMEOUTS )
499 : {
500 0 : LBERROR << "Too many timeouts during send: " << _timeouts << std::endl;
// 'all' stays true unless some child other than ourselves has an _acked value
// within the last _numBuffers sequence numbers.
// NOTE(review): '_sequence - _numBuffers' is presumably evaluated as int after
// integer promotion and therefore does not wrap like the 16-bit sequence
// counters - confirm this is intended.
501 0 : bool all = true;
502 0 : for( RSPConnectionsCIter i =_children.begin(); i !=_children.end(); ++i)
503 : {
504 0 : RSPConnectionPtr child = *i;
505 0 : if ( child->_acked >= _sequence - _numBuffers && child->_id != _id )
506 : {
507 0 : all = false;
508 0 : break;
509 : }
510 : }
511 :
512 : // if all connections failed we probably got disconnected -> close and
513 : // exit else close all failed child connections
514 0 : if ( all )
515 : {
516 0 : _sendSimpleDatagram( ID_EXIT, _id );
517 0 : _appBuffers.pushFront( 0 ); // unlock write function
518 :
519 0 : for( RSPConnectionsCIter i =_children.begin();
520 0 : i !=_children.end(); ++i)
521 : {
522 0 : RSPConnectionPtr child = *i;
523 0 : child->_setState( STATE_CLOSING );
524 0 : child->_appBuffers.push( 0 ); // unlock read func
525 : }
526 :
527 0 : _clearWriteQueues();
528 0 : _ioService.stop();
529 0 : return;
530 : }
531 :
// Drop the children that are behind the latest sequence; for the others
// reset _acked to the sequence preceding the buffers still held for repeats.
// NOTE(review): 'i' is not advanced after _removeConnection(); this relies on
// that call removing the child from _children and on the iterator remaining
// usable afterwards - verify against _removeConnection().
532 0 : RSPConnectionsCIter i =_children.begin();
533 0 : while ( i !=_children.end() )
534 : {
535 0 : RSPConnectionPtr child = *i;
536 0 : if ( child->_acked < _sequence - 1 && _id != child->_id )
537 : {
538 0 : _sendSimpleDatagram( ID_EXIT, child->_id );
539 0 : _removeConnection( child->_id );
540 : }
541 : else
542 : {
543 0 : uint16_t wb = static_cast<uint16_t>( _writeBuffers.size( ));
544 0 : child->_acked = _sequence - wb;
545 0 : ++i;
546 : }
547 : }
548 :
549 0 : _timeouts = 0;
550 : }
551 : }
552 :
553 : RSPConnection::DatagramNode*
554 0 : RSPConnection::_getDatagramNode( const size_t bytes )
555 : {
556 0 : if( bytes < sizeof( DatagramNode ))
557 : {
558 0 : LBERROR << "DatagramNode size mismatch, got " << bytes << " instead of "
559 0 : << sizeof( DatagramNode ) << " bytes" << std::endl;
560 : //close();
561 0 : return 0;
562 : }
563 : DatagramNode& node =
564 0 : *reinterpret_cast< DatagramNode* >( _recvBuffer.getData( ));
565 0 : node.byteswap();
566 0 : if( node.protocolVersion != CO_RSP_PROTOCOL_VERSION )
567 : {
568 0 : LBERROR << "Protocol version mismatch, got " << node.protocolVersion
569 0 : << " instead of " << CO_RSP_PROTOCOL_VERSION << std::endl;
570 : //close();
571 0 : return 0;
572 : }
573 0 : return &node;
574 : }
575 :
576 0 : bool RSPConnection::_initThread()
577 : {
578 0 : LBLOG( LOG_RSP ) << "Started RSP protocol thread" << std::endl;
579 0 : _timeouts = 0;
580 :
581 : // send a first datagram to announce me and discover other connections
582 0 : LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
583 0 : _sendSimpleDatagram( ID_HELLO, _id );
584 0 : _setTimeout( 10 );
585 0 : _asyncReceiveFrom();
586 0 : _ioService.run();
587 0 : return isListening();
588 : }
589 :
590 0 : void RSPConnection::_runThread()
591 : {
592 : //__debugbreak();
593 0 : _ioService.reset();
594 0 : _ioService.run();
595 0 : }
596 :
597 0 : void RSPConnection::_setTimeout( const int32_t timeOut )
598 : {
599 0 : LBASSERT( timeOut >= 0 );
600 0 : _timeout.expires_from_now( boost::posix_time::milliseconds( timeOut ));
601 0 : _timeout.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
602 0 : boost::asio::placeholders::error ));
603 0 : }
604 :
605 0 : void RSPConnection::_postWakeup()
606 : {
607 0 : _wakeup.expires_from_now( boost::posix_time::milliseconds( 0 ));
608 0 : _wakeup.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
609 0 : boost::asio::placeholders::error ));
610 0 : }
611 :
612 0 : void RSPConnection::_processOutgoing()
613 : {
614 : #ifdef CO_INSTRUMENT_RSP
615 : if( instrumentClock.getTime64() > 1000 )
616 : {
617 : LBWARN << *this << std::endl;
618 : instrumentClock.reset();
619 : }
620 : #endif
621 :
622 0 : if( !_repeatQueue.empty( ))
623 0 : _repeatData();
624 : else
625 0 : _writeData();
626 :
627 0 : if( !_threadBuffers.isEmpty() || !_repeatQueue.empty( ))
628 : {
629 0 : _setTimeout( 0 ); // call again to send remaining
630 0 : return;
631 : }
632 : // no more data to write, check/send ack request, reset timeout
633 :
634 0 : if( _writeBuffers.empty( )) // got all acks
635 : {
636 0 : _timeouts = 0;
637 0 : _timeout.cancel();
638 0 : return;
639 : }
640 :
641 : const int64_t timeout =
642 0 : Global::getIAttribute( Global::IATTR_RSP_ACK_TIMEOUT );
643 0 : const int64_t left = timeout - _clock.getTime64();
644 :
645 0 : if( left > 0 )
646 : {
647 0 : _setTimeout( left );
648 0 : return;
649 : }
650 :
651 : // (repeat) ack request
652 0 : _clock.reset();
653 0 : ++_timeouts;
654 0 : if ( _timeouts < CO_RSP_MAX_TIMEOUTS )
655 0 : _sendAckRequest();
656 0 : _setTimeout( timeout );
657 : }
658 :
// Send the next queued application buffer as one datagram.
//
// Pops one buffer from _threadBuffers, stamps it with the next sequence
// number, optionally merges following small buffers into it, rate-limits via
// _waitWritable(), sends it on the write socket and keeps it in _writeBuffers
// for repeats. Returns without side effects when nothing is queued.
659 0 : void RSPConnection::_writeData()
660 : {
661 0 : Buffer* buffer = 0;
662 0 : if( !_threadBuffers.pop( buffer )) // nothing to write
663 0 : return;
664 :
665 0 : _timeouts = 0;
666 0 : LBASSERT( buffer );
667 :
668 : // write buffer
669 0 : DatagramData* header = reinterpret_cast<DatagramData*>( buffer->getData( ));
670 0 : header->sequence = _sequence++;
671 :
672 : #ifdef CO_RSP_MERGE_WRITES
// Append the payload of following buffers while the combined size still fits
// into _payloadSize. Merged buffers are empty afterwards and are returned to
// the application through _appBuffers.
673 0 : if( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
674 : {
675 0 : std::vector< Buffer* > appBuffers;
676 0 : while( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
677 : {
// Peek first: the buffer is only popped once it is known to fit.
678 0 : Buffer* buffer2 = 0;
679 0 : LBCHECK( _threadBuffers.getFront( buffer2 ));
680 0 : LBASSERT( buffer2 );
681 : DatagramData* header2 =
682 0 : reinterpret_cast<DatagramData*>( buffer2->getData( ));
683 :
684 0 : if( uint32_t( header->size + header2->size ) > _payloadSize )
685 0 : break;
686 :
687 0 : memcpy( reinterpret_cast<uint8_t*>( header + 1 ) + header->size,
688 0 : header2 + 1, header2->size );
689 0 : header->size += header2->size;
690 0 : LBCHECK( _threadBuffers.pop( buffer2 ));
691 0 : appBuffers.push_back( buffer2 );
692 : #ifdef CO_INSTRUMENT_RSP
693 : ++nMergedDatagrams;
694 : #endif
695 : }
696 :
697 0 : if( !appBuffers.empty( ))
698 0 : _appBuffers.push( appBuffers );
699 : }
700 : #endif
701 :
702 : // send data
703 : // Note 1: We could optimize the send away if we're all alone, but this is
704 : // not a use case for RSP, so we don't care.
705 : // Note 2: Data to myself will be 'written' in _finishWriteQueue once we
706 : // got all acks for the packet
// The datagram size is computed before the header is byte-swapped for sending.
707 0 : const uint32_t size = header->size + sizeof( DatagramData );
708 :
709 0 : _waitWritable( size ); // OPT: process incoming in between
710 0 : header->byteswap();
711 0 : _write->send( boost::asio::buffer( header, size ));
712 :
713 : #ifdef CO_INSTRUMENT_RSP
// NOTE(review): header->size is read here after header->byteswap() - confirm
// the counter is correct where byteswap() actually swaps.
714 : ++nDatagrams;
715 : nBytesWritten += header->size;
716 : #endif
717 :
718 : // save datagram for repeats (and self)
719 0 : _writeBuffers.push_back( buffer );
720 :
// With only our own connection registered there is nobody to wait for, so the
// queue is finished right away.
721 0 : if( _children.size() == 1 ) // We're all alone
722 : {
723 0 : LBASSERT( _children.front()->_id == _id );
724 0 : _finishWriteQueue( _sequence - 1 );
725 : }
726 : }
727 :
728 0 : void RSPConnection::_waitWritable( const uint64_t bytes )
729 : {
730 : #ifdef CO_INSTRUMENT_RSP
731 : lunchbox::Clock clock;
732 : #endif
733 :
734 0 : _bucketSize += static_cast< uint64_t >( _clock.resetTimef() * _sendRate );
735 : // opt omit: * 1024 / 1000;
736 0 : _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
737 :
738 0 : const uint64_t size = LB_MIN( bytes, static_cast< uint64_t >( _mtu ));
739 0 : while( _bucketSize < size )
740 : {
741 0 : lunchbox::Thread::yield();
742 0 : float time = _clock.resetTimef();
743 :
744 0 : while( time == 0.f )
745 : {
746 0 : lunchbox::Thread::yield();
747 0 : time = _clock.resetTimef();
748 : }
749 :
750 0 : _bucketSize += static_cast< int64_t >( time * _sendRate );
751 0 : _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
752 : }
753 0 : _bucketSize -= size;
754 :
755 : #ifdef CO_INSTRUMENT_RSP
756 : writeWaitTime += clock.getTimef();
757 : #endif
758 :
759 0 : ConstConnectionDescriptionPtr description = getDescription();
760 0 : if( _sendRate < description->bandwidth )
761 : {
762 0 : _sendRate += int64_t(
763 0 : float( Global::getIAttribute( Global::IATTR_RSP_ERROR_UPSCALE )) *
764 0 : float( description->bandwidth ) * .001f );
765 0 : LBLOG( LOG_RSP ) << "speeding up to " << _sendRate << " KB/s"
766 0 : << std::endl;
767 : }
768 0 : }
769 :
// Serve the repeat (nack) requests queued in _repeatQueue.
//
// Walks the front request one sequence number at a time and returns after
// re-sending a single datagram; requests for datagrams no longer held in
// _writeBuffers are consumed without sending.
770 0 : void RSPConnection::_repeatData()
771 : {
772 0 : _timeouts = 0;
773 :
774 0 : while( !_repeatQueue.empty( ))
775 : {
776 0 : Nack& request = _repeatQueue.front();
// Distance of the requested datagram from the next sequence to be sent,
// computed in 16-bit arithmetic.
777 0 : const uint16_t distance = _sequence - request.start;
778 :
// A distance of 0 refers to a sequence that has not been sent yet.
779 0 : if ( distance == 0 )
780 : {
781 0 : LBWARN << "ignoring invalid nack (" << request.start
782 0 : << ".." << request.end << ")" << std::endl;
783 0 : _repeatQueue.pop_front();
784 0 : continue;
785 : }
786 :
787 0 : if( distance <= _writeBuffers.size( )) // not already acked
788 : {
789 : // LBLOG( LOG_RSP ) << "Repeat " << request.start << ", " << _sendRate
790 : // << "KB/s"<< std::endl;
791 :
// The newest datagram is at the back of _writeBuffers, so the requested one
// is 'distance' entries before the end.
792 0 : const size_t i = _writeBuffers.size() - distance;
793 0 : Buffer* buffer = _writeBuffers[i];
794 0 : LBASSERT( buffer );
795 :
796 : DatagramData* header =
797 0 : reinterpret_cast<DatagramData*>( buffer->getData( ));
798 0 : const uint32_t size = header->size + sizeof( DatagramData );
799 0 : LBASSERT( header->sequence == request.start );
800 :
801 : // send data
802 0 : _waitWritable( size ); // OPT: process incoming in between
803 : // already done by _writeData: header->byteswap();
804 0 : _write->send( boost::asio::buffer( header, size ) );
805 : #ifdef CO_INSTRUMENT_RSP
806 : ++nRepeated;
807 : #endif
808 : }
809 :
// Advance within the requested range; 'request' must not be used after the
// pop below.
810 0 : if( request.start == request.end )
811 0 : _repeatQueue.pop_front(); // done with request
812 : else
813 0 : ++request.start;
814 :
815 0 : if( distance <= _writeBuffers.size( )) // send something
816 0 : return;
817 : }
818 : }
819 :
// Retire all written datagrams up to and including 'sequence'.
//
// Each retired buffer is copied into a data buffer of our own connection
// object (looked up through _findConnection( _id )), so that locally written
// data becomes readable on it, and the original buffer is returned to
// _appBuffers for new writes.
820 0 : void RSPConnection::_finishWriteQueue( const uint16_t sequence )
821 : {
822 0 : LBASSERT( !_writeBuffers.empty( ));
823 :
824 0 : RSPConnectionPtr connection = _findConnection( _id );
825 0 : LBASSERT( connection.isValid( ));
826 0 : LBASSERT( connection->_recvBuffers.empty( ));
827 :
828 : // Bundle pushing the buffers to the app to avoid excessive lock ops
829 0 : Buffers readBuffers;
830 0 : Buffers freeBuffers;
831 :
// Number of datagrams sent after 'sequence'; these stay in _writeBuffers.
832 0 : const uint16_t size = _sequence - sequence - 1;
833 0 : LBASSERTINFO( size <= uint16_t( _writeBuffers.size( )),
834 : size << " > " << _writeBuffers.size( ));
835 0 : LBLOG( LOG_RSP ) << "Got all remote acks for " << sequence << " current "
836 0 : << _sequence << " advance " << _writeBuffers.size() - size
837 0 : << " buffers" << std::endl;
838 :
839 0 : while( _writeBuffers.size() > size_t( size ))
840 : {
841 0 : Buffer* buffer = _writeBuffers.front();
842 0 : _writeBuffers.pop_front();
843 :
844 : #ifndef NDEBUG
// Debug builds only: byte-swap the header in place and verify writer id and
// sequence continuity.
// NOTE(review): this modifies the buffer only when NDEBUG is not defined -
// confirm _newDataBuffer() does not depend on the header's byte order.
845 : DatagramData* datagram =
846 0 : reinterpret_cast< DatagramData* >( buffer->getData( ));
847 0 : datagram->byteswap();
848 0 : LBASSERT( datagram->writerID == _id );
849 0 : LBASSERTINFO( datagram->sequence ==
850 : uint16_t( connection->_sequence + readBuffers.size( )),
851 : datagram->sequence << ", " << connection->_sequence <<
852 : ", " << readBuffers.size( ));
853 : //LBLOG( LOG_RSP ) << "self receive " << datagram->sequence << std::endl;
854 : #endif
855 :
// When no data buffer is available, hand the buffers prepared so far to the
// reader first so it can make progress.
856 0 : Buffer* newBuffer = connection->_newDataBuffer( *buffer );
857 0 : if( !newBuffer && !readBuffers.empty( )) // push prepared app buffers
858 : {
859 0 : lunchbox::ScopedWrite mutex( connection->_mutexEvent );
860 0 : LBLOG( LOG_RSP ) << "post " << readBuffers.size()
861 0 : << " buffers starting with sequence "
862 0 : << connection->_sequence << std::endl;
863 :
864 0 : connection->_appBuffers.push( readBuffers );
865 0 : connection->_sequence += uint16_t( readBuffers.size( ));
866 0 : readBuffers.clear();
867 0 : connection->_event->set();
868 : }
869 :
// Busy-wait (yielding) until a data buffer can be obtained.
870 0 : while( !newBuffer ) // no more data buffers, wait for app to drain
871 : {
872 0 : newBuffer = connection->_newDataBuffer( *buffer );
873 0 : lunchbox::Thread::yield();
874 : }
875 :
876 0 : freeBuffers.push_back( buffer );
877 0 : readBuffers.push_back( newBuffer );
878 : }
879 :
// Return the retired write buffers to the application in one batch.
880 0 : _appBuffers.push( freeBuffers );
881 0 : if( !readBuffers.empty( ))
882 : {
883 0 : lunchbox::ScopedWrite mutex( connection->_mutexEvent );
884 : #if 0
885 : LBLOG( LOG_RSP )
886 : << "post " << readBuffers.size() << " buffers starting at "
887 : << connection->_sequence << std::endl;
888 : #endif
889 :
890 0 : connection->_appBuffers.push( readBuffers );
891 0 : connection->_sequence += uint16_t( readBuffers.size( ));
892 0 : connection->_event->set();
893 : }
894 :
// Our own connection has now received everything up to 'sequence'.
895 0 : connection->_acked = uint16_t( connection->_sequence - 1 );
896 0 : LBASSERT( connection->_acked == sequence );
897 :
898 0 : _timeouts = 0;
899 0 : }
900 :
901 0 : void RSPConnection::_handlePacket( const boost::system::error_code& /* error */,
902 : const size_t bytes )
903 : {
904 0 : if( isListening( ))
905 : {
906 0 : _handleConnectedData( bytes );
907 :
908 0 : if( isListening( ))
909 0 : _processOutgoing();
910 : else
911 : {
912 0 : _ioService.stop();
913 0 : return;
914 : }
915 : }
916 0 : else if( bytes >= sizeof( DatagramNode ))
917 : {
918 0 : if( _idAccepted )
919 0 : _handleInitData( bytes, false );
920 : else
921 0 : _handleAcceptIDData( bytes );
922 : }
923 :
924 : //LBLOG( LOG_RSP ) << "_handlePacket timeout " << timeout << std::endl;
925 0 : _asyncReceiveFrom();
926 : }
927 :
928 0 : void RSPConnection::_handleAcceptIDData( const size_t bytes )
929 : {
930 0 : DatagramNode* pNode = _getDatagramNode( bytes );
931 0 : if( !pNode )
932 0 : return;
933 :
934 0 : DatagramNode& node = *pNode;
935 :
936 0 : switch( node.type )
937 : {
938 : case ID_HELLO:
939 0 : _checkNewID( node.connectionID );
940 0 : break;
941 :
942 : case ID_HELLO_REPLY:
943 0 : _addConnection( node.connectionID, node.data );
944 0 : break;
945 :
946 : case ID_DENY:
947 : // a connection refused my ID, try another ID
948 0 : if( node.connectionID == _id )
949 : {
950 0 : _timeouts = 0;
951 0 : _sendSimpleDatagram( ID_HELLO, _buildNewID( ));
952 0 : LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
953 : }
954 0 : break;
955 :
956 : case ID_EXIT:
957 0 : _removeConnection( node.connectionID );
958 0 : break;
959 :
960 : default:
961 0 : LBERROR << "Got unexpected datagram type " << node.type <<std::endl;
962 0 : LBUNIMPLEMENTED;
963 0 : break;
964 : }
965 : }
966 :
967 0 : void RSPConnection::_handleInitData( const size_t bytes, const bool connected )
968 : {
969 0 : DatagramNode* pNode = _getDatagramNode( bytes );
970 0 : if( !pNode )
971 0 : return;
972 :
973 0 : DatagramNode& node = *pNode;
974 :
975 0 : switch( node.type )
976 : {
977 : case ID_HELLO:
978 0 : if( !connected )
979 0 : _timeouts = 0;
980 0 : _checkNewID( node.connectionID ) ;
981 0 : return;
982 :
983 : case ID_CONFIRM:
984 0 : if( !connected )
985 0 : _timeouts = 0;
986 0 : _addConnection( node.connectionID, node.data );
987 0 : return;
988 :
989 : case COUNTNODE:
990 0 : LBLOG( LOG_RSP ) << "Got " << node.data << " nodes from "
991 0 : << node.connectionID << std::endl;
992 0 : return;
993 :
994 : case ID_HELLO_REPLY:
995 0 : _addConnection( node.connectionID, node.data );
996 0 : return;
997 :
998 : case ID_EXIT:
999 0 : _removeConnection( node.connectionID );
1000 0 : return;
1001 :
1002 : default:
1003 0 : LBERROR << "Got unexpected datagram type " << node.type <<std::endl;
1004 0 : LBUNIMPLEMENTED;
1005 0 : break;
1006 : }
1007 : }
1008 :
1009 0 : void RSPConnection::_handleConnectedData( const size_t bytes )
1010 : {
1011 0 : if( bytes < sizeof( uint16_t ))
1012 0 : return;
1013 :
1014 0 : void* data = _recvBuffer.getData();
1015 0 : uint16_t type = *reinterpret_cast< uint16_t* >( data );
1016 : #ifdef COLLAGE_BIGENDIAN
1017 : lunchbox::byteswap( type );
1018 : #endif
1019 0 : switch( type )
1020 : {
1021 : case DATA:
1022 0 : LBCHECK( _handleData( bytes ));
1023 0 : break;
1024 :
1025 : case ACK:
1026 0 : LBCHECK( _handleAck( bytes ));
1027 0 : break;
1028 :
1029 : case NACK:
1030 0 : LBCHECK( _handleNack( ));
1031 0 : break;
1032 :
1033 : case ACKREQ: // The writer asks for an ack/nack
1034 0 : LBCHECK( _handleAckRequest( bytes ));
1035 0 : break;
1036 :
1037 : case ID_HELLO:
1038 : case ID_HELLO_REPLY:
1039 : case ID_CONFIRM:
1040 : case ID_EXIT:
1041 : case ID_DENY:
1042 : case COUNTNODE:
1043 0 : _handleInitData( bytes, true );
1044 0 : break;
1045 :
1046 : default:
1047 0 : LBASSERTINFO( false,
1048 : "Don't know how to handle packet of type " << type );
1049 : }
1050 :
1051 : }
1052 :
1053 0 : void RSPConnection::_asyncReceiveFrom()
1054 : {
1055 0 : _read->async_receive_from(
1056 0 : boost::asio::buffer( _recvBuffer.getData(), _mtu ), _readAddr,
1057 0 : boost::bind( &RSPConnection::_handlePacket, this,
1058 : boost::asio::placeholders::error,
1059 0 : boost::asio::placeholders::bytes_transferred ));
1060 0 : }
1061 :
// Handle a DATA datagram located in _recvBuffer.
//
// Three cases are distinguished by comparing the datagram sequence with the
// next sequence expected from the writer (connection->_sequence):
//  - in order: the datagram and all directly following, already received
//    datagrams are handed to the reader via _pushDataBuffer(),
//  - more than _numBuffers away from the expected sequence: ignored,
//  - otherwise out of order: stored in connection->_recvBuffers and an early
//    nack for the missing range is sent.
//
// Returns false if the datagram is shorter than DatagramData or the writer is
// unknown, true in all other cases (including when the datagram is dropped).
1062 0 : bool RSPConnection::_handleData( const size_t bytes )
1063 : {
1064 0 : if( bytes < sizeof( DatagramData ))
1065 0 : return false;
1066 : DatagramData& datagram =
1067 0 : *reinterpret_cast< DatagramData* >( _recvBuffer.getData( ));
1068 0 : datagram.byteswap();
1069 :
1070 : #ifdef CO_INSTRUMENT_RSP
1071 : ++nReadData;
1072 : #endif
1073 0 : const uint16_t writerID = datagram.writerID;
1074 : #ifdef Darwin
1075 : // There is occasionally a packet from ourselves, even though multicast loop
1076 : // is not set?!
1077 : if( writerID == _id )
1078 : return true;
1079 : #else
1080 0 : LBASSERT( writerID != _id );
1081 : #endif
1082 :
1083 0 : RSPConnectionPtr connection = _findConnection( writerID );
1084 :
1085 0 : if( !connection ) // unknown connection ?
1086 : {
1087 0 : LBASSERTINFO( false, "Can't find connection with id " << writerID );
1088 0 : return false;
1089 : }
1090 0 : LBASSERT( connection->_id == writerID );
1091 :
1092 0 : const uint16_t sequence = datagram.sequence;
1093 : // LBLOG( LOG_RSP ) << "rcvd " << sequence << " from " << writerID <<std::endl;
1094 :
1095 0 : if( connection->_sequence == sequence ) // in-order packet
1096 : {
// _newDataBuffer() swaps the received data into a free buffer, or returns 0
// if no free buffer is available.
1097 0 : Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
1098 0 : if( !newBuffer ) // no more data buffers, drop packet
1099 0 : return true;
1100 :
1101 0 : lunchbox::ScopedWrite mutex( connection->_mutexEvent );
1102 0 : connection->_pushDataBuffer( newBuffer );
1103 :
// Each _pushDataBuffer() call increments connection->_sequence, so every
// non-null front entry popped below is the next datagram in order. The loop
// stops at the first hole (null entry).
1104 0 : while( !connection->_recvBuffers.empty( )) // enqueue ready pending data
1105 : {
1106 0 : newBuffer = connection->_recvBuffers.front();
1107 0 : if( !newBuffer )
1108 0 : break;
1109 :
1110 0 : connection->_recvBuffers.pop_front();
1111 0 : connection->_pushDataBuffer( newBuffer );
1112 : }
1113 :
// Slot k of _recvBuffers holds sequence _sequence + k + 1 (see the index
// computation in the out-of-order path below), so after _sequence advanced,
// the leading empty slot is dropped to keep that mapping.
1114 0 : if( !connection->_recvBuffers.empty() &&
1115 0 : !connection->_recvBuffers.front( )) // update for new _sequence
1116 : {
1117 0 : connection->_recvBuffers.pop_front();
1118 : }
1119 :
1120 0 : connection->_event->set();
1121 0 : return true;
1122 : }
1123 :
// Window check: a datagram whose sequence is more than _numBuffers away from
// the expected one, in either direction and accounting for uint16_t
// wrap-around, is not stored.
1124 0 : const uint16_t max = std::numeric_limits< uint16_t >::max();
1125 0 : if(( connection->_sequence > sequence &&
1126 0 : max - connection->_sequence + sequence > _numBuffers ) ||
1127 0 : ( connection->_sequence < sequence &&
1128 0 : sequence - connection->_sequence > _numBuffers ))
1129 : {
1130 : // ignore it if it's a repetition for another reader
1131 0 : return true;
1132 : }
1133 :
1134 : // else out of order
1135 :
// size is the modular distance from the expected sequence; the datagram is
// stored at index size - 1 of _recvBuffers.
1136 0 : const uint16_t size = sequence - connection->_sequence;
1137 0 : LBASSERT( size != 0 );
1138 0 : LBASSERTINFO( size <= _numBuffers, size << " > " << _numBuffers );
1139 :
1140 0 : ssize_t i = ssize_t( size ) - 1;
// Already stored earlier: this is a duplicate, nothing to do.
1141 0 : const bool gotPacket = ( connection->_recvBuffers.size() >= size &&
1142 0 : connection->_recvBuffers[ i ] );
1143 0 : if( gotPacket )
1144 0 : return true;
1145 :
1146 0 : Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
1147 0 : if( !newBuffer ) // no more data buffers, drop packet
1148 0 : return true;
1149 :
// Grow the pending queue with empty (null) slots up to this datagram's slot.
1150 0 : if( connection->_recvBuffers.size() < size )
1151 0 : connection->_recvBuffers.resize( size, 0 );
1152 :
1153 0 : LBASSERT( !connection->_recvBuffers[ i ] );
1154 0 : connection->_recvBuffers[ i ] = newBuffer;
1155 :
1156 : // early nack: request missing packets before current
// By default the nack covers everything from the expected sequence up to the
// datagram preceding this one.
1157 0 : --i;
1158 0 : Nack nack = { connection->_sequence, uint16_t( sequence - 1 ) };
// NOTE(review): with i == 0 after the decrement, slot 0 is not inspected and
// the default range is sent - confirm this is intended.
1159 0 : if( i > 0 )
1160 : {
1161 0 : if( connection->_recvBuffers[i] ) // got previous packet
1162 0 : return true;
1163 :
// Walk back to the closest earlier slot holding a buffer, if any, and start
// the nack range from there instead of from the expected sequence.
1164 0 : while( i >= 0 && !connection->_recvBuffers[i] )
1165 0 : --i;
1166 :
1167 0 : const Buffer* lastBuffer = i>=0 ? connection->_recvBuffers[i] : 0;
1168 0 : if( lastBuffer )
1169 : {
1170 0 : nack.start = connection->_sequence + i;
1171 : }
1172 : }
1173 :
1174 0 : LBLOG( LOG_RSP ) << "send early nack " << nack.start << ".." << nack.end
1175 0 : << " current " << connection->_sequence << " ooo "
1176 0 : << connection->_recvBuffers.size() << std::endl;
1177 :
// The range wraps around the uint16_t sequence space: only the part up to the
// maximum sequence is requested.
1178 0 : if( nack.end < nack.start )
1179 : // OPT: don't drop nack 0..nack.end, but it doesn't happen often
1180 0 : nack.end = std::numeric_limits< uint16_t >::max();
1181 :
1182 0 : _sendNack( writerID, &nack, 1 );
1183 0 : return true;
1184 : }
1185 :
1186 0 : RSPConnection::Buffer* RSPConnection::_newDataBuffer( Buffer& inBuffer )
1187 : {
1188 0 : LBASSERT( static_cast< int32_t >( inBuffer.getMaxSize( )) == _mtu );
1189 :
1190 0 : Buffer* buffer = 0;
1191 0 : if( _threadBuffers.pop( buffer ))
1192 : {
1193 0 : buffer->swap( inBuffer );
1194 0 : return buffer;
1195 : }
1196 :
1197 : // we do not have a free buffer, which means that the receiver is slower
1198 : // then our read thread. This is bad, because now we'll drop the data and
1199 : // will send a NAck packet upon the ack request, causing retransmission even
1200 : // though we'll probably drop it again
1201 0 : LBLOG( LOG_RSP ) << "Reader too slow, dropping data" << std::endl;
1202 :
1203 : // Set the event if there is data to read. This shouldn't be needed since
1204 : // the event should be set in this case, but it'll increase the robustness
1205 0 : lunchbox::ScopedWrite mutex( _mutexEvent );
1206 0 : if( !_appBuffers.isEmpty( ))
1207 0 : _event->set();
1208 0 : return 0;
1209 : }
1210 :
1211 0 : void RSPConnection::_pushDataBuffer( Buffer* buffer )
1212 : {
1213 0 : LBASSERT( _parent );
1214 : #ifndef NDEBUG
1215 0 : DatagramData* dgram = reinterpret_cast< DatagramData* >(buffer->getData( ));
1216 0 : LBASSERTINFO( dgram->sequence == _sequence,
1217 : dgram->sequence << " != " << _sequence );
1218 : #endif
1219 :
1220 0 : if( (( _sequence + _parent->_id ) % _ackFreq ) == 0 )
1221 0 : _parent->_sendAck( _id, _sequence );
1222 :
1223 0 : LBLOG( LOG_RSP ) << "post buffer " << _sequence << std::endl;
1224 0 : ++_sequence;
1225 0 : _appBuffers.push( buffer );
1226 0 : }
1227 :
1228 0 : bool RSPConnection::_handleAck( const size_t bytes )
1229 : {
1230 0 : if( bytes < sizeof( DatagramAck ))
1231 0 : return false;
1232 : DatagramAck& ack =
1233 0 : *reinterpret_cast< DatagramAck* >( _recvBuffer.getData( ));
1234 0 : ack.byteswap();
1235 :
1236 : #ifdef CO_INSTRUMENT_RSP
1237 : ++nAcksRead;
1238 : #endif
1239 :
1240 0 : if( ack.writerID != _id )
1241 0 : return true;
1242 :
1243 0 : LBLOG( LOG_RSP ) << "got ack from " << ack.readerID << " for "
1244 0 : << ack.writerID << " sequence " << ack.sequence
1245 0 : << " current " << _sequence << std::endl;
1246 :
1247 : // find destination connection, update ack data if needed
1248 0 : RSPConnectionPtr connection = _findConnection( ack.readerID );
1249 0 : if( !connection )
1250 : {
1251 0 : LBUNREACHABLE;
1252 0 : return false;
1253 : }
1254 :
1255 0 : if( connection->_acked >= ack.sequence &&
1256 0 : connection->_acked - ack.sequence <= _numBuffers )
1257 : {
1258 : // I have received a later ack previously from the reader
1259 0 : LBLOG( LOG_RSP ) << "Late ack" << std::endl;
1260 0 : return true;
1261 : }
1262 :
1263 : #ifdef CO_INSTRUMENT_RSP
1264 : ++nAcksAccepted;
1265 : #endif
1266 0 : connection->_acked = ack.sequence;
1267 0 : _timeouts = 0; // reset timeout counter
1268 :
1269 : // Check if we can advance _acked
1270 0 : uint16_t acked = ack.sequence;
1271 :
1272 0 : for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
1273 : {
1274 0 : RSPConnectionPtr child = *i;
1275 0 : if( child->_id == _id )
1276 0 : continue;
1277 :
1278 0 : const uint16_t distance = child->_acked - acked;
1279 0 : if( distance > _numBuffers )
1280 0 : acked = child->_acked;
1281 : }
1282 :
1283 0 : RSPConnectionPtr selfChild = _findConnection( _id );
1284 0 : const uint16_t distance = acked - selfChild->_acked;
1285 0 : if( distance <= _numBuffers )
1286 0 : _finishWriteQueue( acked );
1287 0 : return true;
1288 : }
1289 :
1290 0 : bool RSPConnection::_handleNack()
1291 : {
1292 : DatagramNack& nack =
1293 0 : *reinterpret_cast< DatagramNack* >( _recvBuffer.getData( ));
1294 0 : nack.byteswap();
1295 :
1296 : #ifdef CO_INSTRUMENT_RSP
1297 : ++nNAcksRead;
1298 : #endif
1299 :
1300 0 : if( _id != nack.writerID )
1301 : {
1302 0 : LBLOG( LOG_RSP )
1303 0 : << "ignore " << nack.count << " nacks from " << nack.readerID
1304 0 : << " for " << nack.writerID << " (not me)"<< std::endl;
1305 0 : return true;
1306 : }
1307 :
1308 0 : LBLOG( LOG_RSP )
1309 0 : << "handle " << nack.count << " nacks from " << nack.readerID
1310 0 : << " for " << nack.writerID << std::endl;
1311 :
1312 0 : RSPConnectionPtr connection = _findConnection( nack.readerID );
1313 0 : if( !connection )
1314 : {
1315 0 : LBUNREACHABLE;
1316 0 : return false;
1317 : // it's an unknown connection, TODO add this connection?
1318 : }
1319 :
1320 0 : _timeouts = 0;
1321 0 : _addRepeat( nack.nacks, nack.count );
1322 0 : return true;
1323 : }
1324 :
1325 0 : void RSPConnection::_addRepeat( const Nack* nacks, uint16_t num )
1326 : {
1327 0 : LBLOG( LOG_RSP ) << lunchbox::disableFlush << "Queue repeat requests ";
1328 0 : size_t lost = 0;
1329 :
1330 0 : for( size_t i = 0; i < num; ++i )
1331 : {
1332 0 : const Nack& nack = nacks[ i ];
1333 0 : LBASSERT( nack.start <= nack.end );
1334 :
1335 0 : LBLOG( LOG_RSP ) << nack.start << ".." << nack.end << " ";
1336 :
1337 0 : bool merged = false;
1338 0 : for( RepeatQueue::iterator j = _repeatQueue.begin();
1339 0 : j != _repeatQueue.end() && !merged; ++j )
1340 : {
1341 0 : Nack& old = *j;
1342 0 : if( old.start <= nack.end && old.end >= nack.start )
1343 : {
1344 0 : if( old.start > nack.start )
1345 : {
1346 0 : lost += old.start - nack.start;
1347 0 : old.start = nack.start;
1348 0 : merged = true;
1349 : }
1350 0 : if( old.end < nack.end )
1351 : {
1352 0 : lost += nack.end - old.end;
1353 0 : old.end = nack.end;
1354 0 : merged = true;
1355 : }
1356 0 : LBASSERT( lost < _numBuffers );
1357 : }
1358 : }
1359 :
1360 0 : if( !merged )
1361 : {
1362 0 : lost += uint16_t( nack.end - nack.start ) + 1;
1363 0 : LBASSERT( lost <= _numBuffers );
1364 0 : _repeatQueue.push_back( nack );
1365 : }
1366 : }
1367 :
1368 0 : ConstConnectionDescriptionPtr description = getDescription();
1369 0 : if( _sendRate >
1370 0 : ( description->bandwidth >>
1371 0 : Global::getIAttribute( Global::IATTR_RSP_MIN_SENDRATE_SHIFT )))
1372 : {
1373 0 : const float delta = float( lost ) * .001f *
1374 0 : Global::getIAttribute( Global::IATTR_RSP_ERROR_DOWNSCALE );
1375 : const float maxDelta = .01f *
1376 0 : float( Global::getIAttribute( Global::IATTR_RSP_ERROR_MAXSCALE ));
1377 0 : const float downScale = LB_MIN( delta, maxDelta );
1378 0 : _sendRate -= 1 + int64_t( _sendRate * downScale );
1379 0 : LBLOG( LOG_RSP )
1380 0 : << ", lost " << lost << " slowing down " << downScale * 100.f
1381 0 : << "% to " << _sendRate << " KB/s" << std::endl
1382 0 : << lunchbox::enableFlush;
1383 : }
1384 : else
1385 0 : LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush;
1386 0 : }
1387 :
// Handle an ACKREQ datagram located in _recvBuffer.
//
// If all datagrams up to the requested sequence have been received, or the
// requested sequence lies outside of the _numBuffers window, an ack for the
// last received sequence is sent. Otherwise the holes in
// connection->_recvBuffers are collected into a list of nack ranges which is
// sent to the writer.
//
// Returns false if the datagram is shorter than DatagramAckRequest or the
// writer is unknown, true otherwise.
1388 0 : bool RSPConnection::_handleAckRequest( const size_t bytes )
1389 : {
1390 0 : if( bytes < sizeof( DatagramAckRequest ))
1391 0 : return false;
1392 : DatagramAckRequest& ackRequest =
1393 0 : *reinterpret_cast< DatagramAckRequest* >( _recvBuffer.getData( ));
1394 0 : ackRequest.byteswap();
1395 :
1396 0 : const uint16_t writerID = ackRequest.writerID;
1397 : #ifdef Darwin
1398 : // There is occasionally a packet from ourselves, even though multicast loop
1399 : // is not set?!
1400 : if( writerID == _id )
1401 : return true;
1402 : #else
1403 0 : LBASSERT( writerID != _id );
1404 : #endif
1405 0 : RSPConnectionPtr connection = _findConnection( writerID );
1406 0 : if( !connection )
1407 : {
1408 0 : LBUNREACHABLE;
1409 0 : return false;
1410 : }
1411 :
// reqID: last sequence the writer asks about, gotID: last sequence received in
// order, distance: modular (uint16_t) number of datagrams between the two.
1412 0 : const uint16_t reqID = ackRequest.sequence;
1413 0 : const uint16_t gotID = connection->_sequence - 1;
1414 0 : const uint16_t distance = reqID - gotID;
1415 :
1416 0 : LBLOG( LOG_RSP ) << "ack request " << reqID << " from " << writerID
1417 0 : << " got " << gotID << " missing " << distance
1418 0 : << std::endl;
1419 :
// Ack instead of nack when nothing is missing: the request matches what we
// have, lies at most _numBuffers behind it, or is ahead by more than
// _numBuffers.
1420 0 : if( (reqID == gotID) ||
1421 0 : (gotID > reqID && gotID - reqID <= _numBuffers) ||
1422 0 : (gotID < reqID && distance > _numBuffers) )
1423 : {
1424 0 : _sendAck( connection->_id, gotID );
1425 0 : return true;
1426 : }
1427 : // else find all missing datagrams
1428 :
// The loop below stops two entries short of the array size; a range wrapping
// around the uint16_t sequence space is split and uses two entries.
1429 0 : const uint16_t max = CO_RSP_MAX_NACKS - 2;
1430 : Nack nacks[ CO_RSP_MAX_NACKS ];
1431 0 : uint16_t i = 0;
1432 :
// The first hole always starts at the next expected sequence.
1433 0 : nacks[ i ].start = connection->_sequence;
1434 0 : LBLOG( LOG_RSP ) << lunchbox::disableFlush << "nacks: "
1435 0 : << nacks[i].start << "..";
1436 :
1437 0 : std::deque<Buffer*>::const_iterator j = connection->_recvBuffers.begin();
1438 0 : std::deque<Buffer*>::const_iterator first = j;
1439 0 : for( ; j != connection->_recvBuffers.end() && i < max; ++j )
1440 : {
1441 0 : if( *j ) // got buffer
1442 : {
// A received buffer terminates the current hole.
1443 0 : nacks[ i ].end = connection->_sequence + std::distance( first, j);
1444 0 : LBLOG( LOG_RSP ) << nacks[i].end << ", ";
// The range wrapped around: split it into start..65535 and 0..end.
1445 0 : if( nacks[ i ].end < nacks[ i ].start )
1446 : {
1447 0 : LBASSERT( nacks[ i ].end < _numBuffers );
1448 0 : nacks[ i + 1 ].start = 0;
1449 0 : nacks[ i + 1 ].end = nacks[ i ].end;
1450 0 : nacks[ i ].end = std::numeric_limits< uint16_t >::max();
1451 0 : ++i;
1452 : }
1453 0 : ++i;
1454 :
1455 : // find next hole
1456 0 : for( ++j; j != connection->_recvBuffers.end() && (*j); ++j )
1457 : /* nop */;
1458 :
1459 0 : if( j == connection->_recvBuffers.end( ))
1460 0 : break;
1461 :
1462 0 : nacks[i].start = connection->_sequence + std::distance(first, j) +1;
1463 0 : LBLOG( LOG_RSP ) << nacks[i].start << "..";
1464 : }
1465 : }
1466 :
// Close the last range with the requested sequence: either a hole is still
// open (loop stopped early, or no received buffer was found at all), or the
// datagrams after the last stored buffer up to reqID are missing.
1467 0 : if( j != connection->_recvBuffers.end() || i == 0 )
1468 : {
1469 0 : nacks[ i ].end = reqID;
1470 0 : LBLOG( LOG_RSP ) << nacks[i].end;
1471 0 : ++i;
1472 : }
1473 0 : else if( uint16_t( reqID - nacks[i-1].end ) < _numBuffers )
1474 : {
1475 0 : nacks[i].start = nacks[i-1].end + 1;
1476 0 : nacks[i].end = reqID;
1477 0 : LBLOG( LOG_RSP ) << nacks[i].start << ".." << nacks[i].end;
1478 0 : ++i;
1479 : }
// Split the final range as well if it wraps around.
1480 0 : if( nacks[ i -1 ].end < nacks[ i - 1 ].start )
1481 : {
1482 0 : LBASSERT( nacks[ i - 1 ].end < _numBuffers );
1483 0 : nacks[ i ].start = 0;
1484 0 : nacks[ i ].end = nacks[ i - 1 ].end;
1485 0 : nacks[ i - 1 ].end = std::numeric_limits< uint16_t >::max();
1486 0 : ++i;
1487 : }
1488 :
1489 0 : LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush << "send " << i
1490 0 : << " nacks to " << connection->_id << std::endl;
1491 :
1492 0 : LBASSERT( i > 0 );
1493 0 : _sendNack( connection->_id, nacks, i );
1494 0 : return true;
1495 : }
1496 :
1497 0 : void RSPConnection::_checkNewID( uint16_t id )
1498 : {
1499 : // look if the new ID exist in another connection
1500 0 : if( id == _id || _findConnection( id ))
1501 : {
1502 0 : LBLOG( LOG_RSP ) << "Deny " << id << std::endl;
1503 0 : _sendSimpleDatagram( ID_DENY, _id );
1504 : }
1505 : else
1506 0 : _sendSimpleDatagram( ID_HELLO_REPLY, _id );
1507 0 : }
1508 :
1509 0 : RSPConnectionPtr RSPConnection::_findConnection( const uint16_t id )
1510 : {
1511 0 : for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
1512 : {
1513 0 : if( (*i)->_id == id )
1514 0 : return *i;
1515 : }
1516 0 : return 0;
1517 : }
1518 :
1519 0 : bool RSPConnection::_addConnection( const uint16_t id, const uint16_t sequence )
1520 : {
1521 0 : if( _findConnection( id ))
1522 0 : return false;
1523 :
1524 0 : LBDEBUG << "add connection " << id << std::endl;
1525 0 : RSPConnectionPtr connection = new RSPConnection();
1526 0 : connection->_id = id;
1527 0 : connection->_parent = this;
1528 0 : connection->_setState( STATE_CONNECTED );
1529 0 : connection->_setDescription( _getDescription( ));
1530 0 : connection->_sequence = sequence;
1531 0 : LBASSERT( connection->_appBuffers.isEmpty( ));
1532 :
1533 : // Make all buffers available for reading
1534 0 : for( BuffersCIter i = connection->_buffers.begin();
1535 0 : i != connection->_buffers.end(); ++i )
1536 : {
1537 0 : Buffer* buffer = *i;
1538 0 : LBCHECK( connection->_threadBuffers.push( buffer ));
1539 : }
1540 :
1541 0 : _children.push_back( connection );
1542 0 : _sendCountNode();
1543 :
1544 0 : lunchbox::ScopedWrite mutex( _mutexConnection );
1545 0 : _newChildren.push_back( connection );
1546 :
1547 0 : lunchbox::ScopedWrite mutex2( _mutexEvent );
1548 0 : _event->set();
1549 0 : return true;
1550 : }
1551 :
1552 0 : void RSPConnection::_removeConnection( const uint16_t id )
1553 : {
1554 0 : LBDEBUG << "remove connection " << id << std::endl;
1555 0 : if( id == _id )
1556 0 : return;
1557 :
1558 0 : for( RSPConnectionsIter i = _children.begin(); i != _children.end(); ++i )
1559 : {
1560 0 : RSPConnectionPtr child = *i;
1561 0 : if( child->_id == id )
1562 : {
1563 0 : _children.erase( i );
1564 :
1565 0 : lunchbox::ScopedWrite mutex( child->_mutexEvent );
1566 0 : child->_appBuffers.push( 0 );
1567 0 : child->_event->set();
1568 0 : break;
1569 : }
1570 : }
1571 :
1572 0 : _sendCountNode();
1573 : }
1574 :
1575 0 : int64_t RSPConnection::write( const void* inData, const uint64_t bytes )
1576 : {
1577 0 : if( _parent )
1578 0 : return _parent->write( inData, bytes );
1579 :
1580 0 : LBASSERT( isListening( ));
1581 0 : if( !_write )
1582 0 : return -1;
1583 :
1584 : // compute number of datagrams
1585 0 : uint64_t nDatagrams = bytes / _payloadSize;
1586 0 : if( nDatagrams * _payloadSize != bytes )
1587 0 : ++nDatagrams;
1588 :
1589 : // queue each datagram (might block if buffers are exhausted)
1590 0 : const uint8_t* data = reinterpret_cast< const uint8_t* >( inData );
1591 0 : const uint8_t* end = data + bytes;
1592 0 : for( uint64_t i = 0; i < nDatagrams; ++i )
1593 : {
1594 0 : size_t packetSize = end - data;
1595 0 : packetSize = LB_MIN( packetSize, _payloadSize );
1596 :
1597 0 : if( _appBuffers.isEmpty( ))
1598 : // trigger processing
1599 0 : _postWakeup();
1600 :
1601 : Buffer* buffer;
1602 0 : if ( !_appBuffers.timedPop( _writeTimeOut, buffer ) )
1603 : {
1604 0 : LBERROR << "Timeout while writing" << std::endl;
1605 0 : buffer = 0;
1606 : }
1607 :
1608 0 : if( !buffer )
1609 : {
1610 0 : close();
1611 0 : return -1;
1612 : }
1613 :
1614 : // prepare packet header (sequence is done by thread)
1615 : DatagramData* header =
1616 0 : reinterpret_cast< DatagramData* >( buffer->getData( ));
1617 0 : header->type = DATA;
1618 0 : header->size = uint16_t( packetSize );
1619 0 : header->writerID = _id;
1620 :
1621 0 : memcpy( header + 1, data, packetSize );
1622 0 : data += packetSize;
1623 :
1624 0 : LBCHECK( _threadBuffers.push( buffer ));
1625 : }
1626 0 : _postWakeup();
1627 0 : LBLOG( LOG_RSP ) << "queued " << nDatagrams << " datagrams, "
1628 0 : << bytes << " bytes" << std::endl;
1629 0 : return bytes;
1630 : }
1631 :
1632 0 : void RSPConnection::finish()
1633 : {
1634 0 : if( _parent.isValid( ))
1635 : {
1636 0 : LBASSERTINFO( !_parent, "Writes are only allowed on RSP listeners" );
1637 0 : return;
1638 : }
1639 0 : LBASSERT( isListening( ));
1640 0 : _appBuffers.waitSize( _buffers.size( ));
1641 : }
1642 :
1643 0 : void RSPConnection::_sendCountNode()
1644 : {
1645 0 : if( !_findConnection( _id ))
1646 0 : return;
1647 :
1648 0 : LBLOG( LOG_RSP ) << _children.size() << " nodes" << std::endl;
1649 0 : DatagramNode count = { COUNTNODE, CO_RSP_PROTOCOL_VERSION, _id,
1650 0 : uint16_t( _children.size( )) };
1651 0 : count.byteswap();
1652 0 : _write->send( boost::asio::buffer( &count, sizeof( count )) );
1653 : }
1654 :
1655 0 : void RSPConnection::_sendSimpleDatagram( const DatagramType type,
1656 : const uint16_t id )
1657 : {
1658 : DatagramNode simple = { uint16_t( type ), CO_RSP_PROTOCOL_VERSION, id,
1659 0 : _sequence };
1660 0 : simple.byteswap();
1661 0 : _write->send( boost::asio::buffer( &simple, sizeof( simple )) );
1662 0 : }
1663 :
1664 0 : void RSPConnection::_sendAck( const uint16_t writerID,
1665 : const uint16_t sequence )
1666 : {
1667 0 : LBASSERT( _id != writerID );
1668 : #ifdef CO_INSTRUMENT_RSP
1669 : ++nAcksSend;
1670 : #endif
1671 :
1672 0 : LBLOG( LOG_RSP ) << "send ack " << sequence << std::endl;
1673 0 : DatagramAck ack = { ACK, _id, writerID, sequence };
1674 0 : ack.byteswap();
1675 0 : _write->send( boost::asio::buffer( &ack, sizeof( ack )) );
1676 0 : }
1677 :
1678 0 : void RSPConnection::_sendNack( const uint16_t writerID, const Nack* nacks,
1679 : const uint16_t count )
1680 : {
1681 0 : LBASSERT( count > 0 );
1682 0 : LBASSERT( count <= CO_RSP_MAX_NACKS );
1683 : #ifdef CO_INSTRUMENT_RSP
1684 : ++nNAcksSend;
1685 : #endif
1686 : /* optimization: use the direct access to the reader. */
1687 0 : if( writerID == _id )
1688 : {
1689 0 : _addRepeat( nacks, count );
1690 0 : return;
1691 : }
1692 :
1693 : const size_t size = sizeof( DatagramNack ) -
1694 0 : (CO_RSP_MAX_NACKS - count) * sizeof( Nack );
1695 :
1696 : // set the header
1697 : DatagramNack packet;
1698 0 : packet.set( _id, writerID, count );
1699 0 : memcpy( packet.nacks, nacks, count * sizeof( Nack ));
1700 0 : packet.byteswap();
1701 0 : _write->send( boost::asio::buffer( &packet, size ));
1702 : }
1703 :
1704 0 : void RSPConnection::_sendAckRequest()
1705 : {
1706 : #ifdef CO_INSTRUMENT_RSP
1707 : ++nAckRequests;
1708 : #endif
1709 0 : LBLOG( LOG_RSP ) << "send ack request for " << uint16_t( _sequence -1 )
1710 0 : << std::endl;
1711 0 : DatagramAckRequest ackRequest = { ACKREQ, _id, uint16_t( _sequence - 1 ) };
1712 0 : ackRequest.byteswap();
1713 0 : _write->send( boost::asio::buffer( &ackRequest, sizeof( DatagramAckRequest )) );
1714 0 : }
1715 :
1716 0 : std::ostream& operator << ( std::ostream& os,
1717 : const RSPConnection& connection )
1718 : {
1719 0 : os << lunchbox::disableFlush << lunchbox::disableHeader
1720 0 : << "RSPConnection id " << connection.getID() << " send rate "
1721 0 : << connection.getSendRate();
1722 :
1723 : #ifdef CO_INSTRUMENT_RSP
1724 : const int prec = os.precision();
1725 : os.precision( 3 );
1726 :
1727 : const float time = instrumentClock.getTimef();
1728 : const float mbps = 1048.576f * time;
1729 : os << ": " << lunchbox::indent << std::endl
1730 : << float( nBytesRead ) / mbps << " / " << float( nBytesWritten ) / mbps
1731 : << " MB/s r/w using " << nDatagrams << " dgrams " << nRepeated
1732 : << " repeats " << nMergedDatagrams
1733 : << " merged"
1734 : << std::endl;
1735 :
1736 : os.precision( prec );
1737 : os << "sender: " << nAckRequests << " ack requests " << nAcksAccepted << "/"
1738 : << nAcksRead << " acks " << nNAcksRead << " nacks, throttle "
1739 : << writeWaitTime << " ms"
1740 : << std::endl
1741 : << "receiver: " << nAcksSend << " acks " << nNAcksSend << " nacks"
1742 : << lunchbox::exdent;
1743 :
1744 : nReadData = 0;
1745 : nBytesRead = 0;
1746 : nBytesWritten = 0;
1747 : nDatagrams = 0;
1748 : nRepeated = 0;
1749 : nMergedDatagrams = 0;
1750 : nAckRequests = 0;
1751 : nAcksSend = 0;
1752 : nAcksRead = 0;
1753 : nAcksAccepted = 0;
1754 : nNAcksSend = 0;
1755 : nNAcksRead = 0;
1756 : writeWaitTime = 0.f;
1757 : #endif
1758 0 : os << std::endl << lunchbox::enableHeader << lunchbox::enableFlush;
1759 :
1760 0 : return os;
1761 : }
1762 :
1763 66 : }
|