Queue

The JoobQ::Queue class is responsible for managing job queues within JoobQ. It provides the functionality to add jobs, manage workers, and track metrics for job execution. This class extends BaseQueue, which serves as the abstract base class for queues in JoobQ.

Key Components of Queue

BaseQueue

BaseQueue is an abstract class that declares the core methods every queue must implement:

  • add(job : String): Adds a job to the queue.

  • start: Starts the queue for processing jobs.

  • stop!: Stops all workers processing jobs in the queue.

The Queue class extends BaseQueue, implements these methods, and adds features such as throttling, metrics tracking, and worker management.

Defining a Queue

The Queue class requires a specific job type (T) to manage. Below is an example of how to define and initialize a queue for a specific job type:

queue = JoobQ::Queue(ExampleJob).new("example_queue", 5)

The constructor arguments are:

  • name: The name of the queue.

  • total_workers: The number of worker threads assigned to process jobs from this queue.

Optionally, a throttle limit can be specified:

throttle_limit = {limit: 10, period: 1.minute}
queue = JoobQ::Queue(ExampleJob).new("example_queue", 5, throttle_limit)

This throttle limit ensures that no more than 10 jobs are processed per minute.

Queue Methods

Starting and Stopping the Queue

  • start: This method starts the queue, including initializing the worker manager and reprocessing any jobs that were left in an incomplete state.

    queue.start
  • stop!: This method stops all workers associated with the queue.

    queue.stop!

Adding Jobs

Jobs can be added to the queue in two ways:

  • Adding via JSON:

    job_json = "{\"param\": \"value\"}"
    queue.add(job_json)

    This method parses the JSON string and enqueues the corresponding job.

  • Adding via Job Object:

    job = ExampleJob.new(param: "value")
    queue.add(job)
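The JSON path only works if a JSON string can be turned back into a job object. The sketch below shows that mapping with Crystal's standard JSON::Serializable. It assumes the job type is JSON-serializable, and the struct is a hypothetical stand-in for ExampleJob that models only the JSON mapping; it is not a complete JoobQ job definition.

```crystal
require "json"

# Hypothetical stand-in for ExampleJob: it only models the JSON mapping
# and is not a complete JoobQ job definition.
struct ExampleJob
  include JSON::Serializable

  getter param : String

  def initialize(@param : String)
  end
end

job_json = %({"param": "value"})

# Parse the JSON string into a job object (the step queue.add(job_json) is described as doing).
job = ExampleJob.from_json(job_json)
puts job.param   # prints value

# Serializing the object produces equivalent JSON.
puts job.to_json # prints {"param":"value"}
```

Both ways of adding a job carry the same data. The JSON form suits producers that only have a serialized payload, for example one received from another service.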

Deleting a Job

To remove a job from the queue:

queue.delete_job(job)

This method deletes the job from the store if it has not yet been processed.

Queue Size and Status

  • size: Returns the current size of the queue, indicating how many jobs are waiting to be processed.

    current_size = queue.size
  • running?: Returns a boolean indicating whether the queue is currently running.

    is_running = queue.running?
  • status: Provides the current status of the queue, such as "Running", "Awaiting", or "Done".

    queue_status = queue.status

Metrics and Monitoring

The Queue class tracks several metrics to help monitor job processing and worker activity.

  • info: Provides a detailed overview of the current state of the queue, including metrics such as queue size, worker status, job throughput, and error rates.

    queue_info = queue.info

    Example output (values are illustrative):

    {
      name: "example_queue",
      job_type: "ExampleJob",
      total_workers: 5,
      status: "Running",
      current_size: 10,
      completed: 50,
      retried: 2,
      dead: 1,
      running_workers: 4,
      jobs_completed_per_second: 1.5,
      queue_reduction_rate: 0.8,
      errors_per_second: 0.1,
      job_wait_time: 2.0,
      job_execution_time: 1.0,
      worker_utilization: 80.0,
      error_rate_trend: 0.05,
      failed_job_rate: 0.02,
      average_jobs_in_flight: 3.0,
      percent_completed: 90.0,
      percent_retried: 5.0,
      percent_dead: 2.0,
      percent_busy: 75.0
    }
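One way to use these metrics is a simple health check. The sketch below assumes that `queue.info` returns a NamedTuple shaped like the example output, which is inferred from that output rather than confirmed. The threshold constant is hypothetical, and the function reads only the status, running_workers, total_workers and failed_job_rate keys.

```crystal
# Hypothetical alert threshold; tune it for your own workload.
MAX_FAILED_JOB_RATE = 0.05

# Returns human-readable warnings for an info-like NamedTuple.
# Only the status, running_workers, total_workers and failed_job_rate keys are read.
def queue_warnings(info) : Array(String)
  warnings = [] of String
  warnings << "status is #{info[:status]}" unless info[:status] == "Running"
  if info[:running_workers] < info[:total_workers]
    warnings << "only #{info[:running_workers]} of #{info[:total_workers]} workers running"
  end
  if info[:failed_job_rate] > MAX_FAILED_JOB_RATE
    warnings << "failed job rate #{info[:failed_job_rate]} exceeds #{MAX_FAILED_JOB_RATE}"
  end
  warnings
end

# Values copied from the example output above.
sample = {status: "Running", total_workers: 5, running_workers: 4, failed_job_rate: 0.02}
puts queue_warnings(sample) # prints ["only 4 of 5 workers running"]
```

Because the function is duck-typed, it accepts any NamedTuple with those keys. A real `queue.info` result could be passed in directly if it has that shape.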

Throttling

The Queue class provides an optional throttling feature that can be used to limit the rate of job processing. This is especially useful if jobs require interaction with external APIs that enforce rate limits.

To apply throttling, define a throttle limit when initializing the queue:

throttle_limit = {limit: 10, period: 1.minute}
queue = JoobQ::Queue(ExampleJob).new("example_queue", 5, throttle_limit)
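To make the semantics of `{limit: 10, period: 1.minute}` concrete, here is a minimal sliding-window rate limiter. It is an illustration of the technique, not necessarily how JoobQ enforces its throttle. The `Throttle` class and its `allow?` method are hypothetical names. The current time is passed in explicitly so the behaviour is deterministic.

```crystal
# Sliding-window limiter: at most `limit` jobs may start within any `period`.
class Throttle
  def initialize(@limit : Int32, @period : Time::Span)
    @stamps = Deque(Time::Span).new # start times of recently allowed jobs
  end

  # Returns true (and records the start) if a job may run at `now`.
  def allow?(now : Time::Span = Time.monotonic) : Bool
    # Forget start times that have slid out of the window.
    while (oldest = @stamps.first?) && now - oldest >= @period
      @stamps.shift
    end
    return false if @stamps.size >= @limit
    @stamps << now
    true
  end
end

throttle = Throttle.new(2, 1.minute)
puts throttle.allow?(0.seconds)  # prints true
puts throttle.allow?(10.seconds) # prints true
puts throttle.allow?(20.seconds) # prints false (2 jobs already in the last minute)
puts throttle.allow?(61.seconds) # prints true (the job at 0s has left the window)
```

The named tuple from the documentation maps onto it as `Throttle.new(throttle_limit[:limit], throttle_limit[:period])`.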

Worker Management

The Queue class manages workers via the WorkerManager, which is responsible for handling the lifecycle of worker threads:

  • running_workers: Returns the number of currently active workers.

    active_workers = queue.running_workers
  • worker_manager.start_workers: Starts the worker threads.

  • worker_manager.stop_workers: Stops all worker threads.

Reprocessing Busy Jobs

When the queue starts, it calls reprocess_busy_jobs! to move jobs that were previously in a busy state back into the queue. This ensures that jobs that were being processed but not completed are retried when the queue is restarted.

queue.start # Will reprocess any busy jobs
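The idea behind reprocessing can be shown with a toy store. A job moves from a pending list to a busy list when a worker claims it, and it only leaves the busy list when it finishes. Anything still busy at startup was therefore interrupted. `MemoryStore`, `claim` and `requeue_busy_jobs` are hypothetical names, and JoobQ's real store is not modelled here.

```crystal
# Hypothetical in-memory store with a pending queue and a busy list.
class MemoryStore
  getter pending = Deque(String).new
  getter busy = [] of String

  # A worker claims the next job: it moves from pending to busy.
  def claim : String?
    job = pending.shift?
    busy << job if job
    job
  end

  # On startup, anything still marked busy was interrupted (e.g. by a crash),
  # so it is put back on the pending queue to be retried.
  def requeue_busy_jobs : Nil
    busy.each { |job| pending << job }
    busy.clear
  end
end

store = MemoryStore.new
store.pending << "job_a" << "job_b"
store.claim             # job_a is now busy; imagine the process dies here
store.requeue_busy_jobs # what a restart would do
puts store.pending.to_a # prints ["job_b", "job_a"]
```

Because interrupted jobs run again, jobs that may be reprocessed this way are safest when they are idempotent.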

Example Usage

Here's a complete example of defining, starting, and managing a queue with JoobQ:

require "joobq"

# Define a queue for ExampleJob
queue = JoobQ::Queue(ExampleJob).new("example_queue", 5)

# Start the queue
queue.start

# Add a job to the queue
job = ExampleJob.new(param: "value")
queue.add(job)

# Check the queue size
puts "Queue size: #{queue.size}"

# Stop all workers when done
queue.stop!

Workflows in JoobQ

Workflows in JoobQ provide a way to manage complex job dependencies, allowing you to create, link, and handle multiple jobs that are interdependent. A workflow consists of a series of jobs that need to be executed in a specific order or hierarchy, ensuring each job is executed only when its dependencies are satisfied. This section covers how to add workflows, manipulate job dependencies, and effectively manage hierarchical job execution in JoobQ.

Adding Flows in Bulk

JoobQ allows you to add multiple flows in bulk to create a series of linked jobs efficiently. This is especially useful when you have multiple interrelated jobs that need to be added to the queue at once, with dependencies specified between them.

Example Usage

To add flows in bulk, use the add_flows method to specify the jobs and their relationships:

flows = [
  {job: ExampleJob1.new(x: 1), dependencies: []},
  {job: ExampleJob2.new(x: 2), dependencies: ["job_id_1"]},
  {job: ExampleJob3.new(x: 3), dependencies: ["job_id_2"]}
]

JoobQ.add_flows(flows)

In this example, three jobs are added, with "job_id_1" and "job_id_2" standing for the IDs of ExampleJob1 and ExampleJob2, so that:

  • ExampleJob1 has no dependencies.

  • ExampleJob2 depends on ExampleJob1.

  • ExampleJob3 depends on ExampleJob2.

This ensures that the jobs are executed in the correct order according to their dependencies.
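A valid execution order for a dependency list like the one above is a topological order. The sketch below computes one with Kahn's algorithm, using the same placeholder IDs. It illustrates the ordering constraint and is not JoobQ's scheduler. `execution_order` is a hypothetical name, and every job is assumed to appear as a key.

```crystal
# Returns job IDs in an order that respects dependencies (Kahn's algorithm).
# Every job must appear as a key; raises if the graph contains a cycle.
def execution_order(deps : Hash(String, Array(String))) : Array(String)
  remaining = {} of String => Int32 # unmet dependency count per job
  dependents = Hash(String, Array(String)).new { |h, k| h[k] = [] of String }
  deps.each do |job, parents|
    remaining[job] = parents.size
    parents.each { |parent| dependents[parent] << job }
  end

  ready = deps.keys.select { |job| remaining[job] == 0 }
  order = [] of String
  until ready.empty?
    job = ready.shift
    order << job
    # Completing `job` may unblock the jobs that depend on it.
    dependents[job].each do |child|
      remaining[child] -= 1
      ready << child if remaining[child] == 0
    end
  end

  raise "dependency cycle detected" if order.size < deps.size
  order
end

deps = {
  "job_id_1" => [] of String,
  "job_id_2" => ["job_id_1"],
  "job_id_3" => ["job_id_2"],
}
puts execution_order(deps) # prints ["job_id_1", "job_id_2", "job_id_3"]
```

The cycle check matters in practice. If two jobs depend on each other, neither can ever run, and a workflow system has to reject or flag such a graph.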

Get Flow Tree

The Flow Tree represents the structure of a workflow, showing how different jobs are linked through dependencies. You can retrieve the flow tree to visualize or analyze the relationships between jobs in a workflow.

Example Usage

To get the flow tree for a particular job or workflow, use the get_flow_tree method:

flow_tree = JoobQ.get_flow_tree("root_job_id")
puts flow_tree.to_json

This returns the hierarchical representation of the workflow starting from the specified job, allowing you to understand the flow and dependencies of each job in the workflow.
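As a rough picture of what a hierarchical representation can look like, the sketch below builds a nested JSON tree from parent-to-children edges. The node shape (`id` plus `children`) is an assumption for illustration; the actual structure returned by `get_flow_tree` may differ.

```crystal
require "json"

# Builds the subtree rooted at `id` from parent => children edges.
# Assumes the graph is acyclic; a cycle would recurse forever.
def flow_tree(id : String, children : Hash(String, Array(String))) : JSON::Any
  kids = children.fetch(id, [] of String).map { |child| flow_tree(child, children) }
  JSON::Any.new({"id" => JSON::Any.new(id), "children" => JSON::Any.new(kids)})
end

edges = {"root_job_id" => ["a", "b"], "a" => ["c"]}
puts flow_tree("root_job_id", edges).to_json
# prints {"id":"root_job_id","children":[{"id":"a","children":[{"id":"c","children":[]}]},{"id":"b","children":[]}]}
```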

Fail Parent

When managing workflows, there are scenarios where you need to fail a parent job and cascade the failure to its dependent jobs. This ensures that if a critical parent job fails, all dependent jobs are appropriately marked as failed.

Example Usage

To fail a parent job and propagate the failure to its child jobs:

JoobQ.fail_parent("parent_job_id")

This method sets the status of the parent job to Failed and then recursively fails all child jobs that depend on it.
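The cascade can be sketched as a graph walk. This version is iterative rather than recursive, which gives the same result without deep call stacks, and a visited set handles children shared by several parents. `fail_cascade` and the string statuses are hypothetical and only model the behaviour described above.

```crystal
# Marks `parent` and every job reachable through child edges as "Failed".
# Iterative depth-first walk; the visited set handles children shared by several parents.
def fail_cascade(parent : String,
                 children : Hash(String, Array(String)),
                 status : Hash(String, String)) : Nil
  stack = [parent]
  seen = Set(String).new
  until stack.empty?
    id = stack.pop
    next unless seen.add?(id) # skip jobs already failed in this walk
    status[id] = "Failed"
    stack.concat(children.fetch(id, [] of String))
  end
end

status = {"parent_job_id" => "Running", "child_a" => "Pending", "grandchild" => "Pending", "unrelated" => "Pending"}
fail_cascade("parent_job_id", {"parent_job_id" => ["child_a"], "child_a" => ["grandchild"]}, status)
puts status["grandchild"] # prints Failed
puts status["unrelated"]  # prints Pending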

Remove Dependency

JoobQ allows you to remove a dependency between jobs in a workflow, enabling you to modify the relationships as needed. This can be useful if the dependency between two jobs is no longer valid or necessary.

Example Usage

To remove a dependency between two jobs:

JoobQ.remove_dependency("job_id", "dependency_id")

This removes the specified dependency, allowing the job to be executed without waiting for the former dependency to complete.
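In bookkeeping terms, removing a dependency deletes an edge, so the job's readiness check no longer considers it. In the sketch below, `ready?`, `drop_dependency` and the `Deps` alias are hypothetical and are not JoobQ's internals.

```crystal
# Hypothetical bookkeeping: job ID => IDs of the jobs it waits on.
alias Deps = Hash(String, Set(String))

# A job is ready once every remaining dependency has completed.
def ready?(job : String, deps : Deps, completed : Set(String)) : Bool
  deps.fetch(job, Set(String).new).all? { |dep| completed.includes?(dep) }
end

# Dropping the edge means the job no longer waits on that dependency at all.
def drop_dependency(deps : Deps, job : String, dependency : String) : Nil
  deps[job]?.try &.delete(dependency)
end

deps = {"job_id" => Set{"dependency_id", "setup_job"}}
completed = Set{"setup_job"}
puts ready?("job_id", deps, completed) # prints false
drop_dependency(deps, "job_id", "dependency_id")
puts ready?("job_id", deps, completed) # prints true
```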

Ignore Dependency

In some workflows, you may want to ignore a specific dependency if it is not critical for the overall workflow to proceed. Ignoring a dependency means that a job can continue even if its dependent job has failed or is incomplete.

Example Usage

To ignore a dependency for a job:

JoobQ.ignore_dependency("job_id", "dependency_id")

This ensures that the specified dependency is ignored, allowing the job to proceed even if the dependency does not complete successfully.
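One way to model ignoring, as opposed to removing, is to keep the edge but treat it as satisfied for that job whether or not the dependency completed. The edge then still exists for inspection, but it no longer blocks the job. `can_run?` and the `ignored` map are hypothetical names for this sketch.

```crystal
# Hypothetical state: dependency edges, completed jobs, and per-job ignored dependencies.
def can_run?(job : String,
             deps : Hash(String, Array(String)),
             completed : Set(String),
             ignored : Hash(String, Set(String))) : Bool
  skip = ignored.fetch(job, Set(String).new)
  # Each dependency must have completed, unless it is ignored for this job.
  deps.fetch(job, [] of String).all? { |dep| completed.includes?(dep) || skip.includes?(dep) }
end

deps = {"job_id" => ["dependency_id"]}
completed = Set(String).new # dependency_id failed, so it never completed
ignored = {} of String => Set(String)

puts can_run?("job_id", deps, completed, ignored) # prints false
ignored["job_id"] = Set{"dependency_id"}
puts can_run?("job_id", deps, completed, ignored) # prints true
```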

Remove Child Dependency

You can remove a child dependency from a job, effectively decoupling a child job from its parent in the workflow. This allows you to dynamically adjust the workflow and ensure that specific child jobs are not triggered by a particular parent job.

Example Usage

To remove a child dependency from a parent job:

JoobQ.remove_child_dependency("parent_job_id", "child_job_id")

This removes the child dependency, meaning that the specified child job will no longer be triggered by the completion of the parent job.
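If a workflow stores edges in both directions (parent to children for triggering, child to parents for readiness), then removing a child dependency has to update both sides, or the two views drift apart. The `FlowGraph` class below is a hypothetical sketch of that bookkeeping.

```crystal
# Hypothetical graph that records each edge in both directions.
class FlowGraph
  getter children : Hash(String, Set(String)) = Hash(String, Set(String)).new { |h, k| h[k] = Set(String).new }
  getter parents : Hash(String, Set(String)) = Hash(String, Set(String)).new { |h, k| h[k] = Set(String).new }

  def link(parent : String, child : String) : Nil
    children[parent] << child
    parents[child] << parent
  end

  # Remove the edge from both sides so neither view keeps a stale reference.
  def remove_child_dependency(parent : String, child : String) : Nil
    children[parent].delete(child)
    parents[child].delete(parent)
  end

  # Children that would be triggered when `parent` completes.
  def triggered_by(parent : String) : Array(String)
    children[parent].to_a
  end
end

graph = FlowGraph.new
graph.link("parent_job_id", "child_job_id")
graph.link("parent_job_id", "other_child")
graph.remove_child_dependency("parent_job_id", "child_job_id")
puts graph.triggered_by("parent_job_id") # prints ["other_child"]
```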

Graceful Shutdown

A graceful shutdown is an important feature to ensure that workflows are stopped cleanly without losing progress or data. During a graceful shutdown, JoobQ stops accepting new jobs, waits for all active jobs to complete, and then safely shuts down all workers and queues.

Example Usage

To initiate a graceful shutdown of a queue:

queue.graceful_shutdown

This ensures that:

  • No new jobs are added to the queue.

  • All in-progress jobs are allowed to complete before the queue stops.

Using a graceful shutdown helps maintain the consistency and integrity of workflows, ensuring that jobs are not abruptly terminated.
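The three shutdown steps (stop accepting, drain in-progress work, stop workers) can be sketched with Crystal fibers and channels. This is a self-contained toy queue, not JoobQ's implementation. `MiniQueue` is hypothetical, jobs are plain integers, and `nil` acts as a per-worker stop marker.

```crystal
# Minimal in-process queue illustrating graceful shutdown with fibers and channels.
class MiniQueue
  getter completed = [] of Int32
  @accepting = true

  def initialize(@worker_count : Int32)
    @jobs = Channel(Int32?).new(100) # nil is a per-worker stop marker
    @done = Channel(Nil).new         # each worker reports here when it exits
  end

  def start : Nil
    @worker_count.times do
      spawn do
        # Channels are FIFO, so every job sent before the stop markers is processed first.
        while job = @jobs.receive
          sleep 1.millisecond # simulated work
          @completed << job   # safe: fibers share one thread by default
        end
        @done.send(nil)
      end
    end
  end

  # Returns false once shutdown has begun.
  def add(job : Int32) : Bool
    return false unless @accepting
    @jobs.send(job)
    true
  end

  def graceful_shutdown : Nil
    @accepting = false                      # 1. stop accepting new jobs
    @worker_count.times { @jobs.send(nil) } # 2. one stop marker per worker, after all queued jobs
    @worker_count.times { @done.receive }   # 3. wait for every worker to finish
  end
end

q = MiniQueue.new(3)
q.start
(1..10).each { |i| q.add(i) }
q.graceful_shutdown
puts q.completed.size # prints 10
puts q.add(11)        # prints false
```

Using one stop marker per worker, rather than a flag the workers poll, guarantees that no worker exits while queued jobs remain.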

Best Practices for Managing Workflows

  • Plan Dependencies Carefully: Ensure that the dependencies between jobs are defined in a way that makes logical sense for your workflow to prevent jobs from being unnecessarily delayed or blocked.

  • Handle Failures Gracefully: Use the fail_parent method to ensure that failures in critical jobs are cascaded to dependent jobs, maintaining consistency in the workflow.

  • Adjust Dependencies Dynamically: Utilize methods like remove_dependency, ignore_dependency, and remove_child_dependency to dynamically adapt workflows based on changes in requirements or conditions during runtime.

  • Visualize Flow Trees: Regularly check the flow tree for complex workflows to ensure that all dependencies are in place and jobs are being executed in the correct order.

  • Use Graceful Shutdown: When stopping workflows or queues, use graceful shutdowns to ensure in-progress jobs complete without data loss.


Workflows in JoobQ provide a powerful way to manage complex job dependencies and ensure jobs are executed in the correct sequence. By using the provided methods to add, remove, and manipulate job dependencies, you can build flexible and robust workflows tailored to your application's needs.
