GeoTracking/class/gpxInterpreter.py
2025-01-05 20:34:15 +01:00

144 lines
5.1 KiB
Python

import datetime
import gpxpy
import gpxpy.gpx
from sqlalchemy.orm import Session
from geojson import Feature, LineString
from geopy.distance import geodesic
from errors.NotFoundException import NotFoundError
from geoObjects import Track, Waypoint
class GPXHandler:
    """Import GPX files into the database and read stored tracks back out.

    Every method works through the SQLAlchemy session handed to the
    constructor; the handler never opens or closes the session itself.
    """

    __dbSession: "Session"

    def __init__(self, session: "Session") -> None:
        """Keep a reference to the session used for all queries and writes."""
        self.__dbSession = session

    # handles converting a gpx file into usable data
    def parse(self, file, driver, vehicle):
        """Parse a GPX document and persist each track with its waypoints.

        Args:
            file: GPX content, passed unchanged to ``gpxpy.parse``.
            driver: Object whose ``id`` is stored on every created track.
            vehicle: Object whose ``id`` is stored on every created track.

        Raises:
            ValueError: If ``driver``, ``vehicle`` or ``file`` is missing.
        """
        # Validate everything before parsing so that a missing argument is
        # reported as such rather than as an error from the GPX parser.
        if not driver:
            raise ValueError("no driver found")
        if not vehicle:
            raise ValueError("no vehicle found")
        if not file:
            raise ValueError("no file found")

        self.__gpx = gpxpy.parse(file)

        for gpx_track in self.__gpx.tracks:
            # if no name is found at a track default to using date
            # todo using time.now might end up being misleading and to be reworked
            track_name = (
                gpx_track.name
                or f"Track-{datetime.datetime.now().isoformat()}"
            )

            # Flatten all segments; distance is measured across this flat
            # list, so it also spans the gap between consecutive segments.
            points = [
                point
                for segment in gpx_track.segments
                for point in segment.points
            ]
            start_time, end_time, total_distance = self._summarize_points(points)

            try:
                track_row = Track(
                    trackName=track_name,
                    vehicle=vehicle.id,
                    driver=driver.id,
                    date=start_time.date() if start_time else None,
                    distance=total_distance,
                    speed=0,
                    start=start_time,
                    end=end_time,
                )
                self.__dbSession.add(track_row)
                # flush() assigns track_row.id without ending the transaction,
                # so the track and its waypoints are committed together.
                self.__dbSession.flush()
                self.__dbSession.add_all(
                    Waypoint(
                        lat=point.latitude,
                        lon=point.longitude,
                        ele=point.elevation,
                        speed=None,
                        time=point.time,
                        track=track_row.id,
                    )
                    for point in points
                )
                self.__dbSession.commit()
            except Exception:
                # Do not leave a half-written track behind or a broken session.
                self.__dbSession.rollback()
                raise

    @staticmethod
    def _summarize_points(points):
        """Return ``(start, end, distance)`` for an ordered list of GPX points.

        ``start``/``end`` are the earliest/latest ``time`` values, ignoring
        points without a timestamp (``None`` when no point has one).
        ``distance`` is the sum of ``distance_3d`` between neighbouring
        points; a ``None`` result from ``distance_3d`` counts as zero.
        """
        timestamps = [point.time for point in points if point.time is not None]
        start = min(timestamps, default=None)
        end = max(timestamps, default=None)

        distance = 0
        for previous, current in zip(points, points[1:]):
            distance += previous.distance_3d(current) or 0
        return start, end, distance

    # handles a route from db and converting it into geoJSON
    def getTrack(self, trackID):
        """Return the stored track ``trackID`` as a GeoJSON ``Feature``.

        The feature's geometry is a ``LineString`` of ``(lon, lat)`` pairs
        built from the track's waypoints.

        Raises:
            NotFoundError: If no track with the given id exists.
        """
        track = self.__dbSession.query(Track).filter_by(id=trackID).first()
        if track is None:
            raise NotFoundError(f"track with id {trackID} not found")

        # NOTE(review): there is no explicit ORDER BY here, so the point order
        # of the line relies on the database returning rows in insertion
        # order. Consider ordering by the waypoint primary key.
        waypoints = (
            self.__dbSession.query(Waypoint).filter_by(track=track.id).all()
        )
        coordinates = [(wp.lon, wp.lat) for wp in waypoints]
        return Feature(geometry=LineString(coordinates))

    @staticmethod
    def _describe_reference(ref):
        """Return ``{"id", "name"}`` for a driver/vehicle value, or ``None``.

        ``parse`` stores bare ids in these fields, so ``ref`` may be a plain
        id instead of a related object. A plain id yields a ``None`` name.
        NOTE(review): confirm against the ``Track`` model which form applies.
        """
        if ref is None:
            return None
        return {
            "id": getattr(ref, "id", ref),
            "name": getattr(ref, "name", None),
        }

    @staticmethod
    def _serialize_track(track):
        """Convert one track row into a plain, JSON-serialisable dict."""
        return {
            "id": track.id,
            "name": track.trackName,
            "driver": GPXHandler._describe_reference(track.driver),
            "vehicle": GPXHandler._describe_reference(track.vehicle),
            "distance": track.distance,
            "start_time": track.start.isoformat() if track.start else None,
            "end_time": track.end.isoformat() if track.end else None,
        }

    # grabs only the tracks from the database and returns them as json object
    def getTracks(self):
        """Return every stored track as a list of dicts (without waypoints)."""
        tracks = self.__dbSession.query(Track).all()
        return [self._serialize_track(track) for track in tracks]

    def getTracksInTime(self, start, end):
        """Return the tracks whose start time lies between ``start`` and ``end``.

        The filter uses SQL ``BETWEEN``, so both bounds are inclusive. The
        result has the same shape as the one from ``getTracks``.
        """
        tracks = (
            self.__dbSession.query(Track)
            .filter(Track.start.between(start, end))
            .all()
        )
        return [self._serialize_track(track) for track in tracks]