LCOV - code coverage report
Current view: top level - co - rdmaConnection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 740 1121 66.0 %
Date: 2015-11-03 13:48:53 Functions: 63 66 95.5 %

          Line data    Source code
       1             : // -*- mode: c++ -*-
       2             : /* Copyright (c) 2012, Computer Integration & Programming Solutions, Corp. and
       3             :  *                     United States Naval Research Laboratory
       4             :  *               2012-2013, Stefan Eilemann <eile@eyescale.ch>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : #include "rdmaConnection.h"
      22             : 
      23             : #include "connectionType.h"
      24             : #include "connectionDescription.h"
      25             : #include "global.h"
      26             : 
      27             : #include <lunchbox/clock.h>
      28             : 
      29             : #ifdef _WIN32
      30             : #  pragma warning( disable : 4018 )
      31             : #  include <boost/interprocess/mapped_region.hpp>
      32             : #  pragma warning( default : 4018 )
      33             : #else
      34             : #  include <arpa/inet.h>
      35             : #  include <sys/epoll.h>
      36             : #  include <sys/mman.h>
      37             : #  include <poll.h>
      38             : #  include <unistd.h>
      39             : #endif
      40             : 
      41             : #include <errno.h>
      42             : #include <fcntl.h>
      43             : #include <limits>
      44             : #include <sstream>
      45             : #include <stddef.h>
      46             : 
      47             : #include <rdma/rdma_verbs.h>
      48             : 
      49             : #define IPV6_DEFAULT 0
      50             : 
      51             : #define RDMA_PROTOCOL_MAGIC     0xC0
      52             : #define RDMA_PROTOCOL_VERSION   0x03
      53             : 
      54             : namespace co
      55             : {
      56             : //namespace { static const uint64_t ONE = 1ULL; }
      57             : 
      58             : /**
      59             :  * Message types
      60             :  */
      61             : enum OpCode
      62             : {
      63             :     SETUP = 1 << 0,
      64             :     FC    = 1 << 1,
      65             : };
      66             : 
      67             : /**
      68             :  * Initial setup message used to exchange sink MR parameters.
      69             :  */
      70             : struct RDMASetupPayload
      71             : {
      72             :     uint64_t rbase;
      73             :     uint64_t rlen;
      74             :     uint64_t rkey;
      75             : };
      76             : 
      77             : /**
      78             :  * "ACK" messages sent after read, tells source about receive progress.
      79             :  */
      80             : struct RDMAFCPayload
      81             : {
      82             :     uint32_t bytes_received;
      83             :     uint32_t writes_received;
      84             : };
      85             : 
      86             : /**
      87             :  * Payload wrapper
      88             :  */
      89             : struct RDMAMessage
      90             : {
      91             :     enum OpCode opcode;
      92             :     uint8_t length;
      93             :     union
      94             :     {
      95             :         struct RDMASetupPayload setup;
      96             :         struct RDMAFCPayload fc;
      97             :     } payload;
      98             : };
      99             : 
     100             : /**
     101             :  * "IMM" data sent with RDMA write, tells sink about send progress.
     102             :  */
     103             : struct RDMAFCImm
     104             : {
     105             :     uint32_t bytes_sent:28;
     106             :     uint32_t fcs_received:4;
     107             : };
     108             : 
     109             : namespace {
     110             : // We send a max of 28 bits worth of byte counts per RDMA write.
     111             : static const uint64_t MAX_BS = (( 2 << ( 28 - 1 )) - 1 );
     112             : // We send a max of four bits worth of fc counts per RDMA write.
     113             : static const uint16_t MAX_FC = (( 2 << ( 4 - 1 )) - 1 );
     114             : 
     115             : #ifdef _WIN32
     116             : static const uint32_t RINGBUFFER_ALLOC_RETRIES = 8;
     117             : static const uint32_t WINDOWS_CONNECTION_BACKLOG = 1024;
     118             : #endif
     119             : static const uint32_t FC_MESSAGE_FREQUENCY = 12;
     120             : }
     121             : 
     122             : /**
     123             :  * An RDMA connection implementation.
     124             :  *
     125             :  * The protocol is simple, e.g.:
     126             :  *
     127             :  *      initiator                        target
     128             :  * -----------------------------------------------------
     129             :  *                                  resolve/bind/listen
     130             :  * resolve/prepost/connect
     131             :  *                                    prepost/accept
     132             :  *     send setup         <------->    send setup
     133             :  *   wait for setup                   wait for setup
     134             :  * RDMA_WRITE_WITH_IMM WR  -------> RDMA_WRITE(DATA) WC
     135             :  *     RECV(FC) WC        <-------      SEND WR
     136             :  *                            .
     137             :  *                            .
     138             :  *                            .
     139             :  *
     140             :  * The setup phase exchanges the MR parameters of a fixed size circular buffer
     141             :  * to which remote writes are sent.  Sender tracks available space on the
     142             :  * receiver by accepting "Flow Control" messages that update the tail pointer
     143             :  * of the local "view" of the remote sink MR.
     144             :  *
     145             :  * Once setup is complete, either side may begin operations on the other's MR
     146             :  * (the initiator doesn't have to send first, as in the above example).
     147             :  *
     148             :  * If either credits or buffer space are exhausted, sender will spin waiting
     149             :  * for flow control messages.  Receiver will also not send flow control if
     150             :  * there are no credits available.
     151             :  *
     152             :  * One catch is that Collage will only monitor a single "notifier" for events
     153             :  * and we have three that need to be monitored: one for connection status
     154             :  * events (the RDMA event channel) - RDMA_CM_EVENT_DISCONNECTED in particular,
     155             :  * one for the receive completion queue (upon incoming RDMA write), and an
     156             :  * additional eventfd(2) used to keep the notifier "hot" after partial reads.
     157             :  * We leverage the feature of epoll(7) in that "If an epoll file descriptor
     158             :  * has events waiting then it will indicate as being readable".
     159             :  *
     160             :  * Quite interesting is the effect of RDMA_RING_BUFFER_SIZE_MB and
     161             :  * RDMA_SEND_QUEUE_DEPTH depending on the communication pattern.  Basically,
     162             :  * bigger doesn't necessarily equate to faster!  The defaults are suited for
     163             :  * low latency conditions and would need tuning otherwise.
     164             :  *
     165             :  * ib_write_bw
     166             :  * -----------
     167             :  *  #bytes    num iterations  BW peak[MB/sec]    BW average[MB/sec]
     168             :  * 1048576    10000           3248.10            3247.94
     169             :  *
     170             :  * netperf
     171             :  * -------
     172             :  * Send perf: 3240.72MB/s (3240.72pps)
     173             :  * Send perf: 3240.72MB/s (3240.72pps)
     174             :  * Send perf: 3240.95MB/s (3240.95pps)
     175             :  */
     176           6 : RDMAConnection::RDMAConnection( )
     177             : #ifdef _WIN32
     178             :     : _notifier()
     179             : #else
     180             :     : _notifier( -1 )
     181             : #endif
     182           6 :     , _timeout( Global::getIAttribute( Global::IATTR_RDMA_RESOLVE_TIMEOUT_MS ))
     183             :     , _rai( NULL )
     184             :     , _cm( NULL )
     185             :     , _cm_id( NULL )
     186             :     , _new_cm_id( NULL )
     187             :     , _cc( NULL )
     188             :     , _cq( NULL )
     189             :     , _pd( NULL )
     190             :     , _wcs ( 0 )
     191             :     , _readBytes( 0 )
     192             :     , _established( false )
     193             :     , _depth( 0L )
     194             :     , _writes( 0L )
     195             :     , _fcs( 0L )
     196             :     , _wcredits( 0L )
     197             :     , _fcredits( 0L )
     198             :     , _completions( 0U )
     199             :     , _msgbuf( sizeof(RDMAMessage) )
     200             :     , _sourcebuf( 0 )
     201             :     , _sourceptr( 0 )
     202             :     , _sinkbuf( IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE )
     203             :     , _sinkptr( 0 )
     204             :     , _rptr( 0UL )
     205             :     , _rbase( 0ULL )
     206          12 :     , _rkey( 0ULL )
     207             : #ifdef _WIN32
     208             :     , _availBytes( 0 )
     209             :     , _eventFlag( 0 )
     210             :     , _cmWaitObj( 0 )
     211             :     , _ccWaitObj( 0 )
     212             : #endif
     213             : {
     214             : #ifndef _WIN32
     215           6 :     _pipe_fd[0] = -1;
     216           6 :     _pipe_fd[1] = -1;
     217             : #endif
     218             : 
     219           6 :     ::memset( (void *)&_addr, 0, sizeof(_addr) );
     220           6 :     ::memset( (void *)&_serv, 0, sizeof(_serv) );
     221             : 
     222           6 :     ::memset( (void *)&_cpd, 0, sizeof(struct RDMAConnParamData) );
     223             : 
     224           6 :     ConnectionDescriptionPtr description = _getDescription();
     225           6 :     description->type = CONNECTIONTYPE_RDMA;
     226           6 :     description->bandwidth = // QDR default, report "actual" 8b/10b bandwidth
     227           6 :         ( ::ibv_rate_to_mult( IBV_RATE_40_GBPS ) * 2.5 * 1024000 / 8 ) * 0.8;
     228           6 : }
     229             : 
     230           2 : bool RDMAConnection::connect( )
     231             : {
     232           2 :     ConstConnectionDescriptionPtr description = getDescription();
     233           2 :     LBASSERT( CONNECTIONTYPE_RDMA == description->type );
     234             : 
     235           2 :     if( !isClosed() || 0u == description->port )
     236           0 :         return false;
     237             : 
     238           2 :     _cleanup( );
     239           2 :     _setState( STATE_CONNECTING );
     240             : 
     241           2 :     if( !_lookupAddress( false ) || ( NULL == _rai ))
     242             :     {
     243           0 :         LBERROR << "Failed to lookup destination address." << std::endl;
     244           0 :         goto err;
     245             :     }
     246             : 
     247           2 :     _updateInfo( _rai->ai_dst_addr );
     248             : 
     249           2 :     if( !_createNotifier( ))
     250             :     {
     251           0 :         LBERROR << "Failed to create master notifier." << std::endl;
     252           0 :         goto err;
     253             :     }
     254             : 
     255           2 :     if( !_createEventChannel( ))
     256             :     {
     257           0 :         LBERROR << "Failed to create communication event channel." << std::endl;
     258           0 :         goto err;
     259             :     }
     260             : 
     261           2 :     if( !_createId( ))
     262             :     {
     263           0 :         LBERROR << "Failed to create communication identifier." << std::endl;
     264           0 :         goto err;
     265             :     }
     266             : 
     267           2 :     if( !_resolveAddress( ))
     268             :     {
     269           0 :         LBERROR << "Failed to resolve destination address for : "
     270           0 :             << _addr << ":" << _serv << std::endl;
     271           0 :         goto err;
     272             :     }
     273             : 
     274           2 :     _updateInfo( ::rdma_get_peer_addr( _cm_id ));
     275             : 
     276           2 :     _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
     277             : 
     278           2 :     LBVERB << "Initiating connection on " << std::dec
     279           0 :         << _device_name << ":" << (int)_cm_id->port_num
     280           0 :         << " to "
     281           0 :         << _addr << ":" << _serv
     282           2 :         << " (" << description->toString( ) << ")"
     283           6 :         << std::endl;
     284             : 
     285           2 :     if( !_initProtocol( Global::getIAttribute(
     286           2 :             Global::IATTR_RDMA_SEND_QUEUE_DEPTH )))
     287             :     {
     288           0 :         LBERROR << "Failed to initialize protocol variables." << std::endl;
     289           0 :         goto err;
     290             :     }
     291             : 
     292           2 :     if( !_initVerbs( ))
     293             :     {
     294           0 :         LBERROR << "Failed to initialize verbs." << std::endl;
     295           0 :         goto err;
     296             :     }
     297             : 
     298           2 :     if( !_createQP( ))
     299             :     {
     300           0 :         LBERROR << "Failed to create queue pair." << std::endl;
     301           0 :         goto err;
     302             :     }
     303             : 
     304           2 :     if( !_createBytesAvailableFD( ))
     305             :     {
     306           0 :         LBERROR << "Failed to create available byte notifier." << std::endl;
     307           0 :         goto err;
     308             :     }
     309             : 
     310           2 :     if( !_initBuffers( ))
     311             :     {
     312           0 :         LBERROR << "Failed to initialize ring buffers." << std::endl;
     313           0 :         goto err;
     314             :     }
     315             : 
     316           2 :     if( !_resolveRoute( ))
     317             :     {
     318           0 :         LBERROR << "Failed to resolve route to destination : "
     319           0 :             << _addr << ":" << _serv << std::endl;
     320           0 :         goto err;
     321             :     }
     322             : 
     323           2 :     if( !_postReceives( _depth ))
     324             :     {
     325           0 :         LBERROR << "Failed to pre-post receives." << std::endl;
     326           0 :         goto err;
     327             :     }
     328             : 
     329           2 :     if( !_connect( ))
     330             :     {
     331           0 :         LBERROR << "Failed to connect to destination : "
     332           0 :             << _addr << ":" << _serv << std::endl;
     333           0 :         goto err;
     334             :     }
     335             : 
     336           2 :     LBASSERT( _established );
     337             : 
     338           4 :     if(( RDMA_PROTOCOL_MAGIC != _cpd.magic ) ||
     339           2 :         ( RDMA_PROTOCOL_VERSION != _cpd.version ))
     340             :     {
     341           0 :         LBERROR << "Protocol mismatch with target : "
     342           0 :             << _addr << ":" << _serv << std::endl;
     343           0 :         goto err;
     344             :     }
     345             : 
     346           2 :     if( !_postSetup( ))
     347             :     {
     348           0 :         LBERROR << "Failed to post setup message." << std::endl;
     349           0 :         goto err;
     350             :     }
     351             : 
     352           2 :     if( !_waitRecvSetup( ))
     353             :     {
     354           0 :         LBERROR << "Failed to receive setup message." << std::endl;
     355           0 :         goto err;
     356             :     }
     357             : 
     358           2 :     LBVERB << "Connection established on " << std::dec
     359           0 :         << _device_name << ":" << (int)_cm_id->port_num
     360           0 :         << " to "
     361           0 :         << _addr << ":" << _serv
     362           2 :         << " (" << description->toString( ) << ")"
     363           6 :         << std::endl;
     364             : 
     365           2 :     _setState( STATE_CONNECTED );
     366           2 :     _updateNotifier();
     367           2 :     return true;
     368             : 
     369             : err:
     370           0 :     LBVERB << "Connection failed on " << std::dec
     371           0 :         << _device_name << ":" << (int)_cm_id->port_num
     372           0 :         << " to "
     373           0 :         << _addr << ":" << _serv
     374           0 :         << " (" << description->toString( ) << ")"
     375           0 :         << std::endl;
     376             : 
     377           0 :     close( );
     378             : 
     379           0 :     return false;
     380             : }
     381             : 
     382           2 : bool RDMAConnection::listen( )
     383             : {
     384           2 :     ConstConnectionDescriptionPtr description = getDescription();
     385           2 :     LBASSERT( CONNECTIONTYPE_RDMA == description->type );
     386             : 
     387           2 :     if( !isClosed() )
     388           0 :         return false;
     389             : 
     390           2 :     _cleanup( );
     391           2 :     _setState( STATE_CONNECTING );
     392             : 
     393           2 :     if( !_lookupAddress( true ))
     394             :     {
     395           0 :         LBERROR << "Failed to lookup local address." << std::endl;
     396           0 :         goto err;
     397             :     }
     398             : 
     399           2 :     if( NULL != _rai )
     400           2 :         _updateInfo( _rai->ai_src_addr );
     401             : 
     402           2 :     if( !_createNotifier( ))
     403             :     {
     404           0 :         LBERROR << "Failed to create master notifier." << std::endl;
     405           0 :         goto err;
     406             :     }
     407             : 
     408           2 :     if( !_createEventChannel( ))
     409             :     {
     410           0 :         LBERROR << "Failed to create communication event channel." << std::endl;
     411           0 :         goto err;
     412             :     }
     413             : 
     414           2 :     if( !_createId( ))
     415             :     {
     416           0 :         LBERROR << "Failed to create communication identifier." << std::endl;
     417           0 :         goto err;
     418             :     }
     419             : 
     420             : #if 0
     421             :     /* NOT IMPLEMENTED */
     422             : 
     423             :     if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
     424             :             (void *)&ONE, sizeof(ONE) ))
     425             :     {
     426             :         LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
     427             :         goto err;
     428             :     }
     429             : #endif
     430             : 
     431           2 :     if( !_bindAddress( ))
     432             :     {
     433           0 :         LBERROR << "Failed to bind to local address : "
     434           0 :             << _addr << ":" << _serv << std::endl;
     435           0 :         goto err;
     436             :     }
     437             : 
     438           2 :     _updateInfo( ::rdma_get_local_addr( _cm_id ));
     439             : 
     440             : #ifdef _WIN32
     441             :     if( !_listen( WINDOWS_CONNECTION_BACKLOG ))
     442             : #else
     443           2 :     if( !_listen( SOMAXCONN ))
     444             : #endif
     445             :     {
     446           0 :         LBERROR << "Failed to listen on bound address : "
     447           0 :             << _addr << ":" << _serv << std::endl;
     448           0 :         goto err;
     449             :     }
     450             : 
     451           2 :     if( NULL != _cm_id->verbs )
     452           0 :         _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
     453             : 
     454           2 :     LBVERB << "Listening on " << std::dec
     455           0 :         << _device_name << ":" << (int)_cm_id->port_num
     456           0 :         << " at "
     457           0 :         << _addr << ":" << _serv
     458           2 :         << " (" << description->toString( ) << ")"
     459           6 :         << std::endl;
     460             : 
     461           2 :     _setState( STATE_LISTENING );
     462           2 :     return true;
     463             : 
     464             : err:
     465           0 :     close();
     466           0 :     return false;
     467             : }
     468             : 
     469           2 : void RDMAConnection::acceptNB( ) { /* NOP */ }
     470             : 
     471           2 : ConnectionPtr RDMAConnection::acceptSync( )
     472             : {
     473           2 :     RDMAConnection *newConnection = NULL;
     474             : 
     475           2 :     if( !isListening() )
     476             :     {
     477           0 :         LBERROR << "Connection not in listening state." << std::endl;
     478           0 :         goto out;
     479             :     }
     480             : 
     481           2 :     if( !_waitForCMEvent( RDMA_CM_EVENT_CONNECT_REQUEST ))
     482             :     {
     483           0 :         LBERROR << "Failed to receive valid connect request." << std::endl;
     484           0 :         goto out;
     485             :     }
     486             : 
     487           2 :     LBASSERT( NULL != _new_cm_id );
     488             : 
     489           2 :     newConnection = new RDMAConnection( );
     490             : 
     491           2 :     if( !newConnection->_finishAccept( _new_cm_id, _cpd ))
     492             :     {
     493           0 :         delete newConnection;
     494           0 :         newConnection = NULL;
     495             :     }
     496             : 
     497             : out:
     498           2 :     _new_cm_id = NULL;
     499           2 :     _updateNotifier();
     500           2 :     return newConnection;
     501             : }
     502             : 
     503      405650 : void RDMAConnection::readNB( void*, const uint64_t ) { /* NOP */ }
     504             : 
     505      405650 : int64_t RDMAConnection::readSync( void* buffer, const uint64_t bytes,
     506             :         const bool block )
     507             : {
     508      405650 :     lunchbox::Clock clock;
     509      405650 :     const int64_t start = clock.getTime64( );
     510      405650 :     const uint32_t timeout = Global::getTimeout( );
     511      405650 :     eventset events;
     512      405650 :     uint64_t available_bytes = 0ULL;
     513      405650 :     uint32_t bytes_taken = 0UL;
     514      405650 :     bool extra_event = false;
     515             : 
     516      405650 :     if( !isConnected() )
     517           0 :         goto err;
     518             : 
     519             : //    LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
     520             : //       << std::endl;
     521             : 
     522      405650 :     _stats.reads++;
     523             : 
     524             : retry:
     525      410158 :     if( !_checkDisconnected( events ))
     526             :     {
     527           0 :         LBERROR << "Error while checking event state." << std::endl;
     528           0 :         goto err;
     529             :     }
     530             : 
     531      410158 :     if( events.test( CQ_EVENT ))
     532             :     {
     533       41185 :         if( !_rearmCQ( ))
     534             :         {
     535           0 :             LBERROR << "Error while rearming receive channel." << std::endl;
     536           0 :             goto err;
     537             :         }
     538             :     }
     539             : 
     540             :     // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
     541      410158 :     if( !_checkCQ( events.test( CQ_EVENT )))
     542             :     {
     543           0 :         LBERROR << "Error while polling completion queues." << std::endl;
     544           0 :         goto err;
     545             :     }
     546             : 
     547      410158 :     LBASSERT( _fcredits >= 0L );
     548             : 
     549      410158 :     if( _established && _needFC( ) && ( 0L == _fcredits ))
     550             :     {
     551           0 :         if( LB_TIMEOUT_INDEFINITE != timeout )
     552             :         {
     553           0 :             if(( clock.getTime64( ) - start ) > timeout )
     554             :             {
     555           0 :                 LBERROR << "Timed out trying to acquire credit." << std::endl;
     556           0 :                 goto err;
     557             :             }
     558             :         }
     559             : 
     560             :         //LBWARN << "No credit for flow control." << std::endl;
     561           0 :         lunchbox::Thread::yield( );
     562           0 :         _stats.no_credits_fc++;
     563           0 :         goto retry;
     564             :     }
     565             : 
     566             :     // "Note that an extra event may be triggered without having a
     567             :     // corresponding completion entry in the CQ." (per ibv_get_cq_event(3))
     568      410158 :     if( _established && !events.test( BUF_EVENT ))
     569             :     {
     570             :         // Special case: If LocalNode is reading the length part of a message
     571             :         // it will ignore this zero return and restart the select.
     572        4508 :         if( extra_event && !block )
     573             :         {
     574           0 :             _updateNotifier();
     575           0 :             return 0LL;
     576             :         }
     577             : 
     578        4508 :         extra_event = true;
     579        4508 :         goto retry;
     580             :     }
     581             : 
     582      405650 :     if( events.test( BUF_EVENT ))
     583             :     {
     584      405648 :         available_bytes = _getAvailableBytes( );
     585      405648 :         if( 0ULL == available_bytes )
     586             :         {
     587           0 :             LBERROR << "Error while reading from event fd." << std::endl;
     588           0 :             goto err;
     589             :         }
     590             :     }
     591             : 
     592             :     // Modifies _sinkptr.TAIL
     593      405650 :     bytes_taken = _drain( buffer, bytes );
     594             : 
     595      405650 :     if( 0UL == bytes_taken )
     596             :     {
     597           2 :         if( _sinkptr.isEmpty( ) && !_established )
     598             :         {
     599          10 :             LBINFO << "Got EOF, closing " << getDescription( )->toString( )
     600           8 :                 << std::endl;
     601           2 :             goto err;
     602             :         }
     603             : 
     604           0 :         if( LB_TIMEOUT_INDEFINITE != timeout )
     605             :         {
     606           0 :             if(( clock.getTime64( ) - start ) > timeout )
     607             :             {
     608           0 :                 LBERROR << "Timed out trying to drain buffer." << std::endl;
     609           0 :                 goto err;
     610             :             }
     611             :         }
     612             : 
     613             :         //LBWARN << "Sink buffer empty." << std::endl;
     614           0 :         lunchbox::Thread::yield( );
     615           0 :         _stats.buffer_empty++;
     616           0 :         goto retry;
     617             :     }
     618             : 
     619             :     // Put back what wasn't taken (ensure the master notifier stays "hot").
     620      405648 :     if( available_bytes > bytes_taken )
     621      405074 :         _incrAvailableBytes( available_bytes - bytes_taken );
     622             : #ifdef _WIN32
     623             :     else
     624             :         _updateNotifier();
     625             : #endif
     626             : 
     627      405648 :     _readBytes += bytes_taken;
     628             : 
     629      405648 :     if( _established && _needFC( ) && !_postFC( ))
     630           0 :         LBWARN << "Error while posting flow control message." << std::endl;
     631             : 
     632             : //    LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
     633             : //       << " took " << bytes_taken << " bytes"
     634             : //       << " (" << _sinkptr.available( ) << " still available)" << std::endl;
     635             : 
     636      405648 :     return bytes_taken;
     637             : 
     638             : err:
     639           2 :     close( );
     640             : 
     641           2 :     return -1LL;
     642             : }
     643             : 
     644      408041 : int64_t RDMAConnection::write( const void* buffer, const uint64_t bytes )
     645             : {
     646      408041 :     lunchbox::Clock clock;
     647      408041 :     const int64_t start = clock.getTime64( );
     648      408041 :     const uint32_t timeout = Global::getTimeout( );
     649      408041 :     eventset events;
     650      408041 :     const uint32_t can_put = std::min( bytes, MAX_BS );
     651             :     uint32_t bytes_put;
     652             : 
     653      408041 :     if( !isConnected() )
     654           0 :         goto err;
     655             : 
     656             : //    LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
     657             : //        << std::endl;
     658             : 
     659      408041 :     _stats.writes++;
     660             : 
     661             : retry:
     662      793520 :     if( !_checkDisconnected( events ))
     663             :     {
     664           0 :         LBERROR << "Error while checking connection state." << std::endl;
     665           0 :         goto err;
     666             :     }
     667             : 
     668      793520 :     if( !_established )
     669             :     {
     670           0 :         LBWARN << "Disconnected in write." << std::endl;
     671           0 :         goto err;
     672             :     }
     673             : 
     674             :     // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
     675      793520 :     if( !_checkCQ( false ))
     676             :     {
     677           0 :         LBERROR << "Error while polling completion queues." << std::endl;
     678           0 :         goto err;
     679             :     }
     680             : 
     681      793520 :     LBASSERT( _wcredits >= 0L );
     682             : 
     683      793520 :     if( 0L == _wcredits )
     684             :     {
     685           0 :         if( LB_TIMEOUT_INDEFINITE != timeout )
     686             :         {
     687           0 :             if(( clock.getTime64( ) - start ) > timeout )
     688             :             {
     689           0 :                 LBERROR << "Timed out trying to acquire credit." << std::endl;
     690           0 :                 goto err;
     691             :             }
     692             :         }
     693             : 
     694             :         //LBWARN << "No credits for RDMA." << std::endl;
     695           0 :         lunchbox::Thread::yield( );
     696           0 :         _stats.no_credits_rdma++;
     697           0 :         goto retry;
     698             :     }
     699             : 
     700             :     // Modifies _sourceptr.HEAD
     701      793520 :     bytes_put = _fill( buffer, can_put );
     702             : 
     703      793520 :     if( 0UL == bytes_put )
     704             :     {
     705      385479 :         if( LB_TIMEOUT_INDEFINITE != timeout )
     706             :         {
     707      385479 :             if(( clock.getTime64( ) - start ) > timeout )
     708             :             {
     709           0 :                 LBERROR << "Timed out trying to fill buffer." << std::endl;
     710           0 :                 goto err;
     711             :             }
     712             :         }
     713             : 
     714             :         //LBWARN << "Source buffer full." << std::endl;
     715      385479 :         lunchbox::Thread::yield( );
     716      385479 :         if( _sourceptr.isFull( ) || _rptr.isFull( ))
     717      385479 :             _stats.buffer_full++;
     718      385479 :         goto retry;
     719             :     }
     720             : 
     721             :     // Modifies _sourceptr.MIDDLE & _rptr.HEAD
     722      408041 :     if( !_postRDMAWrite( ))
     723             :     {
     724           0 :         LBERROR << "Error while posting RDMA write." << std::endl;
     725           0 :         goto err;
     726             :     }
     727             : 
     728             : //    LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
     729             : //       << " put " << bytes_put << " bytes" << std::endl;
     730             : 
     731      408041 :     return bytes_put;
     732             : 
     733             : err:
     734           0 :     return -1LL;
     735             : }
     736             : 
     737          18 : RDMAConnection::~RDMAConnection( )
     738             : {
     739           6 :     _close( );
     740          12 : }
     741             : 
     742             : ////////////////////////////////////////////////////////////////////////////////
     743             : 
     744          12 : void RDMAConnection::_close( )
     745             : {
     746          12 :     lunchbox::ScopedWrite close_mutex( _poll_lock );
     747             : 
     748          12 :     if( !isClosed() )
     749             :     {
     750           6 :         LBASSERT( !isClosing() );
     751           6 :         _setState( STATE_CLOSING );
     752             : 
     753           6 :         if( _established && ::rdma_disconnect( _cm_id ))
     754           0 :             LBWARN << "rdma_disconnect : " << lunchbox::sysError << std::endl;
     755             : 
     756           6 :         _setState( STATE_CLOSED );
     757           6 :         _cleanup( );
     758          12 :     }
     759          12 : }
     760             : 
     761          10 : void RDMAConnection::_cleanup( )
     762             : {
     763          10 :     LBASSERT( isClosed() );
     764             : 
     765          10 :     _sourcebuf.clear( );
     766          10 :     _sinkbuf.clear( );
     767          10 :     _msgbuf.clear( );
     768             : 
     769          10 :     if( _completions > 0U )
     770             :     {
     771           2 :         ::ibv_ack_cq_events( _cq, _completions );
     772           2 :         _completions = 0U;
     773             :     }
     774             : 
     775          10 :     delete[] _wcs;
     776          10 :     _wcs = 0;
     777             : 
     778             : #ifdef _WIN32
     779             :     if ( _ccWaitObj )
     780             :         UnregisterWait( _ccWaitObj );
     781             :     _ccWaitObj = 0;
     782             : 
     783             :     if ( _cmWaitObj )
     784             :         UnregisterWait( _cmWaitObj );
     785             :     _cmWaitObj = 0;
     786             : #endif
     787             : 
     788          10 :     if( NULL != _cm_id )
     789           6 :         ::rdma_destroy_ep( _cm_id );
     790          10 :     _cm_id = NULL;
     791             : 
     792          10 :     if(( NULL != _cq ) && ::rdma_seterrno( ::ibv_destroy_cq( _cq )))
     793           0 :         LBWARN << "ibv_destroy_cq : " << lunchbox::sysError << std::endl;
     794          10 :     _cq = NULL;
     795             : 
     796          10 :     if(( NULL != _cc ) && ::rdma_seterrno( ::ibv_destroy_comp_channel( _cc )))
     797           0 :         LBWARN << "ibv_destroy_comp_channel : " << lunchbox::sysError
     798           0 :             << std::endl;
     799          10 :     _cc = NULL;
     800             : 
     801          10 :     if(( NULL != _pd ) && ::rdma_seterrno( ::ibv_dealloc_pd( _pd )))
     802           0 :         LBWARN << "ibv_dealloc_pd : " << lunchbox::sysError << std::endl;
     803          10 :     _pd = NULL;
     804             : 
     805          10 :     if( NULL != _cm )
     806           6 :         ::rdma_destroy_event_channel( _cm );
     807          10 :     _cm = NULL;
     808             : 
     809          10 :     if( NULL != _rai )
     810           4 :         ::rdma_freeaddrinfo( _rai );
     811          10 :     _rai = NULL;
     812             : 
     813          10 :     _rptr = 0UL;
     814          10 :     _rbase = _rkey = 0ULL;
     815             : #ifndef _WIN32
     816          10 :     if(( _notifier >= 0 ) && TEMP_FAILURE_RETRY( ::close( _notifier )))
     817           0 :         LBWARN << "close : " << lunchbox::sysError << std::endl;
     818          10 :     _notifier = -1;
     819             : #endif
     820          10 : }
     821             : 
     822           2 : bool RDMAConnection::_finishAccept( struct rdma_cm_id *new_cm_id,
     823             :         const RDMAConnParamData &cpd )
     824             : {
     825           2 :     LBASSERT( isClosed() );
     826           2 :     _setState( STATE_CONNECTING );
     827             : 
     828           2 :     LBASSERT( NULL != new_cm_id );
     829             : 
     830           2 :     _cm_id = new_cm_id;
     831             : 
     832             :     {
     833             :         // FIXME : RDMA CM appears to send up invalid addresses when receiving
     834             :         // connections that use a different protocol than what was bound.  E.g.
     835             :         // if an IPv6 listener gets an IPv4 connection then the sa_family
     836             :         // will be AF_INET6 but the actual data is struct sockaddr_in.  Example:
     837             :         //
     838             :         // 0000000: 0a00 bc10 c0a8 b01a 0000 0000 0000 0000  ................
     839             :         //
     840             :         // However, in the reverse case, when an IPv4 listener gets an IPv6
     841             :         // connection not only is the address family incorrect, but the actual
     842             :         // IPv6 address is only partially there:
     843             :         //
     844             :         // 0000000: 0200 bc11 0000 0000 fe80 0000 0000 0000  ................
     845             :         // 0000010: 0000 0000 0000 0000 0000 0000 0000 0000  ................
     846             :         // 0000020: 0000 0000 0000 0000 0000 0000 0000 0000  ................
     847             :         // 0000030: 0000 0000 0000 0000 0000 0000 0000 0000  ................
     848             :         //
     849             :         // Surely seems to be a bug in RDMA CM.
     850             : 
     851             :         union
     852             :         {
     853             :             struct sockaddr     addr;
     854             :             struct sockaddr_in  sin;
     855             :             struct sockaddr_in6 sin6;
     856             :             struct sockaddr_storage storage;
     857             :         } sss;
     858             : 
     859             :         // Make a copy since we might change it.
     860             :         //sss.storage = _cm_id->route.addr.dst_storage;
     861             :         ::memcpy( (void *)&sss.storage,
     862           2 :             (const void *)::rdma_get_peer_addr( _cm_id ),
     863           2 :             sizeof(struct sockaddr_storage) );
     864             : #ifndef _WIN32
     865           4 :         if(( AF_INET == sss.storage.ss_family ) &&
     866           4 :            ( sss.sin6.sin6_addr.s6_addr32[0] != 0 ||
     867           4 :              sss.sin6.sin6_addr.s6_addr32[1] != 0 ||
     868           4 :              sss.sin6.sin6_addr.s6_addr32[2] != 0 ||
     869           2 :              sss.sin6.sin6_addr.s6_addr32[3] != 0 ))
     870             :         {
     871           0 :             LBWARN << "IPv6 address detected but likely invalid!" << std::endl;
     872           0 :             sss.storage.ss_family = AF_INET6;
     873             :         }
     874             :         else
     875             : #endif
     876           2 :         if(( AF_INET6 == sss.storage.ss_family ) &&
     877           0 :                 ( INADDR_ANY != sss.sin.sin_addr.s_addr ))
     878             :         {
     879           0 :             sss.storage.ss_family = AF_INET;
     880             :         }
     881             : 
     882           2 :         _updateInfo( &sss.addr );
     883             :     }
     884             : 
     885           2 :     _device_name = ::ibv_get_device_name( _cm_id->verbs->device );
     886             : 
     887           2 :     LBVERB << "Connection initiated on " << std::dec
     888           0 :         << _device_name << ":" << (int)_cm_id->port_num
     889           0 :         << " from "
     890           0 :         << _addr << ":" << _serv
     891           2 :         << " (" << getDescription()->toString( ) << ")"
     892           6 :         << std::endl;
     893             : 
     894           4 :     if(( RDMA_PROTOCOL_MAGIC != cpd.magic ) ||
     895           2 :         ( RDMA_PROTOCOL_VERSION != cpd.version ))
     896             :     {
     897           0 :         LBERROR << "Protocol mismatch with initiator : "
     898           0 :             << _addr << ":" << _serv << std::endl;
     899           0 :         goto err_reject;
     900             :     }
     901             : 
     902           2 :     if( !_createNotifier( ))
     903             :     {
     904           0 :         LBERROR << "Failed to create master notifier." << std::endl;
     905           0 :         goto err_reject;
     906             :     }
     907             : 
     908           2 :     if( !_createEventChannel( ))
     909             :     {
     910           0 :         LBERROR << "Failed to create event channel." << std::endl;
     911           0 :         goto err_reject;
     912             :     }
     913             : 
     914           2 :     if( !_migrateId( ))
     915             :     {
     916           0 :         LBERROR << "Failed to migrate communication identifier." << std::endl;
     917           0 :         goto err_reject;
     918             :     }
     919             : 
     920           2 :     if( !_initProtocol( cpd.depth ))
     921             :     {
     922           0 :         LBERROR << "Failed to initialize protocol variables." << std::endl;
     923           0 :         goto err_reject;
     924             :     }
     925             : 
     926           2 :     if( !_initVerbs( ))
     927             :     {
     928           0 :         LBERROR << "Failed to initialize verbs." << std::endl;
     929           0 :         goto err_reject;
     930             :     }
     931             : 
     932           2 :     if( !_createQP( ))
     933             :     {
     934           0 :         LBERROR << "Failed to create queue pair." << std::endl;
     935           0 :         goto err_reject;
     936             :     }
     937             : 
     938           2 :     if( !_createBytesAvailableFD( ))
     939             :     {
     940           0 :         LBERROR << "Failed to create available byte notifier." << std::endl;
     941           0 :         goto err_reject;
     942             :     }
     943             : 
     944           2 :     if( !_initBuffers( ))
     945             :     {
     946           0 :         LBERROR << "Failed to initialize ring buffers." << std::endl;
     947           0 :         goto err_reject;
     948             :     }
     949             : 
     950           2 :     if( !_postReceives( _depth ))
     951             :     {
     952           0 :         LBERROR << "Failed to pre-post receives." << std::endl;
     953           0 :         goto err_reject;
     954             :     }
     955             : 
     956           2 :     if( !_accept( ))
     957             :     {
     958           0 :         LBERROR << "Failed to accept remote connection from : "
     959           0 :             << _addr << ":" << _serv << std::endl;
     960           0 :         goto err;
     961             :     }
     962             : 
     963           2 :     LBASSERT( _established );
     964             : 
     965           2 :     if( !_waitRecvSetup( ))
     966             :     {
     967           0 :         LBERROR << "Failed to receive setup message." << std::endl;
     968           0 :         goto err;
     969             :     }
     970             : 
     971           2 :     if( !_postSetup( ))
     972             :     {
     973           0 :         LBERROR << "Failed to post setup message." << std::endl;
     974           0 :         goto err;
     975             :     }
     976             : 
     977           2 :     LBVERB << "Connection accepted on " << std::dec
     978           0 :         << _device_name << ":" << (int)_cm_id->port_num
     979           0 :         << " from "
     980           0 :         << _addr << ":" << _serv
     981           2 :         << " (" << getDescription()->toString( ) << ")"
     982           6 :         << std::endl;
     983             : 
     984           2 :     _setState( STATE_CONNECTED );
     985           2 :     _updateNotifier();
     986           2 :     return true;
     987             : 
     988             : err_reject:
     989           0 :     LBWARN << "Rejecting connection from remote address : "
     990           0 :         << _addr << ":" << _serv << std::endl;
     991             : 
     992           0 :     if( !_reject( ))
     993           0 :         LBWARN << "Failed to issue connection reject." << std::endl;
     994             : 
     995             : err:
     996           0 :     close( );
     997             : 
     998           0 :     return false;
     999             : }
    1000             : 
    1001           4 : bool RDMAConnection::_lookupAddress( const bool passive )
    1002             : {
    1003             :     struct rdma_addrinfo hints;
    1004           4 :     char *node = NULL, *service = NULL;
    1005           4 :     std::string s;
    1006             : 
    1007           4 :     ::memset( (void *)&hints, 0, sizeof(struct rdma_addrinfo) );
    1008             :     //hints.ai_flags |= RAI_NOROUTE;
    1009           4 :     if( passive )
    1010           2 :         hints.ai_flags |= RAI_PASSIVE;
    1011             : 
    1012           8 :     ConstConnectionDescriptionPtr description = getDescription();
    1013           4 :     const std::string &hostname = description->getHostname( );
    1014           4 :     if( !hostname.empty( ))
    1015           4 :         node = const_cast< char * >( hostname.c_str( ));
    1016             : 
    1017           4 :     if( 0u != description->port )
    1018             :     {
    1019           3 :         std::stringstream ss;
    1020           3 :         ss << description->port;
    1021           3 :         s = ss.str( );
    1022           3 :         service = const_cast< char * >( s.c_str( ));
    1023             :     }
    1024             : 
    1025           4 :     if(( NULL != node ) && ::rdma_getaddrinfo( node, service, &hints, &_rai ))
    1026             :     {
    1027           0 :         LBERROR << "rdma_getaddrinfo : " << lunchbox::sysError << std::endl;
    1028           0 :         goto err;
    1029             :     }
    1030             : 
    1031           4 :     if(( NULL != _rai ) && ( NULL != _rai->ai_next ))
    1032           0 :         LBWARN << "Multiple getaddrinfo results, using first." << std::endl;
    1033             : 
    1034           4 :     if(( NULL != _rai ) && ( _rai->ai_connect_len > 0 ))
    1035           0 :         LBWARN << "WARNING : ai_connect data specified!" << std::endl;
    1036             : 
    1037           4 :     return true;
    1038             : 
    1039             : err:
    1040           4 :     return false;
    1041             : }
    1042             : 
    1043          10 : void RDMAConnection::_updateInfo( struct sockaddr *addr )
    1044             : {
    1045          10 :     int salen = sizeof(struct sockaddr);
    1046          10 :     bool is_unspec = false;
    1047             : 
    1048          10 :     if( AF_INET == addr->sa_family )
    1049             :     {
    1050             :         struct sockaddr_in *sin =
    1051          10 :             reinterpret_cast< struct sockaddr_in * >( addr );
    1052          10 :         is_unspec = ( INADDR_ANY == sin->sin_addr.s_addr );
    1053          10 :         salen = sizeof(struct sockaddr_in);
    1054             :     }
    1055             :     // TODO: IPv6 for WIndows
    1056             : #ifndef _WIN32
    1057           0 :     else if( AF_INET6 == addr->sa_family )
    1058             :     {
    1059             :         struct sockaddr_in6 *sin6 =
    1060           0 :             reinterpret_cast< struct sockaddr_in6 * >( addr );
    1061             : 
    1062           0 :         is_unspec = ( sin6->sin6_addr.s6_addr32[0] == 0 &&
    1063           0 :                       sin6->sin6_addr.s6_addr32[1] == 0 &&
    1064           0 :                       sin6->sin6_addr.s6_addr32[2] == 0 &&
    1065           0 :                       sin6->sin6_addr.s6_addr32[3] == 0 );
    1066           0 :         salen = sizeof(struct sockaddr_in6);
    1067             :     }
    1068             : #endif
    1069             :     int err;
    1070          10 :     if(( err = ::getnameinfo( addr, salen, _addr, sizeof(_addr),
    1071          10 :             _serv, sizeof(_serv), NI_NUMERICHOST | NI_NUMERICSERV )))
    1072           0 :         LBWARN << "Name info lookup failed : " << err << std::endl;
    1073             : 
    1074          10 :     if( is_unspec )
    1075           0 :         ::gethostname( _addr, NI_MAXHOST );
    1076             : 
    1077          10 :     ConnectionDescriptionPtr description = _getDescription();
    1078          10 :     if( description->getHostname( ).empty( ))
    1079           2 :         description->setHostname( _addr );
    1080          10 :     if( 0u == description->port )
    1081           4 :         description->port = atoi( _serv );
    1082          10 : }
    1083             : 
    1084           6 : bool RDMAConnection::_createEventChannel( )
    1085             : {
    1086           6 :     LBASSERT( NULL == _cm );
    1087             : 
    1088           6 :     _cm = ::rdma_create_event_channel( );
    1089           6 :     if( NULL == _cm )
    1090             :     {
    1091           0 :         LBERROR << "rdma_create_event_channel : " << lunchbox::sysError <<
    1092           0 :             std::endl;
    1093           0 :         goto err;
    1094             :     }
    1095             : 
    1096             : #ifdef _WIN32
    1097             :     if ( !RegisterWaitForSingleObject(
    1098             :             &_cmWaitObj,
    1099             :             _cm->channel.Event,
    1100             :             WAITORTIMERCALLBACK( &_triggerNotifierCM ),
    1101             :             this,
    1102             :             INFINITE,
    1103             :             WT_EXECUTEINWAITTHREAD ))
    1104             :     {
    1105             :         LBERROR << "RegisterWaitForSingleObject : " << co::base::sysError
    1106             :                 << std::endl;
    1107             :         goto err;
    1108             :     }
    1109             : #else
    1110             :     struct epoll_event evctl;
    1111             : 
    1112           6 :     LBASSERT( _notifier >= 0 );
    1113             : 
    1114             :     // Use the CM fd to signal Collage of connection events.
    1115           6 :     ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
    1116           6 :     evctl.events = EPOLLIN;
    1117           6 :     evctl.data.fd = _cm->fd;
    1118           6 :     if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
    1119             :     {
    1120           0 :         LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
    1121           0 :         goto err;
    1122             :     }
    1123             : #endif
    1124             : 
    1125           6 :     return true;
    1126             : 
    1127             : err:
    1128           0 :     return false;
    1129             : }
    1130             : 
    1131           4 : bool RDMAConnection::_createId( )
    1132             : {
    1133           4 :     LBASSERT( NULL != _cm );
    1134           4 :     LBASSERT( NULL == _cm_id );
    1135             : 
    1136           4 :     if( ::rdma_create_id( _cm, &_cm_id, NULL, RDMA_PS_TCP ))
    1137             :     {
    1138           0 :         LBERROR << "rdma_create_id : " << lunchbox::sysError << std::endl;
    1139           0 :         goto err;
    1140             :     }
    1141             : 
    1142           4 :     return true;
    1143             : 
    1144             : err:
    1145           0 :     return false;
    1146             : }
    1147             : 
    1148           4 : bool RDMAConnection::_initVerbs( )
    1149             : {
    1150           4 :     LBASSERT( NULL != _cm_id );
    1151           4 :     LBASSERT( NULL != _cm_id->verbs );
    1152             : 
    1153           4 :     LBASSERT( NULL == _pd );
    1154             : 
    1155             :     // Allocate protection domain.
    1156           4 :     _pd = ::ibv_alloc_pd( _cm_id->verbs );
    1157           4 :     if( NULL == _pd )
    1158             :     {
    1159           0 :         LBERROR << "ibv_alloc_pd : " << lunchbox::sysError << std::endl;
    1160           0 :         goto err;
    1161             :     }
    1162             : 
    1163           4 :     LBASSERT( NULL == _cc );
    1164             : 
    1165             :     // Create completion channel.
    1166           4 :     _cc = ::ibv_create_comp_channel( _cm_id->verbs );
    1167           4 :     if( NULL == _cc )
    1168             :     {
    1169           0 :         LBERROR << "ibv_create_comp_channel : " << lunchbox::sysError
    1170           0 :             << std::endl;
    1171           0 :         goto err;
    1172             :     }
    1173             : 
    1174             : #ifdef _WIN32
    1175             :     if ( !RegisterWaitForSingleObject(
    1176             :         &_ccWaitObj,
    1177             :         _cc->comp_channel.Event,
    1178             :         WAITORTIMERCALLBACK( &_triggerNotifierCQ ),
    1179             :         this,
    1180             :         INFINITE,
    1181             :         WT_EXECUTEINWAITTHREAD ))
    1182             :     {
    1183             :         LBERROR << "RegisterWaitForSingleObject : " << lunchbox::sysError
    1184             :             << std::endl;
    1185             :         goto err;
    1186             :     }
    1187             : #else
    1188           4 :     LBASSERT( _notifier >= 0 );
    1189             : 
    1190             :     // Use the completion channel fd to signal Collage of RDMA writes received.
    1191             :     struct epoll_event evctl;
    1192           4 :     ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
    1193           4 :     evctl.events = EPOLLIN;
    1194           4 :     evctl.data.fd = _cc->fd;
    1195           4 :     if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
    1196             :     {
    1197           0 :         LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
    1198           0 :         goto err;
    1199             :     }
    1200             : #endif
    1201             : 
    1202           4 :     LBASSERT( NULL == _cq );
    1203             : 
    1204             :     // Create a single completion queue for both sends & receives */
    1205           4 :     _cq = ::ibv_create_cq( _cm_id->verbs, _depth * 2, NULL, _cc, 0 );
    1206           4 :     if( NULL == _cq )
    1207             :     {
    1208           0 :         LBERROR << "ibv_create_cq : " << lunchbox::sysError << std::endl;
    1209           0 :         goto err;
    1210             :     }
    1211             : 
    1212             :     // Request IBV_SEND_SOLICITED events only (i.e. RDMA writes, not FC)
    1213           4 :     if( ::rdma_seterrno( ::ibv_req_notify_cq( _cq, 1 )))
    1214             :     {
    1215           0 :         LBERROR << "ibv_req_notify_cq : " << lunchbox::sysError << std::endl;
    1216           0 :         goto err;
    1217             :     }
    1218             : 
    1219           4 :     _wcs = new struct ibv_wc[ _depth ];
    1220           4 :     return true;
    1221             : 
    1222             : err:
    1223           0 :     return false;
    1224             : }
    1225             : 
    1226           4 : bool RDMAConnection::_createQP( )
    1227             : {
    1228             :     struct ibv_qp_init_attr init_attr;
    1229             : 
    1230           4 :     LBASSERT( _depth > 0 );
    1231           4 :     LBASSERT( NULL != _cm_id );
    1232           4 :     LBASSERT( NULL != _pd );
    1233           4 :     LBASSERT( NULL != _cq );
    1234             : 
    1235           4 :     ::memset( (void *)&init_attr, 0, sizeof(struct ibv_qp_init_attr) );
    1236           4 :     init_attr.qp_type = IBV_QPT_RC;
    1237           4 :     init_attr.cap.max_send_wr = _depth;
    1238           4 :     init_attr.cap.max_recv_wr = _depth;
    1239           4 :     init_attr.cap.max_recv_sge = 1;
    1240           4 :     init_attr.cap.max_send_sge = 1;
    1241           4 :     init_attr.recv_cq = _cq;
    1242           4 :     init_attr.send_cq = _cq;
    1243           4 :     init_attr.sq_sig_all = 1; // i.e. always IBV_SEND_SIGNALED
    1244             : 
    1245             :     // Create queue pair.
    1246           4 :     if( ::rdma_create_qp( _cm_id, _pd, &init_attr ))
    1247             :     {
    1248           0 :         LBERROR << "rdma_create_qp : " << lunchbox::sysError << std::endl;
    1249           0 :         goto err;
    1250             :     }
    1251             : 
    1252           4 :     LBVERB << "RDMA QP caps : " << std::dec <<
    1253           0 :         init_attr.cap.max_recv_wr << " receives, " <<
    1254           4 :         init_attr.cap.max_send_wr << " sends, " << std::endl;
    1255             : 
    1256           4 :     return true;
    1257             : 
    1258             : err:
    1259           0 :     return false;
    1260             : }
    1261             : 
    1262           4 : bool RDMAConnection::_initBuffers( )
    1263             : {
    1264           4 :     const size_t rbs = 1024 * 1024 *
    1265           8 :         Global::getIAttribute( Global::IATTR_RDMA_RING_BUFFER_SIZE_MB );
    1266             : 
    1267           4 :     LBASSERT( _depth > 0 );
    1268           4 :     LBASSERT( NULL != _pd );
    1269             : 
    1270           4 :     if( 0 == rbs )
    1271             :     {
    1272           0 :         LBERROR << "Invalid RDMA ring buffer size." << std::endl;
    1273           0 :         goto err;
    1274             :     }
    1275             : 
    1276           4 :     if( !_sourcebuf.resize( _pd, rbs ))
    1277             :     {
    1278           0 :         LBERROR << "Failed to resize source buffer." << std::endl;
    1279           0 :         goto err;
    1280             :     }
    1281             : 
    1282           4 :     if( !_sinkbuf.resize( _pd, rbs ))
    1283             :     {
    1284           0 :         LBERROR << "Failed to resize sink buffer." << std::endl;
    1285           0 :         goto err;
    1286             :     }
    1287             : 
    1288           4 :     _sourceptr.clear( uint32_t( _sourcebuf.getSize( )));
    1289           4 :     _sinkptr.clear( uint32_t( _sinkbuf.getSize( )));
    1290             : 
    1291             :     // Need enough space for both sends and receives.
    1292           4 :     if( !_msgbuf.resize( _pd, _depth * 2 ))
    1293             :     {
    1294           0 :         LBERROR << "Failed to resize message buffer pool." << std::endl;
    1295           0 :         goto err;
    1296             :     }
    1297             : 
    1298           4 :     return true;
    1299             : 
    1300             : err:
    1301           0 :     return false;
    1302             : }
    1303             : 
    1304           2 : bool RDMAConnection::_resolveAddress( )
    1305             : {
    1306           2 :     LBASSERT( NULL != _cm_id );
    1307           2 :     LBASSERT( NULL != _rai );
    1308             : 
    1309           2 :     if( ::rdma_resolve_addr( _cm_id, _rai->ai_src_addr, _rai->ai_dst_addr,
    1310           2 :             _timeout ))
    1311             :     {
    1312           0 :         LBERROR << "rdma_resolve_addr : " << lunchbox::sysError << std::endl;
    1313           0 :         goto err;
    1314             :     }
    1315             : 
    1316           2 :     return _waitForCMEvent( RDMA_CM_EVENT_ADDR_RESOLVED );
    1317             : 
    1318             : err:
    1319           0 :     return false;
    1320             : }
    1321             : 
    1322           2 : bool RDMAConnection::_resolveRoute( )
    1323             : {
    1324           2 :     LBASSERT( NULL != _cm_id );
    1325           2 :     LBASSERT( NULL != _rai );
    1326             : 
    1327           4 :     if(( IBV_TRANSPORT_IB == _cm_id->verbs->device->transport_type ) &&
    1328           2 :             ( _rai->ai_route_len > 0 ))
    1329             :     {
    1330             : #ifdef _WIN32
    1331             :         if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS,
    1332             : #else
    1333           0 :         if( ::rdma_set_option( _cm_id, RDMA_OPTION_IB, RDMA_OPTION_IB_PATH,
    1334             : #endif
    1335           0 :                 _rai->ai_route, _rai->ai_route_len ))
    1336             :         {
    1337           0 :             LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
    1338           0 :             goto err;
    1339             :         }
    1340             : 
    1341             :         // rdma_resolve_route not required (TODO : is this really true?)
    1342           0 :         return true;
    1343             :     }
    1344             : 
    1345           2 :     if( ::rdma_resolve_route( _cm_id, _timeout ))
    1346             :     {
    1347           0 :         LBERROR << "rdma_resolve_route : " << lunchbox::sysError << std::endl;
    1348           0 :         goto err;
    1349             :     }
    1350             : 
    1351           2 :     return _waitForCMEvent( RDMA_CM_EVENT_ROUTE_RESOLVED );
    1352             : 
    1353             : err:
    1354           0 :     return false;
    1355             : }
    1356             : 
    1357           2 : bool RDMAConnection::_connect( )
    1358             : {
    1359           2 :     LBASSERT( NULL != _cm_id );
    1360           2 :     LBASSERT( !_established );
    1361             : 
    1362             : #if 0 // TODO
    1363             :     static const uint8_t DSCP = 0;
    1364             : 
    1365             :     if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS,
    1366             :             (void *)&DSCP, sizeof(DSCP) ))
    1367             :     {
    1368             :         LBERROR << "rdma_set_option : " << lunchbox::sysError << std::endl;
    1369             :         goto err;
    1370             :     }
    1371             : #endif
    1372             : 
    1373             :     struct rdma_conn_param conn_param;
    1374             : 
    1375           2 :     ::memset( (void *)&conn_param, 0, sizeof(struct rdma_conn_param) );
    1376             : 
    1377           2 :     _cpd.magic = RDMA_PROTOCOL_MAGIC;
    1378           2 :     _cpd.version = RDMA_PROTOCOL_VERSION;
    1379           2 :     _cpd.depth = _depth;
    1380           2 :     conn_param.private_data = reinterpret_cast< const void * >( &_cpd );
    1381           2 :     conn_param.private_data_len = sizeof(struct RDMAConnParamData);
    1382           2 :     conn_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
    1383           2 :     conn_param.responder_resources = RDMA_MAX_RESP_RES;
    1384             :     // Magic 3-bit values.
    1385             :     //conn_param.retry_count = 5;
    1386             :     //conn_param.rnr_retry_count = 7;
    1387             : 
    1388           6 :     LBINFO << "Connect on source lid : " << std::showbase
    1389           4 :         << std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
    1390           4 :         << std::dec << ntohs( _cm_id->route.path_rec->slid ) << ") "
    1391           2 :         << "to dest lid : "
    1392           4 :         << std::hex << ntohs( _cm_id->route.path_rec->dlid ) << " ("
    1393           4 :         << std::dec << ntohs( _cm_id->route.path_rec->dlid ) << ") "
    1394           6 :         << std::endl;
    1395             : 
    1396           2 :     if( ::rdma_connect( _cm_id, &conn_param ))
    1397             :     {
    1398           0 :         LBERROR << "rdma_connect : " << lunchbox::sysError << std::endl;
    1399           0 :         goto err;
    1400             :     }
    1401             : 
    1402           2 :     return _waitForCMEvent( RDMA_CM_EVENT_ESTABLISHED );
    1403             : 
    1404             : err:
    1405           0 :     return false;
    1406             : }
    1407             : 
    1408           2 : bool RDMAConnection::_bindAddress( )
    1409             : {
    1410           2 :     LBASSERT( NULL != _cm_id );
    1411           2 :     ConstConnectionDescriptionPtr description = getDescription();
    1412             : 
    1413             : #if IPV6_DEFAULT
    1414             :     struct sockaddr_in6 sin;
    1415             :     memset( (void *)&sin, 0, sizeof(struct sockaddr_in6) );
    1416             :     sin.sin6_family = AF_INET6;
    1417             :     sin.sin6_port = htons( description->port );
    1418             :     sin.sin6_addr = in6addr_any;
    1419             : #else
    1420             :     struct sockaddr_in sin;
    1421           2 :     memset( (void *)&sin, 0, sizeof(struct sockaddr_in) );
    1422           2 :     sin.sin_family = AF_INET;
    1423           2 :     sin.sin_port = htons( description->port );
    1424           2 :     sin.sin_addr.s_addr = INADDR_ANY;
    1425             : #endif
    1426             : 
    1427           4 :     if( ::rdma_bind_addr( _cm_id, ( NULL != _rai ) ? _rai->ai_src_addr :
    1428           2 :             reinterpret_cast< struct sockaddr * >( &sin )))
    1429             :     {
    1430           0 :         LBERROR << "rdma_bind_addr : " << lunchbox::sysError << std::endl;
    1431           0 :         goto err;
    1432             :     }
    1433             : 
    1434           2 :     return true;
    1435             : 
    1436             : err:
    1437           0 :     return false;
    1438             : }
    1439             : 
    1440           2 : bool RDMAConnection::_listen( int backlog )
    1441             : {
    1442           2 :     LBASSERT( NULL != _cm_id );
    1443             : 
    1444           2 :     if( ::rdma_listen( _cm_id, backlog ))
    1445             :     {
    1446           0 :         LBERROR << "rdma_listen : " << lunchbox::sysError << std::endl;
    1447           0 :         goto err;
    1448             :     }
    1449             : 
    1450           2 :     return true;
    1451             : 
    1452             : err:
    1453           0 :     return false;
    1454             : }
    1455             : 
    1456           2 : bool RDMAConnection::_migrateId( )
    1457             : {
    1458           2 :     LBASSERT( NULL != _cm_id );
    1459           2 :     LBASSERT( NULL != _cm );
    1460             : 
    1461           2 :     if( ::rdma_migrate_id( _cm_id, _cm ))
    1462             :     {
    1463           0 :         LBERROR << "rdma_migrate_id : " << lunchbox::sysError << std::endl;
    1464           0 :         goto err;
    1465             :     }
    1466             : 
    1467           2 :     return true;
    1468             : 
    1469             : err:
    1470           0 :     return false;
    1471             : }
    1472             : 
    1473           2 : bool RDMAConnection::_accept( )
    1474             : {
    1475             :     struct rdma_conn_param accept_param;
    1476             : 
    1477           2 :     LBASSERT( NULL != _cm_id );
    1478           2 :     LBASSERT( !_established );
    1479             : 
    1480           2 :     ::memset( (void *)&accept_param, 0, sizeof(struct rdma_conn_param) );
    1481             : 
    1482           2 :     _cpd.magic = RDMA_PROTOCOL_MAGIC;
    1483           2 :     _cpd.version = RDMA_PROTOCOL_VERSION;
    1484           2 :     _cpd.depth = _depth;
    1485           2 :     accept_param.private_data = reinterpret_cast< const void * >( &_cpd );
    1486           2 :     accept_param.private_data_len = sizeof(struct RDMAConnParamData);
    1487           2 :     accept_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
    1488           2 :     accept_param.responder_resources = RDMA_MAX_RESP_RES;
    1489             :     // Magic 3-bit value.
    1490             :     //accept_param.rnr_retry_count = 7;
    1491             : 
    1492           6 :     LBINFO << "Accept on source lid : "<< std::showbase
    1493           4 :            << std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
    1494           4 :            << std::dec << ntohs( _cm_id->route.path_rec->slid ) << ") "
    1495           2 :            << "from dest lid : "
    1496           4 :            << std::hex << ntohs( _cm_id->route.path_rec->dlid ) << " ("
    1497           4 :            << std::dec << ntohs( _cm_id->route.path_rec->dlid ) << ") "
    1498           6 :            << std::endl;
    1499             : 
    1500           2 :     if( ::rdma_accept( _cm_id, &accept_param ))
    1501             :     {
    1502           0 :         LBERROR << "rdma_accept : " << lunchbox::sysError << std::endl;
    1503           0 :         goto err;
    1504             :     }
    1505             : 
    1506           2 :     return _waitForCMEvent( RDMA_CM_EVENT_ESTABLISHED );
    1507             : 
    1508             : err:
    1509           0 :     return false;
    1510             : }
    1511             : 
    1512           0 : bool RDMAConnection::_reject( )
    1513             : {
    1514           0 :     LBASSERT( NULL != _cm_id );
    1515           0 :     LBASSERT( !_established );
    1516             : 
    1517           0 :     if( ::rdma_reject( _cm_id, NULL, 0 ))
    1518             :     {
    1519           0 :         LBERROR << "rdma_reject : " << lunchbox::sysError << std::endl;
    1520           0 :         goto err;
    1521             :     }
    1522             : 
    1523           0 :     return true;
    1524             : 
    1525             : err:
    1526           0 :     return false;
    1527             : }
    1528             : 
    1529             : ////////////////////////////////////////////////////////////////////////////////
    1530             : 
    1531           4 : bool RDMAConnection::_initProtocol( int32_t depth )
    1532             : {
    1533           4 :     if( depth < 2L )
    1534             :     {
    1535           0 :         LBERROR << "Invalid queue depth." << std::endl;
    1536           0 :         goto err;
    1537             :     }
    1538             : 
    1539           4 :     _depth = depth;
    1540           4 :     _writes = 0L;
    1541           4 :     _fcs = 0L;
    1542           4 :     _readBytes = 0u;
    1543           4 :     _wcredits = _depth / 2 - 2;
    1544           4 :     _fcredits = _depth / 2 + 2;
    1545             : 
    1546           4 :     return true;
    1547             : 
    1548             : err:
    1549           0 :     return false;
    1550             : }
    1551             : 
    1552             : /* inline */
    1553      815804 : bool RDMAConnection::_needFC( )
    1554             : {
    1555             :     bool bytesAvail;
    1556             : #ifdef _WIN32
    1557             :     lunchbox::ScopedFastRead mutex( _eventLock );
    1558             :     bytesAvail = ( _availBytes != 0 );
    1559             : #else
    1560             :     pollfd pfd;
    1561      815804 :     pfd.fd = _pipe_fd[0];
    1562      815804 :     pfd.events = EPOLLIN;
    1563      815804 :     pfd.revents = 0;
    1564      815804 :     int ret = poll( &pfd, 1, 0 );
    1565      815804 :     if ( ret == -1 )
    1566             :     {
    1567           0 :         LBERROR << "poll: " << lunchbox::sysError << std::endl;
    1568           0 :         return true;
    1569             :     }
    1570      815804 :     bytesAvail = ret != 0;
    1571             : #endif
    1572      815804 :     return ( !bytesAvail || (uint32_t)_writes >= FC_MESSAGE_FREQUENCY );
    1573             : }
    1574             : 
    1575      156864 : bool RDMAConnection::_postReceives( const uint32_t count )
    1576             : {
    1577      156864 :     LBASSERT( NULL != _cm_id->qp );
    1578      156859 :     LBASSERT( count > 0UL );
    1579             : 
    1580      156859 :     ibv_sge* sge = new ibv_sge[count];
    1581      156882 :     ibv_recv_wr* wrs = new ibv_recv_wr[count];
    1582             : 
    1583      589009 :     for( uint32_t i = 0UL; i != count; i++ )
    1584             :     {
    1585      432153 :         sge[i].addr = (uint64_t)(uintptr_t)_msgbuf.getBuffer( );
    1586      432146 :         sge[i].length = uint32_t( _msgbuf.getBufferSize( ));
    1587      432133 :         sge[i].lkey = _msgbuf.getMR( )->lkey;
    1588             : 
    1589      432107 :         wrs[i].wr_id = sge[i].addr;
    1590      432107 :         wrs[i].next = &wrs[i + 1];
    1591      432107 :         wrs[i].sg_list = &sge[i];
    1592      432107 :         wrs[i].num_sge = 1;
    1593             :     }
    1594      156856 :     wrs[count - 1].next = NULL;
    1595             : 
    1596             :     struct ibv_recv_wr *bad_wr;
    1597      156856 :     if( ::rdma_seterrno( ::ibv_post_recv( _cm_id->qp, wrs, &bad_wr )))
    1598             :     {
    1599           0 :         LBERROR << "ibv_post_recv : "  << lunchbox::sysError << std::endl;
    1600           0 :         goto err;
    1601             :     }
    1602             : 
    1603      156804 :     delete[] sge;
    1604      156882 :     delete[] wrs;
    1605      156880 :     return true;
    1606             : 
    1607             : err:
    1608           0 :     delete[] sge;
    1609           0 :     delete[] wrs;
    1610           0 :     return false;
    1611             : }
    1612             : 
    1613             : /* inline */
    1614      408041 : void RDMAConnection::_recvRDMAWrite( const uint32_t imm_data )
    1615             : {
    1616             :     union
    1617             :     {
    1618             :         uint32_t val;
    1619             :         RDMAFCImm fc;
    1620             :     } x;
    1621      408041 :     x.val = ntohl( imm_data );
    1622             : 
    1623             :     // Analysis:
    1624             :     //
    1625             :     // Since the ring pointers are circular, a malicious (presumably overflow)
    1626             :     // value here would at worst only result in us reading arbitrary regions
    1627             :     // from our sink buffer, not segfaulting.  If the other side wanted us to
    1628             :     // reread a previous message it should just resend it!
    1629      408041 :     _sinkptr.incrHead( x.fc.bytes_sent );
    1630      408041 :     _incrAvailableBytes( x.fc.bytes_sent );
    1631             : 
    1632      408041 :     _fcredits += x.fc.fcs_received;
    1633      408041 :     LBASSERTINFO( _fcredits <= _depth, _fcredits << " > " << _depth );
    1634             : 
    1635      408041 :     _writes++;
    1636      408041 : }
    1637             : 
    1638             : /* inline */
    1639      408041 : uint32_t RDMAConnection::_makeImm( const uint32_t b )
    1640             : {
    1641             :     union
    1642             :     {
    1643             :         uint32_t val;
    1644             :         RDMAFCImm fc;
    1645             :     } x;
    1646             : 
    1647      408041 :     x.fc.fcs_received = std::min( MAX_FC, static_cast< uint16_t >( _fcs ));
    1648      408041 :     _fcs -= x.fc.fcs_received;
    1649      408041 :     LBASSERT( _fcs >= 0 );
    1650             : 
    1651      408041 :     LBASSERT( b <= MAX_BS );
    1652      408041 :     x.fc.bytes_sent = b;
    1653             : 
    1654      408041 :     return htonl( x.val );
    1655             : }
    1656             : 
    1657      408041 : bool RDMAConnection::_postRDMAWrite( )
    1658             : {
    1659             :     struct ibv_sge sge;
    1660             :     struct ibv_send_wr wr;
    1661             : 
    1662      816082 :     sge.addr = (uint64_t)( (uintptr_t)_sourcebuf.getBase( ) +
    1663      816082 :         _sourceptr.ptr( _sourceptr.MIDDLE ));
    1664             :     sge.length = (uint64_t)_sourceptr.available( _sourceptr.HEAD,
    1665      408041 :         _sourceptr.MIDDLE );
    1666      408041 :     sge.lkey = _sourcebuf.getMR( )->lkey;
    1667      408041 :     _sourceptr.incr( _sourceptr.MIDDLE, (uint32_t)sge.length );
    1668             : 
    1669      408041 :     wr.wr_id = (uint64_t)sge.length;
    1670      408041 :     wr.next = NULL;
    1671      408041 :     wr.sg_list = &sge;
    1672      408041 :     wr.num_sge = 1;
    1673      408041 :     wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
    1674      408041 :     wr.send_flags = IBV_SEND_SOLICITED; // Important!
    1675      408041 :     wr.imm_data = _makeImm( (uint32_t)sge.length );
    1676      408041 :     wr.wr.rdma.rkey = _rkey;
    1677      816082 :     wr.wr.rdma.remote_addr = (uint64_t)( (uintptr_t)_rbase +
    1678      816082 :         _rptr.ptr( _rptr.HEAD ));
    1679      408041 :     _rptr.incrHead( (uint32_t)sge.length );
    1680             : 
    1681      408041 :     _wcredits--;
    1682      408041 :     LBASSERT( _wcredits >= 0L );
    1683             : 
    1684             :     struct ibv_send_wr *bad_wr;
    1685      408041 :     if( ::rdma_seterrno( ::ibv_post_send( _cm_id->qp, &wr, &bad_wr )))
    1686             :     {
    1687           0 :         LBERROR << "ibv_post_send : "  << lunchbox::sysError << std::endl;
    1688           0 :         goto err;
    1689             :     }
    1690             : 
    1691      408041 :     return true;
    1692             : 
    1693             : err:
    1694           0 :     return false;
    1695             : }
    1696             : 
    1697       22093 : bool RDMAConnection::_postMessage( const RDMAMessage &message )
    1698             : {
    1699       22093 :     _fcredits--;
    1700       22093 :     LBASSERT( _fcredits >= 0L );
    1701             : 
    1702       22093 :     if( ::rdma_post_send( _cm_id, (void *)&message, (void *)&message,
    1703             :             offsetof( RDMAMessage, payload ) + message.length, _msgbuf.getMR( ),
    1704       22093 :             0 ))
    1705             :     {
    1706           0 :         LBERROR << "rdma_post_send : "  << lunchbox::sysError << std::endl;
    1707           0 :         goto err;
    1708             :     }
    1709             : 
    1710       22093 :     return true;
    1711             : 
    1712             : err:
    1713           0 :     return false;
    1714             : }
    1715             : 
    1716       22087 : void RDMAConnection::_recvMessage( const RDMAMessage &message )
    1717             : {
    1718       22087 :     switch( message.opcode )
    1719             :     {
    1720             :         case FC:
    1721       22083 :             if( sizeof(struct RDMAFCPayload) == (size_t)message.length )
    1722       22083 :                 _recvFC( message.payload.fc );
    1723             :             else
    1724           0 :                 LBWARN << "Invalid flow control message received!" << std::endl;
    1725       22083 :             break;
    1726             :         case SETUP:
    1727           4 :             if( sizeof(struct RDMASetupPayload) == (size_t)message.length )
    1728           4 :                 _recvSetup( message.payload.setup );
    1729             :             else
    1730           0 :                 LBWARN << "Invalid setup message received!" << std::endl;
    1731           4 :             break;
    1732             :         default:
    1733           0 :             LBWARN << "Invalid message received: "
    1734           0 :                 << std::hex << (int)message.opcode << std::dec << std::endl;
    1735             :     }
    1736       22087 : }
    1737             : 
    1738             : /* inline */
    1739       22083 : void RDMAConnection::_recvFC( const RDMAFCPayload &fc )
    1740             : {
    1741             :     // Analysis:
    1742             :     //
    1743             :     // Since we will only write a maximum of _sourceptr.available( ) bytes
    1744             :     // to our source buffer, a malicious (presumably overflow) value here would
    1745             :     // have no chance of causing us to write beyond our buffer as we have local
    1746             :     // control over those ring pointers.  Worst case, we'd and up writing to
    1747             :     // arbitrary regions of the remote buffer, since this ring pointer is
    1748             :     // circular as well.
    1749       22083 :     _rptr.incrTail( fc.bytes_received );
    1750             : 
    1751       22083 :     _wcredits += fc.writes_received;
    1752       22083 :     LBASSERTINFO( _wcredits <= _depth, _wcredits << " > " << _depth );
    1753             : 
    1754       22083 :     _fcs++;
    1755       22083 : }
    1756             : 
    1757       22089 : bool RDMAConnection::_postFC()
    1758             : {
    1759             :     RDMAMessage &message =
    1760       22089 :         *reinterpret_cast< RDMAMessage * >( _msgbuf.getBuffer( ));
    1761             : 
    1762       22089 :     message.opcode = FC;
    1763       22089 :     message.length = (uint8_t)sizeof(struct RDMAFCPayload);
    1764             : 
    1765       22089 :     message.payload.fc.bytes_received = _readBytes;
    1766       22089 :     message.payload.fc.writes_received = _writes;
    1767       22089 :     _writes -= message.payload.fc.writes_received;
    1768       22089 :     _readBytes = 0;
    1769             : 
    1770       22089 :     LBASSERT( _writes >= 0 );
    1771       22089 :     return _postMessage( message );
    1772             : }
    1773             : 
    1774           4 : void RDMAConnection::_recvSetup( const RDMASetupPayload &setup )
    1775             : {
    1776             :     // Analysis:
    1777             :     //
    1778             :     // Malicious values here would only affect the receiver, we're willing
    1779             :     // to RDMA write to anywhere specified!
    1780           4 :     _rbase = setup.rbase;
    1781           4 :     _rptr.clear( setup.rlen );
    1782           4 :     _rkey = setup.rkey;
    1783             : 
    1784           4 :     LBVERB << "RDMA MR: " << std::showbase
    1785           0 :         << std::dec << setup.rlen << " @ "
    1786           4 :         << std::hex << setup.rbase << std::dec << std::endl;
    1787           4 : }
    1788             : 
    1789           4 : bool RDMAConnection::_postSetup( )
    1790             : {
    1791             :     RDMAMessage &message =
    1792           4 :         *reinterpret_cast< RDMAMessage * >( _msgbuf.getBuffer( ));
    1793             : 
    1794           4 :     message.opcode = SETUP;
    1795           4 :     message.length = (uint8_t)sizeof(struct RDMASetupPayload);
    1796             : 
    1797           4 :     message.payload.setup.rbase = (uint64_t)(uintptr_t)_sinkbuf.getBase( );
    1798           4 :     message.payload.setup.rlen = (uint64_t)_sinkbuf.getSize( );
    1799           4 :     message.payload.setup.rkey = _sinkbuf.getMR( )->rkey;
    1800             : 
    1801           4 :     return _postMessage( message );
    1802             : }
    1803             : 
    1804           4 : bool RDMAConnection::_waitRecvSetup( )
    1805             : {
    1806           4 :     lunchbox::Clock clock;
    1807           4 :     const int64_t start = clock.getTime64( );
    1808           4 :     const uint32_t timeout = Global::getTimeout( );
    1809           4 :     eventset events;
    1810             : 
    1811             : retry:
    1812          40 :     if( !_checkDisconnected( events ))
    1813             :     {
    1814           0 :         LBERROR << "Error while checking event state." << std::endl;
    1815           0 :         goto err;
    1816             :     }
    1817             : 
    1818          40 :     if( !_established )
    1819             :     {
    1820           0 :         LBERROR << "Disconnected while waiting for setup message." << std::endl;
    1821           0 :         goto err;
    1822             :     }
    1823             : 
    1824          40 :     if( !_checkCQ( true ))
    1825             :     {
    1826           0 :         LBERROR << "Error while polling receive completion queue." << std::endl;
    1827           0 :         goto err;
    1828             :     }
    1829             : 
    1830          40 :     if( 0ULL == _rkey )
    1831             :     {
    1832          36 :         if( LB_TIMEOUT_INDEFINITE != timeout )
    1833             :         {
    1834          36 :             if(( clock.getTime64( ) - start ) > timeout )
    1835             :             {
    1836           0 :                 LBERROR << "Timed out waiting for setup message." << std::endl;
    1837           0 :                 goto err;
    1838             :             }
    1839             :         }
    1840             : 
    1841          36 :         lunchbox::Thread::yield( );
    1842          36 :         goto retry;
    1843             :     }
    1844             : 
    1845           4 :     return true;
    1846             : 
    1847             : err:
    1848           0 :     return false;
    1849             : }
    1850             : 
    1851             : ////////////////////////////////////////////////////////////////////////////////
    1852             : 
    1853           6 : bool RDMAConnection::_createNotifier( )
    1854             : {
    1855           6 :     LBASSERT( _notifier < 0 );
    1856             : #ifdef _WIN32
    1857             :     _notifier = CreateEvent( 0, TRUE, FALSE, 0 );
    1858             :     return true;
    1859             : #else
    1860           6 :     _notifier = ::epoll_create( 1 );
    1861           6 :     if( _notifier < 0 )
    1862             :     {
    1863           0 :         LBERROR << "epoll_create1 : " << lunchbox::sysError << std::endl;
    1864           0 :         goto err;
    1865             :     }
    1866             : 
    1867           6 :     return true;
    1868             : 
    1869             : err:
    1870           0 :     return false;
    1871             : #endif
    1872             : }
    1873             : 
    1874           6 : void RDMAConnection::_updateNotifier()
    1875             : {
    1876             : #ifdef _WIN32
    1877             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    1878             :     if ( _availBytes == 0 && !_cm->channel.Head  &&
    1879             :             ( !_cc || !_cc->comp_channel.Head ))
    1880             :         ResetEvent( _notifier );
    1881             : #else
    1882           6 :     eventset events;
    1883           6 :     _checkEvents( events );
    1884             : #endif
    1885           6 : }
    1886             : 
    1887     1287088 : bool RDMAConnection::_checkEvents( eventset &events )
    1888             : {
    1889     1287088 :     events.reset( );
    1890             : 
    1891             : #ifdef _WIN32
    1892             :     lunchbox::ScopedFastRead mutex( _eventLock );
    1893             :     if ( _availBytes != 0 )
    1894             :         events.set( BUF_EVENT );
    1895             :     if ( _cc  && ( _cc->comp_channel.Head != 0 ))
    1896             :         events.set( CQ_EVENT );
    1897             :     if ( _cm->channel.Head )
    1898             :         events.set( CM_EVENT );
    1899             : 
    1900             :     return true;
    1901             : #else
    1902             :     struct epoll_event evts[3];
    1903             : 
    1904     1287907 :     int nfds = TEMP_FAILURE_RETRY( ::epoll_wait( _notifier, evts, 3, 0 ));
    1905     1285170 :     if( nfds < 0 )
    1906             :     {
    1907           0 :         LBERROR << "epoll_wait : " << lunchbox::sysError << std::endl;
    1908           0 :         goto err;
    1909             :     }
    1910             : 
    1911     1732015 :     for( int i = 0; i < nfds; i++ )
    1912             :     {
    1913      446845 :         const int fd = evts[i].data.fd;
    1914      446845 :         if(( _pipe_fd[0] >= 0 ) && ( fd == _pipe_fd[0] ))
    1915      405648 :             events.set( BUF_EVENT );
    1916       41197 :         else if( _cc && ( fd == _cc->fd ))
    1917       41185 :             events.set( CQ_EVENT );
    1918          12 :         else if( _cm && ( fd == _cm->fd ))
    1919          12 :             events.set( CM_EVENT );
    1920             :         else
    1921           0 :             LBUNREACHABLE;
    1922             :     }
    1923             : 
    1924     1285170 :     return true;
    1925             : 
    1926             : err:
    1927           0 :     return false;
    1928             : #endif
    1929             : }
    1930             : 
    1931     1203071 : bool RDMAConnection::_checkDisconnected( eventset &events )
    1932             : {
    1933     1203071 :     lunchbox::ScopedWrite poll_mutex( _poll_lock );
    1934             : 
    1935     1203151 :     if( !_checkEvents( events ))
    1936             :     {
    1937           0 :         LBERROR << "Error while checking event state." << std::endl;
    1938           0 :         goto err;
    1939             :     }
    1940             : 
    1941     1200016 :     if( events.test( CM_EVENT ))
    1942             :     {
    1943           2 :         if( !_doCMEvent( RDMA_CM_EVENT_DISCONNECTED ))
    1944             :         {
    1945           0 :             LBERROR << "Unexpected connection manager event." << std::endl;
    1946           0 :             goto err;
    1947             :         }
    1948             : 
    1949           2 :         LBASSERT( !_established );
    1950             :     }
    1951             : 
    1952     1198303 :     return true;
    1953             : 
    1954             : err:
    1955           0 :     return false;
    1956             : }
    1957             : 
    1958           4 : bool RDMAConnection::_createBytesAvailableFD( )
    1959             : {
    1960             : #ifndef _WIN32
    1961           4 :     if ( pipe( _pipe_fd ) == -1 )
    1962             :     {
    1963           0 :         LBERROR << "pipe: " << lunchbox::sysError << std::endl;
    1964           0 :         return false;
    1965             :     }
    1966             : 
    1967           4 :     LBASSERT( _pipe_fd[0] >= 0 && _pipe_fd[1] >= 0);
    1968             : 
    1969             :     // Use the event fd to signal Collage of bytes remaining.
    1970             :     struct epoll_event evctl;
    1971           4 :     ::memset( (void *)&evctl, 0, sizeof(struct epoll_event) );
    1972           4 :     evctl.events = EPOLLIN;
    1973           4 :     evctl.data.fd = _pipe_fd[0];
    1974           4 :     if( ::epoll_ctl( _notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl ))
    1975             :     {
    1976           0 :         LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
    1977           0 :         return false;
    1978             :     }
    1979             : #endif
    1980           4 :     return true;
    1981             : }
    1982             : 
    1983      813115 : bool RDMAConnection::_incrAvailableBytes( const uint64_t b )
    1984             : {
    1985             : #ifdef _WIN32
    1986             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    1987             :     _availBytes += b;
    1988             :     ::SetEvent( _notifier );
    1989             : #else
    1990      813115 :     if( ::write( _pipe_fd[1], (const void *)&b, sizeof(b) ) != sizeof(b) )
    1991             :     {
    1992           0 :         LBERROR << "write : " << lunchbox::sysError << std::endl;
    1993           0 :         return false;
    1994             :     }
    1995             : #endif
    1996      813115 :     return true;
    1997             : }
    1998             : 
    1999      405648 : uint64_t RDMAConnection::_getAvailableBytes( )
    2000             : {
    2001      405648 :     uint64_t available_bytes = 0;
    2002             : #ifdef _WIN32
    2003             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    2004             :     available_bytes = static_cast<uint64_t>( _availBytes );
    2005             :     _availBytes = 0;
    2006             :     return available_bytes;
    2007             : #else
    2008      405648 :     uint64_t currBytes = 0;
    2009             :     ssize_t count;
    2010             :     pollfd pfd;
    2011      405648 :     pfd.fd = _pipe_fd[0];
    2012      405648 :     pfd.events = EPOLLIN;
    2013             : 
    2014      813115 :     do
    2015             :     {
    2016      813115 :         count = ::read( _pipe_fd[0], (void *)&currBytes, sizeof( currBytes ));
    2017      813115 :         if ( count > 0 && count < (ssize_t)sizeof( currBytes ) )
    2018           0 :             goto err;
    2019      813115 :         available_bytes += currBytes;
    2020      813115 :         pfd.revents = 0;
    2021      813115 :         if ( ::poll( &pfd, 1, 0 ) == -1 )
    2022             :         {
    2023           0 :             LBERROR << "poll : " << lunchbox::sysError << std::endl;
    2024           0 :             goto err;
    2025             :         }
    2026      813115 :     } while ( pfd.revents > 0 );
    2027             : 
    2028      405648 :     if ( count == -1 )
    2029             :     {
    2030           0 :         LBERROR << "read : " << lunchbox::sysError << std::endl;
    2031           0 :         goto err;
    2032             :     }
    2033             : 
    2034      405648 :     LBASSERT( available_bytes > 0ULL );
    2035      405648 :     return available_bytes;
    2036             : err:
    2037           0 :     return 0ULL;
    2038             : #endif
    2039             : }
    2040             : 
    2041          10 : bool RDMAConnection::_waitForCMEvent( enum rdma_cm_event_type expected )
    2042             : {
    2043          10 :     lunchbox::Clock clock;
    2044          10 :     const int64_t start = clock.getTime64( );
    2045          10 :     const uint32_t timeout = Global::getTimeout( );
    2046          10 :     bool done = false;
    2047          10 :     eventset events;
    2048             : 
    2049             : retry:
    2050       83954 :     if( !_checkEvents( events ))
    2051             :     {
    2052           0 :         LBERROR << "Error while checking event state." << std::endl;
    2053           0 :         goto err;
    2054             :     }
    2055             : 
    2056       83981 :     if( events.test( CM_EVENT ))
    2057             :     {
    2058          10 :         done = _doCMEvent( expected );
    2059          10 :         if( !done )
    2060             :         {
    2061           0 :             LBERROR << "Unexpected connection manager event." << std::endl;
    2062           0 :             goto err;
    2063             :         }
    2064             :     }
    2065             : 
    2066       83948 :     if( !done )
    2067             :     {
    2068       83938 :         if( LB_TIMEOUT_INDEFINITE != timeout )
    2069             :         {
    2070       83938 :             if(( clock.getTime64( ) - start ) > timeout )
    2071             :             {
    2072           0 :                 LBERROR << "Timed out waiting for setup message." << std::endl;
    2073           0 :                 goto err;
    2074             :             }
    2075             :         }
    2076             : 
    2077       83919 :         lunchbox::Thread::yield( );
    2078       83944 :         goto retry;
    2079             :     }
    2080             : 
    2081          10 :     return true;
    2082             : 
    2083             : err:
    2084           0 :     return false;
    2085             : }
    2086             : 
    2087          12 : bool RDMAConnection::_doCMEvent( enum rdma_cm_event_type expected )
    2088             : {
    2089          12 :     bool ok = false;
    2090             :     struct rdma_cm_event *event;
    2091             : 
    2092          12 :     if( ::rdma_get_cm_event( _cm, &event ))
    2093             :     {
    2094           0 :         LBERROR << "rdma_get_cm_event : " << lunchbox::sysError << std::endl;
    2095           0 :         goto out;
    2096             :     }
    2097             : 
    2098          12 :     ok = ( event->event == expected );
    2099             : 
    2100             : #ifndef NDEBUG
    2101          12 :     if( ok )
    2102             :     {
    2103          12 :         LBVERB << (void *)this
    2104           0 :             << " (" << _addr << ":" << _serv << ")"
    2105           0 :             << " event : " << ::rdma_event_str( event->event )
    2106          12 :             << std::endl;
    2107             :     }
    2108             :     else
    2109             :     {
    2110           0 :         LBINFO << (void *)this
    2111           0 :             << " (" << _addr << ":" << _serv << ")"
    2112           0 :             << " event : " << ::rdma_event_str( event->event )
    2113           0 :             << " expected: " << ::rdma_event_str( expected )
    2114           0 :             << std::endl;
    2115             :     }
    2116             : #endif
    2117             : 
    2118          12 :     if( ok && ( RDMA_CM_EVENT_DISCONNECTED == event->event ))
    2119           2 :         _established = false;
    2120             : 
    2121          12 :     if( ok && ( RDMA_CM_EVENT_ESTABLISHED == event->event ))
    2122             :     {
    2123           4 :         _established = true;
    2124             : 
    2125           4 :         struct rdma_conn_param *cp = &event->param.conn;
    2126             : 
    2127           4 :         ::memset( (void *)&_cpd, 0, sizeof(RDMAConnParamData) );
    2128             :         // Note that the actual amount of data transferred to the remote side
    2129             :         // is transport dependent and may be larger than that requested.
    2130           4 :         if( cp->private_data_len >= sizeof(RDMAConnParamData) )
    2131             :             _cpd = *reinterpret_cast< const RDMAConnParamData * >(
    2132           2 :                 cp->private_data );
    2133             :     }
    2134             : 
    2135          12 :     if( ok && ( RDMA_CM_EVENT_CONNECT_REQUEST == event->event ))
    2136             :     {
    2137           2 :         _new_cm_id = event->id;
    2138             : 
    2139           2 :         struct rdma_conn_param *cp = &event->param.conn;
    2140             : 
    2141           2 :         ::memset( (void *)&_cpd, 0, sizeof(RDMAConnParamData) );
    2142             :         // TODO : Not sure what happens when initiator sent ai_connect data
    2143             :         // (assuming the underlying transport doesn't strip it)?
    2144           2 :         if( cp->private_data_len >= sizeof(RDMAConnParamData) )
    2145             :             _cpd = *reinterpret_cast< const RDMAConnParamData * >(
    2146           2 :                 cp->private_data );
    2147             :     }
    2148             : 
    2149          12 :     if( RDMA_CM_EVENT_REJECTED == event->event )
    2150           0 :         LBINFO << "Connection reject status : " << event->status << std::endl;
    2151             : 
    2152          12 :     if( ::rdma_ack_cm_event( event ))
    2153           0 :         LBWARN << "rdma_ack_cm_event : "  << lunchbox::sysError << std::endl;
    2154             : 
    2155             : out:
    2156          12 :     return ok;
    2157             : }
    2158             : 
    2159       41185 : bool RDMAConnection::_rearmCQ( )
    2160             : {
    2161             :     struct ibv_cq *ev_cq;
    2162             :     void *ev_ctx;
    2163             : 
    2164             : #ifdef _WIN32
    2165             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    2166             : #endif
    2167       41185 :     if( ::ibv_get_cq_event( _cc, &ev_cq, &ev_ctx ))
    2168             :     {
    2169           0 :         LBERROR << "ibv_get_cq_event : " << lunchbox::sysError << std::endl;
    2170           0 :         return false;
    2171             :     }
    2172             : 
    2173             :     // http://lists.openfabrics.org/pipermail/general/2008-November/055237.html
    2174       41185 :     _completions++;
    2175       41185 :     if( std::numeric_limits< unsigned int >::max( ) == _completions )
    2176             :     {
    2177           0 :         ::ibv_ack_cq_events( _cq, _completions );
    2178           0 :         _completions = 0U;
    2179             :     }
    2180             : 
    2181             :     // Solicited only!
    2182       41185 :     if( ::rdma_seterrno( ::ibv_req_notify_cq( _cq, 1 )))
    2183             :     {
    2184           0 :         LBERROR << "ibv_req_notify_cq : " << lunchbox::sysError << std::endl;
    2185           0 :         return false;
    2186             :     }
    2187             : 
    2188       41185 :     return true;
    2189             : }
    2190             : 
    2191     1201460 : bool RDMAConnection::_checkCQ( bool drain )
    2192             : {
    2193             :     uint32_t num_recvs;
    2194             :     int count;
    2195             : 
    2196     1201460 :     lunchbox::ScopedWrite poll_mutex( _poll_lock );
    2197             : 
    2198     1202307 :     if( NULL == _cq )
    2199        5511 :         goto out;
    2200             : 
    2201             : repoll:
    2202             :     /* CHECK RECEIVE COMPLETIONS */
    2203     1332679 :     count = ::ibv_poll_cq( _cq, _depth, _wcs );
    2204     1338226 :     if( count < 0 )
    2205             :     {
    2206           0 :         LBERROR << "ibv_poll_cq : " << lunchbox::sysError << std::endl;
    2207           0 :         goto err;
    2208             :     }
    2209             : 
    2210     1338226 :     num_recvs = 0UL;
    2211     2185098 :     for( int i = 0; i != count ; i++ )
    2212             :     {
    2213      851621 :         struct ibv_wc &wc = _wcs[i];
    2214             : 
    2215      851621 :         if( IBV_WC_SUCCESS != wc.status )
    2216             :         {
    2217             :             // Non-fatal.
    2218           0 :             if( IBV_WC_WR_FLUSH_ERR == wc.status )
    2219           0 :                 continue;
    2220             : 
    2221           0 :             LBWARN << (void *)this << " !IBV_WC_SUCCESS : " << std::showbase
    2222           0 :                 << std::hex << "wr_id = " << wc.wr_id
    2223           0 :                 << ", status = \"" << ::ibv_wc_status_str( wc.status ) << "\""
    2224           0 :                 << std::dec << " (" << (unsigned int)wc.status << ")"
    2225           0 :                 << std::hex << ", vendor_err = " << wc.vendor_err
    2226           0 :                 << std::dec << std::endl;
    2227             : 
    2228             :             // All others are fatal.
    2229           0 :             goto err;
    2230             :         }
    2231             : 
    2232      851621 :         LBASSERT( IBV_WC_SUCCESS == wc.status );
    2233             : 
    2234             : #ifdef _WIN32 //----------------------------------------------------------------
    2235             :         // WINDOWS IBV API WORKAROUND.
    2236             :         // TODO: remove this as soon as IBV API is fixed
    2237             :         if ( wc.opcode == IBV_WC_RECV && wc.wc_flags == IBV_WC_WITH_IMM )
    2238             :             wc.opcode = IBV_WC_RECV_RDMA_WITH_IMM;
    2239             : #endif //-----------------------------------------------------------------------
    2240             : 
    2241      860225 :         if( IBV_WC_RECV_RDMA_WITH_IMM == wc.opcode )
    2242      408041 :             _recvRDMAWrite( wc.imm_data );
    2243      452184 :         else if( IBV_WC_RECV == wc.opcode )
    2244       22087 :             _recvMessage( *reinterpret_cast< RDMAMessage * >( wc.wr_id ));
    2245      430097 :         else if( IBV_WC_SEND == wc.opcode )
    2246       22093 :             _msgbuf.freeBuffer( (void *)(uintptr_t)wc.wr_id );
    2247      408004 :         else if( IBV_WC_RDMA_WRITE == wc.opcode )
    2248      408004 :             _sourceptr.incrTail( (uint32_t)wc.wr_id );
    2249             :         else
    2250           0 :             LBUNREACHABLE;
    2251             : 
    2252      846896 :         if( IBV_WC_RECV & wc.opcode )
    2253             :         {
    2254      430112 :             _msgbuf.freeBuffer( (void *)(uintptr_t)wc.wr_id );
    2255             :             // All receive completions need to be reposted.
    2256      430088 :             num_recvs++;
    2257             :         }
    2258             :     }
    2259             : 
    2260     1333477 :     if(( num_recvs > 0UL ) && !_postReceives( num_recvs ))
    2261           0 :         goto err;
    2262             : 
    2263     1333486 :     if( drain && ( count > 0 ))
    2264      135883 :         goto repoll;
    2265             : 
    2266             : out:
    2267     1203114 :     return true;
    2268             : 
    2269             : err:
    2270           0 :     return false;
    2271             : }
    2272             : 
    2273             : /* inline */
    2274      405650 : uint32_t RDMAConnection::_drain( void *buffer, const uint32_t bytes )
    2275             : {
    2276      405650 :     const uint32_t b = std::min( bytes, _sinkptr.available( ));
    2277      811300 :     ::memcpy( buffer, (const void *)((uintptr_t)_sinkbuf.getBase( ) +
    2278     1216950 :         _sinkptr.tail( )), b );
    2279      405650 :     _sinkptr.incrTail( b );
    2280      405650 :     return b;
    2281             : }
    2282             : 
    2283             : /* inline */
    2284      793520 : uint32_t RDMAConnection::_fill( const void *buffer, const uint32_t bytes )
    2285             : {
    2286      793520 :     uint32_t b = std::min( bytes, std::min( _sourceptr.negAvailable( ),
    2287     1587040 :         _rptr.negAvailable( )));
    2288             : #ifndef WRAP
    2289      793520 :     b = std::min( b, (uint32_t)(_sourcebuf.getSize() -
    2290      793520 :                                             _sourceptr.ptr( _sourceptr.HEAD )));
    2291             : #endif
    2292     1587040 :     ::memcpy( (void *)((uintptr_t)_sourcebuf.getBase( ) +
    2293     2380560 :         _sourceptr.ptr( _sourceptr.HEAD )), buffer, b );
    2294      793520 :     _sourceptr.incrHead( b );
    2295      793520 :     return b;
    2296             : }
    2297             : 
    2298           0 : Connection::Notifier RDMAConnection::getNotifier() const
    2299             : {
    2300           0 :     return _notifier;
    2301             : }
    2302             : 
    2303             : #ifdef _WIN32
    2304             : void RDMAConnection::_triggerNotifierCQ( RDMAConnection* conn )
    2305             : {
    2306             :     conn->_triggerNotifierWorker( CQ_EVENT );
    2307             : }
    2308             : 
    2309             : void RDMAConnection::_triggerNotifierCM( RDMAConnection* conn )
    2310             : {
    2311             :     conn->_triggerNotifierWorker( CM_EVENT );
    2312             : }
    2313             : 
    2314             : void RDMAConnection::_triggerNotifierWorker( Events event )
    2315             : {
    2316             :     lunchbox::ScopedFastWrite mutex( _eventLock );
    2317             :     COMP_CHANNEL* chan = event == CQ_EVENT ? &_cc->comp_channel : &_cm->channel;
    2318             :     EnterCriticalSection( &chan->Lock );
    2319             :     ResetEvent( chan->Event );
    2320             :     if ( chan->Head )
    2321             :         SetEvent( _notifier );
    2322             :     LeaveCriticalSection( &chan->Lock );
    2323             : }
    2324             : #endif
    2325             : 
    2326             : //////////////////////////////////////////////////////////////////////////////
    2327             : 
    2328           0 : void RDMAConnection::_showStats( )
    2329             : {
    2330           0 :     LBVERB << std::dec
    2331           0 :         << "reads = " << _stats.reads
    2332           0 :         << ", buffer_empty = " << _stats.buffer_empty
    2333           0 :         << ", no_credits_fc = " << _stats.no_credits_fc
    2334           0 :         << ", writes = " << _stats.writes
    2335           0 :         << ", buffer_full = " << _stats.buffer_full
    2336           0 :         << ", no_credits_rdma = " << _stats.no_credits_rdma
    2337           0 :         << std::endl;
    2338           0 : }
    2339             : 
    2340             : //////////////////////////////////////////////////////////////////////////////
    2341             : 
    2342           6 : BufferPool::BufferPool( size_t buffer_size )
    2343             :     : _buffer_size( buffer_size )
    2344             :     , _num_bufs( 0 )
    2345             :     , _buffer( NULL )
    2346             :     , _mr( NULL )
    2347           6 :     , _ring( 0 )
    2348             : {
    2349           6 : }
    2350             : 
    2351          12 : BufferPool::~BufferPool( )
    2352             : {
    2353           6 :     clear( );
    2354           6 : }
    2355             : 
    2356          20 : void BufferPool::clear( )
    2357             : {
    2358          20 :     _num_bufs = 0;
    2359          20 :     _ring.clear( _num_bufs );
    2360             : 
    2361          20 :     if(( NULL != _mr ) && ::rdma_dereg_mr( _mr ))
    2362           0 :         LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
    2363          20 :     _mr = NULL;
    2364             : 
    2365          20 :     if( NULL != _buffer )
    2366             : #ifdef _WIN32
    2367             :         _aligned_free( _buffer );
    2368             : #else
    2369           4 :         ::free( _buffer );
    2370             : #endif
    2371          20 :     _buffer = NULL;
    2372          20 : }
    2373             : 
    2374           4 : bool BufferPool::resize( ibv_pd *pd, uint32_t num_bufs )
    2375             : {
    2376           4 :     clear( );
    2377             : 
    2378           4 :     if( num_bufs )
    2379             :     {
    2380           4 :         _num_bufs = num_bufs;
    2381           4 :         _ring.clear( _num_bufs );
    2382             : 
    2383             : #ifdef _WIN32
    2384             :         _buffer = _aligned_malloc((size_t)( _num_bufs * _buffer_size ),
    2385             :             boost::interprocess::mapped_region::get_page_size());
    2386             :         if ( !_buffer )
    2387             :         {
    2388             :             LBERROR << "_aligned_malloc : Couldn't allocate aligned memory. "
    2389             :                     << lunchbox::sysError << std::endl;
    2390             :             goto err;
    2391             :         }
    2392             : #else
    2393           8 :         if( ::posix_memalign( &_buffer, (size_t)::getpagesize( ),
    2394           8 :                 (size_t)( _num_bufs * _buffer_size )))
    2395             :         {
    2396           0 :             LBERROR << "posix_memalign : " << lunchbox::sysError << std::endl;
    2397           0 :             goto err;
    2398             :         }
    2399             : #endif
    2400           4 :         ::memset( _buffer, 0xff, (size_t)( _num_bufs * _buffer_size ));
    2401             :         _mr = ::ibv_reg_mr( pd, _buffer, (size_t)( _num_bufs * _buffer_size ),
    2402           4 :             IBV_ACCESS_LOCAL_WRITE );
    2403           4 :         if( NULL == _mr )
    2404             :         {
    2405           0 :             LBERROR << "ibv_reg_mr : " << lunchbox::sysError << std::endl;
    2406           0 :             goto err;
    2407             :         }
    2408             : 
    2409        4100 :         for( uint32_t i = 0; i != _num_bufs; i++ )
    2410        4096 :             _ring.put( i );
    2411             :     }
    2412             : 
    2413           4 :     return true;
    2414             : 
    2415             : err:
    2416           0 :     return false;
    2417             : }
    2418             : 
    2419             : //////////////////////////////////////////////////////////////////////////////
    2420             : 
    2421          12 : RingBuffer::RingBuffer( int access )
    2422             :     : _access( access )
    2423             :     , _size( 0 )
    2424             : #ifdef _WIN32
    2425             :     , _mapping( 0 )
    2426             :     , _map( 0 )
    2427             : #else
    2428             :     , _map( MAP_FAILED )
    2429             : #endif
    2430          12 :     , _mr( 0 )
    2431             : {
    2432          12 : }
    2433             : 
    2434          12 : RingBuffer::~RingBuffer( )
    2435             : {
    2436          12 :     clear( );
    2437          12 : }
    2438             : 
    2439          40 : void RingBuffer::clear( )
    2440             : {
    2441          40 :     if(( NULL != _mr ) && ::rdma_dereg_mr( _mr ))
    2442           0 :         LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
    2443          40 :     _mr = NULL;
    2444             : 
    2445             : #ifdef _WIN32
    2446             :     if ( _map )
    2447             :     {
    2448             :         UnmapViewOfFile( static_cast<char*>( _map ));
    2449             :         UnmapViewOfFile( static_cast<char*>( _map ) + _size );
    2450             :     }
    2451             :     if ( _mapping )
    2452             :         CloseHandle( _mapping );
    2453             : 
    2454             :     _map = 0;
    2455             :     _mapping = 0;
    2456             : #else
    2457          40 :     if(( MAP_FAILED != _map ) && ::munmap( _map, _size << 1 ))
    2458           0 :         LBWARN << "munmap : " << lunchbox::sysError << std::endl;
    2459          40 :     _map = MAP_FAILED;
    2460             : #endif
    2461          40 :     _size = 0;
    2462          40 : }
    2463             : 
    2464           8 : bool RingBuffer::resize( ibv_pd *pd, size_t size )
    2465             : {
    2466           8 :     bool ok = false;
    2467           8 :     int fd = -1;
    2468             : 
    2469           8 :     clear( );
    2470             : 
    2471           8 :     if( size )
    2472             :     {
    2473             : #ifdef _WIN32
    2474             :         uint32_t num_retries = RINGBUFFER_ALLOC_RETRIES;
    2475             :         while (!_map && num_retries-- != 0)
    2476             :         {
    2477             :             void *target_addr = determineViableAddr( size * 2 );
    2478             :             if (target_addr)
    2479             :                 allocAt( size, target_addr );
    2480             :         }
    2481             : 
    2482             :         if ( !_map )
    2483             :         {
    2484             :             LBERROR << "Couldn't allocate desired RingBuffer memory after "
    2485             :                     << RINGBUFFER_ALLOC_RETRIES << " retries." << std::endl;
    2486             :         }
    2487             :         else
    2488             :         {
    2489             :             LBVERB << "Allocated RDMA Ringbuffer memory in "
    2490             :                     << ( RINGBUFFER_ALLOC_RETRIES - num_retries ) << " tries."
    2491             :                     << std::endl;
    2492             :             ok = true;
    2493             :         }
    2494             : 
    2495             : #else
    2496             :         void *addr1, *addr2;
    2497           8 :         char path[] = "/dev/shm/co-rdma-buffer-XXXXXX";
    2498             : 
    2499           8 :         _size = size;
    2500             : 
    2501           8 :         fd = ::mkstemp( path );
    2502           8 :         if( fd < 0 )
    2503             :         {
    2504           0 :             LBERROR << "mkstemp : " << lunchbox::sysError << std::endl;
    2505           0 :             goto out;
    2506             :         }
    2507             : 
    2508           8 :         if( ::unlink( path ))
    2509             :         {
    2510           0 :             LBERROR << "unlink : " << lunchbox::sysError << std::endl;
    2511           0 :             goto out;
    2512             :         }
    2513             : 
    2514           8 :         if( ::ftruncate( fd, _size ))
    2515             :         {
    2516           0 :             LBERROR << "ftruncate : " << lunchbox::sysError << std::endl;
    2517           0 :             goto out;
    2518             :         }
    2519             : 
    2520             :         _map = ::mmap( NULL, _size << 1,
    2521             :             PROT_NONE,
    2522           8 :             MAP_ANONYMOUS | MAP_PRIVATE, -1, 0 );
    2523           8 :         if( MAP_FAILED == _map )
    2524             :         {
    2525           0 :             LBERROR << "mmap : " << lunchbox::sysError << std::endl;
    2526           0 :             goto out;
    2527             :         }
    2528             : 
    2529             :         addr1 = ::mmap( _map, _size,
    2530             :             PROT_READ | PROT_WRITE,
    2531           8 :             MAP_FIXED | MAP_SHARED, fd, 0 );
    2532           8 :         if( MAP_FAILED == addr1 )
    2533             :         {
    2534           0 :             LBERROR << "mmap : " << lunchbox::sysError << std::endl;
    2535           0 :             goto out;
    2536             :         }
    2537             : 
    2538           8 :         addr2 = ::mmap( (void *)( (uintptr_t)_map + _size ), _size,
    2539             :             PROT_READ | PROT_WRITE,
    2540          16 :             MAP_FIXED | MAP_SHARED, fd, 0 );
    2541           8 :         if( MAP_FAILED == addr2 )
    2542             :         {
    2543           0 :             LBERROR << "mmap : " << lunchbox::sysError << std::endl;
    2544           0 :             goto out;
    2545             :         }
    2546             : 
    2547           8 :         LBASSERT( addr1 == _map );
    2548           8 :         LBASSERT( addr2 == (void *)( (uintptr_t)_map + _size ));
    2549             : 
    2550             : #endif
    2551             : #ifdef WRAP
    2552             :         _mr = ::ibv_reg_mr( pd, _map, _size << 1, _access );
    2553             : #else
    2554           8 :         _mr = ::ibv_reg_mr( pd, _map, _size, _access );
    2555             : #endif
    2556             : 
    2557           8 :         if( NULL == _mr )
    2558             :         {
    2559           0 :             LBERROR << "ibv_reg_mr : " << lunchbox::sysError << std::endl;
    2560           0 :             goto out;
    2561             :         }
    2562             : 
    2563           8 :         ::memset( _map, 0, _size );
    2564           8 :         *reinterpret_cast< uint8_t * >( _map ) = 0x45;
    2565           8 :         LBASSERT( 0x45 ==
    2566             :             *reinterpret_cast< uint8_t * >( (uintptr_t)_map + _size ));
    2567             :     }
    2568             : 
    2569           8 :     ok = true;
    2570             : 
    2571             : out:
    2572             : #ifndef _WIN32
    2573           8 :     if(( fd >= 0 ) && TEMP_FAILURE_RETRY( ::close( fd )))
    2574           0 :         LBWARN << "close : " << lunchbox::sysError << std::endl;
    2575             : #endif
    2576           8 :     return ok;
    2577             : }
    2578             : 
    2579             : #ifdef _WIN32
    2580             : void* RingBuffer::determineViableAddr( size_t size )
    2581             : {
    2582             :     void *ptr = VirtualAlloc(0, size, MEM_RESERVE, PAGE_NOACCESS);
    2583             :     if (!ptr)
    2584             :         return 0;
    2585             : 
    2586             :     VirtualFree(ptr, 0, MEM_RELEASE);
    2587             :     return ptr;
    2588             : }
    2589             : 
    2590             : void RingBuffer::allocAt( size_t size, void* desiredAddr )
    2591             : {
    2592             :     // if we already hold one allocation, refuse to make another.
    2593             :     LBASSERT( !_map );
    2594             :     LBASSERT( !_mapping );
    2595             :     if ( _map || _mapping )
    2596             :         return;
    2597             : 
    2598             :     // is ring_size a multiple of 64k? if not, this won't ever work!
    2599             :     if (( size & 0xffff ) != 0 )
    2600             :         return;
    2601             : 
    2602             :     // try to allocate and map our space
    2603             :     size_t alloc_size = size * 2;
    2604             :     _mapping = CreateFileMappingA(  INVALID_HANDLE_VALUE,
    2605             :                                     0,
    2606             :                                     PAGE_READWRITE,
    2607             :                                     (unsigned long long)alloc_size >> 32,
    2608             :                                     alloc_size & 0xffffffffu,
    2609             :                                     0 );
    2610             : 
    2611             :     if ( !_mapping )
    2612             :     {
    2613             :         LBERROR << "CreateFileMappingA failed" << std::endl;
    2614             :         goto err;
    2615             :     }
    2616             : 
    2617             :     _map = MapViewOfFileEx( _mapping,
    2618             :                             FILE_MAP_ALL_ACCESS,
    2619             :                             0, 0,
    2620             :                             size,
    2621             :                             desiredAddr );
    2622             : 
    2623             :     if ( !_map )
    2624             :     {
    2625             :         LBERROR << "First MapViewOfFileEx failed" << std::endl;
    2626             :         goto err;
    2627             :     }
    2628             : 
    2629             :     if (!MapViewOfFileEx(   _mapping,
    2630             :                             FILE_MAP_ALL_ACCESS,
    2631             :                             0, 0,
    2632             :                             size,
    2633             :                             (char *)desiredAddr + size))
    2634             :     {
    2635             :         LBERROR << "Second MapViewOfFileEx failed" << std::endl;
    2636             :         goto err;
    2637             :     }
    2638             : 
    2639             :     _size = size;
    2640             :     return;
    2641             : err:
    2642             :     // something went wrong - clean up
    2643             :     clear();
    2644             : }
    2645             : #endif
    2646             : 
    2647             : 
    2648          63 : } // namespace co

Generated by: LCOV version 1.11