-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrajectory.py
More file actions
237 lines (192 loc) · 9.24 KB
/
trajectory.py
File metadata and controls
237 lines (192 loc) · 9.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# SPDX-License-Identifier: MIT
import hashlib
import logging
import os
from datetime import datetime

from . import path_utils
from .recorder import Recorder
logger = logging.getLogger(__name__)
class Trajectory:
    """Builds human-readable markdown histories from a Recorder-backed git repo."""

    def __init__(self, recorder: Recorder):
        # Recorder wraps the underlying git repository and the project root.
        self.recorder = recorder

    def get_file_trajectory(self, filepath: str, depth: int = 5) -> str:
        """Generates a narrative trajectory for a specific file.

        Args:
            filepath: Path to the file (absolute or project-relative).
            depth: Number of recent snapshots to include.

        Returns:
            A markdown-formatted string containing the file's history.
        """
        commits = self.recorder.get_history(filepath, max_count=depth)
        if not commits:
            return f"No trajectory found for {filepath}."

        trajectory = [f"# Trajectory for {filepath}"]

        # Normalize filepath for tree access (must be relative to project root).
        # Git tree traversal requires a POSIX-style relative path.
        if os.path.isabs(filepath):
            try:
                rel_filepath = path_utils.get_relative_path(
                    filepath, self.recorder.project_root
                )
            except ValueError:
                rel_filepath = filepath  # Fallback: path outside project root.
        else:
            rel_filepath = filepath
        # Ensure it is posix style for git.
        rel_filepath = path_utils.to_posix_path(rel_filepath)

        # Track content hashes to detect reverts.
        # Map: content_hash -> (timestamp, message)
        seen_states: dict[str, tuple[str, str]] = {}

        # Process from oldest to newest so revert detection sees earlier states.
        for commit in reversed(commits):
            timestamp = datetime.fromtimestamp(commit.committed_date).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
            message = commit.message.strip()

            # Hash the file's content at this commit to detect identical states.
            try:
                # gitpython: get blob for file at this commit.
                blob = commit.tree / rel_filepath
                content = blob.data_stream.read()
                # md5 is fine here: it is used for equality detection, not security.
                content_hash = hashlib.md5(content).hexdigest()
            except KeyError:
                # File might not exist in this commit (e.g. deleted).
                content_hash = None
            except Exception as e:
                logger.warning(
                    f"Failed to hash content for {filepath} at {commit.hexsha}: {e}"
                )
                content_hash = None

            revert_annotation = ""
            if content_hash and content_hash in seen_states:
                prev_ts, _prev_msg = seen_states[content_hash]
                revert_annotation = (
                    f" **[Revert Detected]** (Matches state from {prev_ts})"
                )
            if content_hash:
                seen_states[content_hash] = (timestamp, message)

            # Diff against the first parent; the root commit has no parent.
            if commit.parents:
                parent = commit.parents[0]
                # Fix: filter the diff by the normalized POSIX-relative path
                # (matching the tree lookup above) so absolute or OS-native
                # input paths still match entries in the git tree.
                diffs = parent.diff(commit, paths=rel_filepath, create_patch=True)
                diff_text = ""
                for diff in diffs:
                    if diff.diff:
                        # GitPython may yield bytes or str for the patch body.
                        if isinstance(diff.diff, bytes):
                            # errors="replace" keeps non-UTF-8 content from
                            # crashing trajectory generation.
                            diff_text = diff.diff.decode("utf-8", errors="replace")
                        else:
                            diff_text = str(diff.diff)
                        break
            else:
                # First commit.
                diff_text = "[Initial Commit]"

            trajectory.append(f"## {timestamp} - {message}{revert_annotation}")
            trajectory.append(f"```diff\n{diff_text}\n```")

        return "\n\n".join(trajectory)

    def get_global_trajectory(self, limit: int = 20, since_consolidate: bool = False) -> str:
        """Generates a global trajectory summary.

        Args:
            limit: Maximum number of commits to retrieve (default: 20).
            since_consolidate: If True, retrieves all commits since the last consolidation.
                This overrides the 'limit' argument.

        Returns:
            A markdown-formatted summary of global activity.
        """
        try:
            commits = []
            if since_consolidate:
                # Iterate commits until we find a consolidation marker or hit a
                # safety limit (1000) to bound the walk on large histories.
                for commit in self.recorder.repo.iter_commits(max_count=1000):
                    message = str(commit.message)
                    # Backward compatibility: also check for [CHECKPOINT].
                    if message.startswith("[CONSOLIDATE]") or message.startswith("[CHECKPOINT]"):
                        break
                    commits.append(commit)
            else:
                commits = list(self.recorder.repo.iter_commits(max_count=limit))
        except Exception as e:
            # Empty repositories surface as several different gitpython errors;
            # map the known shapes to a friendly message.
            error_msg = str(e)
            if "Reference at 'refs/heads/master' does not exist" in error_msg:
                return "No history available"
            # GitCommandError whose stderr indicates no commits (git log fails).
            if hasattr(e, 'stderr') and "does not have any commits yet" in str(e.stderr):
                return "No history available"
            # BadObject happens when HEAD is invalid.
            if "BadObject" in error_msg and "HEAD" in error_msg:
                return "No history available"
            logger.error(f"Failed to fetch global trajectory: {e}")
            return f"Error fetching global trajectory: {e}"

        if not commits:
            return "No global activity found."

        trajectory = []
        if since_consolidate:
            trajectory.append("# Global Trajectory (Since Last Consolidation)")
        else:
            trajectory.append(f"# Global Trajectory (Last {len(commits)} snapshots)")

        # Oldest first, so the summary reads chronologically.
        for commit in reversed(commits):
            timestamp = datetime.fromtimestamp(commit.committed_date).strftime(
                "%H:%M:%S"
            )
            message = commit.message.strip()
            files_changed = [str(f) for f in commit.stats.files.keys()]
            files_str = ", ".join(files_changed)
            trajectory.append(f"- **{timestamp}**: {message} (Files: `{files_str}`)")

        return "\n".join(trajectory)

    def get_session_summary(self) -> str:
        """Identifies session gaps and summarizes the last session.

        Returns:
            A markdown-formatted summary of the last session's activity.
        """
        # Optimization: fetch only timestamps first to find the gap cheaply.
        # Look back up to 1000 commits to find a session boundary.
        try:
            # %ct = committer timestamp, unix epoch.
            timestamps_output = self.recorder.repo.git.log("-n", "1000", "--format=%ct")
            if not timestamps_output:
                return "No session history found."
            timestamps = [int(ts) for ts in timestamps_output.splitlines()]
        except Exception as e:
            error_msg = str(e)
            if "does not have any commits yet" in error_msg:
                return "No history available"
            logger.error(f"Failed to fetch commit timestamps: {e}")
            return f"Error analyzing session history: {e}"

        session_gap_threshold = 3600  # 1 hour
        commit_count = len(timestamps)
        last_session_count = commit_count  # Default to all if no gap found.

        # Timestamps are newest-first; a gap between index i and i+1 means
        # commits 0..i (inclusive) form the most recent session.
        for i in range(commit_count - 1):
            current_time = timestamps[i]
            prev_time = timestamps[i + 1]
            if (current_time - prev_time) > session_gap_threshold:
                last_session_count = i + 1
                break

        # Now fetch the actual commit objects for the identified session.
        try:
            last_session_commits = list(self.recorder.repo.iter_commits(max_count=last_session_count))
        except Exception as e:
            return f"Error fetching session commits: {e}"

        if not last_session_commits:
            return "No session history found."

        # Summarize last session: commits are newest-first, so [-1] is the
        # session start and [0] is its end.
        start_time = datetime.fromtimestamp(
            last_session_commits[-1].committed_date
        ).strftime("%Y-%m-%d %H:%M:%S")
        end_time = datetime.fromtimestamp(
            last_session_commits[0].committed_date
        ).strftime("%H:%M:%S")

        files_touched: set[str] = set()
        for c in last_session_commits:
            # Manually iterate to avoid type checker confusion with GitPython's dict_keys.
            for file_path in c.stats.files.keys():
                files_touched.add(str(file_path))

        summary = ["# Last Session Summary"]
        summary.append(f"**Time:** {start_time} to {end_time}")
        # Fix: sort for deterministic output — set iteration order is arbitrary.
        summary.append(f"**Files Modified:** {', '.join(sorted(files_touched))}")
        summary.append(f"**Commit Count:** {len(last_session_commits)}")
        return "\n".join(summary)