diff --git a/MODULE.bazel b/MODULE.bazel index 697f56d3f18..2af0fca4021 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -81,6 +81,7 @@ bazel_dep(name = "googletest", version = "1.17.0.bcr.2") bazel_dep(name = "openmp", version = "21.1.5.bcr.1") bazel_dep(name = "or-tools", version = "9.15") bazel_dep(name = "spdlog", version = "1.15.1") +bazel_dep(name = "sqlite3", version = "3.51.2") bazel_dep(name = "sv-lang", version = "10.0.bcr.2") bazel_dep(name = "tcl_lang", version = "9.0.2.bcr.1") bazel_dep(name = "tcmalloc", version = "0.0.0-20250927-12f2552") diff --git a/docs/contrib/Logger.md b/docs/contrib/Logger.md index 2ee96ce1890..04ef3cf78ff 100644 --- a/docs/contrib/Logger.md +++ b/docs/contrib/Logger.md @@ -454,3 +454,36 @@ cd doc/messages && touch .md ### OpenROAD Tool List A full list of tool namespaces can be found [here](DeveloperGuide.md#tool-flow-namespace). + +# Binary/Database Logging +There is also a parallel mechanism for logging large quantities of numeric data to an SQLite database, which is useful for analyzing the inner workings of tools. + +## Overall Architecture +The architecture is designed around many threads logging concurrently from many different log sources. This requires buffering everything, with a background thread draining the buffers into the database. The scheduling is a simple round-robin in the common case where there is no memory pressure (memory pressure is defined as 80% of the maximum). Memory budgets can be set as a global limit or as a per-channel limit (as of right now, there is no way to specialize this limit for different channels). + +All queues, together with their associated schemas and other metadata, are packaged into channels. Each channel has two sides: the highly templated and specialized side, which is exposed to the callers, and the type-erased version, which is exposed to the background thread. 
Therefore, all direct SQLite interactions are done opaquely through the type-erased class by the background thread, which controls the raw SQLite pointer. + +## C++ API Description +Tools can call three different methods in the C++ code. + +First, the primary data logging mechanism is logToDb. This method takes a comma-separated list of column names as its template argument, which is checked at compile time to contain only valid SQLite column names. The method does not do any writing itself; it takes the data and enqueues it as quickly as possible into a buffer. The other arguments are the table name and the tool id/message id pair, analogous to how the existing logger functions do it. + +logToDbBulk is essentially the same as logToDb, except that it takes iterators over the values and streams them to the queues in bulk. + +logToDbMetadata is a "slow path" text-to-text key-value logging mechanism, useful for logging events or one-time or infrequent messages (e.g. weights passed as arguments that stay the same across the run). + +## TCL API Description +The TCL API lets the user script how the logger handles database logging. The setters also have corresponding getters. These commands wrap analogous C++ methods. + +utl::start_log_db \[filename\]: Enables logging, runs logger db setup, and starts the background thread. + +utl::stop_log_db: Stops the background thread, cleans up, and closes db logging. + +utl::set_db_log_global_max_mem \[bytes\]: Set overall max memory for pressure mechanism. + +utl::set_db_log_per_channel_max_mem \[bytes\]: Set per-channel max memory for pressure mechanism. + +utl::set_db_log_enabled \[tool name (e.g. GPL)\] \[true/false\]: Switch on/off per-tool logging. + +## Performance Overhead +Qualitatively acceptable. Measuring this in real-world use remains TODO. 
diff --git a/etc/DependencyInstaller.sh b/etc/DependencyInstaller.sh index b31299a1b97..d4f3679ee82 100755 --- a/etc/DependencyInstaller.sh +++ b/etc/DependencyInstaller.sh @@ -1004,7 +1004,7 @@ _install_ubuntu_packages() { debhelper devscripts flex g++ gcc git groff lcov libbz2-dev libffi-dev libfl-dev \ libgomp1 libomp-dev libpcre2-dev libreadline-dev pandoc \ pkg-config python3-dev qt5-image-formats-plugins tcl tcl-dev \ - tcllib unzip wget libyaml-cpp-dev zlib1g-dev tzdata + tcllib unzip wget libyaml-cpp-dev zlib1g-dev tzdata sqlite3 libsqlite3-dev local packages=() if _version_compare "$1" -ge "25.04"; then diff --git a/etc/find_messages.py b/etc/find_messages.py index cc12fd563f5..e297fc8f158 100755 --- a/etc/find_messages.py +++ b/etc/find_messages.py @@ -60,6 +60,20 @@ def parse_args(): re.VERBOSE | re.MULTILINE, ) +# Regex for logToDb / logToDbBulk calls (share the same id namespace). +logtodb_regexp_c = re.compile( + r""" + (?:->|\.) # deref + logToDb(?:Bulk)? # logToDb or logToDbBulk + <.+?> # template header + \s*\(\s* # ( + (?:utl::)?(?P[A-Z]{3}) # tool + \s*,\s* # , + (?P\d+) # id + """, + re.VERBOSE | re.MULTILINE, +) + warn_regexp_tcl = re.compile( r""" @@ -105,6 +119,24 @@ def scan_file(path, file_name, msgs): msgs[key].add(value) + if not file_name.endswith("tcl"): + for match in re.finditer(logtodb_regexp_c, lines): + tool = match.group("tool") + msg_id = int(match.group("id")) + key = "{} {:04d}".format(tool, msg_id) + + line_num = lines[0 : match.start()].count("\n") + 1 + position = "{}:{}".format(file_name, line_num) + file_link = os.path.join(path, file_name).strip("../").replace("\\", "/") + file_link = "https://github.com/The-OpenROAD-Project/OpenROAD/tree/master/{}#L{}".format( + file_link, line_num + ) + value = "{:25} {:<50} DB_LOG {}".format( + position, "(database log)", file_link + ) + + msgs[key].add(value) + def scan_dir(path, files, msgs): for file_name in files: diff --git a/src/utl/BUILD b/src/utl/BUILD index 
c84951b944d..8dc2860e629 100644 --- a/src/utl/BUILD +++ b/src/utl/BUILD @@ -94,6 +94,7 @@ cc_library( "@boost.iterator", "@boost.random", "@spdlog", + "@sqlite3//:sqlite3", "@tcl_lang//:tcl", ], ) diff --git a/src/utl/CMakeLists.txt b/src/utl/CMakeLists.txt index 512783e5891..30026120c5a 100644 --- a/src/utl/CMakeLists.txt +++ b/src/utl/CMakeLists.txt @@ -58,6 +58,7 @@ target_include_directories(utl_lib include PRIVATE src + ${Boost_INCLUDE_DIRS} ${TCL_INCLUDE_PATH} ) @@ -68,8 +69,7 @@ target_link_libraries(utl_lib spdlog::spdlog ${TCL_LIBRARY} ${Boost_LIBRARIES} - PRIVATE - Boost::headers + sqlite3 ) target_sources(utl @@ -83,10 +83,9 @@ target_include_directories(utl include PRIVATE src + ${Boost_INCLUDE_DIRS} ) -target_link_libraries(utl PRIVATE Boost::headers) - target_compile_definitions(utl PUBLIC FMT_DEPRECATED_OSTREAM=1 diff --git a/src/utl/include/utl/Logger.h b/src/utl/include/utl/Logger.h index 708f11ec5db..8495e7ad5e8 100644 --- a/src/utl/include/utl/Logger.h +++ b/src/utl/include/utl/Logger.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,30 @@ #include #include #include +#ifndef SWIG +// Everything behind this guard is invisible to SWIG because the +// parser cannot handle: +// - (C callbacks, opaque types) +// - mutex-protected queues (templates, C++ atomics) +// - Template-heavy helpers (std::tuple, fold expressions, +// index_sequence, TypedQueue) +// The corresponding .cpp code is still compiled normally. 
+#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#endif #include "spdlog/common.h" #include "spdlog/details/os.h" @@ -101,6 +126,344 @@ enum ToolId SIZE // the number of tools, do not put anything after this }; +#ifndef SWIG +// --- SQLite Logging Types --- + +// Numerics Only +enum class SQLiteType { + INTEGER, + REAL +}; + +// tool/message pair +struct SchemaKey { + ToolId tool; + int id; + + bool operator==(const SchemaKey& other) const { + return tool == other.tool && id == other.id; + } +}; + +// Helper for hashmapping the tool/message pair +struct SchemaKeyHasher { + size_t operator()(const SchemaKey& k) const { + return std::hash{}(static_cast(k.tool)) ^ (std::hash{}(k.id) << 1); + } +}; + +struct ColumnDefinition { + std::string name; + SQLiteType type; +}; + +// For the binary dump tables +struct SchemaInfo { + std::vector columns; + std::string table_name; +}; + +// Type-to-SQLiteType mapping trait. Template magic to detect int vs real. +template +struct TypeToSQLite; + +template +struct TypeToSQLite>> { + static constexpr SQLiteType value = SQLiteType::INTEGER; +}; + +template +struct TypeToSQLite>> { + static constexpr SQLiteType value = SQLiteType::REAL; +}; + +// Type-erased version of the templated concrete queue. +// This type erasure magic is what allows the whole thing to work reasonably sanely, +// because the caller can use templates for queueing to avoid high serialization costs, +// while the worker thread can later make decisions independently of the exact type used. 
+class AbstractQueue { +public: + AbstractQueue() = default; + AbstractQueue(const AbstractQueue&) = delete; + AbstractQueue& operator=(const AbstractQueue&) = delete; + virtual ~AbstractQueue() = default; + + const SchemaInfo& schema_info() const { return info_; } + virtual size_t row_size_bytes() const = 0; + + // Current value of item_count_, which push() increments and drain_to_db() + // decrements; it may already be stale when the caller looks at it. + size_t approx_size() const { + return item_count_.load(std::memory_order_acquire); + } + + // Value of last_flush_ms_, which drain_to_db() stores from + // std::chrono::steady_clock at the end of every call. + uint64_t last_flush_timestamp_ms() const { + return last_flush_ms_.load(std::memory_order_acquire); + } + + virtual size_t drain_to_db(sqlite3* db, size_t max_records) = 0; + +protected: + SchemaInfo info_; + std::atomic item_count_{0}; + std::atomic last_flush_ms_{0}; +}; + +template +class TypedQueue : public AbstractQueue { +public: + using row_type = std::tuple; + + // We don't want accidental conversion to the AbstractQueue. + explicit TypedQueue(SchemaInfo info) { info_ = std::move(info); } + ~TypedQueue() override { + if (stmt_) { + sqlite3_finalize(stmt_); + } + } + + // Back of the napkin estimate at compile time but should be good enough for numerics + size_t row_size_bytes() const override { return sizeof(row_type); } + + // Appends one row under the queue mutex. The abort flag is currently unused + // and the call always returns true. + bool push(row_type row, const std::atomic& /*abort_flag*/) + { + std::lock_guard lock(mutex_); + queue_.push_back(std::move(row)); + item_count_.fetch_add(1, std::memory_order_release); + return true; + } + + // Pops up to max_records rows (taking the queue mutex once per row) and + // inserts them into the database inside a single transaction. Returns the + // number of rows that were stepped successfully. If a step fails, the + // transaction is rolled back and the rows popped during this call are not + // re-queued. Most optimal to just drain the whole thing unless there are other reasons to not do so. + size_t drain_to_db(sqlite3* db, size_t max_records) override + { + // The latter means the statement could not be built, so fail the whole thing. Otherwise use cached stmt. 
+ if (!stmt_ && !build_insert_stmt(db)) { + return 0; + } + + sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr); + + size_t count = 0; + bool ok = true; + + while (count < max_records) { + row_type row; + { + std::lock_guard lock(mutex_); + if (queue_.empty()) { + break; + } + row = std::move(queue_.front()); + queue_.pop_front(); + item_count_.fetch_sub(1, std::memory_order_release); + } + + // Bind every tuple element to its positional parameter (1-based) by + // expanding an index sequence over the row. + bind_row(stmt_, row, std::index_sequence_for{}); + int rc = sqlite3_step(stmt_); + sqlite3_reset(stmt_); + if (rc != SQLITE_DONE) { + ok = false; + break; + } + ++count; + } + + // Always close the transaction opened by BEGIN above, even when no row + // was written: an empty queue or a failure on the very first row would + // otherwise leave the transaction open on this connection. Roll back if + // any step failed. + sqlite3_exec(db, ok ? "COMMIT" : "ROLLBACK", + nullptr, nullptr, nullptr); + + last_flush_ms_.store( + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(), + std::memory_order_release); + return count; + } + +private: + // More magic to dynamically bind int/reals + template + static void bind_field(sqlite3_stmt* stmt, int idx, T value) + { + if constexpr (TypeToSQLite::value == SQLiteType::INTEGER) { + sqlite3_bind_int64(stmt, idx, static_cast(value)); + } else if constexpr (TypeToSQLite::value == SQLiteType::REAL) { + sqlite3_bind_double(stmt, idx, static_cast(value)); + } + } + + template + void bind_row(sqlite3_stmt* stmt, + const row_type& row, + std::index_sequence) const + { + ((bind_field(stmt, static_cast(Is) + 1, std::get(row))), ...); + } + + // FIXME: I think we can do better than string appending with loops but this will do for now. 
+ bool build_insert_stmt(sqlite3* db) + { + std::string sql = "INSERT INTO " + info_.table_name + " ("; + for (size_t i = 0; i < info_.columns.size(); ++i) { + if (i > 0) sql += ", "; + sql += info_.columns[i].name; + } + sql += ") VALUES ("; + for (size_t i = 0; i < info_.columns.size(); ++i) { + if (i > 0) sql += ", "; + sql += "?"; + } + sql += ");"; + return sqlite3_prepare_v2(db, sql.c_str(), -1, &stmt_, nullptr) == SQLITE_OK; + } + + std::deque queue_; + mutable std::mutex mutex_; + sqlite3_stmt* stmt_ = nullptr; +}; + +// Schema registration infrastructure below. Maybe worth it to merge it with the queue system if performance is not a problem. +// Command sent from caller thread to the backend thread +// when a new schema is discovered. +struct NewSchemaCommand { + SchemaKey key; + std::shared_ptr queue; +}; + +// Command sent from caller thread to the backend thread to create a SQLite +// table and register its schema. The caller blocks on result_promise until +// the backend has executed it. 
+struct CreateTableCommand { + SchemaKey key; + std::string table_name; + std::string header; + std::vector types; + std::promise result_promise; +}; + +// Registry mapping a SchemaKey to its queue. Readers take a snapshot of the +// map with get_map(); writers serialize on registration_mutex_, copy the +// current map, modify the copy and atomically store the new pointer. +class SchemaRegistry { +public: + using RegistryMap + = std::unordered_map, SchemaKeyHasher>; + + SchemaRegistry() : registry_ptr_(std::make_shared()) {} + + // Returns the currently published map (atomic load of the shared pointer). + std::shared_ptr get_map() const { + return std::atomic_load(&registry_ptr_); + } + + // Registers queue under key unless the key is already present, in which + // case the existing queue is returned and the argument is ignored. + std::shared_ptr register_schema( + SchemaKey key, + std::shared_ptr queue) + { + std::lock_guard lock(registration_mutex_); + auto current_map = get_map(); + auto it = current_map->find(key); + if (it != current_map->end()) { + return it->second; + } + auto new_map = std::make_shared(*current_map); + (*new_map)[key] = queue; + std::atomic_store( + &registry_ptr_, + std::const_pointer_cast(new_map)); + return queue; + } + + // Removes key from the registry; does nothing if the key is not present. + void remove_schema(SchemaKey key) { + std::lock_guard lock(registration_mutex_); + auto current_map = get_map(); + auto it = current_map->find(key); + if (it == current_map->end()) { + return; + } + auto new_map = std::make_shared(*current_map); + new_map->erase(key); + std::atomic_store( + &registry_ptr_, + std::const_pointer_cast(new_map)); + } + +private: + std::shared_ptr registry_ptr_; + std::mutex registration_mutex_; +}; + +// Template magic to enforce valid column names for the database at compile time. 
+template +struct FixedString { + char data[N]{}; + constexpr FixedString() = default; + constexpr FixedString(const char (&str)[N]) { + for (size_t i = 0; i < N; ++i) data[i] = str[i]; + } + + constexpr size_t count_fields() const { + if (data[0] == '\0') return 0; + size_t count = 1; + for (size_t i = 0; i < N; ++i) { + if (data[i] == '\0') break; + if (data[i] == ',') { + size_t j = i + 1; + while (j < N && data[j] == ' ') j++; + if (j < N && data[j] != '\0' && data[j] != ',') count++; + } + } + return count; + } + + constexpr bool isValid() const { + bool empty = true; + for (size_t i = 0; i < N; ++i) { + char c = data[i]; + if (c == '\0') break; + if (c == ',') { + if (empty) return false; + empty = true; + } else if (c == '_' || c == ' ' || (c >= 'a' && c <= 'z') + || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9')) { + empty = false; + } else { + // Reject any character that is not alphanumeric, underscore, + // comma, or space — prevents SQL injection through column names. + return false; + } + } + return !empty; + } + + constexpr size_t field_start(size_t idx) const { + size_t cur = 0; + for (size_t i = 0; i < N; ++i) { + if (cur == idx) { + while (i < N && data[i] == ' ') i++; + return i; + } + if (data[i] == ',') cur++; + if (data[i] == '\0') break; + } + return N; + } + + constexpr size_t field_end(size_t idx) const { + size_t start = field_start(idx); + if (start >= N) return N; + size_t last = start; + for (size_t i = start; i < N && data[i] != ',' && data[i] != '\0'; ++i) { + if (data[i] != ' ') last = i + 1; + } + return last; + } +}; + +// Value type extracted from an iterator — used by logToDbBulk. +template +using iter_val_t = typename std::iterator_traits::value_type; + +#endif // End the SWIG guard + class Logger { public: @@ -236,10 +599,160 @@ class Logger void suppressMessage(ToolId tool, int id); void unsuppressMessage(ToolId tool, int id); +#ifndef SWIG + // Logs structured data to the SQLite database. + // The schema (types of Args...) 
is registered lazily on the first call + // for a (tool, id) pair. The template body is intentionally thin: + // compile-time checks live here, all runtime logic is in the .cpp. + template + std::optional logToDb(ToolId tool, int id, const char* tableName, RawArgs&&... raw_args) + { + // If database logging is not running, silently skip. + if (!db_ready_) { + return std::nullopt; + } + + static_assert( + Header.isValid(), + "Header must be a comma-separated list of alphanumeric names" + " and underscores."); + static_assert( + sizeof...(RawArgs) == Header.count_fields(), + "Number of arguments provided to logToDb must match the number" + " of fields in the header."); + + // Build a compile-time array of SQLiteTypes for each argument. + constexpr std::array types_arr + = {TypeToSQLite>::value...}; + + SchemaKey key{tool, id}; + + // If this (tool, id) pair has been explicitly disabled, skip silently. + if (!isDbLogEnabled(key)) { + return std::nullopt; + } + + auto queue_opt = logToDbFindQueue(key); + if (!queue_opt.has_value()) { + // send CreateTableCommand to backend, create TypedQueue, register it--- + SchemaInfo info = syncCreateTable( + key, + tableName, + std::string_view(Header.data), + std::vector(types_arr.begin(), types_arr.end())); + auto queue = std::make_shared...>>(std::move(info)); + logToDbRegisterQueue(key, std::move(queue)); + // Re-fetch after registration so the data is pushed. + queue_opt = logToDbFindQueue(key); + if (!queue_opt.has_value()) { + return std::nullopt; + } + } + + // --- FAST PATH (also reached from SLOW PATH after re-fetch): --- + auto& base = queue_opt.value(); + auto* typed = static_cast...>*>(base.get()); + typed->push( + // Construct a tuple of value types (strip references) from the + // forwarded arguments so the queue owns the data. + std::tuple...>(std::forward(raw_args)...), + log_db_running_); + return 1; + } + + // Bulk version of logToDb: takes one iterator per column and pushes + // 'count' rows at once. 
+ template + std::optional logToDbBulk(ToolId tool, int id, const char* tableName, size_t count, InputIters... iters) + { + // If database logging is not running, silently skip. + if (!db_ready_) { + return std::nullopt; + } + + static_assert( + Header.isValid(), + "Header must be a comma-separated list of alphanumeric names" + " and underscores."); + static_assert( + sizeof...(InputIters) == Header.count_fields(), + "Number of iterators provided to logToDbBulk must match the" + " number of fields in the header."); + + // Check every iterator's value type at compile time. + constexpr bool all_numeric + = (... && std::is_arithmetic_v>); + static_assert( + all_numeric, + "All iterator value types must be arithmetic" + " (maps to INTEGER or REAL in SQLite)."); + + constexpr std::array types_arr + = {TypeToSQLite>::value...}; + + SchemaKey key{tool, id}; + if (!isDbLogEnabled(key)) { + return std::nullopt; + } + + auto queue_opt = logToDbFindQueue(key); + if (!queue_opt.has_value()) { + SchemaInfo info = syncCreateTable( + key, + tableName, + std::string_view(Header.data), + std::vector(types_arr.begin(), types_arr.end())); + auto queue = std::make_shared...>>( + std::move(info)); + logToDbRegisterQueue(key, std::move(queue)); + // Re-fetch after registration so the batch can be pushed. + queue_opt = logToDbFindQueue(key); + if (!queue_opt.has_value()) { + return std::nullopt; + } + } + + auto* typed + = static_cast...>*>(queue_opt->get()); + for (size_t i = 0; i < count; ++i) { + typed->push( + std::make_tuple(std::move(*iters++)...), log_db_running_); + } + return count; + } + + // Enqueue a metadata row (tool, key, value) for the backend to write + // to the 'metadata' table (both key and value are TEXT). 
+ std::optional logToDbMetadata(ToolId tool, std::string key, std::string value); +#endif + void addSink(spdlog::sink_ptr sink); void removeSink(const spdlog::sink_ptr& sink); void addMetricsSink(const char* metrics_filename); void removeMetricsSink(const char* metrics_filename); + void startLogDb(const char* filename); + void stopLogDb(); + + // Maximum total memory (in bytes) the user is willing to let all buffered + // log-db queues consume before the backend applies backpressure or drops + // data. 0 means unlimited. + void setDbLogGlobalMaxMem(size_t bytes); + size_t getDbLogGlobalMaxMem() const; + + // Maximum memory (in bytes) a single per-schema queue may consume before + // the backend applies backpressure or drops data on that channel. + // 0 means unlimited. + void setDbLogPerChannelMaxMem(size_t bytes); + size_t getDbLogPerChannelMaxMem() const; + + // Enable or disable database logging for a specific (tool, id) pair. + // Disabled entries are silently skipped in logToDb. If a schema was + // already registered, it is removed so the next logToDb call triggers + // a fresh registration check. + void setDbLogEnabled(ToolId tool, int id, bool enabled); + // Returns true if logging is enabled for (tool, id). Default is true + // for all pairs not explicitly disabled. + bool getDbLogEnabled(ToolId tool, int id) const; void setMetricsStage(std::string_view format); void clearMetricsStage(); @@ -354,6 +867,11 @@ class Logger // Stop issuing messages of a given tool/id when this limit is hit. static constexpr int max_message_print = 1000; + // Backend scheduler: when a queue reaches this fraction of its memory + // limit, it is considered under pressure and gets drained aggressively. + // TODO: Consider what to hardcode this to. 
+ static constexpr double k_queue_mem_high_threshold = 0.8; + std::vector sinks_; std::shared_ptr logger_; std::stack metrics_stages_; @@ -366,6 +884,106 @@ class Logger std::shared_ptr prometheus_registry_; std::unique_ptr prometheus_metrics_; +#ifndef SWIG + // The SQLite handle is accessed ONLY by the backend (logDbLoop) thread. + sqlite3* db_ = nullptr; + std::thread log_db_thread_; + std::atomic log_db_running_{false}; + + // Main thread checks this atomic instead of reading db_ directly + // (which is owned by the backend thread after startup). + std::atomic db_ready_{false}; + std::string db_filename_; + + // Maximum global memory footprint from buffered log-db data (bytes). + // 0 = unlimited. Used by the backend to decide when to throttle. + size_t db_log_global_max_mem_ = 0; + // Maximum per-channel (per-registration) memory from buffered data (bytes). + // 0 = unlimited. + size_t db_log_per_channel_max_mem_ = 0; + + SchemaRegistry schema_registry_; + + // Command queue: caller threads enqueue NewSchemaCommand, + // the backend thread (logDbLoop) drains and processes them. + std::deque new_schema_queue_; + std::mutex new_schema_queue_mutex_; + + // Command queue for creating SQLite tables. The caller blocks on + // the embedded promise until the backend completes registration. + std::deque create_table_queue_; + std::mutex create_table_mutex_; + + // Metadata queue: low-traffic text-to-text rows written to the + // 'metadata' table. Uses a std::queuen because + // std::string has non-trivial copy semantics. + using MetadataRow = std::tuple; + std::queue metadata_queue_; + std::mutex metadata_queue_mutex_; + + // Startup synchronization: startLogDb blocks until the backend thread + // has opened the SQLite database and created system tables. + std::mutex startup_mutex_; + std::condition_variable startup_cv_; + bool startup_done_ = false; + std::string db_start_error_; + + // Per-schema enable/disable set. 
Logging is enabled by default for + // all (tool, id) pairs. Insert a key here to disable it. + std::unordered_set db_log_disabled_set_; + mutable std::shared_mutex db_log_enabled_mutex_; + + // Returns false if the key is in db_log_disabled_set_. + bool isDbLogEnabled(SchemaKey key) const; + + // Non-template helpers called by the thin logToDb template. + // Implemented in Logger.cpp. + std::optional> logToDbFindQueue(SchemaKey key); + SchemaInfo logToDbBuildSchemaInfo(sqlite3* db, + SchemaKey key, + const std::string& table_name, + std::string_view header, + const std::vector& types); + void logToDbRegisterQueue(SchemaKey key, + std::shared_ptr queue); + + // Sends a CreateTableCommand to the backend thread and blocks until + // the table has been created and the SchemaInfo is returned. Called + // from logToDb / logToDbBulk slow paths. + SchemaInfo syncCreateTable(SchemaKey key, + const char* table_name, + std::string_view header, + const std::vector& types); + + // --- Database log worker thread (entry point + phases) --- + + // Entry to database log worker thread. Launched from startLogDb. + void logDbLoop(); + + // Phase 1: Open the SQLite database, create system tables, populate + // tool_names, and signal readiness. Returns true on success. + bool logDbStartup(); + + // Phase 2: Main spin loop. Drains command queues, schedules work + // across registered data channels, and applies memory-pressure + // gating. Runs until log_db_running_ becomes false. + void logDbMainLoop( + std::unordered_map, + SchemaKeyHasher>& local_registry); + + // Phase 3: Final drain of all remaining data, WAL checkpoint, and + // close of the SQLite database. local_registry is cleared before + // close so TypedQueue destructors can finalize prepared statements. + void logDbShutdown( + std::unordered_map, + SchemaKeyHasher>& local_registry); + + // Drain all pending metadata rows into the 'metadata' table. + // Returns true if any rows were drained. 
+ // Caller must ensure db_ is valid. Called from logDbLoop and stopLogDb. + bool drainMetadataQueue(); +#endif + // This matrix is pre-allocated so it can be safely updated // from multiple threads without locks. using MessageCounter = std::array; diff --git a/src/utl/src/Logger.cpp b/src/utl/src/Logger.cpp index 6f80c792e41..a66afbaf2f7 100644 --- a/src/utl/src/Logger.cpp +++ b/src/utl/src/Logger.cpp @@ -3,8 +3,11 @@ #include "utl/Logger.h" +#include + #include #include +#include #include #include #include @@ -14,6 +17,7 @@ #include #include #include +#include #include #include "CommandLineProgress.h" @@ -68,6 +72,7 @@ Logger::Logger(const char* log_filename, const char* metrics_filename) Logger::~Logger() { + stopLogDb(); finalizeMetrics(); } @@ -87,11 +92,580 @@ void Logger::removeMetricsSink(const char* metrics_filename) metrics_sinks_.erase(metrics_file); } +// Create initial stuff and launch thread, and await startup. +void Logger::startLogDb(const char* filename) +{ + if (db_ready_) { + return; + } + + // Reset startup state + db_filename_ = filename; + db_start_error_.clear(); + { + std::lock_guard lock(startup_mutex_); + startup_done_ = false; + } + + log_db_running_ = true; + log_db_thread_ = std::thread(&Logger::logDbLoop, this); + + // Wait for the backend thread to open the database and create system + // tables. + { + std::unique_lock lock(startup_mutex_); + startup_cv_.wait(lock, [this]() { return startup_done_; }); + } + + if (!db_ready_) { + // Backend failed to open the database. + std::string err_msg + = db_start_error_.empty() ? "unknown error" : db_start_error_; + log_db_running_ = false; + if (log_db_thread_.joinable()) { + log_db_thread_.join(); + } + this->error( + UTL, 109, "Failed to open SQLite database {}: {}", filename, err_msg); + } + this->info(UTL, 117, "Logging to database: {}", filename); +} + +// Flag running as false, and block on thread join. 
+// FIXME: Fix the corner case where this can be called twice, +// with the second time returning immediately (i.e. possibly before the first +// call has finished joining). Not a real problem for most situations. +void Logger::stopLogDb() +{ + // Test-and-clear the flag in one atomic step so that two overlapping + // calls cannot both get past this check and race on joining the thread. + if (!log_db_running_.exchange(false)) { + return; + } + if (log_db_thread_.joinable()) { + log_db_thread_.join(); + } + this->info(UTL, 118, "Stopping database logging."); + // All SQLite operations (drain, close) are handled inside the backend + // thread (logDbLoop). Nothing more to do here. +} + +// Free helper utilities. Put in anonymous namespace because I suspect they can be easily replaced +// with stdlib methods. +namespace { + +// Maps a SQLiteType to the type keyword used in the generated SQL. +const char* sqlite_type_name(utl::SQLiteType t) +{ + switch (t) { + case utl::SQLiteType::INTEGER: + return "INTEGER"; + case utl::SQLiteType::REAL: + return "REAL"; + } + return "INTEGER"; +} + +// Split a comma-separated header string into individual column names, +// trimming leading/trailing spaces from each. +// FIXME: There might be a way to do this with stdlib things, +// it seems too trivial to not have a nicer way. 
+std::vector split_header(std::string_view header) +{ + std::vector fields; + const char* cur = header.data(); + const char* end = cur + header.size(); + while (cur < end) { + // Skip leading spaces + while (cur < end && *cur == ' ') { + ++cur; + } + const char* start = cur; + // Find next comma or end + while (cur < end && *cur != ',') { + ++cur; + } + // Trim trailing spaces + const char* stop = cur; + while (stop > start && *(stop - 1) == ' ') { + --stop; + } + // An empty field is kept only when at least one field precedes it, so + // an empty leading field is dropped. + if (stop > start || (!fields.empty() && stop == start)) { + fields.emplace_back(start, stop - start); + } + if (cur < end) { + ++cur; // skip comma + } + } + return fields; +} + +// Pairs each entry of types with the column name at the same position in +// header; the result always has types.size() entries. +std::vector build_columns_from_runtime( + std::string_view header, + const std::vector& types) +{ + auto names = split_header(header); + // HACK: if counts don't match, pad with "unknown" + std::vector cols; + cols.reserve(types.size()); + for (size_t i = 0; i < types.size(); ++i) { + std::string name = (i < names.size()) ? names[i] : "unknown"; + cols.push_back({std::move(name), types[i]}); + } + return cols; +} + +} // end anonymous namespace + +// logToDb helpers + +// Fish out queue from map or return optional. +// FIXME: Might be cleaner to just inline this wherever necessary. +std::optional> Logger::logToDbFindQueue( + SchemaKey key) +{ + auto map = schema_registry_.get_map(); + auto it = map->find(key); + if (it != map->end()) { + return it->second; + } + return std::nullopt; +} + + +// Helper for building a schema and creating a table from source material. +// TODO: Would require changing a bunch of signatures but it would be nice to allow +// The caller to pass some kind of long "table description" string. 
+SchemaInfo Logger::logToDbBuildSchemaInfo(sqlite3* db, + SchemaKey key, + const std::string& table_name, + std::string_view header, + const std::vector& types) +{ + SchemaInfo info; + info.table_name = table_name; + info.columns = build_columns_from_runtime(header, types); + + this->info(UTL, + 121, + "Creating database table {} for {}-{}.", + info.table_name, + tool_names_[key.tool], + key.id); + + // FIXME: as before not sure if there is a better way than string appending + // Build and execute CREATE TABLE IF NOT EXISTS in one shot. + std::string create_sql + = "CREATE TABLE IF NOT EXISTS " + info.table_name + " ("; + for (size_t i = 0; i < info.columns.size(); ++i) { + if (i > 0) { + create_sql += ", "; + } + create_sql + += info.columns[i].name + " " + sqlite_type_name(info.columns[i].type); + } + create_sql += ");"; + + char* err_msg = nullptr; + int rc = sqlite3_exec(db, create_sql.c_str(), nullptr, nullptr, &err_msg); + if (rc != SQLITE_OK) { + std::string err_str = err_msg ? err_msg : "unknown"; + sqlite3_free(err_msg); + this->error(UTL, + 110, + "SQLite error creating table '{}': {}", + info.table_name, + err_str); + } + + // Insert into table_list to track this registered schema. + std::string col_types, col_names; + for (size_t i = 0; i < info.columns.size(); ++i) { + if (i > 0) { + col_types += ","; + col_names += ","; + } + col_types += sqlite_type_name(info.columns[i].type); + col_names += info.columns[i].name; + } + char* tbl_sql = sqlite3_mprintf( + "INSERT OR REPLACE INTO table_list VALUES (%d, %d, %Q, %Q, %Q)", + static_cast(key.tool), + key.id, + info.table_name.c_str(), + col_types.c_str(), + col_names.c_str()); + if (tbl_sql) { + char* tbl_err = nullptr; + if (sqlite3_exec(db, tbl_sql, nullptr, nullptr, &tbl_err) != SQLITE_OK) { + this->warn(UTL, 115, "Failed to insert into table_list: {}", tbl_err); + sqlite3_free(tbl_err); + } + sqlite3_free(tbl_sql); + } + + return info; +} + +// Register the created schema internally. 
+void Logger::logToDbRegisterQueue(SchemaKey key, + std::shared_ptr queue) +{ + this->info(UTL, + 120, + "Registered schema for {}-{} on table {}.", + tool_names_[key.tool], + key.id, + queue->schema_info().table_name); + auto registered = schema_registry_.register_schema(key, std::move(queue)); + + if (log_db_running_.load(std::memory_order_acquire)) { + NewSchemaCommand cmd{key, std::move(registered)}; + std::lock_guard lock(new_schema_queue_mutex_); + new_schema_queue_.push_back(std::move(cmd)); + } +} + +// Promise/futures magic to synchronize caller with backend. +SchemaInfo Logger::syncCreateTable(SchemaKey key, + const char* table_name, + std::string_view header, + const std::vector& types) +{ + auto promise = std::promise(); + auto future = promise.get_future(); + + { + std::lock_guard lock(create_table_mutex_); + create_table_queue_.push_back( + {key, table_name, std::string(header), types, std::move(promise)}); + } + + // Block until the backend thread processes this command. + return future.get(); +} + + +// logDbLoop (backend thread entry point) +void Logger::logDbLoop() +{ + if (!logDbStartup()) { + return; + } + + // Local registry for the backend thread: SchemaKey -> AbstractQueue. + // Populated by processing NewSchemaCommands, drained by schedule logic. + std::unordered_map, SchemaKeyHasher> + local_registry; + + logDbMainLoop(local_registry); + // The above will exit when the run flag is lowered + logDbShutdown(local_registry); +} + + +// DB Log Startup +bool Logger::logDbStartup() +{ + // Open the SQLite database. + int rc = sqlite3_open_v2(db_filename_.c_str(), + &db_, + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, + nullptr); + if (rc != SQLITE_OK) { + db_start_error_ = sqlite3_errmsg(db_) + ? sqlite3_errmsg(db_) + : "sqlite3_open_v2 returned no message"; + sqlite3_close(db_); + db_ = nullptr; + // Signal startup completion (with failure) so startLogDb can throw. 
+ { + std::lock_guard lock(startup_mutex_); + startup_done_ = true; + } + startup_cv_.notify_one(); + log_db_running_ = false; + return false; + } + + // We do not care about thread safety or atomicity as there is only one + // backend thread. WAL for max speed. + sqlite3_exec(db_, "PRAGMA journal_mode = WAL;", nullptr, nullptr, nullptr); + sqlite3_exec(db_, "PRAGMA synchronous = OFF;", nullptr, nullptr, nullptr); + + // Create system tables. + auto exec_sql = [&](const char* sql, const char* desc) -> bool { + char* err = nullptr; + if (sqlite3_exec(db_, sql, nullptr, nullptr, &err) != SQLITE_OK) { + db_start_error_ = err ? err : desc; + sqlite3_free(err); + return false; + } + return true; + }; + + if (!exec_sql("CREATE TABLE IF NOT EXISTS tool_names (" + "tool_id INTEGER PRIMARY KEY, name TEXT)", + "tool_names") + || !exec_sql("CREATE TABLE IF NOT EXISTS table_list (" + "tool_id INTEGER, message_id INTEGER, table_name TEXT," + " column_types TEXT, column_names TEXT," + " PRIMARY KEY(tool_id, message_id))", + "table_list") + || !exec_sql("CREATE TABLE IF NOT EXISTS metadata (" + "tool_id INTEGER, key TEXT, value TEXT)", + "metadata")) { + sqlite3_close(db_); + db_ = nullptr; + { + std::lock_guard lock(startup_mutex_); + startup_done_ = true; + } + startup_cv_.notify_one(); + log_db_running_ = false; + return false; + } + + // Populate tool_names table. + for (int i = 0; i < ToolId::SIZE; ++i) { + std::string insert + = fmt::format("INSERT OR REPLACE INTO tool_names VALUES ({}, '{}')", + i, + tool_names_[i]); + char* err = nullptr; + if (sqlite3_exec(db_, insert.c_str(), nullptr, nullptr, &err) + != SQLITE_OK) { + db_start_error_ = err ? err : "tool_names insert failed"; + sqlite3_free(err); + sqlite3_close(db_); + db_ = nullptr; + { + std::lock_guard lock(startup_mutex_); + startup_done_ = true; + } + startup_cv_.notify_one(); + log_db_running_ = false; + return false; + } + } + + // Signal to startLogDb that the backend is ready. 
+ // IMPORTANT: Any critical faults/checks added in the future MUST be + // calculated before this. The rest of the code assumes all is well + // after this point. + db_ready_ = true; + { + std::lock_guard lock(startup_mutex_); + startup_done_ = true; + } + startup_cv_.notify_one(); + return true; +} + +// DB Log Main loop + +void Logger::logDbMainLoop( + std::unordered_map, + SchemaKeyHasher>& local_registry) +{ + // Spin until something else lowers the running flag. + // Uses a fairly simple round robin + mem pressure scheduling mechanism. + while (log_db_running_.load(std::memory_order_acquire)) { + bool did_work = false; + + // --- Step 1: Drain all pending CreateTableCommand entries --- + // These have the highest priority because callers are blocked on them + // and we cannot do anything else without a registration. + { + std::deque pending_ct; + { + std::lock_guard lock(create_table_mutex_); + pending_ct.swap(create_table_queue_); + } + for (auto& cmd : pending_ct) { + try { + SchemaInfo info = logToDbBuildSchemaInfo( + db_, cmd.key, cmd.table_name, cmd.header, cmd.types); + cmd.result_promise.set_value(std::move(info)); + } catch (...) { + // Propagate any exception (e.g. from this->error()) to the + // caller that is blocked on future.get(). + cmd.result_promise.set_exception(std::current_exception()); + } + did_work = true; + } + } + + // --- Step 2: Drain all pending NewSchemaCommand entries --- + { + std::deque pending_commands; + { + std::lock_guard lock(new_schema_queue_mutex_); + pending_commands.swap(new_schema_queue_); + } + + for (auto& cmd : pending_commands) { + if (local_registry.find(cmd.key) != local_registry.end()) { + // Schema re-registration (e.g., after disable/enable): + // drain any remaining data in the old queue, then replace. 
+ local_registry[cmd.key]->drain_to_db(db_, SIZE_MAX); + } + local_registry[cmd.key] = std::move(cmd.queue); + } + did_work = did_work || !pending_commands.empty(); + } + + // --- Step 3: Drain metadata queue --- + if (drainMetadataQueue()) { + did_work = true; + } + + // --- Step 4: Schedule and drain data queues --- + size_t total_mem = 0; + for (auto& entry : local_registry) { + total_mem += entry.second->approx_size() * entry.second->row_size_bytes(); + } + + bool drained_some = false; + + // Global pressure: fully drain the largest queue. + bool global_pressure = false; + if (db_log_global_max_mem_ > 0) { + const size_t global_limit = static_cast( + db_log_global_max_mem_ * k_queue_mem_high_threshold); + global_pressure = (total_mem >= global_limit); + } + + if (global_pressure) { + AbstractQueue* largest_q = nullptr; + size_t largest_bytes = 0; + for (auto& entry : local_registry) { + const size_t bytes + = entry.second->approx_size() * entry.second->row_size_bytes(); + if (bytes > largest_bytes) { + largest_bytes = bytes; + largest_q = entry.second.get(); + } + } + if (largest_q) { + drained_some |= (largest_q->drain_to_db(db_, SIZE_MAX) > 0); + } + did_work |= drained_some; + } else { + // Per-channel pressure: drain enough to get below 80%. + for (auto& entry : local_registry) { + if (db_log_per_channel_max_mem_ == 0) { + continue; + } + + auto& q = entry.second; + const size_t channel_bytes = q->approx_size() * q->row_size_bytes(); + const size_t channel_limit = static_cast( + db_log_per_channel_max_mem_ * k_queue_mem_high_threshold); + + if (channel_bytes >= channel_limit) { + const size_t bytes_to_clear = channel_bytes - channel_limit + 1; + const size_t row_size = q->row_size_bytes(); + const size_t rows_to_clear + = (bytes_to_clear + row_size - 1) / row_size; + + drained_some |= (q->drain_to_db(db_, rows_to_clear) > 0); + } + } + + // Common case round robin. Skip this if under pressure to keep + // spinning this loop faster. 
+ if (!drained_some) { + for (auto& entry : local_registry) { + drained_some |= (entry.second->drain_to_db(db_, SIZE_MAX) > 0); + } + } + did_work |= drained_some; + } + + if (!did_work) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } +} + +//DB Log Shutdown +void Logger::logDbShutdown( + std::unordered_map, + SchemaKeyHasher>& local_registry) +{ + // Drain helper: process all remaining commands / data until nothing + // remains. + auto drain_all = [&]() -> bool { + bool work = false; + + // Process any remaining CreateTable commands. + { + std::deque pending_ct; + { + std::lock_guard lock(create_table_mutex_); + pending_ct.swap(create_table_queue_); + } + for (auto& cmd : pending_ct) { + try { + SchemaInfo info = logToDbBuildSchemaInfo( + db_, cmd.key, cmd.table_name, cmd.header, cmd.types); + cmd.result_promise.set_value(std::move(info)); + } catch (...) { + cmd.result_promise.set_exception(std::current_exception()); + } + work = true; + } + } + + // Process any remaining NewSchema commands. + { + std::deque pending_commands; + { + std::lock_guard lock(new_schema_queue_mutex_); + pending_commands.swap(new_schema_queue_); + } + for (auto& cmd : pending_commands) { + if (local_registry.find(cmd.key) != local_registry.end()) { + // Schema re-registration: drain old queue then replace. + local_registry[cmd.key]->drain_to_db(db_, SIZE_MAX); + } + local_registry[cmd.key] = std::move(cmd.queue); + work = true; + } + } + // Drain all queues entirely. + work |= drainMetadataQueue(); + + for (auto& [key, q] : local_registry) { + work |= (q->drain_to_db(db_, SIZE_MAX) > 0); + } + + return work; + }; + + while (drain_all()) { + } + + // Destroy queues before closing so TypedQueue destructors can finalize + // their prepared statements. + local_registry.clear(); + + // Checkpoint the WAL into the main database before closing. + // Without this, data in the WAL may not be fully integrated into the + // main DB file. 
TRUNCATE mode checkpoints all pages and resets the WAL + // file to zero bytes for a clean shutdown. + sqlite3_exec( + db_, "PRAGMA wal_checkpoint(TRUNCATE);", nullptr, nullptr, nullptr); + + sqlite3_close(db_); + db_ = nullptr; + db_ready_ = false; +} + ToolId Logger::findToolId(const char* tool_name) { + const std::string_view name(tool_name); int tool_id = 0; for (const char* tool : tool_names_) { - if (strcmp(tool_name, tool) == 0) { + if (name == tool) { return static_cast(tool_id); } tool_id++; @@ -410,4 +984,103 @@ std::unique_ptr Logger::swapProgress(Progress* progress) return current_progress; } +void Logger::setDbLogGlobalMaxMem(size_t bytes) +{ + db_log_global_max_mem_ = bytes; +} + +size_t Logger::getDbLogGlobalMaxMem() const +{ + return db_log_global_max_mem_; +} + +void Logger::setDbLogPerChannelMaxMem(size_t bytes) +{ + db_log_per_channel_max_mem_ = bytes; +} + +size_t Logger::getDbLogPerChannelMaxMem() const +{ + return db_log_per_channel_max_mem_; +} + +std::optional Logger::logToDbMetadata(ToolId tool, + std::string key, + std::string value) +{ + if (!db_ready_) { + return std::nullopt; + } + std::lock_guard lock(metadata_queue_mutex_); + metadata_queue_.emplace(tool, std::move(key), std::move(value)); + return 1; +} + +bool Logger::drainMetadataQueue() +{ + std::queue local_meta; + { + std::lock_guard lock(metadata_queue_mutex_); + local_meta.swap(metadata_queue_); + } + + if (local_meta.empty()) { + return false; + } + + sqlite3_exec(db_, "BEGIN", nullptr, nullptr, nullptr); + bool meta_ok = true; + while (!local_meta.empty()) { + auto [mtool, mkey, mval] = std::move(local_meta.front()); + local_meta.pop(); + + char* sql = sqlite3_mprintf("INSERT INTO metadata VALUES (%d, %Q, %Q)", + static_cast(mtool), + mkey.c_str(), + mval.c_str()); + int rc = sqlite3_exec(db_, sql, nullptr, nullptr, nullptr); + sqlite3_free(sql); + + if (rc != SQLITE_OK) { + this->warn(UTL, + 116, + "Failed to insert metadata row for tool={} key='{}': {}", + 
static_cast(mtool), + mkey, + sqlite3_errmsg(db_)); + meta_ok = false; + break; + } + } + sqlite3_exec(db_, meta_ok ? "COMMIT" : "ROLLBACK", nullptr, nullptr, nullptr); + return true; +} + +bool Logger::isDbLogEnabled(SchemaKey key) const +{ + std::shared_lock lock(db_log_enabled_mutex_); + return db_log_disabled_set_.find(key) == db_log_disabled_set_.end(); +} + +void Logger::setDbLogEnabled(ToolId tool, int id, bool enabled) +{ + this->info(UTL, 119, "Database logging for {}-{} is {}.", + tool_names_[tool], id, enabled ? "enabled" : "disabled"); + SchemaKey key{tool, id}; + std::lock_guard lock(db_log_enabled_mutex_); + if (enabled) { + db_log_disabled_set_.erase(key); + } else { + db_log_disabled_set_.insert(key); + // If already registered, remove it so the next logToDb call + // hits the slow path and re-checks isDbLogEnabled. + schema_registry_.remove_schema(key); + } +} + +bool Logger::getDbLogEnabled(ToolId tool, int id) const +{ + return isDbLogEnabled({tool, id}); +} + } // namespace utl diff --git a/src/utl/src/Logger.i b/src/utl/src/Logger.i index c1f418266ee..9b1dc5277ac 100644 --- a/src/utl/src/Logger.i +++ b/src/utl/src/Logger.i @@ -105,6 +105,56 @@ void startPrometheusEndpoint(uint16_t port) logger->startPrometheusEndpoint(port); } +// --- Database logging controls --- + +void start_log_db(const std::string& filename) +{ + utl::Logger* logger = ord::getLogger(); + logger->startLogDb(filename.c_str()); +} + +void stop_log_db() +{ + utl::Logger* logger = ord::getLogger(); + logger->stopLogDb(); +} + +void set_db_log_global_max_mem(size_t bytes) +{ + utl::Logger* logger = ord::getLogger(); + logger->setDbLogGlobalMaxMem(bytes); +} + +size_t get_db_log_global_max_mem() +{ + utl::Logger* logger = ord::getLogger(); + return logger->getDbLogGlobalMaxMem(); +} + +void set_db_log_per_channel_max_mem(size_t bytes) +{ + utl::Logger* logger = ord::getLogger(); + logger->setDbLogPerChannelMaxMem(bytes); +} + +size_t get_db_log_per_channel_max_mem() +{ + 
utl::Logger* logger = ord::getLogger(); + return logger->getDbLogPerChannelMaxMem(); +} + +void set_db_log_enabled(const std::string& tool, int id, bool enabled) +{ + utl::Logger* logger = ord::getLogger(); + logger->setDbLogEnabled(utl::Logger::findToolId(tool.c_str()), id, enabled); +} + +bool get_db_log_enabled(const std::string& tool, int id) +{ + utl::Logger* logger = ord::getLogger(); + return logger->getDbLogEnabled(utl::Logger::findToolId(tool.c_str()), id); +} + } // namespace %} // inline