feat: implemented api

This commit is contained in:
2025-01-05 20:34:15 +01:00
parent c4de1d11ec
commit 695eaf576e
5 changed files with 189 additions and 25 deletions

View File

@@ -1,6 +1,12 @@
import datetime
import gpxpy
import gpxpy.gpx
from sqlalchemy.orm import Session
from geojson import Feature, LineString
from geopy.distance import geodesic
from errors.NotFoundException import NotFoundError
from geoObjects import Track, Waypoint
class GPXHandler:
__dbSession: Session
@@ -10,14 +16,128 @@ class GPXHandler:
pass
# handles converting a gpx file into usable data
def parse(self, file):
self.gpx = gpxpy.parse(file)
pass
def parse(self, file, driver, vehicle):
self.__gpx = gpxpy.parse(file)
if not driver:
raise ValueError("no driver found")
if not vehicle:
raise ValueError("no vehicle found")
if not file:
raise ValueError("no file found")
for track in self.__gpx.tracks:
# sets track name
# if no name is found at a track default to using date
trackName = track.name or f"Track-{datetime.now().isoformat()}" # todo using time.now might end up being misleading and to be reworked
# initializes track values
self.__startTime,
self.__endTime,
self.__trackDistance,
self.__waypoints = None, None, 0, []
# grab all waypoints from a track
for segment in track.segments:
for point in segment.points:
self.__waypoints.append(point)
if start_time is None or point.time < start_time:
start_time = point.time
if end_time is None or point.time > end_time:
end_time = point.time
# calculate distance between 2 waypoints
for i in range(1, len(self.__waypoints)):
total_distance += self.__waypoints[i - 1].distance_3d(self.__waypoints[i])
# push values to the database
track = Track(
trackName=trackName,
vehicle=vehicle.id,
driver=driver.id,
date=start_time.date() if start_time else None,
distance=total_distance,
speed=0,
start=start_time,
end=end_time
)
self.__dbSession.add(track)
self.__dbSession.commit()
for point in self.__waypoints:
waypoint = Waypoint(
lat=point.latitude,
lon=point.longitude,
ele=point.elevation,
speed=None,
time=point.time,
track=track.id
)
self.__dbSession.add(waypoint)
self.__dbSession.commit()
# handles a route from db and converting it into geoJSON
def getRoute(self, route):
pass
def getTrack(self, trackID):
track = self.__dbSession.query(Track).filter_by(id=trackID).first()
if not track:
raise NotFoundError(f"track with id {trackID} not found")
# fetches waypoints for a given track and converts them into geoJSON
waypoints = self.__dbSession.query(Waypoint).filter_by(track=track.id).all()
coordinates = [(wp.lon, wp.lat) for wp in waypoints]
feature = Feature(geometry=LineString(coordinates))
return feature
# handles calculating the distance between two points
def calcDist(self, p1, p2):
pass
# grabs only the tracks from the database and returns them as json object
def getTracks(self):
tracks = self.__dbSession.query(Track).all()
track_list = [
{
"id": track.id,
"name": track.trackName,
"driver": {
"id": track.driver.id,
"name": track.driver.name
} if track.driver else None,
"vehicle": {
"id": track.vehicle.id,
"name": track.vehicle.name
} if track.vehicle else None,
"distance": track.distance,
"start_time": track.start.isoformat() if track.start else None,
"end_time": track.end.isoformat() if track.end else None,
}
for track in tracks # iterates all tracks and appends them to the list
]
return track_list
def getTracksInTime(self, start, end):
tracks = self.__dbSession.query(Track).filter(Track.start.between(start, end)).all()
track_list = [
{
"id": track.id,
"name": track.trackName,
"driver": {
"id": track.driver.id,
"name": track.driver.name
} if track.driver else None,
"vehicle": {
"id": track.vehicle.id,
"name": track.vehicle.name
} if track.vehicle else None,
"distance": track.distance,
"start_time": track.start.isoformat() if track.start else None,
"end_time": track.end.isoformat() if track.end else None,
}
for track in tracks # iterates all tracks and appends them to the list
]
return track_list