Line data Source code
1 :
2 : /* Copyright (c) 2007-2015, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "dataOStream.h"
23 :
24 : #include "buffer.h"
25 : #include "connectionDescription.h"
26 : #include "commands.h"
27 : #include "connections.h"
28 : #include "global.h"
29 : #include "log.h"
30 : #include "node.h"
31 : #include "types.h"
32 :
33 : #include <pression/compressor.h>
34 : #include <pression/compressorResult.h>
35 : #include <pression/plugins/compressor.h>
36 :
37 : #include <boost/foreach.hpp>
38 :
39 : namespace co
40 : {
41 : namespace
42 : {
43 : //#define CO_INSTRUMENT_DATAOSTREAM
44 : #ifdef CO_INSTRUMENT_DATAOSTREAM
45 : lunchbox::a_int32_t nBytes;
46 : lunchbox::a_int32_t nBytesIn;
47 : lunchbox::a_int32_t nBytesOut;
48 : CO_API lunchbox::a_int32_t nBytesSaved;
49 : CO_API lunchbox::a_int32_t nBytesSent;
50 : lunchbox::a_int32_t compressionTime;
51 : #endif
52 :
53 : enum CompressorState
54 : {
55 : STATE_UNCOMPRESSED,
56 : STATE_PARTIAL,
57 : STATE_COMPLETE,
58 : STATE_UNCOMPRESSIBLE
59 : };
60 : }
61 :
62 : namespace detail
63 : {
64 72365 : class DataOStream
65 : {
66 : public:
67 : CompressorState state;
68 :
69 : /** The buffer used for saving and buffering */
70 : lunchbox::Bufferb buffer;
71 :
72 : /** The start position of the buffering, always 0 if !_save */
73 : uint64_t bufferStart;
74 :
75 : /** The uncompressed size of a completely compressed buffer. */
76 : uint64_t dataSize;
77 :
78 : /** The compressed size, 0 for uncompressed or uncompressable data. */
79 : uint64_t compressedDataSize;
80 :
81 : /** Locked connections to the receivers, if _enabled */
82 : Connections connections;
83 :
84 : /** The compressor instance. */
85 : pression::Compressor compressor;
86 :
87 : /** The output stream is enabled for writing */
88 : bool enabled;
89 :
90 : /** Some data has been sent since it was enabled */
91 : bool dataSent;
92 :
93 : /** Save all sent data */
94 : bool save;
95 :
96 72365 : DataOStream()
97 : : state( STATE_UNCOMPRESSED )
98 : , bufferStart( 0 )
99 : , dataSize( 0 )
100 : , compressedDataSize( 0 )
101 : , enabled( false )
102 : , dataSent( false )
103 72365 : , save( false )
104 72364 : {}
105 :
106 0 : DataOStream( const DataOStream& rhs )
107 : : state( rhs.state )
108 : , bufferStart( rhs.bufferStart )
109 : , dataSize( rhs.dataSize )
110 : , compressedDataSize( rhs.compressedDataSize )
111 : , enabled( rhs.enabled )
112 : , dataSent( rhs.dataSent )
113 0 : , save( rhs.save )
114 0 : {}
115 :
116 495 : uint32_t getCompressor() const
117 : {
118 495 : if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE )
119 495 : return EQ_COMPRESSOR_NONE;
120 0 : return compressor.getInfo().name;
121 : }
122 :
123 165 : uint32_t getNumChunks() const
124 : {
125 165 : if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE )
126 165 : return 1;
127 0 : return uint32_t( compressor.getResult().chunks.size( ));
128 : }
129 :
130 :
131 : /** Compress data and update the compressor state. */
132 72305 : void compress( void* src, const uint64_t size, const CompressorState result)
133 : {
134 72305 : if( state == result || state == STATE_UNCOMPRESSIBLE )
135 72311 : return;
136 : #ifdef CO_INSTRUMENT_DATAOSTREAM
137 : nBytesIn += size;
138 : #endif
139 : const uint64_t threshold =
140 72305 : uint64_t( Global::getIAttribute( Global::IATTR_OBJECT_COMPRESSION ));
141 :
142 72307 : if( !compressor.isGood() || size <= threshold )
143 : {
144 72311 : state = STATE_UNCOMPRESSED;
145 72311 : return;
146 : }
147 :
148 0 : const uint64_t inDims[2] = { 0, size };
149 :
150 : #ifdef CO_INSTRUMENT_DATAOSTREAM
151 : lunchbox::Clock clock;
152 : #endif
153 0 : compressor.compress( src, inDims );
154 : #ifdef CO_INSTRUMENT_DATAOSTREAM
155 : compressionTime += uint32_t( clock.getTimef() * 1000.f );
156 : #endif
157 :
158 : const pression::CompressorResult &compressorResult =
159 0 : compressor.getResult();
160 0 : LBASSERT( !compressorResult.chunks.empty() );
161 0 : compressedDataSize = compressorResult.getSize();
162 :
163 : #ifdef CO_INSTRUMENT_DATAOSTREAM
164 : nBytesOut += compressedDataSize;
165 : #endif
166 :
167 0 : if( compressedDataSize >= size )
168 : {
169 0 : state = STATE_UNCOMPRESSIBLE;
170 : #ifndef CO_AGGRESSIVE_CACHING
171 0 : compressor.realloc();
172 :
173 0 : if( result == STATE_COMPLETE )
174 0 : buffer.pack();
175 : #endif
176 0 : return;
177 : }
178 :
179 0 : state = result;
180 : #ifndef CO_AGGRESSIVE_CACHING
181 0 : if( result == STATE_COMPLETE )
182 : {
183 0 : LBASSERT( buffer.getSize() == dataSize );
184 0 : buffer.clear();
185 0 : }
186 : #endif
187 : }
188 : };
189 : }
190 :
191 72364 : DataOStream::DataOStream()
192 72364 : : _impl( new detail::DataOStream )
193 72364 : {}
194 :
195 0 : DataOStream::DataOStream( DataOStream& rhs )
196 : : boost::noncopyable()
197 0 : , _impl( new detail::DataOStream( *rhs._impl ))
198 : {
199 0 : _setupConnections( rhs.getConnections( ));
200 0 : getBuffer().swap( rhs.getBuffer( ));
201 :
202 : // disable send of rhs
203 0 : rhs._setupConnections( Connections( ));
204 0 : rhs.disable();
205 0 : }
206 :
207 72365 : DataOStream::~DataOStream()
208 : {
209 : // Can't call disable() from destructor since it uses virtual functions
210 72365 : LBASSERT( !_impl->enabled );
211 72365 : delete _impl;
212 72365 : }
213 :
214 178 : void DataOStream::_initCompressor( const uint32_t name )
215 : {
216 178 : LBCHECK( _impl->compressor.setup( Global::getPluginRegistry(), name ));
217 178 : LB_TS_RESET( _impl->compressor._thread );
218 178 : }
219 :
220 72423 : void DataOStream::_enable()
221 : {
222 72423 : LBASSERT( !_impl->enabled );
223 72423 : LBASSERT( _impl->save || !_impl->connections.empty( ));
224 72423 : _impl->state = STATE_UNCOMPRESSED;
225 72423 : _impl->bufferStart = 0;
226 72423 : _impl->dataSent = false;
227 72423 : _impl->dataSize = 0;
228 72423 : _impl->enabled = true;
229 72423 : _impl->buffer.setSize( 0 );
230 : #ifdef CO_AGGRESSIVE_CACHING
231 : _impl->buffer.reserve( COMMAND_ALLOCSIZE );
232 : #else
233 72421 : _impl->buffer.reserve( COMMAND_MINSIZE );
234 : #endif
235 72425 : }
236 :
237 236 : void DataOStream::_setupConnections( const Nodes& receivers )
238 : {
239 236 : _impl->connections = gatherConnections( receivers );
240 236 : }
241 :
242 72146 : void DataOStream::_setupConnections( const Connections& connections )
243 : {
244 72146 : _impl->connections = connections;
245 72146 : }
246 :
247 38 : void DataOStream::_setupConnection( NodePtr node, const bool useMulticast )
248 : {
249 38 : LBASSERT( _impl->connections.empty( ));
250 38 : _impl->connections.push_back( node->getConnection( useMulticast ));
251 38 : }
252 :
253 1 : void DataOStream::_setupConnection( ConnectionPtr connection )
254 : {
255 1 : _impl->connections.push_back( connection );
256 1 : }
257 :
258 33 : void DataOStream::_resend()
259 : {
260 33 : LBASSERT( !_impl->enabled );
261 33 : LBASSERT( !_impl->connections.empty( ));
262 33 : LBASSERT( _impl->save );
263 :
264 33 : _impl->compress( _impl->buffer.getData(), _impl->dataSize, STATE_COMPLETE );
265 33 : sendData( _impl->buffer.getData(), _impl->dataSize, true );
266 33 : }
267 :
268 33 : void DataOStream::_clearConnections()
269 : {
270 33 : _impl->connections.clear();
271 33 : }
272 :
273 72263 : void DataOStream::disable()
274 : {
275 72263 : if( !_impl->enabled )
276 72263 : return;
277 :
278 72263 : _impl->dataSize = _impl->buffer.getSize();
279 72263 : _impl->dataSent = _impl->dataSize > 0;
280 :
281 72263 : if( _impl->dataSent && !_impl->connections.empty( ))
282 : {
283 72098 : void* ptr = _impl->buffer.getData() + _impl->bufferStart;
284 72098 : const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
285 :
286 72098 : if( size == 0 && _impl->state == STATE_PARTIAL )
287 : {
288 : // OPT: all data has been sent in one compressed chunk
289 0 : _impl->state = STATE_COMPLETE;
290 : #ifndef CO_AGGRESSIVE_CACHING
291 0 : _impl->buffer.clear();
292 : #endif
293 : }
294 : else
295 : {
296 72098 : _impl->state = STATE_UNCOMPRESSED;
297 72098 : const CompressorState state = _impl->bufferStart == 0 ?
298 72098 : STATE_COMPLETE : STATE_PARTIAL;
299 72098 : _impl->compress( ptr, size, state );
300 : }
301 :
302 72106 : sendData( ptr, size, true ); // always send to finalize istream
303 : }
304 :
305 : #ifndef CO_AGGRESSIVE_CACHING
306 72263 : if( !_impl->save )
307 121 : _impl->buffer.clear();
308 : #endif
309 72263 : _impl->enabled = false;
310 72263 : _impl->connections.clear();
311 : }
312 :
313 72305 : void DataOStream::enableSave()
314 : {
315 72305 : LBASSERTINFO( !_impl->enabled ||
316 : ( !_impl->dataSent && _impl->buffer.getSize() == 0 ),
317 : "Can't enable saving after data has been written" );
318 72305 : _impl->save = true;
319 72305 : }
320 :
321 0 : void DataOStream::disableSave()
322 : {
323 0 : LBASSERTINFO( !_impl->enabled ||
324 : (!_impl->dataSent && _impl->buffer.getSize() == 0 ),
325 : "Can't disable saving after data has been written" );
326 0 : _impl->save = false;
327 0 : }
328 :
329 224 : bool DataOStream::hasSentData() const
330 : {
331 224 : return _impl->dataSent;
332 : }
333 :
334 314141 : void DataOStream::_write( const void* data, uint64_t size )
335 : {
336 314141 : LBASSERT( _impl->enabled );
337 : #ifdef CO_INSTRUMENT_DATAOSTREAM
338 : nBytes += size;
339 : if( compressionTime > 100000 )
340 : LBWARN << *this << std::endl;
341 : #endif
342 :
343 628237 : if( _impl->buffer.getSize() - _impl->bufferStart >
344 314108 : Global::getObjectBufferSize( ))
345 : {
346 9 : flush( false );
347 : }
348 314112 : _impl->buffer.append( static_cast< const uint8_t* >( data ), size );
349 314093 : }
350 :
351 174 : void DataOStream::flush( const bool last )
352 : {
353 174 : LBASSERT( _impl->enabled );
354 174 : if( !_impl->connections.empty( ))
355 : {
356 172 : void* ptr = _impl->buffer.getData() + _impl->bufferStart;
357 172 : const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
358 :
359 172 : _impl->state = STATE_UNCOMPRESSED;
360 172 : _impl->compress( ptr, size, STATE_PARTIAL );
361 172 : sendData( ptr, size, last );
362 : }
363 174 : _impl->dataSent = true;
364 174 : _resetBuffer();
365 174 : }
366 :
367 390 : void DataOStream::reset()
368 : {
369 390 : _resetBuffer();
370 390 : _impl->enabled = false;
371 390 : _impl->connections.clear();
372 390 : }
373 :
374 72793 : const Connections& DataOStream::getConnections() const
375 : {
376 72793 : return _impl->connections;
377 : }
378 :
379 564 : void DataOStream::_resetBuffer()
380 : {
381 564 : _impl->state = STATE_UNCOMPRESSED;
382 564 : if( _impl->save )
383 332 : _impl->bufferStart = _impl->buffer.getSize();
384 : else
385 : {
386 232 : _impl->bufferStart = 0;
387 232 : _impl->buffer.setSize( 0 );
388 : }
389 564 : }
390 :
391 0 : uint64_t DataOStream::_getCompressedData( void** chunks, uint64_t* chunkSizes )
392 : const
393 : {
394 0 : LBASSERT( _impl->state != STATE_UNCOMPRESSED &&
395 : _impl->state != STATE_UNCOMPRESSIBLE );
396 :
397 0 : const pression::CompressorResult &result = _impl->compressor.getResult();
398 0 : LBASSERT( !result.chunks.empty() );
399 0 : size_t totalDataSize = 0;
400 0 : for( size_t i = 0; i != result.chunks.size(); ++i )
401 : {
402 0 : chunks[i] = result.chunks[i].data;
403 0 : const size_t dataSize = result.chunks[i].getNumBytes();
404 0 : chunkSizes[i] = dataSize;
405 0 : totalDataSize += dataSize;
406 : }
407 :
408 0 : return totalDataSize;
409 : }
410 :
411 288753 : lunchbox::Bufferb& DataOStream::getBuffer()
412 : {
413 288753 : return _impl->buffer;
414 : }
415 :
416 165 : DataOStream& DataOStream::streamDataHeader( DataOStream& os )
417 : {
418 165 : os << _impl->getCompressor() << _impl->getNumChunks();
419 165 : return os;
420 : }
421 :
422 165 : void DataOStream::sendBody( ConnectionPtr connection, const void* data,
423 : const uint64_t size )
424 : {
425 : #ifdef EQ_INSTRUMENT_DATAOSTREAM
426 : nBytesSent += size;
427 : #endif
428 :
429 165 : const uint32_t compressor = _impl->getCompressor();
430 165 : if( compressor == EQ_COMPRESSOR_NONE )
431 : {
432 165 : if( size > 0 )
433 165 : LBCHECK( connection->send( data, size, true ));
434 330 : return;
435 : }
436 :
437 : #ifdef CO_INSTRUMENT_DATAOSTREAM
438 : nBytesSent += _impl->buffer.getSize();
439 : #endif
440 0 : const size_t nChunks = _impl->compressor.getResult().chunks.size();
441 : uint64_t* chunkSizes = static_cast< uint64_t* >
442 0 : ( alloca (nChunks * sizeof( uint64_t )));
443 : void** chunks = static_cast< void ** >
444 0 : ( alloca( nChunks * sizeof( void* )));
445 :
446 : #ifdef CO_INSTRUMENT_DATAOSTREAM
447 : const uint64_t compressedSize = _getCompressedData( chunks, chunkSizes );
448 : nBytesSaved += size - compressedSize;
449 : #else
450 0 : _getCompressedData( chunks, chunkSizes );
451 : #endif
452 :
453 0 : for( size_t j = 0; j < nChunks; ++j )
454 : {
455 0 : LBCHECK( connection->send( &chunkSizes[j], sizeof( uint64_t ), true ));
456 0 : LBCHECK( connection->send( chunks[j], chunkSizes[j], true ));
457 : }
458 : }
459 :
460 165 : uint64_t DataOStream::getCompressedDataSize() const
461 : {
462 165 : if( _impl->getCompressor() == EQ_COMPRESSOR_NONE )
463 165 : return 0;
464 : return _impl->compressedDataSize
465 0 : + _impl->getNumChunks() * sizeof( uint64_t );
466 : }
467 :
468 0 : std::ostream& operator << ( std::ostream& os, const DataOStream& dataOStream )
469 : {
470 0 : os << "DataOStream "
471 : #ifdef CO_INSTRUMENT_DATAOSTREAM
472 : << "compressed " << nBytesIn << " -> " << nBytesOut << " of " << nBytes
473 : << " in " << compressionTime/1000 << "ms, saved " << nBytesSaved
474 : << " of " << nBytesSent << " brutto sent";
475 :
476 : nBytes = 0;
477 : nBytesIn = 0;
478 : nBytesOut = 0;
479 : nBytesSaved = 0;
480 : nBytesSent = 0;
481 : compressionTime = 0;
482 : #else
483 0 : << "@" << (void*)&dataOStream;
484 : #endif
485 0 : return os;
486 : }
487 :
488 66 : }
|