@@ -299,6 +299,10 @@ def decimate_into_new_db(db_in, db_out, min_period = 1, min_array_period = 3,
299299 if end is not None :
300300 tend = min ((end ,tend ,fn .now ())) #Query may finish earlier
301301
302+ if tend - tbegin < 600 :
303+ db_out .warning ('%s Tables already synchronized' % table )
304+ continue
305+
302306 if 'array' in table :
303307 period = min_array_period
304308 else :
@@ -308,16 +312,18 @@ def decimate_into_new_db(db_in, db_out, min_period = 1, min_array_period = 3,
308312 db_out .warning ('Disabling keys on %s' % table )
309313 db_out .Query ("ALTER TABLE `%s` DISABLE KEYS;" % table )
310314
311- decimate_into_new_table (db_in ,db_out ,table ,
312- tbegin ,tend ,min_period = period , max_period = max_period ,
313- method = method ,remove_nones = remove_nones ,
314- server_dec = server_dec , bunch = bunch ,
315- use_files = use_files )
315+ try :
316+ decimate_into_new_table (db_in ,db_out ,table ,
317+ tbegin ,tend ,min_period = period , max_period = max_period ,
318+ method = method ,remove_nones = remove_nones ,
319+ server_dec = server_dec , bunch = bunch ,
320+ use_files = use_files )
321+
322+ done .append (table )
323+ finally :
324+ db_out .warning ('Reenabling keys on %s' % table )
325+ db_out .Query ("ALTER TABLE `%s` ENABLE KEYS;" % table )
316326
317- db_out .warning ('Reenabling keys on %s' % table )
318- db_out .Query ("ALTER TABLE `%s` ENABLE KEYS;" % table )
319-
320- done .append (table )
321327 except :
322328 print (fn .time2str ())
323329 traceback .print_exc ()
@@ -502,16 +508,22 @@ def decimate_into_new_table(db_in, db_out, table, start, stop, ntable='',
502508 # i,j : att_id, idx
503509
504510 # Do not remove nones when using server-side decimation!
505- if ('array' in table or not server_dec ) and remove_nones :
511+ tlimit = fn .now ()+ 86400
512+ if 'array' in table or not server_dec or remove_nones :
513+ # Nones are inserted only if using server_decimation on scalars
506514 if array :
507- [data_ids [aid ][idx ].append ((t ,v ,d ,q ,x ,y )) for aid ,d ,v ,q ,t ,idx ,x ,y in data if v is not None ];
515+ [data_ids [aid ][idx ].append ((t ,v ,d ,q ,x ,y )) for aid ,d ,v ,q ,t ,idx ,x ,y in data if v is not None
516+ and 1e9 < t < tlimit ];
508517 else :
509- [data_ids [aid ][None ].append ((t ,v ,d ,q )) for aid ,d ,v ,q ,t in data if v is not None ];
518+ [data_ids [aid ][None ].append ((t ,v ,d ,q )) for aid ,d ,v ,q ,t in data if v is not None
519+ and 1e9 < t < tlimit ];
510520 else :
511521 if array :
512- [data_ids [aid ][idx ].append ((t ,v ,d ,q ,x ,y )) for aid ,d ,v ,q ,t ,idx ,x ,y in data ];
522+ [data_ids [aid ][idx ].append ((t ,v ,d ,q ,x ,y )) for aid ,d ,v ,q ,t ,idx ,x ,y in data
523+ and 1e9 < t < tlimit ];
513524 else :
514- [data_ids [aid ][None ].append ((t ,v ,d ,q )) for aid ,d ,v ,q ,t in data ];
525+ [data_ids [aid ][None ].append ((t ,v ,d ,q )) for aid ,d ,v ,q ,t in data
526+ and 1e9 < t < tlimit ];
515527
516528 db_in .info ('Decimating %d values, period = (%s,%s,[%s]), server_dec = %s' %
517529 (len (data ),min_period ,max_period ,bunch ,server_dec ))
@@ -900,8 +912,11 @@ def get_db_last_values_per_table(api, tables = None):
900912 return tables
901913
902914def get_last_value_in_table (api , table , method = 'max' ,
903- ignore_errors = False ): #, tref = -180*86400):
915+ ignore_errors = False ,
916+ trace = False ): #, tref = -180*86400):
904917 """
918+ DEPRECATED, USE API.get_table_timestamp instead
919+
905920 Returns a tuple containing:
906921 the last value stored in the given table, in epoch and date format
907922 """
@@ -922,28 +937,43 @@ def get_last_value_in_table(api, table, method='max',
922937 for i in ids :
923938 qi = q + ' where att_conf_id=%d' % i
924939 #if tref and int_time: where += ('int_time <= %d'% (tref))
925- r .extend (db .Query (qi ))
940+ rr = db .Query (qi )
941+ if trace :
942+ print ('%s[%s]:%s' % (table ,i ,rr ))
943+ r .extend (rr )
926944
927945 method = {'max' :max ,'min' :min }[method ]
928946 r = [db .mysqlsecs2time (l [0 ]) if int_time else fn .date2time (l [0 ])
929947 for l in r if l [0 ] not in (0 ,None )]
930948 r = [l for l in r if l if (ignore_errors or 1e9 < l < fn .now ())]
931949
932- last = method (r ) if len (r ) else 0
933- date = fn .time2str (last )
950+ if len (r ):
951+ last = method (r ) if len (r ) else 0
952+ date = fn .time2str (last )
953+ else :
954+ db .warning ('No values in %s' % table )
955+ last , date = None , ''
934956
935- return (last , date , size , fn .now ()- t0 )
957+ return (last , date , size , fn .now () - t0 )
936958
937959def get_first_value_in_table (api , table , ignore_errors = False ):
960+ """
961+ DEPRECATED, USE API.get_table_timestamp instead
962+ """
938963 return get_last_value_in_table (api , table , method = 'min' ,ignore_errors = ignore_errors )
939964
940965def delete_data_older_than (api , table , timestamp , doit = False , force = False ):
966+ delete_data_out_of_time (api , table , timestamp , fn .END_OF_TIME , doit , force )
967+
968+ def delete_data_out_of_time (api , table , tstart = 1e9 , tstop = None , doit = False , force = False ):
941969 if not doit :
942970 print ('doit=False, nothing to be executed' )
943971 if 'archiving04' in api .host and not force :
944972 raise Exception ('deleting on archiving04 is not allowed'
945973 ' (unless forced)' )
946-
974+
975+ timestamp = fn .str2time (tstart ) if fn .isString (tstart ) else tstart
976+ tstop = fn .str2time (tstop ) if fn .isString (tstop ) else fn .notNone (tstop ,fn .now ()+ 86400 )
947977 query = lambda q : (api .Query (q ) if doit else fn .printf (q ))
948978
949979 try :
@@ -952,21 +982,21 @@ def delete_data_older_than(api, table, timestamp, doit=False, force=False):
952982 partitions = sorted (api .getTablePartitions (table ))
953983 for p in partitions [:]:
954984 t = api .get_partition_time_by_name (p )
955- if t > timestamp :
985+ if t < timestamp :
956986 query ('alter table %s drop partition %s'
957987 % (table , p ))
958988 partitions .remove (p )
959989
960990 cols = api .getTableCols (table )
961- if 'int_time' in cols :
962- query ( 'delete from %s where int_time > %s'
963- % ( table , timestamp ))
964- elif 'data_time' in cols :
965- query ( "delete from %s where data_time > '%s'"
966- % ( table , fn .time2str ( timestamp )))
967- else :
968- query ("delete from %s where time > '%s'"
969- % ( table , fn . time2str ( timestamp )))
991+ col = ( c for c in ( 'int_time' , 'data_time' , 'time' ) if c in cols ). next ()
992+ if col != ' int_time' :
993+ timestamp , tstop = "'%s'" % timestamp , "'%s'" % tstop
994+ q = 'delete from %s where %s < %s' % ( table , col , timestamp )
995+
996+ if tstop != fn .END_OF_TIME :
997+ q += " and %s < %s " ( col , tstop )
998+ query (q )
999+
9701000 if partitions :
9711001 p = partitions [- 1 ]
9721002 query ('alter table %s repair partition %s' % (table ,p ))
0 commit comments