Skip to content

Commit f2d5af9

Browse files
committed
feat: add debug info about number of stored metrics
1 parent 099efe2 commit f2d5af9

1 file changed

Lines changed: 38 additions & 28 deletions

File tree

datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,11 @@ import java.text.SimpleDateFormat
3535
import java.util.Date
3636
import java.util.Locale
3737
import java.util.concurrent.Executors
38+
import java.util.concurrent.atomic.AtomicLong
3839

3940
val tripManager: TripManager = DefaultTripManager()
4041

41-
private const val LOGGER_TAG = "TripManager"
42+
private const val LOG_TAG = "TripManager"
4243
private const val MIN_TRIP_LENGTH = 5
4344
private const val TRIP_DIRECTORY = "trips"
4445
private const val TRIP_FILE_PREFIX = "trip"
@@ -60,6 +61,9 @@ internal class DefaultTripManager :
6061
private var activeFileOutputStream: FileOutputStream? = null
6162
private var activeTripFileName: String? = null
6263

64+
// Thread-safe counter for total items written to disk
65+
private val totalMetricsSaved = AtomicLong(0)
66+
6367
// Single thread dispatcher for sequential, non-blocking file operations
6468
private val fileIoDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
6569
private val fileIoScope = CoroutineScope(fileIoDispatcher)
@@ -79,22 +83,22 @@ internal class DefaultTripManager :
7983
rawAnswer = obdMetric.raw
8084
)
8185

82-
// STREAM TO FILE (Sequential, Non-Blocking via single-thread dispatcher)
8386
fileIoScope.launch {
8487
try {
8588
val jsonLine = tripModelSerializer.serializer.writeValueAsString(metric) + "\n"
86-
activeFileOutputStream?.write(jsonLine.toByteArray())
89+
activeFileOutputStream?.let {
90+
it.write(jsonLine.toByteArray())
91+
totalMetricsSaved.incrementAndGet()
92+
}
8793
} catch (e: Exception) {
88-
Log.e(LOGGER_TAG, "Failed to stream line to JSONL file", e)
94+
Log.e(LOG_TAG, "Failed to stream line to JSONL file", e)
8995
}
9096
}
9197

92-
// UPDATE RAM CACHE (With 30-min Cap)
9398
if (trip.entries.containsKey(key)) {
9499
val tripEntry = trip.entries[key]!!
95100
tripEntry.metrics.add(metric)
96101

97-
// Memory Protection: Cap the list size
98102
if (tripEntry.metrics.size > MAX_CACHED_METRICS_PER_SENSOR) {
99103
tripEntry.metrics.removeAt(0)
100104
}
@@ -107,7 +111,7 @@ internal class DefaultTripManager :
107111
}
108112
}
109113
} catch (e: Throwable) {
110-
Log.e(LOGGER_TAG, "Failed to add cache entry for ${obdMetric.command.pid.pid}", e)
114+
Log.e(LOG_TAG, "Failed to add cache entry for ${obdMetric.command.pid.pid}", e)
111115
}
112116
}
113117

@@ -117,13 +121,14 @@ internal class DefaultTripManager :
117121
}
118122

119123
val trip = tripCache.getTrip()!!
120-
Log.i(LOGGER_TAG, "Get current trip ts: '${formatTimestamp(trip.startTs)}'")
124+
Log.i(LOG_TAG, "Get current trip ts: '${formatTimestamp(trip.startTs)}'")
121125
return trip
122126
}
123127

124128
override fun startNewTrip(newTs: Long) {
125-
Log.i(LOGGER_TAG, "Starting new trip, timestamp: '${formatTimestamp(newTs)}'")
129+
Log.i(LOG_TAG, "Starting new trip, timestamp: '${formatTimestamp(newTs)}'")
126130
updateCache(newTs)
131+
totalMetricsSaved.set(0)
127132

128133
// Generate the file name
129134
val fileName = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$newTs.jsonl"
@@ -134,9 +139,9 @@ internal class DefaultTripManager :
134139
try {
135140
val file = getTripFile(getContext()!!, fileName)
136141
activeFileOutputStream = FileOutputStream(file, true)
137-
Log.i(LOGGER_TAG, "Opened stream for file: $fileName")
142+
Log.i(LOG_TAG, "Opened stream for file: $fileName")
138143
} catch (e: Exception) {
139-
Log.e(LOGGER_TAG, "Failed to open file stream for streaming", e)
144+
Log.e(LOG_TAG, "Failed to open file stream for streaming", e)
140145
}
141146
}
142147
}
@@ -145,11 +150,13 @@ internal class DefaultTripManager :
145150
tripCache.getTrip { trip ->
146151
val recordShortTrip = Prefs.isEnabled("pref.trips.recordings.save.short.trip")
147152
val tripLength = getTripLength(trip)
148-
Log.i(LOGGER_TAG, "Stopping trip, length: ${tripLength}s")
153+
Log.i(LOG_TAG, "Stopping trip, length: ${tripLength}s")
149154

155+
// Capture the current file name in case another trip starts immediately
150156
val fileNameToProcess = activeTripFileName
151157

152158
if (recordShortTrip || tripLength > MIN_TRIP_LENGTH) {
159+
// Close and rename on the sequential thread to guarantee all pending writes finish first
153160
fileIoScope.launch {
154161
try {
155162
activeFileOutputStream?.flush()
@@ -162,17 +169,20 @@ internal class DefaultTripManager :
162169
val finalName = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-${trip.startTs}-$tripLength.jsonl"
163170
val finalFile = getTripFile(getContext()!!, finalName)
164171
currentFile.renameTo(finalFile)
165-
Log.i(LOGGER_TAG, "Trip stream closed and renamed to: '$finalName'")
172+
173+
val totalItems = totalMetricsSaved.get()
174+
val fileSizeMb = finalFile.length() / (1024.0 * 1024.0)
175+
Log.i(LOG_TAG, "Trip stream closed. File: '$finalName' | Saved: $totalItems items | Size: ${String.format("%.2f", fileSizeMb)} MB")
166176
}
167177
}
168178
} catch (e: java.lang.Exception) {
169-
Log.e(LOGGER_TAG, "Failed to finalize streaming trip file", e)
179+
Log.e(LOG_TAG, "Failed to finalize streaming trip file", e)
170180
} finally {
171181
activeTripFileName = null
172182
}
173183
}
174184
} else {
175-
Log.w(LOGGER_TAG, "Trip time is less than ${MIN_TRIP_LENGTH}s. Deleting short file.")
185+
Log.w(LOG_TAG, "Trip time is less than ${MIN_TRIP_LENGTH}s. Deleting short file.")
176186
fileIoScope.launch {
177187
try {
178188
activeFileOutputStream?.close()
@@ -182,7 +192,7 @@ internal class DefaultTripManager :
182192
getTripFile(getContext()!!, it).delete()
183193
}
184194
} catch (e: Exception) {
185-
Log.e(LOGGER_TAG, "Failed to delete short trip file", e)
195+
Log.e(LOG_TAG, "Failed to delete short trip file", e)
186196
} finally {
187197
activeTripFileName = null
188198
}
@@ -192,11 +202,11 @@ internal class DefaultTripManager :
192202
}
193203

194204
override fun findAllTripsBy(filter: String): MutableCollection<TripFileDesc> {
195-
Log.i(LOGGER_TAG, "Finds all trips by filter: '$filter' and profile=${profile.getCurrentProfile()}")
205+
Log.i(LOG_TAG, "Finds all trips by filter: '$filter' and profile=${profile.getCurrentProfile()}")
196206

197207
val files = File(getTripsDirectory(getContext()!!)).list()
198208
if (files == null) {
199-
Log.i(LOGGER_TAG, "No files were found in the trips directory.")
209+
Log.i(LOG_TAG, "No files were found in the trips directory.")
200210
return mutableListOf()
201211
} else {
202212
val result =
@@ -211,24 +221,24 @@ internal class DefaultTripManager :
211221
false
212222
}
213223
}.mapNotNull { fileName ->
214-
Log.d(LOGGER_TAG, "Found trip which fits the conditions: $fileName")
224+
Log.d(LOG_TAG, "Found trip which fits the conditions: $fileName")
215225
tripDescParser.getTripDesc(fileName)
216226
}.sortedByDescending { it.startTime.toLongOrNull() }
217227
.toMutableList()
218-
Log.i(LOGGER_TAG, "Found trips by filter: '$filter' for profile=${profile.getCurrentProfile()}. Result size: ${result.size}")
228+
Log.i(LOG_TAG, "Found trips by filter: '$filter' for profile=${profile.getCurrentProfile()}. Result size: ${result.size}")
219229
return result
220230
}
221231
}
222232

223233
override fun deleteTrip(trip: TripFileDesc) {
224-
Log.i(LOGGER_TAG, "Deleting '${trip.fileName}' from the storage.")
234+
Log.i(LOG_TAG, "Deleting '${trip.fileName}' from the storage.")
225235
val file = File(getTripsDirectory(getContext()!!), trip.fileName)
226236
file.delete()
227-
Log.i(LOGGER_TAG, "Trip '${trip.fileName}' has been deleted from the storage.")
237+
Log.i(LOG_TAG, "Trip '${trip.fileName}' has been deleted from the storage.")
228238
}
229239

230240
override fun loadTrip(tripName: String) {
231-
Log.i(LOGGER_TAG, "Loading '$tripName' from disk.")
241+
Log.i(LOG_TAG, "Loading '$tripName' from disk.")
232242

233243
if (tripName.isEmpty()) {
234244
updateCache(System.currentTimeMillis())
@@ -264,16 +274,16 @@ internal class DefaultTripManager :
264274
}
265275
}
266276

267-
Log.i(LOGGER_TAG, "Trip '${file.absolutePath}' was loaded from the storage.")
268-
Log.i(LOGGER_TAG, "Trip selected PIDs ${trip.entries.keys}")
269-
Log.i(LOGGER_TAG, "Number of entries ${trip.entries.values.size} collected within the trip")
277+
Log.i(LOG_TAG, "Trip '${file.absolutePath}' was loaded from the storage.")
278+
Log.i(LOG_TAG, "Trip selected PIDs ${trip.entries.keys}")
279+
Log.i(LOG_TAG, "Number of entries ${trip.entries.values.size} collected within the trip")
270280

271281
tripCache.updateTrip(trip)
272282
tripVirtualScreenManager.updateReservedVirtualScreen(
273283
trip.entries.keys.map { it.toString() }.toList()
274284
)
275285
} catch (e: Throwable) {
276-
Log.e(LOGGER_TAG, "Did not find or failed to parse trip '$tripName'.", e)
286+
Log.e(LOG_TAG, "Did not find or failed to parse trip '$tripName'.", e)
277287
updateCache(System.currentTimeMillis())
278288
}
279289
}
@@ -287,7 +297,7 @@ internal class DefaultTripManager :
287297
private fun updateCache(newTs: Long) {
288298
val trip = Trip(startTs = newTs)
289299
tripCache.updateTrip(trip)
290-
Log.i(LOGGER_TAG, "Init new Trip with timestamp: '${formatTimestamp(newTs)}'")
300+
Log.i(LOG_TAG, "Init new Trip with timestamp: '${formatTimestamp(newTs)}'")
291301
}
292302

293303
private fun getTripLength(trip: Trip): Long =

0 commit comments

Comments
 (0)