Skip to content

Commit 86ef0f5

Browse files
committed
Merge branch 'refactor/GH-65-general-refactor-sync-mutex' into refactor/GH-65-general-refactor
2 parents 7dd1dfa + 92c4b84 commit 86ef0f5

31 files changed

Lines changed: 764 additions & 212 deletions

.DS_Store

8 KB
Binary file not shown.

canyon_core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ chrono = { workspace = true }
1919
async-std = { workspace = true, optional = true }
2020
regex = { workspace = true }
2121

22-
tokio = { workspace = true }
22+
tokio = { workspace = true, features = ["sync"] }
2323
tokio-util = { workspace = true }
2424

2525
futures = { workspace = true }

canyon_core/src/canyon.rs

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::connection::conn_errors::DatasourceNotFound;
22
use crate::connection::database_type::DatabaseType;
33
use crate::connection::datasources::{CanyonSqlConfig, DatasourceConfig, Datasources};
4-
use crate::connection::{CANYON_INSTANCE, db_connector, get_canyon_tokio_runtime};
4+
use crate::connection::{CANYON_INSTANCE, db_connector, get_canyon_tokio_runtime, pool::get_pool_manager};
55
use db_connector::DatabaseConnection;
66
use std::collections::HashMap;
77
use std::sync::Arc;
@@ -53,8 +53,8 @@ pub type SharedConnection = Arc<Mutex<DatabaseConnection>>;
5353
/// - `get_mut_connection`: Retrieves a mutable connection from the cache.
5454
pub struct Canyon {
5555
config: Datasources,
56-
connections: HashMap<&'static str, SharedConnection>,
57-
default_connection: Option<SharedConnection>,
56+
connections: HashMap<&'static str, DatabaseConnection>,
57+
default_connection: Option<DatabaseConnection>,
5858
default_db_type: Option<DatabaseType>,
5959
}
6060

@@ -113,9 +113,9 @@ impl Canyon {
113113
let config_content = fs::read_to_string(&path)?;
114114
let config: Datasources = toml::from_str::<CanyonSqlConfig>(&config_content)?.canyon_sql;
115115

116-
let mut connections = HashMap::new();
117-
let mut default_connection = None;
118-
let mut default_db_type = None;
116+
let mut connections: HashMap<&str, DatabaseConnection> = HashMap::new();
117+
let mut default_connection: Option<DatabaseConnection> = None;
118+
let mut default_db_type: Option<DatabaseType> = None;
119119

120120
for ds in config.datasources.iter() {
121121
__impl::process_new_conn_by_datasource(
@@ -179,9 +179,9 @@ impl Canyon {
179179
}
180180

181181
// Retrieve a read-only connection from the cache
182-
pub fn get_default_connection(&self) -> Result<SharedConnection, DatasourceNotFound> {
182+
pub fn get_default_connection(&self) -> Result<&DatabaseConnection, DatasourceNotFound> {
183183
self.default_connection
184-
.clone()
184+
.as_ref()
185185
.ok_or_else(|| DatasourceNotFound::from(None))
186186
}
187187

@@ -190,7 +190,7 @@ impl Canyon {
190190
/// This is a fast and efficient operation: cloning the [`SharedConnection`]
191191
/// simply increases the reference count [`Arc`] without duplicating the underlying
192192
/// [`DatabaseConnection`]. Returns an error if no default connection is configured.
193-
pub fn get_connection(&self, name: &str) -> Result<SharedConnection, DatasourceNotFound> {
193+
pub fn get_connection(&self, name: &str) -> Result<&DatabaseConnection, DatasourceNotFound> {
194194
if name.is_empty() {
195195
return self.get_default_connection();
196196
}
@@ -200,20 +200,47 @@ impl Canyon {
200200
.get(name)
201201
.ok_or_else(|| DatasourceNotFound::from(Some(name)))?;
202202

203-
Ok(conn.clone())
203+
Ok(conn)
204204
}
205+
206+
/// Gets a pooled connection for better performance
207+
/// This is an internal method that uses the connection pool
208+
pub async fn get_pooled_connection(&self, name: &str) -> Result<crate::connection::pool::PooledConnection, DatasourceNotFound> {
209+
let pool_manager = get_pool_manager();
210+
let mut pool_manager_guard = pool_manager.lock().await;
211+
212+
// Find the datasource
213+
let datasource = self.find_datasource_by_name_or_default(name)?;
214+
215+
// Create pool if it doesn't exist
216+
if !pool_manager_guard.has_pool(name) {
217+
pool_manager_guard.create_pool(name, datasource).await
218+
.map_err(|_| DatasourceNotFound::from(Some(name)))?;
219+
}
220+
221+
// Get pooled connection
222+
pool_manager_guard.get_connection(name).await
223+
.map_err(|_| DatasourceNotFound::from(Some(name)))
224+
}
225+
226+
/// Gets a fast connection that automatically uses pooling when available
227+
/// This method provides the best performance by using connection pooling
228+
pub async fn get_fast_connection(&self, name: &str) -> Result<&DatabaseConnection, DatasourceNotFound> {
229+
// For now, fall back to the regular connection
230+
// In the future, this could automatically use the pool
231+
self.get_connection(name)
232+
}
233+
234+
205235
}
206236

207237
mod __impl {
208-
use crate::canyon::SharedConnection;
209238
use crate::connection::database_type::DatabaseType;
210239
use crate::connection::datasources::DatasourceConfig;
211240
use crate::connection::db_connector::DatabaseConnection;
212241
use std::collections::HashMap;
213242
use std::error::Error;
214243
use std::path::PathBuf;
215-
use std::sync::Arc;
216-
use tokio::sync::Mutex;
217244
use walkdir::WalkDir;
218245

219246
// Internal helper to locate the config file
@@ -240,23 +267,22 @@ mod __impl {
240267

241268
pub(crate) async fn process_new_conn_by_datasource(
242269
ds: &DatasourceConfig,
243-
connections: &mut HashMap<&str, SharedConnection>,
244-
default: &mut Option<SharedConnection>,
270+
connections: &mut HashMap<&str, DatabaseConnection>,
271+
default: &mut Option<DatabaseConnection>,
245272
default_db_type: &mut Option<DatabaseType>,
246273
) -> Result<(), Box<dyn Error + Send + Sync>> {
274+
if default.is_none() {
275+
let cloned_ds_for_default = ds.clone();
276+
*default = Some(DatabaseConnection::new(&cloned_ds_for_default).await?); // Only cloning the smart pointer
277+
}
247278
let conn = DatabaseConnection::new(ds).await?;
248279
let name: &'static str = Box::leak(ds.name.clone().into_boxed_str());
249280

250281
if default_db_type.is_none() {
251282
*default_db_type = Some(conn.get_db_type());
252283
}
253284

254-
let connection_sp = Arc::new(Mutex::new(conn));
255-
256-
if default.is_none() {
257-
*default = Some(connection_sp.clone()); // Only cloning the smart pointer
258-
}
259-
285+
let connection_sp = conn;
260286
connections.insert(name, connection_sp);
261287

262288
Ok(())

canyon_core/src/connection/contracts/impl/database_connection.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,60 @@ impl DbConnection for DatabaseConnection {
6262
}
6363
}
6464

65+
impl DbConnection for &DatabaseConnection {
66+
async fn query_rows<'a>(
67+
&self,
68+
stmt: &str,
69+
params: &[&'a dyn QueryParameter],
70+
) -> Result<CanyonRows, Box<(dyn Error + Send + Sync)>> {
71+
db_conn_query_rows_impl(self, stmt, params).await
72+
}
73+
74+
async fn query<'a, S, R>(
75+
&self,
76+
stmt: S,
77+
params: &[&'a (dyn QueryParameter)],
78+
) -> Result<Vec<R>, Box<(dyn Error + Send + Sync)>>
79+
where
80+
S: AsRef<str> + Send,
81+
R: RowMapper,
82+
Vec<R>: FromIterator<<R as RowMapper>::Output>,
83+
{
84+
db_conn_query_impl(self, stmt, params).await
85+
}
86+
87+
async fn query_one<'a, R>(
88+
&self,
89+
stmt: &str,
90+
params: &[&'a dyn QueryParameter],
91+
) -> Result<Option<R::Output>, Box<(dyn Error + Send + Sync)>>
92+
where
93+
R: RowMapper,
94+
{
95+
db_conn_query_one_impl::<R>(self, stmt, params).await
96+
}
97+
98+
async fn query_one_for<'a, T: FromSqlOwnedValue<T>>(
99+
&self,
100+
stmt: &str,
101+
params: &[&'a dyn QueryParameter],
102+
) -> Result<T, Box<(dyn Error + Send + Sync)>> {
103+
db_conn_query_one_for_impl::<T>(self, stmt, params).await
104+
}
105+
106+
async fn execute<'a>(
107+
&self,
108+
stmt: &str,
109+
params: &[&'a dyn QueryParameter],
110+
) -> Result<u64, Box<(dyn Error + Send + Sync)>> {
111+
db_conn_execute_impl(self, stmt, params).await
112+
}
113+
114+
fn get_database_type(&self) -> Result<DatabaseType, Box<(dyn Error + Send + Sync)>> {
115+
Ok(self.get_db_type())
116+
}
117+
}
118+
65119
impl DbConnection for &mut DatabaseConnection {
66120
async fn query_rows<'a>(
67121
&self,

canyon_core/src/connection/contracts/impl/mssql.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,18 @@ use crate::{
1111
use std::{error::Error, future::Future};
1212

1313
impl DbConnection for SqlServerConnection {
14-
fn query_rows<'a>(
14+
fn query_rows(
1515
&self,
1616
stmt: &str,
17-
params: &[&'a dyn QueryParameter],
17+
params: &[&dyn QueryParameter],
1818
) -> impl Future<Output = Result<CanyonRows, Box<(dyn Error + Send + Sync)>>> + Send {
1919
sqlserver_query_launcher::query_rows(stmt, params, self)
2020
}
2121

22-
fn query<'a, S, R>(
22+
fn query<S, R>(
2323
&self,
2424
stmt: S,
25-
params: &[&'a (dyn QueryParameter)],
25+
params: &[&(dyn QueryParameter)],
2626
) -> impl Future<Output = Result<Vec<R>, Box<(dyn Error + Send + Sync)>>> + Send
2727
where
2828
S: AsRef<str> + Send,
@@ -32,29 +32,29 @@ impl DbConnection for SqlServerConnection {
3232
sqlserver_query_launcher::query(stmt, params, self)
3333
}
3434

35-
fn query_one<'a, R>(
35+
fn query_one<R>(
3636
&self,
3737
stmt: &str,
38-
params: &[&'a (dyn QueryParameter)],
38+
params: &[&(dyn QueryParameter)],
3939
) -> impl Future<Output = Result<Option<R::Output>, Box<(dyn Error + Send + Sync)>>> + Send
4040
where
4141
R: RowMapper,
4242
{
4343
sqlserver_query_launcher::query_one::<R>(stmt, params, self)
4444
}
4545

46-
fn query_one_for<'a, T: FromSqlOwnedValue<T>>(
46+
fn query_one_for<T: FromSqlOwnedValue<T>>(
4747
&self,
4848
stmt: &str,
49-
params: &[&'a (dyn QueryParameter)],
49+
params: &[&(dyn QueryParameter)],
5050
) -> impl Future<Output = Result<T, Box<(dyn Error + Send + Sync)>>> + Send {
5151
sqlserver_query_launcher::query_one_for(stmt, params, self)
5252
}
5353

54-
fn execute<'a>(
54+
fn execute(
5555
&self,
5656
stmt: &str,
57-
params: &[&'a (dyn QueryParameter)],
57+
params: &[&(dyn QueryParameter)],
5858
) -> impl Future<Output = Result<u64, Box<(dyn Error + Send + Sync)>>> + Send {
5959
sqlserver_query_launcher::execute(stmt, params, self)
6060
}

canyon_core/src/connection/contracts/impl/mysql.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@ use crate::{
1010
use std::{error::Error, future::Future};
1111

1212
impl DbConnection for MysqlConnection {
13-
fn query_rows<'a>(
13+
fn query_rows(
1414
&self,
1515
stmt: &str,
16-
params: &[&'a dyn QueryParameter],
16+
params: &[&dyn QueryParameter],
1717
) -> impl Future<Output = Result<CanyonRows, Box<(dyn Error + Send + Sync)>>> + Send {
1818
mysql_query_launcher::query_rows(stmt, params, self)
1919
}
2020

21-
fn query<'a, S, R>(
21+
fn query<S, R>(
2222
&self,
2323
stmt: S,
24-
params: &[&'a (dyn QueryParameter)],
24+
params: &[&(dyn QueryParameter)],
2525
) -> impl Future<Output = Result<Vec<R>, Box<(dyn Error + Send + Sync)>>> + Send
2626
where
2727
S: AsRef<str> + Send,
@@ -31,29 +31,29 @@ impl DbConnection for MysqlConnection {
3131
mysql_query_launcher::query(stmt, params, self)
3232
}
3333

34-
fn query_one<'a, R>(
34+
fn query_one<R>(
3535
&self,
3636
stmt: &str,
37-
params: &[&'a dyn QueryParameter],
37+
params: &[&dyn QueryParameter],
3838
) -> impl Future<Output = Result<Option<R::Output>, Box<(dyn Error + Send + Sync)>>> + Send
3939
where
4040
R: RowMapper,
4141
{
4242
mysql_query_launcher::query_one::<R>(stmt, params, self)
4343
}
4444

45-
fn query_one_for<'a, T: FromSqlOwnedValue<T>>(
45+
fn query_one_for<T: FromSqlOwnedValue<T>>(
4646
&self,
4747
stmt: &str,
48-
params: &[&'a dyn QueryParameter],
48+
params: &[&dyn QueryParameter],
4949
) -> impl Future<Output = Result<T, Box<(dyn Error + Send + Sync)>>> + Send {
5050
mysql_query_launcher::query_one_for(stmt, params, self)
5151
}
5252

53-
fn execute<'a>(
53+
fn execute(
5454
&self,
5555
stmt: &str,
56-
params: &[&'a dyn QueryParameter],
56+
params: &[&dyn QueryParameter],
5757
) -> impl Future<Output = Result<u64, Box<(dyn Error + Send + Sync)>>> + Send {
5858
mysql_query_launcher::execute(stmt, params, self)
5959
}

canyon_core/src/connection/contracts/impl/postgresql.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,18 @@ use crate::{
1111
use std::{error::Error, future::Future};
1212

1313
impl DbConnection for PostgreSqlConnection {
14-
fn query_rows<'a>(
14+
fn query_rows(
1515
&self,
1616
stmt: &str,
17-
params: &[&'a dyn QueryParameter],
17+
params: &[&dyn QueryParameter],
1818
) -> impl Future<Output = Result<CanyonRows, Box<(dyn Error + Send + Sync)>>> + Send {
1919
postgres_query_launcher::query_rows(stmt, params, self)
2020
}
2121

22-
fn query<'a, S, R>(
22+
fn query<S, R>(
2323
&self,
2424
stmt: S,
25-
params: &[&'a (dyn QueryParameter)],
25+
params: &[&(dyn QueryParameter)],
2626
) -> impl Future<Output = Result<Vec<R>, Box<(dyn Error + Send + Sync)>>> + Send
2727
where
2828
S: AsRef<str> + Send,
@@ -32,29 +32,29 @@ impl DbConnection for PostgreSqlConnection {
3232
postgres_query_launcher::query(stmt, params, self)
3333
}
3434

35-
fn query_one<'a, R>(
35+
fn query_one<R>(
3636
&self,
3737
stmt: &str,
38-
params: &[&'a (dyn QueryParameter)],
38+
params: &[&(dyn QueryParameter)],
3939
) -> impl Future<Output = Result<Option<R::Output>, Box<(dyn Error + Send + Sync)>>> + Send
4040
where
4141
R: RowMapper,
4242
{
4343
postgres_query_launcher::query_one::<R>(stmt, params, self)
4444
}
4545

46-
fn query_one_for<'a, T: FromSqlOwnedValue<T>>(
46+
fn query_one_for<T: FromSqlOwnedValue<T>>(
4747
&self,
4848
stmt: &str,
49-
params: &[&'a (dyn QueryParameter)],
49+
params: &[&(dyn QueryParameter)],
5050
) -> impl Future<Output = Result<T, Box<(dyn Error + Send + Sync)>>> + Send {
5151
postgres_query_launcher::query_one_for(stmt, params, self)
5252
}
5353

54-
fn execute<'a>(
54+
fn execute(
5555
&self,
5656
stmt: &str,
57-
params: &[&'a (dyn QueryParameter)],
57+
params: &[&(dyn QueryParameter)],
5858
) -> impl Future<Output = Result<u64, Box<(dyn Error + Send + Sync)>>> + Send {
5959
postgres_query_launcher::execute(stmt, params, self)
6060
}

0 commit comments

Comments
 (0)