-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathfeeds.py
More file actions
260 lines (233 loc) · 9.36 KB
/
feeds.py
File metadata and controls
260 lines (233 loc) · 9.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
from typing import Optional
from beanie import PydanticObjectId
from beanie.operators import Or, RegEx
from fastapi import APIRouter, Depends, HTTPException
from pika.adapters.blocking_connection import BlockingChannel
import aio_pika
from aio_pika.abc import AbstractChannel
from app.deps.authorization_deps import FeedAuthorization, ListenerAuthorization
from app.keycloak_auth import get_current_user, get_current_username
from app.models.feeds import FeedDB, FeedIn, FeedOut
from app.models.files import FileOut
from app.models.groups import GroupDB
from app.models.listeners import EventListenerDB, FeedListener
from app.models.pages import Paged, _construct_page_metadata, _get_page_query
from app.models.users import UserOut
from app.rabbitmq.listeners import submit_file_job
from app.routers.authentication import get_admin, get_admin_mode
from app.search.connect import check_search_result
router = APIRouter()
# TODO: Move this to MongoDB middle layer
async def disassociate_listener_db(
feed_id: str, listener_id: str, allows: bool = Depends(FeedAuthorization())
):
"""Remove a specific Event Listener from a feed. Does not delete either resource, just removes relationship.
This actually performs the database operations, and can be used by any endpoints that need this functionality.
"""
if (feed := await FeedDB.get(PydanticObjectId(feed_id))) is not None:
new_listeners = []
for feed_listener in feed.listeners:
if feed_listener.listener_id != PydanticObjectId(listener_id):
new_listeners.append(feed_listener)
feed.listeners = new_listeners
await feed.save()
async def check_feed_listeners(
es_client,
file_out: FileOut,
user: UserOut,
rabbitmq_client: AbstractChannel,
):
"""Automatically submit new file to listeners on feeds that fit the search criteria."""
listener_ids_found = []
async for feed in FeedDB.find(FeedDB.listeners.automatic == True): # noqa: E712
# Verify whether resource_id is found when searching the specified criteria
feed_match = check_search_result(es_client, file_out, feed.search)
if feed_match:
for listener in feed.listeners:
if listener.automatic:
listener_ids_found.append(listener.listener_id)
for targ_listener in listener_ids_found:
if (
listener_info := await EventListenerDB.get(PydanticObjectId(targ_listener))
) is not None:
if (
listener_info.access is not None
and not user.admin
and not user.admin_mode
):
dataset_id = file_out.dataset_id
user_id = user.email
group_q = await GroupDB.find(
Or(GroupDB.creator == user_id, GroupDB.users.email == user_id),
).to_list()
user_groups = [g.id for g in group_q]
valid_submission = (
(listener_info.access.owner == user_id)
or (user.email in listener_info.access.users)
or (dataset_id in listener_info.access.datasets)
or (not set(user_groups).isdisjoint(listener_info.access.groups))
)
if not valid_submission:
continue
await submit_file_job(
file_out,
listener_info.name, # routing_key
{}, # parameters
user,
rabbitmq_client,
)
return listener_ids_found
@router.post("", response_model=FeedOut)
async def save_feed(
feed_in: FeedIn,
user=Depends(get_current_username),
):
"""Create a new Feed (i.e. saved search) in the database."""
feed = FeedDB(**feed_in.dict(), creator=user)
await feed.insert()
return feed.dict()
@router.put("/{feed_id}", response_model=FeedOut)
async def edit_feed(
feed_id: str,
feed_in: FeedIn,
user=Depends(get_current_username),
allow: bool = Depends(FeedAuthorization()),
):
"""Update the information about an existing Feed..
Arguments:
feed_id -- UUID of the feed to be udpated
feed_in -- JSON object including updated information
"""
feed = await FeedDB.get(PydanticObjectId(feed_id))
if feed:
# TODO: Refactor this with permissions checks etc.
feed_update = feed_in.dict()
if (
not feed_update["name"]
or not feed_update["search"]
or len(feed_update["listeners"]) == 0
):
raise HTTPException(
status_code=400,
detail="Feed name/search/listeners can't be null or empty",
)
return
feed.description = feed_update["description"]
feed.name = feed_update["name"]
feed.search = feed_update["search"]
feed.listeners = feed_update["listeners"]
try:
await feed.save()
return feed.dict()
except Exception as e:
raise HTTPException(status_code=500, detail=e.args[0])
raise HTTPException(status_code=404, detail=f"listener {feed_id} not found")
@router.get("", response_model=Paged)
async def get_feeds(
searchTerm: Optional[str] = None,
user=Depends(get_current_user),
skip: int = 0,
limit: int = 10,
admin=Depends(get_admin),
admin_mode=Depends(get_admin_mode),
):
"""Fetch all existing Feeds."""
criteria_list = []
if not admin or not admin_mode:
criteria_list.append(FeedDB.creator == user.email)
if searchTerm is not None:
criteria_list.append(
Or(
RegEx(field=FeedDB.name, pattern=searchTerm, options="i"),
RegEx(field=FeedDB.description, pattern=searchTerm, options="i"),
)
)
feeds_and_count = (
await FeedDB.find(
*criteria_list,
)
.aggregate(
[_get_page_query(skip, limit, sort_field="created", ascending=False)],
)
.to_list()
)
page_metadata = _construct_page_metadata(feeds_and_count, skip, limit)
page = Paged(
metadata=page_metadata,
data=[
FeedOut(id=item.pop("_id"), **item) for item in feeds_and_count[0]["data"]
],
)
return page.dict()
@router.get("/{feed_id}", response_model=FeedOut)
async def get_feed(
feed_id: str,
user=Depends(get_current_user),
allow: bool = Depends(FeedAuthorization()),
):
"""Fetch an existing saved search Feed."""
if (feed := await FeedDB.get(PydanticObjectId(feed_id))) is not None:
return feed.dict()
else:
raise HTTPException(status_code=404, detail=f"Feed {feed_id} not found")
@router.delete("/{feed_id}", response_model=FeedOut)
async def delete_feed(
feed_id: str,
user=Depends(get_current_user),
allow: bool = Depends(FeedAuthorization()),
):
"""Delete an existing saved search Feed."""
if (feed := await FeedDB.get(PydanticObjectId(feed_id))) is not None:
await feed.delete()
return feed.dict()
raise HTTPException(status_code=404, detail=f"Feed {feed_id} not found")
@router.post("/{feed_id}/listeners", response_model=FeedOut)
async def associate_listener(
feed_id: str,
listener: FeedListener,
username=Depends(get_current_username),
admin=Depends(get_admin),
enable_admin: bool = False,
admin_mode=Depends(get_admin_mode),
allow: bool = Depends(FeedAuthorization()),
):
"""Associate an existing Event Listener with a Feed, e.g. so it will be triggered on new Feed results.
Arguments:
feed_id: Feed that should have new Event Listener associated
listener: JSON object with "listener_id" field and "automatic" bool field (whether to auto-trigger on new data)
"""
# Because we have FeedListener rather than listener_id here, we can't use injection for this
allow = ListenerAuthorization().__call__(
listener.listener_id, username, admin_mode, admin
)
if not allow:
raise HTTPException(
status_code=403,
detail=f"User `{username} does not have permission on listener `{listener.listener_id}`",
)
if (feed := await FeedDB.get(PydanticObjectId(feed_id))) is not None:
if (
await EventListenerDB.get(PydanticObjectId(listener.listener_id))
) is not None:
feed.listeners.append(listener)
await feed.save()
return feed.dict()
raise HTTPException(
status_code=404, detail=f"Listener {listener.listener_id} not found"
)
raise HTTPException(status_code=404, detail=f"feed {feed_id} not found")
@router.delete("/{feed_id}/listeners/{listener_id}")
async def disassociate_listener(
feed_id: str, listener_id: str, allow: bool = Depends(ListenerAuthorization())
):
"""Disassociate an Event Listener from a Feed.
Arguments:
feed_id: UUID of search Feed that is being changed
listener_id: UUID of Event Listener that should be disassociated
"""
if (await FeedDB.get(PydanticObjectId(feed_id))) is not None:
if (await EventListenerDB.get(PydanticObjectId(listener_id))) is not None:
await disassociate_listener_db(feed_id, listener_id)
return {"disassociated": listener_id}
raise HTTPException(status_code=404, detail=f"Listener {listener_id} not found")
raise HTTPException(status_code=404, detail=f"Feed {feed_id} not found")