RDK Documentation (Open Sourced RDK Components)
ip_out.cpp
1 /*
2  * If not stated otherwise in this file or this component's Licenses.txt file the
3  * following copyright and licenses apply:
4  *
5  * Copyright 2016 RDK Management
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18 */
19 #include "ip_out.h"
20 #include <signal.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <sys/un.h>
24 #include <stdio.h>
25 #include <errno.h>
26 
27 #define _GNU_SOURCE
28 #include <fcntl.h>
29 #include <unistd.h>
30 #include "safec_lib.h"
31 
32 using namespace audiocapturemgr;
33 std::string SOCKNAME_PREFIX = "/tmp/acm_ip_out_";
34 static unsigned int ticker;
35 static const int PIPE_READ_FD = 0;
36 static const int PIPE_WRITE_FD = 1;
37 static const unsigned int MAX_CONNECTIONS = 1;
38 
39 static bool g_one_time_init_complete = false;
40 
/* Messages written to the control pipe for the worker thread. */
enum ip_out_message_t
{
    MSG_EXIT = 0, /* sent by close_output() to make the worker thread leave its loop */
    MSG_MAX       /* number of defined messages */
};
46 
47 static void * ip_out_thread_launcher(void * data)
48 {
49  ip_out_client * ptr = (ip_out_client *) data;
50  ptr->worker_thread();
51  return NULL;
52 }
53 
54 ip_out_client::ip_out_client(q_mgr *mgr) : audio_capture_client(mgr), m_listen_fd(-1), m_write_fd(-1), m_num_connections(0)
55 {
56  INFO("Enter\n")
57  if(!g_one_time_init_complete)
58  {
59  /*SIGPIPE must be ignored or process will exit when client closes connection*/
60  struct sigaction sig_settings;
61  sig_settings.sa_handler = SIG_IGN;
62  sigemptyset(&sig_settings.sa_mask); //CID:80675 Intialize the uninit
63  sig_settings.sa_flags = 0; //CID:80675 Intialize the uninit
64  sigaction(SIGPIPE, &sig_settings, NULL);
65  m_control_pipe[PIPE_READ_FD] = 0; //CID:90206 - Initialize m_control_pipe
66  m_control_pipe[PIPE_WRITE_FD] = 0;
67  g_one_time_init_complete = true;
68  }
69  REPORT_IF_UNEQUAL(0, pipe2(m_control_pipe, O_NONBLOCK));
70  open_output();
71 }
72 
73 ip_out_client::~ip_out_client()
74 {
75  INFO("Enter\n")
76  close_output();
77  close(m_control_pipe[PIPE_WRITE_FD]);
78  close(m_control_pipe[PIPE_READ_FD]);
79 }
80 
81 int ip_out_client::data_callback(audio_buffer *buf)
82 {
83  lock();
84  if(0 < m_write_fd)
85  {
86  int ret = write(m_write_fd, buf->m_start_ptr, buf->m_size);
87  if(0 > ret)
88  {
89  WARN("Write error! Closing socket. errno: 0x%x\n", errno);
90  perror("ip_out_client::data_callback() ");
91  close(m_write_fd);
92  m_write_fd = -1;
93  m_num_connections--;
94  //audio_capture_client::stop();
95  }
96  else if(ret != buf->m_size)
97  {
98  WARN("Incomplete buffer write!\n");
99  }
100  }
101  unlock();
102  release_buffer(buf);
103  return 0; //CID:88863 ; Missing Return
104 }
105 
106 std::string ip_out_client::get_data_path()
107 {
108  return m_data_path;
109 }
110 
111 std::string ip_out_client::open_output()
112 {
113  lock();
114  if(!m_data_path.empty())
115  {
116  WARN("Already open.\n");
117  unlock();
118  return m_data_path;
119  }
120 
121  /*Open new UNIX socket to transfer data*/
122  m_listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0); //TODO: Does it really need to be non-blocking?
123  if (m_listen_fd < 0) {
124  ERROR("Could not open socket.\n");
125  unlock();
126  return m_data_path;
127  }
128  DEBUG("Socket created.\n");
129 
130  unsigned int num_retries = 6;
131  errno_t rc = -1;
132  while(num_retries)
133  {
134  num_retries--;
135  std::string sockpath = SOCKNAME_PREFIX + get_suffix(ticker++);
136 
137  struct sockaddr_un bind_path;
138  bind_path.sun_family = AF_UNIX;
139  rc = strcpy_s(bind_path.sun_path, sizeof(bind_path.sun_path), sockpath.c_str());
140  if(rc != EOK)
141  {
142  ERR_CHK(rc);
143  }
144 
145  INFO("Binding to path %s\n", bind_path.sun_path);
146  int ret = bind(m_listen_fd, (const struct sockaddr *) &bind_path, sizeof(bind_path));
147  if(-1 == ret)
148  {
149  if(EADDRINUSE == errno)
150  {
151  WARN("Retrying as the path is already in use.\n");
152  continue;
153  }
154  ERROR("Failed to bind to path. Error is %d\n", errno);
155  perror("bind error");
156  close(m_listen_fd);
157  m_listen_fd = -1;
158  break;
159  }
160  else
161  {
162  INFO("Bound successfully to path.\n");
163  m_data_path = sockpath;
164  REPORT_IF_UNEQUAL(0, listen(m_listen_fd, 3));
165  REPORT_IF_UNEQUAL(0, pthread_create(&m_thread, NULL, ip_out_thread_launcher, (void *) this));
166  break;
167  }
168  }
169 
170  unlock();
171  return m_data_path;
172 
173 }
174 
175 void ip_out_client::close_output()
176 {
177  INFO("Enter\n");
178  lock();
179 
180  /*Shut down worker thread that listens to incoming connections.*/
181  int message = MSG_EXIT;
182  int ret = write(m_control_pipe[PIPE_WRITE_FD], &message, sizeof(message));
183  if(ret != sizeof(message))
184  {
185  ERROR("Couldn't trigger worker thread shutdown.\n");
186  }
187  else
188  {
189  INFO("Waiting for worker thread to join.\n");
190  REPORT_IF_UNEQUAL(0, pthread_join(m_thread, NULL));
191  INFO("Worker thread has joined.\n");
192  }
193  if(!m_data_path.empty())
194  {
195  if(0 < m_write_fd)
196  {
197  close(m_write_fd);
198  m_write_fd = -1;
199  m_num_connections--;
200  INFO("Closed write fd. Total active connections now is %d\n", m_num_connections);
201  }
202  INFO("Removing named socket %s.\n", m_data_path.c_str());
203  unlink(m_data_path.c_str());
204  m_data_path.clear();
205  }
206  unlock();
207  INFO("Exit\n");
208 }
209 
210 void ip_out_client::process_new_connection()
211 {
212  INFO("Enter\n");
213  lock();
214  if(0 < m_write_fd)
215  {
216  WARN("Trashing old write socket in favour of new one.\n");
217  close(m_write_fd);
218  m_write_fd = -1;
219  m_num_connections--;
220  }
221 
222  DEBUG("Setting up a new connection.\n");
223  struct sockaddr_un incoming_addr;
224  int addrlen = sizeof(incoming_addr);
225  m_write_fd = accept(m_listen_fd, (sockaddr *) &incoming_addr, (socklen_t *)&addrlen);
226 
227  if(0 > m_write_fd)
228  {
229  ERROR("Error accepting connection.\n");
230  }
231  else
232  {
233  m_num_connections++;
234  INFO("Connected to new client.\n");
235 #if 0
236  if(!m_is_enabled)
237  {
238  INFO("Starting data delivery.\n");
239  audio_capture_client::start();
240  }
241 #endif
242  }
243  unlock();
244  INFO("Exit\n");
245  return;
246 }
247 
248 void ip_out_client::worker_thread()
249 {
250  INFO("Enter\n");
251  int control_fd = m_control_pipe[PIPE_READ_FD];
252  int max_fd = (m_listen_fd > control_fd ? m_listen_fd : control_fd);
253 
254 
255  bool check_fds = true;
256 
257  while(check_fds)
258  {
259  fd_set poll_fd_set;
260  FD_ZERO(&poll_fd_set);
261  FD_SET(m_listen_fd, &poll_fd_set);
262  FD_SET(control_fd, &poll_fd_set);
263 
264  int ret = select((max_fd + 1), &poll_fd_set, NULL, NULL, NULL);
265  DEBUG("Unblocking now. ret is 0x%x\n", ret);
266  if(0 == ret)
267  {
268  ERROR("select() returned 0.\n");
269  }
270  else if(0 < ret)
271  {
272  //Some activity was detected. Process event further.
273  if(0 != FD_ISSET(control_fd, &poll_fd_set))
274  {
275  INFO("Exiting monitor thread.\n");
276  break;
277  }
278  if(0 != FD_ISSET(m_listen_fd, &poll_fd_set))
279  {
280  process_new_connection();
281  }
282  }
283  else
284  {
285  ERROR("Error polling monitor FD!\n");
286  break;
287  }
288  }
289 
290  INFO("Exit\n");
291 }
audio_capture_client
Definition: audio_capture_manager.h:219
audio_buffer
Definition: audio_buffer.h:25
q_mgr
Definition: audio_capture_manager.h:70
ip_out_client
Definition: ip_out.h:28