import datetime import gpxpy import gpxpy.gpx from sqlalchemy.orm import Session from geojson import Feature, LineString from geopy.distance import geodesic from errors.NotFoundException import NotFoundError from geoObjects import Track, Waypoint class GPXHandler: __dbSession: Session def __init__(self, session:Session): self.__dbSession = session pass # handles converting a gpx file into usable data def parse(self, file, driver, vehicle): self.__gpx = gpxpy.parse(file) if not driver: raise ValueError("no driver found") if not vehicle: raise ValueError("no vehicle found") if not file: raise ValueError("no file found") for track in self.__gpx.tracks: # sets track name # if no name is found at a track default to using date trackName = track.name or f"Track-{datetime.now().isoformat()}" # todo using time.now might end up being misleading and to be reworked # initializes track values self.__startTime, self.__endTime, self.__trackDistance, self.__waypoints = None, None, 0, [] # grab all waypoints from a track for segment in track.segments: for point in segment.points: self.__waypoints.append(point) if start_time is None or point.time < start_time: start_time = point.time if end_time is None or point.time > end_time: end_time = point.time # calculate distance between 2 waypoints for i in range(1, len(self.__waypoints)): total_distance += self.__waypoints[i - 1].distance_3d(self.__waypoints[i]) # push values to the database track = Track( trackName=trackName, vehicle=vehicle.id, driver=driver.id, date=start_time.date() if start_time else None, distance=total_distance, speed=0, start=start_time, end=end_time ) self.__dbSession.add(track) self.__dbSession.commit() for point in self.__waypoints: waypoint = Waypoint( lat=point.latitude, lon=point.longitude, ele=point.elevation, speed=None, time=point.time, track=track.id ) self.__dbSession.add(waypoint) self.__dbSession.commit() # handles a route from db and converting it into geoJSON def getTrack(self, trackID): track = self.__dbSession.query(Track).filter_by(id=trackID).first() if not track: raise NotFoundError(f"track with id {trackID} not found") # fetches waypoints for a given track and converts them into geoJSON waypoints = self.__dbSession.query(Waypoint).filter_by(track=track.id).all() coordinates = [(wp.lon, wp.lat) for wp in waypoints] feature = Feature(geometry=LineString(coordinates)) return feature # grabs only the tracks from the database and returns them as json object def getTracks(self): tracks = self.__dbSession.query(Track).all() track_list = [ { "id": track.id, "name": track.trackName, # "driver": { # "id": track.driver.id, # "name": track.driver.name # } if track.driver else None, # "vehicle": { # "id": track.vehicle.id, # "name": track.vehicle.name # } if track.vehicle else None, # "distance": track.distance, # "start_time": track.start.isoformat() if track.start else None, # "end_time": track.end.isoformat() if track.end else None, } for track in tracks # iterates all tracks and appends them to the list ] return track_list def getTracksInTime(self, start, end): tracks = self.__dbSession.query(Track).filter(Track.start.between(start, end)).all() track_list = [ { "id": track.id, "name": track.trackName, "driver": { "id": track.driver.id, "name": track.driver.name } if track.driver else None, "vehicle": { "id": track.vehicle.id, "name": track.vehicle.name } if track.vehicle else None, "distance": track.distance, "start_time": track.start.isoformat() if track.start else None, "end_time": track.end.isoformat() if track.end else None, } for track in tracks # iterates all tracks and appends them to the list ] return track_list