-
Notifications
You must be signed in to change notification settings - Fork 72
Expand file tree
/
Copy pathconnect.py
More file actions
223 lines (198 loc) · 8.54 KB
/
connect.py
File metadata and controls
223 lines (198 loc) · 8.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import atexit
import os
import logging
import sys
import tempfile
from cellprofiler_core.preferences import config_read_typed, get_headless, get_temporary_directory
from cellprofiler_core.preferences import get_omero_server, get_omero_port, get_omero_user
import omero
from omero.gateway import BlitzGateway
import omero_user_token
LOGGER = logging.getLogger(__name__)
OMERO_CREDENTIAL_FILE = os.path.join(get_temporary_directory(), "OMERO_CP.token")
def login(e=None, server=None, token_path=None):
# Attempt to connect to the server, first using a token, then via GUI
CREDENTIALS.get_tokens(token_path)
if CREDENTIALS.tokens:
if server is None:
# URL didn't specify which server we want. Just try whichever token is available
server = list(CREDENTIALS.tokens.keys())[0]
connected = CREDENTIALS.try_token(server)
else:
connected = CREDENTIALS.client is not None
if get_headless():
if connected:
LOGGER.info(f"Connected to {CREDENTIALS.server}")
elif CREDENTIALS.try_temp_token():
connected = True
LOGGER.info(f"Connected to {CREDENTIALS.server}")
else:
LOGGER.warning("Failed to connect, was user token invalid?")
return connected
else:
from .gui import login_gui, configure_for_safe_shutdown
if not CREDENTIALS.bound:
configure_for_safe_shutdown()
CREDENTIALS.bound = True
login_gui(connected, server=None)
def get_temporary_dir():
temporary_directory = get_temporary_directory()
if not (
os.path.exists(temporary_directory) and os.access(temporary_directory, os.W_OK)
):
temporary_directory = tempfile.gettempdir()
return temporary_directory
def clear_temporary_file():
LOGGER.debug("Checking for OMERO credential file to delete")
if os.path.exists(OMERO_CREDENTIAL_FILE):
os.unlink(OMERO_CREDENTIAL_FILE)
LOGGER.debug(f"Cleared {OMERO_CREDENTIAL_FILE}")
if not get_headless():
if os.path.exists(OMERO_CREDENTIAL_FILE):
LOGGER.warning("Existing credential file was found")
# Main GUI process should clear any temporary tokens
atexit.register(clear_temporary_file)
class LoginHelper:
"""
This class stores our working set of OMERO credentials and connection objects.
It behaves as a singleton, so multiple OMERO-using plugins will share credentials.
"""
_instance = None
def __new__(cls):
# We only allow one instance of this class within CellProfiler
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self.server = get_omero_server()
self.port = get_omero_port()
self.username = get_omero_user()
self.passwd = ""
self.session_key = None
self.session = None
self.client = None
self.gateway = None
self.container_service = None
self.tokens = {}
# Whether we've already hooked into the main GUI frame
self.bound = False
# Any OMERO browser GUI which is connected to this object
self.browser_window = None
atexit.register(self.shutdown)
def get_gateway(self):
if self.client is None:
raise Exception("Client connection not initialised")
if self.gateway is None:
LOGGER.debug("Constructing BlitzGateway")
self.gateway = BlitzGateway(client_obj=self.client)
return self.gateway
def get_tokens(self, path=None):
# Load all tokens from omero_user_token
self.tokens.clear()
# Future versions of omero_user_token may support multiple tokens, so we code with that in mind.
# Check the reader setting which disables tokens.
tokens_enabled = config_read_typed(f"Reader.OMERO.allow_token", bool)
if tokens_enabled is not None and not tokens_enabled:
return
# User tokens sadly default to the home directory. This would override that location.
home_key = "USERPROFILE" if sys.platform == "win32" else "HOME"
py_home = os.environ.get(home_key, "")
if path is not None:
os.environ[home_key] = path
try:
LOGGER.info("Requesting token info")
token = omero_user_token.get_token()
server, port = token[token.find('@') + 1:].split(':')
port = int(port)
LOGGER.info("Connection to {}:{}".format(server, port))
session_key = token[:token.find('@')]
self.tokens[server] = (server, port, session_key)
except Exception:
LOGGER.error("Failed to get user token", exc_info=True)
if path is not None:
os.environ[home_key] = py_home
def try_token(self, address):
# Attempt to use an omero token to connect to a specific server
if address not in self.tokens:
LOGGER.error(f"Token {address} not found")
return False
else:
server, port, session_key = self.tokens[address]
return self.login(server=server, port=port, session_key=session_key)
def create_temp_token(self):
# Store a temporary OMERO token based on our active session
# This allows the workers to use that session in Analysis mode.
if self.client is None:
raise ValueError("Client not initialised, cannot make token")
if os.path.exists(OMERO_CREDENTIAL_FILE):
LOGGER.warning(f"Token already exists at {OMERO_CREDENTIAL_FILE}, overwriting")
os.unlink(OMERO_CREDENTIAL_FILE)
try:
token = f"{self.session_key}@{self.server}:{self.port}"
with open(OMERO_CREDENTIAL_FILE, 'w') as token_file:
token_file.write(token)
LOGGER.debug(f"Made temp token for {self.server}")
except:
LOGGER.error("Unable to write temporary token", exc_info=True)
def try_temp_token(self):
# Look for and attempt to connect to OMERO using a temporary token.
if not os.path.exists(OMERO_CREDENTIAL_FILE):
LOGGER.error(f"No temporary OMERO token found. Cannot connect to server.")
return False
with open(OMERO_CREDENTIAL_FILE, 'r') as token_path:
token = token_path.read().strip()
server, port = token[token.find('@') + 1:].split(':')
port = int(port)
session_key = token[:token.find('@')]
LOGGER.info(f"Using connection details for {self.server}")
return self.login(server=server, port=port, session_key=session_key)
def login(self, server=None, port=None, user=None, passwd=None, session_key=None):
# Attempt to connect to the server using provided connection credentials
self.client = omero.client(host=server, port=port)
if session_key is not None:
try:
self.session = self.client.joinSession(session_key)
self.client.enableKeepAlive(60)
self.session.detachOnDestroy()
self.server = server
self.port = port
self.session_key = session_key
except Exception as e:
LOGGER.error(f"Failed to join session, token may have expired: {e}")
self.client = None
self.session = None
return False
elif user is not None:
try:
self.session = self.client.createSession(
username=user, password=passwd)
self.client.enableKeepAlive(60)
self.session.detachOnDestroy()
self.session_key = self.client.getSessionId()
self.server = server
self.port = port
self.username = user
self.passwd = passwd
except Exception as e:
LOGGER.error(f"Failed to create session: {e}")
self.client = None
self.session = None
return False
else:
self.client = None
self.session = None
raise Exception(
"Not enough details to create a server connection.")
self.container_service = self.session.getContainerService()
return True
def handle_exit(self, e):
self.shutdown()
e.Skip()
def shutdown(self):
# Disconnect from the server
if self.client is not None:
try:
self.client.closeSession()
except Exception as e:
LOGGER.error("Failed to close OMERO session - ", e)
CREDENTIALS = LoginHelper()