1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
|
require 'timeout'
module Delayed
# Raised by Job#deserialize when a stored handler cannot be turned back into
# an object that responds to +perform+.
class DeserializationError < StandardError
end
class Job < ActiveRecord::Base
# Class-wide settings, exposed through cattr_accessor readers/writers.
#
# max_attempts: reschedule stops retrying once the attempt counter reaches it.
# max_run_time: used as the lock-expiry window by find_available and as the
# Timeout limit in run_with_lock.
@@max_attempts = 25
@@max_run_time = 4.hours
cattr_accessor :max_attempts, :max_run_time
set_table_name :delayed_jobs
# When true, reschedule destroys a job that ran out of attempts; otherwise
# the job is kept and its failed_at column is set.
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true
# Identifier written to locked_by when this process locks a job.
# Falls back to a pid-only name if building the host-qualified name raises.
cattr_accessor :worker_name
self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
# Base WHERE clause used by find_available. Its three placeholders are filled
# with the current time, the lock-expiry cutoff (now - max_run_time) and the
# worker name, in that order.
NextTaskSQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'
# ORDER BY clause used by find_available.
NextTaskOrder = 'priority DESC, run_at ASC'
# Captures the name that follows a "!ruby/<kind>:" tag in serialized YAML;
# deserialize uses the capture to decide which class to load.
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
# Optional priority bounds; find_available adds a filter for each one that
# is set (nil means no bound).
cattr_accessor :min_priority, :max_priority
self.min_priority = nil
self.max_priority = nil
# Releases every lock currently recorded under this process's worker_name
# by nulling locked_by and locked_at on the matching rows.
#
# Returns whatever update_all returns.
def self.clear_locks!
  release_sql = "locked_by = null, locked_at = null"
  held_by_worker = ["locked_by = ?", worker_name]
  update_all(release_sql, held_by_worker)
end
# Tells whether the job has been marked as failed.
# Returns the raw failed_at value (nil/false when unset), not a strict boolean.
def failed?
failed_at
end
# Make the same check available under the name +failed+.
alias_method :failed, :failed?
# Returns the object stored in the 'handler' attribute.
# The handler is deserialized on first access and the result is memoized in
# @payload_object for later calls.
def payload_object
  return @payload_object if @payload_object

  @payload_object = deserialize(self['handler'])
end
# Human-readable job name, memoized in @name.
# Uses the payload's +display_name+ when the payload responds to it,
# otherwise the payload's class name.
def name
  @name ||= begin
    job = payload_object
    job.respond_to?(:display_name) ? job.display_name : job.class.name
  end
end
# Serializes +object+ to YAML and stores it in the 'handler' attribute.
#
# The values memoized by #payload_object and #name are derived from the
# handler, so they are cleared here; otherwise a job whose payload is
# reassigned after being read would keep returning the old payload and name.
def payload_object=(object)
  @payload_object = nil
  @name = nil
  self['handler'] = object.to_yaml
end
# Records a failed attempt and either schedules a retry or gives up.
#
# message   - error message to store in last_error.
# backtrace - array of backtrace lines appended to last_error (nil is
#             treated as empty).
# time      - explicit time for the next run; when nil the retry is pushed
#             back by attempts**4 + 5 seconds from the current database time.
#
# While the incremented attempt count is below max_attempts the job is
# unlocked and saved with the new run_at. Otherwise it is destroyed, or kept
# with failed_at set when destroy_failed_jobs is off.
def reschedule(message, backtrace = [], time = nil)
  # Set the error before branching so that a job kept after its final
  # attempt (failed_at path) still carries the error that killed it.
  self.last_error = "#{message}\n#{Array(backtrace).join("\n")}"

  if (self.attempts += 1) < max_attempts
    self.run_at = time || (Job.db_time_now + (attempts ** 4) + 5)
    unlock
    save!
  else
    logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
    destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
  end
end
# Locks the job for +worker_name+, runs it, and removes it on success.
#
# max_run_time - lock-expiry window passed to lock_exclusively!; its integer
#                value is also the Timeout limit for the job itself.
# worker_name  - name recorded as the lock owner.
#
# Returns nil when the lock could not be obtained, true when the job ran and
# was destroyed, false when anything was raised while running (the job is
# then rescheduled and the exception logged).
def run_with_lock(max_run_time, worker_name)
  logger.info "* [JOB] acquiring lock on #{name}"

  if !lock_exclusively!(max_run_time, worker_name)
    logger.warn "* [JOB] failed to acquire exclusive lock for #{name}"
    return nil
  end

  begin
    elapsed = Benchmark.realtime do
      Timeout.timeout(max_run_time.to_i) { invoke_job }
      destroy
    end
    logger.info "* [JOB] #{name} completed after %.4f" % elapsed
    true
  rescue Exception => failure
    # Deliberately broad: every failure, including a timeout, counts as a
    # failed attempt for this job.
    reschedule failure.message, failure.backtrace
    log_exception(failure)
    false
  end
end
# Creates a Job record for a unit of work.
#
# Call forms:
#   enqueue(object, priority = 0, run_at = nil)
#   enqueue(priority = 0, run_at = nil) { ... }   # block is wrapped in EvaledJob
#
# Raises ArgumentError when no block is given and +object+ does not respond
# to +perform+. Returns the result of Job.create.
def self.enqueue(*args, &block)
  object = block_given? ? EvaledJob.new(&block) : args.shift

  if !object.respond_to?(:perform) && !block_given?
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  priority = args[0] || 0
  run_at = args[1]
  Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end
# Finds jobs that are ready to run and not locked by another worker.
#
# limit        - maximum number of rows to return (default 5).
# max_run_time - age after which another worker's lock is treated as
#                expired (defaults to the class-level max_run_time).
#
# The base condition is NextTaskSQL; a priority filter is appended for each
# of min_priority / max_priority that is set. Rows are ordered by
# NextTaskOrder. Returns the result of +find+.
#
# The default is written as self.max_run_time on purpose: a bare
# `max_run_time = max_run_time` refers to the parameter itself on Ruby 2.2+
# and evaluates to nil, which makes the subtraction below raise.
def self.find_available(limit = 5, max_run_time = self.max_run_time)
  time_now = db_time_now

  # Work on a copy so the shared constant is never mutated by <<.
  sql = NextTaskSQL.dup
  conditions = [time_now, time_now - max_run_time, worker_name]

  if min_priority
    sql << ' AND (priority >= ?)'
    conditions << min_priority
  end

  if max_priority
    sql << ' AND (priority <= ?)'
    conditions << max_priority
  end

  conditions.unshift(sql)

  ActiveRecord::Base.silence do
    find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
  end
end
# Tries the next few available jobs until one of them can be locked and run.
#
# max_run_time - passed to find_available and run_with_lock (defaults to the
#                class-level max_run_time).
#
# Returns true or false as reported by the first job whose run_with_lock did
# not return nil, or nil when no candidate could be locked.
#
# The default is written as self.max_run_time on purpose: a bare
# `max_run_time = max_run_time` refers to the parameter itself on Ruby 2.2+
# and evaluates to nil.
def self.reserve_and_run_one_job(max_run_time = self.max_run_time)
  find_available(5, max_run_time).each do |job|
    outcome = job.run_with_lock(max_run_time, worker_name)
    # nil means the lock was lost to another worker; try the next candidate.
    return outcome unless outcome.nil?
  end

  nil
end
# Tries to take (or refresh) the database lock on this job for +worker+.
#
# max_run_time - locks older than this many seconds are treated as expired.
# worker       - lock owner name (defaults to worker_name).
#
# When the job is not already held by +worker+, the row is claimed only if
# it is unlocked or its lock has expired, and it is due to run. When it is
# already held by +worker+, only locked_at is refreshed.
#
# Returns true and updates locked_at/locked_by on this instance when exactly
# one row was updated; returns false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
  now = self.class.db_time_now

  affected_rows =
    if locked_by == worker
      self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
    else
      self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
    end

  return false unless affected_rows == 1

  self.locked_at = now
  self.locked_by = worker
  true
end
# Clears the lock fields on this instance only; nothing is saved here, so
# the caller has to persist the record (reschedule follows it with save!).
def unlock
self.locked_at = nil
self.locked_by = nil
end
# Writes two error-level log entries for a failed job: a summary line with
# the job name, error class, message and attempt count, then the error
# object itself.
def log_exception(error)
  summary = "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
  logger.error summary
  logger.error(error)
end
# Runs up to +num+ jobs one after another and tallies the outcomes.
#
# Stops early when reserve_and_run_one_job returns anything other than
# true/false (nothing could be run), or when the global $exit is set after
# a job has been counted.
#
# Returns [successes, failures].
def self.work_off(num = 100)
  tally = { true => 0, false => 0 }

  num.times do
    outcome = self.reserve_and_run_one_job
    break unless tally.key?(outcome)

    tally[outcome] += 1
    break if $exit
  end

  return tally.values_at(true, false)
end
# Runs the job: fetches the payload object and calls +perform+ on it.
# Returns whatever +perform+ returns.
def invoke_job
  job = payload_object
  job.perform
end
private
# Rebuilds the payload object from its serialized YAML +source+.
#
# A first YAML.load is attempted with any StandardError swallowed. If the
# result does not respond to +perform+, the method asks attempt_to_load to
# load the relevant class and then parses the source a second time.
#
# Returns the handler when it responds to +perform+.
# Raises DeserializationError when it does not, or when a TypeError,
# LoadError or NameError is raised anywhere in the method.
#
# NOTE(review): depending on the YAML library version, YAML.load may
# instantiate arbitrary objects; presumably the handler column only holds
# data written by payload_object= - confirm it is never user-supplied.
def deserialize(source)
handler = YAML.load(source) rescue nil
unless handler.respond_to?(:perform)
if handler.nil? && source =~ ParseObjectFromYaml
# Nothing was parsed: take the class name from the "!ruby/...:Name" tag.
handler_class = $1
end
# handler_class is a String here when set; otherwise the class of whatever
# was parsed is passed instead.
attempt_to_load(handler_class || handler.class)
# Second parse is not guarded by a rescue modifier; errors of the types
# listed below are converted by the method-level rescue.
handler = YAML.load(source)
end
return handler if handler.respond_to?(:perform)
raise DeserializationError,
'Job failed to load: Unknown handler. Try to manually require the appropriate file.'
rescue TypeError, LoadError, NameError => e
raise DeserializationError,
"Job failed to load: #{e.message}. Try to manually require the required file."
end
# Calls +constantize+ on +klass+. deserialize passes either the class name
# captured from the YAML tag or the class of the parsed handler.
# NOTE(review): presumably this exists to trigger loading of the constant's
# source file before the second YAML parse - confirm.
def attempt_to_load(klass)
klass.constantize
end
# Current time in the form used for database comparisons: UTC when
# ActiveRecord's default_timezone is :utc, otherwise Time.zone.now.
def self.db_time_now
  if ActiveRecord::Base.default_timezone == :utc
    Time.now.utc
  else
    Time.zone.now
  end
end
protected
# Save callback: gives run_at a default of the current database time when
# none has been set. Returns the run_at value in effect.
def before_save
  scheduled = self.run_at
  return scheduled if scheduled

  self.run_at = self.class.db_time_now
end
end
# Job wrapper that Job.enqueue builds when it is called with a block
# instead of an object.
class EvaledJob
# Calls the given block once and keeps its return value as the job source.
# NOTE(review): presumably the block is expected to return a String of Ruby
# code, since the value is later passed to eval - confirm.
def initialize
@job = yield
end
# Evaluates the stored value with eval.
# NOTE(review): eval runs whatever was stored; make sure the source never
# comes from untrusted input.
def perform
eval(@job)
end
end
end
|