11 #include <msgpack.hpp>
34 throw logic_error(
"ZMQInput::instance(): The instance of the base class is already initialized");
43 :
InputBase(ringbuffer,ratemeter,loadmeter,parent),
44 _quitWhenDone(quitwhendone),
108 template <
typename type>
111 const type *d(reinterpret_cast<const type*>(obj.via.str.ptr));
112 size_t payloadsize(obj.via.str.size);
113 out.assign(d,d+payloadsize/
sizeof(type));
124 template <
typename type>
127 const type *d(reinterpret_cast<const type*>(obj.via.bin.ptr));
128 size_t payloadsize(obj.via.bin.size);
129 out.assign(d,d+payloadsize/
sizeof(type));
132 bool iterate(
const msgpack::object &obj,
int depth,
135 typedef map<string,msgpack::object> m_t;
136 m_t m(obj.as<m_t>());
139 for (m_t::iterator it(m.begin()); it!= m.end();++it)
143 string flattenkey(acckey);
145 if(!flattenkey.empty())
146 flattenkey.append(
"$");
147 flattenkey.append(it->first);
149 if ((emap.find(flattenkey) != emap.end()))
155 if (it->second.type == msgpack::type::MAP)
157 m_t mp(it->second.as<m_t>());
158 if ((mp.find(
"nd") != m.end()) &&
159 (mp[
"nd"].type == msgpack::type::BOOLEAN) &&
160 (mp[
"nd"].as<
bool>()) &&
161 (mp.find(
"data") != m.end()) &&
162 (mp.find(
"type") != m.end()) &&
163 (mp[
"type"].type == msgpack::type::STR) &&
164 (mp.find(
"shape") != m.end()) &&
165 (mp[
"shape"].type == msgpack::type::ARRAY))
170 mp[
"shape"].convert(info.
shape);
172 if (mp[
"data"].type == msgpack::type::STR)
174 if (mp[
"type"].as<string>() ==
"<f4")
176 readNDArrayDataAsString<float>(info.
data,mp[
"data"]);
178 else if (mp[
"type"].as<string>() ==
"<f8")
180 readNDArrayDataAsString<double>(info.
data,mp[
"data"]);
182 else if (mp[
"type"].as<string>() ==
"<u8")
184 readNDArrayDataAsString<uint64_t>(info.
data,mp[
"data"]);
186 else if (mp[
"type"].as<string>() ==
"<u2")
188 readNDArrayDataAsString<uint16_t>(info.
data,mp[
"data"]);
190 else if (mp[
"type"].as<string>() ==
"|u1")
192 readNDArrayDataAsString<uint8_t>(info.
data,mp[
"data"]);
197 "': The type '" + (mp[
"type"].as<string>()) +
198 "' of the string type ndarray data is not " +
202 else if (m[
"data"].type == msgpack::type::BIN)
204 if (mp[
"type"].as<string>() ==
"<f4")
206 readNDArrayDataAsBinary<float>(info.
data,mp[
"data"]);
208 else if (mp[
"type"].as<string>() ==
"<f8")
210 readNDArrayDataAsBinary<double>(info.
data,mp[
"data"]);
212 else if (mp[
"type"].as<string>() ==
"<u8")
214 readNDArrayDataAsBinary<uint64_t>(info.
data,mp[
"data"]);
216 else if (mp[
"type"].as<string>() ==
"<u2")
218 readNDArrayDataAsBinary<uint16_t>(info.
data,mp[
"data"]);
220 else if (mp[
"type"].as<string>() ==
"|u1")
222 readNDArrayDataAsBinary<uint8_t>(info.
data,mp[
"data"]);
227 "': The type '" + (mp[
"type"].as<string>()) +
228 "' of the binary type ndarray data is not " +
237 if (it->second.type == msgpack::type::ARRAY)
239 it->second.convert(info.
data);
243 if (it->second.type == msgpack::type::BOOLEAN ||
244 it->second.type == msgpack::type::FLOAT32 ||
245 it->second.type == msgpack::type::FLOAT64 ||
246 it->second.type == msgpack::type::POSITIVE_INTEGER ||
247 it->second.type == msgpack::type::NEGATIVE_INTEGER)
250 it->second.convert(info.
data.front());
258 else if (it->second.type == msgpack::type::MAP)
260 iterate(it->second,depth+1,emap,flattenkey);
273 string functiontype(s.
value(
"DataType",
"agat").toString().toStdString());
275 string serverAddress(s.
value(
"ServerAddress",
"tcp://53.104.0.52:10000").toString().toStdString());
280 const size_t nBunches(s.
value(
"NbrBunchesInTrain",72).toUInt());
281 const size_t bunchOffset(s.
value(
"BunchOffsetInTrain",4).toUInt());
282 const size_t bunchStride(s.
value(
"BunchStrideInTrain",4).toUInt());
287 for (
int i = 0; i <
size; ++i)
291 string key(s.
value(
"Name",
"BAD").toString().toStdString());
295 string dev(s.
value(
"CASSDeviceType",
"Unkown").toString().toLower().toStdString());
296 if ((dev !=
"pixeldetector") &&
297 (dev !=
"machinedata"))
300 key +
"' is unkown");
303 emap[key].CASSDeviceType = dev;
307 emap[key].CASSID = s.
value(
"CASSID",0).toInt();
308 emap[key].CASSValueName = s.
value(
"CASSValueName",
"Unused").toString().toStdString();
309 emap[key].isPerTrain = s.
value(
"IsPerTrain",
false).toBool();
310 emap[key].nCols = s.
value(
"nCols",0).toUInt();
311 emap[key].nRows = s.
value(
"nRows",0).toUInt();
312 emap[key].nPixels = emap[key].nRows * emap[key].nCols;
318 zmq::context_t context (1);
319 zmq::socket_t sock (context, ZMQ_SUB);
320 sock.connect(serverAddress);
321 sock.setsockopt(ZMQ_SUBSCRIBE,
"",0);
323 string output =
"ZMQInput: Trying to retrieve:";
324 for (extractmap_t::const_iterator eit(emap.begin()); eit != emap.end(); ++eit)
326 const Info& ifo(eit->second);
327 output +=
" DataField '" + eit->first +
"'";
332 output +=
"; IsPerTrain '" + string(ifo.
isPerTrain?
"True":
"false") +
"'";
355 while(off != mess.size())
357 msgpack::object_handle objH;
358 msgpack::unpack(objH,static_cast<const char*>(mess.data()),mess.size(),off);
359 msgpack::object obj(objH.get());
362 extractmap_t::iterator emIter(emap.begin());
363 extractmap_t::const_iterator emIterEnd(emap.end());
364 for (; emIter != emIterEnd; ++emIter)
365 emIter->second.clear();
372 for (
size_t iBunch(bunchOffset); iBunch < nBunches; iBunch += bunchStride)
386 CASSEvent::devices_t::iterator devIt;
388 extractmap_t::const_iterator eIt(emap.begin());
389 extractmap_t::const_iterator eEnd(emap.end());
390 for (; eIt != eEnd; ++eIt)
392 const Info& ifo(eIt->second);
394 if (ifo.
data.empty())
397 "datafield '" + eIt->first +
"'");
403 string outp = eIt->first +
" [";
404 for (
size_t ii(0); ii < ifo.
shape.size(); ++ii)
406 outp.replace(outp.size()-1,1,
"]");
410 if(devIt == devices.end())
411 throw runtime_error(
string(
"ZMQInput: CASSEvent does not ") +
412 "contain a pixeldetector device");
420 pixeldetector::Detector::frame_t::const_iterator detBegin(ifo.
data.begin());
421 advance(detBegin,iBunch*ifo.
nPixels);
422 pixeldetector::Detector::frame_t::const_iterator detEnd(detBegin);
425 det.frame().assign(detBegin,detEnd);
427 det.columns() = ifo.
nCols;
428 det.rows() = ifo.
nRows;
435 if(devIt == devices.end())
436 throw runtime_error(
string(
"ZMQInput: CASSEvent does not ") +
437 "contain a pixeldetector device");
Event to store all LCLS Data.
class calculating a rate in Hz.
status_t _status
the internal status of the thread
detectors_t & dets()
instrument setter
std::string CASSDeviceType
what type of data within the CASSEvent does this data belong to
things written only at end of run H5Dump ProcessorSummary size
size_t nPixels
the number of pixels of one image within the data
file contains custom exceptions used in cass
size_t nCols
the number of columns of one image within the data
static void add(Level level, const std::string &line)
add a string to the log
define a structure that holds information about how to parse and extract the info contained in a msgp...
beginReadArray(const QString &prefix)
Container for all Machine related Data.
std::vector< int > shape
in case it's multidimensional data, this contains the shape of the data
A Ringbuffer, handles communication between Input and Worker Threads.
devices_t & devices()
setters
bool shouldQuit() const
query whether this thread is told to quit
file contains the declaration of the acqiris part of the CASSEvent
std::string toString(const Type &t)
convert any type to a string
auxiliary data[Processor]
value(const QString &key, const QVariant &defaultValue=QVariant()
std::map< Device, DeviceBackend::shared_pointer > devices_t
mapping from device type to handler instance
void clear()
clear the info's data from the msgpack
std::string CASSValueName
the value name within the beamlinedata of the CASSEvent
int CASSID
in case there are multiple devices available, this will tell which device in the list of devices this dat...
contains container for simple pixel detector data
definitions of a machine device
file contains specialized class that do the settings for cass
bool isPerTrain
flag that tells whether data is per bunch or per train
the device containing pixel detector data
Detector containing a ccd camera image.
void pausePoint()
point where the thread will be paused
size_t nRows
the number of rows of one image within the data
std::vector< float > data
the parsed data
contains a logger for cass
const bldMap_t & BeamlineData() const
getter
beginGroup(const QString &prefix)