shithub: libvpx

Download patch

ref: 85a541a421894981cc91ea198faf74eb9146cece
parent: 121e161115d86c65101ec9f1ec1564cdd9e58598
author: Jim Bankoski <[email protected]>
date: Mon Dec 12 11:27:21 EST 2016

Reapply 'Amend and improve VP8 multithreading implementation'

Reapply this patch:
ff0107f Amend and improve VP8 multithreading implementation

Amended the patch to add a unit test and fix an ASan error.

BUG=webm:851

Change-Id: I6572c03256169c64e80248bf5a5e99f59a2fc93c

--- a/test/test_vector_test.cc
+++ b/test/test_vector_test.cc
@@ -145,7 +145,7 @@
   ASSERT_NO_FATAL_FAILURE(RunLoop(video.get(), cfg));
 }
 
-// Test VP8 decode in serial mode with single thread and with 8 threads.
+// Test VP8 decode in serial mode with a single thread.
 // NOTE: VP8 only support serial mode.
 #if CONFIG_VP8_DECODER
 VP8_INSTANTIATE_TEST_CASE(
@@ -152,7 +152,7 @@
     TestVectorTest,
     ::testing::Combine(
         ::testing::Values(0),  // Serial Mode.
-        ::testing::Values(1),  // Single thread and 8 threads.
+        ::testing::Values(1),  // Single thread.
         ::testing::ValuesIn(libvpx_test::kVP8TestVectors,
                             libvpx_test::kVP8TestVectors +
                                 libvpx_test::kNumVP8TestVectors)));
@@ -159,7 +159,7 @@
 
 // Test VP8 decode in with different numbers of threads.
 INSTANTIATE_TEST_CASE_P(
-    DISABLED_VP8MultiThreaded, TestVectorTest,
+    VP8MultiThreaded, TestVectorTest,
     ::testing::Combine(
         ::testing::Values(
             static_cast<const libvpx_test::CodecFactory *>(&libvpx_test::kVP8)),
--- a/vp8/common/threading.h
+++ b/vp8/common/threading.h
@@ -193,6 +193,44 @@
 
 #include "vpx_util/vpx_thread.h"
 
+static INLINE void mutex_lock(pthread_mutex_t *const mutex) {
+  const int kMaxTryLocks = 4000; /* trylock attempts made before blocking */
+  int locked = 0;
+  int i;
+  /* Spin on trylock first; pthread_mutex_trylock() returns 0 on success. */
+  for (i = 0; i < kMaxTryLocks; ++i) {
+    if (!pthread_mutex_trylock(mutex)) {
+      locked = 1;
+      break;
+    }
+  }
+  /* Every attempt failed: fall back to a blocking pthread_mutex_lock(). */
+  if (!locked) pthread_mutex_lock(mutex);
+}
+
+static INLINE int protected_read(pthread_mutex_t *const mutex, const int *p) {
+  int ret;
+  mutex_lock(mutex);
+  ret = *p; /* copy the value out while the mutex is held */
+  pthread_mutex_unlock(mutex);
+  return ret; /* the copy taken under the lock, not a fresh read of *p */
+}
+
+static INLINE void sync_read(pthread_mutex_t *const mutex, int mb_col,
+                             const int *last_row_current_mb_col,
+                             const int nsync) {
+  while (mb_col > (protected_read(mutex, last_row_current_mb_col) - nsync)) {
+    x86_pause_hint(); /* each pass re-reads the value under the mutex */
+    thread_sleep(0);
+  }
+} /* returns once mb_col <= *last_row_current_mb_col - nsync */
+
+static INLINE void protected_write(pthread_mutex_t *mutex, int *p, int v) {
+  mutex_lock(mutex);
+  *p = v; /* store v into *p while the mutex is held */
+  pthread_mutex_unlock(mutex);
+}
+
 #endif /* CONFIG_OS_SUPPORT && CONFIG_MULTITHREAD */
 
 #ifdef __cplusplus
--- a/vp8/decoder/onyxd_int.h
+++ b/vp8/decoder/onyxd_int.h
@@ -67,7 +67,8 @@
 
 #if CONFIG_MULTITHREAD
   /* variable for threading */
-  volatile int b_multithreaded_rd;
+
+  int b_multithreaded_rd;
   int max_threads;
   int current_mb_col_main;
   unsigned int decoding_thread_count;
@@ -76,6 +77,8 @@
   int mt_baseline_filter_level[MAX_MB_SEGMENTS];
   int sync_range;
   int *mt_current_mb_col; /* Each row remembers its already decoded column. */
+  pthread_mutex_t *pmutex;
+  pthread_mutex_t mt_mutex; /* mutex for b_multithreaded_rd */
 
   unsigned char **mt_yabove_row; /* mb_rows x width */
   unsigned char **mt_uabove_row;
--- a/vp8/decoder/threading.c
+++ b/vp8/decoder/threading.c
@@ -50,9 +50,6 @@
     mbd->subpixel_predict8x8 = xd->subpixel_predict8x8;
     mbd->subpixel_predict16x16 = xd->subpixel_predict16x16;
 
-    mbd->mode_info_context = pc->mi + pc->mode_info_stride * (i + 1);
-    mbd->mode_info_stride = pc->mode_info_stride;
-
     mbd->frame_type = pc->frame_type;
     mbd->pre = xd->pre;
     mbd->dst = xd->dst;
@@ -251,8 +248,8 @@
 
 static void mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd,
                               int start_mb_row) {
-  volatile const int *last_row_current_mb_col;
-  volatile int *current_mb_col;
+  const int *last_row_current_mb_col;
+  int *current_mb_col;
   int mb_row;
   VP8_COMMON *pc = &pbi->common;
   const int nsync = pbi->sync_range;
@@ -289,6 +286,9 @@
 
   xd->up_available = (start_mb_row != 0);
 
+  xd->mode_info_context = pc->mi + pc->mode_info_stride * start_mb_row;
+  xd->mode_info_stride = pc->mode_info_stride;
+
   for (mb_row = start_mb_row; mb_row < pc->mb_rows;
        mb_row += (pbi->decoding_thread_count + 1)) {
     int recon_yoffset, recon_uvoffset;
@@ -355,14 +355,15 @@
                              xd->dst.uv_stride);
     }
 
-    for (mb_col = 0; mb_col < pc->mb_cols; mb_col++) {
-      *current_mb_col = mb_col - 1;
+    for (mb_col = 0; mb_col < pc->mb_cols; ++mb_col) {
+      if (((mb_col - 1) % nsync) == 0) {
+        pthread_mutex_t *mutex = &pbi->pmutex[mb_row];
+        protected_write(mutex, current_mb_col, mb_col - 1);
+      }
 
-      if ((mb_col & (nsync - 1)) == 0) {
-        while (mb_col > (*last_row_current_mb_col - nsync)) {
-          x86_pause_hint();
-          thread_sleep(0);
-        }
+      if (mb_row && !(mb_col & (nsync - 1))) {
+        pthread_mutex_t *mutex = &pbi->pmutex[mb_row - 1];
+        sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
       }
 
       /* Distance of MB to the various image edges.
@@ -548,7 +549,7 @@
     }
 
     /* last MB of row is ready just after extension is done */
-    *current_mb_col = mb_col + nsync;
+    protected_write(&pbi->pmutex[mb_row], current_mb_col, mb_col + nsync);
 
     ++xd->mode_info_context; /* skip prediction column */
     xd->up_available = 1;
@@ -568,10 +569,10 @@
   ENTROPY_CONTEXT_PLANES mb_row_left_context;
 
   while (1) {
-    if (pbi->b_multithreaded_rd == 0) break;
+    if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0) break;
 
     if (sem_wait(&pbi->h_event_start_decoding[ithread]) == 0) {
-      if (pbi->b_multithreaded_rd == 0) {
+      if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd) == 0) {
         break;
       } else {
         MACROBLOCKD *xd = &mbrd->mbd;
@@ -591,6 +592,7 @@
 
   pbi->b_multithreaded_rd = 0;
   pbi->allocated_decoding_thread_count = 0;
+  pthread_mutex_init(&pbi->mt_mutex, NULL);
 
   /* limit decoding threads to the max number of token partitions */
   core_count = (pbi->max_threads > 8) ? 8 : pbi->max_threads;
@@ -647,6 +649,16 @@
 void vp8mt_de_alloc_temp_buffers(VP8D_COMP *pbi, int mb_rows) {
   int i;
 
+  /* De-allocate mutex */
+  if (pbi->pmutex != NULL) {
+    for (i = 0; i < mb_rows; ++i) {
+      pthread_mutex_destroy(&pbi->pmutex[i]);
+    }
+
+    vpx_free(pbi->pmutex);
+    pbi->pmutex = NULL;
+  }
+
   vpx_free(pbi->mt_current_mb_col);
   pbi->mt_current_mb_col = NULL;
 
@@ -712,7 +724,7 @@
   int i;
   int uv_width;
 
-  if (pbi->b_multithreaded_rd) {
+  if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd)) {
     vp8mt_de_alloc_temp_buffers(pbi, prev_mb_rows);
 
     /* our internal buffers are always multiples of 16 */
@@ -730,6 +742,15 @@
 
     uv_width = width >> 1;
 
+    /* Allocate mutex */
+    CHECK_MEM_ERROR(pbi->pmutex,
+                    vpx_malloc(sizeof(*pbi->pmutex) * pc->mb_rows));
+    if (pbi->pmutex) {
+      for (i = 0; i < pc->mb_rows; ++i) {
+        pthread_mutex_init(&pbi->pmutex[i], NULL);
+      }
+    }
+
     /* Allocate an int for each mb row. */
     CALLOC_ARRAY(pbi->mt_current_mb_col, pc->mb_rows);
 
@@ -772,9 +793,9 @@
 
 void vp8_decoder_remove_threads(VP8D_COMP *pbi) {
   /* shutdown MB Decoding thread; */
-  if (pbi->b_multithreaded_rd) {
+  if (protected_read(&pbi->mt_mutex, &pbi->b_multithreaded_rd)) {
     int i;
-    pbi->b_multithreaded_rd = 0;
+    protected_write(&pbi->mt_mutex, &pbi->b_multithreaded_rd, 0);
 
     /* allow all threads to exit */
     for (i = 0; i < pbi->allocated_decoding_thread_count; ++i) {
@@ -804,6 +825,7 @@
 
     vp8mt_de_alloc_temp_buffers(pbi, pbi->common.mb_rows);
   }
+  pthread_mutex_destroy(&pbi->mt_mutex);
 }
 
 void vp8mt_decode_mb_rows(VP8D_COMP *pbi, MACROBLOCKD *xd) {
--- a/vp8/encoder/encodeframe.c
+++ b/vp8/encoder/encodeframe.c
@@ -345,8 +345,8 @@
 #if CONFIG_MULTITHREAD
   const int nsync = cpi->mt_sync_range;
   const int rightmost_col = cm->mb_cols + nsync;
-  volatile const int *last_row_current_mb_col;
-  volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
+  const int *last_row_current_mb_col;
+  int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
 
   if ((cpi->b_multi_threaded != 0) && (mb_row != 0)) {
     last_row_current_mb_col = &cpi->mt_current_mb_col[mb_row - 1];
@@ -419,13 +419,14 @@
 
 #if CONFIG_MULTITHREAD
     if (cpi->b_multi_threaded != 0) {
-      *current_mb_col = mb_col - 1; /* set previous MB done */
+      if (((mb_col - 1) % nsync) == 0) {
+        pthread_mutex_t *mutex = &cpi->pmutex[mb_row];
+        protected_write(mutex, current_mb_col, mb_col - 1);
+      }
 
-      if ((mb_col & (nsync - 1)) == 0) {
-        while (mb_col > (*last_row_current_mb_col - nsync)) {
-          x86_pause_hint();
-          thread_sleep(0);
-        }
+      if (mb_row && !(mb_col & (nsync - 1))) {
+        pthread_mutex_t *mutex = &cpi->pmutex[mb_row - 1];
+        sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
       }
     }
 #endif
@@ -565,7 +566,9 @@
                     xd->dst.u_buffer + 8, xd->dst.v_buffer + 8);
 
 #if CONFIG_MULTITHREAD
-  if (cpi->b_multi_threaded != 0) *current_mb_col = rightmost_col;
+  if (cpi->b_multi_threaded != 0) {
+    protected_write(&cpi->pmutex[mb_row], current_mb_col, rightmost_col);
+  }
 #endif
 
   /* this is to account for the border */
--- a/vp8/encoder/ethreading.c
+++ b/vp8/encoder/ethreading.c
@@ -25,11 +25,11 @@
   VP8_COMMON *cm = &cpi->common;
 
   while (1) {
-    if (cpi->b_multi_threaded == 0) break;
+    if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0) break;
 
     if (sem_wait(&cpi->h_event_start_lpf) == 0) {
       /* we're shutting down */
-      if (cpi->b_multi_threaded == 0) break;
+      if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0) break;
 
       vp8_loopfilter_frame(cpi, cm);
 
@@ -47,7 +47,7 @@
   ENTROPY_CONTEXT_PLANES mb_row_left_context;
 
   while (1) {
-    if (cpi->b_multi_threaded == 0) break;
+    if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0) break;
 
     if (sem_wait(&cpi->h_event_start_encoding[ithread]) == 0) {
       const int nsync = cpi->mt_sync_range;
@@ -65,8 +65,11 @@
       int *totalrate = &mbri->totalrate;
 
       /* we're shutting down */
-      if (cpi->b_multi_threaded == 0) break;
+      if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded) == 0) break;
 
+      xd->mode_info_context = cm->mi + cm->mode_info_stride * (ithread + 1);
+      xd->mode_info_stride = cm->mode_info_stride;
+
       for (mb_row = ithread + 1; mb_row < cm->mb_rows;
            mb_row += (cpi->encoding_thread_count + 1)) {
         int recon_yoffset, recon_uvoffset;
@@ -76,8 +79,8 @@
         int recon_y_stride = cm->yv12_fb[ref_fb_idx].y_stride;
         int recon_uv_stride = cm->yv12_fb[ref_fb_idx].uv_stride;
         int map_index = (mb_row * cm->mb_cols);
-        volatile const int *last_row_current_mb_col;
-        volatile int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
+        const int *last_row_current_mb_col;
+        int *current_mb_col = &cpi->mt_current_mb_col[mb_row];
 
 #if (CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING)
         vp8_writer *w = &cpi->bc[1 + (mb_row % num_part)];
@@ -103,13 +106,14 @@
 
         /* for each macroblock col in image */
         for (mb_col = 0; mb_col < cm->mb_cols; ++mb_col) {
-          *current_mb_col = mb_col - 1;
+          if (((mb_col - 1) % nsync) == 0) {
+            pthread_mutex_t *mutex = &cpi->pmutex[mb_row];
+            protected_write(mutex, current_mb_col, mb_col - 1);
+          }
 
-          if ((mb_col & (nsync - 1)) == 0) {
-            while (mb_col > (*last_row_current_mb_col - nsync)) {
-              x86_pause_hint();
-              thread_sleep(0);
-            }
+          if (mb_row && !(mb_col & (nsync - 1))) {
+            pthread_mutex_t *mutex = &cpi->pmutex[mb_row - 1];
+            sync_read(mutex, mb_col, last_row_current_mb_col, nsync);
           }
 
 #if CONFIG_REALTIME_ONLY & CONFIG_ONTHEFLY_BITPACKING
@@ -281,7 +285,7 @@
         vp8_extend_mb_row(&cm->yv12_fb[dst_fb_idx], xd->dst.y_buffer + 16,
                           xd->dst.u_buffer + 8, xd->dst.v_buffer + 8);
 
-        *current_mb_col = mb_col + nsync;
+        protected_write(&cpi->pmutex[mb_row], current_mb_col, mb_col + nsync);
 
         /* this is to account for the border */
         xd->mode_info_context++;
@@ -450,9 +454,6 @@
 
     mb->partition_info = x->pi + x->e_mbd.mode_info_stride * (i + 1);
 
-    mbd->mode_info_context = cm->mi + x->e_mbd.mode_info_stride * (i + 1);
-    mbd->mode_info_stride = cm->mode_info_stride;
-
     mbd->frame_type = cm->frame_type;
 
     mb->src = *cpi->Source;
@@ -492,6 +493,8 @@
   cpi->encoding_thread_count = 0;
   cpi->b_lpf_running = 0;
 
+  pthread_mutex_init(&cpi->mt_mutex, NULL);
+
   if (cm->processor_core_count > 1 && cpi->oxcf.multi_threaded > 1) {
     int ithread;
     int th_count = cpi->oxcf.multi_threaded - 1;
@@ -551,7 +554,7 @@
 
     if (rc) {
       /* shutdown other threads */
-      cpi->b_multi_threaded = 0;
+      protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
       for (--ithread; ithread >= 0; ithread--) {
         pthread_join(cpi->h_encoding_thread[ithread], 0);
         sem_destroy(&cpi->h_event_start_encoding[ithread]);
@@ -565,6 +568,8 @@
       vpx_free(cpi->mb_row_ei);
       vpx_free(cpi->en_thread_data);
 
+      pthread_mutex_destroy(&cpi->mt_mutex);
+
       return -1;
     }
 
@@ -579,7 +584,7 @@
 
       if (rc) {
         /* shutdown other threads */
-        cpi->b_multi_threaded = 0;
+        protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
         for (--ithread; ithread >= 0; ithread--) {
           sem_post(&cpi->h_event_start_encoding[ithread]);
           sem_post(&cpi->h_event_end_encoding[ithread]);
@@ -597,6 +602,8 @@
         vpx_free(cpi->mb_row_ei);
         vpx_free(cpi->en_thread_data);
 
+        pthread_mutex_destroy(&cpi->mt_mutex);
+
         return -2;
       }
     }
@@ -605,9 +612,9 @@
 }
 
 void vp8cx_remove_encoder_threads(VP8_COMP *cpi) {
-  if (cpi->b_multi_threaded) {
+  if (protected_read(&cpi->mt_mutex, &cpi->b_multi_threaded)) {
     /* shutdown other threads */
-    cpi->b_multi_threaded = 0;
+    protected_write(&cpi->mt_mutex, &cpi->b_multi_threaded, 0);
     {
       int i;
 
@@ -635,5 +642,6 @@
     vpx_free(cpi->mb_row_ei);
     vpx_free(cpi->en_thread_data);
   }
+  pthread_mutex_destroy(&cpi->mt_mutex);
 }
 #endif
--- a/vp8/encoder/onyx_if.c
+++ b/vp8/encoder/onyx_if.c
@@ -446,6 +446,18 @@
   cpi->mb.pip = 0;
 
 #if CONFIG_MULTITHREAD
+  /* De-allocate mutex */
+  if (cpi->pmutex != NULL) {
+    VP8_COMMON *const pc = &cpi->common;
+    int i;
+
+    for (i = 0; i < pc->mb_rows; ++i) {
+      pthread_mutex_destroy(&cpi->pmutex[i]);
+    }
+    vpx_free(cpi->pmutex);
+    cpi->pmutex = NULL;
+  }
+
   vpx_free(cpi->mt_current_mb_col);
   cpi->mt_current_mb_col = NULL;
 #endif
@@ -1075,6 +1087,9 @@
 
   int width = cm->Width;
   int height = cm->Height;
+#if CONFIG_MULTITHREAD
+  int prev_mb_rows = cm->mb_rows;
+#endif
 
   if (vp8_alloc_frame_buffers(cm, width, height)) {
     vpx_internal_error(&cpi->common.error, VPX_CODEC_MEM_ERROR,
@@ -1164,6 +1179,25 @@
   }
 
   if (cpi->oxcf.multi_threaded > 1) {
+    int i;
+
+    /* De-allocate and re-allocate mutex */
+    if (cpi->pmutex != NULL) {
+      for (i = 0; i < prev_mb_rows; ++i) {
+        pthread_mutex_destroy(&cpi->pmutex[i]);
+      }
+      vpx_free(cpi->pmutex);
+      cpi->pmutex = NULL;
+    }
+
+    CHECK_MEM_ERROR(cpi->pmutex,
+                    vpx_malloc(sizeof(*cpi->pmutex) * cm->mb_rows));
+    if (cpi->pmutex) {
+      for (i = 0; i < cm->mb_rows; ++i) {
+        pthread_mutex_init(&cpi->pmutex[i], NULL);
+      }
+    }
+
     vpx_free(cpi->mt_current_mb_col);
     CHECK_MEM_ERROR(cpi->mt_current_mb_col,
                     vpx_malloc(sizeof(*cpi->mt_current_mb_col) * cm->mb_rows));
--- a/vp8/encoder/onyx_int.h
+++ b/vp8/encoder/onyx_int.h
@@ -511,6 +511,8 @@
 
 #if CONFIG_MULTITHREAD
   /* multithread data */
+  pthread_mutex_t *pmutex;
+  pthread_mutex_t mt_mutex; /* mutex for b_multi_threaded */
   int *mt_current_mb_col;
   int mt_sync_range;
   int b_multi_threaded;