php - Parallel Processing with RabbitMQ in CodeIgniter for Multiple File Uploads - Stack Overflow

admin, 2025-04-17

I have implemented RabbitMQ in my CodeIgniter project to handle a file upload system. Users can upload multiple files at once. After the upload, the IDs of the uploaded files are added to a RabbitMQ queue, and a worker then consumes the queue and processes these files in the background. The background process performs various tasks such as OCR scanning.

To handle the background processing, I created a worker via the terminal with the following:

php index.php JobProcessing/process_progress_rabbitmq

This works fine when a single user is uploading files. However, when there are multiple users (e.g., 50 users at once), each user has to wait for the previous user's tasks to complete. I want to implement parallel processing so that users don't have to wait for their turn.

One solution I considered is creating multiple workers, but I'm concerned about the limit on the number of workers that can be created. If there are, say, 500 users uploading at the same time, how many workers can I safely create? Is there a recommended approach for handling this scenario, especially considering system resources and RabbitMQ limits?

Any insights or recommendations would be greatly appreciated!

Below is my code:

<?php

defined('BASEPATH') or exit('No direct script access allowed');
require_once(APPPATH . 'third_party/Rabbit_mq/vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Spatie\Async\Pool;

class Rabbit_mq
{
    protected $ci, $connection, $channel, $pool;

    public function __construct()
    {
        try {
            $this->ci = &get_instance();
            $this->ci->load->model('invoice_scan_model');
            $this->ci->load->library('scan_invoice_lib');
            $this->ci->load->library('quick_books');
            $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
            $this->channel = $this->connection->channel();
        } catch (Exception $e) {
            echo "RabbitMQ Connection Error: " . $e->getMessage() . "\n";
        }
    }

    function addToQueue($data)
    {
        $this->channel->queue_declare('file_processing_new', false, true, false, false);

        $msg = new AMQPMessage(json_encode($data));
        $this->channel->basic_publish($msg, '', 'file_processing_new');
    }

    public function processQueue()
    {
        $this->channel->queue_declare('file_processing_new', false, true, false, false);

        // Set prefetch count to allow multiple messages to be handled concurrently
        $this->channel->basic_qos(null, 15, null);

        $callback = function ($msg) {
            $data = json_decode($msg->body, true);
            $fileId = $data['file_id'];
            $filePath = $data['file_path'];

            try {
                echo "Received message for file ID: $fileId\n";

                // Initialize the async pool
                $this->pool = Pool::create();

                // Add task to the pool for parallel processing
                $this->pool->add(function () use ($fileId, $filePath) {
                    try {
                        echo "Processing file ID: $fileId\n";
                        $data = array(
                            'progress_status' => 'Processing',
                            'progress_percentage' => 50,
                        );
                        $this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
                        echo "File ID: $fileId marked as Processing.\n";

                        // Simulate file processing (OCR or other logic here)
                        $this->process_invoice($fileId);

                        // Update status to 'Completed' after processing
                        $data = array(
                            'progress_status' => 'Completed',
                            'progress_percentage' => 100,
                        );
                        $this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
                        echo "File ID: $fileId processing completed.\n";
                    } catch (Exception $e) {
                        echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
                    }
                });

                // Wait for all tasks to finish
                $this->pool->wait(); // This will wait until all tasks are done

                // Acknowledge the message after all tasks are processed
                $this->channel->basic_ack($msg->delivery_info['delivery_tag']);
                echo "Acknowledging message for file ID: $fileId\n";
            } catch (Exception $e) {
                echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
                $this->channel->basic_nack($msg->delivery_info['delivery_tag']);
            }
        };

        // Multiple consumers (workers) consuming the messages
        for ($i = 0; $i < 25; $i++) {
            $this->channel->basic_consume('file_processing_new', '', false, false, false, false, $callback);
        }

        echo "Waiting for messages. To exit press CTRL+C\n";

        // Consume messages concurrently by multiple workers
        while ($this->channel->callbacks) {
            $this->channel->wait();
        }

        // Close the channel and connection when done
        $this->channel->close();
        $this->connection->close();
    }
}

asked Jan 31 at 11:49 by vinod
  • There's no strict limit on how many workers you can run, but you should align concurrency with the CPU, memory, and I/O capacity of your environment. You can start with a modest number of workers, monitor system performance, and gradually increase the count if you have enough headroom. Prefetch settings in RabbitMQ and container orchestration strategies can help you manage and scale parallel processing more effectively. – Kamyar Safari Commented Jan 31 at 13:50
  • OK @KamyarSafari, but my problem remains. If I create 20 workers and 500 users upload at the same time, the 500th user still has to wait, and that may take a long time. – vinod Commented Jan 31 at 15:09
  • Even with more workers, there's a hardware limit on how many tasks can run at once. If 20 workers can’t handle 500 users, you might need to spread the load across multiple servers or containers and scale up resources. Monitor performance closely to keep queues moving and reduce wait times. I don't think there is any other way. – Kamyar Safari Commented Jan 31 at 15:45
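
To put rough numbers on the trade-off discussed in these comments, here is a minimal sketch of how long the job at a given queue position waits before it starts. It is an idealized model, not a measurement: it assumes one job per user, every job taking the same time, and `$workers` jobs truly running in parallel. The function name `estimatedWaitSeconds` and the 30-second job duration are assumptions made for illustration only.

```php
<?php
declare(strict_types=1);

/**
 * Rough number of seconds a job waits before a worker picks it up.
 * Idealized model: all jobs take $secondsPerJob and $workers jobs run in parallel.
 * $position is the 1-based position of the job in the queue.
 */
function estimatedWaitSeconds(int $position, int $workers, float $secondsPerJob): float
{
    if ($position < 1 || $workers < 1) {
        throw new InvalidArgumentException('position and workers must be at least 1');
    }
    // Jobs are picked up in batches of $workers; every full batch ahead must finish first.
    $batchesAhead = intdiv($position - 1, $workers);
    return $batchesAhead * $secondsPerJob;
}

// The 500th job in the queue, with an assumed 30 s per job.
foreach ([1, 20, 50] as $workers) {
    printf("%d workers -> about %.0f s wait\n", $workers, estimatedWaitSeconds(500, $workers, 30.0));
}
```

Under these assumptions the wait shrinks roughly in proportion to the number of workers, which is why the worker count matters only up to the point where the hardware can actually run that many jobs at once.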

1 Answer

First of all, you have to find out how many workers you can safely create. That calls for a load test. Run it on a system identical to production so that you don't kill your production box.

Let's say you find out that one system can handle 50 users at once.

Then you define the requirement: how many concurrent users must always be served. Let's say 500.

Then you fire up 10 worker instances of that type and keep them running at all times to meet that requirement (10 x 50 = 500).

On top of that, you account for the number of users you actually have at the moment. Let's say it is 250 right now: you spawn an additional five workers so that you can always take on another 500 users at once (15 x 50 = 750).

And if the number of users then drops to 100, you can reduce by 3 machines (12 x 50 = 600).

And so on, and so forth.
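
The arithmetic above can be sketched as a small helper. This is only an illustration of the rule "current users plus a fixed spare capacity, divided by what one instance can handle"; the function name `requiredInstances` is hypothetical, and the figures (50 users per instance, 500 users of spare capacity) are the example values from this answer, not measured ones.

```php
<?php
declare(strict_types=1);

/**
 * Number of worker instances needed to serve the current users plus a fixed
 * spare capacity, given how many users a single instance can handle.
 */
function requiredInstances(int $currentUsers, int $spareUsers, int $usersPerInstance): int
{
    if ($usersPerInstance <= 0) {
        throw new InvalidArgumentException('usersPerInstance must be positive');
    }
    // Round up: a partially used instance still has to exist.
    return (int) ceil(($currentUsers + $spareUsers) / $usersPerInstance);
}

// Example figures from the answer: 50 users per instance, 500 users of spare capacity.
foreach ([0, 250, 100] as $users) {
    printf("%d users now -> %d instances\n", $users, requiredInstances($users, 500, 50));
}
```

With those inputs the helper reproduces the 10, 15, and 12 instances worked out above. In practice the per-instance capacity would come from the load test.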

Always keep enough spare capacity ahead of demand, and add and destroy machines routinely so that you get accustomed to it and always have enough processing power available for your users.

This calculation is not entirely accurate, but it is a good training ground: you can run with it for the first weeks and gather the metrics you need to plan ahead.
