ref: c51a4ff09427aff7c4963bc1beb0f5a8cc06c7c2
parent: 4484df4bbb4561128a3320a1ab329cff20843431
author: [email protected] <[email protected]>
date: Tue Apr 18 12:50:15 EDT 2017
Fix a potential deadlock problem in multi-threaded code
--- a/codec/common/inc/WelsThread.h
+++ b/codec/common/inc/WelsThread.h
@@ -82,7 +82,7 @@
}
void SignalThread() {
- WelsEventSignal (&m_hEvent);
+ WelsEventSignal (&m_hEvent, &m_hMutex, &m_iConVar);
}
private:
@@ -91,6 +91,7 @@
CWelsLock m_cLockStatus;
bool m_bRunning;
bool m_bEndFlag;
+ int m_iConVar;
DISALLOW_COPY_AND_ASSIGN (CWelsThread);
};
--- a/codec/common/inc/WelsThreadLib.h
+++ b/codec/common/inc/WelsThreadLib.h
@@ -112,8 +112,8 @@
WELS_THREAD_ERROR_CODE WelsEventOpen (WELS_EVENT* p_event, const char* event_name = NULL);
WELS_THREAD_ERROR_CODE WelsEventClose (WELS_EVENT* event, const char* event_name = NULL);
-WELS_THREAD_ERROR_CODE WelsEventSignal (WELS_EVENT* event);
-WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event,WELS_MUTEX *pMutex = NULL);
+WELS_THREAD_ERROR_CODE WelsEventSignal (WELS_EVENT* event,WELS_MUTEX *pMutex, int* iCondition);
+WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event,WELS_MUTEX *pMutex, int& iCondition);
WELS_THREAD_ERROR_CODE WelsEventWaitWithTimeOut (WELS_EVENT* event, uint32_t dwMilliseconds,WELS_MUTEX *pMutex = NULL);
WELS_THREAD_ERROR_CODE WelsMultipleEventsWaitSingleBlocking (uint32_t nCount, WELS_EVENT* event_list,
WELS_EVENT* master_event = NULL,WELS_MUTEX *pMutex = NULL);
--- a/codec/common/src/WelsThread.cpp
+++ b/codec/common/src/WelsThread.cpp
@@ -49,7 +49,7 @@
WelsEventOpen (&m_hEvent);
WelsMutexInit(&m_hMutex);
-
+ m_iConVar = 1;
}
CWelsThread::~CWelsThread() {
@@ -60,13 +60,14 @@
void CWelsThread::Thread() {
while (true) {
- WelsEventWait (&m_hEvent,&m_hMutex);
+ WelsEventWait (&m_hEvent,&m_hMutex,m_iConVar);
if (GetEndFlag()) {
break;
}
- ExecuteTask();
+ m_iConVar = 1;
+ ExecuteTask(); // ExecuteTask() calls OnTaskStop, which can allow the next run to be signaled, so m_iConVar must be reset to 1 before ExecuteTask() is called
}
SetRunning (false);
@@ -105,7 +106,7 @@
SetEndFlag (true);
- WelsEventSignal (&m_hEvent);
+ WelsEventSignal (&m_hEvent,&m_hMutex,&m_iConVar);
WelsThreadJoin (m_hThread);
return;
}
--- a/codec/common/src/WelsThreadLib.cpp
+++ b/codec/common/src/WelsThreadLib.cpp
@@ -140,7 +140,7 @@
return WELS_THREAD_ERROR_OK;
}
-WELS_THREAD_ERROR_CODE WelsEventSignal (WELS_EVENT* event) {
+WELS_THREAD_ERROR_CODE WelsEventSignal (WELS_EVENT* event, WELS_MUTEX *pMutex, int* iCondition) {
if (SetEvent (*event)) {
return WELS_THREAD_ERROR_OK;
}
@@ -147,7 +147,7 @@
return WELS_THREAD_ERROR_GENERAL;
}
-WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex) {
+WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex, int& iCondition) {
return WaitForSingleObject (*event, INFINITE);
}
@@ -327,11 +327,21 @@
usleep (dwMilliSecond * 1000);
}
-WELS_THREAD_ERROR_CODE WelsEventSignal (WELS_EVENT* event) {
+WELS_THREAD_ERROR_CODE WelsEventSignal (WELS_EVENT* event, WELS_MUTEX *pMutex, int* iCondition) {
WELS_THREAD_ERROR_CODE err = 0;
+ //fprintf( stderr, "before signal it, event=%x iCondition= %d..\n", event, *iCondition );
#ifdef __APPLE__
+ WelsMutexLock (pMutex);
+ (*iCondition) --;
+ WelsMutexUnlock (pMutex);
+ if ((*iCondition) <= 0) {
err = pthread_cond_signal (event);
+ //fprintf( stderr, "signal it, event=%x iCondition= %d..\n",event, *iCondition );
+
+ }
#else
+ (*iCondition) --;
+ if ((*iCondition) <= 0) {
// int32_t val = 0;
// sem_getvalue(event, &val);
// fprintf( stderr, "before signal it, val= %d..\n",val );
@@ -338,14 +348,23 @@
if (event != NULL)
err = sem_post (*event);
// sem_getvalue(event, &val);
-// fprintf( stderr, "after signal it, val= %d..\n",val );
+ //fprintf( stderr, "signal it, event=%x iCondition= %d..\n",event, *iCondition );
+ }
#endif
+ //fprintf( stderr, "after signal it, event=%x iCondition= %d..\n",event, *iCondition );
return err;
}
-WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex) {
+WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex, int& iCondition) {
#ifdef __APPLE__
- return pthread_cond_wait (event, pMutex);
+ int err = 0;
+ WelsMutexLock(pMutex);
+ //fprintf( stderr, "WelsEventWait event %x %d..\n", event, iCondition );
+ while (iCondition>0) {
+ err = pthread_cond_wait (event, pMutex);
+ }
+ WelsMutexUnlock(pMutex);
+ return err;
#else
return sem_wait (*event); // blocking until signaled
#endif
--- a/codec/common/src/WelsThreadPool.cpp
+++ b/codec/common/src/WelsThreadPool.cpp
@@ -94,7 +94,7 @@
}
}
- //fprintf(stdout, "m_iRefCount=%d, iMaxThreadNum=%d\n", m_iRefCount, m_iMaxThreadNum);
+ ////fprintf(stdout, "m_iRefCount=%d, iMaxThreadNum=%d\n", m_iRefCount, m_iMaxThreadNum);
++ m_iRefCount;
//fprintf(stdout, "m_iRefCount2=%d\n", m_iRefCount);
@@ -135,9 +135,10 @@
RemoveThreadFromBusyList (pThread);
AddThreadToIdleQueue (pThread);
- if (pTask->GetSink()) {
- pTask->GetSink()->OnTaskExecuted();
+ if (pTask && pTask->GetSink()) {
//fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, pTask->GetSink());
+ pTask->GetSink()->OnTaskExecuted();
+ ////fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, pTask->GetSink());
}
//if (m_pSink) {
// m_pSink->OnTaskExecuted (pTask);
@@ -222,13 +223,20 @@
CWelsTaskThread* pThread = NULL;
IWelsTask* pTask = NULL;
while (GetWaitedTaskNum() > 0) {
+ //fprintf(stdout, "ThreadPool: ExecuteTask: waiting task %d\n", GetWaitedTaskNum());
pThread = GetIdleThread();
if (pThread == NULL) {
+ //fprintf(stdout, "ThreadPool: ExecuteTask: no IdleThread\n");
+
break;
}
pTask = GetWaitedTask();
//fprintf(stdout, "ThreadPool: ExecuteTask = %x at thread %x\n", pTask, pThread);
- pThread->SetTask (pTask);
+ if (pTask) {
+ pThread->SetTask (pTask);
+ } else {
+ AddThreadToIdleQueue (pThread);
+ }
}
}
@@ -309,11 +317,12 @@
CWelsTaskThread* CWelsThreadPool::GetIdleThread() {
CWelsAutoLock cLock (m_cLockIdleTasks);
- //fprintf(stdout, "CWelsThreadPool::GetIdleThread=%d\n", m_cIdleThreads->size());
- if (m_cIdleThreads->size() == 0) {
+ if (NULL == m_cIdleThreads || m_cIdleThreads->size() == 0) {
return NULL;
}
+ //fprintf(stdout, "CWelsThreadPool::GetIdleThread=%d\n", m_cIdleThreads->size());
+
CWelsTaskThread* pThread = m_cIdleThreads->begin();
m_cIdleThreads->pop_front();
return pThread;
@@ -320,21 +329,21 @@
}
int32_t CWelsThreadPool::GetBusyThreadNum() {
- return m_cBusyThreads->size();
+ return (m_cBusyThreads?m_cBusyThreads->size():0);
}
int32_t CWelsThreadPool::GetIdleThreadNum() {
- return m_cIdleThreads->size();
+ return (m_cIdleThreads?m_cIdleThreads->size():0);
}
int32_t CWelsThreadPool::GetWaitedTaskNum() {
- return m_cWaitedTasks->size();
+ return (m_cWaitedTasks?m_cWaitedTasks->size():0);
}
IWelsTask* CWelsThreadPool::GetWaitedTask() {
CWelsAutoLock lock (m_cLockWaitedTasks);
- if (m_cWaitedTasks->size() == 0) {
+ if (NULL==m_cWaitedTasks || m_cWaitedTasks->size() == 0) {
return NULL;
}
@@ -347,6 +356,9 @@
void CWelsThreadPool::ClearWaitedTasks() {
CWelsAutoLock cLock (m_cLockWaitedTasks);
+ if (NULL == m_cWaitedTasks) {
+ return;
+ }
IWelsTask* pTask = NULL;
while (0 != m_cWaitedTasks->size()) {
pTask = m_cWaitedTasks->begin();
--- a/codec/encoder/core/src/wels_task_management.cpp
+++ b/codec/encoder/core/src/wels_task_management.cpp
@@ -195,12 +195,17 @@
}
void CWelsTaskManageBase::OnTaskMinusOne() {
+ //fprintf(stdout, "OnTaskMinusOne event %x m_iWaitTaskNum=%d\n", &m_hEventMutex, m_iWaitTaskNum);
WelsCommon::CWelsAutoLock cAutoLock (m_cWaitTaskNumLock);
+ WelsEventSignal (&m_hTaskEvent,&m_hEventMutex, &m_iWaitTaskNum);
+ /*WelsMutexLock(&m_hEventMutex);
m_iWaitTaskNum --;
+ WelsMutexUnlock(&m_hEventMutex);
+
if (m_iWaitTaskNum <= 0) {
WelsEventSignal (&m_hTaskEvent);
- //fprintf(stdout, "OnTaskMinusOne WelsEventSignal m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
- }
+ fprintf(stdout, "OnTaskMinusOne WelsEventSignal m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
+ }*/
//fprintf(stdout, "OnTaskMinusOne m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
}
@@ -228,7 +233,8 @@
m_pThreadPool->QueueTask (pTargetTaskList->getNode (iIdx));
iIdx ++;
}
- WelsEventWait (&m_hTaskEvent, &m_hEventMutex);
+
+ WelsEventWait (&m_hTaskEvent,&m_hEventMutex, m_iWaitTaskNum);
return ENC_RETURN_SUCCESS;
}