I have a FastAPI application. When the user enters a room id on frontend then a dedicated worker thread is started for each room that emits the time every second to the frontend. Apart from emit event inside thread all other events are working and being emitted to frontend
Backend FastAPI code:
import asyncio
import threading
import time
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import socketio
# Create Socket.IO server
sio = socketio.AsyncServer(
async_mode="asgi",
cors_allowed_origins="*", # Allow all origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Create FastAPI app
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Combine FastAPI and Socket.IO
socket_app = socketio.ASGIApp(sio, app)
# Dictionary to store threads
thread_pool = {}
# Worker function
def worker(room_id: str):
"""
Worker function for threads. Emits the current time every second.
"""
print(f"Thread started for room: {room_id}")
# Create and set a new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while True:
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
try:
# Schedule the emit coroutine in the event loop
asyncio.run_coroutine_threadsafe(
sio.emit(
"time_status", {"roomId": room_id, "current_time": current_time}, room=room_id
),
loop,
)
print(f"Message emitted for room {room_id}: {current_time}")
except Exception as e:
print(f"Error emitting to room {room_id}: {e}")
time.sleep(1)
# Start worker thread for a room
def start_worker(room_id: str):
"""
Start a new thread worker for a given room ID.
"""
if room_id in thread_pool:
print(f"Thread already exists for room: {room_id}")
return
thread = threading.Thread(target=worker, args=(room_id,), daemon=True)
thread_pool[room_id] = thread
thread.start()
print(f"New thread started for room: {room_id}")
@sio.event
async def connect(sid, environ):
print(f"Client connected: {sid}")
@sio.event
async def disconnect(sid):
print(f"Client disconnected: {sid}")
@sio.event
async def join_room(sid, data):
"""
Handle join room event from client.
"""
room_id = data.get("roomId")
print(f"Client {sid} joining room: {room_id}")
await sio.enter_room(sid, room_id) # Ensure the client joins the room
await sio.emit("join_success", {"roomId": room_id, "clientId": sid}, room=sid)
start_worker(room_id) # Start the worker thread for the room
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:socket_app",
host="0.0.0.0",
port=8004,
reload=True,
)
Frontend React code:
import React, { useEffect, useState } from "react";
import { Modal, Input, Button, notification } from "antd";
import { io } from "socket.io-client";
const App = () => {
const [isModalVisible, setIsModalVisible] = useState(true); // Modal visibility
const [roomId, setRoomId] = useState(""); // Room ID entered by the user
const [socket, setSocket] = useState(null); // Socket.IO instance
const [joinedRoom, setJoinedRoom] = useState(null); // Joined room details
const [messages, setMessages] = useState([]); // Real-time messages
useEffect(() => {
// Initialize Socket.IO connection
const newSocket = io("http://localhost:8004"); // Replace with your server address
setSocket(newSocket);
// Listen for socket connection events
newSocket.on("connect", () => {
console.log("Connected to Socket.IO server");
notification.success({
message: "Connected",
description: "Successfully connected to the server.",
});
});
newSocket.on("disconnect", () => {
console.log("Disconnected from Socket.IO server");
notification.warning({
message: "Disconnected",
description: "Disconnected from the server.",
});
});
// Handle join success event
newSocket.on("join_success", (data) => {
console.log("Successfully joined room:", data);
notification.success({
message: "Room Joined",
description: `You have joined room: ${data.roomId}`,
});
setJoinedRoom(data); // Store room and client ID
});
// Handle media_status event
newSocket.on("time_status", (data) => {
console.log("Time Status Received:", data);
setMessages((prev) => [
...prev,
{ ...data, timestamp: new Date().toISOString() },
]); // Add message with timestamp
});
newSocket.on("error", (error) => {
console.error("Socket Error:", error);
notification.error({
message: "Error",
description: error.message || "An error occurred.",
});
});
// Cleanup on component unmount
return () => {
newSocket.disconnect();
};
}, []);
const joinRoom = () => {
if (!socket) {
notification.error({
message: "Socket Not Ready",
description: "Please wait for the socket to connect.",
});
return;
}
if (!roomId) {
notification.error({
message: "Error",
description: "Room ID cannot be empty.",
});
return;
}
console.log("Joining room with ID:", roomId);
socket.emit("join_room", { roomId });
notification.success({
message: "Joining Room",
description: `Attempting to join room with ID: ${roomId}`,
});
setIsModalVisible(false); // Close the modal
};
return (
<div>
{/* Modal for entering room ID */}
<Modal
title="Join a Room"
visible={isModalVisible}
footer={null} // No default footer
closable={false} // Prevent closing without interaction
>
<Input
placeholder="Enter Room ID"
value={roomId}
onChange={(e) => setRoomId(e.target.value)}
style={{ marginBottom: 16 }}
/>
<Button type="primary" onClick={joinRoom}>
Join Room
</Button>
</Modal>
{/* Display joined room details */}
{joinedRoom && (
<div>
<h2>Joined Room: {joinedRoom.roomId}</h2>
<h3>Client ID: {joinedRoom.clientId}</h3>
</div>
)}
{/* Display real-time messages */}
<div>
<h3>Messages:</h3>
<ul>
{messages.map((msg, index) => (
<li key={index}>
<strong>{msg.timestamp}</strong> - Time: {msg.current_time}
</li>
))}
</ul>
</div>
</div>
);
};
export default App;
When I start the application, I am prompted to enter a room Id. A room will be created for that id. Then a thread worker is started for this which emits the current time to all clients that join this room. I get no errors, dedicated threads are started and I get the printed messages in terminal.
Message emitted for room 34fdss: 2025-01-11 20:36:50
Message emitted for room 34fdss: 2025-01-11 20:36:51
Message emitted for room 34fdss: 2025-01-11 20:36:52
Message emitted for room 34fdss: 2025-01-11 20:36:53
Message emitted for room 34fdss: 2025-01-11 20:36:54
Message emitted for room 34fdss: 2025-01-11 20:36:55
Message emitted for room 34fdss: 2025-01-11 20:36:56
But the messages emitted from the thread workers are not being received on the frontend. I received the connected, room joined events on frontend and not the time_status event.