Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/common/file_system/virtual_file_system.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "common/file_system/virtual_file_system.h"

#include <cctype>

#include "common/assert.h"
#include "common/exception/io.h"
#include "common/file_system/gzip_file_system.h"
Expand All @@ -11,6 +13,30 @@
namespace lbug {
namespace common {

static bool isRelativePath(const std::string& path) {
if (path.empty() || path[0] == '/' || path[0] == '~') {
return false;
}
if (path.find("://") != std::string::npos) {
return false;
}
return !(
path.size() > 1 && std::isalpha(static_cast<unsigned char>(path[0])) && path[1] == ':');
}

static std::string joinRemotePath(const std::string& base, std::string path) {
while (path.starts_with("./")) {
path = path.substr(2);
}
if (path == ".") {
path.clear();
}
if (path.empty() || base.ends_with('/')) {
return base + path;
}
return base + "/" + path;
}

VirtualFileSystem::VirtualFileSystem() : VirtualFileSystem{""} {}

VirtualFileSystem::VirtualFileSystem(std::string homeDir) {
Expand Down Expand Up @@ -151,6 +177,22 @@ std::string VirtualFileSystem::resolvePath(main::ClientContext* context, const s
if (!paths.empty()) {
return paths.front();
}
if (!isRelativePath(path)) {
return path;
}
const auto& fileSearchPath = context->getClientConfig()->fileSearchPath;
if (fileSearchPath.empty()) {
return path;
}
for (auto& searchPath : StringUtils::split(fileSearchPath, ",")) {
if (searchPath.find("://") == std::string::npos) {
continue;
}
paths = vfs->glob(context, joinRemotePath(searchPath, path));
if (!paths.empty()) {
return paths.front();
}
}
return path;
}

Expand Down
65 changes: 65 additions & 0 deletions tools/dev/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Local VFS Endpoint Servers

`range_file_server.py` serves files with `HEAD` and byte-range `GET` support. This is useful for
testing VFS reads of Parquet files over `http://` and local S3-shaped `s3://` URLs.

Run the commands below from the repository root.

## HTTP

Start a local HTTP endpoint:

```bash
python3 tools/dev/range_file_server.py --bind 127.0.0.1 --port 8766 --root "$PWD"
```

Then test an HTTP init file:

```bash
printf 'match (a)-[b]->(c) return *;\n:quit\n' \
| ./build/release/tools/shell/lbug \
-i http://127.0.0.1:8766/dataset/demo-db/icebug-disk/schema.cypher
```

The range support matters because `python3 -m http.server` can serve the Cypher file, but it is not
enough for Parquet readers that issue byte-range requests.

## S3

The S3 filesystem uses HTTPS. Generate a local self-signed certificate:

```bash
openssl req -x509 -newkey rsa:2048 -nodes \
-keyout /private/tmp/lbug_s3_key.pem \
-out /private/tmp/lbug_s3_cert.pem \
-days 1 \
-subj '/CN=127.0.0.1'
```

Start a local path-style S3-shaped endpoint. The `--strip-prefix icebug` flag maps requests for
`/icebug/...` back to files under the repository root.

```bash
python3 tools/dev/range_file_server.py \
--bind 127.0.0.1 \
--port 9443 \
--root "$PWD" \
--strip-prefix icebug \
--cert /private/tmp/lbug_s3_cert.pem \
--key /private/tmp/lbug_s3_key.pem
```

Configure the S3 extension to use the local endpoint:

```cypher
CALL s3_endpoint='127.0.0.1:9443';
CALL s3_url_style='path';
```

Then use this URL shape:

```text
s3://icebug/dataset/demo-db/icebug-disk/schema.cypher
```

For shell init testing, the S3 options must be available before the `-i s3://...` file is opened.
124 changes: 124 additions & 0 deletions tools/dev/range_file_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python3
import argparse
import ssl
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import unquote, urlparse


class RangeFileHandler(BaseHTTPRequestHandler):
root = Path(".").resolve()
strip_prefix = ""

def translate_path(self):
path = unquote(urlparse(self.path).path).lstrip("/")
if self.strip_prefix and path == self.strip_prefix:
path = ""
elif self.strip_prefix and path.startswith(self.strip_prefix + "/"):
path = path[len(self.strip_prefix) + 1:]
return (self.root / path).resolve()

def send_file(self, head_only=False):
file_path = self.translate_path()
try:
file_path.relative_to(self.root)
except ValueError:
self.send_error(403)
return
if not file_path.is_file():
self.send_error(404)
return

size = file_path.stat().st_size
start = 0
end = size - 1
status = 200
range_header = self.headers.get("Range")
if range_header and range_header.startswith("bytes="):
status = 206
range_spec = range_header[len("bytes="):].split(",", 1)[0]
first, _, last = range_spec.partition("-")
if first:
start = int(first)
if last:
end = int(last)
elif last:
suffix_length = int(last)
start = max(size - suffix_length, 0)
end = size - 1
end = min(end, size - 1)
if start > end or start >= size:
self.send_response(416)
self.send_header("Content-Range", f"bytes */{size}")
self.end_headers()
return

length = end - start + 1 if size else 0
self.send_response(status)
self.send_header("Accept-Ranges", "bytes")
self.send_header("Content-Length", str(length))
if status == 206:
self.send_header("Content-Range", f"bytes {start}-{end}/{size}")
self.end_headers()
if head_only:
return
with file_path.open("rb") as file:
file.seek(start)
remaining = length
while remaining:
chunk = file.read(min(1024 * 64, remaining))
if not chunk:
break
self.wfile.write(chunk)
remaining -= len(chunk)

def do_GET(self):
self.send_file(False)

def do_HEAD(self):
self.send_file(True)

def log_message(self, fmt, *args):
if self.server.quiet:
return
super().log_message(fmt, *args)


class RangeFileServer(ThreadingHTTPServer):
quiet = False


def main():
parser = argparse.ArgumentParser(
description="Serve local files with HEAD and byte-range GET support.")
parser.add_argument("--bind", default="127.0.0.1")
parser.add_argument("--port", type=int, required=True)
parser.add_argument("--root", default=".")
parser.add_argument(
"--strip-prefix",
default="",
help="Drop this first URL path component before mapping to --root. "
"Useful for path-style S3 bucket names.",
)
parser.add_argument("--cert", help="TLS certificate PEM for HTTPS.")
parser.add_argument("--key", help="TLS private key PEM for HTTPS.")
parser.add_argument("--quiet", action="store_true")
args = parser.parse_args()

RangeFileHandler.root = Path(args.root).resolve()
RangeFileHandler.strip_prefix = args.strip_prefix.strip("/")
server = RangeFileServer((args.bind, args.port), RangeFileHandler)
server.quiet = args.quiet
if args.cert or args.key:
if not args.cert or not args.key:
parser.error("--cert and --key must be provided together")
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(args.cert, args.key)
server.socket = context.wrap_socket(server.socket, server_side=True)
scheme = "https" if args.cert else "http"
print(f"Serving {RangeFileHandler.root} at {scheme}://{args.bind}:{args.port}/")
server.serve_forever()


if __name__ == "__main__":
main()
49 changes: 32 additions & 17 deletions tools/shell/shell_runner.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#include <iostream>
#include <sstream>

#include "args.hxx"
#include "common/file_system/file_info.h"
#include "common/file_system/local_file_system.h"
#include "common/file_system/virtual_file_system.h"
#include "common/string_utils.h"
#include "common/task_system/progress_bar.h"
#include "embedded_shell.h"
Expand All @@ -24,11 +27,16 @@ int setConfigOutputMode(const std::string& mode, ShellConfig& shell) {

void addInitFileDirToSearchPath(const std::shared_ptr<Connection>& conn,
const std::string& initFile) {
auto initDir = std::filesystem::path{initFile}.parent_path();
if (initDir.empty()) {
auto slashPos = initFile.find_last_of('/');
if (slashPos == std::string::npos) {
return;
}
auto initDirPath = std::filesystem::absolute(initDir).lexically_normal().string();
auto initDirPath = initFile.substr(0, slashPos);
if (initDirPath.empty()) {
initDirPath = "/";
} else if (initFile.find("://") == std::string::npos) {
initDirPath = std::filesystem::absolute(initDirPath).lexically_normal().string();
}
auto clientConfig = conn->getClientContext()->getClientConfigUnsafe();
auto& fileSearchPath = clientConfig->fileSearchPath;
if (!fileSearchPath.empty()) {
Expand All @@ -42,30 +50,37 @@ void addInitFileDirToSearchPath(const std::shared_ptr<Connection>& conn,
fileSearchPath = initDirPath;
}

void processRunCommands(EmbeddedShell& shell, const std::string& filename) {
FILE* fp = fopen(filename.c_str(), "r");
char buf[LINENOISE_MAX_LINE + 1];
buf[LINENOISE_MAX_LINE] = '\0';

if (fp == NULL) {
void processRunCommands(EmbeddedShell& shell, const std::shared_ptr<Connection>& conn,
const std::string& filename) {
std::string fileContents;
try {
auto context = conn->getClientContext();
auto fileInfo = VirtualFileSystem::GetUnsafe(*context)->openFile(filename,
FileOpenFlags(FileFlags::READ_ONLY), context);
auto fileSize = fileInfo->getFileSize();
fileContents.resize(fileSize);
if (fileSize > 0) {
fileInfo->readFromFile(fileContents.data(), fileSize, 0);
}
} catch (Exception& e) {
if (filename != ".lbugrc") {
std::cerr << "Warning: cannot open init file: " << filename << '\n';
std::cerr << "Warning: cannot open init file: " << filename << ": " << e.what() << '\n';
}
return;
}

std::cout << "-- Processing: " << filename << '\n';
while (fgets(buf, LINENOISE_MAX_LINE, fp) != NULL) {
auto queryResults = shell.processInput(buf);
std::istringstream stream{fileContents};
std::string line;
while (std::getline(stream, line)) {
line += '\n';
auto queryResults = shell.processInput(line);
for (auto& queryResult : queryResults) {
if (!queryResult->isSuccess()) {
shell.printErrorMessage(buf, *queryResult);
shell.printErrorMessage(line, *queryResult);
}
}
}
if (fclose(fp) != 0) {
// continue regardless of error
}
}

int main(int argc, char* argv[]) {
Expand Down Expand Up @@ -213,7 +228,7 @@ int main(int argc, char* argv[]) {
}
try {
auto shell = EmbeddedShell(database, conn, shellConfig);
processRunCommands(shell, initFile);
processRunCommands(shell, conn, initFile);
if (shellConfig.stats) {
if (DBConfig::isDBPathInMemory(databasePath)) {
std::cout << "Opening the database under in-memory mode." << '\n';
Expand Down
Loading