Worker

The JoobQ::Worker class is responsible for executing jobs from a queue. Each worker runs in a separate fiber, fetching jobs from the queue, executing them, and handling job completion or failure. Workers can be started, stopped, and monitored for activity.

Key Components of Worker

Properties

  • wid: The worker ID, used to uniquely identify each worker.

  • active: A boolean flag indicating whether the worker is currently active.

  • @last_job_time: The timestamp of the last job execution.

  • @terminate_channel: A channel used to signal the worker to stop.

  • @queue: The queue from which the worker fetches jobs.

Worker Methods

Initializing a Worker

To initialize a worker, you need to provide the worker ID, termination channel, queue, and metrics instance.

def initialize(@wid : Int32, @terminate_channel : Channel(Nil), @queue : Queue(T), @metrics : Metrics)

  • wid: The unique ID of the worker.

  • terminate_channel: The channel used to signal the worker to stop.

  • queue: The queue from which jobs are fetched for execution.

  • metrics: The metrics instance for tracking worker and job performance.

Starting the Worker

The run method starts the worker in a new fiber. The worker enters a loop where it fetches and executes jobs from the queue until it receives a termination signal.

worker.run

Stopping the Worker

The terminate method is used to signal the worker to stop by sending a message through the termination channel.

worker.terminate
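
The interplay between run and terminate can be sketched as a fiber that selects over two channels. This is a minimal illustration, not JoobQ's actual implementation: SketchWorker, its jobs channel, and the processed counter are hypothetical stand-ins for the real worker and queue.

```crystal
# Hypothetical stand-in for a worker; not JoobQ's actual implementation.
class SketchWorker
  getter processed = 0
  getter? active = false

  def initialize(@terminate_channel : Channel(Nil), @jobs : Channel(Int32))
  end

  # Starts the processing loop in a new fiber and returns immediately.
  def run
    @active = true
    spawn do
      loop do
        select
        when @terminate_channel.receive
          # Termination signal received: mark inactive and leave the loop.
          @active = false
          break
        when job = @jobs.receive
          # A real worker would execute the job here; the sketch only counts it.
          @processed += 1
        end
      end
    end
  end

  # Signals the loop to stop by sending on the termination channel.
  def terminate
    @terminate_channel.send(nil)
  end
end

terminate_channel = Channel(Nil).new
jobs = Channel(Int32).new(10)
worker = SketchWorker.new(terminate_channel, jobs)
worker.run
3.times { |i| jobs.send(i) }
Fiber.yield # let the worker fiber drain the buffered jobs
worker.terminate
Fiber.yield # let the worker fiber observe the termination signal
```

Because run only spawns a fiber, the caller has to yield (or block on something) before the worker makes progress, which is why the example in this sketch calls Fiber.yield.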

Checking Worker Status

active?: Returns a boolean indicating if the worker is active.

if worker.active?
  puts "Worker is currently active"
end

Job Handling Workflow

The Worker class follows a specific workflow when handling jobs:

  1. Fetch Job: The worker fetches the next job from the queue using @queue.next_job.

  2. Job Execution:

    • If the job is expired, it is moved to the dead letter queue.

    • If the job is valid, the worker executes the job's perform method.

    • After job execution, the job is marked as completed or retried based on the outcome.

  3. Listening for Termination: The worker listens for a termination signal on the @terminate_channel. When a termination signal is received, the worker stops processing jobs and sets its active property to false.
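
The decision made in step 2 can be sketched as a single function. The code below is an assumption-laden illustration: SketchJob, its expires_at and should_fail fields, and the handle function are hypothetical names, and the returned symbols merely label where a job would end up.

```crystal
# Hypothetical job record; the field names are assumptions for illustration.
record SketchJob, id : Int32, expires_at : Time, should_fail : Bool = false do
  def expired?(now : Time = Time.utc) : Bool
    now > expires_at
  end

  def perform
    raise "job #{id} failed" if should_fail
  end
end

# Returns where the job ends up after one pass through the workflow.
def handle(job : SketchJob) : Symbol
  # Expired jobs are never executed; they go to the dead letter queue.
  return :dead_letter if job.expired?
  job.perform
  :completed
rescue
  # Any exception raised by perform hands the job to the retry path.
  :retry
end

now = Time.utc
fresh = SketchJob.new(1, now + 1.minute)
stale = SketchJob.new(2, now - 1.minute)
flaky = SketchJob.new(3, now + 1.minute, should_fail: true)
```

Calling handle on each of the three jobs exercises the completed, dead letter, and retry branches in turn.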

Example Usage

Here's a complete example of how to create, start, and stop a worker:

require "joobq"

# Define a job
struct ExampleJob
  include JoobQ::Job
  property x : Int32

  def initialize(@x : Int32)
  end

  def perform
    puts "Performing job with x = #{x}"
  end
end

# Configure the queue
JoobQ.configure do
  queue "example", 10, ExampleJob
end

# Get the queue instance
queue = JoobQ["example"]

# Add jobs to the queue
10.times do |i|
  queue.add(ExampleJob.new(x: i).to_json)
end

# Create and start a worker
terminate_channel = Channel(Nil).new
worker = JoobQ::Worker(ExampleJob).new(1, terminate_channel, queue, queue.metrics)
worker.run

# Stop the worker after some time
sleep 10.seconds
worker.terminate

This example sets up a queue, adds jobs to the queue, creates a worker, starts the worker, and stops the worker after 10 seconds.

Handling Job Failures

When a job fails during execution, the worker uses the FailHandler to manage retries and move failed jobs to the dead letter queue if necessary.

FailHandler

  • Retry Mechanism: If a job fails, the worker checks if retries are allowed, and if so, it will retry the job.

  • Dead Letter Queue: If retries are exhausted or the job is deemed expired, it is moved to the dead letter queue for further inspection.
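
The retry-or-dead-letter decision described above might look like the following sketch. It does not reproduce FailHandler's real interface: FailedJob, its retries counter, the handle_failure function, and the two arrays standing in for queues are all hypothetical.

```crystal
# Hypothetical failed-job record; `retries` is an assumed remaining-retry counter.
class FailedJob
  getter id : Int32
  property retries : Int32

  def initialize(@id : Int32, @retries : Int32)
  end
end

# Re-queues the job while retries remain, otherwise moves it to the dead letter list.
def handle_failure(job : FailedJob, retry_queue : Array(FailedJob), dead_letter : Array(FailedJob))
  if job.retries > 0
    job.retries -= 1
    retry_queue << job
  else
    dead_letter << job
  end
end

retry_queue = [] of FailedJob
dead_letter = [] of FailedJob
job = FailedJob.new(id: 1, retries: 1)

handle_failure(job, retry_queue, dead_letter) # first failure: one retry left, so it is re-queued
handle_failure(job, retry_queue, dead_letter) # second failure: retries exhausted
```

A production handler would typically also delay each retry rather than re-queue immediately; the sketch leaves that out.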

Throttling Jobs

Workers can use a throttler to control the rate of job execution.

if throttle_limit = @queue.throttle_limit
  @throttler = Throttler.new(throttle_limit)
end

This ensures that jobs are executed at a controlled rate, which is useful for scenarios where external rate limits must be respected.
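
To illustrate the idea, here is a minimal rate limiter that spaces calls evenly. It is a sketch under assumptions, not JoobQ's Throttler: the SketchThrottler class, its limit and period parameters, and the throttle method are hypothetical.

```crystal
# Hypothetical throttler: allows at most `limit` calls per `period`.
class SketchThrottler
  @interval : Time::Span
  @next_slot : Time::Span

  def initialize(limit : Int32, period : Time::Span)
    # Spread the allowed calls evenly across the period.
    @interval = period / limit
    @next_slot = Time.monotonic
  end

  # Waits until the next free slot, reserves the following one, then runs the block.
  def throttle
    now = Time.monotonic
    sleep(@next_slot - now) if @next_slot > now
    @next_slot = {now, @next_slot}.max + @interval
    yield
  end
end

throttler = SketchThrottler.new(limit: 100, period: 1.second)
calls = 0
started = Time.monotonic
3.times { throttler.throttle { calls += 1 } }
elapsed = Time.monotonic - started
```

With a limit of 100 per second, the slots are 10 milliseconds apart, so the second and third calls each wait for their slot before running.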

Best Practices for Workers

  • Graceful Termination: Always signal workers to stop using the terminate method to ensure they stop processing jobs cleanly.

  • Monitor Activity: Track metrics such as active status, number of completed jobs, and error rates to monitor worker activity.

  • Use Throttling: If dealing with APIs or external resources with rate limits, make use of throttling to avoid getting blocked.

  • Handle Job Failures: Implement proper error handling in the perform method and use the retry mechanism effectively.


The Worker class in JoobQ executes jobs from the queue, manages retries, and stops cleanly when signaled. Use these features to build a robust and reliable job processing pipeline in your application.
