Skip to content

Commit 002647a

Browse files
committed
Connection pools support keepalive through a proxyserver
1 parent d8392a2 commit 002647a

3 files changed

Lines changed: 161 additions & 64 deletions

File tree

src/Connections.jl

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ using Sockets, LoggingExtras, NetworkOptions
2525
using MbedTLS: SSLConfig, SSLContext, setup!, associate!, hostname!, handshake!
2626
using MbedTLS, OpenSSL, ConcurrentUtilities
2727
using ..IOExtras, ..Conditions, ..Exceptions
28+
using URIs: URI
2829

2930
const nolimit = typemax(Int)
3031

@@ -73,6 +74,7 @@ Fields:
7374
mutable struct Connection{IO_t <: IO} <: IO
7475
host::String
7576
port::String
77+
proxy::Union{Nothing,String}
7678
idle_timeout::Int
7779
require_ssl_verification::Bool
7880
keepalive::Bool
@@ -98,20 +100,21 @@ request parameters of host and port, and if ssl verification is required, if kee
98100
and if an existing Connection was already created with the exact.
99101
same parameters, we can re-use it (as long as it's not already being used, obviously).
100102
"""
101-
connectionkey(x::Connection) = (x.host, x.port, x.require_ssl_verification, x.keepalive, x.clientconnection)
103+
connectionkey(x::Connection) = (x.host, x.port, x.proxy, x.require_ssl_verification, x.keepalive, x.clientconnection)
102104

103-
const ConnectionKeyType = Tuple{AbstractString, AbstractString, Bool, Bool, Bool}
105+
const ConnectionKeyType = Tuple{AbstractString, AbstractString, Union{Nothing,AbstractString}, Bool, Bool, Bool}
104106

105-
Connection(host::AbstractString, port::AbstractString,
107+
Connection(host::AbstractString, port::AbstractString, proxy::Union{Nothing,AbstractString},
106108
idle_timeout::Int,
107109
require_ssl_verification::Bool, keepalive::Bool, io::T, client=true) where {T}=
108-
Connection{T}(host, port, idle_timeout,
110+
Connection{T}(host, port, proxy, idle_timeout,
109111
require_ssl_verification, keepalive,
110112
safe_getpeername(io)..., localport(io),
111-
io, client, PipeBuffer(), time(), false, false, IOBuffer(), nothing)
113+
io, client,
114+
PipeBuffer(), time(), false, false, IOBuffer(), nothing)
112115

113116
Connection(io; require_ssl_verification::Bool=true, keepalive::Bool=true) =
114-
Connection("", "", 0, require_ssl_verification, keepalive, io, false)
117+
Connection("", "", nothing, 0, require_ssl_verification, keepalive, io, false)
115118

116119
getrawstream(c::Connection) = c.io
117120

@@ -432,30 +435,59 @@ end
432435
Find a reusable `Connection` in the `pool`,
433436
or create a new `Connection` if required.
434437
"""
435-
function newconnection(::Type{T},
436-
host::AbstractString,
437-
port::AbstractString;
438+
function newconnection(wrapconnection::Function,
439+
url::URI;
440+
proxy::Union{Nothing, AbstractString}=nothing,
441+
socket_type::Type,
442+
socket_type_tls::Type,
438443
pool::Union{Nothing, Pool}=nothing,
439444
connection_limit=nothing,
440445
forcenew::Bool=false,
441446
idle_timeout=typemax(Int),
442-
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
447+
require_ssl_verification::Bool=NetworkOptions.verify_host(url.host, "SSL"),
443448
keepalive::Bool=true,
444-
kw...) where {T <: IO}
449+
kw...)
450+
IOType = sockettype(url, socket_type, socket_type_tls)
451+
445452
connection_limit_warning(connection_limit)
446-
return acquire(
447-
getpool(pool, T),
448-
(host, port, require_ssl_verification, keepalive, true);
453+
454+
key = (url.host, url.port, proxy, require_ssl_verification, keepalive, true)
455+
456+
acquire(
457+
getpool(pool, IOType),
458+
key;
449459
forcenew=forcenew,
450460
isvalid=c->connection_isvalid(c, Int(idle_timeout))) do
451-
Connection(host, port,
461+
462+
if proxy !== nothing
463+
url = URI(proxy)
464+
end
465+
466+
innerIOType = sockettype(url, socket_type, socket_type_tls)
467+
468+
io = Connection(url.host, url.port, proxy,
452469
idle_timeout, require_ssl_verification, keepalive,
453-
getconnection(T, host, port;
454-
require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
470+
getconnection(innerIOType, url.host, url.port;
471+
require_ssl_verification, keepalive, kw...)
455472
)
473+
474+
try
475+
io = wrapconnection(io)
476+
catch ex
477+
@try Base.IOError close(io)
478+
rethrow(ex)
479+
end
480+
481+
if connectionkey(io) != key
482+
throw(ErrorException(string("Connection error ", (;expected = connectionkey(io), key))))
483+
end
484+
485+
io
456486
end
457487
end
458488

489+
sockettype(url::URI, tcp, tls) = url.scheme in ("wss", "https") ? tls : tcp
490+
459491
function releaseconnection(c::Connection{T}, reuse; pool::Union{Nothing, Pool}=nothing, kw...) where {T}
460492
c.timestamp = time()
461493
release(getpool(pool, T), connectionkey(c), reuse ? c : nothing)
@@ -615,7 +647,6 @@ end
615647

616648
function sslupgrade(::Type{IOType}, c::Connection{T},
617649
host::AbstractString;
618-
pool::Union{Nothing, Pool}=nothing,
619650
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
620651
keepalive::Bool=true,
621652
readtimeout::Int=0,
@@ -630,12 +661,9 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
630661
else
631662
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
632663
end
664+
633665
# success, now we turn it into a new Connection
634-
conn = Connection(host, "", 0, require_ssl_verification, keepalive, tls)
635-
# release the "old" one, but don't return the connection since we're hijacking the socket
636-
release(getpool(pool, T), connectionkey(c))
637-
# and return the new one
638-
return acquire(() -> conn, getpool(pool, IOType), connectionkey(conn); forcenew=true)
666+
Connection(host,"", "", 0, require_ssl_verification, keepalive, tls)
639667
end
640668

641669
function Base.show(io::IO, c::Connection)

src/clientlayers/ConnectionRequest.jl

Lines changed: 58 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -57,27 +57,37 @@ Otherwise leave it open so that it can be reused.
5757
function connectionlayer(handler)
5858
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, logtag=nothing, kw...)
5959
local io, stream
60+
url = req.url
6061
if proxy !== nothing
61-
target_url = req.url
62-
url = URI(proxy)
63-
if target_url.scheme == "http"
64-
req.target = string(target_url)
65-
end
66-
6762
userinfo = unescapeuri(url.userinfo)
6863
if !isempty(userinfo) && !hasheader(req.headers, "Proxy-Authorization")
6964
@debugv 1 "Adding Proxy-Authorization: Basic header."
7065
setheader(req.headers, "Proxy-Authorization" => "Basic $(base64encode(userinfo))")
7166
end
72-
else
73-
url = target_url = req.url
67+
68+
proxyauth = header(req, "Proxy-Authorization")
69+
if url.scheme in ("https", "wss", "ws")
70+
req.headers = filter(x->x.first != "Proxy-Authorization", req.headers)
71+
end
72+
73+
if url.scheme == "http"
74+
req.target = string(url)
75+
end
7476
end
7577

76-
IOType = sockettype(url, socket_type, socket_type_tls)
7778
start_time = time()
7879
try
79-
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
80+
io = newconnection(url; proxy, socket_type, socket_type_tls, readtimeout, kw...) do io
81+
if isnothing(proxy)
82+
io
83+
else
84+
proxy_tunnel(io, url; proxyauth, proxy, socket_type_tls, readtimeout, kw...)
85+
end
86+
end
8087
catch e
88+
if e isa StatusError && !get(kw, :status_exception, true)
89+
return e.response
90+
end
8191
if logerrors
8292
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
8393
@error err type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag
@@ -88,33 +98,8 @@ function connectionlayer(handler)
8898
req.context[:connect_duration_ms] = get(req.context, :connect_duration_ms, 0.0) + (time() - start_time) * 1000
8999
end
90100

91-
shouldreuse = !(target_url.scheme in ("ws", "wss"))
101+
shouldreuse = !(url.scheme in ("ws", "wss"))
92102
try
93-
if proxy !== nothing && target_url.scheme in ("https", "wss", "ws")
94-
shouldreuse = false
95-
# tunnel request
96-
if target_url.scheme in ("https", "wss")
97-
target_url = URI(target_url, port=443)
98-
elseif target_url.scheme in ("ws", ) && target_url.port == ""
99-
target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail
100-
end
101-
r = if readtimeout > 0
102-
try_with_timeout(readtimeout) do _
103-
connect_tunnel(io, target_url, req)
104-
end
105-
else
106-
connect_tunnel(io, target_url, req)
107-
end
108-
if r.status != 200
109-
close(io)
110-
return r
111-
end
112-
if target_url.scheme in ("https", "wss")
113-
io = Connections.sslupgrade(socket_type_tls, io, target_url.host; readtimeout=readtimeout, kw...)
114-
end
115-
req.headers = filter(x->x.first != "Proxy-Authorization", req.headers)
116-
end
117-
118103
stream = Stream(req.response, io)
119104
return handler(stream; readtimeout=readtimeout, logerrors=logerrors, logtag=logtag, kw...)
120105
catch e
@@ -151,13 +136,42 @@ function connectionlayer(handler)
151136
end
152137
end
153138

154-
sockettype(url::URI, tcp, tls) = url.scheme in ("wss", "https") ? tls : tcp
139+
function proxy_tunnel(io, url; proxyauth, socket_type_tls, readtimeout, proxy, kw...)
140+
if url.scheme in ("https", "wss", "ws")
141+
# tunnel request
142+
143+
if (port = url.port; isempty(port))
144+
# if there is no port info, connect_tunnel will fail
145+
# The pool entry still has port = ""
146+
if url.scheme in ("https", "wss")
147+
port = 443
148+
elseif scheme in ("ws", )
149+
port = 80
150+
end
151+
end
155152

156-
function connect_tunnel(io, target_url, req)
157-
target = "$(URIs.hoststring(target_url.host)):$(target_url.port)"
153+
if readtimeout > 0
154+
try_with_timeout(readtimeout) do _
155+
connect_tunnel(io, url.host, port, proxyauth)
156+
end
157+
else
158+
connect_tunnel(io, url.host, port, proxyauth)
159+
end
160+
161+
if url.scheme in ("https", "wss")
162+
io = Connections.sslupgrade(socket_type_tls, io, url.host; readtimeout, kw...)
163+
io.port = url.port
164+
io.proxy = proxy
165+
end
166+
end
167+
io
168+
end
169+
170+
function connect_tunnel(io, host, port, auth)
171+
target = "$(URIs.hoststring(host)):$(port)"
158172
@debugv 1 "📡 CONNECT HTTPS tunnel to $target"
159173
headers = Dict("Host" => target)
160-
if (auth = header(req, "Proxy-Authorization"); !isempty(auth))
174+
if !isempty(auth)
161175
headers["Proxy-Authorization"] = auth
162176
end
163177
request = Request("CONNECT", target, headers)
@@ -166,7 +180,10 @@ function connect_tunnel(io, target_url, req)
166180
# @debugv 2 "connect_tunnel: reading headers"
167181
readheaders(io, request.response)
168182
# @debugv 2 "connect_tunnel: done reading headers"
169-
return request.response
183+
if request.response.status != 200
184+
throw(StatusError(request.response.status,
185+
request.method, request.target, request.response))
186+
end
170187
end
171188

172189
end # module ConnectionRequest

test/client.jl

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,58 @@ end
553553
end
554554
end
555555

556+
@testset "HTTP CONNECT Proxy pool" begin
557+
# Stores the http request passed by the client
558+
messages = []
559+
streams = Set()
560+
561+
function forwardstream(src, dst)
562+
while isopen(dst) && isopen(src) && !eof(src)
563+
buff = readavailable(src)
564+
!isempty(buff) && isopen(dst) && write(dst, buff)
565+
end
566+
567+
close(src)
568+
close(dst)
569+
end
570+
571+
# Simple implementation of a proxy server
572+
proxy = HTTP.listen!(IPv4(0), 8082; stream = true) do http::HTTP.Stream
573+
push!(messages, http.message)
574+
host, port = split(http.message.target, ":")
575+
targetstream = connect(host, parse(Int, port))
576+
HTTP.setstatus(http, 200)
577+
HTTP.startwrite(http)
578+
up = @async forwardstream(http.stream.io, targetstream)
579+
down = @async forwardstream(targetstream, http.stream.io)
580+
push!(streams, targetstream)
581+
wait(up)
582+
wait(down)
583+
delete!(streams, targetstream)
584+
end
585+
586+
try
587+
# Make the HTTP request
588+
r1 = HTTP.get("https://example.com:443"; proxy="http://localhost:8082", retry=false, status_exception=true)
589+
@test length(messages) == 1
590+
@test first(messages).method == "CONNECT"
591+
@test length(streams) == 1 && isopen(first(streams)) # still alive
592+
593+
# Make another request
594+
# This should reuse the connection pool and not make another request to the proxy
595+
empty!(messages)
596+
r2 = HTTP.get("https://example.com:443"; proxy="http://localhost:8082", retry=false, status_exception=true)
597+
@test isempty(messages)
598+
@test r1.body == r2.body # no new message to the proxy yet successfully get the same response from the target server
599+
@test length(streams) == 1 && isopen(first(streams)) # still only one stream alive
600+
finally
601+
close.(streams)
602+
close(proxy)
603+
HTTP.Connections.closeall()
604+
wait(proxy)
605+
end
606+
end
607+
556608
@testset "Retry with request/response body streams" begin
557609
shouldfail = Ref(true)
558610
status = Ref(200)

0 commit comments

Comments
 (0)