#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for upload_to_google_storage.py."""

import optparse
import os
import Queue
import shutil
import StringIO
import sys
import tempfile
import threading
import unittest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import upload_to_google_storage
from download_from_google_storage_unittests import GsutilMock

# Directory that contains this test file.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
# ../third_party/gsutil/gsutil, resolved relative to the parent of TEST_DIR.
GSUTIL_DEFAULT_PATH = os.path.join(
    os.path.dirname(TEST_DIR), 'third_party', 'gsutil', 'gsutil')


class UploadTests(unittest.TestCase):
  """Tests for upload_to_google_storage, driven through a GsutilMock.

  Each test works on a private copy of the 'gstools' fixture directory that
  setUp() places in a fresh temporary directory.
  """

  def setUp(self):
    """Creates the mock gsutil, a scratch copy of the fixtures and queues."""
    self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH, None)
    self.temp_dir = tempfile.mkdtemp(prefix='gstools_test')
    self.base_path = os.path.join(self.temp_dir, 'gstools')
    shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path)
    self.base_url = 'gs://sometesturl'
    self.parser = optparse.OptionParser()
    self.ret_codes = Queue.Queue()
    self.stdout_queue = Queue.Queue()
    self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
    self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'

  def tearDown(self):
    """Restores sys.stdin and deletes the temporary directory.

    This must be named tearDown: unittest only invokes setUp/tearDown, so a
    hook with any other name never runs and every test would leak its
    temporary directory and leave sys.stdin replaced.
    """
    # Restore stdin first so it is reset even if the rmtree below raises.
    sys.stdin = sys.__stdin__
    shutil.rmtree(self.temp_dir)

  def _read_file(self, path):
    """Returns the raw contents of |path|, closing the file afterwards."""
    with open(path, 'rb') as f:
      return f.read()

  def test_upload_single_file(self):
    """Uploading one file issues an ls then a cp and writes the .sha1 file.

    The trailing 'txt' argument is expected to show up as '-z txt' in the
    cp call recorded by the mock.
    """
    filenames = [self.lorem_ipsum]
    output_filename = '%s.sha1' % self.lorem_ipsum
    code = upload_to_google_storage.upload_to_google_storage(
        filenames, self.base_url, self.gsutil, True, False, 1, False, 'txt')
    self.assertEqual(
        self.gsutil.history,
        [('check_call',
          ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))),
         ('check_call',
          ('cp', '-z', 'txt', filenames[0],
           '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))])
    self.assertTrue(os.path.exists(output_filename))
    self.assertEqual(
        self._read_file(output_filename),
        '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
    os.remove(output_filename)
    self.assertEqual(code, 0)

  def test_upload_single_file_remote_exists(self):
    """With canned ls / ls -L results queued, no cp call is recorded.

    The .sha1 file is still expected to be written.
    """
    filenames = [self.lorem_ipsum]
    output_filename = '%s.sha1' % self.lorem_ipsum
    # NOTE(review): presumably the ETag matches the fixture's checksum, which
    # is why no upload follows -- confirm against upload_to_google_storage.
    etag_string = 'ETag: 634d7c1ed3545383837428f031840a1e'
    self.gsutil.add_expected(0, '', '')
    self.gsutil.add_expected(0, etag_string, '')
    code = upload_to_google_storage.upload_to_google_storage(
        filenames, self.base_url, self.gsutil, False, False, 1, False, None)
    self.assertEqual(
        self.gsutil.history,
        [('check_call',
          ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))),
         ('check_call',
          ('ls', '-L', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))])
    self.assertTrue(os.path.exists(output_filename))
    self.assertEqual(
        self._read_file(output_filename),
        '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
    os.remove(output_filename)
    self.assertEqual(code, 0)

  def test_upload_worker_errors(self):
    """A failing gsutil call is reported on the ret_codes queue."""
    work_queue = Queue.Queue()
    work_queue.put((self.lorem_ipsum, self.lorem_ipsum_sha1))
    work_queue.put((None, None))
    self.gsutil.add_expected(1, '', '')  # For the first ls call.
    self.gsutil.add_expected(20, '', 'Expected error message')
    # pylint: disable=W0212
    upload_to_google_storage._upload_worker(
        0,
        work_queue,
        self.base_url,
        self.gsutil,
        threading.Lock(),
        False,
        False,
        self.stdout_queue,
        self.ret_codes,
        None)
    expected_ret_codes = [
      (20,
       'Encountered error on uploading %s to %s/%s\nExpected error message' %
          (self.lorem_ipsum, self.base_url, self.lorem_ipsum_sha1))]
    self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)

  def test_skip_hashing(self):
    """A pre-existing .sha1 file's hash is used for every gsutil call.

    The fake hash differs from the fixture's real SHA-1, and the .sha1 file
    must still contain the fake hash after the upload.
    """
    filenames = [self.lorem_ipsum]
    output_filename = '%s.sha1' % self.lorem_ipsum
    fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f'
    with open(output_filename, 'wb') as f:
      f.write(fake_hash)  # Fake hash.
    code = upload_to_google_storage.upload_to_google_storage(
        filenames, self.base_url, self.gsutil, False, False, 1, True, None)
    self.assertEqual(
        self.gsutil.history,
        [('check_call',
          ('ls', '%s/%s' % (self.base_url, fake_hash))),
         ('check_call',
          ('ls', '-L', '%s/%s' % (self.base_url, fake_hash))),
         ('check_call',
          ('cp', filenames[0], '%s/%s' % (self.base_url, fake_hash)))])
    self.assertEqual(self._read_file(output_filename), fake_hash)
    os.remove(output_filename)
    self.assertEqual(code, 0)

  def test_get_targets_no_args(self):
    """get_targets() with no targets must raise SystemExit with code 2."""
    try:
      upload_to_google_storage.get_targets([], self.parser, False)
    except SystemExit as e:
      self.assertEqual(e.code, 2)
    else:
      self.fail('get_targets() did not exit when given no targets.')

  def test_get_targets_passthrough(self):
    """Explicit target names are returned unchanged."""
    result = upload_to_google_storage.get_targets(
        ['a', 'b', 'c', 'd', 'e'],
        self.parser,
        False)
    self.assertEqual(result, ['a', 'b', 'c', 'd', 'e'])

  def test_get_targets_multiple_stdin(self):
    """A '-' target reads line-separated names from sys.stdin."""
    inputs = ['a', 'b', 'c', 'd', 'e']
    sys.stdin = StringIO.StringIO(os.linesep.join(inputs))
    result = upload_to_google_storage.get_targets(
        ['-'],
        self.parser,
        False)
    self.assertEqual(result, inputs)

  def test_get_targets_multiple_stdin_null(self):
    """A '-' target with the flag set reads NUL-separated names from stdin."""
    inputs = ['a', 'b', 'c', 'd', 'e']
    sys.stdin = StringIO.StringIO('\0'.join(inputs))
    result = upload_to_google_storage.get_targets(
        ['-'],
        self.parser,
        True)
    self.assertEqual(result, inputs)


# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()