Line data Source code
1 : // -*- mode: c++ -*-
2 : /* Copyright (c) 2012, Computer Integration & Programming Solutions, Corp. and
3 : * United States Naval Research Laboratory
4 : * 2012-2013, Stefan Eilemann <eile@eyescale.ch>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 : #include "rdmaConnection.h"
22 :
23 : #include "connectionType.h"
24 : #include "connectionDescription.h"
25 : #include "global.h"
26 :
27 : #include <lunchbox/clock.h>
28 :
29 : #ifdef _WIN32
30 : # pragma warning( disable : 4018 )
31 : # include <boost/interprocess/mapped_region.hpp>
32 : # pragma warning( default : 4018 )
33 : #else
34 : # include <arpa/inet.h>
35 : # include <sys/epoll.h>
36 : # include <sys/mman.h>
37 : # include <poll.h>
38 : # include <unistd.h>
39 : #endif
40 :
41 : #include <errno.h>
42 : #include <fcntl.h>
43 : #include <limits>
44 : #include <sstream>
45 : #include <stddef.h>
46 :
47 : #include <rdma/rdma_verbs.h>
48 :
49 : #define IPV6_DEFAULT 0
50 :
51 : #define RDMA_PROTOCOL_MAGIC 0xC0
52 : #define RDMA_PROTOCOL_VERSION 0x03
53 :
54 : namespace co
55 : {
56 : //namespace { static const uint64_t ONE = 1ULL; }
57 :
58 : /**
59 : * Message types
60 : */
61 : enum OpCode
62 : {
63 : SETUP = 1 << 0,
64 : FC = 1 << 1,
65 : };
66 :
67 : /**
68 : * Initial setup message used to exchange sink MR parameters.
69 : */
70 : struct RDMASetupPayload
71 : {
72 : uint64_t rbase;
73 : uint64_t rlen;
74 : uint64_t rkey;
75 : };
76 :
77 : /**
78 : * "ACK" messages sent after read, tells source about receive progress.
79 : */
80 : struct RDMAFCPayload
81 : {
82 : uint32_t bytes_received;
83 : uint32_t writes_received;
84 : };
85 :
86 : /**
87 : * Payload wrapper
88 : */
89 : struct RDMAMessage
90 : {
91 : enum OpCode opcode;
92 : uint8_t length;
93 : union
94 : {
95 : struct RDMASetupPayload setup;
96 : struct RDMAFCPayload fc;
97 : } payload;
98 : };
99 :
100 : /**
101 : * "IMM" data sent with RDMA write, tells sink about send progress.
102 : */
103 : struct RDMAFCImm
104 : {
105 : uint32_t bytes_sent:28;
106 : uint32_t fcs_received:4;
107 : };
108 :
109 : namespace {
110 : // We send a max of 28 bits worth of byte counts per RDMA write.
111 : static const uint64_t MAX_BS = (( 2 << ( 28 - 1 )) - 1 );
112 : // We send a max of four bits worth of fc counts per RDMA write.
113 : static const uint16_t MAX_FC = (( 2 << ( 4 - 1 )) - 1 );
114 :
115 : #ifdef _WIN32
116 : static const uint32_t RINGBUFFER_ALLOC_RETRIES = 8;
117 : static const uint32_t WINDOWS_CONNECTION_BACKLOG = 1024;
118 : #endif
119 : static const uint32_t FC_MESSAGE_FREQUENCY = 12;
120 : }
121 :
122 : /**
123 : * An RDMA connection implementation.
124 : *
125 : * The protocol is simple, e.g.:
126 : *
127 : * initiator target
128 : * -----------------------------------------------------
129 : * resolve/bind/listen
130 : * resolve/prepost/connect
131 : * prepost/accept
132 : * send setup <-------> send setup
133 : * wait for setup wait for setup
134 : * RDMA_WRITE_WITH_IMM WR -------> RDMA_WRITE(DATA) WC
135 : * RECV(FC) WC <------- SEND WR
136 : * .
137 : * .
138 : * .
139 : *
140 : * The setup phase exchanges the MR parameters of a fixed size circular buffer
141 : * to which remote writes are sent. Sender tracks available space on the
142 : * receiver by accepting "Flow Control" messages that update the tail pointer
143 : * of the local "view" of the remote sink MR.
144 : *
145 : * Once setup is complete, either side may begin operations on the other's MR
146 : * (the initiator doesn't have to send first, as in the above example).
147 : *
148 : * If either credits or buffer space are exhausted, sender will spin waiting
149 : * for flow control messages. Receiver will also not send flow control if
150 : * there are no credits available.
151 : *
152 : * One catch is that Collage will only monitor a single "notifier" for events
153 : * and we have three that need to be monitored: one for connection status
154 : * events (the RDMA event channel) - RDMA_CM_EVENT_DISCONNECTED in particular,
155 : * one for the receive completion queue (upon incoming RDMA write), and an
156 : * additional eventfd(2) used to keep the notifier "hot" after partial reads.
157 : * We leverage the feature of epoll(7) in that "If an epoll file descriptor
158 : * has events waiting then it will indicate as being readable".
159 : *
160 : * Quite interesting is the effect of RDMA_RING_BUFFER_SIZE_MB and
161 : * RDMA_SEND_QUEUE_DEPTH depending on the communication pattern. Basically,
162 : * bigger doesn't necessarily equate to faster! The defaults are suited for
163 : * low latency conditions and would need tuning otherwise.
164 : *
165 : * ib_write_bw
166 : * -----------
167 : * #bytes num iterations BW peak[MB/sec] BW average[MB/sec]
168 : * 1048576 10000 3248.10 3247.94
169 : *
170 : * netperf
171 : * -------
172 : * Send perf: 3240.72MB/s (3240.72pps)
173 : * Send perf: 3240.72MB/s (3240.72pps)
174 : * Send perf: 3240.95MB/s (3240.95pps)
175 : */
176 6 : RDMAConnection::RDMAConnection( )
177 : #ifdef _WIN32
178 : : _notifier()
179 : #else
180 : : _notifier( -1 )
181 : #endif
182 6 : , _timeout( Global::getIAttribute( Global::IATTR_RDMA_RESOLVE_TIMEOUT_MS ))
183 : , _rai( NULL )
184 : , _cm( NULL )
185 : , _cm_id( NULL )
186 : , _new_cm_id( NULL )
187 : , _cc( NULL )
188 : , _cq( NULL )
189 : , _pd( NULL )
190 : , _wcs ( 0 )
191 : , _readBytes( 0 )
192 : , _established( false )
193 : , _depth( 0L )
194 : , _writes( 0L )
195 : , _fcs( 0L )
196 : , _wcredits( 0L )
197 : , _fcredits( 0L )
198 : , _completions( 0U )
199 : , _msgbuf( sizeof(RDMAMessage) )
200 : , _sourcebuf( 0 )
201 : , _sourceptr( 0 )
202 : , _sinkbuf( IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE )
203 : , _sinkptr( 0 )
204 : , _rptr( 0UL )
205 : , _rbase( 0ULL )
206 12 : , _rkey( 0ULL )
207 : #ifdef _WIN32
208 : , _availBytes( 0 )
209 : , _eventFlag( 0 )
210 : , _cmWaitObj( 0 )
211 : , _ccWaitObj( 0 )
212 : #endif
213 : {
214 : #ifndef _WIN32
215 6 : _pipe_fd[0] = -1;
216 6 : _pipe_fd[1] = -1;
217 : #endif
218 :
219 6 : ::memset( (void *)&_addr, 0, sizeof(_addr) );
220 6 : ::memset( (void *)&_serv, 0, sizeof(_serv) );
221 :
222 6 : ::memset( (void *)&_cpd, 0, sizeof(struct RDMAConnParamData) );
223 :
224 6 : ConnectionDescriptionPtr description = _getDescription();
225 6 : description->type = CONNECTIONTYPE_RDMA;
226 6 : description->bandwidth = // QDR default, report "actual" 8b/10b bandwidth
227 6 : ( ::ibv_rate_to_mult( IBV_RATE_40_GBPS ) * 2.5 * 1024000 / 8 ) * 0.8;
228 6 : }
229 :
230 2 : bool RDMAConnection::connect( )
231 : {
232 2 : ConstConnectionDescriptionPtr description = getDescription();
233 2 : LBASSERT( CONNECTIONTYPE_RDMA == description->type );
234 :
235 2 : if( !isClosed() || 0u == description->port )
236 0 : return false;
237 :
238 2 : _cleanup( );
239 2 : _setState( STATE_CONNECTING );
240 :
241 2 : if( !_lookupAddress( false ) || ( NULL == _rai ))
242 : {
243 0 : LBERROR << "Failed to lookup destination address." << std::endl;
244 0 : goto err;
245 : }
246 :
247 2 : _updateInfo( _rai->ai_dst_addr );
248 :
249 2 : if( !_createNotifier( ))
250 : {
251 0 : LBERROR << "Failed to create master notifier." << std::endl;
252 0 : goto err;
253 : }
254 :
255 2 : if( !_createEventChannel( ))
256 : {
257 0 : LBERROR << "Failed to create communication event channel." << std::endl;
258 0 : goto err;
259 : }
260 :
261 2 : if( !_createId( ))
262 : {
263 0 : LBERROR << "Failed to create communication identifier." << std::endl;
264 0 : goto err;
265 : }
266 :
267 2 : if( !_resolveAddress( ))
268 : {
269 0 : LBERROR << "Failed to resolve destination address for : "
270 0 : << _addr << ":" << _serv << std::endl;
271 0 : goto err;
272 : }
273 :
274 2 : _updateInfo( ::rdma_get_peer_addr( _cm_id ));
275 :
276 2 : _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
277 :
278 2 : LBVERB << "Initiating connection on " << std::dec
279 0 : << _device_name << ":" << (int)_cm_id->port_num
280 0 : << " to "
281 0 : << _addr << ":" << _serv
282 2 : << " (" << description->toString( ) << ")"
283 6 : << std::endl;
284 :
285 2 : if( !_initProtocol( Global::getIAttribute(
286 2 : Global::IATTR_RDMA_SEND_QUEUE_DEPTH )))
287 : {
288 0 : LBERROR << "Failed to initialize protocol variables." << std::endl;
289 0 : goto err;
290 : }
291 :
292 2 : if( !_initVerbs( ))
293 : {
294 0 : LBERROR << "Failed to initialize verbs." << std::endl;
295 0 : goto err;
296 : }
297 :
298 2 : if( !_createQP( ))
299 : {
300 0 : LBERROR << "Failed to create queue pair." << std::endl;
301 0 : goto err;
302 : }
303 :
304 2 : if( !_createBytesAvailableFD( ))
305 : {
306 0 : LBERROR << "Failed to create available byte notifier." << std::endl;
307 0 : goto err;
308 : }
309 :
310 2 : if( !_initBuffers( ))
311 : {
312 0 : LBERROR << "Failed to initialize ring buffers." << std::endl;
313 0 : goto err;
314 : }
315 :
316 2 : if( !_resolveRoute( ))
317 : {
318 0 : LBERROR << "Failed to resolve route to destination : "
319 0 : << _addr << ":" << _serv << std::endl;
320 0 : goto err;
321 : }
322 :
323 2 : if( !_postReceives( _depth ))
324 : {
325 0 : LBERROR << "Failed to pre-post receives." << std::endl;
326 0 : goto err;
327 : }
328 :
329 2 : if( !_connect( ))
330 : {
331 0 : LBERROR << "Failed to connect to destination : "
332 0 : << _addr << ":" << _serv << std::endl;
333 0 : goto err;
334 : }
335 :
336 2 : LBASSERT( _established );
337 :
338 4 : if(( RDMA_PROTOCOL_MAGIC != _cpd.magic ) ||
339 2 : ( RDMA_PROTOCOL_VERSION != _cpd.version ))
340 : {
341 0 : LBERROR << "Protocol mismatch with target : "
342 0 : << _addr << ":" << _serv << std::endl;
343 0 : goto err;
344 : }
345 :
346 2 : if( !_postSetup( ))
347 : {
348 0 : LBERROR << "Failed to post setup message." << std::endl;
349 0 : goto err;
350 : }
351 :
352 2 : if( !_waitRecvSetup( ))
353 : {
354 0 : LBERROR << "Failed to receive setup message." << std::endl;
355 0 : goto err;
356 : }
357 :
358 2 : LBVERB << "Connection established on " << std::dec
359 0 : << _device_name << ":" << (int)_cm_id->port_num
360 0 : << " to "
361 0 : << _addr << ":" << _serv
362 2 : << " (" << description->toString( ) << ")"
363 6 : << std::endl;
364 :
365 2 : _setState( STATE_CONNECTED );
366 2 : _updateNotifier();
367 2 : return true;
368 :
369 : err:
370 0 : LBVERB << "Connection failed on " << std::dec
371 0 : << _device_name << ":" << (int)_cm_id->port_num
372 0 : << " to "
373 0 : << _addr << ":" << _serv
374 0 : << " (" << description->toString( ) << ")"
375 0 : << std::endl;
376 :
377 0 : close( );
378 :
379 0 : return false;
380 : }
381 :
382 2 : bool RDMAConnection::listen( )
383 : {
384 2 : ConstConnectionDescriptionPtr description = getDescription();
385 2 : LBASSERT( CONNECTIONTYPE_RDMA == description->type );
386 :
387 2 : if( !isClosed() )
388 0 : return false;
389 :
390 2 : _cleanup( );
391 2 : _setState( STATE_CONNECTING );
392 :
393 2 : if( !_lookupAddress( true ))
394 : {
395 0 : LBERROR << "Failed to lookup local address." << std::endl;
396 0 : goto err;
397 : }
398 :
399 2 : if( NULL != _rai )
400 2 : _updateInfo( _rai->ai_src_addr );
401 :
402 2 : if( !_createNotifier( ))
403 : {
404 0 : LBERROR << "Failed to create master notifier." << std::endl;
405 0 : goto err;
406 : }
407 :
408 2 : if( !_createEventChannel( ))
409 : {
410 0 : LBERROR << "Failed to create communication event channel." << std::endl;
411 0 : goto err;
412 : }
413 :
414 2 : if( !_createId( ))
415 : {
416 0 : LBERROR << "Failed to create communication identifier." << std::endl;
417 0 : goto err;
418 : }
419 :
420 : #if 0
421 : /* NOT IMPLEMENTED */
422 :
423 : if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
424 : (void *)&ONE, sizeof(ONE) ))
425 : {
426 : LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
427 : goto err;
428 : }
429 : #endif
430 :
431 2 : if( !_bindAddress( ))
432 : {
433 0 : LBERROR << "Failed to bind to local address : "
434 0 : << _addr << ":" << _serv << std::endl;
435 0 : goto err;
436 : }
437 :
438 2 : _updateInfo( ::rdma_get_local_addr( _cm_id ));
439 :
440 : #ifdef _WIN32
441 : if( !_listen( WINDOWS_CONNECTION_BACKLOG ))
442 : #else
443 2 : if( !_listen( SOMAXCONN ))
444 : #endif
445 : {
446 0 : LBERROR << "Failed to listen on bound address : "
447 0 : << _addr << ":" << _serv << std::endl;
448 0 : goto err;
449 : }
450 :
451 2 : if( NULL != _cm_id->verbs )
452 0 : _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
453 :
454 2 : LBVERB << "Listening on " << std::dec
455 0 : << _device_name << ":" << (int)_cm_id->port_num
456 0 : << " at "
457 0 : << _addr << ":" << _serv
458 2 : << " (" << description->toString( ) << ")"
459 6 : << std::endl;
460 :
461 2 : _setState( STATE_LISTENING );
462 2 : return true;
463 :
464 : err:
465 0 : close();
466 0 : return false;
467 : }
468 :
469 2 : void RDMAConnection::acceptNB( ) { /* NOP */ }
470 :
471 2 : ConnectionPtr RDMAConnection::acceptSync( )
472 : {
473 2 : RDMAConnection *newConnection = NULL;
474 :
475 2 : if( !isListening() )
476 : {
477 0 : LBERROR << "Connection not in listening state." << std::endl;
478 0 : goto out;
479 : }
480 :
481 2 : if( !_waitForCMEvent( RDMA_CM_EVENT_CONNECT_REQUEST ))
482 : {
483 0 : LBERROR << "Failed to receive valid connect request." << std::endl;
484 0 : goto out;
485 : }
486 :
487 2 : LBASSERT( NULL != _new_cm_id );
488 :
489 2 : newConnection = new RDMAConnection( );
490 :
491 2 : if( !newConnection->_finishAccept( _new_cm_id, _cpd ))
492 : {
493 0 : delete newConnection;
494 0 : newConnection = NULL;
495 : }
496 :
497 : out:
498 2 : _new_cm_id = NULL;
499 2 : _updateNotifier();
500 2 : return newConnection;
501 : }
502 :
503 405650 : void RDMAConnection::readNB( void*, const uint64_t ) { /* NOP */ }
504 :
505 405650 : int64_t RDMAConnection::readSync( void* buffer, const uint64_t bytes,
506 : const bool block )
507 : {
508 405650 : lunchbox::Clock clock;
509 405650 : const int64_t start = clock.getTime64( );
510 405650 : const uint32_t timeout = Global::getTimeout( );
511 405650 : eventset events;
512 405650 : uint64_t available_bytes = 0ULL;
513 405650 : uint32_t bytes_taken = 0UL;
514 405650 : bool extra_event = false;
515 :
516 405650 : if( !isConnected() )
517 0 : goto err;
518 :
519 : // LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
520 : // << std::endl;
521 :
522 405650 : _stats.reads++;
523 :
524 : retry:
525 410158 : if( !_checkDisconnected( events ))
526 : {
527 0 : LBERROR << "Error while checking event state." << std::endl;
528 0 : goto err;
529 : }
530 :
531 410158 : if( events.test( CQ_EVENT ))
532 : {
533 41185 : if( !_rearmCQ( ))
534 : {
535 0 : LBERROR << "Error while rearming receive channel." << std::endl;
536 0 : goto err;
537 : }
538 : }
539 :
540 : // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
541 410158 : if( !_checkCQ( events.test( CQ_EVENT )))
542 : {
543 0 : LBERROR << "Error while polling completion queues." << std::endl;
544 0 : goto err;
545 : }
546 :
547 410158 : LBASSERT( _fcredits >= 0L );
548 :
549 410158 : if( _established && _needFC( ) && ( 0L == _fcredits ))
550 : {
551 0 : if( LB_TIMEOUT_INDEFINITE != timeout )
552 : {
553 0 : if(( clock.getTime64( ) - start ) > timeout )
554 : {
555 0 : LBERROR << "Timed out trying to acquire credit." << std::endl;
556 0 : goto err;
557 : }
558 : }
559 :
560 : //LBWARN << "No credit for flow control." << std::endl;
561 0 : lunchbox::Thread::yield( );
562 0 : _stats.no_credits_fc++;
563 0 : goto retry;
564 : }
565 :
566 : // "Note that an extra event may be triggered without having a
567 : // corresponding completion entry in the CQ." (per ibv_get_cq_event(3))
568 410158 : if( _established && !events.test( BUF_EVENT ))
569 : {
570 : // Special case: If LocalNode is reading the length part of a message
571 : // it will ignore this zero return and restart the select.
572 4508 : if( extra_event && !block )
573 : {
574 0 : _updateNotifier();
575 0 : return 0LL;
576 : }
577 :
578 4508 : extra_event = true;
579 4508 : goto retry;
580 : }
581 :
582 405650 : if( events.test( BUF_EVENT ))
583 : {
584 405648 : available_bytes = _getAvailableBytes( );
585 405648 : if( 0ULL == available_bytes )
586 : {
587 0 : LBERROR << "Error while reading from event fd." << std::endl;
588 0 : goto err;
589 : }
590 : }
591 :
592 : // Modifies _sinkptr.TAIL
593 405650 : bytes_taken = _drain( buffer, bytes );
594 :
595 405650 : if( 0UL == bytes_taken )
596 : {
597 2 : if( _sinkptr.isEmpty( ) && !_established )
598 : {
599 10 : LBINFO << "Got EOF, closing " << getDescription( )->toString( )
600 8 : << std::endl;
601 2 : goto err;
602 : }
603 :
604 0 : if( LB_TIMEOUT_INDEFINITE != timeout )
605 : {
606 0 : if(( clock.getTime64( ) - start ) > timeout )
607 : {
608 0 : LBERROR << "Timed out trying to drain buffer." << std::endl;
609 0 : goto err;
610 : }
611 : }
612 :
613 : //LBWARN << "Sink buffer empty." << std::endl;
614 0 : lunchbox::Thread::yield( );
615 0 : _stats.buffer_empty++;
616 0 : goto retry;
617 : }
618 :
619 : // Put back what wasn't taken (ensure the master notifier stays "hot").
620 405648 : if( available_bytes > bytes_taken )
621 405074 : _incrAvailableBytes( available_bytes - bytes_taken );
622 : #ifdef _WIN32
623 : else
624 : _updateNotifier();
625 : #endif
626 :
627 405648 : _readBytes += bytes_taken;
628 :
629 405648 : if( _established && _needFC( ) && !_postFC( ))
630 0 : LBWARN << "Error while posting flow control message." << std::endl;
631 :
632 : // LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
633 : // << " took " << bytes_taken << " bytes"
634 : // << " (" << _sinkptr.available( ) << " still available)" << std::endl;
635 :
636 405648 : return bytes_taken;
637 :
638 : err:
639 2 : close( );
640 :
641 2 : return -1LL;
642 : }
643 :
644 408041 : int64_t RDMAConnection::write( const void* buffer, const uint64_t bytes )
645 : {
646 408041 : lunchbox::Clock clock;
647 408041 : const int64_t start = clock.getTime64( );
648 408041 : const uint32_t timeout = Global::getTimeout( );
649 408041 : eventset events;
650 408041 : const uint32_t can_put = std::min( bytes, MAX_BS );
651 : uint32_t bytes_put;
652 :
653 408041 : if( !isConnected() )
654 0 : goto err;
655 :
656 : // LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
657 : // << std::endl;
658 :
659 408041 : _stats.writes++;
660 :
661 : retry:
662 793520 : if( !_checkDisconnected( events ))
663 : {
664 0 : LBERROR << "Error while checking connection state." << std::endl;
665 0 : goto err;
666 : }
667 :
668 793520 : if( !_established )
669 : {
670 0 : LBWARN << "Disconnected in write." << std::endl;
671 0 : goto err;
672 : }
673 :
674 : // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
675 793520 : if( !_checkCQ( false ))
676 : {
677 0 : LBERROR << "Error while polling completion queues." << std::endl;
678 0 : goto err;
679 : }
680 :
681 793520 : LBASSERT( _wcredits >= 0L );
682 :
683 793520 : if( 0L == _wcredits )
684 : {
685 0 : if( LB_TIMEOUT_INDEFINITE != timeout )
686 : {
687 0 : if(( clock.getTime64( ) - start ) > timeout )
688 : {
689 0 : LBERROR << "Timed out trying to acquire credit." << std::endl;
690 0 : goto err;
691 : }
692 : }
693 :
694 : //LBWARN << "No credits for RDMA." << std::endl;
695 0 : lunchbox::Thread::yield( );
696 0 : _stats.no_credits_rdma++;
697 0 : goto retry;
698 : }
699 :
700 : // Modifies _sourceptr.HEAD
701 793520 : bytes_put = _fill( buffer, can_put );
702 :
703 793520 : if( 0UL == bytes_put )
704 : {
705 385479 : if( LB_TIMEOUT_INDEFINITE != timeout )
706 : {
707 385479 : if(( clock.getTime64( ) - start ) > timeout )
708 : {
709 0 : LBERROR << "Timed out trying to fill buffer." << std::endl;
710 0 : goto err;
711 : }
712 : }
713 :
714 : //LBWARN << "Source buffer full." << std::endl;
715 385479 : lunchbox::Thread::yield( );
716 385479 : if( _sourceptr.isFull( ) || _rptr.isFull( ))
717 385479 : _stats.buffer_full++;
718 385479 : goto retry;
719 : }
720 :
721 : // Modifies _sourceptr.MIDDLE & _rptr.HEAD
722 408041 : if( !_postRDMAWrite( ))
723 : {
724 0 : LBERROR << "Error while posting RDMA write." << std::endl;
725 0 : goto err;
726 : }
727 :
728 : // LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
729 : // << " put " << bytes_put << " bytes" << std::endl;
730 :
731 408041 : return bytes_put;
732 :
733 : err:
734 0 : return -1LL;
735 : }
736 :
737 18 : RDMAConnection::~RDMAConnection( )
738 : {
739 6 : _close( );
740 12 : }
741 :
742 : ////////////////////////////////////////////////////////////////////////////////
743 :
744 12 : void RDMAConnection::_close( )
745 : {
746 12 : lunchbox::ScopedWrite close_mutex( _poll_lock );
747 :
748 12 : if( !isClosed() )
749 : {
750 6 : LBASSERT( !isClosing() );
751 6 : _setState( STATE_CLOSING );
752 :
753 6 : if( _established && ::rdma_disconnect( _cm_id ))
754 0 : LBWARN << "rdma_disconnect : " << lunchbox::sysError << std::endl;
755 :
756 6 : _setState( STATE_CLOSED );
757 6 : _cleanup( );
758 12 : }
759 12 : }
760 :
761 10 : void RDMAConnection::_cleanup( )
762 : {
763 10 : LBASSERT( isClosed() );
764 :
765 10 : _sourcebuf.clear( );
766 10 : _sinkbuf.clear( );
767 10 : _msgbuf.clear( );
768 :
769 10 : if( _completions > 0U )
770 : {
771 2 : ::ibv_ack_cq_events( _cq, _completions );
772 2 : _completions = 0U;
773 : }
774 :
775 10 : delete[] _wcs;
776 10 : _wcs = 0;
777 :
778 : #ifdef _WIN32
779 : if ( _ccWaitObj )
780 : UnregisterWait( _ccWaitObj );
781 : _ccWaitObj = 0;
782 :
783 : if ( _cmWaitObj )
784 : UnregisterWait( _cmWaitObj );
785 : _cmWaitObj = 0;
786 : #endif
787 :
788 10 : if( NULL != _cm_id )
789 6 : ::rdma_destroy_ep( _cm_id );
790 10 : _cm_id = NULL;
791 :
792 10 : if(( NULL != _cq ) && ::rdma_seterrno( ::ibv_destroy_cq( _cq )))
793 0 : LBWARN << "ibv_destroy_cq : " << lunchbox::sysError << std::endl;
794 10 : _cq = NULL;
795 :
796 10 : if(( NULL != _cc ) && ::rdma_seterrno( ::ibv_destroy_comp_channel( _cc )))
797 0 : LBWARN << "ibv_destroy_comp_channel : " << lunchbox::sysError
798 0 : << std::endl;
799 10 : _cc = NULL;
800 :
801 10 : if(( NULL != _pd ) && ::rdma_seterrno( ::ibv_dealloc_pd( _pd )))
802 0 : LBWARN << "ibv_dealloc_pd : " << lunchbox::sysError << std::endl;
803 10 : _pd = NULL;
804 :
805 10 : if( NULL != _cm )
806 6 : ::rdma_destroy_event_channel( _cm );
807 10 : _cm = NULL;
808 :
809 10 : if( NULL != _rai )
810 4 : ::rdma_freeaddrinfo( _rai );
811 10 : _rai = NULL;
812 :
813 10 : _rptr = 0UL;
814 10 : _rbase = _rkey = 0ULL;
815 : #ifndef _WIN32
816 10 : if(( _notifier >= 0 ) && TEMP_FAILURE_RETRY( ::close( _notifier )))
817 0 : LBWARN << "close : " << lunchbox::sysError << std::endl;
818 10 : _notifier = -1;
819 : #endif
820 10 : }
821 :
822 2 : bool RDMAConnection::_finishAccept( struct rdma_cm_id *new_cm_id,
823 : const RDMAConnParamData &cpd )
824 : {
825 2 : LBASSERT( isClosed() );
826 2 : _setState( STATE_CONNECTING );
827 :
828 2 : LBASSERT( NULL != new_cm_id );
829 :
830 2 : _cm_id = new_cm_id;
831 :
832 : {
833 : // FIXME : RDMA CM appears to send up invalid addresses when receiving
834 : // connections that use a different protocol than what was bound. E.g.
835 : // if an IPv6 listener gets an IPv4 connection then the sa_family
836 : // will be AF_INET6 but the actual data is struct sockaddr_in. Example:
837 : //
838 : // 0000000: 0a00 bc10 c0a8 b01a 0000 0000 0000 0000 ................
839 : //
840 : // However, in the reverse case, when an IPv4 listener gets an IPv6
841 : // connection not only is the address family incorrect, but the actual
842 : // IPv6 address is only partially there:
843 : //
844 : // 0000000: 0200 bc11 0000 0000 fe80 0000 0000 0000 ................
845 : // 0000010: 0000 0000 0000 0000 0000 0000 0000 0000 ................
846 : // 0000020: 0000 0000 0000 0000 0000 0000 0000 0000 ................
847 : // 0000030: 0000 0000 0000 0000 0000 0000 0000 0000 ................
848 : //
849 : // Surely seems to be a bug in RDMA CM.
850 :
851 : union
852 : {
853 : struct sockaddr addr;
854 : struct sockaddr_in sin;
855 : struct sockaddr_in6 sin6;
856 : struct sockaddr_storage storage;
857 : } sss;
858 :
859 : // Make a copy since we might change it.
860 : //sss.storage = _cm_id->route.addr.dst_storage;
861 : ::memcpy( (void *)&sss.storage,
862 2 : (const void *)::rdma_get_peer_addr( _cm_id ),
863 2 : sizeof(struct sockaddr_storage) );
864 : #ifndef _WIN32
865 4 : if(( AF_INET == sss.storage.ss_family ) &&
866 4 : ( sss.sin6.sin6_addr.s6_addr32[0] != 0 ||
867 4 : sss.sin6.sin6_addr.s6_addr32[1] != 0 ||
868 4 : sss.sin6.sin6_addr.s6_addr32[2] != 0 ||
869 2 : sss.sin6.sin6_addr.s6_addr32[3] != 0 ))
870 : {
871 0 : LBWARN << "IPv6 address detected but likely invalid!" << std::endl;
872 0 : sss.storage.ss_family = AF_INET6;
873 : }
874 : else
875 : #endif
876 2 : if(( AF_INET6 == sss.storage.ss_family ) &&
877 0 : ( INADDR_ANY != sss.sin.sin_addr.s_addr ))
878 : {
879 0 : sss.storage.ss_family = AF_INET;
880 : }
881 :
882 2 : _updateInfo( &sss.addr );
883 : }
884 :
885 2 : _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
886 :
887 2 : LBVERB << "Connection initiated on " << std::dec
888 0 : << _device_name << ":" << (int)_cm_id->port_num
889 0 : << " from "
890 0 : << _addr << ":" << _serv
891 2 : << " (" << getDescription()->toString( ) << ")"
892 6 : << std::endl;
893 :
894 4 : if(( RDMA_PROTOCOL_MAGIC != cpd.magic ) ||
895 2 : ( RDMA_PROTOCOL_VERSION != cpd.version ))
896 : {
897 0 : LBERROR << "Protocol mismatch with initiator : "
898 0 : << _addr << ":" << _serv << std::endl;
899 0 : goto err_reject;
900 : }
901 :
902 2 : if( !_createNotifier( ))
903 : {
904 0 : LBERROR << "Failed to create master notifier." << std::endl;
905 0 : goto err_reject;
906 : }
907 :
908 2 : if( !_createEventChannel( ))
909 : {
910 0 : LBERROR << "Failed to create event channel." << std::endl;
911 0 : goto err_reject;
912 : }
913 :
914 2 : if( !_migrateId( ))
915 : {
916 0 : LBERROR << "Failed to migrate communication identifier." << std::endl;
917 0 : goto err_reject;
918 : }
919 :
920 2 : if( !_initProtocol( cpd.depth ))
921 : {
922 0 : LBERROR << "Failed to initialize protocol variables." << std::endl;
923 0 : goto err_reject;
924 : }
925 :
926 2 : if( !_initVerbs( ))
927 : {
928 0 : LBERROR << "Failed to initialize verbs." << std::endl;
929 0 : goto err_reject;
930 : }
931 :
932 2 : if( !_createQP( ))
933 : {
934 0 : LBERROR << "Failed to create queue pair." << std::endl;
935 0 : goto err_reject;
936 : }
937 :
938 2 : if( !_createBytesAvailableFD( ))
939 : {
940 0 : LBERROR << "Failed to create available byte notifier." << std::endl;
941 0 : goto err_reject;
942 : }
943 :
944 2 : if( !_initBuffers( ))
945 : {
946 0 : LBERROR << "Failed to initialize ring buffers." << std::endl;
947 0 : goto err_reject;
948 : }
949 :
950 2 : if( !_postReceives( _depth ))
951 : {
952 0 : LBERROR << "Failed to pre-post receives." << std::endl;
953 0 : goto err_reject;
954 : }
955 :
956 2 : if( !_accept( ))
957 : {
958 0 : LBERROR << "Failed to accept remote connection from : "
959 0 : << _addr << ":" << _serv << std::endl;
960 0 : goto err;
961 : }
962 :
963 2 : LBASSERT( _established );
964 :
965 2 : if( !_waitRecvSetup( ))
966 : {
967 0 : LBERROR << "Failed to receive setup message." << std::endl;
968 0 : goto err;
969 : }
970 :
971 2 : if( !_postSetup( ))
972 : {
973 0 : LBERROR << "Failed to post setup message." << std::endl;
974 0 : goto err;
975 : }
976 :
977 2 : LBVERB << "Connection accepted on " << std::dec
978 0 : << _device_name << ":" << (int)_cm_id->port_num
979 0 : << " from "
980 0 : << _addr << ":" << _serv
981 2 : << " (" << getDescription()->toString( ) << ")"
982 6 : << std::endl;
983 :
984 2 : _setState( STATE_CONNECTED );
985 2 : _updateNotifier();
986 2 : return true;
987 :
988 : err_reject:
989 0 : LBWARN << "Rejecting connection from remote address : "
990 0 : << _addr << ":" << _serv << std::endl;
991 :
992 0 : if( !_reject( ))
993 0 : LBWARN << "Failed to issue connection reject." << std::endl;
994 :
995 : err:
996 0 : close( );
997 :
998 0 : return false;
999 : }
1000 :
1001 4 : bool RDMAConnection::_lookupAddress( const bool passive )
1002 : {
1003 : struct rdma_addrinfo hints;
1004 4 : char *node = NULL, *service = NULL;
1005 4 : std::string s;
1006 :
1007 4 : ::memset( (void *)&hints, 0, sizeof(struct rdma_addrinfo) );
1008 : //hints.ai_flags |= RAI_NOROUTE;
1009 4 : if( passive )
1010 2 : hints.ai_flags |= RAI_PASSIVE;
1011 :
1012 8 : ConstConnectionDescriptionPtr description = getDescription();
1013 4 : const std::string &hostname = description->getHostname( );
1014 4 : if( !hostname.empty( ))
1015 4 : node = const_cast< char * >( hostname.c_str( ));
1016 :
1017 4 : if( 0u != description->port )
1018 : {
1019 3 : std::stringstream ss;
1020 3 : ss << description->port;
1021 3 : s = ss.str( );
1022 3 : service = const_cast< char * >( s.c_str( ));
1023 : }
1024 :
1025 4 : if(( NULL != node ) && ::rdma_getaddrinfo( node, service, &hints, &_rai ))
1026 : {
1027 0 : LBERROR << "rdma_getaddrinfo : " << lunchbox::sysError << std::endl;
1028 0 : goto err;
1029 : }
1030 :
1031 4 : if(( NULL != _rai ) && ( NULL != _rai->ai_next ))
1032 0 : LBWARN << "Multiple getaddrinfo results, using first." << std::endl;
1033 :
1034 4 : if(( NULL != _rai ) && ( _rai->ai_connect_len > 0 ))
1035 0 : LBWARN << "WARNING : ai_connect data specified!" << std::endl;
1036 :
1037 4 : return true;
1038 :
1039 : err:
1040 4 : return false;
1041 : }
1042 :
1043 10 : void RDMAConnection::_updateInfo( struct sockaddr *addr )
1044 : {
1045 10 : int salen = sizeof(struct sockaddr);
1046 10 : bool is_unspec = false;
1047 :
1048 10 : if( AF_INET == addr->sa_family )
1049 : {
1050 : struct sockaddr_in *sin =
1051 10 : reinterpret_cast< struct sockaddr_in * >( addr );
1052 10 : is_unspec = ( INADDR_ANY == sin->sin_addr.s_addr );
1053 10 : salen = sizeof(struct sockaddr_in);
1054 : }
1055 : // TODO: IPv6 for WIndows
1056 : #ifndef _WIN32
1057 0 : else if( AF_INET6 == addr->sa_family )
1058 : {
1059 : struct sockaddr_in6 *sin6 =
1060 0 : reinterpret_cast< struct sockaddr_in6 * >( addr );
1061 :
1062 0 : is_unspec = ( sin6->sin6_addr.s6_addr32[0] == 0 &&
1063 0 : sin6->sin6_addr.s6_addr32[1] == 0 &&
1064 0 : sin6->sin6_addr.s6_addr32[2] == 0 &&
1065 0 : sin6->sin6_addr.s6_addr32[3] == 0 );
1066 0 : salen = sizeof(struct sockaddr_in6);
1067 : }
1068 : #endif
1069 : int err;
1070 10 : if(( err = ::getnameinfo( addr, salen, _addr, sizeof(_addr),
1071 10 : _serv, sizeof(_serv), NI_NUMERICHOST | NI_NUMERICSERV )))
1072 0 : LBWARN << "Name info lookup failed : " << err << std::endl;
1073 :
1074 10 : if( is_unspec )
1075 0 : ::gethostname( _addr, NI_MAXHOST );
1076 :
1077 10 : ConnectionDescriptionPtr description = _getDescription();
1078 10 : if( description->getHostname( ).empty( ))
1079 2 : description->setHostname( _addr );
1080 10 : if( 0u == description->port )
1081 4 : description->port = atoi( _serv );
1082 10 : }
1083 :
1084 6 : bool RDMAConnection::_createEventChannel( )
1085 : {
1086 6 : LBASSERT( NULL == _cm );
1087 :
1088 6 : _cm = ::rdma_create_event_channel( );
1089 6 : if( NULL == _cm )
1090 : {
1091 0 : LBERROR << "rdma_create_event_channel : " << lunchbox::sysError <<
1092 0 : std::endl;
1093 0 : goto err;
1094 : }
1095 :
1096 : #ifdef _WIN32
1097 : if ( !RegisterWaitForSingleObject(
1098 : &_cmWaitObj,
1099 : _cm->channel.Event,
1100 : WAITORTIMERCALLBACK( &_triggerNotifierCM ),
1101 : this,
1102 : INFINITE,
1103 : WT_EXECUTEINWAITTHREAD ))
1104 : {
1105 : LBERROR << "RegisterWaitForSingleObject : " << co::base::sysError
1106 : << std::endl;
1107 : goto err;
1108 : }
1109 : #else
1110 : struct epoll_event evctl;
1111 :
1112 6 : LBASSERT( _notifier >= 0 );
1113 :
1114 : // Use the CM fd to signal Collage of connection events.
1115 6 : ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
1116 6 : evctl.events = EPOLLIN;
1117 6 : evctl.data.fd = _cm->fd;
1118 6 : if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
1119 : {
1120 0 : LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
1121 0 : goto err;
1122 : }
1123 : #endif
1124 :
1125 6 : return true;
1126 :
1127 : err:
1128 0 : return false;
1129 : }
1130 :
1131 4 : bool RDMAConnection::_createId( )
1132 : {
1133 4 : LBASSERT( NULL != _cm );
1134 4 : LBASSERT( NULL == _cm_id );
1135 :
1136 4 : if( ::rdma_create_id( _cm, &_cm_id, NULL, RDMA_PS_TCP ))
1137 : {
1138 0 : LBERROR << "rdma_create_id : " << lunchbox::sysError << std::endl;
1139 0 : goto err;
1140 : }
1141 :
1142 4 : return true;
1143 :
1144 : err:
1145 0 : return false;
1146 : }
1147 :
1148 4 : bool RDMAConnection::_initVerbs( )
1149 : {
1150 4 : LBASSERT( NULL != _cm_id );
1151 4 : LBASSERT( NULL != _cm_id->verbs );
1152 :
1153 4 : LBASSERT( NULL == _pd );
1154 :
1155 : // Allocate protection domain.
1156 4 : _pd = ::ibv_alloc_pd( _cm_id->verbs );
1157 4 : if( NULL == _pd )
1158 : {
1159 0 : LBERROR << "ibv_alloc_pd : " << lunchbox::sysError << std::endl;
1160 0 : goto err;
1161 : }
1162 :
1163 4 : LBASSERT( NULL == _cc );
1164 :
1165 : // Create completion channel.
1166 4 : _cc = ::ibv_create_comp_channel( _cm_id->verbs );
1167 4 : if( NULL == _cc )
1168 : {
1169 0 : LBERROR << "ibv_create_comp_channel : " << lunchbox::sysError
1170 0 : << std::endl;
1171 0 : goto err;
1172 : }
1173 :
1174 : #ifdef _WIN32
1175 : if ( !RegisterWaitForSingleObject(
1176 : &_ccWaitObj,
1177 : _cc->comp_channel.Event,
1178 : WAITORTIMERCALLBACK( &_triggerNotifierCQ ),
1179 : this,
1180 : INFINITE,
1181 : WT_EXECUTEINWAITTHREAD ))
1182 : {
1183 : LBERROR << "RegisterWaitForSingleObject : " << lunchbox::sysError
1184 : << std::endl;
1185 : goto err;
1186 : }
1187 : #else
1188 4 : LBASSERT( _notifier >= 0 );
1189 :
1190 : // Use the completion channel fd to signal Collage of RDMA writes received.
1191 : struct epoll_event evctl;
1192 4 : ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
1193 4 : evctl.events = EPOLLIN;
1194 4 : evctl.data.fd = _cc->fd;
1195 4 : if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
1196 : {
1197 0 : LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
1198 0 : goto err;
1199 : }
1200 : #endif
1201 :
1202 4 : LBASSERT( NULL == _cq );
1203 :
1204 : // Create a single completion queue for both sends & receives */
1205 4 : _cq = ::ibv_create_cq( _cm_id->verbs, _depth * 2, NULL, _cc, 0 );
1206 4 : if( NULL == _cq )
1207 : {
1208 0 : LBERROR << "ibv_create_cq : " << lunchbox::sysError << std::endl;
1209 0 : goto err;
1210 : }
1211 :
1212 : // Request IBV_SEND_SOLICITED events only (i.e. RDMA writes, not FC)
1213 4 : if( ::rdma_seterrno( ::ibv_req_notify_cq( _cq, 1 )))
1214 : {
1215 0 : LBERROR << "ibv_req_notify_cq : " << lunchbox::sysError << std::endl;
1216 0 : goto err;
1217 : }
1218 :
1219 4 : _wcs = new struct ibv_wc[ _depth ];
1220 4 : return true;
1221 :
1222 : err:
1223 0 : return false;
1224 : }
1225 :
1226 4 : bool RDMAConnection::_createQP( )
1227 : {
1228 : struct ibv_qp_init_attr init_attr;
1229 :
1230 4 : LBASSERT( _depth > 0 );
1231 4 : LBASSERT( NULL != _cm_id );
1232 4 : LBASSERT( NULL != _pd );
1233 4 : LBASSERT( NULL != _cq );
1234 :
1235 4 : ::memset( (void *)&init_attr, 0, sizeof(struct ibv_qp_init_attr) );
1236 4 : init_attr.qp_type = IBV_QPT_RC;
1237 4 : init_attr.cap.max_send_wr = _depth;
1238 4 : init_attr.cap.max_recv_wr = _depth;
1239 4 : init_attr.cap.max_recv_sge = 1;
1240 4 : init_attr.cap.max_send_sge = 1;
1241 4 : init_attr.recv_cq = _cq;
1242 4 : init_attr.send_cq = _cq;
1243 4 : init_attr.sq_sig_all = 1; // i.e. always IBV_SEND_SIGNALED
1244 :
1245 : // Create queue pair.
1246 4 : if( ::rdma_create_qp( _cm_id, _pd, &init_attr ))
1247 : {
1248 0 : LBERROR << "rdma_create_qp : " << lunchbox::sysError << std::endl;
1249 0 : goto err;
1250 : }
1251 :
1252 4 : LBVERB << "RDMA QP caps : " << std::dec <<
1253 0 : init_attr.cap.max_recv_wr << " receives, " <<
1254 4 : init_attr.cap.max_send_wr << " sends, " << std::endl;
1255 :
1256 4 : return true;
1257 :
1258 : err:
1259 0 : return false;
1260 : }
1261 :
1262 4 : bool RDMAConnection::_initBuffers( )
1263 : {
1264 4 : const size_t rbs = 1024 * 1024 *
1265 8 : Global::getIAttribute( Global::IATTR_RDMA_RING_BUFFER_SIZE_MB );
1266 :
1267 4 : LBASSERT( _depth > 0 );
1268 4 : LBASSERT( NULL != _pd );
1269 :
1270 4 : if( 0 == rbs )
1271 : {
1272 0 : LBERROR << "Invalid RDMA ring buffer size." << std::endl;
1273 0 : goto err;
1274 : }
1275 :
1276 4 : if( !_sourcebuf.resize( _pd, rbs ))
1277 : {
1278 0 : LBERROR << "Failed to resize source buffer." << std::endl;
1279 0 : goto err;
1280 : }
1281 :
1282 4 : if( !_sinkbuf.resize( _pd, rbs ))
1283 : {
1284 0 : LBERROR << "Failed to resize sink buffer." << std::endl;
1285 0 : goto err;
1286 : }
1287 :
1288 4 : _sourceptr.clear( uint32_t( _sourcebuf.getSize( )));
1289 4 : _sinkptr.clear( uint32_t( _sinkbuf.getSize( )));
1290 :
1291 : // Need enough space for both sends and receives.
1292 4 : if( !_msgbuf.resize( _pd, _depth * 2 ))
1293 : {
1294 0 : LBERROR << "Failed to resize message buffer pool." << std::endl;
1295 0 : goto err;
1296 : }
1297 :
1298 4 : return true;
1299 :
1300 : err:
1301 0 : return false;
1302 : }
1303 :
1304 2 : bool RDMAConnection::_resolveAddress( )
1305 : {
1306 2 : LBASSERT( NULL != _cm_id );
1307 2 : LBASSERT( NULL != _rai );
1308 :
1309 2 : if( ::rdma_resolve_addr( _cm_id, _rai->ai_src_addr, _rai->ai_dst_addr,
1310 2 : _timeout ))
1311 : {
1312 0 : LBERROR << "rdma_resolve_addr : " << lunchbox::sysError << std::endl;
1313 0 : goto err;
1314 : }
1315 :
1316 2 : return _waitForCMEvent( RDMA_CM_EVENT_ADDR_RESOLVED );
1317 :
1318 : err:
1319 0 : return false;
1320 : }
1321 :
1322 2 : bool RDMAConnection::_resolveRoute( )
1323 : {
1324 2 : LBASSERT( NULL != _cm_id );
1325 2 : LBASSERT( NULL != _rai );
1326 :
1327 4 : if(( IBV_TRANSPORT_IB == _cm_id->verbs->device->transport_type ) &&
1328 2 : ( _rai->ai_route_len > 0 ))
1329 : {
1330 : #ifdef _WIN32
1331 : if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS,
1332 : #else
1333 0 : if( ::rdma_set_option( _cm_id, RDMA_OPTION_IB, RDMA_OPTION_IB_PATH,
1334 : #endif
1335 0 : _rai->ai_route, _rai->ai_route_len ))
1336 : {
1337 0 : LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
1338 0 : goto err;
1339 : }
1340 :
1341 : // rdma_resolve_route not required (TODO : is this really true?)
1342 0 : return true;
1343 : }
1344 :
1345 2 : if( ::rdma_resolve_route( _cm_id, _timeout ))
1346 : {
1347 0 : LBERROR << "rdma_resolve_route : " << lunchbox::sysError << std::endl;
1348 0 : goto err;
1349 : }
1350 :
1351 2 : return _waitForCMEvent( RDMA_CM_EVENT_ROUTE_RESOLVED );
1352 :
1353 : err:
1354 0 : return false;
1355 : }
1356 :
1357 2 : bool RDMAConnection::_connect( )
1358 : {
1359 2 : LBASSERT( NULL != _cm_id );
1360 2 : LBASSERT( !_established );
1361 :
1362 : #if 0 // TODO
1363 : static const uint8_t DSCP = 0;
1364 :
1365 : if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS,
1366 : (void *)&DSCP, sizeof(DSCP) ))
1367 : {
1368 : LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
1369 : goto err;
1370 : }
1371 : #endif
1372 :
1373 : struct rdma_conn_param conn_param;
1374 :
1375 2 : ::memset( (void *)&conn_param, 0, sizeof(struct rdma_conn_param) );
1376 :
1377 2 : _cpd.magic = RDMA_PROTOCOL_MAGIC;
1378 2 : _cpd.version = RDMA_PROTOCOL_VERSION;
1379 2 : _cpd.depth = _depth;
1380 2 : conn_param.private_data = reinterpret_cast< const void * >( &_cpd );
1381 2 : conn_param.private_data_len = sizeof(struct RDMAConnParamData);
1382 2 : conn_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
1383 2 : conn_param.responder_resources = RDMA_MAX_RESP_RES;
1384 : // Magic 3-bit values.
1385 : //conn_param.retry_count = 5;
1386 : //conn_param.rnr_retry_count = 7;
1387 :
1388 6 : LBINFO << "Connect on source lid : " << std::showbase
1389 4 : << std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
1390 4 : << std::dec << ntohs( _cm_id->route.path_rec->slid ) << ") "
1391 2 : << "to dest lid : "
1392 4 : << std::hex << ntohs( _cm_id->route.path_rec->dlid ) << " ("
1393 4 : << std::dec << ntohs( _cm_id->route.path_rec->dlid ) << ") "
1394 6 : << std::endl;
1395 :
1396 2 : if( ::rdma_connect( _cm_id, &conn_param ))
1397 : {
1398 0 : LBERROR << "rdma_connect : " << lunchbox::sysError << std::endl;
1399 0 : goto err;
1400 : }
1401 :
1402 2 : return _waitForCMEvent( RDMA_CM_EVENT_ESTABLISHED );
1403 :
1404 : err:
1405 0 : return false;
1406 : }
1407 :
1408 2 : bool RDMAConnection::_bindAddress( )
1409 : {
1410 2 : LBASSERT( NULL != _cm_id );
1411 2 : ConstConnectionDescriptionPtr description = getDescription();
1412 :
1413 : #if IPV6_DEFAULT
1414 : struct sockaddr_in6 sin;
1415 : memset( (void *)&sin, 0, sizeof(struct sockaddr_in6) );
1416 : sin.sin6_family = AF_INET6;
1417 : sin.sin6_port = htons( description->port );
1418 : sin.sin6_addr = in6addr_any;
1419 : #else
1420 : struct sockaddr_in sin;
1421 2 : memset( (void *)&sin, 0, sizeof(struct sockaddr_in) );
1422 2 : sin.sin_family = AF_INET;
1423 2 : sin.sin_port = htons( description->port );
1424 2 : sin.sin_addr.s_addr = INADDR_ANY;
1425 : #endif
1426 :
1427 4 : if( ::rdma_bind_addr( _cm_id, ( NULL != _rai ) ? _rai->ai_src_addr :
1428 2 : reinterpret_cast< struct sockaddr * >( &sin )))
1429 : {
1430 0 : LBERROR << "rdma_bind_addr : " << lunchbox::sysError << std::endl;
1431 0 : goto err;
1432 : }
1433 :
1434 2 : return true;
1435 :
1436 : err:
1437 0 : return false;
1438 : }
1439 :
1440 2 : bool RDMAConnection::_listen( int backlog )
1441 : {
1442 2 : LBASSERT( NULL != _cm_id );
1443 :
1444 2 : if( ::rdma_listen( _cm_id, backlog ))
1445 : {
1446 0 : LBERROR << "rdma_listen : " << lunchbox::sysError << std::endl;
1447 0 : goto err;
1448 : }
1449 :
1450 2 : return true;
1451 :
1452 : err:
1453 0 : return false;
1454 : }
1455 :
1456 2 : bool RDMAConnection::_migrateId( )
1457 : {
1458 2 : LBASSERT( NULL != _cm_id );
1459 2 : LBASSERT( NULL != _cm );
1460 :
1461 2 : if( ::rdma_migrate_id( _cm_id, _cm ))
1462 : {
1463 0 : LBERROR << "rdma_migrate_id : " << lunchbox::sysError << std::endl;
1464 0 : goto err;
1465 : }
1466 :
1467 2 : return true;
1468 :
1469 : err:
1470 0 : return false;
1471 : }
1472 :
1473 2 : bool RDMAConnection::_accept( )
1474 : {
1475 : struct rdma_conn_param accept_param;
1476 :
1477 2 : LBASSERT( NULL != _cm_id );
1478 2 : LBASSERT( !_established );
1479 :
1480 2 : ::memset( (void *)&accept_param, 0, sizeof(struct rdma_conn_param) );
1481 :
1482 2 : _cpd.magic = RDMA_PROTOCOL_MAGIC;
1483 2 : _cpd.version = RDMA_PROTOCOL_VERSION;
1484 2 : _cpd.depth = _depth;
1485 2 : accept_param.private_data = reinterpret_cast< const void * >( &_cpd );
1486 2 : accept_param.private_data_len = sizeof(struct RDMAConnParamData);
1487 2 : accept_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
1488 2 : accept_param.responder_resources = RDMA_MAX_RESP_RES;
1489 : // Magic 3-bit value.
1490 : //accept_param.rnr_retry_count = 7;
1491 :
1492 6 : LBINFO << "Accept on source lid : "<< std::showbase
1493 4 : << std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
1494 4 : << std::dec << ntohs( _cm_id->route.path_rec->slid ) << ") "
1495 2 : << "from dest lid : "
1496 4 : << std::hex << ntohs( _cm_id->route.path_rec->dlid ) << " ("
1497 4 : << std::dec << ntohs( _cm_id->route.path_rec->dlid ) << ") "
1498 6 : << std::endl;
1499 :
1500 2 : if( ::rdma_accept( _cm_id, &accept_param ))
1501 : {
1502 0 : LBERROR << "rdma_accept : " << lunchbox::sysError << std::endl;
1503 0 : goto err;
1504 : }
1505 :
1506 2 : return _waitForCMEvent( RDMA_CM_EVENT_ESTABLISHED );
1507 :
1508 : err:
1509 0 : return false;
1510 : }
1511 :
1512 0 : bool RDMAConnection::_reject( )
1513 : {
1514 0 : LBASSERT( NULL != _cm_id );
1515 0 : LBASSERT( !_established );
1516 :
1517 0 : if( ::rdma_reject( _cm_id, NULL, 0 ))
1518 : {
1519 0 : LBERROR << "rdma_reject : " << lunchbox::sysError << std::endl;
1520 0 : goto err;
1521 : }
1522 :
1523 0 : return true;
1524 :
1525 : err:
1526 0 : return false;
1527 : }
1528 :
1529 : ////////////////////////////////////////////////////////////////////////////////
1530 :
1531 4 : bool RDMAConnection::_initProtocol( int32_t depth )
1532 : {
1533 4 : if( depth < 2L )
1534 : {
1535 0 : LBERROR << "Invalid queue depth." << std::endl;
1536 0 : goto err;
1537 : }
1538 :
1539 4 : _depth = depth;
1540 4 : _writes = 0L;
1541 4 : _fcs = 0L;
1542 4 : _readBytes = 0u;
1543 4 : _wcredits = _depth / 2 - 2;
1544 4 : _fcredits = _depth / 2 + 2;
1545 :
1546 4 : return true;
1547 :
1548 : err:
1549 0 : return false;
1550 : }
1551 :
1552 : /* inline */
1553 815804 : bool RDMAConnection::_needFC( )
1554 : {
1555 : bool bytesAvail;
1556 : #ifdef _WIN32
1557 : lunchbox::ScopedFastRead mutex( _eventLock );
1558 : bytesAvail = ( _availBytes != 0 );
1559 : #else
1560 : pollfd pfd;
1561 815804 : pfd.fd = _pipe_fd[0];
1562 815804 : pfd.events = EPOLLIN;
1563 815804 : pfd.revents = 0;
1564 815804 : int ret = poll( &pfd, 1, 0 );
1565 815804 : if ( ret == -1 )
1566 : {
1567 0 : LBERROR << "poll: " << lunchbox::sysError << std::endl;
1568 0 : return true;
1569 : }
1570 815804 : bytesAvail = ret != 0;
1571 : #endif
1572 815804 : return ( !bytesAvail || (uint32_t)_writes >= FC_MESSAGE_FREQUENCY );
1573 : }
1574 :
1575 156864 : bool RDMAConnection::_postReceives( const uint32_t count )
1576 : {
1577 156864 : LBASSERT( NULL != _cm_id->qp );
1578 156859 : LBASSERT( count > 0UL );
1579 :
1580 156859 : ibv_sge* sge = new ibv_sge[count];
1581 156882 : ibv_recv_wr* wrs = new ibv_recv_wr[count];
1582 :
1583 589009 : for( uint32_t i = 0UL; i != count; i++ )
1584 : {
1585 432153 : sge[i].addr = (uint64_t)(uintptr_t)_msgbuf.getBuffer( );
1586 432146 : sge[i].length = uint32_t( _msgbuf.getBufferSize( ));
1587 432133 : sge[i].lkey = _msgbuf.getMR( )->lkey;
1588 :
1589 432107 : wrs[i].wr_id = sge[i].addr;
1590 432107 : wrs[i].next = &wrs[i + 1];
1591 432107 : wrs[i].sg_list = &sge[i];
1592 432107 : wrs[i].num_sge = 1;
1593 : }
1594 156856 : wrs[count - 1].next = NULL;
1595 :
1596 : struct ibv_recv_wr *bad_wr;
1597 156856 : if( ::rdma_seterrno( ::ibv_post_recv( _cm_id->qp, wrs, &bad_wr )))
1598 : {
1599 0 : LBERROR << "ibv_post_recv : " << lunchbox::sysError << std::endl;
1600 0 : goto err;
1601 : }
1602 :
1603 156804 : delete[] sge;
1604 156882 : delete[] wrs;
1605 156880 : return true;
1606 :
1607 : err:
1608 0 : delete[] sge;
1609 0 : delete[] wrs;
1610 0 : return false;
1611 : }
1612 :
1613 : /* inline */
1614 408041 : void RDMAConnection::_recvRDMAWrite( const uint32_t imm_data )
1615 : {
1616 : union
1617 : {
1618 : uint32_t val;
1619 : RDMAFCImm fc;
1620 : } x;
1621 408041 : x.val = ntohl( imm_data );
1622 :
1623 : // Analysis:
1624 : //
1625 : // Since the ring pointers are circular, a malicious (presumably overflow)
1626 : // value here would at worst only result in us reading arbitrary regions
1627 : // from our sink buffer, not segfaulting. If the other side wanted us to
1628 : // reread a previous message it should just resend it!
1629 408041 : _sinkptr.incrHead( x.fc.bytes_sent );
1630 408041 : _incrAvailableBytes( x.fc.bytes_sent );
1631 :
1632 408041 : _fcredits += x.fc.fcs_received;
1633 408041 : LBASSERTINFO( _fcredits <= _depth, _fcredits << " > " << _depth );
1634 :
1635 408041 : _writes++;
1636 408041 : }
1637 :
1638 : /* inline */
1639 408041 : uint32_t RDMAConnection::_makeImm( const uint32_t b )
1640 : {
1641 : union
1642 : {
1643 : uint32_t val;
1644 : RDMAFCImm fc;
1645 : } x;
1646 :
1647 408041 : x.fc.fcs_received = std::min( MAX_FC, static_cast< uint16_t >( _fcs ));
1648 408041 : _fcs -= x.fc.fcs_received;
1649 408041 : LBASSERT( _fcs >= 0 );
1650 :
1651 408041 : LBASSERT( b <= MAX_BS );
1652 408041 : x.fc.bytes_sent = b;
1653 :
1654 408041 : return htonl( x.val );
1655 : }
1656 :
1657 408041 : bool RDMAConnection::_postRDMAWrite( )
1658 : {
1659 : struct ibv_sge sge;
1660 : struct ibv_send_wr wr;
1661 :
1662 816082 : sge.addr = (uint64_t)( (uintptr_t)_sourcebuf.getBase( ) +
1663 816082 : _sourceptr.ptr( _sourceptr.MIDDLE ));
1664 : sge.length = (uint64_t)_sourceptr.available( _sourceptr.HEAD,
1665 408041 : _sourceptr.MIDDLE );
1666 408041 : sge.lkey = _sourcebuf.getMR( )->lkey;
1667 408041 : _sourceptr.incr( _sourceptr.MIDDLE, (uint32_t)sge.length );
1668 :
1669 408041 : wr.wr_id = (uint64_t)sge.length;
1670 408041 : wr.next = NULL;
1671 408041 : wr.sg_list = &sge;
1672 408041 : wr.num_sge = 1;
1673 408041 : wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
1674 408041 : wr.send_flags = IBV_SEND_SOLICITED; // Important!
1675 408041 : wr.imm_data = _makeImm( (uint32_t)sge.length );
1676 408041 : wr.wr.rdma.rkey = _rkey;
1677 816082 : wr.wr.rdma.remote_addr = (uint64_t)( (uintptr_t)_rbase +
1678 816082 : _rptr.ptr( _rptr.HEAD ));
1679 408041 : _rptr.incrHead( (uint32_t)sge.length );
1680 :
1681 408041 : _wcredits--;
1682 408041 : LBASSERT( _wcredits >= 0L );
1683 :
1684 : struct ibv_send_wr *bad_wr;
1685 408041 : if( ::rdma_seterrno( ::ibv_post_send( _cm_id->qp, &wr, &bad_wr )))
1686 : {
1687 0 : LBERROR << "ibv_post_send : " << lunchbox::sysError << std::endl;
1688 0 : goto err;
1689 : }
1690 :
1691 408041 : return true;
1692 :
1693 : err:
1694 0 : return false;
1695 : }
1696 :
1697 22093 : bool RDMAConnection::_postMessage( const RDMAMessage &message )
1698 : {
1699 22093 : _fcredits--;
1700 22093 : LBASSERT( _fcredits >= 0L );
1701 :
1702 22093 : if( ::rdma_post_send( _cm_id, (void *)&message, (void *)&message,
1703 : offsetof( RDMAMessage, payload ) + message.length, _msgbuf.getMR( ),
1704 22093 : 0 ))
1705 : {
1706 0 : LBERROR << "rdma_post_send : " << lunchbox::sysError << std::endl;
1707 0 : goto err;
1708 : }
1709 :
1710 22093 : return true;
1711 :
1712 : err:
1713 0 : return false;
1714 : }
1715 :
1716 22087 : void RDMAConnection::_recvMessage( const RDMAMessage &message )
1717 : {
1718 22087 : switch( message.opcode )
1719 : {
1720 : case FC:
1721 22083 : if( sizeof(struct RDMAFCPayload) == (size_t)message.length )
1722 22083 : _recvFC( message.payload.fc );
1723 : else
1724 0 : LBWARN << "Invalid flow control message received!" << std::endl;
1725 22083 : break;
1726 : case SETUP:
1727 4 : if( sizeof(struct RDMASetupPayload) == (size_t)message.length )
1728 4 : _recvSetup( message.payload.setup );
1729 : else
1730 0 : LBWARN << "Invalid setup message received!" << std::endl;
1731 4 : break;
1732 : default:
1733 0 : LBWARN << "Invalid message received: "
1734 0 : << std::hex << (int)message.opcode << std::dec << std::endl;
1735 : }
1736 22087 : }
1737 :
1738 : /* inline */
1739 22083 : void RDMAConnection::_recvFC( const RDMAFCPayload &fc )
1740 : {
1741 : // Analysis:
1742 : //
1743 : // Since we will only write a maximum of _sourceptr.available( ) bytes
1744 : // to our source buffer, a malicious (presumably overflow) value here would
1745 : // have no chance of causing us to write beyond our buffer as we have local
1746 : // control over those ring pointers. Worst case, we'd and up writing to
1747 : // arbitrary regions of the remote buffer, since this ring pointer is
1748 : // circular as well.
1749 22083 : _rptr.incrTail( fc.bytes_received );
1750 :
1751 22083 : _wcredits += fc.writes_received;
1752 22083 : LBASSERTINFO( _wcredits <= _depth, _wcredits << " > " << _depth );
1753 :
1754 22083 : _fcs++;
1755 22083 : }
1756 :
1757 22089 : bool RDMAConnection::_postFC()
1758 : {
1759 : RDMAMessage &message =
1760 22089 : *reinterpret_cast< RDMAMessage * >( _msgbuf.getBuffer( ));
1761 :
1762 22089 : message.opcode = FC;
1763 22089 : message.length = (uint8_t)sizeof(struct RDMAFCPayload);
1764 :
1765 22089 : message.payload.fc.bytes_received = _readBytes;
1766 22089 : message.payload.fc.writes_received = _writes;
1767 22089 : _writes -= message.payload.fc.writes_received;
1768 22089 : _readBytes = 0;
1769 :
1770 22089 : LBASSERT( _writes >= 0 );
1771 22089 : return _postMessage( message );
1772 : }
1773 :
1774 4 : void RDMAConnection::_recvSetup( const RDMASetupPayload &setup )
1775 : {
1776 : // Analysis:
1777 : //
1778 : // Malicious values here would only affect the receiver, we're willing
1779 : // to RDMA write to anywhere specified!
1780 4 : _rbase = setup.rbase;
1781 4 : _rptr.clear( setup.rlen );
1782 4 : _rkey = setup.rkey;
1783 :
1784 4 : LBVERB << "RDMA MR: " << std::showbase
1785 0 : << std::dec << setup.rlen << " @ "
1786 4 : << std::hex << setup.rbase << std::dec << std::endl;
1787 4 : }
1788 :
1789 4 : bool RDMAConnection::_postSetup( )
1790 : {
1791 : RDMAMessage &message =
1792 4 : *reinterpret_cast< RDMAMessage * >( _msgbuf.getBuffer( ));
1793 :
1794 4 : message.opcode = SETUP;
1795 4 : message.length = (uint8_t)sizeof(struct RDMASetupPayload);
1796 :
1797 4 : message.payload.setup.rbase = (uint64_t)(uintptr_t)_sinkbuf.getBase( );
1798 4 : message.payload.setup.rlen = (uint64_t)_sinkbuf.getSize( );
1799 4 : message.payload.setup.rkey = _sinkbuf.getMR( )->rkey;
1800 :
1801 4 : return _postMessage( message );
1802 : }
1803 :
1804 4 : bool RDMAConnection::_waitRecvSetup( )
1805 : {
1806 4 : lunchbox::Clock clock;
1807 4 : const int64_t start = clock.getTime64( );
1808 4 : const uint32_t timeout = Global::getTimeout( );
1809 4 : eventset events;
1810 :
1811 : retry:
1812 40 : if( !_checkDisconnected( events ))
1813 : {
1814 0 : LBERROR << "Error while checking event state." << std::endl;
1815 0 : goto err;
1816 : }
1817 :
1818 40 : if( !_established )
1819 : {
1820 0 : LBERROR << "Disconnected while waiting for setup message." << std::endl;
1821 0 : goto err;
1822 : }
1823 :
1824 40 : if( !_checkCQ( true ))
1825 : {
1826 0 : LBERROR << "Error while polling receive completion queue." << std::endl;
1827 0 : goto err;
1828 : }
1829 :
1830 40 : if( 0ULL == _rkey )
1831 : {
1832 36 : if( LB_TIMEOUT_INDEFINITE != timeout )
1833 : {
1834 36 : if(( clock.getTime64( ) - start ) > timeout )
1835 : {
1836 0 : LBERROR << "Timed out waiting for setup message." << std::endl;
1837 0 : goto err;
1838 : }
1839 : }
1840 :
1841 36 : lunchbox::Thread::yield( );
1842 36 : goto retry;
1843 : }
1844 :
1845 4 : return true;
1846 :
1847 : err:
1848 0 : return false;
1849 : }
1850 :
1851 : ////////////////////////////////////////////////////////////////////////////////
1852 :
1853 6 : bool RDMAConnection::_createNotifier( )
1854 : {
1855 6 : LBASSERT( _notifier < 0 );
1856 : #ifdef _WIN32
1857 : _notifier = CreateEvent( 0, TRUE, FALSE, 0 );
1858 : return true;
1859 : #else
1860 6 : _notifier = ::epoll_create( 1 );
1861 6 : if( _notifier < 0 )
1862 : {
1863 0 : LBERROR << "epoll_create1 : " << lunchbox::sysError << std::endl;
1864 0 : goto err;
1865 : }
1866 :
1867 6 : return true;
1868 :
1869 : err:
1870 0 : return false;
1871 : #endif
1872 : }
1873 :
1874 6 : void RDMAConnection::_updateNotifier()
1875 : {
1876 : #ifdef _WIN32
1877 : lunchbox::ScopedFastWrite mutex( _eventLock );
1878 : if ( _availBytes == 0 && !_cm->channel.Head &&
1879 : ( !_cc || !_cc->comp_channel.Head ))
1880 : ResetEvent( _notifier );
1881 : #else
1882 6 : eventset events;
1883 6 : _checkEvents( events );
1884 : #endif
1885 6 : }
1886 :
1887 1287088 : bool RDMAConnection::_checkEvents( eventset &events )
1888 : {
1889 1287088 : events.reset( );
1890 :
1891 : #ifdef _WIN32
1892 : lunchbox::ScopedFastRead mutex( _eventLock );
1893 : if ( _availBytes != 0 )
1894 : events.set( BUF_EVENT );
1895 : if ( _cc && ( _cc->comp_channel.Head != 0 ))
1896 : events.set( CQ_EVENT );
1897 : if ( _cm->channel.Head )
1898 : events.set( CM_EVENT );
1899 :
1900 : return true;
1901 : #else
1902 : struct epoll_event evts[3];
1903 :
1904 1287907 : int nfds = TEMP_FAILURE_RETRY( ::epoll_wait( _notifier, evts, 3, 0 ));
1905 1285170 : if( nfds < 0 )
1906 : {
1907 0 : LBERROR << "epoll_wait : " << lunchbox::sysError << std::endl;
1908 0 : goto err;
1909 : }
1910 :
1911 1732015 : for( int i = 0; i < nfds; i++ )
1912 : {
1913 446845 : const int fd = evts[i].data.fd;
1914 446845 : if(( _pipe_fd[0] >= 0 ) && ( fd == _pipe_fd[0] ))
1915 405648 : events.set( BUF_EVENT );
1916 41197 : else if( _cc && ( fd == _cc->fd ))
1917 41185 : events.set( CQ_EVENT );
1918 12 : else if( _cm && ( fd == _cm->fd ))
1919 12 : events.set( CM_EVENT );
1920 : else
1921 0 : LBUNREACHABLE;
1922 : }
1923 :
1924 1285170 : return true;
1925 :
1926 : err:
1927 0 : return false;
1928 : #endif
1929 : }
1930 :
1931 1203071 : bool RDMAConnection::_checkDisconnected( eventset &events )
1932 : {
1933 1203071 : lunchbox::ScopedWrite poll_mutex( _poll_lock );
1934 :
1935 1203151 : if( !_checkEvents( events ))
1936 : {
1937 0 : LBERROR << "Error while checking event state." << std::endl;
1938 0 : goto err;
1939 : }
1940 :
1941 1200016 : if( events.test( CM_EVENT ))
1942 : {
1943 2 : if( !_doCMEvent( RDMA_CM_EVENT_DISCONNECTED ))
1944 : {
1945 0 : LBERROR << "Unexpected connection manager event." << std::endl;
1946 0 : goto err;
1947 : }
1948 :
1949 2 : LBASSERT( !_established );
1950 : }
1951 :
1952 1198303 : return true;
1953 :
1954 : err:
1955 0 : return false;
1956 : }
1957 :
1958 4 : bool RDMAConnection::_createBytesAvailableFD( )
1959 : {
1960 : #ifndef _WIN32
1961 4 : if ( pipe( _pipe_fd ) == -1 )
1962 : {
1963 0 : LBERROR << "pipe: " << lunchbox::sysError << std::endl;
1964 0 : return false;
1965 : }
1966 :
1967 4 : LBASSERT( _pipe_fd[0] >= 0 && _pipe_fd[1] >= 0);
1968 :
1969 : // Use the event fd to signal Collage of bytes remaining.
1970 : struct epoll_event evctl;
1971 4 : ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
1972 4 : evctl.events = EPOLLIN;
1973 4 : evctl.data.fd = _pipe_fd[0];
1974 4 : if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
1975 : {
1976 0 : LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
1977 0 : return false;
1978 : }
1979 : #endif
1980 4 : return true;
1981 : }
1982 :
1983 813115 : bool RDMAConnection::_incrAvailableBytes( const uint64_t b )
1984 : {
1985 : #ifdef _WIN32
1986 : lunchbox::ScopedFastWrite mutex( _eventLock );
1987 : _availBytes += b;
1988 : ::SetEvent( _notifier );
1989 : #else
1990 813115 : if( ::write( _pipe_fd[1], (const void *)&b, sizeof(b) ) != sizeof(b) )
1991 : {
1992 0 : LBERROR << "write : " << lunchbox::sysError << std::endl;
1993 0 : return false;
1994 : }
1995 : #endif
1996 813115 : return true;
1997 : }
1998 :
1999 405648 : uint64_t RDMAConnection::_getAvailableBytes( )
2000 : {
2001 405648 : uint64_t available_bytes = 0;
2002 : #ifdef _WIN32
2003 : lunchbox::ScopedFastWrite mutex( _eventLock );
2004 : available_bytes = static_cast<uint64_t>( _availBytes );
2005 : _availBytes = 0;
2006 : return available_bytes;
2007 : #else
2008 405648 : uint64_t currBytes = 0;
2009 : ssize_t count;
2010 : pollfd pfd;
2011 405648 : pfd.fd = _pipe_fd[0];
2012 405648 : pfd.events = EPOLLIN;
2013 :
2014 813115 : do
2015 : {
2016 813115 : count = ::read( _pipe_fd[0], (void *)&currBytes, sizeof( currBytes ));
2017 813115 : if ( count > 0 && count < (ssize_t)sizeof( currBytes ) )
2018 0 : goto err;
2019 813115 : available_bytes += currBytes;
2020 813115 : pfd.revents = 0;
2021 813115 : if ( ::poll( &pfd, 1, 0 ) == -1 )
2022 : {
2023 0 : LBERROR << "poll : " << lunchbox::sysError << std::endl;
2024 0 : goto err;
2025 : }
2026 813115 : } while ( pfd.revents > 0 );
2027 :
2028 405648 : if ( count == -1 )
2029 : {
2030 0 : LBERROR << "read : " << lunchbox::sysError << std::endl;
2031 0 : goto err;
2032 : }
2033 :
2034 405648 : LBASSERT( available_bytes > 0ULL );
2035 405648 : return available_bytes;
2036 : err:
2037 0 : return 0ULL;
2038 : #endif
2039 : }
2040 :
2041 10 : bool RDMAConnection::_waitForCMEvent( enum rdma_cm_event_type expected )
2042 : {
2043 10 : lunchbox::Clock clock;
2044 10 : const int64_t start = clock.getTime64( );
2045 10 : const uint32_t timeout = Global::getTimeout( );
2046 10 : bool done = false;
2047 10 : eventset events;
2048 :
2049 : retry:
2050 83954 : if( !_checkEvents( events ))
2051 : {
2052 0 : LBERROR << "Error while checking event state." << std::endl;
2053 0 : goto err;
2054 : }
2055 :
2056 83981 : if( events.test( CM_EVENT ))
2057 : {
2058 10 : done = _doCMEvent( expected );
2059 10 : if( !done )
2060 : {
2061 0 : LBERROR << "Unexpected connection manager event." << std::endl;
2062 0 : goto err;
2063 : }
2064 : }
2065 :
2066 83948 : if( !done )
2067 : {
2068 83938 : if( LB_TIMEOUT_INDEFINITE != timeout )
2069 : {
2070 83938 : if(( clock.getTime64( ) - start ) > timeout )
2071 : {
2072 0 : LBERROR << "Timed out waiting for setup message." << std::endl;
2073 0 : goto err;
2074 : }
2075 : }
2076 :
2077 83919 : lunchbox::Thread::yield( );
2078 83944 : goto retry;
2079 : }
2080 :
2081 10 : return true;
2082 :
2083 : err:
2084 0 : return false;
2085 : }
2086 :
2087 12 : bool RDMAConnection::_doCMEvent( enum rdma_cm_event_type expected )
2088 : {
2089 12 : bool ok = false;
2090 : struct rdma_cm_event *event;
2091 :
2092 12 : if( ::rdma_get_cm_event( _cm, &event ))
2093 : {
2094 0 : LBERROR << "rdma_get_cm_event : " << lunchbox::sysError << std::endl;
2095 0 : goto out;
2096 : }
2097 :
2098 12 : ok = ( event->event == expected );
2099 :
2100 : #ifndef NDEBUG
2101 12 : if( ok )
2102 : {
2103 12 : LBVERB << (void *)this
2104 0 : << " (" << _addr << ":" << _serv << ")"
2105 0 : << " event : " << ::rdma_event_str( event->event )
2106 12 : << std::endl;
2107 : }
2108 : else
2109 : {
2110 0 : LBINFO << (void *)this
2111 0 : << " (" << _addr << ":" << _serv << ")"
2112 0 : << " event : " << ::rdma_event_str( event->event )
2113 0 : << " expected: " << ::rdma_event_str( expected )
2114 0 : << std::endl;
2115 : }
2116 : #endif
2117 :
2118 12 : if( ok && ( RDMA_CM_EVENT_DISCONNECTED == event->event ))
2119 2 : _established = false;
2120 :
2121 12 : if( ok && ( RDMA_CM_EVENT_ESTABLISHED == event->event ))
2122 : {
2123 4 : _established = true;
2124 :
2125 4 : struct rdma_conn_param *cp = &event->param.conn;
2126 :
2127 4 : ::memset( (void *)&_cpd, 0, sizeof(RDMAConnParamData) );
2128 : // Note that the actual amount of data transferred to the remote side
2129 : // is transport dependent and may be larger than that requested.
2130 4 : if( cp->private_data_len >= sizeof(RDMAConnParamData) )
2131 : _cpd = *reinterpret_cast< const RDMAConnParamData * >(
2132 2 : cp->private_data );
2133 : }
2134 :
2135 12 : if( ok && ( RDMA_CM_EVENT_CONNECT_REQUEST == event->event ))
2136 : {
2137 2 : _new_cm_id = event->id;
2138 :
2139 2 : struct rdma_conn_param *cp = &event->param.conn;
2140 :
2141 2 : ::memset( (void *)&_cpd, 0, sizeof(RDMAConnParamData) );
2142 : // TODO : Not sure what happens when initiator sent ai_connect data
2143 : // (assuming the underlying transport doesn't strip it)?
2144 2 : if( cp->private_data_len >= sizeof(RDMAConnParamData) )
2145 : _cpd = *reinterpret_cast< const RDMAConnParamData * >(
2146 2 : cp->private_data );
2147 : }
2148 :
2149 12 : if( RDMA_CM_EVENT_REJECTED == event->event )
2150 0 : LBINFO << "Connection reject status : " << event->status << std::endl;
2151 :
2152 12 : if( ::rdma_ack_cm_event( event ))
2153 0 : LBWARN << "rdma_ack_cm_event : " << lunchbox::sysError << std::endl;
2154 :
2155 : out:
2156 12 : return ok;
2157 : }
2158 :
2159 41185 : bool RDMAConnection::_rearmCQ( )
2160 : {
2161 : struct ibv_cq *ev_cq;
2162 : void *ev_ctx;
2163 :
2164 : #ifdef _WIN32
2165 : lunchbox::ScopedFastWrite mutex( _eventLock );
2166 : #endif
2167 41185 : if( ::ibv_get_cq_event( _cc, &ev_cq, &ev_ctx ))
2168 : {
2169 0 : LBERROR << "ibv_get_cq_event : " << lunchbox::sysError << std::endl;
2170 0 : return false;
2171 : }
2172 :
2173 : // http://lists.openfabrics.org/pipermail/general/2008-November/055237.html
2174 41185 : _completions++;
2175 41185 : if( std::numeric_limits< unsigned int >::max( ) == _completions )
2176 : {
2177 0 : ::ibv_ack_cq_events( _cq, _completions );
2178 0 : _completions = 0U;
2179 : }
2180 :
2181 : // Solicited only!
2182 41185 : if( ::rdma_seterrno( ::ibv_req_notify_cq( _cq, 1 )))
2183 : {
2184 0 : LBERROR << "ibv_req_notify_cq : " << lunchbox::sysError << std::endl;
2185 0 : return false;
2186 : }
2187 :
2188 41185 : return true;
2189 : }
2190 :
2191 1201460 : bool RDMAConnection::_checkCQ( bool drain )
2192 : {
2193 : uint32_t num_recvs;
2194 : int count;
2195 :
2196 1201460 : lunchbox::ScopedWrite poll_mutex( _poll_lock );
2197 :
2198 1202307 : if( NULL == _cq )
2199 5511 : goto out;
2200 :
2201 : repoll:
2202 : /* CHECK RECEIVE COMPLETIONS */
2203 1332679 : count = ::ibv_poll_cq( _cq, _depth, _wcs );
2204 1338226 : if( count < 0 )
2205 : {
2206 0 : LBERROR << "ibv_poll_cq : " << lunchbox::sysError << std::endl;
2207 0 : goto err;
2208 : }
2209 :
2210 1338226 : num_recvs = 0UL;
2211 2185098 : for( int i = 0; i != count ; i++ )
2212 : {
2213 851621 : struct ibv_wc &wc = _wcs[i];
2214 :
2215 851621 : if( IBV_WC_SUCCESS != wc.status )
2216 : {
2217 : // Non-fatal.
2218 0 : if( IBV_WC_WR_FLUSH_ERR == wc.status )
2219 0 : continue;
2220 :
2221 0 : LBWARN << (void *)this << " !IBV_WC_SUCCESS : " << std::showbase
2222 0 : << std::hex << "wr_id = " << wc.wr_id
2223 0 : << ", status = \"" << ::ibv_wc_status_str( wc.status ) << "\""
2224 0 : << std::dec << " (" << (unsigned int)wc.status << ")"
2225 0 : << std::hex << ", vendor_err = " << wc.vendor_err
2226 0 : << std::dec << std::endl;
2227 :
2228 : // All others are fatal.
2229 0 : goto err;
2230 : }
2231 :
2232 851621 : LBASSERT( IBV_WC_SUCCESS == wc.status );
2233 :
2234 : #ifdef _WIN32 //----------------------------------------------------------------
2235 : // WINDOWS IBV API WORKAROUND.
2236 : // TODO: remove this as soon as IBV API is fixed
2237 : if ( wc.opcode == IBV_WC_RECV && wc.wc_flags == IBV_WC_WITH_IMM )
2238 : wc.opcode = IBV_WC_RECV_RDMA_WITH_IMM;
2239 : #endif //-----------------------------------------------------------------------
2240 :
2241 860225 : if( IBV_WC_RECV_RDMA_WITH_IMM == wc.opcode )
2242 408041 : _recvRDMAWrite( wc.imm_data );
2243 452184 : else if( IBV_WC_RECV == wc.opcode )
2244 22087 : _recvMessage( *reinterpret_cast< RDMAMessage * >( wc.wr_id ));
2245 430097 : else if( IBV_WC_SEND == wc.opcode )
2246 22093 : _msgbuf.freeBuffer( (void *)(uintptr_t)wc.wr_id );
2247 408004 : else if( IBV_WC_RDMA_WRITE == wc.opcode )
2248 408004 : _sourceptr.incrTail( (uint32_t)wc.wr_id );
2249 : else
2250 0 : LBUNREACHABLE;
2251 :
2252 846896 : if( IBV_WC_RECV & wc.opcode )
2253 : {
2254 430112 : _msgbuf.freeBuffer( (void *)(uintptr_t)wc.wr_id );
2255 : // All receive completions need to be reposted.
2256 430088 : num_recvs++;
2257 : }
2258 : }
2259 :
2260 1333477 : if(( num_recvs > 0UL ) && !_postReceives( num_recvs ))
2261 0 : goto err;
2262 :
2263 1333486 : if( drain && ( count > 0 ))
2264 135883 : goto repoll;
2265 :
2266 : out:
2267 1203114 : return true;
2268 :
2269 : err:
2270 0 : return false;
2271 : }
2272 :
2273 : /* inline */
2274 405650 : uint32_t RDMAConnection::_drain( void *buffer, const uint32_t bytes )
2275 : {
2276 405650 : const uint32_t b = std::min( bytes, _sinkptr.available( ));
2277 811300 : ::memcpy( buffer, (const void *)((uintptr_t)_sinkbuf.getBase( ) +
2278 1216950 : _sinkptr.tail( )), b );
2279 405650 : _sinkptr.incrTail( b );
2280 405650 : return b;
2281 : }
2282 :
2283 : /* inline */
2284 793520 : uint32_t RDMAConnection::_fill( const void *buffer, const uint32_t bytes )
2285 : {
2286 793520 : uint32_t b = std::min( bytes, std::min( _sourceptr.negAvailable( ),
2287 1587040 : _rptr.negAvailable( )));
2288 : #ifndef WRAP
2289 793520 : b = std::min( b, (uint32_t)(_sourcebuf.getSize() -
2290 793520 : _sourceptr.ptr( _sourceptr.HEAD )));
2291 : #endif
2292 1587040 : ::memcpy( (void *)((uintptr_t)_sourcebuf.getBase( ) +
2293 2380560 : _sourceptr.ptr( _sourceptr.HEAD )), buffer, b );
2294 793520 : _sourceptr.incrHead( b );
2295 793520 : return b;
2296 : }
2297 :
2298 0 : Connection::Notifier RDMAConnection::getNotifier() const
2299 : {
2300 0 : return _notifier;
2301 : }
2302 :
2303 : #ifdef _WIN32
2304 : void RDMAConnection::_triggerNotifierCQ( RDMAConnection* conn )
2305 : {
2306 : conn->_triggerNotifierWorker( CQ_EVENT );
2307 : }
2308 :
2309 : void RDMAConnection::_triggerNotifierCM( RDMAConnection* conn )
2310 : {
2311 : conn->_triggerNotifierWorker( CM_EVENT );
2312 : }
2313 :
2314 : void RDMAConnection::_triggerNotifierWorker( Events event )
2315 : {
2316 : lunchbox::ScopedFastWrite mutex( _eventLock );
2317 : COMP_CHANNEL* chan = event == CQ_EVENT ? &_cc->comp_channel : &_cm->channel;
2318 : EnterCriticalSection( &chan->Lock );
2319 : ResetEvent( chan->Event );
2320 : if ( chan->Head )
2321 : SetEvent( _notifier );
2322 : LeaveCriticalSection( &chan->Lock );
2323 : }
2324 : #endif
2325 :
2326 : //////////////////////////////////////////////////////////////////////////////
2327 :
2328 0 : void RDMAConnection::_showStats( )
2329 : {
2330 0 : LBVERB << std::dec
2331 0 : << "reads = " << _stats.reads
2332 0 : << ", buffer_empty = " << _stats.buffer_empty
2333 0 : << ", no_credits_fc = " << _stats.no_credits_fc
2334 0 : << ", writes = " << _stats.writes
2335 0 : << ", buffer_full = " << _stats.buffer_full
2336 0 : << ", no_credits_rdma = " << _stats.no_credits_rdma
2337 0 : << std::endl;
2338 0 : }
2339 :
2340 : //////////////////////////////////////////////////////////////////////////////
2341 :
2342 6 : BufferPool::BufferPool( size_t buffer_size )
2343 : : _buffer_size( buffer_size )
2344 : , _num_bufs( 0 )
2345 : , _buffer( NULL )
2346 : , _mr( NULL )
2347 6 : , _ring( 0 )
2348 : {
2349 6 : }
2350 :
2351 12 : BufferPool::~BufferPool( )
2352 : {
2353 6 : clear( );
2354 6 : }
2355 :
2356 20 : void BufferPool::clear( )
2357 : {
2358 20 : _num_bufs = 0;
2359 20 : _ring.clear( _num_bufs );
2360 :
2361 20 : if(( NULL != _mr ) && ::rdma_dereg_mr( _mr ))
2362 0 : LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
2363 20 : _mr = NULL;
2364 :
2365 20 : if( NULL != _buffer )
2366 : #ifdef _WIN32
2367 : _aligned_free( _buffer );
2368 : #else
2369 4 : ::free( _buffer );
2370 : #endif
2371 20 : _buffer = NULL;
2372 20 : }
2373 :
2374 4 : bool BufferPool::resize( ibv_pd *pd, uint32_t num_bufs )
2375 : {
2376 4 : clear( );
2377 :
2378 4 : if( num_bufs )
2379 : {
2380 4 : _num_bufs = num_bufs;
2381 4 : _ring.clear( _num_bufs );
2382 :
2383 : #ifdef _WIN32
2384 : _buffer = _aligned_malloc((size_t)( _num_bufs * _buffer_size ),
2385 : boost::interprocess::mapped_region::get_page_size());
2386 : if ( !_buffer )
2387 : {
2388 : LBERROR << "_aligned_malloc : Couldn't allocate aligned memory. "
2389 : << lunchbox::sysError << std::endl;
2390 : goto err;
2391 : }
2392 : #else
2393 8 : if( ::posix_memalign( &_buffer, (size_t)::getpagesize( ),
2394 8 : (size_t)( _num_bufs * _buffer_size )))
2395 : {
2396 0 : LBERROR << "posix_memalign : " << lunchbox::sysError << std::endl;
2397 0 : goto err;
2398 : }
2399 : #endif
2400 4 : ::memset( _buffer, 0xff, (size_t)( _num_bufs * _buffer_size ));
2401 : _mr = ::ibv_reg_mr( pd, _buffer, (size_t)( _num_bufs * _buffer_size ),
2402 4 : IBV_ACCESS_LOCAL_WRITE );
2403 4 : if( NULL == _mr )
2404 : {
2405 0 : LBERROR << "ibv_reg_mr : " << lunchbox::sysError << std::endl;
2406 0 : goto err;
2407 : }
2408 :
2409 4100 : for( uint32_t i = 0; i != _num_bufs; i++ )
2410 4096 : _ring.put( i );
2411 : }
2412 :
2413 4 : return true;
2414 :
2415 : err:
2416 0 : return false;
2417 : }
2418 :
2419 : //////////////////////////////////////////////////////////////////////////////
2420 :
2421 12 : RingBuffer::RingBuffer( int access )
2422 : : _access( access )
2423 : , _size( 0 )
2424 : #ifdef _WIN32
2425 : , _mapping( 0 )
2426 : , _map( 0 )
2427 : #else
2428 : , _map( MAP_FAILED )
2429 : #endif
2430 12 : , _mr( 0 )
2431 : {
2432 12 : }
2433 :
2434 12 : RingBuffer::~RingBuffer( )
2435 : {
2436 12 : clear( );
2437 12 : }
2438 :
2439 40 : void RingBuffer::clear( )
2440 : {
2441 40 : if(( NULL != _mr ) && ::rdma_dereg_mr( _mr ))
2442 0 : LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
2443 40 : _mr = NULL;
2444 :
2445 : #ifdef _WIN32
2446 : if ( _map )
2447 : {
2448 : UnmapViewOfFile( static_cast<char*>( _map ));
2449 : UnmapViewOfFile( static_cast<char*>( _map ) + _size );
2450 : }
2451 : if ( _mapping )
2452 : CloseHandle( _mapping );
2453 :
2454 : _map = 0;
2455 : _mapping = 0;
2456 : #else
2457 40 : if(( MAP_FAILED != _map ) && ::munmap( _map, _size << 1 ))
2458 0 : LBWARN << "munmap : " << lunchbox::sysError << std::endl;
2459 40 : _map = MAP_FAILED;
2460 : #endif
2461 40 : _size = 0;
2462 40 : }
2463 :
2464 8 : bool RingBuffer::resize( ibv_pd *pd, size_t size )
2465 : {
2466 8 : bool ok = false;
2467 8 : int fd = -1;
2468 :
2469 8 : clear( );
2470 :
2471 8 : if( size )
2472 : {
2473 : #ifdef _WIN32
2474 : uint32_t num_retries = RINGBUFFER_ALLOC_RETRIES;
2475 : while (!_map && num_retries-- != 0)
2476 : {
2477 : void *target_addr = determineViableAddr( size * 2 );
2478 : if (target_addr)
2479 : allocAt( size, target_addr );
2480 : }
2481 :
2482 : if ( !_map )
2483 : {
2484 : LBERROR << "Couldn't allocate desired RingBuffer memory after "
2485 : << RINGBUFFER_ALLOC_RETRIES << " retries." << std::endl;
2486 : }
2487 : else
2488 : {
2489 : LBVERB << "Allocated RDMA Ringbuffer memory in "
2490 : << ( RINGBUFFER_ALLOC_RETRIES - num_retries ) << " tries."
2491 : << std::endl;
2492 : ok = true;
2493 : }
2494 :
2495 : #else
2496 : void *addr1, *addr2;
2497 8 : char path[] = "/dev/shm/co-rdma-buffer-XXXXXX";
2498 :
2499 8 : _size = size;
2500 :
2501 8 : fd = ::mkstemp( path );
2502 8 : if( fd < 0 )
2503 : {
2504 0 : LBERROR << "mkstemp : " << lunchbox::sysError << std::endl;
2505 0 : goto out;
2506 : }
2507 :
2508 8 : if( ::unlink( path ))
2509 : {
2510 0 : LBERROR << "unlink : " << lunchbox::sysError << std::endl;
2511 0 : goto out;
2512 : }
2513 :
2514 8 : if( ::ftruncate( fd, _size ))
2515 : {
2516 0 : LBERROR << "ftruncate : " << lunchbox::sysError << std::endl;
2517 0 : goto out;
2518 : }
2519 :
2520 : _map = ::mmap( NULL, _size << 1,
2521 : PROT_NONE,
2522 8 : MAP_ANONYMOUS | MAP_PRIVATE, -1, 0 );
2523 8 : if( MAP_FAILED == _map )
2524 : {
2525 0 : LBERROR << "mmap : " << lunchbox::sysError << std::endl;
2526 0 : goto out;
2527 : }
2528 :
2529 : addr1 = ::mmap( _map, _size,
2530 : PROT_READ | PROT_WRITE,
2531 8 : MAP_FIXED | MAP_SHARED, fd, 0 );
2532 8 : if( MAP_FAILED == addr1 )
2533 : {
2534 0 : LBERROR << "mmap : " << lunchbox::sysError << std::endl;
2535 0 : goto out;
2536 : }
2537 :
2538 8 : addr2 = ::mmap( (void *)( (uintptr_t)_map + _size ), _size,
2539 : PROT_READ | PROT_WRITE,
2540 16 : MAP_FIXED | MAP_SHARED, fd, 0 );
2541 8 : if( MAP_FAILED == addr2 )
2542 : {
2543 0 : LBERROR << "mmap : " << lunchbox::sysError << std::endl;
2544 0 : goto out;
2545 : }
2546 :
2547 8 : LBASSERT( addr1 == _map );
2548 8 : LBASSERT( addr2 == (void *)( (uintptr_t)_map + _size ));
2549 :
2550 : #endif
2551 : #ifdef WRAP
2552 : _mr = ::ibv_reg_mr( pd, _map, _size << 1, _access );
2553 : #else
2554 8 : _mr = ::ibv_reg_mr( pd, _map, _size, _access );
2555 : #endif
2556 :
2557 8 : if( NULL == _mr )
2558 : {
2559 0 : LBERROR << "ibv_reg_mr : " << lunchbox::sysError << std::endl;
2560 0 : goto out;
2561 : }
2562 :
2563 8 : ::memset( _map, 0, _size );
2564 8 : *reinterpret_cast< uint8_t * >( _map ) = 0x45;
2565 8 : LBASSERT( 0x45 ==
2566 : *reinterpret_cast< uint8_t * >( (uintptr_t)_map + _size ));
2567 : }
2568 :
2569 8 : ok = true;
2570 :
2571 : out:
2572 : #ifndef _WIN32
2573 8 : if(( fd >= 0 ) && TEMP_FAILURE_RETRY( ::close( fd )))
2574 0 : LBWARN << "close : " << lunchbox::sysError << std::endl;
2575 : #endif
2576 8 : return ok;
2577 : }
2578 :
2579 : #ifdef _WIN32
2580 : void* RingBuffer::determineViableAddr( size_t size )
2581 : {
2582 : void *ptr = VirtualAlloc(0, size, MEM_RESERVE, PAGE_NOACCESS);
2583 : if (!ptr)
2584 : return 0;
2585 :
2586 : VirtualFree(ptr, 0, MEM_RELEASE);
2587 : return ptr;
2588 : }
2589 :
2590 : void RingBuffer::allocAt( size_t size, void* desiredAddr )
2591 : {
2592 : // if we already hold one allocation, refuse to make another.
2593 : LBASSERT( !_map );
2594 : LBASSERT( !_mapping );
2595 : if ( _map || _mapping )
2596 : return;
2597 :
2598 : // is ring_size a multiple of 64k? if not, this won't ever work!
2599 : if (( size & 0xffff ) != 0 )
2600 : return;
2601 :
2602 : // try to allocate and map our space
2603 : size_t alloc_size = size * 2;
2604 : _mapping = CreateFileMappingA( INVALID_HANDLE_VALUE,
2605 : 0,
2606 : PAGE_READWRITE,
2607 : (unsigned long long)alloc_size >> 32,
2608 : alloc_size & 0xffffffffu,
2609 : 0 );
2610 :
2611 : if ( !_mapping )
2612 : {
2613 : LBERROR << "CreateFileMappingA failed" << std::endl;
2614 : goto err;
2615 : }
2616 :
2617 : _map = MapViewOfFileEx( _mapping,
2618 : FILE_MAP_ALL_ACCESS,
2619 : 0, 0,
2620 : size,
2621 : desiredAddr );
2622 :
2623 : if ( !_map )
2624 : {
2625 : LBERROR << "First MapViewOfFileEx failed" << std::endl;
2626 : goto err;
2627 : }
2628 :
2629 : if (!MapViewOfFileEx( _mapping,
2630 : FILE_MAP_ALL_ACCESS,
2631 : 0, 0,
2632 : size,
2633 : (char *)desiredAddr + size))
2634 : {
2635 : LBERROR << "Second MapViewOfFileEx failed" << std::endl;
2636 : goto err;
2637 : }
2638 :
2639 : _size = size;
2640 : return;
2641 : err:
2642 : // something went wrong - clean up
2643 : clear();
2644 : }
2645 : #endif
2646 :
2647 :
2648 63 : } // namespace co
|