reactjs - How can I implement multithreading in socketio and FastAPI? - Stack Overflow

admin2025-04-27  3

I have a FastAPI application. When the user enters a room id on frontend then a dedicated worker thread is started for each room that emits the time every second to the frontend. Apart from emit event inside thread all other events are working and being emitted to frontend

Backend FastAPI code:

import asyncio
import threading
import time
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import socketio

# Create Socket.IO server
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins="*",  # Allow all origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create FastAPI app
app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Combine FastAPI and Socket.IO
socket_app = socketio.ASGIApp(sio, app)

# Dictionary to store threads
thread_pool = {}

# Worker function
def worker(room_id: str):
    """
    Worker function for threads. Emits the current time every second.
    """
    print(f"Thread started for room: {room_id}")

    # Create and set a new event loop for this thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    while True:
        current_time = time.strftime("%Y-%m-%d %H:%M:%S")
        try:
            # Schedule the emit coroutine in the event loop
            asyncio.run_coroutine_threadsafe(
                sio.emit(
                    "time_status", {"roomId": room_id, "current_time": current_time}, room=room_id
                ),
                loop,
            )
            print(f"Message emitted for room {room_id}: {current_time}")
        except Exception as e:
            print(f"Error emitting to room {room_id}: {e}")
        time.sleep(1)


# Start worker thread for a room
def start_worker(room_id: str):
    """
    Start a new thread worker for a given room ID.
    """
    if room_id in thread_pool:
        print(f"Thread already exists for room: {room_id}")
        return

    thread = threading.Thread(target=worker, args=(room_id,), daemon=True)
    thread_pool[room_id] = thread
    thread.start()
    print(f"New thread started for room: {room_id}")


@sio.event
async def connect(sid, environ):
    print(f"Client connected: {sid}")


@sio.event
async def disconnect(sid):
    print(f"Client disconnected: {sid}")


@sio.event
async def join_room(sid, data):
    """
    Handle join room event from client.
    """
    room_id = data.get("roomId")
    print(f"Client {sid} joining room: {room_id}")
    await sio.enter_room(sid, room_id)  # Ensure the client joins the room
    await sio.emit("join_success", {"roomId": room_id, "clientId": sid}, room=sid)
    start_worker(room_id)  # Start the worker thread for the room


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:socket_app",
        host="0.0.0.0",
        port=8004,
        reload=True,
    )

Frontend React code:

import React, { useEffect, useState } from "react";
import { Modal, Input, Button, notification } from "antd";
import { io } from "socket.io-client";

const App = () => {
  const [isModalVisible, setIsModalVisible] = useState(true); // Modal visibility
  const [roomId, setRoomId] = useState(""); // Room ID entered by the user
  const [socket, setSocket] = useState(null); // Socket.IO instance
  const [joinedRoom, setJoinedRoom] = useState(null); // Joined room details
  const [messages, setMessages] = useState([]); // Real-time messages

  useEffect(() => {
    // Initialize Socket.IO connection
    const newSocket = io("http://localhost:8004"); // Replace with your server address
    setSocket(newSocket);

    // Listen for socket connection events
    newSocket.on("connect", () => {
      console.log("Connected to Socket.IO server");
      notification.success({
        message: "Connected",
        description: "Successfully connected to the server.",
      });
    });

    newSocket.on("disconnect", () => {
      console.log("Disconnected from Socket.IO server");
      notification.warning({
        message: "Disconnected",
        description: "Disconnected from the server.",
      });
    });

    // Handle join success event
    newSocket.on("join_success", (data) => {
      console.log("Successfully joined room:", data);
      notification.success({
        message: "Room Joined",
        description: `You have joined room: ${data.roomId}`,
      });
      setJoinedRoom(data); // Store room and client ID
    });

    // Handle media_status event
    newSocket.on("time_status", (data) => {
      console.log("Time Status Received:", data);
      setMessages((prev) => [
        ...prev,
        { ...data, timestamp: new Date().toISOString() },
      ]); // Add message with timestamp
    });

    newSocket.on("error", (error) => {
      console.error("Socket Error:", error);
      notification.error({
        message: "Error",
        description: error.message || "An error occurred.",
      });
    });

    // Cleanup on component unmount
    return () => {
      newSocket.disconnect();
    };
  }, []);

  const joinRoom = () => {
    if (!socket) {
      notification.error({
        message: "Socket Not Ready",
        description: "Please wait for the socket to connect.",
      });
      return;
    }

    if (!roomId) {
      notification.error({
        message: "Error",
        description: "Room ID cannot be empty.",
      });
      return;
    }

    console.log("Joining room with ID:", roomId);
    socket.emit("join_room", { roomId });

    notification.success({
      message: "Joining Room",
      description: `Attempting to join room with ID: ${roomId}`,
    });

    setIsModalVisible(false); // Close the modal
  };

  return (
    <div>
      {/* Modal for entering room ID */}
      <Modal
        title="Join a Room"
        visible={isModalVisible}
        footer={null} // No default footer
        closable={false} // Prevent closing without interaction
      >
        <Input
          placeholder="Enter Room ID"
          value={roomId}
          onChange={(e) => setRoomId(e.target.value)}
          style={{ marginBottom: 16 }}
        />
        <Button type="primary" onClick={joinRoom}>
          Join Room
        </Button>
      </Modal>

      {/* Display joined room details */}
      {joinedRoom && (
        <div>
          <h2>Joined Room: {joinedRoom.roomId}</h2>
          <h3>Client ID: {joinedRoom.clientId}</h3>
        </div>
      )}

      {/* Display real-time messages */}
      <div>
        <h3>Messages:</h3>
        <ul>
          {messages.map((msg, index) => (
            <li key={index}>
              <strong>{msg.timestamp}</strong> - Time: {msg.current_time}
            </li>
          ))}
        </ul>
      </div>
    </div>
  );
};

export default App;

When I start the application, I am prompted to enter a room Id. A room will be created for that id. Then a thread worker is started for this which emits the current time to all clients that join this room. I get no errors, dedicated threads are started and I get the printed messages in terminal.

Message emitted for room 34fdss: 2025-01-11 20:36:50
Message emitted for room 34fdss: 2025-01-11 20:36:51
Message emitted for room 34fdss: 2025-01-11 20:36:52
Message emitted for room 34fdss: 2025-01-11 20:36:53
Message emitted for room 34fdss: 2025-01-11 20:36:54
Message emitted for room 34fdss: 2025-01-11 20:36:55
Message emitted for room 34fdss: 2025-01-11 20:36:56

But the messages emitted from the thread workers are not being received on the frontend. I received the connected, room joined events on frontend and not the time_status event.

转载请注明原文地址:http://anycun.com/QandA/1745706067a91087.html