LCOV - code coverage report
Current view: top level - co - rdmaConnection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 1 1088 0.1 %
Date: 2018-01-09 16:37:03 Functions: 2 67 3.0 %

          Line data    Source code
       1             : // -*- mode: c++ -*-
       2             : /* Copyright (c) 2012, Computer Integration & Programming Solutions, Corp. and
       3             :  *                     United States Naval Research Laboratory
       4             :  *               2012-2013, Stefan Eilemann <eile@eyescale.ch>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : #include "rdmaConnection.h"
      22             : 
      23             : #include "connectionDescription.h"
      24             : #include "connectionType.h"
      25             : #include "global.h"
      26             : 
      27             : #include <lunchbox/clock.h>
      28             : 
      29             : #ifdef _WIN32
      30             : #pragma warning(disable : 4018)
      31             : #include <boost/interprocess/mapped_region.hpp>
      32             : #pragma warning(default : 4018)
      33             : #else
      34             : #include <arpa/inet.h>
      35             : #include <poll.h>
      36             : #include <sys/epoll.h>
      37             : #include <sys/mman.h>
      38             : #include <unistd.h>
      39             : #endif
      40             : 
      41             : #include <errno.h>
      42             : #include <fcntl.h>
      43             : #include <limits>
      44             : #include <sstream>
      45             : #include <stddef.h>
      46             : 
      47             : #include <rdma/rdma_verbs.h>
      48             : 
      49             : #define IPV6_DEFAULT 0
      50             : 
      51             : #define RDMA_PROTOCOL_MAGIC 0xC0
      52             : #define RDMA_PROTOCOL_VERSION 0x03
      53             : 
      54             : namespace co
      55             : {
      56             : // namespace { static const uint64_t ONE = 1ULL; }
      57             : 
      58             : /**
      59             :  * Message types
      60             :  */
      61             : enum OpCode
      62             : {
      63             :     SETUP = 1 << 0,
      64             :     FC = 1 << 1,
      65             : };
      66             : 
      67             : /**
      68             :  * Initial setup message used to exchange sink MR parameters.
      69             :  */
      70             : struct RDMASetupPayload
      71             : {
      72             :     uint64_t rbase;
      73             :     uint64_t rlen;
      74             :     uint64_t rkey;
      75             : };
      76             : 
      77             : /**
      78             :  * "ACK" messages sent after read, tells source about receive progress.
      79             :  */
      80             : struct RDMAFCPayload
      81             : {
      82             :     uint32_t bytes_received;
      83             :     uint32_t writes_received;
      84             : };
      85             : 
      86             : /**
      87             :  * Payload wrapper
      88             :  */
      89             : struct RDMAMessage
      90             : {
      91             :     enum OpCode opcode;
      92             :     uint8_t length;
      93             :     union {
      94             :         struct RDMASetupPayload setup;
      95             :         struct RDMAFCPayload fc;
      96             :     } payload;
      97             : };
      98             : 
      99             : /**
     100             :  * "IMM" data sent with RDMA write, tells sink about send progress.
     101             :  */
     102             : struct RDMAFCImm
     103             : {
     104             :     uint32_t bytes_sent : 28;
     105             :     uint32_t fcs_received : 4;
     106             : };
     107             : 
     108             : namespace
     109             : {
     110             : // We send a max of 28 bits worth of byte counts per RDMA write.
     111             : static const uint64_t MAX_BS = ((2 << (28 - 1)) - 1);
     112             : // We send a max of four bits worth of fc counts per RDMA write.
     113             : static const uint16_t MAX_FC = ((2 << (4 - 1)) - 1);
     114             : 
     115             : #ifdef _WIN32
     116             : static const uint32_t RINGBUFFER_ALLOC_RETRIES = 8;
     117             : static const uint32_t WINDOWS_CONNECTION_BACKLOG = 1024;
     118             : #endif
     119             : static const uint32_t FC_MESSAGE_FREQUENCY = 12;
     120             : 
     121           0 : bool _isInetFamily(const rdma_addrinfo *addrinfo)
     122             : {
     123             : #ifndef _WIN32
     124           0 :     return addrinfo->ai_family == AF_INET || addrinfo->ai_family == AF_INET6;
     125             : #else
     126             :     return addrinfo->ai_family == AF_INET;
     127             : #endif
     128             : }
     129             : }
     130             : 
     131             : /**
     132             :  * An RDMA connection implementation.
     133             :  *
     134             :  * The protocol is simple, e.g.:
     135             :  *
     136             :  *      initiator                        target
     137             :  * -----------------------------------------------------
     138             :  *                                  resolve/bind/listen
     139             :  * resolve/prepost/connect
     140             :  *                                    prepost/accept
     141             :  *     send setup         <------->    send setup
     142             :  *   wait for setup                   wait for setup
     143             :  * RDMA_WRITE_WITH_IMM WR  -------> RDMA_WRITE(DATA) WC
     144             :  *     RECV(FC) WC        <-------      SEND WR
     145             :  *                            .
     146             :  *                            .
     147             :  *                            .
     148             :  *
     149             :  * The setup phase exchanges the MR parameters of a fixed size circular buffer
     150             :  * to which remote writes are sent.  Sender tracks available space on the
     151             :  * receiver by accepting "Flow Control" messages that update the tail pointer
     152             :  * of the local "view" of the remote sink MR.
     153             :  *
     154             :  * Once setup is complete, either side may begin operations on the other's MR
     155             :  * (the initiator doesn't have to send first, as in the above example).
     156             :  *
     157             :  * If either credits or buffer space are exhausted, sender will spin waiting
     158             :  * for flow control messages.  Receiver will also not send flow control if
     159             :  * there are no credits available.
     160             :  *
     161             :  * One catch is that Collage will only monitor a single "notifier" for events
     162             :  * and we have three that need to be monitored: one for connection status
     163             :  * events (the RDMA event channel) - RDMA_CM_EVENT_DISCONNECTED in particular,
     164             :  * one for the receive completion queue (upon incoming RDMA write), and an
     165             :  * additional eventfd(2) used to keep the notifier "hot" after partial reads.
     166             :  * We leverage the feature of epoll(7) in that "If an epoll file descriptor
     167             :  * has events waiting then it will indicate as being readable".
     168             :  *
     169             :  * Quite interesting is the effect of RDMA_RING_BUFFER_SIZE_MB and
     170             :  * RDMA_SEND_QUEUE_DEPTH depending on the communication pattern.  Basically,
     171             :  * bigger doesn't necessarily equate to faster!  The defaults are suited for
     172             :  * low latency conditions and would need tuning otherwise.
     173             :  *
     174             :  * ib_write_bw
     175             :  * -----------
     176             :  *  #bytes    num iterations  BW peak[MB/sec]    BW average[MB/sec]
     177             :  * 1048576    10000           3248.10            3247.94
     178             :  *
     179             :  * netperf
     180             :  * -------
     181             :  * Send perf: 3240.72MB/s (3240.72pps)
     182             :  * Send perf: 3240.72MB/s (3240.72pps)
     183             :  * Send perf: 3240.95MB/s (3240.95pps)
     184             :  */
     185           0 : RDMAConnection::RDMAConnection()
     186             : #ifdef _WIN32
     187             :     : _notifier()
     188             : #else
     189             :     : _notifier(-1)
     190             : #endif
     191           0 :     , _timeout(Global::getIAttribute(Global::IATTR_RDMA_RESOLVE_TIMEOUT_MS))
     192             :     , _rai(NULL)
     193             :     , _addrinfo(NULL)
     194             :     , _cm(NULL)
     195             :     , _cm_id(NULL)
     196             :     , _new_cm_id(NULL)
     197             :     , _cc(NULL)
     198             :     , _cq(NULL)
     199             :     , _pd(NULL)
     200             :     , _wcs(0)
     201             :     , _readBytes(0)
     202             :     , _established(false)
     203             :     , _depth(0L)
     204             :     , _writes(0L)
     205             :     , _fcs(0L)
     206             :     , _wcredits(0L)
     207             :     , _fcredits(0L)
     208             :     , _completions(0U)
     209             :     , _msgbuf(sizeof(RDMAMessage))
     210             :     , _sourcebuf(0)
     211             :     , _sourceptr(0)
     212             :     , _sinkbuf(IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE)
     213             :     , _sinkptr(0)
     214             :     , _rptr(0UL)
     215             :     , _rbase(0ULL)
     216           0 :     , _rkey(0ULL)
     217             : #ifdef _WIN32
     218             :     , _availBytes(0)
     219             :     , _eventFlag(0)
     220             :     , _cmWaitObj(0)
     221             :     , _ccWaitObj(0)
     222             : #endif
     223             : {
     224             : #ifndef _WIN32
     225           0 :     _pipe_fd[0] = -1;
     226           0 :     _pipe_fd[1] = -1;
     227             : #endif
     228             : 
     229           0 :     ::memset((void *)&_addr, 0, sizeof(_addr));
     230           0 :     ::memset((void *)&_serv, 0, sizeof(_serv));
     231             : 
     232           0 :     ::memset((void *)&_cpd, 0, sizeof(struct RDMAConnParamData));
     233             : 
     234           0 :     ConnectionDescriptionPtr description = _getDescription();
     235           0 :     description->type = CONNECTIONTYPE_RDMA;
     236           0 :     description->bandwidth = // QDR default, report "actual" 8b/10b bandwidth
     237           0 :         (::ibv_rate_to_mult(IBV_RATE_40_GBPS) * 2.5 * 1024000 / 8) * 0.8;
     238           0 : }
     239             : 
     240           0 : bool RDMAConnection::connect()
     241             : {
     242           0 :     ConstConnectionDescriptionPtr description = getDescription();
     243           0 :     LBASSERT(CONNECTIONTYPE_RDMA == description->type);
     244             : 
     245           0 :     if (!isClosed() || 0u == description->port)
     246           0 :         return false;
     247             : 
     248           0 :     _cleanup();
     249           0 :     _setState(STATE_CONNECTING);
     250             : 
     251           0 :     if (!_lookupAddress(false) || (NULL == _addrinfo))
     252             :     {
     253           0 :         LBERROR << "Failed to lookup destination address." << std::endl;
     254           0 :         goto err;
     255             :     }
     256             : 
     257           0 :     _updateInfo(_addrinfo->ai_dst_addr);
     258             : 
     259           0 :     if (!_createNotifier())
     260             :     {
     261           0 :         LBERROR << "Failed to create master notifier." << std::endl;
     262           0 :         goto err;
     263             :     }
     264             : 
     265           0 :     if (!_createEventChannel())
     266             :     {
     267           0 :         LBERROR << "Failed to create communication event channel." << std::endl;
     268           0 :         goto err;
     269             :     }
     270             : 
     271           0 :     if (!_createId())
     272             :     {
     273           0 :         LBERROR << "Failed to create communication identifier." << std::endl;
     274           0 :         goto err;
     275             :     }
     276             : 
     277           0 :     if (!_resolveAddress())
     278             :     {
     279           0 :         LBERROR << "Failed to resolve destination address for : " << _addr
     280           0 :                 << ":" << _serv << std::endl;
     281           0 :         goto err;
     282             :     }
     283             : 
     284           0 :     _updateInfo(::rdma_get_peer_addr(_cm_id));
     285             : 
     286           0 :     _device_name = ::ibv_get_device_name(_cm_id->verbs->device);
     287             : 
     288           0 :     LBVERB << "Initiating connection on " << std::dec << _device_name << ":"
     289           0 :            << (int)_cm_id->port_num << " to " << _addr << ":" << _serv << " ("
     290           0 :            << description->toString() << ")" << std::endl;
     291             : 
     292           0 :     if (!_initProtocol(
     293             :             Global::getIAttribute(Global::IATTR_RDMA_SEND_QUEUE_DEPTH)))
     294             :     {
     295           0 :         LBERROR << "Failed to initialize protocol variables." << std::endl;
     296           0 :         goto err;
     297             :     }
     298             : 
     299           0 :     if (!_initVerbs())
     300             :     {
     301           0 :         LBERROR << "Failed to initialize verbs." << std::endl;
     302           0 :         goto err;
     303             :     }
     304             : 
     305           0 :     if (!_createQP())
     306             :     {
     307           0 :         LBERROR << "Failed to create queue pair." << std::endl;
     308           0 :         goto err;
     309             :     }
     310             : 
     311           0 :     if (!_createBytesAvailableFD())
     312             :     {
     313           0 :         LBERROR << "Failed to create available byte notifier." << std::endl;
     314           0 :         goto err;
     315             :     }
     316             : 
     317           0 :     if (!_initBuffers())
     318             :     {
     319           0 :         LBERROR << "Failed to initialize ring buffers." << std::endl;
     320           0 :         goto err;
     321             :     }
     322             : 
     323           0 :     if (!_resolveRoute())
     324             :     {
     325           0 :         LBERROR << "Failed to resolve route to destination : " << _addr << ":"
     326           0 :                 << _serv << std::endl;
     327           0 :         goto err;
     328             :     }
     329             : 
     330           0 :     if (!_postReceives(_depth))
     331             :     {
     332           0 :         LBERROR << "Failed to pre-post receives." << std::endl;
     333           0 :         goto err;
     334             :     }
     335             : 
     336           0 :     if (!_connect())
     337             :     {
     338           0 :         LBERROR << "Failed to connect to destination : " << _addr << ":"
     339           0 :                 << _serv << std::endl;
     340           0 :         goto err;
     341             :     }
     342             : 
     343           0 :     LBASSERT(_established);
     344             : 
     345           0 :     if ((RDMA_PROTOCOL_MAGIC != _cpd.magic) ||
     346           0 :         (RDMA_PROTOCOL_VERSION != _cpd.version))
     347             :     {
     348           0 :         LBERROR << "Protocol mismatch with target : " << _addr << ":" << _serv
     349           0 :                 << std::endl;
     350           0 :         goto err;
     351             :     }
     352             : 
     353           0 :     if (!_postSetup())
     354             :     {
     355           0 :         LBERROR << "Failed to post setup message." << std::endl;
     356           0 :         goto err;
     357             :     }
     358             : 
     359           0 :     if (!_waitRecvSetup())
     360             :     {
     361           0 :         LBERROR << "Failed to receive setup message." << std::endl;
     362           0 :         goto err;
     363             :     }
     364             : 
     365           0 :     LBVERB << "Connection established on " << std::dec << _device_name << ":"
     366           0 :            << (int)_cm_id->port_num << " to " << _addr << ":" << _serv << " ("
     367           0 :            << description->toString() << ")" << std::endl;
     368             : 
     369           0 :     _setState(STATE_CONNECTED);
     370           0 :     _updateNotifier();
     371           0 :     return true;
     372             : 
     373             : err:
     374           0 :     LBVERB << "Connection failed on " << std::dec << _device_name << ":"
     375           0 :            << (int)_cm_id->port_num << " to " << _addr << ":" << _serv << " ("
     376           0 :            << description->toString() << ")" << std::endl;
     377             : 
     378           0 :     close();
     379             : 
     380           0 :     return false;
     381             : }
     382             : 
     383           0 : bool RDMAConnection::listen()
     384             : {
     385           0 :     ConstConnectionDescriptionPtr description = getDescription();
     386           0 :     LBASSERT(CONNECTIONTYPE_RDMA == description->type);
     387             : 
     388           0 :     if (!isClosed())
     389           0 :         return false;
     390             : 
     391           0 :     _cleanup();
     392           0 :     _setState(STATE_CONNECTING);
     393             : 
     394           0 :     if (!_lookupAddress(true))
     395             :     {
     396           0 :         LBERROR << "Failed to lookup local address." << std::endl;
     397           0 :         goto err;
     398             :     }
     399             : 
     400           0 :     if (NULL != _addrinfo)
     401           0 :         _updateInfo(_addrinfo->ai_src_addr);
     402             : 
     403           0 :     if (!_createNotifier())
     404             :     {
     405           0 :         LBERROR << "Failed to create master notifier." << std::endl;
     406           0 :         goto err;
     407             :     }
     408             : 
     409           0 :     if (!_createEventChannel())
     410             :     {
     411           0 :         LBERROR << "Failed to create communication event channel." << std::endl;
     412           0 :         goto err;
     413             :     }
     414             : 
     415           0 :     if (!_createId())
     416             :     {
     417           0 :         LBERROR << "Failed to create communication identifier." << std::endl;
     418           0 :         goto err;
     419             :     }
     420             : 
     421             : #if 0
     422             :     /* NOT IMPLEMENTED */
     423             : 
     424             :     if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
     425             :             (void *)&ONE, sizeof(ONE) ))
     426             :     {
     427             :         LBERROR << "rdma_set_option: " << lunchbox::sysError << std::endl;
     428             :         goto err;
     429             :     }
     430             : #endif
     431             : 
     432           0 :     if (!_bindAddress())
     433             :     {
     434           0 :         LBERROR << "Failed to bind to local address : " << _addr << ":" << _serv
     435           0 :                 << std::endl;
     436           0 :         goto err;
     437             :     }
     438             : 
     439           0 :     _updateInfo(::rdma_get_local_addr(_cm_id));
     440             : 
     441             : #ifdef _WIN32
     442             :     if (!_listen(WINDOWS_CONNECTION_BACKLOG))
     443             : #else
     444           0 :     if (!_listen(SOMAXCONN))
     445             : #endif
     446             :     {
     447           0 :         LBERROR << "Failed to listen on bound address : " << _addr << ":"
     448           0 :                 << _serv << std::endl;
     449           0 :         goto err;
     450             :     }
     451             : 
     452           0 :     if (NULL != _cm_id->verbs)
     453           0 :         _device_name = ::ibv_get_device_name(_cm_id->verbs->device);
     454             : 
     455           0 :     LBVERB << "Listening on " << std::dec << _device_name << ":"
     456           0 :            << (int)_cm_id->port_num << " at " << _addr << ":" << _serv << " ("
     457           0 :            << description->toString() << ")" << std::endl;
     458             : 
     459           0 :     _setState(STATE_LISTENING);
     460           0 :     return true;
     461             : 
     462             : err:
     463           0 :     close();
     464           0 :     return false;
     465             : }
     466             : 
     467           0 : void RDMAConnection::acceptNB()
     468             : { /* NOP */
     469           0 : }
     470             : 
     471           0 : ConnectionPtr RDMAConnection::acceptSync()
     472             : {
     473           0 :     RDMAConnection *newConnection = NULL;
     474             : 
     475           0 :     if (!isListening())
     476             :     {
     477           0 :         LBERROR << "Connection not in listening state." << std::endl;
     478           0 :         goto out;
     479             :     }
     480             : 
     481           0 :     if (!_waitForCMEvent(RDMA_CM_EVENT_CONNECT_REQUEST))
     482             :     {
     483           0 :         LBERROR << "Failed to receive valid connect request." << std::endl;
     484           0 :         goto out;
     485             :     }
     486             : 
     487           0 :     LBASSERT(NULL != _new_cm_id);
     488             : 
     489           0 :     newConnection = new RDMAConnection();
     490             : 
     491           0 :     if (!newConnection->_finishAccept(_new_cm_id, _cpd))
     492             :     {
     493           0 :         delete newConnection;
     494           0 :         newConnection = NULL;
     495             :     }
     496             : 
     497             : out:
     498           0 :     _new_cm_id = NULL;
     499           0 :     _updateNotifier();
     500           0 :     return newConnection;
     501             : }
     502             : 
     503           0 : void RDMAConnection::readNB(void *, const uint64_t)
     504             : { /* NOP */
     505           0 : }
     506             : 
     507           0 : int64_t RDMAConnection::readSync(void *buffer, const uint64_t bytes,
     508             :                                  const bool block)
     509             : {
     510           0 :     lunchbox::Clock clock;
     511           0 :     const int64_t start = clock.getTime64();
     512           0 :     const uint32_t timeout = Global::getTimeout();
     513           0 :     eventset events;
     514           0 :     uint64_t available_bytes = 0ULL;
     515           0 :     uint32_t bytes_taken = 0UL;
     516           0 :     bool extra_event = false;
     517             : 
     518           0 :     if (!isConnected())
     519           0 :         goto err;
     520             : 
     521             :     //    LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
     522             :     //       << std::endl;
     523             : 
     524           0 :     _stats.reads++;
     525             : 
     526             : retry:
     527           0 :     if (!_checkDisconnected(events))
     528             :     {
     529           0 :         LBERROR << "Error while checking event state." << std::endl;
     530           0 :         goto err;
     531             :     }
     532             : 
     533           0 :     if (events.test(CQ_EVENT))
     534             :     {
     535           0 :         if (!_rearmCQ())
     536             :         {
     537           0 :             LBERROR << "Error while rearming receive channel." << std::endl;
     538           0 :             goto err;
     539             :         }
     540             :     }
     541             : 
     542             :     // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
     543           0 :     if (!_checkCQ(events.test(CQ_EVENT)))
     544             :     {
     545           0 :         LBERROR << "Error while polling completion queues." << std::endl;
     546           0 :         goto err;
     547             :     }
     548             : 
     549           0 :     LBASSERT(_fcredits >= 0L);
     550             : 
     551           0 :     if (_established && _needFC() && (0L == _fcredits))
     552             :     {
     553           0 :         if (LB_TIMEOUT_INDEFINITE != timeout)
     554             :         {
     555           0 :             if ((clock.getTime64() - start) > timeout)
     556             :             {
     557           0 :                 LBERROR << "Timed out trying to acquire credit." << std::endl;
     558           0 :                 goto err;
     559             :             }
     560             :         }
     561             : 
     562             :         // LBWARN << "No credit for flow control." << std::endl;
     563           0 :         lunchbox::Thread::yield();
     564           0 :         _stats.no_credits_fc++;
     565           0 :         goto retry;
     566             :     }
     567             : 
     568             :     // "Note that an extra event may be triggered without having a
     569             :     // corresponding completion entry in the CQ." (per ibv_get_cq_event(3))
     570           0 :     if (_established && !events.test(BUF_EVENT))
     571             :     {
     572             :         // Special case: If LocalNode is reading the length part of a message
     573             :         // it will ignore this zero return and restart the select.
     574           0 :         if (extra_event && !block)
     575             :         {
     576           0 :             _updateNotifier();
     577           0 :             return 0LL;
     578             :         }
     579             : 
     580           0 :         extra_event = true;
     581           0 :         goto retry;
     582             :     }
     583             : 
     584           0 :     if (events.test(BUF_EVENT))
     585             :     {
     586           0 :         available_bytes = _getAvailableBytes();
     587           0 :         if (0ULL == available_bytes)
     588             :         {
     589           0 :             LBERROR << "Error while reading from event fd." << std::endl;
     590           0 :             goto err;
     591             :         }
     592             :     }
     593             : 
     594             :     // Modifies _sinkptr.TAIL
     595           0 :     bytes_taken = _drain(buffer, bytes);
     596             : 
     597           0 :     if (0UL == bytes_taken)
     598             :     {
     599           0 :         if (_sinkptr.isEmpty() && !_established)
     600             :         {
     601           0 :             LBINFO << "Got EOF, closing " << getDescription()->toString()
     602           0 :                    << std::endl;
     603           0 :             goto err;
     604             :         }
     605             : 
     606           0 :         if (LB_TIMEOUT_INDEFINITE != timeout)
     607             :         {
     608           0 :             if ((clock.getTime64() - start) > timeout)
     609             :             {
     610           0 :                 LBERROR << "Timed out trying to drain buffer." << std::endl;
     611           0 :                 goto err;
     612             :             }
     613             :         }
     614             : 
     615             :         // LBWARN << "Sink buffer empty." << std::endl;
     616           0 :         lunchbox::Thread::yield();
     617           0 :         _stats.buffer_empty++;
     618           0 :         goto retry;
     619             :     }
     620             : 
     621             :     // Put back what wasn't taken (ensure the master notifier stays "hot").
     622           0 :     if (available_bytes > bytes_taken)
     623           0 :         _incrAvailableBytes(available_bytes - bytes_taken);
     624             : #ifdef _WIN32
     625             :     else
     626             :         _updateNotifier();
     627             : #endif
     628             : 
     629           0 :     _readBytes += bytes_taken;
     630             : 
     631           0 :     if (_established && _needFC() && !_postFC())
     632           0 :         LBWARN << "Error while posting flow control message." << std::endl;
     633             : 
     634             :     //    LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
     635             :     //       << " took " << bytes_taken << " bytes"
     636             :     //       << " (" << _sinkptr.available( ) << " still available)" <<
     637             :     //       std::endl;
     638             : 
     639           0 :     return bytes_taken;
     640             : 
     641             : err:
     642           0 :     close();
     643             : 
     644           0 :     return -1LL;
     645             : }
     646             : 
     647           0 : int64_t RDMAConnection::write(const void *buffer, const uint64_t bytes)
     648             : {
     649           0 :     lunchbox::Clock clock;
     650           0 :     const int64_t start = clock.getTime64();
     651           0 :     const uint32_t timeout = Global::getTimeout();
     652           0 :     eventset events;
     653           0 :     const uint32_t can_put = std::min(bytes, MAX_BS);
     654             :     uint32_t bytes_put;
     655             : 
     656           0 :     if (!isConnected())
     657           0 :         goto err;
     658             : 
     659             :     //    LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
     660             :     //        << std::endl;
     661             : 
     662           0 :     _stats.writes++;
     663             : 
     664             : retry:
     665           0 :     if (!_checkDisconnected(events))
     666             :     {
     667           0 :         LBERROR << "Error while checking connection state." << std::endl;
     668           0 :         goto err;
     669             :     }
     670             : 
     671           0 :     if (!_established)
     672             :     {
     673           0 :         LBWARN << "Disconnected in write." << std::endl;
     674           0 :         goto err;
     675             :     }
     676             : 
     677             :     // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
     678           0 :     if (!_checkCQ(false))
     679             :     {
     680           0 :         LBERROR << "Error while polling completion queues." << std::endl;
     681           0 :         goto err;
     682             :     }
     683             : 
     684           0 :     LBASSERT(_wcredits >= 0L);
     685             : 
     686           0 :     if (0L == _wcredits)
     687             :     {
     688           0 :         if (LB_TIMEOUT_INDEFINITE != timeout)
     689             :         {
     690           0 :             if ((clock.getTime64() - start) > timeout)
     691             :             {
     692           0 :                 LBERROR << "Timed out trying to acquire credit." << std::endl;
     693           0 :                 goto err;
     694             :             }
     695             :         }
     696             : 
     697             :         // LBWARN << "No credits for RDMA." << std::endl;
     698           0 :         lunchbox::Thread::yield();
     699           0 :         _stats.no_credits_rdma++;
     700           0 :         goto retry;
     701             :     }
     702             : 
     703             :     // Modifies _sourceptr.HEAD
     704           0 :     bytes_put = _fill(buffer, can_put);
     705             : 
     706           0 :     if (0UL == bytes_put)
     707             :     {
     708           0 :         if (LB_TIMEOUT_INDEFINITE != timeout)
     709             :         {
     710           0 :             if ((clock.getTime64() - start) > timeout)
     711             :             {
     712           0 :                 LBERROR << "Timed out trying to fill buffer." << std::endl;
     713           0 :                 goto err;
     714             :             }
     715             :         }
     716             : 
     717             :         // LBWARN << "Source buffer full." << std::endl;
     718           0 :         lunchbox::Thread::yield();
     719           0 :         if (_sourceptr.isFull() || _rptr.isFull())
     720           0 :             _stats.buffer_full++;
     721           0 :         goto retry;
     722             :     }
     723             : 
     724             :     // Modifies _sourceptr.MIDDLE & _rptr.HEAD
     725           0 :     if (!_postRDMAWrite())
     726             :     {
     727           0 :         LBERROR << "Error while posting RDMA write." << std::endl;
     728           0 :         goto err;
     729             :     }
     730             : 
     731             :     //    LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
     732             :     //       << " put " << bytes_put << " bytes" << std::endl;
     733             : 
     734           0 :     return bytes_put;
     735             : 
     736             : err:
     737           0 :     return -1LL;
     738             : }
     739             : 
     740           0 : RDMAConnection::~RDMAConnection()
     741             : {
     742           0 :     _close();
     743           0 : }
     744             : 
     745             : ////////////////////////////////////////////////////////////////////////////////
     746             : 
     747           0 : void RDMAConnection::_close()
     748             : {
     749           0 :     lunchbox::ScopedWrite close_mutex(_poll_lock);
     750             : 
     751           0 :     if (!isClosed())
     752             :     {
     753           0 :         LBASSERT(!isClosing());
     754           0 :         _setState(STATE_CLOSING);
     755             : 
     756           0 :         if (_established && ::rdma_disconnect(_cm_id))
     757           0 :             LBWARN << "rdma_disconnect : " << lunchbox::sysError << std::endl;
     758             : 
     759           0 :         _setState(STATE_CLOSED);
     760           0 :         _cleanup();
     761             :     }
     762           0 : }
     763             : 
     764           0 : void RDMAConnection::_cleanup()
     765             : {
     766           0 :     LBASSERT(isClosed());
     767             : 
     768           0 :     _sourcebuf.clear();
     769           0 :     _sinkbuf.clear();
     770           0 :     _msgbuf.clear();
     771             : 
     772           0 :     if (_completions > 0U)
     773             :     {
     774           0 :         ::ibv_ack_cq_events(_cq, _completions);
     775           0 :         _completions = 0U;
     776             :     }
     777             : 
     778           0 :     delete[] _wcs;
     779           0 :     _wcs = 0;
     780             : 
     781             : #ifdef _WIN32
     782             :     if (_ccWaitObj)
     783             :         UnregisterWait(_ccWaitObj);
     784             :     _ccWaitObj = 0;
     785             : 
     786             :     if (_cmWaitObj)
     787             :         UnregisterWait(_cmWaitObj);
     788             :     _cmWaitObj = 0;
     789             : #endif
     790             : 
     791           0 :     if (NULL != _cm_id)
     792           0 :         ::rdma_destroy_ep(_cm_id);
     793           0 :     _cm_id = NULL;
     794             : 
     795           0 :     if ((NULL != _cq) && ::rdma_seterrno(::ibv_destroy_cq(_cq)))
     796           0 :         LBWARN << "ibv_destroy_cq : " << lunchbox::sysError << std::endl;
     797           0 :     _cq = NULL;
     798             : 
     799           0 :     if ((NULL != _cc) && ::rdma_seterrno(::ibv_destroy_comp_channel(_cc)))
     800           0 :         LBWARN << "ibv_destroy_comp_channel : " << lunchbox::sysError
     801           0 :                << std::endl;
     802           0 :     _cc = NULL;
     803             : 
     804           0 :     if ((NULL != _pd) && ::rdma_seterrno(::ibv_dealloc_pd(_pd)))
     805           0 :         LBWARN << "ibv_dealloc_pd : " << lunchbox::sysError << std::endl;
     806           0 :     _pd = NULL;
     807             : 
     808           0 :     if (NULL != _cm)
     809           0 :         ::rdma_destroy_event_channel(_cm);
     810           0 :     _cm = NULL;
     811             : 
     812           0 :     if (NULL != _rai)
     813           0 :         ::rdma_freeaddrinfo(_rai);
     814           0 :     _rai = NULL;
     815             : 
     816           0 :     _rptr = 0UL;
     817           0 :     _rbase = _rkey = 0ULL;
     818             : #ifndef _WIN32
     819           0 :     if ((_notifier >= 0) && TEMP_FAILURE_RETRY(::close(_notifier)))
     820           0 :         LBWARN << "close : " << lunchbox::sysError << std::endl;
     821           0 :     _notifier = -1;
     822             : #endif
     823           0 : }
     824             : 
     825           0 : bool RDMAConnection::_finishAccept(struct rdma_cm_id *new_cm_id,
     826             :                                    const RDMAConnParamData &cpd)
     827             : {
     828           0 :     LBASSERT(isClosed());
     829           0 :     _setState(STATE_CONNECTING);
     830             : 
     831           0 :     LBASSERT(NULL != new_cm_id);
     832             : 
     833           0 :     _cm_id = new_cm_id;
     834             : 
     835             :     {
     836             :         // FIXME: RDMA CM appears to send up invalid addresses when receiving
     837             :         // connections that use a different protocol than what was bound.  E.g.
     838             :         // if an IPv6 listener gets an IPv4 connection then the sa_family
     839             :         // will be AF_INET6 but the actual data is struct sockaddr_in.  Example:
     840             :         //
     841             :         // 0000000: 0a00 bc10 c0a8 b01a 0000 0000 0000 0000  ................
     842             :         //
     843             :         // However, in the reverse case, when an IPv4 listener gets an IPv6
     844             :         // connection not only is the address family incorrect, but the actual
     845             :         // IPv6 address is only partially there:
     846             :         //
     847             :         // 0000000: 0200 bc11 0000 0000 fe80 0000 0000 0000  ................
     848             :         // 0000010: 0000 0000 0000 0000 0000 0000 0000 0000  ................
     849             :         // 0000020: 0000 0000 0000 0000 0000 0000 0000 0000  ................
     850             :         // 0000030: 0000 0000 0000 0000 0000 0000 0000 0000  ................
     851             :         //
     852             :         // Surely seems to be a bug in RDMA CM.
     853             : 
     854             :         union {
     855             :             struct sockaddr addr;
     856             :             struct sockaddr_in sin;
     857             :             struct sockaddr_in6 sin6;
     858             :             struct sockaddr_storage storage;
     859             :         } sss;
     860             : 
     861             :         // Make a copy since we might change it.
     862             :         // sss.storage = _cm_id->route.addr.dst_storage;
     863             :         ::memcpy((void *)&sss.storage,
     864           0 :                  (const void *)::rdma_get_peer_addr(_cm_id),
     865           0 :                  sizeof(struct sockaddr_storage));
     866             : #ifndef _WIN32
     867           0 :         if ((AF_INET == sss.storage.ss_family) &&
     868           0 :             (sss.sin6.sin6_addr.s6_addr32[0] != 0 ||
     869           0 :              sss.sin6.sin6_addr.s6_addr32[1] != 0 ||
     870           0 :              sss.sin6.sin6_addr.s6_addr32[2] != 0 ||
     871           0 :              sss.sin6.sin6_addr.s6_addr32[3] != 0))
     872             :         {
     873           0 :             LBWARN << "IPv6 address detected but likely invalid!" << std::endl;
     874           0 :             sss.storage.ss_family = AF_INET6;
     875             :         }
     876             :         else
     877             : #endif
     878           0 :             if ((AF_INET6 == sss.storage.ss_family) &&
     879           0 :                 (INADDR_ANY != sss.sin.sin_addr.s_addr))
     880             :         {
     881           0 :             sss.storage.ss_family = AF_INET;
     882             :         }
     883             : 
     884           0 :         _updateInfo(&sss.addr);
     885             :     }
     886             : 
     887           0 :     _device_name = ::ibv_get_device_name(_cm_id->verbs->device);
     888             : 
     889           0 :     LBVERB << "Connection initiated on " << std::dec << _device_name << ":"
     890           0 :            << (int)_cm_id->port_num << " from " << _addr << ":" << _serv << " ("
     891           0 :            << getDescription()->toString() << ")" << std::endl;
     892             : 
     893           0 :     if ((RDMA_PROTOCOL_MAGIC != cpd.magic) ||
     894           0 :         (RDMA_PROTOCOL_VERSION != cpd.version))
     895             :     {
     896           0 :         LBERROR << "Protocol mismatch with initiator : " << _addr << ":"
     897           0 :                 << _serv << std::endl;
     898           0 :         goto err_reject;
     899             :     }
     900             : 
     901           0 :     if (!_createNotifier())
     902             :     {
     903           0 :         LBERROR << "Failed to create master notifier." << std::endl;
     904           0 :         goto err_reject;
     905             :     }
     906             : 
     907           0 :     if (!_createEventChannel())
     908             :     {
     909           0 :         LBERROR << "Failed to create event channel." << std::endl;
     910           0 :         goto err_reject;
     911             :     }
     912             : 
     913           0 :     if (!_migrateId())
     914             :     {
     915           0 :         LBERROR << "Failed to migrate communication identifier." << std::endl;
     916           0 :         goto err_reject;
     917             :     }
     918             : 
     919           0 :     if (!_initProtocol(cpd.depth))
     920             :     {
     921           0 :         LBERROR << "Failed to initialize protocol variables." << std::endl;
     922           0 :         goto err_reject;
     923             :     }
     924             : 
     925           0 :     if (!_initVerbs())
     926             :     {
     927           0 :         LBERROR << "Failed to initialize verbs." << std::endl;
     928           0 :         goto err_reject;
     929             :     }
     930             : 
     931           0 :     if (!_createQP())
     932             :     {
     933           0 :         LBERROR << "Failed to create queue pair." << std::endl;
     934           0 :         goto err_reject;
     935             :     }
     936             : 
     937           0 :     if (!_createBytesAvailableFD())
     938             :     {
     939           0 :         LBERROR << "Failed to create available byte notifier." << std::endl;
     940           0 :         goto err_reject;
     941             :     }
     942             : 
     943           0 :     if (!_initBuffers())
     944             :     {
     945           0 :         LBERROR << "Failed to initialize ring buffers." << std::endl;
     946           0 :         goto err_reject;
     947             :     }
     948             : 
     949           0 :     if (!_postReceives(_depth))
     950             :     {
     951           0 :         LBERROR << "Failed to pre-post receives." << std::endl;
     952           0 :         goto err_reject;
     953             :     }
     954             : 
     955           0 :     if (!_accept())
     956             :     {
     957           0 :         LBERROR << "Failed to accept remote connection from : " << _addr << ":"
     958           0 :                 << _serv << std::endl;
     959           0 :         goto err;
     960             :     }
     961             : 
     962           0 :     LBASSERT(_established);
     963             : 
     964           0 :     if (!_waitRecvSetup())
     965             :     {
     966           0 :         LBERROR << "Failed to receive setup message." << std::endl;
     967           0 :         goto err;
     968             :     }
     969             : 
     970           0 :     if (!_postSetup())
     971             :     {
     972           0 :         LBERROR << "Failed to post setup message." << std::endl;
     973           0 :         goto err;
     974             :     }
     975             : 
     976           0 :     LBVERB << "Connection accepted on " << std::dec << _device_name << ":"
     977           0 :            << (int)_cm_id->port_num << " from " << _addr << ":" << _serv << " ("
     978           0 :            << getDescription()->toString() << ")" << std::endl;
     979             : 
     980           0 :     _setState(STATE_CONNECTED);
     981           0 :     _updateNotifier();
     982           0 :     return true;
     983             : 
     984             : err_reject:
     985           0 :     LBWARN << "Rejecting connection from remote address : " << _addr << ":"
     986           0 :            << _serv << std::endl;
     987             : 
     988           0 :     if (!_reject())
     989           0 :         LBWARN << "Failed to issue connection reject." << std::endl;
     990             : 
     991             : err:
     992           0 :     close();
     993             : 
     994           0 :     return false;
     995             : }
     996             : 
     997           0 : bool RDMAConnection::_lookupAddress(const bool passive)
     998             : {
     999             :     struct rdma_addrinfo hints;
    1000           0 :     char *node = 0;
    1001           0 :     char *service = 0;
    1002           0 :     std::string s;
    1003             : 
    1004           0 :     ::memset((void *)&hints, 0, sizeof(struct rdma_addrinfo));
    1005           0 :     if (passive)
    1006           0 :         hints.ai_flags |= RAI_PASSIVE;
    1007             : 
    1008           0 :     ConstConnectionDescriptionPtr description = getDescription();
    1009           0 :     const std::string &hostname = description->getHostname();
    1010           0 :     if (!hostname.empty())
    1011           0 :         node = const_cast<char *>(hostname.c_str());
    1012             : 
    1013           0 :     if (0u != description->port)
    1014             :     {
    1015           0 :         std::stringstream ss;
    1016           0 :         ss << description->port;
    1017           0 :         s = ss.str();
    1018           0 :         service = const_cast<char *>(s.c_str());
    1019             :     }
    1020             : 
    1021           0 :     if ((NULL != node) && ::rdma_getaddrinfo(node, service, &hints, &_rai))
    1022             :     {
    1023           0 :         LBERROR << "rdma_getaddrinfo: " << lunchbox::sysError << std::endl;
    1024           0 :         return false;
    1025             :     }
    1026             : 
    1027           0 :     _addrinfo = _rai;
    1028           0 :     while (_addrinfo && !_isInetFamily(_addrinfo))
    1029           0 :         _addrinfo = _addrinfo->ai_next;
    1030             : 
    1031           0 :     if (!_addrinfo)
    1032             :     {
    1033           0 :         if (_rai)
    1034             :         {
    1035           0 :             LBWARN << "No IP or IPv6 address found by rdma_getaddrinfo. "
    1036           0 :                       "Connection establishment may fail."
    1037           0 :                    << std::endl;
    1038             :         }
    1039           0 :         _addrinfo = _rai;
    1040             :     }
    1041             : 
    1042           0 :     if ((NULL != _addrinfo) && (_addrinfo->ai_connect_len > 0))
    1043           0 :         LBWARN << "WARNING : ai_connect data specified!" << std::endl;
    1044             : 
    1045           0 :     return true;
    1046             : }
    1047             : 
    1048           0 : void RDMAConnection::_updateInfo(struct sockaddr *addr)
    1049             : {
    1050           0 :     int salen = sizeof(struct sockaddr);
    1051           0 :     bool is_unspec = false;
    1052             : 
    1053           0 :     if (AF_INET == addr->sa_family)
    1054             :     {
    1055           0 :         struct sockaddr_in *sin = reinterpret_cast<struct sockaddr_in *>(addr);
    1056           0 :         is_unspec = (INADDR_ANY == sin->sin_addr.s_addr);
    1057           0 :         salen = sizeof(struct sockaddr_in);
    1058             :     }
    1059             : // TODO: IPv6 for WIndows
    1060             : #ifndef _WIN32
    1061           0 :     else if (AF_INET6 == addr->sa_family)
    1062             :     {
    1063             :         struct sockaddr_in6 *sin6 =
    1064           0 :             reinterpret_cast<struct sockaddr_in6 *>(addr);
    1065             : 
    1066           0 :         is_unspec = (sin6->sin6_addr.s6_addr32[0] == 0 &&
    1067           0 :                      sin6->sin6_addr.s6_addr32[1] == 0 &&
    1068           0 :                      sin6->sin6_addr.s6_addr32[2] == 0 &&
    1069           0 :                      sin6->sin6_addr.s6_addr32[3] == 0);
    1070           0 :         salen = sizeof(struct sockaddr_in6);
    1071             :     }
    1072             : #endif
    1073             :     else
    1074             :     {
    1075           0 :         LBERROR << "Unsupported socket address family " << addr->sa_family
    1076           0 :                 << std::endl;
    1077           0 :         return;
    1078             :     }
    1079             : 
    1080             :     int err;
    1081           0 :     if ((err = ::getnameinfo(addr, salen, _addr, sizeof(_addr), _serv,
    1082             :                              sizeof(_serv), NI_NUMERICHOST | NI_NUMERICSERV)))
    1083           0 :         LBWARN << "Name info lookup failed : " << err << std::endl;
    1084             : 
    1085           0 :     if (is_unspec)
    1086           0 :         ::gethostname(_addr, NI_MAXHOST);
    1087             : 
    1088           0 :     ConnectionDescriptionPtr description = _getDescription();
    1089           0 :     if (description->getHostname().empty())
    1090           0 :         description->setHostname(_addr);
    1091           0 :     if (0u == description->port)
    1092           0 :         description->port = atoi(_serv);
    1093             : }
    1094             : 
    1095           0 : bool RDMAConnection::_createEventChannel()
    1096             : {
    1097           0 :     LBASSERT(NULL == _cm);
    1098             : 
    1099           0 :     _cm = ::rdma_create_event_channel();
    1100           0 :     if (NULL == _cm)
    1101             :     {
    1102           0 :         LBERROR << "rdma_create_event_channel : " << lunchbox::sysError
    1103           0 :                 << std::endl;
    1104           0 :         return false;
    1105             :     }
    1106             : 
    1107             : #ifdef _WIN32
    1108             :     if (!RegisterWaitForSingleObject(&_cmWaitObj, _cm->channel.Event,
    1109             :                                      WAITORTIMERCALLBACK(&_triggerNotifierCM),
    1110             :                                      this, INFINITE, WT_EXECUTEINWAITTHREAD))
    1111             :     {
    1112             :         LBERROR << "RegisterWaitForSingleObject: " << co::base::sysError
    1113             :                 << std::endl;
    1114             :         return false;
    1115             :     }
    1116             : #else
    1117             :     struct epoll_event evctl;
    1118             : 
    1119           0 :     LBASSERT(_notifier >= 0);
    1120             : 
    1121             :     // Use the CM fd to signal Collage of connection events.
    1122           0 :     ::memset((void *)&evctl, 0, sizeof(struct epoll_event));
    1123           0 :     evctl.events = EPOLLIN;
    1124           0 :     evctl.data.fd = _cm->fd;
    1125           0 :     if (::epoll_ctl(_notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl))
    1126             :     {
    1127           0 :         LBERROR << "epoll_ctl: " << lunchbox::sysError << std::endl;
    1128           0 :         return false;
    1129             :     }
    1130             : #endif
    1131             : 
    1132           0 :     return true;
    1133             : }
    1134             : 
    1135           0 : bool RDMAConnection::_createId()
    1136             : {
    1137           0 :     LBASSERT(NULL != _cm);
    1138           0 :     LBASSERT(NULL == _cm_id);
    1139             : 
    1140           0 :     if (::rdma_create_id(_cm, &_cm_id, NULL, RDMA_PS_TCP))
    1141             :     {
    1142           0 :         LBERROR << "rdma_create_id: " << lunchbox::sysError << std::endl;
    1143           0 :         return false;
    1144             :     }
    1145             : 
    1146           0 :     return true;
    1147             : }
    1148             : 
    1149           0 : bool RDMAConnection::_initVerbs()
    1150             : {
    1151           0 :     LBASSERT(NULL != _cm_id);
    1152           0 :     LBASSERT(NULL != _cm_id->verbs);
    1153             : 
    1154           0 :     LBASSERT(NULL == _pd);
    1155             : 
    1156             :     // Allocate protection domain.
    1157           0 :     _pd = ::ibv_alloc_pd(_cm_id->verbs);
    1158           0 :     if (NULL == _pd)
    1159             :     {
    1160           0 :         LBERROR << "ibv_alloc_pd: " << lunchbox::sysError << std::endl;
    1161           0 :         return false;
    1162             :     }
    1163             : 
    1164           0 :     LBASSERT(NULL == _cc);
    1165             : 
    1166             :     // Create completion channel.
    1167           0 :     _cc = ::ibv_create_comp_channel(_cm_id->verbs);
    1168           0 :     if (NULL == _cc)
    1169             :     {
    1170           0 :         LBERROR << "ibv_create_comp_channel : " << lunchbox::sysError
    1171           0 :                 << std::endl;
    1172           0 :         return false;
    1173             :     }
    1174             : 
    1175             : #ifdef _WIN32
    1176             :     if (!RegisterWaitForSingleObject(&_ccWaitObj, _cc->comp_channel.Event,
    1177             :                                      WAITORTIMERCALLBACK(&_triggerNotifierCQ),
    1178             :                                      this, INFINITE, WT_EXECUTEINWAITTHREAD))
    1179             :     {
    1180             :         LBERROR << "RegisterWaitForSingleObject : " << lunchbox::sysError
    1181             :                 << std::endl;
    1182             :         return false;
    1183             :     }
    1184             : #else
    1185           0 :     LBASSERT(_notifier >= 0);
    1186             : 
    1187             :     // Use the completion channel fd to signal Collage of RDMA writes received.
    1188             :     struct epoll_event evctl;
    1189           0 :     ::memset((void *)&evctl, 0, sizeof(struct epoll_event));
    1190           0 :     evctl.events = EPOLLIN;
    1191           0 :     evctl.data.fd = _cc->fd;
    1192           0 :     if (::epoll_ctl(_notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl))
    1193             :     {
    1194           0 :         LBERROR << "epoll_ctl: " << lunchbox::sysError << std::endl;
    1195           0 :         return false;
    1196             :     }
    1197             : #endif
    1198             : 
    1199           0 :     LBASSERT(NULL == _cq);
    1200             : 
    1201             :     // Create a single completion queue for both sends & receives */
    1202           0 :     _cq = ::ibv_create_cq(_cm_id->verbs, _depth * 2, NULL, _cc, 0);
    1203           0 :     if (NULL == _cq)
    1204             :     {
    1205           0 :         LBERROR << "ibv_create_cq: " << lunchbox::sysError << std::endl;
    1206           0 :         return false;
    1207             :     }
    1208             : 
    1209             :     // Request IBV_SEND_SOLICITED events only (i.e. RDMA writes, not FC)
    1210           0 :     if (::rdma_seterrno(::ibv_req_notify_cq(_cq, 1)))
    1211             :     {
    1212           0 :         LBERROR << "ibv_req_notify_cq: " << lunchbox::sysError << std::endl;
    1213           0 :         return false;
    1214             :     }
    1215             : 
    1216           0 :     _wcs = new struct ibv_wc[_depth];
    1217           0 :     return true;
    1218             : }
    1219             : 
    1220           0 : bool RDMAConnection::_createQP()
    1221             : {
    1222             :     struct ibv_qp_init_attr init_attr;
    1223             : 
    1224           0 :     LBASSERT(_depth > 0);
    1225           0 :     LBASSERT(NULL != _cm_id);
    1226           0 :     LBASSERT(NULL != _pd);
    1227           0 :     LBASSERT(NULL != _cq);
    1228             : 
    1229           0 :     ::memset((void *)&init_attr, 0, sizeof(struct ibv_qp_init_attr));
    1230           0 :     init_attr.qp_type = IBV_QPT_RC;
    1231           0 :     init_attr.cap.max_send_wr = _depth;
    1232           0 :     init_attr.cap.max_recv_wr = _depth;
    1233           0 :     init_attr.cap.max_recv_sge = 1;
    1234           0 :     init_attr.cap.max_send_sge = 1;
    1235           0 :     init_attr.recv_cq = _cq;
    1236           0 :     init_attr.send_cq = _cq;
    1237           0 :     init_attr.sq_sig_all = 1; // i.e. always IBV_SEND_SIGNALED
    1238             : 
    1239             :     // Create queue pair.
    1240           0 :     if (::rdma_create_qp(_cm_id, _pd, &init_attr))
    1241             :     {
    1242           0 :         LBERROR << "rdma_create_qp: " << lunchbox::sysError << std::endl;
    1243           0 :         return false;
    1244             :     }
    1245             : 
    1246           0 :     LBVERB << "RDMA QP caps : " << std::dec << init_attr.cap.max_recv_wr
    1247           0 :            << " receives, " << init_attr.cap.max_send_wr << " sends, "
    1248           0 :            << std::endl;
    1249             : 
    1250           0 :     return true;
    1251             : }
    1252             : 
    1253           0 : bool RDMAConnection::_initBuffers()
    1254             : {
    1255             :     const size_t rbs =
    1256           0 :         1024 * 1024 *
    1257           0 :         Global::getIAttribute(Global::IATTR_RDMA_RING_BUFFER_SIZE_MB);
    1258             : 
    1259           0 :     LBASSERT(_depth > 0);
    1260           0 :     LBASSERT(NULL != _pd);
    1261             : 
    1262           0 :     if (0 == rbs)
    1263             :     {
    1264           0 :         LBERROR << "Invalid RDMA ring buffer size." << std::endl;
    1265           0 :         return false;
    1266             :     }
    1267             : 
    1268           0 :     if (!_sourcebuf.resize(_pd, rbs))
    1269             :     {
    1270           0 :         LBERROR << "Failed to resize source buffer." << std::endl;
    1271           0 :         return false;
    1272             :     }
    1273             : 
    1274           0 :     if (!_sinkbuf.resize(_pd, rbs))
    1275             :     {
    1276           0 :         LBERROR << "Failed to resize sink buffer." << std::endl;
    1277           0 :         return false;
    1278             :     }
    1279             : 
    1280           0 :     _sourceptr.clear(uint32_t(_sourcebuf.getSize()));
    1281           0 :     _sinkptr.clear(uint32_t(_sinkbuf.getSize()));
    1282             : 
    1283             :     // Need enough space for both sends and receives.
    1284           0 :     if (!_msgbuf.resize(_pd, _depth * 2))
    1285             :     {
    1286           0 :         LBERROR << "Failed to resize message buffer pool." << std::endl;
    1287           0 :         return false;
    1288             :     }
    1289             : 
    1290           0 :     return true;
    1291             : }
    1292             : 
    1293           0 : bool RDMAConnection::_resolveAddress()
    1294             : {
    1295           0 :     LBASSERT(NULL != _cm_id);
    1296           0 :     LBASSERT(NULL != _addrinfo);
    1297             : 
    1298           0 :     if (::rdma_resolve_addr(_cm_id, _addrinfo->ai_src_addr,
    1299           0 :                             _addrinfo->ai_dst_addr, _timeout))
    1300             :     {
    1301           0 :         LBERROR << "rdma_resolve_addr: " << lunchbox::sysError << std::endl;
    1302           0 :         return false;
    1303             :     }
    1304             : 
    1305           0 :     return _waitForCMEvent(RDMA_CM_EVENT_ADDR_RESOLVED);
    1306             : }
    1307             : 
    1308           0 : bool RDMAConnection::_resolveRoute()
    1309             : {
    1310           0 :     LBASSERT(NULL != _cm_id);
    1311           0 :     LBASSERT(NULL != _addrinfo);
    1312             : 
    1313           0 :     if ((IBV_TRANSPORT_IB == _cm_id->verbs->device->transport_type) &&
    1314           0 :         (_addrinfo->ai_route_len > 0))
    1315             :     {
    1316             : #ifdef _WIN32
    1317             :         if (::rdma_set_option(_cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS,
    1318             : #else
    1319           0 :         if (::rdma_set_option(_cm_id, RDMA_OPTION_IB, RDMA_OPTION_IB_PATH,
    1320             : #endif
    1321           0 :                               _addrinfo->ai_route, _addrinfo->ai_route_len))
    1322             :         {
    1323           0 :             LBERROR << "rdma_set_option: " << lunchbox::sysError << std::endl;
    1324           0 :             return false;
    1325             :         }
    1326             :     }
    1327           0 :     else if (::rdma_resolve_route(_cm_id, _timeout))
    1328             :     {
    1329           0 :         LBERROR << "rdma_resolve_route: " << lunchbox::sysError << std::endl;
    1330           0 :         return false;
    1331             :     }
    1332             : 
    1333           0 :     return _waitForCMEvent(RDMA_CM_EVENT_ROUTE_RESOLVED);
    1334             : }
    1335             : 
    1336           0 : bool RDMAConnection::_connect()
    1337             : {
    1338           0 :     LBASSERT(NULL != _cm_id);
    1339           0 :     LBASSERT(!_established);
    1340             : 
    1341             :     struct rdma_conn_param conn_param;
    1342             : 
    1343           0 :     ::memset((void *)&conn_param, 0, sizeof(struct rdma_conn_param));
    1344             : 
    1345           0 :     _cpd.magic = RDMA_PROTOCOL_MAGIC;
    1346           0 :     _cpd.version = RDMA_PROTOCOL_VERSION;
    1347           0 :     _cpd.depth = _depth;
    1348           0 :     conn_param.private_data = reinterpret_cast<const void *>(&_cpd);
    1349           0 :     conn_param.private_data_len = sizeof(struct RDMAConnParamData);
    1350           0 :     conn_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
    1351           0 :     conn_param.responder_resources = RDMA_MAX_RESP_RES;
    1352             :     // Magic 3-bit values.
    1353             :     // conn_param.retry_count = 5;
    1354             :     // conn_param.rnr_retry_count = 7;
    1355             : 
    1356           0 :     LBINFO << "Connect on source lid : " << std::showbase << std::hex
    1357           0 :            << ntohs(_cm_id->route.path_rec->slid) << " (" << std::dec
    1358           0 :            << ntohs(_cm_id->route.path_rec->slid) << ") "
    1359           0 :            << "to dest lid : " << std::hex
    1360           0 :            << ntohs(_cm_id->route.path_rec->dlid) << " (" << std::dec
    1361           0 :            << ntohs(_cm_id->route.path_rec->dlid) << ") " << std::endl;
    1362             : 
    1363           0 :     if (::rdma_connect(_cm_id, &conn_param))
    1364             :     {
    1365           0 :         LBERROR << "rdma_connect: " << lunchbox::sysError << std::endl;
    1366           0 :         return false;
    1367             :     }
    1368             : 
    1369           0 :     bool ret = _waitForCMEvent(RDMA_CM_EVENT_ESTABLISHED);
    1370             : 
    1371           0 :     return ret;
    1372             : }
    1373             : 
    1374           0 : bool RDMAConnection::_bindAddress()
    1375             : {
    1376           0 :     LBASSERT(NULL != _cm_id);
    1377           0 :     ConstConnectionDescriptionPtr description = getDescription();
    1378             : 
    1379             : #if IPV6_DEFAULT
    1380             :     struct sockaddr_in6 sin;
    1381             :     memset((void *)&sin, 0, sizeof(struct sockaddr_in6));
    1382             :     sin.sin6_family = AF_INET6;
    1383             :     sin.sin6_port = htons(description->port);
    1384             :     sin.sin6_addr = in6addr_any;
    1385             : #else
    1386             :     struct sockaddr_in sin;
    1387           0 :     memset((void *)&sin, 0, sizeof(struct sockaddr_in));
    1388           0 :     sin.sin_family = AF_INET;
    1389           0 :     sin.sin_port = htons(description->port);
    1390           0 :     sin.sin_addr.s_addr = INADDR_ANY;
    1391             : #endif
    1392             : 
    1393           0 :     if (::rdma_bind_addr(_cm_id,
    1394           0 :                          (NULL != _addrinfo)
    1395           0 :                              ? _addrinfo->ai_src_addr
    1396             :                              : reinterpret_cast<struct sockaddr *>(&sin)))
    1397             :     {
    1398           0 :         LBERROR << "rdma_bind_addr: " << lunchbox::sysError << std::endl;
    1399           0 :         return false;
    1400             :     }
    1401             : 
    1402           0 :     const uint16_t port = ntohs(rdma_get_src_port(_cm_id));
    1403           0 :     LBDEBUG << "Listening on " << description->getHostname() << "[" << _addr
    1404           0 :             << "]:" << port << " (" << description->toString() << ")"
    1405           0 :             << std::endl;
    1406             : 
    1407           0 :     return true;
    1408             : }
    1409             : 
    1410           0 : bool RDMAConnection::_listen(int backlog)
    1411             : {
    1412           0 :     LBASSERT(NULL != _cm_id);
    1413             : 
    1414           0 :     if (::rdma_listen(_cm_id, backlog))
    1415             :     {
    1416           0 :         LBERROR << "rdma_listen: " << lunchbox::sysError << std::endl;
    1417           0 :         return false;
    1418             :     }
    1419             : 
    1420           0 :     return true;
    1421             : }
    1422             : 
    1423           0 : bool RDMAConnection::_migrateId()
    1424             : {
    1425           0 :     LBASSERT(NULL != _cm_id);
    1426           0 :     LBASSERT(NULL != _cm);
    1427             : 
    1428           0 :     if (::rdma_migrate_id(_cm_id, _cm))
    1429             :     {
    1430           0 :         LBERROR << "rdma_migrate_id: " << lunchbox::sysError << std::endl;
    1431           0 :         return false;
    1432             :     }
    1433             : 
    1434           0 :     return true;
    1435             : }
    1436             : 
    1437           0 : bool RDMAConnection::_accept()
    1438             : {
    1439             :     struct rdma_conn_param accept_param;
    1440             : 
    1441           0 :     LBASSERT(NULL != _cm_id);
    1442           0 :     LBASSERT(!_established);
    1443             : 
    1444           0 :     ::memset((void *)&accept_param, 0, sizeof(struct rdma_conn_param));
    1445             : 
    1446           0 :     _cpd.magic = RDMA_PROTOCOL_MAGIC;
    1447           0 :     _cpd.version = RDMA_PROTOCOL_VERSION;
    1448           0 :     _cpd.depth = _depth;
    1449           0 :     accept_param.private_data = reinterpret_cast<const void *>(&_cpd);
    1450           0 :     accept_param.private_data_len = sizeof(struct RDMAConnParamData);
    1451           0 :     accept_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
    1452           0 :     accept_param.responder_resources = RDMA_MAX_RESP_RES;
    1453             :     // Magic 3-bit value.
    1454             :     // accept_param.rnr_retry_count = 7;
    1455             : 
    1456           0 :     LBINFO << "Accept on source lid : " << std::showbase << std::hex
    1457           0 :            << ntohs(_cm_id->route.path_rec->slid) << " (" << std::dec
    1458           0 :            << ntohs(_cm_id->route.path_rec->slid) << ") "
    1459           0 :            << "from dest lid : " << std::hex
    1460           0 :            << ntohs(_cm_id->route.path_rec->dlid) << " (" << std::dec
    1461           0 :            << ntohs(_cm_id->route.path_rec->dlid) << ") " << std::endl;
    1462             : 
    1463           0 :     if (::rdma_accept(_cm_id, &accept_param))
    1464             :     {
    1465           0 :         LBERROR << "rdma_accept: " << lunchbox::sysError << std::endl;
    1466           0 :         return false;
    1467             :     }
    1468             : 
    1469           0 :     return _waitForCMEvent(RDMA_CM_EVENT_ESTABLISHED);
    1470             : }
    1471             : 
    1472           0 : bool RDMAConnection::_reject()
    1473             : {
    1474           0 :     LBASSERT(NULL != _cm_id);
    1475           0 :     LBASSERT(!_established);
    1476             : 
    1477           0 :     if (::rdma_reject(_cm_id, NULL, 0))
    1478             :     {
    1479           0 :         LBERROR << "rdma_reject: " << lunchbox::sysError << std::endl;
    1480           0 :         return false;
    1481             :     }
    1482             : 
    1483           0 :     return true;
    1484             : }
    1485             : 
    1486             : ////////////////////////////////////////////////////////////////////////////////
    1487             : 
    1488           0 : bool RDMAConnection::_initProtocol(int32_t depth)
    1489             : {
    1490           0 :     if (depth < 2L)
    1491             :     {
    1492           0 :         LBERROR << "Invalid queue depth." << std::endl;
    1493           0 :         return false;
    1494             :     }
    1495             : 
    1496           0 :     _depth = depth;
    1497           0 :     _writes = 0L;
    1498           0 :     _fcs = 0L;
    1499           0 :     _readBytes = 0u;
    1500           0 :     _wcredits = _depth / 2 - 2;
    1501           0 :     _fcredits = _depth / 2 + 2;
    1502             : 
    1503           0 :     return true;
    1504             : }
    1505             : 
    1506             : /* inline */
    1507           0 : bool RDMAConnection::_needFC()
    1508             : {
    1509             :     bool bytesAvail;
    1510             : #ifdef _WIN32
    1511             :     lunchbox::ScopedFastRead mutex(_eventLock);
    1512             :     bytesAvail = (_availBytes != 0);
    1513             : #else
    1514             :     pollfd pfd;
    1515           0 :     pfd.fd = _pipe_fd[0];
    1516           0 :     pfd.events = EPOLLIN;
    1517           0 :     pfd.revents = 0;
    1518           0 :     int ret = poll(&pfd, 1, 0);
    1519           0 :     if (ret == -1)
    1520             :     {
    1521           0 :         LBERROR << "poll: " << lunchbox::sysError << std::endl;
    1522           0 :         return true;
    1523             :     }
    1524           0 :     bytesAvail = ret != 0;
    1525             : #endif
    1526           0 :     return (!bytesAvail || (uint32_t)_writes >= FC_MESSAGE_FREQUENCY);
    1527             : }
    1528             : 
    1529           0 : bool RDMAConnection::_postReceives(const uint32_t count)
    1530             : {
    1531           0 :     LBASSERT(NULL != _cm_id->qp);
    1532           0 :     LBASSERT(count > 0UL);
    1533             : 
    1534           0 :     ibv_sge *sge = new ibv_sge[count];
    1535           0 :     ibv_recv_wr *wrs = new ibv_recv_wr[count];
    1536             : 
    1537           0 :     for (uint32_t i = 0UL; i != count; i++)
    1538             :     {
    1539           0 :         sge[i].addr = (uint64_t)(uintptr_t)_msgbuf.getBuffer();
    1540           0 :         sge[i].length = uint32_t(_msgbuf.getBufferSize());
    1541           0 :         sge[i].lkey = _msgbuf.getMR()->lkey;
    1542             : 
    1543           0 :         wrs[i].wr_id = sge[i].addr;
    1544           0 :         wrs[i].next = &wrs[i + 1];
    1545           0 :         wrs[i].sg_list = &sge[i];
    1546           0 :         wrs[i].num_sge = 1;
    1547             :     }
    1548           0 :     wrs[count - 1].next = NULL;
    1549             : 
    1550             :     struct ibv_recv_wr *bad_wr;
    1551             :     const bool error =
    1552           0 :         ::rdma_seterrno(::ibv_post_recv(_cm_id->qp, wrs, &bad_wr));
    1553             : 
    1554           0 :     delete[] sge;
    1555           0 :     delete[] wrs;
    1556             : 
    1557           0 :     if (error)
    1558             :     {
    1559           0 :         LBERROR << "ibv_post_recv : " << lunchbox::sysError << std::endl;
    1560           0 :         return false;
    1561             :     }
    1562             : 
    1563           0 :     return true;
    1564             : }
    1565             : 
    1566             : /* inline */
    1567           0 : void RDMAConnection::_recvRDMAWrite(const uint32_t imm_data)
    1568             : {
    1569             :     union {
    1570             :         uint32_t val;
    1571             :         RDMAFCImm fc;
    1572             :     } x;
    1573           0 :     x.val = ntohl(imm_data);
    1574             : 
    1575             :     // Analysis:
    1576             :     //
    1577             :     // Since the ring pointers are circular, a malicious (presumably overflow)
    1578             :     // value here would at worst only result in us reading arbitrary regions
    1579             :     // from our sink buffer, not segfaulting.  If the other side wanted us to
    1580             :     // reread a previous message it should just resend it!
    1581           0 :     _sinkptr.incrHead(x.fc.bytes_sent);
    1582           0 :     _incrAvailableBytes(x.fc.bytes_sent);
    1583             : 
    1584           0 :     _fcredits += x.fc.fcs_received;
    1585           0 :     LBASSERTINFO(_fcredits <= _depth, _fcredits << " > " << _depth);
    1586             : 
    1587           0 :     _writes++;
    1588           0 : }
    1589             : 
    1590             : /* inline */
    1591           0 : uint32_t RDMAConnection::_makeImm(const uint32_t b)
    1592             : {
    1593             :     union {
    1594             :         uint32_t val;
    1595             :         RDMAFCImm fc;
    1596             :     } x;
    1597             : 
    1598           0 :     x.fc.fcs_received = std::min(MAX_FC, static_cast<uint16_t>(_fcs));
    1599           0 :     _fcs -= x.fc.fcs_received;
    1600           0 :     LBASSERT(_fcs >= 0);
    1601             : 
    1602           0 :     LBASSERT(b <= MAX_BS);
    1603           0 :     x.fc.bytes_sent = b;
    1604             : 
    1605           0 :     return htonl(x.val);
    1606             : }
    1607             : 
    1608           0 : bool RDMAConnection::_postRDMAWrite()
    1609             : {
    1610             :     struct ibv_sge sge;
    1611             :     struct ibv_send_wr wr;
    1612             : 
    1613           0 :     sge.addr = (uint64_t)((uintptr_t)_sourcebuf.getBase() +
    1614           0 :                           _sourceptr.ptr(_sourceptr.MIDDLE));
    1615           0 :     sge.length =
    1616           0 :         (uint64_t)_sourceptr.available(_sourceptr.HEAD, _sourceptr.MIDDLE);
    1617           0 :     sge.lkey = _sourcebuf.getMR()->lkey;
    1618           0 :     _sourceptr.incr(_sourceptr.MIDDLE, (uint32_t)sge.length);
    1619             : 
    1620           0 :     wr.wr_id = (uint64_t)sge.length;
    1621           0 :     wr.next = NULL;
    1622           0 :     wr.sg_list = &sge;
    1623           0 :     wr.num_sge = 1;
    1624           0 :     wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
    1625           0 :     wr.send_flags = IBV_SEND_SOLICITED; // Important!
    1626           0 :     wr.imm_data = _makeImm((uint32_t)sge.length);
    1627           0 :     wr.wr.rdma.rkey = _rkey;
    1628           0 :     wr.wr.rdma.remote_addr =
    1629           0 :         (uint64_t)((uintptr_t)_rbase + _rptr.ptr(_rptr.HEAD));
    1630           0 :     _rptr.incrHead((uint32_t)sge.length);
    1631             : 
    1632           0 :     _wcredits--;
    1633           0 :     LBASSERT(_wcredits >= 0L);
    1634             : 
    1635             :     struct ibv_send_wr *bad_wr;
    1636           0 :     if (::rdma_seterrno(::ibv_post_send(_cm_id->qp, &wr, &bad_wr)))
    1637             :     {
    1638           0 :         LBERROR << "ibv_post_send : " << lunchbox::sysError << std::endl;
    1639           0 :         return false;
    1640             :     }
    1641             : 
    1642           0 :     return true;
    1643             : }
    1644             : 
    1645           0 : bool RDMAConnection::_postMessage(const RDMAMessage &message)
    1646             : {
    1647           0 :     _fcredits--;
    1648           0 :     LBASSERT(_fcredits >= 0L);
    1649             : 
    1650           0 :     if (::rdma_post_send(_cm_id, (void *)&message, (void *)&message,
    1651           0 :                          offsetof(RDMAMessage, payload) + message.length,
    1652             :                          _msgbuf.getMR(), 0))
    1653             :     {
    1654           0 :         LBERROR << "rdma_post_send : " << lunchbox::sysError << std::endl;
    1655           0 :         return false;
    1656             :     }
    1657             : 
    1658           0 :     return true;
    1659             : }
    1660             : 
    1661           0 : void RDMAConnection::_recvMessage(const RDMAMessage &message)
    1662             : {
    1663           0 :     switch (message.opcode)
    1664             :     {
    1665             :     case FC:
    1666           0 :         if (sizeof(struct RDMAFCPayload) == (size_t)message.length)
    1667           0 :             _recvFC(message.payload.fc);
    1668             :         else
    1669           0 :             LBWARN << "Invalid flow control message received!" << std::endl;
    1670           0 :         break;
    1671             :     case SETUP:
    1672           0 :         if (sizeof(struct RDMASetupPayload) == (size_t)message.length)
    1673           0 :             _recvSetup(message.payload.setup);
    1674             :         else
    1675           0 :             LBWARN << "Invalid setup message received!" << std::endl;
    1676           0 :         break;
    1677             :     default:
    1678           0 :         LBWARN << "Invalid message received: " << std::hex
    1679           0 :                << (int)message.opcode << std::dec << std::endl;
    1680             :     }
    1681           0 : }
    1682             : 
    1683             : /* inline */
    1684           0 : void RDMAConnection::_recvFC(const RDMAFCPayload &fc)
    1685             : {
    1686             :     // Analysis:
    1687             :     //
    1688             :     // Since we will only write a maximum of _sourceptr.available( ) bytes
    1689             :     // to our source buffer, a malicious (presumably overflow) value here would
    1690             :     // have no chance of causing us to write beyond our buffer as we have local
    1691             :     // control over those ring pointers.  Worst case, we'd and up writing to
    1692             :     // arbitrary regions of the remote buffer, since this ring pointer is
    1693             :     // circular as well.
    1694           0 :     _rptr.incrTail(fc.bytes_received);
    1695             : 
    1696           0 :     _wcredits += fc.writes_received;
    1697           0 :     LBASSERTINFO(_wcredits <= _depth, _wcredits << " > " << _depth);
    1698             : 
    1699           0 :     _fcs++;
    1700           0 : }
    1701             : 
    1702           0 : bool RDMAConnection::_postFC()
    1703             : {
    1704             :     RDMAMessage &message =
    1705           0 :         *reinterpret_cast<RDMAMessage *>(_msgbuf.getBuffer());
    1706             : 
    1707           0 :     message.opcode = FC;
    1708           0 :     message.length = (uint8_t)sizeof(struct RDMAFCPayload);
    1709             : 
    1710           0 :     message.payload.fc.bytes_received = _readBytes;
    1711           0 :     message.payload.fc.writes_received = _writes;
    1712           0 :     _writes -= message.payload.fc.writes_received;
    1713           0 :     _readBytes = 0;
    1714             : 
    1715           0 :     LBASSERT(_writes >= 0);
    1716           0 :     return _postMessage(message);
    1717             : }
    1718             : 
    1719           0 : void RDMAConnection::_recvSetup(const RDMASetupPayload &setup)
    1720             : {
    1721             :     // Analysis:
    1722             :     //
    1723             :     // Malicious values here would only affect the receiver, we're willing
    1724             :     // to RDMA write to anywhere specified!
    1725           0 :     _rbase = setup.rbase;
    1726           0 :     _rptr.clear(setup.rlen);
    1727           0 :     _rkey = setup.rkey;
    1728             : 
    1729           0 :     LBVERB << "RDMA MR: " << std::showbase << std::dec << setup.rlen << " @ "
    1730           0 :            << std::hex << setup.rbase << std::dec << std::endl;
    1731           0 : }
    1732             : 
    1733           0 : bool RDMAConnection::_postSetup()
    1734             : {
    1735             :     RDMAMessage &message =
    1736           0 :         *reinterpret_cast<RDMAMessage *>(_msgbuf.getBuffer());
    1737             : 
    1738           0 :     message.opcode = SETUP;
    1739           0 :     message.length = (uint8_t)sizeof(struct RDMASetupPayload);
    1740             : 
    1741           0 :     message.payload.setup.rbase = (uint64_t)(uintptr_t)_sinkbuf.getBase();
    1742           0 :     message.payload.setup.rlen = (uint64_t)_sinkbuf.getSize();
    1743           0 :     message.payload.setup.rkey = _sinkbuf.getMR()->rkey;
    1744             : 
    1745           0 :     return _postMessage(message);
    1746             : }
    1747             : 
    1748           0 : bool RDMAConnection::_waitRecvSetup()
    1749             : {
    1750           0 :     lunchbox::Clock clock;
    1751           0 :     const int64_t start = clock.getTime64();
    1752           0 :     const uint32_t timeout = Global::getTimeout();
    1753           0 :     eventset events;
    1754             : 
    1755             : retry:
    1756           0 :     if (!_checkDisconnected(events))
    1757             :     {
    1758           0 :         LBERROR << "Error while checking event state." << std::endl;
    1759           0 :         return false;
    1760             :     }
    1761             : 
    1762           0 :     if (!_established)
    1763             :     {
    1764           0 :         LBERROR << "Disconnected while waiting for setup message." << std::endl;
    1765           0 :         return false;
    1766             :     }
    1767             : 
    1768           0 :     if (!_checkCQ(true))
    1769             :     {
    1770           0 :         LBERROR << "Error while polling receive completion queue." << std::endl;
    1771           0 :         return false;
    1772             :     }
    1773             : 
    1774           0 :     if (0ULL == _rkey)
    1775             :     {
    1776           0 :         if (LB_TIMEOUT_INDEFINITE != timeout)
    1777             :         {
    1778           0 :             if ((clock.getTime64() - start) > timeout)
    1779             :             {
    1780           0 :                 LBERROR << "Timed out waiting for setup message." << std::endl;
    1781           0 :                 return false;
    1782             :             }
    1783             :         }
    1784             : 
    1785           0 :         lunchbox::Thread::yield();
    1786           0 :         goto retry;
    1787             :     }
    1788             : 
    1789           0 :     return true;
    1790             : }
    1791             : 
    1792             : ////////////////////////////////////////////////////////////////////////////////
    1793             : 
    1794           0 : bool RDMAConnection::_createNotifier()
    1795             : {
    1796           0 :     LBASSERT(_notifier < 0);
    1797             : #ifdef _WIN32
    1798             :     _notifier = CreateEvent(0, TRUE, FALSE, 0);
    1799             :     return true;
    1800             : #else
    1801           0 :     _notifier = ::epoll_create(1);
    1802           0 :     if (_notifier < 0)
    1803             :     {
    1804           0 :         LBERROR << "epoll_create1: " << lunchbox::sysError << std::endl;
    1805           0 :         return false;
    1806             :     }
    1807             : 
    1808           0 :     return true;
    1809             : #endif
    1810             : }
    1811             : 
    1812           0 : void RDMAConnection::_updateNotifier()
    1813             : {
    1814             : #ifdef _WIN32
    1815             :     lunchbox::ScopedFastWrite mutex(_eventLock);
    1816             :     if (_availBytes == 0 && !_cm->channel.Head &&
    1817             :         (!_cc || !_cc->comp_channel.Head))
    1818             :         ResetEvent(_notifier);
    1819             : #else
    1820           0 :     eventset events;
    1821           0 :     _checkEvents(events);
    1822             : #endif
    1823           0 : }
    1824             : 
    1825           0 : bool RDMAConnection::_checkEvents(eventset &events)
    1826             : {
    1827           0 :     events.reset();
    1828             : 
    1829             : #ifdef _WIN32
    1830             :     lunchbox::ScopedFastRead mutex(_eventLock);
    1831             :     if (_availBytes != 0)
    1832             :         events.set(BUF_EVENT);
    1833             :     if (_cc && (_cc->comp_channel.Head != 0))
    1834             :         events.set(CQ_EVENT);
    1835             :     if (_cm->channel.Head)
    1836             :         events.set(CM_EVENT);
    1837             : 
    1838             :     return true;
    1839             : #else
    1840             :     struct epoll_event evts[3];
    1841             : 
    1842           0 :     int nfds = TEMP_FAILURE_RETRY(::epoll_wait(_notifier, evts, 3, 0));
    1843           0 :     if (nfds < 0)
    1844             :     {
    1845           0 :         LBERROR << "epoll_wait: " << lunchbox::sysError << std::endl;
    1846           0 :         return false;
    1847             :     }
    1848             : 
    1849           0 :     for (int i = 0; i < nfds; i++)
    1850             :     {
    1851           0 :         const int fd = evts[i].data.fd;
    1852           0 :         if ((_pipe_fd[0] >= 0) && (fd == _pipe_fd[0]))
    1853           0 :             events.set(BUF_EVENT);
    1854           0 :         else if (_cc && (fd == _cc->fd))
    1855           0 :             events.set(CQ_EVENT);
    1856           0 :         else if (_cm && (fd == _cm->fd))
    1857           0 :             events.set(CM_EVENT);
    1858             :         else
    1859           0 :             LBUNREACHABLE;
    1860             :     }
    1861             : 
    1862           0 :     return true;
    1863             : #endif
    1864             : }
    1865             : 
    1866           0 : bool RDMAConnection::_checkDisconnected(eventset &events)
    1867             : {
    1868           0 :     lunchbox::ScopedWrite poll_mutex(_poll_lock);
    1869             : 
    1870           0 :     if (!_checkEvents(events))
    1871             :     {
    1872           0 :         LBERROR << "Error while checking event state." << std::endl;
    1873           0 :         return false;
    1874             :     }
    1875             : 
    1876           0 :     if (events.test(CM_EVENT))
    1877             :     {
    1878           0 :         if (!_doCMEvent(RDMA_CM_EVENT_DISCONNECTED))
    1879             :         {
    1880           0 :             LBERROR << "Unexpected connection manager event." << std::endl;
    1881           0 :             return false;
    1882             :         }
    1883             : 
    1884           0 :         LBASSERT(!_established);
    1885             :     }
    1886             : 
    1887           0 :     return true;
    1888             : }
    1889             : 
    1890           0 : bool RDMAConnection::_createBytesAvailableFD()
    1891             : {
    1892             : #ifndef _WIN32
    1893           0 :     if (pipe(_pipe_fd) == -1)
    1894             :     {
    1895           0 :         LBERROR << "pipe: " << lunchbox::sysError << std::endl;
    1896           0 :         return false;
    1897             :     }
    1898             : 
    1899           0 :     LBASSERT(_pipe_fd[0] >= 0 && _pipe_fd[1] >= 0);
    1900             : 
    1901             :     // Use the event fd to signal Collage of bytes remaining.
    1902             :     struct epoll_event evctl;
    1903           0 :     ::memset((void *)&evctl, 0, sizeof(struct epoll_event));
    1904           0 :     evctl.events = EPOLLIN;
    1905           0 :     evctl.data.fd = _pipe_fd[0];
    1906           0 :     if (::epoll_ctl(_notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl))
    1907             :     {
    1908           0 :         LBERROR << "epoll_ctl: " << lunchbox::sysError << std::endl;
    1909           0 :         return false;
    1910             :     }
    1911             : #endif
    1912           0 :     return true;
    1913             : }
    1914             : 
    1915           0 : bool RDMAConnection::_incrAvailableBytes(const uint64_t b)
    1916             : {
    1917             : #ifdef _WIN32
    1918             :     lunchbox::ScopedFastWrite mutex(_eventLock);
    1919             :     _availBytes += b;
    1920             :     ::SetEvent(_notifier);
    1921             : #else
    1922           0 :     if (::write(_pipe_fd[1], (const void *)&b, sizeof(b)) != sizeof(b))
    1923             :     {
    1924           0 :         LBERROR << "write: " << lunchbox::sysError << std::endl;
    1925           0 :         return false;
    1926             :     }
    1927             : #endif
    1928           0 :     return true;
    1929             : }
    1930             : 
    1931           0 : uint64_t RDMAConnection::_getAvailableBytes()
    1932             : {
    1933           0 :     uint64_t available_bytes = 0;
    1934             : #ifdef _WIN32
    1935             :     lunchbox::ScopedFastWrite mutex(_eventLock);
    1936             :     available_bytes = static_cast<uint64_t>(_availBytes);
    1937             :     _availBytes = 0;
    1938             :     return available_bytes;
    1939             : #else
    1940           0 :     uint64_t currBytes = 0;
    1941             :     ssize_t count;
    1942             :     pollfd pfd;
    1943           0 :     pfd.fd = _pipe_fd[0];
    1944           0 :     pfd.events = EPOLLIN;
    1945             : 
    1946           0 :     do
    1947             :     {
    1948           0 :         count = ::read(_pipe_fd[0], (void *)&currBytes, sizeof(currBytes));
    1949           0 :         if (count > 0 && count < (ssize_t)sizeof(currBytes))
    1950           0 :             return 0ULL;
    1951           0 :         available_bytes += currBytes;
    1952           0 :         pfd.revents = 0;
    1953           0 :         if (::poll(&pfd, 1, 0) == -1)
    1954             :         {
    1955           0 :             LBERROR << "poll: " << lunchbox::sysError << std::endl;
    1956           0 :             return 0ULL;
    1957             :         }
    1958           0 :     } while (pfd.revents > 0);
    1959             : 
    1960           0 :     if (count == -1)
    1961             :     {
    1962           0 :         LBERROR << "read: " << lunchbox::sysError << std::endl;
    1963           0 :         return 0ULL;
    1964             :     }
    1965             : 
    1966           0 :     LBASSERT(available_bytes > 0ULL);
    1967           0 :     return available_bytes;
    1968             : #endif
    1969             : }
    1970             : 
    1971           0 : bool RDMAConnection::_waitForCMEvent(enum rdma_cm_event_type expected)
    1972             : {
    1973           0 :     lunchbox::Clock clock;
    1974           0 :     const int64_t start = clock.getTime64();
    1975           0 :     const uint32_t timeout = Global::getTimeout();
    1976           0 :     bool done = false;
    1977           0 :     eventset events;
    1978             : 
    1979             : retry:
    1980           0 :     if (!_checkEvents(events))
    1981             :     {
    1982           0 :         LBERROR << "Error while checking event state." << std::endl;
    1983           0 :         return false;
    1984             :     }
    1985             : 
    1986           0 :     if (events.test(CM_EVENT))
    1987             :     {
    1988           0 :         done = _doCMEvent(expected);
    1989           0 :         if (!done)
    1990             :         {
    1991           0 :             LBERROR << "Unexpected connection manager event." << std::endl;
    1992           0 :             return false;
    1993             :         }
    1994             :     }
    1995             : 
    1996           0 :     if (!done)
    1997             :     {
    1998           0 :         if (LB_TIMEOUT_INDEFINITE != timeout)
    1999             :         {
    2000           0 :             if ((clock.getTime64() - start) > timeout)
    2001             :             {
    2002           0 :                 LBERROR << "Timed out waiting for setup message." << std::endl;
    2003           0 :                 return false;
    2004             :             }
    2005             :         }
    2006             : 
    2007           0 :         lunchbox::Thread::yield();
    2008           0 :         goto retry;
    2009             :     }
    2010             : 
    2011           0 :     return true;
    2012             : }
    2013             : 
    2014           0 : bool RDMAConnection::_doCMEvent(enum rdma_cm_event_type expected)
    2015             : {
    2016           0 :     bool ok = false;
    2017             :     struct rdma_cm_event *event;
    2018             : 
    2019           0 :     if (::rdma_get_cm_event(_cm, &event))
    2020             :     {
    2021           0 :         LBERROR << "rdma_get_cm_event: " << lunchbox::sysError << std::endl;
    2022           0 :         return false;
    2023             :     }
    2024             : 
    2025           0 :     ok = (event->event == expected);
    2026             : 
    2027             : #ifndef NDEBUG
    2028           0 :     if (ok)
    2029             :     {
    2030           0 :         LBVERB << (void *)this << " (" << _addr << ":" << _serv << ")"
    2031           0 :                << " event : " << ::rdma_event_str(event->event) << std::endl;
    2032             :     }
    2033             :     else
    2034             :     {
    2035           0 :         LBINFO << (void *)this << " (" << _addr << ":" << _serv << ")"
    2036             :                << " event : " << ::rdma_event_str(event->event)
    2037           0 :                << " expected: " << ::rdma_event_str(expected) << std::endl;
    2038             :     }
    2039             : #endif
    2040             : 
    2041           0 :     if (ok && (RDMA_CM_EVENT_DISCONNECTED == event->event))
    2042           0 :         _established = false;
    2043             : 
    2044           0 :     if (ok && (RDMA_CM_EVENT_ESTABLISHED == event->event))
    2045             :     {
    2046           0 :         _established = true;
    2047             : 
    2048           0 :         struct rdma_conn_param *cp = &event->param.conn;
    2049             : 
    2050           0 :         ::memset((void *)&_cpd, 0, sizeof(RDMAConnParamData));
    2051             :         // Note that the actual amount of data transferred to the remote side
    2052             :         // is transport dependent and may be larger than that requested.
    2053           0 :         if (cp->private_data_len >= sizeof(RDMAConnParamData))
    2054           0 :             _cpd =
    2055           0 :                 *reinterpret_cast<const RDMAConnParamData *>(cp->private_data);
    2056             :     }
    2057             : 
    2058           0 :     if (ok && (RDMA_CM_EVENT_CONNECT_REQUEST == event->event))
    2059             :     {
    2060           0 :         _new_cm_id = event->id;
    2061             : 
    2062           0 :         struct rdma_conn_param *cp = &event->param.conn;
    2063             : 
    2064           0 :         ::memset((void *)&_cpd, 0, sizeof(RDMAConnParamData));
    2065             :         // TODO : Not sure what happens when initiator sent ai_connect data
    2066             :         // (assuming the underlying transport doesn't strip it)?
    2067           0 :         if (cp->private_data_len >= sizeof(RDMAConnParamData))
    2068           0 :             _cpd =
    2069           0 :                 *reinterpret_cast<const RDMAConnParamData *>(cp->private_data);
    2070             :     }
    2071             : 
    2072           0 :     if (RDMA_CM_EVENT_REJECTED == event->event)
    2073           0 :         LBINFO << "Connection reject status : " << event->status << std::endl;
    2074             : 
    2075           0 :     if (::rdma_ack_cm_event(event))
    2076           0 :         LBWARN << "rdma_ack_cm_event : " << lunchbox::sysError << std::endl;
    2077             : 
    2078           0 :     return ok;
    2079             : }
    2080             : 
    2081           0 : bool RDMAConnection::_rearmCQ()
    2082             : {
    2083             :     struct ibv_cq *ev_cq;
    2084             :     void *ev_ctx;
    2085             : 
    2086             : #ifdef _WIN32
    2087             :     lunchbox::ScopedFastWrite mutex(_eventLock);
    2088             : #endif
    2089           0 :     if (::ibv_get_cq_event(_cc, &ev_cq, &ev_ctx))
    2090             :     {
    2091           0 :         LBERROR << "ibv_get_cq_event: " << lunchbox::sysError << std::endl;
    2092           0 :         return false;
    2093             :     }
    2094             : 
    2095             :     // http://lists.openfabrics.org/pipermail/general/2008-November/055237.html
    2096           0 :     _completions++;
    2097           0 :     if (std::numeric_limits<unsigned int>::max() == _completions)
    2098             :     {
    2099           0 :         ::ibv_ack_cq_events(_cq, _completions);
    2100           0 :         _completions = 0U;
    2101             :     }
    2102             : 
    2103             :     // Solicited only!
    2104           0 :     if (::rdma_seterrno(::ibv_req_notify_cq(_cq, 1)))
    2105             :     {
    2106           0 :         LBERROR << "ibv_req_notify_cq: " << lunchbox::sysError << std::endl;
    2107           0 :         return false;
    2108             :     }
    2109             : 
    2110           0 :     return true;
    2111             : }
    2112             : 
    2113           0 : bool RDMAConnection::_checkCQ(bool drain)
    2114             : {
    2115             :     uint32_t num_recvs;
    2116             :     int count;
    2117             : 
    2118           0 :     lunchbox::ScopedWrite poll_mutex(_poll_lock);
    2119             : 
    2120           0 :     if (NULL == _cq)
    2121           0 :         return true;
    2122             : 
    2123             : repoll:
    2124             :     /* CHECK RECEIVE COMPLETIONS */
    2125           0 :     count = ::ibv_poll_cq(_cq, _depth, _wcs);
    2126           0 :     if (count < 0)
    2127             :     {
    2128           0 :         LBERROR << "ibv_poll_cq: " << lunchbox::sysError << std::endl;
    2129           0 :         return false;
    2130             :     }
    2131             : 
    2132           0 :     num_recvs = 0UL;
    2133           0 :     for (int i = 0; i != count; i++)
    2134             :     {
    2135           0 :         struct ibv_wc &wc = _wcs[i];
    2136             : 
    2137           0 :         if (IBV_WC_SUCCESS != wc.status)
    2138             :         {
    2139             :             // Non-fatal.
    2140           0 :             if (IBV_WC_WR_FLUSH_ERR == wc.status)
    2141           0 :                 continue;
    2142             : 
    2143           0 :             LBWARN << (void *)this << " !IBV_WC_SUCCESS : " << std::showbase
    2144           0 :                    << std::hex << "wr_id = " << wc.wr_id << ", status = \""
    2145           0 :                    << ::ibv_wc_status_str(wc.status) << "\"" << std::dec << " ("
    2146           0 :                    << (unsigned int)wc.status << ")" << std::hex
    2147           0 :                    << ", vendor_err = " << wc.vendor_err << std::dec
    2148           0 :                    << std::endl;
    2149             : 
    2150             :             // All others are fatal.
    2151           0 :             return false;
    2152             :         }
    2153             : 
    2154           0 :         LBASSERT(IBV_WC_SUCCESS == wc.status);
    2155             : 
    2156             : #ifdef _WIN32 //----------------------------------------------------------------
    2157             :         // WINDOWS IBV API WORKAROUND.
    2158             :         // TODO: remove this as soon as IBV API is fixed
    2159             :         if (wc.opcode == IBV_WC_RECV && wc.wc_flags == IBV_WC_WITH_IMM)
    2160             :             wc.opcode = IBV_WC_RECV_RDMA_WITH_IMM;
    2161             : #endif //-----------------------------------------------------------------------
    2162             : 
    2163           0 :         if (IBV_WC_RECV_RDMA_WITH_IMM == wc.opcode)
    2164           0 :             _recvRDMAWrite(wc.imm_data);
    2165           0 :         else if (IBV_WC_RECV == wc.opcode)
    2166           0 :             _recvMessage(*reinterpret_cast<RDMAMessage *>(wc.wr_id));
    2167           0 :         else if (IBV_WC_SEND == wc.opcode)
    2168           0 :             _msgbuf.freeBuffer((void *)(uintptr_t)wc.wr_id);
    2169           0 :         else if (IBV_WC_RDMA_WRITE == wc.opcode)
    2170           0 :             _sourceptr.incrTail((uint32_t)wc.wr_id);
    2171             :         else
    2172           0 :             LBUNREACHABLE;
    2173             : 
    2174           0 :         if (IBV_WC_RECV & wc.opcode)
    2175             :         {
    2176           0 :             _msgbuf.freeBuffer((void *)(uintptr_t)wc.wr_id);
    2177             :             // All receive completions need to be reposted.
    2178           0 :             num_recvs++;
    2179             :         }
    2180             :     }
    2181             : 
    2182           0 :     if ((num_recvs > 0UL) && !_postReceives(num_recvs))
    2183           0 :         return false;
    2184             : 
    2185           0 :     if (drain && (count > 0))
    2186           0 :         goto repoll;
    2187             : 
    2188           0 :     return true;
    2189             : }
    2190             : 
    2191             : /* inline */
    2192           0 : uint32_t RDMAConnection::_drain(void *buffer, const uint32_t bytes)
    2193             : {
    2194           0 :     const uint32_t b = std::min(bytes, _sinkptr.available());
    2195           0 :     ::memcpy(buffer,
    2196           0 :              (const void *)((uintptr_t)_sinkbuf.getBase() + _sinkptr.tail()),
    2197           0 :              b);
    2198           0 :     _sinkptr.incrTail(b);
    2199           0 :     return b;
    2200             : }
    2201             : 
    2202             : /* inline */
    2203           0 : uint32_t RDMAConnection::_fill(const void *buffer, const uint32_t bytes)
    2204             : {
    2205           0 :     uint32_t b = std::min(bytes, std::min(_sourceptr.negAvailable(),
    2206           0 :                                           _rptr.negAvailable()));
    2207             : #ifndef WRAP
    2208           0 :     b = std::min(b, (uint32_t)(_sourcebuf.getSize() -
    2209           0 :                                _sourceptr.ptr(_sourceptr.HEAD)));
    2210             : #endif
    2211           0 :     ::memcpy((void *)((uintptr_t)_sourcebuf.getBase() +
    2212           0 :                       _sourceptr.ptr(_sourceptr.HEAD)),
    2213           0 :              buffer, b);
    2214           0 :     _sourceptr.incrHead(b);
    2215           0 :     return b;
    2216             : }
    2217             : 
    2218           0 : Connection::Notifier RDMAConnection::getNotifier() const
    2219             : {
    2220           0 :     return _notifier;
    2221             : }
    2222             : 
    #ifdef _WIN32
    /** CQ-channel trigger: forwards to _triggerNotifierWorker(CQ_EVENT). */
    void RDMAConnection::_triggerNotifierCQ(RDMAConnection *conn)
    {
        conn->_triggerNotifierWorker(CQ_EVENT);
    }

    /** CM-channel trigger: forwards to _triggerNotifierWorker(CM_EVENT). */
    void RDMAConnection::_triggerNotifierCM(RDMAConnection *conn)
    {
        conn->_triggerNotifierWorker(CM_EVENT);
    }

    /**
     * Under _eventLock and the channel's own critical section, reset the
     * channel event selected by @p event (_cc->comp_channel for CQ_EVENT,
     * _cm->channel otherwise). Signal _notifier if the channel's Head is set,
     * which presumably means an entry is still queued; confirm against the
     * Windows verbs COMP_CHANNEL definition.
     */
    void RDMAConnection::_triggerNotifierWorker(Events event)
    {
        lunchbox::ScopedFastWrite mutex(_eventLock);

        COMP_CHANNEL *chan;
        if (event == CQ_EVENT)
            chan = &_cc->comp_channel;
        else
            chan = &_cm->channel;

        EnterCriticalSection(&chan->Lock);
        ResetEvent(chan->Event);
        if (chan->Head)
            SetEvent(_notifier);
        LeaveCriticalSection(&chan->Lock);
    }
    #endif
    2245             : 
    2246             : //////////////////////////////////////////////////////////////////////////////
    2247             : 
    2248           0 : void RDMAConnection::_showStats()
    2249             : {
    2250           0 :     LBVERB << std::dec << "reads = " << _stats.reads
    2251           0 :            << ", buffer_empty = " << _stats.buffer_empty
    2252           0 :            << ", no_credits_fc = " << _stats.no_credits_fc
    2253           0 :            << ", writes = " << _stats.writes
    2254           0 :            << ", buffer_full = " << _stats.buffer_full
    2255           0 :            << ", no_credits_rdma = " << _stats.no_credits_rdma << std::endl;
    2256           0 : }
    2257             : 
    2258             : //////////////////////////////////////////////////////////////////////////////
    2259             : 
    2260           0 : BufferPool::BufferPool(size_t buffer_size)
    2261             :     : _buffer_size(buffer_size)
    2262             :     , _num_bufs(0)
    2263             :     , _buffer(NULL)
    2264             :     , _mr(NULL)
    2265           0 :     , _ring(0)
    2266             : {
    2267           0 : }
    2268             : 
    2269           0 : BufferPool::~BufferPool()
    2270             : {
    2271           0 :     clear();
    2272           0 : }
    2273             : 
    2274           0 : void BufferPool::clear()
    2275             : {
    2276           0 :     _num_bufs = 0;
    2277           0 :     _ring.clear(_num_bufs);
    2278             : 
    2279           0 :     if ((NULL != _mr) && ::rdma_dereg_mr(_mr))
    2280           0 :         LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
    2281           0 :     _mr = NULL;
    2282             : 
    2283           0 :     if (NULL != _buffer)
    2284             : #ifdef _WIN32
    2285             :         _aligned_free(_buffer);
    2286             : #else
    2287           0 :         ::free(_buffer);
    2288             : #endif
    2289           0 :     _buffer = NULL;
    2290           0 : }
    2291             : 
    2292           0 : bool BufferPool::resize(ibv_pd *pd, uint32_t num_bufs)
    2293             : {
    2294           0 :     clear();
    2295             : 
    2296           0 :     if (num_bufs)
    2297             :     {
    2298           0 :         _num_bufs = num_bufs;
    2299           0 :         _ring.clear(_num_bufs);
    2300             : 
    2301             : #ifdef _WIN32
    2302             :         _buffer = _aligned_malloc(
    2303             :             (size_t)(_num_bufs * _buffer_size),
    2304             :             boost::interprocess::mapped_region::get_page_size());
    2305             :         if (!_buffer)
    2306             :         {
    2307             :             LBERROR << "_aligned_malloc: Couldn't allocate aligned memory. "
    2308             :                     << lunchbox::sysError << std::endl;
    2309             :             return false;
    2310             :         }
    2311             : #else
    2312           0 :         if (::posix_memalign(&_buffer, (size_t)::getpagesize(),
    2313           0 :                              (size_t)(_num_bufs * _buffer_size)))
    2314             :         {
    2315           0 :             LBERROR << "posix_memalign: " << lunchbox::sysError << std::endl;
    2316           0 :             return false;
    2317             :         }
    2318             : #endif
    2319           0 :         ::memset(_buffer, 0xff, (size_t)(_num_bufs * _buffer_size));
    2320           0 :         _mr = ::ibv_reg_mr(pd, _buffer, (size_t)(_num_bufs * _buffer_size),
    2321             :                            IBV_ACCESS_LOCAL_WRITE);
    2322           0 :         if (NULL == _mr)
    2323             :         {
    2324           0 :             LBERROR << "ibv_reg_mr: " << lunchbox::sysError << std::endl;
    2325           0 :             return false;
    2326             :         }
    2327             : 
    2328           0 :         for (uint32_t i = 0; i != _num_bufs; i++)
    2329           0 :             _ring.put(i);
    2330             :     }
    2331             : 
    2332           0 :     return true;
    2333             : }
    2334             : 
    2335             : //////////////////////////////////////////////////////////////////////////////
    2336             : 
    2337           0 : RingBuffer::RingBuffer(int access)
    2338             :     : _access(access)
    2339             :     , _size(0)
    2340             : #ifdef _WIN32
    2341             :     , _mapping(0)
    2342             :     , _map(0)
    2343             : #else
    2344             :     , _map(MAP_FAILED)
    2345             : #endif
    2346           0 :     , _mr(0)
    2347             : {
    2348           0 : }
    2349             : 
    2350           0 : RingBuffer::~RingBuffer()
    2351             : {
    2352           0 :     clear();
    2353           0 : }
    2354             : 
    2355           0 : void RingBuffer::clear()
    2356             : {
    2357           0 :     if ((NULL != _mr) && ::rdma_dereg_mr(_mr))
    2358           0 :         LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
    2359           0 :     _mr = NULL;
    2360             : 
    2361             : #ifdef _WIN32
    2362             :     if (_map)
    2363             :     {
    2364             :         UnmapViewOfFile(static_cast<char *>(_map));
    2365             :         UnmapViewOfFile(static_cast<char *>(_map) + _size);
    2366             :     }
    2367             :     if (_mapping)
    2368             :         CloseHandle(_mapping);
    2369             : 
    2370             :     _map = 0;
    2371             :     _mapping = 0;
    2372             : #else
    2373           0 :     if ((MAP_FAILED != _map) && ::munmap(_map, _size << 1))
    2374           0 :         LBWARN << "munmap : " << lunchbox::sysError << std::endl;
    2375           0 :     _map = MAP_FAILED;
    2376             : #endif
    2377           0 :     _size = 0;
    2378           0 : }
    2379             : 
    2380           0 : bool RingBuffer::resize(ibv_pd *pd, size_t size)
    2381             : {
    2382           0 :     bool ok = false;
    2383           0 :     int fd = -1;
    2384             : 
    2385           0 :     clear();
    2386             : 
    2387           0 :     if (size)
    2388             :     {
    2389             : #ifdef _WIN32
    2390             :         uint32_t num_retries = RINGBUFFER_ALLOC_RETRIES;
    2391             :         while (!_map && num_retries-- != 0)
    2392             :         {
    2393             :             void *target_addr = determineViableAddr(size * 2);
    2394             :             if (target_addr)
    2395             :                 allocAt(size, target_addr);
    2396             :         }
    2397             : 
    2398             :         if (!_map)
    2399             :         {
    2400             :             LBERROR << "Couldn't allocate desired RingBuffer memory after "
    2401             :                     << RINGBUFFER_ALLOC_RETRIES << " retries." << std::endl;
    2402             :         }
    2403             :         else
    2404             :         {
    2405             :             LBVERB << "Allocated RDMA Ringbuffer memory in "
    2406             :                    << (RINGBUFFER_ALLOC_RETRIES - num_retries) << " tries."
    2407             :                    << std::endl;
    2408             :             ok = true;
    2409             :         }
    2410             : 
    2411             : #else
    2412             :         void *addr1, *addr2;
    2413           0 :         char path[] = "/dev/shm/co-rdma-buffer-XXXXXX";
    2414             : 
    2415           0 :         _size = size;
    2416             : 
    2417           0 :         fd = ::mkstemp(path);
    2418           0 :         if (fd < 0)
    2419             :         {
    2420           0 :             LBERROR << "mkstemp: " << lunchbox::sysError << std::endl;
    2421           0 :             goto out;
    2422             :         }
    2423             : 
    2424           0 :         if (::unlink(path))
    2425             :         {
    2426           0 :             LBERROR << "unlink: " << lunchbox::sysError << std::endl;
    2427           0 :             goto out;
    2428             :         }
    2429             : 
    2430           0 :         if (::ftruncate(fd, _size))
    2431             :         {
    2432           0 :             LBERROR << "ftruncate: " << lunchbox::sysError << std::endl;
    2433           0 :             goto out;
    2434             :         }
    2435             : 
    2436           0 :         _map = ::mmap(NULL, _size << 1, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE,
    2437             :                       -1, 0);
    2438           0 :         if (MAP_FAILED == _map)
    2439             :         {
    2440           0 :             LBERROR << "mmap: " << lunchbox::sysError << std::endl;
    2441           0 :             goto out;
    2442             :         }
    2443             : 
    2444           0 :         addr1 = ::mmap(_map, _size, PROT_READ | PROT_WRITE,
    2445           0 :                        MAP_FIXED | MAP_SHARED, fd, 0);
    2446           0 :         if (MAP_FAILED == addr1)
    2447             :         {
    2448           0 :             LBERROR << "mmap: " << lunchbox::sysError << std::endl;
    2449           0 :             goto out;
    2450             :         }
    2451             : 
    2452           0 :         addr2 = ::mmap((void *)((uintptr_t)_map + _size), _size,
    2453           0 :                        PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
    2454           0 :         if (MAP_FAILED == addr2)
    2455             :         {
    2456           0 :             LBERROR << "mmap: " << lunchbox::sysError << std::endl;
    2457           0 :             goto out;
    2458             :         }
    2459             : 
    2460           0 :         LBASSERT(addr1 == _map);
    2461           0 :         LBASSERT(addr2 == (void *)((uintptr_t)_map + _size));
    2462             : 
    2463             : #endif
    2464             : #ifdef WRAP
    2465             :         _mr = ::ibv_reg_mr(pd, _map, _size << 1, _access);
    2466             : #else
    2467           0 :         _mr = ::ibv_reg_mr(pd, _map, _size, _access);
    2468             : #endif
    2469             : 
    2470           0 :         if (NULL == _mr)
    2471             :         {
    2472           0 :             LBERROR << "ibv_reg_mr: " << lunchbox::sysError << std::endl;
    2473           0 :             goto out;
    2474             :         }
    2475             : 
    2476           0 :         ::memset(_map, 0, _size);
    2477           0 :         *reinterpret_cast<uint8_t *>(_map) = 0x45;
    2478           0 :         LBASSERT(0x45 == *reinterpret_cast<uint8_t *>((uintptr_t)_map + _size));
    2479             :     }
    2480             : 
    2481           0 :     ok = true;
    2482             : 
    2483             : out:
    2484             : #ifndef _WIN32
    2485           0 :     if ((fd >= 0) && TEMP_FAILURE_RETRY(::close(fd)))
    2486           0 :         LBWARN << "close : " << lunchbox::sysError << std::endl;
    2487             : #endif
    2488           0 :     return ok;
    2489             : }
    2490             : 
    #ifdef _WIN32
    /**
     * Suggest a base address for @p size bytes: reserve that much address
     * space anywhere, note where it landed and release it again. Something
     * else may take the range before it is used, presumably why
     * RingBuffer::resize() retries.
     * @return the suggested address, or 0 if the reservation failed.
     */
    void *RingBuffer::determineViableAddr(size_t size)
    {
        void *ptr = VirtualAlloc(0, size, MEM_RESERVE, PAGE_NOACCESS);
        if (!ptr)
            return 0;

        VirtualFree(ptr, 0, MEM_RELEASE);
        return ptr;
    }

    /**
     * Create a paging-file-backed section of @p size bytes and map it twice,
     * at @p desiredAddr and at @p desiredAddr + @p size, so the second view
     * mirrors the first.
     *
     * On success _mapping, _map and _size are set. On failure everything is
     * released via clear(). Nothing happens if an allocation is already held
     * or if @p size is not a multiple of 64 KiB.
     */
    void RingBuffer::allocAt(size_t size, void *desiredAddr)
    {
        // if we already hold one allocation, refuse to make another.
        LBASSERT(!_map);
        LBASSERT(!_mapping);
        if (_map || _mapping)
            return;

        // is ring_size a multiple of 64k? if not, this won't ever work!
        if ((size & 0xffff) != 0)
            return;

        // Both views map the same `size` bytes at section offset 0, so the
        // section only needs `size` bytes. Sizing it at 2 * size reserved
        // backing memory that no view ever reaches.
        const unsigned long long section_size = size;
        _mapping = CreateFileMappingA(INVALID_HANDLE_VALUE, 0, PAGE_READWRITE,
                                      (DWORD)(section_size >> 32),
                                      (DWORD)(section_size & 0xffffffffu), 0);

        if (!_mapping)
        {
            LBERROR << "CreateFileMappingA failed" << std::endl;
            goto err;
        }

        _map =
            MapViewOfFileEx(_mapping, FILE_MAP_ALL_ACCESS, 0, 0, size, desiredAddr);

        if (!_map)
        {
            LBERROR << "First MapViewOfFileEx failed" << std::endl;
            goto err;
        }

        if (!MapViewOfFileEx(_mapping, FILE_MAP_ALL_ACCESS, 0, 0, size,
                             (char *)desiredAddr + size))
        {
            LBERROR << "Second MapViewOfFileEx failed" << std::endl;
            goto err;
        }

        _size = size;
        return;
    err:
        // something went wrong - clean up
        clear();
    }
    #endif
    2549             : 
    2550          63 : } // namespace co

Generated by: LCOV version 1.11