Line data Source code
1 :
2 : /* Copyright (c) 2007-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "dataOStream.h"
23 :
24 : #include "buffer.h"
25 : #include "connectionDescription.h"
26 : #include "commands.h"
27 : #include "connections.h"
28 : #include "global.h"
29 : #include "log.h"
30 : #include "node.h"
31 : #include "types.h"
32 :
33 : #include <lunchbox/compressor.h>
34 : #include <lunchbox/compressorResult.h>
35 : #include <lunchbox/plugins/compressor.h>
36 :
37 : #include <boost/foreach.hpp>
38 :
39 : namespace co
40 : {
41 : namespace
42 : {
43 : //#define CO_INSTRUMENT_DATAOSTREAM
44 : #ifdef CO_INSTRUMENT_DATAOSTREAM
45 : lunchbox::a_int32_t nBytes;
46 : lunchbox::a_int32_t nBytesIn;
47 : lunchbox::a_int32_t nBytesOut;
48 : CO_API lunchbox::a_int32_t nBytesSaved;
49 : CO_API lunchbox::a_int32_t nBytesSent;
50 : lunchbox::a_int32_t compressionTime;
51 : #endif
52 :
53 : enum CompressorState
54 : {
55 : STATE_UNCOMPRESSED,
56 : STATE_PARTIAL,
57 : STATE_COMPLETE,
58 : STATE_UNCOMPRESSIBLE
59 : };
60 : }
61 :
62 : namespace detail
63 : {
64 72343 : class DataOStream
65 : {
66 : public:
67 : CompressorState state;
68 :
69 : /** The buffer used for saving and buffering */
70 : lunchbox::Bufferb buffer;
71 :
72 : /** The start position of the buffering, always 0 if !_save */
73 : uint64_t bufferStart;
74 :
75 : /** The uncompressed size of a completely compressed buffer. */
76 : uint64_t dataSize;
77 :
78 : /** The compressed size, 0 for uncompressed or uncompressable data. */
79 : uint64_t compressedDataSize;
80 :
81 : /** Locked connections to the receivers, if _enabled */
82 : Connections connections;
83 :
84 : /** The compressor instance. */
85 : lunchbox::Compressor compressor;
86 :
87 : /** The output stream is enabled for writing */
88 : bool enabled;
89 :
90 : /** Some data has been sent since it was enabled */
91 : bool dataSent;
92 :
93 : /** Save all sent data */
94 : bool save;
95 :
96 72343 : DataOStream()
97 : : state( STATE_UNCOMPRESSED )
98 : , bufferStart( 0 )
99 : , dataSize( 0 )
100 : , enabled( false )
101 : , dataSent( false )
102 72343 : , save( false )
103 72342 : {}
104 :
105 0 : DataOStream( const DataOStream& rhs )
106 : : state( rhs.state )
107 : , bufferStart( rhs.bufferStart )
108 : , dataSize( rhs.dataSize )
109 : , enabled( rhs.enabled )
110 : , dataSent( rhs.dataSent )
111 0 : , save( rhs.save )
112 0 : {}
113 :
114 495 : uint32_t getCompressor() const
115 : {
116 495 : if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE )
117 495 : return EQ_COMPRESSOR_NONE;
118 0 : return compressor.getInfo().name;
119 : }
120 :
121 165 : uint32_t getNumChunks() const
122 : {
123 165 : if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE )
124 165 : return 1;
125 0 : return uint32_t( compressor.getResult().chunks.size( ));
126 : }
127 :
128 :
129 : /** Compress data and update the compressor state. */
130 72291 : void compress( void* src, const uint64_t size, const CompressorState result)
131 : {
132 72291 : if( state == result || state == STATE_UNCOMPRESSIBLE )
133 72291 : return;
134 : #ifdef CO_INSTRUMENT_DATAOSTREAM
135 : nBytesIn += size;
136 : #endif
137 : const uint64_t threshold =
138 72291 : uint64_t( Global::getIAttribute( Global::IATTR_OBJECT_COMPRESSION ));
139 :
140 72291 : if( !compressor.isGood() || size <= threshold )
141 : {
142 72291 : state = STATE_UNCOMPRESSED;
143 72291 : return;
144 : }
145 :
146 0 : const uint64_t inDims[2] = { 0, size };
147 :
148 : #ifdef CO_INSTRUMENT_DATAOSTREAM
149 : lunchbox::Clock clock;
150 : #endif
151 0 : compressor.compress( src, inDims );
152 : #ifdef CO_INSTRUMENT_DATAOSTREAM
153 : compressionTime += uint32_t( clock.getTimef() * 1000.f );
154 : #endif
155 :
156 : const lunchbox::CompressorResult &compressorResult =
157 0 : compressor.getResult();
158 0 : LBASSERT( !compressorResult.chunks.empty() );
159 0 : compressedDataSize = compressorResult.getSize();
160 :
161 : #ifdef CO_INSTRUMENT_DATAOSTREAM
162 : nBytesOut += compressedDataSize;
163 : #endif
164 :
165 0 : if( compressedDataSize >= size )
166 : {
167 0 : state = STATE_UNCOMPRESSIBLE;
168 : #ifndef CO_AGGRESSIVE_CACHING
169 0 : compressor.realloc();
170 :
171 0 : if( result == STATE_COMPLETE )
172 0 : buffer.pack();
173 : #endif
174 0 : return;
175 : }
176 :
177 0 : state = result;
178 : #ifndef CO_AGGRESSIVE_CACHING
179 0 : if( result == STATE_COMPLETE )
180 : {
181 0 : LBASSERT( buffer.getSize() == dataSize );
182 0 : buffer.clear();
183 0 : }
184 : #endif
185 : }
186 : };
187 : }
188 :
189 72343 : DataOStream::DataOStream()
190 72343 : : _impl( new detail::DataOStream )
191 72343 : {}
192 :
193 0 : DataOStream::DataOStream( DataOStream& rhs )
194 : : boost::noncopyable()
195 0 : , _impl( new detail::DataOStream( *rhs._impl ))
196 : {
197 0 : _setupConnections( rhs.getConnections( ));
198 0 : getBuffer().swap( rhs.getBuffer( ));
199 :
200 : // disable send of rhs
201 0 : rhs._setupConnections( Connections( ));
202 0 : rhs.disable();
203 0 : }
204 :
205 72343 : DataOStream::~DataOStream()
206 : {
207 : // Can't call disable() from destructor since it uses virtual functions
208 72343 : LBASSERT( !_impl->enabled );
209 72343 : delete _impl;
210 72343 : }
211 :
212 178 : void DataOStream::_initCompressor( const uint32_t name )
213 : {
214 178 : LBCHECK( _impl->compressor.setup( Global::getPluginRegistry(), name ));
215 178 : LB_TS_RESET( _impl->compressor._thread );
216 178 : }
217 :
218 72410 : void DataOStream::_enable()
219 : {
220 72410 : LBASSERT( !_impl->enabled );
221 72410 : LBASSERT( _impl->save || !_impl->connections.empty( ));
222 72410 : _impl->state = STATE_UNCOMPRESSED;
223 72410 : _impl->bufferStart = 0;
224 72410 : _impl->dataSent = false;
225 72410 : _impl->dataSize = 0;
226 72410 : _impl->enabled = true;
227 72410 : _impl->buffer.setSize( 0 );
228 : #ifdef CO_AGGRESSIVE_CACHING
229 : _impl->buffer.reserve( COMMAND_ALLOCSIZE );
230 : #else
231 72410 : _impl->buffer.reserve( COMMAND_MINSIZE );
232 : #endif
233 72409 : }
234 :
235 239 : void DataOStream::_setupConnections( const Nodes& receivers )
236 : {
237 239 : _impl->connections = gatherConnections( receivers );
238 239 : }
239 :
240 72127 : void DataOStream::_setupConnections( const Connections& connections )
241 : {
242 72127 : _impl->connections = connections;
243 72127 : }
244 :
245 37 : void DataOStream::_setupConnection( NodePtr node, const bool useMulticast )
246 : {
247 37 : LBASSERT( _impl->connections.empty( ));
248 37 : _impl->connections.push_back( node->getConnection( useMulticast ));
249 37 : }
250 :
251 1 : void DataOStream::_setupConnection( ConnectionPtr connection )
252 : {
253 1 : _impl->connections.push_back( connection );
254 1 : }
255 :
256 31 : void DataOStream::_resend()
257 : {
258 31 : LBASSERT( !_impl->enabled );
259 31 : LBASSERT( !_impl->connections.empty( ));
260 31 : LBASSERT( _impl->save );
261 :
262 31 : _impl->compress( _impl->buffer.getData(), _impl->dataSize, STATE_COMPLETE );
263 31 : sendData( _impl->buffer.getData(), _impl->dataSize, true );
264 31 : }
265 :
266 31 : void DataOStream::_clearConnections()
267 : {
268 31 : _impl->connections.clear();
269 31 : }
270 :
271 72245 : void DataOStream::disable()
272 : {
273 72245 : if( !_impl->enabled )
274 72245 : return;
275 :
276 72245 : _impl->dataSize = _impl->buffer.getSize();
277 72245 : _impl->dataSent = _impl->dataSize > 0;
278 :
279 72245 : if( _impl->dataSent && !_impl->connections.empty( ))
280 : {
281 72088 : void* ptr = _impl->buffer.getData() + _impl->bufferStart;
282 72088 : const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
283 :
284 72088 : if( size == 0 && _impl->state == STATE_PARTIAL )
285 : {
286 : // OPT: all data has been sent in one compressed chunk
287 0 : _impl->state = STATE_COMPLETE;
288 : #ifndef CO_AGGRESSIVE_CACHING
289 0 : _impl->buffer.clear();
290 : #endif
291 : }
292 : else
293 : {
294 72088 : _impl->state = STATE_UNCOMPRESSED;
295 72088 : const CompressorState state = _impl->bufferStart == 0 ?
296 72088 : STATE_COMPLETE : STATE_PARTIAL;
297 72088 : _impl->compress( ptr, size, state );
298 : }
299 :
300 72088 : sendData( ptr, size, true ); // always send to finalize istream
301 : }
302 :
303 : #ifndef CO_AGGRESSIVE_CACHING
304 72245 : if( !_impl->save )
305 124 : _impl->buffer.clear();
306 : #endif
307 72245 : _impl->enabled = false;
308 72245 : _impl->connections.clear();
309 : }
310 :
311 72286 : void DataOStream::enableSave()
312 : {
313 72286 : LBASSERTINFO( !_impl->enabled ||
314 : ( !_impl->dataSent && _impl->buffer.getSize() == 0 ),
315 : "Can't enable saving after data has been written" );
316 72286 : _impl->save = true;
317 72286 : }
318 :
319 0 : void DataOStream::disableSave()
320 : {
321 0 : LBASSERTINFO( !_impl->enabled ||
322 : (!_impl->dataSent && _impl->buffer.getSize() == 0 ),
323 : "Can't disable saving after data has been written" );
324 0 : _impl->save = false;
325 0 : }
326 :
327 229 : bool DataOStream::hasSentData() const
328 : {
329 229 : return _impl->dataSent;
330 : }
331 :
332 314301 : void DataOStream::_write( const void* data, uint64_t size )
333 : {
334 314301 : LBASSERT( _impl->enabled );
335 : #ifdef CO_INSTRUMENT_DATAOSTREAM
336 : nBytes += size;
337 : if( compressionTime > 100000 )
338 : LBWARN << *this << std::endl;
339 : #endif
340 :
341 628597 : if( _impl->buffer.getSize() - _impl->bufferStart >
342 314299 : Global::getObjectBufferSize( ))
343 : {
344 9 : flush( false );
345 : }
346 314296 : _impl->buffer.append( static_cast< const uint8_t* >( data ), size );
347 314293 : }
348 :
349 174 : void DataOStream::flush( const bool last )
350 : {
351 174 : LBASSERT( _impl->enabled );
352 174 : if( !_impl->connections.empty( ))
353 : {
354 172 : void* ptr = _impl->buffer.getData() + _impl->bufferStart;
355 172 : const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
356 :
357 172 : _impl->state = STATE_UNCOMPRESSED;
358 172 : _impl->compress( ptr, size, STATE_PARTIAL );
359 172 : sendData( ptr, size, last );
360 : }
361 174 : _impl->dataSent = true;
362 174 : _resetBuffer();
363 174 : }
364 :
365 394 : void DataOStream::reset()
366 : {
367 394 : _resetBuffer();
368 394 : _impl->enabled = false;
369 394 : _impl->connections.clear();
370 394 : }
371 :
372 72790 : const Connections& DataOStream::getConnections() const
373 : {
374 72790 : return _impl->connections;
375 : }
376 :
377 568 : void DataOStream::_resetBuffer()
378 : {
379 568 : _impl->state = STATE_UNCOMPRESSED;
380 568 : if( _impl->save )
381 332 : _impl->bufferStart = _impl->buffer.getSize();
382 : else
383 : {
384 236 : _impl->bufferStart = 0;
385 236 : _impl->buffer.setSize( 0 );
386 : }
387 568 : }
388 :
389 0 : uint64_t DataOStream::_getCompressedData( void** chunks, uint64_t* chunkSizes )
390 : const
391 : {
392 0 : LBASSERT( _impl->state != STATE_UNCOMPRESSED &&
393 : _impl->state != STATE_UNCOMPRESSIBLE );
394 :
395 0 : const lunchbox::CompressorResult &result = _impl->compressor.getResult();
396 0 : LBASSERT( !result.chunks.empty() );
397 0 : size_t totalDataSize = 0;
398 0 : for( size_t i = 0; i != result.chunks.size(); ++i )
399 : {
400 0 : chunks[i] = result.chunks[i].data;
401 0 : const size_t dataSize = result.chunks[i].getNumBytes();
402 0 : chunkSizes[i] = dataSize;
403 0 : totalDataSize += dataSize;
404 : }
405 :
406 0 : return totalDataSize;
407 : }
408 :
409 288740 : lunchbox::Bufferb& DataOStream::getBuffer()
410 : {
411 288740 : return _impl->buffer;
412 : }
413 :
414 165 : DataOStream& DataOStream::streamDataHeader( DataOStream& os )
415 : {
416 165 : os << _impl->getCompressor() << _impl->getNumChunks();
417 165 : return os;
418 : }
419 :
420 165 : void DataOStream::sendBody( ConnectionPtr connection, const void* data,
421 : const uint64_t size )
422 : {
423 : #ifdef EQ_INSTRUMENT_DATAOSTREAM
424 : nBytesSent += size;
425 : #endif
426 :
427 165 : const uint32_t compressor = _impl->getCompressor();
428 165 : if( compressor == EQ_COMPRESSOR_NONE )
429 : {
430 165 : if( size > 0 )
431 165 : LBCHECK( connection->send( data, size, true ));
432 330 : return;
433 : }
434 :
435 : #ifdef CO_INSTRUMENT_DATAOSTREAM
436 : nBytesSent += _impl->buffer.getSize();
437 : #endif
438 0 : const size_t nChunks = _impl->compressor.getResult().chunks.size();
439 : uint64_t* chunkSizes = static_cast< uint64_t* >
440 0 : ( alloca (nChunks * sizeof( uint64_t )));
441 : void** chunks = static_cast< void ** >
442 0 : ( alloca( nChunks * sizeof( void* )));
443 :
444 : #ifdef CO_INSTRUMENT_DATAOSTREAM
445 : const uint64_t compressedSize = _getCompressedData( chunks, chunkSizes );
446 : nBytesSaved += size - compressedSize;
447 : #else
448 0 : _getCompressedData( chunks, chunkSizes );
449 : #endif
450 :
451 0 : for( size_t j = 0; j < nChunks; ++j )
452 : {
453 0 : LBCHECK( connection->send( &chunkSizes[j], sizeof( uint64_t ), true ));
454 0 : LBCHECK( connection->send( chunks[j], chunkSizes[j], true ));
455 : }
456 : }
457 :
458 165 : uint64_t DataOStream::getCompressedDataSize() const
459 : {
460 165 : if( _impl->getCompressor() == EQ_COMPRESSOR_NONE )
461 165 : return 0;
462 : return _impl->compressedDataSize
463 0 : + _impl->getNumChunks() * sizeof( uint64_t );
464 : }
465 :
466 0 : std::ostream& operator << ( std::ostream& os, const DataOStream& dataOStream )
467 : {
468 0 : os << "DataOStream "
469 : #ifdef CO_INSTRUMENT_DATAOSTREAM
470 : << "compressed " << nBytesIn << " -> " << nBytesOut << " of " << nBytes
471 : << " in " << compressionTime/1000 << "ms, saved " << nBytesSaved
472 : << " of " << nBytesSent << " brutto sent";
473 :
474 : nBytes = 0;
475 : nBytesIn = 0;
476 : nBytesOut = 0;
477 : nBytesSaved = 0;
478 : nBytesSent = 0;
479 : compressionTime = 0;
480 : #else
481 0 : << "@" << (void*)&dataOStream;
482 : #endif
483 0 : return os;
484 : }
485 :
486 60 : }
|