LCOV - code coverage report
Current view: top level - co - dataOStream.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 206 237 86.9 %
Date: 2018-01-09 16:37:03 Functions: 33 37 89.2 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2017, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "dataOStream.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "commands.h"
      26             : #include "connectionDescription.h"
      27             : #include "connections.h"
      28             : #include "log.h"
      29             : #include "node.h"
      30             : #include "types.h"
      31             : 
      32             : #include <lunchbox/clock.h>
      33             : #include <pression/data/Compressor.h>
      34             : #include <pression/data/CompressorInfo.h>
      35             : 
      36             : namespace co
      37             : {
      38             : namespace
      39             : {
      40             : #define CO_INSTRUMENT_DATAOSTREAM
      41             : #ifdef CO_INSTRUMENT_DATAOSTREAM
      42          21 : lunchbox::a_ssize_t nBytes;
      43          21 : lunchbox::a_ssize_t nBytesIn;
      44          21 : lunchbox::a_ssize_t nBytesOut;
      45          21 : lunchbox::a_ssize_t nBytesSaved;
      46          21 : lunchbox::a_ssize_t nBytesSent;
      47          21 : lunchbox::a_ssize_t compressionTime;
      48          21 : lunchbox::a_ssize_t compressionRuns;
      49             : #endif
      50             : 
      51             : enum CompressorState
      52             : {
      53             :     STATE_UNCOMPRESSED,
      54             :     STATE_PARTIAL,
      55             :     STATE_COMPLETE,
      56             :     STATE_UNCOMPRESSIBLE
      57             : };
      58             : }
      59             : 
      60             : namespace detail
      61             : {
      62       72284 : class DataOStream
      63             : {
      64             : public:
      65             :     /** The buffer used for saving and buffering */
      66             :     lunchbox::Bufferb buffer;
      67             : 
      68             :     /** The start position of the buffering, always 0 if !_save */
      69             :     uint64_t bufferStart;
      70             : 
      71             :     /** The uncompressed size of a completely compressed buffer. */
      72             :     uint64_t dataSize;
      73             : 
      74             :     /** The compressed size, 0 for uncompressed or uncompressable data. */
      75             :     uint64_t compressedDataSize;
      76             : 
      77             :     /** The maximum of size of chunk that gets flushed/sent. */
      78             :     const size_t chunkSize;
      79             : 
      80             :     /** Locked connections to the receivers, if _enabled */
      81             :     Connections connections;
      82             : 
      83             :     CompressorState state;         //!< State of compression
      84             :     CompressorPtr compressor;      //!< The compressor instance.
      85             :     CompressorInfo compressorInfo; //!< Information on the compr.
      86             : 
      87             :     /** The output stream is enabled for writing */
      88             :     bool enabled;
      89             : 
      90             :     /** Some data has been sent since it was enabled */
      91             :     bool dataSent;
      92             : 
      93             :     /** Save all sent data */
      94             :     bool save;
      95             : 
      96       72280 :     DataOStream(const size_t chunkSize_)
      97       72280 :         : bufferStart(0)
      98             :         , dataSize(0)
      99             :         , compressedDataSize(0)
     100             :         , chunkSize(chunkSize_)
     101             :         , state(STATE_UNCOMPRESSED)
     102             :         , enabled(false)
     103             :         , dataSent(false)
     104       72280 :         , save(false)
     105             :     {
     106       72278 :     }
     107             : 
     108           0 :     DataOStream(const DataOStream& rhs)
     109           0 :         : bufferStart(rhs.bufferStart)
     110           0 :         , dataSize(rhs.dataSize)
     111           0 :         , compressedDataSize(rhs.compressedDataSize)
     112           0 :         , chunkSize(rhs.chunkSize)
     113           0 :         , state(rhs.state)
     114           0 :         , enabled(rhs.enabled)
     115           0 :         , dataSent(rhs.dataSent)
     116           0 :         , save(rhs.save)
     117             :     {
     118           0 :     }
     119             : 
     120         318 :     std::string getCompressorName() const
     121             :     {
     122         334 :         if (state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE ||
     123          16 :             !compressor)
     124             :         {
     125         302 :             return std::string();
     126             :         }
     127          16 :         return compressorInfo.name;
     128             :     }
     129             : 
     130          10 :     bool initCompressor()
     131             :     {
     132          10 :         if (compressorInfo.name.empty())
     133           2 :             return false;
     134           8 :         if (compressor)
     135           0 :             return true;
     136             : 
     137           8 :         compressor.reset(compressorInfo.create());
     138           8 :         LBLOG(LOG_OBJECTS) << "Allocated " << compressorInfo.name << std::endl;
     139           8 :         return compressor != nullptr;
     140             :     }
     141             : 
     142         167 :     uint32_t getNumChunks() const
     143             :     {
     144         183 :         if (state == STATE_UNCOMPRESSED || state == STATE_UNCOMPRESSIBLE ||
     145          16 :             !compressor)
     146             :         {
     147         151 :             return 1;
     148             :         }
     149          16 :         return uint32_t(compressor->getCompressedData().size());
     150             :     }
     151             : 
     152             :     /** Compress data and update the compressor state. */
     153       72224 :     void compress(const uint8_t* src, const uint64_t size,
     154             :                   const CompressorState result)
     155             :     {
     156       72224 :         if (state == result || state == STATE_UNCOMPRESSIBLE)
     157       72216 :             return;
     158             :         const uint64_t threshold =
     159       72224 :             uint64_t(Global::getIAttribute(Global::IATTR_OBJECT_COMPRESSION));
     160             : 
     161       72224 :         if (size <= threshold || !initCompressor())
     162             :         {
     163       72216 :             state = STATE_UNCOMPRESSED;
     164       72216 :             return;
     165             :         }
     166             : 
     167             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     168          16 :         lunchbox::Clock clock;
     169             : #endif
     170           8 :         const auto& output = compressor->compress(src, size);
     171           8 :         LBASSERT(!output.empty());
     172           8 :         compressedDataSize = pression::data::getDataSize(output);
     173             : 
     174             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     175           8 :         compressionTime += size_t(clock.getTimef() * 1000.f);
     176           8 :         nBytesIn += size;
     177           8 :         nBytesOut += compressedDataSize;
     178           8 :         ++compressionRuns;
     179             : #endif
     180             : 
     181           8 :         if (compressedDataSize >= size)
     182             :         {
     183           0 :             state = STATE_UNCOMPRESSIBLE;
     184             : #ifndef CO_AGGRESSIVE_CACHING
     185           0 :             compressor.reset(compressorInfo.create());
     186             : 
     187           0 :             if (result == STATE_COMPLETE)
     188           0 :                 buffer.pack();
     189             : #endif
     190           0 :             return;
     191             :         }
     192             : 
     193           8 :         state = result;
     194             : #ifndef CO_AGGRESSIVE_CACHING
     195           8 :         if (result == STATE_COMPLETE)
     196             :         {
     197           2 :             LBASSERT(buffer.getSize() == dataSize);
     198           2 :             buffer.clear();
     199             :         }
     200             : #endif
     201             :     }
     202             : };
     203             : }
     204             : 
     205       72284 : DataOStream::DataOStream(const size_t chunkSize)
     206       72284 :     : _impl(new detail::DataOStream(chunkSize))
     207             : {
     208       72278 : }
     209             : 
     210           0 : DataOStream::DataOStream(DataOStream& rhs)
     211             :     : boost::noncopyable()
     212           0 :     , _impl(new detail::DataOStream(*rhs._impl))
     213             : {
     214           0 :     _setupConnections(rhs.getConnections());
     215           0 :     getBuffer().swap(rhs.getBuffer());
     216             : 
     217             :     // disable send of rhs
     218           0 :     rhs._setupConnections(Connections());
     219           0 :     rhs.disable();
     220           0 : }
     221             : 
     222      144568 : DataOStream::~DataOStream()
     223             : {
     224             :     // Can't call disable() from destructor since it uses virtual functions
     225       72284 :     LBASSERT(!_impl->enabled);
     226       72284 :     delete _impl;
     227       72284 : }
     228             : 
     229         172 : void DataOStream::_setCompressor(const CompressorInfo& info)
     230             : {
     231         172 :     if (info == _impl->compressorInfo)
     232           0 :         return;
     233         172 :     _impl->compressorInfo = info;
     234         172 :     _impl->compressor.reset(nullptr);
     235             : }
     236             : 
     237       72344 : void DataOStream::_enable()
     238             : {
     239       72344 :     LBASSERT(!_impl->enabled);
     240       72341 :     LBASSERT(_impl->save || !_impl->connections.empty());
     241       72340 :     _impl->state = STATE_UNCOMPRESSED;
     242       72340 :     _impl->bufferStart = 0;
     243       72340 :     _impl->dataSent = false;
     244       72340 :     _impl->dataSize = 0;
     245       72340 :     _impl->enabled = true;
     246       72340 :     _impl->buffer.setSize(0);
     247             : #ifdef CO_AGGRESSIVE_CACHING
     248             :     _impl->buffer.reserve(COMMAND_ALLOCSIZE);
     249             : #else
     250       72337 :     _impl->buffer.reserve(COMMAND_MINSIZE);
     251             : #endif
     252       72347 : }
     253             : 
     254         236 : void DataOStream::_setupConnections(const Nodes& receivers)
     255             : {
     256         236 :     _impl->connections = gatherConnections(receivers);
     257         236 : }
     258             : 
     259       72072 : void DataOStream::_setupConnections(const Connections& connections)
     260             : {
     261       72072 :     _impl->connections = connections;
     262       72071 : }
     263             : 
     264          32 : void DataOStream::_setupConnection(NodePtr node, const bool useMulticast)
     265             : {
     266          32 :     LBASSERT(_impl->connections.empty());
     267          32 :     _impl->connections.push_back(node->getConnection(useMulticast));
     268          32 : }
     269             : 
     270           1 : void DataOStream::_setupConnection(ConnectionPtr connection)
     271             : {
     272           1 :     _impl->connections.push_back(connection);
     273           1 : }
     274             : 
     275          33 : void DataOStream::_resend()
     276             : {
     277          33 :     LBASSERT(!_impl->enabled);
     278          33 :     LBASSERT(!_impl->connections.empty());
     279          33 :     LBASSERT(_impl->save);
     280             : 
     281          33 :     _impl->compress(_impl->buffer.getData(), _impl->dataSize, STATE_COMPLETE);
     282          33 :     sendData(_impl->buffer.getData(), _impl->dataSize, true);
     283          33 : }
     284             : 
     285          33 : void DataOStream::_clearConnections()
     286             : {
     287          33 :     _impl->connections.clear();
     288          33 : }
     289             : 
     290       72186 : void DataOStream::disable()
     291             : {
     292       72186 :     if (!_impl->enabled)
     293           0 :         return;
     294             : 
     295       72186 :     _impl->dataSize = _impl->buffer.getSize();
     296       72187 :     _impl->dataSent = _impl->dataSize > 0;
     297             : 
     298       72187 :     if (_impl->dataSent && !_impl->connections.empty())
     299             :     {
     300       72025 :         const uint8_t* ptr = _impl->buffer.getData() + _impl->bufferStart;
     301       72025 :         const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
     302             : 
     303       72024 :         if (size == 0 && _impl->state == STATE_PARTIAL)
     304             :         {
     305             :             // OPT: all data has been sent in one compressed chunk
     306           0 :             _impl->state = STATE_COMPLETE;
     307             : #ifndef CO_AGGRESSIVE_CACHING
     308           0 :             _impl->buffer.clear();
     309             : #endif
     310             :         }
     311             :         else
     312             :         {
     313       72024 :             _impl->state = STATE_UNCOMPRESSED;
     314             :             const CompressorState state =
     315       72024 :                 _impl->bufferStart == 0 ? STATE_COMPLETE : STATE_PARTIAL;
     316       72024 :             _impl->compress(ptr, size, state);
     317             :         }
     318             : 
     319       72025 :         sendData(ptr, size, true); // always send to finalize istream
     320             :     }
     321             : 
     322             : #ifndef CO_AGGRESSIVE_CACHING
     323       72188 :     if (!_impl->save)
     324         115 :         _impl->buffer.clear();
     325             : #endif
     326       72188 :     _impl->enabled = false;
     327       72188 :     _impl->connections.clear();
     328             : }
     329             : 
     330       72229 : void DataOStream::enableSave()
     331             : {
     332       72229 :     LBASSERTINFO(!_impl->enabled ||
     333             :                      (!_impl->dataSent && _impl->buffer.getSize() == 0),
     334             :                  "Can't enable saving after data has been written");
     335       72229 :     _impl->save = true;
     336       72229 : }
     337             : 
     338           0 : void DataOStream::disableSave()
     339             : {
     340           0 :     LBASSERTINFO(!_impl->enabled ||
     341             :                      (!_impl->dataSent && _impl->buffer.getSize() == 0),
     342             :                  "Can't disable saving after data has been written");
     343           0 :     _impl->save = false;
     344           0 : }
     345             : 
     346         218 : bool DataOStream::hasSentData() const
     347             : {
     348         218 :     return _impl->dataSent;
     349             : }
     350             : 
     351      313707 : void DataOStream::_write(const void* data, uint64_t size)
     352             : {
     353      313707 :     LBASSERT(_impl->enabled);
     354             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     355      313642 :     nBytes += size;
     356             : #endif
     357             : 
     358      313879 :     if (_impl->buffer.getSize() - _impl->bufferStart > _impl->chunkSize)
     359           9 :         flush(false);
     360      313866 :     _impl->buffer.append(static_cast<const uint8_t*>(data), size);
     361      313792 : }
     362             : 
     363         168 : void DataOStream::flush(const bool last)
     364             : {
     365         168 :     LBASSERT(_impl->enabled);
     366         168 :     if (!_impl->connections.empty())
     367             :     {
     368         166 :         const uint8_t* ptr = _impl->buffer.getData() + _impl->bufferStart;
     369         166 :         const uint64_t size = _impl->buffer.getSize() - _impl->bufferStart;
     370             : 
     371         166 :         _impl->state = STATE_UNCOMPRESSED;
     372         166 :         _impl->compress(ptr, size, STATE_PARTIAL);
     373         166 :         sendData(ptr, size, last);
     374             :     }
     375         168 :     _impl->dataSent = true;
     376         168 :     _resetBuffer();
     377         168 : }
     378             : 
     379         384 : void DataOStream::reset()
     380             : {
     381         384 :     _resetBuffer();
     382         384 :     _impl->enabled = false;
     383         384 :     _impl->connections.clear();
     384         384 : }
     385             : 
     386       72690 : const Connections& DataOStream::getConnections() const
     387             : {
     388       72690 :     return _impl->connections;
     389             : }
     390             : 
     391         552 : void DataOStream::_resetBuffer()
     392             : {
     393         552 :     _impl->state = STATE_UNCOMPRESSED;
     394         552 :     if (_impl->save)
     395         320 :         _impl->bufferStart = _impl->buffer.getSize();
     396             :     else
     397             :     {
     398         232 :         _impl->bufferStart = 0;
     399         232 :         _impl->buffer.setSize(0);
     400             :     }
     401         552 : }
     402             : 
     403           8 : uint64_t DataOStream::_getCompressedData(const uint8_t** chunks,
     404             :                                          uint64_t* chunkSizes) const
     405             : {
     406           8 :     LBASSERT(_impl->state != STATE_UNCOMPRESSED &&
     407             :              _impl->state != STATE_UNCOMPRESSIBLE && _impl->compressor);
     408             : 
     409           8 :     const auto& result = _impl->compressor->getCompressedData();
     410           8 :     LBASSERT(!result.empty());
     411           8 :     size_t totalDataSize = 0;
     412          16 :     for (size_t i = 0; i != result.size(); ++i)
     413             :     {
     414           8 :         chunks[i] = result[i].getData();
     415           8 :         const size_t dataSize = result[i].getSize();
     416           8 :         chunkSizes[i] = dataSize;
     417           8 :         totalDataSize += dataSize;
     418             :     }
     419             : 
     420           8 :     LBASSERT(totalDataSize == pression::data::getDataSize(result));
     421           8 :     return totalDataSize;
     422             : }
     423             : 
     424      288441 : lunchbox::Bufferb& DataOStream::getBuffer()
     425             : {
     426      288441 :     return _impl->buffer;
     427             : }
     428             : 
     429         159 : DataOStream& DataOStream::streamDataHeader(DataOStream& os)
     430             : {
     431         159 :     return os << _impl->getCompressorName() << _impl->getNumChunks();
     432             : }
     433             : 
     434         159 : void DataOStream::sendBody(ConnectionPtr connection, const void* data,
     435             :                            const uint64_t size)
     436             : {
     437         167 :     const auto& compressorName = _impl->getCompressorName();
     438         159 :     if (compressorName.empty())
     439             :     {
     440             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     441         151 :         nBytesSent += size;
     442             : #endif
     443         151 :         if (size > 0)
     444         151 :             LBCHECK(connection->send(data, size, true));
     445         151 :         return;
     446             :     }
     447             : 
     448           8 :     const size_t nChunks = _impl->compressor->getCompressedData().size();
     449             :     uint64_t* chunkSizes =
     450           8 :         static_cast<uint64_t*>(alloca(nChunks * sizeof(uint64_t)));
     451             :     const uint8_t** chunks =
     452           8 :         static_cast<const uint8_t**>(alloca(nChunks * sizeof(void*)));
     453             : 
     454             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     455           8 :     const uint64_t compressedSize = _getCompressedData(chunks, chunkSizes);
     456           8 :     if (_impl->state == STATE_COMPLETE)
     457             :     {
     458           2 :         nBytesSent += _impl->dataSize;
     459           2 :         nBytesSaved += _impl->dataSize - compressedSize;
     460             :     }
     461             :     else
     462             :     {
     463           6 :         nBytesSent += _impl->buffer.getSize() - _impl->bufferStart;
     464           6 :         nBytesSaved +=
     465          12 :             _impl->buffer.getSize() - _impl->bufferStart - compressedSize;
     466             :     }
     467             : 
     468             : #else
     469             :     _getCompressedData(chunks, chunkSizes);
     470             : #endif
     471             : 
     472          16 :     for (size_t j = 0; j < nChunks; ++j)
     473             :     {
     474           8 :         LBCHECK(connection->send(&chunkSizes[j], sizeof(uint64_t), true));
     475           8 :         LBCHECK(connection->send(chunks[j], chunkSizes[j], true));
     476             :     }
     477             : }
     478             : 
     479         159 : uint64_t DataOStream::getCompressedDataSize() const
     480             : {
     481         326 :     if (_impl->state == STATE_UNCOMPRESSED ||
     482         167 :         _impl->state == STATE_UNCOMPRESSIBLE || !_impl->compressor)
     483             :     {
     484         151 :         return 0;
     485             :     }
     486           8 :     return _impl->compressedDataSize + _impl->getNumChunks() * sizeof(uint64_t);
     487             : }
     488             : 
     489          20 : std::ostream& DataOStream::printStatistics(std::ostream& os)
     490             : {
     491             :     return os << "DataOStream "
     492             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     493          20 :               << "compressed " << nBytesIn << " -> " << nBytesOut << " of "
     494          40 :               << nBytes << " @ "
     495          20 :               << int(float(nBytesIn) / 1.024f / 1.024f / compressionTime + .5f)
     496          60 :               << " MB/s " << compressionRuns << " runs, saved " << nBytesSaved
     497          40 :               << " of " << nBytesSent << " brutto sent ("
     498          40 :               << double(nBytesSaved) / double(nBytesSent) * 100. << "%)";
     499             : #else
     500             :               << "without statistics enabled";
     501             : #endif
     502             : }
     503             : 
     504          20 : void DataOStream::clearStatistics()
     505             : {
     506             : #ifdef CO_INSTRUMENT_DATAOSTREAM
     507          20 :     nBytes = 0;
     508          20 :     nBytesIn = 0;
     509          20 :     nBytesOut = 0;
     510          20 :     nBytesSaved = 0;
     511          20 :     nBytesSent = 0;
     512          20 :     compressionTime = 0;
     513          20 :     compressionRuns = 0;
     514             : #endif
     515          20 : }
     516          63 : }

Generated by: LCOV version 1.11