Middlewares
Middlewares in JoobQ are powerful tools to customize the behavior of job processing globally. They enable features like logging, retrying failed jobs, throttling, enforcing execution time limits, and implementing custom logic.
JoobQ employs a middleware pipeline to process jobs in a flexible and modular manner. Each middleware component can intercept jobs as they pass through the pipeline, allowing for tasks like logging, timeout handling, throttling, and retry mechanisms.
Middleware Pipeline Overview
At the core is the MiddlewarePipeline, which manages the execution of registered middleware components sequentially.
class MiddlewarePipeline
  @middlewares : Array(Middleware)
  def initialize(@middlewares : Array(Middleware) = [] of Middleware)
  end
  def call(job : Job, queue : BaseQueue, &block : ->)
    call_next(0, job, queue, &block)
  end
  private def call_next(index : Int32, job : Job, queue : BaseQueue, &block : ->)
    # Executes middleware in order
  end
endKey Concepts:
- Middleware Registration: Middleware components are stored in an array and executed in order. 
- Control Flow: Each middleware decides whether to process a job or pass it to the next middleware. 
- Global Configuration: Middlewares are added to the global configuration ( - JoobQ.config), not individual queues.
- Matching Logic: The - matches?method determines whether a middleware applies to a specific job and queue.
- Pipeline Execution: Middlewares are executed in the order they are defined. If a middleware does not match, the pipeline skips it. 
Built-in Middlewares
- AsyncLoggingMiddleware: Logs details about job execution asynchronously. 
- RetryMiddleware: Retries failed jobs based on a configured maximum number of retries and delay. 
- ThrottleMiddleware: Limits concurrent job executions. 
- TimeoutMiddleware: Sets a maximum execution time for jobs. 
Middleware Interface
All middleware components include the Middleware module and implement two essential methods:
- matches?(job : Job, queue : BaseQueue) : Bool: Determines if the middleware should act on the given job.
- call(job : Job, queue : BaseQueue, next_middleware : ->) : Nil: Contains the middleware's logic and controls the flow to the next middleware.
How to Use Middlewares in JoobQ
JoobQ allows you to customize the behavior of your job queue system by registering middlewares through the use method. This method provides a flexible way to add or modify the middleware stack globally.
Registering Middlewares with use
useThe use method is part of the JoobQ::Configure class and serves as a DSL to modify the middleware stack. It accepts a block where you can manipulate the array of middlewares (middlewares property).
Basic Syntax
JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares << MyCustomMiddleware.new
    middlewares.delete(JoobQ::Middlewares::RetryMiddleware) # Optionally remove default middlewares
  end
endThis allows you to:
- Add new middlewares. 
- Remove or replace default middlewares. 
- Customize the order of middleware execution. 
Examples
Adding a Custom Middleware
Here’s an example of adding a custom logging middleware:
class CustomLoggerMiddleware 
  include JoobQ::Middleware
  
  def matches?(job : JoobQ::Job, queue : JoobQ::BaseQueue) : Bool
    true # Apply to all jobs
  end
  def call(job : JoobQ::Job, queue : JoobQ::BaseQueue, next_middleware : ->)
    Log.info { "Job #{job.id} started on queue #{queue.name}" }
    next_middleware.call
    Log.info { "Job #{job.id} finished on queue #{queue.name}" }
  end
end
JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares << CustomLoggerMiddleware.new
  end
endRemoving a Default Middleware
To remove a default middleware like ThrottleMiddleware:
JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares.delete(JoobQ::Middlewares::ThrottleMiddleware)
  end
endReplacing a Middleware
To replace the RetryMiddleware with a custom implementation:
class CustomRetryMiddleware < JoobQ::Middlewares::RetryMiddleware
  def initialize(max_retries : Int32)
    super
  end
  def matches?(job : JoobQ::Job, queue : JoobQ::BaseQueue) : Bool
    job.priority == "high" # Apply only to high-priority jobs
  end
end
JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares.delete(JoobQ::Middlewares::RetryMiddleware)
    middlewares << CustomRetryMiddleware.new(max_retries: 5)
  end
endUnderstanding Middleware Execution
When a job is processed, middlewares are executed in the order they are defined in the middlewares array:
- JoobQ evaluates each middleware's - matches?method.
- If - matches?returns- true, the middleware’s- callmethod is invoked.
- Each middleware must call - next_middleware.callto pass control to the next middleware in the pipeline.
For example, if your middleware stack looks like this:
JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares << JoobQ::Middlewares::AsyncLogging.new
    middlewares << JoobQ::Middlewares::Retry.new(max_retries: 3)
    middlewares << JoobQ::Middlewares::Timeout.new(timeout: 10.seconds)
  end
end- Logging happens first. 
- If a job fails, RetryMiddleware retries it up to 3 times. 
- If a job takes longer than 10 seconds, TimeoutMiddleware stops execution. 
Advanced: Conditional Middleware Execution
Middlewares can be conditionally applied to specific jobs or queues using the matches? method.
Example: Queue-Specific Middleware
class QueueSpecificMiddleware 
  include JoobQ::Middleware
  
  def matches?(job : JoobQ::Job, queue : JoobQ::BaseQueue) : Bool
    queue.name == "critical_queue" # Only apply to the "critical_queue"
  end
  def call(job : JoobQ::Job, queue : JoobQ::BaseQueue, next_middleware : ->)
    Log.info { "Critical job #{job.id} started!" }
    next_middleware.call
  end
end
JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares << QueueSpecificMiddleware.new
  end
endBest Practices for Using use
use- Plan Middleware Order: - Place logging or monitoring middlewares early in the stack. 
- Place retry, throttling, or timeout mechanisms after them. 
 
- Leverage - matches?:- Use - matches?for fine-grained control over middleware application.
 
- Test Middleware Behavior: - Test each middleware independently to ensure it integrates well with others. 
 
By using the use method effectively, you can fully customize the behavior of your job processing pipeline in JoobQ.
Last updated
Was this helpful?
