Line data Source code
1 :
2 : /* Copyright (c) 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
3 : * 2013, Stefan.Eilemann@epfl.ch
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "oCommand.h"
22 :
23 : #include "buffer.h"
24 : #include "iCommand.h"
25 :
26 : namespace co
27 : {
28 : namespace detail
29 : {
30 :
31 72160 : class OCommand
32 : {
33 : public:
34 72160 : OCommand( co::Dispatcher* const dispatcher_, LocalNodePtr localNode_ )
35 : : isLocked( false )
36 : , size( 0 )
37 : , dispatcher( dispatcher_ )
38 72160 : , localNode( localNode_ )
39 72160 : {}
40 :
41 : bool isLocked;
42 : uint64_t size;
43 : co::Dispatcher* const dispatcher;
44 : LocalNodePtr localNode;
45 : };
46 :
47 : }
48 :
49 72127 : OCommand::OCommand( const Connections& receivers, const uint32_t cmd,
50 : const uint32_t type )
51 : : DataOStream()
52 72127 : , _impl( new detail::OCommand( 0, 0 ))
53 : {
54 72127 : _setupConnections( receivers );
55 72127 : _init( cmd, type );
56 72127 : }
57 :
58 33 : OCommand::OCommand( Dispatcher* const dispatcher, LocalNodePtr localNode,
59 : const uint32_t cmd, const uint32_t type )
60 : : DataOStream()
61 33 : , _impl( new detail::OCommand( dispatcher, localNode ))
62 : {
63 33 : _init( cmd, type );
64 33 : }
65 :
66 0 : OCommand::OCommand( const OCommand& rhs )
67 : : DataOStream( const_cast< OCommand& >( rhs ))
68 0 : , _impl( new detail::OCommand( *rhs._impl ))
69 : {
70 0 : }
71 :
72 144320 : OCommand::~OCommand()
73 : {
74 72160 : if( _impl->isLocked )
75 : {
76 165 : LBASSERT( _impl->size > 0 );
77 165 : const uint64_t size = _impl->size + getBuffer().getSize();
78 165 : const Connections& connections = getConnections();
79 165 : if( size < COMMAND_MINSIZE ) // Fill send to minimal size
80 : {
81 152 : const size_t delta = COMMAND_MINSIZE - size;
82 152 : void* padding = alloca( delta );
83 912 : for( ConnectionsCIter i = connections.begin();
84 608 : i != connections.end(); ++i )
85 : {
86 152 : ConnectionPtr connection = *i;
87 152 : connection->send( padding, delta, true );
88 152 : }
89 : }
90 990 : for( ConnectionsCIter i = connections.begin();
91 660 : i != connections.end(); ++i )
92 : {
93 165 : ConnectionPtr connection = *i;
94 165 : connection->unlockSend();
95 165 : }
96 165 : _impl->isLocked = false;
97 165 : _impl->size = 0;
98 165 : reset();
99 : }
100 : else
101 71995 : disable();
102 :
103 72160 : if( _impl->dispatcher )
104 : {
105 33 : LBASSERT( _impl->localNode );
106 :
107 : // #145 proper local command dispatch?
108 33 : LBASSERT( _impl->size == 0 );
109 33 : const uint64_t size = getBuffer().getSize();
110 33 : BufferPtr buffer = _impl->localNode->allocBuffer( size );
111 33 : buffer->swap( getBuffer( ));
112 33 : reinterpret_cast< uint64_t* >( buffer->getData( ))[ 0 ] = size;
113 :
114 66 : ICommand cmd( _impl->localNode, _impl->localNode, buffer, false );
115 66 : _impl->dispatcher->dispatchCommand( cmd );
116 : }
117 :
118 72160 : delete _impl;
119 72160 : }
120 :
121 72160 : void OCommand::_init( const uint32_t cmd, const uint32_t type )
122 : {
123 : #ifndef COLLAGE_BIGENDIAN
124 : // big endian hosts swap handshake commands to little endian...
125 72160 : LBASSERTINFO( cmd < CMD_NODE_MAXIMUM, std::hex << "0x" << cmd << std::dec );
126 : #endif
127 72160 : enableSave();
128 72160 : _enable();
129 72160 : *this << 0ull /* size */ << type << cmd;
130 72160 : }
131 :
132 165 : void OCommand::sendHeader( const uint64_t additionalSize )
133 : {
134 165 : LBASSERT( !_impl->dispatcher );
135 165 : LBASSERT( !_impl->isLocked );
136 165 : LBASSERT( additionalSize > 0 );
137 :
138 165 : const Connections& connections = getConnections();
139 330 : for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
140 : {
141 165 : ConnectionPtr connection = *i;
142 165 : connection->lockSend();
143 165 : }
144 165 : _impl->isLocked = true;
145 165 : _impl->size = additionalSize;
146 165 : flush( true );
147 165 : }
148 :
149 3 : size_t OCommand::getSize()
150 : {
151 3 : return sizeof( uint64_t ) + sizeof( uint32_t ) + sizeof( uint32_t );
152 : }
153 :
154 72126 : void OCommand::sendData( const void* buffer LB_UNUSED, const uint64_t size,
155 : const bool last LB_UNUSED )
156 : {
157 72126 : LBASSERT( !_impl->dispatcher );
158 72126 : LBASSERT( last );
159 72126 : LBASSERTINFO( size >= 16, size );
160 72126 : LBASSERT( getBuffer().getData() == buffer );
161 72126 : LBASSERT( getBuffer().getSize() == size );
162 72126 : LBASSERT( getBuffer().getMaxSize() >= COMMAND_MINSIZE );
163 :
164 : // Update size field
165 72126 : uint8_t* bytes = getBuffer().getData();
166 72126 : reinterpret_cast< uint64_t* >( bytes )[ 0 ] = _impl->size + size;
167 72126 : const uint64_t sendSize = _impl->isLocked ? size : LB_MAX( size,
168 : COMMAND_MINSIZE);
169 72126 : const Connections& connections = getConnections();
170 144252 : for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
171 : {
172 72126 : ConnectionPtr connection = *i;
173 72126 : if ( connection )
174 72126 : connection->send( bytes, sendSize, _impl->isLocked );
175 : else
176 0 : LBERROR << "Can't send data, node is closed" << std::endl;
177 72126 : }
178 72126 : }
179 :
180 60 : }
|