RDK Documentation (Open Sourced RDK Components)
PacketSender.cpp
1 /*
2  * If not stated otherwise in this file or this component's license file the
3  * following copyright and licenses apply:
4  *
5  * Copyright 2020 RDK Management
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18 */
19 
#include <cerrno>
#include <chrono>

#include "SubtecPacket.hpp"
#include "PacketSender.hpp"
24 
25 #define MAX_SNDBUF_SIZE (8*1024*1024)
26 
27 void runWorkerTask(void *ctx)
28 {
29  try {
30  PacketSender *pkt = reinterpret_cast<PacketSender*>(ctx);
31  pkt->senderTask();
32  }
33  catch (const std::exception& e) {
34  AAMPLOG_WARN("PacketSender: Error in run %s", e.what());
35  }
36 }
37 
38 PacketSender *PacketSender::Instance()
39 {
40  static PacketSender instance;
41  return &instance;
42 }
43 
44 PacketSender::~PacketSender()
45 {
46  PacketSender::Close();
47 }
48 
49 void PacketSender::Close()
50 {
51  closeSenderTask();
52  if (mSubtecSocketHandle)
53  ::close(mSubtecSocketHandle);
54  mSubtecSocketHandle = 0;
55 }
56 
57 void PacketSender::Flush()
58 {
59  flushPacketQueue();
60 }
61 
62 bool PacketSender::Init()
63 {
64  return Init(SOCKET_PATH);
65 }
66 
67 #ifdef AAMP_SIMULATOR_BUILD
68 // in simulator build, create a socket to receive and dump messages that would
69 // otherwise go to subtec
70 #include <pthread.h>
/**
 * Bookkeeping for the simulator-side receiver.
 * The single file-local instance below has static storage duration and is
 * therefore zero-initialised.
 */
struct SubtecSimulatorState
{
    bool started;       // set once the receiver has been set up
    pthread_t threadId; // handle filled in by pthread_create()
    int sockfd;         // datagram socket the receiver thread reads from
};

static SubtecSimulatorState mSubtecSimulatorState;
77 
/**
 * Decode a little-endian 32-bit value from the start of a byte buffer.
 *
 * @param ptr   first byte to decode
 * @param len   number of readable bytes at ptr
 * @param ret32 receives the decoded value; left untouched on failure
 * @return true when at least four bytes were available, false otherwise
 */
static bool read32(const unsigned char *ptr, size_t len, std::uint32_t &ret32)
{
    if (len < sizeof(std::uint32_t))
    {
        return false;
    }

    // fold from the most significant byte (highest index) down to byte 0
    std::uint32_t value = 0;
    for (size_t idx = sizeof(std::uint32_t); idx-- > 0; )
    {
        value = (value << 8) | (static_cast<std::uint32_t>(ptr[idx]) & 0xFF);
    }
    ret32 = value;
    return true;
}
94 
95 static void DumpPacket(const unsigned char *ptr, size_t len)
96 {
97  //Get type
98  std::uint32_t type;
99  if (read32(ptr, len, type))
100  {
101  AAMPLOG_INFO("Type:%s:%d", Packet::getTypeString(type).c_str(), type);
102  ptr += 4;
103  len -= 4;
104  }
105  else
106  {
107  AAMPLOG_ERR("Packet read failed on type - returning");
108  return;
109  }
110  //Get Packet counter
111  std::uint32_t counter;
112  if (read32(ptr, len, counter))
113  {
114  AAMPLOG_INFO("Counter:%d", counter);
115  ptr += 4;
116  len -= 4;
117  }
118  else
119  {
120  AAMPLOG_ERR("Packet read failed on type - returning");
121  return;
122  }
123  //Get size
124  std::uint32_t size;
125  if (read32(ptr, len, size))
126  {
127  AAMPLOG_INFO("Packet size:%d", size);
128  ptr += 4;
129  len -= 4;
130  }
131  else
132  {
133  AAMPLOG_ERR("Packet read failed on type - returning");
134  return;
135  }
136  if (len > 0)
137  {
138  AAMPLOG_WARN("Packet data:");
139  DumpBlob(ptr, len);
140  }
141 }
142 
143 static void *SubtecSimulatorThread( void *param )
144 {
145  struct SubtecSimulatorState *state = (SubtecSimulatorState *)param;
146  struct sockaddr cliaddr;
147  socklen_t sockLen = sizeof(cliaddr);
148  size_t maxBuf = 8*1024; // big enough?
149  unsigned char *buffer = (unsigned char *)malloc(maxBuf);
150  if( buffer )
151  {
152  AAMPLOG_WARN( "SubtecSimulatorThread - listening for packets" );
153  for(;;)
154  {
155  int numBytes = recvfrom( state->sockfd, (void *)buffer, maxBuf, MSG_WAITALL, (struct sockaddr *) &cliaddr, &sockLen);
156  AAMPLOG_INFO( "***SubtecSimulatorThread:\n" );
157  DumpPacket( buffer, numBytes );
158  }
159  free( buffer );
160  }
161  close( state->sockfd );
162  return 0;
163 }
164 
165 static bool PrepareSubtecSimulator( const char *name )
166 {
167  struct SubtecSimulatorState *state = &mSubtecSimulatorState;
168  if( !state->started )
169  { // already started - ok
170  unlink( name ); // close if left over from previous session to avoid bind failure
171  state->sockfd = socket(AF_UNIX, SOCK_DGRAM, 0);
172  if( state->sockfd>=0 )
173  {
174  struct sockaddr_un serverAddr;
175  memset(&serverAddr, 0, sizeof(serverAddr));
176  serverAddr.sun_family = AF_UNIX;
177  strcpy(serverAddr.sun_path, name );
178  socklen_t len = sizeof(serverAddr);
179  if( bind( state->sockfd, (struct sockaddr*)&serverAddr, len ) == 0 )
180  {
181  state->started = true;
182  pthread_create(&state->threadId, NULL, &SubtecSimulatorThread, (void *)state);
183  }
184  else
185  {
186  AAMPLOG_ERR( "SubtecSimulatorThread bind() error: %d", errno );
187  }
188  }
189  }
190  return state->started;
191 }
192 #endif // AAMP_SIMULATOR_BUILD
193 
194 bool PacketSender::Init(const char *socket_path)
195 {
196  bool ret = true;
197  std::unique_lock<std::mutex> lock(mStartMutex);
198 
199  AAMPLOG_INFO("PacketSender::Init with %s", socket_path);
200 
201 
202 #ifdef AAMP_SIMULATOR_BUILD
203  ret = PrepareSubtecSimulator(socket_path);
204 #endif
205 
206  if (!running)
207  {
208  ret = initSocket(socket_path) && initSenderTask();
209  if (!ret) {
210  AAMPLOG_WARN("SenderTask failed to init");
211  }
212  else
213  AAMPLOG_WARN("senderTask started");
214  }
215  else
216  AAMPLOG_WARN("PacketSender::Init already running");
217 
218  return ret;
219 }
220 
221 void PacketSender::SendPacket(PacketPtr && packet)
222 {
223  std::unique_lock<std::mutex> lock(mPktMutex);
224  uint32_t type = packet->getType();
225  std::string typeString = Packet::getTypeString(type);
226  AAMPLOG_TRACE("PacketSender: queue size %lu type %s:%d counter:%d",
227  mPacketQueue.size(), typeString.c_str(), type, packet->getCounter());
228 
229  mPacketQueue.push(std::move(packet));
230  mCv.notify_all();
231 }
232 
233 void PacketSender::senderTask()
234 {
235  std::unique_lock<std::mutex> lock(mPktMutex);
236  do {
237  running = true;
238  mCv.wait(lock);
239  while (!mPacketQueue.empty())
240  {
241  sendPacket(std::move(mPacketQueue.front()));
242  mPacketQueue.pop();
243  AAMPLOG_TRACE("PacketSender: queue size %lu", mPacketQueue.size());
244  }
245  } while(running);
246 }
247 
248 bool PacketSender::IsRunning()
249 {
250  std::unique_lock<std::mutex> lock(mPktMutex);
251  return running.load();
252 }
253 
254 void PacketSender::flushPacketQueue()
255 {
256  std::queue<PacketPtr> empty;
257  std::unique_lock<std::mutex> lock(mPktMutex);
258 
259  empty.swap(mPacketQueue);
260 }
261 
262 void PacketSender::sendPacket(PacketPtr && pkt)
263 {
264  auto buffer = pkt->getBytes();
265  size_t size = static_cast<ssize_t>(buffer.size());
266  if (size > mSockBufSize && size < MAX_SNDBUF_SIZE)
267  {
268  int newSize = buffer.size();
269  if (::setsockopt(mSubtecSocketHandle, SOL_SOCKET, SO_SNDBUF, &newSize, sizeof(newSize)) == -1)
270  {
271  AAMPLOG_WARN("::setsockopt() SO_SNDBUF failed\n");
272  }
273  else
274  {
275  mSockBufSize = newSize;
276  AAMPLOG_INFO("new socket buffer size %d\n", mSockBufSize);
277  }
278  }
279  auto written = ::write(mSubtecSocketHandle, &buffer[0], size);
280  AAMPLOG_TRACE("PacketSender: Written %ld bytes with size %ld", written, size);
281 }
282 
283 bool PacketSender::initSenderTask()
284 {
285  try {
286  mSendThread = std::thread(runWorkerTask, this);
287  }
288  catch (const std::exception& e) {
289  AAMPLOG_WARN("PacketSender: Error in initSenderTask: %s", e.what());
290  return false;
291  }
292 
293  return true;
294 }
295 
296 void PacketSender::closeSenderTask()
297 {
298  if (running)
299  {
300  running = false;
301  mCv.notify_all();
302  if (mSendThread.joinable())
303  {
304  mSendThread.join();
305  }
306  }
307 
308 }
309 
310 bool PacketSender::initSocket(const char *socket_path)
311 {
312  mSubtecSocketHandle = ::socket(AF_UNIX, SOCK_DGRAM, 0);
313  if (mSubtecSocketHandle == -1)
314  {
315  AAMPLOG_WARN("PacketSender: Unable to init socket");
316  return false;
317  }
318 
319  struct sockaddr_un addr;
320 
321  (void) std::memset(&addr, 0, sizeof(addr));
322  addr.sun_family = AF_UNIX;
323  (void) std::strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
324  addr.sun_path[sizeof(addr.sun_path) - 1] = 0;
325 
326  socklen_t optlen = sizeof(mSockBufSize);
327  ::getsockopt(mSubtecSocketHandle, SOL_SOCKET, SO_SNDBUF, &mSockBufSize, &optlen);
328  mSockBufSize = mSockBufSize / 2; //kernel returns twice the value of actual buffer
329  AAMPLOG_INFO("SockBuffer size : %d\n", mSockBufSize);
330 
331  if (::connect(mSubtecSocketHandle, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) != 0)
332  {
333  ::close(mSubtecSocketHandle);
334  AAMPLOG_WARN("PacketSender: cannot connect to address \'%s\'", socket_path);
335  return false;
336  }
337  AAMPLOG_INFO("PacketSender: Initialised with socket_path %s", socket_path);
338 
339  return true;
340 }
DumpBlob
void DumpBlob(const unsigned char *ptr, size_t len)
Compactly log blobs of binary data.
Definition: aamplogging.cpp:533
AAMPLOG_TRACE
#define AAMPLOG_TRACE(FORMAT,...)
AAMP logging defines, this can be enabled through setLogLevel() as per the need.
Definition: AampLogManager.h:83
PacketSender
Definition: PacketSender.hpp:53