Commit b800fde5 authored by Edward Lemur's avatar Edward Lemur Committed by Commit Bot

git-map: Refactor and add simple tests

Change-Id: I8fd0034f6a6d7623792620f92208b25961fa174e
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/1990142Reviewed-by: 's avatarAnthony Polito <apolito@google.com>
Commit-Queue: Edward Lesmes <ehmaldonado@chromium.org>
parent 8b52ca7a
......@@ -73,6 +73,7 @@ def CommonChecks(input_api, output_api, tests_to_black_list, run_on_python3):
r'.*git_cl_test\.py$',
r'.*git_common_test\.py$',
r'.*git_hyper_blame_test\.py$',
r'.*git_map_test\.py$',
r'.*git_number_test\.py$',
r'.*git_rebase_update_test\.py$',
r'.*ninjalog_uploader_test\.py$',
......
......@@ -3,4 +3,4 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
. $(type -P python_runner.sh) | less -R
. $(type -P python_runner.sh)
......@@ -39,6 +39,11 @@ import subprocess2
from io import BytesIO
if sys.version_info.major == 2:
# On Python 3, BrokenPipeError is raised instead.
BrokenPipeError = IOError
ROOT = os.path.abspath(os.path.dirname(__file__))
IS_WIN = sys.platform == 'win32'
TEST_MODE = False
......@@ -701,7 +706,11 @@ def less(): # pragma: no cover
proc = subprocess2.Popen(cmd, stdin=subprocess2.PIPE)
yield proc.stdin
finally:
proc.stdin.close()
try:
proc.stdin.close()
except BrokenPipeError:
# BrokenPipeError is raised if proc has already completed,
pass
proc.wait()
......
......@@ -4,6 +4,8 @@
# found in the LICENSE file.
"""
usage: git map [-h] [--help] [<args>]
Enhances `git log --graph` view with information on commit branches + tags that
point to them. Items are colorized as follows:
......@@ -15,130 +17,150 @@ point to them. Items are colorized as follows:
* Blue background - The currently checked out commit
"""
from __future__ import unicode_literals
import os
import sys
import git_common
import setup_color
import subprocess2
from git_common import current_branch, branches, tags, get_config_list, GIT_EXE
from git_common import get_or_create_merge_base, root
from third_party import colorama
CYAN = colorama.Fore.CYAN
GREEN = colorama.Fore.GREEN
MAGENTA = colorama.Fore.MAGENTA
RED = colorama.Fore.RED
WHITE = colorama.Fore.WHITE
BLUEBAK = colorama.Back.BLUE
if sys.version_info.major == 2:
# On Python 3, BrokenPipeError is raised instead.
BrokenPipeError = IOError
BRIGHT = colorama.Style.BRIGHT
RESET = colorama.Fore.RESET + colorama.Back.RESET + colorama.Style.RESET_ALL
BRIGHT = colorama.Style.BRIGHT
# Git emits combined color
BRIGHT_RED = '\x1b[1;31m'
BLUE_BACK = colorama.Back.BLUE + BRIGHT
BRIGHT_RED = colorama.Fore.RED + BRIGHT
CYAN = colorama.Fore.CYAN + BRIGHT
GREEN = colorama.Fore.GREEN + BRIGHT
MAGENTA = colorama.Fore.MAGENTA + BRIGHT
RED = colorama.Fore.RED
WHITE = colorama.Fore.WHITE + BRIGHT
YELLOW = colorama.Fore.YELLOW
def print_help():
def _print_help(outbuf):
names = {
'Cyan': CYAN,
'Green': GREEN,
'Magenta': MAGENTA,
'Red': RED,
'White': WHITE,
'Blue background': BLUEBAK,
'Blue background': BLUE_BACK,
}
msg = "usage: git map [-h] [<args>]\n"
msg = ''
for line in __doc__.splitlines():
for key in names.keys():
if key in line:
msg += line.replace('* ', '* ' + names[key])+RESET+'\n'
for name, color in names.items():
if name in line:
msg += line.replace('* ' + name, color + '* ' + name + RESET) + '\n'
break
else:
msg += line + '\n'
sys.stdout.write(msg)
outbuf.write(msg.encode('utf-8', 'replace'))
def _color_branch(branch, all_branches, all_tags, current):
if branch == current or branch == 'HEAD -> ' + current:
color = CYAN
current = None
elif branch in all_branches:
color = GREEN
all_branches.remove(branch)
elif branch in all_tags:
color = MAGENTA
elif branch.startswith('tag: '):
color = MAGENTA
branch = branch[len('tag: '):]
else:
color = RED
return color + branch + RESET
def _color_branch_list(branch_list, all_branches, all_tags, current):
if not branch_list:
return ''
colored_branches = (GREEN + ', ').join(
_color_branch(branch, all_branches, all_tags, current)
for branch in branch_list if branch != 'HEAD')
return (GREEN + '(' + colored_branches + GREEN + ') ' + RESET)
def _parse_log_line(line):
graph, branch_list, commit_date, subject = (
line.decode('utf-8', 'replace').strip().split('\x00'))
branch_list = [] if not branch_list else branch_list.split(', ')
commit = graph.split()[-1]
graph = graph[:-len(commit)]
return graph, commit, branch_list, commit_date, subject
def main(argv, outbuf):
if '-h' in argv or '--help' in argv:
_print_help(outbuf)
return 0
map_extra = git_common.get_config_list('depot_tools.map_extra')
cmd = [
git_common.GIT_EXE, 'log', git_common.root(),
'--graph', '--branches', '--tags', '--color=always', '--date=short',
'--pretty=format:%H%x00%D%x00%cd%x00%s'
] + map_extra + argv
def main(argv):
if '-h' in argv:
print_help()
return 0
log_proc = subprocess2.Popen(cmd, stdout=subprocess2.PIPE, shell=False)
map_extra = get_config_list('depot_tools.map_extra')
fmt = '%C(red bold)%h%x09%Creset%C(green)%d%Creset %C(yellow)%cd%Creset ~ %s'
log_proc = subprocess2.Popen(
[GIT_EXE, 'log', '--graph', '--branches', '--tags', root(),
'--color=always', '--date=short', ('--pretty=format:' + fmt)
] + map_extra + argv,
stdout=subprocess2.PIPE,
shell=False)
current = current_branch()
all_branches = set(branches())
merge_base_map = {b: get_or_create_merge_base(b) for b in all_branches}
merge_base_map = {b: v for b, v in merge_base_map.items() if v}
current = git_common.current_branch()
all_tags = set(git_common.tags())
all_branches = set(git_common.branches())
if current in all_branches:
all_branches.remove(current)
all_tags = set(tags())
merge_base_map = {}
for branch in all_branches:
merge_base = git_common.get_or_create_merge_base(branch)
if merge_base:
merge_base_map.setdefault(merge_base, set()).add(branch)
for merge_base, branches in merge_base_map.items():
merge_base_map[merge_base] = ', '.join(branches)
try:
for line in log_proc.stdout.xreadlines():
if merge_base_map:
commit = line[line.find(BRIGHT_RED)+len(BRIGHT_RED):line.find('\t')]
base_for_branches = set()
for branch, sha in merge_base_map.items():
if sha.startswith(commit):
base_for_branches.add(branch)
if base_for_branches:
newline = '\r\n' if line.endswith('\r\n') else '\n'
line = line.rstrip(newline)
line += ''.join(
(BRIGHT, WHITE, ' <(%s)' % (', '.join(base_for_branches)),
RESET, newline))
for b in base_for_branches:
del merge_base_map[b]
start = line.find(GREEN+' (')
end = line.find(')', start)
if start != -1 and end != -1:
start += len(GREEN) + 2
branch_list = line[start:end].split(', ')
branches_str = ''
if branch_list:
colored_branches = []
head_marker = ''
for b in branch_list:
if b == "HEAD":
head_marker = BLUEBAK+BRIGHT+'*'
continue
if b == current:
colored_branches.append(CYAN+BRIGHT+b+RESET)
current = None
elif b in all_branches:
colored_branches.append(GREEN+BRIGHT+b+RESET)
all_branches.remove(b)
elif b in all_tags:
colored_branches.append(MAGENTA+BRIGHT+b+RESET)
elif b.startswith('tag: '):
colored_branches.append(MAGENTA+BRIGHT+b[5:]+RESET)
else:
colored_branches.append(RED+b)
branches_str = '(%s) ' % ((GREEN+", ").join(colored_branches)+GREEN)
line = "%s%s%s" % (line[:start-1], branches_str, line[end+5:])
if head_marker:
line = line.replace('*', head_marker, 1)
sys.stdout.write(line)
except (IOError, KeyboardInterrupt):
for line in log_proc.stdout:
if b'\x00' not in line:
outbuf.write(line)
continue
graph, commit, branch_list, commit_date, subject = _parse_log_line(line)
if 'HEAD' in branch_list:
graph = graph.replace('*', BLUE_BACK + '*')
line = '{graph}{commit}\t{branches}{date} ~ {subject}'.format(
graph=graph,
commit=BRIGHT_RED + commit[:10] + RESET,
branches=_color_branch_list(
branch_list, all_branches, all_tags, current),
date=YELLOW + commit_date + RESET,
subject=subject)
if commit in merge_base_map:
line += ' <({})'.format(WHITE + merge_base_map[commit] + RESET)
line += os.linesep
outbuf.write(line.encode('utf-8', 'replace'))
except (BrokenPipeError, KeyboardInterrupt):
pass
finally:
sys.stderr.close()
sys.stdout.close()
return 0
if __name__ == '__main__':
try:
sys.exit(main(sys.argv[1:]))
except KeyboardInterrupt:
sys.stderr.write('interrupted\n')
sys.exit(1)
setup_color.init()
with git_common.less() as less_input:
sys.exit(main(sys.argv[1:], less_input))
......@@ -485,7 +485,10 @@ class GitRepoSchemaTestBase(unittest.TestCase):
@classmethod
def getRepoContent(cls, commit):
return getattr(cls, 'COMMIT_%s' % commit, None)
commit = 'COMMIT_%s' % commit
if sys.version_info.major == 2:
commit = commit.encode('utf-8')
return getattr(cls, commit, None)
@classmethod
def setUpClass(cls):
......
#!/usr/bin/env vpython3
# coding=utf-8
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for git_map."""
from __future__ import print_function
from __future__ import unicode_literals
import io
import os
import re
import sys
import unittest
DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, DEPOT_TOOLS_ROOT)
from testing_support import git_test_utils
import git_map
import git_common
if sys.version_info.major == 2:
import mock
else:
from unittest import mock
git_common.TEST_MODE = True
GitRepo = git_test_utils.GitRepo
class GitMapTest(git_test_utils.GitRepoReadOnlyTestBase):
REPO_SCHEMA = """"
A B C D 😋 F G
B H I J K
J L
"""
def setUp(self):
# Include branch_K, branch_L to make sure that ABCDEFG all get the
# same commit hashes as self.repo. Otherwise they get committed with the
# wrong timestamps, due to commit ordering.
# TODO(iannucci): Make commit timestamps deterministic in left to right, top
# to bottom order, not in lexi-topographical order.
origin_schema = git_test_utils.GitRepoSchema("""
A B C D 😋 F G M N O
B H I J K
J L
""", self.getRepoContent)
self.origin = origin_schema.reify()
self.origin.git('checkout', 'master')
self.origin.git('branch', '-d', *['branch_'+l for l in 'KLG'])
self.repo.git('remote', 'add', 'origin', self.origin.repo_path)
self.repo.git('config', '--add', 'remote.origin.fetch',
'+refs/tags/*:refs/tags/*')
self.repo.git('update-ref', 'refs/remotes/origin/master', 'tag_E')
self.repo.git('branch', '--set-upstream-to', 'branch_G', 'branch_K')
self.repo.git('branch', '--set-upstream-to', 'branch_K', 'branch_L')
self.repo.git('fetch', 'origin')
mock.patch('git_map.RESET', '').start()
mock.patch('git_map.BLUE_BACK', '').start()
mock.patch('git_map.BRIGHT_RED', '').start()
mock.patch('git_map.CYAN', '').start()
mock.patch('git_map.GREEN', '').start()
mock.patch('git_map.MAGENTA', '').start()
mock.patch('git_map.RED', '').start()
mock.patch('git_map.WHITE', '').start()
mock.patch('git_map.YELLOW', '').start()
self.addCleanup(mock.patch.stopall)
def testHelp(self):
outbuf = io.BytesIO()
self.repo.run(git_map.main, ['-h'], outbuf)
self.assertIn(b'usage: git map [-h] [--help] [<args>]', outbuf.getvalue())
def testGitMap(self):
expected = os.linesep.join([
'* 6e85e877ea (tag_O, origin/master, origin/branch_O) 1970-01-30 ~ O',
'* 4705470871 (tag_N) 1970-01-28 ~ N',
'* 8761b1a94f (tag_M) 1970-01-26 ~ M',
'* 5e7ce08691 (tag_G) 1970-01-24 ~ G',
'* 78543ed411 (tag_F) 1970-01-18 ~ F',
'* f5c2b77013 (tag_😋) 1970-01-16 ~ 😋',
'* 5249c43079 (tag_D) 1970-01-10 ~ D',
'* 072ade676a (tag_C) 1970-01-06 ~ C',
'| * e77da937d5 (branch_G) 1970-01-26 ~ G',
'| * acda9677fd 1970-01-20 ~ F',
'| * b4bed3c8e1 1970-01-18 ~ 😋',
'| * 5da071fda9 1970-01-12 ~ D',
'| * 1ef9b2e4ca 1970-01-08 ~ C',
'| | * ddd611f619 (branch_L) 1970-01-24 ~ L',
'| | | * f07cbd8cfc (branch_K) 1970-01-22 ~ K',
'| | |/ ',
'| | * fb7da24708 1970-01-16 ~ J <(branch_L)',
'| | * bb168f6d65 1970-01-14 ~ I',
'| | * ee1032effa 1970-01-10 ~ H',
'| |/ ',
'| * db57edd2c0 1970-01-06 ~ B <(branch_K)',
'| * e4f775f844 (root_A) 1970-01-04 ~ A',
'| * 2824d6d8b6 (tag_L, origin/branch_L) 1970-01-22 ~ L',
'| | * 4e599306f0 (tag_K, origin/branch_K) 1970-01-20 ~ K',
'| |/ ',
'| * 332f1b4499 (tag_J) 1970-01-14 ~ J',
'| * 2fc0bc5ee5 (tag_I) 1970-01-12 ~ I',
'| * 6e0ab26451 (tag_H) 1970-01-08 ~ H',
'|/ ',
'* 315457dbe8 (tag_B) 1970-01-04 ~ B',
'* cd589e62d8 (tag_A, origin/root_A) 1970-01-02 ~ A',
'* 7026d3d68e (tag_", root_", master, branch_") 1970-01-02 ~ "',
])
outbuf = io.BytesIO()
self.repo.run(git_map.main, [], outbuf)
output = outbuf.getvalue()
output = re.sub(br'.\[\d\dm', b'', output)
output = re.sub(br'.\[m', b'', output)
self.assertEqual(output.splitlines(), expected.encode('utf-8').splitlines())
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment