LCOV - code coverage report
Current view: top level - co - rdmaConnection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 752 1093 68.8 %
Date: 2016-12-14 01:26:48 Functions: 64 67 95.5 %

          Line data    Source code
       1             : // -*- mode: c++ -*-
       2             : /* Copyright (c) 2012, Computer Integration & Programming Solutions, Corp. and
       3             :  *                     United States Naval Research Laboratory
       4             :  *               2012-2013, Stefan Eilemann <eile@eyescale.ch>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : #include "rdmaConnection.h"
      22             : 
      23             : #include "connectionType.h"
      24             : #include "connectionDescription.h"
      25             : #include "global.h"
      26             : 
      27             : #include <lunchbox/clock.h>
      28             : 
      29             : #ifdef _WIN32
      30             : #  pragma warning( disable : 4018 )
      31             : #  include <boost/interprocess/mapped_region.hpp>
      32             : #  pragma warning( default : 4018 )
      33             : #else
      34             : #  include <arpa/inet.h>
      35             : #  include <sys/epoll.h>
      36             : #  include <sys/mman.h>
      37             : #  include <poll.h>
      38             : #  include <unistd.h>
      39             : #endif
      40             : 
      41             : #include <errno.h>
      42             : #include <fcntl.h>
      43             : #include <limits>
      44             : #include <sstream>
      45             : #include <stddef.h>
      46             : 
      47             : #include <rdma/rdma_verbs.h>
      48             : 
      49             : #define IPV6_DEFAULT 0
      50             : 
      51             : #define RDMA_PROTOCOL_MAGIC     0xC0
      52             : #define RDMA_PROTOCOL_VERSION   0x03
      53             : 
      54             : namespace co
      55             : {
      56             : //namespace { static const uint64_t ONE = 1ULL; }
      57             : 
      58             : /**
      59             :  * Message types
      60             :  */
      61             : enum OpCode
      62             : {
      63             :     SETUP = 1 << 0,
      64             :     FC    = 1 << 1,
      65             : };
      66             : 
      67             : /**
      68             :  * Initial setup message used to exchange sink MR parameters.
      69             :  */
      70             : struct RDMASetupPayload
      71             : {
      72             :     uint64_t rbase; // presumably the base address of the sender's sink MR -- TODO confirm in _postSetup()
      73             :     uint64_t rlen;  // presumably the length of that MR -- TODO confirm
      74             :     uint64_t rkey;  // presumably the remote key of that MR -- TODO confirm
      75             : };
      76             : 
      77             : /**
      78             :  * "ACK" messages sent after read, tells source about receive progress.
      79             :  */
      80             : struct RDMAFCPayload
      81             : {
      82             :     uint32_t bytes_received;  // presumably bytes read by the receiver since its last FC message -- TODO confirm in _postFC()
      83             :     uint32_t writes_received; // presumably number of RDMA writes consumed by the receiver -- TODO confirm
      84             : };
      85             : 
      86             : /**
      87             :  * Payload wrapper
      88             :  */
      89             : struct RDMAMessage
      90             : {
      91             :     enum OpCode opcode;
      92             :     uint8_t length;
      93             :     union
      94             :     {
      95             :         struct RDMASetupPayload setup;
      96             :         struct RDMAFCPayload fc;
      97             :     } payload;
      98             : };
      99             : 
     100             : /**
     101             :  * "IMM" data sent with RDMA write, tells sink about send progress.
     102             :  */
     103             : struct RDMAFCImm
     104             : {
     105             :     uint32_t bytes_sent:28;  // 28-bit byte count; MAX_BS is the largest value that fits
     106             :     uint32_t fcs_received:4; // 4-bit flow control count; MAX_FC is the largest value that fits
     107             : };
     108             : 
     109             : namespace {
     110             : // We send a max of 28 bits worth of byte counts per RDMA write.
     111             : static const uint64_t MAX_BS = (( 2 << ( 28 - 1 )) - 1 );
     112             : // We send a max of four bits worth of fc counts per RDMA write.
     113             : static const uint16_t MAX_FC = (( 2 << ( 4 - 1 )) - 1 );
     114             : 
     115             : #ifdef _WIN32
     116             : static const uint32_t RINGBUFFER_ALLOC_RETRIES = 8;
     117             : static const uint32_t WINDOWS_CONNECTION_BACKLOG = 1024;
     118             : #endif
     119             : static const uint32_t FC_MESSAGE_FREQUENCY = 12;
     120             : 
     121           4 : bool _isInetFamily( const rdma_addrinfo* addrinfo)
     122             : {
     123             : #ifndef _WIN32
     124           4 :     return addrinfo->ai_family == AF_INET || addrinfo->ai_family == AF_INET6;
     125             : #else
     126             :     return addrinfo->ai_family == AF_INET;
     127             : #endif
     128             : }
     129             : 
     130             : }
     131             : 
     132             : /**
     133             :  * An RDMA connection implementation.
     134             :  *
     135             :  * The protocol is simple, e.g.:
     136             :  *
     137             :  *      initiator                        target
     138             :  * -----------------------------------------------------
     139             :  *                                  resolve/bind/listen
     140             :  * resolve/prepost/connect
     141             :  *                                    prepost/accept
     142             :  *     send setup         <------->    send setup
     143             :  *   wait for setup                   wait for setup
     144             :  * RDMA_WRITE_WITH_IMM WR  -------> RDMA_WRITE(DATA) WC
     145             :  *     RECV(FC) WC        <-------      SEND WR
     146             :  *                            .
     147             :  *                            .
     148             :  *                            .
     149             :  *
     150             :  * The setup phase exchanges the MR parameters of a fixed size circular buffer
     151             :  * to which remote writes are sent.  Sender tracks available space on the
     152             :  * receiver by accepting "Flow Control" messages that update the tail pointer
     153             :  * of the local "view" of the remote sink MR.
     154             :  *
     155             :  * Once setup is complete, either side may begin operations on the other's MR
     156             :  * (the initiator doesn't have to send first, as in the above example).
     157             :  *
     158             :  * If either credits or buffer space are exhausted, sender will spin waiting
     159             :  * for flow control messages.  Receiver will also not send flow control if
     160             :  * there are no credits available.
     161             :  *
     162             :  * One catch is that Collage will only monitor a single "notifier" for events
     163             :  * and we have three that need to be monitored: one for connection status
     164             :  * events (the RDMA event channel) - RDMA_CM_EVENT_DISCONNECTED in particular,
     165             :  * one for the receive completion queue (upon incoming RDMA write), and an
     166             :  * additional eventfd(2) used to keep the notifier "hot" after partial reads.
     167             :  * We leverage the feature of epoll(7) in that "If an epoll file descriptor
     168             :  * has events waiting then it will indicate as being readable".
     169             :  *
     170             :  * Quite interesting is the effect of RDMA_RING_BUFFER_SIZE_MB and
     171             :  * RDMA_SEND_QUEUE_DEPTH depending on the communication pattern.  Basically,
     172             :  * bigger doesn't necessarily equate to faster!  The defaults are suited for
     173             :  * low latency conditions and would need tuning otherwise.
     174             :  *
     175             :  * ib_write_bw
     176             :  * -----------
     177             :  *  #bytes    num iterations  BW peak[MB/sec]    BW average[MB/sec]
     178             :  * 1048576    10000           3248.10            3247.94
     179             :  *
     180             :  * netperf
     181             :  * -------
     182             :  * Send perf: 3240.72MB/s (3240.72pps)
     183             :  * Send perf: 3240.72MB/s (3240.72pps)
     184             :  * Send perf: 3240.95MB/s (3240.95pps)
     185             :  */
     186           6 : RDMAConnection::RDMAConnection( )
     187             : #ifdef _WIN32
     188             :     : _notifier()
     189             : #else
     190             :     : _notifier( -1 )
     191             : #endif
     192           6 :     , _timeout( Global::getIAttribute( Global::IATTR_RDMA_RESOLVE_TIMEOUT_MS ))
     193             :     , _rai( NULL )
     194             :     , _addrinfo( NULL )
     195             :     , _cm( NULL )
     196             :     , _cm_id( NULL )
     197             :     , _new_cm_id( NULL )
     198             :     , _cc( NULL )
     199             :     , _cq( NULL )
     200             :     , _pd( NULL )
     201             :     , _wcs ( 0 )
     202             :     , _readBytes( 0 )
     203             :     , _established( false )
     204             :     , _depth( 0L )
     205             :     , _writes( 0L )
     206             :     , _fcs( 0L )
     207             :     , _wcredits( 0L )
     208             :     , _fcredits( 0L )
     209             :     , _completions( 0U )
     210             :     , _msgbuf( sizeof(RDMAMessage) )
     211             :     , _sourcebuf( 0 )
     212             :     , _sourceptr( 0 )
     213             :     , _sinkbuf( IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE )
     214             :     , _sinkptr( 0 )
     215             :     , _rptr( 0UL )
     216             :     , _rbase( 0ULL )
     217          12 :     , _rkey( 0ULL )
     218             : #ifdef _WIN32
     219             :     , _availBytes( 0 )
     220             :     , _eventFlag( 0 )
     221             :     , _cmWaitObj( 0 )
     222             :     , _ccWaitObj( 0 )
     223             : #endif
     224             : {
     225             : #ifndef _WIN32
     226           6 :     _pipe_fd[0] = -1;
     227           6 :     _pipe_fd[1] = -1;
     228             : #endif
     229             : 
     230           6 :     ::memset( (void *)&_addr, 0, sizeof(_addr) );
     231           6 :     ::memset( (void *)&_serv, 0, sizeof(_serv) );
     232             : 
     233           6 :     ::memset( (void *)&_cpd, 0, sizeof(struct RDMAConnParamData) );
     234             : 
     235          12 :     ConnectionDescriptionPtr description = _getDescription();
     236           6 :     description->type = CONNECTIONTYPE_RDMA;
     237          12 :     description->bandwidth = // QDR default, report "actual" 8b/10b bandwidth
     238           6 :         ( ::ibv_rate_to_mult( IBV_RATE_40_GBPS ) * 2.5 * 1024000 / 8 ) * 0.8;
     239           6 : }
     240             : 
     241           2 : bool RDMAConnection::connect( )
     242             : {
     243           4 :     ConstConnectionDescriptionPtr description = getDescription();
     244           2 :     LBASSERT( CONNECTIONTYPE_RDMA == description->type );
     245             : 
     246           2 :     if( !isClosed() || 0u == description->port )
     247           0 :         return false;
     248             : 
     249           2 :     _cleanup( );
     250           2 :     _setState( STATE_CONNECTING );
     251             : 
     252           2 :     if( !_lookupAddress( false ) || ( NULL == _addrinfo ))
     253             :     {
     254           0 :         LBERROR << "Failed to lookup destination address." << std::endl;
     255           0 :         goto err;
     256             :     }
     257             : 
     258           2 :     _updateInfo( _addrinfo->ai_dst_addr );
     259             : 
     260           2 :     if( !_createNotifier( ))
     261             :     {
     262           0 :         LBERROR << "Failed to create master notifier." << std::endl;
     263           0 :         goto err;
     264             :     }
     265             : 
     266           2 :     if( !_createEventChannel( ))
     267             :     {
     268           0 :         LBERROR << "Failed to create communication event channel." << std::endl;
     269           0 :         goto err;
     270             :     }
     271             : 
     272           2 :     if( !_createId( ))
     273             :     {
     274           0 :         LBERROR << "Failed to create communication identifier." << std::endl;
     275           0 :         goto err;
     276             :     }
     277             : 
     278           2 :     if( !_resolveAddress( ))
     279             :     {
     280           0 :         LBERROR << "Failed to resolve destination address for : "
     281           0 :             << _addr << ":" << _serv << std::endl;
     282           0 :         goto err;
     283             :     }
     284             : 
     285           2 :     _updateInfo( ::rdma_get_peer_addr( _cm_id ));
     286             : 
     287           2 :     _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
     288             : 
     289           2 :     LBVERB << "Initiating connection on " << std::dec
     290           0 :         << _device_name << ":" << (int)_cm_id->port_num
     291             :         << " to "
     292             :         << _addr << ":" << _serv
     293           2 :         << " (" << description->toString( ) << ")"
     294           6 :         << std::endl;
     295             : 
     296           2 :     if( !_initProtocol( Global::getIAttribute(
     297             :             Global::IATTR_RDMA_SEND_QUEUE_DEPTH )))
     298             :     {
     299           0 :         LBERROR << "Failed to initialize protocol variables." << std::endl;
     300           0 :         goto err;
     301             :     }
     302             : 
     303           2 :     if( !_initVerbs( ))
     304             :     {
     305           0 :         LBERROR << "Failed to initialize verbs." << std::endl;
     306           0 :         goto err;
     307             :     }
     308             : 
     309           2 :     if( !_createQP( ))
     310             :     {
     311           0 :         LBERROR << "Failed to create queue pair." << std::endl;
     312           0 :         goto err;
     313             :     }
     314             : 
     315           2 :     if( !_createBytesAvailableFD( ))
     316             :     {
     317           0 :         LBERROR << "Failed to create available byte notifier." << std::endl;
     318           0 :         goto err;
     319             :     }
     320             : 
     321           2 :     if( !_initBuffers( ))
     322             :     {
     323           0 :         LBERROR << "Failed to initialize ring buffers." << std::endl;
     324           0 :         goto err;
     325             :     }
     326             : 
     327           2 :     if( !_resolveRoute( ))
     328             :     {
     329           0 :         LBERROR << "Failed to resolve route to destination : "
     330           0 :             << _addr << ":" << _serv << std::endl;
     331           0 :         goto err;
     332             :     }
     333             : 
     334           2 :     if( !_postReceives( _depth ))
     335             :     {
     336           0 :         LBERROR << "Failed to pre-post receives." << std::endl;
     337           0 :         goto err;
     338             :     }
     339             : 
     340           2 :     if( !_connect( ))
     341             :     {
     342           0 :         LBERROR << "Failed to connect to destination : "
     343           0 :             << _addr << ":" << _serv << std::endl;
     344           0 :         goto err;
     345             :     }
     346             : 
     347           2 :     LBASSERT( _established );
     348             : 
     349           4 :     if(( RDMA_PROTOCOL_MAGIC != _cpd.magic ) ||
     350           2 :         ( RDMA_PROTOCOL_VERSION != _cpd.version ))
     351             :     {
     352           0 :         LBERROR << "Protocol mismatch with target : "
     353           0 :             << _addr << ":" << _serv << std::endl;
     354           0 :         goto err;
     355             :     }
     356             : 
     357           2 :     if( !_postSetup( ))
     358             :     {
     359           0 :         LBERROR << "Failed to post setup message." << std::endl;
     360           0 :         goto err;
     361             :     }
     362             : 
     363           2 :     if( !_waitRecvSetup( ))
     364             :     {
     365           0 :         LBERROR << "Failed to receive setup message." << std::endl;
     366           0 :         goto err;
     367             :     }
     368             : 
     369           2 :     LBVERB << "Connection established on " << std::dec
     370           0 :         << _device_name << ":" << (int)_cm_id->port_num
     371             :         << " to "
     372             :         << _addr << ":" << _serv
     373           2 :         << " (" << description->toString( ) << ")"
     374           6 :         << std::endl;
     375             : 
     376           2 :     _setState( STATE_CONNECTED );
     377           2 :     _updateNotifier();
     378           2 :     return true;
     379             : 
     380             : err:
     381           0 :     LBVERB << "Connection failed on " << std::dec
     382           0 :         << _device_name << ":" << (int)_cm_id->port_num
     383             :         << " to "
     384             :         << _addr << ":" << _serv
     385           0 :         << " (" << description->toString( ) << ")"
     386           0 :         << std::endl;
     387             : 
     388           0 :     close( );
     389             : 
     390           0 :     return false;
     391             : }
     392             : 
     393           2 : bool RDMAConnection::listen( )
     394             : {
     395           4 :     ConstConnectionDescriptionPtr description = getDescription();
     396           2 :     LBASSERT( CONNECTIONTYPE_RDMA == description->type );
     397             : 
     398           2 :     if( !isClosed() )
     399           0 :         return false;
     400             : 
     401           2 :     _cleanup( );
     402           2 :     _setState( STATE_CONNECTING );
     403             : 
     404           2 :     if( !_lookupAddress( true ))
     405             :     {
     406           0 :         LBERROR << "Failed to lookup local address." << std::endl;
     407           0 :         goto err;
     408             :     }
     409             : 
     410           2 :     if( NULL != _addrinfo )
     411           2 :         _updateInfo( _addrinfo->ai_src_addr );
     412             : 
     413           2 :     if( !_createNotifier( ))
     414             :     {
     415           0 :         LBERROR << "Failed to create master notifier." << std::endl;
     416           0 :         goto err;
     417             :     }
     418             : 
     419           2 :     if( !_createEventChannel( ))
     420             :     {
     421           0 :         LBERROR << "Failed to create communication event channel." << std::endl;
     422           0 :         goto err;
     423             :     }
     424             : 
     425           2 :     if( !_createId( ))
     426             :     {
     427           0 :         LBERROR << "Failed to create communication identifier." << std::endl;
     428           0 :         goto err;
     429             :     }
     430             : 
     431             : #if 0
     432             :     /* NOT IMPLEMENTED */
     433             : 
     434             :     if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
     435             :             (void *)&ONE, sizeof(ONE) ))
     436             :     {
     437             :         LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
     438             :         goto err;
     439             :     }
     440             : #endif
     441             : 
     442           2 :     if( !_bindAddress( ))
     443             :     {
     444           0 :         LBERROR << "Failed to bind to local address : "
     445           0 :             << _addr << ":" << _serv << std::endl;
     446           0 :         goto err;
     447             :     }
     448             : 
     449           2 :     _updateInfo( ::rdma_get_local_addr( _cm_id ));
     450             : 
     451             : #ifdef _WIN32
     452             :     if( !_listen( WINDOWS_CONNECTION_BACKLOG ))
     453             : #else
     454           2 :     if( !_listen( SOMAXCONN ))
     455             : #endif
     456             :     {
     457           0 :         LBERROR << "Failed to listen on bound address : "
     458           0 :             << _addr << ":" << _serv << std::endl;
     459           0 :         goto err;
     460             :     }
     461             : 
     462           2 :     if( NULL != _cm_id->verbs )
     463           0 :         _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
     464             : 
     465           2 :     LBVERB << "Listening on " << std::dec
     466           0 :         << _device_name << ":" << (int)_cm_id->port_num
     467             :         << " at "
     468             :         << _addr << ":" << _serv
     469           2 :         << " (" << description->toString( ) << ")"
     470           6 :         << std::endl;
     471             : 
     472           2 :     _setState( STATE_LISTENING );
     473           2 :     return true;
     474             : 
     475             : err:
     476           0 :     close();
     477           0 :     return false;
     478             : }
     479             : 
     480           2 : void RDMAConnection::acceptNB( ) { /* NOP */ }
     481             : 
     482           2 : ConnectionPtr RDMAConnection::acceptSync( )
     483             : {
     484           2 :     RDMAConnection *newConnection = NULL;
     485             : 
     486           2 :     if( !isListening() )
     487             :     {
     488           0 :         LBERROR << "Connection not in listening state." << std::endl;
     489           0 :         goto out;
     490             :     }
     491             : 
     492           2 :     if( !_waitForCMEvent( RDMA_CM_EVENT_CONNECT_REQUEST ))
     493             :     {
     494           0 :         LBERROR << "Failed to receive valid connect request." << std::endl;
     495           0 :         goto out;
     496             :     }
     497             : 
     498           2 :     LBASSERT( NULL != _new_cm_id );
     499             : 
     500           2 :     newConnection = new RDMAConnection( );
     501             : 
     502           2 :     if( !newConnection->_finishAccept( _new_cm_id, _cpd ))
     503             :     {
     504           0 :         delete newConnection;
     505           0 :         newConnection = NULL;
     506             :     }
     507             : 
     508             : out:
     509           2 :     _new_cm_id = NULL;
     510           2 :     _updateNotifier();
     511           2 :     return newConnection;
     512             : }
     513             : 
     514      442791 : void RDMAConnection::readNB( void*, const uint64_t ) { /* NOP */ }
     515             : 
     516      442791 : int64_t RDMAConnection::readSync( void* buffer, const uint64_t bytes,
     517             :         const bool block )
     518             : {
     519      885582 :     lunchbox::Clock clock;
     520      442791 :     const int64_t start = clock.getTime64( );
     521      442791 :     const uint32_t timeout = Global::getTimeout( );
     522      442791 :     eventset events;
     523      442791 :     uint64_t available_bytes = 0ULL;
     524      442791 :     uint32_t bytes_taken = 0UL;
     525      442791 :     bool extra_event = false;
     526             : 
     527      442791 :     if( !isConnected() )
     528           0 :         goto err;
     529             : 
     530             : //    LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
     531             : //       << std::endl;
     532             : 
     533      442791 :     _stats.reads++;
     534             : 
     535             : retry:
     536      465106 :     if( !_checkDisconnected( events ))
     537             :     {
     538           0 :         LBERROR << "Error while checking event state." << std::endl;
     539           0 :         goto err;
     540             :     }
     541             : 
     542      465106 :     if( events.test( CQ_EVENT ))
     543             :     {
     544       87131 :         if( !_rearmCQ( ))
     545             :         {
     546           0 :             LBERROR << "Error while rearming receive channel." << std::endl;
     547           0 :             goto err;
     548             :         }
     549             :     }
     550             : 
     551             :     // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
     552      465106 :     if( !_checkCQ( events.test( CQ_EVENT )))
     553             :     {
     554           0 :         LBERROR << "Error while polling completion queues." << std::endl;
     555           0 :         goto err;
     556             :     }
     557             : 
     558      465106 :     LBASSERT( _fcredits >= 0L );
     559             : 
     560      465106 :     if( _established && _needFC( ) && ( 0L == _fcredits ))
     561             :     {
     562           0 :         if( LB_TIMEOUT_INDEFINITE != timeout )
     563             :         {
     564           0 :             if(( clock.getTime64( ) - start ) > timeout )
     565             :             {
     566           0 :                 LBERROR << "Timed out trying to acquire credit." << std::endl;
     567           0 :                 goto err;
     568             :             }
     569             :         }
     570             : 
     571             :         //LBWARN << "No credit for flow control." << std::endl;
     572           0 :         lunchbox::Thread::yield( );
     573           0 :         _stats.no_credits_fc++;
     574           0 :         goto retry;
     575             :     }
     576             : 
     577             :     // "Note that an extra event may be triggered without having a
     578             :     // corresponding completion entry in the CQ." (per ibv_get_cq_event(3))
     579      465106 :     if( _established && !events.test( BUF_EVENT ))
     580             :     {
     581             :         // Special case: If LocalNode is reading the length part of a message
     582             :         // it will ignore this zero return and restart the select.
     583       22315 :         if( extra_event && !block )
     584             :         {
     585           0 :             _updateNotifier();
     586           0 :             return 0LL;
     587             :         }
     588             : 
     589       22315 :         extra_event = true;
     590       22315 :         goto retry;
     591             :     }
     592             : 
     593      442791 :     if( events.test( BUF_EVENT ))
     594             :     {
     595      442789 :         available_bytes = _getAvailableBytes( );
     596      442789 :         if( 0ULL == available_bytes )
     597             :         {
     598           0 :             LBERROR << "Error while reading from event fd." << std::endl;
     599           0 :             goto err;
     600             :         }
     601             :     }
     602             : 
     603             :     // Modifies _sinkptr.TAIL
     604      442791 :     bytes_taken = _drain( buffer, bytes );
     605             : 
     606      442791 :     if( 0UL == bytes_taken )
     607             :     {
     608           2 :         if( _sinkptr.isEmpty( ) && !_established )
     609             :         {
     610          10 :             LBINFO << "Got EOF, closing " << getDescription( )->toString( )
     611           8 :                 << std::endl;
     612           2 :             goto err;
     613             :         }
     614             : 
     615           0 :         if( LB_TIMEOUT_INDEFINITE != timeout )
     616             :         {
     617           0 :             if(( clock.getTime64( ) - start ) > timeout )
     618             :             {
     619           0 :                 LBERROR << "Timed out trying to drain buffer." << std::endl;
     620           0 :                 goto err;
     621             :             }
     622             :         }
     623             : 
     624             :         //LBWARN << "Sink buffer empty." << std::endl;
     625           0 :         lunchbox::Thread::yield( );
     626           0 :         _stats.buffer_empty++;
     627           0 :         goto retry;
     628             :     }
     629             : 
     630             :     // Put back what wasn't taken (ensure the master notifier stays "hot").
     631      442789 :     if( available_bytes > bytes_taken )
     632      422144 :         _incrAvailableBytes( available_bytes - bytes_taken );
     633             : #ifdef _WIN32
     634             :     else
     635             :         _updateNotifier();
     636             : #endif
     637             : 
     638      442789 :     _readBytes += bytes_taken;
     639             : 
     640      442789 :     if( _established && _needFC( ) && !_postFC( ))
     641           0 :         LBWARN << "Error while posting flow control message." << std::endl;
     642             : 
     643             : //    LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
     644             : //       << " took " << bytes_taken << " bytes"
     645             : //       << " (" << _sinkptr.available( ) << " still available)" << std::endl;
     646             : 
     647      442789 :     return bytes_taken;
     648             : 
     649             : err:
     650           2 :     close( );
     651             : 
     652           2 :     return -1LL;
     653             : }
     654             : 
     655      464638 : int64_t RDMAConnection::write( const void* buffer, const uint64_t bytes )
     656             : {
     657      929276 :     lunchbox::Clock clock;
     658      464638 :     const int64_t start = clock.getTime64( );
     659      464638 :     const uint32_t timeout = Global::getTimeout( );
     660      464638 :     eventset events;
     661      464638 :     const uint32_t can_put = std::min( bytes, MAX_BS );
     662             :     uint32_t bytes_put;
     663             : 
     664      464638 :     if( !isConnected() )
     665           0 :         goto err;
     666             : 
     667             : //    LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
     668             : //        << std::endl;
     669             : 
     670      464638 :     _stats.writes++;
     671             : 
     672             : retry:
     673      787118 :     if( !_checkDisconnected( events ))
     674             :     {
     675           0 :         LBERROR << "Error while checking connection state." << std::endl;
     676           0 :         goto err;
     677             :     }
     678             : 
     679      787118 :     if( !_established )
     680             :     {
     681           0 :         LBWARN << "Disconnected in write." << std::endl;
     682           0 :         goto err;
     683             :     }
     684             : 
     685             :     // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
     686      787118 :     if( !_checkCQ( false ))
     687             :     {
     688           0 :         LBERROR << "Error while polling completion queues." << std::endl;
     689           0 :         goto err;
     690             :     }
     691             : 
     692      787118 :     LBASSERT( _wcredits >= 0L );
     693             : 
     694      787118 :     if( 0L == _wcredits )
     695             :     {
     696          19 :         if( LB_TIMEOUT_INDEFINITE != timeout )
     697             :         {
     698          19 :             if(( clock.getTime64( ) - start ) > timeout )
     699             :             {
     700           0 :                 LBERROR << "Timed out trying to acquire credit." << std::endl;
     701           0 :                 goto err;
     702             :             }
     703             :         }
     704             : 
     705             :         //LBWARN << "No credits for RDMA." << std::endl;
     706          19 :         lunchbox::Thread::yield( );
     707          19 :         _stats.no_credits_rdma++;
     708          19 :         goto retry;
     709             :     }
     710             : 
     711             :     // Modifies _sourceptr.HEAD
     712      787099 :     bytes_put = _fill( buffer, can_put );
     713             : 
     714      787099 :     if( 0UL == bytes_put )
     715             :     {
     716      322461 :         if( LB_TIMEOUT_INDEFINITE != timeout )
     717             :         {
     718      322461 :             if(( clock.getTime64( ) - start ) > timeout )
     719             :             {
     720           0 :                 LBERROR << "Timed out trying to fill buffer." << std::endl;
     721           0 :                 goto err;
     722             :             }
     723             :         }
     724             : 
     725             :         //LBWARN << "Source buffer full." << std::endl;
     726      322461 :         lunchbox::Thread::yield( );
     727      322461 :         if( _sourceptr.isFull( ) || _rptr.isFull( ))
     728      322461 :             _stats.buffer_full++;
     729      322461 :         goto retry;
     730             :     }
     731             : 
     732             :     // Modifies _sourceptr.MIDDLE & _rptr.HEAD
     733      464638 :     if( !_postRDMAWrite( ))
     734             :     {
     735           0 :         LBERROR << "Error while posting RDMA write." << std::endl;
     736           0 :         goto err;
     737             :     }
     738             : 
     739             : //    LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
     740             : //       << " put " << bytes_put << " bytes" << std::endl;
     741             : 
     742      464638 :     return bytes_put;
     743             : 
     744             : err:
     745           0 :     return -1LL;
     746             : }
     747             : 
     748          18 : RDMAConnection::~RDMAConnection( )
     749             : {
     750           6 :     _close( );
     751          12 : }
     752             : 
     753             : ////////////////////////////////////////////////////////////////////////////////
     754             : 
     755          12 : void RDMAConnection::_close( )
     756             : {
     757          24 :     lunchbox::ScopedWrite close_mutex( _poll_lock );
     758             : 
     759          12 :     if( !isClosed() )
     760             :     {
     761           6 :         LBASSERT( !isClosing() );
     762           6 :         _setState( STATE_CLOSING );
     763             : 
     764           6 :         if( _established && ::rdma_disconnect( _cm_id ))
     765           0 :             LBWARN << "rdma_disconnect : " << lunchbox::sysError << std::endl;
     766             : 
     767           6 :         _setState( STATE_CLOSED );
     768           6 :         _cleanup( );
     769             :     }
     770          12 : }
     771             : 
     772          10 : void RDMAConnection::_cleanup( )
     773             : {
     774          10 :     LBASSERT( isClosed() );
     775             : 
     776          10 :     _sourcebuf.clear( );
     777          10 :     _sinkbuf.clear( );
     778          10 :     _msgbuf.clear( );
     779             : 
     780          10 :     if( _completions > 0U )
     781             :     {
     782           2 :         ::ibv_ack_cq_events( _cq, _completions );
     783           2 :         _completions = 0U;
     784             :     }
     785             : 
     786          10 :     delete[] _wcs;
     787          10 :     _wcs = 0;
     788             : 
     789             : #ifdef _WIN32
     790             :     if ( _ccWaitObj )
     791             :         UnregisterWait( _ccWaitObj );
     792             :     _ccWaitObj = 0;
     793             : 
     794             :     if ( _cmWaitObj )
     795             :         UnregisterWait( _cmWaitObj );
     796             :     _cmWaitObj = 0;
     797             : #endif
     798             : 
     799          10 :     if( NULL != _cm_id )
     800           6 :         ::rdma_destroy_ep( _cm_id );
     801          10 :     _cm_id = NULL;
     802             : 
     803          10 :     if(( NULL != _cq ) && ::rdma_seterrno( ::ibv_destroy_cq( _cq )))
     804           0 :         LBWARN << "ibv_destroy_cq : " << lunchbox::sysError << std::endl;
     805          10 :     _cq = NULL;
     806             : 
     807          10 :     if(( NULL != _cc ) && ::rdma_seterrno( ::ibv_destroy_comp_channel( _cc )))
     808           0 :         LBWARN << "ibv_destroy_comp_channel : " << lunchbox::sysError
     809           0 :             << std::endl;
     810          10 :     _cc = NULL;
     811             : 
     812          10 :     if(( NULL != _pd ) && ::rdma_seterrno( ::ibv_dealloc_pd( _pd )))
     813           0 :         LBWARN << "ibv_dealloc_pd : " << lunchbox::sysError << std::endl;
     814          10 :     _pd = NULL;
     815             : 
     816          10 :     if( NULL != _cm )
     817           6 :         ::rdma_destroy_event_channel( _cm );
     818          10 :     _cm = NULL;
     819             : 
     820          10 :     if( NULL != _rai )
     821           4 :         ::rdma_freeaddrinfo( _rai );
     822          10 :     _rai = NULL;
     823             : 
     824          10 :     _rptr = 0UL;
     825          10 :     _rbase = _rkey = 0ULL;
     826             : #ifndef _WIN32
     827          10 :     if(( _notifier >= 0 ) && TEMP_FAILURE_RETRY( ::close( _notifier )))
     828           0 :         LBWARN << "close : " << lunchbox::sysError << std::endl;
     829          10 :     _notifier = -1;
     830             : #endif
     831          10 : }
     832             : 
     833           2 : bool RDMAConnection::_finishAccept( struct rdma_cm_id *new_cm_id,
     834             :         const RDMAConnParamData &cpd )
     835             : {
     836           2 :     LBASSERT( isClosed() );
     837           2 :     _setState( STATE_CONNECTING );
     838             : 
     839           2 :     LBASSERT( NULL != new_cm_id );
     840             : 
     841           2 :     _cm_id = new_cm_id;
     842             : 
     843             :     {
     844             :         // FIXME : RDMA CM appears to send up invalid addresses when receiving
     845             :         // connections that use a different protocol than what was bound.  E.g.
     846             :         // if an IPv6 listener gets an IPv4 connection then the sa_family
     847             :         // will be AF_INET6 but the actual data is struct sockaddr_in.  Example:
     848             :         //
     849             :         // 0000000: 0a00 bc10 c0a8 b01a 0000 0000 0000 0000  ................
     850             :         //
     851             :         // However, in the reverse case, when an IPv4 listener gets an IPv6
     852             :         // connection not only is the address family incorrect, but the actual
     853             :         // IPv6 address is only partially there:
     854             :         //
     855             :         // 0000000: 0200 bc11 0000 0000 fe80 0000 0000 0000  ................
     856             :         // 0000010: 0000 0000 0000 0000 0000 0000 0000 0000  ................
     857             :         // 0000020: 0000 0000 0000 0000 0000 0000 0000 0000  ................
     858             :         // 0000030: 0000 0000 0000 0000 0000 0000 0000 0000  ................
     859             :         //
     860             :         // Surely seems to be a bug in RDMA CM.
     861             : 
     862             :         union
     863             :         {
     864             :             struct sockaddr     addr;
     865             :             struct sockaddr_in  sin;
     866             :             struct sockaddr_in6 sin6;
     867             :             struct sockaddr_storage storage;
     868             :         } sss;
     869             : 
     870             :         // Make a copy since we might change it.
     871             :         //sss.storage = _cm_id->route.addr.dst_storage;
     872             :         ::memcpy( (void *)&sss.storage,
     873           2 :             (const void *)::rdma_get_peer_addr( _cm_id ),
     874           2 :             sizeof(struct sockaddr_storage) );
     875             : #ifndef _WIN32
     876           4 :         if(( AF_INET == sss.storage.ss_family ) &&
     877           4 :            ( sss.sin6.sin6_addr.s6_addr32[0] != 0 ||
     878           4 :              sss.sin6.sin6_addr.s6_addr32[1] != 0 ||
     879           4 :              sss.sin6.sin6_addr.s6_addr32[2] != 0 ||
     880           2 :              sss.sin6.sin6_addr.s6_addr32[3] != 0 ))
     881             :         {
     882           0 :             LBWARN << "IPv6 address detected but likely invalid!" << std::endl;
     883           0 :             sss.storage.ss_family = AF_INET6;
     884             :         }
     885             :         else
     886             : #endif
     887           2 :         if(( AF_INET6 == sss.storage.ss_family ) &&
     888           0 :                 ( INADDR_ANY != sss.sin.sin_addr.s_addr ))
     889             :         {
     890           0 :             sss.storage.ss_family = AF_INET;
     891             :         }
     892             : 
     893           2 :         _updateInfo( &sss.addr );
     894             :     }
     895             : 
     896           2 :     _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
     897             : 
     898           2 :     LBVERB << "Connection initiated on " << std::dec
     899           0 :         << _device_name << ":" << (int)_cm_id->port_num
     900             :         << " from "
     901             :         << _addr << ":" << _serv
     902           2 :         << " (" << getDescription()->toString( ) << ")"
     903           6 :         << std::endl;
     904             : 
     905           4 :     if(( RDMA_PROTOCOL_MAGIC != cpd.magic ) ||
     906           2 :         ( RDMA_PROTOCOL_VERSION != cpd.version ))
     907             :     {
     908           0 :         LBERROR << "Protocol mismatch with initiator : "
     909           0 :             << _addr << ":" << _serv << std::endl;
     910           0 :         goto err_reject;
     911             :     }
     912             : 
     913           2 :     if( !_createNotifier( ))
     914             :     {
     915           0 :         LBERROR << "Failed to create master notifier." << std::endl;
     916           0 :         goto err_reject;
     917             :     }
     918             : 
     919           2 :     if( !_createEventChannel( ))
     920             :     {
     921           0 :         LBERROR << "Failed to create event channel." << std::endl;
     922           0 :         goto err_reject;
     923             :     }
     924             : 
     925           2 :     if( !_migrateId( ))
     926             :     {
     927           0 :         LBERROR << "Failed to migrate communication identifier." << std::endl;
     928           0 :         goto err_reject;
     929             :     }
     930             : 
     931           2 :     if( !_initProtocol( cpd.depth ))
     932             :     {
     933           0 :         LBERROR << "Failed to initialize protocol variables." << std::endl;
     934           0 :         goto err_reject;
     935             :     }
     936             : 
     937           2 :     if( !_initVerbs( ))
     938             :     {
     939           0 :         LBERROR << "Failed to initialize verbs." << std::endl;
     940           0 :         goto err_reject;
     941             :     }
     942             : 
     943           2 :     if( !_createQP( ))
     944             :     {
     945           0 :         LBERROR << "Failed to create queue pair." << std::endl;
     946           0 :         goto err_reject;
     947             :     }
     948             : 
     949           2 :     if( !_createBytesAvailableFD( ))
     950             :     {
     951           0 :         LBERROR << "Failed to create available byte notifier." << std::endl;
     952           0 :         goto err_reject;
     953             :     }
     954             : 
     955           2 :     if( !_initBuffers( ))
     956             :     {
     957           0 :         LBERROR << "Failed to initialize ring buffers." << std::endl;
     958           0 :         goto err_reject;
     959             :     }
     960             : 
     961           2 :     if( !_postReceives( _depth ))
     962             :     {
     963           0 :         LBERROR << "Failed to pre-post receives." << std::endl;
     964           0 :         goto err_reject;
     965             :     }
     966             : 
     967           2 :     if( !_accept( ))
     968             :     {
     969           0 :         LBERROR << "Failed to accept remote connection from : "
     970           0 :             << _addr << ":" << _serv << std::endl;
     971           0 :         goto err;
     972             :     }
     973             : 
     974           2 :     LBASSERT( _established );
     975             : 
     976           2 :     if( !_waitRecvSetup( ))
     977             :     {
     978           0 :         LBERROR << "Failed to receive setup message." << std::endl;
     979           0 :         goto err;
     980             :     }
     981             : 
     982           2 :     if( !_postSetup( ))
     983             :     {
     984           0 :         LBERROR << "Failed to post setup message." << std::endl;
     985           0 :         goto err;
     986             :     }
     987             : 
     988           2 :     LBVERB << "Connection accepted on " << std::dec
     989           0 :         << _device_name << ":" << (int)_cm_id->port_num
     990             :         << " from "
     991             :         << _addr << ":" << _serv
     992           2 :         << " (" << getDescription()->toString( ) << ")"
     993           6 :         << std::endl;
     994             : 
     995           2 :     _setState( STATE_CONNECTED );
     996           2 :     _updateNotifier();
     997           2 :     return true;
     998             : 
     999             : err_reject:
    1000           0 :     LBWARN << "Rejecting connection from remote address : "
    1001           0 :         << _addr << ":" << _serv << std::endl;
    1002             : 
    1003           0 :     if( !_reject( ))
    1004           0 :         LBWARN << "Failed to issue connection reject." << std::endl;
    1005             : 
    1006             : err:
    1007           0 :     close( );
    1008             : 
    1009           0 :     return false;
    1010             : }
    1011             : 
    1012           4 : bool RDMAConnection::_lookupAddress( const bool passive )
    1013             : {
    1014             :     struct rdma_addrinfo hints;
    1015           4 :     char* node = 0;
    1016           4 :     char* service = 0;
    1017           8 :     std::string s;
    1018             : 
    1019           4 :     ::memset( (void *)&hints, 0, sizeof(struct rdma_addrinfo) );
    1020           4 :     if( passive )
    1021           2 :         hints.ai_flags |= RAI_PASSIVE;
    1022             : 
    1023           8 :     ConstConnectionDescriptionPtr description = getDescription();
    1024           4 :     const std::string &hostname = description->getHostname( );
    1025           4 :     if( !hostname.empty( ))
    1026           4 :         node = const_cast< char * >( hostname.c_str( ));
    1027             : 
    1028           4 :     if( 0u != description->port )
    1029             :     {
    1030           6 :         std::stringstream ss;
    1031           3 :         ss << description->port;
    1032           3 :         s = ss.str( );
    1033           3 :         service = const_cast< char * >( s.c_str( ));
    1034             :     }
    1035             : 
    1036           4 :     if(( NULL != node ) && ::rdma_getaddrinfo( node, service, &hints, &_rai ))
    1037             :     {
    1038           0 :         LBERROR << "rdma_getaddrinfo : " << lunchbox::sysError << std::endl;
    1039           0 :         return false;
    1040             :     }
    1041             : 
    1042           4 :     _addrinfo = _rai;
    1043           4 :     while( _addrinfo && !_isInetFamily( _addrinfo ))
    1044           0 :         _addrinfo = _addrinfo->ai_next;
    1045             : 
    1046           4 :     if( !_addrinfo )
    1047             :     {
    1048           0 :         if( _rai )
    1049             :         {
    1050           0 :             LBWARN << "No IP or IPv6 address found by rdma_getaddrinfo. "
    1051           0 :                       "Connection establishment may fail." << std::endl;
    1052             :         }
    1053           0 :         _addrinfo = _rai;
    1054             :     }
    1055             : 
    1056           4 :     if(( NULL != _addrinfo ) && ( _addrinfo->ai_connect_len > 0 ))
    1057           0 :         LBWARN << "WARNING : ai_connect data specified!" << std::endl;
    1058             : 
    1059           4 :     return true;
    1060             : }
    1061             : 
    1062          10 : void RDMAConnection::_updateInfo( struct sockaddr *addr )
    1063             : {
    1064          10 :     int salen = sizeof(struct sockaddr);
    1065          10 :     bool is_unspec = false;
    1066             : 
    1067          10 :     if( AF_INET == addr->sa_family )
    1068             :     {
    1069             :         struct sockaddr_in *sin =
    1070          10 :             reinterpret_cast< struct sockaddr_in * >( addr );
    1071          10 :         is_unspec = ( INADDR_ANY == sin->sin_addr.s_addr );
    1072          10 :         salen = sizeof(struct sockaddr_in);
    1073             :     }
    1074             :     // TODO: IPv6 for WIndows
    1075             : #ifndef _WIN32
    1076           0 :     else if( AF_INET6 == addr->sa_family )
    1077             :     {
    1078             :         struct sockaddr_in6 *sin6 =
    1079           0 :             reinterpret_cast< struct sockaddr_in6 * >( addr );
    1080             : 
    1081           0 :         is_unspec = ( sin6->sin6_addr.s6_addr32[0] == 0 &&
    1082           0 :                       sin6->sin6_addr.s6_addr32[1] == 0 &&
    1083           0 :                       sin6->sin6_addr.s6_addr32[2] == 0 &&
    1084           0 :                       sin6->sin6_addr.s6_addr32[3] == 0 );
    1085           0 :         salen = sizeof(struct sockaddr_in6);
    1086             :     }
    1087             : #endif
    1088             :     else
    1089             :     {
    1090           0 :         LBERROR << "Unsupported socket address family "
    1091           0 :                 <<  addr->sa_family << std::endl;
    1092           0 :         return;
    1093             :     }
    1094             : 
    1095             :     int err;
    1096          10 :     if(( err = ::getnameinfo( addr, salen, _addr, sizeof(_addr),
    1097             :             _serv, sizeof(_serv), NI_NUMERICHOST | NI_NUMERICSERV )))
    1098           0 :         LBWARN << "Name info lookup failed : " << err << std::endl;
    1099             : 
    1100          10 :     if( is_unspec )
    1101           0 :         ::gethostname( _addr, NI_MAXHOST );
    1102             : 
    1103          20 :     ConnectionDescriptionPtr description = _getDescription();
    1104          10 :     if( description->getHostname( ).empty( ))
    1105           2 :         description->setHostname( _addr );
    1106          10 :     if( 0u == description->port )
    1107           4 :         description->port = atoi( _serv );
    1108             : }
    1109             : 
    1110           6 : bool RDMAConnection::_createEventChannel( )
    1111             : {
    1112           6 :     LBASSERT( NULL == _cm );
    1113             : 
    1114           6 :     _cm = ::rdma_create_event_channel( );
    1115           6 :     if( NULL == _cm )
    1116             :     {
    1117           0 :         LBERROR << "rdma_create_event_channel : " << lunchbox::sysError <<
    1118           0 :             std::endl;
    1119           0 :         return false;
    1120             :     }
    1121             : 
    1122             : #ifdef _WIN32
    1123             :     if ( !RegisterWaitForSingleObject(
    1124             :             &_cmWaitObj,
    1125             :             _cm->channel.Event,
    1126             :             WAITORTIMERCALLBACK( &_triggerNotifierCM ),
    1127             :             this,
    1128             :             INFINITE,
    1129             :             WT_EXECUTEINWAITTHREAD ))
    1130             :     {
    1131             :         LBERROR << "RegisterWaitForSingleObject : " << co::base::sysError
    1132             :                 << std::endl;
    1133             :         return false;
    1134             :     }
    1135             : #else
    1136             :     struct epoll_event evctl;
    1137             : 
    1138           6 :     LBASSERT( _notifier >= 0 );
    1139             : 
    1140             :     // Use the CM fd to signal Collage of connection events.
    1141           6 :     ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
    1142           6 :     evctl.events = EPOLLIN;
    1143           6 :     evctl.data.fd = _cm->fd;
    1144           6 :     if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
    1145             :     {
    1146           0 :         LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
    1147           0 :         return false;
    1148             :     }
    1149             : #endif
    1150             : 
    1151           6 :     return true;
    1152             : }
    1153             : 
    1154           4 : bool RDMAConnection::_createId( )
    1155             : {
    1156           4 :     LBASSERT( NULL != _cm );
    1157           4 :     LBASSERT( NULL == _cm_id );
    1158             : 
    1159           4 :     if( ::rdma_create_id( _cm, &_cm_id, NULL, RDMA_PS_TCP ))
    1160             :     {
    1161           0 :         LBERROR << "rdma_create_id : " << lunchbox::sysError << std::endl;
    1162           0 :         return false;
    1163             :     }
    1164             : 
    1165           4 :     return true;
    1166             : }
    1167             : 
    1168           4 : bool RDMAConnection::_initVerbs( )
    1169             : {
    1170           4 :     LBASSERT( NULL != _cm_id );
    1171           4 :     LBASSERT( NULL != _cm_id->verbs );
    1172             : 
    1173           4 :     LBASSERT( NULL == _pd );
    1174             : 
    1175             :     // Allocate protection domain.
    1176           4 :     _pd = ::ibv_alloc_pd( _cm_id->verbs );
    1177           4 :     if( NULL == _pd )
    1178             :     {
    1179           0 :         LBERROR << "ibv_alloc_pd : " << lunchbox::sysError << std::endl;
    1180           0 :         return false;
    1181             :     }
    1182             : 
    1183           4 :     LBASSERT( NULL == _cc );
    1184             : 
    1185             :     // Create completion channel.
    1186           4 :     _cc = ::ibv_create_comp_channel( _cm_id->verbs );
    1187           4 :     if( NULL == _cc )
    1188             :     {
    1189           0 :         LBERROR << "ibv_create_comp_channel : " << lunchbox::sysError
    1190           0 :             << std::endl;
    1191           0 :         return false;
    1192             :     }
    1193             : 
    1194             : #ifdef _WIN32
    1195             :     if ( !RegisterWaitForSingleObject(
    1196             :         &_ccWaitObj,
    1197             :         _cc->comp_channel.Event,
    1198             :         WAITORTIMERCALLBACK( &_triggerNotifierCQ ),
    1199             :         this,
    1200             :         INFINITE,
    1201             :         WT_EXECUTEINWAITTHREAD ))
    1202             :     {
    1203             :         LBERROR << "RegisterWaitForSingleObject : " << lunchbox::sysError
    1204             :             << std::endl;
    1205             :         return false;
    1206             :     }
    1207             : #else
    1208           4 :     LBASSERT( _notifier >= 0 );
    1209             : 
    1210             :     // Use the completion channel fd to signal Collage of RDMA writes received.
    1211             :     struct epoll_event evctl;
    1212           4 :     ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
    1213           4 :     evctl.events = EPOLLIN;
    1214           4 :     evctl.data.fd = _cc->fd;
    1215           4 :     if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
    1216             :     {
    1217           0 :         LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
    1218           0 :         return false;
    1219             :     }
    1220             : #endif
    1221             : 
    1222           4 :     LBASSERT( NULL == _cq );
    1223             : 
    1224             :     // Create a single completion queue for both sends & receives */
    1225           4 :     _cq = ::ibv_create_cq( _cm_id->verbs, _depth * 2, NULL, _cc, 0 );
    1226           4 :     if( NULL == _cq )
    1227             :     {
    1228           0 :         LBERROR << "ibv_create_cq : " << lunchbox::sysError << std::endl;
    1229           0 :         return false;
    1230             :     }
    1231             : 
    1232             :     // Request IBV_SEND_SOLICITED events only (i.e. RDMA writes, not FC)
    1233           4 :     if( ::rdma_seterrno( ::ibv_req_notify_cq( _cq, 1 )))
    1234             :     {
    1235           0 :         LBERROR << "ibv_req_notify_cq : " << lunchbox::sysError << std::endl;
    1236           0 :         return false;
    1237             :     }
    1238             : 
    1239           4 :     _wcs = new struct ibv_wc[ _depth ];
    1240           4 :     return true;
    1241             : }
    1242             : 
    1243           4 : bool RDMAConnection::_createQP( )
    1244             : {
    1245             :     struct ibv_qp_init_attr init_attr;
    1246             : 
    1247           4 :     LBASSERT( _depth > 0 );
    1248           4 :     LBASSERT( NULL != _cm_id );
    1249           4 :     LBASSERT( NULL != _pd );
    1250           4 :     LBASSERT( NULL != _cq );
    1251             : 
    1252           4 :     ::memset( (void *)&init_attr, 0, sizeof(struct ibv_qp_init_attr) );
    1253           4 :     init_attr.qp_type = IBV_QPT_RC;
    1254           4 :     init_attr.cap.max_send_wr = _depth;
    1255           4 :     init_attr.cap.max_recv_wr = _depth;
    1256           4 :     init_attr.cap.max_recv_sge = 1;
    1257           4 :     init_attr.cap.max_send_sge = 1;
    1258           4 :     init_attr.recv_cq = _cq;
    1259           4 :     init_attr.send_cq = _cq;
    1260           4 :     init_attr.sq_sig_all = 1; // i.e. always IBV_SEND_SIGNALED
    1261             : 
    1262             :     // Create queue pair.
    1263           4 :     if( ::rdma_create_qp( _cm_id, _pd, &init_attr ))
    1264             :     {
    1265           0 :         LBERROR << "rdma_create_qp : " << lunchbox::sysError << std::endl;
    1266           0 :         return false;
    1267             :     }
    1268             : 
    1269           4 :     LBVERB << "RDMA QP caps : " << std::dec <<
    1270           0 :         init_attr.cap.max_recv_wr << " receives, " <<
    1271           4 :         init_attr.cap.max_send_wr << " sends, " << std::endl;
    1272             : 
    1273           4 :     return true;
    1274             : }
    1275             : 
    // Sizes the RDMA ring buffers and the message buffer pool.
    //
    // Reads the ring buffer size (in MB) from
    // Global::IATTR_RDMA_RING_BUFFER_SIZE_MB, resizes _sourcebuf and _sinkbuf to
    // that many bytes against the protection domain _pd, resets the local ring
    // pointers to the resulting buffer sizes and resizes _msgbuf to 2 * _depth
    // entries.
    //
    // Returns false, after logging, when the configured size is zero or any of
    // the three resize calls fails; true otherwise.
    //
    // NOTE(review): the byte count is computed as 1024 * 1024 * attribute and
    // only then stored in a size_t. Presumably the multiplication happens in the
    // attribute's own integer type, so a negative or very large setting may not
    // yield the intended size - confirm against Global::getIAttribute().
    1276           4 : bool RDMAConnection::_initBuffers( )
    1277             : {
    1278           4 :     const size_t rbs = 1024 * 1024 *
    1279           8 :         Global::getIAttribute( Global::IATTR_RDMA_RING_BUFFER_SIZE_MB );
    1280             : 
    1281           4 :     LBASSERT( _depth > 0 );
    1282           4 :     LBASSERT( NULL != _pd );
    1283             : 
    1284           4 :     if( 0 == rbs )
    1285             :     {
    1286           0 :         LBERROR << "Invalid RDMA ring buffer size." << std::endl;
    1287           0 :         return false;
    1288             :     }
    1289             : 
    1290           4 :     if( !_sourcebuf.resize( _pd, rbs ))
    1291             :     {
    1292           0 :         LBERROR << "Failed to resize source buffer." << std::endl;
    1293           0 :         return false;
    1294             :     }
    1295             : 
    1296           4 :     if( !_sinkbuf.resize( _pd, rbs ))
    1297             :     {
    1298           0 :         LBERROR << "Failed to resize sink buffer." << std::endl;
    1299           0 :         return false;
    1300             :     }
    1301             : 
                           // The ring pointers are reset with the buffer sizes narrowed to
                           // uint32_t.
    1302           4 :     _sourceptr.clear( uint32_t( _sourcebuf.getSize( )));
    1303           4 :     _sinkptr.clear( uint32_t( _sinkbuf.getSize( )));
    1304             : 
    1305             :     // Need enough space for both sends and receives.
    1306           4 :     if( !_msgbuf.resize( _pd, _depth * 2 ))
    1307             :     {
    1308           0 :         LBERROR << "Failed to resize message buffer pool." << std::endl;
    1309           0 :         return false;
    1310             :     }
    1311             : 
    1312           4 :     return true;
    1313             : }
    1314             : 
    // Starts address resolution for the connection.
    //
    // Calls rdma_resolve_addr() on _cm_id with the source and destination
    // addresses stored in _addrinfo and the timeout _timeout, then waits for the
    // RDMA_CM_EVENT_ADDR_RESOLVED connection manager event.
    //
    // Returns false, after logging, if rdma_resolve_addr() reports an error;
    // otherwise returns the result of _waitForCMEvent().
    1315           2 : bool RDMAConnection::_resolveAddress( )
    1316             : {
    1317           2 :     LBASSERT( NULL != _cm_id );
    1318           2 :     LBASSERT( NULL != _addrinfo );
    1319             : 
    1320           2 :     if( ::rdma_resolve_addr( _cm_id, _addrinfo->ai_src_addr, _addrinfo->ai_dst_addr,
    1321           2 :             _timeout ))
    1322             :     {
    1323           0 :         LBERROR << "rdma_resolve_addr : " << lunchbox::sysError << std::endl;
    1324           0 :         return false;
    1325             :     }
    1326             : 
    1327           2 :     return _waitForCMEvent( RDMA_CM_EVENT_ADDR_RESOLVED );
    1328             : }
    1329             : 
    // Establishes the route to the destination.
    //
    // When the device transport is InfiniBand and _addrinfo already carries
    // route data (ai_route_len > 0), that data is handed to the id with
    // rdma_set_option(); in every other case rdma_resolve_route() is called with
    // _timeout. Afterwards the function waits for RDMA_CM_EVENT_ROUTE_RESOLVED.
    //
    // Returns false, after logging, if the selected call fails; otherwise
    // returns the result of _waitForCMEvent().
    //
    // NOTE(review): the rdma_set_option() branch does not call
    // rdma_resolve_route() but still waits for ROUTE_RESOLVED. This branch has
    // zero recorded executions in this listing - confirm that the event is
    // actually delivered on that path.
    // NOTE(review): the _WIN32 variant passes the route data with
    // RDMA_OPTION_ID / RDMA_OPTION_ID_TOS rather than the IB path option used
    // elsewhere - confirm this is intentional.
    1330           2 : bool RDMAConnection::_resolveRoute( )
    1331             : {
    1332           2 :     LBASSERT( NULL != _cm_id );
    1333           2 :     LBASSERT( NULL != _addrinfo );
    1334             : 
    1335           4 :     if(( IBV_TRANSPORT_IB == _cm_id->verbs->device->transport_type ) &&
    1336           2 :             ( _addrinfo->ai_route_len > 0 ))
    1337             :     {
    1338             : #ifdef _WIN32
    1339             :         if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS,
    1340             : #else
    1341           0 :         if( ::rdma_set_option( _cm_id, RDMA_OPTION_IB, RDMA_OPTION_IB_PATH,
    1342             : #endif
    1343           0 :                 _addrinfo->ai_route, _addrinfo->ai_route_len ))
    1344             :         {
    1345           0 :             LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
    1346           0 :             return false;
    1347             :         }
    1348             :     }
    1349           2 :     else if( ::rdma_resolve_route( _cm_id, _timeout ))
    1350             :     {
    1351           0 :         LBERROR << "rdma_resolve_route : " << lunchbox::sysError << std::endl;
    1352           0 :         return false;
    1353             :     }
    1354             : 
    1355           2 :     return _waitForCMEvent( RDMA_CM_EVENT_ROUTE_RESOLVED );
    1356             : }
    1357             : 
    // Initiates the connection (active side).
    //
    // Fills the member _cpd with the protocol magic, protocol version and the
    // local queue depth, attaches it as private data to a zeroed
    // rdma_conn_param, sets initiator_depth / responder_resources from
    // RDMA_MAX_INIT_DEPTH / RDMA_MAX_RESP_RES, logs the source and destination
    // LIDs, calls rdma_connect() and waits for RDMA_CM_EVENT_ESTABLISHED.
    //
    // Returns false, after logging, if rdma_connect() fails; otherwise returns
    // the result of _waitForCMEvent().
    //
    // NOTE(review): the LBINFO statement dereferences _cm_id->route.path_rec
    // without a null check - confirm it is always set at this point for every
    // transport in use.
    1358           2 : bool RDMAConnection::_connect( )
    1359             : {
    1360           2 :     LBASSERT( NULL != _cm_id );
    1361           2 :     LBASSERT( !_established );
    1362             : 
    1363             :     struct rdma_conn_param conn_param;
    1364             : 
    1365           2 :     ::memset( (void *)&conn_param, 0, sizeof(struct rdma_conn_param) );
    1366             : 
    1367           2 :     _cpd.magic = RDMA_PROTOCOL_MAGIC;
    1368           2 :     _cpd.version = RDMA_PROTOCOL_VERSION;
    1369           2 :     _cpd.depth = _depth;
    1370           2 :     conn_param.private_data = reinterpret_cast< const void * >( &_cpd );
    1371           2 :     conn_param.private_data_len = sizeof(struct RDMAConnParamData);
    1372           2 :     conn_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
    1373           2 :     conn_param.responder_resources = RDMA_MAX_RESP_RES;
    1374             :     // Magic 3-bit values.
    1375             :     //conn_param.retry_count = 5;
    1376             :     //conn_param.rnr_retry_count = 7;
    1377             : 
    1378           6 :     LBINFO << "Connect on source lid : " << std::showbase
    1379           4 :         << std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
    1380           4 :         << std::dec << ntohs( _cm_id->route.path_rec->slid ) << ") "
    1381           2 :         << "to dest lid : "
    1382           4 :         << std::hex << ntohs( _cm_id->route.path_rec->dlid ) << " ("
    1383           4 :         << std::dec << ntohs( _cm_id->route.path_rec->dlid ) << ") "
    1384           6 :         << std::endl;
    1385             : 
    1386           2 :     if( ::rdma_connect( _cm_id, &conn_param ))
    1387             :     {
    1388           0 :         LBERROR << "rdma_connect : " << lunchbox::sysError << std::endl;
    1389           0 :         return false;
    1390             :     }
    1391             : 
    1392           2 :     bool ret = _waitForCMEvent( RDMA_CM_EVENT_ESTABLISHED );
    1393             : 
    1394           2 :     return ret;
    1395             : }
    1396             : 
    // Binds _cm_id to a local address.
    //
    // If _addrinfo is set, its ai_src_addr is used. Otherwise a wildcard
    // address is built locally (IPv6 in6addr_any when IPV6_DEFAULT is enabled,
    // IPv4 INADDR_ANY otherwise) with the port taken from the connection
    // description.
    //
    // After a successful bind the source port is read back with
    // rdma_get_src_port(); in this function it is only used for the debug log.
    //
    // Returns false, after logging, if rdma_bind_addr() fails; true otherwise.
    1397           2 : bool RDMAConnection::_bindAddress( )
    1398             : {
    1399           2 :     LBASSERT( NULL != _cm_id );
    1400           4 :     ConstConnectionDescriptionPtr description = getDescription();
    1401             : 
    1402             : #if IPV6_DEFAULT
    1403             :     struct sockaddr_in6 sin;
    1404             :     memset( (void *)&sin, 0, sizeof(struct sockaddr_in6) );
    1405             :     sin.sin6_family = AF_INET6;
    1406             :     sin.sin6_port = htons( description->port );
    1407             :     sin.sin6_addr = in6addr_any;
    1408             : #else
    1409             :     struct sockaddr_in sin;
    1410           2 :     memset( (void *)&sin, 0, sizeof(struct sockaddr_in) );
    1411           2 :     sin.sin_family = AF_INET;
    1412           2 :     sin.sin_port = htons( description->port );
    1413           2 :     sin.sin_addr.s_addr = INADDR_ANY;
    1414             : #endif
    1415             : 
    1416           2 :     if( ::rdma_bind_addr( _cm_id, ( NULL != _addrinfo ) ? _addrinfo->ai_src_addr :
    1417             :             reinterpret_cast< struct sockaddr * >( &sin )))
    1418             :     {
    1419           0 :         LBERROR << "rdma_bind_addr : " << lunchbox::sysError << std::endl;
    1420           0 :         return false;
    1421             :     }
    1422             : 
    1423           2 :     const uint16_t port = ntohs(rdma_get_src_port(_cm_id));
    1424           8 :     LBDEBUG << "Listening on " << description->getHostname() << "["
    1425          10 :             << _addr << "]:" << port << " (" << description->toString()
    1426           8 :             << ")" << std::endl;
    1427             : 
    1428           2 :     return true;
    1429             : }
    1430             : 
    // Puts _cm_id into the listening state via rdma_listen().
    //
    // backlog is forwarded unchanged to rdma_listen().
    //
    // Returns false, after logging, if rdma_listen() fails; true otherwise.
    1431           2 : bool RDMAConnection::_listen( int backlog )
    1432             : {
    1433           2 :     LBASSERT( NULL != _cm_id );
    1434             : 
    1435           2 :     if( ::rdma_listen( _cm_id, backlog ))
    1436             :     {
    1437           0 :         LBERROR << "rdma_listen : " << lunchbox::sysError << std::endl;
    1438           0 :         return false;
    1439             :     }
    1440             : 
    1441           2 :     return true;
    1442             : }
    1443             : 
    // Moves _cm_id onto this connection's own event channel _cm using
    // rdma_migrate_id().
    //
    // Returns false, after logging, if rdma_migrate_id() fails; true otherwise.
    1444           2 : bool RDMAConnection::_migrateId( )
    1445             : {
    1446           2 :     LBASSERT( NULL != _cm_id );
    1447           2 :     LBASSERT( NULL != _cm );
    1448             : 
    1449           2 :     if( ::rdma_migrate_id( _cm_id, _cm ))
    1450             :     {
    1451           0 :         LBERROR << "rdma_migrate_id : " << lunchbox::sysError << std::endl;
    1452           0 :         return false;
    1453             :     }
    1454             : 
    1455           2 :     return true;
    1456             : }
    1457             : 
    // Accepts a pending connection request (passive side).
    //
    // Fills the member _cpd with the protocol magic, protocol version and the
    // local queue depth, attaches it as private data to a zeroed
    // rdma_conn_param, sets initiator_depth / responder_resources from
    // RDMA_MAX_INIT_DEPTH / RDMA_MAX_RESP_RES, logs the source and destination
    // LIDs, calls rdma_accept() and waits for RDMA_CM_EVENT_ESTABLISHED.
    //
    // Returns false, after logging, if rdma_accept() fails; otherwise returns
    // the result of _waitForCMEvent().
    //
    // NOTE(review): the LBINFO statement dereferences _cm_id->route.path_rec
    // without a null check - confirm it is always set at this point for every
    // transport in use.
    1458           2 : bool RDMAConnection::_accept( )
    1459             : {
    1460             :     struct rdma_conn_param accept_param;
    1461             : 
    1462           2 :     LBASSERT( NULL != _cm_id );
    1463           2 :     LBASSERT( !_established );
    1464             : 
    1465           2 :     ::memset( (void *)&accept_param, 0, sizeof(struct rdma_conn_param) );
    1466             : 
    1467           2 :     _cpd.magic = RDMA_PROTOCOL_MAGIC;
    1468           2 :     _cpd.version = RDMA_PROTOCOL_VERSION;
    1469           2 :     _cpd.depth = _depth;
    1470           2 :     accept_param.private_data = reinterpret_cast< const void * >( &_cpd );
    1471           2 :     accept_param.private_data_len = sizeof(struct RDMAConnParamData);
    1472           2 :     accept_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
    1473           2 :     accept_param.responder_resources = RDMA_MAX_RESP_RES;
    1474             :     // Magic 3-bit value.
    1475             :     //accept_param.rnr_retry_count = 7;
    1476             : 
    1477           6 :     LBINFO << "Accept on source lid : "<< std::showbase
    1478           4 :            << std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
    1479           4 :            << std::dec << ntohs( _cm_id->route.path_rec->slid ) << ") "
    1480           2 :            << "from dest lid : "
    1481           4 :            << std::hex << ntohs( _cm_id->route.path_rec->dlid ) << " ("
    1482           4 :            << std::dec << ntohs( _cm_id->route.path_rec->dlid ) << ") "
    1483           6 :            << std::endl;
    1484             : 
    1485           2 :     if( ::rdma_accept( _cm_id, &accept_param ))
    1486             :     {
    1487           0 :         LBERROR << "rdma_accept : " << lunchbox::sysError << std::endl;
    1488           0 :         return false;
    1489             :     }
    1490             : 
    1491           2 :     return _waitForCMEvent( RDMA_CM_EVENT_ESTABLISHED );
    1492             : }
    1493             : 
    // Rejects a pending connection request on _cm_id via rdma_reject(), sending
    // no private data (NULL, length 0).
    //
    // Returns false, after logging, if rdma_reject() fails; true otherwise.
    // This function has no recorded executions in this coverage listing.
    1494           0 : bool RDMAConnection::_reject( )
    1495             : {
    1496           0 :     LBASSERT( NULL != _cm_id );
    1497           0 :     LBASSERT( !_established );
    1498             : 
    1499           0 :     if( ::rdma_reject( _cm_id, NULL, 0 ))
    1500             :     {
    1501           0 :         LBERROR << "rdma_reject : " << lunchbox::sysError << std::endl;
    1502           0 :         return false;
    1503             :     }
    1504             : 
    1505           0 :     return true;
    1506             : }
    1507             : 
    1508             : ////////////////////////////////////////////////////////////////////////////////
    1509             : 
    // Resets the flow-control state for a queue depth of `depth`.
    //
    // Rejects depths below 2 (logs and returns false). Otherwise stores the
    // depth in _depth, zeroes the _writes, _fcs and _readBytes counters, and
    // splits the depth into write credits (_depth / 2 - 2) and flow-control
    // credits (_depth / 2 + 2). Returns true.
    //
    // NOTE(review): for depths 2 and 3 the write-credit formula evaluates to -1
    // (assuming _depth and _wcredits are signed), although such depths pass the
    // check above - confirm the intended minimum depth.
    1510           4 : bool RDMAConnection::_initProtocol( int32_t depth )
    1511             : {
    1512           4 :     if( depth < 2L )
    1513             :     {
    1514           0 :         LBERROR << "Invalid queue depth." << std::endl;
    1515           0 :         return false;
    1516             :     }
    1517             : 
    1518           4 :     _depth = depth;
    1519           4 :     _writes = 0L;
    1520           4 :     _fcs = 0L;
    1521           4 :     _readBytes = 0u;
    1522           4 :     _wcredits = _depth / 2 - 2;
    1523           4 :     _fcredits = _depth / 2 + 2;
    1524             : 
    1525           4 :     return true;
    1526             : }
    1527             : 
    1528             : /* inline */
    // Decides whether a flow-control message should be sent now.
    //
    // Returns true when no received bytes are currently pending (Windows:
    // _availBytes is zero; elsewhere: a zero-timeout poll() of the pipe read end
    // _pipe_fd[0] reports nothing ready), or when _writes has reached
    // FC_MESSAGE_FREQUENCY. If poll() itself fails, the error is logged and
    // true is returned.
    //
    // NOTE(review): pfd.events is set with EPOLLIN although the descriptor is
    // passed to poll(); POLLIN would be the matching constant - confirm the two
    // are interchangeable on the supported platforms.
    1529      907893 : bool RDMAConnection::_needFC( )
    1530             : {
    1531             :     bool bytesAvail;
    1532             : #ifdef _WIN32
    1533             :     lunchbox::ScopedFastRead mutex( _eventLock );
    1534             :     bytesAvail = ( _availBytes != 0 );
    1535             : #else
    1536             :     pollfd pfd;
    1537      907893 :     pfd.fd = _pipe_fd[0];
    1538      907893 :     pfd.events = EPOLLIN;
    1539      907893 :     pfd.revents = 0;
                           // Timeout 0: only query the current state, never block.
    1540      907893 :     int ret = poll( &pfd, 1, 0 );
    1541      907893 :     if ( ret == -1 )
    1542             :     {
    1543           0 :         LBERROR << "poll: " << lunchbox::sysError << std::endl;
    1544           0 :         return true;
    1545             :     }
    1546      907893 :     bytesAvail = ret != 0;
    1547             : #endif
    1548      907893 :     return ( !bytesAvail || (uint32_t)_writes >= FC_MESSAGE_FREQUENCY );
    1549             : }
    1550             : 
    // Posts `count` receive work requests on the queue pair in a single
    // ibv_post_recv() call.
    //
    // For every request one buffer is taken from _msgbuf and described by a
    // single scatter/gather entry; the buffer address doubles as the work
    // request id. The requests are chained through their `next` pointers. The
    // temporary sge / wr arrays are released right after the post call.
    //
    // `count` must be greater than zero (asserted). Returns false, after
    // logging, if ibv_post_recv() reports an error; true otherwise.
    //
    // NOTE(review): bad_wr is not inspected on failure, and buffers already
    // taken from _msgbuf are not handed back here on the error path - confirm
    // how callers recover.
    1551      231901 : bool RDMAConnection::_postReceives( const uint32_t count )
    1552             : {
    1553      231901 :     LBASSERT( NULL != _cm_id->qp );
    1554      231902 :     LBASSERT( count > 0UL );
    1555             : 
    1556      231902 :     ibv_sge* sge = new ibv_sge[count];
    1557      231906 :     ibv_recv_wr* wrs = new ibv_recv_wr[count];
    1558             : 
    1559      749514 :     for( uint32_t i = 0UL; i != count; i++ )
    1560             :     {
    1561      517609 :         sge[i].addr = (uint64_t)(uintptr_t)_msgbuf.getBuffer( );
    1562      517614 :         sge[i].length = uint32_t( _msgbuf.getBufferSize( ));
    1563      517614 :         sge[i].lkey = _msgbuf.getMR( )->lkey;
    1564             : 
    1565      517614 :         wrs[i].wr_id = sge[i].addr;
                               // For the last element this points one past the array; it is
                               // overwritten with NULL right after the loop.
    1566      517614 :         wrs[i].next = &wrs[i + 1];
    1567      517614 :         wrs[i].sg_list = &sge[i];
    1568      517614 :         wrs[i].num_sge = 1;
    1569             :     }
    1570      231905 :     wrs[count - 1].next = NULL;
    1571             : 
    1572             :     struct ibv_recv_wr *bad_wr;
    1573             :     const bool error =
    1574      231905 :         ::rdma_seterrno( ::ibv_post_recv( _cm_id->qp, wrs, &bad_wr ));
    1575             : 
    1576      231904 :     delete[] sge;
    1577      231904 :     delete[] wrs;
    1578             : 
    1579      231904 :     if( error )
    1580             :     {
    1581           0 :         LBERROR << "ibv_post_recv : "  << lunchbox::sysError << std::endl;
    1582           0 :         return false;
    1583             :     }
    1584             : 
    1585      231904 :     return true;
    1586             : }
    1587             : 
    1588             : /* inline */
    // Handles the immediate value of a received RDMA write.
    //
    // imm_data arrives in network byte order; it is converted with ntohl() and
    // reinterpreted as an RDMAFCImm through a union. The sink ring head and the
    // available-byte notification are advanced by bytes_sent, fcs_received is
    // added to _fcredits, and _writes is incremented.
    //
    // NOTE(review): the bool result of _incrAvailableBytes() is ignored here.
    1589      464638 : void RDMAConnection::_recvRDMAWrite( const uint32_t imm_data )
    1590             : {
    1591             :     union
    1592             :     {
    1593             :         uint32_t val;
    1594             :         RDMAFCImm fc;
    1595             :     } x;
    1596      464638 :     x.val = ntohl( imm_data );
    1597             : 
    1598             :     // Analysis:
    1599             :     //
    1600             :     // Since the ring pointers are circular, a malicious (presumably overflow)
    1601             :     // value here would at worst only result in us reading arbitrary regions
    1602             :     // from our sink buffer, not segfaulting.  If the other side wanted us to
    1603             :     // reread a previous message it should just resend it!
    1604      464638 :     _sinkptr.incrHead( x.fc.bytes_sent );
    1605      464638 :     _incrAvailableBytes( x.fc.bytes_sent );
    1606             : 
    1607      464638 :     _fcredits += x.fc.fcs_received;
    1608      464638 :     LBASSERTINFO( _fcredits <= _depth, _fcredits << " > " << _depth );
    1609             : 
    1610      464638 :     _writes++;
    1611      464638 : }
    1612             : 
    1613             : /* inline */
    // Builds the 32-bit immediate value sent along with an RDMA write.
    //
    // Packs two fields into an RDMAFCImm through a union: fcs_received, which
    // is the pending _fcs count capped at MAX_FC (the reported amount is
    // subtracted from _fcs), and bytes_sent, which is `b` (asserted to be at
    // most MAX_BS). The packed value is returned in network byte order.
    //
    // NOTE(review): _fcs is narrowed to uint16_t before the std::min - confirm
    // it can never exceed the 16-bit range.
    1614      464638 : uint32_t RDMAConnection::_makeImm( const uint32_t b )
    1615             : {
    1616             :     union
    1617             :     {
    1618             :         uint32_t val;
    1619             :         RDMAFCImm fc;
    1620             :     } x;
    1621             : 
    1622      464638 :     x.fc.fcs_received = std::min( MAX_FC, static_cast< uint16_t >( _fcs ));
    1623      464638 :     _fcs -= x.fc.fcs_received;
    1624      464638 :     LBASSERT( _fcs >= 0 );
    1625             : 
    1626      464638 :     LBASSERT( b <= MAX_BS );
    1627      464638 :     x.fc.bytes_sent = b;
    1628             : 
    1629      464638 :     return htonl( x.val );
    1630             : }
    1631             : 
    // Posts one RDMA write-with-immediate for data in the source ring buffer.
    //
    // The local segment starts at the source ring's MIDDLE pointer; its length
    // is _sourceptr.available( HEAD, MIDDLE ). The remote target is _rbase plus
    // the remote ring's HEAD offset, accessed with _rkey. The immediate value is
    // produced by _makeImm() for the segment length, and the length is also
    // stored as the work request id.
    //
    // Before posting, the local MIDDLE pointer and the remote HEAD pointer are
    // advanced by the segment length and one write credit is consumed.
    //
    // Returns false, after logging, if ibv_post_send() fails; true otherwise.
    //
    // NOTE(review): the ring pointers, _wcredits and the _fcs count consumed by
    // _makeImm() are updated before the post and are not rolled back on
    // failure - presumably the connection is torn down in that case; confirm.
    // NOTE(review): `wr` is not zero-initialised; only the members assigned
    // below hold defined values.
    1632      464638 : bool RDMAConnection::_postRDMAWrite( )
    1633             : {
    1634             :     struct ibv_sge sge;
    1635             :     struct ibv_send_wr wr;
    1636             : 
    1637      929276 :     sge.addr = (uint64_t)( (uintptr_t)_sourcebuf.getBase( ) +
    1638      464638 :         _sourceptr.ptr( _sourceptr.MIDDLE ));
    1639      464638 :     sge.length = (uint64_t)_sourceptr.available( _sourceptr.HEAD,
    1640             :         _sourceptr.MIDDLE );
    1641      464638 :     sge.lkey = _sourcebuf.getMR( )->lkey;
    1642      464638 :     _sourceptr.incr( _sourceptr.MIDDLE, (uint32_t)sge.length );
    1643             : 
    1644      464638 :     wr.wr_id = (uint64_t)sge.length;
    1645      464638 :     wr.next = NULL;
    1646      464638 :     wr.sg_list = &sge;
    1647      464638 :     wr.num_sge = 1;
    1648      464638 :     wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
    1649      464638 :     wr.send_flags = IBV_SEND_SOLICITED; // Important!
    1650      464638 :     wr.imm_data = _makeImm( (uint32_t)sge.length );
    1651      464638 :     wr.wr.rdma.rkey = _rkey;
    1652      929276 :     wr.wr.rdma.remote_addr = (uint64_t)( (uintptr_t)_rbase +
    1653      464638 :         _rptr.ptr( _rptr.HEAD ));
    1654      464638 :     _rptr.incrHead( (uint32_t)sge.length );
    1655             : 
    1656      464638 :     _wcredits--;
    1657      464638 :     LBASSERT( _wcredits >= 0L );
    1658             : 
    1659             :     struct ibv_send_wr *bad_wr;
    1660      464638 :     if( ::rdma_seterrno( ::ibv_post_send( _cm_id->qp, &wr, &bad_wr )))
    1661             :     {
    1662           0 :         LBERROR << "ibv_post_send : "  << lunchbox::sysError << std::endl;
    1663           0 :         return false;
    1664             :     }
    1665             : 
    1666      464638 :     return true;
    1667             : }
    1668             : 
    // Sends a protocol message with rdma_post_send().
    //
    // Consumes one flow-control credit (_fcredits) and posts `message` using the
    // memory region of _msgbuf. The number of bytes sent is the offset of the
    // payload member plus message.length. The message pointer is passed both as
    // the second and as the third argument of rdma_post_send().
    //
    // Returns false, after logging, if rdma_post_send() fails; true otherwise.
    // The credit is not restored on failure.
    1669       50998 : bool RDMAConnection::_postMessage( const RDMAMessage &message )
    1670             : {
    1671       50998 :     _fcredits--;
    1672       50998 :     LBASSERT( _fcredits >= 0L );
    1673             : 
    1674      101996 :     if( ::rdma_post_send( _cm_id, (void *)&message, (void *)&message,
    1675       50998 :             offsetof( RDMAMessage, payload ) + message.length, _msgbuf.getMR( ),
    1676             :             0 ))
    1677             :     {
    1678           0 :         LBERROR << "rdma_post_send : "  << lunchbox::sysError << std::endl;
    1679           0 :         return false;
    1680             :     }
    1681             : 
    1682       50998 :     return true;
    1683             : }
    1684             : 
    // Dispatches a received protocol message by opcode.
    //
    // FC messages go to _recvFC() and SETUP messages to _recvSetup(), but only
    // when message.length equals the size of the corresponding payload struct;
    // otherwise a warning is logged and the message is dropped. Unknown opcodes
    // are logged (in hex) and ignored.
    1685       50929 : void RDMAConnection::_recvMessage( const RDMAMessage &message )
    1686             : {
    1687       50929 :     switch( message.opcode )
    1688             :     {
    1689             :         case FC:
    1690       50925 :             if( sizeof(struct RDMAFCPayload) == (size_t)message.length )
    1691       50925 :                 _recvFC( message.payload.fc );
    1692             :             else
    1693           0 :                 LBWARN << "Invalid flow control message received!" << std::endl;
    1694       50925 :             break;
    1695             :         case SETUP:
    1696           4 :             if( sizeof(struct RDMASetupPayload) == (size_t)message.length )
    1697           4 :                 _recvSetup( message.payload.setup );
    1698             :             else
    1699           0 :                 LBWARN << "Invalid setup message received!" << std::endl;
    1700           4 :             break;
    1701             :         default:
    1702           0 :             LBWARN << "Invalid message received: "
    1703           0 :                 << std::hex << (int)message.opcode << std::dec << std::endl;
    1704             :     }
    1705       50929 : }
    1706             : 
    1707             : /* inline */
    // Applies a flow-control message received from the peer.
    //
    // Advances the tail of the remote ring pointer by fc.bytes_received, adds
    // fc.writes_received to the write credits (_wcredits), and counts the
    // message in _fcs so that it can later be acknowledged via _makeImm().
    1708       50925 : void RDMAConnection::_recvFC( const RDMAFCPayload &fc )
    1709             : {
    1710             :     // Analysis:
    1711             :     //
    1712             :     // Since we will only write a maximum of _sourceptr.available( ) bytes
    1713             :     // to our source buffer, a malicious (presumably overflow) value here would
    1714             :     // have no chance of causing us to write beyond our buffer as we have local
    1715             :     // control over those ring pointers.  Worst case, we'd end up writing to
    1716             :     // arbitrary regions of the remote buffer, since this ring pointer is
    1717             :     // circular as well.
    1718       50925 :     _rptr.incrTail( fc.bytes_received );
    1719             : 
    1720       50925 :     _wcredits += fc.writes_received;
    1721       50925 :     LBASSERTINFO( _wcredits <= _depth, _wcredits << " > " << _depth );
    1722             : 
    1723       50925 :     _fcs++;
    1724       50925 : }
    1725             : 
    // Sends a flow-control (FC) message to the peer.
    //
    // Takes a buffer from _msgbuf, fills it as an FC message reporting the
    // bytes read (_readBytes) and the writes received (_writes) since the last
    // report, subtracts the reported write count from _writes, resets
    // _readBytes to zero and posts the message through _postMessage().
    //
    // Returns the result of _postMessage().
    //
    // NOTE(review): _writes is reduced by the value actually stored in the
    // payload field, so if that field is narrower than _writes any excess stays
    // pending - confirm the field widths in RDMAFCPayload.
    1726       50994 : bool RDMAConnection::_postFC()
    1727             : {
    1728             :     RDMAMessage &message =
    1729       50994 :         *reinterpret_cast< RDMAMessage * >( _msgbuf.getBuffer( ));
    1730             : 
    1731       50994 :     message.opcode = FC;
    1732       50994 :     message.length = (uint8_t)sizeof(struct RDMAFCPayload);
    1733             : 
    1734       50994 :     message.payload.fc.bytes_received = _readBytes;
    1735       50994 :     message.payload.fc.writes_received = _writes;
    1736       50994 :     _writes -= message.payload.fc.writes_received;
    1737       50994 :     _readBytes = 0;
    1738             : 
    1739       50994 :     LBASSERT( _writes >= 0 );
    1740       50994 :     return _postMessage( message );
    1741             : }
    1742             : 
    // Stores the peer's sink buffer description from a SETUP message.
    //
    // Copies the remote base address and remote key into _rbase and _rkey, and
    // resets the remote ring pointer _rptr to the advertised length. The values
    // are taken as sent; see the analysis note below.
    1743           4 : void RDMAConnection::_recvSetup( const RDMASetupPayload &setup )
    1744             : {
    1745             :     // Analysis:
    1746             :     //
    1747             :     // Malicious values here would only affect the receiver, we're willing
    1748             :     // to RDMA write to anywhere specified!
    1749           4 :     _rbase = setup.rbase;
    1750           4 :     _rptr.clear( setup.rlen );
    1751           4 :     _rkey = setup.rkey;
    1752             : 
    1753           4 :     LBVERB << "RDMA MR: " << std::showbase
    1754           0 :         << std::dec << setup.rlen << " @ "
    1755           4 :         << std::hex << setup.rbase << std::dec << std::endl;
    1756           4 : }
    1757             : 
    // Sends a SETUP message describing the local sink buffer to the peer.
    //
    // Takes a buffer from _msgbuf and fills it with the sink buffer's base
    // address, size and the rkey of its memory region, then posts it through
    // _postMessage().
    //
    // Returns the result of _postMessage().
    1758           4 : bool RDMAConnection::_postSetup( )
    1759             : {
    1760             :     RDMAMessage &message =
    1761           4 :         *reinterpret_cast< RDMAMessage * >( _msgbuf.getBuffer( ));
    1762             : 
    1763           4 :     message.opcode = SETUP;
    1764           4 :     message.length = (uint8_t)sizeof(struct RDMASetupPayload);
    1765             : 
    1766           4 :     message.payload.setup.rbase = (uint64_t)(uintptr_t)_sinkbuf.getBase( );
    1767           4 :     message.payload.setup.rlen = (uint64_t)_sinkbuf.getSize( );
    1768           4 :     message.payload.setup.rkey = _sinkbuf.getMR( )->rkey;
    1769             : 
    1770           4 :     return _postMessage( message );
    1771             : }
    1772             : 
    // Polls until the peer's SETUP message has been received.
    //
    // Each pass of the `retry` loop checks for a disconnect
    // (_checkDisconnected), verifies the connection is still established, and
    // polls the completion queue (_checkCQ). The loop ends successfully once
    // _rkey is non-zero, i.e. once _recvSetup() has stored the remote key.
    // Between passes the thread yields.
    //
    // Unless Global::getTimeout() is LB_TIMEOUT_INDEFINITE, the wait is aborted
    // once the elapsed clock time exceeds that timeout.
    //
    // Returns true when the setup data has arrived; false, after logging, on a
    // check failure, a disconnect, a completion queue error or a timeout.
    //
    // NOTE(review): a peer advertising an rkey of 0 would be indistinguishable
    // from "not yet received" here - confirm this cannot happen.
    1773           4 : bool RDMAConnection::_waitRecvSetup( )
    1774             : {
    1775           8 :     lunchbox::Clock clock;
    1776           4 :     const int64_t start = clock.getTime64( );
    1777           4 :     const uint32_t timeout = Global::getTimeout( );
    1778           4 :     eventset events;
    1779             : 
    1780             : retry:
    1781          70 :     if( !_checkDisconnected( events ))
    1782             :     {
    1783           0 :         LBERROR << "Error while checking event state." << std::endl;
    1784           0 :         return false;
    1785             :     }
    1786             : 
    1787          70 :     if( !_established )
    1788             :     {
    1789           0 :         LBERROR << "Disconnected while waiting for setup message." << std::endl;
    1790           0 :         return false;
    1791             :     }
    1792             : 
    1793          70 :     if( !_checkCQ( true ))
    1794             :     {
    1795           0 :         LBERROR << "Error while polling receive completion queue." << std::endl;
    1796           0 :         return false;
    1797             :     }
    1798             : 
                           // A zero remote key means the SETUP message has not been processed
                           // yet.
    1799          70 :     if( 0ULL == _rkey )
    1800             :     {
    1801          66 :         if( LB_TIMEOUT_INDEFINITE != timeout )
    1802             :         {
    1803          66 :             if(( clock.getTime64( ) - start ) > timeout )
    1804             :             {
    1805           0 :                 LBERROR << "Timed out waiting for setup message." << std::endl;
    1806           0 :                 return false;
    1807             :             }
    1808             :         }
    1809             : 
    1810          66 :         lunchbox::Thread::yield( );
    1811          66 :         goto retry;
    1812             :     }
    1813             : 
    1814           4 :     return true;
    1815             : }
    1816             : 
    1817             : ////////////////////////////////////////////////////////////////////////////////
    1818             : 
    // Creates the notifier used to signal events on this connection.
    //
    // On Windows a manual-reset, initially non-signalled event object is
    // created and true is returned unconditionally. Elsewhere an epoll instance
    // is created with epoll_create(); false is returned, after logging, if that
    // fails.
    //
    // NOTE(review): the error text names "epoll_create1" although the call is
    // epoll_create(). The CreateEvent() result is not checked on Windows.
    1819           6 : bool RDMAConnection::_createNotifier( )
    1820             : {
    1821           6 :     LBASSERT( _notifier < 0 );
    1822             : #ifdef _WIN32
    1823             :     _notifier = CreateEvent( 0, TRUE, FALSE, 0 );
    1824             :     return true;
    1825             : #else
    1826           6 :     _notifier = ::epoll_create( 1 );
    1827           6 :     if( _notifier < 0 )
    1828             :     {
    1829           0 :         LBERROR << "epoll_create1 : " << lunchbox::sysError << std::endl;
    1830           0 :         return false;
    1831             :     }
    1832             : 
    1833           6 :     return true;
    1834             : #endif
    1835             : }
    1836             : 
    // Refreshes the notifier state.
    //
    // On Windows the event is reset, under _eventLock, when no bytes are
    // available and neither the connection manager channel nor the completion
    // channel (if one exists) has a pending entry. Elsewhere _checkEvents() is
    // called on a local event set and both its result and the collected events
    // are discarded.
    1837           6 : void RDMAConnection::_updateNotifier()
    1838             : {
    1839             : #ifdef _WIN32
    1840             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    1841             :     if ( _availBytes == 0 && !_cm->channel.Head  &&
    1842             :             ( !_cc || !_cc->comp_channel.Head ))
    1843             :         ResetEvent( _notifier );
    1844             : #else
    1845           6 :     eventset events;
    1846           6 :     _checkEvents( events );
    1847             : #endif
    1848           6 : }
    1849             : 
    // Collects the currently pending event sources into `events`.
    //
    // `events` is reset first. On Windows, under _eventLock, BUF_EVENT is set
    // when _availBytes is non-zero, CQ_EVENT when the completion channel has a
    // pending entry and CM_EVENT when the connection manager channel has one.
    // Elsewhere the epoll instance _notifier is queried without blocking
    // (timeout 0) for up to three ready descriptors, which are mapped to
    // BUF_EVENT (_pipe_fd[0]), CQ_EVENT (_cc->fd) or CM_EVENT (_cm->fd); any
    // other descriptor is treated as unreachable.
    //
    // Returns false, after logging, only when epoll_wait() fails; true
    // otherwise.
    1850     1338718 : bool RDMAConnection::_checkEvents( eventset &events )
    1851             : {
    1852     1338718 :     events.reset( );
    1853             : 
    1854             : #ifdef _WIN32
    1855             :     lunchbox::ScopedFastRead mutex( _eventLock );
    1856             :     if ( _availBytes != 0 )
    1857             :         events.set( BUF_EVENT );
    1858             :     if ( _cc  && ( _cc->comp_channel.Head != 0 ))
    1859             :         events.set( CQ_EVENT );
    1860             :     if ( _cm->channel.Head )
    1861             :         events.set( CM_EVENT );
    1862             : 
    1863             :     return true;
    1864             : #else
    1865             :     struct epoll_event evts[3];
    1866             : 
                           // TEMP_FAILURE_RETRY repeats the call while it is interrupted (EINTR).
    1867     1338573 :     int nfds = TEMP_FAILURE_RETRY( ::epoll_wait( _notifier, evts, 3, 0 ));
    1868     1335287 :     if( nfds < 0 )
    1869             :     {
    1870           0 :         LBERROR << "epoll_wait : " << lunchbox::sysError << std::endl;
    1871           0 :         return false;
    1872             :     }
    1873             : 
    1874     1865258 :     for( int i = 0; i < nfds; i++ )
    1875             :     {
    1876      529932 :         const int fd = evts[i].data.fd;
    1877      529932 :         if(( _pipe_fd[0] >= 0 ) && ( fd == _pipe_fd[0] ))
    1878      442789 :             events.set( BUF_EVENT );
    1879       87143 :         else if( _cc && ( fd == _cc->fd ))
    1880       87131 :             events.set( CQ_EVENT );
    1881          12 :         else if( _cm && ( fd == _cm->fd ))
    1882          12 :             events.set( CM_EVENT );
    1883             :         else
    1884           0 :             LBUNREACHABLE;
    1885             :     }
    1886             : 
    1887     1335326 :     return true;
    1888             : #endif
    1889             : }
    1890             : 
    // Refreshes `events` and processes a pending connection manager event.
    //
    // While holding _poll_lock, the pending events are collected with
    // _checkEvents(). If a CM_EVENT is pending, it is handled by _doCMEvent()
    // with RDMA_CM_EVENT_DISCONNECTED as the expected event; afterwards the
    // connection is asserted to be no longer established.
    //
    // Returns false, after logging, if _checkEvents() or _doCMEvent() fails;
    // true otherwise. A true result does not by itself say whether the
    // connection is still up - callers in this file inspect _established.
    1891     1250666 : bool RDMAConnection::_checkDisconnected( eventset &events )
    1892             : {
    1893     2498300 :     lunchbox::ScopedWrite poll_mutex( _poll_lock );
    1894             : 
    1895     1252069 :     if( !_checkEvents( events ))
    1896             :     {
    1897           0 :         LBERROR << "Error while checking event state." << std::endl;
    1898           0 :         return false;
    1899             :     }
    1900             : 
    1901     1247736 :     if( events.test( CM_EVENT ))
    1902             :     {
    1903           2 :         if( !_doCMEvent( RDMA_CM_EVENT_DISCONNECTED ))
    1904             :         {
    1905           0 :             LBERROR << "Unexpected connection manager event." << std::endl;
    1906           0 :             return false;
    1907             :         }
    1908             : 
    1909           2 :         LBASSERT( !_established );
    1910             :     }
    1911             : 
    1912     1247634 :     return true;
    1913             : }
    1914             : 
    // Creates the pipe used to signal available bytes and registers its read
    // end with the notifier.
    //
    // On non-Windows builds a pipe is created in _pipe_fd and _pipe_fd[0] is
    // added to the epoll instance _notifier for EPOLLIN. On Windows nothing is
    // done and true is returned.
    //
    // Returns false, after logging, if pipe() or epoll_ctl() fails; true
    // otherwise.
    //
    // NOTE(review): when epoll_ctl() fails the pipe descriptors are not closed
    // in this function - confirm they are released by the caller's cleanup.
    1915           4 : bool RDMAConnection::_createBytesAvailableFD( )
    1916             : {
    1917             : #ifndef _WIN32
    1918           4 :     if ( pipe( _pipe_fd ) == -1 )
    1919             :     {
    1920           0 :         LBERROR << "pipe: " << lunchbox::sysError << std::endl;
    1921           0 :         return false;
    1922             :     }
    1923             : 
    1924           4 :     LBASSERT( _pipe_fd[0] >= 0 && _pipe_fd[1] >= 0);
    1925             : 
    1926             :     // Use the pipe's read end to signal Collage of bytes remaining.
    1927             :     struct epoll_event evctl;
    1928           4 :     ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
    1929           4 :     evctl.events = EPOLLIN;
    1930           4 :     evctl.data.fd = _pipe_fd[0];
    1931           4 :     if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
    1932             :     {
    1933           0 :         LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
    1934           0 :         return false;
    1935             :     }
    1936             : #endif
    1937           4 :     return true;
    1938             : }
    1939             : 
    1940      886782 : bool RDMAConnection::_incrAvailableBytes( const uint64_t b )
    1941             : {
    1942             : #ifdef _WIN32
    1943             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    1944             :     _availBytes += b;
    1945             :     ::SetEvent( _notifier );
    1946             : #else
    1947      886782 :     if( ::write( _pipe_fd[1], (const void *)&b, sizeof(b) ) != sizeof(b) )
    1948             :     {
    1949           0 :         LBERROR << "write : " << lunchbox::sysError << std::endl;
    1950           0 :         return false;
    1951             :     }
    1952             : #endif
    1953      886782 :     return true;
    1954             : }
    1955             : 
    1956      442789 : uint64_t RDMAConnection::_getAvailableBytes( )
    1957             : {
    1958      442789 :     uint64_t available_bytes = 0;
    1959             : #ifdef _WIN32
    1960             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    1961             :     available_bytes = static_cast<uint64_t>( _availBytes );
    1962             :     _availBytes = 0;
    1963             :     return available_bytes;
    1964             : #else
    1965      442789 :     uint64_t currBytes = 0;
    1966             :     ssize_t count;
    1967             :     pollfd pfd;
    1968      442789 :     pfd.fd = _pipe_fd[0];
    1969      442789 :     pfd.events = EPOLLIN;
    1970             : 
    1971      443993 :     do
    1972             :     {
    1973      886782 :         count = ::read( _pipe_fd[0], (void *)&currBytes, sizeof( currBytes ));
    1974      886782 :         if ( count > 0 && count < (ssize_t)sizeof( currBytes ) )
    1975           0 :             return 0ULL;
    1976      886782 :         available_bytes += currBytes;
    1977      886782 :         pfd.revents = 0;
    1978      886782 :         if ( ::poll( &pfd, 1, 0 ) == -1 )
    1979             :         {
    1980           0 :             LBERROR << "poll : " << lunchbox::sysError << std::endl;
    1981           0 :             return 0ULL;
    1982             :         }
    1983      886782 :     } while ( pfd.revents > 0 );
    1984             : 
    1985      442789 :     if ( count == -1 )
    1986             :     {
    1987           0 :         LBERROR << "read : " << lunchbox::sysError << std::endl;
    1988           0 :         return 0ULL;
    1989             :     }
    1990             : 
    1991      442789 :     LBASSERT( available_bytes > 0ULL );
    1992      442789 :     return available_bytes;
    1993             : #endif
    1994             : }
    1995             : 
    1996          10 : bool RDMAConnection::_waitForCMEvent( enum rdma_cm_event_type expected )
    1997             : {
    1998          20 :     lunchbox::Clock clock;
    1999          10 :     const int64_t start = clock.getTime64( );
    2000          10 :     const uint32_t timeout = Global::getTimeout( );
    2001          10 :     bool done = false;
    2002          10 :     eventset events;
    2003             : 
    2004             : retry:
    2005       86683 :     if( !_checkEvents( events ))
    2006             :     {
    2007           0 :         LBERROR << "Error while checking event state." << std::endl;
    2008           0 :         return false;
    2009             :     }
    2010             : 
    2011       86700 :     if( events.test( CM_EVENT ))
    2012             :     {
    2013          10 :         done = _doCMEvent( expected );
    2014          10 :         if( !done )
    2015             :         {
    2016           0 :             LBERROR << "Unexpected connection manager event." << std::endl;
    2017           0 :             return false;
    2018             :         }
    2019             :     }
    2020             : 
    2021       86690 :     if( !done )
    2022             :     {
    2023       86680 :         if( LB_TIMEOUT_INDEFINITE != timeout )
    2024             :         {
    2025       86680 :             if(( clock.getTime64( ) - start ) > timeout )
    2026             :             {
    2027           0 :                 LBERROR << "Timed out waiting for setup message." << std::endl;
    2028           0 :                 return false;
    2029             :             }
    2030             :         }
    2031             : 
    2032       86685 :         lunchbox::Thread::yield( );
    2033       86673 :         goto retry;
    2034             :     }
    2035             : 
    2036          10 :     return true;
    2037             : }
    2038             : 
    2039          12 : bool RDMAConnection::_doCMEvent( enum rdma_cm_event_type expected )
    2040             : {
    2041          12 :     bool ok = false;
    2042             :     struct rdma_cm_event *event;
    2043             : 
    2044          12 :     if( ::rdma_get_cm_event( _cm, &event ))
    2045             :     {
    2046           0 :         LBERROR << "rdma_get_cm_event : " << lunchbox::sysError << std::endl;
    2047           0 :         return false;
    2048             :     }
    2049             : 
    2050          12 :     ok = ( event->event == expected );
    2051             : 
    2052             : #ifndef NDEBUG
    2053          12 :     if( ok )
    2054             :     {
    2055          12 :         LBVERB << (void *)this
    2056             :             << " (" << _addr << ":" << _serv << ")"
    2057           0 :             << " event : " << ::rdma_event_str( event->event )
    2058          12 :             << std::endl;
    2059             :     }
    2060             :     else
    2061             :     {
    2062           0 :         LBINFO << (void *)this
    2063             :             << " (" << _addr << ":" << _serv << ")"
    2064             :             << " event : " << ::rdma_event_str( event->event )
    2065           0 :             << " expected: " << ::rdma_event_str( expected )
    2066           0 :             << std::endl;
    2067             :     }
    2068             : #endif
    2069             : 
    2070          12 :     if( ok && ( RDMA_CM_EVENT_DISCONNECTED == event->event ))
    2071           2 :         _established = false;
    2072             : 
    2073          12 :     if( ok && ( RDMA_CM_EVENT_ESTABLISHED == event->event ))
    2074             :     {
    2075           4 :         _established = true;
    2076             : 
    2077           4 :         struct rdma_conn_param *cp = &event->param.conn;
    2078             : 
    2079           4 :         ::memset( (void *)&_cpd, 0, sizeof(RDMAConnParamData) );
    2080             :         // Note that the actual amount of data transferred to the remote side
    2081             :         // is transport dependent and may be larger than that requested.
    2082           4 :         if( cp->private_data_len >= sizeof(RDMAConnParamData) )
    2083           2 :             _cpd = *reinterpret_cast< const RDMAConnParamData * >(
    2084           2 :                 cp->private_data );
    2085             :     }
    2086             : 
    2087          12 :     if( ok && ( RDMA_CM_EVENT_CONNECT_REQUEST == event->event ))
    2088             :     {
    2089           2 :         _new_cm_id = event->id;
    2090             : 
    2091           2 :         struct rdma_conn_param *cp = &event->param.conn;
    2092             : 
    2093           2 :         ::memset( (void *)&_cpd, 0, sizeof(RDMAConnParamData) );
    2094             :         // TODO : Not sure what happens when initiator sent ai_connect data
    2095             :         // (assuming the underlying transport doesn't strip it)?
    2096           2 :         if( cp->private_data_len >= sizeof(RDMAConnParamData) )
    2097           2 :             _cpd = *reinterpret_cast< const RDMAConnParamData * >(
    2098           2 :                 cp->private_data );
    2099             :     }
    2100             : 
    2101          12 :     if( RDMA_CM_EVENT_REJECTED == event->event )
    2102           0 :         LBINFO << "Connection reject status : " << event->status << std::endl;
    2103             : 
    2104          12 :     if( ::rdma_ack_cm_event( event ))
    2105           0 :         LBWARN << "rdma_ack_cm_event : "  << lunchbox::sysError << std::endl;
    2106             : 
    2107          12 :     return ok;
    2108             : }
    2109             : 
    2110       87131 : bool RDMAConnection::_rearmCQ( )
    2111             : {
    2112             :     struct ibv_cq *ev_cq;
    2113             :     void *ev_ctx;
    2114             : 
    2115             : #ifdef _WIN32
    2116             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    2117             : #endif
    2118       87131 :     if( ::ibv_get_cq_event( _cc, &ev_cq, &ev_ctx ))
    2119             :     {
    2120           0 :         LBERROR << "ibv_get_cq_event : " << lunchbox::sysError << std::endl;
    2121           0 :         return false;
    2122             :     }
    2123             : 
    2124             :     // http://lists.openfabrics.org/pipermail/general/2008-November/055237.html
    2125       87131 :     _completions++;
    2126       87131 :     if( std::numeric_limits< unsigned int >::max( ) == _completions )
    2127             :     {
    2128           0 :         ::ibv_ack_cq_events( _cq, _completions );
    2129           0 :         _completions = 0U;
    2130             :     }
    2131             : 
    2132             :     // Solicited only!
    2133       87131 :     if( ::rdma_seterrno( ::ibv_req_notify_cq( _cq, 1 )))
    2134             :     {
    2135           0 :         LBERROR << "ibv_req_notify_cq : " << lunchbox::sysError << std::endl;
    2136           0 :         return false;
    2137             :     }
    2138             : 
    2139       87131 :     return true;
    2140             : }
    2141             : 
    2142     1249873 : bool RDMAConnection::_checkCQ( bool drain )
    2143             : {
    2144             :     uint32_t num_recvs;
    2145             :     int count;
    2146             : 
    2147     2501158 :     lunchbox::ScopedWrite poll_mutex( _poll_lock );
    2148             : 
    2149     1252187 :     if( NULL == _cq )
    2150           0 :         return true;
    2151             : 
    2152             : repoll:
    2153             :     /* CHECK RECEIVE COMPLETIONS */
    2154     1436006 :     count = ::ibv_poll_cq( _cq, _depth, _wcs );
    2155     1430009 :     if( count < 0 )
    2156             :     {
    2157           0 :         LBERROR << "ibv_poll_cq : " << lunchbox::sysError << std::endl;
    2158           0 :         return false;
    2159             :     }
    2160             : 
    2161     1430009 :     num_recvs = 0UL;
    2162     2454701 :     for( int i = 0; i != count ; i++ )
    2163             :     {
    2164     1019599 :         struct ibv_wc &wc = _wcs[i];
    2165             : 
    2166     1019599 :         if( IBV_WC_SUCCESS != wc.status )
    2167             :         {
    2168             :             // Non-fatal.
    2169           0 :             if( IBV_WC_WR_FLUSH_ERR == wc.status )
    2170           0 :                 continue;
    2171             : 
    2172           0 :             LBWARN << (void *)this << " !IBV_WC_SUCCESS : " << std::showbase
    2173           0 :                 << std::hex << "wr_id = " << wc.wr_id
    2174           0 :                 << ", status = \"" << ::ibv_wc_status_str( wc.status ) << "\""
    2175           0 :                 << std::dec << " (" << (unsigned int)wc.status << ")"
    2176           0 :                 << std::hex << ", vendor_err = " << wc.vendor_err
    2177           0 :                 << std::dec << std::endl;
    2178             : 
    2179             :             // All others are fatal.
    2180           0 :             return false;
    2181             :         }
    2182             : 
    2183     1019599 :         LBASSERT( IBV_WC_SUCCESS == wc.status );
    2184             : 
    2185             : #ifdef _WIN32 //----------------------------------------------------------------
    2186             :         // WINDOWS IBV API WORKAROUND.
    2187             :         // TODO: remove this as soon as IBV API is fixed
    2188             :         if ( wc.opcode == IBV_WC_RECV && wc.wc_flags == IBV_WC_WITH_IMM )
    2189             :             wc.opcode = IBV_WC_RECV_RDMA_WITH_IMM;
    2190             : #endif //-----------------------------------------------------------------------
    2191             : 
    2192     1031087 :         if( IBV_WC_RECV_RDMA_WITH_IMM == wc.opcode )
    2193      464638 :             _recvRDMAWrite( wc.imm_data );
    2194      566449 :         else if( IBV_WC_RECV == wc.opcode )
    2195       50929 :             _recvMessage( *reinterpret_cast< RDMAMessage * >( wc.wr_id ));
    2196      515520 :         else if( IBV_WC_SEND == wc.opcode )
    2197       50998 :             _msgbuf.freeBuffer( (void *)(uintptr_t)wc.wr_id );
    2198      464522 :         else if( IBV_WC_RDMA_WRITE == wc.opcode )
    2199      464522 :             _sourceptr.incrTail( (uint32_t)wc.wr_id );
    2200             :         else
    2201           0 :             LBUNREACHABLE;
    2202             : 
    2203     1024692 :         if( IBV_WC_RECV & wc.opcode )
    2204             :         {
    2205      515567 :             _msgbuf.freeBuffer( (void *)(uintptr_t)wc.wr_id );
    2206             :             // All receive completions need to be reposted.
    2207      515567 :             num_recvs++;
    2208             :         }
    2209             :     }
    2210             : 
    2211     1435102 :     if(( num_recvs > 0UL ) && !_postReceives( num_recvs ))
    2212           0 :         return false;
    2213             : 
    2214     1435104 :     if( drain && ( count > 0 ))
    2215      183819 :         goto repoll;
    2216             : 
    2217     1251285 :     return true;
    2218             : }
    2219             : 
    2220             : /* inline */
    2221      442791 : uint32_t RDMAConnection::_drain( void *buffer, const uint32_t bytes )
    2222             : {
    2223      442791 :     const uint32_t b = std::min( bytes, _sinkptr.available( ));
    2224      885582 :     ::memcpy( buffer, (const void *)((uintptr_t)_sinkbuf.getBase( ) +
    2225      885582 :         _sinkptr.tail( )), b );
    2226      442791 :     _sinkptr.incrTail( b );
    2227      442791 :     return b;
    2228             : }
    2229             : 
    2230             : /* inline */
    2231      787099 : uint32_t RDMAConnection::_fill( const void *buffer, const uint32_t bytes )
    2232             : {
    2233     2361297 :     uint32_t b = std::min( bytes, std::min( _sourceptr.negAvailable( ),
    2234     3148396 :         _rptr.negAvailable( )));
    2235             : #ifndef WRAP
    2236     2361297 :     b = std::min( b, (uint32_t)(_sourcebuf.getSize() -
    2237     1574198 :                                             _sourceptr.ptr( _sourceptr.HEAD )));
    2238             : #endif
    2239     1574198 :     ::memcpy( (void *)((uintptr_t)_sourcebuf.getBase( ) +
    2240     1574198 :         _sourceptr.ptr( _sourceptr.HEAD )), buffer, b );
    2241      787099 :     _sourceptr.incrHead( b );
    2242      787099 :     return b;
    2243             : }
    2244             : 
    2245           0 : Connection::Notifier RDMAConnection::getNotifier() const
    2246             : {
    2247           0 :     return _notifier;
    2248             : }
    2249             : 
    2250             : #ifdef _WIN32
    2251             : void RDMAConnection::_triggerNotifierCQ( RDMAConnection* conn )
    2252             : {
    2253             :     conn->_triggerNotifierWorker( CQ_EVENT );
    2254             : }
    2255             : 
    2256             : void RDMAConnection::_triggerNotifierCM( RDMAConnection* conn )
    2257             : {
    2258             :     conn->_triggerNotifierWorker( CM_EVENT );
    2259             : }
    2260             : 
    2261             : void RDMAConnection::_triggerNotifierWorker( Events event )
    2262             : {
    2263             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    2264             :     COMP_CHANNEL* chan = event == CQ_EVENT ? &_cc->comp_channel : &_cm->channel;
    2265             :     EnterCriticalSection( &chan->Lock );
    2266             :     ResetEvent( chan->Event );
    2267             :     if ( chan->Head )
    2268             :         SetEvent( _notifier );
    2269             :     LeaveCriticalSection( &chan->Lock );
    2270             : }
    2271             : #endif
    2272             : 
    2273             : //////////////////////////////////////////////////////////////////////////////
    2274             : 
    2275           0 : void RDMAConnection::_showStats( )
    2276             : {
    2277           0 :     LBVERB << std::dec
    2278           0 :         << "reads = " << _stats.reads
    2279           0 :         << ", buffer_empty = " << _stats.buffer_empty
    2280           0 :         << ", no_credits_fc = " << _stats.no_credits_fc
    2281           0 :         << ", writes = " << _stats.writes
    2282           0 :         << ", buffer_full = " << _stats.buffer_full
    2283           0 :         << ", no_credits_rdma = " << _stats.no_credits_rdma
    2284           0 :         << std::endl;
    2285           0 : }
    2286             : 
    2287             : //////////////////////////////////////////////////////////////////////////////
    2288             : 
    2289           6 : BufferPool::BufferPool( size_t buffer_size )
    2290             :     : _buffer_size( buffer_size )
    2291             :     , _num_bufs( 0 )
    2292             :     , _buffer( NULL )
    2293             :     , _mr( NULL )
    2294           6 :     , _ring( 0 )
    2295             : {
    2296           6 : }
    2297             : 
    2298          12 : BufferPool::~BufferPool( )
    2299             : {
    2300           6 :     clear( );
    2301           6 : }
    2302             : 
    2303          20 : void BufferPool::clear( )
    2304             : {
    2305          20 :     _num_bufs = 0;
    2306          20 :     _ring.clear( _num_bufs );
    2307             : 
    2308          20 :     if(( NULL != _mr ) && ::rdma_dereg_mr( _mr ))
    2309           0 :         LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
    2310          20 :     _mr = NULL;
    2311             : 
    2312          20 :     if( NULL != _buffer )
    2313             : #ifdef _WIN32
    2314             :         _aligned_free( _buffer );
    2315             : #else
    2316           4 :         ::free( _buffer );
    2317             : #endif
    2318          20 :     _buffer = NULL;
    2319          20 : }
    2320             : 
    2321           4 : bool BufferPool::resize( ibv_pd *pd, uint32_t num_bufs )
    2322             : {
    2323           4 :     clear( );
    2324             : 
    2325           4 :     if( num_bufs )
    2326             :     {
    2327           4 :         _num_bufs = num_bufs;
    2328           4 :         _ring.clear( _num_bufs );
    2329             : 
    2330             : #ifdef _WIN32
    2331             :         _buffer = _aligned_malloc((size_t)( _num_bufs * _buffer_size ),
    2332             :             boost::interprocess::mapped_region::get_page_size());
    2333             :         if ( !_buffer )
    2334             :         {
    2335             :             LBERROR << "_aligned_malloc : Couldn't allocate aligned memory. "
    2336             :                     << lunchbox::sysError << std::endl;
    2337             :             return false;
    2338             :         }
    2339             : #else
    2340           4 :         if( ::posix_memalign( &_buffer, (size_t)::getpagesize( ),
    2341           4 :                 (size_t)( _num_bufs * _buffer_size )))
    2342             :         {
    2343           0 :             LBERROR << "posix_memalign : " << lunchbox::sysError << std::endl;
    2344           0 :             return false;
    2345             :         }
    2346             : #endif
    2347           4 :         ::memset( _buffer, 0xff, (size_t)( _num_bufs * _buffer_size ));
    2348           4 :         _mr = ::ibv_reg_mr( pd, _buffer, (size_t)( _num_bufs * _buffer_size ),
    2349             :             IBV_ACCESS_LOCAL_WRITE );
    2350           4 :         if( NULL == _mr )
    2351             :         {
    2352           0 :             LBERROR << "ibv_reg_mr : " << lunchbox::sysError << std::endl;
    2353           0 :             return false;
    2354             :         }
    2355             : 
    2356        4100 :         for( uint32_t i = 0; i != _num_bufs; i++ )
    2357        4096 :             _ring.put( i );
    2358             :     }
    2359             : 
    2360           4 :     return true;
    2361             : }
    2362             : 
    2363             : //////////////////////////////////////////////////////////////////////////////
    2364             : 
    2365          12 : RingBuffer::RingBuffer( int access )
    2366             :     : _access( access )
    2367             :     , _size( 0 )
    2368             : #ifdef _WIN32
    2369             :     , _mapping( 0 )
    2370             :     , _map( 0 )
    2371             : #else
    2372             :     , _map( MAP_FAILED )
    2373             : #endif
    2374          12 :     , _mr( 0 )
    2375             : {
    2376          12 : }
    2377             : 
    2378          24 : RingBuffer::~RingBuffer( )
    2379             : {
    2380          12 :     clear( );
    2381          12 : }
    2382             : 
    2383          40 : void RingBuffer::clear( )
    2384             : {
    2385          40 :     if(( NULL != _mr ) && ::rdma_dereg_mr( _mr ))
    2386           0 :         LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
    2387          40 :     _mr = NULL;
    2388             : 
    2389             : #ifdef _WIN32
    2390             :     if ( _map )
    2391             :     {
    2392             :         UnmapViewOfFile( static_cast<char*>( _map ));
    2393             :         UnmapViewOfFile( static_cast<char*>( _map ) + _size );
    2394             :     }
    2395             :     if ( _mapping )
    2396             :         CloseHandle( _mapping );
    2397             : 
    2398             :     _map = 0;
    2399             :     _mapping = 0;
    2400             : #else
    2401          40 :     if(( MAP_FAILED != _map ) && ::munmap( _map, _size << 1 ))
    2402           0 :         LBWARN << "munmap : " << lunchbox::sysError << std::endl;
    2403          40 :     _map = MAP_FAILED;
    2404             : #endif
    2405          40 :     _size = 0;
    2406          40 : }
    2407             : 
    2408           8 : bool RingBuffer::resize( ibv_pd *pd, size_t size )
    2409             : {
    2410           8 :     bool ok = false;
    2411           8 :     int fd = -1;
    2412             : 
    2413           8 :     clear( );
    2414             : 
    2415           8 :     if( size )
    2416             :     {
    2417             : #ifdef _WIN32
    2418             :         uint32_t num_retries = RINGBUFFER_ALLOC_RETRIES;
    2419             :         while (!_map && num_retries-- != 0)
    2420             :         {
    2421             :             void *target_addr = determineViableAddr( size * 2 );
    2422             :             if (target_addr)
    2423             :                 allocAt( size, target_addr );
    2424             :         }
    2425             : 
    2426             :         if ( !_map )
    2427             :         {
    2428             :             LBERROR << "Couldn't allocate desired RingBuffer memory after "
    2429             :                     << RINGBUFFER_ALLOC_RETRIES << " retries." << std::endl;
    2430             :         }
    2431             :         else
    2432             :         {
    2433             :             LBVERB << "Allocated RDMA Ringbuffer memory in "
    2434             :                     << ( RINGBUFFER_ALLOC_RETRIES - num_retries ) << " tries."
    2435             :                     << std::endl;
    2436             :             ok = true;
    2437             :         }
    2438             : 
    2439             : #else
    2440             :         void *addr1, *addr2;
    2441           8 :         char path[] = "/dev/shm/co-rdma-buffer-XXXXXX";
    2442             : 
    2443           8 :         _size = size;
    2444             : 
    2445           8 :         fd = ::mkstemp( path );
    2446           8 :         if( fd < 0 )
    2447             :         {
    2448           0 :             LBERROR << "mkstemp : " << lunchbox::sysError << std::endl;
    2449           0 :             goto out;
    2450             :         }
    2451             : 
    2452           8 :         if( ::unlink( path ))
    2453             :         {
    2454           0 :             LBERROR << "unlink : " << lunchbox::sysError << std::endl;
    2455           0 :             goto out;
    2456             :         }
    2457             : 
    2458           8 :         if( ::ftruncate( fd, _size ))
    2459             :         {
    2460           0 :             LBERROR << "ftruncate : " << lunchbox::sysError << std::endl;
    2461           0 :             goto out;
    2462             :         }
    2463             : 
    2464           8 :         _map = ::mmap( NULL, _size << 1,
    2465             :             PROT_NONE,
    2466             :             MAP_ANONYMOUS | MAP_PRIVATE, -1, 0 );
    2467           8 :         if( MAP_FAILED == _map )
    2468             :         {
    2469           0 :             LBERROR << "mmap : " << lunchbox::sysError << std::endl;
    2470           0 :             goto out;
    2471             :         }
    2472             : 
    2473           8 :         addr1 = ::mmap( _map, _size,
    2474             :             PROT_READ | PROT_WRITE,
    2475           8 :             MAP_FIXED | MAP_SHARED, fd, 0 );
    2476           8 :         if( MAP_FAILED == addr1 )
    2477             :         {
    2478           0 :             LBERROR << "mmap : " << lunchbox::sysError << std::endl;
    2479           0 :             goto out;
    2480             :         }
    2481             : 
    2482           8 :         addr2 = ::mmap( (void *)( (uintptr_t)_map + _size ), _size,
    2483             :             PROT_READ | PROT_WRITE,
    2484           8 :             MAP_FIXED | MAP_SHARED, fd, 0 );
    2485           8 :         if( MAP_FAILED == addr2 )
    2486             :         {
    2487           0 :             LBERROR << "mmap : " << lunchbox::sysError << std::endl;
    2488           0 :             goto out;
    2489             :         }
    2490             : 
    2491           8 :         LBASSERT( addr1 == _map );
    2492           8 :         LBASSERT( addr2 == (void *)( (uintptr_t)_map + _size ));
    2493             : 
    2494             : #endif
    2495             : #ifdef WRAP
    2496             :         _mr = ::ibv_reg_mr( pd, _map, _size << 1, _access );
    2497             : #else
    2498           8 :         _mr = ::ibv_reg_mr( pd, _map, _size, _access );
    2499             : #endif
    2500             : 
    2501           8 :         if( NULL == _mr )
    2502             :         {
    2503           0 :             LBERROR << "ibv_reg_mr : " << lunchbox::sysError << std::endl;
    2504           0 :             goto out;
    2505             :         }
    2506             : 
    2507           8 :         ::memset( _map, 0, _size );
    2508           8 :         *reinterpret_cast< uint8_t * >( _map ) = 0x45;
    2509           8 :         LBASSERT( 0x45 ==
    2510             :             *reinterpret_cast< uint8_t * >( (uintptr_t)_map + _size ));
    2511             :     }
    2512             : 
    2513           8 :     ok = true;
    2514             : 
    2515             : out:
    2516             : #ifndef _WIN32
    2517           8 :     if(( fd >= 0 ) && TEMP_FAILURE_RETRY( ::close( fd )))
    2518           0 :         LBWARN << "close : " << lunchbox::sysError << std::endl;
    2519             : #endif
    2520           8 :     return ok;
    2521             : }
    2522             : 
    2523             : #ifdef _WIN32
    2524             : void* RingBuffer::determineViableAddr( size_t size )
    2525             : {
    2526             :     void *ptr = VirtualAlloc(0, size, MEM_RESERVE, PAGE_NOACCESS);
    2527             :     if (!ptr)
    2528             :         return 0;
    2529             : 
    2530             :     VirtualFree(ptr, 0, MEM_RELEASE);
    2531             :     return ptr;
    2532             : }
    2533             : 
    2534             : void RingBuffer::allocAt( size_t size, void* desiredAddr )
    2535             : {
    2536             :     // if we already hold one allocation, refuse to make another.
    2537             :     LBASSERT( !_map );
    2538             :     LBASSERT( !_mapping );
    2539             :     if ( _map || _mapping )
    2540             :         return;
    2541             : 
    2542             :     // is ring_size a multiple of 64k? if not, this won't ever work!
    2543             :     if (( size & 0xffff ) != 0 )
    2544             :         return;
    2545             : 
    2546             :     // try to allocate and map our space
    2547             :     size_t alloc_size = size * 2;
    2548             :     _mapping = CreateFileMappingA(  INVALID_HANDLE_VALUE,
    2549             :                                     0,
    2550             :                                     PAGE_READWRITE,
    2551             :                                     (unsigned long long)alloc_size >> 32,
    2552             :                                     alloc_size & 0xffffffffu,
    2553             :                                     0 );
    2554             : 
    2555             :     if ( !_mapping )
    2556             :     {
    2557             :         LBERROR << "CreateFileMappingA failed" << std::endl;
    2558             :         goto err;
    2559             :     }
    2560             : 
    2561             :     _map = MapViewOfFileEx( _mapping,
    2562             :                             FILE_MAP_ALL_ACCESS,
    2563             :                             0, 0,
    2564             :                             size,
    2565             :                             desiredAddr );
    2566             : 
    2567             :     if ( !_map )
    2568             :     {
    2569             :         LBERROR << "First MapViewOfFileEx failed" << std::endl;
    2570             :         goto err;
    2571             :     }
    2572             : 
    2573             :     if (!MapViewOfFileEx(   _mapping,
    2574             :                             FILE_MAP_ALL_ACCESS,
    2575             :                             0, 0,
    2576             :                             size,
    2577             :                             (char *)desiredAddr + size))
    2578             :     {
    2579             :         LBERROR << "Second MapViewOfFileEx failed" << std::endl;
    2580             :         goto err;
    2581             :     }
    2582             : 
    2583             :     _size = size;
    2584             :     return;
    2585             : err:
    2586             :     // something went wrong - clean up
    2587             :     clear();
    2588             : }
    2589             : #endif
    2590             : 
    2591             : 
    2592          66 : } // namespace co

Generated by: LCOV version 1.11