19 #include "socket_adaptor.h"
20 #include "basic_types.h"
22 #include <sys/types.h>
23 #include <sys/socket.h>
27 #include "safec_lib.h"
// Indices into the two-element control pipe fd array (m_control_pipe):
// element PIPE_READ_FD is the end this file reads from, element PIPE_WRITE_FD
// is the end it writes control messages to.
33 static const int PIPE_READ_FD = 0;
34 static const int PIPE_WRITE_FD = 1;
// NOTE(review): no use of MAX_CONNECTIONS is visible in this excerpt —
// presumably the cap on simultaneous clients; confirm against the rest of the file.
35 static const unsigned int MAX_CONNECTIONS = 1;
// File-level flag checked by the constructor; it is set to true after the
// SIGPIPE disposition has been installed so that setup is not repeated.
37 static bool g_one_time_init_complete =
false;
// Constructor. Starts with no listening socket (-1), no client write fd (-1),
// zero connections and no registered callback, then creates the control pipe.
39 socket_adaptor::socket_adaptor() : m_listen_fd(-1), m_write_fd(-1), m_num_connections(0), m_callback(nullptr)
// One-time setup, gated on the file-level flag that is set to true below.
42 if(!g_one_time_init_complete)
// Install SIG_IGN as the SIGPIPE handler (empty signal mask, no flags).
45 struct sigaction sig_settings;
46 sig_settings.sa_handler = SIG_IGN;
47 sig_settings.sa_flags = 0;
48 sigemptyset(&sig_settings.sa_mask);
49 sigaction(SIGPIPE, &sig_settings, NULL);
// NOTE(review): this assignment sits between the sigaction() call and the flag
// update. If it is inside the one-time branch, only the first instance ever
// constructed gets m_callback_data initialised — confirm against the braces,
// which are not visible in this excerpt.
50 m_callback_data = nullptr ;
51 g_one_time_init_complete =
true;
// Create the control pipe with O_NONBLOCK; REPORT_IF_UNEQUAL is given 0 as the
// expected pipe2() return value.
53 REPORT_IF_UNEQUAL(0, pipe2(m_control_pipe, O_NONBLOCK));
// Destructor: closes both ends of the control pipe (write end first, then read end).
// NOTE(review): any other teardown this destructor performs is not visible in this excerpt.
56 socket_adaptor::~socket_adaptor()
60 close(m_control_pipe[PIPE_WRITE_FD]);
61 close(m_control_pipe[PIPE_READ_FD]);
// Fragment of a write routine whose signature is not visible in this excerpt.
// Attempts to write up to `size` bytes from `buffer` to the client fd with a
// single write() call and keeps the result in `ret`.
66 int ret = write(m_write_fd, buffer, size);
// Logged, together with errno, on the write-error path (the guarding condition
// is not visible in this excerpt).
69 WARN(
"Write error! Closing socket. errno: 0x%x\n", errno);
// NOTE(review): the perror() prefix names data_callback(); confirm it matches
// the enclosing function.
70 perror(
"socket_adaptor::data_callback() ");
// Logged when the write is treated as incomplete (guard not visible here).
83 WARN(
"Incomplete buffer write!\n");
// Fragment of the listen-setup routine; its signature and several guard
// conditions are not visible in this excerpt.
// Create a non-blocking UNIX-domain stream socket for listening.
99 m_listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
102 DEBUG(
"Socket created.\n");
// At most two passes through the bind sequence below.
104 for(
int retries = 0; retries < 2; retries++)
// Build the sockaddr_un from m_path; the copy is bounded by sizeof(sun_path).
106 struct sockaddr_un bind_path;
107 bind_path.sun_family = AF_UNIX;
108 rc = strcpy_s(bind_path.sun_path,
sizeof(bind_path.sun_path), m_path.c_str());
113 INFO(
"Binding to path %s\n", bind_path.sun_path);
114 ret = bind(m_listen_fd, (
const struct sockaddr *) &bind_path,
sizeof(bind_path));
// When errno is EADDRINUSE the existing filesystem entry at m_path is unlinked.
// NOTE(review): presumably the loop then retries the bind — the surrounding
// control flow is not visible in this excerpt.
117 if(EADDRINUSE == errno)
119 WARN(
"path is already in use. Using brute force.\n");
120 unlink(m_path.c_str());
// Presumably the path for bind failures other than EADDRINUSE: log errno.
124 ERROR(
"Failed to bind to path. Error is %d\n", errno);
125 perror(
"bind error");
// After a successful bind: listen with a backlog of 3 and start worker_thread()
// on this object in m_thread.
133 INFO(
"Bound successfully to path.\n");
134 REPORT_IF_UNEQUAL(0, listen(m_listen_fd, 3));
135 m_thread = std::thread(&socket_adaptor::worker_thread,
this);
// Logged when the socket could not be opened (guard not visible in this excerpt).
142 ERROR(
"Could not open socket.\n");
// Stops the adaptor: signals the worker thread through the control pipe and
// finally removes the named socket from the filesystem.
// NOTE(review): several lines of this function (the declaration of `message`,
// the join itself, the closing of fds) are not visible in this excerpt.
148 void socket_adaptor::stop_listening()
// Only signal the worker if the thread object is joinable.
154 if(m_thread.joinable())
// Write the control message to the pipe's write end; anything other than a
// full-size write is reported as an error.
157 int ret = write(m_control_pipe[PIPE_WRITE_FD], &message,
sizeof(message));
158 if(ret !=
sizeof(message))
160 ERROR(
"Couldn't trigger worker thread shutdown.\n");
// Presumably brackets the join on m_thread — the join call is not visible here.
164 INFO(
"Waiting for worker thread to join.\n");
168 INFO(
"Worker thread has joined.\n");
// Reports the current value of m_num_connections.
178 INFO(
"Closed write fd. Total active connections now is %d\n", m_num_connections);
// Remove the socket path from the filesystem.
180 INFO(
"Removing named socket %s.\n", m_path.c_str());
181 unlink(m_path.c_str());
// Accepts a pending connection on the listening socket and stores the
// resulting fd in m_write_fd, then invokes the registered callback.
188 void socket_adaptor::process_new_connection()
// Logged when an existing write socket is being replaced; the condition and the
// handling of the old fd are not visible in this excerpt.
194 WARN(
"Trashing old write socket in favour of new one.\n");
200 DEBUG(
"Setting up a new connection.\n");
201 struct sockaddr_un incoming_addr;
// NOTE(review): addrlen is declared int but handed to accept() through a
// socklen_t* cast; declaring it as socklen_t would remove the need for the cast.
202 int addrlen =
sizeof(incoming_addr);
203 m_write_fd = accept(m_listen_fd, (sockaddr *) &incoming_addr, (socklen_t *)&addrlen);
// Logged on the accept-failure path (guard not visible in this excerpt).
207 ERROR(
"Error accepting connection.\n");
212 INFO(
"Connected to new client.\n");
// Invoke the registered callback with its user data.
// NOTE(review): any guard around this call (e.g. a null check on m_callback) is
// not visible in this excerpt — confirm.
218 m_callback(m_callback_data);
// Worker thread body: waits in select() on the listening socket and on the read
// end of the control pipe, then handles whichever is flagged as readable.
223 void socket_adaptor::worker_thread()
226 int control_fd = m_control_pipe[PIPE_READ_FD];
// select() takes the highest fd value plus one as its first argument.
227 int max_fd = (m_listen_fd > control_fd ? m_listen_fd : control_fd);
// NOTE(review): presumably the loop-control flag — the loop header is not
// visible in this excerpt.
230 bool check_fds =
true;
// Rebuild the read set with both fds before waiting.
235 FD_ZERO(&poll_fd_set);
236 FD_SET(m_listen_fd, &poll_fd_set);
237 FD_SET(control_fd, &poll_fd_set);
// Wait with no timeout (NULL timeval); only the read set is supplied.
239 int ret = select((max_fd + 1), &poll_fd_set, NULL, NULL, NULL);
240 DEBUG(
"Unblocking now. ret is 0x%x\n", ret);
243 ERROR(
"select() returned 0.\n");
// Control pipe readable: read one control_code_t from it.
248 if(0 != FD_ISSET(control_fd, &poll_fd_set))
250 control_code_t message;
251 REPORT_IF_UNEQUAL((
sizeof(message)), read(m_control_pipe[PIPE_READ_FD], &message,
sizeof(message)));
// Presumably taken when the message is the shutdown code — the condition is not
// visible in this excerpt.
254 INFO(
"Exiting monitor thread.\n");
// Other messages are handed to process_control_message().
259 process_control_message(message);
// Listening socket readable: a client is waiting to be accepted.
262 if(0 != FD_ISSET(m_listen_fd, &poll_fd_set))
264 process_new_connection();
// Logged on the select() failure path (guard not visible in this excerpt).
269 ERROR(
"Error polling monitor FD!\n");
// NOTE(review): this log statement belongs to a routine whose signature and
// body are not visible in this excerpt; confirm which function emits it.
285 INFO(
"Terminated current connection.\n");
// NOTE(review): only the signature of lock() is visible in this excerpt;
// presumably it acquires the object's mutex — confirm against the full body.
291 void socket_adaptor::lock()
// NOTE(review): only the signature of unlock() is visible in this excerpt;
// presumably it releases what lock() acquired — confirm against the full body.
296 void socket_adaptor::unlock()
// Handles a control message that the worker thread read from the control pipe.
301 void socket_adaptor::process_control_message(control_code_t message)
// NEW_CALLBACK is the code written to the pipe by register_data_ready_callback().
303 if(NEW_CALLBACK == message)
// Copy the shared members into locals before using them.
// NOTE(review): presumably done between lock() and unlock(); those calls are
// not visible in this excerpt.
306 auto num_connections = m_num_connections;
307 auto callback = m_callback;
308 auto callback_data = m_callback_data;
// Invoke the callback only if one is set and at least one connection is active.
311 if(callback && (0 != num_connections))
313 callback(callback_data);
// Fragment of a connection-count accessor whose signature is not visible in
// this excerpt.
// NOTE(review): num_conn is copied from m_num_connections but the member itself
// is what gets returned, leaving the local unused. If the copy is taken under a
// lock, returning num_conn was probably intended — confirm.
322 unsigned int num_conn = m_num_connections;
324 return m_num_connections;
327 void socket_adaptor::register_data_ready_callback(socket_adaptor_cb_t callback,
void *data)
330 m_callback = callback;
331 m_callback_data = data;
334 if(
nullptr != callback)
336 control_code_t message = NEW_CALLBACK;
337 int ret = write(m_control_pipe[PIPE_WRITE_FD], &message,
sizeof(message));