RDK Documentation (Open Sourced RDK Components)
socket_adaptor.cpp
1 /*
2  * If not stated otherwise in this file or this component's Licenses.txt file the
3  * following copyright and licenses apply:
4  *
5  * Copyright 2016 RDK Management
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18 */
19 #include "socket_adaptor.h"
20 #include "basic_types.h"
21 #include <signal.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <sys/un.h>
25 #include <stdio.h>
26 #include <errno.h>
27 #include "safec_lib.h"
28 
29 #define _GNU_SOURCE
30 #include <fcntl.h>
31 #include <unistd.h>
32 
33 static const int PIPE_READ_FD = 0;
34 static const int PIPE_WRITE_FD = 1;
35 static const unsigned int MAX_CONNECTIONS = 1;
36 
37 static bool g_one_time_init_complete = false;
38 
39 socket_adaptor::socket_adaptor() : m_listen_fd(-1), m_write_fd(-1), m_num_connections(0), m_callback(nullptr)
40 {
41  INFO("Enter\n")
42  if(!g_one_time_init_complete)
43  {
44  /*SIGPIPE must be ignored or process will exit when client closes connection*/
45  struct sigaction sig_settings;
46  sig_settings.sa_handler = SIG_IGN;
47  sig_settings.sa_flags = 0; //CID:81611 - Initialize uninit
48  sigemptyset(&sig_settings.sa_mask); //CID:81611 - Initialize uninit
49  sigaction(SIGPIPE, &sig_settings, NULL);
50  m_callback_data = nullptr ; //CID:80575 - Intialize a nullptr
51  g_one_time_init_complete = true;
52  }
53  REPORT_IF_UNEQUAL(0, pipe2(m_control_pipe, O_NONBLOCK));
54 }
55 
56 socket_adaptor::~socket_adaptor()
57 {
58  INFO("Enter\n")
59  stop_listening();
60  close(m_control_pipe[PIPE_WRITE_FD]);
61  close(m_control_pipe[PIPE_READ_FD]);
62 }
63 
64 int socket_adaptor::write_data(const char * buffer, const unsigned int size)
65 {
66  int ret = write(m_write_fd, buffer, size);
67  if(0 > ret)
68  {
69  WARN("Write error! Closing socket. errno: 0x%x\n", errno);
70  perror("socket_adaptor::data_callback() ");
71 
72  close(m_write_fd);
73  lock();
74  if(0 < m_write_fd)
75  {
76  m_write_fd = -1;
77  m_num_connections--;
78  }
79  unlock();
80  }
81  else if(ret != size)
82  {
83  WARN("Incomplete buffer write!\n");
84  }
85  return ret;
86 }
87 
89 {
90  return m_path;
91 }
92 
93 int socket_adaptor::start_listening(const std::string &path)
94 {
95  int ret = 0;
96  m_path = path;
97  errno_t rc = -1;
98  /*Open new UNIX socket to transfer data*/
99  m_listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0); //TODO: Does it really need to be non-blocking?
100  if(0 < m_listen_fd)
101  {
102  DEBUG("Socket created.\n");
103 
104  for(int retries = 0; retries < 2; retries++) //If first attempt fails due to path in use, unlink it and try again.
105  {
106  struct sockaddr_un bind_path;
107  bind_path.sun_family = AF_UNIX;
108  rc = strcpy_s(bind_path.sun_path, sizeof(bind_path.sun_path), m_path.c_str());
109  if(rc != EOK)
110  {
111  ERR_CHK(rc);
112  }
113  INFO("Binding to path %s\n", bind_path.sun_path);
114  ret = bind(m_listen_fd, (const struct sockaddr *) &bind_path, sizeof(bind_path));
115  if(-1 == ret)
116  {
117  if(EADDRINUSE == errno)
118  {
119  WARN("path is already in use. Using brute force.\n");
120  unlink(m_path.c_str());
121  }
122  else
123  {
124  ERROR("Failed to bind to path. Error is %d\n", errno);
125  perror("bind error");
126  close(m_listen_fd);
127  m_listen_fd = -1;
128  break;
129  }
130  }
131  else
132  {
133  INFO("Bound successfully to path.\n");
134  REPORT_IF_UNEQUAL(0, listen(m_listen_fd, 3));
135  m_thread = std::thread(&socket_adaptor::worker_thread, this);
136  break;
137  }
138  }
139  }
140  else
141  {
142  ERROR("Could not open socket.\n");
143  ret = -1;
144  }
145  return ret;
146 }
147 
148 void socket_adaptor::stop_listening()
149 {
150  INFO("Enter\n");
151  lock();
152 
153  /*Shut down worker thread that listens to incoming connections.*/
154  if(m_thread.joinable())
155  {
156  int message = EXIT;
157  int ret = write(m_control_pipe[PIPE_WRITE_FD], &message, sizeof(message));
158  if(ret != sizeof(message))
159  {
160  ERROR("Couldn't trigger worker thread shutdown.\n");
161  }
162  else
163  {
164  INFO("Waiting for worker thread to join.\n");
165  unlock();
166  m_thread.join();
167  lock();
168  INFO("Worker thread has joined.\n");
169  }
170  }
171  if(!m_path.empty())
172  {
173  if(0 < m_write_fd)
174  {
175  close(m_write_fd);
176  m_write_fd = -1;
177  m_num_connections--;
178  INFO("Closed write fd. Total active connections now is %d\n", m_num_connections);
179  }
180  INFO("Removing named socket %s.\n", m_path.c_str());
181  unlink(m_path.c_str());
182  m_path.clear();
183  }
184  unlock();
185  INFO("Exit\n");
186 }
187 
188 void socket_adaptor::process_new_connection()
189 {
190  INFO("Enter\n");
191  lock();
192  if(0 < m_write_fd)
193  {
194  WARN("Trashing old write socket in favour of new one.\n");
195  close(m_write_fd);
196  m_write_fd = -1;
197  m_num_connections--;
198  }
199 
200  DEBUG("Setting up a new connection.\n");
201  struct sockaddr_un incoming_addr;
202  int addrlen = sizeof(incoming_addr);
203  m_write_fd = accept(m_listen_fd, (sockaddr *) &incoming_addr, (socklen_t *)&addrlen);
204 
205  if(0 > m_write_fd)
206  {
207  ERROR("Error accepting connection.\n");
208  }
209  else
210  {
211  m_num_connections++;
212  INFO("Connected to new client.\n");
213  }
214  unlock();
215 
216  if(m_callback)
217  {
218  m_callback(m_callback_data);
219  }
220  return;
221 }
222 
223 void socket_adaptor::worker_thread()
224 {
225  INFO("Enter\n");
226  int control_fd = m_control_pipe[PIPE_READ_FD];
227  int max_fd = (m_listen_fd > control_fd ? m_listen_fd : control_fd);
228 
229 
230  bool check_fds = true;
231 
232  while(check_fds)
233  {
234  fd_set poll_fd_set;
235  FD_ZERO(&poll_fd_set);
236  FD_SET(m_listen_fd, &poll_fd_set);
237  FD_SET(control_fd, &poll_fd_set);
238 
239  int ret = select((max_fd + 1), &poll_fd_set, NULL, NULL, NULL);
240  DEBUG("Unblocking now. ret is 0x%x\n", ret);
241  if(0 == ret)
242  {
243  ERROR("select() returned 0.\n");
244  }
245  else if(0 < ret)
246  {
247  //Some activity was detected. Process event further.
248  if(0 != FD_ISSET(control_fd, &poll_fd_set))
249  {
250  control_code_t message;
251  REPORT_IF_UNEQUAL((sizeof(message)), read(m_control_pipe[PIPE_READ_FD], &message, sizeof(message)));
252  if(EXIT == message)
253  {
254  INFO("Exiting monitor thread.\n");
255  break;
256  }
257  else
258  {
259  process_control_message(message);
260  }
261  }
262  if(0 != FD_ISSET(m_listen_fd, &poll_fd_set))
263  {
264  process_new_connection();
265  }
266  }
267  else
268  {
269  ERROR("Error polling monitor FD!\n");
270  break;
271  }
272  }
273 
274  INFO("Exit\n");
275 }
276 
278 {
279  lock();
280  if(0 < m_write_fd)
281  {
282  close(m_write_fd);
283  m_write_fd = -1;
284  m_num_connections--;
285  INFO("Terminated current connection.\n");
286  }
287  unlock();
288 }
289 
290 
291 void socket_adaptor::lock()
292 {
293  m_mutex.lock();
294 }
295 
296 void socket_adaptor::unlock()
297 {
298  m_mutex.unlock();
299 }
300 
301 void socket_adaptor::process_control_message(control_code_t message)
302 {
303  if(NEW_CALLBACK == message)
304  {
305  lock();
306  auto num_connections = m_num_connections;
307  auto callback = m_callback;
308  auto callback_data = m_callback_data;
309  unlock();
310 
311  if(callback && (0 != num_connections))
312  {
313  callback(callback_data);
314  }
315  }
316 }
317 
318 
320 {
321  lock();
322  unsigned int num_conn = m_num_connections;
323  unlock();
324  return m_num_connections;
325 }
326 
327 void socket_adaptor::register_data_ready_callback(socket_adaptor_cb_t callback, void *data)
328 {
329  lock();
330  m_callback = callback;
331  m_callback_data = data;
332  unlock();
333 
334  if(nullptr != callback)
335  {
336  control_code_t message = NEW_CALLBACK;
337  int ret = write(m_control_pipe[PIPE_WRITE_FD], &message, sizeof(message));
338  }
339 }
socket_adaptor::get_active_connections
unsigned int get_active_connections()
This api returns the number of active connections.
Definition: socket_adaptor.cpp:319
socket_adaptor::terminate_current_connection
void terminate_current_connection()
This api invokes close() to terminate the current connection.
Definition: socket_adaptor.cpp:277
socket_adaptor::get_path
std::string & get_path()
This api returns the data path.
Definition: socket_adaptor.cpp:88
socket_adaptor
Definition: socket_adaptor.h:33
socket_adaptor::start_listening
int start_listening(const std::string &path)
This function makes the audiocapturemgr listen for incoming unix domain connections to the given path...
Definition: socket_adaptor.cpp:93
socket_adaptor::write_data
int write_data(const char *buffer, const unsigned int size)
This api invokes unix write() to write data to the socket.
Definition: socket_adaptor.cpp:64