LCOV - code coverage report
Current view: top level - co - dataOStream.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 133 190 70.0 %
Date: 2015-11-03 13:48:53 Functions: 29 35 82.9 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2015, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "dataOStream.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "connectionDescription.h"
      26             : #include "commands.h"
      27             : #include "connections.h"
      28             : #include "global.h"
      29             : #include "log.h"
      30             : #include "node.h"
      31             : #include "types.h"
      32             : 
      33             : #include <pression/compressor.h>
      34             : #include <pression/compressorResult.h>
      35             : #include <pression/plugins/compressor.h>
      36             : 
      37             : #include  <boost/foreach.hpp>
      38             : 
      39             : namespace co
      40             : {
      41             : namespace
      42             : {
      43             : //#define CO_INSTRUMENT_DATAOSTREAM
      44             : #ifdef CO_INSTRUMENT_DATAOSTREAM
      45             : lunchbox::a_int32_t nBytes;
      46             : lunchbox::a_int32_t nBytesIn;
      47             : lunchbox::a_int32_t nBytesOut;
      48             : CO_API lunchbox::a_int32_t nBytesSaved;
      49             : CO_API lunchbox::a_int32_t nBytesSent;
      50             : lunchbox::a_int32_t compressionTime;
      51             : #endif
      52             : 
      53             : enum CompressorState
      54             : {
      55             :     STATE_UNCOMPRESSED,
      56             :     STATE_PARTIAL,
      57             :     STATE_COMPLETE,
      58             :     STATE_UNCOMPRESSIBLE
      59             : };
      60             : }
      61             : 
      62             : namespace detail
      63             : {
      64       72343 : class DataOStream
      65             : {
      66             : public:
      67             :     CompressorState state;
      68             : 
      69             :     /** The buffer used for saving and buffering */
      70             :     lunchbox::Bufferb buffer;
      71             : 
      72             :     /** The start position of the buffering, always 0 if !_save */
      73             :     uint64_t bufferStart;
      74             : 
      75             :     /** The uncompressed size of a completely compressed buffer. */
      76             :     uint64_t dataSize;
      77             : 
      78             :     /** The compressed size, 0 for uncompressed or uncompressable data. */
      79             :     uint64_t compressedDataSize;
      80             : 
      81             :     /** Locked connections to the receivers, if _enabled */
      82             :     Connections connections;
      83             : 
      84             :     /** The compressor instance. */
      85             :     pression::Compressor compressor;
      86             : 
      87             :     /** The output stream is enabled for writing */
      88             :     bool enabled;
      89             : 
      90             :     /** Some data has been sent since it was enabled */
      91             :     bool dataSent;
      92             : 
      93             :     /** Save all sent data */
      94             :     bool save;
      95             : 
      96       72341 :     DataOStream()
      97             :         : state( STATE_UNCOMPRESSED )
      98             :         , bufferStart( 0 )
      99             :         , dataSize( 0 )
     100             :         , compressedDataSize( 0 )
     101             :         , enabled( false )
     102             :         , dataSent( false )
     103       72341 :         , save( false )
     104       72339 :     {}
     105             : 
     106           0 :     DataOStream( const DataOStream& rhs )
     107             :         : state( rhs.state )
     108             :         , bufferStart( rhs.bufferStart )
     109             :         , dataSize( rhs.dataSize )
     110             :         , compressedDataSize( rhs.compressedDataSize )
     111             :         , enabled( rhs.enabled )
     112             :         , dataSent( rhs.dataSent )
     113           0 :         , save( rhs.save )
     114           0 :     {}
     115             : 
     116         495 :     uint32_t getCompressor() const
     117             :     {
     118         495 :         if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE )
     119         495 :             return EQ_COMPRESSOR_NONE;
     120           0 :         return compressor.getInfo().name;
     121             :     }
     122             : 
     123         165 :     uint32_t getNumChunks() const
     124             :     {
     125         165 :         if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE )
     126         165 :             return 1;
     127           0 :         return uint32_t( compressor.getResult().chunks.size( ));
     128             :     }
     129             : 
     130             : 
     131             :     /** Compress data and update the compressor state. */
     132       72282 :     void compress( void* src, const uint64_t size, const CompressorState result)
     133             :     {
     134       72282 :         if( state == result || state == STATE_UNCOMPRESSIBLE )
     135       72289 :             return;
     136             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     137             :         nBytesIn += size;
     138             : #endif
     139             :         const uint64_t threshold =
     140       72282 :            uint64_t( Global::getIAttribute( Global::IATTR_OBJECT_COMPRESSION ));
     141             : 
     142       72283 :         if( !compressor.isGood() || size <= threshold )
     143             :         {
     144       72289 :             state = STATE_UNCOMPRESSED;
     145       72289 :             return;
     146             :         }
     147             : 
     148           0 :         const uint64_t inDims[2] = { 0, size };
     149             : 
     150             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     151             :         lunchbox::Clock clock;
     152             : #endif
     153           0 :         compressor.compress( src, inDims );
     154             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     155             :         compressionTime += uint32_t( clock.getTimef() * 1000.f );
     156             : #endif
     157             : 
     158             :         const pression::CompressorResult &compressorResult =
     159           0 :             compressor.getResult();
     160           0 :         LBASSERT( !compressorResult.chunks.empty() );
     161           0 :         compressedDataSize = compressorResult.getSize();
     162             : 
     163             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     164             :         nBytesOut += compressedDataSize;
     165             : #endif
     166             : 
     167           0 :         if( compressedDataSize >= size )
     168             :         {
     169           0 :             state = STATE_UNCOMPRESSIBLE;
     170             : #ifndef CO_AGGRESSIVE_CACHING
     171           0 :             compressor.realloc();
     172             : 
     173           0 :             if( result == STATE_COMPLETE )
     174           0 :                 buffer.pack();
     175             : #endif
     176           0 :             return;
     177             :         }
     178             : 
     179           0 :         state = result;
     180             : #ifndef CO_AGGRESSIVE_CACHING
     181           0 :         if( result == STATE_COMPLETE )
     182             :         {
     183           0 :             LBASSERT( buffer.getSize() == dataSize );
     184           0 :             buffer.clear();
     185           0 :         }
     186             : #endif
     187             :     }
     188             : };
     189             : }
     190             : 
     191       72338 : DataOStream::DataOStream()
     192       72338 :     : _impl( new detail::DataOStream )
     193       72339 : {}
     194             : 
     195           0 : DataOStream::DataOStream( DataOStream& rhs )
     196             :     : boost::noncopyable()
     197           0 :     , _impl( new detail::DataOStream( *rhs._impl ))
     198             : {
     199           0 :     _setupConnections( rhs.getConnections( ));
     200           0 :     getBuffer().swap( rhs.getBuffer( ));
     201             : 
     202             :     // disable send of rhs
     203           0 :     rhs._setupConnections( Connections( ));
     204           0 :     rhs.disable();
     205           0 : }
     206             : 
     207       72343 : DataOStream::~DataOStream()
     208             : {
     209             :     // Can't call disable() from destructor since it uses virtual functions
     210       72343 :     LBASSERT( !_impl->enabled );
     211       72343 :     delete _impl;
     212       72343 : }
     213             : 
     214         178 : void DataOStream::_initCompressor( const uint32_t name )
     215             : {
     216         178 :     LBCHECK( _impl->compressor.setup( Global::getPluginRegistry(), name ));
     217         178 :     LB_TS_RESET( _impl->compressor._thread );
     218         178 : }
     219             : 
     220       72402 : void DataOStream::_enable()
     221             : {
     222       72402 :     LBASSERT( !_impl->enabled );
     223       72402 :     LBASSERT( _impl->save || !_impl->connections.empty( ));
     224       72402 :     _impl->state = STATE_UNCOMPRESSED;
     225       72402 :     _impl->bufferStart = 0;
     226       72402 :     _impl->dataSent    = false;
     227       72402 :     _impl->dataSize    = 0;
     228       72402 :     _impl->enabled     = true;
     229       72402 :     _impl->buffer.setSize( 0 );
     230             : #ifdef CO_AGGRESSIVE_CACHING
     231             :     _impl->buffer.reserve( COMMAND_ALLOCSIZE );
     232             : #else
     233       72402 :     _impl->buffer.reserve( COMMAND_MINSIZE );
     234             : #endif
     235       72409 : }
     236             : 
     237         239 : void DataOStream::_setupConnections( const Nodes& receivers )
     238             : {
     239         239 :     _impl->connections = gatherConnections( receivers );
     240         239 : }
     241             : 
     242       72127 : void DataOStream::_setupConnections( const Connections& connections )
     243             : {
     244       72127 :     _impl->connections = connections;
     245       72118 : }
     246             : 
     247          37 : void DataOStream::_setupConnection( NodePtr node, const bool useMulticast )
     248             : {
     249          37 :     LBASSERT( _impl->connections.empty( ));
     250          37 :     _impl->connections.push_back( node->getConnection( useMulticast ));
     251          37 : }
     252             : 
     253           1 : void DataOStream::_setupConnection( ConnectionPtr connection )
     254             : {
     255           1 :     _impl->connections.push_back( connection );
     256           1 : }
     257             : 
     258          31 : void DataOStream::_resend()
     259             : {
     260          31 :     LBASSERT( !_impl->enabled );
     261          31 :     LBASSERT( !_impl->connections.empty( ));
     262          31 :     LBASSERT( _impl->save );
     263             : 
     264          31 :     _impl->compress( _impl->buffer.getData(), _impl->dataSize, STATE_COMPLETE );
     265          31 :     sendData( _impl->buffer.getData(), _impl->dataSize, true );
     266          31 : }
     267             : 
     268          31 : void DataOStream::_clearConnections()
     269             : {
     270          31 :     _impl->connections.clear();
     271          31 : }
     272             : 
     273       72243 : void DataOStream::disable()
     274             : {
     275       72243 :     if( !_impl->enabled )
     276       72245 :         return;
     277             : 
     278       72243 :     _impl->dataSize = _impl->buffer.getSize();
     279       72241 :     _impl->dataSent = _impl->dataSize > 0;
     280             : 
     281       72241 :     if( _impl->dataSent && !_impl->connections.empty( ))
     282             :     {
     283       72080 :         void* ptr = _impl->buffer.getData() + _impl->bufferStart;
     284       72080 :         const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
     285             : 
     286       72080 :         if( size == 0 && _impl->state == STATE_PARTIAL )
     287             :         {
     288             :             // OPT: all data has been sent in one compressed chunk
     289           0 :             _impl->state = STATE_COMPLETE;
     290             : #ifndef CO_AGGRESSIVE_CACHING
     291           0 :             _impl->buffer.clear();
     292             : #endif
     293             :         }
     294             :         else
     295             :         {
     296       72080 :             _impl->state = STATE_UNCOMPRESSED;
     297       72080 :             const CompressorState state = _impl->bufferStart == 0 ?
     298       72080 :                                               STATE_COMPLETE : STATE_PARTIAL;
     299       72080 :             _impl->compress( ptr, size, state );
     300             :         }
     301             : 
     302       72087 :         sendData( ptr, size, true ); // always send to finalize istream
     303             :     }
     304             : 
     305             : #ifndef CO_AGGRESSIVE_CACHING
     306       72245 :     if( !_impl->save )
     307         124 :         _impl->buffer.clear();
     308             : #endif
     309       72245 :     _impl->enabled = false;
     310       72245 :     _impl->connections.clear();
     311             : }
     312             : 
     313       72278 : void DataOStream::enableSave()
     314             : {
     315       72278 :     LBASSERTINFO( !_impl->enabled ||
     316             :                   ( !_impl->dataSent && _impl->buffer.getSize() == 0 ),
     317             :                   "Can't enable saving after data has been written" );
     318       72278 :     _impl->save = true;
     319       72278 : }
     320             : 
     321           0 : void DataOStream::disableSave()
     322             : {
     323           0 :     LBASSERTINFO( !_impl->enabled ||
     324             :                   (!_impl->dataSent && _impl->buffer.getSize() == 0 ),
     325             :                   "Can't disable saving after data has been written" );
     326           0 :     _impl->save = false;
     327           0 : }
     328             : 
     329         229 : bool DataOStream::hasSentData() const
     330             : {
     331         229 :     return _impl->dataSent;
     332             : }
     333             : 
     334      313981 : void DataOStream::_write( const void* data, uint64_t size )
     335             : {
     336      313981 :     LBASSERT( _impl->enabled );
     337             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     338             :     nBytes += size;
     339             :     if( compressionTime > 100000 )
     340             :         LBWARN << *this << std::endl;
     341             : #endif
     342             : 
     343      627881 :     if( _impl->buffer.getSize() - _impl->bufferStart >
     344      313938 :         Global::getObjectBufferSize( ))
     345             :     {
     346           9 :         flush( false );
     347             :     }
     348      313945 :     _impl->buffer.append( static_cast< const uint8_t* >( data ), size );
     349      313866 : }
     350             : 
     351         174 : void DataOStream::flush( const bool last )
     352             : {
     353         174 :     LBASSERT( _impl->enabled );
     354         174 :     if( !_impl->connections.empty( ))
     355             :     {
     356         172 :         void* ptr = _impl->buffer.getData() + _impl->bufferStart;
     357         172 :         const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
     358             : 
     359         172 :         _impl->state = STATE_UNCOMPRESSED;
     360         172 :         _impl->compress( ptr, size, STATE_PARTIAL );
     361         172 :         sendData( ptr, size, last );
     362             :     }
     363         174 :     _impl->dataSent = true;
     364         174 :     _resetBuffer();
     365         174 : }
     366             : 
     367         394 : void DataOStream::reset()
     368             : {
     369         394 :     _resetBuffer();
     370         394 :     _impl->enabled = false;
     371         394 :     _impl->connections.clear();
     372         394 : }
     373             : 
     374       72770 : const Connections& DataOStream::getConnections() const
     375             : {
     376       72770 :     return _impl->connections;
     377             : }
     378             : 
     379         568 : void DataOStream::_resetBuffer()
     380             : {
     381         568 :     _impl->state = STATE_UNCOMPRESSED;
     382         568 :     if( _impl->save )
     383         332 :         _impl->bufferStart = _impl->buffer.getSize();
     384             :     else
     385             :     {
     386         236 :         _impl->bufferStart = 0;
     387         236 :         _impl->buffer.setSize( 0 );
     388             :     }
     389         568 : }
     390             : 
     391           0 : uint64_t DataOStream::_getCompressedData( void** chunks, uint64_t* chunkSizes )
     392             :     const
     393             : {
     394           0 :     LBASSERT( _impl->state != STATE_UNCOMPRESSED &&
     395             :               _impl->state != STATE_UNCOMPRESSIBLE );
     396             : 
     397           0 :     const pression::CompressorResult &result = _impl->compressor.getResult();
     398           0 :     LBASSERT( !result.chunks.empty() );
     399           0 :     size_t totalDataSize = 0;
     400           0 :     for( size_t i = 0; i != result.chunks.size(); ++i )
     401             :     {
     402           0 :         chunks[i] = result.chunks[i].data;
     403           0 :         const size_t dataSize = result.chunks[i].getNumBytes();
     404           0 :         chunkSizes[i] = dataSize;
     405           0 :         totalDataSize += dataSize;
     406             :     }
     407             : 
     408           0 :     return totalDataSize;
     409             : }
     410             : 
     411      288662 : lunchbox::Bufferb& DataOStream::getBuffer()
     412             : {
     413      288662 :     return _impl->buffer;
     414             : }
     415             : 
     416         165 : DataOStream& DataOStream::streamDataHeader( DataOStream& os )
     417             : {
     418         165 :     os << _impl->getCompressor() << _impl->getNumChunks();
     419         165 :     return os;
     420             : }
     421             : 
     422         165 : void DataOStream::sendBody( ConnectionPtr connection, const void* data,
     423             :                             const uint64_t size )
     424             : {
     425             : #ifdef EQ_INSTRUMENT_DATAOSTREAM
     426             :     nBytesSent += size;
     427             : #endif
     428             : 
     429         165 :     const uint32_t compressor = _impl->getCompressor();
     430         165 :     if( compressor == EQ_COMPRESSOR_NONE )
     431             :     {
     432         165 :         if( size > 0 )
     433         165 :             LBCHECK( connection->send( data, size, true ));
     434         330 :         return;
     435             :     }
     436             : 
     437             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     438             :     nBytesSent += _impl->buffer.getSize();
     439             : #endif
     440           0 :     const size_t nChunks = _impl->compressor.getResult().chunks.size();
     441             :     uint64_t* chunkSizes = static_cast< uint64_t* >
     442           0 :                                ( alloca (nChunks * sizeof( uint64_t )));
     443             :     void** chunks = static_cast< void ** >
     444           0 :                                   ( alloca( nChunks * sizeof( void* )));
     445             : 
     446             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     447             :     const uint64_t compressedSize = _getCompressedData( chunks, chunkSizes );
     448             :     nBytesSaved += size - compressedSize;
     449             : #else
     450           0 :     _getCompressedData( chunks, chunkSizes );
     451             : #endif
     452             : 
     453           0 :     for( size_t j = 0; j < nChunks; ++j )
     454             :     {
     455           0 :         LBCHECK( connection->send( &chunkSizes[j], sizeof( uint64_t ), true ));
     456           0 :         LBCHECK( connection->send( chunks[j], chunkSizes[j], true ));
     457             :     }
     458             : }
     459             : 
     460         165 : uint64_t DataOStream::getCompressedDataSize() const
     461             : {
     462         165 :     if( _impl->getCompressor() == EQ_COMPRESSOR_NONE )
     463         165 :         return 0;
     464             :     return _impl->compressedDataSize
     465           0 :             + _impl->getNumChunks() * sizeof( uint64_t );
     466             : }
     467             : 
     468           0 : std::ostream& operator << ( std::ostream& os, const DataOStream& dataOStream )
     469             : {
     470           0 :     os << "DataOStream "
     471             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     472             :        << "compressed " << nBytesIn << " -> " << nBytesOut << " of " << nBytes
     473             :        << " in " << compressionTime/1000 << "ms, saved " << nBytesSaved
     474             :        << " of " << nBytesSent << " brutto sent";
     475             : 
     476             :     nBytes = 0;
     477             :     nBytesIn = 0;
     478             :     nBytesOut = 0;
     479             :     nBytesSaved = 0;
     480             :     nBytesSent = 0;
     481             :     compressionTime = 0;
     482             : #else
     483           0 :        << "@" << (void*)&dataOStream;
     484             : #endif
     485           0 :     return os;
     486             : }
     487             : 
     488          63 : }

Generated by: LCOV version 1.11