CFEL - ASG Software Suite  2.5.0
CASS
tcp_input.cpp
Go to the documentation of this file.
1 // Copyright (C) 2011,2013 Lutz Foucar
2 
3 /**
4  * @file tcp_input.cpp contains input that uses tcp as interface
5  *
6  * @author Lutz Foucar
7  */
8 
9 #include <iostream>
10 
11 #include <QtNetwork/QTcpSocket>
12 #include <QtNetwork/QHostAddress>
13 
14 #include "tcp_input.h"
15 
16 #include "cass_settings.h"
17 #include "cass_exceptions.hpp"
18 #include "tcp_streamer.h"
19 #include "log.h"
20 
21 using namespace cass;
22 using namespace std;
23 
24 
/** Remaining parameters and body of TCPInput::instance(...); the line that
 * opens this signature is not part of this listing.
 *
 * Creates the shared input object: a new TCPInput is constructed from
 * buffer, ratemeter, loadmeter and parent and stored in _instance.
 *
 * @throws logic_error when _instance has already been set
 */
26  Ratemeter &ratemeter,
27  Ratemeter &loadmeter,
28  QObject *parent)
29 {
/** refuse to replace an instance that already exists */
30  if(_instance)
31  throw logic_error("TCPInput::instance(): The instance of the base class is already initialized");
/** wrap the freshly created object in the shared_pointer type */
32  _instance = shared_pointer(new TCPInput(buffer,ratemeter,loadmeter,parent));
33 }
34 
/** Remaining parameters, initializer list and (empty) body of the TCPInput
 * constructor; the line that opens this signature is not part of this
 * listing.
 *
 * All arguments (ringbuffer, ratemeter, loadmeter, parent) are forwarded
 * unchanged to the InputBase constructor; nothing else is initialized here.
 */
36  Ratemeter &ratemeter,
37  Ratemeter &loadmeter,
38  QObject *parent)
39  :InputBase(ringbuffer,ratemeter,loadmeter,parent)
40 {}
41 
/** Body of TCPInput::connectToServer(QTcpSocket &socket); the signature line
 * is not part of this listing.
 *
 * Repeatedly tries to connect the socket to the host and port configured in
 * the "TCPInput" settings group ("Server", default "localhost"; "Port",
 * default 9090) until the connection is established or shouldQuit() is true.
 *
 * @return true when the socket got connected, false when the loop ended
 *         because shouldQuit() became true before a connection was made
 */
43 {
44  CASSSettings s;
45  s.beginGroup("TCPInput");
46  bool retval(false);
47  /** try to connect until it is either connected or the thread is told to quit */
48  while (!shouldQuit())
49  {
/** drop whatever state the socket is in before starting a new attempt */
50  socket.abort();
51  socket.connectToHost(s.value("Server","localhost").toString(),
52  s.value("Port",9090).toUInt());
/** wait for the connection, at most the configured number of milliseconds
 *  (default 1000).
 *  NOTE(review): the settings key is spelled "SocketConnectionTimout_ms"
 *  (missing 'e'); configuration files have to use the same spelling.
 */
53  if (socket.waitForConnected(s.value("SocketConnectionTimout_ms",1000).toInt()))
54  {
55  Log::add(Log::INFO,string("TCPInput::connectToServer: (Re)Connected to server '") +
56  socket.peerAddress().toString().toStdString() +
57  "' on port '" + toString(socket.peerPort()) + "'");
58  retval = true;
59  break;
60  }
61  else
62  {
/** report the configured target together with the socket's error text */
63  Log::add(Log::WARNING,"TCPInput::connectToServer: Could not connect to server '" +
64  s.value("Server","localhost").toString().toStdString() +
65  "' on port '" + toString(s.value("Port",9090).toUInt()) +
66  "', because of error '" + socket.errorString().toStdString() +
67  "'. Retrying");
68  }
/** only reached after a failed attempt: pause (default 5, the key name
 *  suggests seconds) before the next try
 */
69  sleep(s.value("WaitUntilReconnectionTry_s",5).toUInt());
70  }
71  s.endGroup();
72  return retval;
73 }
74 
75 bool TCPInput::dataAvailable(QTcpSocket &socket, qint64 datasize)
76 {
77  /** first check if we're connected to the server. If not, then try to connect */
78  if (socket.state() != QAbstractSocket::ConnectedState)
79  if(!connectToServer(socket))
80  return false;
81 
82  /** check if the data is available, if not then wait for a while, if it isn't
83  * available then, check the connection to the server
84  */
85  while (socket.bytesAvailable() < datasize)
86  {
87  if (shouldQuit())
88  return false;
89  if (!socket.waitForReadyRead(_timeout) && (socket.state() != QAbstractSocket::ConnectedState))
90  {
91  connectToServer(socket);
92  return false;
93  }
94  }
95  return true;
96 }
97 
/** Body of TCPInput::runthis(); the signature line and listing line 100 are
 * not part of this listing.
 *
 * Reads "DataType" and "SocketDataTimeout_ms" from the "TCPInput" settings
 * group, connects to the server and then, until shouldQuit() is true,
 * receives one size-prefixed message per iteration, uncompresses it when
 * flagged, and deserializes its contents event by event into the ringbuffer.
 *
 * Message layout as read below: a little-endian quint32 size word whose
 * highest bit flags a compressed payload; sizeof(quint32) is subtracted from
 * the lower 31 bits to obtain the number of payload bytes that follow.
 */
99 {
101  QTcpSocket socket;
102 
103  CASSSettings s;
104  s.beginGroup("TCPInput");
/** name used to look up the streamer that deserializes the data (default "agat") */
105  string functiontype(s.value("DataType","agat").toString().toStdString());
106  TCPStreamer& deserialize(TCPStreamer::instance(functiontype));
/** timeout handed to waitForReadyRead() in dataAvailable(), default 2000 */
107  _timeout = s.value("SocketDataTimeout_ms",2 * 1000).toInt();
108  s.endGroup();
109 
110  /** connect to the server before starting the loop. */
111  connectToServer(socket);
112 
113  while(!shouldQuit())
114  {
115  pausePoint();
116 
/** wait for the size word; on failure start over with the next iteration */
117  if (!dataAvailable(socket, static_cast<qint64>(sizeof(quint32))))
118  continue;
119 
120  quint32 payloadSize;
121  QDataStream in(&socket);
122  in.setVersion(QDataStream::Qt_4_0);
123  in.setByteOrder(QDataStream::LittleEndian);
124  in >> payloadSize;
125  /** check whether the upper bit is set (indicating that data is compressed) */
126  const bool dataCompressed(payloadSize & 0x80000000);
127  /** reset the upper bit */
128  payloadSize &= 0x7FFFFFFF;
/** the transmitted size apparently counts the size word itself, remove it.
 *  NOTE(review): payloadSize is unsigned, so a transmitted size smaller
 *  than sizeof(quint32) wraps around to a very large value here.
 */
129  payloadSize -= sizeof(quint32);
130 
/** NOTE(review): the size word has already been consumed at this point; if
 *  this wait fails, the next iteration reads the next 4 bytes from the
 *  socket as a new size word.
 */
131  if(!dataAvailable(socket,payloadSize))
132  continue;
133 
134  /** write received data into a temporary buffer */
135  QByteArray buffer;
136  if (dataCompressed)
137  {
/** receive the compressed bytes into tmp (pre-filled with the character
 *  '0'), then uncompress into buffer.
 *  NOTE(review): the result of qUncompress() is not checked before use.
 */
138  QByteArray tmp(payloadSize,'0');
139  in.readRawData(tmp.data(),payloadSize);
140  buffer = qUncompress(tmp);
141  }
142  else
143  {
144  buffer.resize(payloadSize);
145  in.readRawData(buffer.data(),payloadSize);
146  }
147 
148  /** use stream to deserialize buffer */
149  QDataStream stream(buffer);
150 
151  /** deserialize the buffer header */
152  deserialize(stream);
/** extract events until the buffer is used up or the thread has to quit */
153  while((!shouldQuit()) && (!stream.atEnd()))
154  {
155  rbItem_t rbItem(getNextFillable());
/** no fillable element was handed out; try again (the loop condition
 *  re-checks shouldQuit())
 */
156  if (rbItem == _ringbuffer.end())
157  continue;
158  try
159  {
160  deserialize(stream,*rbItem->element);
/** presumably the second argument marks the element as successfully
 *  filled - TODO confirm against RingBuffer::doneFilling
 */
161  _ringbuffer.doneFilling(rbItem,true);
162  newEventAdded(rbItem->element->datagrambuffer().size());
163  }
164  catch(const DeserializeError& error)
165  {
/** give the element back with the flag set to false and abandon the
 *  remainder of this message
 */
166  Log::add(Log::ERROR,string(error.what()) +
167  ": skipping rest of data");
168  _ringbuffer.doneFilling(rbItem,false);
169  break;
170  }
171  }
172  }
173  Log::add(Log::INFO,"TCPInput::run(): Quitting");
174 }
RingBuffer< CASSEvent >::iter_type rbItem_t
define an item in the ringbuffer
Definition: input_base.h:109
class calculating a rate in Hz.
Definition: ratemeter.h:28
sleep(unsigned long secs)
status_t _status
the internal status of the thread
Settings for CASS.
Definition: cass_settings.h:30
static TCPStreamer & instance()
return a reference to the dereferenced instance
Input base class.
Definition: input_base.h:31
STL namespace.
readRawData(char *s, int len)
file contains custom exceptions used in cass
resize(int size)
bool connectToServer(QTcpSocket &socket)
connect the socket to server
Definition: tcp_input.cpp:42
static void add(Level level, const std::string &line)
add a string to the log
Definition: log.cpp:31
TCPInput(RingBuffer< CASSEvent > &buffer, Ratemeter &ratemeter, Ratemeter &loadmeter, QObject *parent=0)
constructor
Definition: tcp_input.cpp:35
void runthis()
starts the thread
Definition: tcp_input.cpp:98
A Ringbuffer, handles communication between Input and Worker Threads.
Definition: ringbuffer.hpp:52
contains base class for all tcp streamers
bool shouldQuit() const
query whether this thread is told to quit
setVersion(int v)
base class for all tcp streamers
Definition: tcp_streamer.h:25
TCP Input for receiving data.
Definition: tcp_input.h:55
std::string toString(const Type &t)
convert any type to a string
Definition: cass.h:63
static shared_pointer instance()
get the singleton instance
Definition: input_base.cpp:20
bool dataAvailable(QTcpSocket &socket, qint64 datasize)
wait until the data is available
Definition: tcp_input.cpp:75
value(const QString &key, const QVariant &defaultValue=QVariant())
setByteOrder(ByteOrder bo)
RingBuffer< CASSEvent > & _ringbuffer
reference to the ringbuffer
Definition: input_base.h:140
std::tr1::shared_ptr< InputBase > shared_pointer
shared pointer of this type
Definition: input_base.h:35
file contains specialized class that do the settings for cass
void pausePoint()
point where the thread will be paused
Exception thrown when there is a problem with deserializing QDataStreams.
contains input that uses tcp as interface
rbItem_t getNextFillable(unsigned timeout=500)
retrieve an iterator to the next fillable event
Definition: input_base.cpp:48
contains a logger for cass
void newEventAdded(const size_t eventsize)
increment the number of events received in the ratemeter
Definition: input_base.cpp:37
beginGroup(const QString &prefix)
int _timeout
the timeout of the socket
Definition: tcp_input.h:130