diff --git a/src/common/file_system/virtual_file_system.cpp b/src/common/file_system/virtual_file_system.cpp index 5e84bf375..8bc0a7ea9 100644 --- a/src/common/file_system/virtual_file_system.cpp +++ b/src/common/file_system/virtual_file_system.cpp @@ -1,5 +1,7 @@ #include "common/file_system/virtual_file_system.h" +#include + #include "common/assert.h" #include "common/exception/io.h" #include "common/file_system/gzip_file_system.h" @@ -11,6 +13,30 @@ namespace lbug { namespace common { +static bool isRelativePath(const std::string& path) { + if (path.empty() || path[0] == '/' || path[0] == '~') { + return false; + } + if (path.find("://") != std::string::npos) { + return false; + } + return !( + path.size() > 1 && std::isalpha(static_cast(path[0])) && path[1] == ':'); +} + +static std::string joinRemotePath(const std::string& base, std::string path) { + while (path.starts_with("./")) { + path = path.substr(2); + } + if (path == ".") { + path.clear(); + } + if (path.empty() || base.ends_with('/')) { + return base + path; + } + return base + "/" + path; +} + VirtualFileSystem::VirtualFileSystem() : VirtualFileSystem{""} {} VirtualFileSystem::VirtualFileSystem(std::string homeDir) { @@ -151,6 +177,22 @@ std::string VirtualFileSystem::resolvePath(main::ClientContext* context, const s if (!paths.empty()) { return paths.front(); } + if (!isRelativePath(path)) { + return path; + } + const auto& fileSearchPath = context->getClientConfig()->fileSearchPath; + if (fileSearchPath.empty()) { + return path; + } + for (auto& searchPath : StringUtils::split(fileSearchPath, ",")) { + if (searchPath.find("://") == std::string::npos) { + continue; + } + paths = vfs->glob(context, joinRemotePath(searchPath, path)); + if (!paths.empty()) { + return paths.front(); + } + } return path; } diff --git a/tools/dev/README.md b/tools/dev/README.md new file mode 100644 index 000000000..f675efe3e --- /dev/null +++ b/tools/dev/README.md @@ -0,0 +1,65 @@ +# Local VFS Endpoint Servers + +`range_file_server.py` serves files with `HEAD` and byte-range `GET` support. This is useful for +testing VFS reads of Parquet files over `http://` and local S3-shaped `s3://` URLs. + +Run the commands below from the repository root. + +## HTTP + +Start a local HTTP endpoint: + +```bash +python3 tools/dev/range_file_server.py --bind 127.0.0.1 --port 8766 --root "$PWD" +``` + +Then test an HTTP init file: + +```bash +printf 'match (a)-[b]->(c) return *;\n:quit\n' \ + | ./build/release/tools/shell/lbug \ + -i http://127.0.0.1:8766/dataset/demo-db/icebug-disk/schema.cypher +``` + +The range support matters because `python3 -m http.server` can serve the Cypher file, but it is not +enough for Parquet readers that issue byte-range requests. + +## S3 + +The S3 filesystem uses HTTPS. Generate a local self-signed certificate: + +```bash +openssl req -x509 -newkey rsa:2048 -nodes \ + -keyout /private/tmp/lbug_s3_key.pem \ + -out /private/tmp/lbug_s3_cert.pem \ + -days 1 \ + -subj '/CN=127.0.0.1' +``` + +Start a local path-style S3-shaped endpoint. The `--strip-prefix icebug` flag maps requests for +`/icebug/...` back to files under the repository root. + +```bash +python3 tools/dev/range_file_server.py \ + --bind 127.0.0.1 \ + --port 9443 \ + --root "$PWD" \ + --strip-prefix icebug \ + --cert /private/tmp/lbug_s3_cert.pem \ + --key /private/tmp/lbug_s3_key.pem +``` + +Configure the S3 extension to use the local endpoint: + +```cypher +CALL s3_endpoint='127.0.0.1:9443'; +CALL s3_url_style='path'; +``` + +Then use this URL shape: + +```text +s3://icebug/dataset/demo-db/icebug-disk/schema.cypher +``` + +For shell init testing, the S3 options must be available before the `-i s3://...` file is opened. diff --git a/tools/dev/range_file_server.py b/tools/dev/range_file_server.py new file mode 100755 index 000000000..951817bd2 --- /dev/null +++ b/tools/dev/range_file_server.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +import argparse +import ssl +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from urllib.parse import unquote, urlparse + + +class RangeFileHandler(BaseHTTPRequestHandler): + root = Path(".").resolve() + strip_prefix = "" + + def translate_path(self): + path = unquote(urlparse(self.path).path).lstrip("/") + if self.strip_prefix and path == self.strip_prefix: + path = "" + elif self.strip_prefix and path.startswith(self.strip_prefix + "/"): + path = path[len(self.strip_prefix) + 1:] + return (self.root / path).resolve() + + def send_file(self, head_only=False): + file_path = self.translate_path() + try: + file_path.relative_to(self.root) + except ValueError: + self.send_error(403) + return + if not file_path.is_file(): + self.send_error(404) + return + + size = file_path.stat().st_size + start = 0 + end = size - 1 + status = 200 + range_header = self.headers.get("Range") + if range_header and range_header.startswith("bytes="): + status = 206 + range_spec = range_header[len("bytes="):].split(",", 1)[0] + first, _, last = range_spec.partition("-") + if first: + start = int(first) + if last: + end = int(last) + elif last: + suffix_length = int(last) + start = max(size - suffix_length, 0) + end = size - 1 + end = min(end, size - 1) + if start > end or start >= size: + self.send_response(416) + self.send_header("Content-Range", f"bytes */{size}") + self.end_headers() + return + + length = end - start + 1 if size else 0 + self.send_response(status) + self.send_header("Accept-Ranges", "bytes") + self.send_header("Content-Length", str(length)) + if status == 206: + self.send_header("Content-Range", f"bytes {start}-{end}/{size}") + self.end_headers() + if head_only: + return + with file_path.open("rb") as file: + file.seek(start) + remaining = length + while remaining: + chunk = file.read(min(1024 * 64, remaining)) + if not chunk: + break + self.wfile.write(chunk) + remaining -= len(chunk) + + def do_GET(self): + self.send_file(False) + + def do_HEAD(self): + self.send_file(True) + + def log_message(self, fmt, *args): + if self.server.quiet: + return + super().log_message(fmt, *args) + + +class RangeFileServer(ThreadingHTTPServer): + quiet = False + + +def main(): + parser = argparse.ArgumentParser( + description="Serve local files with HEAD and byte-range GET support.") + parser.add_argument("--bind", default="127.0.0.1") + parser.add_argument("--port", type=int, required=True) + parser.add_argument("--root", default=".") + parser.add_argument( + "--strip-prefix", + default="", + help="Drop this first URL path component before mapping to --root. " + "Useful for path-style S3 bucket names.", + ) + parser.add_argument("--cert", help="TLS certificate PEM for HTTPS.") + parser.add_argument("--key", help="TLS private key PEM for HTTPS.") + parser.add_argument("--quiet", action="store_true") + args = parser.parse_args() + + RangeFileHandler.root = Path(args.root).resolve() + RangeFileHandler.strip_prefix = args.strip_prefix.strip("/") + server = RangeFileServer((args.bind, args.port), RangeFileHandler) + server.quiet = args.quiet + if args.cert or args.key: + if not args.cert or not args.key: + parser.error("--cert and --key must be provided together") + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + context.load_cert_chain(args.cert, args.key) + server.socket = context.wrap_socket(server.socket, server_side=True) + scheme = "https" if args.cert else "http" + print(f"Serving {RangeFileHandler.root} at {scheme}://{args.bind}:{args.port}/") + server.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/tools/shell/shell_runner.cpp b/tools/shell/shell_runner.cpp index cff9fc379..235fd123c 100644 --- a/tools/shell/shell_runner.cpp +++ b/tools/shell/shell_runner.cpp @@ -1,7 +1,10 @@ #include +#include #include "args.hxx" +#include "common/file_system/file_info.h" #include "common/file_system/local_file_system.h" +#include "common/file_system/virtual_file_system.h" #include "common/string_utils.h" #include "common/task_system/progress_bar.h" #include "embedded_shell.h" @@ -24,11 +27,16 @@ int setConfigOutputMode(const std::string& mode, ShellConfig& shell) { void addInitFileDirToSearchPath(const std::shared_ptr& conn, const std::string& initFile) { - auto initDir = std::filesystem::path{initFile}.parent_path(); - if (initDir.empty()) { + auto slashPos = initFile.find_last_of('/'); + if (slashPos == std::string::npos) { return; } - auto initDirPath = std::filesystem::absolute(initDir).lexically_normal().string(); + auto initDirPath = initFile.substr(0, slashPos); + if (initDirPath.empty()) { + initDirPath = "/"; + } else if (initFile.find("://") == std::string::npos) { + initDirPath = std::filesystem::absolute(initDirPath).lexically_normal().string(); + } auto clientConfig = conn->getClientContext()->getClientConfigUnsafe(); auto& fileSearchPath = clientConfig->fileSearchPath; if (!fileSearchPath.empty()) { @@ -42,30 +50,37 @@ void addInitFileDirToSearchPath(const std::shared_ptr& conn, fileSearchPath = initDirPath; } -void processRunCommands(EmbeddedShell& shell, const std::string& filename) { - FILE* fp = fopen(filename.c_str(), "r"); - char buf[LINENOISE_MAX_LINE + 1]; - buf[LINENOISE_MAX_LINE] = '\0'; - - if (fp == NULL) { +void processRunCommands(EmbeddedShell& shell, const std::shared_ptr& conn, + const std::string& filename) { + std::string fileContents; + try { + auto context = conn->getClientContext(); + auto fileInfo = VirtualFileSystem::GetUnsafe(*context)->openFile(filename, + FileOpenFlags(FileFlags::READ_ONLY), context); + auto fileSize = fileInfo->getFileSize(); + fileContents.resize(fileSize); + if (fileSize > 0) { + fileInfo->readFromFile(fileContents.data(), fileSize, 0); + } + } catch (Exception& e) { if (filename != ".lbugrc") { - std::cerr << "Warning: cannot open init file: " << filename << '\n'; + std::cerr << "Warning: cannot open init file: " << filename << ": " << e.what() << '\n'; } return; } std::cout << "-- Processing: " << filename << '\n'; - while (fgets(buf, LINENOISE_MAX_LINE, fp) != NULL) { - auto queryResults = shell.processInput(buf); + std::istringstream stream{fileContents}; + std::string line; + while (std::getline(stream, line)) { + line += '\n'; + auto queryResults = shell.processInput(line); for (auto& queryResult : queryResults) { if (!queryResult->isSuccess()) { - shell.printErrorMessage(buf, *queryResult); + shell.printErrorMessage(line, *queryResult); } } } - if (fclose(fp) != 0) { - // continue regardless of error - } } int main(int argc, char* argv[]) { @@ -213,7 +228,7 @@ int main(int argc, char* argv[]) { } try { auto shell = EmbeddedShell(database, conn, shellConfig); - processRunCommands(shell, initFile); + processRunCommands(shell, conn, initFile); if (shellConfig.stats) { if (DBConfig::isDBPathInMemory(databasePath)) { std::cout << "Opening the database under in-memory mode." << '\n';