Line data Source code
1 :
2 : /* Copyright (c) 2011-2014, Stefan Eilemann <eile@eyescale.ch>
3 : * 2011, Carsten Rohn <carsten.rohn@rtt.ag>
4 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "queueMaster.h"
23 :
24 : #include "dataOStream.h"
25 : #include "objectICommand.h"
26 : #include "objectOCommand.h"
27 : #include "queueCommand.h"
28 : #include "queueItem.h"
29 :
30 : #include <lunchbox/buffer.h>
31 : #include <lunchbox/mtQueue.h>
32 :
33 : namespace co
34 : {
35 :
36 : namespace detail
37 : {
38 :
39 : class ItemBuffer : public lunchbox::Bufferb, public lunchbox::Referenced
40 : {
41 : public:
42 4 : ItemBuffer( lunchbox::Bufferb& from )
43 : : lunchbox::Bufferb( from )
44 4 : , lunchbox::Referenced()
45 4 : {}
46 :
47 8 : ~ItemBuffer()
48 8 : {}
49 : };
50 :
51 : typedef lunchbox::RefPtr< ItemBuffer > ItemBufferPtr;
52 :
53 2 : class QueueMaster : public co::Dispatcher
54 : {
55 : public:
56 1 : QueueMaster( const co::QueueMaster& parent )
57 : : co::Dispatcher()
58 1 : , _parent( parent )
59 1 : {}
60 :
61 : /** The command handler functions. */
62 5 : bool cmdGetItem( co::ICommand& comd )
63 : {
64 5 : co::ObjectICommand command( comd );
65 :
66 5 : const uint32_t itemsRequested = command.get< uint32_t >();
67 5 : const uint32_t slaveInstanceID = command.get< uint32_t >();
68 5 : const int32_t requestID = command.get< int32_t >();
69 :
70 : typedef std::vector< ItemBufferPtr > Items;
71 10 : Items items;
72 5 : queue.tryPop( itemsRequested, items );
73 :
74 10 : Connections connections( 1, command.getNode()->getConnection( ));
75 9 : for( Items::const_iterator i = items.begin(); i != items.end(); ++i )
76 : {
77 : co::ObjectOCommand cmd( connections, CMD_QUEUE_ITEM,
78 4 : COMMANDTYPE_OBJECT, _parent.getID(),
79 4 : slaveInstanceID );
80 :
81 8 : const ItemBufferPtr item = *i;
82 4 : if( !item->isEmpty( ))
83 3 : cmd << Array< const void >( item->getData(), item->getSize( ));
84 4 : }
85 :
86 5 : if( itemsRequested > items.size( ))
87 : co::ObjectOCommand( connections, CMD_QUEUE_EMPTY,
88 1 : COMMANDTYPE_OBJECT, command.getObjectID(),
89 1 : slaveInstanceID ) << requestID;
90 10 : return true;
91 : }
92 :
93 : typedef lunchbox::MTQueue< ItemBufferPtr > ItemQueue;
94 :
95 : ItemQueue queue;
96 :
97 : private:
98 : const co::QueueMaster& _parent;
99 : };
100 : }
101 :
102 1 : QueueMaster::QueueMaster()
103 : #pragma warning(push)
104 : #pragma warning(disable: 4355)
105 1 : : _impl( new detail::QueueMaster( *this ))
106 : #pragma warning(pop)
107 : {
108 1 : }
109 :
110 3 : QueueMaster::~QueueMaster()
111 : {
112 1 : clear();
113 1 : delete _impl;
114 2 : }
115 :
116 1 : void QueueMaster::attach( const uint128_t& id, const uint32_t instanceID )
117 : {
118 1 : Object::attach( id, instanceID );
119 :
120 1 : CommandQueue* queue = getLocalNode()->getCommandThreadQueue();
121 : registerCommand( CMD_QUEUE_GET_ITEM,
122 : CommandFunc< detail::QueueMaster >(
123 1 : _impl, &detail::QueueMaster::cmdGetItem ), queue );
124 1 : }
125 :
126 1 : void QueueMaster::clear()
127 : {
128 1 : _impl->queue.clear();
129 1 : }
130 :
131 1 : void QueueMaster::getInstanceData( co::DataOStream& os )
132 : {
133 1 : os << getInstanceID() << getLocalNode()->getNodeID();
134 1 : }
135 :
136 4 : QueueItem QueueMaster::push()
137 : {
138 4 : return QueueItem( *this );
139 : }
140 :
141 4 : void QueueMaster::_addItem( QueueItem& item )
142 : {
143 4 : detail::ItemBufferPtr newBuffer = new detail::ItemBuffer( item.getBuffer());
144 4 : _impl->queue.push( newBuffer );
145 4 : }
146 :
147 60 : } // co
|