-
Notifications
You must be signed in to change notification settings - Fork 93
Expand file tree
/
Copy pathutils.py
More file actions
4463 lines (3921 loc) · 187 KB
/
utils.py
File metadata and controls
4463 lines (3921 loc) · 187 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
#
# This file is part of WEKO3.
# Copyright (C) 2017 National Institute of Informatics.
#
# WEKO3 is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# WEKO3 is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with WEKO3; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
"""Module of weko-items-ui utils.."""
import copy
import csv
import json
import os
import re
import shutil
import sys
import tempfile
import traceback
from collections import OrderedDict
from datetime import date, datetime, timedelta
from io import StringIO
import bagit
import redis
from redis import sentinel
from elasticsearch.exceptions import NotFoundError
from elasticsearch import exceptions as es_exceptions
from flask import abort, current_app, flash, redirect, request, send_file, \
url_for,jsonify
from flask_babelex import gettext as _
from flask_login import current_user
from invenio_accounts.models import Role, userrole
from invenio_db import db
from invenio_i18n.ext import current_i18n
from invenio_indexer.api import RecordIndexer
from invenio_pidrelations.contrib.versioning import PIDVersioning
from invenio_pidrelations.models import PIDRelation
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_records.api import RecordBase
from invenio_accounts.models import User
from invenio_search import RecordsSearch
from invenio_stats.utils import QueryRankingHelper, QuerySearchReportHelper
from invenio_stats.views import QueryRecordViewCount as _QueryRecordViewCount
from invenio_stats.proxies import current_stats
from invenio_stats import config
#from invenio_stats.views import QueryRecordViewCount
from jsonschema import SchemaError, ValidationError
from simplekv.memory.redisstore import RedisStore
from sqlalchemy import MetaData, Table
from weko_deposit.api import WekoDeposit, WekoRecord
from weko_deposit.pidstore import get_record_without_version
from weko_index_tree.api import Indexes
from weko_index_tree.utils import check_index_permissions, get_index_id, \
get_user_roles
from weko_records.api import FeedbackMailList, ItemTypes, Mapping
from weko_records.serializers.utils import get_item_type_name
from weko_records.utils import replace_fqdn_of_file_metadata
from weko_records_ui.permissions import check_created_id, \
check_file_download_permission, check_publish_status
from weko_redis.redis import RedisConnection
from weko_search_ui.config import WEKO_IMPORT_DOI_TYPE
from weko_search_ui.query import item_search_factory
from weko_search_ui.utils import check_sub_item_is_system, \
get_root_item_option, get_sub_item_option
from weko_schema_ui.models import PublishStatus
from weko_user_profiles import UserProfile
from weko_workflow.api import WorkActivity
from weko_workflow.config import IDENTIFIER_GRANT_LIST, \
WEKO_SERVER_CNRI_HOST_LINK
from weko_workflow.models import ActionStatusPolicy as ASP
from weko_workflow.models import Activity, FlowAction, FlowActionRole, \
FlowDefine
from weko_workflow.utils import IdentifierHandle
def get_list_username():
    """Collect the usernames of every user profile except the current user's.

    Returns:
        list: The truthy ``get_username`` values of all ``UserProfile`` rows
        whose ``user_id`` differs from ``current_user.get_id()``.
    """
    own_id = current_user.get_id()
    current_app.logger.debug("current_user:{}".format(current_user))
    # Imported inside the function, exactly as the original code did.
    from weko_user_profiles.models import UserProfile

    profiles = UserProfile.query.filter(UserProfile.user_id != own_id).all()
    names = (profile.get_username for profile in profiles)
    # Profiles without a username are left out of the list.
    return [name for name in names if name]
def get_list_email():
    """Collect the e-mail addresses of every account except the current user's.

    Returns:
        list: The truthy ``email`` values of all ``User`` rows whose ``id``
        differs from ``current_user.get_id()``.
    """
    own_id = current_user.get_id()
    accounts = User.query.filter(User.id != own_id).all()
    addresses = (account.email for account in accounts)
    # Accounts without an e-mail address are left out of the list.
    return [address for address in addresses if address]
def get_user_info_by_username(username):
    """Look up a user's id and e-mail address from their username.

    The profile is resolved with ``UserProfile.get_by_username`` and the
    account row is then fetched by its id through the ``User`` model (the
    same model ``get_list_email`` uses), instead of reflecting the database
    schema and scanning every ``accounts_user`` row.

    Args:
        username: The username to search for.

    Returns:
        dict | None: ``{'username': ..., 'user_id': ..., 'email': ...}`` when
        both the profile and its account row are found. ``None`` when either
        is missing or when the lookup raises; in the latter case the
        traceback is written to the application log.
    """
    try:
        profile = UserProfile.get_by_username(username)
        if profile is None:
            return None

        account = User.query.filter_by(id=profile.user_id).first()
        if account is None:
            return None

        return {
            'username': username,
            'user_id': profile.user_id,
            'email': account.email,
        }
    except Exception:
        # Callers have always received None on failure; keep that contract
        # but stop discarding the error silently.
        current_app.logger.error(traceback.format_exc())
        return None
def validate_user(username, email):
    """Check that a username and an e-mail address belong to the same user.

    The user id behind ``username`` (from ``UserProfile``) is compared with
    the id of the account whose e-mail equals ``email`` (from ``User``).
    The account is fetched with a filtered query rather than by reflecting
    the database schema and scanning every ``accounts_user`` row.

    Args:
        username: The username.
        email: The e-mail address.

    Returns:
        dict: Always a dict with three keys:
            ``results``: ``{'username', 'user_id', 'email'}`` when the two
                ids match, otherwise ``''``.
            ``validation``: ``True`` when the two ids match, else ``False``.
            ``error``: ``str()`` of the exception raised during the lookup,
                otherwise ``''``.
    """
    result = {
        'results': '',
        'validation': False,
        'error': ''
    }
    try:
        profile = UserProfile.get_by_username(username)
        account = User.query.filter_by(email=email).first()
        # Re-check the address in Python so the match stays an exact string
        # comparison, as it was when the rows were compared in a loop.
        if account is not None and account.email == email:
            user_id = account.id
        else:
            # 0 is the "no account with this e-mail" value of the original.
            user_id = 0

        # When no profile exists this attribute access raises and the
        # message ends up in result['error'], as it always has.
        if profile.user_id == user_id:
            result['results'] = {
                'username': username,
                'user_id': user_id,
                'email': email,
            }
            result['validation'] = True
    except Exception as e:
        result['error'] = str(e)
    return result
def get_user_info_by_email(email):
    """Look up a user's id and username from their e-mail address.

    The account is fetched with a filtered ``User`` query (the model
    ``get_list_email`` uses) instead of reflecting the database schema and
    scanning every ``accounts_user`` row; the username then comes from
    ``UserProfile.get_by_userid``.

    Args:
        email: The e-mail address to search for.

    Returns:
        dict | None: ``{'username': ..., 'user_id': ..., 'email': ...}`` when
        an account with that address exists; ``username`` is ``""`` when the
        account has no profile. ``None`` when no account matches or when the
        lookup raises; in the latter case the traceback is written to the
        application log.
    """
    try:
        account = User.query.filter_by(email=email).first()
        # Re-check the address in Python so the match stays an exact string
        # comparison, as it was when the rows were compared in a loop.
        if account is None or account.email != email:
            return None

        profile = UserProfile.get_by_userid(account.id)
        return {
            'username': profile.get_username if profile is not None else "",
            'user_id': account.id,
            'email': email,
        }
    except Exception:
        # Callers have always received None on failure; keep that contract
        # but stop discarding the error silently.
        current_app.logger.error(traceback.format_exc())
        return None
def get_user_information(user_id):
    """Gather username, full name and e-mail address for a user id.

    Username and full name come from ``UserProfile.get_by_userid``; the
    e-mail comes from the ``User`` row with that id, fetched with a filtered
    query instead of reflecting the database schema and scanning every
    ``accounts_user`` row.

    Args:
        user_id: The id of the user.

    Returns:
        dict: ``{'username': ..., 'email': ..., 'fullname': ...}``. Each
        value stays ``''`` when the corresponding profile or account row
        does not exist.
    """
    result = {
        'username': '',
        'email': '',
        'fullname': '',
    }

    profile = UserProfile.get_by_userid(user_id)
    if profile is not None:
        result['username'] = profile.get_username
        result['fullname'] = profile.fullname

    account = User.query.filter_by(id=user_id).first()
    if account is not None:
        result['email'] = account.email

    return result
def get_user_permission(user_id):
    """Tell whether ``user_id`` identifies the currently logged-in user.

    Args:
        user_id: The id to compare; it is converted with ``str()`` before
            the comparison.

    Returns:
        bool: ``True`` only when ``current_user.get_id()`` is not ``None``
        and equals ``str(user_id)``; ``False`` otherwise.
    """
    logged_in_id = current_user.get_id()
    return logged_in_id is not None and str(user_id) == logged_in_id
def get_current_user():
    """Return the id of the currently logged-in user.

    Returns:
        Whatever ``current_user.get_id()`` returns.
    """
    return current_user.get_id()
def find_hidden_items(item_id_list, idx_paths=None,
                      check_creator_permission=False,
                      has_permission_indexes=None):
    """Find the items that must not be shown to the current user.

    An item stays visible when it is public (see
    ``check_creator_permission``) and at least one of the indexes in
    ``record.navi`` is permitted. Every other item is reported as hidden.

    Args:
        item_id_list: List of uuid of the items to be checked.
        idx_paths: Optional list of index paths. When given (and
            ``has_permission_indexes`` is empty), an index only counts as
            permitted if its ``path`` is in this list.
        check_creator_permission: When true, items for which
            ``check_created_id`` succeeds are always visible, and the
            publish status of the others is checked with
            ``check_publish_status``. When false, every item is treated as
            public.
        has_permission_indexes: Optional collection of index ids, as
            strings, that the user may view. When non-empty it alone decides
            index permission and ``check_index_permissions`` / ``idx_paths``
            are not consulted.

    Returns:
        list: ``str(record.id)`` of every item the user cannot access. Empty
        when ``item_id_list`` is empty or when the first element returned by
        ``get_user_roles()`` is truthy (the original code labels this the
        admin check).
    """
    if not item_id_list:
        return []

    # Check if is admin
    roles = get_user_roles()
    if roles[0]:
        return []

    # Per-call caches of the index ids already checked. Both hold the raw
    # ``idx.cid`` value so that lookups and insertions agree; the previous
    # code stored the raw id but probed with ``str(idx.cid)``, so the
    # positive cache never matched for non-string ids.
    allowed_cids = set()
    denied_cids = set()
    hidden_list = []
    for record in WekoRecord.get_records(item_id_list):
        if check_creator_permission:
            # The owner of the item can always see it.
            if check_created_id(record):
                continue
            is_public = check_publish_status(record)
        else:
            is_public = True

        has_index_permission = False
        for idx in record.navi:
            if has_permission_indexes:
                # Caller supplied the permitted indexes explicitly.
                if str(idx.cid) in has_permission_indexes:
                    has_index_permission = True
                    break
                continue

            if idx.cid in allowed_cids:
                has_index_permission = True
                break
            if idx.cid in denied_cids:
                continue

            if check_index_permissions(None, idx.cid) \
                    and (not idx_paths or idx.path in idx_paths):
                allowed_cids.add(idx.cid)
                has_index_permission = True
                break
            denied_cids.add(idx.cid)

        if not (is_public and has_index_permission):
            hidden_list.append(str(record.id))

    return hidden_list
def get_permission_record(rank_type, es_data, display_rank, has_permission_indexes):
    """Build the ranking entries the current user is allowed to see.

    Args:
        rank_type: Ranking type, e.g. 'most_reviewed_items',
            'most_downloaded_items', 'created_most_items_user',
            'most_searched_keywords' or 'new_items'.
        es_data: List of raw ranking entries. The pid is read from
            ``entry['key']`` when present, otherwise from
            ``entry['_item_metadata']['control_number']``.
        display_rank: Maximum number of entries to return.
        has_permission_indexes: Index ids (as strings) the current user may
            view.

    Returns:
        list: Results of ``parse_ranking_results`` for the visible entries,
        in input order, at most ``display_rank`` long. For 'new_items' the
        date is only attached to the first entry of each publish date.
    """
    if not es_data:
        return []

    roles = get_user_roles()
    ranking = []
    seen_dates = []
    for entry in es_data:
        if len(ranking) == display_rank:
            break

        if 'key' in entry:
            pid_value = entry['key']
        else:
            pid_value = entry.get('_item_metadata').get('control_number')

        visible = False
        try:
            record = WekoRecord.get_record_by_pid(pid_value)
            # Deleted records never appear in a ranking.
            if record.pid and record.pid.status == PIDStatus.DELETED:
                continue
            if 'publish_status' in record \
                    and record['publish_status'] == PublishStatus.DELETE.value:
                continue

            if roles[0]:
                visible = True
            else:
                # roles[0] is falsy on this branch, so only ownership and
                # publish status decide whether the record is public.
                is_public = check_created_id(record) \
                    or check_publish_status(record)
                in_allowed_index = any(
                    str(idx.cid) in has_permission_indexes
                    for idx in record.navi
                )
                visible = is_public and in_allowed_index
        except PIDDoesNotExistError:
            # do not add deleted items into ranking list.
            visible = False
            current_app.logger.debug("PID {} does not exist.".format(pid_value))

        if not visible:
            continue

        if rank_type == 'new_items':
            publish_date = entry['publish_date']
            if publish_date not in seen_dates:
                item = parse_ranking_results(
                    rank_type,
                    pid_value,
                    record=record,
                    date=publish_date
                )
                seen_dates.append(publish_date)
            else:
                item = parse_ranking_results(
                    rank_type,
                    pid_value,
                    record=record
                )
        else:
            item = parse_ranking_results(
                rank_type,
                pid_value,
                count=entry['count'],
                rank=len(ranking) + 1,
                record=record
            )
        ranking.append(item)

    return ranking
def parse_ranking_results(rank_type,
                          key,
                          count=-1,
                          rank=-1,
                          record=None,
                          date=''):
    """Turn one raw ranking entry into the dict used by the view.

    Args:
        rank_type: Ranking type, e.g. 'most_reviewed_items',
            'most_downloaded_items', 'created_most_items_user',
            'most_searched_keywords' or 'new_items'.
        key: Key value, e.g. pid_value, search keyword or user id.
        count: Count of the rank data. Defaults to -1.
        rank: Rank number of the rank data. Defaults to -1, which means
            "no rank": neither ``rank`` nor ``count`` is put in the result.
        record: Record whose ``get_titles`` supplies the title for the item
            rankings. Defaults to None.
        date: Date of a new item, e.g. '2022-10-01'. Only included when
            ``rank`` is -1 and the value is truthy. Defaults to ''.

    Returns:
        dict: e.g. {'key': '3', 'rank': 1, 'count': 100, 'title': 'ff',
        'url': '../records/3'} or {'key': '1', 'date': '2022-08-18',
        'title': '2', 'url': '../records/1'}.
    """
    if rank_type in ('most_reviewed_items', 'most_downloaded_items', 'new_items'):
        url = '../records/{0}'.format(key)
        title = record.get_titles
    elif rank_type == 'most_searched_keywords':
        url = '../search?page=1&size=20&search_type=1&q={0}'.format(key)
        title = key
    elif rank_type == 'created_most_items_user':
        url = None
        # Only look the profile up when there is a user id at all.
        user_info = UserProfile.get_by_userid(key) if key else None
        if user_info:
            title = '{}'.format(user_info.username)
        else:
            title = 'None'

    if rank != -1:
        return {
            'key': key,
            'rank': rank,
            'count': count,
            'title': title,
            'url': url,
        }
    if date:
        return {
            'key': key,
            'date': date,
            'title': title,
            'url': url,
        }
    return {
        'key': key,
        'title': title,
        'url': url,
    }
def parse_ranking_new_items(result_data):
    """Parse a search result into the list of new-item ranking entries.

    :param result_data: search result; the hits are read from
        ``result_data['hits']['hits']``.
    :return: one dict per hit with the keys ``record_id`` (the hit's
        ``_id``), ``create_date`` (``publish_date`` of the source, ``''``
        when absent), ``pid_value`` (``control_number`` of the source) and
        ``record_name`` (``item_title`` of ``_item_metadata`` when that is a
        dict, otherwise ``''``). An empty list when there are no hits.
    """
    if not result_data or not result_data.get('hits') \
            or not result_data.get('hits').get('hits'):
        return []

    data_list = []
    for hit in result_data.get('hits').get('hits'):
        # A hit without '_source' yields an entry with empty values instead
        # of raising, in line with how parse_ranking_record tolerates it.
        source = hit.get('_source') or {}
        meta_data = source.get('_item_metadata')
        if isinstance(meta_data, dict):
            item_title = meta_data.get('item_title')
        else:
            item_title = ''
        data_list.append({
            'record_id': hit.get('_id'),
            'create_date': source.get('publish_date', ''),
            'pid_value': source.get('control_number'),
            'record_name': item_title,
        })
    return data_list
def parse_ranking_record(result_data):
    """Extract the control numbers from a search result.

    :param result_data: search result; the hits are read from
        ``result_data['hits']['hits']``.
    :return: the truthy ``control_number`` of each hit's ``_source``, in hit
        order. An empty list when there are no hits.
    """
    if not result_data:
        return []
    outer_hits = result_data.get('hits')
    if not outer_hits:
        return []
    hits = outer_hits.get('hits')
    if not hits:
        return []

    control_numbers = (
        hit.get('_source', {}).get('control_number') for hit in hits
    )
    # Hits without a control number are skipped.
    return [number for number in control_numbers if number]
def validate_form_input_data(
result: dict, item_id: str, data: dict):
"""Validate input data.
:param result: result dictionary.
:param item_id: item type identifier.
:param data: form input data
:param activity_id: activity id
"""
# current_app.logger.error("result: {}".format(result))
# current_app.logger.error("item_id: {}".format(item_id))
# current_app.logger.error("data: {}".format(data))
def _get_jpcoar_mapping_value_mapping(key, item_type_mapping):
ret = {}
if key in item_type_mapping and "jpcoar_mapping" in item_type_mapping[key]:
ret = item_type_mapping[key]["jpcoar_mapping"]
return ret
def _get_keys_that_exist_from_data(given_data: dict) -> list:
ret = []
if given_data is not None:
if type(given_data) is dict:
ret = list(given_data.keys())
elif type(given_data) is str:
ret = list(given_data)
return ret
# Get langauge key - DONE
# Iterate data for validating the value -
# Check each item and raise an error for duplicate langauge value -
item_type = ItemTypes.get_by_id(item_id)
json_schema = item_type.schema.copy()
data_keys = list(data.keys())
item_type_mapping = Mapping.get_record(item_type.id)
item_type_mapping_keys = list(item_type_mapping.keys())
is_error = False
items_to_be_checked_for_duplication: list = []
items_to_be_checked_for_ja_kana: list = []
items_to_be_checked_for_ja_latn: list = []
items_to_be_checked_for_date_format: list = []
# TITLE VARIABLES
mapping_title_item_key: str = ""
mapping_title_language_key: tuple = ("_","_")
# ALTERNATIVE TITLE VARIABLES
mapping_alternative_title_item_key: str = ""
mapping_alternative_title_language_key: tuple = ("_","_")
# CREATOR VARIABLES
mapping_creator_item_key: str = ""
mapping_creator_given_name_language_key: tuple = ("_","_")
mapping_creator_family_name_language_key: tuple = ("_","_")
mapping_creator_affiliation_name_language_key: tuple = ("_","_")
mapping_creator_creator_name_language_key: tuple = ("_","_")
mapping_creator_alternative_name_language_key: tuple = ("_","_")
# CONTRIBUTOR VARIABLES
mapping_contributor_item_key = ""
mapping_contributor_given_name_language_key: tuple = ("_","_")
mapping_contributor_family_name_language_key: tuple = ("_","_")
mapping_contributor_affiliation_name_language_key: tuple = ("_","_")
mapping_contributor_contributor_name_language_key: tuple = ("_","_")
mapping_contributor_alternative_name_language_key: tuple = ("_","_")
# RELATION VARIABLES
mapping_relation_item_key = ""
mapping_related_title_language_key: tuple = ("_","_")
# FUNDING REFERENCE VARIABLES
mapping_funding_reference_item_key = ""
mapping_funding_reference_funder_name_language_key: tuple = ("_","_")
mapping_funding_reference_award_title_language_key: tuple = ("_","_")
# SOURCE TITLE VARIABLES
mapping_source_title_item_key: str = ""
mapping_source_title_language_key: tuple = ("_","_")
# DEGREE NAME VARIABLES
mapping_degree_name_item_key: str = ""
mapping_degree_name_language_key: str = ""
# DEGREE GRANTOR NAME VARIABLES
mapping_degree_grantor_item_key: str = ""
mapping_degree_grantor_name_language_key: tuple = ("_","_")
# CONFERENCE VARIABLES
mapping_conference_item_key: str = ""
mapping_conference_date_language_key: tuple = ("_","_")
mapping_conference_name_language_key: tuple = ("_","_")
mapping_conference_venue_language_key: tuple = ("_","_")
mapping_conference_place_language_key: tuple = ("_","_")
mapping_conference_sponsor_language_key: tuple = ("_","_")
# HOLDING AGENT VARIABLES
mapping_holding_agent_item_key: str = ""
mapping_holding_agent_name_language_key: tuple = ("_","_")
# CATALOG VARIABLES
mapping_catalog_item_key: str = ""
mapping_catalog_title_language_key: tuple = ("_","_")
# DATE PATTERN VARIABLES
date_pattern_list = [
# YYYY
# 1997
r"^[0-9]{4}$",
# YYYY-MM
# 1997-07
r'^[0-9]{4}-[0-9]{2}$',
# YYYY-MM-DD
# 1997-07-16
r'^[0-9]{4}-[0-9]{2}-[0-9]{2}$',
# YYYY-MM-DDThh:mm+TZD
# 1997-07-16T19:20+01:00
r'^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}\+[0-9]{2}:[0-9]{2}$',
# YYYY-MM-DDThh:mm-TZD
# 1997-07-16T19:20-01:00
r'^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}-[0-9]{2}:[0-9]{2}$',
# YYYY-MM-DDThh:mm:ss+TZD
# 1997-07-16T19:20:30+01:00
r'^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\+[0-9]{2}:[0-9]{2}$',
# YYYY-MM-DDThh:mm:ss-TZD
# 1997-07-16T19:20:30-01:00
r'^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}-[0-9]{2}:[0-9]{2}$',
# YYYY-MM-DDThh:mm:ss.ss+TZD
# 1997-07-16T19:20:30.45+01:00
r'^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{2}\+[0-9]{2}:[0-9]{2}$',
# YYYY-MM-DDThh:mm:ss.ss-TZD
# 1997-07-16T19:20:30.45-01:00
r'^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{2}-[0-9]{2}:[0-9]{2}$',
]
"""
This code snippet is for dcterms date format validation. Once 'dcterms date' property is created and added to
デフォルトアイテムタイプ(フル)とデフォルトアイテムタイプ(シンプル)metadata via update_item_type.py 'dcterms date'
validation code will be added inside this function using the code snippet below.
### CODE SNIPPET FOR DCTERMS DATE FORMAT VALIDATION -- START
elif mapping_dcterms_date_item_key == data_key:
for data_dcterms_date_values in data_item_value_list:
items_to_be_checked_for_date_format.append(data_dcterms_date_values.get("subitem_1522650091861"))
# Validation for DATE FORMAT
validation_date_format_error_checker(
_("DATE FORMAT TEST"),
items_to_be_checked_for_date_format
)
# Reset validation lists below for the next item to be validated
items_to_be_checked_for_date_format = []
### DATE FORMAT TEST -- END
"""
"""
This loop snippet is for getting the mapping language key of each item that is stored in the database
"""
for key in item_type_mapping_keys:
jpcoar_value: dict = _get_jpcoar_mapping_value_mapping(key, item_type_mapping)
jpcoar_value_keys_lv1: list = list(jpcoar_value)
for key_lv1 in jpcoar_value_keys_lv1:
if "title" == key_lv1:
title_sub_items: dict = jpcoar_value[key_lv1]
title_sub_items_keys: list = list(title_sub_items.keys())
mapping_title_item_key: str = key
for title_sub_item in title_sub_items:
if "@attributes" == title_sub_item:
mapping_title_language_key: list = title_sub_items.get(title_sub_item, {}).get("xml:lang", "_").split(".")
elif "alternative" == key_lv1:
alternative_title_sub_items: dict = jpcoar_value[key_lv1]
alternative_title_sub_items_title_sub_items_keys: list = list(alternative_title_sub_items.keys())
mapping_alternative_title_item_key: str = key
for alternative_title_sub_item in alternative_title_sub_items:
if "@attributes" == alternative_title_sub_item:
mapping_alternative_title_language_key: list = alternative_title_sub_items.get(alternative_title_sub_item, {}).get("xml:lang", "_").split(".")
elif "creator" == key_lv1:
mapping_creator_item_key: str = key
creator_sub_items: dict = jpcoar_value[key_lv1]
creator_sub_items_keys: list = list(creator_sub_items.keys())
for creator_sub_item in creator_sub_items:
if creator_sub_item == "givenName":
mapping_creator_given_name_language_key = (
creator_sub_items.get(creator_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif creator_sub_item == "familyName":
mapping_creator_family_name_language_key = (
creator_sub_items.get(creator_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif creator_sub_item == "affiliation":
mapping_creator_affiliation_name_language_key = (
creator_sub_items.get(creator_sub_item, {}).get("affiliationName", {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif creator_sub_item == "creatorName":
mapping_creator_creator_name_language_key = (
creator_sub_items.get(creator_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif creator_sub_item == "creatorAlternative":
mapping_creator_alternative_name_language_key = (
creator_sub_items.get(creator_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif "contributor" == key_lv1:
mapping_contributor_item_key: str = key
contributor_sub_items: dict = jpcoar_value[key_lv1]
contributor_sub_items_keys: list = list(contributor_sub_items.keys())
for contributor_sub_item in contributor_sub_items:
if contributor_sub_item == "givenName":
mapping_contributor_given_name_language_key = (
contributor_sub_items.get(contributor_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif contributor_sub_item == "familyName":
mapping_contributor_family_name_language_key = (
contributor_sub_items.get(contributor_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif contributor_sub_item == "affiliation":
mapping_contributor_affiliation_name_language_key = (
contributor_sub_items.get(contributor_sub_item, {}).get("affiliationName", {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif contributor_sub_item == "contributorName":
mapping_contributor_contributor_name_language_key = (
contributor_sub_items.get(contributor_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif contributor_sub_item == "contributorAlternative":
mapping_contributor_alternative_name_language_key = (
contributor_sub_items.get(contributor_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif "relation" == key_lv1:
mapping_relation_item_key: str = key
relation_sub_items: dict = jpcoar_value[key_lv1]
relation_sub_items_keys:list = list(relation_sub_items.keys())
for relation_sub_item in relation_sub_items:
if relation_sub_item == "relatedTitle":
mapping_related_title_language_key = (
relation_sub_items.get(relation_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif "fundingReference" == key_lv1:
mapping_funding_reference_item_key: str = key
funding_reference_sub_items: dict = jpcoar_value[key_lv1]
funding_reference_sub_items_keys: list = list(funding_reference_sub_items.keys())
for funding_reference_sub_item in funding_reference_sub_items:
if funding_reference_sub_item == "funderName":
mapping_funding_reference_funder_name_language_key = (
funding_reference_sub_items.get(funding_reference_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif funding_reference_sub_item == "awardTitle":
mapping_funding_reference_award_title_language_key = (
funding_reference_sub_items.get(funding_reference_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif "sourceTitle" == key_lv1:
if len(jpcoar_value.keys()) == 1:
source_title_sub_items: dict = jpcoar_value[key_lv1]
source_title_sub_items_keys: list = list(source_title_sub_items.keys())
mapping_source_title_item_key: str = key
for source_title_sub_item in source_title_sub_items:
if "@attributes" == source_title_sub_item:
mapping_source_title_language_key: list = source_title_sub_items.get(source_title_sub_item, {}).get("xml:lang", "_").split(".")
elif "degreeName" == key_lv1:
degree_name_sub_items: dict = jpcoar_value[key_lv1]
degree_name_sub_items_keys: list = list(degree_name_sub_items.keys())
mapping_degree_name_item_key: str = key
for degree_name_sub_item in degree_name_sub_items:
if "@attributes" == degree_name_sub_item:
mapping_degree_name_language_key: list = degree_name_sub_items.get(degree_name_sub_item, {}).get("xml:lang", "_").split(".")
elif "degreeGrantor" == key_lv1:
mapping_degree_grantor_item_key: str = key
degree_grantor_sub_items: dict = jpcoar_value[key_lv1]
degree_grantor_sub_items_keys:list = list(degree_grantor_sub_items.keys())
for degree_grantor_sub_item in degree_grantor_sub_items:
if degree_grantor_sub_item == "degreeGrantorName":
mapping_degree_grantor_name_language_key = (
degree_grantor_sub_items.get(degree_grantor_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif "conference" == key_lv1:
mapping_conference_item_key: str = key
conference_sub_items: dict = jpcoar_value[key_lv1]
conference_sub_items_keys: list = list(conference_sub_items.keys())
for conference_sub_item in conference_sub_items:
if conference_sub_item == "conferenceDate":
mapping_conference_date_language_key = (
conference_sub_items.get(conference_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif conference_sub_item == "conferenceName":
mapping_conference_name_language_key = (
conference_sub_items.get(conference_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif conference_sub_item == "conferencePlace":
mapping_conference_place_language_key = (
conference_sub_items.get(conference_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif conference_sub_item == "conferenceVenue":
mapping_conference_venue_language_key = (
conference_sub_items.get(conference_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
elif conference_sub_item == "conferenceSponsor":
mapping_conference_sponsor_language_key = (
conference_sub_items.get(conference_sub_item, {}).get("@attributes", {"xml:lang": "_._"}).get("xml:lang", "_._").split(".")
)
"""
For iterating the argument 'data' and validating its language value
"""
def validation_duplication_error_checker(item_error_message: str, items_to_be_checked_for_duplication: list):
def _validate_language_value(
language_value_list: list,
language_value: str,
duplication_error: str,
item_error_message: str
):
if language_value_list.count(language_value) > 1:
raise ValidationError(f"{item_error_message} -- {duplication_error}")
duplication_error: str = """
Please ensure that the following applicable items have no duplicate language values:
Title, Creator Name ,Creator Family Name, Creator Given Name, Creator Affliation Name,
Contributor Name ,Contributor Family Name, Contributor Given Name, Contributor Affliation Name,
Related Title, Funding Reference Funder Name, Funding Reference Award Title, Source Title, Degree Name,
Degree Grantor Name, Conference Name, Conference Sponsor, Conference Date, Conference Venue, Conference Place,
Holding Agent Name, Catalog Title
"""
try:
for lang_value in items_to_be_checked_for_duplication:
_validate_language_value(
items_to_be_checked_for_duplication,
lang_value,
duplication_error,
item_error_message
)
except ValidationError as error:
current_app.logger.error(traceback.format_exc())
current_app.logger.error(error)
result["is_valid"] = False
result['error'] = f"{item_error_message} -- {duplication_error}"
def validation_ja_kana_error_checker(item_error_message: str, items_to_be_checked_for_ja_kana: list):
def _validate_language_value(
language_value_list: list,
language_value: str,
ja_kana_error: str,
item_error_message: str
):
if language_value == "ja-Kana" and "ja" not in language_value_list:
raise ValidationError(f"{item_error_message} -- {ja_kana_error}")
ja_kana_error: str = """
If ja-Kana is used, please ensure that the following applicable items have ja language values:
Creator Name ,Creator Family Name, Creator Given Name, Creator Alternative Name,
Contributor Name ,Contributor Family Name, Contributor Given Name, Contributor Alternative Name,
Holding Agent Name, Catalog Title.
"""
try:
for lang_value in items_to_be_checked_for_ja_kana:
_validate_language_value(
items_to_be_checked_for_ja_kana,
lang_value,
ja_kana_error,
item_error_message
)
except ValidationError as error:
current_app.logger.error(traceback.format_exc())
current_app.logger.error(error)
result["is_valid"] = False
result['error'] = f"{item_error_message} -- {ja_kana_error}"
def validation_ja_latn_error_checker(item_error_message: str, items_to_be_checked_for_ja_latn: list):
def _validate_language_value(
language_value_list: list,
language_value: str,
ja_latn_error: str,
item_error_message: str
):
if language_value == "ja-Latn" and "ja" not in language_value_list:
raise ValidationError(f"{item_error_message} -- {ja_latn_error}")
ja_latn_error: str = """
If ja-Latn is used, please ensure that the following applicable items have ja language values:
Creator Name ,Creator Family Name, Creator Given Name, Creator Alternative Name,
Contributor Name ,Contributor Family Name, Contributor Given Name, Contributor Alternative Name,
Holding Agent Name, Catalog Title.
"""
try:
for lang_value in items_to_be_checked_for_ja_latn:
_validate_language_value(
items_to_be_checked_for_ja_latn,
lang_value,
ja_latn_error,
item_error_message
)
except ValidationError as error:
current_app.logger.error(traceback.format_exc())
current_app.logger.error(error)
result["is_valid"] = False
result['error'] = f"{item_error_message} -- {ja_latn_error}"
def validation_date_format_error_checker(item_error_message: str, items_to_be_checked_for_date_format: list):
def _validate_date_format(
date_value: str,
date_format_error: str,
item_error_message: str
):
result = False