CFEL - ASG Software Suite  2.5.0
CASS
tcpserver.cpp
Go to the documentation of this file.
1 // Copyright (C) 2010 Jochen Küpper
2 // Copyright (C) 2011 - 2013 Lutz Foucar
3 
4 /**
5  * @file tcpserver.cpp the soap server implementation
6  *
7  * @author Jochen Kuepper
8  */
9 
10 #include <stdexcept>
11 #include <deque>
12 
13 #include <QtCore/QReadLocker>
14 #include <QtCore/QBuffer>
15 #include <QtCore/QByteArray>
16 #include <QtCore/QQueue>
17 #include <QtGui/QColor>
18 #include <QtGui/QImage>
19 #include <QThreadPool>
20 
21 #include "tcpserver.h"
22 
23 #include "CASSsoap.nsmap"
24 #include "processor_manager.h"
25 #include "processor.h"
26 #include "id_list.h"
27 #include "cass_exceptions.hpp"
28 #include "input_base.h"
29 #include "worker.h"
30 #include "result.hpp"
31 #include "common_data.h"
32 #include "log.h"
33 
34 
35 using namespace cass;
36 using namespace std;
37 using namespace tr1;
38 
39 //==========DEFINE static variables=========================
// maximum backlog of open requests; handed to the soap bind() call further down
42 const size_t SoapServer::_backlog(100);
43 
// NOTE(review): the signature line (original line 44) is missing from this
// listing; presumably this is the static SoapServer instance accessor that
// takes the port -- confirm against the real source.
45 {
  // hold the singleton mutex for the whole check-and-create sequence
46  QMutexLocker locker(&_mutex);
  // create the server only when no instance exists yet; an existing
  // instance is returned unchanged and `port` is not looked at again
47  if(!_instance)
48  _instance = shared_pointer(new SoapServer(port));
49  return _instance;
50 }
51 //==========================================================
52 
53 
54 
// NOTE(review): the signature line (original line 55) is missing from this
// listing; presumably this is SoapHandler::run() -- confirm.
56 {
57  _soap->serve(); // serve request
58  _soap->destroy(); // dealloc C++ data, dealloc data and clean up (destroy + end)
  // NOTE(review): no delete of _soap is visible here; the per-request copy
  // has to be freed somewhere else (e.g. the handler's destructor) -- confirm.
59 }
60 
61 SoapServer::SoapServer(size_t port, QObject *parent)
62  : QThread(parent),
63  _soap(new CASSsoapService),
64  _port(port)
65 {
66  Log::add(Log::INFO,"SoapServer starting on port '" + toString(_port) +"'");
67 }
68 
// NOTE(review): the signature line (original line 69) is missing from this
// listing; presumably this is SoapServer::~SoapServer() -- confirm.
70 {
  // stop the server thread if it is still running ...
71  if(isRunning())
72  terminate();
  // ... and wait for it before touching the service object it uses
73  wait();
  // clean up the soap service, then release the object allocated in the constructor
74  _soap->destroy();
75  delete _soap;
76 }
77 
// NOTE(review): the signature line (original line 78) is missing from this
// listing; presumably this is SoapServer::run(), the thread entry -- confirm.
79 {
80  // define timeouts and such for individual requests
81  _soap->send_timeout = 60; // 60 seconds
82  _soap->recv_timeout = 60; // 60 seconds
83  _soap->accept_timeout = 0; // server never stops accepting
84  _soap->max_keep_alive = 1000; // max keep-alive sequence
85  // allow immediate re-use of address/socket
86  _soap->bind_flags = SO_REUSEADDR;
87  // start SOAP
  // bind with a NULL host on _port using the class-wide backlog; on failure
  // log the error and leave the function, i.e. no request is ever served
88  if(SOAP_INVALID_SOCKET == _soap->bind(NULL, _port, _backlog))
89  {
90  Log::add(Log::ERROR,"SoapServer::run(): Can't connect to socket on port '" +
91  toString(_port) +"' for SOAP connection, Quitting the soap server");
92  return;
93  }
  // accept loop: runs until accept() or copy() fails
94  while(true)
95  {
96  if(SOAP_INVALID_SOCKET == _soap->accept())
97  {
  // a set errnum is reported as a socket error (fault text goes to cerr),
  // otherwise the failure is logged as a timeout; both cases end the loop
98  if(_soap->errnum)
99  {
100  _soap->soap_stream_fault(cerr);
101  Log::add(Log::ERROR,"SoapServer::run(): No valid socket for SOAP connection");
102  }
103  else
104  Log::add(Log::ERROR,"SoapServer::run(): Server timeout for SOAP connection");
105  break;
106  }
  // every accepted connection is served on its own copy of the service object
107  CASSsoapService *tsoap(_soap->copy()); // make a safe copy
  // NOTE(review): a failed copy ends the server loop without any log entry
108  if(! tsoap)
109  break;
  // hand the copy to a handler that is executed by the global thread pool
110  SoapHandler *handler(new SoapHandler(tsoap));
111  QThreadPool::globalInstance()->start(handler); // SoapHandler::setAutoDelete() is set by default.
112  }
113 }
114 
115 /** quits CASS by telling the input to end */
116 int CASSsoapService::quit(bool *success)
117 {
118  Log::add(Log::VERBOSEINFO,"CASSsoapService::quit");
119  InputBase::reference().end();
120  *success = true;;
121  return SOAP_OK;
122 }
123 
124 /** will call the load settings members of input and the workers
125  *
126  * first pause the input and the workers. Then call the load settings. then
127  * resume the threads.
128  */
129 int CASSsoapService::readini(size_t what, bool *success)
130 {
131  Log::add(Log::VERBOSEINFO,"CASSsoapService::readini(what=" + toString(what) + ")");
  // NOTE(review): original line 132 is missing from this listing -- confirm
  // against the real source that no statement was lost here.
  // take the input lock first, then the workers lock; both are held until
  // the function returns
133  QMutexLocker inputLock(&InputBase::reference().lock);
134  QMutexLocker workerLock(&Workers::reference().lock);
  // pause input and workers before any settings are reloaded
135  InputBase::reference().pause(true);
136  Workers::reference().pause();
137  InputBase::reference().load();
  // the processor manager is write-locked only from here on, i.e. after the
  // input has already reloaded its settings
138  QWriteLocker pplock(&ProcessorManager::instance()->lock);
139  ProcessorManager::reference().loadSettings(what);
  // resume in reverse order of pausing: workers first, then the input
140  Workers::reference().resume();
141  InputBase::reference().resume();
  // NOTE(review): if one of the load calls throws, neither resume() is reached
  // and *success stays unwritten -- confirm whether that can happen.
142  *success = true;;
143  return SOAP_OK;
144 }
145 
146 /** retrieve which processors are running
147  *
148  * get a list of keys of the running processors. Lock the processors
149  * handler lock to prevent that the list changes while it's being created.
150  */
151 int CASSsoapService::getPostprocessorIds(bool *success)
152 {
  // NOTE(review): original lines 153-154 are missing from this listing;
  // presumably they declare `queue` (used below) and possibly take the lock
  // the comment above talks about -- confirm against the real source.
155  tr1::shared_ptr<IdList> keys(ProcessorManager::reference().keys());
  // serialize the key list into a string buffer
156  std::tr1::shared_ptr<Serializer> ser(new Serializer);
157  keys->serialize(*ser);
158  soap_set_dime(this);
  // copy the serialized bytes into a shared string; the attachment below
  // points into this copy, not into the serializer
159  std::tr1::shared_ptr<string> datstr(new string(ser->buffer()));
160  int result = soap_set_dime_attachment(this, (char*) datstr->data(),datstr->size(), "application/processorList", "0", 0, NULL);
  // build one debug line listing every key plus the size of the payload
161  string output("CASSsoapService::getPostprocessorIds: Sending the following names:");
162  for (Processor::names_t::const_iterator it(keys->getList().begin());
163  it != keys->getList().end(); ++it)
164  output += *it + ", ";
165  output += " size of serializer: " + toString(datstr->size());
166  Log::add(Log::DEBUG4,output);
  // keep the payload alive after this function returns by parking it in the
  // queue; once more than 100 entries are queued the oldest one is dropped
167  queue.enqueue(datstr);
168  if(100 < queue.size())
169  queue.dequeue();
  // NOTE(review): *success is set to true even when `result` is not SOAP_OK
170  *success = true;
171  return result;
172 }
173 
174 /** will call savesettings members of workers
175  *
176  * pause the workers before calling this function then call savesettings for
177  * analyzers and processors, then resume the worker threads Prevent others
178  * from retrieving data from the processors by locking it
179  */
180 int CASSsoapService::writeini(size_t /*what*/, bool */*success*/)
181 {
182 // Log::add(Log::VERBOSEINFO,"CASSsoapService::readini(what=" + toString(what) + ")");
183 // QMutexLocker workerLock(&Workers::reference().lock);
184 // Workers::reference().pause();
185 // Analyzer::instance()->saveSettings();
186 // QWriteLocker pplock(&PostProcessors::instance()->lock);
187 // PostProcessors::instance()->saveSettings();
188 // Workers::reference().resume();
189 // InputBase::reference().resume();
190 // *success = false;;
191  return SOAP_FATAL_ERROR;
192  throw logic_error("CASSsoapService::writeini: should not be used anymore");
193 
194 }
195 
196 /** clear the selected processors histogram list
197  *
198  * lock the processors handler processor list and retrieve the
199  * requested processor from it. Tell it to clear its histogram list.
200  */
201 int CASSsoapService::clearHistogram(const ProcessorManager::key_t &type, bool *success)
202 {
203  Log::add(Log::VERBOSEINFO,"CASSsoapService::clearHistogram(type=" + type + ")");
204  QWriteLocker pplock(&ProcessorManager::instance()->lock);
205  try
206  {
207  ProcessorManager::instance()->getProcessor(type).clearHistograms();
208  *success = true;;
209  return SOAP_OK;
210  }
211  catch(const InvalidProcessorError&)
212  {
213  *success = false;
214  return SOAP_FATAL_ERROR;
215  }
216 }
217 
218 /** control the darkcal calibration
219  *
220  * will tell the map creators of all defined pixeldetectors to start calibrating
221  */
222 int CASSsoapService::controlDarkcal(const string &controlCommand, bool *success)
223 {
224  try
225  {
  // NOTE(review): original lines 226-228 are missing from this listing;
  // presumably they obtain `processors` (the container iterated below) and
  // may take a lock or issue the calibration command -- confirm against the
  // real source.
229  ProcessorManager::processors_t::iterator iter(processors.begin());
230  ProcessorManager::processors_t::iterator end(processors.end());
  // forward the very same command string to every processor in the container
231  while( iter != end )
232  (*iter++)->processCommand(controlCommand);
233  *success = true;
234  return SOAP_OK;
235  }
  // only invalid_argument is turned into a SOAP failure; anything else
  // propagates out of this function
236  catch(const invalid_argument &)
237  {
238  *success = false;
239  return SOAP_FATAL_ERROR;
240  }
241 }
242 
243 /** pass the string on to the requested processor
244  *
245  * lock the processor handler list then retrieve the requested processor
246  * and pass the string to process to it.
247  */
248 int CASSsoapService::receiveCommand(const ProcessorManager::key_t &type, const string &command, bool *success)
249 {
250  Log::add(Log::VERBOSEINFO,"CASSsoapService::receiveCommand: command '" +
251  command + "' is for processor " + type );
252  QWriteLocker pplock(&ProcessorManager::instance()->lock);
253  try
254  {
255  ProcessorManager::instance()->getProcessor(type).processCommand(command);
256  *success = true;;
257  return SOAP_OK;
258  }
259  catch(const InvalidProcessorError&)
260  {
261  *success = false;
262  return SOAP_FATAL_ERROR;
263  }
264 }
265 
266 int CASSsoapService::getEvent(size_t /*eventID*/, unsigned /*t1*/, unsigned /*t2*/, bool */*success*/)
267 {
268 // Log::add(Log::VERBOSEINFO,"CASSsoapService::getEvent with id "+ toString(eventId));
269 // static QQueue<shared_ptr<string> > queue;
270 // Serializer serializer;
271  // get event somehow
272 // event.serialize(serializer);
273 // shared_ptr<string> data(new serializer.buffer())
274 // queue.enqueue(data);
275 // if(10 < queue.size())
276 // queue.dequeue();
277 // *success = true;
278 // soap_set_dime(this); // enable dime
279 // return soap_set_dime_attachment(this, (char *)data->data(), data->size(), "application/cassevent",
280 // QString::number(type).toStdString().c_str(), 0, NULL);
281  throw runtime_error("CASSsoapService::getEvent: has not yet been properly implemented");
282 }
283 
284 /** get the requested histogram and return it as dime attachment
285  *
286  * lock the processor handlers list by using the lock. then get a copy of
287  * the requested histogram and serialize it. The mimetype of the dime is
288  * determined by the dimension of the histogram.
289  *
290  * The serialized data is enqueued to make sure that the data is still around
291  * also after the scope of this function is left since it might be transferred
292  * after this function returned.
293  */
294 int CASSsoapService::getHistogram(const ProcessorManager::key_t& type, ULONG64 eventId, bool *success)
295 {
  // NOTE(review): original line 296 is missing from this listing; presumably
  // it declares `queue` (used below) -- confirm against the real source.
297  QWriteLocker pplock(&ProcessorManager::instance()->lock);
298  try
299  {
300  // get data
  // NOTE(review): original line 301 is missing from this listing; presumably
  // it starts the declaration of `result`, which the next line initializes
  // with the processor's result copy for eventId -- confirm.
302  (ProcessorManager::reference().getProcessor(type).resultCopy(eventId));
303  Serializer serializer(0);
304  const size_t dim(result->dim());
305  serializer << *result;
  // bundle the dimension with a copy of the serialized bytes so both stay
  // available for as long as the queue entry lives
306  std::tr1::shared_ptr<pair<size_t, string> >data(
307  new pair<size_t, string>(make_pair(dim,serializer.buffer())));
308  // MIME type
  // dimensions 0, 1 and 2 get their own type; everything else gets the
  // generic histogram type
309  string mimetype;
310  switch(data->first)
311  {
312  case 0:
313  mimetype = string("application/cass0Dhistogram");
314  break;
315  case 1:
316  mimetype = string("application/cass1Dhistogram");
317  break;
318  case 2:
319  mimetype = string("application/cass2Dhistogram");
320  break;
321  default:
322  mimetype = string("application/casshistogram");
323  break;
324  }
325  // keep bytes around for a while -- this should mitigate the "zeros" problem
  // once more than 200 entries are queued the oldest one is dropped
326  queue.enqueue(data);
327  if(200 < queue.size())
328  queue.dequeue();
329  // answer request
330  *success = true;
331  soap_set_dime(this);
332  Log::add(Log::DEBUG4,"CASSsoapService::getHistogram " + type +
333  " from event " + toString(eventId) + ", the serialized size is "
334  + toString(data->second.size()));
  // the attachment points into the queued string and uses the processor key
  // as its id
  // NOTE(review): mimetype.c_str() refers to a local string; confirm that the
  // soap layer does not read the type after this function returned.
335  return soap_set_dime_attachment(this, (char *)data->second.data(),
336  data->second.size(), mimetype.c_str(),
337  type.c_str(), 0, NULL);
338  }
  // both error kinds are handled identically: log the message, flag failure
339  catch(const InvalidResultError& error)
340  {
341  Log::add(Log::ERROR,string("CASSsoapService::getHistogram: ") + error.what());
342  *success = false;
343  return SOAP_FATAL_ERROR;
344  }
345  catch(const InvalidProcessorError& error)
346  {
347  Log::add(Log::ERROR,string("CASSsoapService::getHistogram: ") + error.what());
348  *success = false;
349  return SOAP_FATAL_ERROR;
350  }
351 }
352 
// Return the results of several processors in one DIME attachment.
//
// The keys of the requested processors are read from the first DIME
// attachment of the incoming request. The results are serialized one after
// the other, in the order of that list, and sent back as one attachment of
// type "cass/Result".
//
// Returns the value of soap_set_dime_attachment on the normal path and
// SOAP_FATAL_ERROR (with *success set to false) when an InvalidResultError or
// InvalidProcessorError is caught.
353 int CASSsoapService::getResults(bool sameEventID, bool *success)
354 {
  // function-local static: one cache for all calls of this function
  // NOTE(review): no lock around the cache is visible in this function --
  // confirm this is safe when several requests are handled concurrently.
355  static deque<string> cache;
356  try
357  {
358  /** get the list of requested processors */
  // NOTE(review): the iterator is dereferenced below without comparing it to
  // dime.end(); confirm what happens for a request without an attachment.
359  soap_multipart::iterator attachment(this->dime.begin());
360  cout << "TCPServer: DIME attachment:" << endl
361  << " TCPServer: Memory=" << (void*)(*attachment).ptr << endl
362  << " TCPServer: Size=" << (*attachment).size << endl
363  << " TCPServer: Type=" << ((*attachment).type?(*attachment).type:"null") << endl
364  << " TCPServer: ID=" << ((*attachment).id?(*attachment).id:"null") << endl;
  // rebuild the id list from the raw bytes of the attachment
365  Serializer deserializer(string((char*)(*attachment).ptr, (*attachment).size));
366  IdList list(deserializer);
367 
368  /** get the result and serialize them */
369  Serializer serializer;
  // NOTE(review): original line 370 is missing from this listing; presumably
  // it declares `result` (assigned in the loop) -- confirm.
371  Processor::names_t::const_iterator it(list.getList().begin());
372  id_t eventId(0);
373  while (it != list.getList().end())
374  {
375  result = ProcessorManager::reference().getProcessor(*it).resultCopy(eventId);
376  /** if the same id is requested for all events and it is the first event
377  * remember the event id of this event
378  */
  // NOTE(review): this assigns 0 to a variable that is already 0, so
  // sameEventID has no visible effect; the comment above suggests the id of
  // the first result should be stored here instead -- confirm.
379  if (sameEventID && it == list.getList().begin())
380  eventId = 0;
381  serializer << *result;
382  ++it;
383  }
384  Log::add(Log::VERBOSEINFO,"CASSsoapService::getResults ");
385 
386  /** get the serialized data and store it in a cache to prevent the data from
387  * being deleted before completely delivered
388  */
  // the cache keeps at most 100 payloads; the oldest one is dropped first
389  string data(serializer.buffer());
390  cache.push_back(data);
391  if (cache.size() > 100)
392  cache.pop_front();
393 
394  /** send the data from the cache as dime attachment */
  // the attachment points at the cached copy, not at the local `data`
395  *success = true;
396  soap_set_dime(this);
397  return soap_set_dime_attachment(this, (char *)cache.back().data(),
398  cache.back().size(), "cass/Result",
399  "0", 0, NULL);
400  }
  // both error kinds are handled identically: log the message, flag failure
401  catch(const InvalidResultError& error)
402  {
403  Log::add(Log::ERROR,string("CASSsoapService::getResults: ") + error.what());
404  *success = false;
405  return SOAP_FATAL_ERROR;
406  }
407  catch(const InvalidProcessorError& error)
408  {
409  Log::add(Log::ERROR,string("CASSsoapService::getResults: ") + error.what());
410  *success = false;
411  return SOAP_FATAL_ERROR;
412  }
413 }
file contains declaration of class Worker and Workers
Processor::name_t key_t
type of processor accessor key
static void loadSettings()
load the logging settings from the .ini file
Definition: log.cpp:41
enqueue(const T &t)
~SoapServer()
Destructor.
Definition: tcpserver.cpp:69
std::tr1::shared_ptr< self_type > shared_pointer
a shared pointer of this class
Definition: result.hpp:323
std::list< Processor::shared_pointer > processors_t
Container of all currently active processors.
static void controlCalibration(const std::string &command)
issue a command to the map creators of all instances
CASSsoapService * _soap
the service
Definition: tcpserver.h:105
STL namespace.
result classes
file contains custom exceptions used in cass
virtual void run()
perform thread-work
Definition: tcpserver.cpp:78
static QMutex _mutex
Singleton operation locker.
Definition: tcpserver.h:146
static void add(Level level, const std::string &line)
add a string to the log
Definition: log.cpp:31
A string serializer.
Definition: serializer.hpp:287
static shared_pointer instance()
return the already created instance of this
const std::string buffer() const
retrieve a const reference to the compressed string.
Definition: serializer.hpp:329
static shared_pointer::element_type & reference()
return a reference to the instance itself if it exists
Definition: worker.cpp:76
static shared_pointer instance()
return existing instance for our friends
Definition: tcpserver.h:96
the processors
Definition: CsPadCrystal.ini:6
contains the manager for the processors
std::tr1::shared_ptr< SoapServer > shared_pointer
a shared pointer of this class
Definition: tcpserver.h:71
Exception thrown when accessing invalid processor.
static const size_t _backlog
maximum backlog of open requests
Definition: tcpserver.h:108
std::string toString(const Type &t)
convert any type to a string
Definition: cass.h:63
auxiliary data[Processor]
file contains processors baseclass declaration
Handle a single SOAP request.
Definition: tcpserver.h:33
contains the base class for all input modules
static shared_pointer _instance
pointer to the singleton instance
Definition: tcpserver.h:143
wait(unsigned long time=ULONG_MAX)
SoapServer()
Disabled default constructor.
contains the common data for one advanced pixeldetector
virtual void run()
handle request and terminate
Definition: tcpserver.cpp:55
static shared_pointer::element_type & reference()
return a reference to this instance
Exception thrown when accessing invalid histogram.
static shared_pointer::element_type & reference()
get reference to the singleton instance
Definition: input_base.cpp:28
the soap server implementation
contains a logger for cass
const size_t _port
server port
Definition: tcpserver.h:111
SOAP server.
Definition: tcpserver.h:64