Line data Source code
1 : // -*- mode: c++ -*-
2 : /* Copyright (c) 2012, Computer Integration & Programming Solutions, Corp. and
3 : * United States Naval Research Laboratory
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 : #pragma once
21 :
22 : #include <co/connection.h>
23 : #include "ring.h"
24 :
25 : #include <lunchbox/thread.h>
26 : #include <lunchbox/monitor.h>
27 : #include <lunchbox/scopedMutex.h>
28 : #include <lunchbox/spinLock.h>
29 :
30 : #include <bitset>
31 :
32 : #define NOMINMAX // WIN32 :-/
33 : #include <rdma/rdma_cma.h>
34 :
35 : #ifndef _WIN32
36 : # include <netdb.h>
37 : #endif
38 :
39 : namespace co
40 : {
41 : /**
42 : * A registered memory region (MR) broken up into a number of fixed size
43 : * buffers.
44 : */
45 : class BufferPool
46 : {
47 : public:
48 : BufferPool( size_t buffer_size );
49 : ~BufferPool( );
50 :
51 597368 : ibv_mr *getMR( ) const { return _mr; }
52 562753 : size_t getBufferSize( ) const { return _buffer_size; }
53 597359 : void *getBuffer( )
54 : {
55 597359 : lunchbox::ScopedWrite mutex( _buffer_lock );
56 :
57 597370 : return (void *)( (uintptr_t)_buffer + ( _ring.get( ) * _buffer_size ));
58 : }
59 595325 : void freeBuffer( void *buf )
60 : {
61 595325 : lunchbox::ScopedWrite mutex( _buffer_lock );
62 :
63 595325 : ::memset( buf, 0xff, _buffer_size ); // Paranoid
64 :
65 595325 : _ring.put((uint32_t)(( (uintptr_t)buf - (uintptr_t)_buffer ) /
66 595325 : _buffer_size ));
67 595324 : }
68 :
69 : void clear( );
70 : bool resize( ibv_pd *pd, uint32_t num_bufs );
71 : private:
72 : const size_t _buffer_size;
73 : uint32_t _num_bufs;
74 : void *_buffer;
75 : struct ibv_mr *_mr;
76 : BufferQ<uint32_t> _ring;
77 : lunchbox::Lock _buffer_lock;
78 : }; // BufferPool
79 :
80 : /**
81 : * A registered memory region (MR) backed by a fixed size circular buffer
82 : * <a href="http://en.wikipedia.org/wiki/Circular_buffer#Optimization">mapped
83 : * to two contiguous regions of virtual memory</a>.
84 : */
85 : class RingBuffer
86 : {
87 : public:
88 : RingBuffer( int access = 0 );
89 : ~RingBuffer( );
90 :
91 526163 : ibv_mr *getMR( ) const { return _mr; }
92 905797 : size_t getSize( ) const { return _size; }
93 1938359 : void *getBase( ) const { return _map; }
94 :
95 : void clear( );
96 : bool resize( ibv_pd *pd, size_t size );
97 : private:
98 : const int _access;
99 : size_t _size;
100 : void* _map;
101 : struct ibv_mr *_mr;
102 : #ifdef _WIN32
103 : HANDLE _mapping;
104 : void* determineViableAddr( size_t size );
105 : void allocAt( size_t size, void* desiredAddr );
106 : #endif
107 : }; // RingBuffer
108 :
/**
 * Private data sent with connect/accept to validate protocol version and
 * pass protocol parameters (NB: limited to 56 bytes for RDMA_PS_TCP).
 */
struct RDMAConnParamData
{
    RDMAConnParamData() : magic( 0 ), version( 0 ), depth( 0 ) {}
    uint16_t magic;   // protocol identifier validated by the peer
    uint16_t version; // protocol version validated by the peer
    uint32_t depth;   // NOTE(review): presumably the sender's maximum sends
                      // in flight (RDMAConnection::_depth) - confirm
};

// This struct is used as connect/accept private data, so it must fit in the
// RDMA_PS_TCP limit stated above; catch any growth at compile time.
static_assert( sizeof( RDMAConnParamData ) <= 56,
               "RDMAConnParamData exceeds the RDMA_PS_TCP private data limit" );
120 :
121 : struct RDMASetupPayload;
122 : struct RDMAFCPayload;
123 : struct RDMAMessage;
124 :
125 : class EventConnection;
126 :
127 : /**
128 : * An RDMA connection implementation.
129 : *
130 : * This connection utilizes the OFED RDMA library to send Collage messages by
131 : * RDMA write operations on a remote memory region.
132 : *
133 : * In order to use this connection type, at least:
134 : *
135 : * 1) The rdma_ucm kernel module must be loaded
136 : * 2) The application must have read/write access to the RDMA device nodes
137 : * (typically /dev/infiniband/[rdma_cm|uverbs*])
138 : * 3) The IP address assigned to the connection must be an address assigned to
139 : * an RDMA-capable device (i.e. IPoIB)
140 : * 4) Shared memory must be sufficient for all RDMA connections,
141 : * 2 * Global::IATTR_RDMA_RING_BUFFER_SIZE_MB for each
142 : * (i.e. /dev/shm, kernel.shm[min|max|all])
143 : * 5) The user must be able to lock the memory registered with verbs, such that
144 : * the locked memory limit needs to be sufficient ("ulimit -l" for bash,
145 : * "limit memorylocked" for csh). Updating /etc/security/limits.conf with
146 : * entries like this is usually adequate (e.g. to raise the limit to 2GB for
147 : * all users):
148 : * * soft memlock 2048000
149 : * * hard memlock 2048000
150 : *
151 : * NB : Binding to "localhost" does *not* limit remote access, rdma_cm will
152 : * bind to all available RDMA interfaces as if bound to a wildcard address!
153 : *
154 : * TODO? : Binding to wildcard address updates the description with the
155 : * canonical hostname, which is possibly not a valid RDMA IPoIB name.
156 : *
157 : * TODO? : Mixed IPv6/IPv4 naming isn't handled correctly. If one listens on
158 : * IPv6 and gets an IPv4 connection the address in the route struct doesn't
159 : * appear to be valid, and vice versa. Not sure if this is an RDMA CM
160 : * issue or improper handling.
161 : *
162 : */
163 : class RDMAConnection : public Connection
164 : {
165 : public:
166 : RDMAConnection( );
167 :
168 : bool connect() override;
169 : bool listen() override;
170 6 : void close() override { _close( ); }
171 :
172 : void acceptNB() override;
173 : ConnectionPtr acceptSync() override;
174 :
175 : protected:
176 : void readNB ( void* buffer, const uint64_t bytes ) override;
177 : int64_t readSync( void* buffer, const uint64_t bytes,
178 : const bool block ) override;
179 : int64_t write( const void* buffer, const uint64_t bytes ) override;
180 :
181 : public:
182 : Notifier getNotifier() const override;
183 :
184 : protected:
185 : virtual ~RDMAConnection();
186 :
187 : private:
188 : /* Teardown */
189 : void _close( );
190 : void _cleanup( );
191 :
192 : /* Setup */
193 : bool _finishAccept( struct rdma_cm_id *new_cm_id,
194 : const RDMAConnParamData &cpd );
195 :
196 : bool _lookupAddress( const bool passive );
197 : void _updateInfo( struct sockaddr *addr );
198 :
199 : bool _createEventChannel( );
200 : bool _createId( );
201 :
202 : bool _initVerbs( );
203 : bool _createQP( );
204 : bool _initBuffers( );
205 :
206 : bool _resolveAddress( );
207 : bool _resolveRoute( );
208 : bool _connect( );
209 :
210 : bool _bindAddress( );
211 : bool _listen( int backlog );
212 : bool _migrateId( );
213 : bool _accept( );
214 : bool _reject( );
215 :
216 : /* Protocol */
217 : bool _initProtocol( int32_t depth );
218 :
219 : inline bool _needFC( );
220 :
221 : bool _postReceives( const uint32_t count );
222 :
223 : inline void _recvRDMAWrite( const uint32_t imm_data );
224 : inline uint32_t _makeImm( const uint32_t b );
225 : bool _postRDMAWrite( );
226 :
227 : bool _postMessage( const RDMAMessage &message );
228 : void _recvMessage( const RDMAMessage &message );
229 : inline void _recvFC( const RDMAFCPayload &fc );
230 : bool _postFC();
231 : void _recvSetup( const RDMASetupPayload &setup );
232 : bool _postSetup( );
233 :
234 : bool _waitRecvSetup( );
235 :
236 : private:
237 : enum Events
238 : {
239 : CM_EVENT = 0,
240 : CQ_EVENT = 1,
241 : BUF_EVENT = 2,
242 : };
243 : typedef std::bitset<3> eventset;
244 :
245 : bool _createNotifier( );
246 : void _updateNotifier();
247 : bool _checkEvents( eventset &events );
248 :
249 : /* Connection manager events */
250 : bool _checkDisconnected( eventset &events );
251 : bool _waitForCMEvent( enum rdma_cm_event_type expected );
252 : bool _doCMEvent( enum rdma_cm_event_type expected );
253 :
254 : /* Completion queue events */
255 : bool _rearmCQ( );
256 : bool _checkCQ( bool drain );
257 :
258 : /* Available byte events */
259 : bool _createBytesAvailableFD( );
260 : bool _incrAvailableBytes( const uint64_t b );
261 : uint64_t _getAvailableBytes( );
262 :
263 : #ifdef _WIN32
264 : static void _triggerNotifierCQ( RDMAConnection* conn );
265 : static void _triggerNotifierCM( RDMAConnection* conn );
266 : void _triggerNotifierWorker( Events event );
267 : #endif
268 :
269 : private:
270 : Notifier _notifier;
271 :
272 : /* Protect RDMA/Verbs vars from multiple threads */
273 : lunchbox::Lock _poll_lock;
274 :
275 : /* Timeout for resolving RDMA address & route */
276 : const int32_t _timeout;
277 :
278 : /* Final connection info */
279 : char _addr[NI_MAXHOST], _serv[NI_MAXSERV];
280 : std::string _device_name;
281 :
282 : /* RDMA/Verbs vars */
283 : struct rdma_addrinfo *_rai; // The addrinfo returned by rdma_getaddrinfo.
284 : struct rdma_addrinfo *_addrinfo; // The addrinfo to be used. Points to
285 : // one node in the list from _rai.
286 : struct rdma_event_channel *_cm;
287 : struct rdma_cm_id *_cm_id;
288 : struct rdma_cm_id *_new_cm_id;
289 : struct ibv_comp_channel *_cc;
290 : struct ibv_cq *_cq;
291 : struct ibv_pd *_pd;
292 : struct ibv_wc* _wcs;
293 :
294 : #ifndef _WIN32
295 : int _pipe_fd[2];
296 : #else
297 : uint64_t _availBytes;
298 : uint32_t _eventFlag;
299 : lunchbox::SpinLock _eventLock;
300 : #endif
301 :
302 : uint64_t _readBytes;
303 :
304 : struct RDMAConnParamData _cpd;
305 : bool _established;
306 :
307 : int32_t _depth; // Maximum sends in flight (RDMA & FC)
308 : lunchbox::a_int32_t _writes; // Number of unacked RDMA writes received
309 : lunchbox::a_int32_t _fcs; // Number of unacked FC messages received
310 : lunchbox::a_int32_t _wcredits; // Number of RDMA write credits available
311 : lunchbox::a_int32_t _fcredits; // Number of FC message credits available
312 :
313 : unsigned int _completions;
314 :
315 : /* MR for setup and ack messages */
316 : BufferPool _msgbuf;
317 :
318 : /* source RDMA MR */
319 : RingBuffer _sourcebuf;
320 : Ring<uint32_t, 3> _sourceptr;
321 : // : initialized during connect/accept
322 : // HEAD : advanced after copying buffer (fill) on local write
323 : // - write thread only
324 : // MIDDLE : advanced before posting RDMA write
325 : // - write thread only
326 : // TAIL : advanced after completing RDMA write
327 : // - write & read threads (in pollCQ)
328 :
329 : /* sink RDMA MR */
330 : RingBuffer _sinkbuf;
331 : Ring<uint32_t, 2> _sinkptr;
332 : // : initialized during connect/accept
333 : // HEAD : advanced on receipt of RDMA WRITE
334 : // - write & read threads (in pollCQ)
335 : // TAIL : advanced after copying buffer (drain) on local read
336 : // - read thread only
337 :
338 : /* local "view" of remote sink MR */
339 : Ring<uint32_t, 2> _rptr;
340 : // : initialized on receipt of setup message
341 : // HEAD : advanced before posting RDMA write
342 : // - write thread only
343 : // TAIL : advanced on receipt of FC
344 : // - write & read threads (in pollCQ)
345 :
346 : /* remote sink MR parameters */
347 : uint64_t _rbase, _rkey;
348 :
349 : /* copy bytes out of the sink buffer */
350 : inline uint32_t _drain( void *buffer, const uint32_t bytes );
351 : /* copy bytes in to the source buffer */
352 : inline uint32_t _fill( const void *buffer, const uint32_t bytes );
353 :
354 : #ifdef WIN32
355 : HANDLE _ccWaitObj;
356 : HANDLE _cmWaitObj;
357 : #endif
358 :
359 : private:
360 : struct stats
361 : {
362 6 : stats( )
363 : : reads( 0ULL )
364 : , buffer_empty( 0ULL )
365 : , no_credits_fc( 0ULL )
366 : , writes( 0ULL )
367 : , buffer_full( 0ULL )
368 6 : , no_credits_rdma( 0ULL )
369 6 : { }
370 :
371 : uint64_t reads;
372 : uint64_t buffer_empty;
373 : uint64_t no_credits_fc;
374 : uint64_t writes;
375 : uint64_t buffer_full;
376 : uint64_t no_credits_rdma;
377 : } _stats;
378 :
379 : void _showStats( );
380 : }; // RDMAConnection
381 : } // namespace co
|