Line data Source code
1 : // -*- mode: c++ -*-
2 : /* Copyright (c) 2012, Computer Integration & Programming Solutions, Corp. and
3 : * United States Naval Research Laboratory
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 : #pragma once
21 :
22 : #include "ring.h"
23 : #include <co/connection.h>
24 :
25 : #include <lunchbox/monitor.h>
26 : #include <lunchbox/scopedMutex.h>
27 : #include <lunchbox/spinLock.h>
28 : #include <lunchbox/thread.h>
29 :
30 : #include <bitset>
31 :
32 : #define NOMINMAX // WIN32 :-/
33 : #include <rdma/rdma_cma.h>
34 :
35 : #ifndef _WIN32
36 : #include <netdb.h>
37 : #endif
38 :
39 : namespace co
40 : {
41 : /**
42 : * A registered memory region (MR) broken up into a number of fixed size
43 : * buffers.
44 : */
45 : class BufferPool
46 : {
47 : public:
48 : BufferPool(size_t buffer_size);
49 : ~BufferPool();
50 :
51 0 : ibv_mr *getMR() const { return _mr; }
52 0 : size_t getBufferSize() const { return _buffer_size; }
53 0 : void *getBuffer()
54 : {
55 0 : lunchbox::ScopedWrite mutex(_buffer_lock);
56 :
57 0 : return (void *)((uintptr_t)_buffer + (_ring.get() * _buffer_size));
58 : }
59 0 : void freeBuffer(void *buf)
60 : {
61 0 : lunchbox::ScopedWrite mutex(_buffer_lock);
62 :
63 0 : ::memset(buf, 0xff, _buffer_size); // Paranoid
64 :
65 0 : _ring.put(
66 0 : (uint32_t)(((uintptr_t)buf - (uintptr_t)_buffer) / _buffer_size));
67 0 : }
68 :
69 : void clear();
70 : bool resize(ibv_pd *pd, uint32_t num_bufs);
71 :
72 : private:
73 : const size_t _buffer_size;
74 : uint32_t _num_bufs;
75 : void *_buffer;
76 : struct ibv_mr *_mr;
77 : BufferQ<uint32_t> _ring;
78 : std::mutex _buffer_lock;
79 : }; // BufferPool
80 :
81 : /**
82 : * A registered memory region (MR) backed by a fixed size circular buffer
83 : * <a href="http://en.wikipedia.org/wiki/Circular_buffer#Optimization">mapped
84 : * to two contiguous regions of virtual memory</a>.
85 : */
86 : class RingBuffer
87 : {
88 : public:
89 : RingBuffer(int access = 0);
90 : ~RingBuffer();
91 :
92 0 : ibv_mr *getMR() const { return _mr; }
93 0 : size_t getSize() const { return _size; }
94 0 : void *getBase() const { return _map; }
95 : void clear();
96 : bool resize(ibv_pd *pd, size_t size);
97 :
98 : private:
99 : const int _access;
100 : size_t _size;
101 : void *_map;
102 : struct ibv_mr *_mr;
103 : #ifdef _WIN32
104 : HANDLE _mapping;
105 : void *determineViableAddr(size_t size);
106 : void allocAt(size_t size, void *desiredAddr);
107 : #endif
108 : }; // RingBuffer
109 :
110 : /**
111 : * Private data sent with connect/accept to validate protocol version and
112 : * pass protocol parameters (NB: limited to 56 bytes for RDMA_PS_TCP).
113 : */
114 : struct RDMAConnParamData
115 : {
116 0 : RDMAConnParamData()
117 0 : : magic(0)
118 : , version(0)
119 0 : , depth(0)
120 : {
121 0 : }
122 : uint16_t magic;
123 : uint16_t version;
124 : uint32_t depth;
125 : };
126 :
127 : struct RDMASetupPayload;
128 : struct RDMAFCPayload;
129 : struct RDMAMessage;
130 :
131 : class EventConnection;
132 :
133 : /**
134 : * An RDMA connection implementation.
135 : *
136 : * This connection utilizes the OFED RDMA library to send Collage messages by
137 : * RDMA write operations on a remote memory region.
138 : *
139 : * In order to use this connection type, at least:
140 : *
141 : * 1) The rdma_ucm kernel module must be loaded
142 : * 2) The application must have read/write access to the RDMA device nodes
143 : * (typically /dev/infiniband/[rdma_cm|uverbs*])
144 : * 3) The IP address assigned to the connection must be an address assigned to
145 : * an RDMA-capable device (i.e. IPoIB)
146 : * 4) Shared memory must be sufficient for all RDMA connections,
147 : * 2 * Global::IATTR_RDMA_RING_BUFFER_SIZE_MB for each
148 : * (i.e. /dev/shm, kernel.shm[min|max|all])
149 : * 5) The user must be able to lock the memory registered with verbs, such that
150 : * the locked memory limit needs to be sufficient ("ulimit -l" for bash,
151 : * "limit memorylocked" for csh). Updating /etc/security/limits.conf with
152 : * entries like this is usually adequate (e.g. to raise the limit to 2GB for
153 : * all users):
154 : * * soft memlock 2048000
155 : * * hard memlock 2048000
156 : *
157 : * NB : Binding to "localhost" does *not* limit remote access, rdma_cm will
158 : * bind to all available RDMA interfaces as if bound to a wildcard address!
159 : *
160 : * TODO? : Binding to wildcard address updates the description with the
161 : * canonical hostname, which is possibly not a valid RDMA IPoIB name.
162 : *
163 : * TODO? : Mixed IPv6/IPv4 naming isn't handled correctly. If one listens on
164 : * IPv6 and gets an IPv4 connection the address in the route struct doesn't
165 : * appear to be valid, and vice versa. Not sure if this is an RDMA CM
166 : * issue or improper handling.
167 : *
168 : */
169 : class RDMAConnection : public Connection
170 : {
171 : public:
172 : RDMAConnection();
173 :
174 : bool connect() override;
175 : bool listen() override;
176 0 : void close() override { _close(); }
177 : void acceptNB() override;
178 : ConnectionPtr acceptSync() override;
179 :
180 : protected:
181 : void readNB(void *buffer, const uint64_t bytes) override;
182 : int64_t readSync(void *buffer, const uint64_t bytes,
183 : const bool block) override;
184 : int64_t write(const void *buffer, const uint64_t bytes) override;
185 :
186 : public:
187 : Notifier getNotifier() const override;
188 :
189 : protected:
190 : virtual ~RDMAConnection();
191 :
192 : private:
193 : /* Teardown */
194 : void _close();
195 : void _cleanup();
196 :
197 : /* Setup */
198 : bool _finishAccept(struct rdma_cm_id *new_cm_id,
199 : const RDMAConnParamData &cpd);
200 :
201 : bool _lookupAddress(const bool passive);
202 : void _updateInfo(struct sockaddr *addr);
203 :
204 : bool _createEventChannel();
205 : bool _createId();
206 :
207 : bool _initVerbs();
208 : bool _createQP();
209 : bool _initBuffers();
210 :
211 : bool _resolveAddress();
212 : bool _resolveRoute();
213 : bool _connect();
214 :
215 : bool _bindAddress();
216 : bool _listen(int backlog);
217 : bool _migrateId();
218 : bool _accept();
219 : bool _reject();
220 :
221 : /* Protocol */
222 : bool _initProtocol(int32_t depth);
223 :
224 : inline bool _needFC();
225 :
226 : bool _postReceives(const uint32_t count);
227 :
228 : inline void _recvRDMAWrite(const uint32_t imm_data);
229 : inline uint32_t _makeImm(const uint32_t b);
230 : bool _postRDMAWrite();
231 :
232 : bool _postMessage(const RDMAMessage &message);
233 : void _recvMessage(const RDMAMessage &message);
234 : inline void _recvFC(const RDMAFCPayload &fc);
235 : bool _postFC();
236 : void _recvSetup(const RDMASetupPayload &setup);
237 : bool _postSetup();
238 :
239 : bool _waitRecvSetup();
240 :
241 : private:
242 : enum Events
243 : {
244 : CM_EVENT = 0,
245 : CQ_EVENT = 1,
246 : BUF_EVENT = 2,
247 : };
248 : typedef std::bitset<3> eventset;
249 :
250 : bool _createNotifier();
251 : void _updateNotifier();
252 : bool _checkEvents(eventset &events);
253 :
254 : /* Connection manager events */
255 : bool _checkDisconnected(eventset &events);
256 : bool _waitForCMEvent(enum rdma_cm_event_type expected);
257 : bool _doCMEvent(enum rdma_cm_event_type expected);
258 :
259 : /* Completion queue events */
260 : bool _rearmCQ();
261 : bool _checkCQ(bool drain);
262 :
263 : /* Available byte events */
264 : bool _createBytesAvailableFD();
265 : bool _incrAvailableBytes(const uint64_t b);
266 : uint64_t _getAvailableBytes();
267 :
268 : #ifdef _WIN32
269 : static void _triggerNotifierCQ(RDMAConnection *conn);
270 : static void _triggerNotifierCM(RDMAConnection *conn);
271 : void _triggerNotifierWorker(Events event);
272 : #endif
273 :
274 : private:
275 : Notifier _notifier;
276 :
277 : /* Protect RDMA/Verbs vars from multiple threads */
278 : std::mutex _poll_lock;
279 :
280 : /* Timeout for resolving RDMA address & route */
281 : const int32_t _timeout;
282 :
283 : /* Final connection info */
284 : char _addr[NI_MAXHOST], _serv[NI_MAXSERV];
285 : std::string _device_name;
286 :
287 : /* RDMA/Verbs vars */
288 : struct rdma_addrinfo *_rai; // The addrinfo returned by rdma_getaddrinfo.
289 : struct rdma_addrinfo *_addrinfo; // The addrinfo to be used. Points to
290 : // one node in the list from _rai.
291 : struct rdma_event_channel *_cm;
292 : struct rdma_cm_id *_cm_id;
293 : struct rdma_cm_id *_new_cm_id;
294 : struct ibv_comp_channel *_cc;
295 : struct ibv_cq *_cq;
296 : struct ibv_pd *_pd;
297 : struct ibv_wc *_wcs;
298 :
299 : #ifndef _WIN32
300 : int _pipe_fd[2];
301 : #else
302 : uint64_t _availBytes;
303 : uint32_t _eventFlag;
304 : lunchbox::SpinLock _eventLock;
305 : #endif
306 :
307 : uint64_t _readBytes;
308 :
309 : struct RDMAConnParamData _cpd;
310 : bool _established;
311 :
312 : int32_t _depth; // Maximum sends in flight (RDMA & FC)
313 : lunchbox::a_int32_t _writes; // Number of unacked RDMA writes received
314 : lunchbox::a_int32_t _fcs; // Number of unacked FC messages received
315 : lunchbox::a_int32_t _wcredits; // Number of RDMA write credits available
316 : lunchbox::a_int32_t _fcredits; // Number of FC message credits available
317 :
318 : unsigned int _completions;
319 :
320 : /* MR for setup and ack messages */
321 : BufferPool _msgbuf;
322 :
323 : /* source RDMA MR */
324 : RingBuffer _sourcebuf;
325 : Ring<uint32_t, 3> _sourceptr;
326 : // : initialized during connect/accept
327 : // HEAD : advanced after copying buffer (fill) on local write
328 : // - write thread only
329 : // MIDDLE : advanced before posting RDMA write
330 : // - write thread only
331 : // TAIL : advanced after completing RDMA write
332 : // - write & read threads (in pollCQ)
333 :
334 : /* sink RDMA MR */
335 : RingBuffer _sinkbuf;
336 : Ring<uint32_t, 2> _sinkptr;
337 : // : initialized during connect/accept
338 : // HEAD : advanced on receipt of RDMA WRITE
339 : // - write & read threads (in pollCQ)
340 : // TAIL : advanced after copying buffer (drain) on local read
341 : // - read thread only
342 :
343 : /* local "view" of remote sink MR */
344 : Ring<uint32_t, 2> _rptr;
345 : // : initialized on receipt of setup message
346 : // HEAD : advanced before posting RDMA write
347 : // - write thread only
348 : // TAIL : advanced on receipt of FC
349 : // - write & read threads (in pollCQ)
350 :
351 : /* remote sink MR parameters */
352 : uint64_t _rbase, _rkey;
353 :
354 : /* copy bytes out of the sink buffer */
355 : inline uint32_t _drain(void *buffer, const uint32_t bytes);
356 : /* copy bytes in to the source buffer */
357 : inline uint32_t _fill(const void *buffer, const uint32_t bytes);
358 :
359 : #ifdef WIN32
360 : HANDLE _ccWaitObj;
361 : HANDLE _cmWaitObj;
362 : #endif
363 :
364 : private:
365 : struct stats
366 : {
367 0 : stats()
368 0 : : reads(0ULL)
369 : , buffer_empty(0ULL)
370 : , no_credits_fc(0ULL)
371 : , writes(0ULL)
372 : , buffer_full(0ULL)
373 0 : , no_credits_rdma(0ULL)
374 : {
375 0 : }
376 :
377 : uint64_t reads;
378 : uint64_t buffer_empty;
379 : uint64_t no_credits_fc;
380 : uint64_t writes;
381 : uint64_t buffer_full;
382 : uint64_t no_credits_rdma;
383 : } _stats;
384 :
385 : void _showStats();
386 : }; // RDMAConnection
387 : } // namespace co
|