Skip to content

Commit e5c77f8

Browse files
author
Grok Compression
committed
network: support ADSL, configurable timout and retry
1 parent 448a871 commit e5c77f8

5 files changed

Lines changed: 47 additions & 3 deletions

File tree

src/lib/core/grok.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,17 @@ typedef struct _grk_stream_params
612612
/* 9 Requester pays (S3) */
613613
char request_payer[64]; /* e.g. "requester" for x-amz-request-payer header */
614614

615+
/* 10 User agent */
616+
char user_agent[256]; /* custom User-Agent header string */
617+
618+
/* 11 Timeouts (seconds, 0 = use default) */
619+
long timeout; /* overall request timeout (CURLOPT_TIMEOUT) */
620+
long connect_timeout; /* connection timeout (CURLOPT_CONNECTTIMEOUT) */
621+
622+
/* 12 Retry configuration (0 = use default) */
623+
uint32_t max_retry; /* maximum number of retries */
624+
uint32_t retry_delay; /* delay between retries in seconds */
625+
615626
} grk_stream_params;
616627

617628
/**

src/lib/core/stream/StreamGenerator.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,26 @@ IStream* StreamGenerator::createCurlFetchStream(void)
151151
auth.proxy_userpwd_ = streamParams_.proxy_userpwd;
152152
if(streamParams_.request_payer[0])
153153
auth.request_payer_ = streamParams_.request_payer;
154+
if(streamParams_.user_agent[0])
155+
auth.user_agent_ = streamParams_.user_agent;
156+
auth.timeout_ = streamParams_.timeout;
157+
auth.connect_timeout_ = streamParams_.connect_timeout;
158+
auth.max_retry_ = streamParams_.max_retry;
159+
auth.retry_delay_ = streamParams_.retry_delay;
154160
grklog.debug("StreamGenerator: s3_allow_insecure: streamParams=%d, auth=%d",
155161
(int)streamParams_.s3_allow_insecure, (int)auth.s3_allow_insecure_);
156162
std::string_view file{streamParams_.file};
157163
bool isS3 = file.starts_with("/vsis3/") || file.starts_with("/vsis3_streaming/");
158164
bool isAZ = file.starts_with("/vsiaz/");
165+
bool isADLS = file.starts_with("/vsiadls/");
159166
bool isGS = file.starts_with("/vsigs/");
160167
CurlFetcher* fetcher;
161168
if(isS3)
162169
fetcher = new S3Fetcher();
163170
else if(isAZ)
164171
fetcher = new AZFetcher();
172+
else if(isADLS)
173+
fetcher = new ADLSFetcher();
165174
else if(isGS)
166175
fetcher = new GSFetcher();
167176
else

src/lib/core/stream/StreamGenerator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ class StreamGenerator
100100
std::string_view file{streamParams_.file};
101101
bool isNetwork = file.starts_with("http://") || file.starts_with("https://") ||
102102
file.starts_with("/vsis3/") || file.starts_with("/vsicurl/") ||
103-
file.starts_with("/vsiaz/") || file.starts_with("/vsigs/");
103+
file.starts_with("/vsiaz/") || file.starts_with("/vsiadls/") ||
104+
file.starts_with("/vsigs/");
104105
if(isNetwork && !streamParams_.read_fn)
105106
return createCurlFetchStream();
106107
}

src/lib/core/stream/fetchers/CurlFetcher.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ class CurlFetcher : public IFetcher
203203
void init(const std::string& path, const FetchAuth& auth) override
204204
{
205205
auth_ = auth;
206+
if(auth_.max_retry_ > 0)
207+
maxRetries_ = auth_.max_retry_;
208+
if(auth_.retry_delay_ > 0)
209+
retryDelayMs_ = auth_.retry_delay_ * 1000;
206210
parse(path);
207211
fetch_total_size();
208212
}
@@ -598,6 +602,14 @@ class CurlFetcher : public IFetcher
598602
if(EnvVarManager::get("GRK_CURL_PROXYAUTH"))
599603
curl_easy_setopt(curl, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
600604
}
605+
606+
// User agent
607+
if(!auth_.user_agent_.empty())
608+
curl_easy_setopt(curl, CURLOPT_USERAGENT, auth_.user_agent_.c_str());
609+
610+
// Timeouts
611+
long connect_timeout = auth_.connect_timeout_ > 0 ? auth_.connect_timeout_ : 10L;
612+
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, connect_timeout);
601613
}
602614

603615
curl_slist* configureHeaders(const std::string& range)
@@ -685,7 +697,6 @@ class CurlFetcher : public IFetcher
685697
throw std::runtime_error("Failed to initialize CURL easy handle");
686698

687699
curl_easy_setopt(curl, CURLOPT_URL, url_.c_str());
688-
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10L);
689700
curl_initiate_retry(curl);
690701
curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L);
691702
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
@@ -860,7 +871,8 @@ class CurlFetcher : public IFetcher
860871

861872
void curl_initiate_retry(CURL* curl)
862873
{
863-
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30L);
874+
long timeout = auth_.timeout_ > 0 ? auth_.timeout_ : 30L;
875+
curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
864876
}
865877

866878
/**

src/lib/core/stream/fetchers/FetchCommon.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,17 @@ struct FetchAuth
6868
// S3 requester pays
6969
std::string request_payer_; // e.g. "requester"
7070

71+
// User agent
72+
std::string user_agent_;
73+
74+
// Timeouts (seconds, 0 = use default)
75+
long timeout_ = 0;
76+
long connect_timeout_ = 0;
77+
78+
// Retry configuration (0 = use default)
79+
uint32_t max_retry_ = 0;
80+
uint32_t retry_delay_ = 0;
81+
7182
FetchAuth() = default;
7283
FetchAuth(const std::string& u, const std::string& p, const std::string& t, const std::string& h,
7384
const std::string& r = "", const std::string& st = "")

0 commit comments

Comments
 (0)