// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/libplatform/default-job.h"

#include "src/base/bits.h"
#include "src/base/macros.h"

namespace v8 {
namespace platform {
namespace {

// Capped to allow assigning task_ids from a bitfield.
constexpr size_t kMaxWorkersPerJob = 32;

}  // namespace

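// Releases this delegate's task id, if one was acquired, so it can be reused
// by another worker.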
DefaultJobState::JobDelegate::~JobDelegate() {
  static_assert(kInvalidTaskId >= kMaxWorkersPerJob,
                "kInvalidTaskId must be outside of the range of valid task_ids "
                "[0, kMaxWorkersPerJob)");
  if (task_id_ != kInvalidTaskId) outer_->ReleaseTaskId(task_id_);
}

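// Lazily acquires a task id on first use; the id stays assigned to this
// delegate until the delegate is destroyed.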
uint8_t DefaultJobState::JobDelegate::GetTaskId() {
  if (task_id_ == kInvalidTaskId) task_id_ = outer_->AcquireTaskId();
  return task_id_;
}

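// |num_worker_threads| is clamped to kMaxWorkersPerJob so that every worker
// can be assigned a task id from the 32-bit |assigned_task_ids_| bitfield.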
DefaultJobState::DefaultJobState(Platform* platform,
                                 std::unique_ptr<JobTask> job_task,
                                 TaskPriority priority,
                                 size_t num_worker_threads)
    : platform_(platform),
      job_task_(std::move(job_task)),
      priority_(priority),
      num_worker_threads_(std::min(num_worker_threads, kMaxWorkersPerJob)) {}

DefaultJobState::~DefaultJobState() { DCHECK_EQ(0U, active_workers_); }

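// Posts enough worker tasks to bring the worker count (active plus pending)
// up to the capped max concurrency, unless the job was already canceled.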
void DefaultJobState::NotifyConcurrencyIncrease() {
  if (is_canceled_.load(std::memory_order_relaxed)) return;

  size_t num_tasks_to_post = 0;
  TaskPriority priority;
  {
    base::MutexGuard guard(&mutex_);
    const size_t max_concurrency = CappedMaxConcurrency(active_workers_);
    // Consider |pending_tasks_| to avoid posting too many tasks.
    if (max_concurrency > (active_workers_ + pending_tasks_)) {
      num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
      pending_tasks_ += num_tasks_to_post;
    }
    priority = priority_;
  }
  // Post additional worker tasks to reach |max_concurrency|.
  for (size_t i = 0; i < num_tasks_to_post; ++i) {
    CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
                                     shared_from_this(), job_task_.get()));
  }
}

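// Atomically claims the lowest unassigned bit in |assigned_task_ids_| as this
// worker's task id.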
uint8_t DefaultJobState::AcquireTaskId() {
  static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
                "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
  uint32_t assigned_task_ids =
      assigned_task_ids_.load(std::memory_order_relaxed);
  DCHECK_LE(v8::base::bits::CountPopulation(assigned_task_ids) + 1,
            kMaxWorkersPerJob);
  uint32_t new_assigned_task_ids = 0;
  uint8_t task_id = 0;
  // memory_order_acquire on success, matched with memory_order_release in
  // ReleaseTaskId() so that operations done by previous threads that had
  // the same task_id become visible to the current thread.
  do {
    // Count trailing one bits. This is the id of the right-most 0-bit in
    // |assigned_task_ids|.
    task_id = v8::base::bits::CountTrailingZeros32(~assigned_task_ids);
    new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
  } while (!assigned_task_ids_.compare_exchange_weak(
      assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
      std::memory_order_relaxed));
  return task_id;
}

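// Clears the bit for |task_id| so the id can be handed out again.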
void DefaultJobState::ReleaseTaskId(uint8_t task_id) {
  // memory_order_release to match AcquireTaskId().
  uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
      ~(uint32_t(1) << task_id), std::memory_order_release);
  DCHECK(previous_task_ids & (uint32_t(1) << task_id));
  USE(previous_task_ids);
}

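// Contributes to the job on the calling thread until the job runs out of
// work, bumping the job's priority to kUserBlocking for the duration.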
void DefaultJobState::Join() {
  bool can_run = false;
  {
    base::MutexGuard guard(&mutex_);
    priority_ = TaskPriority::kUserBlocking;
    // Reserve a worker for the joining thread. GetMaxConcurrency() is ignored
    // here, but WaitForParticipationOpportunityLockRequired() waits for
    // workers to return if necessary so we don't exceed GetMaxConcurrency().
    num_worker_threads_ = platform_->NumberOfWorkerThreads() + 1;
    ++active_workers_;
    can_run = WaitForParticipationOpportunityLockRequired();
  }
  DefaultJobState::JobDelegate delegate(this, true);
  while (can_run) {
    job_task_->Run(&delegate);
    base::MutexGuard guard(&mutex_);
    can_run = WaitForParticipationOpportunityLockRequired();
  }
}

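// Sets the cancellation flag and blocks until all active workers return.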
void DefaultJobState::CancelAndWait() {
  {
    base::MutexGuard guard(&mutex_);
    is_canceled_.store(true, std::memory_order_relaxed);
    while (active_workers_ > 0) {
      worker_released_condition_.Wait(&mutex_);
    }
  }
}

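// Sets the cancellation flag without waiting; workers observe it on their
// next scheduling decision and return on their own.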
void DefaultJobState::CancelAndDetach() {
  is_canceled_.store(true, std::memory_order_relaxed);
}

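// A job is active while it still has work (non-zero max concurrency) or
// running workers.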
bool DefaultJobState::IsActive() {
  base::MutexGuard guard(&mutex_);
  return job_task_->GetMaxConcurrency(active_workers_) != 0 ||
         active_workers_ != 0;
}

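// Called by a posted worker before its first Run(). Converts a pending task
// into an active worker, or tells the worker to return immediately if the job
// was canceled or concurrency is already saturated.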
bool DefaultJobState::CanRunFirstTask() {
  base::MutexGuard guard(&mutex_);
  --pending_tasks_;
  if (is_canceled_.load(std::memory_order_relaxed)) return false;
  if (active_workers_ >= std::min(job_task_->GetMaxConcurrency(active_workers_),
                                  num_worker_threads_)) {
    return false;
  }
  // Acquire current worker.
  ++active_workers_;
  return true;
}

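// Called by a worker after each Run(). Returns true if the worker should call
// Run() again, and posts additional workers if max concurrency increased in
// the meantime.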
bool DefaultJobState::DidRunTask() {
  size_t num_tasks_to_post = 0;
  TaskPriority priority;
  {
    base::MutexGuard guard(&mutex_);
    const size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
    if (is_canceled_.load(std::memory_order_relaxed) ||
        active_workers_ > max_concurrency) {
      // Release current worker and notify.
      --active_workers_;
      worker_released_condition_.NotifyOne();
      return false;
    }
    // Consider |pending_tasks_| to avoid posting too many tasks.
    if (max_concurrency > active_workers_ + pending_tasks_) {
      num_tasks_to_post = max_concurrency - active_workers_ - pending_tasks_;
      pending_tasks_ += num_tasks_to_post;
    }
    priority = priority_;
  }
  // Post additional worker tasks to reach |max_concurrency| in the case that
  // max concurrency increased. This is not strictly necessary, since
  // NotifyConcurrencyIncrease() should eventually be invoked. However, some
  // users of PostJob() batch work and tend to call NotifyConcurrencyIncrease()
  // late. Posting here allows us to spawn new workers sooner.
  for (size_t i = 0; i < num_tasks_to_post; ++i) {
    CallOnWorkerThread(priority, std::make_unique<DefaultJobWorker>(
                                     shared_from_this(), job_task_.get()));
  }
  return true;
}

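// Blocks until the calling worker may contribute without exceeding max
// concurrency. Returns false once the job has no work left; in that case the
// job is also marked canceled so remaining workers return.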
bool DefaultJobState::WaitForParticipationOpportunityLockRequired() {
  size_t max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
  while (active_workers_ > max_concurrency && active_workers_ > 1) {
    worker_released_condition_.Wait(&mutex_);
    max_concurrency = CappedMaxConcurrency(active_workers_ - 1);
  }
  if (active_workers_ <= max_concurrency) return true;
  DCHECK_EQ(1U, active_workers_);
  DCHECK_EQ(0U, max_concurrency);
  active_workers_ = 0;
  is_canceled_.store(true, std::memory_order_relaxed);
  return false;
}

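// GetMaxConcurrency() clamped to the number of worker threads this job may
// use.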
size_t DefaultJobState::CappedMaxConcurrency(size_t worker_count) const {
  return std::min(job_task_->GetMaxConcurrency(worker_count),
                  num_worker_threads_);
}

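// Dispatches |task| to the platform entry point matching |priority|.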
void DefaultJobState::CallOnWorkerThread(TaskPriority priority,
                                         std::unique_ptr<Task> task) {
  switch (priority) {
    case TaskPriority::kBestEffort:
      return platform_->CallLowPriorityTaskOnWorkerThread(std::move(task));
    case TaskPriority::kUserVisible:
      return platform_->CallOnWorkerThread(std::move(task));
    case TaskPriority::kUserBlocking:
      return platform_->CallBlockingTaskOnWorkerThread(std::move(task));
  }
}

void DefaultJobState::UpdatePriority(TaskPriority priority) {
  base::MutexGuard guard(&mutex_);
  priority_ = priority;
}

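// The handle immediately notifies the job of available concurrency so that
// worker tasks are posted as soon as the job is created.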
DefaultJobHandle::DefaultJobHandle(std::shared_ptr<DefaultJobState> state)
    : state_(std::move(state)) {
  state_->NotifyConcurrencyIncrease();
}

DefaultJobHandle::~DefaultJobHandle() { DCHECK_EQ(nullptr, state_); }

void DefaultJobHandle::Join() {
  state_->Join();
  state_ = nullptr;
}

void DefaultJobHandle::Cancel() {
  state_->CancelAndWait();
  state_ = nullptr;
}

void DefaultJobHandle::CancelAndDetach() {
  state_->CancelAndDetach();
  state_ = nullptr;
}

bool DefaultJobHandle::IsActive() { return state_->IsActive(); }

void DefaultJobHandle::UpdatePriority(TaskPriority priority) {
  state_->UpdatePriority(priority);
}

}  // namespace platform
}  // namespace v8