Line data Source code
1 : // -*- mode: c++ -*-
2 : /* Copyright (c) 2012, Computer Integration & Programming Solutions, Corp. and
3 : * United States Naval Research Laboratory
4 : * 2012-2013, Stefan Eilemann <eile@eyescale.ch>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 : #include "rdmaConnection.h"
22 :
23 : #include "connectionType.h"
24 : #include "connectionDescription.h"
25 : #include "global.h"
26 :
27 : #include <lunchbox/clock.h>
28 :
29 : #ifdef _WIN32
30 : # pragma warning( disable : 4018 )
31 : # include <boost/interprocess/mapped_region.hpp>
32 : # pragma warning( default : 4018 )
33 : #else
34 : # include <arpa/inet.h>
35 : # include <sys/epoll.h>
36 : # include <sys/mman.h>
37 : # include <poll.h>
38 : # include <unistd.h>
39 : #endif
40 :
41 : #include <errno.h>
42 : #include <fcntl.h>
43 : #include <limits>
44 : #include <sstream>
45 : #include <stddef.h>
46 :
47 : #include <rdma/rdma_verbs.h>
48 :
49 : #define IPV6_DEFAULT 0
50 :
51 : #define RDMA_PROTOCOL_MAGIC 0xC0
52 : #define RDMA_PROTOCOL_VERSION 0x03
53 :
54 : namespace co
55 : {
56 : //namespace { static const uint64_t ONE = 1ULL; }
57 :
58 : /**
59 : * Message types
60 : */
61 : enum OpCode
62 : {
63 : SETUP = 1 << 0,
64 : FC = 1 << 1,
65 : };
66 :
67 : /**
68 : * Initial setup message used to exchange sink MR parameters.
69 : */
70 : struct RDMASetupPayload
71 : {
72 : uint64_t rbase;
73 : uint64_t rlen;
74 : uint64_t rkey;
75 : };
76 :
77 : /**
78 : * "ACK" messages sent after read, tells source about receive progress.
79 : */
80 : struct RDMAFCPayload
81 : {
82 : uint32_t bytes_received;
83 : uint32_t writes_received;
84 : };
85 :
86 : /**
87 : * Payload wrapper
88 : */
89 : struct RDMAMessage
90 : {
91 : enum OpCode opcode;
92 : uint8_t length;
93 : union
94 : {
95 : struct RDMASetupPayload setup;
96 : struct RDMAFCPayload fc;
97 : } payload;
98 : };
99 :
100 : /**
101 : * "IMM" data sent with RDMA write, tells sink about send progress.
102 : */
103 : struct RDMAFCImm
104 : {
105 : uint32_t bytes_sent:28;
106 : uint32_t fcs_received:4;
107 : };
108 :
109 : namespace {
110 : // We send a max of 28 bits worth of byte counts per RDMA write.
111 : static const uint64_t MAX_BS = (( 2 << ( 28 - 1 )) - 1 );
112 : // We send a max of four bits worth of fc counts per RDMA write.
113 : static const uint16_t MAX_FC = (( 2 << ( 4 - 1 )) - 1 );
114 :
115 : #ifdef _WIN32
116 : static const uint32_t RINGBUFFER_ALLOC_RETRIES = 8;
117 : static const uint32_t WINDOWS_CONNECTION_BACKLOG = 1024;
118 : #endif
119 : static const uint32_t FC_MESSAGE_FREQUENCY = 12;
120 :
121 4 : bool _isInetFamily( const rdma_addrinfo* addrinfo)
122 : {
123 : #ifndef _WIN32
124 4 : return addrinfo->ai_family == AF_INET || addrinfo->ai_family == AF_INET6;
125 : #else
126 : return addrinfo->ai_family == AF_INET;
127 : #endif
128 : }
129 :
130 : }
131 :
132 : /**
133 : * An RDMA connection implementation.
134 : *
135 : * The protocol is simple, e.g.:
136 : *
137 : * initiator target
138 : * -----------------------------------------------------
139 : * resolve/bind/listen
140 : * resolve/prepost/connect
141 : * prepost/accept
142 : * send setup <-------> send setup
143 : * wait for setup wait for setup
144 : * RDMA_WRITE_WITH_IMM WR -------> RDMA_WRITE(DATA) WC
145 : * RECV(FC) WC <------- SEND WR
146 : * .
147 : * .
148 : * .
149 : *
150 : * The setup phase exchanges the MR parameters of a fixed size circular buffer
151 : * to which remote writes are sent. Sender tracks available space on the
152 : * receiver by accepting "Flow Control" messages that update the tail pointer
153 : * of the local "view" of the remote sink MR.
154 : *
155 : * Once setup is complete, either side may begin operations on the other's MR
156 : * (the initiator doesn't have to send first, as in the above example).
157 : *
158 : * If either credits or buffer space are exhausted, sender will spin waiting
159 : * for flow control messages. Receiver will also not send flow control if
160 : * there are no credits available.
161 : *
162 : * One catch is that Collage will only monitor a single "notifier" for events
163 : * and we have three that need to be monitored: one for connection status
164 : * events (the RDMA event channel) - RDMA_CM_EVENT_DISCONNECTED in particular,
165 : * one for the receive completion queue (upon incoming RDMA write), and an
166 : * additional eventfd(2) used to keep the notifier "hot" after partial reads.
167 : * We leverage the feature of epoll(7) in that "If an epoll file descriptor
168 : * has events waiting then it will indicate as being readable".
169 : *
170 : * Quite interesting is the effect of RDMA_RING_BUFFER_SIZE_MB and
171 : * RDMA_SEND_QUEUE_DEPTH depending on the communication pattern. Basically,
172 : * bigger doesn't necessarily equate to faster! The defaults are suited for
173 : * low latency conditions and would need tuning otherwise.
174 : *
175 : * ib_write_bw
176 : * -----------
177 : * #bytes num iterations BW peak[MB/sec] BW average[MB/sec]
178 : * 1048576 10000 3248.10 3247.94
179 : *
180 : * netperf
181 : * -------
182 : * Send perf: 3240.72MB/s (3240.72pps)
183 : * Send perf: 3240.72MB/s (3240.72pps)
184 : * Send perf: 3240.95MB/s (3240.95pps)
185 : */
186 6 : RDMAConnection::RDMAConnection( )
187 : #ifdef _WIN32
188 : : _notifier()
189 : #else
190 : : _notifier( -1 )
191 : #endif
192 6 : , _timeout( Global::getIAttribute( Global::IATTR_RDMA_RESOLVE_TIMEOUT_MS ))
193 : , _rai( NULL )
194 : , _addrinfo( NULL )
195 : , _cm( NULL )
196 : , _cm_id( NULL )
197 : , _new_cm_id( NULL )
198 : , _cc( NULL )
199 : , _cq( NULL )
200 : , _pd( NULL )
201 : , _wcs ( 0 )
202 : , _readBytes( 0 )
203 : , _established( false )
204 : , _depth( 0L )
205 : , _writes( 0L )
206 : , _fcs( 0L )
207 : , _wcredits( 0L )
208 : , _fcredits( 0L )
209 : , _completions( 0U )
210 : , _msgbuf( sizeof(RDMAMessage) )
211 : , _sourcebuf( 0 )
212 : , _sourceptr( 0 )
213 : , _sinkbuf( IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE )
214 : , _sinkptr( 0 )
215 : , _rptr( 0UL )
216 : , _rbase( 0ULL )
217 12 : , _rkey( 0ULL )
218 : #ifdef _WIN32
219 : , _availBytes( 0 )
220 : , _eventFlag( 0 )
221 : , _cmWaitObj( 0 )
222 : , _ccWaitObj( 0 )
223 : #endif
224 : {
225 : #ifndef _WIN32
226 6 : _pipe_fd[0] = -1;
227 6 : _pipe_fd[1] = -1;
228 : #endif
229 :
230 6 : ::memset( (void *)&_addr, 0, sizeof(_addr) );
231 6 : ::memset( (void *)&_serv, 0, sizeof(_serv) );
232 :
233 6 : ::memset( (void *)&_cpd, 0, sizeof(struct RDMAConnParamData) );
234 :
235 6 : ConnectionDescriptionPtr description = _getDescription();
236 6 : description->type = CONNECTIONTYPE_RDMA;
237 6 : description->bandwidth = // QDR default, report "actual" 8b/10b bandwidth
238 6 : ( ::ibv_rate_to_mult( IBV_RATE_40_GBPS ) * 2.5 * 1024000 / 8 ) * 0.8;
239 6 : }
240 :
241 2 : bool RDMAConnection::connect( )
242 : {
243 2 : ConstConnectionDescriptionPtr description = getDescription();
244 2 : LBASSERT( CONNECTIONTYPE_RDMA == description->type );
245 :
246 2 : if( !isClosed() || 0u == description->port )
247 0 : return false;
248 :
249 2 : _cleanup( );
250 2 : _setState( STATE_CONNECTING );
251 :
252 2 : if( !_lookupAddress( false ) || ( NULL == _addrinfo ))
253 : {
254 0 : LBERROR << "Failed to lookup destination address." << std::endl;
255 0 : goto err;
256 : }
257 :
258 2 : _updateInfo( _addrinfo->ai_dst_addr );
259 :
260 2 : if( !_createNotifier( ))
261 : {
262 0 : LBERROR << "Failed to create master notifier." << std::endl;
263 0 : goto err;
264 : }
265 :
266 2 : if( !_createEventChannel( ))
267 : {
268 0 : LBERROR << "Failed to create communication event channel." << std::endl;
269 0 : goto err;
270 : }
271 :
272 2 : if( !_createId( ))
273 : {
274 0 : LBERROR << "Failed to create communication identifier." << std::endl;
275 0 : goto err;
276 : }
277 :
278 2 : if( !_resolveAddress( ))
279 : {
280 0 : LBERROR << "Failed to resolve destination address for : "
281 0 : << _addr << ":" << _serv << std::endl;
282 0 : goto err;
283 : }
284 :
285 2 : _updateInfo( ::rdma_get_peer_addr( _cm_id ));
286 :
287 2 : _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
288 :
289 2 : LBVERB << "Initiating connection on " << std::dec
290 0 : << _device_name << ":" << (int)_cm_id->port_num
291 0 : << " to "
292 0 : << _addr << ":" << _serv
293 2 : << " (" << description->toString( ) << ")"
294 6 : << std::endl;
295 :
296 2 : if( !_initProtocol( Global::getIAttribute(
297 2 : Global::IATTR_RDMA_SEND_QUEUE_DEPTH )))
298 : {
299 0 : LBERROR << "Failed to initialize protocol variables." << std::endl;
300 0 : goto err;
301 : }
302 :
303 2 : if( !_initVerbs( ))
304 : {
305 0 : LBERROR << "Failed to initialize verbs." << std::endl;
306 0 : goto err;
307 : }
308 :
309 2 : if( !_createQP( ))
310 : {
311 0 : LBERROR << "Failed to create queue pair." << std::endl;
312 0 : goto err;
313 : }
314 :
315 2 : if( !_createBytesAvailableFD( ))
316 : {
317 0 : LBERROR << "Failed to create available byte notifier." << std::endl;
318 0 : goto err;
319 : }
320 :
321 2 : if( !_initBuffers( ))
322 : {
323 0 : LBERROR << "Failed to initialize ring buffers." << std::endl;
324 0 : goto err;
325 : }
326 :
327 2 : if( !_resolveRoute( ))
328 : {
329 0 : LBERROR << "Failed to resolve route to destination : "
330 0 : << _addr << ":" << _serv << std::endl;
331 0 : goto err;
332 : }
333 :
334 2 : if( !_postReceives( _depth ))
335 : {
336 0 : LBERROR << "Failed to pre-post receives." << std::endl;
337 0 : goto err;
338 : }
339 :
340 2 : if( !_connect( ))
341 : {
342 0 : LBERROR << "Failed to connect to destination : "
343 0 : << _addr << ":" << _serv << std::endl;
344 0 : goto err;
345 : }
346 :
347 2 : LBASSERT( _established );
348 :
349 4 : if(( RDMA_PROTOCOL_MAGIC != _cpd.magic ) ||
350 2 : ( RDMA_PROTOCOL_VERSION != _cpd.version ))
351 : {
352 0 : LBERROR << "Protocol mismatch with target : "
353 0 : << _addr << ":" << _serv << std::endl;
354 0 : goto err;
355 : }
356 :
357 2 : if( !_postSetup( ))
358 : {
359 0 : LBERROR << "Failed to post setup message." << std::endl;
360 0 : goto err;
361 : }
362 :
363 2 : if( !_waitRecvSetup( ))
364 : {
365 0 : LBERROR << "Failed to receive setup message." << std::endl;
366 0 : goto err;
367 : }
368 :
369 2 : LBVERB << "Connection established on " << std::dec
370 0 : << _device_name << ":" << (int)_cm_id->port_num
371 0 : << " to "
372 0 : << _addr << ":" << _serv
373 2 : << " (" << description->toString( ) << ")"
374 6 : << std::endl;
375 :
376 2 : _setState( STATE_CONNECTED );
377 2 : _updateNotifier();
378 2 : return true;
379 :
380 : err:
381 0 : LBVERB << "Connection failed on " << std::dec
382 0 : << _device_name << ":" << (int)_cm_id->port_num
383 0 : << " to "
384 0 : << _addr << ":" << _serv
385 0 : << " (" << description->toString( ) << ")"
386 0 : << std::endl;
387 :
388 0 : close( );
389 :
390 0 : return false;
391 : }
392 :
393 2 : bool RDMAConnection::listen( )
394 : {
395 2 : ConstConnectionDescriptionPtr description = getDescription();
396 2 : LBASSERT( CONNECTIONTYPE_RDMA == description->type );
397 :
398 2 : if( !isClosed() )
399 0 : return false;
400 :
401 2 : _cleanup( );
402 2 : _setState( STATE_CONNECTING );
403 :
404 2 : if( !_lookupAddress( true ))
405 : {
406 0 : LBERROR << "Failed to lookup local address." << std::endl;
407 0 : goto err;
408 : }
409 :
410 2 : if( NULL != _addrinfo )
411 2 : _updateInfo( _addrinfo->ai_src_addr );
412 :
413 2 : if( !_createNotifier( ))
414 : {
415 0 : LBERROR << "Failed to create master notifier." << std::endl;
416 0 : goto err;
417 : }
418 :
419 2 : if( !_createEventChannel( ))
420 : {
421 0 : LBERROR << "Failed to create communication event channel." << std::endl;
422 0 : goto err;
423 : }
424 :
425 2 : if( !_createId( ))
426 : {
427 0 : LBERROR << "Failed to create communication identifier." << std::endl;
428 0 : goto err;
429 : }
430 :
431 : #if 0
432 : /* NOT IMPLEMENTED */
433 :
434 : if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
435 : (void *)&ONE, sizeof(ONE) ))
436 : {
437 : LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
438 : goto err;
439 : }
440 : #endif
441 :
442 2 : if( !_bindAddress( ))
443 : {
444 0 : LBERROR << "Failed to bind to local address : "
445 0 : << _addr << ":" << _serv << std::endl;
446 0 : goto err;
447 : }
448 :
449 2 : _updateInfo( ::rdma_get_local_addr( _cm_id ));
450 :
451 : #ifdef _WIN32
452 : if( !_listen( WINDOWS_CONNECTION_BACKLOG ))
453 : #else
454 2 : if( !_listen( SOMAXCONN ))
455 : #endif
456 : {
457 0 : LBERROR << "Failed to listen on bound address : "
458 0 : << _addr << ":" << _serv << std::endl;
459 0 : goto err;
460 : }
461 :
462 2 : if( NULL != _cm_id->verbs )
463 0 : _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
464 :
465 2 : LBVERB << "Listening on " << std::dec
466 0 : << _device_name << ":" << (int)_cm_id->port_num
467 0 : << " at "
468 0 : << _addr << ":" << _serv
469 2 : << " (" << description->toString( ) << ")"
470 6 : << std::endl;
471 :
472 2 : _setState( STATE_LISTENING );
473 2 : return true;
474 :
475 : err:
476 0 : close();
477 0 : return false;
478 : }
479 :
480 2 : void RDMAConnection::acceptNB( ) { /* NOP */ }
481 :
482 2 : ConnectionPtr RDMAConnection::acceptSync( )
483 : {
484 2 : RDMAConnection *newConnection = NULL;
485 :
486 2 : if( !isListening() )
487 : {
488 0 : LBERROR << "Connection not in listening state." << std::endl;
489 0 : goto out;
490 : }
491 :
492 2 : if( !_waitForCMEvent( RDMA_CM_EVENT_CONNECT_REQUEST ))
493 : {
494 0 : LBERROR << "Failed to receive valid connect request." << std::endl;
495 0 : goto out;
496 : }
497 :
498 2 : LBASSERT( NULL != _new_cm_id );
499 :
500 2 : newConnection = new RDMAConnection( );
501 :
502 2 : if( !newConnection->_finishAccept( _new_cm_id, _cpd ))
503 : {
504 0 : delete newConnection;
505 0 : newConnection = NULL;
506 : }
507 :
508 : out:
509 2 : _new_cm_id = NULL;
510 2 : _updateNotifier();
511 2 : return newConnection;
512 : }
513 :
514 508593 : void RDMAConnection::readNB( void*, const uint64_t ) { /* NOP */ }
515 :
516 508593 : int64_t RDMAConnection::readSync( void* buffer, const uint64_t bytes,
517 : const bool block )
518 : {
519 508593 : lunchbox::Clock clock;
520 508593 : const int64_t start = clock.getTime64( );
521 508593 : const uint32_t timeout = Global::getTimeout( );
522 508593 : eventset events;
523 508593 : uint64_t available_bytes = 0ULL;
524 508593 : uint32_t bytes_taken = 0UL;
525 508593 : bool extra_event = false;
526 :
527 508593 : if( !isConnected() )
528 0 : goto err;
529 :
530 : // LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
531 : // << std::endl;
532 :
533 508593 : _stats.reads++;
534 :
535 : retry:
536 525909 : if( !_checkDisconnected( events ))
537 : {
538 0 : LBERROR << "Error while checking event state." << std::endl;
539 0 : goto err;
540 : }
541 :
542 525909 : if( events.test( CQ_EVENT ))
543 : {
544 69492 : if( !_rearmCQ( ))
545 : {
546 0 : LBERROR << "Error while rearming receive channel." << std::endl;
547 0 : goto err;
548 : }
549 : }
550 :
551 : // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
552 525909 : if( !_checkCQ( events.test( CQ_EVENT )))
553 : {
554 0 : LBERROR << "Error while polling completion queues." << std::endl;
555 0 : goto err;
556 : }
557 :
558 525909 : LBASSERT( _fcredits >= 0L );
559 :
560 525909 : if( _established && _needFC( ) && ( 0L == _fcredits ))
561 : {
562 0 : if( LB_TIMEOUT_INDEFINITE != timeout )
563 : {
564 0 : if(( clock.getTime64( ) - start ) > timeout )
565 : {
566 0 : LBERROR << "Timed out trying to acquire credit." << std::endl;
567 0 : goto err;
568 : }
569 : }
570 :
571 : //LBWARN << "No credit for flow control." << std::endl;
572 0 : lunchbox::Thread::yield( );
573 0 : _stats.no_credits_fc++;
574 0 : goto retry;
575 : }
576 :
577 : // "Note that an extra event may be triggered without having a
578 : // corresponding completion entry in the CQ." (per ibv_get_cq_event(3))
579 525909 : if( _established && !events.test( BUF_EVENT ))
580 : {
581 : // Special case: If LocalNode is reading the length part of a message
582 : // it will ignore this zero return and restart the select.
583 17316 : if( extra_event && !block )
584 : {
585 0 : _updateNotifier();
586 0 : return 0LL;
587 : }
588 :
589 17316 : extra_event = true;
590 17316 : goto retry;
591 : }
592 :
593 508593 : if( events.test( BUF_EVENT ))
594 : {
595 508591 : available_bytes = _getAvailableBytes( );
596 508591 : if( 0ULL == available_bytes )
597 : {
598 0 : LBERROR << "Error while reading from event fd." << std::endl;
599 0 : goto err;
600 : }
601 : }
602 :
603 : // Modifies _sinkptr.TAIL
604 508593 : bytes_taken = _drain( buffer, bytes );
605 :
606 508593 : if( 0UL == bytes_taken )
607 : {
608 2 : if( _sinkptr.isEmpty( ) && !_established )
609 : {
610 10 : LBINFO << "Got EOF, closing " << getDescription( )->toString( )
611 8 : << std::endl;
612 2 : goto err;
613 : }
614 :
615 0 : if( LB_TIMEOUT_INDEFINITE != timeout )
616 : {
617 0 : if(( clock.getTime64( ) - start ) > timeout )
618 : {
619 0 : LBERROR << "Timed out trying to drain buffer." << std::endl;
620 0 : goto err;
621 : }
622 : }
623 :
624 : //LBWARN << "Sink buffer empty." << std::endl;
625 0 : lunchbox::Thread::yield( );
626 0 : _stats.buffer_empty++;
627 0 : goto retry;
628 : }
629 :
630 : // Put back what wasn't taken (ensure the master notifier stays "hot").
631 508591 : if( available_bytes > bytes_taken )
632 492718 : _incrAvailableBytes( available_bytes - bytes_taken );
633 : #ifdef _WIN32
634 : else
635 : _updateNotifier();
636 : #endif
637 :
638 508591 : _readBytes += bytes_taken;
639 :
640 508591 : if( _established && _needFC( ) && !_postFC( ))
641 0 : LBWARN << "Error while posting flow control message." << std::endl;
642 :
643 : // LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
644 : // << " took " << bytes_taken << " bytes"
645 : // << " (" << _sinkptr.available( ) << " still available)" << std::endl;
646 :
647 508591 : return bytes_taken;
648 :
649 : err:
650 2 : close( );
651 :
652 2 : return -1LL;
653 : }
654 :
655 526159 : int64_t RDMAConnection::write( const void* buffer, const uint64_t bytes )
656 : {
657 526159 : lunchbox::Clock clock;
658 526159 : const int64_t start = clock.getTime64( );
659 526159 : const uint32_t timeout = Global::getTimeout( );
660 526159 : eventset events;
661 526159 : const uint32_t can_put = std::min( bytes, MAX_BS );
662 : uint32_t bytes_put;
663 :
664 526159 : if( !isConnected() )
665 0 : goto err;
666 :
667 : // LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
668 : // << std::endl;
669 :
670 526159 : _stats.writes++;
671 :
672 : retry:
673 905785 : if( !_checkDisconnected( events ))
674 : {
675 0 : LBERROR << "Error while checking connection state." << std::endl;
676 0 : goto err;
677 : }
678 :
679 905785 : if( !_established )
680 : {
681 0 : LBWARN << "Disconnected in write." << std::endl;
682 0 : goto err;
683 : }
684 :
685 : // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
686 905785 : if( !_checkCQ( false ))
687 : {
688 0 : LBERROR << "Error while polling completion queues." << std::endl;
689 0 : goto err;
690 : }
691 :
692 905785 : LBASSERT( _wcredits >= 0L );
693 :
694 905785 : if( 0L == _wcredits )
695 : {
696 0 : if( LB_TIMEOUT_INDEFINITE != timeout )
697 : {
698 0 : if(( clock.getTime64( ) - start ) > timeout )
699 : {
700 0 : LBERROR << "Timed out trying to acquire credit." << std::endl;
701 0 : goto err;
702 : }
703 : }
704 :
705 : //LBWARN << "No credits for RDMA." << std::endl;
706 0 : lunchbox::Thread::yield( );
707 0 : _stats.no_credits_rdma++;
708 0 : goto retry;
709 : }
710 :
711 : // Modifies _sourceptr.HEAD
712 905785 : bytes_put = _fill( buffer, can_put );
713 :
714 905785 : if( 0UL == bytes_put )
715 : {
716 379626 : if( LB_TIMEOUT_INDEFINITE != timeout )
717 : {
718 379626 : if(( clock.getTime64( ) - start ) > timeout )
719 : {
720 0 : LBERROR << "Timed out trying to fill buffer." << std::endl;
721 0 : goto err;
722 : }
723 : }
724 :
725 : //LBWARN << "Source buffer full." << std::endl;
726 379626 : lunchbox::Thread::yield( );
727 379626 : if( _sourceptr.isFull( ) || _rptr.isFull( ))
728 379626 : _stats.buffer_full++;
729 379626 : goto retry;
730 : }
731 :
732 : // Modifies _sourceptr.MIDDLE & _rptr.HEAD
733 526159 : if( !_postRDMAWrite( ))
734 : {
735 0 : LBERROR << "Error while posting RDMA write." << std::endl;
736 0 : goto err;
737 : }
738 :
739 : // LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
740 : // << " put " << bytes_put << " bytes" << std::endl;
741 :
742 526159 : return bytes_put;
743 :
744 : err:
745 0 : return -1LL;
746 : }
747 :
748 18 : RDMAConnection::~RDMAConnection( )
749 : {
750 6 : _close( );
751 12 : }
752 :
753 : ////////////////////////////////////////////////////////////////////////////////
754 :
755 12 : void RDMAConnection::_close( )
756 : {
757 12 : lunchbox::ScopedWrite close_mutex( _poll_lock );
758 :
759 12 : if( !isClosed() )
760 : {
761 6 : LBASSERT( !isClosing() );
762 6 : _setState( STATE_CLOSING );
763 :
764 6 : if( _established && ::rdma_disconnect( _cm_id ))
765 0 : LBWARN << "rdma_disconnect : " << lunchbox::sysError << std::endl;
766 :
767 6 : _setState( STATE_CLOSED );
768 6 : _cleanup( );
769 12 : }
770 12 : }
771 :
772 10 : void RDMAConnection::_cleanup( )
773 : {
774 10 : LBASSERT( isClosed() );
775 :
776 10 : _sourcebuf.clear( );
777 10 : _sinkbuf.clear( );
778 10 : _msgbuf.clear( );
779 :
780 10 : if( _completions > 0U )
781 : {
782 2 : ::ibv_ack_cq_events( _cq, _completions );
783 2 : _completions = 0U;
784 : }
785 :
786 10 : delete[] _wcs;
787 10 : _wcs = 0;
788 :
789 : #ifdef _WIN32
790 : if ( _ccWaitObj )
791 : UnregisterWait( _ccWaitObj );
792 : _ccWaitObj = 0;
793 :
794 : if ( _cmWaitObj )
795 : UnregisterWait( _cmWaitObj );
796 : _cmWaitObj = 0;
797 : #endif
798 :
799 10 : if( NULL != _cm_id )
800 6 : ::rdma_destroy_ep( _cm_id );
801 10 : _cm_id = NULL;
802 :
803 10 : if(( NULL != _cq ) && ::rdma_seterrno( ::ibv_destroy_cq( _cq )))
804 0 : LBWARN << "ibv_destroy_cq : " << lunchbox::sysError << std::endl;
805 10 : _cq = NULL;
806 :
807 10 : if(( NULL != _cc ) && ::rdma_seterrno( ::ibv_destroy_comp_channel( _cc )))
808 0 : LBWARN << "ibv_destroy_comp_channel : " << lunchbox::sysError
809 0 : << std::endl;
810 10 : _cc = NULL;
811 :
812 10 : if(( NULL != _pd ) && ::rdma_seterrno( ::ibv_dealloc_pd( _pd )))
813 0 : LBWARN << "ibv_dealloc_pd : " << lunchbox::sysError << std::endl;
814 10 : _pd = NULL;
815 :
816 10 : if( NULL != _cm )
817 6 : ::rdma_destroy_event_channel( _cm );
818 10 : _cm = NULL;
819 :
820 10 : if( NULL != _rai )
821 4 : ::rdma_freeaddrinfo( _rai );
822 10 : _rai = NULL;
823 :
824 10 : _rptr = 0UL;
825 10 : _rbase = _rkey = 0ULL;
826 : #ifndef _WIN32
827 10 : if(( _notifier >= 0 ) && TEMP_FAILURE_RETRY( ::close( _notifier )))
828 0 : LBWARN << "close : " << lunchbox::sysError << std::endl;
829 10 : _notifier = -1;
830 : #endif
831 10 : }
832 :
833 2 : bool RDMAConnection::_finishAccept( struct rdma_cm_id *new_cm_id,
834 : const RDMAConnParamData &cpd )
835 : {
836 2 : LBASSERT( isClosed() );
837 2 : _setState( STATE_CONNECTING );
838 :
839 2 : LBASSERT( NULL != new_cm_id );
840 :
841 2 : _cm_id = new_cm_id;
842 :
843 : {
844 : // FIXME : RDMA CM appears to send up invalid addresses when receiving
845 : // connections that use a different protocol than what was bound. E.g.
846 : // if an IPv6 listener gets an IPv4 connection then the sa_family
847 : // will be AF_INET6 but the actual data is struct sockaddr_in. Example:
848 : //
849 : // 0000000: 0a00 bc10 c0a8 b01a 0000 0000 0000 0000 ................
850 : //
851 : // However, in the reverse case, when an IPv4 listener gets an IPv6
852 : // connection not only is the address family incorrect, but the actual
853 : // IPv6 address is only partially there:
854 : //
855 : // 0000000: 0200 bc11 0000 0000 fe80 0000 0000 0000 ................
856 : // 0000010: 0000 0000 0000 0000 0000 0000 0000 0000 ................
857 : // 0000020: 0000 0000 0000 0000 0000 0000 0000 0000 ................
858 : // 0000030: 0000 0000 0000 0000 0000 0000 0000 0000 ................
859 : //
860 : // Surely seems to be a bug in RDMA CM.
861 :
862 : union
863 : {
864 : struct sockaddr addr;
865 : struct sockaddr_in sin;
866 : struct sockaddr_in6 sin6;
867 : struct sockaddr_storage storage;
868 : } sss;
869 :
870 : // Make a copy since we might change it.
871 : //sss.storage = _cm_id->route.addr.dst_storage;
872 : ::memcpy( (void *)&sss.storage,
873 2 : (const void *)::rdma_get_peer_addr( _cm_id ),
874 2 : sizeof(struct sockaddr_storage) );
875 : #ifndef _WIN32
876 4 : if(( AF_INET == sss.storage.ss_family ) &&
877 4 : ( sss.sin6.sin6_addr.s6_addr32[0] != 0 ||
878 4 : sss.sin6.sin6_addr.s6_addr32[1] != 0 ||
879 4 : sss.sin6.sin6_addr.s6_addr32[2] != 0 ||
880 2 : sss.sin6.sin6_addr.s6_addr32[3] != 0 ))
881 : {
882 0 : LBWARN << "IPv6 address detected but likely invalid!" << std::endl;
883 0 : sss.storage.ss_family = AF_INET6;
884 : }
885 : else
886 : #endif
887 2 : if(( AF_INET6 == sss.storage.ss_family ) &&
888 0 : ( INADDR_ANY != sss.sin.sin_addr.s_addr ))
889 : {
890 0 : sss.storage.ss_family = AF_INET;
891 : }
892 :
893 2 : _updateInfo( &sss.addr );
894 : }
895 :
896 2 : _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
897 :
898 2 : LBVERB << "Connection initiated on " << std::dec
899 0 : << _device_name << ":" << (int)_cm_id->port_num
900 0 : << " from "
901 0 : << _addr << ":" << _serv
902 2 : << " (" << getDescription()->toString( ) << ")"
903 6 : << std::endl;
904 :
905 4 : if(( RDMA_PROTOCOL_MAGIC != cpd.magic ) ||
906 2 : ( RDMA_PROTOCOL_VERSION != cpd.version ))
907 : {
908 0 : LBERROR << "Protocol mismatch with initiator : "
909 0 : << _addr << ":" << _serv << std::endl;
910 0 : goto err_reject;
911 : }
912 :
913 2 : if( !_createNotifier( ))
914 : {
915 0 : LBERROR << "Failed to create master notifier." << std::endl;
916 0 : goto err_reject;
917 : }
918 :
919 2 : if( !_createEventChannel( ))
920 : {
921 0 : LBERROR << "Failed to create event channel." << std::endl;
922 0 : goto err_reject;
923 : }
924 :
925 2 : if( !_migrateId( ))
926 : {
927 0 : LBERROR << "Failed to migrate communication identifier." << std::endl;
928 0 : goto err_reject;
929 : }
930 :
931 2 : if( !_initProtocol( cpd.depth ))
932 : {
933 0 : LBERROR << "Failed to initialize protocol variables." << std::endl;
934 0 : goto err_reject;
935 : }
936 :
937 2 : if( !_initVerbs( ))
938 : {
939 0 : LBERROR << "Failed to initialize verbs." << std::endl;
940 0 : goto err_reject;
941 : }
942 :
943 2 : if( !_createQP( ))
944 : {
945 0 : LBERROR << "Failed to create queue pair." << std::endl;
946 0 : goto err_reject;
947 : }
948 :
949 2 : if( !_createBytesAvailableFD( ))
950 : {
951 0 : LBERROR << "Failed to create available byte notifier." << std::endl;
952 0 : goto err_reject;
953 : }
954 :
955 2 : if( !_initBuffers( ))
956 : {
957 0 : LBERROR << "Failed to initialize ring buffers." << std::endl;
958 0 : goto err_reject;
959 : }
960 :
961 2 : if( !_postReceives( _depth ))
962 : {
963 0 : LBERROR << "Failed to pre-post receives." << std::endl;
964 0 : goto err_reject;
965 : }
966 :
967 2 : if( !_accept( ))
968 : {
969 0 : LBERROR << "Failed to accept remote connection from : "
970 0 : << _addr << ":" << _serv << std::endl;
971 0 : goto err;
972 : }
973 :
974 2 : LBASSERT( _established );
975 :
976 2 : if( !_waitRecvSetup( ))
977 : {
978 0 : LBERROR << "Failed to receive setup message." << std::endl;
979 0 : goto err;
980 : }
981 :
982 2 : if( !_postSetup( ))
983 : {
984 0 : LBERROR << "Failed to post setup message." << std::endl;
985 0 : goto err;
986 : }
987 :
988 2 : LBVERB << "Connection accepted on " << std::dec
989 0 : << _device_name << ":" << (int)_cm_id->port_num
990 0 : << " from "
991 0 : << _addr << ":" << _serv
992 2 : << " (" << getDescription()->toString( ) << ")"
993 6 : << std::endl;
994 :
995 2 : _setState( STATE_CONNECTED );
996 2 : _updateNotifier();
997 2 : return true;
998 :
999 : err_reject:
1000 0 : LBWARN << "Rejecting connection from remote address : "
1001 0 : << _addr << ":" << _serv << std::endl;
1002 :
1003 0 : if( !_reject( ))
1004 0 : LBWARN << "Failed to issue connection reject." << std::endl;
1005 :
1006 : err:
1007 0 : close( );
1008 :
1009 0 : return false;
1010 : }
1011 :
1012 4 : bool RDMAConnection::_lookupAddress( const bool passive )
1013 : {
1014 : struct rdma_addrinfo hints;
1015 4 : char* node = 0;
1016 4 : char* service = 0;
1017 4 : std::string s;
1018 :
1019 4 : ::memset( (void *)&hints, 0, sizeof(struct rdma_addrinfo) );
1020 4 : if( passive )
1021 2 : hints.ai_flags |= RAI_PASSIVE;
1022 :
1023 8 : ConstConnectionDescriptionPtr description = getDescription();
1024 4 : const std::string &hostname = description->getHostname( );
1025 4 : if( !hostname.empty( ))
1026 4 : node = const_cast< char * >( hostname.c_str( ));
1027 :
1028 4 : if( 0u != description->port )
1029 : {
1030 3 : std::stringstream ss;
1031 3 : ss << description->port;
1032 3 : s = ss.str( );
1033 3 : service = const_cast< char * >( s.c_str( ));
1034 : }
1035 :
1036 4 : if(( NULL != node ) && ::rdma_getaddrinfo( node, service, &hints, &_rai ))
1037 : {
1038 0 : LBERROR << "rdma_getaddrinfo : " << lunchbox::sysError << std::endl;
1039 0 : return false;
1040 : }
1041 :
1042 4 : _addrinfo = _rai;
1043 8 : while( _addrinfo && !_isInetFamily( _addrinfo ))
1044 0 : _addrinfo = _addrinfo->ai_next;
1045 :
1046 4 : if( !_addrinfo )
1047 : {
1048 0 : if( _rai )
1049 : {
1050 0 : LBWARN << "No IP or IPv6 address found by rdma_getaddrinfo. "
1051 0 : "Connection establishment may fail." << std::endl;
1052 : }
1053 0 : _addrinfo = _rai;
1054 : }
1055 :
1056 4 : if(( NULL != _addrinfo ) && ( _addrinfo->ai_connect_len > 0 ))
1057 0 : LBWARN << "WARNING : ai_connect data specified!" << std::endl;
1058 :
1059 8 : return true;
1060 : }
1061 :
1062 10 : void RDMAConnection::_updateInfo( struct sockaddr *addr )
1063 : {
1064 10 : int salen = sizeof(struct sockaddr);
1065 10 : bool is_unspec = false;
1066 :
1067 10 : if( AF_INET == addr->sa_family )
1068 : {
1069 : struct sockaddr_in *sin =
1070 10 : reinterpret_cast< struct sockaddr_in * >( addr );
1071 10 : is_unspec = ( INADDR_ANY == sin->sin_addr.s_addr );
1072 10 : salen = sizeof(struct sockaddr_in);
1073 : }
1074 : // TODO: IPv6 for WIndows
1075 : #ifndef _WIN32
1076 0 : else if( AF_INET6 == addr->sa_family )
1077 : {
1078 : struct sockaddr_in6 *sin6 =
1079 0 : reinterpret_cast< struct sockaddr_in6 * >( addr );
1080 :
1081 0 : is_unspec = ( sin6->sin6_addr.s6_addr32[0] == 0 &&
1082 0 : sin6->sin6_addr.s6_addr32[1] == 0 &&
1083 0 : sin6->sin6_addr.s6_addr32[2] == 0 &&
1084 0 : sin6->sin6_addr.s6_addr32[3] == 0 );
1085 0 : salen = sizeof(struct sockaddr_in6);
1086 : }
1087 : #endif
1088 : else
1089 : {
1090 0 : LBERROR << "Unsupported socket address family "
1091 0 : << addr->sa_family << std::endl;
1092 0 : return;
1093 : }
1094 :
1095 : int err;
1096 10 : if(( err = ::getnameinfo( addr, salen, _addr, sizeof(_addr),
1097 10 : _serv, sizeof(_serv), NI_NUMERICHOST | NI_NUMERICSERV )))
1098 0 : LBWARN << "Name info lookup failed : " << err << std::endl;
1099 :
1100 10 : if( is_unspec )
1101 0 : ::gethostname( _addr, NI_MAXHOST );
1102 :
1103 10 : ConnectionDescriptionPtr description = _getDescription();
1104 10 : if( description->getHostname( ).empty( ))
1105 2 : description->setHostname( _addr );
1106 10 : if( 0u == description->port )
1107 4 : description->port = atoi( _serv );
1108 : }
1109 :
1110 6 : bool RDMAConnection::_createEventChannel( )
1111 : {
1112 6 : LBASSERT( NULL == _cm );
1113 :
1114 6 : _cm = ::rdma_create_event_channel( );
1115 6 : if( NULL == _cm )
1116 : {
1117 0 : LBERROR << "rdma_create_event_channel : " << lunchbox::sysError <<
1118 0 : std::endl;
1119 0 : return false;
1120 : }
1121 :
1122 : #ifdef _WIN32
1123 : if ( !RegisterWaitForSingleObject(
1124 : &_cmWaitObj,
1125 : _cm->channel.Event,
1126 : WAITORTIMERCALLBACK( &_triggerNotifierCM ),
1127 : this,
1128 : INFINITE,
1129 : WT_EXECUTEINWAITTHREAD ))
1130 : {
1131 : LBERROR << "RegisterWaitForSingleObject : " << co::base::sysError
1132 : << std::endl;
1133 : return false;
1134 : }
1135 : #else
1136 : struct epoll_event evctl;
1137 :
1138 6 : LBASSERT( _notifier >= 0 );
1139 :
1140 : // Use the CM fd to signal Collage of connection events.
1141 6 : ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
1142 6 : evctl.events = EPOLLIN;
1143 6 : evctl.data.fd = _cm->fd;
1144 6 : if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
1145 : {
1146 0 : LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
1147 0 : return false;
1148 : }
1149 : #endif
1150 :
1151 6 : return true;
1152 : }
1153 :
1154 4 : bool RDMAConnection::_createId( )
1155 : {
1156 4 : LBASSERT( NULL != _cm );
1157 4 : LBASSERT( NULL == _cm_id );
1158 :
1159 4 : if( ::rdma_create_id( _cm, &_cm_id, NULL, RDMA_PS_TCP ))
1160 : {
1161 0 : LBERROR << "rdma_create_id : " << lunchbox::sysError << std::endl;
1162 0 : return false;
1163 : }
1164 :
1165 4 : return true;
1166 : }
1167 :
1168 4 : bool RDMAConnection::_initVerbs( )
1169 : {
1170 4 : LBASSERT( NULL != _cm_id );
1171 4 : LBASSERT( NULL != _cm_id->verbs );
1172 :
1173 4 : LBASSERT( NULL == _pd );
1174 :
1175 : // Allocate protection domain.
1176 4 : _pd = ::ibv_alloc_pd( _cm_id->verbs );
1177 4 : if( NULL == _pd )
1178 : {
1179 0 : LBERROR << "ibv_alloc_pd : " << lunchbox::sysError << std::endl;
1180 0 : return false;
1181 : }
1182 :
1183 4 : LBASSERT( NULL == _cc );
1184 :
1185 : // Create completion channel.
1186 4 : _cc = ::ibv_create_comp_channel( _cm_id->verbs );
1187 4 : if( NULL == _cc )
1188 : {
1189 0 : LBERROR << "ibv_create_comp_channel : " << lunchbox::sysError
1190 0 : << std::endl;
1191 0 : return false;
1192 : }
1193 :
1194 : #ifdef _WIN32
1195 : if ( !RegisterWaitForSingleObject(
1196 : &_ccWaitObj,
1197 : _cc->comp_channel.Event,
1198 : WAITORTIMERCALLBACK( &_triggerNotifierCQ ),
1199 : this,
1200 : INFINITE,
1201 : WT_EXECUTEINWAITTHREAD ))
1202 : {
1203 : LBERROR << "RegisterWaitForSingleObject : " << lunchbox::sysError
1204 : << std::endl;
1205 : return false;
1206 : }
1207 : #else
1208 4 : LBASSERT( _notifier >= 0 );
1209 :
1210 : // Use the completion channel fd to signal Collage of RDMA writes received.
1211 : struct epoll_event evctl;
1212 4 : ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
1213 4 : evctl.events = EPOLLIN;
1214 4 : evctl.data.fd = _cc->fd;
1215 4 : if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
1216 : {
1217 0 : LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
1218 0 : return false;
1219 : }
1220 : #endif
1221 :
1222 4 : LBASSERT( NULL == _cq );
1223 :
1224 : // Create a single completion queue for both sends & receives */
1225 4 : _cq = ::ibv_create_cq( _cm_id->verbs, _depth * 2, NULL, _cc, 0 );
1226 4 : if( NULL == _cq )
1227 : {
1228 0 : LBERROR << "ibv_create_cq : " << lunchbox::sysError << std::endl;
1229 0 : return false;
1230 : }
1231 :
1232 : // Request IBV_SEND_SOLICITED events only (i.e. RDMA writes, not FC)
1233 4 : if( ::rdma_seterrno( ::ibv_req_notify_cq( _cq, 1 )))
1234 : {
1235 0 : LBERROR << "ibv_req_notify_cq : " << lunchbox::sysError << std::endl;
1236 0 : return false;
1237 : }
1238 :
1239 4 : _wcs = new struct ibv_wc[ _depth ];
1240 4 : return true;
1241 : }
1242 :
1243 4 : bool RDMAConnection::_createQP( )
1244 : {
1245 : struct ibv_qp_init_attr init_attr;
1246 :
1247 4 : LBASSERT( _depth > 0 );
1248 4 : LBASSERT( NULL != _cm_id );
1249 4 : LBASSERT( NULL != _pd );
1250 4 : LBASSERT( NULL != _cq );
1251 :
1252 4 : ::memset( (void *)&init_attr, 0, sizeof(struct ibv_qp_init_attr) );
1253 4 : init_attr.qp_type = IBV_QPT_RC;
1254 4 : init_attr.cap.max_send_wr = _depth;
1255 4 : init_attr.cap.max_recv_wr = _depth;
1256 4 : init_attr.cap.max_recv_sge = 1;
1257 4 : init_attr.cap.max_send_sge = 1;
1258 4 : init_attr.recv_cq = _cq;
1259 4 : init_attr.send_cq = _cq;
1260 4 : init_attr.sq_sig_all = 1; // i.e. always IBV_SEND_SIGNALED
1261 :
1262 : // Create queue pair.
1263 4 : if( ::rdma_create_qp( _cm_id, _pd, &init_attr ))
1264 : {
1265 0 : LBERROR << "rdma_create_qp : " << lunchbox::sysError << std::endl;
1266 0 : return false;
1267 : }
1268 :
1269 4 : LBVERB << "RDMA QP caps : " << std::dec <<
1270 0 : init_attr.cap.max_recv_wr << " receives, " <<
1271 4 : init_attr.cap.max_send_wr << " sends, " << std::endl;
1272 :
1273 4 : return true;
1274 : }
1275 :
1276 4 : bool RDMAConnection::_initBuffers( )
1277 : {
1278 4 : const size_t rbs = 1024 * 1024 *
1279 8 : Global::getIAttribute( Global::IATTR_RDMA_RING_BUFFER_SIZE_MB );
1280 :
1281 4 : LBASSERT( _depth > 0 );
1282 4 : LBASSERT( NULL != _pd );
1283 :
1284 4 : if( 0 == rbs )
1285 : {
1286 0 : LBERROR << "Invalid RDMA ring buffer size." << std::endl;
1287 0 : return false;
1288 : }
1289 :
1290 4 : if( !_sourcebuf.resize( _pd, rbs ))
1291 : {
1292 0 : LBERROR << "Failed to resize source buffer." << std::endl;
1293 0 : return false;
1294 : }
1295 :
1296 4 : if( !_sinkbuf.resize( _pd, rbs ))
1297 : {
1298 0 : LBERROR << "Failed to resize sink buffer." << std::endl;
1299 0 : return false;
1300 : }
1301 :
1302 4 : _sourceptr.clear( uint32_t( _sourcebuf.getSize( )));
1303 4 : _sinkptr.clear( uint32_t( _sinkbuf.getSize( )));
1304 :
1305 : // Need enough space for both sends and receives.
1306 4 : if( !_msgbuf.resize( _pd, _depth * 2 ))
1307 : {
1308 0 : LBERROR << "Failed to resize message buffer pool." << std::endl;
1309 0 : return false;
1310 : }
1311 :
1312 4 : return true;
1313 : }
1314 :
1315 2 : bool RDMAConnection::_resolveAddress( )
1316 : {
1317 2 : LBASSERT( NULL != _cm_id );
1318 2 : LBASSERT( NULL != _addrinfo );
1319 :
1320 2 : if( ::rdma_resolve_addr( _cm_id, _addrinfo->ai_src_addr, _addrinfo->ai_dst_addr,
1321 2 : _timeout ))
1322 : {
1323 0 : LBERROR << "rdma_resolve_addr : " << lunchbox::sysError << std::endl;
1324 0 : return false;
1325 : }
1326 :
1327 2 : return _waitForCMEvent( RDMA_CM_EVENT_ADDR_RESOLVED );
1328 : }
1329 :
1330 2 : bool RDMAConnection::_resolveRoute( )
1331 : {
1332 2 : LBASSERT( NULL != _cm_id );
1333 2 : LBASSERT( NULL != _addrinfo );
1334 :
1335 4 : if(( IBV_TRANSPORT_IB == _cm_id->verbs->device->transport_type ) &&
1336 2 : ( _addrinfo->ai_route_len > 0 ))
1337 : {
1338 : #ifdef _WIN32
1339 : if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS,
1340 : #else
1341 0 : if( ::rdma_set_option( _cm_id, RDMA_OPTION_IB, RDMA_OPTION_IB_PATH,
1342 : #endif
1343 0 : _addrinfo->ai_route, _addrinfo->ai_route_len ))
1344 : {
1345 0 : LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
1346 0 : return false;
1347 : }
1348 : }
1349 2 : else if( ::rdma_resolve_route( _cm_id, _timeout ))
1350 : {
1351 0 : LBERROR << "rdma_resolve_route : " << lunchbox::sysError << std::endl;
1352 0 : return false;
1353 : }
1354 :
1355 2 : return _waitForCMEvent( RDMA_CM_EVENT_ROUTE_RESOLVED );
1356 : }
1357 :
1358 2 : bool RDMAConnection::_connect( )
1359 : {
1360 2 : LBASSERT( NULL != _cm_id );
1361 2 : LBASSERT( !_established );
1362 :
1363 : struct rdma_conn_param conn_param;
1364 :
1365 2 : ::memset( (void *)&conn_param, 0, sizeof(struct rdma_conn_param) );
1366 :
1367 2 : _cpd.magic = RDMA_PROTOCOL_MAGIC;
1368 2 : _cpd.version = RDMA_PROTOCOL_VERSION;
1369 2 : _cpd.depth = _depth;
1370 2 : conn_param.private_data = reinterpret_cast< const void * >( &_cpd );
1371 2 : conn_param.private_data_len = sizeof(struct RDMAConnParamData);
1372 2 : conn_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
1373 2 : conn_param.responder_resources = RDMA_MAX_RESP_RES;
1374 : // Magic 3-bit values.
1375 : //conn_param.retry_count = 5;
1376 : //conn_param.rnr_retry_count = 7;
1377 :
1378 6 : LBINFO << "Connect on source lid : " << std::showbase
1379 4 : << std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
1380 4 : << std::dec << ntohs( _cm_id->route.path_rec->slid ) << ") "
1381 2 : << "to dest lid : "
1382 4 : << std::hex << ntohs( _cm_id->route.path_rec->dlid ) << " ("
1383 4 : << std::dec << ntohs( _cm_id->route.path_rec->dlid ) << ") "
1384 6 : << std::endl;
1385 :
1386 2 : if( ::rdma_connect( _cm_id, &conn_param ))
1387 : {
1388 0 : LBERROR << "rdma_connect : " << lunchbox::sysError << std::endl;
1389 0 : return false;
1390 : }
1391 :
1392 2 : bool ret = _waitForCMEvent( RDMA_CM_EVENT_ESTABLISHED );
1393 :
1394 2 : return ret;
1395 : }
1396 :
1397 2 : bool RDMAConnection::_bindAddress( )
1398 : {
1399 2 : LBASSERT( NULL != _cm_id );
1400 2 : ConstConnectionDescriptionPtr description = getDescription();
1401 :
1402 : #if IPV6_DEFAULT
1403 : struct sockaddr_in6 sin;
1404 : memset( (void *)&sin, 0, sizeof(struct sockaddr_in6) );
1405 : sin.sin6_family = AF_INET6;
1406 : sin.sin6_port = htons( description->port );
1407 : sin.sin6_addr = in6addr_any;
1408 : #else
1409 : struct sockaddr_in sin;
1410 2 : memset( (void *)&sin, 0, sizeof(struct sockaddr_in) );
1411 2 : sin.sin_family = AF_INET;
1412 2 : sin.sin_port = htons( description->port );
1413 2 : sin.sin_addr.s_addr = INADDR_ANY;
1414 : #endif
1415 :
1416 4 : if( ::rdma_bind_addr( _cm_id, ( NULL != _addrinfo ) ? _addrinfo->ai_src_addr :
1417 2 : reinterpret_cast< struct sockaddr * >( &sin )))
1418 : {
1419 0 : LBERROR << "rdma_bind_addr : " << lunchbox::sysError << std::endl;
1420 0 : return false;
1421 : }
1422 :
1423 2 : const uint16_t port = ntohs(rdma_get_src_port(_cm_id));
1424 8 : LBDEBUG << "Listening on " << description->getHostname() << "["
1425 10 : << _addr << "]:" << port << " (" << description->toString()
1426 8 : << ")" << std::endl;
1427 :
1428 2 : return true;
1429 : }
1430 :
1431 2 : bool RDMAConnection::_listen( int backlog )
1432 : {
1433 2 : LBASSERT( NULL != _cm_id );
1434 :
1435 2 : if( ::rdma_listen( _cm_id, backlog ))
1436 : {
1437 0 : LBERROR << "rdma_listen : " << lunchbox::sysError << std::endl;
1438 0 : return false;
1439 : }
1440 :
1441 2 : return true;
1442 : }
1443 :
1444 2 : bool RDMAConnection::_migrateId( )
1445 : {
1446 2 : LBASSERT( NULL != _cm_id );
1447 2 : LBASSERT( NULL != _cm );
1448 :
1449 2 : if( ::rdma_migrate_id( _cm_id, _cm ))
1450 : {
1451 0 : LBERROR << "rdma_migrate_id : " << lunchbox::sysError << std::endl;
1452 0 : return false;
1453 : }
1454 :
1455 2 : return true;
1456 : }
1457 :
1458 2 : bool RDMAConnection::_accept( )
1459 : {
1460 : struct rdma_conn_param accept_param;
1461 :
1462 2 : LBASSERT( NULL != _cm_id );
1463 2 : LBASSERT( !_established );
1464 :
1465 2 : ::memset( (void *)&accept_param, 0, sizeof(struct rdma_conn_param) );
1466 :
1467 2 : _cpd.magic = RDMA_PROTOCOL_MAGIC;
1468 2 : _cpd.version = RDMA_PROTOCOL_VERSION;
1469 2 : _cpd.depth = _depth;
1470 2 : accept_param.private_data = reinterpret_cast< const void * >( &_cpd );
1471 2 : accept_param.private_data_len = sizeof(struct RDMAConnParamData);
1472 2 : accept_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
1473 2 : accept_param.responder_resources = RDMA_MAX_RESP_RES;
1474 : // Magic 3-bit value.
1475 : //accept_param.rnr_retry_count = 7;
1476 :
1477 6 : LBINFO << "Accept on source lid : "<< std::showbase
1478 4 : << std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
1479 4 : << std::dec << ntohs( _cm_id->route.path_rec->slid ) << ") "
1480 2 : << "from dest lid : "
1481 4 : << std::hex << ntohs( _cm_id->route.path_rec->dlid ) << " ("
1482 4 : << std::dec << ntohs( _cm_id->route.path_rec->dlid ) << ") "
1483 6 : << std::endl;
1484 :
1485 2 : if( ::rdma_accept( _cm_id, &accept_param ))
1486 : {
1487 0 : LBERROR << "rdma_accept : " << lunchbox::sysError << std::endl;
1488 0 : return false;
1489 : }
1490 :
1491 2 : return _waitForCMEvent( RDMA_CM_EVENT_ESTABLISHED );
1492 : }
1493 :
1494 0 : bool RDMAConnection::_reject( )
1495 : {
1496 0 : LBASSERT( NULL != _cm_id );
1497 0 : LBASSERT( !_established );
1498 :
1499 0 : if( ::rdma_reject( _cm_id, NULL, 0 ))
1500 : {
1501 0 : LBERROR << "rdma_reject : " << lunchbox::sysError << std::endl;
1502 0 : return false;
1503 : }
1504 :
1505 0 : return true;
1506 : }
1507 :
1508 : ////////////////////////////////////////////////////////////////////////////////
1509 :
1510 4 : bool RDMAConnection::_initProtocol( int32_t depth )
1511 : {
1512 4 : if( depth < 2L )
1513 : {
1514 0 : LBERROR << "Invalid queue depth." << std::endl;
1515 0 : return false;
1516 : }
1517 :
1518 4 : _depth = depth;
1519 4 : _writes = 0L;
1520 4 : _fcs = 0L;
1521 4 : _readBytes = 0u;
1522 4 : _wcredits = _depth / 2 - 2;
1523 4 : _fcredits = _depth / 2 + 2;
1524 :
1525 4 : return true;
1526 : }
1527 :
1528 : /* inline */
1529 1034498 : bool RDMAConnection::_needFC( )
1530 : {
1531 : bool bytesAvail;
1532 : #ifdef _WIN32
1533 : lunchbox::ScopedFastRead mutex( _eventLock );
1534 : bytesAvail = ( _availBytes != 0 );
1535 : #else
1536 : pollfd pfd;
1537 1034498 : pfd.fd = _pipe_fd[0];
1538 1034498 : pfd.events = EPOLLIN;
1539 1034498 : pfd.revents = 0;
1540 1034498 : int ret = poll( &pfd, 1, 0 );
1541 1034498 : if ( ret == -1 )
1542 : {
1543 0 : LBERROR << "poll: " << lunchbox::sysError << std::endl;
1544 0 : return true;
1545 : }
1546 1034498 : bytesAvail = ret != 0;
1547 : #endif
1548 1034498 : return ( !bytesAvail || (uint32_t)_writes >= FC_MESSAGE_FREQUENCY );
1549 : }
1550 :
1551 197483 : bool RDMAConnection::_postReceives( const uint32_t count )
1552 : {
1553 197483 : LBASSERT( NULL != _cm_id->qp );
1554 197484 : LBASSERT( count > 0UL );
1555 :
1556 197484 : ibv_sge* sge = new ibv_sge[count];
1557 197484 : ibv_recv_wr* wrs = new ibv_recv_wr[count];
1558 :
1559 760233 : for( uint32_t i = 0UL; i != count; i++ )
1560 : {
1561 562750 : sge[i].addr = (uint64_t)(uintptr_t)_msgbuf.getBuffer( );
1562 562753 : sge[i].length = uint32_t( _msgbuf.getBufferSize( ));
1563 562753 : sge[i].lkey = _msgbuf.getMR( )->lkey;
1564 :
1565 562753 : wrs[i].wr_id = sge[i].addr;
1566 562753 : wrs[i].next = &wrs[i + 1];
1567 562753 : wrs[i].sg_list = &sge[i];
1568 562753 : wrs[i].num_sge = 1;
1569 : }
1570 197483 : wrs[count - 1].next = NULL;
1571 :
1572 : struct ibv_recv_wr *bad_wr;
1573 : const bool error =
1574 197483 : ::rdma_seterrno( ::ibv_post_recv( _cm_id->qp, wrs, &bad_wr ));
1575 :
1576 197484 : delete[] sge;
1577 197484 : delete[] wrs;
1578 :
1579 197484 : if( error )
1580 : {
1581 0 : LBERROR << "ibv_post_recv : " << lunchbox::sysError << std::endl;
1582 0 : return false;
1583 : }
1584 :
1585 197484 : return true;
1586 : }
1587 :
1588 : /* inline */
1589 526159 : void RDMAConnection::_recvRDMAWrite( const uint32_t imm_data )
1590 : {
1591 : union
1592 : {
1593 : uint32_t val;
1594 : RDMAFCImm fc;
1595 : } x;
1596 526159 : x.val = ntohl( imm_data );
1597 :
1598 : // Analysis:
1599 : //
1600 : // Since the ring pointers are circular, a malicious (presumably overflow)
1601 : // value here would at worst only result in us reading arbitrary regions
1602 : // from our sink buffer, not segfaulting. If the other side wanted us to
1603 : // reread a previous message it should just resend it!
1604 526159 : _sinkptr.incrHead( x.fc.bytes_sent );
1605 526159 : _incrAvailableBytes( x.fc.bytes_sent );
1606 :
1607 526159 : _fcredits += x.fc.fcs_received;
1608 526159 : LBASSERTINFO( _fcredits <= _depth, _fcredits << " > " << _depth );
1609 :
1610 526159 : _writes++;
1611 526159 : }
1612 :
1613 : /* inline */
1614 526159 : uint32_t RDMAConnection::_makeImm( const uint32_t b )
1615 : {
1616 : union
1617 : {
1618 : uint32_t val;
1619 : RDMAFCImm fc;
1620 : } x;
1621 :
1622 526159 : x.fc.fcs_received = std::min( MAX_FC, static_cast< uint16_t >( _fcs ));
1623 526159 : _fcs -= x.fc.fcs_received;
1624 526159 : LBASSERT( _fcs >= 0 );
1625 :
1626 526159 : LBASSERT( b <= MAX_BS );
1627 526159 : x.fc.bytes_sent = b;
1628 :
1629 526159 : return htonl( x.val );
1630 : }
1631 :
1632 526159 : bool RDMAConnection::_postRDMAWrite( )
1633 : {
1634 : struct ibv_sge sge;
1635 : struct ibv_send_wr wr;
1636 :
1637 1052318 : sge.addr = (uint64_t)( (uintptr_t)_sourcebuf.getBase( ) +
1638 1052318 : _sourceptr.ptr( _sourceptr.MIDDLE ));
1639 : sge.length = (uint64_t)_sourceptr.available( _sourceptr.HEAD,
1640 526159 : _sourceptr.MIDDLE );
1641 526159 : sge.lkey = _sourcebuf.getMR( )->lkey;
1642 526159 : _sourceptr.incr( _sourceptr.MIDDLE, (uint32_t)sge.length );
1643 :
1644 526159 : wr.wr_id = (uint64_t)sge.length;
1645 526159 : wr.next = NULL;
1646 526159 : wr.sg_list = &sge;
1647 526159 : wr.num_sge = 1;
1648 526159 : wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
1649 526159 : wr.send_flags = IBV_SEND_SOLICITED; // Important!
1650 526159 : wr.imm_data = _makeImm( (uint32_t)sge.length );
1651 526159 : wr.wr.rdma.rkey = _rkey;
1652 1052318 : wr.wr.rdma.remote_addr = (uint64_t)( (uintptr_t)_rbase +
1653 1052318 : _rptr.ptr( _rptr.HEAD ));
1654 526159 : _rptr.incrHead( (uint32_t)sge.length );
1655 :
1656 526159 : _wcredits--;
1657 526159 : LBASSERT( _wcredits >= 0L );
1658 :
1659 : struct ibv_send_wr *bad_wr;
1660 526159 : if( ::rdma_seterrno( ::ibv_post_send( _cm_id->qp, &wr, &bad_wr )))
1661 : {
1662 0 : LBERROR << "ibv_post_send : " << lunchbox::sysError << std::endl;
1663 0 : return false;
1664 : }
1665 :
1666 526159 : return true;
1667 : }
1668 :
1669 34618 : bool RDMAConnection::_postMessage( const RDMAMessage &message )
1670 : {
1671 34618 : _fcredits--;
1672 34618 : LBASSERT( _fcredits >= 0L );
1673 :
1674 34618 : if( ::rdma_post_send( _cm_id, (void *)&message, (void *)&message,
1675 : offsetof( RDMAMessage, payload ) + message.length, _msgbuf.getMR( ),
1676 34618 : 0 ))
1677 : {
1678 0 : LBERROR << "rdma_post_send : " << lunchbox::sysError << std::endl;
1679 0 : return false;
1680 : }
1681 :
1682 34618 : return true;
1683 : }
1684 :
1685 34548 : void RDMAConnection::_recvMessage( const RDMAMessage &message )
1686 : {
1687 34548 : switch( message.opcode )
1688 : {
1689 : case FC:
1690 34544 : if( sizeof(struct RDMAFCPayload) == (size_t)message.length )
1691 34544 : _recvFC( message.payload.fc );
1692 : else
1693 0 : LBWARN << "Invalid flow control message received!" << std::endl;
1694 34544 : break;
1695 : case SETUP:
1696 4 : if( sizeof(struct RDMASetupPayload) == (size_t)message.length )
1697 4 : _recvSetup( message.payload.setup );
1698 : else
1699 0 : LBWARN << "Invalid setup message received!" << std::endl;
1700 4 : break;
1701 : default:
1702 0 : LBWARN << "Invalid message received: "
1703 0 : << std::hex << (int)message.opcode << std::dec << std::endl;
1704 : }
1705 34548 : }
1706 :
1707 : /* inline */
1708 34544 : void RDMAConnection::_recvFC( const RDMAFCPayload &fc )
1709 : {
1710 : // Analysis:
1711 : //
1712 : // Since we will only write a maximum of _sourceptr.available( ) bytes
1713 : // to our source buffer, a malicious (presumably overflow) value here would
1714 : // have no chance of causing us to write beyond our buffer as we have local
1715 : // control over those ring pointers. Worst case, we'd and up writing to
1716 : // arbitrary regions of the remote buffer, since this ring pointer is
1717 : // circular as well.
1718 34544 : _rptr.incrTail( fc.bytes_received );
1719 :
1720 34544 : _wcredits += fc.writes_received;
1721 34544 : LBASSERTINFO( _wcredits <= _depth, _wcredits << " > " << _depth );
1722 :
1723 34544 : _fcs++;
1724 34544 : }
1725 :
1726 34614 : bool RDMAConnection::_postFC()
1727 : {
1728 : RDMAMessage &message =
1729 34614 : *reinterpret_cast< RDMAMessage * >( _msgbuf.getBuffer( ));
1730 :
1731 34614 : message.opcode = FC;
1732 34614 : message.length = (uint8_t)sizeof(struct RDMAFCPayload);
1733 :
1734 34614 : message.payload.fc.bytes_received = _readBytes;
1735 34614 : message.payload.fc.writes_received = _writes;
1736 34614 : _writes -= message.payload.fc.writes_received;
1737 34614 : _readBytes = 0;
1738 :
1739 34614 : LBASSERT( _writes >= 0 );
1740 34614 : return _postMessage( message );
1741 : }
1742 :
1743 4 : void RDMAConnection::_recvSetup( const RDMASetupPayload &setup )
1744 : {
1745 : // Analysis:
1746 : //
1747 : // Malicious values here would only affect the receiver, we're willing
1748 : // to RDMA write to anywhere specified!
1749 4 : _rbase = setup.rbase;
1750 4 : _rptr.clear( setup.rlen );
1751 4 : _rkey = setup.rkey;
1752 :
1753 4 : LBVERB << "RDMA MR: " << std::showbase
1754 0 : << std::dec << setup.rlen << " @ "
1755 4 : << std::hex << setup.rbase << std::dec << std::endl;
1756 4 : }
1757 :
1758 4 : bool RDMAConnection::_postSetup( )
1759 : {
1760 : RDMAMessage &message =
1761 4 : *reinterpret_cast< RDMAMessage * >( _msgbuf.getBuffer( ));
1762 :
1763 4 : message.opcode = SETUP;
1764 4 : message.length = (uint8_t)sizeof(struct RDMASetupPayload);
1765 :
1766 4 : message.payload.setup.rbase = (uint64_t)(uintptr_t)_sinkbuf.getBase( );
1767 4 : message.payload.setup.rlen = (uint64_t)_sinkbuf.getSize( );
1768 4 : message.payload.setup.rkey = _sinkbuf.getMR( )->rkey;
1769 :
1770 4 : return _postMessage( message );
1771 : }
1772 :
1773 4 : bool RDMAConnection::_waitRecvSetup( )
1774 : {
1775 4 : lunchbox::Clock clock;
1776 4 : const int64_t start = clock.getTime64( );
1777 4 : const uint32_t timeout = Global::getTimeout( );
1778 4 : eventset events;
1779 :
1780 : retry:
1781 54 : if( !_checkDisconnected( events ))
1782 : {
1783 0 : LBERROR << "Error while checking event state." << std::endl;
1784 0 : return false;
1785 : }
1786 :
1787 54 : if( !_established )
1788 : {
1789 0 : LBERROR << "Disconnected while waiting for setup message." << std::endl;
1790 0 : return false;
1791 : }
1792 :
1793 54 : if( !_checkCQ( true ))
1794 : {
1795 0 : LBERROR << "Error while polling receive completion queue." << std::endl;
1796 0 : return false;
1797 : }
1798 :
1799 54 : if( 0ULL == _rkey )
1800 : {
1801 50 : if( LB_TIMEOUT_INDEFINITE != timeout )
1802 : {
1803 50 : if(( clock.getTime64( ) - start ) > timeout )
1804 : {
1805 0 : LBERROR << "Timed out waiting for setup message." << std::endl;
1806 0 : return false;
1807 : }
1808 : }
1809 :
1810 50 : lunchbox::Thread::yield( );
1811 50 : goto retry;
1812 : }
1813 :
1814 4 : return true;
1815 : }
1816 :
1817 : ////////////////////////////////////////////////////////////////////////////////
1818 :
1819 6 : bool RDMAConnection::_createNotifier( )
1820 : {
1821 6 : LBASSERT( _notifier < 0 );
1822 : #ifdef _WIN32
1823 : _notifier = CreateEvent( 0, TRUE, FALSE, 0 );
1824 : return true;
1825 : #else
1826 6 : _notifier = ::epoll_create( 1 );
1827 6 : if( _notifier < 0 )
1828 : {
1829 0 : LBERROR << "epoll_create1 : " << lunchbox::sysError << std::endl;
1830 0 : return false;
1831 : }
1832 :
1833 6 : return true;
1834 : #endif
1835 : }
1836 :
1837 6 : void RDMAConnection::_updateNotifier()
1838 : {
1839 : #ifdef _WIN32
1840 : lunchbox::ScopedFastWrite mutex( _eventLock );
1841 : if ( _availBytes == 0 && !_cm->channel.Head &&
1842 : ( !_cc || !_cc->comp_channel.Head ))
1843 : ResetEvent( _notifier );
1844 : #else
1845 6 : eventset events;
1846 6 : _checkEvents( events );
1847 : #endif
1848 6 : }
1849 :
1850 1520954 : bool RDMAConnection::_checkEvents( eventset &events )
1851 : {
1852 1520954 : events.reset( );
1853 :
1854 : #ifdef _WIN32
1855 : lunchbox::ScopedFastRead mutex( _eventLock );
1856 : if ( _availBytes != 0 )
1857 : events.set( BUF_EVENT );
1858 : if ( _cc && ( _cc->comp_channel.Head != 0 ))
1859 : events.set( CQ_EVENT );
1860 : if ( _cm->channel.Head )
1861 : events.set( CM_EVENT );
1862 :
1863 : return true;
1864 : #else
1865 : struct epoll_event evts[3];
1866 :
1867 1523114 : int nfds = TEMP_FAILURE_RETRY( ::epoll_wait( _notifier, evts, 3, 0 ));
1868 1517426 : if( nfds < 0 )
1869 : {
1870 24 : LBERROR << "epoll_wait : " << lunchbox::sysError << std::endl;
1871 24 : return false;
1872 : }
1873 :
1874 2095497 : for( int i = 0; i < nfds; i++ )
1875 : {
1876 578095 : const int fd = evts[i].data.fd;
1877 578095 : if(( _pipe_fd[0] >= 0 ) && ( fd == _pipe_fd[0] ))
1878 508591 : events.set( BUF_EVENT );
1879 69504 : else if( _cc && ( fd == _cc->fd ))
1880 69492 : events.set( CQ_EVENT );
1881 12 : else if( _cm && ( fd == _cm->fd ))
1882 12 : events.set( CM_EVENT );
1883 : else
1884 0 : LBUNREACHABLE;
1885 : }
1886 :
1887 1517402 : return true;
1888 : #endif
1889 : }
1890 :
1891 1429645 : bool RDMAConnection::_checkDisconnected( eventset &events )
1892 : {
1893 1429645 : lunchbox::ScopedWrite poll_mutex( _poll_lock );
1894 :
1895 1431075 : if( !_checkEvents( events ))
1896 : {
1897 0 : LBERROR << "Error while checking event state." << std::endl;
1898 0 : return false;
1899 : }
1900 :
1901 1427881 : if( events.test( CM_EVENT ))
1902 : {
1903 2 : if( !_doCMEvent( RDMA_CM_EVENT_DISCONNECTED ))
1904 : {
1905 0 : LBERROR << "Unexpected connection manager event." << std::endl;
1906 0 : return false;
1907 : }
1908 :
1909 2 : LBASSERT( !_established );
1910 : }
1911 :
1912 1427717 : return true;
1913 : }
1914 :
1915 4 : bool RDMAConnection::_createBytesAvailableFD( )
1916 : {
1917 : #ifndef _WIN32
1918 4 : if ( pipe( _pipe_fd ) == -1 )
1919 : {
1920 0 : LBERROR << "pipe: " << lunchbox::sysError << std::endl;
1921 0 : return false;
1922 : }
1923 :
1924 4 : LBASSERT( _pipe_fd[0] >= 0 && _pipe_fd[1] >= 0);
1925 :
1926 : // Use the event fd to signal Collage of bytes remaining.
1927 : struct epoll_event evctl;
1928 4 : ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
1929 4 : evctl.events = EPOLLIN;
1930 4 : evctl.data.fd = _pipe_fd[0];
1931 4 : if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
1932 : {
1933 0 : LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
1934 0 : return false;
1935 : }
1936 : #endif
1937 4 : return true;
1938 : }
1939 :
1940 1018877 : bool RDMAConnection::_incrAvailableBytes( const uint64_t b )
1941 : {
1942 : #ifdef _WIN32
1943 : lunchbox::ScopedFastWrite mutex( _eventLock );
1944 : _availBytes += b;
1945 : ::SetEvent( _notifier );
1946 : #else
1947 1018877 : if( ::write( _pipe_fd[1], (const void *)&b, sizeof(b) ) != sizeof(b) )
1948 : {
1949 0 : LBERROR << "write : " << lunchbox::sysError << std::endl;
1950 0 : return false;
1951 : }
1952 : #endif
1953 1018877 : return true;
1954 : }
1955 :
1956 508591 : uint64_t RDMAConnection::_getAvailableBytes( )
1957 : {
1958 508591 : uint64_t available_bytes = 0;
1959 : #ifdef _WIN32
1960 : lunchbox::ScopedFastWrite mutex( _eventLock );
1961 : available_bytes = static_cast<uint64_t>( _availBytes );
1962 : _availBytes = 0;
1963 : return available_bytes;
1964 : #else
1965 508591 : uint64_t currBytes = 0;
1966 : ssize_t count;
1967 : pollfd pfd;
1968 508591 : pfd.fd = _pipe_fd[0];
1969 508591 : pfd.events = EPOLLIN;
1970 :
1971 1018877 : do
1972 : {
1973 1018877 : count = ::read( _pipe_fd[0], (void *)&currBytes, sizeof( currBytes ));
1974 1018877 : if ( count > 0 && count < (ssize_t)sizeof( currBytes ) )
1975 0 : return 0ULL;
1976 1018877 : available_bytes += currBytes;
1977 1018877 : pfd.revents = 0;
1978 1018877 : if ( ::poll( &pfd, 1, 0 ) == -1 )
1979 : {
1980 0 : LBERROR << "poll : " << lunchbox::sysError << std::endl;
1981 0 : return 0ULL;
1982 : }
1983 1018877 : } while ( pfd.revents > 0 );
1984 :
1985 508591 : if ( count == -1 )
1986 : {
1987 0 : LBERROR << "read : " << lunchbox::sysError << std::endl;
1988 0 : return 0ULL;
1989 : }
1990 :
1991 508591 : LBASSERT( available_bytes > 0ULL );
1992 508591 : return available_bytes;
1993 : #endif
1994 : }
1995 :
1996 10 : bool RDMAConnection::_waitForCMEvent( enum rdma_cm_event_type expected )
1997 : {
1998 10 : lunchbox::Clock clock;
1999 10 : const int64_t start = clock.getTime64( );
2000 10 : const uint32_t timeout = Global::getTimeout( );
2001 10 : bool done = false;
2002 10 : eventset events;
2003 :
2004 : retry:
2005 89874 : if( !_checkEvents( events ))
2006 : {
2007 0 : LBERROR << "Error while checking event state." << std::endl;
2008 0 : return false;
2009 : }
2010 :
2011 89874 : if( events.test( CM_EVENT ))
2012 : {
2013 10 : done = _doCMEvent( expected );
2014 10 : if( !done )
2015 : {
2016 0 : LBERROR << "Unexpected connection manager event." << std::endl;
2017 0 : return false;
2018 : }
2019 : }
2020 :
2021 89874 : if( !done )
2022 : {
2023 89864 : if( LB_TIMEOUT_INDEFINITE != timeout )
2024 : {
2025 89864 : if(( clock.getTime64( ) - start ) > timeout )
2026 : {
2027 0 : LBERROR << "Timed out waiting for setup message." << std::endl;
2028 0 : return false;
2029 : }
2030 : }
2031 :
2032 89866 : lunchbox::Thread::yield( );
2033 89864 : goto retry;
2034 : }
2035 :
2036 10 : return true;
2037 : }
2038 :
2039 12 : bool RDMAConnection::_doCMEvent( enum rdma_cm_event_type expected )
2040 : {
2041 12 : bool ok = false;
2042 : struct rdma_cm_event *event;
2043 :
2044 12 : if( ::rdma_get_cm_event( _cm, &event ))
2045 : {
2046 0 : LBERROR << "rdma_get_cm_event : " << lunchbox::sysError << std::endl;
2047 0 : return false;
2048 : }
2049 :
2050 12 : ok = ( event->event == expected );
2051 :
2052 : #ifndef NDEBUG
2053 12 : if( ok )
2054 : {
2055 12 : LBVERB << (void *)this
2056 0 : << " (" << _addr << ":" << _serv << ")"
2057 0 : << " event : " << ::rdma_event_str( event->event )
2058 12 : << std::endl;
2059 : }
2060 : else
2061 : {
2062 0 : LBINFO << (void *)this
2063 0 : << " (" << _addr << ":" << _serv << ")"
2064 0 : << " event : " << ::rdma_event_str( event->event )
2065 0 : << " expected: " << ::rdma_event_str( expected )
2066 0 : << std::endl;
2067 : }
2068 : #endif
2069 :
2070 12 : if( ok && ( RDMA_CM_EVENT_DISCONNECTED == event->event ))
2071 2 : _established = false;
2072 :
2073 12 : if( ok && ( RDMA_CM_EVENT_ESTABLISHED == event->event ))
2074 : {
2075 4 : _established = true;
2076 :
2077 4 : struct rdma_conn_param *cp = &event->param.conn;
2078 :
2079 4 : ::memset( (void *)&_cpd, 0, sizeof(RDMAConnParamData) );
2080 : // Note that the actual amount of data transferred to the remote side
2081 : // is transport dependent and may be larger than that requested.
2082 4 : if( cp->private_data_len >= sizeof(RDMAConnParamData) )
2083 : _cpd = *reinterpret_cast< const RDMAConnParamData * >(
2084 2 : cp->private_data );
2085 : }
2086 :
2087 12 : if( ok && ( RDMA_CM_EVENT_CONNECT_REQUEST == event->event ))
2088 : {
2089 2 : _new_cm_id = event->id;
2090 :
2091 2 : struct rdma_conn_param *cp = &event->param.conn;
2092 :
2093 2 : ::memset( (void *)&_cpd, 0, sizeof(RDMAConnParamData) );
2094 : // TODO : Not sure what happens when initiator sent ai_connect data
2095 : // (assuming the underlying transport doesn't strip it)?
2096 2 : if( cp->private_data_len >= sizeof(RDMAConnParamData) )
2097 : _cpd = *reinterpret_cast< const RDMAConnParamData * >(
2098 2 : cp->private_data );
2099 : }
2100 :
2101 12 : if( RDMA_CM_EVENT_REJECTED == event->event )
2102 0 : LBINFO << "Connection reject status : " << event->status << std::endl;
2103 :
2104 12 : if( ::rdma_ack_cm_event( event ))
2105 0 : LBWARN << "rdma_ack_cm_event : " << lunchbox::sysError << std::endl;
2106 :
2107 12 : return ok;
2108 : }
2109 :
2110 69492 : bool RDMAConnection::_rearmCQ( )
2111 : {
2112 : struct ibv_cq *ev_cq;
2113 : void *ev_ctx;
2114 :
2115 : #ifdef _WIN32
2116 : lunchbox::ScopedFastWrite mutex( _eventLock );
2117 : #endif
2118 69492 : if( ::ibv_get_cq_event( _cc, &ev_cq, &ev_ctx ))
2119 : {
2120 0 : LBERROR << "ibv_get_cq_event : " << lunchbox::sysError << std::endl;
2121 0 : return false;
2122 : }
2123 :
2124 : // http://lists.openfabrics.org/pipermail/general/2008-November/055237.html
2125 69492 : _completions++;
2126 69492 : if( std::numeric_limits< unsigned int >::max( ) == _completions )
2127 : {
2128 0 : ::ibv_ack_cq_events( _cq, _completions );
2129 0 : _completions = 0U;
2130 : }
2131 :
2132 : // Solicited only!
2133 69492 : if( ::rdma_seterrno( ::ibv_req_notify_cq( _cq, 1 )))
2134 : {
2135 0 : LBERROR << "ibv_req_notify_cq : " << lunchbox::sysError << std::endl;
2136 0 : return false;
2137 : }
2138 :
2139 69492 : return true;
2140 : }
2141 :
2142 1429516 : bool RDMAConnection::_checkCQ( bool drain )
2143 : {
2144 : uint32_t num_recvs;
2145 : int count;
2146 :
2147 1429516 : lunchbox::ScopedWrite poll_mutex( _poll_lock );
2148 :
2149 1431581 : if( NULL == _cq )
2150 0 : return true;
2151 :
2152 : repoll:
2153 : /* CHECK RECEIVE COMPLETIONS */
2154 1596942 : count = ::ibv_poll_cq( _cq, _depth, _wcs );
2155 1606322 : if( count < 0 )
2156 : {
2157 0 : LBERROR << "ibv_poll_cq : " << lunchbox::sysError << std::endl;
2158 0 : return false;
2159 : }
2160 :
2161 1606322 : num_recvs = 0UL;
2162 2695016 : for( int i = 0; i != count ; i++ )
2163 : {
2164 1099067 : struct ibv_wc &wc = _wcs[i];
2165 :
2166 1099067 : if( IBV_WC_SUCCESS != wc.status )
2167 : {
2168 : // Non-fatal.
2169 0 : if( IBV_WC_WR_FLUSH_ERR == wc.status )
2170 0 : continue;
2171 :
2172 0 : LBWARN << (void *)this << " !IBV_WC_SUCCESS : " << std::showbase
2173 0 : << std::hex << "wr_id = " << wc.wr_id
2174 0 : << ", status = \"" << ::ibv_wc_status_str( wc.status ) << "\""
2175 0 : << std::dec << " (" << (unsigned int)wc.status << ")"
2176 0 : << std::hex << ", vendor_err = " << wc.vendor_err
2177 0 : << std::dec << std::endl;
2178 :
2179 : // All others are fatal.
2180 0 : return false;
2181 : }
2182 :
2183 1099067 : LBASSERT( IBV_WC_SUCCESS == wc.status );
2184 :
2185 : #ifdef _WIN32 //----------------------------------------------------------------
2186 : // WINDOWS IBV API WORKAROUND.
2187 : // TODO: remove this as soon as IBV API is fixed
2188 : if ( wc.opcode == IBV_WC_RECV && wc.wc_flags == IBV_WC_WITH_IMM )
2189 : wc.opcode = IBV_WC_RECV_RDMA_WITH_IMM;
2190 : #endif //-----------------------------------------------------------------------
2191 :
2192 1121371 : if( IBV_WC_RECV_RDMA_WITH_IMM == wc.opcode )
2193 526159 : _recvRDMAWrite( wc.imm_data );
2194 595212 : else if( IBV_WC_RECV == wc.opcode )
2195 34548 : _recvMessage( *reinterpret_cast< RDMAMessage * >( wc.wr_id ));
2196 560664 : else if( IBV_WC_SEND == wc.opcode )
2197 34618 : _msgbuf.freeBuffer( (void *)(uintptr_t)wc.wr_id );
2198 526046 : else if( IBV_WC_RDMA_WRITE == wc.opcode )
2199 526046 : _sourceptr.incrTail( (uint32_t)wc.wr_id );
2200 : else
2201 0 : LBUNREACHABLE;
2202 :
2203 1088695 : if( IBV_WC_RECV & wc.opcode )
2204 : {
2205 560707 : _msgbuf.freeBuffer( (void *)(uintptr_t)wc.wr_id );
2206 : // All receive completions need to be reposted.
2207 560706 : num_recvs++;
2208 : }
2209 : }
2210 :
2211 1595949 : if(( num_recvs > 0UL ) && !_postReceives( num_recvs ))
2212 0 : return false;
2213 :
2214 1595949 : if( drain && ( count > 0 ))
2215 165361 : goto repoll;
2216 :
2217 1430588 : return true;
2218 : }
2219 :
2220 : /* inline */
2221 508593 : uint32_t RDMAConnection::_drain( void *buffer, const uint32_t bytes )
2222 : {
2223 508593 : const uint32_t b = std::min( bytes, _sinkptr.available( ));
2224 1017186 : ::memcpy( buffer, (const void *)((uintptr_t)_sinkbuf.getBase( ) +
2225 1525779 : _sinkptr.tail( )), b );
2226 508593 : _sinkptr.incrTail( b );
2227 508593 : return b;
2228 : }
2229 :
2230 : /* inline */
2231 905785 : uint32_t RDMAConnection::_fill( const void *buffer, const uint32_t bytes )
2232 : {
2233 905785 : uint32_t b = std::min( bytes, std::min( _sourceptr.negAvailable( ),
2234 1811570 : _rptr.negAvailable( )));
2235 : #ifndef WRAP
2236 905785 : b = std::min( b, (uint32_t)(_sourcebuf.getSize() -
2237 905785 : _sourceptr.ptr( _sourceptr.HEAD )));
2238 : #endif
2239 1811570 : ::memcpy( (void *)((uintptr_t)_sourcebuf.getBase( ) +
2240 2717355 : _sourceptr.ptr( _sourceptr.HEAD )), buffer, b );
2241 905785 : _sourceptr.incrHead( b );
2242 905785 : return b;
2243 : }
2244 :
2245 0 : Connection::Notifier RDMAConnection::getNotifier() const
2246 : {
2247 0 : return _notifier;
2248 : }
2249 :
#ifdef _WIN32
/** Forward a completion-queue event to _triggerNotifierWorker(). */
void RDMAConnection::_triggerNotifierCQ( RDMAConnection* conn )
{
    conn->_triggerNotifierWorker( CQ_EVENT );
}

/** Forward a connection-manager event to _triggerNotifierWorker(). */
void RDMAConnection::_triggerNotifierCM( RDMAConnection* conn )
{
    conn->_triggerNotifierWorker( CM_EVENT );
}

/**
 * Reset the event of the channel selected by @p event, then signal
 * _notifier if that channel still has entries queued (non-null Head).
 */
void RDMAConnection::_triggerNotifierWorker( Events event )
{
    lunchbox::ScopedFastWrite mutex( _eventLock );

    COMP_CHANNEL* chan;
    if( event == CQ_EVENT )
        chan = &_cc->comp_channel;
    else
        chan = &_cm->channel;

    EnterCriticalSection( &chan->Lock );
    ResetEvent( chan->Event );
    if ( chan->Head )
        SetEvent( _notifier );
    LeaveCriticalSection( &chan->Lock );
}
#endif
2272 :
2273 : //////////////////////////////////////////////////////////////////////////////
2274 :
2275 0 : void RDMAConnection::_showStats( )
2276 : {
2277 0 : LBVERB << std::dec
2278 0 : << "reads = " << _stats.reads
2279 0 : << ", buffer_empty = " << _stats.buffer_empty
2280 0 : << ", no_credits_fc = " << _stats.no_credits_fc
2281 0 : << ", writes = " << _stats.writes
2282 0 : << ", buffer_full = " << _stats.buffer_full
2283 0 : << ", no_credits_rdma = " << _stats.no_credits_rdma
2284 0 : << std::endl;
2285 0 : }
2286 :
2287 : //////////////////////////////////////////////////////////////////////////////
2288 :
2289 6 : BufferPool::BufferPool( size_t buffer_size )
2290 : : _buffer_size( buffer_size )
2291 : , _num_bufs( 0 )
2292 : , _buffer( NULL )
2293 : , _mr( NULL )
2294 6 : , _ring( 0 )
2295 : {
2296 6 : }
2297 :
2298 12 : BufferPool::~BufferPool( )
2299 : {
2300 6 : clear( );
2301 6 : }
2302 :
2303 20 : void BufferPool::clear( )
2304 : {
2305 20 : _num_bufs = 0;
2306 20 : _ring.clear( _num_bufs );
2307 :
2308 20 : if(( NULL != _mr ) && ::rdma_dereg_mr( _mr ))
2309 0 : LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
2310 20 : _mr = NULL;
2311 :
2312 20 : if( NULL != _buffer )
2313 : #ifdef _WIN32
2314 : _aligned_free( _buffer );
2315 : #else
2316 4 : ::free( _buffer );
2317 : #endif
2318 20 : _buffer = NULL;
2319 20 : }
2320 :
2321 4 : bool BufferPool::resize( ibv_pd *pd, uint32_t num_bufs )
2322 : {
2323 4 : clear( );
2324 :
2325 4 : if( num_bufs )
2326 : {
2327 4 : _num_bufs = num_bufs;
2328 4 : _ring.clear( _num_bufs );
2329 :
2330 : #ifdef _WIN32
2331 : _buffer = _aligned_malloc((size_t)( _num_bufs * _buffer_size ),
2332 : boost::interprocess::mapped_region::get_page_size());
2333 : if ( !_buffer )
2334 : {
2335 : LBERROR << "_aligned_malloc : Couldn't allocate aligned memory. "
2336 : << lunchbox::sysError << std::endl;
2337 : return false;
2338 : }
2339 : #else
2340 8 : if( ::posix_memalign( &_buffer, (size_t)::getpagesize( ),
2341 8 : (size_t)( _num_bufs * _buffer_size )))
2342 : {
2343 0 : LBERROR << "posix_memalign : " << lunchbox::sysError << std::endl;
2344 0 : return false;
2345 : }
2346 : #endif
2347 4 : ::memset( _buffer, 0xff, (size_t)( _num_bufs * _buffer_size ));
2348 : _mr = ::ibv_reg_mr( pd, _buffer, (size_t)( _num_bufs * _buffer_size ),
2349 4 : IBV_ACCESS_LOCAL_WRITE );
2350 4 : if( NULL == _mr )
2351 : {
2352 0 : LBERROR << "ibv_reg_mr : " << lunchbox::sysError << std::endl;
2353 0 : return false;
2354 : }
2355 :
2356 4100 : for( uint32_t i = 0; i != _num_bufs; i++ )
2357 4096 : _ring.put( i );
2358 : }
2359 :
2360 4 : return true;
2361 : }
2362 :
2363 : //////////////////////////////////////////////////////////////////////////////
2364 :
2365 12 : RingBuffer::RingBuffer( int access )
2366 : : _access( access )
2367 : , _size( 0 )
2368 : #ifdef _WIN32
2369 : , _mapping( 0 )
2370 : , _map( 0 )
2371 : #else
2372 : , _map( MAP_FAILED )
2373 : #endif
2374 12 : , _mr( 0 )
2375 : {
2376 12 : }
2377 :
2378 12 : RingBuffer::~RingBuffer( )
2379 : {
2380 12 : clear( );
2381 12 : }
2382 :
2383 40 : void RingBuffer::clear( )
2384 : {
2385 40 : if(( NULL != _mr ) && ::rdma_dereg_mr( _mr ))
2386 0 : LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
2387 40 : _mr = NULL;
2388 :
2389 : #ifdef _WIN32
2390 : if ( _map )
2391 : {
2392 : UnmapViewOfFile( static_cast<char*>( _map ));
2393 : UnmapViewOfFile( static_cast<char*>( _map ) + _size );
2394 : }
2395 : if ( _mapping )
2396 : CloseHandle( _mapping );
2397 :
2398 : _map = 0;
2399 : _mapping = 0;
2400 : #else
2401 40 : if(( MAP_FAILED != _map ) && ::munmap( _map, _size << 1 ))
2402 0 : LBWARN << "munmap : " << lunchbox::sysError << std::endl;
2403 40 : _map = MAP_FAILED;
2404 : #endif
2405 40 : _size = 0;
2406 40 : }
2407 :
2408 8 : bool RingBuffer::resize( ibv_pd *pd, size_t size )
2409 : {
2410 8 : bool ok = false;
2411 8 : int fd = -1;
2412 :
2413 8 : clear( );
2414 :
2415 8 : if( size )
2416 : {
2417 : #ifdef _WIN32
2418 : uint32_t num_retries = RINGBUFFER_ALLOC_RETRIES;
2419 : while (!_map && num_retries-- != 0)
2420 : {
2421 : void *target_addr = determineViableAddr( size * 2 );
2422 : if (target_addr)
2423 : allocAt( size, target_addr );
2424 : }
2425 :
2426 : if ( !_map )
2427 : {
2428 : LBERROR << "Couldn't allocate desired RingBuffer memory after "
2429 : << RINGBUFFER_ALLOC_RETRIES << " retries." << std::endl;
2430 : }
2431 : else
2432 : {
2433 : LBVERB << "Allocated RDMA Ringbuffer memory in "
2434 : << ( RINGBUFFER_ALLOC_RETRIES - num_retries ) << " tries."
2435 : << std::endl;
2436 : ok = true;
2437 : }
2438 :
2439 : #else
2440 : void *addr1, *addr2;
2441 8 : char path[] = "/dev/shm/co-rdma-buffer-XXXXXX";
2442 :
2443 8 : _size = size;
2444 :
2445 8 : fd = ::mkstemp( path );
2446 8 : if( fd < 0 )
2447 : {
2448 0 : LBERROR << "mkstemp : " << lunchbox::sysError << std::endl;
2449 0 : goto out;
2450 : }
2451 :
2452 8 : if( ::unlink( path ))
2453 : {
2454 0 : LBERROR << "unlink : " << lunchbox::sysError << std::endl;
2455 0 : goto out;
2456 : }
2457 :
2458 8 : if( ::ftruncate( fd, _size ))
2459 : {
2460 0 : LBERROR << "ftruncate : " << lunchbox::sysError << std::endl;
2461 0 : goto out;
2462 : }
2463 :
2464 : _map = ::mmap( NULL, _size << 1,
2465 : PROT_NONE,
2466 8 : MAP_ANONYMOUS | MAP_PRIVATE, -1, 0 );
2467 8 : if( MAP_FAILED == _map )
2468 : {
2469 0 : LBERROR << "mmap : " << lunchbox::sysError << std::endl;
2470 0 : goto out;
2471 : }
2472 :
2473 : addr1 = ::mmap( _map, _size,
2474 : PROT_READ | PROT_WRITE,
2475 8 : MAP_FIXED | MAP_SHARED, fd, 0 );
2476 8 : if( MAP_FAILED == addr1 )
2477 : {
2478 0 : LBERROR << "mmap : " << lunchbox::sysError << std::endl;
2479 0 : goto out;
2480 : }
2481 :
2482 8 : addr2 = ::mmap( (void *)( (uintptr_t)_map + _size ), _size,
2483 : PROT_READ | PROT_WRITE,
2484 16 : MAP_FIXED | MAP_SHARED, fd, 0 );
2485 8 : if( MAP_FAILED == addr2 )
2486 : {
2487 0 : LBERROR << "mmap : " << lunchbox::sysError << std::endl;
2488 0 : goto out;
2489 : }
2490 :
2491 8 : LBASSERT( addr1 == _map );
2492 8 : LBASSERT( addr2 == (void *)( (uintptr_t)_map + _size ));
2493 :
2494 : #endif
2495 : #ifdef WRAP
2496 : _mr = ::ibv_reg_mr( pd, _map, _size << 1, _access );
2497 : #else
2498 8 : _mr = ::ibv_reg_mr( pd, _map, _size, _access );
2499 : #endif
2500 :
2501 8 : if( NULL == _mr )
2502 : {
2503 0 : LBERROR << "ibv_reg_mr : " << lunchbox::sysError << std::endl;
2504 0 : goto out;
2505 : }
2506 :
2507 8 : ::memset( _map, 0, _size );
2508 8 : *reinterpret_cast< uint8_t * >( _map ) = 0x45;
2509 8 : LBASSERT( 0x45 ==
2510 : *reinterpret_cast< uint8_t * >( (uintptr_t)_map + _size ));
2511 : }
2512 :
2513 8 : ok = true;
2514 :
2515 : out:
2516 : #ifndef _WIN32
2517 8 : if(( fd >= 0 ) && TEMP_FAILURE_RETRY( ::close( fd )))
2518 0 : LBWARN << "close : " << lunchbox::sysError << std::endl;
2519 : #endif
2520 8 : return ok;
2521 : }
2522 :
#ifdef _WIN32
/**
 * Find a currently free region of @p size bytes of address space.
 *
 * The region is reserved and released again immediately, so the returned
 * address is only a hint. Another allocation may claim it before allocAt()
 * maps it, which is presumably why resize() retries.
 *
 * @return the start of the free region, or 0 if the reservation failed.
 */
void* RingBuffer::determineViableAddr( size_t size )
{
    void *ptr = VirtualAlloc(0, size, MEM_RESERVE, PAGE_NOACCESS);
    if (!ptr)
        return 0;

    VirtualFree(ptr, 0, MEM_RELEASE);
    return ptr;
}

/**
 * Map a pagefile-backed section of @p size bytes twice, back to back,
 * starting at @p desiredAddr, so the second view mirrors the first.
 *
 * On success _mapping, _map and _size are set. On failure everything is
 * released via clear() and _map stays 0, so the caller can retry at
 * another address.
 */
void RingBuffer::allocAt( size_t size, void* desiredAddr )
{
    // if we already hold one allocation, refuse to make another.
    LBASSERT( !_map );
    LBASSERT( !_mapping );
    if ( _map || _mapping )
        return;

    // The second view starts at desiredAddr + size, so size must be a
    // multiple of 64 KiB (presumably the allocation granularity that view
    // addresses must respect). Otherwise no retry can ever succeed.
    if (( size & 0xffff ) != 0 )
        return;

    // Both views map offset 0 of the section, so the section only needs
    // `size` bytes. Sizing it at 2 * size only wasted commit charge on a
    // second half that was never mapped.
    const size_t section_size = size;
    _mapping = CreateFileMappingA( INVALID_HANDLE_VALUE,
                                   0,
                                   PAGE_READWRITE,
                                   (unsigned long long)section_size >> 32,
                                   section_size & 0xffffffffu,
                                   0 );

    if ( !_mapping )
    {
        LBERROR << "CreateFileMappingA failed" << std::endl;
        goto err;
    }

    _map = MapViewOfFileEx( _mapping,
                            FILE_MAP_ALL_ACCESS,
                            0, 0,
                            size,
                            desiredAddr );

    if ( !_map )
    {
        LBERROR << "First MapViewOfFileEx failed" << std::endl;
        goto err;
    }

    // Mirror view of the same bytes directly after the first one.
    if (!MapViewOfFileEx( _mapping,
                          FILE_MAP_ALL_ACCESS,
                          0, 0,
                          size,
                          (char *)desiredAddr + size))
    {
        LBERROR << "Second MapViewOfFileEx failed" << std::endl;
        goto err;
    }

    _size = size;
    return;
err:
    // something went wrong - clean up
    clear();
}
#endif
2590 :
2591 :
2592 66 : } // namespace co
|