Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ var Schedule.Timestamp.originalDepartureDelay: Duration

val Schedule.Timestamp.originalDeparture get() = departure - originalDepartureDelay

val Schedule.Timestamp.maxDate get() = maxOf(originalDeparture, departure, originalArrival, arrival)
val Schedule.Timestamp.minDate get() = minOf(originalDeparture, departure, originalArrival, arrival)

/**
* It's better to be early at the stop, than late and miss the vehicle departure -> truncate (floor by) to early w/ precision
*/
Expand Down Expand Up @@ -226,4 +229,4 @@ fun Schedule.Timestamp.toStringShort() = buildString {
append("[OLD]")
}
append("}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ import org.mtransit.android.commons.MTLog
import org.mtransit.android.commons.SecurityUtils
import org.mtransit.android.commons.TimeUtils
import org.mtransit.android.commons.TimeUtilsK
import org.mtransit.android.commons.data.Direction
import org.mtransit.android.commons.data.POIStatus
import org.mtransit.android.commons.data.RouteDirectionStop
import org.mtransit.android.commons.data.Schedule
import org.mtransit.android.commons.data.arrival
import org.mtransit.android.commons.data.departure
import org.mtransit.android.commons.data.makeSchedule
import org.mtransit.android.commons.data.toNoData
import org.mtransit.android.commons.provider.GTFSRealTimeProvider
import org.mtransit.android.commons.provider.gtfs.GtfsRealtimeExt.optDirectionIdValid
import org.mtransit.android.commons.provider.gtfs.GtfsRealtimeExt.optTimestampMs
import org.mtransit.android.commons.provider.GTFSRealTimeProvider.isIGNORE_DIRECTION
import org.mtransit.android.commons.provider.gtfs.GtfsRealTimeStorage
import org.mtransit.android.commons.provider.gtfs.GtfsRealtimeExt.optDirectionId
import org.mtransit.android.commons.provider.gtfs.GtfsRealtimeExt.optTrip
import org.mtransit.android.commons.provider.gtfs.GtfsRealtimeExt.optTripId
import org.mtransit.android.commons.provider.gtfs.GtfsRealtimeExt.sortTripUpdates
Expand All @@ -29,8 +31,8 @@ import org.mtransit.android.commons.provider.gtfs.getTripIds
import org.mtransit.android.commons.provider.gtfs.ignoreDirection
import org.mtransit.android.commons.provider.gtfs.makeRequest
import org.mtransit.android.commons.provider.gtfs.parseRouteId
import org.mtransit.android.commons.provider.gtfs.parseStopId
import org.mtransit.android.commons.provider.gtfs.parseTripId
import org.mtransit.android.commons.provider.gtfs.storage
import org.mtransit.commons.SourceUtils
import java.io.File
import java.io.IOException
Expand All @@ -43,7 +45,11 @@ import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import com.google.transit.realtime.GtfsRealtime.FeedMessage as GFeedMessage
import com.google.transit.realtime.GtfsRealtime.TripDescriptor.ScheduleRelationship as GTDScheduleRelationship
import com.google.transit.realtime.GtfsRealtime.TripUpdate as GTripUpdate
import com.google.transit.realtime.GtfsRealtime.TripUpdate.StopTimeUpdate as GTUStopTimeUpdate
import com.google.transit.realtime.GtfsRealtime.TripUpdate.StopTimeUpdate.ScheduleRelationship as GTUSTUScheduleRelationship
import com.google.transit.realtime.GtfsRealtime.TripUpdate.StopTimeUpdate.StopTimeProperties.DropOffPickupType as GTUSTUSTPDropOffPickupType

object GTFSRealTimeTripUpdatesProvider : MTLog.Loggable {

Expand Down Expand Up @@ -111,15 +117,15 @@ object GTFSRealTimeTripUpdatesProvider : MTLog.Loggable {
}
}

private const val DEBUG_STATIC_RT_MATCH = false
// private const val DEBUG_STATIC_RT_MATCH = true // DEBUG
val GTFSRealTimeProvider.ignoreDirection get() = isIGNORE_DIRECTION(this.requireContextCompat())

private fun GTFSRealTimeProvider.makeCachedStatusFromAgencyData(
context: Context,
filter: Schedule.ScheduleStatusFilter,
staticTripIds: List<String>,
): POIStatus? {
val lastUpdateInMs = storage.getTripUpdateLastUpdateMs(0L)
val context = context ?: return null
val readFromSourceMs = GtfsRealTimeStorage.getTripUpdateLastUpdateMs(context, 0L)
.takeIf { it > 0L } ?: return null // never loaded
val readFromSourceMs = storage.getTripUpdateReadFromSourceMs(0L)
.takeIf { it > 0L } ?: lastUpdateInMs
Expand All @@ -128,76 +134,32 @@ object GTFSRealTimeTripUpdatesProvider : MTLog.Loggable {
GTFSRealTimeProvider.getAgencyTripUpdatesUrlString(context, "T")
)
try {
val (targetRoute, targetDirection) = filter.routeDirectionStop.let { it.route to it.direction }
val targetAuthority = filter.targetAuthority
val targetRouteIdHash = targetRoute.originalIdHash.toString()
val targetDirectionOriginalId = targetDirection.originalDirectionIdOrNull
var tripIdsOutOfSync = false
if (DEBUG_STATIC_RT_MATCH) {
MTLog.d(LOG_TAG, "makeCachedStatusFromAgencyData() > target trip IDs [${staticTripIds.size}]:")
staticTripIds.chunked(10).forEach {
MTLog.d(LOG_TAG, "makeCachedStatusFromAgencyData() > - ${it.joinToString(",")}")
}
}
val rds = filter.routeDirectionStop
val targetAuthority = rds.authority
val routeId = rds.route.id
val directionId = rds.direction.id
val rdTripUpdates = gTripUpdates
.mapNotNull { gTripUpdate ->
gTripUpdate.optTrip?.let { it to gTripUpdate }
}.filter { (td, _) ->
parseRouteId(td)?.let { routeIdHash ->
if (routeIdHash != targetRouteIdHash) {
// if (DEBUG_STATIC_RT_MATCH) { // too much log
// MTLog.d(LOG_TAG, "makeCachedStatusFromAgencyData() > IGNORE: wrong route ID '$routeIdHash' (t:$targetRouteIdHash)")
// }
parseTripId(td)?.let { tripId ->
if (tripId !in tripIds) {
return@filter false
}
}
td.optDirectionIdValid?.takeIf { !ignoreDirection }?.let { directionId ->
if (directionId != targetDirectionOriginalId) {
if (DEBUG_STATIC_RT_MATCH) {
MTLog.d(LOG_TAG, "makeCachedStatusFromAgencyData() > IGNORE: wrong direction ID '$directionId' (t:$targetDirectionOriginalId)")
}
parseRouteId(td)?.let { routeIdHash ->
if (routeIdHash != rds.route.originalIdHash.toString()) {
return@filter false
}
}
parseTripId(td)?.let { tripId ->
if (tripId !in staticTripIds) {
if (DEBUG_STATIC_RT_MATCH) {
MTLog.d(LOG_TAG, "makeCachedStatusFromAgencyData() > IGNORE: wrong trip ID ($tripId)")
}
tripIdsOutOfSync = true
td.optDirectionId?.takeIf { !ignoreDirection }?.let { directionId ->
if (directionId != rds.direction.originalDirectionIdOrNull) {
return@filter false
}
}
return@filter true
}.takeIf { it.isNotEmpty() }
if (tripIdsOutOfSync) {
MTLog.w(LOG_TAG, "Trip IDs (might be) out of sync for route '${targetRoute.shortestName}' direction '${targetDirection.headsignValue}'!")
}
rdTripUpdates ?: run {
context.getRDS(targetAuthority, targetRoute.id, targetDirection.id)
?.map { rds ->
rds.makeSchedule(
lastUpdateInMs = lastUpdateInMs,
validityInMs = TRIP_UPDATE_VALIDITY_IN_MS,
readFromSourceAtInMs = readFromSourceMs,
providerPrecisionInMs = PROVIDER_PRECISION_IN_MS,
sourceLabel = sourceLabel,
noData = true
)
}?.forEach { noDataStatus ->
cacheStatus(noDataStatus)
}
MTLog.i(
LOG_TAG,
"No trip updates found for route '${targetRoute.shortestName}' direction '${targetDirection.headsignValue}'."
)
return null
}
val distinctTripId = rdTripUpdates.mapNotNull { it.first.optTripId }.distinct()
MTLog.i(
LOG_TAG,
"Using ${rdTripUpdates.size} trip updates for route '${targetRoute.shortestName}' direction '${targetDirection.headsignValue}': $distinctTripId."
)
rdTripUpdates ?: return null
if (Constants.DEBUG) {
MTLog.d(
LOG_TAG,
Expand All @@ -214,63 +176,78 @@ object GTFSRealTimeTripUpdatesProvider : MTLog.Loggable {
?.associateBy { it.targetUUID }
uuidSchedule ?: return null
processRDTripUpdates(rdTripUpdates, uuidSchedule, sortedRDS, filter.isIncludeCancelledTimestampsOrDefault)
cacheRealTimeSchedules(uuidSchedule.values, sourceLabel, readFromSourceMs, readFromSourceMs)
return getCachedStatusS(filter.targetUUID, staticTripIds)
val tripsWithRealTime = uuidSchedule.values
.asSequence()
.mapNotNull { schedule -> schedule.timestamps.takeIf { it.isNotEmpty() } }.flatten()
.filter { it.isRealTime }
.map { it.tripId }
.toSet() // distinct
uuidSchedule.forEach { (_, schedule) ->
val now = TimeUtilsK.currentInstant()
if (!schedule.timestamps.any { it.isRealTime || (it.tripId in tripsWithRealTime && it.departure < now) }) {
cacheStatus(schedule.toNoData()) // avoid re-run
return@forEach
}
var oldestDateForRealTime = now - 1.minutes
var maxFutureDateForRealTime = now + 12.hours
val (past, future) = schedule.timestamps.partition { it.departure < now }
oldestDateForRealTime = past.filter { it.isRealTime }.minOfOrNull { it.arrival } // all real-time
?: oldestDateForRealTime
maxFutureDateForRealTime = future.take(10).maxOfOrNull { it.departure } // keep firsts 10
?.takeIf { it > maxFutureDateForRealTime }
?: maxFutureDateForRealTime
maxFutureDateForRealTime = future.filter { it.isRealTime }.maxOfOrNull { it.departure } // all real-time
?.takeIf { it > maxFutureDateForRealTime }
?: maxFutureDateForRealTime
schedule.timestamps
.filterNot {
it.isRealTime || oldestDateForRealTime < it.arrival && it.departure < maxFutureDateForRealTime
}
.forEach { timestamp ->
schedule.removeTimestamp(timestamp)
}
schedule.sourceLabel = sourceLabel
schedule.lastUpdateInMs = readFromSourceMs
schedule.readFromSourceAtInMs = readFromSourceMs
schedule.providerPrecisionInMs = PROVIDER_PRECISION_IN_MS
schedule.validityInMs = TRIP_UPDATE_VALIDITY_IN_MS
cacheStatus(schedule)
}
return getCachedStatusS(filter.targetUUID, tripIds)
} catch (e: Exception) {
MTLog.w(LOG_TAG, e, "makeCachedStatusFromAgencyData() > error!")
MTLog.w(this, e, "makeCachedStatusFromAgencyData() > error!")
return null
}
}

private val OLDEST_FOR_REAL_TIME = 1.minutes
private val MAX_FUTURE_FOR_REAL_TIME = 12.hours

private fun GTFSRealTimeProvider.cacheRealTimeSchedules(
scheduleList: Collection<Schedule>,
sourceLabel: String,
lastUpdateInMs: Long,
readFromSourceMs: Long,
ignorePastRealTime: Boolean = false,
tripsWithRealTime: Set<String> = scheduleList
.asSequence()
.mapNotNull { schedule -> schedule.timestamps.takeIf { it.isNotEmpty() } }.flatten()
.filter { it.isRealTime }
.mapNotNull { it.tripId }
.toSet() // distinct
) {
scheduleList.forEach { schedule ->
schedule.sourceLabel = sourceLabel
schedule.lastUpdateInMs = lastUpdateInMs
schedule.readFromSourceAtInMs = readFromSourceMs
schedule.providerPrecisionInMs = PROVIDER_PRECISION_IN_MS
schedule.validityInMs = TRIP_UPDATE_VALIDITY_IN_MS
val now = TimeUtilsK.currentInstant()
if (!schedule.timestamps.any { it.isRealTime || (it.tripId in tripsWithRealTime && it.departure < now) }) {
cacheStatus(schedule.toNoData()) // avoid re-run
return@forEach
return Schedule.Timestamp(
departureMs,
provider.timeZone
).apply {
if (Constants.DEBUG) {
tripId = gTripUpdate.optTrip?.tripId // this trip ID does NOT match static data!
}
var oldestDateForRealTime = now - OLDEST_FOR_REAL_TIME
var maxFutureDateForRealTime = now + MAX_FUTURE_FOR_REAL_TIME
val (past, future) = schedule.timestamps.partition { it.departure < now }
if (!ignorePastRealTime) {
oldestDateForRealTime = past.filter { it.isRealTime }.minOfOrNull { it.arrival } // all real-time
?: oldestDateForRealTime
optDeparture?.optDelayMs?.let {
this.originalDepartureDelayMs = it
}
maxFutureDateForRealTime = future.take(10).maxOfOrNull { it.departure } // keep firsts 10
?.takeIf { it > maxFutureDateForRealTime }
?: maxFutureDateForRealTime
maxFutureDateForRealTime = future.filter { it.isRealTime }.maxOfOrNull { it.departure } // all real-time
?.takeIf { it > maxFutureDateForRealTime }
?: maxFutureDateForRealTime
// remove timestamps that are not real-time & outside of min/max date for real-time
schedule.timestamps
.filterNot {
it.isRealTime || oldestDateForRealTime < it.arrival && it.departure < maxFutureDateForRealTime
optArrival?.optDelayMs?.let {
this.originalArrivalDelayMs = it
}
realTime = true
arrival?.optTimeMs?.let { this.arrivalT = it }
if (scheduleRelationship == GTUSTUScheduleRelationship.SKIPPED) {
cancelled = true
} else if (gTripUpdate.optTrip?.optScheduleRelationship == GTDScheduleRelationship.CANCELED) {
cancelled = true
}
stopTimeProperties?.let { stp ->
stp.optStopHeadsign?.takeIf { it.isNotBlank() }?.let {
this.setHeadsign(Direction.HEADSIGN_TYPE_STRING, it)
}
.forEach { timestamp ->
schedule.removeTimestamp(timestamp)
if (stp.optPickupType == GTUSTUSTPDropOffPickupType.NONE) {
this.setHeadsign(Direction.HEADSIGN_TYPE_NO_PICKUP, null)
} else if (rds.isNoPickup) {
this.setHeadsign(Direction.HEADSIGN_TYPE_NO_PICKUP, null)
}
cacheStatus(schedule)
}
}
}

Expand Down Expand Up @@ -419,7 +396,7 @@ object GTFSRealTimeTripUpdatesProvider : MTLog.Loggable {

else -> {
MTLog.w(
LOG_TAG,
this@GTFSRealTimeTripUpdatesProvider,
"ERROR: HTTP URL-Connection Response Code ${response.code} (Message: ${response.message})"
)
return false
Expand Down
Loading