32 #include <QDataStream>
33 #include <QThreadPool>
38 #include "Connection.h"
41 void printThread(
const char *func)
43 int threadId = (int)QThread::currentThread();
44 printf(
"[%s]==========================Run from thread %x\r\n", func, threadId);
// Build-time switch, set to 1 here. The code that tests it is not visible in
// this excerpt; going by the name it presumably selects the QThreadPool-based
// send path - confirm against the rest of the file.
// NOTE(review): identifiers that begin with an underscore followed by an
// uppercase letter are reserved for the implementation in C++.
49 #define _USE_THREADPOOL_ 1
53 Connection::Connection(QTcpSocket &_tcpSocket) : socket(_tcpSocket), state(&idleState), clientId(kInvalidClientId)
55 Assert(state == &idleState);
57 waitHeaderState.setDataLength(Header::kHeaderLength);
58 messageByteArray =
new QByteArray();
59 state = &waitHeaderState;
61 QObject::connect(&socket, SIGNAL(disconnected()),
this, SLOT(onDisconnected()));
62 QObject::connect(&socket, SIGNAL(readyRead()),
this, SLOT(onReadyRead()));
63 QObject::connect(&socket, SIGNAL(connected()),
this, SLOT(onConnected()));
64 QObject::connect(&socket, SIGNAL(bytesWritten(qint64)),
this, SLOT(onBytesWritten(qint64)));
65 QObject::connect(&socket, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(onSocketError(QAbstractSocket::SocketError)));
66 QObject::connect(&socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
this, SLOT(onStateChanged(QAbstractSocket::SocketState)));
68 QObject::connect(
this, SIGNAL(readyRead()),
this, SLOT(onReadyRead()));
69 QObject::connect(
this, SIGNAL(hasMessageToSend(
const std::vector<uint8_t>)),
this, SLOT(onHasMessageToSend(
const std::vector<uint8_t>)));
// Destructor. The visible steps are: wait for the global thread pool to go
// idle, drain the queue of received messages, issue a series of disconnect
// calls, and log the deletion together with this object's address.
// NOTE(review): some original lines of this body are not visible in this
// excerpt (for example whatever follows the dequeue inside the loop), so the
// comments below describe only what is shown.
73 Connection::~Connection(
void)
// Blocks until every task queued on the global QThreadPool has finished.
77 QThreadPool::globalInstance()->waitForDone();
// Remove every message still waiting in incomingMessages.
// NOTE(review): the line that disposes of the dequeued QByteArray is not
// visible here - confirm the buffer is deleted, otherwise it leaks.
80 while (!incomingMessages.isEmpty()) {
81 QByteArray *byteArray = incomingMessages.dequeue();
// NOTE(review): called with two arguments from inside a member function,
// QObject::disconnect(this, SLOT(...)) selects the member overload
// disconnect(receiver, method), i.e. the sender is this object, not `socket`.
// The connections made from `socket` in the constructor would therefore not
// be matched by these calls - confirm. Qt also drops a connection when either
// end is destroyed, so the calls look redundant.
85 QObject::disconnect(
this, SLOT(onDisconnected()));
86 QObject::disconnect(
this, SLOT(onReadyRead()));
87 QObject::disconnect(
this, SLOT(onConnected()));
88 QObject::disconnect(
this, SLOT(onBytesWritten(qint64)));
89 QObject::disconnect(
this, SLOT(onSocketError(QAbstractSocket::SocketError)));
90 QObject::disconnect(
this, SLOT(onStateChanged(QAbstractSocket::SocketState)));
// Counterparts of the two self-connections (readyRead, hasMessageToSend).
92 QObject::disconnect(
this, SLOT(onReadyRead()));
93 QObject::disconnect(
this, SLOT(onHasMessageToSend(
const std::vector<uint8_t>)));
// Trace the destruction, identifying the object by its address.
95 Log() <<
"Connection " << (
void *)
this <<
" is deleted" << std::endl;
// Body fragment of the direct send path. The enclosing signature is not
// visible in this excerpt; the name is taken from the trace string below.
// Trace which thread performs the write.
101 ::printThread(
"Connection::send");
// Hand the bytes of `out` to the socket as one raw buffer.
// NOTE(review): if `out` is a std::vector, &out[0] is undefined when it is
// empty - confirm callers never pass an empty message. The value returned by
// write() is not checked.
103 socket.write((
const char *)&out[0], out.size());
// Body fragment of the asynchronous send path. The enclosing signature is not
// visible in this excerpt; the name is taken from the log text below.
109 Log() <<
"sendAsync via emit" << std::endl;
// Raise hasMessageToSend with the message; the constructor connects that
// signal to onHasMessageToSend on this same object.
110 emit hasMessageToSend(out);
114 void Connection::sendMessage(
const QByteArray &sendBuffer)
const
116 ::printThread(
"Connection::sendMessage");
118 socket.write(sendBuffer);
// Copies the oldest queued incoming message into recvBuffer, if there is one.
// When the queue is empty the visible code leaves recvBuffer untouched.
// Although declared const, the method removes an element from
// incomingMessages.
// NOTE(review): the lines after the copy are not visible in this excerpt -
// confirm the dequeued QByteArray is released there.
122 void Connection::recvMessage(QByteArray &recvBuffer)
const
// Trace which thread is fetching the message.
124 ::printThread(
"Connection::recvMessage");
126 if (!incomingMessages.isEmpty()) {
127 QByteArray *byteArray = incomingMessages.dequeue();
// Copy the contents out of the queued buffer into the caller's buffer.
128 recvBuffer = *byteArray;
// Overload that returns the oldest queued message by const reference instead
// of copying it into a caller-supplied buffer.
// NOTE(review): the body of the empty-queue branch and the return statement
// are not visible in this excerpt. If the returned reference is to the
// dequeued heap buffer, ownership and lifetime of that buffer need confirming.
133 const QByteArray & Connection::recvMessage(
void)
const
// Empty-queue case (handling not visible here).
135 if (incomingMessages.isEmpty()) {
// Take the oldest message off the queue.
139 QByteArray *byteArray = incomingMessages.dequeue();
143 void Connection::onBytesWritten(qint64 bytes)
145 Log() <<
"Connection written " << bytes <<
"bytes" << std::endl;
148 void Connection::onConnected(
void)
150 Log() <<
"Connection establised " << std::endl;
// Slot connected (in the constructor) to this object's own hasMessageToSend
// signal. `out` arrives by value, matching the signal's signature.
// NOTE(review): only the log statement is visible in this excerpt; whether
// `out` is subsequently written to the socket cannot be confirmed from here.
153 void Connection::onHasMessageToSend(
const std::vector<uint8_t> out)
const
155 Log() <<
"onHasMessageToSend" << std::endl;
// Slot that drives the receive state machine. It is connected both to the
// socket's readyRead() and to this object's own readyRead() signal.
// Each pass asks the current state to read from the socket into
// messageByteArray and then moves header -> payload -> header.
// NOTE(review): several control-flow lines of this body are not visible in
// this excerpt, so the comments below cover the visible statements only.
159 void Connection::onReadyRead(
void)
// Guard for a missing receive buffer (handling not visible here).
161 if (messageByteArray == 0) {
// Let the current state pull its expected bytes from the socket.
167 (*state).read(socket, *messageByteArray);
172 if (state == &waitHeaderState) {
// Copy the first getDataLength() bytes of the buffer and decode them as the
// message header.
174 std::vector<uint8_t> headerbytes(messageByteArray->constData(), messageByteArray->constData() + (*state).getDataLength());
175 header.deserialize(headerbytes);
177 Log() <<
"Trans to waitPayloadState and expcets " << header.getPayloadLength() <<
" Bytes" << std::endl;
// The payload state must read exactly as many bytes as the header announced.
179 waitPayloadState.setDataLength(header.getPayloadLength());
180 state = &waitPayloadState;
182 else if (state == &waitPayloadState) {
184 Log() <<
"Enqueing msg with " << messageByteArray->count() <<
" Bytes" << std::endl;
// Hand the finished buffer (pointer) to the queue and notify listeners.
186 incomingMessages.enqueue(messageByteArray);
187 emit messageReceived(*
this);
189 Log() <<
"Trans to waitHeaderState and expcets " << Header::kHeaderLength <<
" Bytes" << std::endl;
// The queue now holds the old buffer, so start a fresh one for the next
// message and go back to waiting for a header.
191 messageByteArray =
new QByteArray();
192 waitHeaderState.setDataLength(Header::kHeaderLength);
193 state = &waitHeaderState;
// Run another pass when the socket still holds data, or when the next state
// expects zero bytes and so needs no further socket data.
// NOTE(review): the constructor's self-connection uses the default connection
// type; if that resolves to a direct call, this emit re-enters onReadyRead
// recursively - confirm this is acceptable for long bursts of messages.
196 if ((socket.bytesAvailable() > 0) || (state->getDataLength() == 0)) emit readyRead();
// NOTE(review): the construct enclosing this log line is not visible;
// presumably it handles the value thrown by State::read when too little data
// has arrived - confirm.
200 Log() <<
"Not enough data arriaval " << std::endl;
// read() implementation for the idle state. The socket parameter is unnamed,
// so it cannot be used by the body.
// NOTE(review): the body is not visible in this excerpt.
205 QByteArray & Connection::IdleState::read(QAbstractSocket &, QByteArray &readBuffer)
// read() implementation for the header state: performs the common
// State::read, checks that the buffer now holds exactly getDataLength()
// bytes, and dumps those bytes as hex to stdout.
// NOTE(review): the end of the body (including the return) is not visible in
// this excerpt.
213 QByteArray & Connection::WaitHeaderState::read(QAbstractSocket &socket, QByteArray &readBuffer)
// Shared read logic.
216 State::read(socket, readBuffer);
// After a successful header read the buffer holds the header and nothing else.
218 Assert(readBuffer.size() == (
int)getDataLength());
// Hex dump of the header bytes.
// NOTE(review): constData() yields plain char; where char is signed, bytes of
// 0x80 and above are sign-extended on promotion and would print as FFFFFFxx
// with %02X - confirm whether a cast to unsigned char is wanted.
220 for (
int i = 0; i < readBuffer.count(); i++) {
221 printf(
"%02X ", readBuffer.constData()[i]);
// read() implementation for the payload state: treats a zero-length payload
// as a special case, otherwise performs the common State::read, checks the
// total buffer size, and echoes the payload bytes to stdout.
// NOTE(review): parts of this body (the rest of the empty-payload branch and
// the return) are not visible in this excerpt.
228 QByteArray & Connection::WaitPayloadState::read(QAbstractSocket &socket, QByteArray &readBuffer)
// A message whose header announced no payload.
230 if (getDataLength() == 0) {
232 printf(
"Received Empty Message\r\n");
// Shared read logic.
236 State::read(socket, readBuffer);
// The buffer already contained the header, so it must now hold header plus
// payload.
238 Assert(readBuffer.size() == (getDataLength() + Header::kHeaderLength));
// Print the payload, i.e. everything after the header bytes.
// NOTE(review): "%C" is the wide-character conversion on POSIX systems
// (equivalent to "%lc"), while the argument is a plain char - "%c" may have
// been intended; confirm.
240 for (
int i = Header::kHeaderLength; i < readBuffer.count(); i++) {
241 printf(
"%C", readBuffer.constData()[i]);
// Common read logic shared by the concrete states: once the socket holds at
// least getDataLength() bytes, read exactly that many and append them to
// readBuffer. A non-positive `count` is reported by throwing `count` itself.
// NOTE(review): the declaration and initial value of `count`, the body of the
// `count == -1` branch, and the return statement are not visible in this
// excerpt.
248 QByteArray & Connection::State::read(QAbstractSocket &socket, QByteArray &readBuffer)
// Only read when the whole expected chunk is already available.
252 if (socket.bytesAvailable() >= getDataLength()) {
253 Log() <<
"At State " << stateName() <<
"expcets " << getDataLength() <<
" but receives " << socket.bytesAvailable() <<
" Bytes" << std::endl;
// Temporary buffer sized to the expected chunk; released with delete[] below.
254 char * expectedBytes =
new char[getDataLength()];
255 count = socket.read(expectedBytes, getDataLength());
// Full chunk received: append it to the caller's buffer.
256 if (count == getDataLength()) {
257 Log() <<
"At State " << stateName() <<
"read " << count <<
" bytes " << std::endl;
258 readBuffer.append(expectedBytes, getDataLength());
// Read failure reported by the socket (handling not visible here).
// NOTE(review): as far as the visible lines show, a short read
// (0 < count < getDataLength()) is neither appended nor reported - confirm.
260 else if (count == -1) {
264 delete[] expectedBytes;
// Signal "nothing usable was read" by throwing the count value; a caller that
// wants to handle it must catch the type of `count`.
270 if (count <= 0)
throw count;
275 void Connection::onDisconnected(
void)
277 Log()<<
"onDisconnected()" << std::endl;
279 emit disconnected(
this);
282 void Connection::onSocketError(QAbstractSocket::SocketError socketError)
284 Log()<<
"onSocketError()" << socketError;
// Slot connected to the socket's stateChanged(QAbstractSocket::SocketState)
// signal; logs the new socket state value.
// NOTE(review): unlike most log statements in this file, this one is not
// terminated with std::endl - confirm whether that is intended. The end of
// this function is not visible in this excerpt.
286 void Connection::onStateChanged(QAbstractSocket::SocketState socketState)
288 Log()<<
"onStateChanged()" << socketState;