I have implemented RabbitMQ in my CodeIgniter project to handle a file upload system. Users can upload multiple files at once. After uploading the files, the IDs of the uploaded files are added to a RabbitMQ queue, and then RabbitMQ processes these files in the background. In the background process, various tasks like OCR scanning are performed.
To handle the background processing, I created a worker via the terminal with the following:
php index.php JobProcessing/process_progress_rabbitmq
This works fine when a single user is uploading files. However, when there are multiple users (e.g., 50 users at once), each user has to wait for the previous user's tasks to complete. I want to implement parallel processing so that users don't have to wait for their turn.
One solution I considered is creating multiple workers, but I'm concerned about the limit on the number of workers that can be created. If there are, say, 500 users uploading at the same time, how many workers can I safely create? Is there a recommended approach for handling this scenario, especially considering system resources and RabbitMQ limits?
Any insights or recommendations would be greatly appreciated!
Below is my code:
<?php
defined('BASEPATH') or exit('No direct script access allowed');
require_once(APPPATH . 'third_party/Rabbit_mq/vendor/autoload.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Spatie\Async\Pool;

class Rabbit_mq
{
    protected $ci, $connection, $channel, $pool;

    public function __construct()
    {
        try {
            $this->ci = &get_instance();
            $this->ci->load->model('invoice_scan_model');
            $this->ci->load->library('scan_invoice_lib');
            $this->ci->load->library('quick_books');
            $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
            $this->channel = $this->connection->channel();
        } catch (Exception $e) {
            echo "RabbitMQ Connection Error: " . $e->getMessage() . "\n";
        }
    }

    function addToQueue($data)
    {
        $this->channel->queue_declare('file_processing_new', false, true, false, false);
        $msg = new AMQPMessage(json_encode($data));
        $this->channel->basic_publish($msg, '', 'file_processing_new');
    }

    public function processQueue()
    {
        $this->channel->queue_declare('file_processing_new', false, true, false, false);
        // Set prefetch count to allow multiple messages to be handled concurrently
        $this->channel->basic_qos(null, 15, null);

        $callback = function ($msg) {
            $data = json_decode($msg->body, true);
            $fileId = $data['file_id'];
            $filePath = $data['file_path'];
            try {
                echo "Received message for file ID: $fileId\n";
                // Initialize the async pool
                $this->pool = Pool::create();
                // Add task to the pool for parallel processing
                $this->pool->add(function () use ($fileId, $filePath) {
                    try {
                        echo "Processing file ID: $fileId\n";
                        $data = array(
                            'progress_status' => 'Processing',
                            'progress_percentage' => 50,
                        );
                        $this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
                        echo "File ID: $fileId marked as Processing.\n";
                        // Simulate file processing (OCR or other logic here)
                        $this->process_invoice($fileId);
                        // Update status to 'Completed' after processing
                        $data = array(
                            'progress_status' => 'Completed',
                            'progress_percentage' => 100,
                        );
                        $this->ci->invoice_scan_model->update_data(['id' => $fileId], 'tblapi_save_invoice_file', $data);
                        echo "File ID: $fileId processing completed.\n";
                    } catch (Exception $e) {
                        echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
                    }
                });
                // Wait for all tasks to finish
                $this->pool->wait(); // This will wait until all tasks are done
                // Acknowledge the message after all tasks are processed
                $this->channel->basic_ack($msg->delivery_info['delivery_tag']);
                echo "Acknowledging message for file ID: $fileId\n";
            } catch (Exception $e) {
                echo "Error processing file ID: $fileId - " . $e->getMessage() . "\n";
                $this->channel->basic_nack($msg->delivery_info['delivery_tag']);
            }
        };

        // Multiple consumers (workers) consuming the messages
        for ($i = 0; $i < 25; $i++) {
            $this->channel->basic_consume('file_processing_new', '', false, false, false, false, $callback);
        }

        echo "Waiting for messages. To exit press CTRL+C\n";

        // Consume messages concurrently by multiple workers
        while ($this->channel->callbacks) {
            $this->channel->wait();
        }

        // Close the channel and connection when done
        $this->channel->close();
        $this->connection->close();
    }
}
First, find out how many workers you can safely create. That takes a load test, and you run it on a system identical to production so that you don't kill your production box.
Let's say the load test shows that one such system (one worker instance) can handle 50 users at once.
Then formulate the requirement: how many concurrent users must you always be able to serve? Let's say 500.
Then start 10 worker instances of that type and keep them running at all times to meet that requirement (10 x 50 = 500).
On top of that, scale with the number of users you actually have. If 250 users are active right now, spawn an additional five instances so that you can always take on another 500 users at once (15 x 50 = 750).
If the load later drops to 100 users, you can remove three of those instances (12 x 50 = 600).
And so on, and so forth.
Always keep enough spare capacity ahead of demand, and add and destroy machines constantly so that you get used to the process and always have enough processing power available for your users.
This calculation is not entirely accurate, but it is a good starting point: you can run with it for the first few weeks and gather the metrics you need to plan ahead.
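The arithmetic in the example can be written as a small helper. This is only a sketch: the function name `instancesNeeded` and its parameter names are made up for illustration, and the figures of 50 users per instance and 500 users of headroom are the example numbers from above, not measured values. Replace them with the results of your own load test.

```php
<?php
// Hypothetical sizing helper; the name and parameters are illustrative assumptions.
// Rule from the example: keep enough instances to serve the users you have now
// plus a fixed headroom of users who might arrive at once.
function instancesNeeded(int $currentUsers, int $headroomUsers, int $usersPerInstance): int
{
    if ($usersPerInstance <= 0) {
        throw new InvalidArgumentException('usersPerInstance must be positive');
    }

    // Round up: a partially used instance still has to be running.
    return (int) ceil(($currentUsers + $headroomUsers) / $usersPerInstance);
}

// Example figures from the answer: 50 users per instance, 500 users of headroom.
foreach ([0, 250, 100] as $users) {
    printf("%d users now -> %d instances\n", $users, instancesNeeded($users, 500, 50));
}
```

With these numbers the loop reports 10, 15 and 12 instances, matching the three cases above. A scheduler or cron job could call such a function periodically and start or stop worker instances to match the result.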