1+ /*
2+ * ====================================================================
3+ * Licensed to the Apache Software Foundation (ASF) under one
4+ * or more contributor license agreements. See the NOTICE file
5+ * distributed with this work for additional information
6+ * regarding copyright ownership. The ASF licenses this file
7+ * to you under the Apache License, Version 2.0 (the
8+ * "License"); you may not use this file except in compliance
9+ * with the License. You may obtain a copy of the License at
10+ *
11+ * http://www.apache.org/licenses/LICENSE-2.0
12+ *
13+ * Unless required by applicable law or agreed to in writing,
14+ * software distributed under the License is distributed on an
15+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+ * KIND, either express or implied. See the License for the
17+ * specific language governing permissions and limitations
18+ * under the License.
19+ * ====================================================================
20+ *
21+ * This software consists of voluntary contributions made by many
22+ * individuals on behalf of the Apache Software Foundation. For more
23+ * information on the Apache Software Foundation, please see
24+ * <http://www.apache.org/>.
25+ *
26+ */
27+ package org .apache .hc .core5 .http2 .nio .support ;
28+
29+ import java .io .IOException ;
30+ import java .nio .ByteBuffer ;
31+ import java .util .List ;
32+ import java .util .concurrent .atomic .AtomicBoolean ;
33+
34+ import org .apache .hc .core5 .concurrent .FutureCallback ;
35+ import org .apache .hc .core5 .http .ConnectionClosedException ;
36+ import org .apache .hc .core5 .http .EntityDetails ;
37+ import org .apache .hc .core5 .http .Header ;
38+ import org .apache .hc .core5 .http .HttpException ;
39+ import org .apache .hc .core5 .http .HttpRequest ;
40+ import org .apache .hc .core5 .http .HttpResponse ;
41+ import org .apache .hc .core5 .http .Method ;
42+ import org .apache .hc .core5 .http .StreamControl ;
43+ import org .apache .hc .core5 .http .impl .BasicEntityDetails ;
44+ import org .apache .hc .core5 .http .message .BasicHttpRequest ;
45+ import org .apache .hc .core5 .http .message .StatusLine ;
46+ import org .apache .hc .core5 .http .nio .AsyncClientExchangeHandler ;
47+ import org .apache .hc .core5 .http .nio .CapacityChannel ;
48+ import org .apache .hc .core5 .http .nio .DataStreamChannel ;
49+ import org .apache .hc .core5 .http .nio .RequestChannel ;
50+ import org .apache .hc .core5 .http .nio .ssl .TlsStrategy ;
51+ import org .apache .hc .core5 .http .protocol .HttpContext ;
52+ import org .apache .hc .core5 .io .CloseMode ;
53+ import org .apache .hc .core5 .net .NamedEndpoint ;
54+ import org .apache .hc .core5 .net .URIAuthority ;
55+ import org .apache .hc .core5 .reactor .IOEventHandler ;
56+ import org .apache .hc .core5 .reactor .IOEventHandlerFactory ;
57+ import org .apache .hc .core5 .reactor .IOSession ;
58+ import org .apache .hc .core5 .reactor .ProtocolIOSession ;
59+ import org .apache .hc .core5 .reactor .ssl .TransportSecurityLayer ;
60+ import org .apache .hc .core5 .util .Timeout ;
61+
62+ /**
63+ * Exchange handler that establishes CONNECT and then exposes the tunnel stream as a ProtocolIOSession.
64+ */
65+ final class H2OverH2TunnelExchangeHandler implements AsyncClientExchangeHandler {
66+
67+ private final IOSession physicalSession ;
68+ private final NamedEndpoint targetEndpoint ;
69+ private final Timeout connectTimeout ;
70+ private final boolean secure ;
71+ private final TlsStrategy tlsStrategy ;
72+ private final IOEventHandlerFactory protocolStarter ;
73+ private final FutureCallback <ProtocolIOSession > callback ;
74+
75+ private final AtomicBoolean done ;
76+
77+ private volatile DataStreamChannel dataChannel ;
78+ private volatile CapacityChannel capacityChannel ;
79+ private volatile StreamControl streamControl ;
80+ private volatile H2TunnelProtocolIOSession tunnelSession ;
81+
82+ H2OverH2TunnelExchangeHandler (
83+ final IOSession physicalSession ,
84+ final NamedEndpoint targetEndpoint ,
85+ final Timeout connectTimeout ,
86+ final boolean secure ,
87+ final TlsStrategy tlsStrategy ,
88+ final IOEventHandlerFactory protocolStarter ,
89+ final FutureCallback <ProtocolIOSession > callback ) {
90+ this .physicalSession = physicalSession ;
91+ this .targetEndpoint = targetEndpoint ;
92+ this .connectTimeout = connectTimeout ;
93+ this .secure = secure ;
94+ this .tlsStrategy = tlsStrategy ;
95+ this .protocolStarter = protocolStarter ;
96+ this .callback = callback ;
97+ this .done = new AtomicBoolean (false );
98+ }
99+
100+ void initiated (final StreamControl streamControl ) {
101+ this .streamControl = streamControl ;
102+ final H2TunnelProtocolIOSession tunnel = this .tunnelSession ;
103+ if (tunnel != null ) {
104+ tunnel .bindStreamControl (streamControl );
105+ }
106+ }
107+
108+ @ Override
109+ public void releaseResources () {
110+ }
111+
112+ @ Override
113+ public void failed (final Exception cause ) {
114+ fail (cause );
115+ }
116+
117+ @ Override
118+ public void cancel () {
119+ fail (new ConnectionClosedException ("Tunnel setup cancelled" ));
120+ }
121+
122+ @ Override
123+ public void produceRequest (final RequestChannel requestChannel , final HttpContext context ) throws HttpException , IOException {
124+ final HttpRequest connectRequest = new BasicHttpRequest (Method .CONNECT .name (), (String ) null );
125+ connectRequest .setAuthority (new URIAuthority (targetEndpoint ));
126+ requestChannel .sendRequest (connectRequest , new BasicEntityDetails (-1 , null ), context );
127+ }
128+
129+ @ Override
130+ public int available () {
131+ final H2TunnelProtocolIOSession tunnel = this .tunnelSession ;
132+ return tunnel != null ? tunnel .available () : 0 ;
133+ }
134+
135+ @ Override
136+ public void produce (final DataStreamChannel channel ) throws IOException {
137+ this .dataChannel = channel ;
138+ final H2TunnelProtocolIOSession tunnel = this .tunnelSession ;
139+ if (tunnel != null ) {
140+ tunnel .attachChannel (channel );
141+ tunnel .onOutputReady ();
142+ }
143+ }
144+
145+ @ Override
146+ public void consumeInformation (final HttpResponse response , final HttpContext context ) {
147+ }
148+
149+ @ Override
150+ public void consumeResponse (
151+ final HttpResponse response ,
152+ final EntityDetails entityDetails ,
153+ final HttpContext context ) throws HttpException , IOException {
154+
155+ final int status = response .getCode ();
156+ if (status < 200 || status >= 300 ) {
157+ throw new HttpException ("Tunnel refused: " + new StatusLine (response ));
158+ }
159+
160+ if (entityDetails == null ) {
161+ throw new HttpException ("CONNECT response does not provide a tunneled data stream" );
162+ }
163+
164+ if (this .tunnelSession != null ) {
165+ return ;
166+ }
167+
168+ final H2TunnelProtocolIOSession tunnel =
169+ new H2TunnelProtocolIOSession (physicalSession , targetEndpoint , connectTimeout , streamControl );
170+
171+ final DataStreamChannel currentChannel = this .dataChannel ;
172+ if (currentChannel != null ) {
173+ tunnel .attachChannel (currentChannel );
174+ }
175+ final CapacityChannel currentCapacity = this .capacityChannel ;
176+ if (currentCapacity != null ) {
177+ tunnel .updateCapacityChannel (currentCapacity );
178+ }
179+ this .tunnelSession = tunnel ;
180+
181+ if (secure ) {
182+ tlsStrategy .upgrade (
183+ tunnel ,
184+ targetEndpoint ,
185+ null ,
186+ connectTimeout ,
187+ new FutureCallback <TransportSecurityLayer >() {
188+
189+ @ Override
190+ public void completed (final TransportSecurityLayer transportSecurityLayer ) {
191+ try {
192+ startProtocol (tunnel );
193+ complete (tunnel );
194+ } catch (final Exception ex ) {
195+ fail (ex );
196+ }
197+ }
198+
199+ @ Override
200+ public void failed (final Exception ex ) {
201+ fail (ex );
202+ }
203+
204+ @ Override
205+ public void cancelled () {
206+ fail (new ConnectionClosedException ("Tunnel TLS upgrade cancelled" ));
207+ }
208+ });
209+ } else {
210+ startProtocol (tunnel );
211+ complete (tunnel );
212+ }
213+ }
214+
215+ private void startProtocol (final H2TunnelProtocolIOSession tunnel ) throws IOException {
216+ if (protocolStarter == null ) {
217+ return ;
218+ }
219+ final IOEventHandler protocolHandler = protocolStarter .createHandler (tunnel , null );
220+ tunnel .upgrade (protocolHandler );
221+ protocolHandler .connected (tunnel );
222+ }
223+
224+ @ Override
225+ public void updateCapacity (final CapacityChannel capacityChannel ) throws IOException {
226+ this .capacityChannel = capacityChannel ;
227+ final H2TunnelProtocolIOSession tunnel = this .tunnelSession ;
228+ if (tunnel != null ) {
229+ tunnel .updateCapacityChannel (capacityChannel );
230+ }
231+ }
232+
233+ @ Override
234+ public void consume (final ByteBuffer src ) throws IOException {
235+ final H2TunnelProtocolIOSession tunnel = this .tunnelSession ;
236+ if (tunnel != null && src != null && src .hasRemaining ()) {
237+ tunnel .onInput (src );
238+ }
239+ }
240+
241+ @ Override
242+ public void streamEnd (final List <? extends Header > trailers ) {
243+ final H2TunnelProtocolIOSession tunnel = this .tunnelSession ;
244+ if (tunnel != null ) {
245+ tunnel .onRemoteStreamEnd ();
246+ } else {
247+ closeTransport (CloseMode .GRACEFUL );
248+ }
249+ if (done .compareAndSet (false , true ) && callback != null ) {
250+ callback .failed (new ConnectionClosedException ("Tunnel stream closed" ));
251+ }
252+ }
253+
254+ private void closeTransport (final CloseMode closeMode ) {
255+ final H2TunnelProtocolIOSession tunnel = this .tunnelSession ;
256+ if (tunnel != null ) {
257+ tunnel .close (closeMode );
258+ return ;
259+ }
260+ final StreamControl currentStreamControl = this .streamControl ;
261+ if (currentStreamControl != null ) {
262+ currentStreamControl .cancel ();
263+ }
264+ }
265+
266+ private void fail (final Exception cause ) {
267+ closeTransport (CloseMode .IMMEDIATE );
268+ if (done .compareAndSet (false , true ) && callback != null ) {
269+ callback .failed (cause );
270+ }
271+ }
272+
273+ private void complete (final H2TunnelProtocolIOSession tunnel ) {
274+ if (done .compareAndSet (false , true ) && callback != null ) {
275+ callback .completed (tunnel );
276+ }
277+ }
278+ }
0 commit comments