RDK Documentation (Open Sourced RDK Components)
AampScheduler.cpp
Go to the documentation of this file.
1 /*
2  * If not stated otherwise in this file or this component's license file the
3  * following copyright and licenses apply:
4  *
5  * Copyright 2020 RDK Management
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18 */
19 
20 /**
21  * @file AampScheduler.cpp
22  * @brief Class to schedule commands for async execution
23  */
24 
#include "AampScheduler.h"

#include <algorithm>
#include <utility>
26 
27 /**
28  * @brief AampScheduler Constructor
29  */
30 AampScheduler::AampScheduler() : mTaskQueue(), mQMutex(), mQCond(),
31  mSchedulerRunning(false), mSchedulerThread(), mExMutex(),
32  mExLock(mExMutex, std::defer_lock), mNextTaskId(AAMP_SCHEDULER_ID_DEFAULT),
33  mCurrentTaskId(AAMP_TASK_ID_INVALID), mLockOut(false),
34  mLogObj(NULL),mState(eSTATE_IDLE)
35 {
36 }
37 
38 /**
39  * @brief AampScheduler Destructor
40  */
42 {
44  {
45  StopScheduler();
46  }
47 }
48 
49 /**
50  * @brief To start scheduler thread
51  */
53 {
54  //Turn on thread for processing async operations
55  std::lock_guard<std::mutex>lock(mQMutex);
56  mSchedulerThread = std::thread(std::bind(&AampScheduler::ExecuteAsyncTask, this));
57  mSchedulerRunning = true;
58  AAMPLOG_WARN("Started Async Worker Thread");
59 }
60 
61 /**
62  * @brief To schedule a task to be executed later
63  */
65 {
66  int id = AAMP_TASK_ID_INVALID;
68  {
69 
71  return id;
72 
73  std::lock_guard<std::mutex>lock(mQMutex);
74  if (!mLockOut)
75  {
76  id = mNextTaskId++;
77  // Upper limit check
78  if (mNextTaskId >= AAMP_SCHEDULER_ID_MAX_VALUE)
79  {
80  mNextTaskId = AAMP_SCHEDULER_ID_DEFAULT;
81  }
82  obj.mId = id;
83  mTaskQueue.push_back(obj);
84  mQCond.notify_one();
85  }
86  else
87  {
88  // Operation is skipped here, this might happen due to race conditions during normal operation, hence setting as info log
89  AAMPLOG_INFO("Warning: Attempting to schedule a task when scheduler is locked out, skipping operation %s!!", obj.mTaskName.c_str());
90  }
91  }
92  else
93  {
94  AAMPLOG_ERR("Attempting to schedule a task when scheduler is not running, undefined behavior, task ignored:%s",obj.mTaskName.c_str());
95  }
96  return id;
97 }
98 
99 /**
100  * @brief Executes scheduled tasks - invoked by thread
101  */
103 {
104  std::unique_lock<std::mutex>queueLock(mQMutex);
105  while (mSchedulerRunning)
106  {
107  if (mTaskQueue.empty())
108  {
109  mQCond.wait(queueLock);
110  }
111  else
112  {
113  /* DELIA-57121
114  Take the execution lock before taking a task from the queue
115  otherwise this function could hold a task, out of the queue,
116  that cannot be deleted by RemoveAllTasks()!
117  Allow the queue to be modified while waiting.*/
118  queueLock.unlock();
119  std::lock_guard<std::mutex>executionLock(mExMutex);
120  queueLock.lock();
121 
122  //DELIA-57121 - note: mTaskQueue could have been modified while waiting for execute permission
123  if (!mTaskQueue.empty())
124  {
125  AsyncTaskObj obj = mTaskQueue.front();
126  mTaskQueue.pop_front();
127  if (obj.mId != AAMP_TASK_ID_INVALID)
128  {
129  mCurrentTaskId = obj.mId;
130  AAMPLOG_INFO("Found entry in function queue!!, task:%s. State:%d",obj.mTaskName.c_str(),mState);
132  {
133  //Unlock so that new entries can be added to queue while function executes
134  queueLock.unlock();
135 
136  AAMPLOG_WARN("SchedulerTask Execution:%s",obj.mTaskName.c_str());
137  //Execute function
138  obj.mTask(obj.mData);
139  //May be used in a wait() in future loops, it needs to be locked
140  queueLock.lock();
141  }
142  }
143  else
144  {
145  AAMPLOG_ERR("Scheduler found a task with invalid ID, skip task!");
146  }
147  }
148  }
149  }
150  AAMPLOG_INFO("Exited Async Worker Thread");
151 }
152 
153 /**
154  * @brief To remove all scheduled tasks and prevent further tasks from scheduling
155  */
157 {
158  std::lock_guard<std::mutex>lock(mQMutex);
159  if(!mLockOut)
160  {
161  AAMPLOG_WARN("The scheduler is active. An active task may continue to execute after this function exits. Call SuspendScheduler() prior to this function to prevent this.");
162  }
163  if (!mTaskQueue.empty())
164  {
165  AAMPLOG_WARN("Clearing up %zu entries from mFuncQueue", mTaskQueue.size());
166  mTaskQueue.clear();
167  }
168 }
169 
170 /**
171  * @brief To stop scheduler and associated resources
172  */
174 {
175  AAMPLOG_WARN("Stopping Async Worker Thread");
176  // Clean up things in queue
177  mSchedulerRunning = false;
178 
179  //DELIA-57121 allow StopScheduler() to be called without warning from a nonsuspended state and
180  //DELIA-57122 not cause an error in ResumeScheduler() below due to trying to unlock an unlocked lock
181  if(!mLockOut)
182  {
184  }
185 
186  RemoveAllTasks();
187 
188  //DELIA-57122 prevent possible deadlock where mSchedulerThread is waiting for mExLock/mExMutex
189  ResumeScheduler();
190  mQCond.notify_one();
191  if (mSchedulerThread.joinable())
192  mSchedulerThread.join();
193 }
194 
195 /**
196  * @brief To acquire execution lock for synchronisation purposes
197  */
199 {
200  mExLock.lock();
201  std::lock_guard<std::mutex>lock(mQMutex);
202  mLockOut = true;
203 }
204 
205 /**
206  * @brief To release execution lock
207  */
209 {
210  mExLock.unlock();
211  std::lock_guard<std::mutex>lock(mQMutex);
212  mLockOut = false;
213 }
214 
215 /**
216  * @brief To remove a scheduled tasks with ID
217  */
219 {
220  bool ret = false;
221  std::lock_guard<std::mutex>lock(mQMutex);
222  // Make sure its not currently executing/executed task
223  if (id != AAMP_TASK_ID_INVALID && mCurrentTaskId != id)
224  {
225  for (auto it = mTaskQueue.begin(); it != mTaskQueue.end(); )
226  {
227  if (it->mId == id)
228  {
229  mTaskQueue.erase(it);
230  ret = true;
231  break;
232  }
233  else
234  {
235  it++;
236  }
237  }
238  }
239  return ret;
240 }
241 
242 /**
243  * @brief To enable scheduler to queue new tasks
244  */
246 {
247  std::lock_guard<std::mutex>lock(mQMutex);
248  mLockOut = false;
249 }
250 
251 /**
252  * @brief To player state to Scheduler
253  */
255 {
256  mState = sstate;
257 }
258 
259 #ifdef __UNIT_TESTING__
260 
261 
262 void printTask(const char *str)
263 {
264  AAMPLOG_WARN("Inside task function :%s\n",str);
265  int seconds = rand() % 10 + 1;
266  sleep(seconds);
267 }
268 
269 
270 void AddTask(AampScheduler &mScheduler, std::string str )
271 {
272  mScheduler.ScheduleTask(AsyncTaskObj([str](void *data)
273  {
274  printTask(str.c_str());
275  }, (void *) nullptr, __FUNCTION__));
276 
277 }
278 
279 
280 void task1(void *inst) {
281  std::string input="Task1";
282  AampScheduler *mScheduler = (AampScheduler *)inst;
283  for(int i=0;i<10;i++)
284  {
285  mScheduler->ScheduleTask(AsyncTaskObj([input](void *data)
286  {
287  printTask(input.c_str());
288  }, (void *) nullptr, __FUNCTION__));
289 
290  int state = rand() % 13 + 1;
291  mScheduler->SetState((PrivAAMPState)state);
292  AAMPLOG_WARN("setting state:%d",state);
293  }
294 }
295 
296 void task2(void *inst) {
297  std::string input="Task2";
298  AampScheduler *mScheduler = (AampScheduler *)inst;
299  mScheduler->ScheduleTask(AsyncTaskObj([input](void *data)
300  {
301  printTask(input.c_str());
302  }, (void *) nullptr, __FUNCTION__));
303 
304 }
305 
306 int main()
307 {
308  AAMPLOG_WARN("Testing the Player Scheduler Class\n");
309  AampScheduler mScheduler;
310  srand(time(0));
311  //Test 1: Simple start / stop scheduler
312  //mScheduler.StartScheduler();
313  //mScheduler.StopScheduler();
314  // Test 2: start but no stop
315  //mScheduler.StartScheduler();
316  // Test 3 :
317  //mScheduler.StartScheduler();
318  //AddTask(mScheduler, "Test1");
319  //sleep(10);
320  //mScheduler.StopScheduler();
321  // Test 4 :
322  //mScheduler.StartScheduler();
323  //mScheduler.SetState(eSTATE_RELEASED);
324  //AddTask(mScheduler, "Test2");
325  //sleep(10);
326  //mScheduler.StopScheduler();
327  // Test 5 :
328  //mScheduler.StartScheduler();
329  //AddTask(mScheduler, "Test2");
330  //mScheduler.SetState(eSTATE_RELEASED);
331  //sleep(10);
332  //mScheduler.StopScheduler();
333  // Test 6: Stop without start
334  //mScheduler.StopScheduler();
335  // Test 7: Other API calls with start Scheduler
336  //for(int i=0;i<10;i++)
337  //{
338  // AddTask(mScheduler, "LoopTest");
339  // mScheduler.RemoveAllTasks();
340  // mScheduler.SuspendScheduler();
341  // mScheduler.ResumeScheduler();
342  //}
343  // Test 8 :
344  mScheduler.StartScheduler();
345  std::thread t1(task1,(void *)&mScheduler);
346  std::thread t2(task2,(void *)&mScheduler);
347  sleep(10);
348  t1.join();
349  t2.join();
350  mScheduler.StopScheduler();
351  return 0;
352 }
353 #endif
AampScheduler::ScheduleTask
int ScheduleTask(AsyncTaskObj obj)
To schedule a task to be executed later.
Definition: AampScheduler.cpp:64
AampScheduler::mCurrentTaskId
int mCurrentTaskId
Definition: AampScheduler.h:192
AampScheduler::mSchedulerThread
std::thread mSchedulerThread
Definition: AampScheduler.h:188
AampScheduler::mLockOut
bool mLockOut
Definition: AampScheduler.h:193
AampScheduler::EnableScheduleTask
void EnableScheduleTask()
To enable scheduler to queue new tasks.
Definition: AampScheduler.cpp:245
AampScheduler::SetState
void SetState(PrivAAMPState sstate)
To set the player state in the Scheduler.
Definition: AampScheduler.cpp:254
AampScheduler::AampScheduler
AampScheduler()
AampScheduler Constructor.
Definition: AampScheduler.cpp:30
AampScheduler::mExLock
std::unique_lock< std::mutex > mExLock
Definition: AampScheduler.h:190
eSTATE_ERROR
@ eSTATE_ERROR
Definition: AampEvent.h:170
AampScheduler::StartScheduler
void StartScheduler()
To start scheduler thread.
Definition: AampScheduler.cpp:52
AampScheduler
Scheduler class for asynchronous operations.
Definition: AampScheduler.h:86
AsyncTaskObj
Async task operations.
Definition: AampScheduler.h:57
AampScheduler::~AampScheduler
virtual ~AampScheduler()
AampScheduler Destructor.
Definition: AampScheduler.cpp:41
AampScheduler::ResumeScheduler
void ResumeScheduler()
To release execution lock.
Definition: AampScheduler.cpp:208
AampScheduler::mSchedulerRunning
bool mSchedulerRunning
Definition: AampScheduler.h:187
AampScheduler::RemoveTask
bool RemoveTask(int id)
To remove a scheduled tasks with ID.
Definition: AampScheduler.cpp:218
AampScheduler::mQMutex
std::mutex mQMutex
Definition: AampScheduler.h:185
AampScheduler::mState
PrivAAMPState mState
Definition: AampScheduler.h:194
eSTATE_RELEASED
@ eSTATE_RELEASED
Definition: AampEvent.h:171
AampScheduler::StopScheduler
void StopScheduler()
To stop scheduler and associated resources.
Definition: AampScheduler.cpp:173
AampScheduler::mQCond
std::condition_variable mQCond
Definition: AampScheduler.h:186
AampScheduler.h
Class to schedule commands for async execution.
PrivAAMPState
PrivAAMPState
Mapping all required status codes based on JS player requirement. These requirements may be forced by...
Definition: AampEvent.h:156
AampScheduler::SuspendScheduler
void SuspendScheduler()
To acquire execution lock for synchronisation purposes.
Definition: AampScheduler.cpp:198
AampScheduler::mNextTaskId
int mNextTaskId
Definition: AampScheduler.h:191
AampScheduler::mTaskQueue
std::deque< AsyncTaskObj > mTaskQueue
Definition: AampScheduler.h:184
eSTATE_IDLE
@ eSTATE_IDLE
Definition: AampEvent.h:158
AampScheduler::RemoveAllTasks
void RemoveAllTasks()
To remove all scheduled tasks and prevent further tasks from scheduling.
Definition: AampScheduler.cpp:156
AampScheduler::mExMutex
std::mutex mExMutex
Definition: AampScheduler.h:189
AampScheduler::ExecuteAsyncTask
void ExecuteAsyncTask()
Executes scheduled tasks - invoked by thread.
Definition: AampScheduler.cpp:102