RDK Documentation (Open Sourced RDK Components)
Connection.cpp
1 /*
2  * If not stated otherwise in this file or this component's Licenses.txt file the
3  * following copyright and licenses apply:
4  *
5  * Copyright 2016 RDK Management
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18 */
19 
20 
21 
22 /**
23 * @defgroup trm
24 * @{
25 * @defgroup qtapp
26 * @{
27 **/
28 
29 
30 #include <QThread>
31 #include <QDebug>
32 #include <QDataStream>
33 #include <QThreadPool>
34 
35 #include <vector>
36 
37 #include "Util.h"
38 #include "Connection.h"
39 #include "trm/Header.h"
40 
41 void printThread(const char *func)
42 {
43  int threadId = (int)QThread::currentThread();
44  printf("[%s]==========================Run from thread %x\r\n", func, threadId);
45 }
46 
47 
48 using namespace TRM;
49 #define _USE_THREADPOOL_ 1
50 
51 #if 1
52 
53 Connection::Connection(QTcpSocket &_tcpSocket) : socket(_tcpSocket), state(&idleState), clientId(kInvalidClientId)
54 {
55  Assert(state == &idleState);
56 
57  waitHeaderState.setDataLength(Header::kHeaderLength);
58  messageByteArray = new QByteArray();
59  state = &waitHeaderState;
60 
61  QObject::connect(&socket, SIGNAL(disconnected()), this, SLOT(onDisconnected()));
62  QObject::connect(&socket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
63  QObject::connect(&socket, SIGNAL(connected()), this, SLOT(onConnected()));
64  QObject::connect(&socket, SIGNAL(bytesWritten(qint64)), this, SLOT(onBytesWritten(qint64)));
65  QObject::connect(&socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(onSocketError(QAbstractSocket::SocketError)));
66  QObject::connect(&socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(onStateChanged(QAbstractSocket::SocketState)));
67 
68  QObject::connect(this, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
69  QObject::connect(this, SIGNAL(hasMessageToSend(const std::vector<uint8_t>)), this, SLOT(onHasMessageToSend(const std::vector<uint8_t>)));
70 
71 };
72 
73 Connection::~Connection(void)
74 {
75 
76 #if _USE_THREADPOOL_
77  QThreadPool::globalInstance()->waitForDone();
78 #endif
79 
80  while (!incomingMessages.isEmpty()) {
81  QByteArray *byteArray = incomingMessages.dequeue();
82  delete byteArray;
83  }
84 
85  QObject::disconnect(this, SLOT(onDisconnected()));
86  QObject::disconnect(this, SLOT(onReadyRead()));
87  QObject::disconnect(this, SLOT(onConnected()));
88  QObject::disconnect(this, SLOT(onBytesWritten(qint64)));
89  QObject::disconnect(this, SLOT(onSocketError(QAbstractSocket::SocketError)));
90  QObject::disconnect(this, SLOT(onStateChanged(QAbstractSocket::SocketState)));
91 
92  QObject::disconnect(this, SLOT(onReadyRead()));
93  QObject::disconnect(this, SLOT(onHasMessageToSend(const std::vector<uint8_t>)));
94 
95  Log() << "Connection " << (void *)this << " is deleted" << std::endl;
96 }
97 #endif
98 
99 void Connection::send(const std::vector<uint8_t> &out) const
100 {
101  ::printThread("Connection::send");
102 
103  socket.write((const char *)&out[0], out.size());
104  socket.flush();
105 }
106 
107 void Connection::sendAsync(const std::vector<uint8_t> &out) const
108 {
109  Log() << "sendAsync via emit" << std::endl;
110  emit hasMessageToSend(out);
111 }
112 
113 
114 void Connection::sendMessage(const QByteArray &sendBuffer) const
115 {
116  ::printThread("Connection::sendMessage");
117 
118  socket.write(sendBuffer);
119  socket.flush();
120 }
121 
122 void Connection::recvMessage(QByteArray &recvBuffer) const
123 {
124  ::printThread("Connection::recvMessage");
125 
126  if (!incomingMessages.isEmpty()) {
127  QByteArray *byteArray = incomingMessages.dequeue();
128  recvBuffer = *byteArray;
129  delete byteArray;
130  }
131 }
132 
133 const QByteArray & Connection::recvMessage(void) const
134 {
135  if (incomingMessages.isEmpty()) {
137  }
138 
139  QByteArray *byteArray = incomingMessages.dequeue();
140  return *byteArray;
141 }
142 
143 void Connection::onBytesWritten(qint64 bytes)
144 {
145  Log() << "Connection written " << bytes << "bytes" << std::endl;
146 }
147 
148 void Connection::onConnected(void)
149 {
150  Log() << "Connection establised " << std::endl;
151 }
152 
153 void Connection::onHasMessageToSend(const std::vector<uint8_t> out) const
154 {
155  Log() << "onHasMessageToSend" << std::endl;
156  send(out);
157 }
158 
159 void Connection::onReadyRead(void)
160 {
161  if (messageByteArray == 0) {
162  //Connection already deleted.
163  return;
164  }
165 
166  try {
167  (*state).read(socket, *messageByteArray);
168  /*
169  * The readyRead() signal is emitted every time a new chunk of data has arrived.
170  * state: IDLE --> READ_HEADER -- > READ_PAYLOAD --> IDLE
171  */
172  if (state == &waitHeaderState) {
173  Header header;
174  std::vector<uint8_t> headerbytes(messageByteArray->constData(), messageByteArray->constData() + (*state).getDataLength());
175  header.deserialize(headerbytes);
176 
177  Log() << "Trans to waitPayloadState and expcets " << header.getPayloadLength() << " Bytes" << std::endl;
178 
179  waitPayloadState.setDataLength(header.getPayloadLength());
180  state = &waitPayloadState;
181  }
182  else if (state == &waitPayloadState) {
183 
184  Log() << "Enqueing msg with " << messageByteArray->count() << " Bytes" << std::endl;
185 
186  incomingMessages.enqueue(messageByteArray);
187  emit messageReceived(*this);
188 
189  Log() << "Trans to waitHeaderState and expcets " << Header::kHeaderLength << " Bytes" << std::endl;
190 
191  messageByteArray = new QByteArray();
192  waitHeaderState.setDataLength(Header::kHeaderLength);
193  state = &waitHeaderState;
194  }
195 
196  if ((socket.bytesAvailable() > 0) || (state->getDataLength() == 0)) emit readyRead();
197 
198  }
199  catch (...) {
200  Log() << "Not enough data arriaval " << std::endl;
201 
202  }
203 }
204 
205 QByteArray & Connection::IdleState::read(QAbstractSocket &, QByteArray &readBuffer)
206 {
207  /*Do nothing */
208  Assert(0);
209 
210  return readBuffer;
211 }
212 
213 QByteArray & Connection::WaitHeaderState::read(QAbstractSocket &socket, QByteArray &readBuffer)
214 {
215  readBuffer.clear();
216  State::read(socket, readBuffer);
217  /* Buffer should contains header bytes only */
218  Assert(readBuffer.size() == (int)getDataLength());
219 
220  for (int i = 0; i < readBuffer.count(); i++) {
221  printf("%02X ", readBuffer.constData()[i]);
222  }
223  printf("\r\n");
224 
225  return readBuffer;
226 }
227 
228 QByteArray & Connection::WaitPayloadState::read(QAbstractSocket &socket, QByteArray &readBuffer)
229 {
230  if (getDataLength() == 0) {
231  /* Empty Message */
232  printf("Received Empty Message\r\n");
233  return readBuffer;
234  }
235 
236  State::read(socket, readBuffer);
237  /* Buffer should contains header bytes + payload bytes */
238  Assert(readBuffer.size() == (getDataLength() + Header::kHeaderLength));
239 
240  for (int i = Header::kHeaderLength; i < readBuffer.count(); i++) {
241  printf("%C", readBuffer.constData()[i]);
242  }
243  printf("\r\n");
244 
245  return readBuffer;
246 }
247 
248 QByteArray & Connection::State::read(QAbstractSocket &socket, QByteArray &readBuffer)
249 {
250  size_t count = 0;
251  /* Read expected bytes */
252  if (socket.bytesAvailable() >= getDataLength()) {
253  Log() << "At State " << stateName() << "expcets " << getDataLength() << " but receives " << socket.bytesAvailable() << " Bytes" << std::endl;
254  char * expectedBytes = new char[getDataLength()];
255  count = socket.read(expectedBytes, getDataLength());
256  if (count == getDataLength()) {
257  Log() << "At State " << stateName() << "read " << count << " bytes " << std::endl;
258  readBuffer.append(expectedBytes, getDataLength());
259  }
260  else if (count == -1) {
261  // Disconnected.
262  }
263 
264  delete[] expectedBytes;
265  }
266  else {
267  // Throw data_unavailable exception.
268  }
269 
270  if (count <= 0) throw count;
271 
272  return readBuffer;
273 }
274 
275 void Connection::onDisconnected(void)
276 {
277  Log()<< "onDisconnected()" << std::endl;
278  state = &idleState;
279  emit disconnected(this);
280 }
281 
282 void Connection::onSocketError(QAbstractSocket::SocketError socketError)
283 {
284  Log()<< "onSocketError()" << socketError;
285 }
286 void Connection::onStateChanged(QAbstractSocket::SocketState socketState)
287 {
288  Log()<< "onStateChanged()" << socketState;
289 }
290 
291 
292 /** @} */
293 /** @} */
Connection::send
void send(const CECFrame &frame, int timeout, const Throw_e &doThrow)
This function is used to send CEC frame to CEC Bus.
Definition: Connection.cpp:208
Header.h
Connection::sendAsync
void sendAsync(const CECFrame &frame)
This function is used to send the CEC frame to physical CEC Bus using asynchronous method.
Definition: Connection.cpp:283
TRM::Header
Definition: Header.h:94
TRM::InvalidOperationException
Definition: TRM.h:54