Line data Source code
1 :
2 : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "connection.h"
22 :
23 : #include "buffer.h"
24 : #include "connectionDescription.h"
25 : #include "connectionListener.h"
26 : #include "log.h"
27 : #include "pipeConnection.h"
28 : #include "rspConnection.h"
29 : #include "socketConnection.h"
30 :
31 : #ifdef _WIN32
32 : #include "namedPipeConnection.h"
33 : #endif
34 :
35 : #include <co/exception.h>
36 :
37 : #ifdef COLLAGE_USE_OFED
38 : #include "rdmaConnection.h"
39 : #endif
40 : #ifdef COLLAGE_USE_UDT
41 : #include "udtConnection.h"
42 : #endif
43 :
44 : #include <lunchbox/scopedMutex.h>
45 :
46 : #define STATISTICS
47 : namespace co
48 : {
49 : namespace detail
50 : {
51 : class Connection
52 : {
53 : public:
54 : co::Connection::State state; //!< The connection state
55 : ConnectionDescriptionPtr description; //!< The connection parameters
56 :
57 : /** The lock used to protect concurrent write calls. */
58 : mutable std::mutex sendLock;
59 :
60 : BufferPtr buffer; //!< Current async read buffer
61 : uint64_t bytes; //!< Current read request size
62 :
63 : /** The listeners on state changes */
64 : ConnectionListeners listeners;
65 :
66 : uint64_t outBytes; //!< Statistic: written bytes
67 : uint64_t inBytes; //!< Statistic: read bytes
68 :
69 409 : Connection()
70 409 : : state(co::Connection::STATE_CLOSED)
71 409 : , description(new ConnectionDescription)
72 : , bytes(0)
73 : , outBytes(0)
74 818 : , inBytes(0)
75 : {
76 409 : description->type = CONNECTIONTYPE_NONE;
77 409 : }
78 :
79 401 : ~Connection()
80 401 : {
81 401 : LBASSERT(state == co::Connection::STATE_CLOSED);
82 401 : state = co::Connection::STATE_CLOSED;
83 401 : description = 0;
84 :
85 401 : LBASSERTINFO(!buffer,
86 : "Pending read operation during connection destruction");
87 401 : }
88 :
89 1065 : void fireStateChanged(co::Connection* connection)
90 : {
91 3322 : for (ConnectionListeners::const_iterator i = listeners.begin();
92 2214 : i != listeners.end(); ++i)
93 : {
94 43 : (*i)->notifyStateChanged(connection);
95 : }
96 1064 : }
97 : };
98 : }
99 :
100 409 : Connection::Connection()
101 409 : : _impl(new detail::Connection)
102 : {
103 409 : LBVERB << "New Connection @" << (void*)this << std::endl;
104 409 : }
105 :
106 802 : Connection::~Connection()
107 : {
108 401 : LBVERB << "Delete Connection @" << (void*)this << std::endl;
109 : #ifdef STATISTICS
110 401 : if (_impl->outBytes > LB_1MB || _impl->inBytes > LB_1MB)
111 102 : LBINFO << *this << ": " << (_impl->outBytes >> 20) << " MB out, "
112 102 : << (_impl->inBytes >> 20) << " MB in" << std::endl;
113 : #endif
114 401 : delete _impl;
115 401 : }
116 :
117 0 : bool Connection::operator==(const Connection& rhs) const
118 : {
119 0 : if (this == &rhs)
120 0 : return true;
121 0 : if (_impl->description->type != CONNECTIONTYPE_PIPE)
122 0 : return false;
123 0 : Connection* pipe = const_cast<Connection*>(this);
124 0 : return pipe->acceptSync().get() == &rhs;
125 : }
126 :
127 99 : ConnectionPtr Connection::create(ConnectionDescriptionPtr description)
128 : {
129 198 : ConnectionPtr connection;
130 99 : switch (description->type)
131 : {
132 : case CONNECTIONTYPE_TCPIP:
133 95 : connection = new SocketConnection;
134 95 : break;
135 :
136 : case CONNECTIONTYPE_PIPE:
137 3 : connection = new PipeConnection;
138 3 : break;
139 :
140 : #ifdef _WIN32
141 : case CONNECTIONTYPE_NAMEDPIPE:
142 : connection = new NamedPipeConnection;
143 : break;
144 : #endif
145 :
146 : case CONNECTIONTYPE_RSP:
147 0 : connection = new RSPConnection;
148 0 : break;
149 :
150 : #ifdef COLLAGE_USE_OFED
151 : case CONNECTIONTYPE_RDMA:
152 0 : connection = new RDMAConnection;
153 0 : break;
154 : #endif
155 : #ifdef COLLAGE_USE_UDT
156 : case CONNECTIONTYPE_UDT:
157 : connection = new UDTConnection;
158 : break;
159 : #endif
160 :
161 : default:
162 3 : LBWARN << "Connection type " << description->type << " not supported"
163 3 : << std::endl;
164 1 : return 0;
165 : }
166 :
167 98 : if (description->bandwidth == 0)
168 75 : description->bandwidth = connection->getDescription()->bandwidth;
169 :
170 98 : connection->_setDescription(description);
171 98 : return connection;
172 : }
173 :
174 3271994 : Connection::State Connection::getState() const
175 : {
176 3271994 : return _impl->state;
177 : }
178 :
179 98 : void Connection::_setDescription(ConnectionDescriptionPtr description)
180 : {
181 98 : LBASSERT(description.isValid());
182 98 : LBASSERTINFO(_impl->description->type == description->type,
183 : "Wrong connection type in description");
184 98 : _impl->description = description;
185 98 : LBASSERT(description->bandwidth > 0);
186 98 : }
187 :
188 1117 : void Connection::_setState(const State state)
189 : {
190 1117 : if (_impl->state == state)
191 52 : return;
192 1065 : _impl->state = state;
193 1065 : _impl->fireStateChanged(this);
194 : }
195 :
196 177 : void Connection::lockSend() const
197 : {
198 177 : _impl->sendLock.lock();
199 177 : }
200 :
201 177 : void Connection::unlockSend() const
202 : {
203 177 : _impl->sendLock.unlock();
204 177 : }
205 :
206 168 : void Connection::addListener(ConnectionListener* listener)
207 : {
208 168 : _impl->listeners.push_back(listener);
209 169 : }
210 :
211 168 : void Connection::removeListener(ConnectionListener* listener)
212 : {
213 : ConnectionListeners::iterator i =
214 168 : find(_impl->listeners.begin(), _impl->listeners.end(), listener);
215 168 : if (i != _impl->listeners.end())
216 168 : _impl->listeners.erase(i);
217 168 : }
218 :
219 : //----------------------------------------------------------------------
220 : // read
221 : //----------------------------------------------------------------------
222 2719507 : void Connection::recvNB(BufferPtr buffer, const uint64_t bytes)
223 : {
224 2719507 : LBASSERT(!_impl->buffer);
225 2719507 : LBASSERT(_impl->bytes == 0);
226 2719507 : LBASSERT(buffer);
227 2719507 : LBASSERT(bytes > 0);
228 2719507 : LBASSERTINFO(bytes < LB_BIT48, "Out-of-sync network stream: read size "
229 : << bytes << "?");
230 :
231 2719507 : _impl->buffer = buffer;
232 2719507 : _impl->bytes = bytes;
233 2719507 : buffer->reserve(buffer->getSize() + bytes);
234 2719507 : readNB(buffer->getData() + buffer->getSize(), bytes);
235 2719507 : }
236 :
237 2719393 : bool Connection::recvSync(BufferPtr& outBuffer, const bool block)
238 : {
239 2719393 : LBASSERTINFO(_impl->buffer, "No pending receive on "
240 : << getDescription()->toString());
241 :
242 : // reset async IO data
243 2719393 : outBuffer = _impl->buffer;
244 2719393 : const uint64_t bytes = _impl->bytes;
245 2719393 : _impl->buffer = 0;
246 2719393 : _impl->bytes = 0;
247 :
248 2719393 : if (_impl->state != STATE_CONNECTED || !outBuffer || bytes == 0)
249 33 : return false;
250 2719360 : LBASSERTINFO(bytes < LB_BIT48, "Out-of-sync network stream: read size "
251 : << bytes << "?");
252 : #ifdef STATISTICS
253 2719360 : _impl->inBytes += bytes;
254 : #endif
255 :
256 : // 'Iterators' for receive loop
257 2719360 : uint8_t* ptr = outBuffer->getData() + outBuffer->getSize();
258 2719360 : uint64_t bytesLeft = bytes;
259 2719360 : int64_t got = readSync(ptr, bytesLeft, block);
260 :
261 : // WAR: fluke notification: On Win32, we get occasionally a data
262 : // notification and then deadlock when reading from the connection. The
263 : // callee (Node::handleData) will flag the first read, the underlying
264 : // SocketConnection will not block and we will restore the AIO operation if
265 : // no data was present.
266 2719360 : if (got == READ_TIMEOUT)
267 : {
268 0 : _impl->buffer = outBuffer;
269 0 : _impl->bytes = bytes;
270 0 : outBuffer = 0;
271 0 : return true;
272 : }
273 :
274 : // From here on, receive loop until all data read or error
275 : while (true)
276 : {
277 3016576 : if (got < 0) // error
278 : {
279 37 : const uint64_t read = bytes - bytesLeft;
280 37 : outBuffer->resize(outBuffer->getSize() + read);
281 37 : if (bytes == bytesLeft)
282 37 : LBDEBUG << "Read on dead connection" << std::endl;
283 : else
284 0 : LBERROR << "Error during read after " << read << " bytes on "
285 0 : << _impl->description << std::endl;
286 37 : return false;
287 : }
288 3016539 : else if (got == 0)
289 : {
290 : // ConnectionSet::select may report data on an 'empty' connection.
291 : // If we have nothing read so far, we have hit this case.
292 0 : if (bytes == bytesLeft)
293 0 : return false;
294 0 : LBVERB << "Zero bytes read" << std::endl;
295 : }
296 3016539 : if (bytesLeft > static_cast<uint64_t>(got)) // partial read
297 : {
298 297216 : ptr += got;
299 297216 : bytesLeft -= got;
300 :
301 297216 : readNB(ptr, bytesLeft);
302 297216 : got = readSync(ptr, bytesLeft, true);
303 297216 : continue;
304 : }
305 :
306 : // read done
307 2719323 : LBASSERTINFO(static_cast<uint64_t>(got) == bytesLeft, got << " != "
308 : << bytesLeft);
309 :
310 2719323 : outBuffer->resize(outBuffer->getSize() + bytes);
311 : #ifndef NDEBUG
312 2719323 : if (bytes <= 1024 && (lunchbox::Log::topics & LOG_PACKETS))
313 : {
314 0 : ptr -= (bytes - bytesLeft); // rewind
315 0 : LBINFO << "recv:" << lunchbox::format(ptr, bytes) << std::endl;
316 : }
317 : #endif
318 2719323 : return true;
319 297216 : }
320 :
321 : LBUNREACHABLE;
322 : return true;
323 : }
324 :
325 217 : BufferPtr Connection::resetRecvData()
326 : {
327 217 : BufferPtr buffer = _impl->buffer;
328 217 : _impl->buffer = 0;
329 217 : _impl->bytes = 0;
330 217 : return buffer;
331 : }
332 :
333 : //----------------------------------------------------------------------
334 : // write
335 : //----------------------------------------------------------------------
336 2968123 : bool Connection::send(const void* buffer, const uint64_t bytes,
337 : const bool isLocked)
338 : {
339 : #ifdef STATISTICS
340 2968123 : _impl->outBytes += bytes;
341 : #endif
342 2968123 : LBASSERT(bytes > 0);
343 2964517 : if (bytes == 0)
344 0 : return true;
345 :
346 2964517 : const uint8_t* ptr = static_cast<const uint8_t*>(buffer);
347 :
348 : // possible OPT: We need to lock here to guarantee an atomic transmission of
349 : // the buffer. Possible improvements are:
350 : // 1) Disassemble buffer into 'small enough' pieces and use a header to
351 : // reassemble correctly on the other side (aka reliable UDP)
352 : // 2) Introduce a send thread with a thread-safe task queue
353 5930227 : lunchbox::ScopedWrite mutex(isLocked ? 0 : &_impl->sendLock);
354 :
355 : #ifndef NDEBUG
356 2970818 : if (bytes <= 1024 && (lunchbox::Log::topics & LOG_PACKETS))
357 0 : LBINFO << "send:" << lunchbox::format(ptr, bytes) << std::endl;
358 : #endif
359 :
360 2970818 : uint64_t bytesLeft = bytes;
361 8902188 : while (bytesLeft)
362 : {
363 : try
364 : {
365 2970793 : const int64_t wrote = this->write(ptr, bytesLeft);
366 2965685 : if (wrote == -1) // error
367 : {
368 0 : LBERROR << "Error during write after " << bytes - bytesLeft
369 0 : << " bytes, closing " << *this << std::endl;
370 0 : close();
371 0 : return false;
372 : }
373 2965685 : else if (wrote == 0)
374 0 : LBINFO << "Zero bytes write" << std::endl;
375 :
376 2965685 : bytesLeft -= wrote;
377 2965685 : ptr += wrote;
378 : }
379 0 : catch (const co::Exception& e)
380 : {
381 0 : LBERROR << e.what() << " after " << bytes - bytesLeft
382 0 : << " bytes, closing connection" << std::endl;
383 0 : close();
384 0 : return false;
385 : }
386 : }
387 2965710 : return true;
388 : }
389 :
390 164 : bool Connection::isMulticast() const
391 : {
392 164 : return getDescription()->type >= CONNECTIONTYPE_MULTICAST;
393 : }
394 :
395 684 : ConstConnectionDescriptionPtr Connection::getDescription() const
396 : {
397 684 : return _impl->description;
398 : }
399 :
400 496 : ConnectionDescriptionPtr Connection::_getDescription()
401 : {
402 496 : return _impl->description;
403 : }
404 :
405 112 : std::ostream& operator<<(std::ostream& os, const Connection& connection)
406 : {
407 112 : const Connection::State state = connection.getState();
408 224 : ConstConnectionDescriptionPtr desc = connection.getDescription();
409 :
410 224 : os << lunchbox::className(connection) << " " << (void*)&connection
411 : << " state "
412 : << (state == Connection::STATE_CLOSED
413 : ? "closed"
414 : : state == Connection::STATE_CONNECTING
415 0 : ? "connecting"
416 : : state == Connection::STATE_CONNECTED
417 0 : ? "connected"
418 : : state == Connection::STATE_LISTENING
419 0 : ? "listening"
420 : : state == Connection::STATE_CLOSING
421 : ? "closing"
422 224 : : "UNKNOWN");
423 112 : if (desc.isValid())
424 112 : os << " description " << desc->toString();
425 :
426 224 : return os;
427 : }
428 63 : }
|