#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for upload_to_google_storage.py."""

import optparse
import os
import Queue
import shutil
import StringIO
import sys
import tarfile
import tempfile
import threading
import unittest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import upload_to_google_storage
from download_from_google_storage_unittests import GsutilMock
from download_from_google_storage_unittests import ChangedWorkingDirectory

# Directory that contains this test file.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))

# Default gsutil location: ../third_party/gsutil/gsutil relative to TEST_DIR.
GSUTIL_DEFAULT_PATH = os.path.join(
    os.path.dirname(TEST_DIR), 'third_party', 'gsutil', 'gsutil')


class UploadTests(unittest.TestCase):
  """Tests for upload_to_google_storage, run against a GsutilMock.

  setUp copies the 'gstools' fixture directory into a fresh temporary
  directory for every test; tearDown removes it again.
  """

  def setUp(self):
    """Creates the gsutil mock, the fixture copy and the result queues."""
    self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH, None)
    self.temp_dir = tempfile.mkdtemp(prefix='gstools_test')
    self.base_path = os.path.join(self.temp_dir, 'gstools')
    shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path)
    self.base_url = 'gs://sometesturl'
    self.parser = optparse.OptionParser()
    self.ret_codes = Queue.Queue()
    self.stdout_queue = Queue.Queue()
    self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
    self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'

  def tearDown(self):
    """Restores the real stdin and removes the temporary directory.

    The hook has to be called tearDown: unittest never invokes a method named
    cleanUp, so under that name the temporary directory was leaked and
    sys.stdin stayed replaced after the stdin tests.
    """
    # Reset stdin first so it is restored even if the rmtree below raises.
    sys.stdin = sys.__stdin__
    shutil.rmtree(self.temp_dir)

  def _read_file(self, path):
    """Returns the raw contents of |path|, closing the file afterwards."""
    with open(path, 'rb') as f:
      return f.read()

  def test_upload_single_file(self):
    """Expects 'ls' then 'cp -z txt', plus a .sha1 file with the hash."""
    filenames = [self.lorem_ipsum]
    output_filename = '%s.sha1' % self.lorem_ipsum
    remote_path = '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)
    code = upload_to_google_storage.upload_to_google_storage(
        filenames, self.base_url, self.gsutil, True, False, 1, False, 'txt')
    self.assertEqual(
        self.gsutil.history,
        [('check_call', ('ls', remote_path)),
         ('check_call', ('cp', '-z', 'txt', filenames[0], remote_path))])
    self.assertTrue(os.path.exists(output_filename))
    self.assertEqual(
        self._read_file(output_filename),
        '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
    os.remove(output_filename)
    self.assertEqual(code, 0)

  def test_create_archive(self):
    """Expects create_archives to write subfolder.tar.gz with its files."""
    work_dir = os.path.join(self.base_path, 'download_test_data')
    with ChangedWorkingDirectory(work_dir):
      dirname = 'subfolder'
      dirs = [dirname]
      tar_gz_file = '%s.tar.gz' % dirname
      self.assertTrue(upload_to_google_storage.validate_archive_dirs(dirs))
      upload_to_google_storage.create_archives(dirs)
      self.assertTrue(os.path.exists(tar_gz_file))
      with tarfile.open(tar_gz_file, 'r:gz') as tar:
        content = [member.name for member in tar.getmembers()]
        self.assertIn(dirname, content)
        self.assertIn(os.path.join(dirname, 'subfolder_text.txt'), content)
        self.assertIn(
            os.path.join(dirname, 'subfolder_text.txt.sha1'), content)

  def test_validate_archive_dirs_fails(self):
    """Expects validate_archive_dirs to reject a symlink and a bogus name."""
    work_dir = os.path.join(self.base_path, 'download_test_data')
    with ChangedWorkingDirectory(work_dir):
      symlink = 'link'
      os.symlink(os.path.join(self.base_path, 'subfolder'), symlink)
    # NOTE(review): these assertions run after the `with` block has exited, so
    # 'link' is presumably no longer resolved relative to work_dir -- confirm
    # whether they were meant to sit inside the block.
    self.assertFalse(upload_to_google_storage.validate_archive_dirs([symlink]))
    self.assertFalse(upload_to_google_storage.validate_archive_dirs(['foobar']))

  def test_upload_single_file_remote_exists(self):
    """Expects only 'ls' and 'ls -L' (no 'cp') with the mocked ETag reply."""
    filenames = [self.lorem_ipsum]
    output_filename = '%s.sha1' % self.lorem_ipsum
    remote_path = '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)
    etag_string = 'ETag: 634d7c1ed3545383837428f031840a1e'
    self.gsutil.add_expected(0, '', '')
    self.gsutil.add_expected(0, etag_string, '')
    code = upload_to_google_storage.upload_to_google_storage(
        filenames, self.base_url, self.gsutil, False, False, 1, False, None)
    self.assertEqual(
        self.gsutil.history,
        [('check_call', ('ls', remote_path)),
         ('check_call', ('ls', '-L', remote_path))])
    self.assertTrue(os.path.exists(output_filename))
    self.assertEqual(
        self._read_file(output_filename),
        '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
    os.remove(output_filename)
    self.assertEqual(code, 0)

  def test_upload_worker_errors(self):
    """Expects a failing gsutil call to be reported through ret_codes."""
    work_queue = Queue.Queue()
    work_queue.put((self.lorem_ipsum, self.lorem_ipsum_sha1))
    work_queue.put((None, None))
    self.gsutil.add_expected(1, '', '')  # For the first ls call.
    self.gsutil.add_expected(20, '', 'Expected error message')
    # pylint: disable=W0212
    upload_to_google_storage._upload_worker(
        0,
        work_queue,
        self.base_url,
        self.gsutil,
        threading.Lock(),
        False,
        False,
        self.stdout_queue,
        self.ret_codes,
        None)
    expected_ret_codes = [
      (20,
       'Encountered error on uploading %s to %s/%s\nExpected error message' %
          (self.lorem_ipsum, self.base_url, self.lorem_ipsum_sha1))]
    self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)

  def test_skip_hashing(self):
    """Expects the hash already stored in the .sha1 file to be used as-is."""
    filenames = [self.lorem_ipsum]
    output_filename = '%s.sha1' % self.lorem_ipsum
    fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f'
    remote_path = '%s/%s' % (self.base_url, fake_hash)
    with open(output_filename, 'wb') as f:
      f.write(fake_hash)  # Fake hash.
    code = upload_to_google_storage.upload_to_google_storage(
        filenames, self.base_url, self.gsutil, False, False, 1, True, None)
    self.assertEqual(
        self.gsutil.history,
        [('check_call', ('ls', remote_path)),
         ('check_call', ('ls', '-L', remote_path)),
         ('check_call', ('cp', filenames[0], remote_path))])
    # The .sha1 file must still hold the fake hash after the upload.
    self.assertEqual(self._read_file(output_filename), fake_hash)
    os.remove(output_filename)
    self.assertEqual(code, 0)

  def test_get_targets_no_args(self):
    """Expects get_targets to exit with code 2 when given no targets."""
    with self.assertRaises(SystemExit) as cm:
      upload_to_google_storage.get_targets([], self.parser, False)
    self.assertEqual(cm.exception.code, 2)

  def test_get_targets_passthrough(self):
    """Expects explicitly listed targets to be returned unchanged."""
    result = upload_to_google_storage.get_targets(
        ['a', 'b', 'c', 'd', 'e'],
        self.parser,
        False)
    self.assertEqual(result, ['a', 'b', 'c', 'd', 'e'])

  def test_get_targets_multiple_stdin(self):
    """Expects '-' to yield the line-separated names supplied on stdin."""
    inputs = ['a', 'b', 'c', 'd', 'e']
    sys.stdin = StringIO.StringIO(os.linesep.join(inputs))
    result = upload_to_google_storage.get_targets(
        ['-'],
        self.parser,
        False)
    self.assertEqual(result, inputs)

  def test_get_targets_multiple_stdin_null(self):
    """Expects '-' to yield the NUL-separated names supplied on stdin."""
    inputs = ['a', 'b', 'c', 'd', 'e']
    sys.stdin = StringIO.StringIO('\0'.join(inputs))
    result = upload_to_google_storage.get_targets(
        ['-'],
        self.parser,
        True)
    self.assertEqual(result, inputs)


# Run the whole suite when this file is executed directly.
if __name__ == '__main__':
  unittest.main()