Line data Source code
1 :
2 : /* Copyright (c) 2007-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "dataOStream.h"
23 :
24 : #include "buffer.h"
25 : #include "commands.h"
26 : #include "connectionDescription.h"
27 : #include "connections.h"
28 : #include "log.h"
29 : #include "node.h"
30 : #include "types.h"
31 :
32 : #include <lunchbox/clock.h>
33 : #include <pression/data/Compressor.h>
34 : #include <pression/data/CompressorInfo.h>
35 :
36 : namespace co
37 : {
38 : namespace
39 : {
40 : #define CO_INSTRUMENT_DATAOSTREAM
41 : #ifdef CO_INSTRUMENT_DATAOSTREAM
42 21 : lunchbox::a_ssize_t nBytes;
43 21 : lunchbox::a_ssize_t nBytesIn;
44 21 : lunchbox::a_ssize_t nBytesOut;
45 21 : lunchbox::a_ssize_t nBytesSaved;
46 21 : lunchbox::a_ssize_t nBytesSent;
47 21 : lunchbox::a_ssize_t compressionTime;
48 21 : lunchbox::a_ssize_t compressionRuns;
49 : #endif
50 :
51 : enum CompressorState
52 : {
53 : STATE_UNCOMPRESSED,
54 : STATE_PARTIAL,
55 : STATE_COMPLETE,
56 : STATE_UNCOMPRESSIBLE
57 : };
58 : }
59 :
60 : namespace detail
61 : {
62 72284 : class DataOStream
63 : {
64 : public:
65 : /** The buffer used for saving and buffering */
66 : lunchbox::Bufferb buffer;
67 :
68 : /** The start position of the buffering, always 0 if !_save */
69 : uint64_t bufferStart;
70 :
71 : /** The uncompressed size of a completely compressed buffer. */
72 : uint64_t dataSize;
73 :
74 : /** The compressed size, 0 for uncompressed or uncompressable data. */
75 : uint64_t compressedDataSize;
76 :
77 : /** The maximum of size of chunk that gets flushed/sent. */
78 : const size_t chunkSize;
79 :
80 : /** Locked connections to the receivers, if _enabled */
81 : Connections connections;
82 :
83 : CompressorState state; //!< State of compression
84 : CompressorPtr compressor; //!< The compressor instance.
85 : CompressorInfo compressorInfo; //!< Information on the compr.
86 :
87 : /** The output stream is enabled for writing */
88 : bool enabled;
89 :
90 : /** Some data has been sent since it was enabled */
91 : bool dataSent;
92 :
93 : /** Save all sent data */
94 : bool save;
95 :
96 72280 : DataOStream(const size_t chunkSize_)
97 72280 : : bufferStart(0)
98 : , dataSize(0)
99 : , compressedDataSize(0)
100 : , chunkSize(chunkSize_)
101 : , state(STATE_UNCOMPRESSED)
102 : , enabled(false)
103 : , dataSent(false)
104 72280 : , save(false)
105 : {
106 72278 : }
107 :
108 0 : DataOStream(const DataOStream& rhs)
109 0 : : bufferStart(rhs.bufferStart)
110 0 : , dataSize(rhs.dataSize)
111 0 : , compressedDataSize(rhs.compressedDataSize)
112 0 : , chunkSize(rhs.chunkSize)
113 0 : , state(rhs.state)
114 0 : , enabled(rhs.enabled)
115 0 : , dataSent(rhs.dataSent)
116 0 : , save(rhs.save)
117 : {
118 0 : }
119 :
120 318 : std::string getCompressorName() const
121 : {
122 334 : if (state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE ||
123 16 : !compressor)
124 : {
125 302 : return std::string();
126 : }
127 16 : return compressorInfo.name;
128 : }
129 :
130 10 : bool initCompressor()
131 : {
132 10 : if (compressorInfo.name.empty())
133 2 : return false;
134 8 : if (compressor)
135 0 : return true;
136 :
137 8 : compressor.reset(compressorInfo.create());
138 8 : LBLOG(LOG_OBJECTS) << "Allocated " << compressorInfo.name << std::endl;
139 8 : return compressor != nullptr;
140 : }
141 :
142 167 : uint32_t getNumChunks() const
143 : {
144 183 : if (state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE ||
145 16 : !compressor)
146 : {
147 151 : return 1;
148 : }
149 16 : return uint32_t(compressor->getCompressedData().size());
150 : }
151 :
152 : /** Compress data and update the compressor state. */
153 72224 : void compress(const uint8_t* src, const uint64_t size,
154 : const CompressorState result)
155 : {
156 72224 : if (state == result || state == STATE_UNCOMPRESSIBLE)
157 72216 : return;
158 : const uint64_t threshold =
159 72224 : uint64_t(Global::getIAttribute(Global::IATTR_OBJECT_COMPRESSION));
160 :
161 72224 : if (size <= threshold || !initCompressor())
162 : {
163 72216 : state = STATE_UNCOMPRESSED;
164 72216 : return;
165 : }
166 :
167 : #ifdef CO_INSTRUMENT_DATAOSTREAM
168 16 : lunchbox::Clock clock;
169 : #endif
170 8 : const auto& output = compressor->compress(src, size);
171 8 : LBASSERT(!output.empty());
172 8 : compressedDataSize = pression::data::getDataSize(output);
173 :
174 : #ifdef CO_INSTRUMENT_DATAOSTREAM
175 8 : compressionTime += size_t(clock.getTimef() * 1000.f);
176 8 : nBytesIn += size;
177 8 : nBytesOut += compressedDataSize;
178 8 : ++compressionRuns;
179 : #endif
180 :
181 8 : if (compressedDataSize >= size)
182 : {
183 0 : state = STATE_UNCOMPRESSIBLE;
184 : #ifndef CO_AGGRESSIVE_CACHING
185 0 : compressor.reset(compressorInfo.create());
186 :
187 0 : if (result == STATE_COMPLETE)
188 0 : buffer.pack();
189 : #endif
190 0 : return;
191 : }
192 :
193 8 : state = result;
194 : #ifndef CO_AGGRESSIVE_CACHING
195 8 : if (result == STATE_COMPLETE)
196 : {
197 2 : LBASSERT(buffer.getSize() == dataSize);
198 2 : buffer.clear();
199 : }
200 : #endif
201 : }
202 : };
203 : }
204 :
205 72284 : DataOStream::DataOStream(const size_t chunkSize)
206 72284 : : _impl(new detail::DataOStream(chunkSize))
207 : {
208 72278 : }
209 :
210 0 : DataOStream::DataOStream(DataOStream& rhs)
211 : : boost::noncopyable()
212 0 : , _impl(new detail::DataOStream(*rhs._impl))
213 : {
214 0 : _setupConnections(rhs.getConnections());
215 0 : getBuffer().swap(rhs.getBuffer());
216 :
217 : // disable send of rhs
218 0 : rhs._setupConnections(Connections());
219 0 : rhs.disable();
220 0 : }
221 :
222 144568 : DataOStream::~DataOStream()
223 : {
224 : // Can't call disable() from destructor since it uses virtual functions
225 72284 : LBASSERT(!_impl->enabled);
226 72284 : delete _impl;
227 72284 : }
228 :
229 172 : void DataOStream::_setCompressor(const CompressorInfo& info)
230 : {
231 172 : if (info == _impl->compressorInfo)
232 0 : return;
233 172 : _impl->compressorInfo = info;
234 172 : _impl->compressor.reset(nullptr);
235 : }
236 :
237 72344 : void DataOStream::_enable()
238 : {
239 72344 : LBASSERT(!_impl->enabled);
240 72341 : LBASSERT(_impl->save || !_impl->connections.empty());
241 72340 : _impl->state = STATE_UNCOMPRESSED;
242 72340 : _impl->bufferStart = 0;
243 72340 : _impl->dataSent = false;
244 72340 : _impl->dataSize = 0;
245 72340 : _impl->enabled = true;
246 72340 : _impl->buffer.setSize(0);
247 : #ifdef CO_AGGRESSIVE_CACHING
248 : _impl->buffer.reserve(COMMAND_ALLOCSIZE);
249 : #else
250 72337 : _impl->buffer.reserve(COMMAND_MINSIZE);
251 : #endif
252 72347 : }
253 :
254 236 : void DataOStream::_setupConnections(const Nodes& receivers)
255 : {
256 236 : _impl->connections = gatherConnections(receivers);
257 236 : }
258 :
259 72072 : void DataOStream::_setupConnections(const Connections& connections)
260 : {
261 72072 : _impl->connections = connections;
262 72071 : }
263 :
264 32 : void DataOStream::_setupConnection(NodePtr node, const bool useMulticast)
265 : {
266 32 : LBASSERT(_impl->connections.empty());
267 32 : _impl->connections.push_back(node->getConnection(useMulticast));
268 32 : }
269 :
270 1 : void DataOStream::_setupConnection(ConnectionPtr connection)
271 : {
272 1 : _impl->connections.push_back(connection);
273 1 : }
274 :
275 33 : void DataOStream::_resend()
276 : {
277 33 : LBASSERT(!_impl->enabled);
278 33 : LBASSERT(!_impl->connections.empty());
279 33 : LBASSERT(_impl->save);
280 :
281 33 : _impl->compress(_impl->buffer.getData(), _impl->dataSize, STATE_COMPLETE);
282 33 : sendData(_impl->buffer.getData(), _impl->dataSize, true);
283 33 : }
284 :
285 33 : void DataOStream::_clearConnections()
286 : {
287 33 : _impl->connections.clear();
288 33 : }
289 :
290 72186 : void DataOStream::disable()
291 : {
292 72186 : if (!_impl->enabled)
293 0 : return;
294 :
295 72186 : _impl->dataSize = _impl->buffer.getSize();
296 72187 : _impl->dataSent = _impl->dataSize > 0;
297 :
298 72187 : if (_impl->dataSent && !_impl->connections.empty())
299 : {
300 72025 : const uint8_t* ptr = _impl->buffer.getData() + _impl->bufferStart;
301 72025 : const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
302 :
303 72024 : if (size == 0 && _impl->state == STATE_PARTIAL)
304 : {
305 : // OPT: all data has been sent in one compressed chunk
306 0 : _impl->state = STATE_COMPLETE;
307 : #ifndef CO_AGGRESSIVE_CACHING
308 0 : _impl->buffer.clear();
309 : #endif
310 : }
311 : else
312 : {
313 72024 : _impl->state = STATE_UNCOMPRESSED;
314 : const CompressorState state =
315 72024 : _impl->bufferStart == 0 ? STATE_COMPLETE : STATE_PARTIAL;
316 72024 : _impl->compress(ptr, size, state);
317 : }
318 :
319 72025 : sendData(ptr, size, true); // always send to finalize istream
320 : }
321 :
322 : #ifndef CO_AGGRESSIVE_CACHING
323 72188 : if (!_impl->save)
324 115 : _impl->buffer.clear();
325 : #endif
326 72188 : _impl->enabled = false;
327 72188 : _impl->connections.clear();
328 : }
329 :
330 72229 : void DataOStream::enableSave()
331 : {
332 72229 : LBASSERTINFO(!_impl->enabled ||
333 : (!_impl->dataSent && _impl->buffer.getSize() == 0),
334 : "Can't enable saving after data has been written");
335 72229 : _impl->save = true;
336 72229 : }
337 :
338 0 : void DataOStream::disableSave()
339 : {
340 0 : LBASSERTINFO(!_impl->enabled ||
341 : (!_impl->dataSent && _impl->buffer.getSize() == 0),
342 : "Can't disable saving after data has been written");
343 0 : _impl->save = false;
344 0 : }
345 :
346 218 : bool DataOStream::hasSentData() const
347 : {
348 218 : return _impl->dataSent;
349 : }
350 :
351 313707 : void DataOStream::_write(const void* data, uint64_t size)
352 : {
353 313707 : LBASSERT(_impl->enabled);
354 : #ifdef CO_INSTRUMENT_DATAOSTREAM
355 313642 : nBytes += size;
356 : #endif
357 :
358 313879 : if (_impl->buffer.getSize() - _impl->bufferStart > _impl->chunkSize)
359 9 : flush(false);
360 313866 : _impl->buffer.append(static_cast<const uint8_t*>(data), size);
361 313792 : }
362 :
363 168 : void DataOStream::flush(const bool last)
364 : {
365 168 : LBASSERT(_impl->enabled);
366 168 : if (!_impl->connections.empty())
367 : {
368 166 : const uint8_t* ptr = _impl->buffer.getData() + _impl->bufferStart;
369 166 : const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
370 :
371 166 : _impl->state = STATE_UNCOMPRESSED;
372 166 : _impl->compress(ptr, size, STATE_PARTIAL);
373 166 : sendData(ptr, size, last);
374 : }
375 168 : _impl->dataSent = true;
376 168 : _resetBuffer();
377 168 : }
378 :
379 384 : void DataOStream::reset()
380 : {
381 384 : _resetBuffer();
382 384 : _impl->enabled = false;
383 384 : _impl->connections.clear();
384 384 : }
385 :
386 72690 : const Connections& DataOStream::getConnections() const
387 : {
388 72690 : return _impl->connections;
389 : }
390 :
391 552 : void DataOStream::_resetBuffer()
392 : {
393 552 : _impl->state = STATE_UNCOMPRESSED;
394 552 : if (_impl->save)
395 320 : _impl->bufferStart = _impl->buffer.getSize();
396 : else
397 : {
398 232 : _impl->bufferStart = 0;
399 232 : _impl->buffer.setSize(0);
400 : }
401 552 : }
402 :
403 8 : uint64_t DataOStream::_getCompressedData(const uint8_t** chunks,
404 : uint64_t* chunkSizes) const
405 : {
406 8 : LBASSERT(_impl->state != STATE_UNCOMPRESSED &&
407 : _impl->state != STATE_UNCOMPRESSIBLE && _impl->compressor);
408 :
409 8 : const auto& result = _impl->compressor->getCompressedData();
410 8 : LBASSERT(!result.empty());
411 8 : size_t totalDataSize = 0;
412 16 : for (size_t i = 0; i != result.size(); ++i)
413 : {
414 8 : chunks[i] = result[i].getData();
415 8 : const size_t dataSize = result[i].getSize();
416 8 : chunkSizes[i] = dataSize;
417 8 : totalDataSize += dataSize;
418 : }
419 :
420 8 : LBASSERT(totalDataSize == pression::data::getDataSize(result));
421 8 : return totalDataSize;
422 : }
423 :
424 288441 : lunchbox::Bufferb& DataOStream::getBuffer()
425 : {
426 288441 : return _impl->buffer;
427 : }
428 :
429 159 : DataOStream& DataOStream::streamDataHeader(DataOStream& os)
430 : {
431 159 : return os << _impl->getCompressorName() << _impl->getNumChunks();
432 : }
433 :
434 159 : void DataOStream::sendBody(ConnectionPtr connection, const void* data,
435 : const uint64_t size)
436 : {
437 167 : const auto& compressorName = _impl->getCompressorName();
438 159 : if (compressorName.empty())
439 : {
440 : #ifdef CO_INSTRUMENT_DATAOSTREAM
441 151 : nBytesSent += size;
442 : #endif
443 151 : if (size > 0)
444 151 : LBCHECK(connection->send(data, size, true));
445 151 : return;
446 : }
447 :
448 8 : const size_t nChunks = _impl->compressor->getCompressedData().size();
449 : uint64_t* chunkSizes =
450 8 : static_cast<uint64_t*>(alloca(nChunks * sizeof(uint64_t)));
451 : const uint8_t** chunks =
452 8 : static_cast<const uint8_t**>(alloca(nChunks * sizeof(void*)));
453 :
454 : #ifdef CO_INSTRUMENT_DATAOSTREAM
455 8 : const uint64_t compressedSize = _getCompressedData(chunks, chunkSizes);
456 8 : if (_impl->state == STATE_COMPLETE)
457 : {
458 2 : nBytesSent += _impl->dataSize;
459 2 : nBytesSaved += _impl->dataSize - compressedSize;
460 : }
461 : else
462 : {
463 6 : nBytesSent += _impl->buffer.getSize() - _impl->bufferStart;
464 6 : nBytesSaved +=
465 12 : _impl->buffer.getSize() - _impl->bufferStart - compressedSize;
466 : }
467 :
468 : #else
469 : _getCompressedData(chunks, chunkSizes);
470 : #endif
471 :
472 16 : for (size_t j = 0; j < nChunks; ++j)
473 : {
474 8 : LBCHECK(connection->send(&chunkSizes[j], sizeof(uint64_t), true));
475 8 : LBCHECK(connection->send(chunks[j], chunkSizes[j], true));
476 : }
477 : }
478 :
479 159 : uint64_t DataOStream::getCompressedDataSize() const
480 : {
481 326 : if (_impl->state == STATE_UNCOMPRESSED ||
482 167 : _impl->state == STATE_UNCOMPRESSIBLE || !_impl->compressor)
483 : {
484 151 : return 0;
485 : }
486 8 : return _impl->compressedDataSize + _impl->getNumChunks() * sizeof(uint64_t);
487 : }
488 :
489 20 : std::ostream& DataOStream::printStatistics(std::ostream& os)
490 : {
491 : return os << "DataOStream "
492 : #ifdef CO_INSTRUMENT_DATAOSTREAM
493 20 : << "compressed " << nBytesIn << " -> " << nBytesOut << " of "
494 40 : << nBytes << " @ "
495 20 : << int(float(nBytesIn) / 1.024f / 1.024f / compressionTime + .5f)
496 60 : << " MB/s " << compressionRuns << " runs, saved " << nBytesSaved
497 40 : << " of " << nBytesSent << " brutto sent ("
498 40 : << double(nBytesSaved) / double(nBytesSent) * 100. << "%)";
499 : #else
500 : << "without statistics enabled";
501 : #endif
502 : }
503 :
504 20 : void DataOStream::clearStatistics()
505 : {
506 : #ifdef CO_INSTRUMENT_DATAOSTREAM
507 20 : nBytes = 0;
508 20 : nBytesIn = 0;
509 20 : nBytesOut = 0;
510 20 : nBytesSaved = 0;
511 20 : nBytesSent = 0;
512 20 : compressionTime = 0;
513 20 : compressionRuns = 0;
514 : #endif
515 20 : }
516 63 : }
|