Line data Source code
1 : // -*- mode: c++ -*-
2 : /* Copyright (c) 2012, Computer Integration & Programming Solutions, Corp. and
3 : * United States Naval Research Laboratory
4 : * 2012-2013, Stefan Eilemann <eile@eyescale.ch>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 : #include "rdmaConnection.h"
22 :
23 : #include "connectionDescription.h"
24 : #include "connectionType.h"
25 : #include "global.h"
26 :
27 : #include <lunchbox/clock.h>
28 :
29 : #ifdef _WIN32
30 : #pragma warning(disable : 4018)
31 : #include <boost/interprocess/mapped_region.hpp>
32 : #pragma warning(default : 4018)
33 : #else
34 : #include <arpa/inet.h>
35 : #include <poll.h>
36 : #include <sys/epoll.h>
37 : #include <sys/mman.h>
38 : #include <unistd.h>
39 : #endif
40 :
41 : #include <errno.h>
42 : #include <fcntl.h>
43 : #include <limits>
44 : #include <sstream>
45 : #include <stddef.h>
46 :
47 : #include <rdma/rdma_verbs.h>
48 :
49 : #define IPV6_DEFAULT 0
50 :
51 : #define RDMA_PROTOCOL_MAGIC 0xC0
52 : #define RDMA_PROTOCOL_VERSION 0x03
53 :
54 : namespace co
55 : {
56 : // namespace { static const uint64_t ONE = 1ULL; }
57 :
58 : /**
59 : * Message types
60 : */
61 : enum OpCode
62 : {
63 : SETUP = 1 << 0,
64 : FC = 1 << 1,
65 : };
66 :
/**
 * Body of a SETUP message: describes the sender's sink memory region so the
 * peer can target it with RDMA writes.
 */
struct RDMASetupPayload
{
    uint64_t rbase; // remote base address of the sink MR
    uint64_t rlen;  // length of the sink MR
    uint64_t rkey;  // remote key of the sink MR
};
76 :
/**
 * Body of an FC ("ACK") message sent after reading: reports the receiver's
 * progress back to the source.
 */
struct RDMAFCPayload
{
    uint32_t bytes_received;  // bytes consumed by the receiver
    uint32_t writes_received; // RDMA writes consumed by the receiver
};
85 :
86 : /**
87 : * Payload wrapper
88 : */
89 : struct RDMAMessage
90 : {
91 : enum OpCode opcode;
92 : uint8_t length;
93 : union {
94 : struct RDMASetupPayload setup;
95 : struct RDMAFCPayload fc;
96 : } payload;
97 : };
98 :
/**
 * 32-bit immediate ("IMM") value attached to each RDMA write, telling the
 * sink how far the sender has progressed. Field order and widths form the
 * on-the-wire layout and must not change.
 */
struct RDMAFCImm
{
    uint32_t bytes_sent : 28;  // bytes carried by this write
    uint32_t fcs_received : 4; // flow-control messages seen by the sender
};
107 :
108 : namespace
109 : {
110 : // We send a max of 28 bits worth of byte counts per RDMA write.
111 : static const uint64_t MAX_BS = ((2 << (28 - 1)) - 1);
112 : // We send a max of four bits worth of fc counts per RDMA write.
113 : static const uint16_t MAX_FC = ((2 << (4 - 1)) - 1);
114 :
115 : #ifdef _WIN32
116 : static const uint32_t RINGBUFFER_ALLOC_RETRIES = 8;
117 : static const uint32_t WINDOWS_CONNECTION_BACKLOG = 1024;
118 : #endif
119 : static const uint32_t FC_MESSAGE_FREQUENCY = 12;
120 :
121 0 : bool _isInetFamily(const rdma_addrinfo *addrinfo)
122 : {
123 : #ifndef _WIN32
124 0 : return addrinfo->ai_family == AF_INET || addrinfo->ai_family == AF_INET6;
125 : #else
126 : return addrinfo->ai_family == AF_INET;
127 : #endif
128 : }
129 : }
130 :
131 : /**
132 : * An RDMA connection implementation.
133 : *
134 : * The protocol is simple, e.g.:
135 : *
136 : * initiator target
137 : * -----------------------------------------------------
138 : * resolve/bind/listen
139 : * resolve/prepost/connect
140 : * prepost/accept
141 : * send setup <-------> send setup
142 : * wait for setup wait for setup
143 : * RDMA_WRITE_WITH_IMM WR -------> RDMA_WRITE(DATA) WC
144 : * RECV(FC) WC <------- SEND WR
145 : * .
146 : * .
147 : * .
148 : *
149 : * The setup phase exchanges the MR parameters of a fixed size circular buffer
150 : * to which remote writes are sent. Sender tracks available space on the
151 : * receiver by accepting "Flow Control" messages that update the tail pointer
152 : * of the local "view" of the remote sink MR.
153 : *
154 : * Once setup is complete, either side may begin operations on the other's MR
155 : * (the initiator doesn't have to send first, as in the above example).
156 : *
157 : * If either credits or buffer space are exhausted, sender will spin waiting
158 : * for flow control messages. Receiver will also not send flow control if
159 : * there are no credits available.
160 : *
161 : * One catch is that Collage will only monitor a single "notifier" for events
162 : * and we have three that need to be monitored: one for connection status
163 : * events (the RDMA event channel) - RDMA_CM_EVENT_DISCONNECTED in particular,
164 : * one for the receive completion queue (upon incoming RDMA write), and an
165 : * additional eventfd(2) used to keep the notifier "hot" after partial reads.
166 : * We leverage the feature of epoll(7) in that "If an epoll file descriptor
167 : * has events waiting then it will indicate as being readable".
168 : *
169 : * Quite interesting is the effect of RDMA_RING_BUFFER_SIZE_MB and
170 : * RDMA_SEND_QUEUE_DEPTH depending on the communication pattern. Basically,
171 : * bigger doesn't necessarily equate to faster! The defaults are suited for
172 : * low latency conditions and would need tuning otherwise.
173 : *
174 : * ib_write_bw
175 : * -----------
176 : * #bytes num iterations BW peak[MB/sec] BW average[MB/sec]
177 : * 1048576 10000 3248.10 3247.94
178 : *
179 : * netperf
180 : * -------
181 : * Send perf: 3240.72MB/s (3240.72pps)
182 : * Send perf: 3240.72MB/s (3240.72pps)
183 : * Send perf: 3240.95MB/s (3240.95pps)
184 : */
185 0 : RDMAConnection::RDMAConnection()
186 : #ifdef _WIN32
187 : : _notifier()
188 : #else
189 : : _notifier(-1)
190 : #endif
191 0 : , _timeout(Global::getIAttribute(Global::IATTR_RDMA_RESOLVE_TIMEOUT_MS))
192 : , _rai(NULL)
193 : , _addrinfo(NULL)
194 : , _cm(NULL)
195 : , _cm_id(NULL)
196 : , _new_cm_id(NULL)
197 : , _cc(NULL)
198 : , _cq(NULL)
199 : , _pd(NULL)
200 : , _wcs(0)
201 : , _readBytes(0)
202 : , _established(false)
203 : , _depth(0L)
204 : , _writes(0L)
205 : , _fcs(0L)
206 : , _wcredits(0L)
207 : , _fcredits(0L)
208 : , _completions(0U)
209 : , _msgbuf(sizeof(RDMAMessage))
210 : , _sourcebuf(0)
211 : , _sourceptr(0)
212 : , _sinkbuf(IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE)
213 : , _sinkptr(0)
214 : , _rptr(0UL)
215 : , _rbase(0ULL)
216 0 : , _rkey(0ULL)
217 : #ifdef _WIN32
218 : , _availBytes(0)
219 : , _eventFlag(0)
220 : , _cmWaitObj(0)
221 : , _ccWaitObj(0)
222 : #endif
223 : {
224 : #ifndef _WIN32
225 0 : _pipe_fd[0] = -1;
226 0 : _pipe_fd[1] = -1;
227 : #endif
228 :
229 0 : ::memset((void *)&_addr, 0, sizeof(_addr));
230 0 : ::memset((void *)&_serv, 0, sizeof(_serv));
231 :
232 0 : ::memset((void *)&_cpd, 0, sizeof(struct RDMAConnParamData));
233 :
234 0 : ConnectionDescriptionPtr description = _getDescription();
235 0 : description->type = CONNECTIONTYPE_RDMA;
236 0 : description->bandwidth = // QDR default, report "actual" 8b/10b bandwidth
237 0 : (::ibv_rate_to_mult(IBV_RATE_40_GBPS) * 2.5 * 1024000 / 8) * 0.8;
238 0 : }
239 :
240 0 : bool RDMAConnection::connect()
241 : {
242 0 : ConstConnectionDescriptionPtr description = getDescription();
243 0 : LBASSERT(CONNECTIONTYPE_RDMA == description->type);
244 :
245 0 : if (!isClosed() || 0u == description->port)
246 0 : return false;
247 :
248 0 : _cleanup();
249 0 : _setState(STATE_CONNECTING);
250 :
251 0 : if (!_lookupAddress(false) || (NULL == _addrinfo))
252 : {
253 0 : LBERROR << "Failed to lookup destination address." << std::endl;
254 0 : goto err;
255 : }
256 :
257 0 : _updateInfo(_addrinfo->ai_dst_addr);
258 :
259 0 : if (!_createNotifier())
260 : {
261 0 : LBERROR << "Failed to create master notifier." << std::endl;
262 0 : goto err;
263 : }
264 :
265 0 : if (!_createEventChannel())
266 : {
267 0 : LBERROR << "Failed to create communication event channel." << std::endl;
268 0 : goto err;
269 : }
270 :
271 0 : if (!_createId())
272 : {
273 0 : LBERROR << "Failed to create communication identifier." << std::endl;
274 0 : goto err;
275 : }
276 :
277 0 : if (!_resolveAddress())
278 : {
279 0 : LBERROR << "Failed to resolve destination address for : " << _addr
280 0 : << ":" << _serv << std::endl;
281 0 : goto err;
282 : }
283 :
284 0 : _updateInfo(::rdma_get_peer_addr(_cm_id));
285 :
286 0 : _device_name = ::ibv_get_device_name(_cm_id->verbs->device);
287 :
288 0 : LBVERB << "Initiating connection on " << std::dec << _device_name << ":"
289 0 : << (int)_cm_id->port_num << " to " << _addr << ":" << _serv << " ("
290 0 : << description->toString() << ")" << std::endl;
291 :
292 0 : if (!_initProtocol(
293 : Global::getIAttribute(Global::IATTR_RDMA_SEND_QUEUE_DEPTH)))
294 : {
295 0 : LBERROR << "Failed to initialize protocol variables." << std::endl;
296 0 : goto err;
297 : }
298 :
299 0 : if (!_initVerbs())
300 : {
301 0 : LBERROR << "Failed to initialize verbs." << std::endl;
302 0 : goto err;
303 : }
304 :
305 0 : if (!_createQP())
306 : {
307 0 : LBERROR << "Failed to create queue pair." << std::endl;
308 0 : goto err;
309 : }
310 :
311 0 : if (!_createBytesAvailableFD())
312 : {
313 0 : LBERROR << "Failed to create available byte notifier." << std::endl;
314 0 : goto err;
315 : }
316 :
317 0 : if (!_initBuffers())
318 : {
319 0 : LBERROR << "Failed to initialize ring buffers." << std::endl;
320 0 : goto err;
321 : }
322 :
323 0 : if (!_resolveRoute())
324 : {
325 0 : LBERROR << "Failed to resolve route to destination : " << _addr << ":"
326 0 : << _serv << std::endl;
327 0 : goto err;
328 : }
329 :
330 0 : if (!_postReceives(_depth))
331 : {
332 0 : LBERROR << "Failed to pre-post receives." << std::endl;
333 0 : goto err;
334 : }
335 :
336 0 : if (!_connect())
337 : {
338 0 : LBERROR << "Failed to connect to destination : " << _addr << ":"
339 0 : << _serv << std::endl;
340 0 : goto err;
341 : }
342 :
343 0 : LBASSERT(_established);
344 :
345 0 : if ((RDMA_PROTOCOL_MAGIC != _cpd.magic) ||
346 0 : (RDMA_PROTOCOL_VERSION != _cpd.version))
347 : {
348 0 : LBERROR << "Protocol mismatch with target : " << _addr << ":" << _serv
349 0 : << std::endl;
350 0 : goto err;
351 : }
352 :
353 0 : if (!_postSetup())
354 : {
355 0 : LBERROR << "Failed to post setup message." << std::endl;
356 0 : goto err;
357 : }
358 :
359 0 : if (!_waitRecvSetup())
360 : {
361 0 : LBERROR << "Failed to receive setup message." << std::endl;
362 0 : goto err;
363 : }
364 :
365 0 : LBVERB << "Connection established on " << std::dec << _device_name << ":"
366 0 : << (int)_cm_id->port_num << " to " << _addr << ":" << _serv << " ("
367 0 : << description->toString() << ")" << std::endl;
368 :
369 0 : _setState(STATE_CONNECTED);
370 0 : _updateNotifier();
371 0 : return true;
372 :
373 : err:
374 0 : LBVERB << "Connection failed on " << std::dec << _device_name << ":"
375 0 : << (int)_cm_id->port_num << " to " << _addr << ":" << _serv << " ("
376 0 : << description->toString() << ")" << std::endl;
377 :
378 0 : close();
379 :
380 0 : return false;
381 : }
382 :
383 0 : bool RDMAConnection::listen()
384 : {
385 0 : ConstConnectionDescriptionPtr description = getDescription();
386 0 : LBASSERT(CONNECTIONTYPE_RDMA == description->type);
387 :
388 0 : if (!isClosed())
389 0 : return false;
390 :
391 0 : _cleanup();
392 0 : _setState(STATE_CONNECTING);
393 :
394 0 : if (!_lookupAddress(true))
395 : {
396 0 : LBERROR << "Failed to lookup local address." << std::endl;
397 0 : goto err;
398 : }
399 :
400 0 : if (NULL != _addrinfo)
401 0 : _updateInfo(_addrinfo->ai_src_addr);
402 :
403 0 : if (!_createNotifier())
404 : {
405 0 : LBERROR << "Failed to create master notifier." << std::endl;
406 0 : goto err;
407 : }
408 :
409 0 : if (!_createEventChannel())
410 : {
411 0 : LBERROR << "Failed to create communication event channel." << std::endl;
412 0 : goto err;
413 : }
414 :
415 0 : if (!_createId())
416 : {
417 0 : LBERROR << "Failed to create communication identifier." << std::endl;
418 0 : goto err;
419 : }
420 :
421 : #if 0
422 : /* NOT IMPLEMENTED */
423 :
424 : if( ::rdma_set_option( _cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
425 : (void *)&ONE, sizeof(ONE) ))
426 : {
427 : LBERROR << "rdma_set_option: " << lunchbox::sysError << std::endl;
428 : goto err;
429 : }
430 : #endif
431 :
432 0 : if (!_bindAddress())
433 : {
434 0 : LBERROR << "Failed to bind to local address : " << _addr << ":" << _serv
435 0 : << std::endl;
436 0 : goto err;
437 : }
438 :
439 0 : _updateInfo(::rdma_get_local_addr(_cm_id));
440 :
441 : #ifdef _WIN32
442 : if (!_listen(WINDOWS_CONNECTION_BACKLOG))
443 : #else
444 0 : if (!_listen(SOMAXCONN))
445 : #endif
446 : {
447 0 : LBERROR << "Failed to listen on bound address : " << _addr << ":"
448 0 : << _serv << std::endl;
449 0 : goto err;
450 : }
451 :
452 0 : if (NULL != _cm_id->verbs)
453 0 : _device_name = ::ibv_get_device_name(_cm_id->verbs->device);
454 :
455 0 : LBVERB << "Listening on " << std::dec << _device_name << ":"
456 0 : << (int)_cm_id->port_num << " at " << _addr << ":" << _serv << " ("
457 0 : << description->toString() << ")" << std::endl;
458 :
459 0 : _setState(STATE_LISTENING);
460 0 : return true;
461 :
462 : err:
463 0 : close();
464 0 : return false;
465 : }
466 :
467 0 : void RDMAConnection::acceptNB()
468 : { /* NOP */
469 0 : }
470 :
471 0 : ConnectionPtr RDMAConnection::acceptSync()
472 : {
473 0 : RDMAConnection *newConnection = NULL;
474 :
475 0 : if (!isListening())
476 : {
477 0 : LBERROR << "Connection not in listening state." << std::endl;
478 0 : goto out;
479 : }
480 :
481 0 : if (!_waitForCMEvent(RDMA_CM_EVENT_CONNECT_REQUEST))
482 : {
483 0 : LBERROR << "Failed to receive valid connect request." << std::endl;
484 0 : goto out;
485 : }
486 :
487 0 : LBASSERT(NULL != _new_cm_id);
488 :
489 0 : newConnection = new RDMAConnection();
490 :
491 0 : if (!newConnection->_finishAccept(_new_cm_id, _cpd))
492 : {
493 0 : delete newConnection;
494 0 : newConnection = NULL;
495 : }
496 :
497 : out:
498 0 : _new_cm_id = NULL;
499 0 : _updateNotifier();
500 0 : return newConnection;
501 : }
502 :
503 0 : void RDMAConnection::readNB(void *, const uint64_t)
504 : { /* NOP */
505 0 : }
506 :
507 0 : int64_t RDMAConnection::readSync(void *buffer, const uint64_t bytes,
508 : const bool block)
509 : {
510 0 : lunchbox::Clock clock;
511 0 : const int64_t start = clock.getTime64();
512 0 : const uint32_t timeout = Global::getTimeout();
513 0 : eventset events;
514 0 : uint64_t available_bytes = 0ULL;
515 0 : uint32_t bytes_taken = 0UL;
516 0 : bool extra_event = false;
517 :
518 0 : if (!isConnected())
519 0 : goto err;
520 :
521 : // LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
522 : // << std::endl;
523 :
524 0 : _stats.reads++;
525 :
526 : retry:
527 0 : if (!_checkDisconnected(events))
528 : {
529 0 : LBERROR << "Error while checking event state." << std::endl;
530 0 : goto err;
531 : }
532 :
533 0 : if (events.test(CQ_EVENT))
534 : {
535 0 : if (!_rearmCQ())
536 : {
537 0 : LBERROR << "Error while rearming receive channel." << std::endl;
538 0 : goto err;
539 : }
540 : }
541 :
542 : // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
543 0 : if (!_checkCQ(events.test(CQ_EVENT)))
544 : {
545 0 : LBERROR << "Error while polling completion queues." << std::endl;
546 0 : goto err;
547 : }
548 :
549 0 : LBASSERT(_fcredits >= 0L);
550 :
551 0 : if (_established && _needFC() && (0L == _fcredits))
552 : {
553 0 : if (LB_TIMEOUT_INDEFINITE != timeout)
554 : {
555 0 : if ((clock.getTime64() - start) > timeout)
556 : {
557 0 : LBERROR << "Timed out trying to acquire credit." << std::endl;
558 0 : goto err;
559 : }
560 : }
561 :
562 : // LBWARN << "No credit for flow control." << std::endl;
563 0 : lunchbox::Thread::yield();
564 0 : _stats.no_credits_fc++;
565 0 : goto retry;
566 : }
567 :
568 : // "Note that an extra event may be triggered without having a
569 : // corresponding completion entry in the CQ." (per ibv_get_cq_event(3))
570 0 : if (_established && !events.test(BUF_EVENT))
571 : {
572 : // Special case: If LocalNode is reading the length part of a message
573 : // it will ignore this zero return and restart the select.
574 0 : if (extra_event && !block)
575 : {
576 0 : _updateNotifier();
577 0 : return 0LL;
578 : }
579 :
580 0 : extra_event = true;
581 0 : goto retry;
582 : }
583 :
584 0 : if (events.test(BUF_EVENT))
585 : {
586 0 : available_bytes = _getAvailableBytes();
587 0 : if (0ULL == available_bytes)
588 : {
589 0 : LBERROR << "Error while reading from event fd." << std::endl;
590 0 : goto err;
591 : }
592 : }
593 :
594 : // Modifies _sinkptr.TAIL
595 0 : bytes_taken = _drain(buffer, bytes);
596 :
597 0 : if (0UL == bytes_taken)
598 : {
599 0 : if (_sinkptr.isEmpty() && !_established)
600 : {
601 0 : LBINFO << "Got EOF, closing " << getDescription()->toString()
602 0 : << std::endl;
603 0 : goto err;
604 : }
605 :
606 0 : if (LB_TIMEOUT_INDEFINITE != timeout)
607 : {
608 0 : if ((clock.getTime64() - start) > timeout)
609 : {
610 0 : LBERROR << "Timed out trying to drain buffer." << std::endl;
611 0 : goto err;
612 : }
613 : }
614 :
615 : // LBWARN << "Sink buffer empty." << std::endl;
616 0 : lunchbox::Thread::yield();
617 0 : _stats.buffer_empty++;
618 0 : goto retry;
619 : }
620 :
621 : // Put back what wasn't taken (ensure the master notifier stays "hot").
622 0 : if (available_bytes > bytes_taken)
623 0 : _incrAvailableBytes(available_bytes - bytes_taken);
624 : #ifdef _WIN32
625 : else
626 : _updateNotifier();
627 : #endif
628 :
629 0 : _readBytes += bytes_taken;
630 :
631 0 : if (_established && _needFC() && !_postFC())
632 0 : LBWARN << "Error while posting flow control message." << std::endl;
633 :
634 : // LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
635 : // << " took " << bytes_taken << " bytes"
636 : // << " (" << _sinkptr.available( ) << " still available)" <<
637 : // std::endl;
638 :
639 0 : return bytes_taken;
640 :
641 : err:
642 0 : close();
643 :
644 0 : return -1LL;
645 : }
646 :
647 0 : int64_t RDMAConnection::write(const void *buffer, const uint64_t bytes)
648 : {
649 0 : lunchbox::Clock clock;
650 0 : const int64_t start = clock.getTime64();
651 0 : const uint32_t timeout = Global::getTimeout();
652 0 : eventset events;
653 0 : const uint32_t can_put = std::min(bytes, MAX_BS);
654 : uint32_t bytes_put;
655 :
656 0 : if (!isConnected())
657 0 : goto err;
658 :
659 : // LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
660 : // << std::endl;
661 :
662 0 : _stats.writes++;
663 :
664 : retry:
665 0 : if (!_checkDisconnected(events))
666 : {
667 0 : LBERROR << "Error while checking connection state." << std::endl;
668 0 : goto err;
669 : }
670 :
671 0 : if (!_established)
672 : {
673 0 : LBWARN << "Disconnected in write." << std::endl;
674 0 : goto err;
675 : }
676 :
677 : // Modifies _sourceptr.TAIL, _sinkptr.HEAD & _rptr.TAIL
678 0 : if (!_checkCQ(false))
679 : {
680 0 : LBERROR << "Error while polling completion queues." << std::endl;
681 0 : goto err;
682 : }
683 :
684 0 : LBASSERT(_wcredits >= 0L);
685 :
686 0 : if (0L == _wcredits)
687 : {
688 0 : if (LB_TIMEOUT_INDEFINITE != timeout)
689 : {
690 0 : if ((clock.getTime64() - start) > timeout)
691 : {
692 0 : LBERROR << "Timed out trying to acquire credit." << std::endl;
693 0 : goto err;
694 : }
695 : }
696 :
697 : // LBWARN << "No credits for RDMA." << std::endl;
698 0 : lunchbox::Thread::yield();
699 0 : _stats.no_credits_rdma++;
700 0 : goto retry;
701 : }
702 :
703 : // Modifies _sourceptr.HEAD
704 0 : bytes_put = _fill(buffer, can_put);
705 :
706 0 : if (0UL == bytes_put)
707 : {
708 0 : if (LB_TIMEOUT_INDEFINITE != timeout)
709 : {
710 0 : if ((clock.getTime64() - start) > timeout)
711 : {
712 0 : LBERROR << "Timed out trying to fill buffer." << std::endl;
713 0 : goto err;
714 : }
715 : }
716 :
717 : // LBWARN << "Source buffer full." << std::endl;
718 0 : lunchbox::Thread::yield();
719 0 : if (_sourceptr.isFull() || _rptr.isFull())
720 0 : _stats.buffer_full++;
721 0 : goto retry;
722 : }
723 :
724 : // Modifies _sourceptr.MIDDLE & _rptr.HEAD
725 0 : if (!_postRDMAWrite())
726 : {
727 0 : LBERROR << "Error while posting RDMA write." << std::endl;
728 0 : goto err;
729 : }
730 :
731 : // LBWARN << (void *)this << std::dec << ".write(" << bytes << ")"
732 : // << " put " << bytes_put << " bytes" << std::endl;
733 :
734 0 : return bytes_put;
735 :
736 : err:
737 0 : return -1LL;
738 : }
739 :
740 0 : RDMAConnection::~RDMAConnection()
741 : {
742 0 : _close();
743 0 : }
744 :
745 : ////////////////////////////////////////////////////////////////////////////////
746 :
747 0 : void RDMAConnection::_close()
748 : {
749 0 : lunchbox::ScopedWrite close_mutex(_poll_lock);
750 :
751 0 : if (!isClosed())
752 : {
753 0 : LBASSERT(!isClosing());
754 0 : _setState(STATE_CLOSING);
755 :
756 0 : if (_established && ::rdma_disconnect(_cm_id))
757 0 : LBWARN << "rdma_disconnect : " << lunchbox::sysError << std::endl;
758 :
759 0 : _setState(STATE_CLOSED);
760 0 : _cleanup();
761 : }
762 0 : }
763 :
764 0 : void RDMAConnection::_cleanup()
765 : {
766 0 : LBASSERT(isClosed());
767 :
768 0 : _sourcebuf.clear();
769 0 : _sinkbuf.clear();
770 0 : _msgbuf.clear();
771 :
772 0 : if (_completions > 0U)
773 : {
774 0 : ::ibv_ack_cq_events(_cq, _completions);
775 0 : _completions = 0U;
776 : }
777 :
778 0 : delete[] _wcs;
779 0 : _wcs = 0;
780 :
781 : #ifdef _WIN32
782 : if (_ccWaitObj)
783 : UnregisterWait(_ccWaitObj);
784 : _ccWaitObj = 0;
785 :
786 : if (_cmWaitObj)
787 : UnregisterWait(_cmWaitObj);
788 : _cmWaitObj = 0;
789 : #endif
790 :
791 0 : if (NULL != _cm_id)
792 0 : ::rdma_destroy_ep(_cm_id);
793 0 : _cm_id = NULL;
794 :
795 0 : if ((NULL != _cq) && ::rdma_seterrno(::ibv_destroy_cq(_cq)))
796 0 : LBWARN << "ibv_destroy_cq : " << lunchbox::sysError << std::endl;
797 0 : _cq = NULL;
798 :
799 0 : if ((NULL != _cc) && ::rdma_seterrno(::ibv_destroy_comp_channel(_cc)))
800 0 : LBWARN << "ibv_destroy_comp_channel : " << lunchbox::sysError
801 0 : << std::endl;
802 0 : _cc = NULL;
803 :
804 0 : if ((NULL != _pd) && ::rdma_seterrno(::ibv_dealloc_pd(_pd)))
805 0 : LBWARN << "ibv_dealloc_pd : " << lunchbox::sysError << std::endl;
806 0 : _pd = NULL;
807 :
808 0 : if (NULL != _cm)
809 0 : ::rdma_destroy_event_channel(_cm);
810 0 : _cm = NULL;
811 :
812 0 : if (NULL != _rai)
813 0 : ::rdma_freeaddrinfo(_rai);
814 0 : _rai = NULL;
815 :
816 0 : _rptr = 0UL;
817 0 : _rbase = _rkey = 0ULL;
818 : #ifndef _WIN32
819 0 : if ((_notifier >= 0) && TEMP_FAILURE_RETRY(::close(_notifier)))
820 0 : LBWARN << "close : " << lunchbox::sysError << std::endl;
821 0 : _notifier = -1;
822 : #endif
823 0 : }
824 :
825 0 : bool RDMAConnection::_finishAccept(struct rdma_cm_id *new_cm_id,
826 : const RDMAConnParamData &cpd)
827 : {
828 0 : LBASSERT(isClosed());
829 0 : _setState(STATE_CONNECTING);
830 :
831 0 : LBASSERT(NULL != new_cm_id);
832 :
833 0 : _cm_id = new_cm_id;
834 :
835 : {
836 : // FIXME: RDMA CM appears to send up invalid addresses when receiving
837 : // connections that use a different protocol than what was bound. E.g.
838 : // if an IPv6 listener gets an IPv4 connection then the sa_family
839 : // will be AF_INET6 but the actual data is struct sockaddr_in. Example:
840 : //
841 : // 0000000: 0a00 bc10 c0a8 b01a 0000 0000 0000 0000 ................
842 : //
843 : // However, in the reverse case, when an IPv4 listener gets an IPv6
844 : // connection not only is the address family incorrect, but the actual
845 : // IPv6 address is only partially there:
846 : //
847 : // 0000000: 0200 bc11 0000 0000 fe80 0000 0000 0000 ................
848 : // 0000010: 0000 0000 0000 0000 0000 0000 0000 0000 ................
849 : // 0000020: 0000 0000 0000 0000 0000 0000 0000 0000 ................
850 : // 0000030: 0000 0000 0000 0000 0000 0000 0000 0000 ................
851 : //
852 : // Surely seems to be a bug in RDMA CM.
853 :
854 : union {
855 : struct sockaddr addr;
856 : struct sockaddr_in sin;
857 : struct sockaddr_in6 sin6;
858 : struct sockaddr_storage storage;
859 : } sss;
860 :
861 : // Make a copy since we might change it.
862 : // sss.storage = _cm_id->route.addr.dst_storage;
863 : ::memcpy((void *)&sss.storage,
864 0 : (const void *)::rdma_get_peer_addr(_cm_id),
865 0 : sizeof(struct sockaddr_storage));
866 : #ifndef _WIN32
867 0 : if ((AF_INET == sss.storage.ss_family) &&
868 0 : (sss.sin6.sin6_addr.s6_addr32[0] != 0 ||
869 0 : sss.sin6.sin6_addr.s6_addr32[1] != 0 ||
870 0 : sss.sin6.sin6_addr.s6_addr32[2] != 0 ||
871 0 : sss.sin6.sin6_addr.s6_addr32[3] != 0))
872 : {
873 0 : LBWARN << "IPv6 address detected but likely invalid!" << std::endl;
874 0 : sss.storage.ss_family = AF_INET6;
875 : }
876 : else
877 : #endif
878 0 : if ((AF_INET6 == sss.storage.ss_family) &&
879 0 : (INADDR_ANY != sss.sin.sin_addr.s_addr))
880 : {
881 0 : sss.storage.ss_family = AF_INET;
882 : }
883 :
884 0 : _updateInfo(&sss.addr);
885 : }
886 :
887 0 : _device_name = ::ibv_get_device_name(_cm_id->verbs->device);
888 :
889 0 : LBVERB << "Connection initiated on " << std::dec << _device_name << ":"
890 0 : << (int)_cm_id->port_num << " from " << _addr << ":" << _serv << " ("
891 0 : << getDescription()->toString() << ")" << std::endl;
892 :
893 0 : if ((RDMA_PROTOCOL_MAGIC != cpd.magic) ||
894 0 : (RDMA_PROTOCOL_VERSION != cpd.version))
895 : {
896 0 : LBERROR << "Protocol mismatch with initiator : " << _addr << ":"
897 0 : << _serv << std::endl;
898 0 : goto err_reject;
899 : }
900 :
901 0 : if (!_createNotifier())
902 : {
903 0 : LBERROR << "Failed to create master notifier." << std::endl;
904 0 : goto err_reject;
905 : }
906 :
907 0 : if (!_createEventChannel())
908 : {
909 0 : LBERROR << "Failed to create event channel." << std::endl;
910 0 : goto err_reject;
911 : }
912 :
913 0 : if (!_migrateId())
914 : {
915 0 : LBERROR << "Failed to migrate communication identifier." << std::endl;
916 0 : goto err_reject;
917 : }
918 :
919 0 : if (!_initProtocol(cpd.depth))
920 : {
921 0 : LBERROR << "Failed to initialize protocol variables." << std::endl;
922 0 : goto err_reject;
923 : }
924 :
925 0 : if (!_initVerbs())
926 : {
927 0 : LBERROR << "Failed to initialize verbs." << std::endl;
928 0 : goto err_reject;
929 : }
930 :
931 0 : if (!_createQP())
932 : {
933 0 : LBERROR << "Failed to create queue pair." << std::endl;
934 0 : goto err_reject;
935 : }
936 :
937 0 : if (!_createBytesAvailableFD())
938 : {
939 0 : LBERROR << "Failed to create available byte notifier." << std::endl;
940 0 : goto err_reject;
941 : }
942 :
943 0 : if (!_initBuffers())
944 : {
945 0 : LBERROR << "Failed to initialize ring buffers." << std::endl;
946 0 : goto err_reject;
947 : }
948 :
949 0 : if (!_postReceives(_depth))
950 : {
951 0 : LBERROR << "Failed to pre-post receives." << std::endl;
952 0 : goto err_reject;
953 : }
954 :
955 0 : if (!_accept())
956 : {
957 0 : LBERROR << "Failed to accept remote connection from : " << _addr << ":"
958 0 : << _serv << std::endl;
959 0 : goto err;
960 : }
961 :
962 0 : LBASSERT(_established);
963 :
964 0 : if (!_waitRecvSetup())
965 : {
966 0 : LBERROR << "Failed to receive setup message." << std::endl;
967 0 : goto err;
968 : }
969 :
970 0 : if (!_postSetup())
971 : {
972 0 : LBERROR << "Failed to post setup message." << std::endl;
973 0 : goto err;
974 : }
975 :
976 0 : LBVERB << "Connection accepted on " << std::dec << _device_name << ":"
977 0 : << (int)_cm_id->port_num << " from " << _addr << ":" << _serv << " ("
978 0 : << getDescription()->toString() << ")" << std::endl;
979 :
980 0 : _setState(STATE_CONNECTED);
981 0 : _updateNotifier();
982 0 : return true;
983 :
984 : err_reject:
985 0 : LBWARN << "Rejecting connection from remote address : " << _addr << ":"
986 0 : << _serv << std::endl;
987 :
988 0 : if (!_reject())
989 0 : LBWARN << "Failed to issue connection reject." << std::endl;
990 :
991 : err:
992 0 : close();
993 :
994 0 : return false;
995 : }
996 :
/**
 * Resolve the description's hostname/port with rdma_getaddrinfo() into
 * _rai, and point _addrinfo at the first IP entry (falling back to the
 * first entry of any family).
 *
 * When no hostname is configured no lookup is done and _addrinfo stays
 * NULL (as _rai was).
 *
 * @param passive request RAI_PASSIVE (local bind) addresses.
 * @return false only if rdma_getaddrinfo() itself fails.
 */
bool RDMAConnection::_lookupAddress(const bool passive)
{
    struct rdma_addrinfo hints;
    ::memset((void *)&hints, 0, sizeof(hints));
    if (passive)
        hints.ai_flags |= RAI_PASSIVE;

    ConstConnectionDescriptionPtr description = getDescription();
    const std::string &hostname = description->getHostname();
    char *node =
        hostname.empty() ? 0 : const_cast<char *>(hostname.c_str());

    // Keeps the service string alive until rdma_getaddrinfo() returns.
    std::string port_text;
    char *service = 0;
    if (0u != description->port)
    {
        std::ostringstream os;
        os << description->port;
        port_text = os.str();
        service = const_cast<char *>(port_text.c_str());
    }

    if ((NULL != node) && ::rdma_getaddrinfo(node, service, &hints, &_rai))
    {
        LBERROR << "rdma_getaddrinfo: " << lunchbox::sysError << std::endl;
        return false;
    }

    // Prefer the first IPv4/IPv6 entry of the result list.
    for (_addrinfo = _rai; _addrinfo && !_isInetFamily(_addrinfo);
         _addrinfo = _addrinfo->ai_next)
    {
    }

    if (!_addrinfo)
    {
        if (_rai)
        {
            LBWARN << "No IP or IPv6 address found by rdma_getaddrinfo. "
                      "Connection establishment may fail."
                   << std::endl;
        }
        _addrinfo = _rai;
    }

    if ((NULL != _addrinfo) && (_addrinfo->ai_connect_len > 0))
        LBWARN << "WARNING : ai_connect data specified!" << std::endl;

    return true;
}
1047 :
1048 0 : void RDMAConnection::_updateInfo(struct sockaddr *addr)
1049 : {
1050 0 : int salen = sizeof(struct sockaddr);
1051 0 : bool is_unspec = false;
1052 :
1053 0 : if (AF_INET == addr->sa_family)
1054 : {
1055 0 : struct sockaddr_in *sin = reinterpret_cast<struct sockaddr_in *>(addr);
1056 0 : is_unspec = (INADDR_ANY == sin->sin_addr.s_addr);
1057 0 : salen = sizeof(struct sockaddr_in);
1058 : }
1059 : // TODO: IPv6 for WIndows
1060 : #ifndef _WIN32
1061 0 : else if (AF_INET6 == addr->sa_family)
1062 : {
1063 : struct sockaddr_in6 *sin6 =
1064 0 : reinterpret_cast<struct sockaddr_in6 *>(addr);
1065 :
1066 0 : is_unspec = (sin6->sin6_addr.s6_addr32[0] == 0 &&
1067 0 : sin6->sin6_addr.s6_addr32[1] == 0 &&
1068 0 : sin6->sin6_addr.s6_addr32[2] == 0 &&
1069 0 : sin6->sin6_addr.s6_addr32[3] == 0);
1070 0 : salen = sizeof(struct sockaddr_in6);
1071 : }
1072 : #endif
1073 : else
1074 : {
1075 0 : LBERROR << "Unsupported socket address family " << addr->sa_family
1076 0 : << std::endl;
1077 0 : return;
1078 : }
1079 :
1080 : int err;
1081 0 : if ((err = ::getnameinfo(addr, salen, _addr, sizeof(_addr), _serv,
1082 : sizeof(_serv), NI_NUMERICHOST | NI_NUMERICSERV)))
1083 0 : LBWARN << "Name info lookup failed : " << err << std::endl;
1084 :
1085 0 : if (is_unspec)
1086 0 : ::gethostname(_addr, NI_MAXHOST);
1087 :
1088 0 : ConnectionDescriptionPtr description = _getDescription();
1089 0 : if (description->getHostname().empty())
1090 0 : description->setHostname(_addr);
1091 0 : if (0u == description->port)
1092 0 : description->port = atoi(_serv);
1093 : }
1094 :
1095 0 : bool RDMAConnection::_createEventChannel()
1096 : {
1097 0 : LBASSERT(NULL == _cm);
1098 :
1099 0 : _cm = ::rdma_create_event_channel();
1100 0 : if (NULL == _cm)
1101 : {
1102 0 : LBERROR << "rdma_create_event_channel : " << lunchbox::sysError
1103 0 : << std::endl;
1104 0 : return false;
1105 : }
1106 :
1107 : #ifdef _WIN32
1108 : if (!RegisterWaitForSingleObject(&_cmWaitObj, _cm->channel.Event,
1109 : WAITORTIMERCALLBACK(&_triggerNotifierCM),
1110 : this, INFINITE, WT_EXECUTEINWAITTHREAD))
1111 : {
1112 : LBERROR << "RegisterWaitForSingleObject: " << co::base::sysError
1113 : << std::endl;
1114 : return false;
1115 : }
1116 : #else
1117 : struct epoll_event evctl;
1118 :
1119 0 : LBASSERT(_notifier >= 0);
1120 :
1121 : // Use the CM fd to signal Collage of connection events.
1122 0 : ::memset((void *)&evctl, 0, sizeof(struct epoll_event));
1123 0 : evctl.events = EPOLLIN;
1124 0 : evctl.data.fd = _cm->fd;
1125 0 : if (::epoll_ctl(_notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl))
1126 : {
1127 0 : LBERROR << "epoll_ctl: " << lunchbox::sysError << std::endl;
1128 0 : return false;
1129 : }
1130 : #endif
1131 :
1132 0 : return true;
1133 : }
1134 :
1135 0 : bool RDMAConnection::_createId()
1136 : {
1137 0 : LBASSERT(NULL != _cm);
1138 0 : LBASSERT(NULL == _cm_id);
1139 :
1140 0 : if (::rdma_create_id(_cm, &_cm_id, NULL, RDMA_PS_TCP))
1141 : {
1142 0 : LBERROR << "rdma_create_id: " << lunchbox::sysError << std::endl;
1143 0 : return false;
1144 : }
1145 :
1146 0 : return true;
1147 : }
1148 :
1149 0 : bool RDMAConnection::_initVerbs()
1150 : {
1151 0 : LBASSERT(NULL != _cm_id);
1152 0 : LBASSERT(NULL != _cm_id->verbs);
1153 :
1154 0 : LBASSERT(NULL == _pd);
1155 :
1156 : // Allocate protection domain.
1157 0 : _pd = ::ibv_alloc_pd(_cm_id->verbs);
1158 0 : if (NULL == _pd)
1159 : {
1160 0 : LBERROR << "ibv_alloc_pd: " << lunchbox::sysError << std::endl;
1161 0 : return false;
1162 : }
1163 :
1164 0 : LBASSERT(NULL == _cc);
1165 :
1166 : // Create completion channel.
1167 0 : _cc = ::ibv_create_comp_channel(_cm_id->verbs);
1168 0 : if (NULL == _cc)
1169 : {
1170 0 : LBERROR << "ibv_create_comp_channel : " << lunchbox::sysError
1171 0 : << std::endl;
1172 0 : return false;
1173 : }
1174 :
1175 : #ifdef _WIN32
1176 : if (!RegisterWaitForSingleObject(&_ccWaitObj, _cc->comp_channel.Event,
1177 : WAITORTIMERCALLBACK(&_triggerNotifierCQ),
1178 : this, INFINITE, WT_EXECUTEINWAITTHREAD))
1179 : {
1180 : LBERROR << "RegisterWaitForSingleObject : " << lunchbox::sysError
1181 : << std::endl;
1182 : return false;
1183 : }
1184 : #else
1185 0 : LBASSERT(_notifier >= 0);
1186 :
1187 : // Use the completion channel fd to signal Collage of RDMA writes received.
1188 : struct epoll_event evctl;
1189 0 : ::memset((void *)&evctl, 0, sizeof(struct epoll_event));
1190 0 : evctl.events = EPOLLIN;
1191 0 : evctl.data.fd = _cc->fd;
1192 0 : if (::epoll_ctl(_notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl))
1193 : {
1194 0 : LBERROR << "epoll_ctl: " << lunchbox::sysError << std::endl;
1195 0 : return false;
1196 : }
1197 : #endif
1198 :
1199 0 : LBASSERT(NULL == _cq);
1200 :
1201 : // Create a single completion queue for both sends & receives */
1202 0 : _cq = ::ibv_create_cq(_cm_id->verbs, _depth * 2, NULL, _cc, 0);
1203 0 : if (NULL == _cq)
1204 : {
1205 0 : LBERROR << "ibv_create_cq: " << lunchbox::sysError << std::endl;
1206 0 : return false;
1207 : }
1208 :
1209 : // Request IBV_SEND_SOLICITED events only (i.e. RDMA writes, not FC)
1210 0 : if (::rdma_seterrno(::ibv_req_notify_cq(_cq, 1)))
1211 : {
1212 0 : LBERROR << "ibv_req_notify_cq: " << lunchbox::sysError << std::endl;
1213 0 : return false;
1214 : }
1215 :
1216 0 : _wcs = new struct ibv_wc[_depth];
1217 0 : return true;
1218 : }
1219 :
1220 0 : bool RDMAConnection::_createQP()
1221 : {
1222 : struct ibv_qp_init_attr init_attr;
1223 :
1224 0 : LBASSERT(_depth > 0);
1225 0 : LBASSERT(NULL != _cm_id);
1226 0 : LBASSERT(NULL != _pd);
1227 0 : LBASSERT(NULL != _cq);
1228 :
1229 0 : ::memset((void *)&init_attr, 0, sizeof(struct ibv_qp_init_attr));
1230 0 : init_attr.qp_type = IBV_QPT_RC;
1231 0 : init_attr.cap.max_send_wr = _depth;
1232 0 : init_attr.cap.max_recv_wr = _depth;
1233 0 : init_attr.cap.max_recv_sge = 1;
1234 0 : init_attr.cap.max_send_sge = 1;
1235 0 : init_attr.recv_cq = _cq;
1236 0 : init_attr.send_cq = _cq;
1237 0 : init_attr.sq_sig_all = 1; // i.e. always IBV_SEND_SIGNALED
1238 :
1239 : // Create queue pair.
1240 0 : if (::rdma_create_qp(_cm_id, _pd, &init_attr))
1241 : {
1242 0 : LBERROR << "rdma_create_qp: " << lunchbox::sysError << std::endl;
1243 0 : return false;
1244 : }
1245 :
1246 0 : LBVERB << "RDMA QP caps : " << std::dec << init_attr.cap.max_recv_wr
1247 0 : << " receives, " << init_attr.cap.max_send_wr << " sends, "
1248 0 : << std::endl;
1249 :
1250 0 : return true;
1251 : }
1252 :
1253 0 : bool RDMAConnection::_initBuffers()
1254 : {
1255 : const size_t rbs =
1256 0 : 1024 * 1024 *
1257 0 : Global::getIAttribute(Global::IATTR_RDMA_RING_BUFFER_SIZE_MB);
1258 :
1259 0 : LBASSERT(_depth > 0);
1260 0 : LBASSERT(NULL != _pd);
1261 :
1262 0 : if (0 == rbs)
1263 : {
1264 0 : LBERROR << "Invalid RDMA ring buffer size." << std::endl;
1265 0 : return false;
1266 : }
1267 :
1268 0 : if (!_sourcebuf.resize(_pd, rbs))
1269 : {
1270 0 : LBERROR << "Failed to resize source buffer." << std::endl;
1271 0 : return false;
1272 : }
1273 :
1274 0 : if (!_sinkbuf.resize(_pd, rbs))
1275 : {
1276 0 : LBERROR << "Failed to resize sink buffer." << std::endl;
1277 0 : return false;
1278 : }
1279 :
1280 0 : _sourceptr.clear(uint32_t(_sourcebuf.getSize()));
1281 0 : _sinkptr.clear(uint32_t(_sinkbuf.getSize()));
1282 :
1283 : // Need enough space for both sends and receives.
1284 0 : if (!_msgbuf.resize(_pd, _depth * 2))
1285 : {
1286 0 : LBERROR << "Failed to resize message buffer pool." << std::endl;
1287 0 : return false;
1288 : }
1289 :
1290 0 : return true;
1291 : }
1292 :
1293 0 : bool RDMAConnection::_resolveAddress()
1294 : {
1295 0 : LBASSERT(NULL != _cm_id);
1296 0 : LBASSERT(NULL != _addrinfo);
1297 :
1298 0 : if (::rdma_resolve_addr(_cm_id, _addrinfo->ai_src_addr,
1299 0 : _addrinfo->ai_dst_addr, _timeout))
1300 : {
1301 0 : LBERROR << "rdma_resolve_addr: " << lunchbox::sysError << std::endl;
1302 0 : return false;
1303 : }
1304 :
1305 0 : return _waitForCMEvent(RDMA_CM_EVENT_ADDR_RESOLVED);
1306 : }
1307 :
1308 0 : bool RDMAConnection::_resolveRoute()
1309 : {
1310 0 : LBASSERT(NULL != _cm_id);
1311 0 : LBASSERT(NULL != _addrinfo);
1312 :
1313 0 : if ((IBV_TRANSPORT_IB == _cm_id->verbs->device->transport_type) &&
1314 0 : (_addrinfo->ai_route_len > 0))
1315 : {
1316 : #ifdef _WIN32
1317 : if (::rdma_set_option(_cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_TOS,
1318 : #else
1319 0 : if (::rdma_set_option(_cm_id, RDMA_OPTION_IB, RDMA_OPTION_IB_PATH,
1320 : #endif
1321 0 : _addrinfo->ai_route, _addrinfo->ai_route_len))
1322 : {
1323 0 : LBERROR << "rdma_set_option: " << lunchbox::sysError << std::endl;
1324 0 : return false;
1325 : }
1326 : }
1327 0 : else if (::rdma_resolve_route(_cm_id, _timeout))
1328 : {
1329 0 : LBERROR << "rdma_resolve_route: " << lunchbox::sysError << std::endl;
1330 0 : return false;
1331 : }
1332 :
1333 0 : return _waitForCMEvent(RDMA_CM_EVENT_ROUTE_RESOLVED);
1334 : }
1335 :
1336 0 : bool RDMAConnection::_connect()
1337 : {
1338 0 : LBASSERT(NULL != _cm_id);
1339 0 : LBASSERT(!_established);
1340 :
1341 : struct rdma_conn_param conn_param;
1342 :
1343 0 : ::memset((void *)&conn_param, 0, sizeof(struct rdma_conn_param));
1344 :
1345 0 : _cpd.magic = RDMA_PROTOCOL_MAGIC;
1346 0 : _cpd.version = RDMA_PROTOCOL_VERSION;
1347 0 : _cpd.depth = _depth;
1348 0 : conn_param.private_data = reinterpret_cast<const void *>(&_cpd);
1349 0 : conn_param.private_data_len = sizeof(struct RDMAConnParamData);
1350 0 : conn_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
1351 0 : conn_param.responder_resources = RDMA_MAX_RESP_RES;
1352 : // Magic 3-bit values.
1353 : // conn_param.retry_count = 5;
1354 : // conn_param.rnr_retry_count = 7;
1355 :
1356 0 : LBINFO << "Connect on source lid : " << std::showbase << std::hex
1357 0 : << ntohs(_cm_id->route.path_rec->slid) << " (" << std::dec
1358 0 : << ntohs(_cm_id->route.path_rec->slid) << ") "
1359 0 : << "to dest lid : " << std::hex
1360 0 : << ntohs(_cm_id->route.path_rec->dlid) << " (" << std::dec
1361 0 : << ntohs(_cm_id->route.path_rec->dlid) << ") " << std::endl;
1362 :
1363 0 : if (::rdma_connect(_cm_id, &conn_param))
1364 : {
1365 0 : LBERROR << "rdma_connect: " << lunchbox::sysError << std::endl;
1366 0 : return false;
1367 : }
1368 :
1369 0 : bool ret = _waitForCMEvent(RDMA_CM_EVENT_ESTABLISHED);
1370 :
1371 0 : return ret;
1372 : }
1373 :
1374 0 : bool RDMAConnection::_bindAddress()
1375 : {
1376 0 : LBASSERT(NULL != _cm_id);
1377 0 : ConstConnectionDescriptionPtr description = getDescription();
1378 :
1379 : #if IPV6_DEFAULT
1380 : struct sockaddr_in6 sin;
1381 : memset((void *)&sin, 0, sizeof(struct sockaddr_in6));
1382 : sin.sin6_family = AF_INET6;
1383 : sin.sin6_port = htons(description->port);
1384 : sin.sin6_addr = in6addr_any;
1385 : #else
1386 : struct sockaddr_in sin;
1387 0 : memset((void *)&sin, 0, sizeof(struct sockaddr_in));
1388 0 : sin.sin_family = AF_INET;
1389 0 : sin.sin_port = htons(description->port);
1390 0 : sin.sin_addr.s_addr = INADDR_ANY;
1391 : #endif
1392 :
1393 0 : if (::rdma_bind_addr(_cm_id,
1394 0 : (NULL != _addrinfo)
1395 0 : ? _addrinfo->ai_src_addr
1396 : : reinterpret_cast<struct sockaddr *>(&sin)))
1397 : {
1398 0 : LBERROR << "rdma_bind_addr: " << lunchbox::sysError << std::endl;
1399 0 : return false;
1400 : }
1401 :
1402 0 : const uint16_t port = ntohs(rdma_get_src_port(_cm_id));
1403 0 : LBDEBUG << "Listening on " << description->getHostname() << "[" << _addr
1404 0 : << "]:" << port << " (" << description->toString() << ")"
1405 0 : << std::endl;
1406 :
1407 0 : return true;
1408 : }
1409 :
1410 0 : bool RDMAConnection::_listen(int backlog)
1411 : {
1412 0 : LBASSERT(NULL != _cm_id);
1413 :
1414 0 : if (::rdma_listen(_cm_id, backlog))
1415 : {
1416 0 : LBERROR << "rdma_listen: " << lunchbox::sysError << std::endl;
1417 0 : return false;
1418 : }
1419 :
1420 0 : return true;
1421 : }
1422 :
1423 0 : bool RDMAConnection::_migrateId()
1424 : {
1425 0 : LBASSERT(NULL != _cm_id);
1426 0 : LBASSERT(NULL != _cm);
1427 :
1428 0 : if (::rdma_migrate_id(_cm_id, _cm))
1429 : {
1430 0 : LBERROR << "rdma_migrate_id: " << lunchbox::sysError << std::endl;
1431 0 : return false;
1432 : }
1433 :
1434 0 : return true;
1435 : }
1436 :
1437 0 : bool RDMAConnection::_accept()
1438 : {
1439 : struct rdma_conn_param accept_param;
1440 :
1441 0 : LBASSERT(NULL != _cm_id);
1442 0 : LBASSERT(!_established);
1443 :
1444 0 : ::memset((void *)&accept_param, 0, sizeof(struct rdma_conn_param));
1445 :
1446 0 : _cpd.magic = RDMA_PROTOCOL_MAGIC;
1447 0 : _cpd.version = RDMA_PROTOCOL_VERSION;
1448 0 : _cpd.depth = _depth;
1449 0 : accept_param.private_data = reinterpret_cast<const void *>(&_cpd);
1450 0 : accept_param.private_data_len = sizeof(struct RDMAConnParamData);
1451 0 : accept_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
1452 0 : accept_param.responder_resources = RDMA_MAX_RESP_RES;
1453 : // Magic 3-bit value.
1454 : // accept_param.rnr_retry_count = 7;
1455 :
1456 0 : LBINFO << "Accept on source lid : " << std::showbase << std::hex
1457 0 : << ntohs(_cm_id->route.path_rec->slid) << " (" << std::dec
1458 0 : << ntohs(_cm_id->route.path_rec->slid) << ") "
1459 0 : << "from dest lid : " << std::hex
1460 0 : << ntohs(_cm_id->route.path_rec->dlid) << " (" << std::dec
1461 0 : << ntohs(_cm_id->route.path_rec->dlid) << ") " << std::endl;
1462 :
1463 0 : if (::rdma_accept(_cm_id, &accept_param))
1464 : {
1465 0 : LBERROR << "rdma_accept: " << lunchbox::sysError << std::endl;
1466 0 : return false;
1467 : }
1468 :
1469 0 : return _waitForCMEvent(RDMA_CM_EVENT_ESTABLISHED);
1470 : }
1471 :
1472 0 : bool RDMAConnection::_reject()
1473 : {
1474 0 : LBASSERT(NULL != _cm_id);
1475 0 : LBASSERT(!_established);
1476 :
1477 0 : if (::rdma_reject(_cm_id, NULL, 0))
1478 : {
1479 0 : LBERROR << "rdma_reject: " << lunchbox::sysError << std::endl;
1480 0 : return false;
1481 : }
1482 :
1483 0 : return true;
1484 : }
1485 :
1486 : ////////////////////////////////////////////////////////////////////////////////
1487 :
1488 0 : bool RDMAConnection::_initProtocol(int32_t depth)
1489 : {
1490 0 : if (depth < 2L)
1491 : {
1492 0 : LBERROR << "Invalid queue depth." << std::endl;
1493 0 : return false;
1494 : }
1495 :
1496 0 : _depth = depth;
1497 0 : _writes = 0L;
1498 0 : _fcs = 0L;
1499 0 : _readBytes = 0u;
1500 0 : _wcredits = _depth / 2 - 2;
1501 0 : _fcredits = _depth / 2 + 2;
1502 :
1503 0 : return true;
1504 : }
1505 :
1506 : /* inline */
1507 0 : bool RDMAConnection::_needFC()
1508 : {
1509 : bool bytesAvail;
1510 : #ifdef _WIN32
1511 : lunchbox::ScopedFastRead mutex(_eventLock);
1512 : bytesAvail = (_availBytes != 0);
1513 : #else
1514 : pollfd pfd;
1515 0 : pfd.fd = _pipe_fd[0];
1516 0 : pfd.events = EPOLLIN;
1517 0 : pfd.revents = 0;
1518 0 : int ret = poll(&pfd, 1, 0);
1519 0 : if (ret == -1)
1520 : {
1521 0 : LBERROR << "poll: " << lunchbox::sysError << std::endl;
1522 0 : return true;
1523 : }
1524 0 : bytesAvail = ret != 0;
1525 : #endif
1526 0 : return (!bytesAvail || (uint32_t)_writes >= FC_MESSAGE_FREQUENCY);
1527 : }
1528 :
1529 0 : bool RDMAConnection::_postReceives(const uint32_t count)
1530 : {
1531 0 : LBASSERT(NULL != _cm_id->qp);
1532 0 : LBASSERT(count > 0UL);
1533 :
1534 0 : ibv_sge *sge = new ibv_sge[count];
1535 0 : ibv_recv_wr *wrs = new ibv_recv_wr[count];
1536 :
1537 0 : for (uint32_t i = 0UL; i != count; i++)
1538 : {
1539 0 : sge[i].addr = (uint64_t)(uintptr_t)_msgbuf.getBuffer();
1540 0 : sge[i].length = uint32_t(_msgbuf.getBufferSize());
1541 0 : sge[i].lkey = _msgbuf.getMR()->lkey;
1542 :
1543 0 : wrs[i].wr_id = sge[i].addr;
1544 0 : wrs[i].next = &wrs[i + 1];
1545 0 : wrs[i].sg_list = &sge[i];
1546 0 : wrs[i].num_sge = 1;
1547 : }
1548 0 : wrs[count - 1].next = NULL;
1549 :
1550 : struct ibv_recv_wr *bad_wr;
1551 : const bool error =
1552 0 : ::rdma_seterrno(::ibv_post_recv(_cm_id->qp, wrs, &bad_wr));
1553 :
1554 0 : delete[] sge;
1555 0 : delete[] wrs;
1556 :
1557 0 : if (error)
1558 : {
1559 0 : LBERROR << "ibv_post_recv : " << lunchbox::sysError << std::endl;
1560 0 : return false;
1561 : }
1562 :
1563 0 : return true;
1564 : }
1565 :
1566 : /* inline */
1567 0 : void RDMAConnection::_recvRDMAWrite(const uint32_t imm_data)
1568 : {
1569 : union {
1570 : uint32_t val;
1571 : RDMAFCImm fc;
1572 : } x;
1573 0 : x.val = ntohl(imm_data);
1574 :
1575 : // Analysis:
1576 : //
1577 : // Since the ring pointers are circular, a malicious (presumably overflow)
1578 : // value here would at worst only result in us reading arbitrary regions
1579 : // from our sink buffer, not segfaulting. If the other side wanted us to
1580 : // reread a previous message it should just resend it!
1581 0 : _sinkptr.incrHead(x.fc.bytes_sent);
1582 0 : _incrAvailableBytes(x.fc.bytes_sent);
1583 :
1584 0 : _fcredits += x.fc.fcs_received;
1585 0 : LBASSERTINFO(_fcredits <= _depth, _fcredits << " > " << _depth);
1586 :
1587 0 : _writes++;
1588 0 : }
1589 :
1590 : /* inline */
1591 0 : uint32_t RDMAConnection::_makeImm(const uint32_t b)
1592 : {
1593 : union {
1594 : uint32_t val;
1595 : RDMAFCImm fc;
1596 : } x;
1597 :
1598 0 : x.fc.fcs_received = std::min(MAX_FC, static_cast<uint16_t>(_fcs));
1599 0 : _fcs -= x.fc.fcs_received;
1600 0 : LBASSERT(_fcs >= 0);
1601 :
1602 0 : LBASSERT(b <= MAX_BS);
1603 0 : x.fc.bytes_sent = b;
1604 :
1605 0 : return htonl(x.val);
1606 : }
1607 :
1608 0 : bool RDMAConnection::_postRDMAWrite()
1609 : {
1610 : struct ibv_sge sge;
1611 : struct ibv_send_wr wr;
1612 :
1613 0 : sge.addr = (uint64_t)((uintptr_t)_sourcebuf.getBase() +
1614 0 : _sourceptr.ptr(_sourceptr.MIDDLE));
1615 0 : sge.length =
1616 0 : (uint64_t)_sourceptr.available(_sourceptr.HEAD, _sourceptr.MIDDLE);
1617 0 : sge.lkey = _sourcebuf.getMR()->lkey;
1618 0 : _sourceptr.incr(_sourceptr.MIDDLE, (uint32_t)sge.length);
1619 :
1620 0 : wr.wr_id = (uint64_t)sge.length;
1621 0 : wr.next = NULL;
1622 0 : wr.sg_list = &sge;
1623 0 : wr.num_sge = 1;
1624 0 : wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
1625 0 : wr.send_flags = IBV_SEND_SOLICITED; // Important!
1626 0 : wr.imm_data = _makeImm((uint32_t)sge.length);
1627 0 : wr.wr.rdma.rkey = _rkey;
1628 0 : wr.wr.rdma.remote_addr =
1629 0 : (uint64_t)((uintptr_t)_rbase + _rptr.ptr(_rptr.HEAD));
1630 0 : _rptr.incrHead((uint32_t)sge.length);
1631 :
1632 0 : _wcredits--;
1633 0 : LBASSERT(_wcredits >= 0L);
1634 :
1635 : struct ibv_send_wr *bad_wr;
1636 0 : if (::rdma_seterrno(::ibv_post_send(_cm_id->qp, &wr, &bad_wr)))
1637 : {
1638 0 : LBERROR << "ibv_post_send : " << lunchbox::sysError << std::endl;
1639 0 : return false;
1640 : }
1641 :
1642 0 : return true;
1643 : }
1644 :
1645 0 : bool RDMAConnection::_postMessage(const RDMAMessage &message)
1646 : {
1647 0 : _fcredits--;
1648 0 : LBASSERT(_fcredits >= 0L);
1649 :
1650 0 : if (::rdma_post_send(_cm_id, (void *)&message, (void *)&message,
1651 0 : offsetof(RDMAMessage, payload) + message.length,
1652 : _msgbuf.getMR(), 0))
1653 : {
1654 0 : LBERROR << "rdma_post_send : " << lunchbox::sysError << std::endl;
1655 0 : return false;
1656 : }
1657 :
1658 0 : return true;
1659 : }
1660 :
1661 0 : void RDMAConnection::_recvMessage(const RDMAMessage &message)
1662 : {
1663 0 : switch (message.opcode)
1664 : {
1665 : case FC:
1666 0 : if (sizeof(struct RDMAFCPayload) == (size_t)message.length)
1667 0 : _recvFC(message.payload.fc);
1668 : else
1669 0 : LBWARN << "Invalid flow control message received!" << std::endl;
1670 0 : break;
1671 : case SETUP:
1672 0 : if (sizeof(struct RDMASetupPayload) == (size_t)message.length)
1673 0 : _recvSetup(message.payload.setup);
1674 : else
1675 0 : LBWARN << "Invalid setup message received!" << std::endl;
1676 0 : break;
1677 : default:
1678 0 : LBWARN << "Invalid message received: " << std::hex
1679 0 : << (int)message.opcode << std::dec << std::endl;
1680 : }
1681 0 : }
1682 :
1683 : /* inline */
1684 0 : void RDMAConnection::_recvFC(const RDMAFCPayload &fc)
1685 : {
1686 : // Analysis:
1687 : //
1688 : // Since we will only write a maximum of _sourceptr.available( ) bytes
1689 : // to our source buffer, a malicious (presumably overflow) value here would
1690 : // have no chance of causing us to write beyond our buffer as we have local
1691 : // control over those ring pointers. Worst case, we'd and up writing to
1692 : // arbitrary regions of the remote buffer, since this ring pointer is
1693 : // circular as well.
1694 0 : _rptr.incrTail(fc.bytes_received);
1695 :
1696 0 : _wcredits += fc.writes_received;
1697 0 : LBASSERTINFO(_wcredits <= _depth, _wcredits << " > " << _depth);
1698 :
1699 0 : _fcs++;
1700 0 : }
1701 :
1702 0 : bool RDMAConnection::_postFC()
1703 : {
1704 : RDMAMessage &message =
1705 0 : *reinterpret_cast<RDMAMessage *>(_msgbuf.getBuffer());
1706 :
1707 0 : message.opcode = FC;
1708 0 : message.length = (uint8_t)sizeof(struct RDMAFCPayload);
1709 :
1710 0 : message.payload.fc.bytes_received = _readBytes;
1711 0 : message.payload.fc.writes_received = _writes;
1712 0 : _writes -= message.payload.fc.writes_received;
1713 0 : _readBytes = 0;
1714 :
1715 0 : LBASSERT(_writes >= 0);
1716 0 : return _postMessage(message);
1717 : }
1718 :
1719 0 : void RDMAConnection::_recvSetup(const RDMASetupPayload &setup)
1720 : {
1721 : // Analysis:
1722 : //
1723 : // Malicious values here would only affect the receiver, we're willing
1724 : // to RDMA write to anywhere specified!
1725 0 : _rbase = setup.rbase;
1726 0 : _rptr.clear(setup.rlen);
1727 0 : _rkey = setup.rkey;
1728 :
1729 0 : LBVERB << "RDMA MR: " << std::showbase << std::dec << setup.rlen << " @ "
1730 0 : << std::hex << setup.rbase << std::dec << std::endl;
1731 0 : }
1732 :
1733 0 : bool RDMAConnection::_postSetup()
1734 : {
1735 : RDMAMessage &message =
1736 0 : *reinterpret_cast<RDMAMessage *>(_msgbuf.getBuffer());
1737 :
1738 0 : message.opcode = SETUP;
1739 0 : message.length = (uint8_t)sizeof(struct RDMASetupPayload);
1740 :
1741 0 : message.payload.setup.rbase = (uint64_t)(uintptr_t)_sinkbuf.getBase();
1742 0 : message.payload.setup.rlen = (uint64_t)_sinkbuf.getSize();
1743 0 : message.payload.setup.rkey = _sinkbuf.getMR()->rkey;
1744 :
1745 0 : return _postMessage(message);
1746 : }
1747 :
1748 0 : bool RDMAConnection::_waitRecvSetup()
1749 : {
1750 0 : lunchbox::Clock clock;
1751 0 : const int64_t start = clock.getTime64();
1752 0 : const uint32_t timeout = Global::getTimeout();
1753 0 : eventset events;
1754 :
1755 : retry:
1756 0 : if (!_checkDisconnected(events))
1757 : {
1758 0 : LBERROR << "Error while checking event state." << std::endl;
1759 0 : return false;
1760 : }
1761 :
1762 0 : if (!_established)
1763 : {
1764 0 : LBERROR << "Disconnected while waiting for setup message." << std::endl;
1765 0 : return false;
1766 : }
1767 :
1768 0 : if (!_checkCQ(true))
1769 : {
1770 0 : LBERROR << "Error while polling receive completion queue." << std::endl;
1771 0 : return false;
1772 : }
1773 :
1774 0 : if (0ULL == _rkey)
1775 : {
1776 0 : if (LB_TIMEOUT_INDEFINITE != timeout)
1777 : {
1778 0 : if ((clock.getTime64() - start) > timeout)
1779 : {
1780 0 : LBERROR << "Timed out waiting for setup message." << std::endl;
1781 0 : return false;
1782 : }
1783 : }
1784 :
1785 0 : lunchbox::Thread::yield();
1786 0 : goto retry;
1787 : }
1788 :
1789 0 : return true;
1790 : }
1791 :
1792 : ////////////////////////////////////////////////////////////////////////////////
1793 :
1794 0 : bool RDMAConnection::_createNotifier()
1795 : {
1796 0 : LBASSERT(_notifier < 0);
1797 : #ifdef _WIN32
1798 : _notifier = CreateEvent(0, TRUE, FALSE, 0);
1799 : return true;
1800 : #else
1801 0 : _notifier = ::epoll_create(1);
1802 0 : if (_notifier < 0)
1803 : {
1804 0 : LBERROR << "epoll_create1: " << lunchbox::sysError << std::endl;
1805 0 : return false;
1806 : }
1807 :
1808 0 : return true;
1809 : #endif
1810 : }
1811 :
1812 0 : void RDMAConnection::_updateNotifier()
1813 : {
1814 : #ifdef _WIN32
1815 : lunchbox::ScopedFastWrite mutex(_eventLock);
1816 : if (_availBytes == 0 && !_cm->channel.Head &&
1817 : (!_cc || !_cc->comp_channel.Head))
1818 : ResetEvent(_notifier);
1819 : #else
1820 0 : eventset events;
1821 0 : _checkEvents(events);
1822 : #endif
1823 0 : }
1824 :
1825 0 : bool RDMAConnection::_checkEvents(eventset &events)
1826 : {
1827 0 : events.reset();
1828 :
1829 : #ifdef _WIN32
1830 : lunchbox::ScopedFastRead mutex(_eventLock);
1831 : if (_availBytes != 0)
1832 : events.set(BUF_EVENT);
1833 : if (_cc && (_cc->comp_channel.Head != 0))
1834 : events.set(CQ_EVENT);
1835 : if (_cm->channel.Head)
1836 : events.set(CM_EVENT);
1837 :
1838 : return true;
1839 : #else
1840 : struct epoll_event evts[3];
1841 :
1842 0 : int nfds = TEMP_FAILURE_RETRY(::epoll_wait(_notifier, evts, 3, 0));
1843 0 : if (nfds < 0)
1844 : {
1845 0 : LBERROR << "epoll_wait: " << lunchbox::sysError << std::endl;
1846 0 : return false;
1847 : }
1848 :
1849 0 : for (int i = 0; i < nfds; i++)
1850 : {
1851 0 : const int fd = evts[i].data.fd;
1852 0 : if ((_pipe_fd[0] >= 0) && (fd == _pipe_fd[0]))
1853 0 : events.set(BUF_EVENT);
1854 0 : else if (_cc && (fd == _cc->fd))
1855 0 : events.set(CQ_EVENT);
1856 0 : else if (_cm && (fd == _cm->fd))
1857 0 : events.set(CM_EVENT);
1858 : else
1859 0 : LBUNREACHABLE;
1860 : }
1861 :
1862 0 : return true;
1863 : #endif
1864 : }
1865 :
1866 0 : bool RDMAConnection::_checkDisconnected(eventset &events)
1867 : {
1868 0 : lunchbox::ScopedWrite poll_mutex(_poll_lock);
1869 :
1870 0 : if (!_checkEvents(events))
1871 : {
1872 0 : LBERROR << "Error while checking event state." << std::endl;
1873 0 : return false;
1874 : }
1875 :
1876 0 : if (events.test(CM_EVENT))
1877 : {
1878 0 : if (!_doCMEvent(RDMA_CM_EVENT_DISCONNECTED))
1879 : {
1880 0 : LBERROR << "Unexpected connection manager event." << std::endl;
1881 0 : return false;
1882 : }
1883 :
1884 0 : LBASSERT(!_established);
1885 : }
1886 :
1887 0 : return true;
1888 : }
1889 :
1890 0 : bool RDMAConnection::_createBytesAvailableFD()
1891 : {
1892 : #ifndef _WIN32
1893 0 : if (pipe(_pipe_fd) == -1)
1894 : {
1895 0 : LBERROR << "pipe: " << lunchbox::sysError << std::endl;
1896 0 : return false;
1897 : }
1898 :
1899 0 : LBASSERT(_pipe_fd[0] >= 0 && _pipe_fd[1] >= 0);
1900 :
1901 : // Use the event fd to signal Collage of bytes remaining.
1902 : struct epoll_event evctl;
1903 0 : ::memset((void *)&evctl, 0, sizeof(struct epoll_event));
1904 0 : evctl.events = EPOLLIN;
1905 0 : evctl.data.fd = _pipe_fd[0];
1906 0 : if (::epoll_ctl(_notifier, EPOLL_CTL_ADD, evctl.data.fd, &evctl))
1907 : {
1908 0 : LBERROR << "epoll_ctl: " << lunchbox::sysError << std::endl;
1909 0 : return false;
1910 : }
1911 : #endif
1912 0 : return true;
1913 : }
1914 :
1915 0 : bool RDMAConnection::_incrAvailableBytes(const uint64_t b)
1916 : {
1917 : #ifdef _WIN32
1918 : lunchbox::ScopedFastWrite mutex(_eventLock);
1919 : _availBytes += b;
1920 : ::SetEvent(_notifier);
1921 : #else
1922 0 : if (::write(_pipe_fd[1], (const void *)&b, sizeof(b)) != sizeof(b))
1923 : {
1924 0 : LBERROR << "write: " << lunchbox::sysError << std::endl;
1925 0 : return false;
1926 : }
1927 : #endif
1928 0 : return true;
1929 : }
1930 :
1931 0 : uint64_t RDMAConnection::_getAvailableBytes()
1932 : {
1933 0 : uint64_t available_bytes = 0;
1934 : #ifdef _WIN32
1935 : lunchbox::ScopedFastWrite mutex(_eventLock);
1936 : available_bytes = static_cast<uint64_t>(_availBytes);
1937 : _availBytes = 0;
1938 : return available_bytes;
1939 : #else
1940 0 : uint64_t currBytes = 0;
1941 : ssize_t count;
1942 : pollfd pfd;
1943 0 : pfd.fd = _pipe_fd[0];
1944 0 : pfd.events = EPOLLIN;
1945 :
1946 0 : do
1947 : {
1948 0 : count = ::read(_pipe_fd[0], (void *)&currBytes, sizeof(currBytes));
1949 0 : if (count > 0 && count < (ssize_t)sizeof(currBytes))
1950 0 : return 0ULL;
1951 0 : available_bytes += currBytes;
1952 0 : pfd.revents = 0;
1953 0 : if (::poll(&pfd, 1, 0) == -1)
1954 : {
1955 0 : LBERROR << "poll: " << lunchbox::sysError << std::endl;
1956 0 : return 0ULL;
1957 : }
1958 0 : } while (pfd.revents > 0);
1959 :
1960 0 : if (count == -1)
1961 : {
1962 0 : LBERROR << "read: " << lunchbox::sysError << std::endl;
1963 0 : return 0ULL;
1964 : }
1965 :
1966 0 : LBASSERT(available_bytes > 0ULL);
1967 0 : return available_bytes;
1968 : #endif
1969 : }
1970 :
1971 0 : bool RDMAConnection::_waitForCMEvent(enum rdma_cm_event_type expected)
1972 : {
1973 0 : lunchbox::Clock clock;
1974 0 : const int64_t start = clock.getTime64();
1975 0 : const uint32_t timeout = Global::getTimeout();
1976 0 : bool done = false;
1977 0 : eventset events;
1978 :
1979 : retry:
1980 0 : if (!_checkEvents(events))
1981 : {
1982 0 : LBERROR << "Error while checking event state." << std::endl;
1983 0 : return false;
1984 : }
1985 :
1986 0 : if (events.test(CM_EVENT))
1987 : {
1988 0 : done = _doCMEvent(expected);
1989 0 : if (!done)
1990 : {
1991 0 : LBERROR << "Unexpected connection manager event." << std::endl;
1992 0 : return false;
1993 : }
1994 : }
1995 :
1996 0 : if (!done)
1997 : {
1998 0 : if (LB_TIMEOUT_INDEFINITE != timeout)
1999 : {
2000 0 : if ((clock.getTime64() - start) > timeout)
2001 : {
2002 0 : LBERROR << "Timed out waiting for setup message." << std::endl;
2003 0 : return false;
2004 : }
2005 : }
2006 :
2007 0 : lunchbox::Thread::yield();
2008 0 : goto retry;
2009 : }
2010 :
2011 0 : return true;
2012 : }
2013 :
2014 0 : bool RDMAConnection::_doCMEvent(enum rdma_cm_event_type expected)
2015 : {
2016 0 : bool ok = false;
2017 : struct rdma_cm_event *event;
2018 :
2019 0 : if (::rdma_get_cm_event(_cm, &event))
2020 : {
2021 0 : LBERROR << "rdma_get_cm_event: " << lunchbox::sysError << std::endl;
2022 0 : return false;
2023 : }
2024 :
2025 0 : ok = (event->event == expected);
2026 :
2027 : #ifndef NDEBUG
2028 0 : if (ok)
2029 : {
2030 0 : LBVERB << (void *)this << " (" << _addr << ":" << _serv << ")"
2031 0 : << " event : " << ::rdma_event_str(event->event) << std::endl;
2032 : }
2033 : else
2034 : {
2035 0 : LBINFO << (void *)this << " (" << _addr << ":" << _serv << ")"
2036 : << " event : " << ::rdma_event_str(event->event)
2037 0 : << " expected: " << ::rdma_event_str(expected) << std::endl;
2038 : }
2039 : #endif
2040 :
2041 0 : if (ok && (RDMA_CM_EVENT_DISCONNECTED == event->event))
2042 0 : _established = false;
2043 :
2044 0 : if (ok && (RDMA_CM_EVENT_ESTABLISHED == event->event))
2045 : {
2046 0 : _established = true;
2047 :
2048 0 : struct rdma_conn_param *cp = &event->param.conn;
2049 :
2050 0 : ::memset((void *)&_cpd, 0, sizeof(RDMAConnParamData));
2051 : // Note that the actual amount of data transferred to the remote side
2052 : // is transport dependent and may be larger than that requested.
2053 0 : if (cp->private_data_len >= sizeof(RDMAConnParamData))
2054 0 : _cpd =
2055 0 : *reinterpret_cast<const RDMAConnParamData *>(cp->private_data);
2056 : }
2057 :
2058 0 : if (ok && (RDMA_CM_EVENT_CONNECT_REQUEST == event->event))
2059 : {
2060 0 : _new_cm_id = event->id;
2061 :
2062 0 : struct rdma_conn_param *cp = &event->param.conn;
2063 :
2064 0 : ::memset((void *)&_cpd, 0, sizeof(RDMAConnParamData));
2065 : // TODO : Not sure what happens when initiator sent ai_connect data
2066 : // (assuming the underlying transport doesn't strip it)?
2067 0 : if (cp->private_data_len >= sizeof(RDMAConnParamData))
2068 0 : _cpd =
2069 0 : *reinterpret_cast<const RDMAConnParamData *>(cp->private_data);
2070 : }
2071 :
2072 0 : if (RDMA_CM_EVENT_REJECTED == event->event)
2073 0 : LBINFO << "Connection reject status : " << event->status << std::endl;
2074 :
2075 0 : if (::rdma_ack_cm_event(event))
2076 0 : LBWARN << "rdma_ack_cm_event : " << lunchbox::sysError << std::endl;
2077 :
2078 0 : return ok;
2079 : }
2080 :
2081 0 : bool RDMAConnection::_rearmCQ()
2082 : {
2083 : struct ibv_cq *ev_cq;
2084 : void *ev_ctx;
2085 :
2086 : #ifdef _WIN32
2087 : lunchbox::ScopedFastWrite mutex(_eventLock);
2088 : #endif
2089 0 : if (::ibv_get_cq_event(_cc, &ev_cq, &ev_ctx))
2090 : {
2091 0 : LBERROR << "ibv_get_cq_event: " << lunchbox::sysError << std::endl;
2092 0 : return false;
2093 : }
2094 :
2095 : // http://lists.openfabrics.org/pipermail/general/2008-November/055237.html
2096 0 : _completions++;
2097 0 : if (std::numeric_limits<unsigned int>::max() == _completions)
2098 : {
2099 0 : ::ibv_ack_cq_events(_cq, _completions);
2100 0 : _completions = 0U;
2101 : }
2102 :
2103 : // Solicited only!
2104 0 : if (::rdma_seterrno(::ibv_req_notify_cq(_cq, 1)))
2105 : {
2106 0 : LBERROR << "ibv_req_notify_cq: " << lunchbox::sysError << std::endl;
2107 0 : return false;
2108 : }
2109 :
2110 0 : return true;
2111 : }
2112 :
2113 0 : bool RDMAConnection::_checkCQ(bool drain)
2114 : {
2115 : uint32_t num_recvs;
2116 : int count;
2117 :
2118 0 : lunchbox::ScopedWrite poll_mutex(_poll_lock);
2119 :
2120 0 : if (NULL == _cq)
2121 0 : return true;
2122 :
2123 : repoll:
2124 : /* CHECK RECEIVE COMPLETIONS */
2125 0 : count = ::ibv_poll_cq(_cq, _depth, _wcs);
2126 0 : if (count < 0)
2127 : {
2128 0 : LBERROR << "ibv_poll_cq: " << lunchbox::sysError << std::endl;
2129 0 : return false;
2130 : }
2131 :
2132 0 : num_recvs = 0UL;
2133 0 : for (int i = 0; i != count; i++)
2134 : {
2135 0 : struct ibv_wc &wc = _wcs[i];
2136 :
2137 0 : if (IBV_WC_SUCCESS != wc.status)
2138 : {
2139 : // Non-fatal.
2140 0 : if (IBV_WC_WR_FLUSH_ERR == wc.status)
2141 0 : continue;
2142 :
2143 0 : LBWARN << (void *)this << " !IBV_WC_SUCCESS : " << std::showbase
2144 0 : << std::hex << "wr_id = " << wc.wr_id << ", status = \""
2145 0 : << ::ibv_wc_status_str(wc.status) << "\"" << std::dec << " ("
2146 0 : << (unsigned int)wc.status << ")" << std::hex
2147 0 : << ", vendor_err = " << wc.vendor_err << std::dec
2148 0 : << std::endl;
2149 :
2150 : // All others are fatal.
2151 0 : return false;
2152 : }
2153 :
2154 0 : LBASSERT(IBV_WC_SUCCESS == wc.status);
2155 :
2156 : #ifdef _WIN32 //----------------------------------------------------------------
2157 : // WINDOWS IBV API WORKAROUND.
2158 : // TODO: remove this as soon as IBV API is fixed
2159 : if (wc.opcode == IBV_WC_RECV && wc.wc_flags == IBV_WC_WITH_IMM)
2160 : wc.opcode = IBV_WC_RECV_RDMA_WITH_IMM;
2161 : #endif //-----------------------------------------------------------------------
2162 :
2163 0 : if (IBV_WC_RECV_RDMA_WITH_IMM == wc.opcode)
2164 0 : _recvRDMAWrite(wc.imm_data);
2165 0 : else if (IBV_WC_RECV == wc.opcode)
2166 0 : _recvMessage(*reinterpret_cast<RDMAMessage *>(wc.wr_id));
2167 0 : else if (IBV_WC_SEND == wc.opcode)
2168 0 : _msgbuf.freeBuffer((void *)(uintptr_t)wc.wr_id);
2169 0 : else if (IBV_WC_RDMA_WRITE == wc.opcode)
2170 0 : _sourceptr.incrTail((uint32_t)wc.wr_id);
2171 : else
2172 0 : LBUNREACHABLE;
2173 :
2174 0 : if (IBV_WC_RECV & wc.opcode)
2175 : {
2176 0 : _msgbuf.freeBuffer((void *)(uintptr_t)wc.wr_id);
2177 : // All receive completions need to be reposted.
2178 0 : num_recvs++;
2179 : }
2180 : }
2181 :
2182 0 : if ((num_recvs > 0UL) && !_postReceives(num_recvs))
2183 0 : return false;
2184 :
2185 0 : if (drain && (count > 0))
2186 0 : goto repoll;
2187 :
2188 0 : return true;
2189 : }
2190 :
2191 : /* inline */
2192 0 : uint32_t RDMAConnection::_drain(void *buffer, const uint32_t bytes)
2193 : {
2194 0 : const uint32_t b = std::min(bytes, _sinkptr.available());
2195 0 : ::memcpy(buffer,
2196 0 : (const void *)((uintptr_t)_sinkbuf.getBase() + _sinkptr.tail()),
2197 0 : b);
2198 0 : _sinkptr.incrTail(b);
2199 0 : return b;
2200 : }
2201 :
2202 : /* inline */
2203 0 : uint32_t RDMAConnection::_fill(const void *buffer, const uint32_t bytes)
2204 : {
2205 0 : uint32_t b = std::min(bytes, std::min(_sourceptr.negAvailable(),
2206 0 : _rptr.negAvailable()));
2207 : #ifndef WRAP
2208 0 : b = std::min(b, (uint32_t)(_sourcebuf.getSize() -
2209 0 : _sourceptr.ptr(_sourceptr.HEAD)));
2210 : #endif
2211 0 : ::memcpy((void *)((uintptr_t)_sourcebuf.getBase() +
2212 0 : _sourceptr.ptr(_sourceptr.HEAD)),
2213 0 : buffer, b);
2214 0 : _sourceptr.incrHead(b);
2215 0 : return b;
2216 : }
2217 :
2218 0 : Connection::Notifier RDMAConnection::getNotifier() const
2219 : {
2220 0 : return _notifier;
2221 : }
2222 :
#ifdef _WIN32
/** Callback entry point: forward completion-channel activity for @p conn. */
void RDMAConnection::_triggerNotifierCQ(RDMAConnection *conn)
{
    conn->_triggerNotifierWorker(CQ_EVENT);
}

/** Callback entry point: forward CM event-channel activity for @p conn. */
void RDMAConnection::_triggerNotifierCM(RDMAConnection *conn)
{
    conn->_triggerNotifierWorker(CM_EVENT);
}

/**
 * Reset the selected channel's event, and signal _notifier if the channel
 * still has a pending entry (a non-null Head).
 *
 * Runs under _eventLock, and inspects the channel while holding its critical
 * section.
 *
 * @param event CQ_EVENT selects the completion channel; any other value
 *              selects the CM channel.
 */
void RDMAConnection::_triggerNotifierWorker(Events event)
{
    lunchbox::ScopedFastWrite eventGuard(_eventLock);

    COMP_CHANNEL *channel;
    if (event == CQ_EVENT)
        channel = &_cc->comp_channel;
    else
        channel = &_cm->channel;

    EnterCriticalSection(&channel->Lock);
    ResetEvent(channel->Event);
    if (channel->Head)
        SetEvent(_notifier);
    LeaveCriticalSection(&channel->Lock);
}
#endif
2245 :
2246 : //////////////////////////////////////////////////////////////////////////////
2247 :
2248 0 : void RDMAConnection::_showStats()
2249 : {
2250 0 : LBVERB << std::dec << "reads = " << _stats.reads
2251 0 : << ", buffer_empty = " << _stats.buffer_empty
2252 0 : << ", no_credits_fc = " << _stats.no_credits_fc
2253 0 : << ", writes = " << _stats.writes
2254 0 : << ", buffer_full = " << _stats.buffer_full
2255 0 : << ", no_credits_rdma = " << _stats.no_credits_rdma << std::endl;
2256 0 : }
2257 :
2258 : //////////////////////////////////////////////////////////////////////////////
2259 :
2260 0 : BufferPool::BufferPool(size_t buffer_size)
2261 : : _buffer_size(buffer_size)
2262 : , _num_bufs(0)
2263 : , _buffer(NULL)
2264 : , _mr(NULL)
2265 0 : , _ring(0)
2266 : {
2267 0 : }
2268 :
2269 0 : BufferPool::~BufferPool()
2270 : {
2271 0 : clear();
2272 0 : }
2273 :
2274 0 : void BufferPool::clear()
2275 : {
2276 0 : _num_bufs = 0;
2277 0 : _ring.clear(_num_bufs);
2278 :
2279 0 : if ((NULL != _mr) && ::rdma_dereg_mr(_mr))
2280 0 : LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
2281 0 : _mr = NULL;
2282 :
2283 0 : if (NULL != _buffer)
2284 : #ifdef _WIN32
2285 : _aligned_free(_buffer);
2286 : #else
2287 0 : ::free(_buffer);
2288 : #endif
2289 0 : _buffer = NULL;
2290 0 : }
2291 :
2292 0 : bool BufferPool::resize(ibv_pd *pd, uint32_t num_bufs)
2293 : {
2294 0 : clear();
2295 :
2296 0 : if (num_bufs)
2297 : {
2298 0 : _num_bufs = num_bufs;
2299 0 : _ring.clear(_num_bufs);
2300 :
2301 : #ifdef _WIN32
2302 : _buffer = _aligned_malloc(
2303 : (size_t)(_num_bufs * _buffer_size),
2304 : boost::interprocess::mapped_region::get_page_size());
2305 : if (!_buffer)
2306 : {
2307 : LBERROR << "_aligned_malloc: Couldn't allocate aligned memory. "
2308 : << lunchbox::sysError << std::endl;
2309 : return false;
2310 : }
2311 : #else
2312 0 : if (::posix_memalign(&_buffer, (size_t)::getpagesize(),
2313 0 : (size_t)(_num_bufs * _buffer_size)))
2314 : {
2315 0 : LBERROR << "posix_memalign: " << lunchbox::sysError << std::endl;
2316 0 : return false;
2317 : }
2318 : #endif
2319 0 : ::memset(_buffer, 0xff, (size_t)(_num_bufs * _buffer_size));
2320 0 : _mr = ::ibv_reg_mr(pd, _buffer, (size_t)(_num_bufs * _buffer_size),
2321 : IBV_ACCESS_LOCAL_WRITE);
2322 0 : if (NULL == _mr)
2323 : {
2324 0 : LBERROR << "ibv_reg_mr: " << lunchbox::sysError << std::endl;
2325 0 : return false;
2326 : }
2327 :
2328 0 : for (uint32_t i = 0; i != _num_bufs; i++)
2329 0 : _ring.put(i);
2330 : }
2331 :
2332 0 : return true;
2333 : }
2334 :
2335 : //////////////////////////////////////////////////////////////////////////////
2336 :
2337 0 : RingBuffer::RingBuffer(int access)
2338 : : _access(access)
2339 : , _size(0)
2340 : #ifdef _WIN32
2341 : , _mapping(0)
2342 : , _map(0)
2343 : #else
2344 : , _map(MAP_FAILED)
2345 : #endif
2346 0 : , _mr(0)
2347 : {
2348 0 : }
2349 :
2350 0 : RingBuffer::~RingBuffer()
2351 : {
2352 0 : clear();
2353 0 : }
2354 :
2355 0 : void RingBuffer::clear()
2356 : {
2357 0 : if ((NULL != _mr) && ::rdma_dereg_mr(_mr))
2358 0 : LBWARN << "rdma_dereg_mr : " << lunchbox::sysError << std::endl;
2359 0 : _mr = NULL;
2360 :
2361 : #ifdef _WIN32
2362 : if (_map)
2363 : {
2364 : UnmapViewOfFile(static_cast<char *>(_map));
2365 : UnmapViewOfFile(static_cast<char *>(_map) + _size);
2366 : }
2367 : if (_mapping)
2368 : CloseHandle(_mapping);
2369 :
2370 : _map = 0;
2371 : _mapping = 0;
2372 : #else
2373 0 : if ((MAP_FAILED != _map) && ::munmap(_map, _size << 1))
2374 0 : LBWARN << "munmap : " << lunchbox::sysError << std::endl;
2375 0 : _map = MAP_FAILED;
2376 : #endif
2377 0 : _size = 0;
2378 0 : }
2379 :
2380 0 : bool RingBuffer::resize(ibv_pd *pd, size_t size)
2381 : {
2382 0 : bool ok = false;
2383 0 : int fd = -1;
2384 :
2385 0 : clear();
2386 :
2387 0 : if (size)
2388 : {
2389 : #ifdef _WIN32
2390 : uint32_t num_retries = RINGBUFFER_ALLOC_RETRIES;
2391 : while (!_map && num_retries-- != 0)
2392 : {
2393 : void *target_addr = determineViableAddr(size * 2);
2394 : if (target_addr)
2395 : allocAt(size, target_addr);
2396 : }
2397 :
2398 : if (!_map)
2399 : {
2400 : LBERROR << "Couldn't allocate desired RingBuffer memory after "
2401 : << RINGBUFFER_ALLOC_RETRIES << " retries." << std::endl;
2402 : }
2403 : else
2404 : {
2405 : LBVERB << "Allocated RDMA Ringbuffer memory in "
2406 : << (RINGBUFFER_ALLOC_RETRIES - num_retries) << " tries."
2407 : << std::endl;
2408 : ok = true;
2409 : }
2410 :
2411 : #else
2412 : void *addr1, *addr2;
2413 0 : char path[] = "/dev/shm/co-rdma-buffer-XXXXXX";
2414 :
2415 0 : _size = size;
2416 :
2417 0 : fd = ::mkstemp(path);
2418 0 : if (fd < 0)
2419 : {
2420 0 : LBERROR << "mkstemp: " << lunchbox::sysError << std::endl;
2421 0 : goto out;
2422 : }
2423 :
2424 0 : if (::unlink(path))
2425 : {
2426 0 : LBERROR << "unlink: " << lunchbox::sysError << std::endl;
2427 0 : goto out;
2428 : }
2429 :
2430 0 : if (::ftruncate(fd, _size))
2431 : {
2432 0 : LBERROR << "ftruncate: " << lunchbox::sysError << std::endl;
2433 0 : goto out;
2434 : }
2435 :
2436 0 : _map = ::mmap(NULL, _size << 1, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE,
2437 : -1, 0);
2438 0 : if (MAP_FAILED == _map)
2439 : {
2440 0 : LBERROR << "mmap: " << lunchbox::sysError << std::endl;
2441 0 : goto out;
2442 : }
2443 :
2444 0 : addr1 = ::mmap(_map, _size, PROT_READ | PROT_WRITE,
2445 0 : MAP_FIXED | MAP_SHARED, fd, 0);
2446 0 : if (MAP_FAILED == addr1)
2447 : {
2448 0 : LBERROR << "mmap: " << lunchbox::sysError << std::endl;
2449 0 : goto out;
2450 : }
2451 :
2452 0 : addr2 = ::mmap((void *)((uintptr_t)_map + _size), _size,
2453 0 : PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
2454 0 : if (MAP_FAILED == addr2)
2455 : {
2456 0 : LBERROR << "mmap: " << lunchbox::sysError << std::endl;
2457 0 : goto out;
2458 : }
2459 :
2460 0 : LBASSERT(addr1 == _map);
2461 0 : LBASSERT(addr2 == (void *)((uintptr_t)_map + _size));
2462 :
2463 : #endif
2464 : #ifdef WRAP
2465 : _mr = ::ibv_reg_mr(pd, _map, _size << 1, _access);
2466 : #else
2467 0 : _mr = ::ibv_reg_mr(pd, _map, _size, _access);
2468 : #endif
2469 :
2470 0 : if (NULL == _mr)
2471 : {
2472 0 : LBERROR << "ibv_reg_mr: " << lunchbox::sysError << std::endl;
2473 0 : goto out;
2474 : }
2475 :
2476 0 : ::memset(_map, 0, _size);
2477 0 : *reinterpret_cast<uint8_t *>(_map) = 0x45;
2478 0 : LBASSERT(0x45 == *reinterpret_cast<uint8_t *>((uintptr_t)_map + _size));
2479 : }
2480 :
2481 0 : ok = true;
2482 :
2483 : out:
2484 : #ifndef _WIN32
2485 0 : if ((fd >= 0) && TEMP_FAILURE_RETRY(::close(fd)))
2486 0 : LBWARN << "close : " << lunchbox::sysError << std::endl;
2487 : #endif
2488 0 : return ok;
2489 : }
2490 :
#ifdef _WIN32
/**
 * Find a currently free address range of @p size bytes.
 *
 * The range is reserved (no access, nothing committed) and released again
 * immediately, and its base is returned. Another allocation may claim the
 * range before the caller uses it, which is why callers retry.
 *
 * @return the probed base address, or 0 if the reservation failed.
 */
void *RingBuffer::determineViableAddr(size_t size)
{
    void *const probe = VirtualAlloc(0, size, MEM_RESERVE, PAGE_NOACCESS);
    if (probe)
        VirtualFree(probe, 0, MEM_RELEASE);
    return probe;
}

/**
 * Map the same pagefile-backed section twice, back to back, at @p desiredAddr.
 *
 * Sets _mapping and _map. On success, _size is set to @p size. On any
 * failure, clear() releases whatever was acquired; the caller detects failure
 * by _map being null.
 *
 * @param size        size of one view in bytes; must be a multiple of 64 KiB.
 * @param desiredAddr base address for the first view; the second view is
 *                    placed at desiredAddr + size.
 */
void RingBuffer::allocAt(size_t size, void *desiredAddr)
{
    // if we already hold one allocation, refuse to make another.
    LBASSERT(!_map);
    LBASSERT(!_mapping);
    if (_map || _mapping)
        return;

    // is ring_size a multiple of 64k? if not, this won't ever work!
    if ((size & 0xffff) != 0)
        return;

    // try to allocate and map our space
    size_t alloc_size = size * 2;
    _mapping = CreateFileMappingA(INVALID_HANDLE_VALUE, 0, PAGE_READWRITE,
                                  (unsigned long long)alloc_size >> 32,
                                  alloc_size & 0xffffffffu, 0);
    if (!_mapping)
    {
        LBERROR << "CreateFileMappingA failed" << std::endl;
        clear();
        return;
    }

    _map =
        MapViewOfFileEx(_mapping, FILE_MAP_ALL_ACCESS, 0, 0, size, desiredAddr);
    if (!_map)
    {
        LBERROR << "First MapViewOfFileEx failed" << std::endl;
        clear();
        return;
    }

    char *const mirrorAddr = static_cast<char *>(desiredAddr) + size;
    if (!MapViewOfFileEx(_mapping, FILE_MAP_ALL_ACCESS, 0, 0, size, mirrorAddr))
    {
        LBERROR << "Second MapViewOfFileEx failed" << std::endl;
        clear();
        return;
    }

    _size = size;
}
#endif
2549 :
2550 63 : } // namespace co
|