CFEL - ASG Software Suite  2.5.0
CASS
zmq_input.cpp
Go to the documentation of this file.
1 // Copyright (C) 2017 Lutz Foucar
2 
3 /**
4  * @file zmq_input.cpp contains input that uses ZMQ as interface
5  *
6  * @author Lutz Foucar
7  */
8 
9 #include <iostream>
10 #include <zmq.hpp>
11 #include <msgpack.hpp>
12 
13 #include "zmq_input.h"
14 
15 #include "cass_settings.h"
16 #include "cass_exceptions.hpp"
17 #include "log.h"
18 
19 #include "acqiris_device.hpp"
20 #include "pixeldetector.hpp"
21 #include "machine_device.hpp"
22 
23 using namespace cass;
24 using namespace std;
25 
26 
28  Ratemeter &ratemeter,
29  Ratemeter &loadmeter,
30  bool quitwhendone,
31  QObject *parent)
32 {
33  if(_instance)
34  throw logic_error("ZMQInput::instance(): The instance of the base class is already initialized");
35  _instance = shared_pointer(new ZMQInput(buffer,ratemeter,loadmeter,quitwhendone,parent));
36 }
37 
39  Ratemeter &ratemeter,
40  Ratemeter &loadmeter,
41  bool quitwhendone,
42  QObject *parent)
43  : InputBase(ringbuffer,ratemeter,loadmeter,parent),
44  _quitWhenDone(quitwhendone),
45  _counter(0),
46  _scounter(0)
47 {}
48 
49 /** define a structure that holds information about how to parse and extract
50  * the info contained in a msgpack object.
51  *
52  * @tparam type the type of data wrapped in the msgpack object as string
53  * @param out reference to the vector where the data will be written to
54  * @param obj the msgpack object who's payload will be written to the out vector
55  *
56  * @author Lutz Foucar
57  */
58 struct Info
59 {
60  /** clear the info's data from the msgpack */
61  void clear()
62  {
63  data.clear();
64  shape.clear();
65  }
66 
67  /** flag that tell whether data is per bunch or per train */
68  bool isPerTrain;
69 
70  /** the value name within the beamlinedata of the CASSEvent */
71  std::string CASSValueName;
72 
73  /** what type of data within the CASSEvent does this data belong to */
74  std::string CASSDeviceType;
75 
76  /** in case there multiple devices available, this will tell which device
77  * in the list of devices this data should belong to
78  */
79  int CASSID;
80 
81  /** the number of pixels of one image within the data */
82  size_t nPixels;
83 
84  /** the number of columns of one image within the data */
85  size_t nCols;
86 
87  /** the number of rows of one image within the data */
88  size_t nRows;
89 
90  /** the parsed data */
91  std::vector<float> data;
92 
93  /** in case its multidimensional data, this contains the shape of the data */
94  std::vector<int> shape;
95 };
96 
97 typedef std::map<std::string,Info> extractmap_t;
98 
99 
100 /** read the string like payload of an msgpack object into an vector of floats
101  *
102  * @tparam type the type of data wrapped in the msgpack object as string
103  * @param out reference to the vector where the data will be written to
104  * @param obj the msgpack object who's payload will be written to the out vector
105  *
106  * @author Lutz Foucar
107  */
108 template <typename type>
109 void readNDArrayDataAsString(std::vector<float>& out, const msgpack::object &obj)
110 {
111  const type *d(reinterpret_cast<const type*>(obj.via.str.ptr));
112  size_t payloadsize(obj.via.str.size);
113  out.assign(d,d+payloadsize/sizeof(type));
114 }
115 
116 /** read the binary payload of an msgpack object into an vector of floats
117  *
118  * @tparam type the type of data wrapped in the msgpack object as string
119  * @param out reference to the vector where the data will be written to
120  * @param obj the msgpack object who's payload will be written to the out vector
121  *
122  * @author Lutz Foucar
123  */
124 template <typename type>
125 void readNDArrayDataAsBinary(std::vector<float>& out, const msgpack::object &obj)
126 {
127  const type *d(reinterpret_cast<const type*>(obj.via.bin.ptr));
128  size_t payloadsize(obj.via.bin.size);
129  out.assign(d,d+payloadsize/sizeof(type));
130 }
131 
132 bool iterate(const msgpack::object &obj, int depth,
133  extractmap_t& emap, string acckey="")
134 {
135  typedef map<string,msgpack::object> m_t;
136  m_t m(obj.as<m_t>());
137 
138  /** just go through the msgpack object and */
139  for (m_t::iterator it(m.begin()); it!= m.end();++it)
140  {
141 // for (extractmap_t::const_iterator eit(emap.begin()); eit != emap.end(); ++eit)
142 // cout << eit->first << " " << eit->second.shape.size()<<endl;
143  string flattenkey(acckey);
144  /** separate the keys of the nested dictionaries by a '$' character */
145  if(!flattenkey.empty())
146  flattenkey.append("$");
147  flattenkey.append(it->first);
148  /** check if we're interested in the data */
149  if ((emap.find(flattenkey) != emap.end()))
150  {
151  //cout << flattenkey <<" found it!!!"<<endl;
152  /** get a reference to the info that will now attempt to fill. */
153  Info &info(emap[flattenkey]);
154  /** check if its ndarray data */
155  if (it->second.type == msgpack::type::MAP)
156  {
157  m_t mp(it->second.as<m_t>());
158  if ((mp.find("nd") != m.end()) &&
159  (mp["nd"].type == msgpack::type::BOOLEAN) &&
160  (mp["nd"].as<bool>()) &&
161  (mp.find("data") != m.end()) &&
162  (mp.find("type") != m.end()) &&
163  (mp["type"].type == msgpack::type::STR) &&
164  (mp.find("shape") != m.end()) &&
165  (mp["shape"].type == msgpack::type::ARRAY))
166  {
167  /** if it is then extract the shape and the data according to the
168  * type of the data an how it is packed
169  */
170  mp["shape"].convert(info.shape);
171  //cout <<flattenkey<< " "<<info.shape.size()<<endl;
172  if (mp["data"].type == msgpack::type::STR)
173  {
174  if (mp["type"].as<string>() == "<f4")
175  {
176  readNDArrayDataAsString<float>(info.data,mp["data"]);
177  }
178  else if (mp["type"].as<string>() == "<f8")
179  {
180  readNDArrayDataAsString<double>(info.data,mp["data"]);
181  }
182  else if (mp["type"].as<string>() == "<u8")
183  {
184  readNDArrayDataAsString<uint64_t>(info.data,mp["data"]);
185  }
186  else if (mp["type"].as<string>() == "<u2")
187  {
188  readNDArrayDataAsString<uint16_t>(info.data,mp["data"]);
189  }
190  else if (mp["type"].as<string>() == "|u1")
191  {
192  readNDArrayDataAsString<uint8_t>(info.data,mp["data"]);
193  }
194  else
195  {
196  Log::add(Log::WARNING,"ZMQInput::ParseMSGPACKObject: '" + flattenkey +
197  "': The type '" + (mp["type"].as<string>()) +
198  "' of the string type ndarray data is not " +
199  "supported.");
200  }
201  }
202  else if (m["data"].type == msgpack::type::BIN)
203  {
204  if (mp["type"].as<string>() == "<f4")
205  {
206  readNDArrayDataAsBinary<float>(info.data,mp["data"]);
207  }
208  else if (mp["type"].as<string>() == "<f8")
209  {
210  readNDArrayDataAsBinary<double>(info.data,mp["data"]);
211  }
212  else if (mp["type"].as<string>() == "<u8")
213  {
214  readNDArrayDataAsBinary<uint64_t>(info.data,mp["data"]);
215  }
216  else if (mp["type"].as<string>() == "<u2")
217  {
218  readNDArrayDataAsBinary<uint16_t>(info.data,mp["data"]);
219  }
220  else if (mp["type"].as<string>() == "|u1")
221  {
222  readNDArrayDataAsBinary<uint8_t>(info.data,mp["data"]);
223  }
224  else
225  {
226  Log::add(Log::WARNING,"ZMQInput::ParseMSGPACKObject: '" + flattenkey +
227  "': The type '" + (mp["type"].as<string>()) +
228  "' of the binary type ndarray data is not " +
229  "supported.");
230  }
231  }
232  //cout << flattenkey << " " << info.data.size() <<endl;
233  }
234  } // end parsing the ndarray type data
235 
236  /** check if its an array */
237  if (it->second.type == msgpack::type::ARRAY)
238  {
239  it->second.convert(info.data);
240  }
241 
242  /** check if its a single data value */
243  if (it->second.type == msgpack::type::BOOLEAN ||
244  it->second.type == msgpack::type::FLOAT32 ||
245  it->second.type == msgpack::type::FLOAT64 ||
246  it->second.type == msgpack::type::POSITIVE_INTEGER ||
247  it->second.type == msgpack::type::NEGATIVE_INTEGER)
248  {
249  info.data.resize(1);
250  it->second.convert(info.data.front());
251  }
252 
253  }//end info found in extraction map
254 
255  /** if we are not interested in the data then just check to see if its
256  * another map that we should iterate into
257  */
258  else if (it->second.type == msgpack::type::MAP)
259  {
260  iterate(it->second,depth+1,emap,flattenkey);
261  }
262  }
263 
264  return true;
265 }
266 
268 {
270 
271  CASSSettings s;
272  s.beginGroup("ZMQInput");
273  string functiontype(s.value("DataType","agat").toString().toStdString());
274  /** info specific to the zeromq server */
275  string serverAddress(s.value("ServerAddress","tcp://53.104.0.52:10000").toString().toStdString());
276  s.endGroup(); //ZMQInput
277 
278  s.beginGroup("XFELMSGPACK");
279  /** things needed to iterate through the xfel data */
280  const size_t nBunches(s.value("NbrBunchesInTrain",72).toUInt());
281  const size_t bunchOffset(s.value("BunchOffsetInTrain",4).toUInt());
282  const size_t bunchStride(s.value("BunchStrideInTrain",4).toUInt());
283 
284  /** things needed to parse the msgpack data */
285  extractmap_t emap;
286  int size = s.beginReadArray("DataFields");
287  for (int i = 0; i < size; ++i)
288  {
289  s.setArrayIndex(i);
290  /** get the name within the msgpack that should be extracted */
291  string key(s.value("Name","BAD").toString().toStdString());
292  if (key == "BAD")
293  continue;
294  /** check what kind of data this will be, skip if type of data is unkown */
295  string dev(s.value("CASSDeviceType","Unkown").toString().toLower().toStdString());
296  if ((dev != "pixeldetector") &&
297  (dev != "machinedata"))
298  {
299  Log::add(Log::INFO,"ZMQInput: DeviceType '" + dev + "' of DataField '" +
300  key + "' is unkown");
301  continue;
302  }
303  emap[key].CASSDeviceType = dev;
304  /** extract additional info that one needs to add the parsed info to the
305  * CASSEvent
306  */
307  emap[key].CASSID = s.value("CASSID",0).toInt();
308  emap[key].CASSValueName = s.value("CASSValueName","Unused").toString().toStdString();
309  emap[key].isPerTrain = s.value("IsPerTrain",false).toBool();
310  emap[key].nCols = s.value("nCols",0).toUInt();
311  emap[key].nRows = s.value("nRows",0).toUInt();
312  emap[key].nPixels = emap[key].nRows * emap[key].nCols;
313  }
314  s.endArray();//DataFields
315  s.endGroup();//XFELMSGPACK
316 
317  /** connect to the zmq socket */
318  zmq::context_t context (1);
319  zmq::socket_t sock (context, ZMQ_SUB);
320  sock.connect(serverAddress);
321  sock.setsockopt(ZMQ_SUBSCRIBE, "",0);
322  Log::add(Log::INFO,"ZMQInput: Connecting to '" + serverAddress + "'");
323  string output = "ZMQInput: Trying to retrieve:";
324  for (extractmap_t::const_iterator eit(emap.begin()); eit != emap.end(); ++eit)
325  {
326  const Info& ifo(eit->second);
327  output += " DataField '" + eit->first + "'";
328  output += " (";
329  output += "DeviceType '" + ifo.CASSDeviceType + "'";
330  output += "; CASSID '" + toString(ifo.CASSID) + "'";
331  output += "; ValueName '" + ifo.CASSValueName + "'";
332  output += "; IsPerTrain '" + string(ifo.isPerTrain?"True":"false") + "'";
333  output += "; nCols '" + toString(ifo.nCols) + "'";
334  output += "; nRows '" + toString(ifo.nRows) + "'";
335  output += "; nPixels '" + toString(ifo.nPixels) + "'";
336  output += ");";
337  }
338  Log::add(Log::INFO,output);
339 
340  /** run until the thread is told to quit */
341  Log::add(Log::DEBUG0,"ZMQInput::run(): starting loop");
342 
343  while(!shouldQuit())
344  {
345  /** here we can safely pause the execution */
346  pausePoint();
347 
348  /** now retrive new data from the socket */
349  zmq::message_t mess;
350  sock.recv(&mess);
351 
352  /** now deserialize the data from the socket */
353  size_t off(0);
354  bool success(false);
355  while(off != mess.size())
356  {
357  msgpack::object_handle objH;
358  msgpack::unpack(objH,static_cast<const char*>(mess.data()),mess.size(),off);
359  msgpack::object obj(objH.get());
360 
361  /** clear the info container */
362  extractmap_t::iterator emIter(emap.begin());
363  extractmap_t::const_iterator emIterEnd(emap.end());
364  for (; emIter != emIterEnd; ++emIter)
365  emIter->second.clear();
366  success=iterate(obj,0,emap);
367  }
368  if (!success)
369  continue;
370 
371  /** how many pixels has a detector */
372  for (size_t iBunch(bunchOffset); iBunch < nBunches; iBunch += bunchStride)
373  {
374  /** retrieve a new element from the ringbuffer, continue with next iteration
375  * in case the retrieved element is the iterator to the last element of the
376  * buffer.
377  */
378  rbItem_t rbItem(getNextFillable());
379  if (rbItem == _ringbuffer.end())
380  continue;
381  CASSEvent &evt(*rbItem->element);
382  evt.id() = _counter;
383 
384  /** get reference to all devices of the CASSEvent and an iterator*/
385  CASSEvent::devices_t &devices(evt.devices());
386  CASSEvent::devices_t::iterator devIt;
387  /** go through the list of requested infos an put them in to the event */
388  extractmap_t::const_iterator eIt(emap.begin());
389  extractmap_t::const_iterator eEnd(emap.end());
390  for (; eIt != eEnd; ++eIt)
391  {
392  const Info& ifo(eIt->second);
393  /** check if the requested data was sent */
394  if (ifo.data.empty())
395  {
396  Log::add(Log::WARNING,string("ZMQInput: There is no data for ") +
397  "datafield '" + eIt->first + "'");
398  continue;
399  }
400  if (ifo.CASSDeviceType == "pixeldetector")
401  {
402  /** output the shape of the pixeldetector */
403  string outp = eIt->first + " [";
404  for (size_t ii(0); ii < ifo.shape.size(); ++ii)
405  outp += toString(ifo.shape[ii]) + ",";
406  outp.replace(outp.size()-1,1,"]");
407  Log::add(Log::DEBUG0,outp);
408  /** retrieve the pixel detector part of the cassevent */
409  devIt = devices.find(CASSEvent::PixelDetectors);
410  if(devIt == devices.end())
411  throw runtime_error(string("ZMQInput: CASSEvent does not ") +
412  "contain a pixeldetector device");
413  pixeldetector::Device &pixdev(dynamic_cast<pixeldetector::Device&>(*(devIt->second)));
414  /** retrieve the right detector from the cassevent and reset it*/
415  pixeldetector::Detector &det(pixdev.dets()[ifo.CASSID]);
416  det.frame().clear();
417  /** get iterators to the corresponding data and advance it to the
418  * right bunch within the train
419  */
420  pixeldetector::Detector::frame_t::const_iterator detBegin(ifo.data.begin());
421  advance(detBegin,iBunch*ifo.nPixels);
422  pixeldetector::Detector::frame_t::const_iterator detEnd(detBegin);
423  advance(detEnd,ifo.nPixels);
424  /** copy the det data to the frame */
425  det.frame().assign(detBegin,detEnd);
426  /** set the additional info of the detector */
427  det.columns() = ifo.nCols;
428  det.rows() = ifo.nRows;
429  det.id() = _counter;
430  }
431  else if (ifo.CASSDeviceType == "machinedata")
432  {
433  /** retrieve the pixel detector part of the cassevent */
434  devIt = devices.find(CASSEvent::MachineData);
435  if(devIt == devices.end())
436  throw runtime_error(string("ZMQInput: CASSEvent does not ") +
437  "contain a pixeldetector device");
438  MachineData::Device &md (dynamic_cast<MachineData::Device&>(*(devIt->second)));
439  if (ifo.isPerTrain)
440  md.BeamlineData()[ifo.CASSValueName] = ifo.data[0];
441  else
442  md.BeamlineData()[ifo.CASSValueName] = ifo.data[iBunch];
443  }
444  }
445 
446  /** tell the ringbuffer that we're done with the event */
447  ++_counter;
448  _ringbuffer.doneFilling(rbItem, 1);
449  }
450  newEventAdded(mess.size());
451  }
452  Log::add(Log::INFO,"ZMQInput::run(): Quitting loop");
453 }
setArrayIndex(int i)
std::map< std::string, Info > extractmap_t
Definition: zmq_input.cpp:97
Event to store all LCLS Data.
Definition: cass_event.h:32
ZMQInput(RingBuffer< CASSEvent > &buffer, Ratemeter &ratemeter, Ratemeter &loadmeter, bool quitwhendone, QObject *parent=0)
constructor
Definition: zmq_input.cpp:38
RingBuffer< CASSEvent >::iter_type rbItem_t
define an item in the ringbuffer
Definition: input_base.h:109
class calculating a rate in Hz.
Definition: ratemeter.h:28
void readNDArrayDataAsString(std::vector< float > &out, const msgpack::object &obj)
read the string like payload of an msgpack object into an vector of floats
Definition: zmq_input.cpp:109
status_t _status
the internal status of the thread
Settings for CASS.
Definition: cass_settings.h:30
detectors_t & dets()
instrument setter
void runthis()
starts the thread
Definition: zmq_input.cpp:267
std::string CASSDeviceType
what type of data within the CASSEvent does this data belong to
Definition: zmq_input.cpp:74
Input base class.
Definition: input_base.h:31
STL namespace.
ZMQ Input for receiving data.
Definition: zmq_input.h:33
things written only at end of run H5Dump ProcessorSummary size
contains input that uses ZMQ as interface
size_t nPixels
the number of pixels of one image within the data
Definition: zmq_input.cpp:82
file contains custom exceptions used in cass
additional info
size_t nCols
the number of columns of one image within the data
Definition: zmq_input.cpp:85
static void add(Level level, const std::string &line)
add a string to the log
Definition: log.cpp:31
define a structure that holds information about how to parse and extract the info contained in a msgp...
Definition: zmq_input.cpp:58
beginReadArray(const QString &prefix)
Container for all Machine related Data.
std::vector< int > shape
in case its multidimensional data, this contains the shape of the data
Definition: zmq_input.cpp:94
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
devices_t & devices()
setters
Definition: cass_event.h:66
void readNDArrayDataAsBinary(std::vector< float > &out, const msgpack::object &obj)
read the binary payload of an msgpack object into an vector of floats
Definition: zmq_input.cpp:125
bool shouldQuit() const
query whether this thread is told to quit
file contains the declaration of the acqiris part of the CASSEvent
id_t & id()
setters
Definition: cass_event.h:64
std::string toString(const Type &t)
convert any type to a string
Definition: cass.h:63
auxiliary data[Processor]
static shared_pointer instance()
get the signelton instance
Definition: input_base.cpp:20
uint64_t _counter
the counter for all events
Definition: zmq_input.h:102
value(const QString &key, const QVariant &defaultValue=QVariant()
RingBuffer< CASSEvent > & _ringbuffer
reference to the ringbuffer
Definition: input_base.h:140
bool iterate(const msgpack::object &obj, int depth, extractmap_t &emap, string acckey="")
Definition: zmq_input.cpp:132
std::map< Device, DeviceBackend::shared_pointer > devices_t
mapping from device type to handler instance
Definition: cass_event.h:46
void clear()
clear the info's data from the msgpack
Definition: zmq_input.cpp:61
std::string CASSValueName
the value name within the beamlinedata of the CASSEvent
Definition: zmq_input.cpp:71
int CASSID
in case there multiple devices available, this will tell which device in the list of devices this dat...
Definition: zmq_input.cpp:79
contains container for simple pixel detector data
definitions of a machine device
std::tr1::shared_ptr< InputBase > shared_pointer
shared pointer of this type
Definition: input_base.h:35
file contains specialized class that do the settings for cass
bool isPerTrain
flag that tell whether data is per bunch or per train
Definition: zmq_input.cpp:68
the device containing pixel detector data
Detector containing a ccd camera image.
void pausePoint()
point where the thread will be paused
rbItem_t getNextFillable(unsigned timeout=500)
retrieve an iterator to the next fillable event
Definition: input_base.cpp:48
size_t nRows
the number of rows of one image within the data
Definition: zmq_input.cpp:88
std::vector< float > data
the parsed data
Definition: zmq_input.cpp:91
contains a logger for cass
void newEventAdded(const size_t eventsize)
increment the numer of events received in the ratemeter
Definition: input_base.cpp:37
const bldMap_t & BeamlineData() const
getter
beginGroup(const QString &prefix)