Skip to content

Commit 4dfb3a9

Browse files
authored
refactor: Streaming Trip Storage & Memory Optimization (#165)
1 parent e1116ca commit 4dfb3a9

10 files changed

Lines changed: 386 additions & 194 deletions

File tree

app/src/main/java/org/obd/graphs/ui/graph/GraphFragment.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ import org.obd.metrics.pid.PidDefinitionRegistry
7373
import java.text.SimpleDateFormat
7474
import java.util.Date
7575
import java.util.Locale
76+
import java.util.concurrent.ConcurrentLinkedDeque
7677

7778
private const val LOG_TAG = "Graph"
7879

@@ -260,7 +261,7 @@ class GraphFragment : Fragment() {
260261
val sensorData =
261262
SensorData(
262263
id = it.command.pid.id,
263-
metrics = mutableListOf(),
264+
metrics = ConcurrentLinkedDeque(),
264265
min = hist.min,
265266
max = hist.max,
266267
mean = hist.mean

common/src/main/java/org/obd/graphs/Cache.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@ class Cache {
2424
fun updateEntry(name: String, value: Any) {
2525
cache[name] = value
2626
}
27-
28-
fun initCache(m: MutableMap<String, Any>) {
29-
cache = m
30-
}
3127
}
3228

3329
val cacheManager = Cache()

datalogger/src/main/java/org/obd/graphs/bl/trip/DefaultTripManager.kt

Lines changed: 77 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package org.obd.graphs.bl.trip
1818

1919
import android.content.Context
2020
import android.util.Log
21-
import org.obd.graphs.bl.datalogger.DataLoggerRepository
2221
import org.obd.graphs.bl.datalogger.MetricsProcessor
2322
import org.obd.graphs.bl.datalogger.scaleToRange
2423
import org.obd.graphs.getContext
@@ -27,32 +26,31 @@ import org.obd.graphs.preferences.Prefs
2726
import org.obd.graphs.preferences.isEnabled
2827
import org.obd.graphs.profile.profile
2928
import org.obd.metrics.api.model.ObdMetric
30-
import java.io.File
31-
import java.io.FileOutputStream
3229
import java.text.SimpleDateFormat
3330
import java.util.Date
3431
import java.util.Locale
3532

36-
val tripManager: TripManager = DefaultTripManager()
33+
val tripManager: TripManager by lazy { DefaultTripManager() }
3734

3835
private const val LOGGER_TAG = "TripManager"
3936
private const val MIN_TRIP_LENGTH = 5
40-
private const val TRIP_DIRECTORY = "trips"
41-
4237
private const val TRIP_FILE_PREFIX = "trip"
4338

39+
private const val MAX_CACHED_METRICS_PER_SENSOR = 18000
40+
4441
internal class DefaultTripManager :
4542
TripManager,
4643
MetricsProcessor {
47-
private val dateFormat: SimpleDateFormat =
48-
SimpleDateFormat("MM.dd HH:mm:ss", Locale.getDefault())
4944

50-
private val tripModelSerializer = TripModelSerializer()
45+
private val dateFormat: SimpleDateFormat = SimpleDateFormat("MM.dd HH:mm:ss", Locale.getDefault())
5146
private val tripCache = TripCache()
52-
5347
private val tripDescParser = TripDescParser()
5448

55-
override fun getTripsDirectory(context: Context) = "${context.getExternalFilesDir(TRIP_DIRECTORY)?.absolutePath}"
49+
private val repository: TripRepository by lazy { FileTripRepository(getContext()!!) }
50+
51+
private var activeTripId: String? = null
52+
53+
override fun getTripsDirectory(context: Context) = "${context.getExternalFilesDir("trips")?.absolutePath}"
5654

5755
override fun postValue(obdMetric: ObdMetric) {
5856
try {
@@ -61,207 +59,123 @@ internal class DefaultTripManager :
6159
val key = obdMetric.command.pid.id
6260
val newRecord = if (obdMetric.isNumber()) Entry(ts, obdMetric.scaleToRange(), key) else Entry(ts, obdMetric.value, key)
6361

64-
if (trip.entries.containsKey(key)) {
65-
val tripEntry = trip.entries[key]!!
66-
tripEntry.metrics.add(
67-
Metric(
68-
entry = newRecord,
69-
ts = obdMetric.timestamp,
70-
rawAnswer = obdMetric.raw
71-
)
72-
)
73-
} else {
74-
trip.entries[key] =
75-
SensorData(
76-
id = key,
77-
metrics =
78-
mutableListOf(
79-
Metric(
80-
entry = newRecord,
81-
ts = obdMetric.timestamp,
82-
rawAnswer = obdMetric.raw
83-
)
84-
)
85-
)
62+
val metric = Metric(
63+
entry = newRecord,
64+
ts = obdMetric.timestamp,
65+
rawAnswer = obdMetric.raw
66+
)
67+
68+
repository.saveMetric(metric)
69+
70+
val tripEntry = trip.entries.getOrPut(key) {
71+
SensorData(id = key)
72+
}
73+
74+
tripEntry.metrics.add(metric)
75+
76+
while (tripEntry.metrics.size > MAX_CACHED_METRICS_PER_SENSOR) {
77+
tripEntry.metrics.removeFirst()
8678
}
8779
}
8880
} catch (e: Throwable) {
89-
Log.e(LOGGER_TAG, "Failed to add cache entry for ${obdMetric.command.pid.pid}", e)
81+
Log.e(LOGGER_TAG, "Failed to process metric for ${obdMetric.command.pid.pid}", e)
9082
}
9183
}
9284

9385
override fun getCurrentTrip(): Trip {
94-
if (null == tripCache.getTrip()) {
95-
startNewTrip(System.currentTimeMillis())
86+
val trip = tripCache.getTrip() ?: run {
87+
val newTs = System.currentTimeMillis()
88+
startNewTrip(newTs)
89+
tripCache.getTrip() ?: Trip(startTs = newTs)
9690
}
9791

98-
val trip = tripCache.getTrip()!!
9992
Log.i(LOGGER_TAG, "Get current trip ts: '${formatTimestamp(trip.startTs)}'")
10093
return trip
10194
}
10295

10396
override fun startNewTrip(newTs: Long) {
10497
Log.i(LOGGER_TAG, "Starting new trip, timestamp: '${formatTimestamp(newTs)}'")
10598
updateCache(newTs)
99+
100+
activeTripId = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$newTs.jsonl"
101+
repository.initStorage(activeTripId!!)
106102
}
107103

108-
override fun saveCurrentTrip(f: () -> Unit) {
104+
override fun saveCurrentTrip() {
109105
tripCache.getTrip { trip ->
110106
val recordShortTrip = Prefs.isEnabled("pref.trips.recordings.save.short.trip")
111107
val tripLength = getTripLength(trip)
112-
Log.i(LOGGER_TAG, "Recorded trip, length: ${tripLength}s")
108+
val currentTripId = activeTripId ?: return@getTrip
109+
110+
Log.i(LOGGER_TAG, "Stopping trip, length: ${tripLength}s")
111+
112+
repository.releaseStorage(currentTripId)
113113

114114
if (recordShortTrip || tripLength > MIN_TRIP_LENGTH) {
115-
val tripStartTs = trip.startTs
116-
117-
val filter = "$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$tripStartTs"
118-
val alreadySaved = findAllTripsBy(filter)
119-
120-
if (alreadySaved.isNotEmpty()) {
121-
Log.e(
122-
LOGGER_TAG,
123-
"It seems that Trip which start same date='$filter' is already saved."
124-
)
125-
} else {
126-
try {
127-
f()
128-
val histogram = DataLoggerRepository.getDiagnostics().histogram()
129-
val pidDefinitionRegistry = DataLoggerRepository.getPidDefinitionRegistry()
130-
131-
trip.entries.forEach { (t, u) ->
132-
val p = pidDefinitionRegistry.findBy(t)
133-
p?.let {
134-
val histogramSupplier = histogram.findBy(it)
135-
u.max = histogramSupplier.max
136-
u.min = histogramSupplier.min
137-
u.mean = histogramSupplier.mean
138-
}
139-
}
140-
141-
val content: String =
142-
tripModelSerializer.serializer.writeValueAsString(trip)
143-
144-
val fileName =
145-
"$TRIP_FILE_PREFIX-${profile.getCurrentProfile()}-$tripStartTs-$tripLength.json"
146-
Log.i(
147-
LOGGER_TAG,
148-
"Saving the trip to the file: '$fileName'. Length: ${tripLength}s"
149-
)
150-
writeFile(getContext()!!, fileName, content)
151-
Log.i(
152-
LOGGER_TAG,
153-
"Trip was written to the file: '$fileName'. Length: ${tripLength}s"
154-
)
155-
} catch (e: java.lang.Exception) {
156-
Log.e(LOGGER_TAG, "Failed to save trip", e)
157-
}
158-
}
115+
repository.updateTripMetadata(currentTripId, trip.startTs, tripLength, profile.getCurrentProfile())
159116
} else {
160-
Log.w(LOGGER_TAG, "Trip was not saved. Trip time is less than ${MIN_TRIP_LENGTH}s")
117+
Log.w(LOGGER_TAG, "Trip time is less than ${MIN_TRIP_LENGTH}s. Discarding.")
118+
repository.deleteTrip(currentTripId)
161119
}
120+
121+
activeTripId = null
162122
}
163123
}
164124

165125
override fun findAllTripsBy(filter: String): MutableCollection<TripFileDesc> {
166-
Log.i(LOGGER_TAG, "Finds all trips by filter: '$filter' and profile=${profile.getCurrentProfile()}")
167-
168-
val files = File(getTripsDirectory(getContext()!!)).list()
169-
if (files == null) {
170-
Log.i(LOGGER_TAG, "No files were found in the trips directory.")
171-
return mutableListOf()
172-
} else {
173-
val result =
174-
files
175-
.filter { if (filter.isNotEmpty()) it.startsWith(filter) else true }
176-
.filter { it.startsWith("${TRIP_FILE_PREFIX}_") || it.startsWith("$TRIP_FILE_PREFIX-") }
177-
.filter {
178-
it.contains("${profile.getCurrentProfile()}-")
179-
}.filter {
180-
try {
181-
tripDescParser.decodeTripName(it).size > 3
182-
} catch (e: Throwable) {
183-
false
184-
}
185-
}.mapNotNull { fileName ->
186-
Log.d(LOGGER_TAG, "Found trip which fits the conditions: $fileName")
187-
tripDescParser.getTripDesc(fileName)
188-
}.sortedByDescending { it.startTime.toLongOrNull() }
189-
.toMutableList()
190-
Log.i(LOGGER_TAG, "Found trips by filter: '$filter' for profile=${profile.getCurrentProfile()}. Result size: ${result.size}")
191-
return result
192-
}
126+
return repository.findAllTripsBy(filter, profile.getCurrentProfile())
193127
}
194128

195129
override fun deleteTrip(trip: TripFileDesc) {
196-
Log.i(LOGGER_TAG, "Deleting '${trip.fileName}' from the storage.")
197-
val file = File(getTripsDirectory(getContext()!!), trip.fileName)
198-
file.delete()
199-
Log.i(LOGGER_TAG, "Trip '${trip.fileName}' has been deleted from the storage.")
130+
repository.deleteTrip(trip.fileName)
200131
}
201132

202-
override fun loadTrip(tripName: String) {
203-
Log.i(LOGGER_TAG, "Loading '$tripName' from disk.")
133+
override fun loadTrip(tripId: String) {
134+
Log.i(LOGGER_TAG, "Loading trip ID: '$tripId'")
204135

205-
if (tripName.isEmpty()) {
136+
if (tripId.isEmpty()) {
206137
updateCache(System.currentTimeMillis())
207-
} else {
208-
val file = File(getTripsDirectory(getContext()!!), tripName)
209-
try {
210-
val trip: Trip = tripModelSerializer.deserializer.readValue(file, Trip::class.java)
211-
Log.i(LOGGER_TAG, "Trip '${file.absolutePath}' was loaded from the storage.")
212-
Log.i(LOGGER_TAG, "Trip selected PIDs ${trip.entries.keys}")
213-
Log.i(LOGGER_TAG, "Number of entries ${trip.entries.values.size} collected within the trip")
214-
215-
tripCache.updateTrip(trip)
216-
tripVirtualScreenManager.updateReservedVirtualScreen(
217-
trip.entries.keys
218-
.map { it.toString() }
219-
.toList()
220-
)
221-
} catch (e: Throwable) {
222-
Log.e(LOGGER_TAG, "Did not find trip '$tripName'.", e)
223-
updateCache(System.currentTimeMillis())
224-
}
138+
return
225139
}
226-
}
227140

228-
private fun writeFile(
229-
context: Context,
230-
fileName: String,
231-
content: String
232-
) {
233-
var fd: FileOutputStream? = null
234141
try {
235-
val file = getTripFile(context, fileName)
236-
fd =
237-
FileOutputStream(file).apply {
238-
write(content.toByteArray())
142+
val parts = tripDescParser.decodeTripName(tripId)
143+
val startTs = parts.getOrNull(2)?.toLongOrNull() ?: System.currentTimeMillis()
144+
val trip = Trip(startTs = startTs)
145+
146+
repository.loadTrip(tripId) { metric ->
147+
val key = metric.entry.data
148+
if (!trip.entries.containsKey(key)) {
149+
trip.entries[key] = SensorData(id = key)
150+
}
151+
trip.entries[key]!!.metrics.add(metric)
152+
}
153+
154+
trip.entries.values.forEach { sensorData ->
155+
val values = sensorData.metrics.mapNotNull { it.entry.y.toString().toFloatOrNull() }
156+
if (values.isNotEmpty()) {
157+
sensorData.min = values.minOrNull() ?: 0f
158+
sensorData.max = values.maxOrNull() ?: 0f
159+
sensorData.mean = values.average()
239160
}
240-
} finally {
241-
fd?.run {
242-
flush()
243-
close()
244161
}
162+
163+
Log.i(LOGGER_TAG, "Trip loaded successfully. PIDs: ${trip.entries.keys}")
164+
tripCache.updateTrip(trip)
165+
tripVirtualScreenManager.updateReservedVirtualScreen(trip.entries.keys.map { it.toString() })
166+
} catch (e: Throwable) {
167+
Log.e(LOGGER_TAG, "Failed to load trip '$tripId'.", e)
168+
updateCache(System.currentTimeMillis())
245169
}
246170
}
247171

248-
private fun getTripFile(
249-
context: Context,
250-
fileName: String
251-
): File = File(getTripsDirectory(context), fileName)
252-
253172
private fun updateCache(newTs: Long) {
254-
val trip = Trip(startTs = newTs, entries = mutableMapOf())
173+
val trip = Trip(startTs = newTs)
255174
tripCache.updateTrip(trip)
256-
Log.i(LOGGER_TAG, "Init new Trip with timestamp: '${formatTimestamp(newTs)}'")
257175
}
258176

259177
private fun getTripLength(trip: Trip): Long =
260-
if (trip.startTs == 0L) {
261-
0
262-
} else {
263-
(Date().time - trip.startTs) / 1000
264-
}
178+
if (trip.startTs == 0L) 0 else (Date().time - trip.startTs) / 1000
265179

266180
private fun formatTimestamp(ts: Long) = dateFormat.format(Date(ts))
267181
}

datalogger/src/main/java/org/obd/graphs/bl/trip/TripCache.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ private const val CACHE_TRIP_PROPERTY_NAME = "cache.trip.current"
2424
internal class TripCache {
2525

2626
init {
27-
val trip = Trip(startTs = System.currentTimeMillis(), entries = mutableMapOf())
27+
val trip = Trip(startTs = System.currentTimeMillis())
2828
updateTrip(trip)
2929
Log.i("tripCache", "Init Trip with stamp: ${trip.startTs}")
3030
}

datalogger/src/main/java/org/obd/graphs/bl/trip/TripDescParser.kt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,22 @@ class TripDescParser {
2323
val p = decodeTripName(fileName)
2424
val profileId = p[1]
2525
val profiles = profile.getAvailableProfiles()
26-
val profileLabel = profiles[profileId]!!
26+
val profileLabel = profiles[profileId] ?: "Unknown"
27+
28+
val startTime = if (p.size > 2) p[2] else ""
29+
val tripTimeSec = if (p.size > 3) p[3] else "0"
2730

2831
return TripFileDesc(
2932
fileName = fileName,
3033
profileId = profileId,
3134
profileLabel = profileLabel,
32-
startTime = p[2],
33-
tripTimeSec = p[3]
35+
startTime = startTime,
36+
tripTimeSec = tripTimeSec
3437
)
3538
}
3639

37-
fun decodeTripName(fileName: String) = fileName.substring(0, fileName.length - 5).split("-")
40+
fun decodeTripName(fileName: String): List<String> {
41+
val nameWithoutExtension = fileName.substringBeforeLast(".")
42+
return nameWithoutExtension.split("-")
43+
}
3844
}

0 commit comments

Comments
 (0)