#!/usr/bin/env python
# Copyright (c) 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import codecs
import copy
import json
import os
import sys
import unittest

#import test_env  # pylint: disable=relative-import,unused-import

sys.path.insert(0, os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    'recipe_modules', 'bot_update', 'resources'))
import bot_update

# Baseline keyword arguments for bot_update.ensure_checkout().  The test case
# deep-copies this dict in setUp() so individual tests can override entries
# without affecting each other.
DEFAULT_PARAMS = {
    'solutions': [{
        'name': 'somename',
        'url': 'https://fake.com'
    }],
    'revisions': {},
    'first_sln': 'somename',
    'target_os': None,
    'target_os_only': None,
    'patch_root': None,
    'issue': None,
    'patchset': None,
    'rietveld_server': None,
    'gerrit_repo': None,
    'gerrit_ref': None,
    'gerrit_rebase_patch_ref': None,
    'revision_mapping': {},
    'apply_issue_email_file': None,
    'apply_issue_key_file': None,
    'apply_issue_oauth2_file': None,
    'shallow': False,
    'refs': [],
    'git_cache_dir': '',
    'gerrit_reset': None,
}


class MockedPopen(object):
  """A fake instance of a called subprocess.

  This is meant to be used in conjunction with MockedCall.

  Attributes:
    args: Leading arguments a call must start with to match this instance.
    kwargs: Keyword arguments recorded with the expectation; they are stored
        but not consulted when matching.
    return_value: Value, or callable producing the value, handed back when
        this instance is called.
    fails: Initialised to False; nothing in this class reads it.
  """
  def __init__(self, args=None, kwargs=None):
    self.args = args or []
    self.kwargs = kwargs or {}
    self.return_value = None
    self.fails = False

  def returns(self, rv):
    """Set the return value when this popen is called.

    rv can be a string, or a callable (eg function).

    Returns:
      This instance, so the call can be chained onto expect().
    """
    self.return_value = rv
    return self

  def check(self, args, kwargs):
    """Check to see if the given args/kwargs call match this instance.

    This does a partial match, so that a call to "git clone foo" will match
    this instance if this instance was recorded as "git clone"

    Args:
      args: Positional arguments of the call being tested.
      kwargs: Keyword arguments of the call; accepted but not compared.

    Returns:
      True if args starts with every recorded argument, False otherwise.  The
      result does not depend on the configured return value, so an expectation
      that returns '' or None still matches.
    """
    # zip() stops at the shorter sequence, so without this guard a bare "git"
    # call would match an expectation recorded as "git clone".
    if len(args) < len(self.args):
      return False
    return not any(input_arg != expected_arg
                   for (input_arg, expected_arg) in zip(args, self.args))

  def __call__(self, args, kwargs):
    """Actually call this popen instance.

    Args:
      args: Positional arguments of the original call, as a sequence.
      kwargs: Keyword arguments of the original call, as a dict.

    Returns:
      The result of calling return_value with the unpacked arguments when it
      is callable, otherwise return_value itself.
    """
    if callable(self.return_value):
      return self.return_value(*args, **kwargs)
    return self.return_value


class MockedCall(object):
  """Stand-in for bot_update.call() driven by pre-registered expectations.

  Expectations live in self.expectations; each one is a MockedPopen (or any
  object offering check() and __call__()).  check() decides whether an
  expectation applies to a call and may match on a prefix only, e.g. an
  expectation for "git clone" applies to a "git clone foo" call.
  Every call is appended to self.records.  When no expectation applies the
  call succeeds with an empty string.
  """
  def __init__(self, fake_filesystem):
    # fake_filesystem is accepted but not used by this class.
    self.records = []
    self.expectations = []

  def expect(self, args=None, kwargs=None):
    """Register an expectation and return it so it can be configured."""
    expectation = MockedPopen(args or [], kwargs or {})
    self.expectations.append(expectation)
    return expectation

  def __call__(self, *args, **kwargs):
    """Record the call, then answer it with the first matching expectation.

    A matching expectation is consumed (removed) before it is invoked.
    Returns '' when nothing matches.
    """
    self.records.append((args, kwargs))
    matched = next(
        (candidate for candidate in self.expectations
         if candidate.check(args, kwargs)),
        None)
    if matched is None:
      return ''
    self.expectations.remove(matched)
    return matched(args, kwargs)


class MockedGclientSync(object):
  """A class producing a callable instance of gclient sync.

  Because for bot_update, gclient sync also emits an output json file, we need
  a callable object that can understand where the output json file is going,
  and emit an (albeit fake) file for bot_update to consume.

  Attributes:
    output: Object serialised as JSON into the output file on every call.
    fake_filesystem: Object whose open(path, 'w') yields a writable context
        manager; the output file is written through it.
    records: Positional-argument tuples of every call, in order.
  """
  def __init__(self, fake_filesystem):
    self.output = {}
    self.fake_filesystem = fake_filesystem
    self.records = []

  def __call__(self, *args, **_):
    """Write self.output to the --output-json path and record the call.

    Keyword arguments are accepted and ignored.

    Raises:
      ValueError: if '--output-json' is not among args.
      IndexError: if '--output-json' is the last argument.
    """
    # The path is the argument immediately following the flag.
    output_json_index = args.index('--output-json') + 1
    with self.fake_filesystem.open(args[output_json_index], 'w') as f:
      json.dump(self.output, f)
    self.records.append(args)


class FakeFile():
  """In-memory, string-backed stand-in for a file object.

  Writes accumulate in self.contents and read() returns all of it.  The object
  is also a context manager; leaving the context does nothing.
  """
  def __init__(self):
    self.contents = ''

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    return None

  def read(self):
    """Return everything written so far."""
    return self.contents

  def write(self, buf):
    """Append buf to the stored contents."""
    self.contents = self.contents + buf


class FakeFilesystem():
  """Dictionary-backed replacement for open(), keyed by path.

  self.files maps each path to the FakeFile stored for it.
  """
  def __init__(self):
    self.files = {}

  def open(self, target, mode='r', encoding=None):
    """Return the fake file for target.

    Any mode containing 'w' first replaces the entry with a fresh FakeFile.
    Other modes return the existing entry and raise KeyError when there is
    none.  encoding is accepted and ignored.
    """
    if 'w' in mode:
      self.files[target] = FakeFile()
    return self.files[target]


def fake_git(*args, **kwargs):
  """Forward a git invocation to bot_update.call with 'git' prepended.

  bot_update.call is looked up at call time, so whatever is currently
  installed on the module handles the command.
  """
  git_command = ('git',) + args
  return bot_update.call(*git_command, **kwargs)


class BotUpdateUnittests(unittest.TestCase):
  """Runs bot_update.ensure_checkout() against mocked subprocess calls.

  setUp() replaces bot_update.call, bot_update.git, bot_update.open,
  os.getcwd and codecs.open with fakes and pins sys.platform; tearDown()
  puts every one of them back.
  """

  def setUp(self):
    """Install the fakes and seed one gclient sync expectation."""
    self.old_platform = sys.platform
    sys.platform = 'linux2'  # For consistency, ya know?
    self.filesystem = FakeFilesystem()
    self.call = MockedCall(self.filesystem)
    self.gclient = MockedGclientSync(self.filesystem)
    self.call.expect(
        (sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')
    ).returns(self.gclient)
    self.old_call = getattr(bot_update, 'call')
    # None means bot_update had no 'git' attribute before patching.
    self.old_git = getattr(bot_update, 'git', None)
    self.params = copy.deepcopy(DEFAULT_PARAMS)
    setattr(bot_update, 'call', self.call)
    setattr(bot_update, 'git', fake_git)

    self.old_os_cwd = os.getcwd
    setattr(os, 'getcwd', lambda: '/b/build/slave/foo/build')

    # A module-level 'open' shadows the builtin inside bot_update.
    setattr(bot_update, 'open', self.filesystem.open)
    self.old_codecs_open = codecs.open
    setattr(codecs, 'open', self.filesystem.open)

  def tearDown(self):
    """Undo every patch applied by setUp() and by the Windows override."""
    sys.platform = self.old_platform
    setattr(bot_update, 'call', self.old_call)
    if self.old_git is None:
      delattr(bot_update, 'git')
    else:
      setattr(bot_update, 'git', self.old_git)
    setattr(os, 'getcwd', self.old_os_cwd)
    delattr(bot_update, 'open')
    setattr(codecs, 'open', self.old_codecs_open)

  def overrideSetupForWindows(self):
    """Switch sys.platform to 'win' and queue a second gclient sync answer."""
    sys.platform = 'win'
    self.call.expect(
        (sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')
    ).returns(self.gclient)

  def testBasic(self):
    """ensure_checkout() runs with the default parameters.

    Returns:
      The calls recorded by the mocked bot_update.call.
    """
    bot_update.ensure_checkout(**self.params)
    return self.call.records

  def testBasicShallow(self):
    """ensure_checkout() runs with shallow=True.

    Returns:
      The calls recorded by the mocked bot_update.call.
    """
    self.params['shallow'] = True
    bot_update.ensure_checkout(**self.params)
    return self.call.records

  def testBasicRevision(self):
    """Requested revisions reach gclient sync as --revision arguments.

    Returns:
      The calls recorded by the mocked bot_update.call.
    """
    self.params['revisions'] = {
        'src': 'HEAD', 'src/v8': 'deadbeef', 'somename': 'DNE'}
    bot_update.ensure_checkout(**self.params)
    args = self.gclient.records[0]
    # Locate the three --revision flags in order of appearance.
    idx_first_revision = args.index('--revision')
    idx_second_revision = args.index(
        '--revision', idx_first_revision+1)
    idx_third_revision = args.index('--revision', idx_second_revision+1)
    self.assertEqual(args[idx_first_revision+1], 'somename@unmanaged')
    self.assertEqual(args[idx_second_revision+1], 'src@origin/master')
    self.assertEqual(args[idx_third_revision+1], 'src/v8@deadbeef')
    return self.call.records

  def testBreakLocks(self):
    """With the Windows override, gclient sync gets --break_repo_locks."""
    self.overrideSetupForWindows()
    bot_update.ensure_checkout(**self.params)
    gclient_sync_cmd = None
    sync_prefix = (sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')
    for record in self.call.records:
      args = record[0]
      if args[:4] == sync_prefix:
        # Keep the last matching command, as later records overwrite earlier.
        gclient_sync_cmd = args
    # Fail with a clear message rather than a TypeError on 'in None'.
    self.assertIsNotNone(gclient_sync_cmd)
    self.assertIn('--break_repo_locks', gclient_sync_cmd)

  def testGitCheckoutBreaksLocks(self):
    """With the Windows override, a stale index.lock is removed."""
    self.overrideSetupForWindows()
    path = '/b/build/slave/foo/build/.git'
    lockfile = 'index.lock'
    removed = []
    old_os_walk = os.walk
    old_os_remove = os.remove
    setattr(os, 'walk', lambda _: [(path, None, [lockfile])])
    setattr(os, 'remove', removed.append)
    try:
      bot_update.ensure_checkout(**self.params)
    finally:
      # Restore even when ensure_checkout raises, so later tests see the
      # real os functions.
      setattr(os, 'walk', old_os_walk)
      setattr(os, 'remove', old_os_remove)
    self.assertIn(os.path.join(path, lockfile), removed)

# Run the test cases in this module when the file is executed directly.
if __name__ == '__main__':
  unittest.main()