Delayed_job в отдельной базе данных
Как я уже когда-то писал, иногда проект дорастает до того, что ему не хватает одной базы данных. И тогда можно сделать вторую. Вопрос, какие таблицы переносить во вторую базу данных, всегда остается открытым. В нашем случае, поскольку на сервере БД очень много памяти и слабая дисковая подсистема, мне показалось разумным перенести туда таблицы, в которые идет интенсивная запись. И эти таблицы не должны быть «связаны», то есть перенос их должен быть простым.
Вот такой запрос показывает потенциальных кандидатов на переезд.
SELECT
relname, n_tup_ins + n_tup_upd + n_tup_del
FROM
pg_stat_user_tables
ORDER BY
2
В нашем случае это оказалась таблица, где сохранялись ip-адреса, с которых пользователи заходят на сайт, и
таблица delayed_jobs
. В rails 2.3 делалось это очень просто.
# config/initializers/delayed_job_config.rb
class Delayed::Backend::ActiveRecord::Job
establish_connection "second_#{Rails.env}"
end
В случае же с rails 3, на которые вот уже совсем скоро переедет Групон, все оказалось не так просто. Gem delayed_jobs
версии 2.1.4 достаточно сильно отличается от версии 2.0. Поэтому старый трюк работать перестал. Гугление не помогло,
поэтому пришлось залесть в его исходники, понять, чего там внутри происходит, и сделать вот такой monkey-patch.
# config/initializers/delayed_job_config.rb
Delayed::Worker.backend = SecondDbDj
# app/models/second_db_dj.rb
class SecondDbDj < SecondDb # это абстрактный класс, который подключает ко второй БД
include Delayed::Backend::Base
set_table_name :delayed_jobs
before_save :set_default_run_at
scope :ready_to_run, lambda {|worker_name, max_run_time|
where(['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now,
db_time_now - max_run_time, worker_name])
}
scope :by_priority, order('priority ASC, run_at ASC')
def self.before_fork
::ActiveRecord::Base.clear_all_connections!
end
def self.after_fork
::ActiveRecord::Base.establish_connection
end
# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Delayed::Worker.max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Delayed::Worker.min_priority]) if Delayed::Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Delayed::Worker.max_priority]) if Delayed::Worker.max_priority
::ActiveRecord::Base.silence do
scope.by_priority.all(:limit => limit)
end
end
# Lock this job for this worker.
# Returns true if we have the lock, false otherwise.
def lock_exclusively!(max_run_time, worker)
now = self.class.db_time_now
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at
< ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
if affected_rows == 1
self.locked_at = now
self.locked_by = worker
self.locked_at_will_change!
self.locked_by_will_change!
return true
else
return false
end
end
# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients
# must have syncronized clocks.
def self.db_time_now
if Time.zone
Time.zone.now
elsif ::ActiveRecord::Base.default_timezone == :utc
Time.now.utc
else
Time.now
end
end
end
Эта огромная простыня отличается от того, что находится в delayed_job
всего тремя строчками. Если кто-то знает более
простой способ сделать то же самое, я буду рад об этом узнать.