162 lines
5.5 KiB
Python
162 lines
5.5 KiB
Python
import datetime
|
|
import gpxpy
|
|
import gpxpy.gpx
|
|
from sqlalchemy.orm import Session
|
|
from geojson import Feature, LineString, FeatureCollection
|
|
from geopy.distance import geodesic
|
|
|
|
from errors.NotFoundException import NotFoundError
|
|
from modules.geoObjects import Driver, Track, Vehicle, Waypoint
|
|
|
|
class GPXHandler:
|
|
__dbSession: Session
|
|
|
|
def __init__(self, session:Session):
|
|
self.__dbSession = session
|
|
pass
|
|
|
|
# handles converting a gpx file into usable data
|
|
def parse(self, file, driver:Driver, vehicle:Vehicle):
|
|
print(f"filename: {file}")
|
|
self.__gpx = gpxpy.parse(open(f"./uploads/{file}"))
|
|
|
|
if not driver:
|
|
raise ValueError("no driver found")
|
|
|
|
if not vehicle:
|
|
raise ValueError("no vehicle found")
|
|
|
|
if not file:
|
|
raise ValueError("no file found")
|
|
|
|
for track in self.__gpx.tracks:
|
|
# sets track name
|
|
# if no name is found at a track default to using date
|
|
trackName = track.name or f"Track-{datetime.now().isoformat()}"
|
|
|
|
# initializes track values
|
|
self.startTime = None
|
|
self.endTime = None
|
|
self.trackDistance = 0
|
|
self.waypoints = []
|
|
|
|
# grab all waypoints from a track
|
|
for segment in track.segments:
|
|
for point in segment.points:
|
|
self.waypoints.append(point)
|
|
if self.startTime is None or point.time < self.startTime:
|
|
self.startTime = point.time
|
|
if self.endTime is None or point.time > self.endTime:
|
|
self.endTime = point.time
|
|
|
|
# calculate distance between 2 waypoints
|
|
for i in range(1, len(self.waypoints)):
|
|
self.trackDistance += self.waypoints[i - 1].distance_3d(self.waypoints[i])
|
|
|
|
# push values to the database
|
|
track = Track(
|
|
trackName=trackName,
|
|
vehicle=vehicle,
|
|
driver=driver,
|
|
date=self.startTime.date() if self.startTime else None,
|
|
distance=self.trackDistance,
|
|
speed=0,
|
|
)
|
|
|
|
self.__dbSession.add(track)
|
|
self.__dbSession.commit()
|
|
|
|
for point in self.waypoints:
|
|
waypoint = Waypoint(
|
|
lat=point.latitude,
|
|
lon=point.longitude,
|
|
ele=point.elevation,
|
|
speed=None,
|
|
time=point.time,
|
|
track=track
|
|
)
|
|
self.__dbSession.add(waypoint)
|
|
|
|
self.__dbSession.commit()
|
|
|
|
# handles a route from db and converting it into geoJSON
|
|
def getTrack(self, trackID):
|
|
track = self.__dbSession.query(Track).filter_by(id=trackID).first()
|
|
if not track:
|
|
raise NotFoundError(f"track with id {trackID} not found", errors=[])
|
|
|
|
# fetches waypoints for a given track and converts them into geoJSON
|
|
waypoints = track.waypoints
|
|
|
|
coordinates = [(wp.lon, wp.lat) for wp in waypoints]
|
|
feature = Feature(geometry=LineString(coordinates))
|
|
return feature
|
|
|
|
# handles getting all infos of a track from the database
|
|
def getTrackMeta(self, trackID):
|
|
track = self.__dbSession.query(Track).filter_by(id=trackID).first()
|
|
if not track:
|
|
raise NotFoundError(f"track with id {trackID} not found", errors=[])
|
|
|
|
trackObject = {
|
|
"id": track.id,
|
|
"name": track.trackName,
|
|
"driver": {
|
|
"id": track.driver.id,
|
|
"name": track.driver.name
|
|
} if track.driver else None,
|
|
"vehicle": {
|
|
"id": track.vehicle.id,
|
|
"name": track.vehicle.name,
|
|
"licenseplate": track.vehicle.licenseplate
|
|
} if track.vehicle else None,
|
|
"distance": track.distance,
|
|
"time": track.date,
|
|
}
|
|
|
|
return trackObject
|
|
|
|
|
|
# grabs only the tracks from the database and returns them as json object
|
|
def getTracks(self):
|
|
tracks = self.__dbSession.query(Track).all()
|
|
|
|
track_list = [
|
|
{
|
|
"id": track.id,
|
|
"name": track.trackName,
|
|
"time": track.date,
|
|
}
|
|
for track in tracks # iterates all tracks and appends them to the list
|
|
]
|
|
|
|
return track_list
|
|
|
|
def getTracksInTime(self, start, end):
|
|
tracks = self.__dbSession.query(Track).filter(Track.date.between(start, end)).all()
|
|
|
|
track_list = [
|
|
{
|
|
"id": track.id,
|
|
"name": track.trackName,
|
|
"driver": {
|
|
"id": track.driver.id,
|
|
"name": track.driver.name
|
|
} if track.driver else None,
|
|
"vehicle": {
|
|
"id": track.vehicle.id,
|
|
"name": track.vehicle.name
|
|
} if track.vehicle else None,
|
|
"distance": track.distance,
|
|
"time": track.date,
|
|
}
|
|
for track in tracks # iterates all tracks and appends them to the list
|
|
]
|
|
|
|
return track_list
|
|
|
|
def deleteTrack(self, trackid):
|
|
self.__dbSession.query(Waypoint).filter(Waypoint.track_id == trackid).delete()
|
|
self.__dbSession.query(Track).filter(Track.id == trackid).delete()
|
|
self.__dbSession.commit()
|
|
|