// Copyright 2017 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef V8_HEAP_ITEM_PARALLEL_JOB_H_
#define V8_HEAP_ITEM_PARALLEL_JOB_H_

#include <memory>
#include <vector>

#include "src/base/atomic-utils.h"
#include "src/base/logging.h"
#include "src/base/macros.h"
#include "src/base/optional.h"
#include "src/cancelable-task.h"
#include "src/counters.h"
#include "src/globals.h"

namespace v8 {

namespace base {
class Semaphore;
}

namespace internal {

class Counters;
class Isolate;

// This class manages background tasks that process a set of items in parallel.
// The first task added is executed on the same thread that calls |job.Run()|.
// All other tasks are scheduled in the background.
//
// - Items need to inherit from ItemParallelJob::Item.
// - Tasks need to inherit from ItemParallelJob::Task.
//
// Items need to be marked as finished after processing them. Task and Item
// ownership is transferred to the job.
//
// Each parallel (non-main thread) task will report the time between the job
// being created and the task being scheduled to
// |gc_parallel_task_latency_histogram|.
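//
// A minimal usage sketch, with hypothetical MyItem/MyTask types; the
// |task_manager| (CancelableTaskManager*), |pending_tasks| (base::Semaphore)
// and |async_counters| (std::shared_ptr<Counters>) are assumed to be provided
// by the caller:
//
//   class MyItem : public ItemParallelJob::Item { /* per-item payload */ };
//
//   class MyTask : public ItemParallelJob::Task {
//    public:
//     explicit MyTask(Isolate* isolate) : Task(isolate) {}
//     void RunInParallel() override {
//       while (MyItem* item = GetItem<MyItem>()) {
//         // ... process |item| ...
//         item->MarkFinished();
//       }
//     }
//   };
//
//   ItemParallelJob job(task_manager, &pending_tasks);
//   job.AddItem(new MyItem());
//   job.AddTask(new MyTask(isolate));
//   job.Run(async_counters);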
class V8_EXPORT_PRIVATE ItemParallelJob {
 public:
  class Task;

  class V8_EXPORT_PRIVATE Item {
   public:
    Item() = default;
    virtual ~Item() = default;

    // Marks an item as being finished.
    void MarkFinished() { CHECK_EQ(kProcessing, state_.exchange(kFinished)); }

   private:
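    // Lifecycle of an item: kAvailable -> kProcessing (claimed via
    // TryMarkingAsProcessing(), called from Task::GetItem()) -> kFinished
    // (via MarkFinished(), called by the task that processed the item).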
    enum ProcessingState : uintptr_t { kAvailable, kProcessing, kFinished };

    bool TryMarkingAsProcessing() {
      ProcessingState available = kAvailable;
      return state_.compare_exchange_strong(available, kProcessing);
    }
    bool IsFinished() { return state_ == kFinished; }

    std::atomic<ProcessingState> state_{kAvailable};

    friend class ItemParallelJob;
    friend class ItemParallelJob::Task;

    DISALLOW_COPY_AND_ASSIGN(Item);
  };

  class V8_EXPORT_PRIVATE Task : public CancelableTask {
   public:
    explicit Task(Isolate* isolate);
    ~Task() override;

    virtual void RunInParallel() = 0;

   protected:
    // Retrieves a new item that needs to be processed. Returns |nullptr| if
    // all items are processed. Upon returning an item, the task is required
    // to process the item and mark the item as finished after doing so.
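    // Scanning starts at the task's start index (see SetupInternal()) and
    // wraps around the shared |items_| vector; across all GetItem() calls, a
    // task considers at most |items_->size()| slots in total (tracked by
    // |items_considered_|).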
    template <class ItemType>
    ItemType* GetItem() {
      while (items_considered_++ != items_->size()) {
        // Wrap around.
        if (cur_index_ == items_->size()) {
          cur_index_ = 0;
        }
        Item* item = (*items_)[cur_index_++];
        if (item->TryMarkingAsProcessing()) {
          return static_cast<ItemType*>(item);
        }
      }
      return nullptr;
    }

   private:
    friend class ItemParallelJob;
    friend class Item;

    // Sets up the state required before invoking Run(). If |start_index| is
    // >= |items_.size()|, this task will not process work items (some jobs
    // have more tasks than work items in order to parallelize post-processing,
    // e.g. scavenging). If |gc_parallel_task_latency_histogram| is provided,
    // it will be used to report the latency between posting the task and the
    // task being scheduled.
    void SetupInternal(
        base::Semaphore* on_finish, std::vector<Item*>* items,
        size_t start_index,
        base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram);

    // We don't allow overriding this method any further.
    void RunInternal() final;

    std::vector<Item*>* items_ = nullptr;
    size_t cur_index_ = 0;
    size_t items_considered_ = 0;
    base::Semaphore* on_finish_ = nullptr;
    base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram_;

    DISALLOW_COPY_AND_ASSIGN(Task);
  };

  ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
                  base::Semaphore* pending_tasks);

  ~ItemParallelJob();

  // Adds a task to the job. Transfers ownership to the job.
  void AddTask(Task* task) { tasks_.push_back(std::unique_ptr<Task>(task)); }

  // Adds an item to the job. Transfers ownership to the job.
  void AddItem(Item* item) { items_.push_back(item); }

  int NumberOfItems() const { return static_cast<int>(items_.size()); }
  int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }

  // Runs this job, reporting metrics in a thread-safe manner to
  // |async_counters|.
  void Run(const std::shared_ptr<Counters>& async_counters);

 private:
  std::vector<Item*> items_;
  std::vector<std::unique_ptr<Task>> tasks_;
  CancelableTaskManager* cancelable_task_manager_;
  base::Semaphore* pending_tasks_;
  DISALLOW_COPY_AND_ASSIGN(ItemParallelJob);
};

}  // namespace internal
}  // namespace v8

#endif  // V8_HEAP_ITEM_PARALLEL_JOB_H_