2024-01-04 01:28:28 +01:00

73 lines
2.5 KiB
Python

import json
import socket
import threading
from Classes.System.Network.Handler.EventHandler import TCPEventHandler, UDPEventHandler
class NetworkManager:
class UDP:
def __init__(self, addr: str, port: str):
self.addr = addr
self.port = int(port)
self.udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.eventHandler = UDPEventHandler(self.udpSocket)
# start listener thread
self.listen()
def send(self, message: dict):
self.udpSocket.sendto(json.dumps(message).encode(), (self.addr, self.port))
# starts a listener thread for udp data
def listen(self):
udpThread = threading.Thread(target=self.receive)
udpThread.daemon = True
udpThread.start()
def receive(self):
while True:
try:
data, addr = self.udpSocket.recvfrom(1024)
if data:
decoded_data = json.loads(data.decode())
self.eventHandler.handleEvents(decoded_data)
except Exception as e:
print(f"Error receiving UDP data: {e}")
break
class TCP:
def __init__(self, addr: str, port: str):
self.addr = addr
self.port = int(port)
self.tcpSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.eventHandler = TCPEventHandler(self.tcpSocket)
# start listener thread
self.listen()
def connect(self):
self.tcpSocket.connect((self.addr, self.port))
def send(self, message: dict):
self.tcpSocket.sendall(json.dumps(message).encode())
# starts a listener thread for tcp data
def listen(self):
tcp_thread = threading.Thread(target=self.receive)
tcp_thread.daemon = True
tcp_thread.start()
def receive(self):
while True:
try:
data = self.tcpSocket.recv(1024)
if data:
decoded_data = json.loads(data.decode())
self.eventHandler.handleEvents(decoded_data)
except Exception as e:
print(f"Error receiving TCP data: {e}")
break
def __init__(self, addr: str, port: str):
self.tcp = self.TCP(addr, port)
self.udp = self.UDP(addr, port)