import datetime import gpxpy import gpxpy.gpx from sqlalchemy.orm import Session from geojson import Feature, LineString from geopy.distance import geodesic from errors.NotFoundException import NotFoundError from modules.geoObjects import Driver, Track, Vehicle, Waypoint class GPXHandler: __dbSession: Session def __init__(self, session:Session): self.__dbSession = session pass # handles converting a gpx file into usable data def parse(self, file, driver:Driver, vehicle:Vehicle): print(f"filename: {file}") self.__gpx = gpxpy.parse(open(f"./uploads/{file}")) if not driver: raise ValueError("no driver found") if not vehicle: raise ValueError("no vehicle found") if not file: raise ValueError("no file found") for track in self.__gpx.tracks: # sets track name # if no name is found at a track default to using date trackName = track.name or f"Track-{datetime.now().isoformat()}" # todo using time.now might end up being misleading and to be reworked # initializes track values self.startTime = None self.endTime = None self.trackDistance = 0 self.waypoints = [] # grab all waypoints from a track for segment in track.segments: for point in segment.points: self.waypoints.append(point) if self.startTime is None or point.time < self.startTime: self.startTime = point.time if self.endTime is None or point.time > self.endTime: self.endTime = point.time # calculate distance between 2 waypoints for i in range(1, len(self.waypoints)): self.trackDistance += self.waypoints[i - 1].distance_3d(self.waypoints[i]) # push values to the database track = Track( trackName=trackName, vehicle=vehicle, driver=driver, date=self.startTime.date() if self.startTime else None, distance=self.trackDistance, speed=0, ) self.__dbSession.add(track) self.__dbSession.commit() for point in self.waypoints: waypoint = Waypoint( lat=point.latitude, lon=point.longitude, ele=point.elevation, speed=None, time=point.time, track=track ) self.__dbSession.add(waypoint) self.__dbSession.commit() # handles a route from db and converting it into geoJSON def getTrack(self, trackID): track = self.__dbSession.query(Track).filter_by(id=trackID).first() if not track: raise NotFoundError(f"track with id {trackID} not found", errors=[]) # fetches waypoints for a given track and converts them into geoJSON waypoints = track.waypoints coordinates = [(wp.lon, wp.lat) for wp in waypoints] feature = Feature(geometry=LineString(coordinates)) return feature # grabs only the tracks from the database and returns them as json object def getTracks(self): tracks = self.__dbSession.query(Track).all() track_list = [ { "id": track.id, "name": track.trackName, # "driver": { # "id": track.driver.id, # "name": track.driver.name # } if track.driver else None, # "vehicle": { # "id": track.vehicle.id, # "name": track.vehicle.name # } if track.vehicle else None, # "distance": track.distance, # "startTime": track.start.isoformat() if track.start else None, # "endTime": track.end.isoformat() if track.end else None, } for track in tracks # iterates all tracks and appends them to the list ] return track_list def getTracksInTime(self, start, end): tracks = self.__dbSession.query(Track).filter(Track.start.between(start, end)).all() track_list = [ { "id": track.id, "name": track.trackName, "driver": { "id": track.driver.id, "name": track.driver.name } if track.driver else None, "vehicle": { "id": track.vehicle.id, "name": track.vehicle.name } if track.vehicle else None, "distance": track.distance, "startTime": track.start.isoformat() if track.start else None, "end_time": track.end.isoformat() if track.end else None, } for track in tracks # iterates all tracks and appends them to the list ] return track_list