GeoTracking/modules/gpxInterpreter.py

189 lines
6.7 KiB
Python

import datetime
import gpxpy
import gpxpy.gpx
from sqlalchemy.orm import Session
from geojson import Feature, LineString, FeatureCollection
from geopy.distance import geodesic
from errors.NotFoundException import NotFoundError
from modules.geoObjects import Driver, Track, Vehicle, Waypoint
class GPXHandler:
__dbSession: Session
def __init__(self, session:Session):
self.__dbSession = session
pass
# handles converting a gpx file into usable data
def parse(self, file, driver:Driver, vehicle:Vehicle):
print(f"filename: {file}")
self.__gpx = gpxpy.parse(open(f"./uploads/{file}"))
if not driver:
raise ValueError("no driver found")
if not vehicle:
raise ValueError("no vehicle found")
if not file:
raise ValueError("no file found")
for track in self.__gpx.tracks:
# sets track name
# if no name is found at a track default to using date
trackName = track.name or f"Track-{datetime.now().isoformat()}" # todo using time.now might end up being misleading and to be reworked
# initializes track values
self.startTime = None
self.endTime = None
self.trackDistance = 0
self.waypoints = []
# grab all waypoints from a track
for segment in track.segments:
for point in segment.points:
self.waypoints.append(point)
if self.startTime is None or point.time < self.startTime:
self.startTime = point.time
if self.endTime is None or point.time > self.endTime:
self.endTime = point.time
# calculate distance between 2 waypoints
for i in range(1, len(self.waypoints)):
self.trackDistance += self.waypoints[i - 1].distance_3d(self.waypoints[i])
# push values to the database
track = Track(
trackName=trackName,
vehicle=vehicle,
driver=driver,
date=self.startTime.date() if self.startTime else None,
distance=self.trackDistance,
speed=0,
)
self.__dbSession.add(track)
self.__dbSession.commit()
for point in self.waypoints:
waypoint = Waypoint(
lat=point.latitude,
lon=point.longitude,
ele=point.elevation,
speed=None,
time=point.time,
track=track
)
self.__dbSession.add(waypoint)
self.__dbSession.commit()
# handles a route from db and converting it into geoJSON
def getTrack(self, trackID):
track = self.__dbSession.query(Track).filter_by(id=trackID).first()
if not track:
raise NotFoundError(f"track with id {trackID} not found", errors=[])
# fetches waypoints for a given track and converts them into geoJSON
waypoints = track.waypoints
coordinates = [(wp.lon, wp.lat) for wp in waypoints]
feature = Feature(geometry=LineString(coordinates))
return feature
# handles getting all infos of a track from the database
def getTrackMeta(self, trackID):
track = self.__dbSession.query(Track).filter_by(id=trackID).first()
if not track:
raise NotFoundError(f"track with id {trackID} not found", errors=[])
trackObject = {
"id": track.id,
"name": track.trackName,
"driver": {
"id": track.driver.id,
"name": track.driver.name
} if track.driver else None,
"vehicle": {
"id": track.vehicle.id,
"name": track.vehicle.name,
"licenseplate": track.vehicle.licenseplate
} if track.vehicle else None,
"distance": track.distance,
"time": track.date,
}
return trackObject
# grabs only the tracks from the database and returns them as json object
def getTracks(self):
tracks = self.__dbSession.query(Track).all()
track_list = [
{
"id": track.id,
"name": track.trackName,
"time": track.date,
}
for track in tracks # iterates all tracks and appends them to the list
]
return track_list
def getTracksInTimeWithGeoData(self, start, end):
# Alle Tracks in der Zeitspanne abfragen
tracks = self.__dbSession.query(Track).filter(Track.date.between(start, end)).all()
# Eine Liste von GeoJSON-Features für alle Tracks
features = []
# Für jedes Track-Objekt die Waypoints abfragen und die GeoJSON-Daten generieren
for track in tracks:
# Waypoints für das Track laden
waypoints = track.waypoints # track.waypoints ist bereits korrekt verknüpft
# Waypoints in GeoJSON-kompatible Koordinaten umwandeln
coordinates = [(wp.lon, wp.lat) for wp in waypoints]
# LineString Feature für das Track erstellen
feature = Feature(geometry=LineString(coordinates))
features.append(feature)
# Ein FeatureCollection erstellen, das alle Track-Features enthält
feature_collection = FeatureCollection(features)
# GeoJSON zurückgeben, das von Leaflet verarbeitet werden kann
return feature_collection
def getTracksInTime(self, start, end):
tracks = self.__dbSession.query(Track).filter(Track.date.between(start, end)).all()
track_list = [
{
"id": track.id,
"name": track.trackName,
"driver": {
"id": track.driver.id,
"name": track.driver.name
} if track.driver else None,
"vehicle": {
"id": track.vehicle.id,
"name": track.vehicle.name
} if track.vehicle else None,
"distance": track.distance,
"time": track.date,
}
for track in tracks # iterates all tracks and appends them to the list
]
return track_list
def deleteTrack(self, trackid):
self.__dbSession.query(Waypoint).filter(Waypoint.track_id == trackid).delete()
self.__dbSession.query(Track).filter(Track.id == trackid).delete()
self.__dbSession.commit()