Line data Source code
1 :
2 : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "dataOStream.h"
23 :
24 : #include "buffer.h"
25 : #include "connectionDescription.h"
26 : #include "commands.h"
27 : #include "connections.h"
28 : #include "global.h"
29 : #include "log.h"
30 : #include "node.h"
31 : #include "types.h"
32 :
33 : #include <lunchbox/clock.h>
34 : #include <pression/data/Compressor.h>
35 : #include <pression/data/CompressorInfo.h>
36 :
37 : namespace co
38 : {
39 : namespace
40 : {
41 : #define CO_INSTRUMENT_DATAOSTREAM
42 : #ifdef CO_INSTRUMENT_DATAOSTREAM
43 22 : lunchbox::a_ssize_t nBytes;
44 22 : lunchbox::a_ssize_t nBytesIn;
45 22 : lunchbox::a_ssize_t nBytesOut;
46 22 : lunchbox::a_ssize_t nBytesSaved;
47 22 : lunchbox::a_ssize_t nBytesSent;
48 22 : lunchbox::a_ssize_t compressionTime;
49 22 : lunchbox::a_ssize_t compressionRuns;
50 : #endif
51 :
52 : enum CompressorState
53 : {
54 : STATE_UNCOMPRESSED,
55 : STATE_PARTIAL,
56 : STATE_COMPLETE,
57 : STATE_UNCOMPRESSIBLE
58 : };
59 : }
60 :
61 : namespace detail
62 : {
63 72365 : class DataOStream
64 : {
65 : public:
66 : /** The buffer used for saving and buffering */
67 : lunchbox::Bufferb buffer;
68 :
69 : /** The start position of the buffering, always 0 if !_save */
70 : uint64_t bufferStart;
71 :
72 : /** The uncompressed size of a completely compressed buffer. */
73 : uint64_t dataSize;
74 :
75 : /** The compressed size, 0 for uncompressed or uncompressable data. */
76 : uint64_t compressedDataSize;
77 :
78 : /** Locked connections to the receivers, if _enabled */
79 : Connections connections;
80 :
81 : CompressorState state; //!< State of compression
82 : CompressorPtr compressor; //!< The compressor instance.
83 : CompressorInfo compressorInfo; //!< Information on the compr.
84 :
85 : /** The output stream is enabled for writing */
86 : bool enabled;
87 :
88 : /** Some data has been sent since it was enabled */
89 : bool dataSent;
90 :
91 : /** Save all sent data */
92 : bool save;
93 :
94 72363 : DataOStream()
95 72363 : : bufferStart( 0 )
96 : , dataSize( 0 )
97 : , compressedDataSize( 0 )
98 : , state( STATE_UNCOMPRESSED )
99 : , enabled( false )
100 : , dataSent( false )
101 72363 : , save( false )
102 72360 : {}
103 :
104 0 : DataOStream( const DataOStream& rhs )
105 0 : : bufferStart( rhs.bufferStart )
106 0 : , dataSize( rhs.dataSize )
107 0 : , compressedDataSize( rhs.compressedDataSize )
108 0 : , state( rhs.state )
109 0 : , enabled( rhs.enabled )
110 0 : , dataSent( rhs.dataSent )
111 0 : , save( rhs.save )
112 0 : {}
113 :
114 330 : std::string getCompressorName() const
115 : {
116 346 : if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE ||
117 16 : !compressor )
118 : {
119 314 : return std::string();
120 : }
121 16 : return compressorInfo.name;
122 : }
123 :
124 10 : bool initCompressor()
125 : {
126 10 : if( compressorInfo.name.empty( ))
127 2 : return false;
128 8 : if( compressor )
129 0 : return true;
130 :
131 8 : compressor.reset( compressorInfo.create( ));
132 8 : LBLOG( LOG_OBJECTS ) << "Allocated " << compressorInfo.name <<std::endl;
133 8 : return compressor != nullptr;
134 : }
135 :
136 173 : uint32_t getNumChunks() const
137 : {
138 189 : if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE ||
139 16 : !compressor )
140 : {
141 157 : return 1;
142 : }
143 16 : return uint32_t(compressor->getCompressedData().size( ));
144 : }
145 :
146 : /** Compress data and update the compressor state. */
147 72306 : void compress( const uint8_t* src, const uint64_t size,
148 : const CompressorState result )
149 : {
150 72306 : if( state == result || state == STATE_UNCOMPRESSIBLE )
151 72297 : return;
152 : const uint64_t threshold =
153 72306 : uint64_t( Global::getIAttribute( Global::IATTR_OBJECT_COMPRESSION ));
154 :
155 72305 : if( size <= threshold || !initCompressor( ))
156 : {
157 72297 : state = STATE_UNCOMPRESSED;
158 72297 : return;
159 : }
160 :
161 : #ifdef CO_INSTRUMENT_DATAOSTREAM
162 16 : lunchbox::Clock clock;
163 : #endif
164 8 : const auto& output = compressor->compress( src, size );
165 8 : LBASSERT( !output.empty( ));
166 8 : compressedDataSize = pression::data::getDataSize( output );
167 :
168 : #ifdef CO_INSTRUMENT_DATAOSTREAM
169 8 : compressionTime += size_t( clock.getTimef() * 1000.f );
170 8 : nBytesIn += size;
171 8 : nBytesOut += compressedDataSize;
172 8 : ++compressionRuns;
173 : #endif
174 :
175 8 : if( compressedDataSize >= size )
176 : {
177 0 : state = STATE_UNCOMPRESSIBLE;
178 : #ifndef CO_AGGRESSIVE_CACHING
179 0 : compressor.reset( compressorInfo.create( ));
180 :
181 0 : if( result == STATE_COMPLETE )
182 0 : buffer.pack();
183 : #endif
184 0 : return;
185 : }
186 :
187 8 : state = result;
188 : #ifndef CO_AGGRESSIVE_CACHING
189 8 : if( result == STATE_COMPLETE )
190 : {
191 2 : LBASSERT( buffer.getSize() == dataSize );
192 2 : buffer.clear();
193 : }
194 : #endif
195 : }
196 : };
197 : }
198 :
199 72365 : DataOStream::DataOStream()
200 72365 : : _impl( new detail::DataOStream )
201 72360 : {}
202 :
203 0 : DataOStream::DataOStream( DataOStream& rhs )
204 : : boost::noncopyable()
205 0 : , _impl( new detail::DataOStream( *rhs._impl ))
206 : {
207 0 : _setupConnections( rhs.getConnections( ));
208 0 : getBuffer().swap( rhs.getBuffer( ));
209 :
210 : // disable send of rhs
211 0 : rhs._setupConnections( Connections( ));
212 0 : rhs.disable();
213 0 : }
214 :
215 144729 : DataOStream::~DataOStream()
216 : {
217 : // Can't call disable() from destructor since it uses virtual functions
218 72365 : LBASSERT( !_impl->enabled );
219 72365 : delete _impl;
220 72364 : }
221 :
222 178 : void DataOStream::_setCompressor( const CompressorInfo& info )
223 : {
224 178 : if( info == _impl->compressorInfo )
225 0 : return;
226 178 : _impl->compressorInfo = info;
227 178 : _impl->compressor.reset( nullptr );
228 : }
229 :
230 72425 : void DataOStream::_enable()
231 : {
232 72425 : LBASSERT( !_impl->enabled );
233 72423 : LBASSERT( _impl->save || !_impl->connections.empty( ));
234 72424 : _impl->state = STATE_UNCOMPRESSED;
235 72424 : _impl->bufferStart = 0;
236 72424 : _impl->dataSent = false;
237 72424 : _impl->dataSize = 0;
238 72424 : _impl->enabled = true;
239 72424 : _impl->buffer.setSize( 0 );
240 : #ifdef CO_AGGRESSIVE_CACHING
241 : _impl->buffer.reserve( COMMAND_ALLOCSIZE );
242 : #else
243 72420 : _impl->buffer.reserve( COMMAND_MINSIZE );
244 : #endif
245 72426 : }
246 :
247 236 : void DataOStream::_setupConnections( const Nodes& receivers )
248 : {
249 236 : _impl->connections = gatherConnections( receivers );
250 236 : }
251 :
252 72147 : void DataOStream::_setupConnections( const Connections& connections )
253 : {
254 72147 : _impl->connections = connections;
255 72146 : }
256 :
257 38 : void DataOStream::_setupConnection( NodePtr node, const bool useMulticast )
258 : {
259 38 : LBASSERT( _impl->connections.empty( ));
260 38 : _impl->connections.push_back( node->getConnection( useMulticast ));
261 38 : }
262 :
263 1 : void DataOStream::_setupConnection( ConnectionPtr connection )
264 : {
265 1 : _impl->connections.push_back( connection );
266 1 : }
267 :
268 33 : void DataOStream::_resend()
269 : {
270 33 : LBASSERT( !_impl->enabled );
271 33 : LBASSERT( !_impl->connections.empty( ));
272 33 : LBASSERT( _impl->save );
273 :
274 33 : _impl->compress( _impl->buffer.getData(), _impl->dataSize, STATE_COMPLETE );
275 33 : sendData( _impl->buffer.getData(), _impl->dataSize, true );
276 33 : }
277 :
278 33 : void DataOStream::_clearConnections()
279 : {
280 33 : _impl->connections.clear();
281 33 : }
282 :
283 72263 : void DataOStream::disable()
284 : {
285 72263 : if( !_impl->enabled )
286 0 : return;
287 :
288 72263 : _impl->dataSize = _impl->buffer.getSize();
289 72262 : _impl->dataSent = _impl->dataSize > 0;
290 :
291 72262 : if( _impl->dataSent && !_impl->connections.empty( ))
292 : {
293 72099 : const uint8_t* ptr = _impl->buffer.getData() + _impl->bufferStart;
294 72099 : const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
295 :
296 72099 : if( size == 0 && _impl->state == STATE_PARTIAL )
297 : {
298 : // OPT: all data has been sent in one compressed chunk
299 0 : _impl->state = STATE_COMPLETE;
300 : #ifndef CO_AGGRESSIVE_CACHING
301 0 : _impl->buffer.clear();
302 : #endif
303 : }
304 : else
305 : {
306 72099 : _impl->state = STATE_UNCOMPRESSED;
307 72099 : const CompressorState state = _impl->bufferStart == 0 ?
308 72099 : STATE_COMPLETE : STATE_PARTIAL;
309 72099 : _impl->compress( ptr, size, state );
310 : }
311 :
312 72100 : sendData( ptr, size, true ); // always send to finalize istream
313 : }
314 :
315 : #ifndef CO_AGGRESSIVE_CACHING
316 72262 : if( !_impl->save )
317 121 : _impl->buffer.clear();
318 : #endif
319 72262 : _impl->enabled = false;
320 72262 : _impl->connections.clear();
321 : }
322 :
323 72305 : void DataOStream::enableSave()
324 : {
325 72305 : LBASSERTINFO( !_impl->enabled ||
326 : ( !_impl->dataSent && _impl->buffer.getSize() == 0 ),
327 : "Can't enable saving after data has been written" );
328 72305 : _impl->save = true;
329 72305 : }
330 :
331 0 : void DataOStream::disableSave()
332 : {
333 0 : LBASSERTINFO( !_impl->enabled ||
334 : (!_impl->dataSent && _impl->buffer.getSize() == 0 ),
335 : "Can't disable saving after data has been written" );
336 0 : _impl->save = false;
337 0 : }
338 :
339 224 : bool DataOStream::hasSentData() const
340 : {
341 224 : return _impl->dataSent;
342 : }
343 :
344 314293 : void DataOStream::_write( const void* data, uint64_t size )
345 : {
346 314293 : LBASSERT( _impl->enabled );
347 : #ifdef CO_INSTRUMENT_DATAOSTREAM
348 314258 : nBytes += size;
349 : #endif
350 :
351 628847 : if( _impl->buffer.getSize() - _impl->bufferStart >
352 314424 : Global::getObjectBufferSize( ))
353 : {
354 9 : flush( false );
355 : }
356 314412 : _impl->buffer.append( static_cast< const uint8_t* >( data ), size );
357 314334 : }
358 :
359 174 : void DataOStream::flush( const bool last )
360 : {
361 174 : LBASSERT( _impl->enabled );
362 174 : if( !_impl->connections.empty( ))
363 : {
364 172 : const uint8_t* ptr = _impl->buffer.getData() + _impl->bufferStart;
365 172 : const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
366 :
367 172 : _impl->state = STATE_UNCOMPRESSED;
368 172 : _impl->compress( ptr, size, STATE_PARTIAL );
369 172 : sendData( ptr, size, last );
370 : }
371 174 : _impl->dataSent = true;
372 174 : _resetBuffer();
373 174 : }
374 :
375 390 : void DataOStream::reset()
376 : {
377 390 : _resetBuffer();
378 390 : _impl->enabled = false;
379 390 : _impl->connections.clear();
380 390 : }
381 :
382 72787 : const Connections& DataOStream::getConnections() const
383 : {
384 72787 : return _impl->connections;
385 : }
386 :
387 564 : void DataOStream::_resetBuffer()
388 : {
389 564 : _impl->state = STATE_UNCOMPRESSED;
390 564 : if( _impl->save )
391 332 : _impl->bufferStart = _impl->buffer.getSize();
392 : else
393 : {
394 232 : _impl->bufferStart = 0;
395 232 : _impl->buffer.setSize( 0 );
396 : }
397 564 : }
398 :
399 8 : uint64_t DataOStream::_getCompressedData( const uint8_t** chunks,
400 : uint64_t* chunkSizes )
401 : const
402 : {
403 8 : LBASSERT( _impl->state != STATE_UNCOMPRESSED &&
404 : _impl->state != STATE_UNCOMPRESSIBLE && _impl->compressor );
405 :
406 8 : const auto& result = _impl->compressor->getCompressedData();
407 8 : LBASSERT( !result.empty() );
408 8 : size_t totalDataSize = 0;
409 16 : for( size_t i = 0; i != result.size(); ++i )
410 : {
411 8 : chunks[i] = result[i].getData();
412 8 : const size_t dataSize = result[i].getSize();
413 8 : chunkSizes[i] = dataSize;
414 8 : totalDataSize += dataSize;
415 : }
416 :
417 8 : LBASSERT( totalDataSize == pression::data::getDataSize( result ));
418 8 : return totalDataSize;
419 : }
420 :
421 288734 : lunchbox::Bufferb& DataOStream::getBuffer()
422 : {
423 288734 : return _impl->buffer;
424 : }
425 :
426 165 : DataOStream& DataOStream::streamDataHeader( DataOStream& os )
427 : {
428 165 : return os << _impl->getCompressorName() << _impl->getNumChunks();
429 : }
430 :
431 165 : void DataOStream::sendBody( ConnectionPtr connection, const void* data,
432 : const uint64_t size )
433 : {
434 :
435 173 : const auto& compressorName = _impl->getCompressorName();
436 165 : if( compressorName.empty( ))
437 : {
438 : #ifdef CO_INSTRUMENT_DATAOSTREAM
439 157 : nBytesSent += size;
440 : #endif
441 157 : if( size > 0 )
442 157 : LBCHECK( connection->send( data, size, true ));
443 157 : return;
444 : }
445 :
446 8 : const size_t nChunks = _impl->compressor->getCompressedData().size();
447 : uint64_t* chunkSizes = static_cast< uint64_t* >
448 8 : ( alloca (nChunks * sizeof( uint64_t )));
449 : const uint8_t** chunks = static_cast< const uint8_t ** >
450 8 : ( alloca( nChunks * sizeof( void* )));
451 :
452 : #ifdef CO_INSTRUMENT_DATAOSTREAM
453 8 : const uint64_t compressedSize = _getCompressedData( chunks, chunkSizes );
454 8 : if( _impl->state == STATE_COMPLETE )
455 : {
456 2 : nBytesSent += _impl->dataSize;
457 2 : nBytesSaved += _impl->dataSize - compressedSize;
458 : }
459 : else
460 : {
461 6 : nBytesSent += _impl->buffer.getSize() - _impl->bufferStart;
462 6 : nBytesSaved += _impl->buffer.getSize() - _impl->bufferStart -
463 6 : compressedSize;
464 : }
465 :
466 : #else
467 : _getCompressedData( chunks, chunkSizes );
468 : #endif
469 :
470 16 : for( size_t j = 0; j < nChunks; ++j )
471 : {
472 8 : LBCHECK( connection->send( &chunkSizes[j], sizeof( uint64_t ), true ));
473 8 : LBCHECK( connection->send( chunks[j], chunkSizes[j], true ));
474 : }
475 : }
476 :
477 165 : uint64_t DataOStream::getCompressedDataSize() const
478 : {
479 338 : if( _impl->state == STATE_UNCOMPRESSED ||
480 173 : _impl->state == STATE_UNCOMPRESSIBLE || !_impl->compressor )
481 : {
482 157 : return 0;
483 : }
484 8 : return _impl->compressedDataSize +
485 8 : _impl->getNumChunks() * sizeof( uint64_t );
486 : }
487 :
488 21 : std::ostream& DataOStream::printStatistics( std::ostream& os )
489 : {
490 : return os << "DataOStream "
491 : #ifdef CO_INSTRUMENT_DATAOSTREAM
492 21 : << "compressed " << nBytesIn << " -> " << nBytesOut << " of "
493 42 : << nBytes << " @ "
494 21 : << int( float( nBytesIn )/1.024f/1.024f/compressionTime + .5f )
495 63 : << " MB/s " << compressionRuns << " runs, saved " << nBytesSaved
496 42 : << " of " << nBytesSent << " brutto sent ("
497 42 : << double( nBytesSaved ) / double( nBytesSent ) * 100. << "%)";
498 : #else
499 : << "without statistics enabled";
500 : #endif
501 : }
502 :
503 21 : void DataOStream::clearStatistics()
504 : {
505 : #ifdef CO_INSTRUMENT_DATAOSTREAM
506 21 : nBytes = 0;
507 21 : nBytesIn = 0;
508 21 : nBytesOut = 0;
509 21 : nBytesSaved = 0;
510 21 : nBytesSent = 0;
511 21 : compressionTime = 0;
512 21 : compressionRuns = 0;
513 : #endif
514 21 : }
515 :
516 66 : }
|