Line data Source code
1 :
2 : /* Copyright (c) 2009, Cedric Stalder <cedric.stalder@gmail.com>
3 : * 2009-2014, Stefan Eilemann <eile@equalizergraphics.com>
4 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "rspConnection.h"
23 :
24 : #include "connection.h"
25 : #include "connectionDescription.h"
26 : #include "global.h"
27 : #include "log.h"
28 :
29 : #include <lunchbox/rng.h>
30 : #include <lunchbox/scopedMutex.h>
31 : #include <lunchbox/sleep.h>
32 :
33 : #include <boost/bind.hpp>
34 :
35 : //#define CO_INSTRUMENT_RSP
36 : #define CO_RSP_MERGE_WRITES
37 : #define CO_RSP_MAX_TIMEOUTS 1000
38 : #ifdef _WIN32
39 : # define CO_RSP_DEFAULT_PORT (4242)
40 : #else
41 : # define CO_RSP_DEFAULT_PORT ( (getuid() % 64511) + 1024 )
42 : #endif
43 :
44 :
45 : // Note: Do not use version > 255, endianness detection magic relies on this.
46 : const uint16_t CO_RSP_PROTOCOL_VERSION = 0;
47 :
48 : namespace bp = boost::posix_time;
49 : namespace ip = boost::asio::ip;
50 :
51 : namespace co
52 : {
53 :
54 : namespace
55 : {
56 : #ifdef CO_INSTRUMENT_RSP
57 : lunchbox::a_int32_t nReadData;
58 : lunchbox::a_int32_t nBytesRead;
59 : lunchbox::a_int32_t nBytesWritten;
60 : lunchbox::a_int32_t nDatagrams;
61 : lunchbox::a_int32_t nRepeated;
62 : lunchbox::a_int32_t nMergedDatagrams;
63 : lunchbox::a_int32_t nAckRequests;
64 : lunchbox::a_int32_t nAcksSend;
65 : lunchbox::a_int32_t nAcksSendTotal;
66 : lunchbox::a_int32_t nAcksRead;
67 : lunchbox::a_int32_t nAcksAccepted;
68 : lunchbox::a_int32_t nNAcksSend;
69 : lunchbox::a_int32_t nNAcksRead;
70 : lunchbox::a_int32_t nNAcksResend;
71 :
72 : float writeWaitTime = 0.f;
73 : lunchbox::Clock instrumentClock;
74 : #endif
75 :
76 : static uint16_t _numBuffers = 0;
77 : }
78 :
79 0 : RSPConnection::RSPConnection()
80 : : _id( 0 )
81 : , _idAccepted( false )
82 0 : , _mtu( Global::getIAttribute( Global::IATTR_UDP_MTU ))
83 0 : , _ackFreq( Global::getIAttribute( Global::IATTR_RSP_ACK_FREQUENCY ))
84 : , _payloadSize( _mtu - sizeof( DatagramData ))
85 : , _timeouts( 0 )
86 0 : , _event( new EventConnection )
87 : , _read( 0 )
88 : , _write( 0 )
89 : , _timeout( _ioService )
90 : , _wakeup( _ioService )
91 0 : , _maxBucketSize( ( _mtu * _ackFreq) >> 1 )
92 : , _bucketSize( 0 )
93 : , _sendRate( 0 )
94 : , _thread( 0 )
95 0 : , _acked( std::numeric_limits< uint16_t >::max( ))
96 : , _threadBuffers( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS))
97 : , _recvBuffer( _mtu )
98 : , _readBuffer( 0 )
99 : , _readBufferPos( 0 )
100 : , _sequence( 0 )
101 : // ensure we have a handleConnectedTimeout before the write pop
102 0 : , _writeTimeOut( Global::IATTR_RSP_ACK_TIMEOUT * CO_RSP_MAX_TIMEOUTS * 2 )
103 : {
104 0 : _buildNewID();
105 0 : ConnectionDescriptionPtr description = _getDescription();
106 0 : description->type = CONNECTIONTYPE_RSP;
107 0 : description->bandwidth = 102400;
108 :
109 0 : LBCHECK( _event->connect( ));
110 :
111 0 : _buffers.reserve( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ));
112 0 : while( static_cast< int32_t >( _buffers.size( )) <
113 0 : Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ))
114 : {
115 0 : _buffers.push_back( new Buffer( _mtu ));
116 : }
117 :
118 0 : LBASSERT( sizeof( DatagramNack ) <= size_t( _mtu ));
119 0 : LBLOG( LOG_RSP ) << "New RSP connection, " << _buffers.size()
120 0 : << " buffers of " << _mtu << " bytes" << std::endl;
121 :
122 0 : }
123 :
124 0 : RSPConnection::~RSPConnection()
125 : {
126 0 : _close();
127 0 : while( !_buffers.empty( ))
128 : {
129 0 : delete _buffers.back();
130 0 : _buffers.pop_back();
131 : }
132 0 : }
133 :
134 0 : void RSPConnection::_close()
135 : {
136 0 : if( _parent.isValid() && _parent->_id == _id )
137 0 : _parent->close();
138 :
139 0 : while( !_parent && _isWriting( ))
140 0 : lunchbox::sleep( 10 /*ms*/ );
141 :
142 0 : if( isClosed( ))
143 0 : return;
144 :
145 0 : lunchbox::ScopedWrite mutex( _mutexEvent );
146 0 : if( _thread )
147 : {
148 0 : LBASSERT( !_thread->isCurrent( ));
149 0 : _sendSimpleDatagram( ID_EXIT, _id );
150 0 : _ioService.stop();
151 0 : _thread->join();
152 0 : delete _thread;
153 : }
154 :
155 0 : _setState( STATE_CLOSING );
156 0 : if( _thread )
157 : {
158 0 : _thread = 0;
159 :
160 : // notify children to close
161 0 : for( RSPConnectionsCIter i=_children.begin(); i !=_children.end(); ++i )
162 : {
163 0 : RSPConnectionPtr child = *i;
164 0 : lunchbox::ScopedWrite mutexChild( child->_mutexEvent );
165 0 : child->_appBuffers.push( 0 );
166 0 : child->_event->set();
167 0 : }
168 :
169 0 : _children.clear();
170 0 : _newChildren.clear();
171 : }
172 :
173 0 : _parent = 0;
174 :
175 0 : if( _read )
176 0 : _read->close();
177 0 : delete _read;
178 0 : _read = 0;
179 :
180 0 : if( _write )
181 0 : _write->close();
182 0 : delete _write;
183 0 : _write = 0;
184 :
185 0 : _threadBuffers.clear();
186 0 : _appBuffers.push( 0 ); // unlock any other read/write threads
187 :
188 0 : _setState( STATE_CLOSED );
189 :
190 0 : mutex.leave();
191 0 : _event->close();
192 : }
193 :
194 : //----------------------------------------------------------------------
195 : // Async IO handles
196 : //----------------------------------------------------------------------
197 0 : uint16_t RSPConnection::_buildNewID()
198 : {
199 0 : lunchbox::RNG rng;
200 0 : _id = rng.get< uint16_t >();
201 0 : return _id;
202 : }
203 :
204 0 : bool RSPConnection::listen()
205 : {
206 0 : ConnectionDescriptionPtr description = _getDescription();
207 0 : LBASSERT( description->type == CONNECTIONTYPE_RSP );
208 :
209 0 : if( !isClosed( ))
210 0 : return false;
211 :
212 0 : _setState( STATE_CONNECTING );
213 0 : _numBuffers = Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS );
214 :
215 : // init udp connection
216 0 : if( description->port == ConnectionDescription::RANDOM_MULTICAST_PORT )
217 0 : description->port = 0; // Let OS choose
218 0 : else if( description->port == 0 )
219 0 : description->port = CO_RSP_DEFAULT_PORT;
220 0 : if( description->getHostname().empty( ))
221 0 : description->setHostname( "239.255.42.43" );
222 0 : if( description->getInterface().empty( ))
223 0 : description->setInterface( "0.0.0.0" );
224 :
225 : try
226 : {
227 0 : const ip::address readAddress( ip::address::from_string( "0.0.0.0" ));
228 : const ip::udp::endpoint readEndpoint( readAddress,
229 0 : description->port );
230 0 : const std::string& port = std::to_string( unsigned( description->port));
231 0 : ip::udp::resolver resolver( _ioService );
232 : const ip::udp::resolver::query queryHN( ip::udp::v4(),
233 0 : description->getHostname(),
234 0 : port );
235 0 : const ip::udp::resolver::iterator hostname = resolver.resolve( queryHN);
236 :
237 0 : if( hostname == ip::udp::resolver::iterator( ))
238 0 : return false;
239 :
240 0 : const ip::udp::endpoint writeEndpoint = *hostname;
241 0 : const ip::address mcAddr( writeEndpoint.address( ));
242 :
243 0 : _read = new ip::udp::socket( _ioService );
244 0 : _write = new ip::udp::socket( _ioService );
245 0 : _read->open( readEndpoint.protocol( ));
246 0 : _write->open( writeEndpoint.protocol( ));
247 :
248 0 : _read->set_option( ip::udp::socket::reuse_address( true ));
249 0 : _write->set_option( ip::udp::socket::reuse_address( true ));
250 : _read->set_option( ip::udp::socket::receive_buffer_size(
251 0 : Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
252 : _write->set_option( ip::udp::socket::send_buffer_size(
253 0 : Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
254 :
255 0 : _read->bind( readEndpoint );
256 0 : description->port = _read->local_endpoint().port();
257 :
258 : const ip::udp::resolver::query queryIF( ip::udp::v4(),
259 0 : description->getInterface(),
260 0 : "0" );
261 : const ip::udp::resolver::iterator interfaceIP =
262 0 : resolver.resolve( queryIF );
263 :
264 0 : if( interfaceIP == ip::udp::resolver::iterator( ))
265 0 : return false;
266 :
267 0 : const ip::address ifAddr( ip::udp::endpoint( *interfaceIP ).address( ));
268 0 : LBDEBUG << "Joining " << mcAddr << " on " << ifAddr << std::endl;
269 :
270 : _read->set_option( ip::multicast::join_group( mcAddr.to_v4(),
271 0 : ifAddr.to_v4( )));
272 0 : _write->set_option( ip::multicast::outbound_interface( ifAddr.to_v4()));
273 : #ifdef SO_BINDTODEVICE // https://github.com/Eyescale/Collage/issues/16
274 0 : const std::string& ifIP = ifAddr.to_string();
275 : ::setsockopt( _write->native(), SOL_SOCKET, SO_BINDTODEVICE,
276 0 : ifIP.c_str(), ifIP.size() + 1 );
277 : ::setsockopt( _read->native(), SOL_SOCKET, SO_BINDTODEVICE,
278 0 : ifIP.c_str(), ifIP.size() + 1 );
279 : #endif
280 :
281 0 : _write->connect( writeEndpoint );
282 :
283 0 : _read->set_option( ip::multicast::enable_loopback( false ));
284 0 : _write->set_option( ip::multicast::enable_loopback( false ));
285 : }
286 0 : catch( const boost::system::system_error& e )
287 : {
288 0 : LBWARN << "can't setup underlying UDP connection: " << e.what()
289 0 : << std::endl;
290 0 : delete _read;
291 0 : delete _write;
292 0 : _read = 0;
293 0 : _write = 0;
294 0 : return false;
295 : }
296 :
297 : // init communication protocol thread
298 0 : _thread = new Thread( this );
299 0 : _bucketSize = 0;
300 0 : _sendRate = description->bandwidth;
301 :
302 : // waits until RSP protocol establishes connection to the multicast network
303 0 : if( !_thread->start( ) )
304 : {
305 0 : close();
306 0 : return false;
307 : }
308 :
309 : // Make all buffers available for writing
310 0 : LBASSERT( _appBuffers.isEmpty( ));
311 0 : _appBuffers.push( _buffers );
312 :
313 0 : LBDEBUG << "Listening on " << description->getHostname() << ":"
314 0 : << description->port << " (" << description->toString() << " @"
315 0 : << (void*)this << ")" << std::endl;
316 0 : return true;
317 : }
318 :
319 0 : ConnectionPtr RSPConnection::acceptSync()
320 : {
321 0 : if( !isListening( ))
322 0 : return 0;
323 :
324 0 : lunchbox::ScopedWrite mutex( _mutexConnection );
325 0 : LBASSERT( !_newChildren.empty( ));
326 0 : if( _newChildren.empty( ))
327 0 : return 0;
328 :
329 0 : RSPConnectionPtr newConnection = _newChildren.back();
330 0 : _newChildren.pop_back();
331 :
332 0 : LBDEBUG << _id << " accepted RSP connection " << newConnection->_id
333 0 : << std::endl;
334 :
335 0 : lunchbox::ScopedWrite mutex2( _mutexEvent );
336 0 : if( _newChildren.empty() )
337 0 : _event->reset();
338 : else
339 0 : _event->set();
340 :
341 0 : return newConnection;
342 : }
343 :
344 0 : int64_t RSPConnection::readSync( void* buffer, const uint64_t bytes, const bool)
345 : {
346 0 : LBASSERT( bytes > 0 );
347 0 : if( !isConnected ( ))
348 0 : return -1;
349 :
350 0 : uint64_t bytesLeft = bytes;
351 0 : uint8_t* ptr = reinterpret_cast< uint8_t* >( buffer );
352 :
353 : // redundant (done by the caller already), but saves some lock ops
354 0 : while( bytesLeft )
355 : {
356 0 : if( !_readBuffer )
357 : {
358 0 : LBASSERT( _readBufferPos == 0 );
359 0 : _readBuffer = _appBuffers.pop();
360 0 : if( !_readBuffer )
361 : {
362 0 : close();
363 : return (bytes == bytesLeft) ?
364 0 : -1 : static_cast< int64_t >( bytes - bytesLeft );
365 : }
366 : }
367 :
368 : const DatagramData* header = reinterpret_cast< const DatagramData* >(
369 0 : _readBuffer->getData( ));
370 0 : const uint8_t* payload = reinterpret_cast< const uint8_t* >( header+1 );
371 0 : const size_t dataLeft = header->size - _readBufferPos;
372 0 : const size_t size = LB_MIN( static_cast< size_t >( bytesLeft ),
373 : dataLeft );
374 :
375 0 : memcpy( ptr, payload + _readBufferPos, size );
376 0 : _readBufferPos += size;
377 0 : ptr += size;
378 0 : bytesLeft -= size;
379 :
380 : // if all data in the buffer has been taken
381 0 : if( _readBufferPos >= header->size )
382 : {
383 0 : LBASSERT( _readBufferPos == header->size );
384 : //LBLOG( LOG_RSP ) << "reset read buffer " << header->sequence
385 : // << std::endl;
386 :
387 0 : LBCHECK( _threadBuffers.push( _readBuffer ));
388 0 : _readBuffer = 0;
389 0 : _readBufferPos = 0;
390 : }
391 : else
392 : {
393 0 : LBASSERT( _readBufferPos < header->size );
394 : }
395 : }
396 :
397 0 : if( _readBuffer || !_appBuffers.isEmpty( ))
398 0 : _event->set();
399 : else
400 : {
401 0 : lunchbox::ScopedWrite mutex( _mutexEvent );
402 0 : if( _appBuffers.isEmpty( ))
403 0 : _event->reset();
404 : }
405 :
406 : #ifdef CO_INSTRUMENT_RSP
407 : nBytesRead += bytes;
408 : #endif
409 0 : return bytes;
410 : }
411 :
412 0 : void RSPConnection::Thread::run()
413 : {
414 0 : _connection->_runThread();
415 0 : _connection = 0;
416 0 : LBDEBUG << "Left RSP protocol thread" << std::endl;
417 0 : }
418 :
419 0 : void RSPConnection::_handleTimeout( const boost::system::error_code& error )
420 : {
421 0 : if( error == boost::asio::error::operation_aborted )
422 0 : return;
423 :
424 0 : if( isListening( ))
425 0 : _handleConnectedTimeout();
426 0 : else if( _idAccepted )
427 0 : _handleInitTimeout();
428 : else
429 0 : _handleAcceptIDTimeout();
430 : }
431 :
432 0 : void RSPConnection::_handleAcceptIDTimeout()
433 : {
434 0 : ++_timeouts;
435 0 : if( _timeouts < 20 )
436 : {
437 0 : LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
438 0 : _sendSimpleDatagram( ID_HELLO, _id );
439 : }
440 : else
441 : {
442 0 : LBLOG( LOG_RSP ) << "Confirm " << _id << std::endl;
443 0 : _sendSimpleDatagram( ID_CONFIRM, _id );
444 0 : _addConnection( _id, _sequence );
445 0 : _idAccepted = true;
446 0 : _timeouts = 0;
447 : // send a first datagram to announce me and discover all other
448 : // connections
449 0 : _sendCountNode();
450 : }
451 0 : _setTimeout( 10 );
452 0 : }
453 :
454 0 : void RSPConnection::_handleInitTimeout( )
455 : {
456 0 : LBASSERT( !isListening( ))
457 0 : ++_timeouts;
458 0 : if( _timeouts < 20 )
459 0 : _sendCountNode();
460 : else
461 : {
462 0 : _setState( STATE_LISTENING );
463 0 : LBDEBUG << "RSP connection " << _id << " listening" << std::endl;
464 0 : _timeouts = 0;
465 0 : _ioService.stop(); // thread initialized, run restarts
466 : }
467 0 : _setTimeout( 10 );
468 0 : }
469 :
470 0 : void RSPConnection::_clearWriteQueues()
471 : {
472 0 : while( !_threadBuffers.isEmpty( ))
473 : {
474 0 : Buffer* buffer = 0;
475 0 : _threadBuffers.pop( buffer );
476 0 : _writeBuffers.push_back( buffer );
477 : }
478 :
479 0 : _finishWriteQueue( _sequence - 1 );
480 0 : LBASSERT( _threadBuffers.isEmpty() && _writeBuffers.empty() );
481 0 : }
482 :
483 0 : void RSPConnection::_handleConnectedTimeout()
484 : {
485 0 : if( !isListening( ))
486 : {
487 0 : _clearWriteQueues();
488 0 : _ioService.stop();
489 0 : return;
490 : }
491 :
492 0 : _processOutgoing();
493 :
494 0 : if( _timeouts >= CO_RSP_MAX_TIMEOUTS )
495 : {
496 0 : LBERROR << "Too many timeouts during send: " << _timeouts << std::endl;
497 0 : bool all = true;
498 0 : for( RSPConnectionsCIter i =_children.begin(); i !=_children.end(); ++i)
499 : {
500 0 : RSPConnectionPtr child = *i;
501 0 : if ( child->_acked >= _sequence - _numBuffers && child->_id != _id )
502 : {
503 0 : all = false;
504 0 : break;
505 : }
506 0 : }
507 :
508 : // if all connections failed we probably got disconnected -> close and
509 : // exit else close all failed child connections
510 0 : if ( all )
511 : {
512 0 : _sendSimpleDatagram( ID_EXIT, _id );
513 0 : _appBuffers.pushFront( 0 ); // unlock write function
514 :
515 0 : for( RSPConnectionsCIter i =_children.begin();
516 0 : i !=_children.end(); ++i)
517 : {
518 0 : RSPConnectionPtr child = *i;
519 0 : child->_setState( STATE_CLOSING );
520 0 : child->_appBuffers.push( 0 ); // unlock read func
521 0 : }
522 :
523 0 : _clearWriteQueues();
524 0 : _ioService.stop();
525 0 : return;
526 : }
527 :
528 0 : RSPConnectionsCIter i =_children.begin();
529 0 : while ( i !=_children.end() )
530 : {
531 0 : RSPConnectionPtr child = *i;
532 0 : if ( child->_acked < _sequence - 1 && _id != child->_id )
533 : {
534 0 : _sendSimpleDatagram( ID_EXIT, child->_id );
535 0 : _removeConnection( child->_id );
536 : }
537 : else
538 : {
539 0 : uint16_t wb = static_cast<uint16_t>( _writeBuffers.size( ));
540 0 : child->_acked = _sequence - wb;
541 0 : ++i;
542 : }
543 0 : }
544 :
545 0 : _timeouts = 0;
546 : }
547 : }
548 :
549 : RSPConnection::DatagramNode*
550 0 : RSPConnection::_getDatagramNode( const size_t bytes )
551 : {
552 0 : if( bytes < sizeof( DatagramNode ))
553 : {
554 0 : LBERROR << "DatagramNode size mismatch, got " << bytes << " instead of "
555 0 : << sizeof( DatagramNode ) << " bytes" << std::endl;
556 : //close();
557 0 : return 0;
558 : }
559 : DatagramNode& node =
560 0 : *reinterpret_cast< DatagramNode* >( _recvBuffer.getData( ));
561 0 : node.byteswap();
562 0 : if( node.protocolVersion != CO_RSP_PROTOCOL_VERSION )
563 : {
564 0 : LBERROR << "Protocol version mismatch, got " << node.protocolVersion
565 0 : << " instead of " << CO_RSP_PROTOCOL_VERSION << std::endl;
566 : //close();
567 0 : return 0;
568 : }
569 0 : return &node;
570 : }
571 :
572 0 : bool RSPConnection::_initThread()
573 : {
574 0 : LBLOG( LOG_RSP ) << "Started RSP protocol thread" << std::endl;
575 0 : _timeouts = 0;
576 :
577 : // send a first datagram to announce me and discover other connections
578 0 : LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
579 0 : _sendSimpleDatagram( ID_HELLO, _id );
580 0 : _setTimeout( 10 );
581 0 : _asyncReceiveFrom();
582 0 : _ioService.run();
583 0 : return isListening();
584 : }
585 :
586 0 : void RSPConnection::_runThread()
587 : {
588 : //__debugbreak();
589 0 : _ioService.reset();
590 0 : _ioService.run();
591 0 : }
592 :
593 0 : void RSPConnection::_setTimeout( const int32_t timeOut )
594 : {
595 0 : LBASSERT( timeOut >= 0 );
596 0 : _timeout.expires_from_now( boost::posix_time::milliseconds( timeOut ));
597 : _timeout.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
598 0 : boost::asio::placeholders::error ));
599 0 : }
600 :
601 0 : void RSPConnection::_postWakeup()
602 : {
603 0 : _wakeup.expires_from_now( boost::posix_time::milliseconds( 0 ));
604 : _wakeup.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
605 0 : boost::asio::placeholders::error ));
606 0 : }
607 :
608 0 : void RSPConnection::_processOutgoing()
609 : {
610 : #ifdef CO_INSTRUMENT_RSP
611 : if( instrumentClock.getTime64() > 1000 )
612 : {
613 : LBWARN << *this << std::endl;
614 : instrumentClock.reset();
615 : }
616 : #endif
617 :
618 0 : if( !_repeatQueue.empty( ))
619 0 : _repeatData();
620 : else
621 0 : _writeData();
622 :
623 0 : if( !_threadBuffers.isEmpty() || !_repeatQueue.empty( ))
624 : {
625 0 : _setTimeout( 0 ); // call again to send remaining
626 0 : return;
627 : }
628 : // no more data to write, check/send ack request, reset timeout
629 :
630 0 : if( _writeBuffers.empty( )) // got all acks
631 : {
632 0 : _timeouts = 0;
633 0 : _timeout.cancel();
634 0 : return;
635 : }
636 :
637 : const int64_t timeout =
638 0 : Global::getIAttribute( Global::IATTR_RSP_ACK_TIMEOUT );
639 0 : const int64_t left = timeout - _clock.getTime64();
640 :
641 0 : if( left > 0 )
642 : {
643 0 : _setTimeout( left );
644 0 : return;
645 : }
646 :
647 : // (repeat) ack request
648 0 : _clock.reset();
649 0 : ++_timeouts;
650 0 : if ( _timeouts < CO_RSP_MAX_TIMEOUTS )
651 0 : _sendAckRequest();
652 0 : _setTimeout( timeout );
653 : }
654 :
655 0 : void RSPConnection::_writeData()
656 : {
657 0 : Buffer* buffer = 0;
658 0 : if( !_threadBuffers.pop( buffer )) // nothing to write
659 0 : return;
660 :
661 0 : _timeouts = 0;
662 0 : LBASSERT( buffer );
663 :
664 : // write buffer
665 0 : DatagramData* header = reinterpret_cast<DatagramData*>( buffer->getData( ));
666 0 : header->sequence = _sequence++;
667 :
668 : #ifdef CO_RSP_MERGE_WRITES
669 0 : if( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
670 : {
671 0 : std::vector< Buffer* > appBuffers;
672 0 : while( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
673 : {
674 0 : Buffer* buffer2 = 0;
675 0 : LBCHECK( _threadBuffers.getFront( buffer2 ));
676 0 : LBASSERT( buffer2 );
677 : DatagramData* header2 =
678 0 : reinterpret_cast<DatagramData*>( buffer2->getData( ));
679 :
680 0 : if( uint32_t( header->size + header2->size ) > _payloadSize )
681 0 : break;
682 :
683 0 : memcpy( reinterpret_cast<uint8_t*>( header + 1 ) + header->size,
684 0 : header2 + 1, header2->size );
685 0 : header->size += header2->size;
686 0 : LBCHECK( _threadBuffers.pop( buffer2 ));
687 0 : appBuffers.push_back( buffer2 );
688 : #ifdef CO_INSTRUMENT_RSP
689 : ++nMergedDatagrams;
690 : #endif
691 : }
692 :
693 0 : if( !appBuffers.empty( ))
694 0 : _appBuffers.push( appBuffers );
695 : }
696 : #endif
697 :
698 : // send data
699 : // Note 1: We could optimize the send away if we're all alone, but this is
700 : // not a use case for RSP, so we don't care.
701 : // Note 2: Data to myself will be 'written' in _finishWriteQueue once we
702 : // got all acks for the packet
703 0 : const uint32_t size = header->size + sizeof( DatagramData );
704 :
705 0 : _waitWritable( size ); // OPT: process incoming in between
706 0 : header->byteswap();
707 0 : _write->send( boost::asio::buffer( header, size ));
708 :
709 : #ifdef CO_INSTRUMENT_RSP
710 : ++nDatagrams;
711 : nBytesWritten += header->size;
712 : #endif
713 :
714 : // save datagram for repeats (and self)
715 0 : _writeBuffers.push_back( buffer );
716 :
717 0 : if( _children.size() == 1 ) // We're all alone
718 : {
719 0 : LBASSERT( _children.front()->_id == _id );
720 0 : _finishWriteQueue( _sequence - 1 );
721 : }
722 : }
723 :
724 0 : void RSPConnection::_waitWritable( const uint64_t bytes )
725 : {
726 : #ifdef CO_INSTRUMENT_RSP
727 : lunchbox::Clock clock;
728 : #endif
729 :
730 0 : _bucketSize += static_cast< uint64_t >( _clock.resetTimef() * _sendRate );
731 : // opt omit: * 1024 / 1000;
732 0 : _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
733 :
734 0 : const uint64_t size = LB_MIN( bytes, static_cast< uint64_t >( _mtu ));
735 0 : while( _bucketSize < size )
736 : {
737 0 : lunchbox::Thread::yield();
738 0 : float time = _clock.resetTimef();
739 :
740 0 : while( time == 0.f )
741 : {
742 0 : lunchbox::Thread::yield();
743 0 : time = _clock.resetTimef();
744 : }
745 :
746 0 : _bucketSize += static_cast< int64_t >( time * _sendRate );
747 0 : _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
748 : }
749 0 : _bucketSize -= size;
750 :
751 : #ifdef CO_INSTRUMENT_RSP
752 : writeWaitTime += clock.getTimef();
753 : #endif
754 :
755 0 : ConstConnectionDescriptionPtr description = getDescription();
756 0 : if( _sendRate < description->bandwidth )
757 : {
758 : _sendRate += int64_t(
759 0 : float( Global::getIAttribute( Global::IATTR_RSP_ERROR_UPSCALE )) *
760 0 : float( description->bandwidth ) * .001f );
761 0 : LBLOG( LOG_RSP ) << "speeding up to " << _sendRate << " KB/s"
762 0 : << std::endl;
763 0 : }
764 0 : }
765 :
766 0 : void RSPConnection::_repeatData()
767 : {
768 0 : _timeouts = 0;
769 :
770 0 : while( !_repeatQueue.empty( ))
771 : {
772 0 : Nack& request = _repeatQueue.front();
773 0 : const uint16_t distance = _sequence - request.start;
774 :
775 0 : if ( distance == 0 )
776 : {
777 0 : LBWARN << "ignoring invalid nack (" << request.start
778 0 : << ".." << request.end << ")" << std::endl;
779 0 : _repeatQueue.pop_front();
780 0 : continue;
781 : }
782 :
783 0 : if( distance <= _writeBuffers.size( )) // not already acked
784 : {
785 : // LBLOG( LOG_RSP ) << "Repeat " << request.start << ", " << _sendRate
786 : // << "KB/s"<< std::endl;
787 :
788 0 : const size_t i = _writeBuffers.size() - distance;
789 0 : Buffer* buffer = _writeBuffers[i];
790 0 : LBASSERT( buffer );
791 :
792 : DatagramData* header =
793 0 : reinterpret_cast<DatagramData*>( buffer->getData( ));
794 0 : const uint32_t size = header->size + sizeof( DatagramData );
795 0 : LBASSERT( header->sequence == request.start );
796 :
797 : // send data
798 0 : _waitWritable( size ); // OPT: process incoming in between
799 : // already done by _writeData: header->byteswap();
800 0 : _write->send( boost::asio::buffer( header, size ) );
801 : #ifdef CO_INSTRUMENT_RSP
802 : ++nRepeated;
803 : #endif
804 : }
805 :
806 0 : if( request.start == request.end )
807 0 : _repeatQueue.pop_front(); // done with request
808 : else
809 0 : ++request.start;
810 :
811 0 : if( distance <= _writeBuffers.size( )) // send something
812 0 : return;
813 : }
814 : }
815 :
816 0 : void RSPConnection::_finishWriteQueue( const uint16_t sequence )
817 : {
818 0 : LBASSERT( !_writeBuffers.empty( ));
819 :
820 0 : RSPConnectionPtr connection = _findConnection( _id );
821 0 : LBASSERT( connection.isValid( ));
822 0 : LBASSERT( connection->_recvBuffers.empty( ));
823 :
824 : // Bundle pushing the buffers to the app to avoid excessive lock ops
825 0 : Buffers readBuffers;
826 0 : Buffers freeBuffers;
827 :
828 0 : const uint16_t size = _sequence - sequence - 1;
829 0 : LBASSERTINFO( size <= uint16_t( _writeBuffers.size( )),
830 : size << " > " << _writeBuffers.size( ));
831 0 : LBLOG( LOG_RSP ) << "Got all remote acks for " << sequence << " current "
832 0 : << _sequence << " advance " << _writeBuffers.size() - size
833 0 : << " buffers" << std::endl;
834 :
835 0 : while( _writeBuffers.size() > size_t( size ))
836 : {
837 0 : Buffer* buffer = _writeBuffers.front();
838 0 : _writeBuffers.pop_front();
839 :
840 : #ifndef NDEBUG
841 : DatagramData* datagram =
842 0 : reinterpret_cast< DatagramData* >( buffer->getData( ));
843 0 : datagram->byteswap();
844 0 : LBASSERT( datagram->writerID == _id );
845 0 : LBASSERTINFO( datagram->sequence ==
846 : uint16_t( connection->_sequence + readBuffers.size( )),
847 : datagram->sequence << ", " << connection->_sequence <<
848 : ", " << readBuffers.size( ));
849 : //LBLOG( LOG_RSP ) << "self receive " << datagram->sequence << std::endl;
850 : #endif
851 :
852 0 : Buffer* newBuffer = connection->_newDataBuffer( *buffer );
853 0 : if( !newBuffer && !readBuffers.empty( )) // push prepared app buffers
854 : {
855 0 : lunchbox::ScopedWrite mutex( connection->_mutexEvent );
856 0 : LBLOG( LOG_RSP ) << "post " << readBuffers.size()
857 0 : << " buffers starting with sequence "
858 0 : << connection->_sequence << std::endl;
859 :
860 0 : connection->_appBuffers.push( readBuffers );
861 0 : connection->_sequence += uint16_t( readBuffers.size( ));
862 0 : readBuffers.clear();
863 0 : connection->_event->set();
864 : }
865 :
866 0 : while( !newBuffer ) // no more data buffers, wait for app to drain
867 : {
868 0 : newBuffer = connection->_newDataBuffer( *buffer );
869 0 : lunchbox::Thread::yield();
870 : }
871 :
872 0 : freeBuffers.push_back( buffer );
873 0 : readBuffers.push_back( newBuffer );
874 : }
875 :
876 0 : _appBuffers.push( freeBuffers );
877 0 : if( !readBuffers.empty( ))
878 : {
879 0 : lunchbox::ScopedWrite mutex( connection->_mutexEvent );
880 : #if 0
881 : LBLOG( LOG_RSP )
882 : << "post " << readBuffers.size() << " buffers starting at "
883 : << connection->_sequence << std::endl;
884 : #endif
885 :
886 0 : connection->_appBuffers.push( readBuffers );
887 0 : connection->_sequence += uint16_t( readBuffers.size( ));
888 0 : connection->_event->set();
889 : }
890 :
891 0 : connection->_acked = uint16_t( connection->_sequence - 1 );
892 0 : LBASSERT( connection->_acked == sequence );
893 :
894 0 : _timeouts = 0;
895 0 : }
896 :
897 0 : void RSPConnection::_handlePacket( const boost::system::error_code& /* error */,
898 : const size_t bytes )
899 : {
900 0 : if( isListening( ))
901 : {
902 0 : _handleConnectedData( bytes );
903 :
904 0 : if( isListening( ))
905 0 : _processOutgoing();
906 : else
907 : {
908 0 : _ioService.stop();
909 0 : return;
910 : }
911 : }
912 0 : else if( bytes >= sizeof( DatagramNode ))
913 : {
914 0 : if( _idAccepted )
915 0 : _handleInitData( bytes, false );
916 : else
917 0 : _handleAcceptIDData( bytes );
918 : }
919 :
920 : //LBLOG( LOG_RSP ) << "_handlePacket timeout " << timeout << std::endl;
921 0 : _asyncReceiveFrom();
922 : }
923 :
924 0 : void RSPConnection::_handleAcceptIDData( const size_t bytes )
925 : {
926 0 : DatagramNode* pNode = _getDatagramNode( bytes );
927 0 : if( !pNode )
928 0 : return;
929 :
930 0 : DatagramNode& node = *pNode;
931 :
932 0 : switch( node.type )
933 : {
934 : case ID_HELLO:
935 0 : _checkNewID( node.connectionID );
936 0 : break;
937 :
938 : case ID_HELLO_REPLY:
939 0 : _addConnection( node.connectionID, node.data );
940 0 : break;
941 :
942 : case ID_DENY:
943 : // a connection refused my ID, try another ID
944 0 : if( node.connectionID == _id )
945 : {
946 0 : _timeouts = 0;
947 0 : _sendSimpleDatagram( ID_HELLO, _buildNewID( ));
948 0 : LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
949 : }
950 0 : break;
951 :
952 : case ID_EXIT:
953 0 : _removeConnection( node.connectionID );
954 0 : break;
955 :
956 : default:
957 0 : LBUNIMPLEMENTED;
958 0 : break;
959 : }
960 : }
961 :
962 0 : void RSPConnection::_handleInitData( const size_t bytes, const bool connected )
963 : {
964 0 : DatagramNode* pNode = _getDatagramNode( bytes );
965 0 : if( !pNode )
966 0 : return;
967 :
968 0 : DatagramNode& node = *pNode;
969 :
970 0 : switch( node.type )
971 : {
972 : case ID_HELLO:
973 0 : if( !connected )
974 0 : _timeouts = 0;
975 0 : _checkNewID( node.connectionID ) ;
976 0 : return;
977 :
978 : case ID_CONFIRM:
979 0 : if( !connected )
980 0 : _timeouts = 0;
981 0 : _addConnection( node.connectionID, node.data );
982 0 : return;
983 :
984 : case COUNTNODE:
985 0 : LBLOG( LOG_RSP ) << "Got " << node.data << " nodes from "
986 0 : << node.connectionID << std::endl;
987 0 : return;
988 :
989 : case ID_HELLO_REPLY:
990 0 : _addConnection( node.connectionID, node.data );
991 0 : return;
992 :
993 : case ID_EXIT:
994 0 : _removeConnection( node.connectionID );
995 0 : return;
996 :
997 : default:
998 0 : LBUNIMPLEMENTED;
999 0 : break;
1000 : }
1001 : }
1002 :
1003 0 : void RSPConnection::_handleConnectedData( const size_t bytes )
1004 : {
1005 0 : if( bytes < sizeof( uint16_t ))
1006 0 : return;
1007 :
1008 0 : void* data = _recvBuffer.getData();
1009 0 : uint16_t type = *reinterpret_cast< uint16_t* >( data );
1010 : #ifdef COLLAGE_BIGENDIAN
1011 : lunchbox::byteswap( type );
1012 : #endif
1013 0 : switch( type )
1014 : {
1015 : case DATA:
1016 0 : LBCHECK( _handleData( bytes ));
1017 0 : break;
1018 :
1019 : case ACK:
1020 0 : LBCHECK( _handleAck( bytes ));
1021 0 : break;
1022 :
1023 : case NACK:
1024 0 : LBCHECK( _handleNack( ));
1025 0 : break;
1026 :
1027 : case ACKREQ: // The writer asks for an ack/nack
1028 0 : LBCHECK( _handleAckRequest( bytes ));
1029 0 : break;
1030 :
1031 : case ID_HELLO:
1032 : case ID_HELLO_REPLY:
1033 : case ID_CONFIRM:
1034 : case ID_EXIT:
1035 : case ID_DENY:
1036 : case COUNTNODE:
1037 0 : _handleInitData( bytes, true );
1038 0 : break;
1039 :
1040 : default:
1041 0 : LBASSERTINFO( false,
1042 : "Don't know how to handle packet of type " << type );
1043 : }
1044 :
1045 : }
1046 :
1047 0 : void RSPConnection::_asyncReceiveFrom()
1048 : {
1049 : _read->async_receive_from(
1050 0 : boost::asio::buffer( _recvBuffer.getData(), _mtu ), _readAddr,
1051 : boost::bind( &RSPConnection::_handlePacket, this,
1052 : boost::asio::placeholders::error,
1053 0 : boost::asio::placeholders::bytes_transferred ));
1054 0 : }
1055 :
1056 0 : bool RSPConnection::_handleData( const size_t bytes )
1057 : {
1058 0 : if( bytes < sizeof( DatagramData ))
1059 0 : return false;
1060 : DatagramData& datagram =
1061 0 : *reinterpret_cast< DatagramData* >( _recvBuffer.getData( ));
1062 0 : datagram.byteswap();
1063 :
1064 : #ifdef CO_INSTRUMENT_RSP
1065 : ++nReadData;
1066 : #endif
1067 0 : const uint16_t writerID = datagram.writerID;
1068 : #ifdef Darwin
1069 : // There is occasionally a packet from ourselves, even though multicast loop
1070 : // is not set?!
1071 : if( writerID == _id )
1072 : return true;
1073 : #else
1074 0 : LBASSERT( writerID != _id );
1075 : #endif
1076 :
1077 0 : RSPConnectionPtr connection = _findConnection( writerID );
1078 :
1079 0 : if( !connection ) // unknown connection ?
1080 : {
1081 0 : LBASSERTINFO( false, "Can't find connection with id " << writerID );
1082 0 : return false;
1083 : }
1084 0 : LBASSERT( connection->_id == writerID );
1085 :
1086 0 : const uint16_t sequence = datagram.sequence;
1087 : // LBLOG( LOG_RSP ) << "rcvd " << sequence << " from " << writerID <<std::endl;
1088 :
1089 0 : if( connection->_sequence == sequence ) // in-order packet
1090 : {
1091 0 : Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
1092 0 : if( !newBuffer ) // no more data buffers, drop packet
1093 0 : return true;
1094 :
1095 0 : lunchbox::ScopedWrite mutex( connection->_mutexEvent );
1096 0 : connection->_pushDataBuffer( newBuffer );
1097 :
1098 0 : while( !connection->_recvBuffers.empty( )) // enqueue ready pending data
1099 : {
1100 0 : newBuffer = connection->_recvBuffers.front();
1101 0 : if( !newBuffer )
1102 0 : break;
1103 :
1104 0 : connection->_recvBuffers.pop_front();
1105 0 : connection->_pushDataBuffer( newBuffer );
1106 : }
1107 :
1108 0 : if( !connection->_recvBuffers.empty() &&
1109 0 : !connection->_recvBuffers.front( )) // update for new _sequence
1110 : {
1111 0 : connection->_recvBuffers.pop_front();
1112 : }
1113 :
1114 0 : connection->_event->set();
1115 0 : return true;
1116 : }
1117 :
1118 0 : const uint16_t max = std::numeric_limits< uint16_t >::max();
1119 0 : if(( connection->_sequence > sequence &&
1120 0 : max - connection->_sequence + sequence > _numBuffers ) ||
1121 0 : ( connection->_sequence < sequence &&
1122 0 : sequence - connection->_sequence > _numBuffers ))
1123 : {
1124 : // ignore it if it's a repetition for another reader
1125 0 : return true;
1126 : }
1127 :
1128 : // else out of order
1129 :
1130 0 : const uint16_t size = sequence - connection->_sequence;
1131 0 : LBASSERT( size != 0 );
1132 0 : LBASSERTINFO( size <= _numBuffers, size << " > " << _numBuffers );
1133 :
1134 0 : ssize_t i = ssize_t( size ) - 1;
1135 0 : const bool gotPacket = ( connection->_recvBuffers.size() >= size &&
1136 0 : connection->_recvBuffers[ i ] );
1137 0 : if( gotPacket )
1138 0 : return true;
1139 :
1140 0 : Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
1141 0 : if( !newBuffer ) // no more data buffers, drop packet
1142 0 : return true;
1143 :
1144 0 : if( connection->_recvBuffers.size() < size )
1145 0 : connection->_recvBuffers.resize( size, 0 );
1146 :
1147 0 : LBASSERT( !connection->_recvBuffers[ i ] );
1148 0 : connection->_recvBuffers[ i ] = newBuffer;
1149 :
1150 : // early nack: request missing packets before current
1151 0 : --i;
1152 0 : Nack nack = { connection->_sequence, uint16_t( sequence - 1 ) };
1153 0 : if( i > 0 )
1154 : {
1155 0 : if( connection->_recvBuffers[i] ) // got previous packet
1156 0 : return true;
1157 :
1158 0 : while( i >= 0 && !connection->_recvBuffers[i] )
1159 0 : --i;
1160 :
1161 0 : const Buffer* lastBuffer = i>=0 ? connection->_recvBuffers[i] : 0;
1162 0 : if( lastBuffer )
1163 : {
1164 0 : nack.start = connection->_sequence + i;
1165 : }
1166 : }
1167 :
1168 0 : LBLOG( LOG_RSP ) << "send early nack " << nack.start << ".." << nack.end
1169 0 : << " current " << connection->_sequence << " ooo "
1170 0 : << connection->_recvBuffers.size() << std::endl;
1171 :
1172 0 : if( nack.end < nack.start )
1173 : // OPT: don't drop nack 0..nack.end, but it doesn't happen often
1174 0 : nack.end = std::numeric_limits< uint16_t >::max();
1175 :
1176 0 : _sendNack( writerID, &nack, 1 );
1177 0 : return true;
1178 : }
1179 :
1180 0 : RSPConnection::Buffer* RSPConnection::_newDataBuffer( Buffer& inBuffer )
1181 : {
1182 0 : LBASSERT( static_cast< int32_t >( inBuffer.getMaxSize( )) == _mtu );
1183 :
1184 0 : Buffer* buffer = 0;
1185 0 : if( _threadBuffers.pop( buffer ))
1186 : {
1187 0 : buffer->swap( inBuffer );
1188 0 : return buffer;
1189 : }
1190 :
1191 : // we do not have a free buffer, which means that the receiver is slower
1192 : // then our read thread. This is bad, because now we'll drop the data and
1193 : // will send a NAck packet upon the ack request, causing retransmission even
1194 : // though we'll probably drop it again
1195 0 : LBLOG( LOG_RSP ) << "Reader too slow, dropping data" << std::endl;
1196 :
1197 : // Set the event if there is data to read. This shouldn't be needed since
1198 : // the event should be set in this case, but it'll increase the robustness
1199 0 : lunchbox::ScopedWrite mutex( _mutexEvent );
1200 0 : if( !_appBuffers.isEmpty( ))
1201 0 : _event->set();
1202 0 : return 0;
1203 : }
1204 :
1205 0 : void RSPConnection::_pushDataBuffer( Buffer* buffer )
1206 : {
1207 0 : LBASSERT( _parent );
1208 : #ifndef NDEBUG
1209 0 : DatagramData* dgram = reinterpret_cast< DatagramData* >(buffer->getData( ));
1210 0 : LBASSERTINFO( dgram->sequence == _sequence,
1211 : dgram->sequence << " != " << _sequence );
1212 : #endif
1213 :
1214 0 : if( (( _sequence + _parent->_id ) % _ackFreq ) == 0 )
1215 0 : _parent->_sendAck( _id, _sequence );
1216 :
1217 0 : LBLOG( LOG_RSP ) << "post buffer " << _sequence << std::endl;
1218 0 : ++_sequence;
1219 0 : _appBuffers.push( buffer );
1220 0 : }
1221 :
1222 0 : bool RSPConnection::_handleAck( const size_t bytes )
1223 : {
1224 0 : if( bytes < sizeof( DatagramAck ))
1225 0 : return false;
1226 : DatagramAck& ack =
1227 0 : *reinterpret_cast< DatagramAck* >( _recvBuffer.getData( ));
1228 0 : ack.byteswap();
1229 :
1230 : #ifdef CO_INSTRUMENT_RSP
1231 : ++nAcksRead;
1232 : #endif
1233 :
1234 0 : if( ack.writerID != _id )
1235 0 : return true;
1236 :
1237 0 : LBLOG( LOG_RSP ) << "got ack from " << ack.readerID << " for "
1238 0 : << ack.writerID << " sequence " << ack.sequence
1239 0 : << " current " << _sequence << std::endl;
1240 :
1241 : // find destination connection, update ack data if needed
1242 0 : RSPConnectionPtr connection = _findConnection( ack.readerID );
1243 0 : if( !connection )
1244 : {
1245 0 : LBUNREACHABLE;
1246 0 : return false;
1247 : }
1248 :
1249 0 : if( connection->_acked >= ack.sequence &&
1250 0 : connection->_acked - ack.sequence <= _numBuffers )
1251 : {
1252 : // I have received a later ack previously from the reader
1253 0 : LBLOG( LOG_RSP ) << "Late ack" << std::endl;
1254 0 : return true;
1255 : }
1256 :
1257 : #ifdef CO_INSTRUMENT_RSP
1258 : ++nAcksAccepted;
1259 : #endif
1260 0 : connection->_acked = ack.sequence;
1261 0 : _timeouts = 0; // reset timeout counter
1262 :
1263 : // Check if we can advance _acked
1264 0 : uint16_t acked = ack.sequence;
1265 :
1266 0 : for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
1267 : {
1268 0 : RSPConnectionPtr child = *i;
1269 0 : if( child->_id == _id )
1270 0 : continue;
1271 :
1272 0 : const uint16_t distance = child->_acked - acked;
1273 0 : if( distance > _numBuffers )
1274 0 : acked = child->_acked;
1275 0 : }
1276 :
1277 0 : RSPConnectionPtr selfChild = _findConnection( _id );
1278 0 : const uint16_t distance = acked - selfChild->_acked;
1279 0 : if( distance <= _numBuffers )
1280 0 : _finishWriteQueue( acked );
1281 0 : return true;
1282 : }
1283 :
1284 0 : bool RSPConnection::_handleNack()
1285 : {
1286 : DatagramNack& nack =
1287 0 : *reinterpret_cast< DatagramNack* >( _recvBuffer.getData( ));
1288 0 : nack.byteswap();
1289 :
1290 : #ifdef CO_INSTRUMENT_RSP
1291 : ++nNAcksRead;
1292 : #endif
1293 :
1294 0 : if( _id != nack.writerID )
1295 : {
1296 0 : LBLOG( LOG_RSP )
1297 0 : << "ignore " << nack.count << " nacks from " << nack.readerID
1298 0 : << " for " << nack.writerID << " (not me)"<< std::endl;
1299 0 : return true;
1300 : }
1301 :
1302 0 : LBLOG( LOG_RSP )
1303 0 : << "handle " << nack.count << " nacks from " << nack.readerID
1304 0 : << " for " << nack.writerID << std::endl;
1305 :
1306 0 : RSPConnectionPtr connection = _findConnection( nack.readerID );
1307 0 : if( !connection )
1308 : {
1309 0 : LBUNREACHABLE;
1310 0 : return false;
1311 : // it's an unknown connection, TODO add this connection?
1312 : }
1313 :
1314 0 : _timeouts = 0;
1315 0 : _addRepeat( nack.nacks, nack.count );
1316 0 : return true;
1317 : }
1318 :
1319 0 : void RSPConnection::_addRepeat( const Nack* nacks, uint16_t num )
1320 : {
1321 0 : LBLOG( LOG_RSP ) << lunchbox::disableFlush << "Queue repeat requests ";
1322 0 : size_t lost = 0;
1323 :
1324 0 : for( size_t i = 0; i < num; ++i )
1325 : {
1326 0 : const Nack& nack = nacks[ i ];
1327 0 : LBASSERT( nack.start <= nack.end );
1328 :
1329 0 : LBLOG( LOG_RSP ) << nack.start << ".." << nack.end << " ";
1330 :
1331 0 : bool merged = false;
1332 0 : for( RepeatQueue::iterator j = _repeatQueue.begin();
1333 0 : j != _repeatQueue.end() && !merged; ++j )
1334 : {
1335 0 : Nack& old = *j;
1336 0 : if( old.start <= nack.end && old.end >= nack.start )
1337 : {
1338 0 : if( old.start > nack.start )
1339 : {
1340 0 : lost += old.start - nack.start;
1341 0 : old.start = nack.start;
1342 0 : merged = true;
1343 : }
1344 0 : if( old.end < nack.end )
1345 : {
1346 0 : lost += nack.end - old.end;
1347 0 : old.end = nack.end;
1348 0 : merged = true;
1349 : }
1350 0 : LBASSERT( lost < _numBuffers );
1351 : }
1352 : }
1353 :
1354 0 : if( !merged )
1355 : {
1356 0 : lost += uint16_t( nack.end - nack.start ) + 1;
1357 0 : LBASSERT( lost <= _numBuffers );
1358 0 : _repeatQueue.push_back( nack );
1359 : }
1360 : }
1361 :
1362 0 : ConstConnectionDescriptionPtr description = getDescription();
1363 0 : if( _sendRate >
1364 0 : ( description->bandwidth >>
1365 0 : Global::getIAttribute( Global::IATTR_RSP_MIN_SENDRATE_SHIFT )))
1366 : {
1367 0 : const float delta = float( lost ) * .001f *
1368 0 : Global::getIAttribute( Global::IATTR_RSP_ERROR_DOWNSCALE );
1369 0 : const float maxDelta = .01f *
1370 0 : float( Global::getIAttribute( Global::IATTR_RSP_ERROR_MAXSCALE ));
1371 0 : const float downScale = LB_MIN( delta, maxDelta );
1372 0 : _sendRate -= 1 + int64_t( _sendRate * downScale );
1373 0 : LBLOG( LOG_RSP )
1374 0 : << ", lost " << lost << " slowing down " << downScale * 100.f
1375 0 : << "% to " << _sendRate << " KB/s" << std::endl
1376 0 : << lunchbox::enableFlush;
1377 : }
1378 : else
1379 0 : LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush;
1380 0 : }
1381 :
1382 0 : bool RSPConnection::_handleAckRequest( const size_t bytes )
1383 : {
1384 0 : if( bytes < sizeof( DatagramAckRequest ))
1385 0 : return false;
1386 : DatagramAckRequest& ackRequest =
1387 0 : *reinterpret_cast< DatagramAckRequest* >( _recvBuffer.getData( ));
1388 0 : ackRequest.byteswap();
1389 :
1390 0 : const uint16_t writerID = ackRequest.writerID;
1391 : #ifdef Darwin
1392 : // There is occasionally a packet from ourselves, even though multicast loop
1393 : // is not set?!
1394 : if( writerID == _id )
1395 : return true;
1396 : #else
1397 0 : LBASSERT( writerID != _id );
1398 : #endif
1399 0 : RSPConnectionPtr connection = _findConnection( writerID );
1400 0 : if( !connection )
1401 : {
1402 0 : LBUNREACHABLE;
1403 0 : return false;
1404 : }
1405 :
1406 0 : const uint16_t reqID = ackRequest.sequence;
1407 0 : const uint16_t gotID = connection->_sequence - 1;
1408 0 : const uint16_t distance = reqID - gotID;
1409 :
1410 0 : LBLOG( LOG_RSP ) << "ack request " << reqID << " from " << writerID
1411 0 : << " got " << gotID << " missing " << distance
1412 0 : << std::endl;
1413 :
1414 0 : if( (reqID == gotID) ||
1415 0 : (gotID > reqID && gotID - reqID <= _numBuffers) ||
1416 0 : (gotID < reqID && distance > _numBuffers) )
1417 : {
1418 0 : _sendAck( connection->_id, gotID );
1419 0 : return true;
1420 : }
1421 : // else find all missing datagrams
1422 :
1423 0 : const uint16_t max = CO_RSP_MAX_NACKS - 2;
1424 : Nack nacks[ CO_RSP_MAX_NACKS ];
1425 0 : uint16_t i = 0;
1426 :
1427 0 : nacks[ i ].start = connection->_sequence;
1428 0 : LBLOG( LOG_RSP ) << lunchbox::disableFlush << "nacks: "
1429 0 : << nacks[i].start << "..";
1430 :
1431 0 : std::deque<Buffer*>::const_iterator j = connection->_recvBuffers.begin();
1432 0 : std::deque<Buffer*>::const_iterator first = j;
1433 0 : for( ; j != connection->_recvBuffers.end() && i < max; ++j )
1434 : {
1435 0 : if( *j ) // got buffer
1436 : {
1437 0 : nacks[ i ].end = connection->_sequence + std::distance( first, j);
1438 0 : LBLOG( LOG_RSP ) << nacks[i].end << ", ";
1439 0 : if( nacks[ i ].end < nacks[ i ].start )
1440 : {
1441 0 : LBASSERT( nacks[ i ].end < _numBuffers );
1442 0 : nacks[ i + 1 ].start = 0;
1443 0 : nacks[ i + 1 ].end = nacks[ i ].end;
1444 0 : nacks[ i ].end = std::numeric_limits< uint16_t >::max();
1445 0 : ++i;
1446 : }
1447 0 : ++i;
1448 :
1449 : // find next hole
1450 0 : for( ++j; j != connection->_recvBuffers.end() && (*j); ++j )
1451 : /* nop */;
1452 :
1453 0 : if( j == connection->_recvBuffers.end( ))
1454 0 : break;
1455 :
1456 0 : nacks[i].start = connection->_sequence + std::distance(first, j) +1;
1457 0 : LBLOG( LOG_RSP ) << nacks[i].start << "..";
1458 : }
1459 : }
1460 :
1461 0 : if( j != connection->_recvBuffers.end() || i == 0 )
1462 : {
1463 0 : nacks[ i ].end = reqID;
1464 0 : LBLOG( LOG_RSP ) << nacks[i].end;
1465 0 : ++i;
1466 : }
1467 0 : else if( uint16_t( reqID - nacks[i-1].end ) < _numBuffers )
1468 : {
1469 0 : nacks[i].start = nacks[i-1].end + 1;
1470 0 : nacks[i].end = reqID;
1471 0 : LBLOG( LOG_RSP ) << nacks[i].start << ".." << nacks[i].end;
1472 0 : ++i;
1473 : }
1474 0 : if( nacks[ i -1 ].end < nacks[ i - 1 ].start )
1475 : {
1476 0 : LBASSERT( nacks[ i - 1 ].end < _numBuffers );
1477 0 : nacks[ i ].start = 0;
1478 0 : nacks[ i ].end = nacks[ i - 1 ].end;
1479 0 : nacks[ i - 1 ].end = std::numeric_limits< uint16_t >::max();
1480 0 : ++i;
1481 : }
1482 :
1483 0 : LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush << "send " << i
1484 0 : << " nacks to " << connection->_id << std::endl;
1485 :
1486 0 : LBASSERT( i > 0 );
1487 0 : _sendNack( connection->_id, nacks, i );
1488 0 : return true;
1489 : }
1490 :
1491 0 : void RSPConnection::_checkNewID( uint16_t id )
1492 : {
1493 : // look if the new ID exist in another connection
1494 0 : if( id == _id || _findConnection( id ))
1495 : {
1496 0 : LBLOG( LOG_RSP ) << "Deny " << id << std::endl;
1497 0 : _sendSimpleDatagram( ID_DENY, _id );
1498 : }
1499 : else
1500 0 : _sendSimpleDatagram( ID_HELLO_REPLY, _id );
1501 0 : }
1502 :
1503 0 : RSPConnectionPtr RSPConnection::_findConnection( const uint16_t id )
1504 : {
1505 0 : for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
1506 : {
1507 0 : if( (*i)->_id == id )
1508 0 : return *i;
1509 : }
1510 0 : return 0;
1511 : }
1512 :
1513 0 : bool RSPConnection::_addConnection( const uint16_t id, const uint16_t sequence )
1514 : {
1515 0 : if( _findConnection( id ))
1516 0 : return false;
1517 :
1518 0 : LBDEBUG << "add connection " << id << std::endl;
1519 0 : RSPConnectionPtr connection = new RSPConnection();
1520 0 : connection->_id = id;
1521 0 : connection->_parent = this;
1522 0 : connection->_setState( STATE_CONNECTED );
1523 0 : connection->_setDescription( _getDescription( ));
1524 0 : connection->_sequence = sequence;
1525 0 : LBASSERT( connection->_appBuffers.isEmpty( ));
1526 :
1527 : // Make all buffers available for reading
1528 0 : for( BuffersCIter i = connection->_buffers.begin();
1529 0 : i != connection->_buffers.end(); ++i )
1530 : {
1531 0 : Buffer* buffer = *i;
1532 0 : LBCHECK( connection->_threadBuffers.push( buffer ));
1533 : }
1534 :
1535 0 : _children.push_back( connection );
1536 0 : _sendCountNode();
1537 :
1538 0 : lunchbox::ScopedWrite mutex( _mutexConnection );
1539 0 : _newChildren.push_back( connection );
1540 :
1541 0 : lunchbox::ScopedWrite mutex2( _mutexEvent );
1542 0 : _event->set();
1543 0 : return true;
1544 : }
1545 :
1546 0 : void RSPConnection::_removeConnection( const uint16_t id )
1547 : {
1548 0 : LBDEBUG << "remove connection " << id << std::endl;
1549 0 : if( id == _id )
1550 0 : return;
1551 :
1552 0 : for( RSPConnectionsIter i = _children.begin(); i != _children.end(); ++i )
1553 : {
1554 0 : RSPConnectionPtr child = *i;
1555 0 : if( child->_id == id )
1556 : {
1557 0 : _children.erase( i );
1558 :
1559 0 : lunchbox::ScopedWrite mutex( child->_mutexEvent );
1560 0 : child->_appBuffers.push( 0 );
1561 0 : child->_event->set();
1562 0 : break;
1563 : }
1564 0 : }
1565 :
1566 0 : _sendCountNode();
1567 : }
1568 :
1569 0 : int64_t RSPConnection::write( const void* inData, const uint64_t bytes )
1570 : {
1571 0 : if( _parent )
1572 0 : return _parent->write( inData, bytes );
1573 :
1574 0 : LBASSERT( isListening( ));
1575 0 : if( !_write )
1576 0 : return -1;
1577 :
1578 : // compute number of datagrams
1579 0 : uint64_t nDatagrams = bytes / _payloadSize;
1580 0 : if( nDatagrams * _payloadSize != bytes )
1581 0 : ++nDatagrams;
1582 :
1583 : // queue each datagram (might block if buffers are exhausted)
1584 0 : const uint8_t* data = reinterpret_cast< const uint8_t* >( inData );
1585 0 : const uint8_t* end = data + bytes;
1586 0 : for( uint64_t i = 0; i < nDatagrams; ++i )
1587 : {
1588 0 : size_t packetSize = end - data;
1589 0 : packetSize = LB_MIN( packetSize, _payloadSize );
1590 :
1591 0 : if( _appBuffers.isEmpty( ))
1592 : // trigger processing
1593 0 : _postWakeup();
1594 :
1595 : Buffer* buffer;
1596 0 : if ( !_appBuffers.timedPop( _writeTimeOut, buffer ) )
1597 : {
1598 0 : LBERROR << "Timeout while writing" << std::endl;
1599 0 : buffer = 0;
1600 : }
1601 :
1602 0 : if( !buffer )
1603 : {
1604 0 : close();
1605 0 : return -1;
1606 : }
1607 :
1608 : // prepare packet header (sequence is done by thread)
1609 : DatagramData* header =
1610 0 : reinterpret_cast< DatagramData* >( buffer->getData( ));
1611 0 : header->type = DATA;
1612 0 : header->size = uint16_t( packetSize );
1613 0 : header->writerID = _id;
1614 :
1615 0 : memcpy( header + 1, data, packetSize );
1616 0 : data += packetSize;
1617 :
1618 0 : LBCHECK( _threadBuffers.push( buffer ));
1619 : }
1620 0 : _postWakeup();
1621 0 : LBLOG( LOG_RSP ) << "queued " << nDatagrams << " datagrams, "
1622 0 : << bytes << " bytes" << std::endl;
1623 0 : return bytes;
1624 : }
1625 :
1626 0 : void RSPConnection::finish()
1627 : {
1628 0 : if( _parent.isValid( ))
1629 : {
1630 0 : LBASSERTINFO( !_parent, "Writes are only allowed on RSP listeners" );
1631 0 : return;
1632 : }
1633 0 : LBASSERT( isListening( ));
1634 0 : _appBuffers.waitSize( _buffers.size( ));
1635 : }
1636 :
1637 0 : void RSPConnection::_sendCountNode()
1638 : {
1639 0 : if( !_findConnection( _id ))
1640 0 : return;
1641 :
1642 0 : LBLOG( LOG_RSP ) << _children.size() << " nodes" << std::endl;
1643 : DatagramNode count = { COUNTNODE, CO_RSP_PROTOCOL_VERSION, _id,
1644 0 : uint16_t( _children.size( )) };
1645 0 : count.byteswap();
1646 0 : _write->send( boost::asio::buffer( &count, sizeof( count )) );
1647 : }
1648 :
1649 0 : void RSPConnection::_sendSimpleDatagram( const DatagramType type,
1650 : const uint16_t id )
1651 : {
1652 : DatagramNode simple = { uint16_t( type ), CO_RSP_PROTOCOL_VERSION, id,
1653 0 : _sequence };
1654 0 : simple.byteswap();
1655 0 : _write->send( boost::asio::buffer( &simple, sizeof( simple )) );
1656 0 : }
1657 :
1658 0 : void RSPConnection::_sendAck( const uint16_t writerID,
1659 : const uint16_t sequence )
1660 : {
1661 0 : LBASSERT( _id != writerID );
1662 : #ifdef CO_INSTRUMENT_RSP
1663 : ++nAcksSend;
1664 : #endif
1665 :
1666 0 : LBLOG( LOG_RSP ) << "send ack " << sequence << std::endl;
1667 0 : DatagramAck ack = { ACK, _id, writerID, sequence };
1668 0 : ack.byteswap();
1669 0 : _write->send( boost::asio::buffer( &ack, sizeof( ack )) );
1670 0 : }
1671 :
1672 0 : void RSPConnection::_sendNack( const uint16_t writerID, const Nack* nacks,
1673 : const uint16_t count )
1674 : {
1675 0 : LBASSERT( count > 0 );
1676 0 : LBASSERT( count <= CO_RSP_MAX_NACKS );
1677 : #ifdef CO_INSTRUMENT_RSP
1678 : ++nNAcksSend;
1679 : #endif
1680 : /* optimization: use the direct access to the reader. */
1681 0 : if( writerID == _id )
1682 : {
1683 0 : _addRepeat( nacks, count );
1684 0 : return;
1685 : }
1686 :
1687 : const size_t size = sizeof( DatagramNack ) -
1688 0 : (CO_RSP_MAX_NACKS - count) * sizeof( Nack );
1689 :
1690 : // set the header
1691 : DatagramNack packet;
1692 0 : packet.set( _id, writerID, count );
1693 0 : memcpy( packet.nacks, nacks, count * sizeof( Nack ));
1694 0 : packet.byteswap();
1695 0 : _write->send( boost::asio::buffer( &packet, size ));
1696 : }
1697 :
1698 0 : void RSPConnection::_sendAckRequest()
1699 : {
1700 : #ifdef CO_INSTRUMENT_RSP
1701 : ++nAckRequests;
1702 : #endif
1703 0 : LBLOG( LOG_RSP ) << "send ack request for " << uint16_t( _sequence -1 )
1704 0 : << std::endl;
1705 0 : DatagramAckRequest ackRequest = { ACKREQ, _id, uint16_t( _sequence - 1 ) };
1706 0 : ackRequest.byteswap();
1707 0 : _write->send( boost::asio::buffer( &ackRequest, sizeof( DatagramAckRequest )) );
1708 0 : }
1709 :
1710 0 : std::ostream& operator << ( std::ostream& os,
1711 : const RSPConnection& connection )
1712 : {
1713 0 : os << lunchbox::disableFlush << lunchbox::disableHeader
1714 0 : << "RSPConnection id " << connection.getID() << " send rate "
1715 0 : << connection.getSendRate();
1716 :
1717 : #ifdef CO_INSTRUMENT_RSP
1718 : const int prec = os.precision();
1719 : os.precision( 3 );
1720 :
1721 : const float time = instrumentClock.getTimef();
1722 : const float mbps = 1048.576f * time;
1723 : os << ": " << lunchbox::indent << std::endl
1724 : << float( nBytesRead ) / mbps << " / " << float( nBytesWritten ) / mbps
1725 : << " MB/s r/w using " << nDatagrams << " dgrams " << nRepeated
1726 : << " repeats " << nMergedDatagrams
1727 : << " merged"
1728 : << std::endl;
1729 :
1730 : os.precision( prec );
1731 : os << "sender: " << nAckRequests << " ack requests " << nAcksAccepted << "/"
1732 : << nAcksRead << " acks " << nNAcksRead << " nacks, throttle "
1733 : << writeWaitTime << " ms"
1734 : << std::endl
1735 : << "receiver: " << nAcksSend << " acks " << nNAcksSend << " nacks"
1736 : << lunchbox::exdent;
1737 :
1738 : nReadData = 0;
1739 : nBytesRead = 0;
1740 : nBytesWritten = 0;
1741 : nDatagrams = 0;
1742 : nRepeated = 0;
1743 : nMergedDatagrams = 0;
1744 : nAckRequests = 0;
1745 : nAcksSend = 0;
1746 : nAcksRead = 0;
1747 : nAcksAccepted = 0;
1748 : nNAcksSend = 0;
1749 : nNAcksRead = 0;
1750 : writeWaitTime = 0.f;
1751 : #endif
1752 0 : os << std::endl << lunchbox::enableHeader << lunchbox::enableFlush;
1753 :
1754 0 : return os;
1755 : }
1756 :
1757 66 : }
|