Queue mode refers to BillRun's implementation of a job queue worker architecture, designed to handle large-scale processing with built-in fault tolerance and an automatic retry mechanism.
In a billing system, the most resource-intensive tasks involve processing vast amounts of data—millions or even tens of millions of subscriptions. The longest-running processes include cycle billing, cycle confirmation, and charging, which can take several hours to complete when handling massive volumes of invoices and transactions.
The worker process is responsible for fetching and executing jobs asynchronously. It runs multiple times per second, retrieving jobs from the queue. Once a job is fetched, it is locked for up to one hour to prevent duplicate processing. If the job completes successfully within this period, it is marked as done. However, if the job is not completed within the allocated time, it is automatically retried. In addition to the retry mechanism, jobs can be scheduled to run in the future.
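The locking and retry behaviour described above can be sketched with a small in-memory queue. This is an illustration only, not BillRun's implementation: the `JobQueue` class and its method names are hypothetical, while the `handle`, `timeout`, `done`, and `schedule` fields mirror names that appear elsewhere in this article.

```python
import time
import uuid


class JobQueue:
    """In-memory stand-in for the jobs queue (hypothetical, for illustration)."""

    def __init__(self, job_timeout=3600):
        self.job_timeout = job_timeout  # lock duration in seconds
        self.jobs = []

    def push(self, job_type, config, schedule=None):
        # 'schedule' is an optional Unix timestamp; the job is not fetched before it.
        job = {"body": {"type": job_type, "config": config},
               "schedule": schedule, "handle": None, "timeout": None, "done": 0}
        self.jobs.append(job)
        return job

    def fetch(self, now=None):
        """Lock and return the next runnable job, or None if there is none."""
        now = time.time() if now is None else now
        for job in self.jobs:
            if job["done"]:
                continue
            if job["schedule"] is not None and job["schedule"] > now:
                continue  # scheduled for the future
            if job["timeout"] is not None and job["timeout"] > now:
                continue  # still locked by a worker
            job["handle"] = uuid.uuid4().hex  # identifies this particular lock
            job["timeout"] = now + self.job_timeout
            return job
        return None

    def complete(self, job, handle):
        """Mark the job done only if the caller still holds the current lock."""
        if job["handle"] != handle:
            return False
        job["done"] = 1
        return True
```

A job whose lock has expired is handed out again by the next `fetch()` call, which is the retry; a worker that finishes late can no longer mark the job as done because its handle has been replaced.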
Jobs are typically structured in a parent-child hierarchy. A parent job—such as a billing cycle—is pushed to the queue with relevant parameters. Once executed, the parent job generates child jobs, each linked to the parent, to handle specific subtasks efficiently.
The worker process fetches jobs and executes them in parallel. By default, a worker processes up to 10 jobs concurrently. This value can be adjusted by setting the 'concurrent_limit' parameter in the worker configuration. When the limit is reached, the worker waits for a running job to complete before fetching a new one.
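The concurrency limit can be illustrated with a bounded thread pool. This is a minimal sketch under stated assumptions: `run_worker` and `handler` are hypothetical names, the parameter is named here after the `concurrent_limit` setting, and the real worker may manage processes rather than threads.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_worker(jobs, handler, concurrent_limit=10):
    """Run every job through handler, never more than concurrent_limit at once."""
    results = []
    running = set()
    with ThreadPoolExecutor(max_workers=concurrent_limit) as pool:
        for job in jobs:
            if len(running) >= concurrent_limit:
                # Limit reached: wait for a running job before taking another one.
                done, running = wait(running, return_when=FIRST_COMPLETED)
                results.extend(future.result() for future in done)
            running.add(pool.submit(handler, job))
        # Queue drained: collect whatever is still running.
        done, _ = wait(running)
        results.extend(future.result() for future in done)
    return results
```

Results are collected in completion order, not submission order, so a caller that needs ordering should carry an identifier inside each result.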
To run a worker process in a Docker container, use the following command:
docker exec -it billrun-app php public/index.php --env container --worker
For high availability and scalability, multiple worker Docker containers or Kubernetes pods can be deployed to distribute the workload.
Separate articles describe how to install the worker in different environments.
Jobs can be pushed to the queue through the API, for example:
http://localhost:8074/queue/push?job_type=Cycle&config={"billrun_key":"202503"}
http://localhost:8074/queue/push?job_type=Confirm&config={"billrun_key":"202503"}
http://localhost:8074/queue/push?job_type=Charging&config={"mode":"charge"}
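The URLs above embed raw JSON in the query string, which many HTTP clients reject or mangle unless it is URL-encoded. The sketch below builds the same request with the Python standard library; `build_push_url` is a hypothetical helper, and the host and port are simply those used in the examples.

```python
import json
from urllib.parse import urlencode


def build_push_url(base_url, job_type, config):
    """Return a queue/push URL with the config serialized as URL-encoded JSON."""
    query = urlencode({
        "job_type": job_type,
        # Compact separators keep the JSON identical to the hand-written form.
        "config": json.dumps(config, separators=(",", ":")),
    })
    return f"{base_url}/queue/push?{query}"


url = build_push_url("http://localhost:8074", "Cycle", {"billrun_key": "202503"})
print(url)
# http://localhost:8074/queue/push?job_type=Cycle&config=%7B%22billrun_key%22%3A%22202503%22%7D
```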
To schedule a job for future execution, include a schedule parameter with a Unix timestamp in your API request. The job then runs at the specified time.
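As a sketch of a scheduled request, the helper below converts a timezone-aware datetime to a Unix timestamp and appends it as the `schedule` parameter. The helper name is hypothetical, and the timestamp is assumed to be in seconds; check the unit against your BillRun version.

```python
import json
from datetime import datetime, timezone
from urllib.parse import urlencode


def build_scheduled_push_url(base_url, job_type, config, run_at):
    """Return a queue/push URL carrying a 'schedule' Unix timestamp."""
    if run_at.tzinfo is None:
        # Naive datetimes would be interpreted in the local timezone.
        raise ValueError("run_at must be timezone-aware")
    query = urlencode({
        "job_type": job_type,
        "config": json.dumps(config, separators=(",", ":")),
        "schedule": int(run_at.timestamp()),  # assumed to be seconds
    })
    return f"{base_url}/queue/push?{query}"


run_at = datetime(2025, 4, 1, 2, 0, tzinfo=timezone.utc)
scheduled_url = build_scheduled_push_url(
    "http://localhost:8074", "Cycle", {"billrun_key": "202503"}, run_at
)
```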
To create a new job, define a class that inherits from Billrun_Job_Abstract.
library/Billrun/Job/Foo.php
library/Billrun/Job/Foo/Bar.php
See the Hello custom job example, which writes a log entry and creates another Hello job at a random time.
run() - Handles the job execution asynchronously.
init() - Initializes job parameters or settings.
fetch() - Retrieves data from the database or other sources as needed.
finished() - Triggered when the job completes. It should return a boolean: true on success, or false on failure, in which case the job is not marked as completed and a retry is triggered after the timeout.
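The role of each hook can be illustrated with a Python stand-in. This is not BillRun's PHP API: the `HelloJob` class, the `execute` driver, and in particular the order in which the hooks are called are assumptions made for this sketch.

```python
class HelloJob:
    """Python stand-in for a job class (hypothetical; real jobs are PHP classes)."""

    def __init__(self, config):
        self.config = config
        self.limit = 0
        self.rows = []
        self.processed = 0

    def init(self):
        # Resolve parameters and defaults before any work starts.
        self.limit = self.config.get("limit", 2)

    def fetch(self):
        # Stand-in for a database query.
        self.rows = self.config.get("rows", [])[: self.limit]

    def run(self):
        # The actual work of the job.
        self.processed = len(self.rows)

    def finished(self):
        # True marks the job as done; False leaves it to be retried after the timeout.
        return self.processed > 0


def execute(job):
    """Drive one job through its hooks; the call order is an assumption."""
    job.init()
    job.fetch()
    job.run()
    return "done" if job.finished() else "retry"
```

For example, `execute(HelloJob({"rows": [1, 2, 3]}))` returns `"done"`, while a job with no rows returns `"retry"` because its `finished()` hook reports failure.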
Here are sample records of a parent job and the child jobs it generates, stored in the jobs_messages collection:
{
  _id: ObjectId('67bef2b158221d06f74ae982'),
  queue_name: 'jobs',
  created: ISODate('2025-02-26T10:53:37.800Z'),
  body: { type: 'Charging', config: { mode: 'charge', exclude: [ 4, 5 ] } },
  md5: 'fbe355fff067c83840a052e6a40d3540',
  handle: '5ec420923c0aff849723e11b18c85453',
  start_time: ISODate('2025-02-26T10:53:37.800Z'),
  timeout: ISODate('2025-02-26T11:53:37.800Z'),
  complete_time: ISODate('2025-02-26T10:53:38.015Z'),
  done: 1
},
{
  _id: ObjectId('67bef2c5fe5c7e40fb229392'),
  queue_name: 'jobs',
  created: ISODate('2025-02-26T10:53:57.596Z'),
  body: {
    type: 'Charging_Account',
    config: {
      mode: 'charge',
      aids: 1,
    },
    parent: 'fbe355fff067c83840a052e6a40d3540'
  },
  md5: '1ee3b07c100ca5ec1960e46e1b74795b',
  handle: 'fbc272d4878827db88258f66b4738dc0',
  start_time: ISODate('2025-02-26T10:53:57.596Z'),
  timeout: ISODate('2025-02-26T11:53:57.596Z'),
  complete_time: ISODate('2025-02-26T10:53:58.004Z'),
  done: 1
}
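In the sample records, the child's body.parent value equals the parent's md5. Assuming that is how the link works in general, the sketch below finds the children of a parent job and checks whether they have all completed. The records are trimmed copies of the samples, and both helper functions are hypothetical.

```python
# Trimmed copies of the sample records; only the fields used here are kept.
records = [
    {"md5": "fbe355fff067c83840a052e6a40d3540",
     "body": {"type": "Charging", "config": {"mode": "charge", "exclude": [4, 5]}},
     "done": 1},
    {"md5": "1ee3b07c100ca5ec1960e46e1b74795b",
     "body": {"type": "Charging_Account",
              "config": {"mode": "charge", "aids": 1},
              "parent": "fbe355fff067c83840a052e6a40d3540"},
     "done": 1},
]


def children_of(parent, all_records):
    """Return the records whose body.parent matches the parent's md5."""
    return [r for r in all_records if r["body"].get("parent") == parent["md5"]]


def all_children_done(parent, all_records):
    """True when the parent has at least one child and every child is done."""
    children = children_of(parent, all_records)
    return bool(children) and all(r.get("done") == 1 for r in children)


parent = records[0]
print([r["body"]["type"] for r in children_of(parent, records)])
# ['Charging_Account']
```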
The worker configuration has a few settings:
Variable | Type | Default | Description |
enabled | boolean | false | enables or disables the worker service |
concurrent_limit | int | 10 | maximum number of concurrent jobs per worker |
job_timeout | int | 3600 | timeout in seconds for each job before it is terminated (if not complete) |
iteration | int | 1000000 | polling interval in ms when the queue is empty (if a job is waiting, the next iteration is immediate) |
lastConfig.worker:{
  "enabled": true,
  "concurrent_limit": 7,
  "job_timeout": 1800,
  "iteration": 1000000
}
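As a sketch of how such overrides combine with the defaults from the settings table, the helper below merges a partial configuration and rejects unknown keys or wrong types. `load_worker_config` is a hypothetical name, and the validation rules are assumptions rather than BillRun's own checks.

```python
# Defaults taken from the settings table.
DEFAULTS = {
    "enabled": False,
    "concurrent_limit": 10,
    "job_timeout": 3600,
    "iteration": 1000000,
}


def load_worker_config(overrides):
    """Merge overrides into the defaults and validate the result."""
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        raise ValueError(f"unknown worker settings: {sorted(unknown)}")
    config = {**DEFAULTS, **overrides}
    for key, default in DEFAULTS.items():
        # Exact type match, so a boolean is not accepted where an int is expected.
        if type(config[key]) is not type(default):
            raise TypeError(f"{key} must be of type {type(default).__name__}")
    for key in ("concurrent_limit", "job_timeout", "iteration"):
        if config[key] < 1:
            raise ValueError(f"{key} must be positive")
    return config


config = load_worker_config({"enabled": True, "concurrent_limit": 7, "job_timeout": 1800})
print(config["iteration"])
# 1000000
```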