Skip to content

Commit b79aa24

Browse files
committed
major backend improvements and better error handling!
1 parent 7566c66 commit b79aa24

File tree

8 files changed

+215
-287
lines changed

8 files changed

+215
-287
lines changed

backend/src/sim.py

Lines changed: 27 additions & 184 deletions
Original file line numberDiff line numberDiff line change
@@ -5,118 +5,27 @@
55
import random
66
import datetime
77

8-
# Creating Fake Simulation Data To Then Pretend It's Live Telemetry Data
9-
10-
# Note: Each characteristic does not act in relation to each other as it would
11-
# in real life, but the indvidual characteristics themselves are reflective of reality(if that makes any sense)
12-
13-
# Cols 0-27: Acc Temp(Cel)
14-
# Cols 28-55: Acc Voltage(V)
15-
# Cols 56-57: Break Pressure Front & Rear(PSI)
16-
# Col 58: Current to Acc(A)
17-
# Cols 59-62: Hall Effect Sensors - FL,FR,RL,RR(Hz)
18-
# Col 63: Altitude(ft)
19-
# Col 64: Latitude(ft)
20-
# Col 65: Longitude(ft)
21-
# Col 66: Speed(mph)
22-
# Col 67-69: x,y, and z acceleration(m/s^2)
23-
# Col 70-72: x, y, and z gyro(deg)
24-
# Cols 73-76: Suspension Travel - FL,FR,RL,RR(V)
25-
# Cols 77-80: Suspension Force - FL,FR,RL,RR(Oh)
26-
# Col 81: Acc Air Intake Temp(C)
27-
# Col 82: Acc Air Exhaust Temp(C)
28-
# Col 83: Steering(Deg)
29-
# Col 84: Acc Air Intake Pressure(PSI)
30-
# Col 85: Acc Intake Air Flow Rate(m^3/sec)
31-
32-
# data_columns = {
33-
# "Acc Temp 1(Cel)": pa.float32(),
34-
# "Acc Temp 2(Cel)": pa.float32(),
35-
# "Acc Temp 3(Cel)": pa.float32(),
36-
# "Acc Temp 4(Cel)": pa.float32(),
37-
# "Acc Temp 5(Cel)": pa.float32(),
38-
# "Acc Temp 6(Cel)": pa.float32(),
39-
# "Acc Temp 7(Cel)": pa.float32(),
40-
# "Acc Temp 8(Cel)": pa.float32(),
41-
# "Acc Temp 9(Cel)": pa.float32(),
42-
# "Acc Temp 10(Cel)": pa.float32(),
43-
# "Acc Temp 11(Cel)": pa.float32(),
44-
# "Acc Temp 12(Cel)": pa.float32(),
45-
# "Acc Temp 13(Cel)": pa.float32(),
46-
# "Acc Temp 14(Cel)": pa.float32(),
47-
# "Acc Temp 15(Cel)": pa.float32(),
48-
# "Acc Temp 16(Cel)": pa.float32(),
49-
# "Acc Temp 17(Cel)": pa.float32(),
50-
# "Acc Temp 18(Cel)": pa.float32(),
51-
# "Acc Temp 19(Cel)": pa.float32(),
52-
# "Acc Temp 20(Cel)": pa.float32(),
53-
# "Acc Temp 21(Cel)": pa.float32(),
54-
# "Acc Temp 22(Cel)": pa.float32(),
55-
# "Acc Temp 23(Cel)": pa.float32(),
56-
# "Acc Temp 24(Cel)": pa.float32(),
57-
# "Acc Temp 25(Cel)": pa.float32(),
58-
# "Acc Temp 26(Cel)": pa.float32(),
59-
# "Acc Temp 27(Cel)": pa.float32(),
60-
# "Acc Temp 28(Cel)": pa.float32(),
61-
# "Acc Voltage 1(V)": pa.float32(),
62-
# "Acc Voltage 2(V)": pa.float32(),
63-
# "Acc Voltage 3(V)": pa.float32(),
64-
# "Acc Voltage 4(V)": pa.float32(),
65-
# "Acc Voltage 5(V)": pa.float32(),
66-
# "Acc Voltage 6(V)": pa.float32(),
67-
# "Acc Voltage 7(V)": pa.float32(),
68-
# "Acc Voltage 8(V)": pa.float32(),
69-
# "Acc Voltage 9(V)": pa.float32(),
70-
# "Acc Voltage 10(V)": pa.float32(),
71-
# "Acc Voltage 11(V)": pa.float32(),
72-
# "Acc Voltage 12(V)": pa.float32(),
73-
# "Acc Voltage 13(V)": pa.float32(),
74-
# "Acc Voltage 14(V)": pa.float32(),
75-
# "Acc Voltage 15(V)": pa.float32(),
76-
# "Acc Voltage 16(V)": pa.float32(),
77-
# "Acc Voltage 17(V)": pa.float32(),
78-
# "Acc Voltage 18(V)": pa.float32(),
79-
# "Acc Voltage 19(V)": pa.float32(),
80-
# "Acc Voltage 20(V)": pa.float32(),
81-
# "Acc Voltage 21(V)": pa.float32(),
82-
# "Acc Voltage 22(V)": pa.float32(),
83-
# "Acc Voltage 23(V)": pa.float32(),
84-
# "Acc Voltage 24(V)": pa.float32(),
85-
# "Acc Voltage 25(V)": pa.float32(),
86-
# "Acc Voltage 26(V)": pa.float32(),
87-
# "Acc Voltage 27(V)": pa.float32(),
88-
# "Acc Voltage 28(V)": pa.float32(),
89-
# "Brake Pressure Front(PSI)": pa.float32(),
90-
# "Brake Pressure Rear(PSI)": pa.float32(),
91-
# "Current to Acc(A)": pa.float32(),
92-
# "Hall Effect Sensor - FL(Hz)": pa.float32(),
93-
# "Hall Effect Sensor - FR(Hz)": pa.float32(),
94-
# "Hall Effect Sensor - RL(Hz)": pa.float32(),
95-
# "Hall Effect Sensor - RR(Hz)": pa.float32(),
96-
# "Altitude(ft)": pa.float32(),
97-
# "Latitude(ft)": pa.float32(),
98-
# "Longitude(ft)": pa.float32(),
99-
# "Speed(mph)": pa.float32(),
100-
# "x acceleration(m/s^2)": pa.float32(),
101-
# "y acceleration(m/s^2)": pa.float32(),
102-
# "z acceleration(m/s^2)": pa.float32(),
103-
# "x gyro(deg)": pa.float32(),
104-
# "y gyro(deg)": pa.float32(),
105-
# "z gyro(deg)": pa.float32(),
106-
# "Suspension Travel - FL(V)": pa.float32(),
107-
# "Suspension Travel - FR(V)": pa.float32(),
108-
# "Suspension Travel - RL(V)": pa.float32(),
109-
# "Suspension Travel - RR(V)": pa.float32(),
110-
# "Suspension Force - FL(Oh)": pa.float32(),
111-
# "Suspension Force - FR(Oh)": pa.float32(),
112-
# "Suspension Force - RL(Oh)": pa.float32(),
113-
# "Suspension Force - RR(Oh)": pa.float32(),
114-
# "Acc Air Intake Temp(C)": pa.float32(),
115-
# "Acc Air Exhaust Temp(C)": pa.float32(),
116-
# "Steering(Deg)": pa.float32(),
117-
# "Acc Air Intake Pressure(PSI)": pa.float32(),
118-
# "Acc Intake Air Flow Rate(m^3/sec)": pa.float32(),
119-
# }
8+
# List of some data to consider:
9+
# Acc Temp(Cel)
10+
# Acc Voltage(V)
11+
# Break Pressure Front & Rear(PSI)
12+
# Current to Acc(A)
13+
# Hall Effect Sensors - FL,FR,RL,RR(Hz)
14+
# Altitude(ft)
15+
# Latitude(ft)
16+
# Longitude(ft)
17+
# Speed(mph)
18+
# x,y, and z acceleration(m/s^2)
19+
# x, y, and z gyro(deg)
20+
# Suspension Travel - FL,FR,RL,RR(V)
21+
# Suspension Force - FL,FR,RL,RR(Oh)
22+
# Acc Air Intake Temp(C)
23+
# Acc Air Exhaust Temp(C)
24+
# Steering(Deg)
25+
# Acc Air Intake Pressure(PSI)
26+
# Acc Intake Air Flow Rate(m^3/sec)
27+
28+
# Real data schema:
12029
data_columns = {
12130
":Lap": pa.float32(), # 91
12231
":LapTime": pa.float32(), # 91
@@ -305,79 +214,13 @@
305214

306215

307216
def createdf():
308-
simLength: float = 5.0 # how many seconds to run the simulation for
309-
simStepsPerSec: int = 100 # how many simulation steps per second
310-
# rowcount = int(simLength * simStepsPerSec)
311-
rowcount = 1
312-
313-
column_names = list(data_columns.keys())
314-
total_cols = len(column_names)
315-
# table = [rowcount][total_cols]
316-
317-
# Initialize the data array
318-
data = np.zeros((rowcount, total_cols))
319-
320-
# Generate Acc Temp Data
321-
for col in range(0, 28):
322-
# horiz_shift = random.uniform(-0.5, 0.5)
323-
# time = np.arange(rowcount) / simStepsPerSec
324-
# data[:, col] = 1 + np.sin(time - horiz_shift) * 0.5
325-
t = (datetime.datetime.now() - t0).total_seconds()
326-
data[:, col] = math.sin(t)
327-
328-
# Generate Acc Voltage Data
329-
for col in range(28, 56):
330-
horiz_shift = random.uniform(-0.3, 0.3)
331-
time = np.arange(rowcount) / simStepsPerSec
332-
data[:, col] = 1 + np.sin(time - horiz_shift) * 0.5
333-
334-
# Brake Pressure
335-
data[: int(300), 56:58] = 0 # First 3 seconds are zero
336-
data[300:, 56] = (
337-
np.arange(300, rowcount) / simStepsPerSec
338-
) * 0.5 - 3 # Arbitrary equation for brake pressure
339-
340-
# Current to Acc
341-
total_voltage = 6000 # Arbitrary voltage
342-
resistance = 50 # Arbitrary resistance
343-
capacitance = 300 # Arbitrary capacitance
344-
data[:, 58] = (
345-
total_voltage
346-
/ resistance
347-
* np.exp(-np.arange(rowcount) / (resistance * capacitance))
348-
)
349-
350-
# RPM Wheel Data
351-
for col in range(59, 63):
352-
time = np.arange(rowcount) / simStepsPerSec
353-
data[:, col] = np.where(
354-
time < 4, 1000 * time, 4000
355-
) # Linear eq up to 4 seconds, then peak
356-
357-
# Altitude and Latitude: Assuming constant at 0
358-
data[:, 63] = 0 # Altitude
359-
data[:, 64] = 0 # Latitude
360-
361-
# Longitude: Arbitrary quadratic function
362-
data[:, 65] = 10.73 * (np.arange(rowcount) / simStepsPerSec) ** 2
363-
364-
# Speed: Same arbitrary acceleration
365-
data[:, 66] = 10.73 * (np.arange(rowcount) / simStepsPerSec)
366-
367-
# Assuming gyro is 0
368-
data[:, 70:73] = 0
369-
370-
# I do not have enough physics knowledge to do these
371-
data[:, 73:] = 0
372-
373-
data[:, 2] = (datetime.datetime.now() - t0).total_seconds()
217+
# Default item
218+
series_list = [pl.Series(k, [math.sin((datetime.datetime.now() - t0).total_seconds() + i/10)], pl.Float32) for i, k in enumerate(data_columns)]
219+
# :Time
220+
series_list[2] = pl.Series(":Time", [(datetime.datetime.now() - t0).total_seconds()], pl.Float32)
221+
df = pl.DataFrame(series_list)
374222

375-
# df = pl.DataFrame(data, schema=column_names).cast(pl.Float32)
376-
# timestamps = pl.Series(
377-
# "Timestamp(s)", [(datetime.datetime.now() - t0).total_seconds()]
378-
# ).cast(pl.Float32)
379-
# return df.with_columns(timestamps)
380-
return pl.DataFrame(data, schema=column_names).cast(pl.Float32)
223+
return df
381224

382225
# def get_schema():
383226
# # for now, we use nullable: True. In the future, the backend should

backend/src/websocket_server.py

Lines changed: 65 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,88 @@
11
import asyncio
22
import io
3-
import signal
3+
from websockets import ConnectionClosed
44
import websockets.asyncio.server as ws
55
import polars as pl
66
import pyarrow as pa
77
import random
8+
89
import sim
910

11+
# List of WebSocket connections with their corresponding Arrow Stream (IPC)
12+
# Writers and BytesIO buffers
13+
connections: list[
14+
tuple[ws.ServerConnection, pa.RecordBatchStreamWriter, io.BytesIO]
15+
] = []
1016

11-
async def websocket_serve(websocket: ws.ServerConnection):
12-
buffer = io.BytesIO()
13-
with pa.ipc.new_stream(buffer, sim.get_schema()) as writer:
14-
while True:
15-
df = sim.createdf()
16-
# print(str(df))
17-
arrow_data = df.to_arrow()
1817

19-
assert arrow_data.schema.equals(sim.get_schema()), "get_schema() != df's schema!"
18+
# Todo: this will come from the radio module eventually!
19+
def get_arrow_data():
20+
current_df = sim.createdf()
21+
arrow_data = current_df.to_arrow()
22+
assert arrow_data.schema.equals(sim.get_schema()), "get_schema() != df's schema!"
23+
return arrow_data
24+
25+
26+
# TODO: use IDs and not indexes (race condition here I think)
27+
def clean_connection(idx: int):
28+
rb_writer: pa.RecordBatchStreamWriter
29+
buffer: io.BytesIO
30+
if connections[idx]:
31+
_, rb_writer, buffer = connections[idx]
32+
rb_writer.close()
33+
buffer.close()
34+
del connections[idx]
35+
2036

21-
writer.write_table(arrow_data)
37+
# The main loop that sends Arrow record batches to all connected clients
38+
async def ws_main_loop(freq_hertz: int):
39+
while True:
40+
arrow_data = get_arrow_data()
2241

23-
ipc_data = buffer.getvalue()
24-
buffer.seek(0)
25-
buffer.truncate()
42+
disconnected_idxs = []
43+
for idx, (ws_connection, rb_writer, buffer) in enumerate(connections):
44+
try:
45+
rb_writer.write_table(arrow_data)
2646

27-
await websocket.send(ipc_data)
28-
# await websocket.send(df.write_ndjson())
29-
await asyncio.sleep((1 + random.uniform(-0.5, 0.5)) / 100)
47+
ipc_data = buffer.getvalue()
48+
buffer.seek(0)
49+
buffer.truncate()
3050

51+
await ws_connection.send(ipc_data)
52+
except ConnectionClosed as e:
53+
print("ConnectionClosed:", e)
54+
disconnected_idxs.append(idx)
55+
56+
# Clean any disconnected websockets
57+
for idx in reversed(disconnected_idxs):
58+
clean_connection(idx)
59+
60+
await asyncio.sleep(
61+
1 / freq_hertz + random.uniform(-0.4 / freq_hertz, 0.4 / freq_hertz)
62+
)
63+
64+
65+
# Handler function for each incoming connection
66+
async def websocket_handler(websocket: ws.ServerConnection):
67+
buffer = io.BytesIO()
68+
rb_writer = pa.ipc.new_stream(buffer, sim.get_schema())
3169

32-
# Old code, which created a new Arrow IPC stream for each record batch
33-
# (which breaks decoding multiple batches with a single reader on the client)
34-
# while True:
35-
# df = sim.createdf()
36-
# # print(str(df))
37-
# arrow_data = df.to_arrow()
38-
# byte_stream = io.BytesIO()
39-
# with pa.ipc.new_stream(byte_stream, arrow_data.schema) as writer:
40-
# writer.write_table(arrow_data)
41-
#
42-
# ipc_data = byte_stream.getvalue()
43-
#
44-
# await websocket.send(ipc_data)
45-
# # await websocket.send(df.write_ndjson())
46-
# await asyncio.sleep((1 + random.uniform(-0.5, 0.5)) / 100)
70+
connections.append((websocket, rb_writer, buffer))
71+
print(f"Client connected! Current connections: {len(connections)}")
4772

73+
try:
74+
await websocket.wait_closed()
75+
finally:
76+
if (websocket, rb_writer, buffer) in connections:
77+
idx = connections.index((websocket, rb_writer, buffer))
78+
clean_connection(idx)
79+
print(f"Client disconnected. Remaining connections: {len(connections)}")
4880

4981

5082
async def ws_main(port: int):
51-
# Add signal handler for SIGTERM
52-
stop = asyncio.Event()
53-
loop = asyncio.get_running_loop()
54-
loop.add_signal_handler(signal.SIGTERM, lambda: stop.set())
83+
ws_server = await ws.serve(websocket_handler, "0.0.0.0", port)
5584

56-
async with ws.serve(websocket_serve, "0.0.0.0", port):
57-
await stop.wait() # Run until stop "event" has been triggered
85+
await asyncio.gather(ws_main_loop(100), ws_server.serve_forever())
5886

5987

6088
# asyncio.run(ws_main()) # handled by main.py now

frontend/app/components/ItemContainer.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ interface ItemContainerProps {
66

77
export default function ItemContainer({ children }: ItemContainerProps) {
88
return (
9-
<div className="bg-background-2 rounded-xl p-4">
9+
<div className="bg-background-2 rounded-xl p-3">
1010
{children}
1111
</div>
1212
)

0 commit comments

Comments
 (0)