-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
368 lines (288 loc) · 11.9 KB
/
server.py
File metadata and controls
368 lines (288 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# SPDX-License-Identifier: MIT
from mcp.server.fastmcp import FastMCP
import argparse
import logging
import os
from .recorder import Recorder
from .watcher import Watcher
from .trajectory import Trajectory
from . import path_utils
# Logging setup: INFO and above, each record stamped with time, logger name and severity.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-scoped logger used by everything in this file.
logger = logging.getLogger(__name__)
# Process-wide state shared by every tool handler in this module.
class ServerState:
    """Holder for the components tied to the currently tracked project.

    All attributes start out as ``None`` and are filled in when a project is
    configured.
    """

    def __init__(self):
        # Normalized root path of the tracked project.
        self.project_path: str | None = None
        # Recorder created for the tracked project path.
        self.recorder: Recorder | None = None
        # Trajectory reader built on top of the recorder.
        self.trajectory: Trajectory | None = None
        # Watcher for the project path; it is handed the recorder at construction.
        self.watcher: Watcher | None = None


# Single shared instance; starts unconfigured.
state = ServerState()
# Initialize the MCP server. The @mcp.tool() decorators further down register
# their functions as tools on this instance, and main() starts it via mcp.run().
mcp = FastMCP("code-trajectory")
def _ensure_configured(path: str | None = None) -> str:
    """Configure tracking for *path* if given, otherwise report the current status.

    Args:
        path: Project root to (re)configure tracking for. When falsy, nothing is
            initialized and only the current status is reported.

    Returns:
        The message produced by ``_initialize_components`` when *path* is given.
        Otherwise the "not configured" guidance if no project is tracked yet,
        or a message naming the tracked project path.

    Raises:
        ValueError: Propagated from ``_initialize_components`` when *path* does
            not exist.
        RuntimeError: Propagated from ``_initialize_components`` when the
            components cannot be created.
    """
    if path:
        return _initialize_components(path)

    # Without this check an unconfigured server would claim to be
    # "configured to track: None".
    error = _check_configured()
    if error:
        return error
    return f"Server is configured to track: {state.project_path}"
def _check_configured() -> str | None:
    """Tell the caller whether a project has been configured.

    Returns:
        ``None`` when a trajectory is available; otherwise a message instructing
        the caller to run ``configure_project`` first.
    """
    if state.trajectory is not None:
        return None
    return (
        "Server is NOT configured. "
        "Please call 'configure_project(path=...)' with the absolute path to the project root."
    )
def _initialize_components(path: str) -> str:
    """Point the server at *path*, (re)building the recorder, watcher and trajectory.

    If the server is already watching the same normalized path and its
    ``.trajectory`` shadow repository still exists, nothing is rebuilt.
    Otherwise the previous configuration is torn down and a new one is built.
    The new components are published to the global ``state`` only after all of
    them were created and the watcher was started, so a failure leaves the
    server cleanly unconfigured rather than half-updated.

    Args:
        path: Path to the project root; it is normalized before use.

    Returns:
        A status message: already configured, newly initialized (no history
        yet), or successfully configured.

    Raises:
        ValueError: If the normalized path does not exist.
        RuntimeError: If creating or starting any component fails; the original
            exception is attached as ``__cause__``.
    """
    target_path = path_utils.normalize_path(path)

    if not os.path.exists(target_path):
        raise ValueError(f"Target path does not exist: {target_path}")

    # Check if we are already watching this path AND the shadow repo exists
    shadow_repo_path = os.path.join(target_path, ".trajectory")
    if (
        state.watcher
        and state.project_path == target_path
        and os.path.exists(shadow_repo_path)
    ):
        logger.info(f"Already watching {target_path}, skipping re-initialization.")
        return f"Already configured to track: {target_path}"

    # Drop the previous configuration before stopping its watcher, so that a
    # failure below (or in stop() itself) cannot leave a stopped watcher and a
    # stale trajectory that still look "configured".
    previous_watcher = state.watcher
    state.recorder = None
    state.watcher = None
    state.trajectory = None
    state.project_path = None
    if previous_watcher:
        previous_watcher.stop()

    # Check if this is a new initialization before creating the recorder (which creates the repo)
    is_new_initialization = not os.path.exists(shadow_repo_path)

    try:
        # Build everything in locals first; nothing is published until it all works.
        recorder = Recorder(target_path)
        watcher = Watcher(target_path, recorder)
        trajectory = Trajectory(recorder)
        watcher.start()
    except Exception as e:
        logger.error(f"Failed to initialize components: {e}")
        raise RuntimeError(f"Failed to initialize: {e}") from e

    state.recorder = recorder
    state.watcher = watcher
    state.trajectory = trajectory
    state.project_path = target_path
    logger.info(f"Initialized components for {target_path}")

    if is_new_initialization:
        return (
            "New project initialized. No history available yet. "
            "Do NOT call get_session_summary."
        )
    return f"Successfully configured to track: {target_path}"
@mcp.tool()
def configure_project(path: str) -> str:
    """Configures the server to track a specific project path.

    This tool MUST be called before using any other tools.
    It initializes the server to track the specified project directory.

    Args:
        path: Absolute path to the target project directory.

    Returns:
        A confirmation message indicating the server is configured.
    """
    # Guard clause: an empty path cannot be configured.
    if not path:
        return "Please provide a path."
    return _initialize_components(path)
@mcp.tool()
def get_file_trajectory(filepath: str, depth: int = 5) -> str:
    """Retrieves the evolutionary trajectory of a specific file.

    Use this tool before modifying a complex file to understand its recent history,
    or to see the "flow" of changes leading up to the present.

    Args:
        filepath: Relative path to the file (e.g., "src/main.py").
        depth: Number of recent snapshots to retrieve (default: 5).

    Returns:
        A markdown-formatted narrative of the file's history, including timestamps,
        intents, and diff summaries. Reverts are annotated with `[Revert Detected]`.
    """
    # Hand the "not configured" guidance straight back to the caller.
    if not_ready := _check_configured():
        return not_ready

    trajectory = state.trajectory
    assert trajectory is not None
    return trajectory.get_file_trajectory(filepath, depth)
@mcp.tool()
def get_global_trajectory(limit: int = 20, since_consolidate: bool = False) -> str:
    """Retrieves the global trajectory (ripple effect) across the project.

    Use this to understand the broader context of recent changes or to detect
    ripple effects (e.g., "I changed User.py, did I also update UserTest.py?").

    Args:
        limit: Maximum number of commits to retrieve (default: 20).
        since_consolidate: If True, retrieves all commits since the last consolidation.
            This overrides the 'limit' argument.

    Returns:
        A summary of modified files and their relationships, grouped by time and intent.
    """
    # Hand the "not configured" guidance straight back to the caller.
    if not_ready := _check_configured():
        return not_ready

    trajectory = state.trajectory
    assert trajectory is not None
    return trajectory.get_global_trajectory(limit, since_consolidate)
@mcp.tool()
def get_session_summary() -> str:
    """Retrieves a summary of the last session and current context.

    Use this at the beginning of a chat session to "catch up" on what happened
    previously or to understand the last known state of the project.

    Returns:
        A summary of the last recorded session, including the final intent and modified files.
    """
    # Hand the "not configured" guidance straight back to the caller.
    if not_ready := _check_configured():
        return not_ready

    trajectory = state.trajectory
    assert trajectory is not None
    return trajectory.get_session_summary()
@mcp.tool()
def consolidate(intent: str) -> str:
    """Consolidates recent snapshots into a single commit with a descriptive intent.

    Use this after completing a logical unit of work to "save" your progress semantically.
    This squashes recent [AUTO-TRJ] snapshots into a single commit.

    IMPORTANT: This action ONLY affects the shadow repository (.trajectory).
    It does NOT create a commit in the main project's git history.
    You must still commit your changes to the main project separately if desired.

    Args:
        intent: A clear, past-tense description of what was accomplished (e.g., "Refactored auth middleware").

    Returns:
        A success message indicating the consolidation was created and how many snapshots were squashed.
    """
    # Hand the "not configured" guidance straight back to the caller.
    if not_ready := _check_configured():
        return not_ready

    recorder = state.recorder
    assert recorder is not None
    return recorder.consolidate(intent)
@mcp.tool()
def set_trajectory_intent(intent: str) -> str:
    """Sets the current coding intent.

    The intent will be attached to all subsequent [AUTO-TRJ] snapshots until it is
    changed or the server is restarted.

    Args:
        intent: A short description of the task (e.g., "Debugging connection timeout").

    Returns:
        A confirmation message indicating the intent is set.
    """
    # Hand the "not configured" guidance straight back to the caller.
    if not_ready := _check_configured():
        return not_ready

    recorder = state.recorder
    assert recorder is not None
    recorder.set_intent(intent)
    return f"Intent set to: '{intent}'"
class BytesStdinWrapper:
    r"""Binary-stream proxy that strips carriage returns from everything read.

    Wraps stdin.buffer to ensure consistent line endings (LF) across platforms.
    Specifically removes b'\r' characters that can cause issues with JSON parsing
    on Windows. Anything not overridden here is delegated to the wrapped buffer.

    A read whose data consists solely of b'\r' bytes is never reported as an
    empty result, because callers treat b'' as end-of-file. The wrapper keeps
    reading until it has real data or the underlying stream reports EOF.
    """

    def __init__(self, buffer):
        """Store the binary stream (e.g. ``sys.stdin.buffer``) to read from."""
        self.buffer = buffer

    @staticmethod
    def _read_without_cr(read_fn, size):
        """Call ``read_fn(size)`` until it yields CR-free data or signals EOF.

        Returns the underlying falsy result unchanged (b'' at EOF, or None from
        a non-blocking stream with no data), otherwise the chunk with every
        b'\\r' removed.
        """
        while True:
            chunk = read_fn(size)
            if not chunk:
                return chunk
            cleaned = chunk.replace(b'\r', b'')
            if cleaned:
                return cleaned
            # The chunk held only CRs: returning b'' here would look like EOF.

    def read(self, size=-1):
        """Read up to *size* bytes (all remaining if negative) with b'\\r' removed."""
        return self._read_without_cr(self.buffer.read, size)

    def read1(self, size=-1):
        """Like ``read`` but using the wrapped buffer's ``read1``."""
        return self._read_without_cr(self.buffer.read1, size)

    def readline(self, size=-1):
        """Read one line with b'\\r' removed."""
        return self._read_without_cr(self.buffer.readline, size)

    def __iter__(self):
        """Iterate over the wrapped buffer's lines with b'\\r' removed."""
        for line in self.buffer:
            cleaned = line.replace(b'\r', b'')
            # Skip a line that consisted only of CRs rather than yield b''.
            if cleaned:
                yield cleaned

    def flush(self):
        """Flush the wrapped buffer."""
        self.buffer.flush()

    def close(self):
        """Close the wrapped buffer."""
        self.buffer.close()

    @property
    def closed(self):
        """Whether the wrapped buffer is closed."""
        return self.buffer.closed

    def __getattr__(self, name):
        # Only reached for attributes not found normally. If 'buffer' itself is
        # missing (instance created without __init__), fail instead of recursing.
        if name == "buffer":
            raise AttributeError(name)
        return getattr(self.buffer, name)
class BytesStdoutWrapper:
    r"""Binary-stream proxy that rewrites CRLF to LF in everything written.

    Wraps stdout.buffer to ensure consistent line endings (LF) across platforms.
    Specifically replaces b'\r\n' with b'\n' to prevent "invalid trailing data"
    errors on clients. Anything not overridden here is delegated to the wrapped
    buffer.

    Each ``write`` call is converted on its own: a b'\r' that ends one call and
    a b'\n' that starts the next are not merged.
    """

    def __init__(self, buffer):
        """Store the binary stream (e.g. ``sys.stdout.buffer``) to write to."""
        self.buffer = buffer

    def write(self, data):
        """Write *data* with every b'\\r\\n' replaced by b'\\n'.

        Returns:
            ``len(data)`` as passed by the caller when the converted payload was
            written completely, so the dropped b'\\r' bytes do not look like a
            short write. Otherwise the wrapped buffer's own result is returned
            unchanged (a partial count, or None).
        """
        requested = len(data)
        # Replace CRLF with LF
        if b'\r\n' in data:
            data = data.replace(b'\r\n', b'\n')
        written = self.buffer.write(data)
        if written == len(data):
            return requested
        return written

    def flush(self):
        """Flush the wrapped buffer."""
        self.buffer.flush()

    def close(self):
        """Close the wrapped buffer."""
        self.buffer.close()

    @property
    def closed(self):
        """Whether the wrapped buffer is closed."""
        return self.buffer.closed

    def __getattr__(self, name):
        # Only reached for attributes not found normally. If 'buffer' itself is
        # missing (instance created without __init__), fail instead of recursing.
        if name == "buffer":
            raise AttributeError(name)
        return getattr(self.buffer, name)
def main():
    """Parse CLI arguments, optionally pre-configure tracking, and run the MCP server.

    Steps:
      1. Parse the optional ``--path`` argument.
      2. Verify that a ``git`` executable is on PATH and, if ``--path`` was given,
         initialize tracking for it. Any failure here is logged but does not
         stop the server from starting.
      3. Replace ``sys.stdin`` (and ``sys.stdout``) with text wrappers whose
         underlying buffers strip carriage returns / CRLF.
      4. Run the MCP server; on exit, stop the watcher if one exists.
    """
    parser = argparse.ArgumentParser(description="Code Trajectory MCP Server")
    parser.add_argument("--path", help="Path to the target project to track (optional)")
    args = parser.parse_args()

    # Initial configuration
    try:
        # Check if git is available.
        import shutil
        if not shutil.which("git"):
            logger.error("Git is not installed or not in PATH. Code Trajectory requires git.")
            raise RuntimeError("Git is not installed or not in PATH.")

        if args.path:
            _initialize_components(args.path)
        else:
            logger.info("Server started. Waiting for 'configure_project' call.")
    except Exception as e:
        # Deliberately broad: both the missing-git RuntimeError raised above and
        # any initialization error end up here and are only logged.
        logger.error(f"Startup configuration failed: {e}")
        # We don't exit here, allowing the server to run.
        # Tools will try to configure again if needed, or fail gracefully.

    # Run server
    try:
        # Wrap stdin.buffer to sanitize input (remove \r) for Windows compatibility
        import sys
        import io

        # We need to ensure we are wrapping the underlying buffer
        if sys.stdin and hasattr(sys.stdin, 'buffer'):
            # Keep a reference to the original stream; its encoding/error
            # settings are copied onto the replacement below.
            original_stdin = sys.stdin
            # Create a wrapper around the original buffer
            wrapped_stdin = BytesStdinWrapper(original_stdin.buffer)
            # Replace sys.stdin with a new TextIOWrapper using our wrapped buffer
            sys.stdin = io.TextIOWrapper(
                wrapped_stdin,
                encoding=original_stdin.encoding,
                errors=original_stdin.errors,
                line_buffering=getattr(original_stdin, 'line_buffering', True)
            )

            # Also wrap stdout to ensure LF only output
            if sys.stdout and hasattr(sys.stdout, 'buffer'):
                original_stdout = sys.stdout
                wrapped_stdout = BytesStdoutWrapper(original_stdout.buffer)
                # We can't set sys.stdout.buffer directly (read-only).
                # Instead, we replace sys.stdout with a new TextIOWrapper that wraps our buffer.
                # This ensures sys.stdout.buffer points to our wrapper.
                sys.stdout = io.TextIOWrapper(
                    wrapped_stdout,
                    encoding=original_stdout.encoding,
                    errors=original_stdout.errors,
                    line_buffering=getattr(original_stdout, 'line_buffering', True)
                )

            logger.info("Windows CRLF fix applied: wrapped sys.stdin.buffer and sys.stdout.buffer")
            # Diagnostics go to stderr so they never mix with stdout traffic.
            if sys.stderr:
                sys.stderr.write("DEBUG: Code Trajectory Server with Windows CRLF fix (Input+Output) started.\n")
                sys.stderr.flush()
        else:
            logger.warning("sys.stdin has no buffer attribute, cannot apply CRLF fix.")
            if sys.stderr:
                sys.stderr.write("DEBUG: sys.stdin has no buffer, fix NOT applied.\n")
                sys.stderr.flush()

        mcp.run()
    except KeyboardInterrupt:
        logger.info("Stopping server...")
    finally:
        # Always stop the watcher, however the server loop ended.
        if state.watcher:
            state.watcher.stop()


# Script entry point: run the server when this module is executed directly.
if __name__ == "__main__":
    main()