shithub: libvpx

Download patch

ref: ff0107f60dbddec82d7d7feacd1c9a3ff9ab4a51
parent: b8c2a4eb0c47b633096f5c428b70607e7bf8d570
author: Yunqing Wang <[email protected]>
date: Wed Jan 6 13:27:37 EST 2016

Amend and improve VP8 multithreading implementation

There are flaws in the current implementation of the VP8 multithreaded
encoder and decoder, as reported in the following issue:
https://code.google.com/p/chromium/issues/detail?id=158922

Although the data race warnings are harmless and would not cause real
problems while encoding and decoding videos, it is better to fix the
warnings so that the VP8 code can pass the TSan tests.

To synchronize access to thread-shared data while maintaining speed
(i.e. decoding speed), use multiple mutexes, one per mb_row, to reduce
the number of synchronizations needed, protect the reads and writes of
the shared data, and reduce the number of mb_col writes by a factor of
nsync.

The decoder speed tests showed less than 3% speed loss when using 2 to 4
threads.

Change-Id: Ie296defffcd86a693188b668270d811964227882

--- a/vp8/common/threading.h
+++ b/vp8/common/threading.h
@@ -185,6 +185,47 @@
 #define x86_pause_hint()
 #endif
 
+#include "vpx_util/vpx_thread.h"
+
+static INLINE void mutex_lock(pthread_mutex_t *const mutex) {
+    const int kMaxTryLocks = 4000;  /* non-blocking attempts before blocking */
+    int locked = 0;
+    int i;
+    /* Poll with trylock first; fall back to a blocking lock only if needed. */
+    for (i = 0; i < kMaxTryLocks; ++i) {
+        if (!pthread_mutex_trylock(mutex)) {  /* 0 means the lock was taken */
+            locked = 1;
+            break;
+        }
+    }
+
+    if (!locked)
+        pthread_mutex_lock(mutex);  /* every attempt failed: wait for it */
+}
+
+static INLINE int protected_read(pthread_mutex_t *const mutex, const int *p) {
+    int ret;  /* copy of *p taken while mutex is held */
+    mutex_lock(mutex);
+    ret = *p;
+    pthread_mutex_unlock(mutex);
+    return ret;  /* value as of the locked read above */
+}
+
+static INLINE void sync_read(pthread_mutex_t *const mutex, int mb_col,
+                             const int *last_row_current_mb_col,
+                             const int nsync) {
+    while (mb_col > (protected_read(mutex, last_row_current_mb_col) - nsync)) {
+        x86_pause_hint();  /* loop ends once the value >= mb_col + nsync */
+        thread_sleep(0);   /* defined elsewhere; presumably yields the CPU */
+    }
+}
+
+static INLINE void protected_write(pthread_mutex_t *mutex, int *p, int v) {
+    mutex_lock(mutex);  /* acquire via the try-then-block helper */
+    *p = v;  /* store v into *p while mutex is held */
+    pthread_mutex_unlock(mutex);
+}
+
 #endif /* CONFIG_OS_SUPPORT && CONFIG_MULTITHREAD */
 
 #ifdef __cplusplus
--- a/vp8/decoder/onyxd_int.h
+++ b/vp8/decoder/onyxd_int.h
@@ -81,7 +81,7 @@
 #if CONFIG_MULTITHREAD
     /* variable for threading */
 
-    volatile int b_multithreaded_rd;
+    int b_multithreaded_rd;
     int max_threads;
     int current_mb_col_main;
     unsigned int decoding_thread_count;
@@ -90,6 +90,8 @@
     int mt_baseline_filter_level[MAX_MB_SEGMENTS];
     int sync_range;
     int *mt_current_mb_col;                  /* Each row remembers its already decoded column. */
+    pthread_mutex_t *pmutex;
+    pthread_mutex_t mt_mutex;                /* mutex for b_multithreaded_rd */
 
     unsigned char **mt_yabove_row;           /* mb_rows x width */
     unsigned char **mt_uabove_row;
--- a/vp8/decoder/threading.c
+++ b/vp8/decoder/threading.c
@@ -52,9 +52,6 @@
         mbd->subpixel_predict8x8     = xd->subpixel_predict8x8;
         mbd->subpixel_predict16x16   = xd->subpixel_predict16x16;
 
-        mbd->mode_info_context = pc->mi   + pc->mode_info_stride * (i + 1);
-        mbd->mode_info_stride  = pc->mode_info_stride;
-
         mbd->frame_type = pc->frame_type;
         mbd->pre = xd->pre;
         mbd->dst = xd->dst;
@@ -298,8 +295,8 @@
 
 static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd, int start_mb_row)
 {
-    volatile const int *last_row_current_mb_col;
-    volatile int *current_mb_col;
+    const int *last_row_current_mb_col;
+    int *current_mb_col;
     int mb_row;
     VP8_COMMON *pc = &pbi->common;
     const int nsync = pbi->sync_range;
@@ -337,6 +334,9 @@
 
     xd->up_available = (start_mb_row != 0);
 
+    xd->mode_info_context = pc->mi + pc->mode_info_stride * start_mb_row;
+    xd->mode_info_stride = pc->mode_info_stride;
+
     for (mb_row = start_mb_row; mb_row < pc->mb_rows; mb_row += (pbi->decoding_thread_count + 1))
     {
        int recon_yoffset, recon_uvoffset;
@@ -405,17 +405,15 @@
                                  xd->dst.uv_stride);
        }
 
-       for (mb_col = 0; mb_col < pc->mb_cols; mb_col++)
-       {
-           *current_mb_col = mb_col - 1;
+       for (mb_col = 0; mb_col < pc->mb_cols; mb_col++) {
+           if (((mb_col - 1) % nsync) == 0) {
+               pthread_mutex_t *mutex = &pbi->pmutex[mb_row];
+               protected_write(mutex, current_mb_col, mb_col - 1);
+           }
 
-           if ((mb_col & (nsync - 1)) == 0)
-           {
-               while (mb_col > (*last_row_current_mb_col - nsync))
-               {
-                   x86_pause_hint();
-                   thread_sleep(0);
-               }
+           if (mb_row && !(mb_col & (nsync - 1))) {
+               pthread_mutex_t *mutex = &pbi->pmutex[mb_row-1];
+               sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
            }
 
            /* Distance of MB to the various image edges.
@@ -604,7 +602,7 @@
                              xd->dst.u_buffer + 8, xd->dst.v_buffer + 8);
 
        /* last MB of row is ready just after extension is done */
-       *current_mb_col = mb_col + nsync;
+       protected_write(&pbi->pmutex[mb_row], current_mb_col, mb_col + nsync);
 
        ++xd->mode_info_context;      /* skip prediction column */
        xd->up_available = 1;
@@ -629,12 +627,12 @@
 
     while (1)
     {
-        if (pbi->b_multithreaded_rd == 0)
+        if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0)
             break;
 
         if (sem_wait(&pbi->h_event_start_decoding[ithread]) == 0)
         {
-            if (pbi->b_multithreaded_rd == 0)
+            if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0)
                 break;
             else
             {
@@ -657,6 +655,7 @@
 
     pbi->b_multithreaded_rd = 0;
     pbi->allocated_decoding_thread_count = 0;
+    pthread_mutex_init(&pbi->mt_mutex, NULL);
 
     /* limit decoding threads to the max number of token partitions */
     core_count = (pbi->max_threads > 8) ? 8 : pbi->max_threads;
@@ -699,8 +698,17 @@
 {
     int i;
 
-    if (pbi->b_multithreaded_rd)
+    if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd))
     {
+        /* De-allocate mutex */
+        if (pbi->pmutex != NULL) {
+            for (i = 0; i < mb_rows; i++) {
+                pthread_mutex_destroy(&pbi->pmutex[i]);
+            }
+            vpx_free(pbi->pmutex);
+            pbi->pmutex = NULL;
+        }
+
             vpx_free(pbi->mt_current_mb_col);
             pbi->mt_current_mb_col = NULL ;
 
@@ -781,7 +789,7 @@
     int i;
     int uv_width;
 
-    if (pbi->b_multithreaded_rd)
+    if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd))
     {
         vp8mt_de_alloc_temp_buffers(pbi, prev_mb_rows);
 
@@ -796,6 +804,15 @@
 
         uv_width = width >>1;
 
+        /* Allocate mutex */
+        CHECK_MEM_ERROR(pbi->pmutex, vpx_malloc(sizeof(*pbi->pmutex) *
+                                                pc->mb_rows));
+        if (pbi->pmutex) {
+            for (i = 0; i < pc->mb_rows; i++) {
+                pthread_mutex_init(&pbi->pmutex[i], NULL);
+            }
+        }
+
         /* Allocate an int for each mb row. */
         CALLOC_ARRAY(pbi->mt_current_mb_col, pc->mb_rows);
 
@@ -831,11 +848,11 @@
 void vp8_decoder_remove_threads(VP8D_COMP *pbi)
 {
     /* shutdown MB Decoding thread; */
-    if (pbi->b_multithreaded_rd)
+    if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd))
     {
         int i;
 
-        pbi->b_multithreaded_rd = 0;
+        protected_write(&pbi->mt_mutex, &pbi->b_multithreaded_rd, 0);
 
         /* allow all threads to exit */
         for (i = 0; i < pbi->allocated_decoding_thread_count; i++)
@@ -863,6 +880,7 @@
             vpx_free(pbi->de_thread_data);
             pbi->de_thread_data = NULL;
     }
+    pthread_mutex_destroy(&pbi->mt_mutex);
 }
 
 void vp8mt_decode_mb_rows( VP8D_COMP *pbi, MACROBLOCKD *xd)
--- a/vp8/encoder/encodeframe.c
+++ b/vp8/encoder/encodeframe.c
@@ -386,8 +386,8 @@
 #if CONFIG_MULTITHREAD
     const int nsync = cpi->mt_sync_range;
     const int rightmost_col = cm->mb_cols + nsync;
-    volatile const int *last_row_current_mb_col;
-    volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
+    const int *last_row_current_mb_col;
+    int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
 
     if ((cpi->b_multi_threaded != 0) && (mb_row != 0))
         last_row_current_mb_col = &cpi->mt_current_mb_col[mb_row - 1];
@@ -461,17 +461,15 @@
         vp8_copy_mem16x16(x->src.y_buffer, x->src.y_stride, x->thismb, 16);
 
 #if CONFIG_MULTITHREAD
-        if (cpi->b_multi_threaded != 0)
-        {
-            *current_mb_col = mb_col - 1; /* set previous MB done */
+        if (cpi->b_multi_threaded != 0) {
+            if (((mb_col - 1) % nsync) == 0) {
+                pthread_mutex_t *mutex = &cpi->pmutex[mb_row];
+                protected_write(mutex, current_mb_col, mb_col - 1);
+            }
 
-            if ((mb_col & (nsync - 1)) == 0)
-            {
-                while (mb_col > (*last_row_current_mb_col - nsync))
-                {
-                    x86_pause_hint();
-                    thread_sleep(0);
-                }
+            if (mb_row && !(mb_col & (nsync - 1))) {
+                pthread_mutex_t *mutex = &cpi->pmutex[mb_row-1];
+                sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
             }
         }
 #endif
@@ -616,7 +614,7 @@
 
 #if CONFIG_MULTITHREAD
     if (cpi->b_multi_threaded != 0)
-        *current_mb_col = rightmost_col;
+        protected_write(&cpi->pmutex[mb_row], current_mb_col, rightmost_col);
 #endif
 
     /* this is to account for the border */
--- a/vp8/encoder/ethreading.c
+++ b/vp8/encoder/ethreading.c
@@ -26,12 +26,13 @@
 
     while (1)
     {
-        if (cpi->b_multi_threaded == 0)
+        if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
             break;
 
         if (sem_wait(&cpi->h_event_start_lpf) == 0)
         {
-            if (cpi->b_multi_threaded == 0) /* we're shutting down */
+            /* we're shutting down */
+            if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
                 break;
 
             vp8_loopfilter_frame(cpi, cm);
@@ -53,7 +54,7 @@
 
     while (1)
     {
-        if (cpi->b_multi_threaded == 0)
+        if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
             break;
 
         if (sem_wait(&cpi->h_event_start_encoding[ithread]) == 0)
@@ -72,9 +73,14 @@
             int *segment_counts = mbri->segment_counts;
             int *totalrate = &mbri->totalrate;
 
-            if (cpi->b_multi_threaded == 0) /* we're shutting down */
+            /* we're shutting down */
+            if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0)
                 break;
 
+            xd->mode_info_context = cm->mi + cm->mode_info_stride *
+                (ithread + 1);
+            xd->mode_info_stride = cm->mode_info_stride;
+
             for (mb_row = ithread + 1; mb_row < cm->mb_rows; mb_row += (cpi->encoding_thread_count + 1))
             {
 
@@ -85,8 +91,8 @@
                 int recon_y_stride = cm->yv12_fb[ref_fb_idx].y_stride;
                 int recon_uv_stride = cm->yv12_fb[ref_fb_idx].uv_stride;
                 int map_index = (mb_row * cm->mb_cols);
-                volatile const int *last_row_current_mb_col;
-                volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
+                const int *last_row_current_mb_col;
+                int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
 
 #if  (CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING)
                 vp8_writer *w = &cpi->bc[1 + (mb_row % num_part)];
@@ -113,15 +119,14 @@
                 /* for each macroblock col in image */
                 for (mb_col = 0; mb_col < cm->mb_cols; mb_col++)
                 {
-                    *current_mb_col = mb_col - 1;
+                    if (((mb_col - 1) % nsync) == 0) {
+                        pthread_mutex_t *mutex = &cpi->pmutex[mb_row];
+                        protected_write(mutex, current_mb_col, mb_col - 1);
+                    }
 
-                    if ((mb_col & (nsync - 1)) == 0)
-                    {
-                        while (mb_col > (*last_row_current_mb_col - nsync))
-                        {
-                            x86_pause_hint();
-                            thread_sleep(0);
-                        }
+                    if (mb_row && !(mb_col & (nsync - 1))) {
+                      pthread_mutex_t *mutex = &cpi->pmutex[mb_row-1];
+                      sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
                     }
 
 #if CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING
@@ -296,7 +301,8 @@
                                     xd->dst.u_buffer + 8,
                                     xd->dst.v_buffer + 8);
 
-                *current_mb_col = mb_col + nsync;
+                protected_write(&cpi->pmutex[mb_row], current_mb_col,
+                                mb_col + nsync);
 
                 /* this is to account for the border */
                 xd->mode_info_context++;
@@ -473,9 +479,6 @@
 
         mb->partition_info = x->pi + x->e_mbd.mode_info_stride * (i + 1);
 
-        mbd->mode_info_context = cm->mi   + x->e_mbd.mode_info_stride * (i + 1);
-        mbd->mode_info_stride  = cm->mode_info_stride;
-
         mbd->frame_type = cm->frame_type;
 
         mb->src = * cpi->Source;
@@ -517,6 +520,8 @@
     cpi->encoding_thread_count = 0;
     cpi->b_lpf_running = 0;
 
+    pthread_mutex_init(&cpi->mt_mutex, NULL);
+
     if (cm->processor_core_count > 1 && cpi->oxcf.multi_threaded > 1)
     {
         int ithread;
@@ -580,7 +585,7 @@
         if(rc)
         {
             /* shutdown other threads */
-            cpi->b_multi_threaded = 0;
+            protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
             for(--ithread; ithread >= 0; ithread--)
             {
                 pthread_join(cpi->h_encoding_thread[ithread], 0);
@@ -594,6 +599,8 @@
             vpx_free(cpi->mb_row_ei);
             vpx_free(cpi->en_thread_data);
 
+            pthread_mutex_destroy(&cpi->mt_mutex);
+
             return -1;
         }
 
@@ -611,7 +618,7 @@
             if(rc)
             {
                 /* shutdown other threads */
-                cpi->b_multi_threaded = 0;
+                protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
                 for(--ithread; ithread >= 0; ithread--)
                 {
                     sem_post(&cpi->h_event_start_encoding[ithread]);
@@ -628,6 +635,8 @@
                 vpx_free(cpi->mb_row_ei);
                 vpx_free(cpi->en_thread_data);
 
+                pthread_mutex_destroy(&cpi->mt_mutex);
+
                 return -2;
             }
         }
@@ -637,10 +646,10 @@
 
 void vp8cx_remove_encoder_threads(VP8_COMP *cpi)
 {
-    if (cpi->b_multi_threaded)
+    if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded))
     {
         /* shutdown other threads */
-        cpi->b_multi_threaded = 0;
+        protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
         {
             int i;
 
@@ -666,5 +675,6 @@
         vpx_free(cpi->mb_row_ei);
         vpx_free(cpi->en_thread_data);
     }
+    pthread_mutex_destroy(&cpi->mt_mutex);
 }
 #endif
--- a/vp8/encoder/onyx_if.c
+++ b/vp8/encoder/onyx_if.c
@@ -477,6 +477,18 @@
     cpi->mb.pip = 0;
 
 #if CONFIG_MULTITHREAD
+    /* De-allocate mutex */
+    if (cpi->pmutex != NULL) {
+        VP8_COMMON *const pc = &cpi->common;
+        int i;
+
+        for (i = 0; i < pc->mb_rows; i++) {
+            pthread_mutex_destroy(&cpi->pmutex[i]);
+        }
+        vpx_free(cpi->pmutex);
+        cpi->pmutex = NULL;
+    }
+
     vpx_free(cpi->mt_current_mb_col);
     cpi->mt_current_mb_col = NULL;
 #endif
@@ -1180,6 +1192,9 @@
 
     int width = cm->Width;
     int height = cm->Height;
+#if CONFIG_MULTITHREAD
+    int prev_mb_rows = cm->mb_rows;
+#endif
 
     if (vp8_alloc_frame_buffers(cm, width, height))
         vpx_internal_error(&cpi->common.error, VPX_CODEC_MEM_ERROR,
@@ -1271,6 +1286,25 @@
 
     if (cpi->oxcf.multi_threaded > 1)
     {
+        int i;
+
+        /* De-allocate and re-allocate mutex */
+        if (cpi->pmutex != NULL) {
+            for (i = 0; i < prev_mb_rows; i++) {
+                pthread_mutex_destroy(&cpi->pmutex[i]);
+            }
+            vpx_free(cpi->pmutex);
+            cpi->pmutex = NULL;
+        }
+
+        CHECK_MEM_ERROR(cpi->pmutex, vpx_malloc(sizeof(*cpi->pmutex) *
+                                                cm->mb_rows));
+        if (cpi->pmutex) {
+            for (i = 0; i < cm->mb_rows; i++) {
+                pthread_mutex_init(&cpi->pmutex[i], NULL);
+            }
+        }
+
         vpx_free(cpi->mt_current_mb_col);
         CHECK_MEM_ERROR(cpi->mt_current_mb_col,
                     vpx_malloc(sizeof(*cpi->mt_current_mb_col) * cm->mb_rows));
--- a/vp8/encoder/onyx_int.h
+++ b/vp8/encoder/onyx_int.h
@@ -530,6 +530,8 @@
 
 #if CONFIG_MULTITHREAD
     /* multithread data */
+    pthread_mutex_t *pmutex;
+    pthread_mutex_t mt_mutex;           /* mutex for b_multi_threaded */
     int * mt_current_mb_col;
     int mt_sync_range;
     int b_multi_threaded;