gsutil_test.py 5.36 KB
Newer Older
1 2 3 4 5 6 7 8 9
#!/usr/bin/env python
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Test gsutil.py."""


import __builtin__
10
import base64
11
import hashlib
12 13
import json
import os
14
import shutil
15
import subprocess
16 17
import sys
import tempfile
18
import unittest
19
import urllib2
20
import zipfile
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72


# Add depot_tools to path
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
DEPOT_TOOLS_DIR = os.path.dirname(THIS_DIR)
sys.path.append(DEPOT_TOOLS_DIR)

import gsutil


class TestError(Exception):
  pass


class Buffer(object):
  def __init__(self, data=None):
    self.data = data or ''

  def write(self, buf):
    self.data += buf

  def read(self, amount=None):
    if not amount:
      amount = len(self.data)
    result = self.data[:amount]
    self.data = self.data[amount:]
    return result


class FakeCall(object):
  def __init__(self):
    self.expectations = []

  def add_expectation(self, *args, **kwargs):
    returns = kwargs.pop('_returns', None)
    self.expectations.append((args, kwargs, returns))

  def __call__(self, *args, **kwargs):
    if not self.expectations:
      raise TestError('Got unexpected\n%s\n%s' % (args, kwargs))
    exp_args, exp_kwargs, exp_returns = self.expectations.pop(0)
    if args != exp_args or kwargs != exp_kwargs:
      message = 'Expected:\n  args: %s\n  kwargs: %s\n' % (exp_args, exp_kwargs)
      message += 'Got:\n  args: %s\n  kwargs: %s\n' % (args, kwargs)
      raise TestError(message)
    return exp_returns


class GsutilUnitTests(unittest.TestCase):
  def setUp(self):
    self.fake = FakeCall()
    self.tempdir = tempfile.mkdtemp()
73
    self.old_urlopen = getattr(urllib2, 'urlopen')
74
    self.old_call = getattr(subprocess, 'call')
75
    setattr(urllib2, 'urlopen', self.fake)
76
    setattr(subprocess, 'call', self.fake)
77 78 79 80

  def tearDown(self):
    self.assertEqual(self.fake.expectations, [])
    shutil.rmtree(self.tempdir)
81
    setattr(urllib2, 'urlopen', self.old_urlopen)
82
    setattr(subprocess, 'call', self.old_call)
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127

  def test_download_gsutil(self):
    version = '4.2'
    filename = 'gsutil_%s.zip' % version
    full_filename = os.path.join(self.tempdir, filename)
    fake_file = 'This is gsutil.zip'
    fake_file2 = 'This is other gsutil.zip'
    url = '%s%s' % (gsutil.GSUTIL_URL, filename)
    self.fake.add_expectation(url, _returns=Buffer(fake_file))

    self.assertEquals(
        gsutil.download_gsutil(version, self.tempdir), full_filename)
    with open(full_filename, 'r') as f:
      self.assertEquals(fake_file, f.read())

    metadata_url = gsutil.API_URL + filename
    md5_calc = hashlib.md5()
    md5_calc.update(fake_file)
    b64_md5 = base64.b64encode(md5_calc.hexdigest())
    self.fake.add_expectation(metadata_url, _returns=Buffer(json.dumps({
        'md5Hash': b64_md5
    })))
    self.assertEquals(
        gsutil.download_gsutil(version, self.tempdir), full_filename)
    with open(full_filename, 'r') as f:
      self.assertEquals(fake_file, f.read())
    self.assertEquals(self.fake.expectations, [])

    self.fake.add_expectation(metadata_url, _returns=Buffer(json.dumps({
        'md5Hash': base64.b64encode('aaaaaaa')  # Bad MD5
    })))
    self.fake.add_expectation(url, _returns=Buffer(fake_file2))
    self.assertEquals(
        gsutil.download_gsutil(version, self.tempdir), full_filename)
    with open(full_filename, 'r') as f:
      self.assertEquals(fake_file2, f.read())
    self.assertEquals(self.fake.expectations, [])

  def test_ensure_gsutil_full(self):
    version = '4.2'
    gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
    gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
    os.makedirs(gsutil_dir)

    self.fake.add_expectation(
128 129
        [sys.executable, gsutil_bin, 'version'], stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT, _returns=1)
130 131 132 133 134 135 136 137 138 139 140 141

    with open(gsutil_bin, 'w') as f:
      f.write('Foobar')
    zip_filename = 'gsutil_%s.zip' % version
    url = '%s%s' % (gsutil.GSUTIL_URL, zip_filename)
    _, tempzip = tempfile.mkstemp()
    fake_gsutil = 'Fake gsutil'
    with zipfile.ZipFile(tempzip, 'w') as zf:
      zf.writestr('gsutil/gsutil', fake_gsutil)
    with open(tempzip, 'rb') as f:
      self.fake.add_expectation(url, _returns=Buffer(f.read()))
    self.fake.add_expectation(
142 143
        [sys.executable, gsutil_bin, 'version'], stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT, _returns=1)
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161

    # This should delete the old bin and rewrite it with 'Fake gsutil'
    self.assertRaises(
        gsutil.InvalidGsutilError, gsutil.ensure_gsutil, version, self.tempdir)
    self.assertTrue(os.path.isdir(os.path.join(self.tempdir, '.cache_dir')))
    self.assertTrue(os.path.exists(gsutil_bin))
    with open(gsutil_bin, 'r') as f:
      self.assertEquals(f.read(), fake_gsutil)
    self.assertEquals(self.fake.expectations, [])

  def test_ensure_gsutil_short(self):
    version = '4.2'
    gsutil_dir = os.path.join(self.tempdir, 'gsutil_%s' % version, 'gsutil')
    gsutil_bin = os.path.join(gsutil_dir, 'gsutil')
    os.makedirs(gsutil_dir)

    # Mock out call().
    self.fake.add_expectation(
162 163
        [sys.executable, gsutil_bin, 'version'],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, _returns=0)
164 165 166 167 168 169 170 171

    with open(gsutil_bin, 'w') as f:
      f.write('Foobar')
    self.assertEquals(
        gsutil.ensure_gsutil(version, self.tempdir), gsutil_bin)

if __name__ == '__main__':
  unittest.main()