This uses `pygls` as a demonstration. Install with `pip install -r requirements.txt`.
"""
from pygls.server import LanguageServer
-from pygls.lsp.types import (DidOpenTextDocumentParams, DidChangeTextDocumentParams,
-                             InitializeParams, TextDocumentItem, Position,
-                             Range, SemanticTokens, SemanticTokensLegend,
-                             SemanticTokensParams)
-from pygls.lsp.types import SemanticTokens, SemanticTokensParams
+from pygls.lsp.types import (
+    DidOpenTextDocumentParams,
+    DidChangeTextDocumentParams,
+    CodeActionParams,
+    Position,
+    Range,
+    Diagnostic,
+    DiagnosticSeverity,
+    CodeAction,
+    CodeActionKind,
+    TextEdit,
+    WorkspaceEdit,
+)
+from pygls.lsp.methods import TEXT_DOCUMENT_DID_OPEN, TEXT_DOCUMENT_DID_CHANGE

import re

SERVER = LanguageServer('proxpl-lsp', 'v0.1')

TAINT_TOKEN_TYPE = 100  # custom token type index (the client must map this)

-@SERVER.feature('textDocument/didOpen')
+SOURCES = [r"\b(recv|read|input|http_get)\s*\("]
+SANITIZERS = [r"\b(sanitize|sanitize_json|json_schema)\s*\("]
+SINKS = [r"\b(db\.insert|exec|eval|os\.system|print_unchecked|write_unchecked)\b"]
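+# For illustration, with hypothetical input lines: the patterns above classify
+# `data = recv(sock)` as a source call, `clean = sanitize_json(data)` as a
+# sanitizer call, and `db.insert(data)` as a sink.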
+
+
+def analyze_taint(text: str):
+    """Very small heuristic taint analysis.
+
+    - Marks variables assigned from source calls as tainted.
+    - Propagates taint through simple assignments: `b = a`.
+    - Recognizes sanitizer calls to mark variables as sanitized.
+    Returns: (taint_map: dict var->'tainted'|'sanitized', diagnostics: list)
+    """
+    taint = {}
+    diagnostics = []
+    lines = text.splitlines()
+
+    # first pass: detect direct sources and sanitizers
+    for lineno, line in enumerate(lines):
+        # assignment pattern: var = expr
+        m = re.match(r"\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$", line)
+        if m:
+            lhs = m.group(1)
+            rhs = m.group(2)
+            # source
+            if any(re.search(p, rhs) for p in SOURCES):
+                taint[lhs] = 'tainted'
+                continue
+            # sanitizer
+            if any(re.search(p, rhs) for p in SANITIZERS):
+                taint[lhs] = 'sanitized'
+                continue
+            # rhs contains another, already-tainted variable
+            for var in re.findall(r"\b[A-Za-z_][A-Za-z0-9_]*\b", rhs):
+                if var in taint and taint[var] == 'tainted':
+                    taint[lhs] = 'tainted'
+                    break
+
+    # second pass: find tainted usages in sinks
+    for lineno, line in enumerate(lines):
+        for sink_pat in SINKS:
+            if re.search(sink_pat, line):
+                # find variables used in the call
+                for var in re.findall(r"\b[A-Za-z_][A-Za-z0-9_]*\b", line):
+                    if var in taint and taint[var] == 'tainted':
+                        # produce a diagnostic recommending sanitization
+                        start = line.find(var)
+                        diag = Diagnostic(
+                            range=Range(start=Position(line=lineno, character=start),
+                                        end=Position(line=lineno, character=start + len(var))),
+                            message=f"Tainted variable '{var}' used in sensitive sink; consider sanitizing",
+                            severity=DiagnosticSeverity.Warning,
+                            source='proxpl-taint',
+                        )
+                        diagnostics.append(diag)
+    return taint, diagnostics
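+
+# Illustrative run on a hypothetical three-line document:
+#     data = recv(sock)
+#     copy = data
+#     db.insert(copy)
+# analyze_taint returns ({'data': 'tainted', 'copy': 'tainted'}, [diag]) where
+# `diag` warns about `copy` reaching the `db.insert` sink.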
+
+
+def publish_taint(ls: LanguageServer, uri: str, text: str):
+    taint_map, diagnostics = analyze_taint(text)
+    # publish diagnostics
+    ls.publish_diagnostics(uri, diagnostics)
+
+    # publish semantic tokens via a custom notification for demo clients
+    tokens = []
+    for lineno, line in enumerate(text.splitlines()):
+        for var in re.findall(r"\b[A-Za-z_][A-Za-z0-9_]*\b", line):
+            if var in taint_map and taint_map[var] == 'tainted':
+                start = line.find(var)
+                length = len(var)
+                tokens.extend([lineno, start, length, TAINT_TOKEN_TYPE, 0])
+    ls.send_notification('proxpl/taintTokens', {'uri': uri, 'tokens': tokens})
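+
+# Payload format note: each token contributes five integers,
+#     [line, startChar, length, tokenType, tokenModifiers]
+# using absolute positions (unlike the delta-encoded LSP semanticTokens
+# response); e.g. a tainted 4-char name at line 2, column 10 is encoded as
+#     [2, 10, 4, TAINT_TOKEN_TYPE, 0].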
+
+
+@SERVER.feature(TEXT_DOCUMENT_DID_OPEN)
def did_open(ls: LanguageServer, params: DidOpenTextDocumentParams):
    doc = params.text_document
-    publish_taint_tokens(ls, doc.text, doc.uri)
+    publish_taint(ls, doc.uri, doc.text)
+

-@SERVER.feature('textDocument/didChange')
+@SERVER.feature(TEXT_DOCUMENT_DID_CHANGE)
def did_change(ls: LanguageServer, params: DidChangeTextDocumentParams):
-    doc = ls.workspace.get_document(params.text_document.uri)
-    publish_taint_tokens(ls, doc.source, doc.uri)
+    # Use the workspace document if available
+    try:
+        doc = ls.workspace.get_document(params.text_document.uri)
+        text = doc.source
+    except Exception:
+        # Fallback: take the full text from the first change event
+        # (assumes full, not incremental, document sync)
+        text = params.content_changes[0].text if params.content_changes else ''
+    publish_taint(ls, params.text_document.uri, text)
+
+
+@SERVER.feature('textDocument/codeAction')
+def code_action(ls: LanguageServer, params: CodeActionParams):
+    # Look for our diagnostics in the requested context and offer a quick fix
+    uri = params.text_document.uri
+    actions = []
+    diagnostics = params.context.diagnostics
+    for diag in diagnostics:
+        if diag.source == 'proxpl-taint':
+            # extract the variable name from the diagnostic message
+            m = re.search(r"'([A-Za-z_][A-Za-z0-9_]*)'", diag.message)
+            if not m:
+                continue
+            var = m.group(1)
+            # create a TextEdit that wraps the variable in sanitize_json(...)
+            start = diag.range.start
+            end = diag.range.end
+            edit = TextEdit(range=Range(start=start, end=end), new_text=f"sanitize_json({var})")
+            wedit = WorkspaceEdit(changes={uri: [edit]})
+            ca = CodeAction(title=f"Sanitize '{var}' with sanitize_json()",
+                            kind=CodeActionKind.QuickFix,
+                            edit=wedit)
+            actions.append(ca)
+    return actions
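+
+# Illustrative effect of the quick fix on a hypothetical buffer: accepting the
+# action for the diagnostic on `x` in
+#     db.insert(x)
+# replaces the diagnostic's range (just `x`) to yield
+#     db.insert(sanitize_json(x))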

-def publish_taint_tokens(ls: LanguageServer, text: str, uri: str):
-    tokens = []  # list of integers per LSP: [line, startChar, length, tokenType, tokenModifiers]
-    for lineno, line in enumerate(text.splitlines()):
-        for m in re.finditer(r"\btaint_[A-Za-z0-9_]+\b", line):
-            start = m.start()
-            length = m.end() - m.start()
-            tokens.extend([lineno, start, length, TAINT_TOKEN_TYPE, 0])
-    st = SemanticTokens(data=tokens)
-    # Semantic token publishing uses a custom notification for simplicity
-    ls.show_message_log(f"Publishing {len(tokens)//5} taint tokens for {uri}")
-    ls.send_notification('textDocument/semanticTokens', {'uri': uri, 'tokens': tokens})

def main():
    import argparse
    parser = argparse.ArgumentParser()
-    parser.add_argument('--tcp', action='store_true', help='Run LSP server over stdio (default)')
+    parser.add_argument('--stdio', action='store_true', help='Run the LSP server over stdio (the default)')
    args = parser.parse_args()
    SERVER.start_io()

+
if __name__ == '__main__':
    main()
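+
+# Example launch (assuming this file is saved as server.py):
+#     python server.py --stdio
+# The client then talks LSP to the server over stdin/stdout.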