CFEL - ASG Software Suite  2.5.0
CASS
xfel_online_input.cpp
Go to the documentation of this file.
1 // Copyright (C) 2018 Lutz Foucar
2 
3 /**
4  * @file xfel_online_input.cpp contains input that uses the xfel api
5  *
6  * @author Lutz Foucar
7  */
8 
9 #include <iostream>
10 
11 #include "xfel_online_input.h"
12 
13 #include "cass_settings.h"
14 #include "cass_exceptions.hpp"
15 #include "log.h"
16 
17 #include "acqiris_device.hpp"
18 #include "pixeldetector.hpp"
19 #include "machine_device.hpp"
20 #include "kb_client.hpp"
21 
22 using namespace cass;
23 using namespace std;
24 
25 
/** Tail of the function that creates the singleton input instance.
 *
 *  NOTE(review): the first line of the signature (function name and the
 *  `buffer` parameter used below) is missing from this rendered listing; the
 *  embedded numbering jumps from 25 to 27.
 *
 *  Throws a logic_error when _instance is already set. Otherwise a new
 *  XFELOnlineInput is constructed from the passed arguments and stored in
 *  _instance.
 */
27  Ratemeter &ratemeter,
28  Ratemeter &loadmeter,
29  bool quitwhendone,
30  QObject *parent)
31 {
    /** refuse to create a second instance */
32  if(_instance)
33  throw logic_error("XFELOnlineInput::instance(): The instance of the base class is already initialized");
    /** create the input and keep it in the shared instance pointer */
34  _instance = shared_pointer(new XFELOnlineInput(buffer,ratemeter,loadmeter,quitwhendone,parent));
35 }
36 
/** Tail of the constructor.
 *
 *  NOTE(review): the first line of the signature (constructor name and the
 *  `ringbuffer` parameter used below) is missing from this rendered listing;
 *  the embedded numbering jumps from 36 to 38.
 *
 *  Forwards ringbuffer, ratemeter, loadmeter and parent to InputBase, stores
 *  quitwhendone in _quitWhenDone and starts _counter and _scounter at 0. The
 *  constructor body itself is empty.
 */
38  Ratemeter &ratemeter,
39  Ratemeter &loadmeter,
40  bool quitwhendone,
41  QObject *parent)
42  : InputBase(ringbuffer,ratemeter,loadmeter,parent),
43  _quitWhenDone(quitwhendone),
44  _counter(0),
45  _scounter(0)
46 {}
47 
48 
49 
/** Body of the input loop.
 *
 *  NOTE(review): this is a rendered listing with gaps. The signature line of
 *  this function is not shown, and the embedded numbering jumps from 51 to 53
 *  and from 130 to 132, so at least the declaration of `det_data` is missing
 *  here. Apply any change to the real source file, not to this listing.
 *
 *  What the visible code does:
 *  - reads ServerAddress, NbrPulsesInTrain, Source, PathToImage and CASSID
 *    from the settings group "XFELOnlineInput"
 *  - connects a karabo_bridge::Client to the server and logs what it offers
 *  - until shouldQuit() is true: receives the next data, works out the axis
 *    order of the image array, copies it into det_data and hands every pulse
 *    that is not on the skip list to the ringbuffer as its own CASSEvent
 *
 *  Throws runtime_error when a CASSEvent has no pixel detector device or no
 *  machine data device.
 */
51 {
53 
54  CASSSettings s;
55  s.beginGroup("XFELOnlineInput");
56  /** info specific to the xfel data server */
57  string serverAddress(s.value("ServerAddress","tcp://localhost:1234").toString().toStdString());
58 
59  /** how many pulses are expected within the transferred data (user setting) */
60  const size_t nPulses(s.value("NbrPulsesInTrain",64).toUInt());
61 
62  /** info about the source of the data of interest */
63  string source(s.value("Source","SPB_DET_AGIPD1M-1/DET/detector-1").toString().toStdString());
64  /** info where the image data is within the transferred data */
65  string imageDataPath(s.value("PathToImage","image.data").toString().toStdString());
66  /** the id that the data should have within the cass-event */
67  int det_CASSID(s.value("CASSID",30).toInt());
68  s.endGroup(); //XFELOnlineInput
69 
70  /** create a karabo client that allows to connect to online karabo */
71  karabo_bridge::Client client;
72 
73  /** write to the log which server we are about to connect to */
74  Log::add(Log::INFO,"XFELOnlineInput::run(): trying to connect to server at '"+
75  serverAddress + "'");
76 
77  /** connect the client to the xfel data server */
78  client.connect(serverAddress);
79 
80  /** write to the log that the connection has been established */
81  Log::add(Log::INFO,"XFELOnlineInput::run(): connected to server at '"+
82  serverAddress + "'. Now waiting for data.");
83 
84  /** write the data structure that is sent by the server to the log */
85  Log::add(Log::INFO,"XFELOnlineInput::run(): available data from the server:\n"+
86  client.showNext());
87 
88  /** run until the thread is told to quit */
89  Log::add(Log::INFO,"XFELOnlineInput::run(): starting loop");
90 
91  while(!shouldQuit())
92  {
93  /** here we can safely pause the execution */
94  pausePoint();
95 
96  /** now retrieve new data from the socket */
97  auto data(client.next());
98 
99  /** get the shape of the detector (encodes the pulses in the train and
100  * the shape itself)
101  *
102  * should be in the shape of nPulses,nModules,512,128 but is most likely in the
103  * shape of nModules,128,512,nPulses so one needs to transpose the axis.
104  */
105  const auto det_shape(data[source].array[imageDataPath].shape());
     // NOTE(review): det_shape[3] is read without checking that the shape has
     // four entries - confirm the server always delivers a 4-dimensional array.
     /** a last axis that is not 128 long is taken as the sign that the pulse
      *  axis comes last and the data has to be permuted
      */
106  const bool dataNeedsPermutation(det_shape[3] != 128);
107  size_t nPulsesFromImage;
108  size_t nModules;
109  size_t nRowsInModule;
110  size_t nCols;
111  if (dataNeedsPermutation)
112  {
113  // if in nModules,128,512,nPulses
114  nPulsesFromImage = (det_shape[3]);
115  nModules = (det_shape[0]);
116  nRowsInModule = (det_shape[2]);
117  nCols = (det_shape[1]);
118  }
119  else
120  {
121  // if in nPulses,nModules,512,128
122  nPulsesFromImage = (det_shape[0]);
123  nModules = (det_shape[1]);
124  nRowsInModule = (det_shape[2]);
125  nCols = (det_shape[3]);
126  }
     /** number of pixels of one pulse and number of rows when all modules are
      *  stacked into one frame
      */
127  const size_t sizeofOneDet(nModules*nRowsInModule*nCols);
128  const size_t nCASSRows(nModules*nRowsInModule);
129 
130  /** get the detector data */
     // NOTE(review): the declaration of det_data is not visible in this listing
     // (the embedded numbering skips 131). Further down it is handed to
     // det.frame().assign(), so it is presumably a frame-like vector - confirm.
     // NOTE(review): when the dtype is neither "uint16_t" nor "float32" no
     // branch below fills det_data - confirm how that case should be handled.
132  const auto nElements(data[source].array[imageDataPath].size());
133  if (data[source].array[imageDataPath].dtype() == "uint16_t")
134  {
     // NOTE(review): this branch copies the data as is, even when
     // dataNeedsPermutation is true - confirm that uint16_t data never arrives
     // with the pulse axis last.
135  const auto ptr(data[source].array[imageDataPath].data<uint16_t>());
136  det_data.assign(ptr,ptr+nElements);
137  }
138  else if (data[source].array[imageDataPath].dtype() == "float32")
139  {
140  //auto tmp(data[source].array[imageDataPath].as<float>());
141  //det_data.assign(tmp.begin(),tmp.end());
142  const auto ptr(data[source].array[imageDataPath].data<float>());
143  if (dataNeedsPermutation)
144  {
145  // permute the axis of the original data to go with the expected layout
146  det_data.resize(nElements);
147  for (size_t iModule(0); iModule < nModules ; ++iModule)
148  {
149  for (size_t iColumn(0); iColumn < nCols ; ++iColumn)
150  {
151  for (size_t iRow(0); iRow < nRowsInModule ; ++iRow)
152  {
153  for (size_t iPulse(0); iPulse < nPulsesFromImage; ++iPulse)
154  {
     // index into the received array, where the pulse is the fastest index
     // NOTE(review): the row stride below uses nPulses (the user setting),
     // while all other strides use nPulsesFromImage. The two differ exactly
     // when the consistency check further down reports a mismatch - this looks
     // like it should be nPulsesFromImage, confirm.
155  auto origIDX(iPulse +
156  iRow*nPulses +
157  iColumn*nPulsesFromImage*nRowsInModule +
158  iModule*nPulsesFromImage*nRowsInModule*nCols);
     // index into det_data, where the pulse is the slowest index
     // NOTE(review): iRow is the fastest index here, whereas the non-permuted
     // layout described above (nPulses,nModules,512,128) has the axis stored in
     // nCols as the fastest one - confirm which ordering inside a module is
     // intended.
159  auto goodIDX(iRow +
160  iColumn*nRowsInModule +
161  iModule*nRowsInModule*nCols +
162  iPulse*nRowsInModule*nCols*nModules);
163  det_data[goodIDX] = ptr[origIDX];
164  }
165  }
166  }
167  }
168  }
169  else
170  {
171  det_data.assign(ptr,ptr+nElements);
172  }
173  }
174 
175  /** check if the data is consistent */
     // NOTE(review): the message says "Skipping train." but nothing is skipped:
     // no continue follows, so the pulse loop below still runs up to nPulses.
     // With nPulses > nPulsesFromImage the offsets computed there would lie
     // beyond nPulsesFromImage*sizeofOneDet - confirm and skip if intended.
176  if (nPulses != nPulsesFromImage)
177  {
178  Log::add(Log::ERROR,string("The number of pulses within the header '") +
179  toString(nPulses) + "' and the detector data '" +
180  toString(nPulsesFromImage) + "' mismatch. "+
181  "Skipping train.");
182  }
183 
184  /** go through all pulses in the train */
185  for(size_t pulseID(0); pulseID < nPulses; ++pulseID)
186  {
187  /** skip some known bad pulses (hard coded: 0 to 10, 32 and 61 to 63) */
188  if ((pulseID == 0) ||
189  (pulseID == 1) ||
190  (pulseID == 2) ||
191  (pulseID == 3) ||
192  (pulseID == 4) ||
193  (pulseID == 5) ||
194  (pulseID == 6) ||
195  (pulseID == 7) ||
196  (pulseID == 8) ||
197  (pulseID == 9) ||
198  (pulseID == 10) ||
199  (pulseID == 32) ||
200  (pulseID == 61) ||
201  (pulseID == 62) ||
202  (pulseID == 63))
203  continue;
204 
205  /** retrieve a new element from the ringbuffer, continue with next iteration
206  * in case the retrieved element is the iterator to the last element of the
207  * buffer.
208  */
209  rbItem_t rbItem(getNextFillable());
210  if (rbItem == _ringbuffer.end())
211  continue;
212  CASSEvent &evt(*rbItem->element);
     /** the running event counter is used as the id of the event */
213  evt.id() = _counter;
214 
215  /** get reference to all devices of the CASSEvent and an iterator*/
216  CASSEvent::devices_t &devices(evt.devices());
217  CASSEvent::devices_t::iterator devIt;
218 
219  /** retrieve the pixel detector part of the cassevent */
220  devIt = devices.find(CASSEvent::PixelDetectors);
221  if(devIt == devices.end())
222  throw runtime_error(string("XFELOnlineInput: CASSEvent does not ") +
223  "contain a pixeldetector device");
224  pixeldetector::Device &pixdev(dynamic_cast<pixeldetector::Device&>(*(devIt->second)));
225  /** retrieve the right detector from the cassevent and reset it*/
226  pixeldetector::Detector &det(pixdev.dets()[det_CASSID]);
227  det.frame().clear();
228  det.columns() = nCols;
229  det.rows() = nCASSRows;
230  det.id() = _counter;
231 
232  /** copy the data of the pulse from the xfel input to the cassevent */
233  auto offsetToBegin(pulseID * sizeofOneDet);
234  auto offsetToEnd((pulseID+1) * sizeofOneDet);
235  det.frame().assign(det_data.begin()+offsetToBegin,det_data.begin()+offsetToEnd);
236  /** ensure that all values in the frame are numbers: non finite pixels are set to 0 */
237  for (auto &pixel : det.frame()) if (!isfinite(pixel)) pixel = 0;
238 
239  /** retrieve the machine detector part of the cassevent */
240  devIt = devices.find(CASSEvent::MachineData);
241  if(devIt == devices.end())
242  throw runtime_error(string("XFELOnlineInput: CASSEvent does not") +
243  " contain a machinedata device");
244  MachineData::Device &md (dynamic_cast<MachineData::Device&>(*(devIt->second)));
     /** store the index of the pulse within the train in the beamline data */
245  md.BeamlineData()["PulseId"] = pulseID;
246 
247  /** tell the ringbuffer that we're done with the event */
248  ++_counter;
249  _ringbuffer.doneFilling(rbItem, 1);
250  }// done going through all pulses in the train
     /** add up bytesReceived() of all entries in the received data and pass
      *  the total to newEventAdded(); this is done once per received train
      */
251  size_t datasize(0);
252  for (auto& d : data) datasize += d.second.bytesReceived();
253  newEventAdded(datasize);
254  }
255  Log::add(Log::INFO,"XFELOnlineInput::run(): Quitting loop");
256 }
Event to store all LCLS Data.
Definition: cass_event.h:32
RingBuffer< CASSEvent >::iter_type rbItem_t
define an item in the ringbuffer
Definition: input_base.h:109
class calculating a rate in Hz.
Definition: ratemeter.h:28
status_t _status
the internal status of the thread
Settings for CASS.
Definition: cass_settings.h:30
detectors_t & dets()
instrument setter
Input base class.
Definition: input_base.h:31
STL namespace.
things written only at end of run H5Dump ProcessorSummary size
file contains custom exceptions used in cass
static void add(Level level, const std::string &line)
add a string to the log
Definition: log.cpp:31
Container for all Machine related Data.
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
devices_t & devices()
setters
Definition: cass_event.h:66
std::vector< pixel_t > frame_t
a frame is a vector of pixels
bool shouldQuit() const
query whether this thread is told to quit
file contains the declaration of the acqiris part of the CASSEvent
id_t & id()
setters
Definition: cass_event.h:64
std::string toString(const Type &t)
convert any type to a string
Definition: cass.h:63
auxiliary data[Processor]
static shared_pointer instance()
get the singleton instance
Definition: input_base.cpp:20
value(const QString &key, const QVariant &defaultValue=QVariant()
RingBuffer< CASSEvent > & _ringbuffer
reference to the ringbuffer
Definition: input_base.h:140
std::map< Device, DeviceBackend::shared_pointer > devices_t
mapping from device type to handler instance
Definition: cass_event.h:46
XFELOnlineInput(RingBuffer< CASSEvent > &buffer, Ratemeter &ratemeter, Ratemeter &loadmeter, bool quitwhendone, QObject *parent=0)
constructor
contains container for simple pixel detector data
definitions of a machine device
std::tr1::shared_ptr< InputBase > shared_pointer
shared pointer of this type
Definition: input_base.h:35
file contains specialized class that do the settings for cass
the device containing pixel detector data
Detector containing a ccd camera image.
void pausePoint()
point where the thread will be paused
rbItem_t getNextFillable(unsigned timeout=500)
retrieve an iterator to the next fillable event
Definition: input_base.cpp:48
contains a logger for cass
int16_t pixel
define a pixel
Definition: hlltypes.hpp:27
void newEventAdded(const size_t eventsize)
increment the number of events received in the ratemeter
Definition: input_base.cpp:37
XFEL Input for receiving data.
const bldMap_t & BeamlineData() const
getter
beginGroup(const QString &prefix)
void runthis()
starts the thread
uint64_t _counter
the counter for all events