Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) {
_transferLifeCycle = transferLifeCycle;
}

protected TransferLifeCycle getTransferLifeCycle() {
return _transferLifeCycle;
}

@Override
public Mover<?> createMover(ReplicaDescriptor handle, PoolIoFileMessage message,
CellPath pathToDoor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Splitter;
import diskCacheV111.util.CacheException;
import diskCacheV111.vehicles.PoolIoFileMessage;
import diskCacheV111.vehicles.ProtocolInfo;
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
import diskCacheV111.vehicles.RemoteHttpsDataTransferProtocolInfo;
Expand Down Expand Up @@ -58,8 +59,11 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpRequestExecutor;
import org.dcache.pool.movers.Mover;
import org.dcache.pool.movers.MoverProtocol;
import org.dcache.pool.movers.MoverProtocolMover;
import org.dcache.pool.movers.RemoteHttpDataTransferProtocol;
import org.dcache.pool.repository.ReplicaDescriptor;
import org.dcache.security.trust.AggregateX509TrustManager;
import org.dcache.util.Version;
import org.slf4j.Logger;
Expand All @@ -69,6 +73,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static dmg.util.Exceptions.meaningfulMessage;

import dmg.cells.nucleus.CellPath;

public class RemoteHttpTransferService extends SecureRemoteTransferService {

private static final Logger LOGGER = LoggerFactory.getLogger(RemoteHttpTransferService.class);
Expand Down Expand Up @@ -128,6 +134,15 @@ public HttpUriRequest getRedirect(final HttpRequest request,
private X509TrustManager trustManager;
private CloseableHttpClient sharedClient;

@Override
public Mover<?> createMover(ReplicaDescriptor handle, PoolIoFileMessage message,
CellPath pathToDoor) throws CacheException {
Mover<?> mover = super.createMover(handle, message, pathToDoor);
MoverProtocolMover mpm = (MoverProtocolMover) mover;
((RemoteHttpDataTransferProtocol) mpm.getMover()).setSubject(message.getSubject());
return mover;
}

@Override
protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception {
if (!(info instanceof RemoteHttpDataTransferProtocolInfo)) {
Expand All @@ -145,7 +160,7 @@ protected MoverProtocol createMoverProtocol(ProtocolInfo info) throws Exception
SSLContext context = buildSSLContext(credential.getKeyManager());
CloseableHttpClient client = createClient(context);

return new RemoteHttpDataTransferProtocol(client) {
return new RemoteHttpDataTransferProtocol(client, getTransferLifeCycle()) {
@Override
protected void afterTransfer() {
super.afterTransfer();
Expand All @@ -159,7 +174,7 @@ protected void afterTransfer() {
}
}

return new RemoteHttpDataTransferProtocol(sharedClient);
return new RemoteHttpDataTransferProtocol(sharedClient, getTransferLifeCycle());
}

@PostConstruct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import diskCacheV111.util.CacheException;
import diskCacheV111.util.ThirdPartyTransferFailedCacheException;
import diskCacheV111.vehicles.IpProtocolInfo;
import diskCacheV111.vehicles.ProtocolInfo;
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
import java.io.IOException;
Expand All @@ -39,6 +40,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.security.auth.Subject;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpInetConnection;
Expand Down Expand Up @@ -213,9 +215,18 @@ private enum HeaderFlags {
private Long _expectedTransferSize;

private InetSocketAddress _localEndpoint;
private final TransferLifeCycle _transferLifeCycle;
private Subject _subject;
private boolean _startMarkerSent;

public RemoteHttpDataTransferProtocol(CloseableHttpClient client) {
public RemoteHttpDataTransferProtocol(CloseableHttpClient client,
TransferLifeCycle transferLifeCycle) {
_client = requireNonNull(client);
_transferLifeCycle = requireNonNull(transferLifeCycle);
}

public void setSubject(Subject subject) {
_subject = subject;
}

private static void checkThat(boolean isOk, String message) throws CacheException {
Expand Down Expand Up @@ -540,6 +551,7 @@ private CloseableHttpResponse doGet(final RemoteHttpDataTransferProtocolInfo inf
CloseableHttpResponse response = _client.execute(get, context);

_localEndpoint = localAddress().orElse(null);
startFlowMarker();

boolean isSuccessful = false;
try {
Expand Down Expand Up @@ -605,6 +617,7 @@ private void sendFile(RemoteHttpDataTransferProtocolInfo info)

try (CloseableHttpResponse response = _client.execute(put, context)) {
_localEndpoint = localAddress().orElse(null);
startFlowMarker();
StatusLine status = response.getStatusLine();
switch (status.getStatusCode()) {
case 200: /* OK (not actually a valid response from PUT) */
Expand Down Expand Up @@ -981,6 +994,26 @@ public Long getBytesExpected() {
return _expectedTransferSize;
}

private void startFlowMarker() {
if (_startMarkerSent || _localEndpoint == null || _subject == null) {
return;
}

ProtocolInfo protocolInfo = _channel.getProtocolInfo();
if (!(protocolInfo instanceof IpProtocolInfo ipInfo)) {
return;
}

InetSocketAddress remoteEndpoint = ipInfo.getSocketAddress();
if (remoteEndpoint == null) {
return;
}

_transferLifeCycle.onStart(remoteEndpoint, _localEndpoint,
protocolInfo, _subject);
_startMarkerSent = true;
}

@Override
public Optional<InetSocketAddress> getLocalEndpoint() {
return Optional.ofNullable(_localEndpoint);
Expand Down
Loading