symphony-metronome/lib/symphony/metronome/scheduledevent.rb

218 lines
5 KiB
Ruby
Raw Normal View History

#!/usr/bin/env ruby
# vim: set nosta noet ts=4 sw=4:
require 'set'
require 'sequel'
require 'sqlite3'
require 'yajl'
require 'symphony/metronome'
Sequel.extension :migration
### A class the represents the relationship between an interval and
### an event.
###
class Symphony::Metronome::ScheduledEvent
extend Loggability, Configurability
include Comparable
log_to :symphony
config_key :metronome
# Configure defaults.
#
CONFIG_DEFAULTS = {
:db => 'sqlite:///tmp/metronome.db',
:splay => 0
}
class << self
# A Sequel-style DB connection URI.
attr_reader :db
# Adjust recurring intervals by a random window.
attr_reader :splay
end
######################################################################
# C L A S S M E T H O D S
######################################################################
### Configurability API.
###
def self::configure( config=nil )
config = self.defaults.merge( config || {} )
@db = Sequel.connect( config.delete(:db) )
@splay = config.delete( :splay )
# Ensure the database is current.
#
migrations_dir = Symphony::Metronome::DATADIR + 'migrations'
unless Sequel::Migrator.is_current?( self.db, migrations_dir.to_s )
Loggability[ Symphony ].info "Installing database schema..."
Sequel::Migrator.apply( self.db, migrations_dir.to_s )
end
end
### Return a set of all known events, sorted by date of execution.
### Delete any rows that are invalid expressions.
###
def self::load
now = Time.now
events = SortedSet.new
# Force reset the DB handle.
self.db.disconnect
self.log.debug "Parsing/loading all actions."
self.db[ :metronome ].each do |event|
begin
event = new( event )
events << event
rescue ArgumentError, Symphony::Metronome::TimeParseError => err
self.log.error "%p while parsing \"%s\": %s" % [
err.class,
event[:expression],
err.message
]
self.log.debug " " + err.backtrace.join( "\n " )
self.db[ :metronome ].filter( :id => event[:id] ).delete
end
end
return events
end
######################################################################
# I N S T A N C E M E T H O D S
######################################################################
### Create a new ScheduledEvent object.
###
def initialize( row )
@event = Symphony::Metronome::IntervalExpression.parse( row[:expression], row[:created] )
@options = row.delete( :options )
@id = row.delete( :id )
@ds = self.class.db[ :metronome ].filter( :id => self.id )
self.reset_runtime
unless self.class.splay.zero?
splay = Range.new( - self.class.splay, self.class.splay )
@runtime = self.runtime + rand( splay )
end
end
# The sequel dataset representing this event.
attr_reader :ds
# The parsed interval expression.
attr_reader :event
# The unique ID number of the scheduled event.
attr_reader :id
# The options hash attached to this event.
attr_reader :options
# The exact time that this event will run.
attr_reader :runtime
### Set the datetime that this event should fire next.
###
def reset_runtime
now = Time.now
# Start time is in the future, so it's sufficent to be considered the run time.
#
if self.event.starting >= now
@runtime = self.event.starting
return
end
# Otherwise, the event should already be running (start time has already
# elapsed), so schedule it forward on it's next interval iteration.
#
# If it's a recurring event that has run before, consider the elapsed time
# as part of the next calculation.
#
row = self.ds.first
if self.event.recurring && row
last = row[ :lastrun ]
if last && now > last
@runtime = now + self.event.interval - ( now - last )
else
@runtime = now + self.event.interval
end
else
@runtime = now + self.event.interval
end
end
### Perform the action attached to the event. Yields the
### deserialized options, the action ID to the supplied block if
### this event is okay to execute.
###
### If the event is recurring, perform additional checks against the
### last run time.
###
### Automatically remove the event if it has expired.
###
def fire
rv = self.event.fire?
# Just based on the expression parser, is this event ready to fire?
#
if rv
opts = Yajl.load( self.options )
# Don't fire recurring events unless their interval has elapsed.
# This prevents events from triggering when the daemon receives
# a HUP.
#
if self.event.recurring
now = Time.now
row = self.ds.first
if row
last = row[ :lastrun ]
return false if last && now - last < self.event.interval
end
# Mark the time this recurring event was fired.
self.ds.update( :lastrun => Time.now )
end
yield opts, self.id
end
self.delete if rv.nil?
return rv
end
### Permanently remove this event from the database.
###
def delete
self.log.debug "Removing action %p" % [ self.id ]
self.ds.delete
end
### Comparable interface, order by next run time, soonest first.
###
def <=>( other )
return self.runtime <=> other.runtime
end
end # Symphony::Metronome::ScheduledEvent