Line data Source code
1 :
2 : /* Copyright (c) 2011-2014, Stefan Eilemann <eile@eyescale.ch>
3 : * 2011, Carsten Rohn <carsten.rohn@rtt.ag>
4 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "queueSlave.h"
23 :
24 : #include "buffer.h"
25 : #include "commandQueue.h"
26 : #include "dataIStream.h"
27 : #include "global.h"
28 : #include "objectOCommand.h"
29 : #include "objectICommand.h"
30 : #include "queueCommand.h"
31 : #include "exception.h"
32 :
33 : namespace co
34 : {
35 : namespace detail
36 : {
37 1 : class QueueSlave
38 : {
39 : public:
40 1 : QueueSlave( const uint32_t mark, const uint32_t amount)
41 : : masterInstanceID( CO_INSTANCE_ALL )
42 : , prefetchMark( mark == LB_UNDEFINED_UINT32 ?
43 1 : Global::getIAttribute( Global::IATTR_TILE_QUEUE_MIN_SIZE ) :
44 : mark )
45 : , prefetchAmount( amount == LB_UNDEFINED_UINT32 ?
46 1 : Global::getIAttribute( Global::IATTR_TILE_QUEUE_REFILL ) :
47 3 : amount )
48 1 : {}
49 :
50 : co::CommandQueue queue;
51 : NodePtr master;
52 : uint32_t masterInstanceID;
53 :
54 : const uint32_t prefetchMark;
55 : const uint32_t prefetchAmount;
56 : };
57 : }
58 :
59 1 : QueueSlave::QueueSlave( const uint32_t prefetchMark,
60 : const uint32_t prefetchAmount )
61 1 : : _impl( new detail::QueueSlave( prefetchMark, prefetchAmount ))
62 1 : {}
63 :
64 3 : QueueSlave::~QueueSlave()
65 : {
66 1 : delete _impl;
67 2 : }
68 :
69 1 : void QueueSlave::attach( const uint128_t& id, const uint32_t instanceID )
70 : {
71 1 : Object::attach(id, instanceID);
72 1 : registerCommand( CMD_QUEUE_ITEM, CommandFunc<Object>(0, 0), &_impl->queue );
73 1 : registerCommand( CMD_QUEUE_EMPTY, CommandFunc<Object>(0, 0), &_impl->queue);
74 1 : }
75 :
76 1 : void QueueSlave::applyInstanceData( co::DataIStream& is )
77 : {
78 1 : NodeID masterNodeID;
79 1 : is >> _impl->masterInstanceID >> masterNodeID;
80 :
81 1 : LBASSERT( masterNodeID != 0 );
82 1 : LBASSERT( !_impl->master );
83 1 : LocalNodePtr localNode = getLocalNode();
84 1 : _impl->master = localNode->connect( masterNodeID );
85 1 : }
86 :
87 5 : ObjectICommand QueueSlave::pop( const uint32_t timeout )
88 : {
89 5 : static lunchbox::a_int32_t _request;
90 5 : const int32_t request = ++_request;
91 :
92 : while( true )
93 : {
94 5 : const size_t queueSize = _impl->queue.getSize();
95 5 : if( queueSize <= _impl->prefetchMark )
96 : {
97 : send( _impl->master, CMD_QUEUE_GET_ITEM, _impl->masterInstanceID )
98 5 : << _impl->prefetchAmount << getInstanceID() << request;
99 : }
100 :
101 5 : ObjectICommand cmd( _impl->queue.pop( timeout ));
102 5 : if( !cmd.isValid( ))
103 : {
104 0 : LBERROR << "Timeout during QueueSlave::pop()" << std::endl;
105 0 : return ObjectICommand( 0, 0, 0, false );
106 : }
107 :
108 5 : switch( cmd.getCommand( ))
109 : {
110 : case CMD_QUEUE_ITEM:
111 4 : return ObjectICommand( cmd );
112 :
113 : default:
114 0 : LBUNIMPLEMENTED;
115 : case CMD_QUEUE_EMPTY:
116 1 : if( cmd.get< int32_t >() == request )
117 1 : return ObjectICommand( 0, 0, 0, false );
118 : // else left-over or not our empty command, discard and retry
119 0 : break;
120 : }
121 0 : }
122 : }
123 :
124 66 : }
|