// Copyright 2020 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/libplatform/default-job.h"

#include "src/base/platform/condition-variable.h"
#include "src/base/platform/platform.h"
#include "src/libplatform/default-platform.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace v8 {
namespace platform {
namespace default_job_unittest {

// Checks that cancelling a job stops its worker task: workers that are already
// running see ShouldYield() return true and leave Run(), so CancelAndWait()
// completes.
TEST(DefaultJobTest, CancelJob) {
  static constexpr size_t kTooManyTasks = 1000;
  static constexpr size_t kMaxTask = 4;
  DefaultPlatform platform(kMaxTask);

  // Every Run() call records itself in |started| under |lock|, signals
  // |on_start| and then spins until ShouldYield() returns true.
  class SpinningTask : public JobTask {
   public:
    ~SpinningTask() override = default;

    void Run(JobDelegate* delegate) override {
      {
        base::MutexGuard scoped_lock(&lock);
        ++started;
      }
      // |lock| is released at this point; the waiting test thread re-checks
      // |started| under |lock| once it wakes up.
      on_start.NotifyOne();
      for (;;) {
        if (delegate->ShouldYield()) return;
      }
    }

    // Reports |max_concurrency|, which starts at kTooManyTasks (far more than
    // kMaxTask) and is never changed by this test.
    size_t GetMaxConcurrency(size_t /* worker_count */) const override {
      const size_t limit = max_concurrency.load(std::memory_order_relaxed);
      return limit;
    }

    base::Mutex lock;
    base::ConditionVariable on_start;
    size_t started = 0;
    std::atomic_size_t max_concurrency{kTooManyTasks};
  };

  auto task_owner = std::make_unique<SpinningTask>();
  SpinningTask* const task = task_owner.get();
  auto job_state = std::make_shared<DefaultJobState>(
      &platform, std::move(task_owner), TaskPriority::kUserVisible, kMaxTask);
  job_state->NotifyConcurrencyIncrease();

  {
    // Block until kMaxTask invocations of Run() have started.
    base::MutexGuard scoped_lock(&task->lock);
    for (; task->started < kMaxTask;) {
      task->on_start.Wait(&task->lock);
    }
    EXPECT_EQ(kMaxTask, task->started);
  }

  // The spinning workers must return, otherwise this call (and the test)
  // hangs.
  job_state->CancelAndWait();
}

// Checks that Join() makes the joining thread contribute to the job and waits
// until every worker has returned.
TEST(DefaultJobTest, JoinJobContributes) {
  static constexpr size_t kMaxTask = 4;
  DefaultPlatform platform(kMaxTask);

  // Run() acts as a barrier: each caller counts itself in |arrived|, wakes the
  // other waiters and blocks until kMaxTask + 1 callers have arrived. It then
  // gives back one unit of |max_concurrency|.
  class BarrierTask : public JobTask {
   public:
    ~BarrierTask() override = default;

    void Run(JobDelegate* delegate) override {
      base::MutexGuard scoped_lock(&lock);
      ++arrived;
      barrier.NotifyAll();
      while (arrived <= kMaxTask) {
        barrier.Wait(&lock);
      }
      max_concurrency.fetch_sub(1);
    }

    size_t GetMaxConcurrency(size_t /* worker_count */) const override {
      const size_t remaining = max_concurrency.load(std::memory_order_relaxed);
      return remaining;
    }

    base::Mutex lock;
    base::ConditionVariable barrier;
    size_t arrived = 0;
    std::atomic_size_t max_concurrency{kMaxTask + 1};
  };

  auto task_owner = std::make_unique<BarrierTask>();
  BarrierTask* const task = task_owner.get();
  auto job_state = std::make_shared<DefaultJobState>(
      &platform, std::move(task_owner), TaskPriority::kUserVisible, kMaxTask);
  job_state->NotifyConcurrencyIncrease();

  // |arrived| can only reach kMaxTask + 1 if the joining thread runs the task
  // as well; if it does, Join() returns instead of hanging.
  job_state->Join();
  EXPECT_EQ(0U, task->max_concurrency);
}

// Checks that Join() converges (and does not hang) for a job whose
// GetMaxConcurrency() depends on its |worker_count| argument.
TEST(DefaultJobTest, WorkerCount) {
  static constexpr size_t kMaxTask = 4;
  DefaultPlatform platform(kMaxTask);

  // Each Run() call consumes one unit of |max_concurrency| (never going below
  // zero). GetMaxConcurrency() reports the |worker_count| it is given plus the
  // units that are still left.
  class CountingTask : public JobTask {
   public:
    ~CountingTask() override = default;

    void Run(JobDelegate* delegate) override {
      base::MutexGuard scoped_lock(&lock);
      if (max_concurrency.load() == 0) return;
      --max_concurrency;
    }

    size_t GetMaxConcurrency(size_t worker_count) const override {
      const size_t remaining = max_concurrency.load(std::memory_order_relaxed);
      return worker_count + remaining;
    }

    base::Mutex lock;
    std::atomic_size_t max_concurrency{kMaxTask};
  };

  auto task_owner = std::make_unique<CountingTask>();
  CountingTask* const task = task_owner.get();
  auto job_state = std::make_shared<DefaultJobState>(
      &platform, std::move(task_owner), TaskPriority::kUserVisible, kMaxTask);
  job_state->NotifyConcurrencyIncrease();

  // |max_concurrency| eventually drops to 0, so Join() should return rather
  // than hang.
  job_state->Join();
  EXPECT_EQ(0U, task->max_concurrency);
}

// Verify that calling NotifyConcurrencyIncrease() (re-)schedules tasks with the
// intended concurrency.
//
// The job starts out with a max concurrency of kMaxTask / 2. Once that many
// workers are blocked inside Run(), the test raises the max concurrency to
// kMaxTask and notifies again; the barrier in Run() only opens when kMaxTask
// workers have arrived.
TEST(DefaultJobTest, JobNotifyConcurrencyIncrease) {
  static constexpr size_t kMaxTask = 4;
  DefaultPlatform platform(kMaxTask);

  // This Job notifies |threads_running| once started and blocks on a barrier
  // until kMaxTask threads reach that point, and then returns.
  class JobTest : public JobTask {
   public:
    ~JobTest() override = default;

    void Run(JobDelegate* delegate) override {
      base::MutexGuard guard(&mutex);
      worker_count++;
      // Wakes both the other workers waiting on the barrier and the test
      // thread waiting for the first kMaxTask / 2 workers.
      threads_running.NotifyAll();
      // Wait synchronously until |kMaxTask| workers reach this point.
      while (worker_count < kMaxTask) threads_running.Wait(&mutex);
      --max_concurrency;
    }

    // Returns the current value of |max_concurrency|.
    size_t GetMaxConcurrency(size_t /* worker_count */) const override {
      return max_concurrency.load(std::memory_order_relaxed);
    }

    base::Mutex mutex;
    base::ConditionVariable threads_running;
    // Number of Run() invocations that have started; guarded by |mutex|.
    size_t worker_count = 0;
    std::atomic_size_t max_concurrency{kMaxTask / 2};
  };

  auto job = std::make_unique<JobTest>();
  JobTest* job_raw = job.get();
  auto state = std::make_shared<DefaultJobState>(
      &platform, std::move(job), TaskPriority::kUserVisible, kMaxTask);
  state->NotifyConcurrencyIncrease();

  {
    // Wait for the first kMaxTask / 2 workers, then raise the max concurrency
    // while still holding |mutex|.
    base::MutexGuard guard(&job_raw->mutex);
    while (job_raw->worker_count < kMaxTask / 2)
      job_raw->threads_running.Wait(&job_raw->mutex);
    EXPECT_EQ(kMaxTask / 2, job_raw->worker_count);

    job_raw->max_concurrency = kMaxTask;
  }
  state->NotifyConcurrencyIncrease();
  // The additional workers should reach the barrier in Run(), releasing all
  // of them; every worker then returns and thus Join() should not hang.
  state->Join();
  EXPECT_EQ(0U, job_raw->max_concurrency);
}

// Checks that Join() does not make the joining thread run the task when the
// job has already finished.
TEST(DefaultJobTest, FinishBeforeJoin) {
  static constexpr size_t kMaxTask = 4;
  DefaultPlatform platform(kMaxTask);

  // Every Run() call counts itself in |runs|, signals |on_run| and gives back
  // one unit of |max_concurrency| before returning. It also expects not to be
  // running on the thread that constructed the task.
  class QuickTask : public JobTask {
   public:
    ~QuickTask() override = default;

    void Run(JobDelegate* delegate) override {
      base::MutexGuard scoped_lock(&lock);
      ++runs;
      // The thread that created the task (the test's main thread) must not
      // contribute in this test.
      EXPECT_NE(creator_thread_id, base::OS::GetCurrentThreadId());
      on_run.NotifyAll();
      max_concurrency.fetch_sub(1);
    }

    size_t GetMaxConcurrency(size_t /* worker_count */) const override {
      const size_t remaining = max_concurrency.load(std::memory_order_relaxed);
      return remaining;
    }

    // Captured when the task object is constructed.
    const int creator_thread_id = base::OS::GetCurrentThreadId();
    base::Mutex lock;
    base::ConditionVariable on_run;
    size_t runs = 0;
    std::atomic_size_t max_concurrency{kMaxTask * 5};
  };

  auto task_owner = std::make_unique<QuickTask>();
  QuickTask* const task = task_owner.get();
  auto job_state = std::make_shared<DefaultJobState>(
      &platform, std::move(task_owner), TaskPriority::kUserVisible, kMaxTask);
  job_state->NotifyConcurrencyIncrease();

  {
    // Block until Run() has been entered kMaxTask * 5 times.
    base::MutexGuard scoped_lock(&task->lock);
    for (; task->runs < kMaxTask * 5;) {
      task->on_run.Wait(&task->lock);
    }
    EXPECT_EQ(kMaxTask * 5, task->runs);
  }

  job_state->Join();
  EXPECT_EQ(0U, task->max_concurrency);
}

// Checks that, in DEBUG builds, destroying a DefaultJobHandle on which neither
// Join() nor Cancel() was called triggers a DCHECK (observed here as process
// death).
TEST(DefaultJobTest, LeakHandle) {
  // A task with nothing to do: Run() is empty and GetMaxConcurrency() is
  // always 0.
  class IdleTask : public JobTask {
   public:
    ~IdleTask() override = default;

    void Run(JobDelegate* delegate) override {}

    size_t GetMaxConcurrency(size_t /* worker_count */) const override {
      return 0;
    }
  };

  DefaultPlatform platform(0);
  auto job_state = std::make_shared<DefaultJobState>(
      &platform, std::make_unique<IdleTask>(), TaskPriority::kUserVisible, 1);
  auto job_handle = std::make_unique<DefaultJobHandle>(std::move(job_state));
#ifdef DEBUG
  EXPECT_DEATH_IF_SUPPORTED({ job_handle.reset(); }, "");
#endif  // DEBUG
  // NOTE(review): presumably joins so that destroying |job_handle| at the end
  // of the test is legitimate -- confirm against DefaultJobHandle.
  job_handle->Join();
}

// Checks the ids handed out by DefaultJobState::AcquireTaskId(): consecutive
// ids starting at 0, and ids given back through ReleaseTaskId() are handed out
// again (lowest first) before a new id is used.
TEST(DefaultJobTest, AcquireTaskId) {
  // A task with nothing to do; only the id bookkeeping of the job state is
  // exercised here.
  class IdleTask : public JobTask {
   public:
    ~IdleTask() override = default;

    void Run(JobDelegate* delegate) override {}

    size_t GetMaxConcurrency(size_t /* worker_count */) const override {
      return 0;
    }
  };

  DefaultPlatform platform(0);
  auto job_state = std::make_shared<DefaultJobState>(
      &platform, std::make_unique<IdleTask>(), TaskPriority::kUserVisible, 1);

  // The first five acquisitions yield 0, 1, 2, 3, 4 in that order.
  for (unsigned int expected_id = 0U; expected_id < 5U; ++expected_id) {
    EXPECT_EQ(expected_id, job_state->AcquireTaskId());
  }

  // Free two ids in the middle of the range.
  job_state->ReleaseTaskId(1);
  job_state->ReleaseTaskId(3);

  // The freed ids come back first, then the next unused id.
  EXPECT_EQ(1U, job_state->AcquireTaskId());
  EXPECT_EQ(3U, job_state->AcquireTaskId());
  EXPECT_EQ(5U, job_state->AcquireTaskId());
}

}  // namespace default_job_unittest
}  // namespace platform
}  // namespace v8