LCOV - code coverage report
Current view: top level - co - dataOStream.cpp (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 133 190 70.0 %
Date: 2014-10-06 Functions: 29 35 82.9 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2010, Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *               2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "dataOStream.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "connectionDescription.h"
      26             : #include "commands.h"
      27             : #include "connections.h"
      28             : #include "global.h"
      29             : #include "log.h"
      30             : #include "node.h"
      31             : #include "types.h"
      32             : 
      33             : #include <lunchbox/compressor.h>
      34             : #include <lunchbox/compressorResult.h>
      35             : #include <lunchbox/plugins/compressor.h>
      36             : 
      37             : #include  <boost/foreach.hpp>
      38             : 
      39             : namespace co
      40             : {
      41             : namespace
      42             : {
      43             : //#define CO_INSTRUMENT_DATAOSTREAM
      44             : #ifdef CO_INSTRUMENT_DATAOSTREAM
      45             : lunchbox::a_int32_t nBytes;
      46             : lunchbox::a_int32_t nBytesIn;
      47             : lunchbox::a_int32_t nBytesOut;
      48             : CO_API lunchbox::a_int32_t nBytesSaved;
      49             : CO_API lunchbox::a_int32_t nBytesSent;
      50             : lunchbox::a_int32_t compressionTime;
      51             : #endif
      52             : 
      53             : enum CompressorState
      54             : {
      55             :     STATE_UNCOMPRESSED,
      56             :     STATE_PARTIAL,
      57             :     STATE_COMPLETE,
      58             :     STATE_UNCOMPRESSIBLE
      59             : };
      60             : }
      61             : 
      62             : namespace detail
      63             : {
      64       72343 : class DataOStream
      65             : {
      66             : public:
      67             :     CompressorState state;
      68             : 
      69             :     /** The buffer used for saving and buffering */
      70             :     lunchbox::Bufferb buffer;
      71             : 
      72             :     /** The start position of the buffering, always 0 if !_save */
      73             :     uint64_t bufferStart;
      74             : 
      75             :     /** The uncompressed size of a completely compressed buffer. */
      76             :     uint64_t dataSize;
      77             : 
      78             :     /** The compressed size, 0 for uncompressed or uncompressable data. */
      79             :     uint64_t compressedDataSize;
      80             : 
      81             :     /** Locked connections to the receivers, if _enabled */
      82             :     Connections connections;
      83             : 
      84             :     /** The compressor instance. */
      85             :     lunchbox::Compressor compressor;
      86             : 
      87             :     /** The output stream is enabled for writing */
      88             :     bool enabled;
      89             : 
      90             :     /** Some data has been sent since it was enabled */
      91             :     bool dataSent;
      92             : 
      93             :     /** Save all sent data */
      94             :     bool save;
      95             : 
      96       72343 :     DataOStream()
      97             :         : state( STATE_UNCOMPRESSED )
      98             :         , bufferStart( 0 )
      99             :         , dataSize( 0 )
     100             :         , enabled( false )
     101             :         , dataSent( false )
     102       72343 :         , save( false )
     103       72342 :     {}
     104             : 
     105           0 :     DataOStream( const DataOStream& rhs )
     106             :         : state( rhs.state )
     107             :         , bufferStart( rhs.bufferStart )
     108             :         , dataSize( rhs.dataSize )
     109             :         , enabled( rhs.enabled )
     110             :         , dataSent( rhs.dataSent )
     111           0 :         , save( rhs.save )
     112           0 :     {}
     113             : 
     114         495 :     uint32_t getCompressor() const
     115             :     {
     116         495 :         if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE )
     117         495 :             return EQ_COMPRESSOR_NONE;
     118           0 :         return compressor.getInfo().name;
     119             :     }
     120             : 
     121         165 :     uint32_t getNumChunks() const
     122             :     {
     123         165 :         if( state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE )
     124         165 :             return 1;
     125           0 :         return uint32_t( compressor.getResult().chunks.size( ));
     126             :     }
     127             : 
     128             : 
     129             :     /** Compress data and update the compressor state. */
     130       72291 :     void compress( void* src, const uint64_t size, const CompressorState result)
     131             :     {
     132       72291 :         if( state == result || state == STATE_UNCOMPRESSIBLE )
     133       72291 :             return;
     134             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     135             :         nBytesIn += size;
     136             : #endif
     137             :         const uint64_t threshold =
     138       72291 :            uint64_t( Global::getIAttribute( Global::IATTR_OBJECT_COMPRESSION ));
     139             : 
     140       72291 :         if( !compressor.isGood() || size <= threshold )
     141             :         {
     142       72291 :             state = STATE_UNCOMPRESSED;
     143       72291 :             return;
     144             :         }
     145             : 
     146           0 :         const uint64_t inDims[2] = { 0, size };
     147             : 
     148             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     149             :         lunchbox::Clock clock;
     150             : #endif
     151           0 :         compressor.compress( src, inDims );
     152             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     153             :         compressionTime += uint32_t( clock.getTimef() * 1000.f );
     154             : #endif
     155             : 
     156             :         const lunchbox::CompressorResult &compressorResult =
     157           0 :             compressor.getResult();
     158           0 :         LBASSERT( !compressorResult.chunks.empty() );
     159           0 :         compressedDataSize = compressorResult.getSize();
     160             : 
     161             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     162             :         nBytesOut += compressedDataSize;
     163             : #endif
     164             : 
     165           0 :         if( compressedDataSize >= size )
     166             :         {
     167           0 :             state = STATE_UNCOMPRESSIBLE;
     168             : #ifndef CO_AGGRESSIVE_CACHING
     169           0 :             compressor.realloc();
     170             : 
     171           0 :             if( result == STATE_COMPLETE )
     172           0 :                 buffer.pack();
     173             : #endif
     174           0 :             return;
     175             :         }
     176             : 
     177           0 :         state = result;
     178             : #ifndef CO_AGGRESSIVE_CACHING
     179           0 :         if( result == STATE_COMPLETE )
     180             :         {
     181           0 :             LBASSERT( buffer.getSize() == dataSize );
     182           0 :             buffer.clear();
     183           0 :         }
     184             : #endif
     185             :     }
     186             : };
     187             : }
     188             : 
     189       72343 : DataOStream::DataOStream()
     190       72343 :     : _impl( new detail::DataOStream )
     191       72343 : {}
     192             : 
     193           0 : DataOStream::DataOStream( DataOStream& rhs )
     194             :     : boost::noncopyable()
     195           0 :     , _impl( new detail::DataOStream( *rhs._impl ))
     196             : {
     197           0 :     _setupConnections( rhs.getConnections( ));
     198           0 :     getBuffer().swap( rhs.getBuffer( ));
     199             : 
     200             :     // disable send of rhs
     201           0 :     rhs._setupConnections( Connections( ));
     202           0 :     rhs.disable();
     203           0 : }
     204             : 
     205       72343 : DataOStream::~DataOStream()
     206             : {
     207             :     // Can't call disable() from destructor since it uses virtual functions
     208       72343 :     LBASSERT( !_impl->enabled );
     209       72343 :     delete _impl;
     210       72343 : }
     211             : 
     212         178 : void DataOStream::_initCompressor( const uint32_t name )
     213             : {
     214         178 :     LBCHECK( _impl->compressor.setup( Global::getPluginRegistry(), name ));
     215         178 :     LB_TS_RESET( _impl->compressor._thread );
     216         178 : }
     217             : 
     218       72410 : void DataOStream::_enable()
     219             : {
     220       72410 :     LBASSERT( !_impl->enabled );
     221       72410 :     LBASSERT( _impl->save || !_impl->connections.empty( ));
     222       72410 :     _impl->state = STATE_UNCOMPRESSED;
     223       72410 :     _impl->bufferStart = 0;
     224       72410 :     _impl->dataSent    = false;
     225       72410 :     _impl->dataSize    = 0;
     226       72410 :     _impl->enabled     = true;
     227       72410 :     _impl->buffer.setSize( 0 );
     228             : #ifdef CO_AGGRESSIVE_CACHING
     229             :     _impl->buffer.reserve( COMMAND_ALLOCSIZE );
     230             : #else
     231       72410 :     _impl->buffer.reserve( COMMAND_MINSIZE );
     232             : #endif
     233       72409 : }
     234             : 
     235         239 : void DataOStream::_setupConnections( const Nodes& receivers )
     236             : {
     237         239 :     _impl->connections = gatherConnections( receivers );
     238         239 : }
     239             : 
     240       72127 : void DataOStream::_setupConnections( const Connections& connections )
     241             : {
     242       72127 :     _impl->connections = connections;
     243       72127 : }
     244             : 
     245          37 : void DataOStream::_setupConnection( NodePtr node, const bool useMulticast )
     246             : {
     247          37 :     LBASSERT( _impl->connections.empty( ));
     248          37 :     _impl->connections.push_back( node->getConnection( useMulticast ));
     249          37 : }
     250             : 
     251           1 : void DataOStream::_setupConnection( ConnectionPtr connection )
     252             : {
     253           1 :     _impl->connections.push_back( connection );
     254           1 : }
     255             : 
     256          31 : void DataOStream::_resend()
     257             : {
     258          31 :     LBASSERT( !_impl->enabled );
     259          31 :     LBASSERT( !_impl->connections.empty( ));
     260          31 :     LBASSERT( _impl->save );
     261             : 
     262          31 :     _impl->compress( _impl->buffer.getData(), _impl->dataSize, STATE_COMPLETE );
     263          31 :     sendData( _impl->buffer.getData(), _impl->dataSize, true );
     264          31 : }
     265             : 
     266          31 : void DataOStream::_clearConnections()
     267             : {
     268          31 :     _impl->connections.clear();
     269          31 : }
     270             : 
     271       72245 : void DataOStream::disable()
     272             : {
     273       72245 :     if( !_impl->enabled )
     274       72245 :         return;
     275             : 
     276       72245 :     _impl->dataSize = _impl->buffer.getSize();
     277       72245 :     _impl->dataSent = _impl->dataSize > 0;
     278             : 
     279       72245 :     if( _impl->dataSent && !_impl->connections.empty( ))
     280             :     {
     281       72088 :         void* ptr = _impl->buffer.getData() + _impl->bufferStart;
     282       72088 :         const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
     283             : 
     284       72088 :         if( size == 0 && _impl->state == STATE_PARTIAL )
     285             :         {
     286             :             // OPT: all data has been sent in one compressed chunk
     287           0 :             _impl->state = STATE_COMPLETE;
     288             : #ifndef CO_AGGRESSIVE_CACHING
     289           0 :             _impl->buffer.clear();
     290             : #endif
     291             :         }
     292             :         else
     293             :         {
     294       72088 :             _impl->state = STATE_UNCOMPRESSED;
     295       72088 :             const CompressorState state = _impl->bufferStart == 0 ?
     296       72088 :                                               STATE_COMPLETE : STATE_PARTIAL;
     297       72088 :             _impl->compress( ptr, size, state );
     298             :         }
     299             : 
     300       72088 :         sendData( ptr, size, true ); // always send to finalize istream
     301             :     }
     302             : 
     303             : #ifndef CO_AGGRESSIVE_CACHING
     304       72245 :     if( !_impl->save )
     305         124 :         _impl->buffer.clear();
     306             : #endif
     307       72245 :     _impl->enabled = false;
     308       72245 :     _impl->connections.clear();
     309             : }
     310             : 
     311       72286 : void DataOStream::enableSave()
     312             : {
     313       72286 :     LBASSERTINFO( !_impl->enabled ||
     314             :                   ( !_impl->dataSent && _impl->buffer.getSize() == 0 ),
     315             :                   "Can't enable saving after data has been written" );
     316       72286 :     _impl->save = true;
     317       72286 : }
     318             : 
     319           0 : void DataOStream::disableSave()
     320             : {
     321           0 :     LBASSERTINFO( !_impl->enabled ||
     322             :                   (!_impl->dataSent && _impl->buffer.getSize() == 0 ),
     323             :                   "Can't disable saving after data has been written" );
     324           0 :     _impl->save = false;
     325           0 : }
     326             : 
     327         229 : bool DataOStream::hasSentData() const
     328             : {
     329         229 :     return _impl->dataSent;
     330             : }
     331             : 
     332      314301 : void DataOStream::_write( const void* data, uint64_t size )
     333             : {
     334      314301 :     LBASSERT( _impl->enabled );
     335             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     336             :     nBytes += size;
     337             :     if( compressionTime > 100000 )
     338             :         LBWARN << *this << std::endl;
     339             : #endif
     340             : 
     341      628597 :     if( _impl->buffer.getSize() - _impl->bufferStart >
     342      314299 :         Global::getObjectBufferSize( ))
     343             :     {
     344           9 :         flush( false );
     345             :     }
     346      314296 :     _impl->buffer.append( static_cast< const uint8_t* >( data ), size );
     347      314293 : }
     348             : 
     349         174 : void DataOStream::flush( const bool last )
     350             : {
     351         174 :     LBASSERT( _impl->enabled );
     352         174 :     if( !_impl->connections.empty( ))
     353             :     {
     354         172 :         void* ptr = _impl->buffer.getData() + _impl->bufferStart;
     355         172 :         const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
     356             : 
     357         172 :         _impl->state = STATE_UNCOMPRESSED;
     358         172 :         _impl->compress( ptr, size, STATE_PARTIAL );
     359         172 :         sendData( ptr, size, last );
     360             :     }
     361         174 :     _impl->dataSent = true;
     362         174 :     _resetBuffer();
     363         174 : }
     364             : 
     365         394 : void DataOStream::reset()
     366             : {
     367         394 :     _resetBuffer();
     368         394 :     _impl->enabled = false;
     369         394 :     _impl->connections.clear();
     370         394 : }
     371             : 
     372       72790 : const Connections& DataOStream::getConnections() const
     373             : {
     374       72790 :     return _impl->connections;
     375             : }
     376             : 
     377         568 : void DataOStream::_resetBuffer()
     378             : {
     379         568 :     _impl->state = STATE_UNCOMPRESSED;
     380         568 :     if( _impl->save )
     381         332 :         _impl->bufferStart = _impl->buffer.getSize();
     382             :     else
     383             :     {
     384         236 :         _impl->bufferStart = 0;
     385         236 :         _impl->buffer.setSize( 0 );
     386             :     }
     387         568 : }
     388             : 
     389           0 : uint64_t DataOStream::_getCompressedData( void** chunks, uint64_t* chunkSizes )
     390             :     const
     391             : {
     392           0 :     LBASSERT( _impl->state != STATE_UNCOMPRESSED &&
     393             :               _impl->state != STATE_UNCOMPRESSIBLE );
     394             : 
     395           0 :     const lunchbox::CompressorResult &result = _impl->compressor.getResult();
     396           0 :     LBASSERT( !result.chunks.empty() );
     397           0 :     size_t totalDataSize = 0;
     398           0 :     for( size_t i = 0; i != result.chunks.size(); ++i )
     399             :     {
     400           0 :         chunks[i] = result.chunks[i].data;
     401           0 :         const size_t dataSize = result.chunks[i].getNumBytes();
     402           0 :         chunkSizes[i] = dataSize;
     403           0 :         totalDataSize += dataSize;
     404             :     }
     405             : 
     406           0 :     return totalDataSize;
     407             : }
     408             : 
     409      288740 : lunchbox::Bufferb& DataOStream::getBuffer()
     410             : {
     411      288740 :     return _impl->buffer;
     412             : }
     413             : 
     414         165 : DataOStream& DataOStream::streamDataHeader( DataOStream& os )
     415             : {
     416         165 :     os << _impl->getCompressor() << _impl->getNumChunks();
     417         165 :     return os;
     418             : }
     419             : 
     420         165 : void DataOStream::sendBody( ConnectionPtr connection, const void* data,
     421             :                             const uint64_t size )
     422             : {
     423             : #ifdef EQ_INSTRUMENT_DATAOSTREAM
     424             :     nBytesSent += size;
     425             : #endif
     426             : 
     427         165 :     const uint32_t compressor = _impl->getCompressor();
     428         165 :     if( compressor == EQ_COMPRESSOR_NONE )
     429             :     {
     430         165 :         if( size > 0 )
     431         165 :             LBCHECK( connection->send( data, size, true ));
     432         330 :         return;
     433             :     }
     434             : 
     435             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     436             :     nBytesSent += _impl->buffer.getSize();
     437             : #endif
     438           0 :     const size_t nChunks = _impl->compressor.getResult().chunks.size();
     439             :     uint64_t* chunkSizes = static_cast< uint64_t* >
     440           0 :                                ( alloca (nChunks * sizeof( uint64_t )));
     441             :     void** chunks = static_cast< void ** >
     442           0 :                                   ( alloca( nChunks * sizeof( void* )));
     443             : 
     444             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     445             :     const uint64_t compressedSize = _getCompressedData( chunks, chunkSizes );
     446             :     nBytesSaved += size - compressedSize;
     447             : #else
     448           0 :     _getCompressedData( chunks, chunkSizes );
     449             : #endif
     450             : 
     451           0 :     for( size_t j = 0; j < nChunks; ++j )
     452             :     {
     453           0 :         LBCHECK( connection->send( &chunkSizes[j], sizeof( uint64_t ), true ));
     454           0 :         LBCHECK( connection->send( chunks[j], chunkSizes[j], true ));
     455             :     }
     456             : }
     457             : 
     458         165 : uint64_t DataOStream::getCompressedDataSize() const
     459             : {
     460         165 :     if( _impl->getCompressor() == EQ_COMPRESSOR_NONE )
     461         165 :         return 0;
     462             :     return _impl->compressedDataSize
     463           0 :             + _impl->getNumChunks() * sizeof( uint64_t );
     464             : }
     465             : 
     466           0 : std::ostream& operator << ( std::ostream& os, const DataOStream& dataOStream )
     467             : {
     468           0 :     os << "DataOStream "
     469             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     470             :        << "compressed " << nBytesIn << " -> " << nBytesOut << " of " << nBytes
     471             :        << " in " << compressionTime/1000 << "ms, saved " << nBytesSaved
     472             :        << " of " << nBytesSent << " brutto sent";
     473             : 
     474             :     nBytes = 0;
     475             :     nBytesIn = 0;
     476             :     nBytesOut = 0;
     477             :     nBytesSaved = 0;
     478             :     nBytesSent = 0;
     479             :     compressionTime = 0;
     480             : #else
     481           0 :        << "@" << (void*)&dataOStream;
     482             : #endif
     483           0 :     return os;
     484             : }
     485             : 
     486          60 : }

Generated by: LCOV version 1.10