22 #include "SubtecPacket.hpp"
23 #include "PacketSender.hpp"
25 #define MAX_SNDBUF_SIZE (8*1024*1024)
// Free-function thread entry point taking an opaque context pointer;
// PacketSender::initSenderTask() hands it to std::thread together with `this`.
// NOTE(review): the guarded statements between the signature and the catch clause are not
// visible in this listing - presumably ctx is converted back to a PacketSender and its
// senderTask() is run; confirm against the full file.
27 void runWorkerTask(
void *ctx)
// A std::exception raised by the guarded code is caught here and logged as a warning.
33 catch (
const std::exception& e) {
34 AAMPLOG_WARN(
"PacketSender: Error in run %s", e.what());
44 PacketSender::~PacketSender()
46 PacketSender::Close();
// Closes the Subtec socket descriptor when one is recorded and resets the handle to 0.
// NOTE(review): one statement line ahead of the descriptor check is not visible in this
// listing - presumably the sender task is shut down first; confirm.
// NOTE(review): 0 doubles as the "no socket" value, so a descriptor numbered 0 would never be
// closed here, while -1 (a failed ::socket()) would be handed to ::close(); confirm intent.
49 void PacketSender::Close()
// Only the ::close() call is guarded by the if; the reset to 0 runs on every call.
52 if (mSubtecSocketHandle)
53 ::close(mSubtecSocketHandle);
54 mSubtecSocketHandle = 0;
// Public flush entry point.
// NOTE(review): the body is not visible in this listing - presumably it forwards to
// flushPacketQueue(); confirm against the full file.
57 void PacketSender::Flush()
62 bool PacketSender::Init()
64 return Init(SOCKET_PATH);
67 #ifdef AAMP_SIMULATOR_BUILD
// File-local state for the simulator build, with a single static instance
// (mSubtecSimulatorState) defined together with the type.
// The member declarations are not visible in this listing; the code that follows uses
// `started`, `sockfd` and `threadId`.
71 static struct SubtecSimulatorState
76 } mSubtecSimulatorState;
// Decodes a 32-bit value from the start of a byte buffer, least-significant byte first.
//   ptr   - bytes to decode; ptr[0]..ptr[3] are read when len permits.
//   len   - number of bytes available at ptr.
//   ret32 - receives the assembled value.
// NOTE(review): the statements producing the bool result are not visible in this listing;
// the callers below treat a true result as "a value was read".
78 static bool read32(
const unsigned char *ptr,
size_t len, std::uint32_t &ret32)
// Decode only when at least four bytes are available.
82 if (len >=
sizeof(std::uint32_t))
84 const std::uint32_t byte0 =
static_cast<const uint32_t
>(ptr[0]) & 0xFF;
85 const std::uint32_t byte1 =
static_cast<const uint32_t
>(ptr[1]) & 0xFF;
86 const std::uint32_t byte2 =
static_cast<const uint32_t
>(ptr[2]) & 0xFF;
87 const std::uint32_t byte3 =
static_cast<const uint32_t
>(ptr[3]) & 0xFF;
// byte0 lands in bits 0-7 and byte3 in bits 24-31 (little-endian byte order).
88 ret32 = byte0 | (byte1 << 8) | (byte2 << 16) | (byte3 << 24);
// Logs the leading fields of a received packet - type, counter and size - each decoded with
// read32(), then announces the packet data.
//   ptr - packet bytes; len - number of bytes available at ptr.
// NOTE(review): the declarations of `type` and `size`, whatever advances ptr/len between the
// three reads, the else/return structure and the statement that dumps the data after
// "Packet data:" are not visible in this listing.
// NOTE(review): all three failure messages say "on type", including those for the counter
// and size reads - looks like a copy/paste slip; confirm before relying on the log text.
95 static void DumpPacket(
const unsigned char *ptr,
size_t len)
// Field 1: packet type, logged both as a name (Packet::getTypeString) and as a number.
99 if (read32(ptr, len, type))
101 AAMPLOG_INFO(
"Type:%s:%d", Packet::getTypeString(type).c_str(), type);
107 AAMPLOG_ERR(
"Packet read failed on type - returning");
// Field 2: packet counter.
111 std::uint32_t counter;
112 if (read32(ptr, len, counter))
114 AAMPLOG_INFO(
"Counter:%d", counter);
120 AAMPLOG_ERR(
"Packet read failed on type - returning");
// Field 3: packet size.
125 if (read32(ptr, len, size))
127 AAMPLOG_INFO(
"Packet size:%d", size);
133 AAMPLOG_ERR(
"Packet read failed on type - returning");
138 AAMPLOG_WARN(
"Packet data:");
// pthread entry point for the simulator build: receives datagrams on state->sockfd and logs
// each one through DumpPacket().
//   param - pointer to the SubtecSimulatorState passed to pthread_create().
// NOTE(review): the loop header, any check of the malloc() result, the release of `buffer`
// and the return statement are not visible in this listing.
143 static void *SubtecSimulatorThread(
void *param )
145 struct SubtecSimulatorState *state = (SubtecSimulatorState *)param;
146 struct sockaddr cliaddr;
147 socklen_t sockLen =
sizeof(cliaddr);
// Receive buffer of 8 KiB, allocated once for the thread.
148 size_t maxBuf = 8*1024;
149 unsigned char *buffer = (
unsigned char *)malloc(maxBuf);
152 AAMPLOG_WARN(
"SubtecSimulatorThread - listening for packets" );
// NOTE(review): recvfrom() returns -1 on error; numBytes is passed on unchecked and DumpPacket()
// takes a size_t, so a negative result would turn into a very large length - confirm/guard.
155 int numBytes = recvfrom( state->sockfd, (
void *)buffer, maxBuf, MSG_WAITALL, (
struct sockaddr *) &cliaddr, &sockLen);
156 AAMPLOG_INFO(
"***SubtecSimulatorThread:\n" );
157 DumpPacket( buffer, numBytes );
161 close( state->sockfd );
// Simulator build only: on first use, creates an AF_UNIX datagram socket, binds it to `name`
// and starts SubtecSimulatorThread on it. Later calls just report the stored state.
//   name - filesystem path to bind the socket to.
// Returns state->started, i.e. true once a bind has succeeded.
// NOTE(review): one statement line just before the socket() call is not visible in this listing.
165 static bool PrepareSubtecSimulator(
const char *name )
167 struct SubtecSimulatorState *state = &mSubtecSimulatorState;
// Nothing below runs again once the simulator has been started.
168 if( !state->started )
171 state->sockfd = socket(AF_UNIX, SOCK_DGRAM, 0);
172 if( state->sockfd>=0 )
174 struct sockaddr_un serverAddr;
175 memset(&serverAddr, 0,
sizeof(serverAddr));
176 serverAddr.sun_family = AF_UNIX;
// NOTE(review): unbounded copy into the fixed-size sun_path array - a `name` longer than the
// array overflows it. PacketSender::initSocket() uses a bounded strncpy for the same field.
177 strcpy(serverAddr.sun_path, name );
178 socklen_t len =
sizeof(serverAddr);
179 if( bind( state->sockfd, (
struct sockaddr*)&serverAddr, len ) == 0 )
// `started` is set before the thread is created; the pthread_create() result is not checked.
181 state->started =
true;
182 pthread_create(&state->threadId, NULL, &SubtecSimulatorThread, (
void *)state);
// NOTE(review): no close() of state->sockfd is visible on the bind-failure path.
186 AAMPLOG_ERR(
"SubtecSimulatorThread bind() error: %d", errno );
190 return state->started;
192 #endif // AAMP_SIMULATOR_BUILD
// Initialises the sender for the given socket path while holding mStartMutex.
//   socket_path - path of the socket to connect to; logged and passed to initSocket().
// NOTE(review): the declaration of `ret`, the conditions choosing between the log messages
// below and the return statement are not visible in this listing.
194 bool PacketSender::Init(
const char *socket_path)
197 std::unique_lock<std::mutex> lock(mStartMutex);
199 AAMPLOG_INFO(
"PacketSender::Init with %s", socket_path);
// Simulator builds first prepare the local receiving socket/thread on the same path.
202 #ifdef AAMP_SIMULATOR_BUILD
203 ret = PrepareSubtecSimulator(socket_path);
// Short-circuit &&: the sender thread is only started when the socket was set up.
208 ret = initSocket(socket_path) && initSenderTask();
210 AAMPLOG_WARN(
"SenderTask failed to init");
213 AAMPLOG_WARN(
"senderTask started");
216 AAMPLOG_WARN(
"PacketSender::Init already running");
// Queues a packet for transmission: takes mPktMutex, trace-logs the current queue size and
// the packet's type/counter, then moves the packet into mPacketQueue.
//   packet - packet to enqueue; ownership is moved into the queue.
// NOTE(review): a line after the push is not visible in this listing - presumably the
// sender thread is notified there; confirm.
221 void PacketSender::SendPacket(PacketPtr && packet)
223 std::unique_lock<std::mutex> lock(mPktMutex);
// Type and its string form are read before the packet is moved away below.
224 uint32_t type = packet->getType();
225 std::string typeString = Packet::getTypeString(type);
226 AAMPLOG_TRACE(
"PacketSender: queue size %lu type %s:%d counter:%d",
227 mPacketQueue.size(), typeString.c_str(), type, packet->getCounter());
229 mPacketQueue.push(std::move(packet));
// Sender loop body: takes mPktMutex and drains mPacketQueue by passing the front packet to
// sendPacket() until the queue is empty.
// NOTE(review): the outer loop / wait logic around the drain and the statement that removes
// the sent packet from the queue are not visible in this listing.
233 void PacketSender::senderTask()
235 std::unique_lock<std::mutex> lock(mPktMutex);
239 while (!mPacketQueue.empty())
// The front element is moved from before being handed to sendPacket().
241 sendPacket(std::move(mPacketQueue.front()));
243 AAMPLOG_TRACE(
"PacketSender: queue size %lu", mPacketQueue.size());
248 bool PacketSender::IsRunning()
250 std::unique_lock<std::mutex> lock(mPktMutex);
251 return running.load();
254 void PacketSender::flushPacketQueue()
256 std::queue<PacketPtr> empty;
257 std::unique_lock<std::mutex> lock(mPktMutex);
259 empty.swap(mPacketQueue);
262 void PacketSender::sendPacket(PacketPtr && pkt)
264 auto buffer = pkt->getBytes();
265 size_t size =
static_cast<ssize_t
>(buffer.size());
266 if (size > mSockBufSize && size < MAX_SNDBUF_SIZE)
268 int newSize = buffer.size();
269 if (::setsockopt(mSubtecSocketHandle, SOL_SOCKET, SO_SNDBUF, &newSize,
sizeof(newSize)) == -1)
271 AAMPLOG_WARN(
"::setsockopt() SO_SNDBUF failed\n");
275 mSockBufSize = newSize;
276 AAMPLOG_INFO(
"new socket buffer size %d\n", mSockBufSize);
279 auto written = ::write(mSubtecSocketHandle, &buffer[0], size);
280 AAMPLOG_TRACE(
"PacketSender: Written %ld bytes with size %ld", written, size);
// Starts the sender thread: mSendThread is assigned a std::thread that runs
// runWorkerTask with `this` as its context argument.
// NOTE(review): the try keyword and the return statements are not visible in this listing -
// presumably false is returned from the catch path and true otherwise; confirm.
283 bool PacketSender::initSenderTask()
286 mSendThread = std::thread(runWorkerTask,
this);
// A std::exception raised while creating the thread is logged as a warning.
288 catch (
const std::exception& e) {
289 AAMPLOG_WARN(
"PacketSender: Error in initSenderTask: %s", e.what());
// Shuts the sender thread down; the only visible step is a joinable() check on mSendThread.
// NOTE(review): the statements before the check and the guarded statement are not visible
// in this listing - presumably the thread is signalled to stop and then joined; confirm.
296 void PacketSender::closeSenderTask()
302 if (mSendThread.joinable())
310 bool PacketSender::initSocket(
const char *socket_path)
312 mSubtecSocketHandle = ::socket(AF_UNIX, SOCK_DGRAM, 0);
313 if (mSubtecSocketHandle == -1)
315 AAMPLOG_WARN(
"PacketSender: Unable to init socket");
319 struct sockaddr_un addr;
321 (void) std::memset(&addr, 0,
sizeof(addr));
322 addr.sun_family = AF_UNIX;
323 (void) std::strncpy(addr.sun_path, socket_path,
sizeof(addr.sun_path));
324 addr.sun_path[
sizeof(addr.sun_path) - 1] = 0;
326 socklen_t optlen =
sizeof(mSockBufSize);
327 ::getsockopt(mSubtecSocketHandle, SOL_SOCKET, SO_SNDBUF, &mSockBufSize, &optlen);
328 mSockBufSize = mSockBufSize / 2;
329 AAMPLOG_INFO(
"SockBuffer size : %d\n", mSockBufSize);
331 if (::connect(mSubtecSocketHandle,
reinterpret_cast<struct sockaddr*
>(&addr),
sizeof(addr)) != 0)
333 ::close(mSubtecSocketHandle);
334 AAMPLOG_WARN(
"PacketSender: cannot connect to address \'%s\'", socket_path);
337 AAMPLOG_INFO(
"PacketSender: Initialised with socket_path %s", socket_path);