Queue
The JoobQ::Queue
class is responsible for managing job queues within JoobQ. It provides the functionality to add jobs, manage workers, and track metrics for job execution. This class extends BaseQueue
, which serves as the abstract base class for queues in JoobQ.
Key Components of Queue
Queue
BaseQueue
BaseQueue
is an abstract class that provides the core methods that every queue must implement:
add(job : String)
: Adds a job to the queue.start
: Starts the queue for processing jobs.stop!
: Stops all workers processing jobs in the queue.
The Queue
class extends BaseQueue
to provide these functionalities while adding more advanced features.
Defining a Queue
The Queue
class requires a specific job type (T
) to manage. Below is an example of how to define and initialize a queue for a specific job type:
name
: The name of the queue.total_workers
: The number of worker threads assigned to process jobs from this queue.
Optionally, a throttle limit can be specified:
This throttle limit ensures that no more than 10 jobs are processed per minute.
Queue Methods
Starting and Stopping the Queue
start
: This method starts the queue, including initializing the worker manager and reprocessing any jobs that were left in an incomplete state.stop!
: This method stops all workers associated with the queue.
Adding Jobs
Jobs can be added to the queue in two ways:
Adding via JSON:
This method parses the JSON string and enqueues the corresponding job.
Adding via Job Object:
Deleting a Job
To remove a job from the queue:
This method deletes the job from the store if it has not yet been processed.
Queue Size and Status
size
: Returns the current size of the queue, indicating how many jobs are waiting to be processed.running?
: Returns a boolean indicating whether the queue is currently running.status
: Provides the current status of the queue, such as "Running", "Awaiting", or "Done".
Metrics and Monitoring
The Queue
class tracks several metrics to help monitor job processing and worker activity.
info
: Provides a detailed overview of the current state of the queue, including metrics such as queue size, worker status, job throughput, and error rates.Example output:
Throttling
The Queue
class provides an optional throttling feature that can be used to limit the rate of job processing. This is especially useful if jobs require interaction with external APIs that enforce rate limits.
To apply throttling, define a throttle limit when initializing the queue:
Worker Management
The Queue
class manages workers via the WorkerManager
, which is responsible for handling the lifecycle of worker threads:
running_workers
: Returns the number of currently active workers.worker_manager.start_workers
: Starts the worker threads.worker_manager.stop_workers
: Stops all worker threads.
Reprocessing Busy Jobs
When the queue starts, it calls reprocess_busy_jobs!
to move jobs that were previously in a busy state back into the queue. This ensures that jobs that were being processed but not completed are retried when the queue is restarted.
Example Usage
Here's a complete example of defining, starting, and managing a queue with JoobQ:
Workflows in JoobQ
Workflows in JoobQ provide a way to manage complex job dependencies, allowing you to create, link, and handle multiple jobs that are interdependent. A workflow consists of a series of jobs that need to be executed in a specific order or hierarchy, ensuring each job is executed only when its dependencies are satisfied. This section covers how to add workflows, manipulate job dependencies, and effectively manage hierarchical job execution in JoobQ.
Adding Flows in Bulk
JoobQ allows you to add multiple flows in bulk to create a series of linked jobs efficiently. This is especially useful when you have multiple interrelated jobs that need to be added to the queue at once, with dependencies specified between them.
Example Usage
To add flows in bulk, use the add_flows
method to specify the jobs and their relationships:
In this example, multiple jobs are added, where:
ExampleJob1
has no dependencies.ExampleJob2
depends onExampleJob1
.ExampleJob3
depends onExampleJob2
.
This ensures that the jobs are executed in the correct order according to their dependencies.
Get Flow Tree
The Flow Tree represents the structure of a workflow, showing how different jobs are linked through dependencies. You can retrieve the flow tree to visualize or analyze the relationships between jobs in a workflow.
Example Usage
To get the flow tree for a particular job or workflow, use the get_flow_tree
method:
This returns the hierarchical representation of the workflow starting from the specified job, allowing you to understand the flow and dependencies of each job in the workflow.
Fail Parent
When managing workflows, there are scenarios where you need to fail a parent job and cascade the failure to its dependent jobs. This ensures that if a critical parent job fails, all dependent jobs are appropriately marked as failed.
Example Usage
To fail a parent job and propagate the failure to its child jobs:
This method sets the status of the parent job to Failed
and then recursively fails all child jobs that depend on it.
Remove Dependency
JoobQ allows you to remove a dependency between jobs in a workflow, enabling you to modify the relationships as needed. This can be useful if the dependency between two jobs is no longer valid or necessary.
Example Usage
To remove a dependency between two jobs:
This removes the specified dependency, allowing the job to be executed without waiting for the former dependency to complete.
Ignore Dependency
In some workflows, you may want to ignore a specific dependency if it is not critical for the overall workflow to proceed. Ignoring a dependency means that a job can continue even if its dependent job has failed or is incomplete.
Example Usage
To ignore a dependency for a job:
This ensures that the specified dependency is ignored, allowing the job to proceed even if the dependency does not complete successfully.
Remove Child Dependency
You can remove a child dependency from a job, effectively decoupling a child job from its parent in the workflow. This allows you to dynamically adjust the workflow and ensure that specific child jobs are not triggered by a particular parent job.
Example Usage
To remove a child dependency from a parent job:
This removes the child dependency, meaning that the specified child job will no longer be triggered by the completion of the parent job.
Graceful Shutdown
A graceful shutdown is an important feature to ensure that workflows are stopped cleanly without losing progress or data. During a graceful shutdown, JoobQ stops accepting new jobs, waits for all active jobs to complete, and then safely shuts down all workers and queues.
Example Usage
To initiate a graceful shutdown of a queue:
This ensures that:
No new jobs are added to the queue.
All in-progress jobs are allowed to complete before the queue stops.
Using a graceful shutdown helps maintain the consistency and integrity of workflows, ensuring that jobs are not abruptly terminated.
Best Practices for Managing Workflows
Plan Dependencies Carefully: Ensure that the dependencies between jobs are defined in a way that makes logical sense for your workflow to prevent jobs from being unnecessarily delayed or blocked.
Handle Failures Gracefully: Use the
fail_parent
method to ensure that failures in critical jobs are cascaded to dependent jobs, maintaining consistency in the workflow.Adjust Dependencies Dynamically: Utilize methods like
remove_dependency
,ignore_dependency
, andremove_child_dependency
to dynamically adapt workflows based on changes in requirements or conditions during runtime.Visualize Flow Trees: Regularly check the flow tree for complex workflows to ensure that all dependencies are in place and jobs are being executed in the correct order.
Use Graceful Shutdown: When stopping workflows or queues, use graceful shutdowns to ensure in-progress jobs complete without data loss.
Workflows in JoobQ provide a powerful way to manage complex job dependencies and ensure jobs are executed in the correct sequence. By using the provided methods to add, remove, and manipulate job dependencies, you can build flexible and robust workflows tailored to your application's needs.
Last updated