-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgit.py
More file actions
149 lines (118 loc) · 5.19 KB
/
Copy pathgit.py
File metadata and controls
149 lines (118 loc) · 5.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""Git subprocess wrappers and repo resolution."""
import hashlib
import os
import re
import subprocess
from datetime import datetime
from pathlib import Path
# Matches the parenthesised annotation in one line of `git blame -t` output.
# Group 1 is the author text, group 2 is a 10-digit Unix timestamp; the
# timezone offset and line number that follow are matched but not captured.
BLAME_RE = re.compile(r"\((.+?)\s+(\d{10})\s+[+-]\d{4}\s+\d+\)")
def run_git(cmd: list[str], cwd: str) -> str:
    """Execute *cmd* inside *cwd* and hand back its captured standard output.

    Both output streams are captured and decoded as text. A non-zero exit
    status raises ``RuntimeError`` whose message holds the space-joined
    command line followed by the captured standard error.
    """
    proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout
    command_line = " ".join(cmd)
    raise RuntimeError(f"git failed: {command_line}\n{proc.stderr}")
# Root directory for cached clones of remote repositories:
# <cache home>/gitplot/repos, where <cache home> is $XDG_CACHE_HOME or ~/.cache.
# An empty XDG_CACHE_HOME is treated like an unset one; otherwise Path("")
# would silently place the cache relative to the current working directory.
# Using `or` also means the home directory is only looked up when needed.
CACHE_DIR = Path(os.environ.get("XDG_CACHE_HOME") or Path.home() / ".cache") / "gitplot" / "repos"
def resolve_repo(repo: str) -> tuple[Path, str]:
    """Return ``(local_path, display_name)`` for a repository reference.

    An existing directory that contains a ``.git`` entry is used in place and
    named after the directory. Anything else is treated as a clone URL: it is
    cloned into ``CACHE_DIR`` on first use and fetched on later calls. The
    cache directory name combines the repository name with a short hash of
    the full reference.

    Raises:
        subprocess.CalledProcessError: if the initial ``git clone`` fails.
    """
    local = Path(repo)
    if local.is_dir() and (local / ".git").exists():
        return local, local.name

    # Strip only a trailing ".git"; str.replace would also mangle names that
    # merely contain it (e.g. "user.github.io" -> "userhub.io").
    name = repo.rstrip("/").split("/")[-1].removesuffix(".git")
    url_hash = hashlib.md5(repo.encode()).hexdigest()[:8]
    dest = CACHE_DIR / f"{name}-{url_hash}"
    dest.parent.mkdir(parents=True, exist_ok=True)
    if dest.exists():
        # The exit status of the refresh is deliberately not checked, so a
        # failed fetch still returns the existing cached clone.
        subprocess.run(["git", "fetch", "--all"], cwd=dest, capture_output=True)
    else:
        subprocess.run(["git", "clone", repo, str(dest)], capture_output=True, check=True)
    return dest, name
def get_all_commits(repo: str) -> list[tuple[str, datetime]]:
    """List ``(hash, author_time)`` for every commit in reversed ``git log`` order.

    The author timestamp (``%at``) is converted with
    ``datetime.fromtimestamp``, so the resulting values are naive datetimes.
    """
    log_output = run_git(["git", "log", "--format=%H %at", "--reverse"], repo)
    # Each non-empty line is "<hash> <unix timestamp>".
    records = (entry.split() for entry in log_output.strip().split("\n") if entry)
    return [(sha, datetime.fromtimestamp(int(stamp))) for sha, stamp in records]
def sample_evenly(commits: list[tuple[str, datetime]], n: int) -> list[tuple[str, datetime]]:
    """Pick at most *n* entries spread evenly across *commits*.

    When there are no more than *n* commits the input list itself is
    returned. Otherwise *n* entries are taken at a constant fractional stride
    starting at the first element, and the final pick is pinned to the last
    element so both ends of the history are always represented. A
    non-positive *n* yields an empty list.
    """
    total = len(commits)
    if total <= n:
        return commits
    if n <= 0:
        # Nothing requested; this also avoids dividing by zero below and
        # indexing into an empty index list for negative n.
        return []
    step = total / n
    indices = [int(i * step) for i in range(n)]
    indices[-1] = total - 1
    return [commits[i] for i in indices]
def tracked_files(
    repo: str, commit: str, include: re.Pattern[str] | None = None, exclude: re.Pattern[str] | None = None
) -> list[str]:
    """List the paths recorded in the tree of *commit*, optionally filtered.

    Paths come from ``git ls-tree -r --name-only``. When *include* is given
    only paths it matches (``search``) are kept; when *exclude* is given any
    path it matches is dropped.
    """

    def wanted(path: str) -> bool:
        # A path must pass the include filter (if any) and miss the exclude filter (if any).
        if include is not None and not include.search(path):
            return False
        if exclude is not None and exclude.search(path):
            return False
        return True

    listing = run_git(["git", "ls-tree", "-r", "--name-only", commit], repo)
    return [path for path in listing.strip().split("\n") if path and wanted(path)]
def get_log_numstat(repo: str) -> list[tuple[str, float, str, str, int, int]]:
    """Flatten ``git log --numstat`` into one row per changed file.

    Merge commits are excluded. Each row is
    ``(hash, timestamp, author, file, insertions, deletions)``. Stat lines
    whose insertion count is reported as ``-`` are skipped.
    """
    log_text = run_git(["git", "log", "--numstat", "--no-merges", "--format=COMMIT %H %at %aN"], repo)
    rows: list[tuple[str, float, str, str, int, int]] = []
    commit_hash = author = None
    timestamp = 0.0
    for entry in log_text.split("\n"):
        if entry.startswith("COMMIT "):
            # Header line: "COMMIT <hash> <timestamp> <author name>"; the
            # author name may itself contain spaces, hence maxsplit=3.
            header = entry.split(" ", 3)
            commit_hash, timestamp, author = header[1], float(header[2]), header[3]
            continue
        if not commit_hash or "\t" not in entry:
            continue
        fields = entry.split("\t")
        if len(fields) != 3 or fields[0] == "-":
            continue
        added, removed, filename = fields
        rows.append((commit_hash, timestamp, author, filename, int(added), int(removed)))
    return rows
def get_coauthor_map(repo: str) -> dict[str, list[str]]:
    """Collect Co-authored-by trailers from every commit message body.

    Returns a mapping from the first 8 characters of a commit hash to the
    co-author names found in that commit, in order of appearance. Commits
    without such trailers are omitted. The 8-character key is intended to be
    matched against the abbreviated hashes printed by ``git blame``.

    The trailer key is matched case-insensitively, and the name is whatever
    follows the key up to an optional ``<email>`` part.
    """
    out = run_git(["git", "log", "--format=%H%n%b%nEND_COMMIT"], repo)
    result: dict[str, list[str]] = {}
    current_hash: str | None = None
    coauthors: list[str] = []
    for line in out.split("\n"):
        if line == "END_COMMIT":
            if current_hash and coauthors:
                result[current_hash[:8]] = coauthors
            current_hash = None
            coauthors = []
        elif current_hash is None:
            # Between records the only meaningful line is the next commit's
            # full hash. Requiring this state keeps a bare 40-hex line inside
            # a message body from being mistaken for a new record.
            if len(line) == 40 and all(c in "0123456789abcdef" for c in line):
                current_hash = line
        elif m := re.search(r"co-authored-by:", line, re.IGNORECASE):
            # Take the text after the trailer key itself, not after the first
            # colon on the line, then drop any "<email>" suffix.
            name = line[m.end():].split("<", 1)[0].strip()
            if name:
                coauthors.append(name)
    return result
def blame_lines_with_hash(repo: str, commit: str, path: str) -> list[tuple[str, int, str]]:
    """Return ``(commit_hash, unix_timestamp, author)`` for each blamed line of *path*.

    Runs ``git blame -t`` on *path* as of *commit*. Output lines that do not
    match ``BLAME_RE`` are skipped. If git fails or its output cannot be
    decoded, an empty list is returned instead of raising.
    """
    try:
        out = run_git(["git", "blame", "-t", commit, "--", path], repo)
    except (RuntimeError, UnicodeDecodeError):
        # Parenthesised on purpose: the bare ``except A, B:`` spelling is a
        # SyntaxError on interpreters older than Python 3.14.
        return []
    results = []
    for line in out.split("\n"):
        if not line:
            continue
        m = BLAME_RE.search(line)
        if m:
            # The hash is the first whitespace-separated token; any leading
            # "^" marker is stripped from it.
            line_hash = line.split()[0].lstrip("^")
            results.append((line_hash, int(m.group(2)), m.group(1).strip()))
    return results
def blame_lines(repo: str, commit: str, path: str) -> list[tuple[int, str]]:
    """Return ``(unix_timestamp, author)`` for each blamed line of *path* at *commit*.

    Runs ``git blame -t`` and keeps every non-empty output line that matches
    ``BLAME_RE``. If git fails or its output cannot be decoded, an empty list
    is returned instead of raising.
    """
    try:
        out = run_git(["git", "blame", "-t", commit, "--", path], repo)
    except (RuntimeError, UnicodeDecodeError):
        # Parenthesised on purpose: the bare ``except A, B:`` spelling is a
        # SyntaxError on interpreters older than Python 3.14.
        return []
    return [
        (int(m.group(2)), m.group(1).strip())
        for line in out.split("\n")
        if line and (m := BLAME_RE.search(line))
    ]