Line data Source code
1 :
2 : /* Copyright (c) 2009, Cedric Stalder <cedric.stalder@gmail.com>
3 : * 2009-2014, Stefan Eilemann <eile@equalizergraphics.com>
4 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "rspConnection.h"
23 :
24 : #include "connection.h"
25 : #include "connectionDescription.h"
26 : #include "global.h"
27 : #include "log.h"
28 :
29 : #include <lunchbox/rng.h>
30 : #include <lunchbox/scopedMutex.h>
31 : #include <lunchbox/sleep.h>
32 :
33 : #include <boost/bind.hpp>
34 :
35 : //#define CO_INSTRUMENT_RSP
36 : #define CO_RSP_MERGE_WRITES
37 : #define CO_RSP_MAX_TIMEOUTS 1000
38 : #ifdef _WIN32
39 : # define CO_RSP_DEFAULT_PORT (4242)
40 : #else
41 : # define CO_RSP_DEFAULT_PORT ( (getuid() % 64511) + 1024 )
42 : #endif
43 :
44 :
45 : // Note: Do not use version > 255, endianness detection magic relies on this.
46 : const uint16_t CO_RSP_PROTOCOL_VERSION = 0;
47 :
48 : namespace bp = boost::posix_time;
49 : namespace ip = boost::asio::ip;
50 :
51 : namespace co
52 : {
53 :
namespace
{
#ifdef CO_INSTRUMENT_RSP
// Statistics counters, only present in instrumented builds.
lunchbox::a_int32_t nReadData;
lunchbox::a_int32_t nBytesRead;
lunchbox::a_int32_t nBytesWritten;
lunchbox::a_int32_t nDatagrams;
lunchbox::a_int32_t nRepeated;
lunchbox::a_int32_t nMergedDatagrams;
lunchbox::a_int32_t nAckRequests;
lunchbox::a_int32_t nAcksSend;
lunchbox::a_int32_t nAcksSendTotal;
lunchbox::a_int32_t nAcksRead;
lunchbox::a_int32_t nAcksAccepted;
lunchbox::a_int32_t nNAcksSend;
lunchbox::a_int32_t nNAcksRead;
lunchbox::a_int32_t nNAcksResend;

// Time accumulated in _waitWritable() and the clock throttling the
// statistics output in _processOutgoing().
float writeWaitTime = 0.f;
lunchbox::Clock instrumentClock;
#endif

// Value of IATTR_RSP_NUM_BUFFERS as read by the last listen() call. This is a
// single file-local variable, i.e., it is shared by all connections.
uint16_t _numBuffers = 0;
}
78 :
79 4 : RSPConnection::RSPConnection()
80 : : _id( 0 )
81 : , _idAccepted( false )
82 4 : , _mtu( Global::getIAttribute( Global::IATTR_UDP_MTU ))
83 4 : , _ackFreq( Global::getIAttribute( Global::IATTR_RSP_ACK_FREQUENCY ))
84 : , _payloadSize( _mtu - sizeof( DatagramData ))
85 : , _timeouts( 0 )
86 4 : , _event( new EventConnection )
87 : , _read( 0 )
88 : , _write( 0 )
89 : , _timeout( _ioService )
90 : , _wakeup( _ioService )
91 4 : , _maxBucketSize( ( _mtu * _ackFreq) >> 1 )
92 : , _bucketSize( 0 )
93 : , _sendRate( 0 )
94 : , _thread( 0 )
95 4 : , _acked( std::numeric_limits< uint16_t >::max( ))
96 : , _threadBuffers( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS))
97 : , _recvBuffer( _mtu )
98 : , _readBuffer( 0 )
99 : , _readBufferPos( 0 )
100 : , _sequence( 0 )
101 : // ensure we have a handleConnectedTimeout before the write pop
102 24 : , _writeTimeOut( Global::IATTR_RSP_ACK_TIMEOUT * CO_RSP_MAX_TIMEOUTS * 2 )
103 : {
104 4 : _buildNewID();
105 4 : ConnectionDescriptionPtr description = _getDescription();
106 4 : description->type = CONNECTIONTYPE_RSP;
107 4 : description->bandwidth = 102400;
108 :
109 4 : LBCHECK( _event->connect( ));
110 :
111 4 : _buffers.reserve( Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ));
112 524 : while( static_cast< int32_t >( _buffers.size( )) <
113 260 : Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS ))
114 : {
115 256 : _buffers.push_back( new Buffer( _mtu ));
116 : }
117 :
118 4 : LBASSERT( sizeof( DatagramNack ) <= size_t( _mtu ));
119 4 : LBLOG( LOG_RSP ) << "New RSP connection, " << _buffers.size()
120 4 : << " buffers of " << _mtu << " bytes" << std::endl;
121 :
122 4 : }
123 :
124 12 : RSPConnection::~RSPConnection()
125 : {
126 4 : _close();
127 264 : while( !_buffers.empty( ))
128 : {
129 256 : delete _buffers.back();
130 256 : _buffers.pop_back();
131 : }
132 8 : }
133 :
134 12 : void RSPConnection::_close()
135 : {
136 12 : if( _parent.isValid() && _parent->_id == _id )
137 2 : _parent->close();
138 :
139 24 : while( !_parent && _isWriting( ))
140 0 : lunchbox::sleep( 10 /*ms*/ );
141 :
142 12 : if( isClosed( ))
143 20 : return;
144 :
145 4 : lunchbox::ScopedWrite mutex( _mutexEvent );
146 4 : if( _thread )
147 : {
148 2 : LBASSERT( !_thread->isCurrent( ));
149 2 : _sendSimpleDatagram( ID_EXIT, _id );
150 2 : _ioService.stop();
151 2 : _thread->join();
152 2 : delete _thread;
153 : }
154 :
155 4 : _setState( STATE_CLOSING );
156 4 : if( _thread )
157 : {
158 2 : _thread = 0;
159 :
160 : // notify children to close
161 4 : for( RSPConnectionsCIter i=_children.begin(); i !=_children.end(); ++i )
162 : {
163 2 : RSPConnectionPtr child = *i;
164 4 : lunchbox::ScopedWrite mutexChild( child->_mutexEvent );
165 2 : child->_appBuffers.push( 0 );
166 2 : child->_event->set();
167 2 : }
168 :
169 2 : _children.clear();
170 2 : _newChildren.clear();
171 : }
172 :
173 4 : _parent = 0;
174 :
175 4 : if( _read )
176 2 : _read->close();
177 4 : delete _read;
178 4 : _read = 0;
179 :
180 4 : if( _write )
181 2 : _write->close();
182 4 : delete _write;
183 4 : _write = 0;
184 :
185 4 : _threadBuffers.clear();
186 4 : _appBuffers.push( 0 ); // unlock any other read/write threads
187 :
188 4 : _setState( STATE_CLOSED );
189 :
190 4 : mutex.leave();
191 4 : _event->close();
192 : }
193 :
194 : //----------------------------------------------------------------------
195 : // Async IO handles
196 : //----------------------------------------------------------------------
197 4 : uint16_t RSPConnection::_buildNewID()
198 : {
199 4 : lunchbox::RNG rng;
200 4 : _id = rng.get< uint16_t >();
201 4 : return _id;
202 : }
203 :
204 2 : bool RSPConnection::listen()
205 : {
206 2 : ConnectionDescriptionPtr description = _getDescription();
207 2 : LBASSERT( description->type == CONNECTIONTYPE_RSP );
208 :
209 2 : if( !isClosed( ))
210 0 : return false;
211 :
212 2 : _setState( STATE_CONNECTING );
213 2 : _numBuffers = Global::getIAttribute( Global::IATTR_RSP_NUM_BUFFERS );
214 :
215 : // init udp connection
216 2 : if( description->port == 0 )
217 1 : description->port = CO_RSP_DEFAULT_PORT;
218 2 : if( description->getHostname().empty( ))
219 0 : description->setHostname( "239.255.42.43" );
220 2 : if( description->getInterface().empty( ))
221 1 : description->setInterface( "0.0.0.0" );
222 :
223 : try
224 : {
225 2 : const ip::address readAddress( ip::address::from_string( "0.0.0.0" ));
226 : const ip::udp::endpoint readEndpoint( readAddress,
227 2 : description->port );
228 :
229 2 : std::stringstream portStr;
230 2 : portStr << description->port;
231 4 : const std::string& port = portStr.str();
232 4 : ip::udp::resolver resolver( _ioService );
233 : const ip::udp::resolver::query queryHN( ip::udp::v4(),
234 2 : description->getHostname(),
235 4 : port );
236 4 : const ip::udp::resolver::iterator end;
237 : const ip::udp::resolver::iterator hostnameIP =
238 4 : resolver.resolve( queryHN );
239 :
240 2 : if( hostnameIP == end )
241 0 : return false;
242 :
243 2 : const ip::udp::endpoint writeEndpoint = *hostnameIP;
244 2 : const ip::address mcAddr( writeEndpoint.address() );
245 :
246 2 : _read = new ip::udp::socket( _ioService );
247 2 : _write = new ip::udp::socket( _ioService );
248 2 : _read->open( readEndpoint.protocol( ));
249 2 : _write->open( writeEndpoint.protocol( ));
250 :
251 2 : _read->set_option( ip::udp::socket::reuse_address( true ));
252 2 : _write->set_option( ip::udp::socket::reuse_address( true ));
253 : _read->set_option( ip::udp::socket::receive_buffer_size(
254 2 : Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
255 : _write->set_option( ip::udp::socket::send_buffer_size(
256 2 : Global::getIAttribute( Global::IATTR_UDP_BUFFER_SIZE )));
257 :
258 2 : _read->bind( readEndpoint );
259 :
260 : const ip::udp::resolver::query queryIF( ip::udp::v4(),
261 2 : description->getInterface(),
262 6 : "0" );
263 : const ip::udp::resolver::iterator interfaceIP =
264 4 : resolver.resolve( queryIF );
265 :
266 2 : if( interfaceIP == end )
267 0 : return false;
268 :
269 2 : const ip::address ifAddr( ip::udp::endpoint( *interfaceIP ).address( ));
270 2 : LBINFO << "Joining " << mcAddr << " on " << ifAddr << std::endl;
271 :
272 : _read->set_option( ip::multicast::join_group( mcAddr.to_v4(),
273 2 : ifAddr.to_v4( )));
274 2 : _write->set_option( ip::multicast::outbound_interface( ifAddr.to_v4()));
275 : #ifdef SO_BINDTODEVICE // https://github.com/Eyescale/Collage/issues/16
276 4 : const std::string& ifIP = ifAddr.to_string();
277 : ::setsockopt( _write->native(), SOL_SOCKET, SO_BINDTODEVICE,
278 2 : ifIP.c_str(), ifIP.size() + 1 );
279 : ::setsockopt( _read->native(), SOL_SOCKET, SO_BINDTODEVICE,
280 2 : ifIP.c_str(), ifIP.size() + 1 );
281 : #endif
282 :
283 2 : _write->connect( writeEndpoint );
284 :
285 2 : _read->set_option( ip::multicast::enable_loopback( false ));
286 4 : _write->set_option( ip::multicast::enable_loopback( false ));
287 : }
288 : catch( const boost::system::system_error& e )
289 : {
290 : LBWARN << "can't setup underlying UDP connection: " << e.what()
291 : << std::endl;
292 : delete _read;
293 : delete _write;
294 : _read = 0;
295 : _write = 0;
296 : return false;
297 : }
298 :
299 : // init communication protocol thread
300 2 : _thread = new Thread( this );
301 2 : _bucketSize = 0;
302 2 : _sendRate = description->bandwidth;
303 :
304 : // waits until RSP protocol establishes connection to the multicast network
305 2 : if( !_thread->start( ) )
306 : {
307 0 : close();
308 0 : return false;
309 : }
310 :
311 : // Make all buffers available for writing
312 2 : LBASSERT( _appBuffers.isEmpty( ));
313 2 : _appBuffers.push( _buffers );
314 :
315 8 : LBINFO << "Listening on " << description->getHostname() << ":"
316 10 : << description->port << " (" << description->toString() << " @"
317 8 : << (void*)this << ")" << std::endl;
318 2 : return true;
319 : }
320 :
321 2 : ConnectionPtr RSPConnection::acceptSync()
322 : {
323 2 : if( !isListening( ))
324 0 : return 0;
325 :
326 2 : lunchbox::ScopedWrite mutex( _mutexConnection );
327 2 : LBASSERT( !_newChildren.empty( ));
328 2 : if( _newChildren.empty( ))
329 0 : return 0;
330 :
331 4 : RSPConnectionPtr newConnection = _newChildren.back();
332 2 : _newChildren.pop_back();
333 :
334 6 : LBINFO << _id << " accepted RSP connection " << newConnection->_id
335 6 : << std::endl;
336 :
337 4 : lunchbox::ScopedWrite mutex2( _mutexEvent );
338 2 : if( _newChildren.empty() )
339 2 : _event->reset();
340 : else
341 0 : _event->set();
342 :
343 4 : return newConnection;
344 : }
345 :
346 67598 : int64_t RSPConnection::readSync( void* buffer, const uint64_t bytes, const bool)
347 : {
348 67598 : LBASSERT( bytes > 0 );
349 67598 : if( !isConnected ( ))
350 0 : return -1;
351 :
352 67598 : uint64_t bytesLeft = bytes;
353 67598 : uint8_t* ptr = reinterpret_cast< uint8_t* >( buffer );
354 :
355 : // redundant (done by the caller already), but saves some lock ops
356 203643 : while( bytesLeft )
357 : {
358 68449 : if( !_readBuffer )
359 : {
360 14516 : LBASSERT( _readBufferPos == 0 );
361 14516 : _readBuffer = _appBuffers.pop();
362 14516 : if( !_readBuffer )
363 : {
364 2 : close();
365 : return (bytes == bytesLeft) ?
366 2 : -1 : static_cast< int64_t >( bytes - bytesLeft );
367 : }
368 : }
369 :
370 : const DatagramData* header = reinterpret_cast< const DatagramData* >(
371 68447 : _readBuffer->getData( ));
372 68447 : const uint8_t* payload = reinterpret_cast< const uint8_t* >( header+1 );
373 68447 : const size_t dataLeft = header->size - _readBufferPos;
374 68447 : const size_t size = LB_MIN( static_cast< size_t >( bytesLeft ),
375 : dataLeft );
376 :
377 68447 : memcpy( ptr, payload + _readBufferPos, size );
378 68447 : _readBufferPos += size;
379 68447 : ptr += size;
380 68447 : bytesLeft -= size;
381 :
382 : // if all data in the buffer has been taken
383 68447 : if( _readBufferPos >= header->size )
384 : {
385 14514 : LBASSERT( _readBufferPos == header->size );
386 : //LBLOG( LOG_RSP ) << "reset read buffer " << header->sequence
387 : // << std::endl;
388 :
389 14514 : LBCHECK( _threadBuffers.push( _readBuffer ));
390 14514 : _readBuffer = 0;
391 14514 : _readBufferPos = 0;
392 : }
393 : else
394 : {
395 53933 : LBASSERT( _readBufferPos < header->size );
396 : }
397 : }
398 :
399 67596 : if( _readBuffer || !_appBuffers.isEmpty( ))
400 54236 : _event->set();
401 : else
402 : {
403 13360 : lunchbox::ScopedWrite mutex( _mutexEvent );
404 13360 : if( _appBuffers.isEmpty( ))
405 13325 : _event->reset();
406 : }
407 :
408 : #ifdef CO_INSTRUMENT_RSP
409 : nBytesRead += bytes;
410 : #endif
411 67596 : return bytes;
412 : }
413 :
414 2 : void RSPConnection::Thread::run()
415 : {
416 2 : _connection->_runThread();
417 2 : _connection = 0;
418 2 : LBINFO << "Left RSP protocol thread" << std::endl;
419 2 : }
420 :
421 83806 : void RSPConnection::_handleTimeout( const boost::system::error_code& error )
422 : {
423 83806 : if( error == boost::asio::error::operation_aborted )
424 153015 : return;
425 :
426 14597 : if( isListening( ))
427 14517 : _handleConnectedTimeout();
428 80 : else if( _idAccepted )
429 40 : _handleInitTimeout();
430 : else
431 40 : _handleAcceptIDTimeout();
432 : }
433 :
434 40 : void RSPConnection::_handleAcceptIDTimeout()
435 : {
436 40 : ++_timeouts;
437 40 : if( _timeouts < 20 )
438 : {
439 38 : LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
440 38 : _sendSimpleDatagram( ID_HELLO, _id );
441 : }
442 : else
443 : {
444 2 : LBLOG( LOG_RSP ) << "Confirm " << _id << std::endl;
445 2 : _sendSimpleDatagram( ID_CONFIRM, _id );
446 2 : _addConnection( _id, _sequence );
447 2 : _idAccepted = true;
448 2 : _timeouts = 0;
449 : // send a first datagram to announce me and discover all other
450 : // connections
451 2 : _sendCountNode();
452 : }
453 40 : _setTimeout( 10 );
454 40 : }
455 :
456 40 : void RSPConnection::_handleInitTimeout( )
457 : {
458 40 : LBASSERT( !isListening( ))
459 40 : ++_timeouts;
460 40 : if( _timeouts < 20 )
461 38 : _sendCountNode();
462 : else
463 : {
464 2 : _setState( STATE_LISTENING );
465 2 : LBINFO << "RSP connection " << _id << " listening" << std::endl;
466 2 : _timeouts = 0;
467 2 : _ioService.stop(); // thread initialized, run restarts
468 : }
469 40 : _setTimeout( 10 );
470 40 : }
471 :
472 0 : void RSPConnection::_clearWriteQueues()
473 : {
474 0 : while( !_threadBuffers.isEmpty() )
475 : {
476 0 : Buffer* buffer = 0;
477 0 : _threadBuffers.pop( buffer );
478 0 : _writeBuffers.push_back( buffer );
479 : }
480 :
481 0 : _finishWriteQueue( _sequence - 1 );
482 0 : LBASSERT( _threadBuffers.isEmpty() && _writeBuffers.empty() );
483 0 : }
484 :
485 14517 : void RSPConnection::_handleConnectedTimeout()
486 : {
487 14517 : if( !isListening( ))
488 : {
489 0 : _clearWriteQueues();
490 0 : _ioService.stop();
491 0 : return;
492 : }
493 :
494 14517 : _processOutgoing();
495 :
496 14517 : if( _timeouts >= CO_RSP_MAX_TIMEOUTS )
497 : {
498 0 : LBERROR << "Too many timeouts during send: " << _timeouts << std::endl;
499 0 : bool all = true;
500 0 : for( RSPConnectionsCIter i =_children.begin(); i !=_children.end(); ++i)
501 : {
502 0 : RSPConnectionPtr child = *i;
503 0 : if ( child->_acked >= _sequence - _numBuffers && child->_id != _id )
504 : {
505 0 : all = false;
506 0 : break;
507 : }
508 0 : }
509 :
510 : // if all connections failed we probably got disconnected -> close and
511 : // exit else close all failed child connections
512 0 : if ( all )
513 : {
514 0 : _sendSimpleDatagram( ID_EXIT, _id );
515 0 : _appBuffers.pushFront( 0 ); // unlock write function
516 :
517 0 : for( RSPConnectionsCIter i =_children.begin();
518 0 : i !=_children.end(); ++i)
519 : {
520 0 : RSPConnectionPtr child = *i;
521 0 : child->_setState( STATE_CLOSING );
522 0 : child->_appBuffers.push( 0 ); // unlock read func
523 0 : }
524 :
525 0 : _clearWriteQueues();
526 0 : _ioService.stop();
527 0 : return;
528 : }
529 :
530 0 : RSPConnectionsCIter i =_children.begin();
531 0 : while ( i !=_children.end() )
532 : {
533 0 : RSPConnectionPtr child = *i;
534 0 : if ( child->_acked < _sequence - 1 && _id != child->_id )
535 : {
536 0 : _sendSimpleDatagram( ID_EXIT, child->_id );
537 0 : _removeConnection( child->_id );
538 : }
539 : else
540 : {
541 0 : uint16_t wb = static_cast<uint16_t>( _writeBuffers.size( ));
542 0 : child->_acked = _sequence - wb;
543 0 : ++i;
544 : }
545 0 : }
546 :
547 0 : _timeouts = 0;
548 : }
549 : }
550 :
551 : RSPConnection::DatagramNode*
552 0 : RSPConnection::_getDatagramNode( const size_t bytes )
553 : {
554 0 : if( bytes < sizeof( DatagramNode ))
555 : {
556 0 : LBERROR << "DatagramNode size mismatch, got " << bytes << " instead of "
557 0 : << sizeof( DatagramNode ) << " bytes" << std::endl;
558 : //close();
559 0 : return 0;
560 : }
561 : DatagramNode& node =
562 0 : *reinterpret_cast< DatagramNode* >( _recvBuffer.getData( ));
563 0 : node.byteswap();
564 0 : if( node.protocolVersion != CO_RSP_PROTOCOL_VERSION )
565 : {
566 0 : LBERROR << "Protocol version mismatch, got " << node.protocolVersion
567 0 : << " instead of " << CO_RSP_PROTOCOL_VERSION << std::endl;
568 : //close();
569 0 : return 0;
570 : }
571 0 : return &node;
572 : }
573 :
574 2 : bool RSPConnection::_initThread()
575 : {
576 2 : LBLOG( LOG_RSP ) << "Started RSP protocol thread" << std::endl;
577 2 : _timeouts = 0;
578 :
579 : // send a first datagram to announce me and discover other connections
580 2 : LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
581 2 : _sendSimpleDatagram( ID_HELLO, _id );
582 2 : _setTimeout( 10 );
583 2 : _asyncReceiveFrom();
584 2 : _ioService.run();
585 2 : return isListening();
586 : }
587 :
588 2 : void RSPConnection::_runThread()
589 : {
590 : //__debugbreak();
591 2 : _ioService.reset();
592 2 : _ioService.run();
593 2 : }
594 :
595 14579 : void RSPConnection::_setTimeout( const int32_t timeOut )
596 : {
597 14579 : LBASSERT( timeOut >= 0 );
598 14579 : _timeout.expires_from_now( boost::posix_time::milliseconds( timeOut ));
599 : _timeout.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
600 14579 : boost::asio::placeholders::error ));
601 14579 : }
602 :
603 69227 : void RSPConnection::_postWakeup()
604 : {
605 69227 : _wakeup.expires_from_now( boost::posix_time::milliseconds( 0 ));
606 : _wakeup.async_wait( boost::bind( &RSPConnection::_handleTimeout, this,
607 69227 : boost::asio::placeholders::error ));
608 69227 : }
609 :
610 14517 : void RSPConnection::_processOutgoing()
611 : {
612 : #ifdef CO_INSTRUMENT_RSP
613 : if( instrumentClock.getTime64() > 1000 )
614 : {
615 : LBWARN << *this << std::endl;
616 : instrumentClock.reset();
617 : }
618 : #endif
619 :
620 14517 : if( !_repeatQueue.empty( ))
621 0 : _repeatData();
622 : else
623 14517 : _writeData();
624 :
625 14517 : if( !_threadBuffers.isEmpty() || !_repeatQueue.empty( ))
626 : {
627 14497 : _setTimeout( 0 ); // call again to send remaining
628 14497 : return;
629 : }
630 : // no more data to write, check/send ack request, reset timeout
631 :
632 20 : if( _writeBuffers.empty( )) // got all acks
633 : {
634 20 : _timeouts = 0;
635 20 : _timeout.cancel();
636 20 : return;
637 : }
638 :
639 : const int64_t timeout =
640 0 : Global::getIAttribute( Global::IATTR_RSP_ACK_TIMEOUT );
641 0 : const int64_t left = timeout - _clock.getTime64();
642 :
643 0 : if( left > 0 )
644 : {
645 0 : _setTimeout( left );
646 0 : return;
647 : }
648 :
649 : // (repeat) ack request
650 0 : _clock.reset();
651 0 : ++_timeouts;
652 0 : if ( _timeouts < CO_RSP_MAX_TIMEOUTS )
653 0 : _sendAckRequest();
654 0 : _setTimeout( timeout );
655 : }
656 :
657 14517 : void RSPConnection::_writeData()
658 : {
659 14517 : Buffer* buffer = 0;
660 14517 : if( !_threadBuffers.pop( buffer )) // nothing to write
661 14520 : return;
662 :
663 14514 : _timeouts = 0;
664 14514 : LBASSERT( buffer );
665 :
666 : // write buffer
667 14514 : DatagramData* header = reinterpret_cast<DatagramData*>( buffer->getData( ));
668 14514 : header->sequence = _sequence++;
669 :
670 : #ifdef CO_RSP_MERGE_WRITES
671 14514 : if( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
672 : {
673 13645 : std::vector< Buffer* > appBuffers;
674 81223 : while( header->size < _payloadSize && !_threadBuffers.isEmpty( ))
675 : {
676 54783 : Buffer* buffer2 = 0;
677 54783 : LBCHECK( _threadBuffers.getFront( buffer2 ));
678 54783 : LBASSERT( buffer2 );
679 : DatagramData* header2 =
680 54783 : reinterpret_cast<DatagramData*>( buffer2->getData( ));
681 :
682 54783 : if( uint32_t( header->size + header2->size ) > _payloadSize )
683 850 : break;
684 :
685 53933 : memcpy( reinterpret_cast<uint8_t*>( header + 1 ) + header->size,
686 107866 : header2 + 1, header2->size );
687 53933 : header->size += header2->size;
688 53933 : LBCHECK( _threadBuffers.pop( buffer2 ));
689 53933 : appBuffers.push_back( buffer2 );
690 : #ifdef CO_INSTRUMENT_RSP
691 : ++nMergedDatagrams;
692 : #endif
693 : }
694 :
695 13645 : if( !appBuffers.empty( ))
696 12795 : _appBuffers.push( appBuffers );
697 : }
698 : #endif
699 :
700 : // send data
701 : // Note 1: We could optimize the send away if we're all alone, but this is
702 : // not a use case for RSP, so we don't care.
703 : // Note 2: Data to myself will be 'written' in _finishWriteQueue once we
704 : // got all acks for the packet
705 14514 : const uint32_t size = header->size + sizeof( DatagramData );
706 :
707 14514 : _waitWritable( size ); // OPT: process incoming in between
708 14514 : header->byteswap();
709 14514 : _write->send( boost::asio::buffer( header, size ));
710 :
711 : #ifdef CO_INSTRUMENT_RSP
712 : ++nDatagrams;
713 : nBytesWritten += header->size;
714 : #endif
715 :
716 : // save datagram for repeats (and self)
717 14514 : _writeBuffers.push_back( buffer );
718 :
719 14514 : if( _children.size() == 1 ) // We're all alone
720 : {
721 14514 : LBASSERT( _children.front()->_id == _id );
722 14514 : _finishWriteQueue( _sequence - 1 );
723 : }
724 : }
725 :
726 14514 : void RSPConnection::_waitWritable( const uint64_t bytes )
727 : {
728 : #ifdef CO_INSTRUMENT_RSP
729 : lunchbox::Clock clock;
730 : #endif
731 :
732 14514 : _bucketSize += static_cast< uint64_t >( _clock.resetTimef() * _sendRate );
733 : // opt omit: * 1024 / 1000;
734 14514 : _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
735 :
736 14514 : const uint64_t size = LB_MIN( bytes, static_cast< uint64_t >( _mtu ));
737 2381648 : while( _bucketSize < size )
738 : {
739 2352620 : lunchbox::Thread::yield();
740 2352620 : float time = _clock.resetTimef();
741 :
742 4705240 : while( time == 0.f )
743 : {
744 0 : lunchbox::Thread::yield();
745 0 : time = _clock.resetTimef();
746 : }
747 :
748 2352620 : _bucketSize += static_cast< int64_t >( time * _sendRate );
749 2352620 : _bucketSize = LB_MIN( _bucketSize, _maxBucketSize );
750 : }
751 14514 : _bucketSize -= size;
752 :
753 : #ifdef CO_INSTRUMENT_RSP
754 : writeWaitTime += clock.getTimef();
755 : #endif
756 :
757 14514 : ConstConnectionDescriptionPtr description = getDescription();
758 14514 : if( _sendRate < description->bandwidth )
759 : {
760 : _sendRate += int64_t(
761 0 : float( Global::getIAttribute( Global::IATTR_RSP_ERROR_UPSCALE )) *
762 0 : float( description->bandwidth ) * .001f );
763 0 : LBLOG( LOG_RSP ) << "speeding up to " << _sendRate << " KB/s"
764 0 : << std::endl;
765 14514 : }
766 14514 : }
767 :
768 0 : void RSPConnection::_repeatData()
769 : {
770 0 : _timeouts = 0;
771 :
772 0 : while( !_repeatQueue.empty( ))
773 : {
774 0 : Nack& request = _repeatQueue.front();
775 0 : const uint16_t distance = _sequence - request.start;
776 :
777 0 : if ( distance == 0 )
778 : {
779 0 : LBWARN << "ignoring invalid nack (" << request.start
780 0 : << ".." << request.end << ")" << std::endl;
781 0 : _repeatQueue.pop_front();
782 0 : continue;
783 : }
784 :
785 0 : if( distance <= _writeBuffers.size( )) // not already acked
786 : {
787 : // LBLOG( LOG_RSP ) << "Repeat " << request.start << ", " << _sendRate
788 : // << "KB/s"<< std::endl;
789 :
790 0 : const size_t i = _writeBuffers.size() - distance;
791 0 : Buffer* buffer = _writeBuffers[i];
792 0 : LBASSERT( buffer );
793 :
794 : DatagramData* header =
795 0 : reinterpret_cast<DatagramData*>( buffer->getData( ));
796 0 : const uint32_t size = header->size + sizeof( DatagramData );
797 0 : LBASSERT( header->sequence == request.start );
798 :
799 : // send data
800 0 : _waitWritable( size ); // OPT: process incoming in between
801 : // already done by _writeData: header->byteswap();
802 0 : _write->send( boost::asio::buffer( header, size ) );
803 : #ifdef CO_INSTRUMENT_RSP
804 : ++nRepeated;
805 : #endif
806 : }
807 :
808 0 : if( request.start == request.end )
809 0 : _repeatQueue.pop_front(); // done with request
810 : else
811 0 : ++request.start;
812 :
813 0 : if( distance <= _writeBuffers.size( )) // send something
814 0 : return;
815 : }
816 : }
817 :
818 14514 : void RSPConnection::_finishWriteQueue( const uint16_t sequence )
819 : {
820 14514 : LBASSERT( !_writeBuffers.empty( ));
821 :
822 14514 : RSPConnectionPtr connection = _findConnection( _id );
823 14514 : LBASSERT( connection.isValid( ));
824 14514 : LBASSERT( connection->_recvBuffers.empty( ));
825 :
826 : // Bundle pushing the buffers to the app to avoid excessive lock ops
827 29028 : Buffers readBuffers;
828 29028 : Buffers freeBuffers;
829 :
830 14514 : const uint16_t size = _sequence - sequence - 1;
831 14514 : LBASSERTINFO( size <= uint16_t( _writeBuffers.size( )),
832 : size << " > " << _writeBuffers.size( ));
833 14514 : LBLOG( LOG_RSP ) << "Got all remote acks for " << sequence << " current "
834 0 : << _sequence << " advance " << _writeBuffers.size() - size
835 14514 : << " buffers" << std::endl;
836 :
837 43542 : while( _writeBuffers.size() > size_t( size ))
838 : {
839 14514 : Buffer* buffer = _writeBuffers.front();
840 14514 : _writeBuffers.pop_front();
841 :
842 : #ifndef NDEBUG
843 : DatagramData* datagram =
844 14514 : reinterpret_cast< DatagramData* >( buffer->getData( ));
845 14514 : datagram->byteswap();
846 14514 : LBASSERT( datagram->writerID == _id );
847 14514 : LBASSERTINFO( datagram->sequence ==
848 : uint16_t( connection->_sequence + readBuffers.size( )),
849 : datagram->sequence << ", " << connection->_sequence <<
850 : ", " << readBuffers.size( ));
851 : //LBLOG( LOG_RSP ) << "self receive " << datagram->sequence << std::endl;
852 : #endif
853 :
854 14514 : Buffer* newBuffer = connection->_newDataBuffer( *buffer );
855 14514 : if( !newBuffer && !readBuffers.empty( )) // push prepared app buffers
856 : {
857 0 : lunchbox::ScopedWrite mutex( connection->_mutexEvent );
858 0 : LBLOG( LOG_RSP ) << "post " << readBuffers.size()
859 0 : << " buffers starting with sequence "
860 0 : << connection->_sequence << std::endl;
861 :
862 0 : connection->_appBuffers.push( readBuffers );
863 0 : connection->_sequence += uint16_t( readBuffers.size( ));
864 0 : readBuffers.clear();
865 0 : connection->_event->set();
866 : }
867 :
868 29028 : while( !newBuffer ) // no more data buffers, wait for app to drain
869 : {
870 0 : newBuffer = connection->_newDataBuffer( *buffer );
871 0 : lunchbox::Thread::yield();
872 : }
873 :
874 14514 : freeBuffers.push_back( buffer );
875 14514 : readBuffers.push_back( newBuffer );
876 : }
877 :
878 14514 : _appBuffers.push( freeBuffers );
879 14514 : if( !readBuffers.empty( ))
880 : {
881 14514 : lunchbox::ScopedWrite mutex( connection->_mutexEvent );
882 : #if 0
883 : LBLOG( LOG_RSP )
884 : << "post " << readBuffers.size() << " buffers starting at "
885 : << connection->_sequence << std::endl;
886 : #endif
887 :
888 14514 : connection->_appBuffers.push( readBuffers );
889 14514 : connection->_sequence += uint16_t( readBuffers.size( ));
890 14514 : connection->_event->set();
891 : }
892 :
893 14514 : connection->_acked = uint16_t( connection->_sequence - 1 );
894 14514 : LBASSERT( connection->_acked == sequence );
895 :
896 29028 : _timeouts = 0;
897 14514 : }
898 :
899 0 : void RSPConnection::_handlePacket( const boost::system::error_code& /* error */,
900 : const size_t bytes )
901 : {
902 0 : if( isListening( ))
903 : {
904 0 : _handleConnectedData( bytes );
905 :
906 0 : if( isListening( ))
907 0 : _processOutgoing();
908 : else
909 : {
910 0 : _ioService.stop();
911 0 : return;
912 : }
913 : }
914 0 : else if( bytes >= sizeof( DatagramNode ))
915 : {
916 0 : if( _idAccepted )
917 0 : _handleInitData( bytes, false );
918 : else
919 0 : _handleAcceptIDData( bytes );
920 : }
921 :
922 : //LBLOG( LOG_RSP ) << "_handlePacket timeout " << timeout << std::endl;
923 0 : _asyncReceiveFrom();
924 : }
925 :
926 0 : void RSPConnection::_handleAcceptIDData( const size_t bytes )
927 : {
928 0 : DatagramNode* pNode = _getDatagramNode( bytes );
929 0 : if( !pNode )
930 0 : return;
931 :
932 0 : DatagramNode& node = *pNode;
933 :
934 0 : switch( node.type )
935 : {
936 : case ID_HELLO:
937 0 : _checkNewID( node.connectionID );
938 0 : break;
939 :
940 : case ID_HELLO_REPLY:
941 0 : _addConnection( node.connectionID, node.data );
942 0 : break;
943 :
944 : case ID_DENY:
945 : // a connection refused my ID, try another ID
946 0 : if( node.connectionID == _id )
947 : {
948 0 : _timeouts = 0;
949 0 : _sendSimpleDatagram( ID_HELLO, _buildNewID( ));
950 0 : LBLOG( LOG_RSP ) << "Announce " << _id << std::endl;
951 : }
952 0 : break;
953 :
954 : case ID_EXIT:
955 0 : _removeConnection( node.connectionID );
956 0 : break;
957 :
958 : default:
959 0 : LBUNIMPLEMENTED;
960 0 : break;
961 : }
962 : }
963 :
964 0 : void RSPConnection::_handleInitData( const size_t bytes, const bool connected )
965 : {
966 0 : DatagramNode* pNode = _getDatagramNode( bytes );
967 0 : if( !pNode )
968 0 : return;
969 :
970 0 : DatagramNode& node = *pNode;
971 :
972 0 : switch( node.type )
973 : {
974 : case ID_HELLO:
975 0 : if( !connected )
976 0 : _timeouts = 0;
977 0 : _checkNewID( node.connectionID ) ;
978 0 : return;
979 :
980 : case ID_CONFIRM:
981 0 : if( !connected )
982 0 : _timeouts = 0;
983 0 : _addConnection( node.connectionID, node.data );
984 0 : return;
985 :
986 : case COUNTNODE:
987 0 : LBLOG( LOG_RSP ) << "Got " << node.data << " nodes from "
988 0 : << node.connectionID << std::endl;
989 0 : return;
990 :
991 : case ID_HELLO_REPLY:
992 0 : _addConnection( node.connectionID, node.data );
993 0 : return;
994 :
995 : case ID_EXIT:
996 0 : _removeConnection( node.connectionID );
997 0 : return;
998 :
999 : default:
1000 0 : LBUNIMPLEMENTED;
1001 0 : break;
1002 : }
1003 : }
1004 :
1005 0 : void RSPConnection::_handleConnectedData( const size_t bytes )
1006 : {
1007 0 : if( bytes < sizeof( uint16_t ))
1008 0 : return;
1009 :
1010 0 : void* data = _recvBuffer.getData();
1011 0 : uint16_t type = *reinterpret_cast< uint16_t* >( data );
1012 : #ifdef COLLAGE_BIGENDIAN
1013 : lunchbox::byteswap( type );
1014 : #endif
1015 0 : switch( type )
1016 : {
1017 : case DATA:
1018 0 : LBCHECK( _handleData( bytes ));
1019 0 : break;
1020 :
1021 : case ACK:
1022 0 : LBCHECK( _handleAck( bytes ));
1023 0 : break;
1024 :
1025 : case NACK:
1026 0 : LBCHECK( _handleNack( ));
1027 0 : break;
1028 :
1029 : case ACKREQ: // The writer asks for an ack/nack
1030 0 : LBCHECK( _handleAckRequest( bytes ));
1031 0 : break;
1032 :
1033 : case ID_HELLO:
1034 : case ID_HELLO_REPLY:
1035 : case ID_CONFIRM:
1036 : case ID_EXIT:
1037 : case ID_DENY:
1038 : case COUNTNODE:
1039 0 : _handleInitData( bytes, true );
1040 0 : break;
1041 :
1042 : default:
1043 0 : LBASSERTINFO( false,
1044 : "Don't know how to handle packet of type " << type );
1045 : }
1046 :
1047 : }
1048 :
1049 2 : void RSPConnection::_asyncReceiveFrom()
1050 : {
1051 : _read->async_receive_from(
1052 2 : boost::asio::buffer( _recvBuffer.getData(), _mtu ), _readAddr,
1053 : boost::bind( &RSPConnection::_handlePacket, this,
1054 : boost::asio::placeholders::error,
1055 4 : boost::asio::placeholders::bytes_transferred ));
1056 2 : }
1057 :
// Handle a DATA datagram located in _recvBuffer.
//
// In-order packets are pushed to the writer's child connection together with
// any directly following packets already parked in its _recvBuffers.
// Out-of-order packets within the _numBuffers window are parked in
// _recvBuffers and an early nack for the missing range is sent to the writer.
//
// bytes: the number of bytes received for the current packet.
// Returns false if the packet is shorter than a DatagramData header or the
// writer is unknown, true otherwise (including dropped or ignored packets).
1058 0 : bool RSPConnection::_handleData( const size_t bytes )
1059 : {
1060 0 : if( bytes < sizeof( DatagramData ))
1061 0 : return false;
1062 : DatagramData& datagram =
1063 0 : *reinterpret_cast< DatagramData* >( _recvBuffer.getData( ));
1064 0 : datagram.byteswap();
1065 :
1066 : #ifdef CO_INSTRUMENT_RSP
1067 : ++nReadData;
1068 : #endif
1069 0 : const uint16_t writerID = datagram.writerID;
1070 : #ifdef Darwin
1071 : // There is occasionally a packet from ourselves, even though multicast loop
1072 : // is not set?!
1073 : if( writerID == _id )
1074 : return true;
1075 : #else
1076 0 : LBASSERT( writerID != _id );
1077 : #endif
1078 :
1079 0 : RSPConnectionPtr connection = _findConnection( writerID );
1080 :
1081 0 : if( !connection ) // unknown connection ?
1082 : {
1083 0 : LBASSERTINFO( false, "Can't find connection with id " << writerID );
1084 0 : return false;
1085 : }
1086 0 : LBASSERT( connection->_id == writerID );
1087 :
1088 0 : const uint16_t sequence = datagram.sequence;
1089 : // LBLOG( LOG_RSP ) << "rcvd " << sequence << " from " << writerID <<std::endl;
1090 :
// connection->_sequence is the next sequence expected from this writer.
1091 0 : if( connection->_sequence == sequence ) // in-order packet
1092 : {
1093 0 : Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
1094 0 : if( !newBuffer ) // no more data buffers, drop packet
1095 0 : return true;
1096 :
1097 0 : lunchbox::ScopedWrite mutex( connection->_mutexEvent );
1098 0 : connection->_pushDataBuffer( newBuffer );
1099 :
// Each _pushDataBuffer() call increments connection->_sequence, so the front
// of _recvBuffers is always the candidate for the next in-order packet.
1100 0 : while( !connection->_recvBuffers.empty( )) // enqueue ready pending data
1101 : {
1102 0 : newBuffer = connection->_recvBuffers.front();
1103 0 : if( !newBuffer )
1104 0 : break;
1105 :
1106 0 : connection->_recvBuffers.pop_front();
1107 0 : connection->_pushDataBuffer( newBuffer );
1108 : }
1109 :
// A remaining empty front slot stands for the packet which is now expected
// next; drop it so that slot k again maps to sequence _sequence + 1 + k.
1110 0 : if( !connection->_recvBuffers.empty() &&
1111 0 : !connection->_recvBuffers.front( )) // update for new _sequence
1112 : {
1113 0 : connection->_recvBuffers.pop_front();
1114 : }
1115 :
1116 0 : connection->_event->set();
1117 0 : return true;
1118 : }
1119 :
// Window check on the 16 bit sequence space: the first clause covers an
// expected sequence numerically above the received one (counter wrap-around),
// the second a received sequence more than _numBuffers ahead.
1120 0 : const uint16_t max = std::numeric_limits< uint16_t >::max();
1121 0 : if(( connection->_sequence > sequence &&
1122 0 : max - connection->_sequence + sequence > _numBuffers ) ||
1123 0 : ( connection->_sequence < sequence &&
1124 0 : sequence - connection->_sequence > _numBuffers ))
1125 : {
1126 : // ignore it if it's a repetition for another reader
1127 0 : return true;
1128 : }
1129 :
1130 : // else out of order
1131 :
// size is the distance to the expected sequence (modulo 2^16); the packet
// belongs into slot size - 1 of _recvBuffers.
1132 0 : const uint16_t size = sequence - connection->_sequence;
1133 0 : LBASSERT( size != 0 );
1134 0 : LBASSERTINFO( size <= _numBuffers, size << " > " << _numBuffers );
1135 :
1136 0 : ssize_t i = ssize_t( size ) - 1;
1137 0 : const bool gotPacket = ( connection->_recvBuffers.size() >= size &&
1138 0 : connection->_recvBuffers[ i ] );
// Duplicate of an already parked packet: nothing to do.
1139 0 : if( gotPacket )
1140 0 : return true;
1141 :
1142 0 : Buffer* newBuffer = connection->_newDataBuffer( _recvBuffer );
1143 0 : if( !newBuffer ) // no more data buffers, drop packet
1144 0 : return true;
1145 :
// Grow the parking queue with empty slots up to and including this packet.
1146 0 : if( connection->_recvBuffers.size() < size )
1147 0 : connection->_recvBuffers.resize( size, 0 );
1148 :
1149 0 : LBASSERT( !connection->_recvBuffers[ i ] );
1150 0 : connection->_recvBuffers[ i ] = newBuffer;
1151 :
1152 : // early nack: request missing packets before current
1153 0 : --i;
// Default range: everything from the expected sequence up to the packet
// preceding the one just received.
1154 0 : Nack nack = { connection->_sequence, uint16_t( sequence - 1 ) };
1155 0 : if( i > 0 )
1156 : {
1157 0 : if( connection->_recvBuffers[i] ) // got previous packet
1158 0 : return true;
1159 :
// Walk back to the closest parked packet before the gap, if any.
1160 0 : while( i >= 0 && !connection->_recvBuffers[i] )
1161 0 : --i;
1162 :
1163 0 : const Buffer* lastBuffer = i>=0 ? connection->_recvBuffers[i] : 0;
1164 0 : if( lastBuffer )
1165 : {
// NOTE(review): slot i holds sequence _sequence + i + 1, so this start lies
// below the first missing packet and appears to over-request - confirm intent.
1166 0 : nack.start = connection->_sequence + i;
1167 : }
1168 : }
1169 :
1170 0 : LBLOG( LOG_RSP ) << "send early nack " << nack.start << ".." << nack.end
1171 0 : << " current " << connection->_sequence << " ooo "
1172 0 : << connection->_recvBuffers.size() << std::endl;
1173 :
// The range wraps around the 16 bit counter: only the part up to the maximum
// sequence is requested.
1174 0 : if( nack.end < nack.start )
1175 : // OPT: don't drop nack 0..nack.end, but it doesn't happen often
1176 0 : nack.end = std::numeric_limits< uint16_t >::max();
1177 :
1178 0 : _sendNack( writerID, &nack, 1 );
1179 0 : return true;
1180 : }
1181 :
1182 14514 : RSPConnection::Buffer* RSPConnection::_newDataBuffer( Buffer& inBuffer )
1183 : {
1184 14514 : LBASSERT( static_cast< int32_t >( inBuffer.getMaxSize( )) == _mtu );
1185 :
1186 14514 : Buffer* buffer = 0;
1187 14514 : if( _threadBuffers.pop( buffer ))
1188 : {
1189 14514 : buffer->swap( inBuffer );
1190 14514 : return buffer;
1191 : }
1192 :
1193 : // we do not have a free buffer, which means that the receiver is slower
1194 : // then our read thread. This is bad, because now we'll drop the data and
1195 : // will send a NAck packet upon the ack request, causing retransmission even
1196 : // though we'll probably drop it again
1197 0 : LBLOG( LOG_RSP ) << "Reader too slow, dropping data" << std::endl;
1198 :
1199 : // Set the event if there is data to read. This shouldn't be needed since
1200 : // the event should be set in this case, but it'll increase the robustness
1201 0 : lunchbox::ScopedWrite mutex( _mutexEvent );
1202 0 : if( !_appBuffers.isEmpty( ))
1203 0 : _event->set();
1204 0 : return 0;
1205 : }
1206 :
1207 0 : void RSPConnection::_pushDataBuffer( Buffer* buffer )
1208 : {
1209 0 : LBASSERT( _parent );
1210 : #ifndef NDEBUG
1211 0 : DatagramData* dgram = reinterpret_cast< DatagramData* >(buffer->getData( ));
1212 0 : LBASSERTINFO( dgram->sequence == _sequence,
1213 : dgram->sequence << " != " << _sequence );
1214 : #endif
1215 :
1216 0 : if( (( _sequence + _parent->_id ) % _ackFreq ) == 0 )
1217 0 : _parent->_sendAck( _id, _sequence );
1218 :
1219 0 : LBLOG( LOG_RSP ) << "post buffer " << _sequence << std::endl;
1220 0 : ++_sequence;
1221 0 : _appBuffers.push( buffer );
1222 0 : }
1223 :
1224 0 : bool RSPConnection::_handleAck( const size_t bytes )
1225 : {
1226 0 : if( bytes < sizeof( DatagramAck ))
1227 0 : return false;
1228 : DatagramAck& ack =
1229 0 : *reinterpret_cast< DatagramAck* >( _recvBuffer.getData( ));
1230 0 : ack.byteswap();
1231 :
1232 : #ifdef CO_INSTRUMENT_RSP
1233 : ++nAcksRead;
1234 : #endif
1235 :
1236 0 : if( ack.writerID != _id )
1237 0 : return true;
1238 :
1239 0 : LBLOG( LOG_RSP ) << "got ack from " << ack.readerID << " for "
1240 0 : << ack.writerID << " sequence " << ack.sequence
1241 0 : << " current " << _sequence << std::endl;
1242 :
1243 : // find destination connection, update ack data if needed
1244 0 : RSPConnectionPtr connection = _findConnection( ack.readerID );
1245 0 : if( !connection )
1246 : {
1247 0 : LBUNREACHABLE;
1248 0 : return false;
1249 : }
1250 :
1251 0 : if( connection->_acked >= ack.sequence &&
1252 0 : connection->_acked - ack.sequence <= _numBuffers )
1253 : {
1254 : // I have received a later ack previously from the reader
1255 0 : LBLOG( LOG_RSP ) << "Late ack" << std::endl;
1256 0 : return true;
1257 : }
1258 :
1259 : #ifdef CO_INSTRUMENT_RSP
1260 : ++nAcksAccepted;
1261 : #endif
1262 0 : connection->_acked = ack.sequence;
1263 0 : _timeouts = 0; // reset timeout counter
1264 :
1265 : // Check if we can advance _acked
1266 0 : uint16_t acked = ack.sequence;
1267 :
1268 0 : for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
1269 : {
1270 0 : RSPConnectionPtr child = *i;
1271 0 : if( child->_id == _id )
1272 0 : continue;
1273 :
1274 0 : const uint16_t distance = child->_acked - acked;
1275 0 : if( distance > _numBuffers )
1276 0 : acked = child->_acked;
1277 0 : }
1278 :
1279 0 : RSPConnectionPtr selfChild = _findConnection( _id );
1280 0 : const uint16_t distance = acked - selfChild->_acked;
1281 0 : if( distance <= _numBuffers )
1282 0 : _finishWriteQueue( acked );
1283 0 : return true;
1284 : }
1285 :
1286 0 : bool RSPConnection::_handleNack()
1287 : {
1288 : DatagramNack& nack =
1289 0 : *reinterpret_cast< DatagramNack* >( _recvBuffer.getData( ));
1290 0 : nack.byteswap();
1291 :
1292 : #ifdef CO_INSTRUMENT_RSP
1293 : ++nNAcksRead;
1294 : #endif
1295 :
1296 0 : if( _id != nack.writerID )
1297 : {
1298 0 : LBLOG( LOG_RSP )
1299 0 : << "ignore " << nack.count << " nacks from " << nack.readerID
1300 0 : << " for " << nack.writerID << " (not me)"<< std::endl;
1301 0 : return true;
1302 : }
1303 :
1304 0 : LBLOG( LOG_RSP )
1305 0 : << "handle " << nack.count << " nacks from " << nack.readerID
1306 0 : << " for " << nack.writerID << std::endl;
1307 :
1308 0 : RSPConnectionPtr connection = _findConnection( nack.readerID );
1309 0 : if( !connection )
1310 : {
1311 0 : LBUNREACHABLE;
1312 0 : return false;
1313 : // it's an unknown connection, TODO add this connection?
1314 : }
1315 :
1316 0 : _timeouts = 0;
1317 0 : _addRepeat( nack.nacks, nack.count );
1318 0 : return true;
1319 : }
1320 :
1321 0 : void RSPConnection::_addRepeat( const Nack* nacks, uint16_t num )
1322 : {
1323 0 : LBLOG( LOG_RSP ) << lunchbox::disableFlush << "Queue repeat requests ";
1324 0 : size_t lost = 0;
1325 :
1326 0 : for( size_t i = 0; i < num; ++i )
1327 : {
1328 0 : const Nack& nack = nacks[ i ];
1329 0 : LBASSERT( nack.start <= nack.end );
1330 :
1331 0 : LBLOG( LOG_RSP ) << nack.start << ".." << nack.end << " ";
1332 :
1333 0 : bool merged = false;
1334 0 : for( RepeatQueue::iterator j = _repeatQueue.begin();
1335 0 : j != _repeatQueue.end() && !merged; ++j )
1336 : {
1337 0 : Nack& old = *j;
1338 0 : if( old.start <= nack.end && old.end >= nack.start )
1339 : {
1340 0 : if( old.start > nack.start )
1341 : {
1342 0 : lost += old.start - nack.start;
1343 0 : old.start = nack.start;
1344 0 : merged = true;
1345 : }
1346 0 : if( old.end < nack.end )
1347 : {
1348 0 : lost += nack.end - old.end;
1349 0 : old.end = nack.end;
1350 0 : merged = true;
1351 : }
1352 0 : LBASSERT( lost < _numBuffers );
1353 : }
1354 : }
1355 :
1356 0 : if( !merged )
1357 : {
1358 0 : lost += uint16_t( nack.end - nack.start ) + 1;
1359 0 : LBASSERT( lost <= _numBuffers );
1360 0 : _repeatQueue.push_back( nack );
1361 : }
1362 : }
1363 :
1364 0 : ConstConnectionDescriptionPtr description = getDescription();
1365 0 : if( _sendRate >
1366 0 : ( description->bandwidth >>
1367 0 : Global::getIAttribute( Global::IATTR_RSP_MIN_SENDRATE_SHIFT )))
1368 : {
1369 0 : const float delta = float( lost ) * .001f *
1370 0 : Global::getIAttribute( Global::IATTR_RSP_ERROR_DOWNSCALE );
1371 0 : const float maxDelta = .01f *
1372 0 : float( Global::getIAttribute( Global::IATTR_RSP_ERROR_MAXSCALE ));
1373 0 : const float downScale = LB_MIN( delta, maxDelta );
1374 0 : _sendRate -= 1 + int64_t( _sendRate * downScale );
1375 0 : LBLOG( LOG_RSP )
1376 0 : << ", lost " << lost << " slowing down " << downScale * 100.f
1377 0 : << "% to " << _sendRate << " KB/s" << std::endl
1378 0 : << lunchbox::enableFlush;
1379 : }
1380 : else
1381 0 : LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush;
1382 0 : }
1383 :
// Handle an ACKREQ datagram located in _recvBuffer.
//
// Answers with an ack if everything up to the requested sequence was
// received (or the request is outside of the buffer window), otherwise with
// the list of missing sequence ranges as nacks.
//
// bytes: the number of bytes received for the current packet.
// Returns false if the packet is shorter than a DatagramAckRequest or the
// writer is unknown, true otherwise.
1384 0 : bool RSPConnection::_handleAckRequest( const size_t bytes )
1385 : {
1386 0 : if( bytes < sizeof( DatagramAckRequest ))
1387 0 : return false;
1388 : DatagramAckRequest& ackRequest =
1389 0 : *reinterpret_cast< DatagramAckRequest* >( _recvBuffer.getData( ));
1390 0 : ackRequest.byteswap();
1391 :
1392 0 : const uint16_t writerID = ackRequest.writerID;
1393 : #ifdef Darwin
1394 : // There is occasionally a packet from ourselves, even though multicast loop
1395 : // is not set?!
1396 : if( writerID == _id )
1397 : return true;
1398 : #else
1399 0 : LBASSERT( writerID != _id );
1400 : #endif
1401 0 : RSPConnectionPtr connection = _findConnection( writerID );
1402 0 : if( !connection )
1403 : {
1404 0 : LBUNREACHABLE;
1405 0 : return false;
1406 : }
1407 :
// reqID: last sequence the writer asks about; gotID: last in-order sequence
// received from it; distance: their difference modulo 2^16.
1408 0 : const uint16_t reqID = ackRequest.sequence;
1409 0 : const uint16_t gotID = connection->_sequence - 1;
1410 0 : const uint16_t distance = reqID - gotID;
1411 :
1412 0 : LBLOG( LOG_RSP ) << "ack request " << reqID << " from " << writerID
1413 0 : << " got " << gotID << " missing " << distance
1414 0 : << std::endl;
1415 :
// Nothing is missing (or the request lies outside of the _numBuffers
// window): confirm what we have.
1416 0 : if( (reqID == gotID) ||
1417 0 : (gotID > reqID && gotID - reqID <= _numBuffers) ||
1418 0 : (gotID < reqID && distance > _numBuffers) )
1419 : {
1420 0 : _sendAck( connection->_id, gotID );
1421 0 : return true;
1422 : }
1423 : // else find all missing datagrams
1424 :
// presumably the margin of two leaves room for the entries appended after
// the loop - TODO confirm
1425 0 : const uint16_t max = CO_RSP_MAX_NACKS - 2;
1426 : Nack nacks[ CO_RSP_MAX_NACKS ];
1427 0 : uint16_t i = 0;
1428 :
// The first hole always starts at the next expected sequence.
1429 0 : nacks[ i ].start = connection->_sequence;
1430 0 : LBLOG( LOG_RSP ) << lunchbox::disableFlush << "nacks: "
1431 0 : << nacks[i].start << "..";
1432 :
// Scan the out-of-order queue; slot k is used for sequence _sequence + 1 + k
// (see _handleData), empty slots are missing packets.
1433 0 : std::deque<Buffer*>::const_iterator j = connection->_recvBuffers.begin();
1434 0 : std::deque<Buffer*>::const_iterator first = j;
1435 0 : for( ; j != connection->_recvBuffers.end() && i < max; ++j )
1436 : {
1437 0 : if( *j ) // got buffer
1438 : {
// The current hole ends with the sequence preceding this received packet.
1439 0 : nacks[ i ].end = connection->_sequence + std::distance( first, j);
1440 0 : LBLOG( LOG_RSP ) << nacks[i].end << ", ";
// The hole wraps around the 16 bit counter: split it into start..65535
// and 0..end.
1441 0 : if( nacks[ i ].end < nacks[ i ].start )
1442 : {
1443 0 : LBASSERT( nacks[ i ].end < _numBuffers );
1444 0 : nacks[ i + 1 ].start = 0;
1445 0 : nacks[ i + 1 ].end = nacks[ i ].end;
1446 0 : nacks[ i ].end = std::numeric_limits< uint16_t >::max();
1447 0 : ++i;
1448 : }
1449 0 : ++i;
1450 :
1451 : // find next hole
1452 0 : for( ++j; j != connection->_recvBuffers.end() && (*j); ++j )
1453 : /* nop */;
1454 :
1455 0 : if( j == connection->_recvBuffers.end( ))
1456 0 : break;
1457 :
// j is the first empty slot of the next hole.
1458 0 : nacks[i].start = connection->_sequence + std::distance(first, j) +1;
1459 0 : LBLOG( LOG_RSP ) << nacks[i].start << "..";
1460 : }
1461 : }
1462 :
// Close the last range: either a hole is still open (or none was closed at
// all), or everything after the last received packet up to reqID is missing.
1463 0 : if( j != connection->_recvBuffers.end() || i == 0 )
1464 : {
1465 0 : nacks[ i ].end = reqID;
1466 0 : LBLOG( LOG_RSP ) << nacks[i].end;
1467 0 : ++i;
1468 : }
1469 0 : else if( uint16_t( reqID - nacks[i-1].end ) < _numBuffers )
1470 : {
1471 0 : nacks[i].start = nacks[i-1].end + 1;
1472 0 : nacks[i].end = reqID;
1473 0 : LBLOG( LOG_RSP ) << nacks[i].start << ".." << nacks[i].end;
1474 0 : ++i;
1475 : }
// Same wrap-around split as above, applied to the last range.
1476 0 : if( nacks[ i -1 ].end < nacks[ i - 1 ].start )
1477 : {
1478 0 : LBASSERT( nacks[ i - 1 ].end < _numBuffers );
1479 0 : nacks[ i ].start = 0;
1480 0 : nacks[ i ].end = nacks[ i - 1 ].end;
1481 0 : nacks[ i - 1 ].end = std::numeric_limits< uint16_t >::max();
1482 0 : ++i;
1483 : }
1484 :
1485 0 : LBLOG( LOG_RSP ) << std::endl << lunchbox::enableFlush << "send " << i
1486 0 : << " nacks to " << connection->_id << std::endl;
1487 :
1488 0 : LBASSERT( i > 0 );
1489 0 : _sendNack( connection->_id, nacks, i );
1490 0 : return true;
1491 : }
1492 :
1493 0 : void RSPConnection::_checkNewID( uint16_t id )
1494 : {
1495 : // look if the new ID exist in another connection
1496 0 : if( id == _id || _findConnection( id ))
1497 : {
1498 0 : LBLOG( LOG_RSP ) << "Deny " << id << std::endl;
1499 0 : _sendSimpleDatagram( ID_DENY, _id );
1500 : }
1501 : else
1502 0 : _sendSimpleDatagram( ID_HELLO_REPLY, _id );
1503 0 : }
1504 :
1505 14558 : RSPConnectionPtr RSPConnection::_findConnection( const uint16_t id )
1506 : {
1507 14558 : for( RSPConnectionsCIter i = _children.begin(); i != _children.end(); ++i )
1508 : {
1509 14556 : if( (*i)->_id == id )
1510 14556 : return *i;
1511 : }
1512 2 : return 0;
1513 : }
1514 :
1515 2 : bool RSPConnection::_addConnection( const uint16_t id, const uint16_t sequence )
1516 : {
1517 2 : if( _findConnection( id ))
1518 0 : return false;
1519 :
1520 2 : LBINFO << "add connection " << id << std::endl;
1521 2 : RSPConnectionPtr connection = new RSPConnection();
1522 2 : connection->_id = id;
1523 2 : connection->_parent = this;
1524 2 : connection->_setState( STATE_CONNECTED );
1525 2 : connection->_setDescription( _getDescription( ));
1526 2 : connection->_sequence = sequence;
1527 2 : LBASSERT( connection->_appBuffers.isEmpty( ));
1528 :
1529 : // Make all buffers available for reading
1530 390 : for( BuffersCIter i = connection->_buffers.begin();
1531 260 : i != connection->_buffers.end(); ++i )
1532 : {
1533 128 : Buffer* buffer = *i;
1534 128 : LBCHECK( connection->_threadBuffers.push( buffer ));
1535 : }
1536 :
1537 2 : _children.push_back( connection );
1538 2 : _sendCountNode();
1539 :
1540 4 : lunchbox::ScopedWrite mutex( _mutexConnection );
1541 2 : _newChildren.push_back( connection );
1542 :
1543 4 : lunchbox::ScopedWrite mutex2( _mutexEvent );
1544 2 : _event->set();
1545 4 : return true;
1546 : }
1547 :
1548 0 : void RSPConnection::_removeConnection( const uint16_t id )
1549 : {
1550 0 : LBINFO << "remove connection " << id << std::endl;
1551 0 : if( id == _id )
1552 0 : return;
1553 :
1554 0 : for( RSPConnectionsIter i = _children.begin(); i != _children.end(); ++i )
1555 : {
1556 0 : RSPConnectionPtr child = *i;
1557 0 : if( child->_id == id )
1558 : {
1559 0 : _children.erase( i );
1560 :
1561 0 : lunchbox::ScopedWrite mutex( child->_mutexEvent );
1562 0 : child->_appBuffers.push( 0 );
1563 0 : child->_event->set();
1564 0 : break;
1565 : }
1566 0 : }
1567 :
1568 0 : _sendCountNode();
1569 : }
1570 :
1571 67596 : int64_t RSPConnection::write( const void* inData, const uint64_t bytes )
1572 : {
1573 67596 : if( _parent )
1574 0 : return _parent->write( inData, bytes );
1575 :
1576 67596 : LBASSERT( isListening( ));
1577 67596 : if( !_write )
1578 0 : return -1;
1579 :
1580 : // compute number of datagrams
1581 67596 : uint64_t nDatagrams = bytes / _payloadSize;
1582 67596 : if( nDatagrams * _payloadSize != bytes )
1583 67596 : ++nDatagrams;
1584 :
1585 : // queue each datagram (might block if buffers are exhausted)
1586 67596 : const uint8_t* data = reinterpret_cast< const uint8_t* >( inData );
1587 67596 : const uint8_t* end = data + bytes;
1588 136043 : for( uint64_t i = 0; i < nDatagrams; ++i )
1589 : {
1590 68447 : size_t packetSize = end - data;
1591 68447 : packetSize = LB_MIN( packetSize, _payloadSize );
1592 :
1593 68447 : if( _appBuffers.isEmpty( ))
1594 : // trigger processing
1595 1631 : _postWakeup();
1596 :
1597 : Buffer* buffer;
1598 68447 : if ( !_appBuffers.timedPop( _writeTimeOut, buffer ) )
1599 : {
1600 0 : LBERROR << "Timeout while writing" << std::endl;
1601 0 : buffer = 0;
1602 : }
1603 :
1604 68447 : if( !buffer )
1605 : {
1606 0 : close();
1607 0 : return -1;
1608 : }
1609 :
1610 : // prepare packet header (sequence is done by thread)
1611 : DatagramData* header =
1612 68447 : reinterpret_cast< DatagramData* >( buffer->getData( ));
1613 68447 : header->type = DATA;
1614 68447 : header->size = uint16_t( packetSize );
1615 68447 : header->writerID = _id;
1616 :
1617 68447 : memcpy( header + 1, data, packetSize );
1618 68447 : data += packetSize;
1619 :
1620 68447 : LBCHECK( _threadBuffers.push( buffer ));
1621 : }
1622 67596 : _postWakeup();
1623 67596 : LBLOG( LOG_RSP ) << "queued " << nDatagrams << " datagrams, "
1624 67596 : << bytes << " bytes" << std::endl;
1625 67596 : return bytes;
1626 : }
1627 :
1628 0 : void RSPConnection::finish()
1629 : {
1630 0 : if( _parent.isValid( ))
1631 : {
1632 0 : LBASSERTINFO( !_parent, "Writes are only allowed on RSP listeners" );
1633 0 : return;
1634 : }
1635 0 : LBASSERT( isListening( ));
1636 0 : _appBuffers.waitSize( _buffers.size( ));
1637 : }
1638 :
1639 42 : void RSPConnection::_sendCountNode()
1640 : {
1641 42 : if( !_findConnection( _id ))
1642 42 : return;
1643 :
1644 42 : LBLOG( LOG_RSP ) << _children.size() << " nodes" << std::endl;
1645 : DatagramNode count = { COUNTNODE, CO_RSP_PROTOCOL_VERSION, _id,
1646 42 : uint16_t( _children.size( )) };
1647 42 : count.byteswap();
1648 42 : _write->send( boost::asio::buffer( &count, sizeof( count )) );
1649 : }
1650 :
1651 44 : void RSPConnection::_sendSimpleDatagram( const DatagramType type,
1652 : const uint16_t id )
1653 : {
1654 : DatagramNode simple = { uint16_t( type ), CO_RSP_PROTOCOL_VERSION, id,
1655 44 : _sequence };
1656 44 : simple.byteswap();
1657 44 : _write->send( boost::asio::buffer( &simple, sizeof( simple )) );
1658 44 : }
1659 :
1660 0 : void RSPConnection::_sendAck( const uint16_t writerID,
1661 : const uint16_t sequence )
1662 : {
1663 0 : LBASSERT( _id != writerID );
1664 : #ifdef CO_INSTRUMENT_RSP
1665 : ++nAcksSend;
1666 : #endif
1667 :
1668 0 : LBLOG( LOG_RSP ) << "send ack " << sequence << std::endl;
1669 0 : DatagramAck ack = { ACK, _id, writerID, sequence };
1670 0 : ack.byteswap();
1671 0 : _write->send( boost::asio::buffer( &ack, sizeof( ack )) );
1672 0 : }
1673 :
1674 0 : void RSPConnection::_sendNack( const uint16_t writerID, const Nack* nacks,
1675 : const uint16_t count )
1676 : {
1677 0 : LBASSERT( count > 0 );
1678 0 : LBASSERT( count <= CO_RSP_MAX_NACKS );
1679 : #ifdef CO_INSTRUMENT_RSP
1680 : ++nNAcksSend;
1681 : #endif
1682 : /* optimization: use the direct access to the reader. */
1683 0 : if( writerID == _id )
1684 : {
1685 0 : _addRepeat( nacks, count );
1686 0 : return;
1687 : }
1688 :
1689 : const size_t size = sizeof( DatagramNack ) -
1690 0 : (CO_RSP_MAX_NACKS - count) * sizeof( Nack );
1691 :
1692 : // set the header
1693 : DatagramNack packet;
1694 0 : packet.set( _id, writerID, count );
1695 0 : memcpy( packet.nacks, nacks, count * sizeof( Nack ));
1696 0 : packet.byteswap();
1697 0 : _write->send( boost::asio::buffer( &packet, size ));
1698 : }
1699 :
1700 0 : void RSPConnection::_sendAckRequest()
1701 : {
1702 : #ifdef CO_INSTRUMENT_RSP
1703 : ++nAckRequests;
1704 : #endif
1705 0 : LBLOG( LOG_RSP ) << "send ack request for " << uint16_t( _sequence -1 )
1706 0 : << std::endl;
1707 0 : DatagramAckRequest ackRequest = { ACKREQ, _id, uint16_t( _sequence - 1 ) };
1708 0 : ackRequest.byteswap();
1709 0 : _write->send( boost::asio::buffer( &ackRequest, sizeof( DatagramAckRequest )) );
1710 0 : }
1711 :
1712 0 : std::ostream& operator << ( std::ostream& os,
1713 : const RSPConnection& connection )
1714 : {
1715 0 : os << lunchbox::disableFlush << lunchbox::disableHeader
1716 0 : << "RSPConnection id " << connection.getID() << " send rate "
1717 0 : << connection.getSendRate();
1718 :
1719 : #ifdef CO_INSTRUMENT_RSP
1720 : const int prec = os.precision();
1721 : os.precision( 3 );
1722 :
1723 : const float time = instrumentClock.getTimef();
1724 : const float mbps = 1048.576f * time;
1725 : os << ": " << lunchbox::indent << std::endl
1726 : << float( nBytesRead ) / mbps << " / " << float( nBytesWritten ) / mbps
1727 : << " MB/s r/w using " << nDatagrams << " dgrams " << nRepeated
1728 : << " repeats " << nMergedDatagrams
1729 : << " merged"
1730 : << std::endl;
1731 :
1732 : os.precision( prec );
1733 : os << "sender: " << nAckRequests << " ack requests " << nAcksAccepted << "/"
1734 : << nAcksRead << " acks " << nNAcksRead << " nacks, throttle "
1735 : << writeWaitTime << " ms"
1736 : << std::endl
1737 : << "receiver: " << nAcksSend << " acks " << nNAcksSend << " nacks"
1738 : << lunchbox::exdent;
1739 :
1740 : nReadData = 0;
1741 : nBytesRead = 0;
1742 : nBytesWritten = 0;
1743 : nDatagrams = 0;
1744 : nRepeated = 0;
1745 : nMergedDatagrams = 0;
1746 : nAckRequests = 0;
1747 : nAcksSend = 0;
1748 : nAcksRead = 0;
1749 : nAcksAccepted = 0;
1750 : nNAcksSend = 0;
1751 : nNAcksRead = 0;
1752 : writeWaitTime = 0.f;
1753 : #endif
1754 0 : os << std::endl << lunchbox::enableHeader << lunchbox::enableFlush;
1755 :
1756 0 : return os;
1757 : }
1758 :
1759 60 : }
|