LCOV - code coverage report
Current view: top level - co - dataOStream.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 209 239 87.4 %
Date: 2016-12-14 01:26:48 Functions: 33 37 89.2 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "dataOStream.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "connectionDescription.h"
      26             : #include "commands.h"
      27             : #include "connections.h"
      28             : #include "global.h"
      29             : #include "log.h"
      30             : #include "node.h"
      31             : #include "types.h"
      32             : 
      33             : #include <lunchbox/clock.h>
      34             : #include <pression/data/Compressor.h>
      35             : #include <pression/data/CompressorInfo.h>
      36             : 
      37             : namespace co
      38             : {
      39             : namespace
      40             : {
      41             : #define CO_INSTRUMENT_DATAOSTREAM
      42             : #ifdef CO_INSTRUMENT_DATAOSTREAM
      43          22 : lunchbox::a_ssize_t nBytes;
      44          22 : lunchbox::a_ssize_t nBytesIn;
      45          22 : lunchbox::a_ssize_t nBytesOut;
      46          22 : lunchbox::a_ssize_t nBytesSaved;
      47          22 : lunchbox::a_ssize_t nBytesSent;
      48          22 : lunchbox::a_ssize_t compressionTime;
      49          22 : lunchbox::a_ssize_t compressionRuns;
      50             : #endif
      51             : 
      52             : enum CompressorState
      53             : {
      54             :     STATE_UNCOMPRESSED,
      55             :     STATE_PARTIAL,
      56             :     STATE_COMPLETE,
      57             :     STATE_UNCOMPRESSIBLE
      58             : };
      59             : }
      60             : 
      61             : namespace detail
      62             : {
      63       72365 : class DataOStream
      64             : {
      65             : public:
      66             :     /** The buffer used for saving and buffering */
      67             :     lunchbox::Bufferb buffer;
      68             : 
      69             :     /** The start position of the buffering, always 0 if !_save */
      70             :     uint64_t bufferStart;
      71             : 
      72             :     /** The uncompressed size of a completely compressed buffer. */
      73             :     uint64_t dataSize;
      74             : 
      75             :     /** The compressed size, 0 for uncompressed or uncompressable data. */
      76             :     uint64_t compressedDataSize;
      77             : 
      78             :     /** Locked connections to the receivers, if _enabled */
      79             :     Connections connections;
      80             : 
      81             :     CompressorState state; //!< State of compression
      82             :     CompressorPtr compressor; //!< The compressor instance.
      83             :     CompressorInfo compressorInfo; //!< Information on the compr.
      84             : 
      85             :     /** The output stream is enabled for writing */
      86             :     bool enabled;
      87             : 
      88             :     /** Some data has been sent since it was enabled */
      89             :     bool dataSent;
      90             : 
      91             :     /** Save all sent data */
      92             :     bool save;
      93             : 
      94       72363 :     DataOStream()
      95       72363 :         : bufferStart( 0 )
      96             :         , dataSize( 0 )
      97             :         , compressedDataSize( 0 )
      98             :         , state( STATE_UNCOMPRESSED )
      99             :         , enabled( false )
     100             :         , dataSent( false )
     101       72363 :         , save( false )
     102       72360 :     {}
     103             : 
     104           0 :     DataOStream( const DataOStream& rhs )
     105           0 :         : bufferStart( rhs.bufferStart )
     106           0 :         , dataSize( rhs.dataSize )
     107           0 :         , compressedDataSize( rhs.compressedDataSize )
     108           0 :         , state( rhs.state )
     109           0 :         , enabled( rhs.enabled )
     110           0 :         , dataSent( rhs.dataSent )
     111           0 :         , save( rhs.save )
     112           0 :     {}
     113             : 
     114         330 :     std::string getCompressorName() const
     115             :     {
     116         346 :         if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE ||
     117          16 :             !compressor )
     118             :         {
     119         314 :             return std::string();
     120             :         }
     121          16 :         return compressorInfo.name;
     122             :     }
     123             : 
     124          10 :     bool initCompressor()
     125             :     {
     126          10 :         if( compressorInfo.name.empty( ))
     127           2 :             return false;
     128           8 :         if( compressor )
     129           0 :             return true;
     130             : 
     131           8 :         compressor.reset( compressorInfo.create( ));
     132           8 :         LBLOG( LOG_OBJECTS ) << "Allocated " << compressorInfo.name <<std::endl;
     133           8 :         return compressor != nullptr;
     134             :     }
     135             : 
     136         173 :     uint32_t getNumChunks() const
     137             :     {
     138         189 :         if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE ||
     139          16 :             !compressor )
     140             :         {
     141         157 :             return 1;
     142             :         }
     143          16 :         return uint32_t(compressor->getCompressedData().size( ));
     144             :     }
     145             : 
     146             :     /** Compress data and update the compressor state. */
     147       72306 :     void compress( const uint8_t* src, const uint64_t size,
     148             :                    const CompressorState result )
     149             :     {
     150       72306 :         if( state == result || state == STATE_UNCOMPRESSIBLE )
     151       72297 :             return;
     152             :         const uint64_t threshold =
     153       72306 :            uint64_t( Global::getIAttribute( Global::IATTR_OBJECT_COMPRESSION ));
     154             : 
     155       72305 :         if( size <= threshold || !initCompressor( ))
     156             :         {
     157       72297 :             state = STATE_UNCOMPRESSED;
     158       72297 :             return;
     159             :         }
     160             : 
     161             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     162          16 :         lunchbox::Clock clock;
     163             : #endif
     164           8 :         const auto& output = compressor->compress( src, size );
     165           8 :         LBASSERT( !output.empty( ));
     166           8 :         compressedDataSize = pression::data::getDataSize( output );
     167             : 
     168             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     169           8 :         compressionTime += size_t( clock.getTimef() * 1000.f );
     170           8 :         nBytesIn += size;
     171           8 :         nBytesOut += compressedDataSize;
     172           8 :         ++compressionRuns;
     173             : #endif
     174             : 
     175           8 :         if( compressedDataSize >= size )
     176             :         {
     177           0 :             state = STATE_UNCOMPRESSIBLE;
     178             : #ifndef CO_AGGRESSIVE_CACHING
     179           0 :             compressor.reset( compressorInfo.create( ));
     180             : 
     181           0 :             if( result == STATE_COMPLETE )
     182           0 :                 buffer.pack();
     183             : #endif
     184           0 :             return;
     185             :         }
     186             : 
     187           8 :         state = result;
     188             : #ifndef CO_AGGRESSIVE_CACHING
     189           8 :         if( result == STATE_COMPLETE )
     190             :         {
     191           2 :             LBASSERT( buffer.getSize() == dataSize );
     192           2 :             buffer.clear();
     193             :         }
     194             : #endif
     195             :     }
     196             : };
     197             : }
     198             : 
     199       72365 : DataOStream::DataOStream()
     200       72365 :     : _impl( new detail::DataOStream )
     201       72360 : {}
     202             : 
     203           0 : DataOStream::DataOStream( DataOStream& rhs )
     204             :     : boost::noncopyable()
     205           0 :     , _impl( new detail::DataOStream( *rhs._impl ))
     206             : {
     207           0 :     _setupConnections( rhs.getConnections( ));
     208           0 :     getBuffer().swap( rhs.getBuffer( ));
     209             : 
     210             :     // disable send of rhs
     211           0 :     rhs._setupConnections( Connections( ));
     212           0 :     rhs.disable();
     213           0 : }
     214             : 
     215      144729 : DataOStream::~DataOStream()
     216             : {
     217             :     // Can't call disable() from destructor since it uses virtual functions
     218       72365 :     LBASSERT( !_impl->enabled );
     219       72365 :     delete _impl;
     220       72364 : }
     221             : 
     222         178 : void DataOStream::_setCompressor( const CompressorInfo& info )
     223             : {
     224         178 :     if( info == _impl->compressorInfo )
     225           0 :         return;
     226         178 :     _impl->compressorInfo = info;
     227         178 :     _impl->compressor.reset( nullptr );
     228             : }
     229             : 
     230       72425 : void DataOStream::_enable()
     231             : {
     232       72425 :     LBASSERT( !_impl->enabled );
     233       72423 :     LBASSERT( _impl->save || !_impl->connections.empty( ));
     234       72424 :     _impl->state = STATE_UNCOMPRESSED;
     235       72424 :     _impl->bufferStart = 0;
     236       72424 :     _impl->dataSent    = false;
     237       72424 :     _impl->dataSize    = 0;
     238       72424 :     _impl->enabled     = true;
     239       72424 :     _impl->buffer.setSize( 0 );
     240             : #ifdef CO_AGGRESSIVE_CACHING
     241             :     _impl->buffer.reserve( COMMAND_ALLOCSIZE );
     242             : #else
     243       72420 :     _impl->buffer.reserve( COMMAND_MINSIZE );
     244             : #endif
     245       72426 : }
     246             : 
     247         236 : void DataOStream::_setupConnections( const Nodes& receivers )
     248             : {
     249         236 :     _impl->connections = gatherConnections( receivers );
     250         236 : }
     251             : 
     252       72147 : void DataOStream::_setupConnections( const Connections& connections )
     253             : {
     254       72147 :     _impl->connections = connections;
     255       72146 : }
     256             : 
     257          38 : void DataOStream::_setupConnection( NodePtr node, const bool useMulticast )
     258             : {
     259          38 :     LBASSERT( _impl->connections.empty( ));
     260          38 :     _impl->connections.push_back( node->getConnection( useMulticast ));
     261          38 : }
     262             : 
     263           1 : void DataOStream::_setupConnection( ConnectionPtr connection )
     264             : {
     265           1 :     _impl->connections.push_back( connection );
     266           1 : }
     267             : 
     268          33 : void DataOStream::_resend()
     269             : {
     270          33 :     LBASSERT( !_impl->enabled );
     271          33 :     LBASSERT( !_impl->connections.empty( ));
     272          33 :     LBASSERT( _impl->save );
     273             : 
     274          33 :     _impl->compress( _impl->buffer.getData(), _impl->dataSize, STATE_COMPLETE );
     275          33 :     sendData( _impl->buffer.getData(), _impl->dataSize, true );
     276          33 : }
     277             : 
     278          33 : void DataOStream::_clearConnections()
     279             : {
     280          33 :     _impl->connections.clear();
     281          33 : }
     282             : 
     283       72263 : void DataOStream::disable()
     284             : {
     285       72263 :     if( !_impl->enabled )
     286           0 :         return;
     287             : 
     288       72263 :     _impl->dataSize = _impl->buffer.getSize();
     289       72262 :     _impl->dataSent = _impl->dataSize > 0;
     290             : 
     291       72262 :     if( _impl->dataSent && !_impl->connections.empty( ))
     292             :     {
     293       72099 :         const uint8_t* ptr = _impl->buffer.getData() + _impl->bufferStart;
     294       72099 :         const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
     295             : 
     296       72099 :         if( size == 0 && _impl->state == STATE_PARTIAL )
     297             :         {
     298             :             // OPT: all data has been sent in one compressed chunk
     299           0 :             _impl->state = STATE_COMPLETE;
     300             : #ifndef CO_AGGRESSIVE_CACHING
     301           0 :             _impl->buffer.clear();
     302             : #endif
     303             :         }
     304             :         else
     305             :         {
     306       72099 :             _impl->state = STATE_UNCOMPRESSED;
     307       72099 :             const CompressorState state = _impl->bufferStart == 0 ?
     308       72099 :                                               STATE_COMPLETE : STATE_PARTIAL;
     309       72099 :             _impl->compress( ptr, size, state );
     310             :         }
     311             : 
     312       72100 :         sendData( ptr, size, true ); // always send to finalize istream
     313             :     }
     314             : 
     315             : #ifndef CO_AGGRESSIVE_CACHING
     316       72262 :     if( !_impl->save )
     317         121 :         _impl->buffer.clear();
     318             : #endif
     319       72262 :     _impl->enabled = false;
     320       72262 :     _impl->connections.clear();
     321             : }
     322             : 
     323       72305 : void DataOStream::enableSave()
     324             : {
     325       72305 :     LBASSERTINFO( !_impl->enabled ||
     326             :                   ( !_impl->dataSent && _impl->buffer.getSize() == 0 ),
     327             :                   "Can't enable saving after data has been written" );
     328       72305 :     _impl->save = true;
     329       72305 : }
     330             : 
     331           0 : void DataOStream::disableSave()
     332             : {
     333           0 :     LBASSERTINFO( !_impl->enabled ||
     334             :                   (!_impl->dataSent && _impl->buffer.getSize() == 0 ),
     335             :                   "Can't disable saving after data has been written" );
     336           0 :     _impl->save = false;
     337           0 : }
     338             : 
     339         224 : bool DataOStream::hasSentData() const
     340             : {
     341         224 :     return _impl->dataSent;
     342             : }
     343             : 
     344      314293 : void DataOStream::_write( const void* data, uint64_t size )
     345             : {
     346      314293 :     LBASSERT( _impl->enabled );
     347             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     348      314258 :     nBytes += size;
     349             : #endif
     350             : 
     351      628847 :     if( _impl->buffer.getSize() - _impl->bufferStart >
     352      314424 :         Global::getObjectBufferSize( ))
     353             :     {
     354           9 :         flush( false );
     355             :     }
     356      314412 :     _impl->buffer.append( static_cast< const uint8_t* >( data ), size );
     357      314334 : }
     358             : 
     359         174 : void DataOStream::flush( const bool last )
     360             : {
     361         174 :     LBASSERT( _impl->enabled );
     362         174 :     if( !_impl->connections.empty( ))
     363             :     {
     364         172 :         const uint8_t* ptr = _impl->buffer.getData() + _impl->bufferStart;
     365         172 :         const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
     366             : 
     367         172 :         _impl->state = STATE_UNCOMPRESSED;
     368         172 :         _impl->compress( ptr, size, STATE_PARTIAL );
     369         172 :         sendData( ptr, size, last );
     370             :     }
     371         174 :     _impl->dataSent = true;
     372         174 :     _resetBuffer();
     373         174 : }
     374             : 
     375         390 : void DataOStream::reset()
     376             : {
     377         390 :     _resetBuffer();
     378         390 :     _impl->enabled = false;
     379         390 :     _impl->connections.clear();
     380         390 : }
     381             : 
     382       72787 : const Connections& DataOStream::getConnections() const
     383             : {
     384       72787 :     return _impl->connections;
     385             : }
     386             : 
     387         564 : void DataOStream::_resetBuffer()
     388             : {
     389         564 :     _impl->state = STATE_UNCOMPRESSED;
     390         564 :     if( _impl->save )
     391         332 :         _impl->bufferStart = _impl->buffer.getSize();
     392             :     else
     393             :     {
     394         232 :         _impl->bufferStart = 0;
     395         232 :         _impl->buffer.setSize( 0 );
     396             :     }
     397         564 : }
     398             : 
     399           8 : uint64_t DataOStream::_getCompressedData( const uint8_t** chunks,
     400             :                                           uint64_t* chunkSizes )
     401             :     const
     402             : {
     403           8 :     LBASSERT( _impl->state != STATE_UNCOMPRESSED &&
     404             :               _impl->state != STATE_UNCOMPRESSIBLE && _impl->compressor );
     405             : 
     406           8 :     const auto& result = _impl->compressor->getCompressedData();
     407           8 :     LBASSERT( !result.empty() );
     408           8 :     size_t totalDataSize = 0;
     409          16 :     for( size_t i = 0; i != result.size(); ++i )
     410             :     {
     411           8 :         chunks[i] = result[i].getData();
     412           8 :         const size_t dataSize = result[i].getSize();
     413           8 :         chunkSizes[i] = dataSize;
     414           8 :         totalDataSize += dataSize;
     415             :     }
     416             : 
     417           8 :     LBASSERT( totalDataSize == pression::data::getDataSize( result ));
     418           8 :     return totalDataSize;
     419             : }
     420             : 
     421      288734 : lunchbox::Bufferb& DataOStream::getBuffer()
     422             : {
     423      288734 :     return _impl->buffer;
     424             : }
     425             : 
     426         165 : DataOStream& DataOStream::streamDataHeader( DataOStream& os )
     427             : {
     428         165 :     return os << _impl->getCompressorName() << _impl->getNumChunks();
     429             : }
     430             : 
     431         165 : void DataOStream::sendBody( ConnectionPtr connection, const void* data,
     432             :                             const uint64_t size )
     433             : {
     434             : 
     435         173 :     const auto& compressorName = _impl->getCompressorName();
     436         165 :     if( compressorName.empty( ))
     437             :     {
     438             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     439         157 :         nBytesSent += size;
     440             : #endif
     441         157 :         if( size > 0 )
     442         157 :             LBCHECK( connection->send( data, size, true ));
     443         157 :         return;
     444             :     }
     445             : 
     446           8 :     const size_t nChunks = _impl->compressor->getCompressedData().size();
     447             :     uint64_t* chunkSizes = static_cast< uint64_t* >
     448           8 :                                ( alloca (nChunks * sizeof( uint64_t )));
     449             :     const uint8_t** chunks = static_cast< const uint8_t ** >
     450           8 :                                   ( alloca( nChunks * sizeof( void* )));
     451             : 
     452             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     453           8 :     const uint64_t compressedSize = _getCompressedData( chunks, chunkSizes );
     454           8 :     if( _impl->state == STATE_COMPLETE )
     455             :     {
     456           2 :         nBytesSent += _impl->dataSize;
     457           2 :         nBytesSaved += _impl->dataSize - compressedSize;
     458             :     }
     459             :     else
     460             :     {
     461           6 :         nBytesSent += _impl->buffer.getSize() - _impl->bufferStart;
     462           6 :         nBytesSaved += _impl->buffer.getSize() - _impl->bufferStart -
     463           6 :                        compressedSize;
     464             :     }
     465             : 
     466             : #else
     467             :     _getCompressedData( chunks, chunkSizes );
     468             : #endif
     469             : 
     470          16 :     for( size_t j = 0; j < nChunks; ++j )
     471             :     {
     472           8 :         LBCHECK( connection->send( &chunkSizes[j], sizeof( uint64_t ), true ));
     473           8 :         LBCHECK( connection->send( chunks[j], chunkSizes[j], true ));
     474             :     }
     475             : }
     476             : 
     477         165 : uint64_t DataOStream::getCompressedDataSize() const
     478             : {
     479         338 :     if( _impl->state == STATE_UNCOMPRESSED ||
     480         173 :         _impl->state == STATE_UNCOMPRESSIBLE || !_impl->compressor )
     481             :     {
     482         157 :         return 0;
     483             :     }
     484           8 :     return _impl->compressedDataSize +
     485           8 :            _impl->getNumChunks() * sizeof( uint64_t );
     486             : }
     487             : 
     488          21 : std::ostream& DataOStream::printStatistics( std::ostream& os )
     489             : {
     490             :     return os << "DataOStream "
     491             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     492          21 :               << "compressed " << nBytesIn << " -> " << nBytesOut << " of "
     493          42 :               << nBytes << " @ "
     494          21 :               << int( float( nBytesIn )/1.024f/1.024f/compressionTime + .5f )
     495          63 :               << " MB/s " << compressionRuns << " runs, saved " << nBytesSaved
     496          42 :               << " of " << nBytesSent << " brutto sent ("
     497          42 :               << double( nBytesSaved ) / double( nBytesSent ) * 100. << "%)";
     498             : #else
     499             :               << "without statistics enabled";
     500             : #endif
     501             : }
     502             : 
     503          21 : void DataOStream::clearStatistics()
     504             : {
     505             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     506          21 :     nBytes = 0;
     507          21 :     nBytesIn = 0;
     508          21 :     nBytesOut = 0;
     509          21 :     nBytesSaved = 0;
     510          21 :     nBytesSent = 0;
     511          21 :     compressionTime = 0;
     512          21 :     compressionRuns = 0;
     513             : #endif
     514          21 : }
     515             : 
     516          66 : }

Generated by: LCOV version 1.11