Queue Mode introduces a scalable, distributed job queue architecture within BillRun®. It enables horizontal scaling by allowing multiple worker instances to concurrently fetch and execute jobs from the shared queue. This design ensures fault tolerance, automatic recovery, and elastic scaling according to system load.
In a billing system, the most resource-intensive tasks involve processing vast amounts of data—millions or even tens of millions of subscriptions. The longest-running processes include cycle billing, cycle confirmation, and charging, which can take several hours to complete when handling massive volumes of invoices and transactions.
The worker process is responsible for fetching and executing jobs asynchronously. It runs multiple times per second, retrieving jobs from the queue. Once a job is fetched, it is locked for up to one hour to prevent duplicate processing. If the job completes successfully within this period, it is marked as done. However, if the job is not completed within the allocated time, it is automatically retried. In addition to the retry mechanism, jobs can be scheduled to run in the future.
Jobs are typically structured in a parent-child hierarchy. A parent job—such as a billing cycle—is pushed to the queue with relevant parameters. Once executed, the parent job generates child jobs, each linked to the parent, to handle specific subtasks efficiently.
Workers are autonomous processes that:
The system supports automatic retries for failed jobs and worker self-recovery to ensure high availability.
By default, each worker can handle up to 10 concurrent jobs. This value is configurable and should be tuned based on server CPU capacity and workload profile.
To start a worker inside a docker container:
docker exec -it billrun-app php public/index.php --env container --worker
Note: It is strongly recommended to supervise worker processes using tools like Supervisor, systemd, or Kubernetes Deployments to ensure automatic restarts on failure.
You can find articles how to install the worker in different environments, as follows:
http://localhost:8074/queue/push?job_type=Cycle&config={"billrun_key":"202503"}
http://localhost:8074/queue/push?job_type=Confirm&config={"billrun_key":"202503"}
http://localhost:8074/queue/push?job_type=Charging&config={"mode":"charge"}
To schedule a job for future execution, include a schedule
parameter with a Unix timestamp in your API request. For example:
This schedules the job to run at the specified Unix timestamp.
To define a new core job type:
Example:
<?php
class Billrun_Job_Foo extends Billrun_Job_Abstract {
public function run() {
// Your job logic here
Billrun_Factory::log("Running Foo Job...");
return true;
}
}
?>
Note: Additional optional methods like init() and fetch() can be implemented for custom initialization and data retrieval.
To create a new job, you need to define a class that inherits from Billrun_Job_Abstract
.
library/Billrun/Job/Foo.php
library/Billrun/Job/Foo/Bar.php
You can also develop custom job in a separated repository. Example can be find here.
run()
– This method handles the job execution asynchronously.
init()
– Initializes job parameters or settings.
fetch()
– Retrieves data from the database or other sources as needed.
finished()
- triggered when the job completed. Function should return boolean: true on success, false failed and not mark job as completed (retry will be triggered after timeout).
Here are sample records of a parent job and the child jobs it generates, stored in the jobs_messages collection.
{
_id: ObjectId('67bef2b158221d06f74ae982'),
queue_name: 'jobs',
created: ISODate('2025-02-26T10:53:37.800Z'),
body: { type: 'Charging', config: { mode: 'charge', exclude: [ 4, 5 ] } },
md5: 'fbe355fff067c83840a052e6a40d3540',
handle: '5ec420923c0aff849723e11b18c85453',
start_time: ISODate('2025-02-26T10:53:37.800Z'),
timeout: ISODate('2025-02-26T11:53:37.800Z'),
try: 1
complete_time: ISODate('2025-02-26T10:53:38.015Z'),
done: 1
}
{
_id: ObjectId('67bef2c5fe5c7e40fb229392'),
queue_name: 'jobs',
created: ISODate('2025-02-26T10:53:57.596Z'),
body: {
type: 'Charging_Account',
config: {
mode: 'charge',
aids: 1,
}
parent: 'fbe355fff067c83840a052e6a40d3540'
},
md5: '1ee3b07c100ca5ec1960e46e1b74795b',
handle: 'fbc272d4878827db88258f66b4738dc0',
start_time: ISODate('2025-02-26T10:53:57.596Z'),
timeout: ISODate('2025-02-26T11:53:57.596Z'),
try: 1
complete_time: ISODate('2025-02-26T10:53:58.004Z'),
done: 1
}
Optimization Tip:
The jobs_messages collection is indexed on queue_name, done, timeout, and schedule fields to enable efficient job selection and execution ordering.
Worker configuration have few settings:
Variable | Type | Default | Description |
enabled | boolean | false | enabled or disabled the worker service |
concurrent_limit | int | 4 | how many concurrent processes limit per worker |
job_timeout | int | 900 | what is the timeout in seconds of each job before terminate (if not complete) |
iteration | int | 2000000 | iteration in ms when no job in the queue (if there is a job the iteration will be immediate) |
lastConfig.worker:{
"enabled": true,
"concurrent_limit": 7,
"job_timeout": 1800,
"iteration": 1000000
}