Skip to content

Commit 873c491

Browse files
committed
pool: emit firefly onStart marker from RemoteHttpDataTransferProtocol
Move the firefly flow-start marker emission from AbstractMoverProtocolTransferService.MoverTask into RemoteHttpDataTransferProtocol, where the actual HTTP connection's local socket address (correct IP and port) is available. Previously, the start marker was emitted in MoverTask.run() before the HTTP connection was established, using NetworkUtils.getLocalAddress() to derive the local endpoint. This produced the wrong port (0) and could select the wrong interface on multi-homed hosts. Now, RemoteHttpTransferService passes the TransferLifeCycle to RemoteHttpDataTransferProtocol at construction time and sets the Subject via the overridden createMover(). The protocol calls onStart() in doGet() and sendFile() immediately after capturing the local endpoint from HttpInetConnection, which provides the real bound address and port. Signed-off-by: Shawn McKee <smckee@umich.edu>
1 parent 5fe6ec9 commit 873c491

3 files changed

Lines changed: 62 additions & 2 deletions

File tree

modules/dcache/src/main/java/org/dcache/pool/classic/AbstractMoverProtocolTransferService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) {
7373
_transferLifeCycle = transferLifeCycle;
7474
}
7575

76+
protected TransferLifeCycle getTransferLifeCycle() {
77+
return _transferLifeCycle;
78+
}
79+
7680
@Override
7781
public Mover<?> createMover(ReplicaDescriptor handle, PoolIoFileMessage message,
7882
CellPath pathToDoor)

modules/dcache/src/main/java/org/dcache/pool/classic/RemoteHttpTransferService.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.common.base.Splitter;
2121
import diskCacheV111.util.CacheException;
22+
import diskCacheV111.vehicles.PoolIoFileMessage;
2223
import diskCacheV111.vehicles.ProtocolInfo;
2324
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
2425
import diskCacheV111.vehicles.RemoteHttpsDataTransferProtocolInfo;
@@ -58,8 +59,11 @@
5859
import org.apache.http.impl.client.HttpClients;
5960
import org.apache.http.protocol.HttpContext;
6061
import org.apache.http.protocol.HttpRequestExecutor;
62+
import org.dcache.pool.movers.Mover;
6163
import org.dcache.pool.movers.MoverProtocol;
64+
import org.dcache.pool.movers.MoverProtocolMover;
6265
import org.dcache.pool.movers.RemoteHttpDataTransferProtocol;
66+
import org.dcache.pool.repository.ReplicaDescriptor;
6367
import org.dcache.security.trust.AggregateX509TrustManager;
6468
import org.dcache.util.Version;
6569
import org.slf4j.Logger;
@@ -69,6 +73,8 @@
6973
import static com.google.common.base.Preconditions.checkArgument;
7074
import static dmg.util.Exceptions.meaningfulMessage;
7175

76+
import dmg.cells.nucleus.CellPath;
77+
7278
public class RemoteHttpTransferService extends SecureRemoteTransferService {
7379

7480
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteHttpTransferService.class);
@@ -128,6 +134,17 @@ public HttpUriRequest getRedirect(final HttpRequest request,
128134
private X509TrustManager trustManager;
129135
private CloseableHttpClient sharedClient;
130136

137+
@Override
138+
public Mover<?> createMover(ReplicaDescriptor handle, PoolIoFileMessage message,
139+
CellPath pathToDoor) throws CacheException {
140+
Mover<?> mover = super.createMover(handle, message, pathToDoor);
141+
if (mover instanceof MoverProtocolMover mpm
142+
&& mpm.getMover() instanceof RemoteHttpDataTransferProtocol rhtp) {
143+
rhtp.setSubject(message.getSubject());
144+
}
145+
return mover;
146+
}
147+
131148
@Override
132149
protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception {
133150
if (!(info instanceof RemoteHttpDataTransferProtocolInfo)) {
@@ -145,7 +162,7 @@ protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception
145162
SSLContext context = buildSSLContext(credential.getKeyManager());
146163
CloseableHttpClient client = createClient(context);
147164

148-
return new RemoteHttpDataTransferProtocol(client) {
165+
return new RemoteHttpDataTransferProtocol(client, getTransferLifeCycle()) {
149166
@Override
150167
protected void afterTransfer() {
151168
super.afterTransfer();
@@ -159,7 +176,7 @@ protected void afterTransfer() {
159176
}
160177
}
161178

162-
return new RemoteHttpDataTransferProtocol(sharedClient);
179+
return new RemoteHttpDataTransferProtocol(sharedClient, getTransferLifeCycle());
163180
}
164181

165182
@PostConstruct

modules/dcache/src/main/java/org/dcache/pool/movers/RemoteHttpDataTransferProtocol.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import diskCacheV111.util.CacheException;
1818
import diskCacheV111.util.ThirdPartyTransferFailedCacheException;
19+
import diskCacheV111.vehicles.IpProtocolInfo;
1920
import diskCacheV111.vehicles.ProtocolInfo;
2021
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
2122
import java.io.IOException;
@@ -38,7 +39,9 @@
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.function.Consumer;
4041
import java.util.stream.Collectors;
42+
import javax.annotation.Nullable;
4143
import javax.annotation.concurrent.GuardedBy;
44+
import javax.security.auth.Subject;
4245
import org.apache.http.Header;
4346
import org.apache.http.HttpEntity;
4447
import org.apache.http.HttpInetConnection;
@@ -213,9 +216,22 @@ private enum HeaderFlags {
213216
private Long _expectedTransferSize;
214217

215218
private InetSocketAddress _localEndpoint;
219+
private final TransferLifeCycle _transferLifeCycle;
220+
private Subject _subject;
221+
private boolean _startMarkerSent;
216222

217223
public RemoteHttpDataTransferProtocol(CloseableHttpClient client) {
224+
this(client, null);
225+
}
226+
227+
public RemoteHttpDataTransferProtocol(CloseableHttpClient client,
228+
@Nullable TransferLifeCycle transferLifeCycle) {
218229
_client = requireNonNull(client);
230+
_transferLifeCycle = transferLifeCycle;
231+
}
232+
233+
public void setSubject(Subject subject) {
234+
_subject = subject;
219235
}
220236

221237
private static void checkThat(boolean isOk, String message) throws CacheException {
@@ -540,6 +556,7 @@ private CloseableHttpResponse doGet(final RemoteHttpDataTransferProtocolInfo inf
540556
CloseableHttpResponse response = _client.execute(get, context);
541557

542558
_localEndpoint = localAddress().orElse(null);
559+
startFlowMarker();
543560

544561
boolean isSuccessful = false;
545562
try {
@@ -605,6 +622,7 @@ private void sendFile(RemoteHttpDataTransferProtocolInfo info)
605622

606623
try (CloseableHttpResponse response = _client.execute(put, context)) {
607624
_localEndpoint = localAddress().orElse(null);
625+
startFlowMarker();
608626
StatusLine status = response.getStatusLine();
609627
switch (status.getStatusCode()) {
610628
case 200: /* OK (not actually a valid response from PUT) */
@@ -981,6 +999,27 @@ public Long getBytesExpected() {
981999
return _expectedTransferSize;
9821000
}
9831001

1002+
private void startFlowMarker() {
1003+
if (_startMarkerSent || _transferLifeCycle == null
1004+
|| _localEndpoint == null || _subject == null) {
1005+
return;
1006+
}
1007+
1008+
ProtocolInfo protocolInfo = _channel.getProtocolInfo();
1009+
if (!(protocolInfo instanceof IpProtocolInfo ipInfo)) {
1010+
return;
1011+
}
1012+
1013+
InetSocketAddress remoteEndpoint = ipInfo.getSocketAddress();
1014+
if (remoteEndpoint == null) {
1015+
return;
1016+
}
1017+
1018+
_transferLifeCycle.onStart(remoteEndpoint, _localEndpoint,
1019+
protocolInfo, _subject);
1020+
_startMarkerSent = true;
1021+
}
1022+
9841023
@Override
9851024
public Optional<InetSocketAddress> getLocalEndpoint() {
9861025
return Optional.ofNullable(_localEndpoint);

0 commit comments

Comments
 (0)