21 #include "rmh_monitor.h"
26 #define RMH_MONITOR_STATUS_DUMP_MIN 60
31 #define RMH_MONITOR_STATUS_PING_MIN 2
37 #define RMH_MONITOR_MIN_NETWORK_STABALIZE_SEC 10
51 bool RMHMonitor_Event_SelfNodeId(
RMHMonitor *app,
struct timeval *time,
const uint32_t nodeId) {
56 if ((app->linkStatus != RMH_LINK_STATUS_UP) ||
57 (app->netStatus.selfNodeId == nodeId && app->netStatus.nodes[app->netStatus.selfNodeId].joined)) {
63 if (ret != RMH_SUCCESS) {
68 if (ret != RMH_SUCCESS) {
69 RMH_PrintErr(
"RMH_Self_GetPreferredNCEnabled failed -- %s!\n",
RMH_ResultToString(ret));
73 if (ret != RMH_SUCCESS) {
74 RMH_PrintErr(
"RMH_Self_GetHighestSupportedMoCAVersion failed -- %s!\n",
RMH_ResultToString(ret));
77 RMH_PrintMsgT(time,
"Node:%02u %s MoCA:%s PNC:%s **Self Node\n", nodeId,
RMH_MoCAVersionToString(app->netStatus.nodes[nodeId].mocaVersion),
78 RMH_MacToString(app->netStatus.nodes[nodeId].mac, printBuff,
sizeof(printBuff)),
79 app->netStatus.nodes[nodeId].preferredNC ?
"TRUE" :
"FALSE");
81 app->netStatus.selfNodeId=nodeId;
82 app->netStatus.nodes[nodeId].joined=
true;
91 bool RMHMonitor_Event_JoinNode(
RMHMonitor *app,
struct timeval *time,
const uint32_t nodeId) {
96 if (app->linkStatus != RMH_LINK_STATUS_UP) {
100 if (app->netStatus.nodes[nodeId].joined) {
107 if (ret != RMH_SUCCESS) {
112 if (ret != RMH_SUCCESS) {
113 RMH_PrintErr(
"RMH_RemoteNode_GetPreferredNC failed -- %s!\n",
RMH_ResultToString(ret));
117 if (ret != RMH_SUCCESS) {
118 RMH_PrintErr(
"RMH_RemoteNode_GetHighestSupportedMoCAVersion failed -- %s!\n",
RMH_ResultToString(ret));
121 RMH_PrintMsgT(time,
"Node:%02u %s MoCA:%s PNC:%s\n", nodeId,
RMH_MoCAVersionToString(app->netStatus.nodes[nodeId].mocaVersion),
122 RMH_MacToString(app->netStatus.nodes[nodeId].mac, printBuff,
sizeof(printBuff)),
123 app->netStatus.nodes[nodeId].preferredNC ?
"TRUE" :
"FALSE");
125 app->netStatus.nodes[nodeId].joined=
true;
134 bool RMHMonitor_Event_DropNode(
RMHMonitor *app,
struct timeval *time,
const uint32_t nodeId) {
138 if (app->linkStatus != RMH_LINK_STATUS_UP) {
142 if (app->netStatus.nodes[nodeId].joined) {
143 RMH_PrintMsgT(time,
"Node:%02u %s MoCA:%s PNC:%s %s\n", nodeId,
RMH_MoCAVersionToString(app->netStatus.nodes[nodeId].mocaVersion),
144 RMH_MacToString(app->netStatus.nodes[nodeId].mac, printBuff,
sizeof(printBuff)),
145 app->netStatus.nodes[nodeId].preferredNC ?
"TRUE" :
"FALSE",
146 app->netStatus.selfNodeId == nodeId ?
"**Self node" :
"");
149 RMH_PrintWrn(
"Got 'drop' notification for node ID %u which is not on the network\n", nodeId);
152 app->netStatus.nodes[nodeId].joined=
false;
161 bool RMHMonitor_Event_NCChanged(
RMHMonitor *app,
struct timeval *time,
const uint32_t ncNodeId) {
166 if ((app->linkStatus != RMH_LINK_STATUS_UP) ||
167 (app->netStatus.ncNodeIdValid && app->netStatus.ncNodeId == ncNodeId)) {
171 if (app->netStatus.ncNodeIdValid) {
172 RMH_PrintMsgT(time,
"NC is now node:%02u %s [Previous:%02u %s]\n", ncNodeId,
RMH_MacToString(app->netStatus.nodes[ncNodeId].mac, newNC,
sizeof(newNC)),
173 app->netStatus.ncNodeId,
RMH_MacToString(app->netStatus.nodes[app->netStatus.ncNodeId].mac, oldNC,
sizeof(oldNC)));
176 RMH_PrintMsgT(time,
"NC is now node:%02u %s\n", ncNodeId,
RMH_MacToString(app->netStatus.nodes[ncNodeId].mac, newNC,
sizeof(newNC)));
178 app->netStatus.ncNodeId=ncNodeId;
179 app->netStatus.ncNodeIdValid=
true;
188 bool RMHMonitor_Event_NetworkMoCAVersionChanged(
RMHMonitor *app,
struct timeval *time,
const RMH_MoCAVersion mocaVersion) {
190 if ((app->linkStatus != RMH_LINK_STATUS_UP) ||
191 (app->netStatus.networkMoCAVerValid && app->netStatus.networkMoCAVer == mocaVersion)) {
195 if (app->netStatus.networkMoCAVerValid) {
203 app->netStatus.networkMoCAVer=mocaVersion;
204 app->netStatus.networkMoCAVerValid=
true;
213 bool RMHMonitor_Event_LinkStatusChanged(
RMHMonitor *app,
struct timeval *time,
const RMH_LinkStatus linkStatus) {
215 if (app->linkStatusValid && app->linkStatus == linkStatus) {
220 app->linkStatus=linkStatus;
221 app->linkStatusValid=
true;
223 if (app->linkStatus == RMH_LINK_STATUS_UP) {
228 RMH_MoCAVersion networkMoCAVer;
232 if (ret != RMH_SUCCESS) {
236 RMHMonitor_Event_SelfNodeId(app, time, nodeId);
240 if (ret != RMH_SUCCESS) {
241 RMH_PrintErr(
"RMH_Network_GetRemoteNodeIds failed -- %s!\n",
RMH_ResultToString(ret));
244 for (i = 0; i < RMH_MAX_MOCA_NODES; i++) {
245 if (nodeIds.nodePresent[i]) {
246 RMHMonitor_Event_JoinNode(app, time, i);
252 if (ret != RMH_SUCCESS) {
253 RMH_PrintErr(
"RMH_Network_GetRemoteNodeIds failed -- %s!\n",
RMH_ResultToString(ret));
256 RMHMonitor_Event_NCChanged(app, time, nodeId);
260 if (ret != RMH_SUCCESS) {
264 RMHMonitor_Event_NetworkMoCAVersionChanged(app, time, networkMoCAVer);
269 memset(&app->netStatus, 0,
sizeof(app->netStatus));
279 void RMHMonitor_Event_PrintPing(
RMHMonitor *app,
struct timeval *time) {
282 RMH_PrintMsgT(time,
"Link:DISABLED\n");
284 else if (app->linkStatus != RMH_LINK_STATUS_UP) {
285 RMH_PrintMsgT(time,
"Link:DOWN\n");
288 uint32_t numNodes, ncNode, selfNode, uptime;
289 int32_t numNodesPrint, ncNodePrint, selfNodePrint, uptimePrint;
294 RMH_PrintMsgT(time,
"Link:UP [%02uh:%02um:%02us] Nodes:%d Self:%d NC:%d\n", uptimePrint/3600, (uptimePrint%3600)/60, uptimePrint%60, numNodesPrint, selfNodePrint, ncNodePrint);
303 void RMHMonitor_Event_PrintFull(
RMHMonitor *app,
struct timeval *time) {
304 bool mocaEnabled =
false;
316 RMH_PrintErr(
"Failed to set the log level on temporary status handle!\n");
321 RMH_PrintErr(
"Failed setting callback events on temporary status handle!\n");
331 RMH_PrintMsg(
"==============================================================================================================================\n");
333 RMH_PrintMsg(
"==============================================================================================================================\n");
335 RMH_PrintMsg(
"==============================================================================================================================\n");
337 RMH_PrintMsg(
"==============================================================================================================================\n");
339 RMH_PrintMsg(
"==============================================================================================================================\n");
343 if (rmh != app->rmh) {
353 void RMHMonitor_Event_Thread(
void * context) {
357 bool networkStable=
false;
360 bool printStatus =
false;
362 struct timeval lastStatusPrint;
363 struct timeval lastStatusPing;
367 app->linkStatusValid=
false;
368 memset(&app->netStatus, 0,
sizeof(app->netStatus));
371 ret = RMH_ValidateHandle(app->rmh);
372 if (ret == RMH_UNIMPLEMENTED || ret == RMH_NOT_SUPPORTED) {
373 RMH_PrintWrn(
"RMH_ValidateHandle returned %s. We will not be able to monitor to make sure the handle remains valid\n",
RMH_ResultToString(ret));
375 else if (ret != RMH_SUCCESS) {
376 RMH_PrintErr(
"The RMH handle %p seems to no longer be valid. Aborting\n", app->rmh);
381 gettimeofday(&now, NULL);
386 else if (mocaEnabled) {
387 RMH_LinkStatus linkStatus;
389 RMHMonitor_Event_LinkStatusChanged(app, &now, linkStatus);
393 RMHMonitor_Event_LinkStatusChanged(app, &now, RMH_LINK_STATUS_DISABLED);
395 RMHMonitor_Event_PrintFull(app, &now);
398 app->reconnectSeconds=0;
405 while (app->eventThreadRunning) {
406 ret=RMHMonitor_Semaphore_WaitTimeout(app->eventNotify, 2000);
407 if (ret == RMH_FAILURE) {
408 RMH_PrintErr(
"Failed in RMHMonitor_Semaphore_WaitTimeout\n");
411 gettimeofday(&now, NULL);
413 ret = RMH_ValidateHandle(app->rmh);
414 if (ret != RMH_SUCCESS && ret != RMH_UNIMPLEMENTED && ret != RMH_NOT_SUPPORTED) {
415 RMH_PrintErr(
"The RMH handle %p seems to no longer be valid. Aborting\n", app->rmh);
424 networkStable=(now.tv_sec-app->lastPrint.tv_sec) > RMH_MONITOR_MIN_NETWORK_STABALIZE_SEC;
433 if ((now.tv_sec-lastStatusPrint.tv_sec) > RMH_MONITOR_STATUS_DUMP_MIN*60 ||
434 (printStatus && networkStable)) {
436 RMHMonitor_Event_PrintFull(app, &now);
442 else if (networkStable && (lastStatusPing.tv_sec == 0 || (now.tv_sec-lastStatusPing.tv_sec) > RMH_MONITOR_STATUS_PING_MIN*60)) {
444 RMHMonitor_Event_PrintPing(app, &now);
449 cbE = app->eventQueue.tqh_first;
453 case RMH_EVENT_ADMISSION_STATUS_CHANGED:
454 RMH_PrintMsgT(&cbE->eventTime,
"[ADMNSTUS] Admission status: %s\n",
RMH_AdmissionStatusToString(cbE->eventData.RMH_EVENT_ADMISSION_STATUS_CHANGED.status));
456 case RMH_EVENT_LINK_STATUS_CHANGED:
457 app->appPrefix=
"[CHANGE] ";
458 printStatus|=RMHMonitor_Event_LinkStatusChanged(app, &cbE->eventTime, cbE->eventData.RMH_EVENT_LINK_STATUS_CHANGED.status);
460 case RMH_EVENT_MOCA_RESET:
461 RMH_PrintMsgT(&cbE->eventTime,
"[*RESET* ] MoCA Reset triggered - %s\n",
RMH_MoCAResetReasonToString(cbE->eventData.RMH_EVENT_MOCA_RESET.reason));
463 case RMH_EVENT_MOCA_VERSION_CHANGED:
464 app->appPrefix=
"[CHANGE] ";
465 printStatus|=RMHMonitor_Event_NetworkMoCAVersionChanged(app, &cbE->eventTime, cbE->eventData.RMH_EVENT_MOCA_VERSION_CHANGED.version);
467 case RMH_EVENT_NODE_JOINED:
468 app->appPrefix=
"[CHANGE] ";
469 printStatus|=RMHMonitor_Event_JoinNode(app, &cbE->eventTime, cbE->eventData.RMH_EVENT_NODE_JOINED.nodeId);
471 case RMH_EVENT_NODE_DROPPED:
472 app->appPrefix=
"[CHANGE] ";
473 printStatus|=RMHMonitor_Event_DropNode(app, &cbE->eventTime, cbE->eventData.RMH_EVENT_NODE_JOINED.nodeId);
475 case RMH_EVENT_NC_ID_CHANGED:
476 if (cbE->eventData.RMH_EVENT_NC_ID_CHANGED.ncValid) {
477 app->appPrefix=
"[CHANGE] ";
478 printStatus|=RMHMonitor_Event_NCChanged(app, &cbE->eventTime, cbE->eventData.RMH_EVENT_NC_ID_CHANGED.ncNodeId);
481 case RMH_EVENT_LOW_BANDWIDTH:
482 RMH_PrintMsgT(&cbE->eventTime,
"WARNING: Low bandwidth reported\n");
485 RMH_PrintMsgT(&cbE->eventTime,
"WARNING: Unhandled MoCA event %s!\n",
RMH_EventToString(cbE->event, printBuff,
sizeof(printBuff)));
489 if (!printStatus) app->appPrefix=NULL;
492 RMHMonitor_Queue_Dequeue(app);
497 pthread_exit(&threadRet);
501 pthread_exit(&threadRet);