class MiGA::Daemon

MiGA Daemons handling job submissions.

Constants

MIGA_DAEMON_LAIR

Attributes

jobs_running[R]

Array of jobs currently running.

jobs_to_run[R]

Array of jobs next to be executed.

loop_i[R]

Integer indicating the current iteration.

options[R]

Options used to setup the daemon.

project[R]

MiGA::Project in which the daemon is running.

Public Class Methods

last_alive(project) click to toggle source

When was the last time a daemon for the MiGA::Project project was seen active? Returns Time, or nil if no daemon/alive file exists for the project.

# File lib/miga/daemon.rb, line 16
# Reads the +daemon/alive+ file located under +project.path+ and parses its
# contents as a timestamp.
#
# Returns a Time, or +nil+ when that file does not exist.
def self.last_alive(project)
  alive_file = File.expand_path('daemon/alive', project.path)
  File.exist?(alive_file) ? Time.parse(File.read(alive_file)) : nil
end
new(project) click to toggle source

Initialize an inactive daemon for the MiGA::Project project. See daemon to wake the daemon.

# File lib/miga/daemon.rb, line 39
# Sets up a daemon object for +project+ without starting it.
#
# The new instance is appended to the global +$_MIGA_DAEMON_LAIR+, run options
# are read with MiGA::Json.parse from +daemon/daemon.json+ under the project
# path, and both job queues start empty. +@loop_i+ starts at -1.
def initialize(project)
  $_MIGA_DAEMON_LAIR << self
  @project = project
  project_conf = File.expand_path('daemon/daemon.json', project.path)
  # NOTE(review): +default:+ is presumably the fallback settings file used by
  # MiGA::Json.parse -- confirm against that method.
  fallback_conf = File.expand_path('.miga_daemon.json', ENV['MIGA_HOME'])
  @runopts = MiGA::Json.parse(project_conf, default: fallback_conf)
  @jobs_to_run = []
  @jobs_running = []
  @loop_i = -1
end

Public Instance Methods

check_datasets() click to toggle source

Traverse datasets

# File lib/miga/daemon.rb, line 119
# Walks every dataset of the project. A dataset with a pending preprocessing
# step gets a +:d+ job queued; a name that yields no dataset object triggers a
# warning and a project reload.
def check_datasets
  project.each_dataset do |name, dataset|
    if !dataset.nil?
      queue_job(:d, dataset) unless dataset.next_preprocessing(false).nil?
    else
      say "Warning: Dataset #{name} listed but not loaded, reloading project"
      project.load
    end
  end
end
check_project() click to toggle source

Check whether all reference datasets are pre-processed. If so, check the project-level tasks.

# File lib/miga/daemon.rb, line 134
# Queues a project-wide (+:p+) job when one is pending.
#
# Does nothing if the project has no datasets or if
# +project.done_preprocessing?(false)+ is false. Otherwise asks the project for
# the next distances step and, failing that, the next inclade step; a +:p+ job
# is queued if either is non-nil.
def check_project
  return if project.dataset_names.empty? || !project.done_preprocessing?(false)
  pending = project.next_distances(true)
  pending = project.next_inclade(true) if pending.nil?
  return if pending.nil?
  queue_job(:p)
end
daemon(task, opts=[]) click to toggle source

Launches the task with options opts (as command-line arguments). Supported tasks include: start, stop, restart, status.

# File lib/miga/daemon.rb, line 67
# Runs the daemon +task+ through Daemons.run_proc, using +default_options+ plus
# an +:ARGV+ entry made of +task+ followed by +opts+ (extra command-line
# arguments). The process body keeps calling +in_loop+ until it returns a
# falsy value.
#
# The +opts+ array given by the caller is left untouched, so frozen arrays are
# accepted. Returns whatever Daemons.run_proc returns.
def daemon(task, opts=[])
  run_options = default_options
  # Build a new argument list instead of unshifting onto the caller's array.
  run_options[:ARGV] = [task, *opts]
  Daemons.run_proc("MiGA:#{project.name}", run_options) do
    loop { break unless in_loop }
  end
end
declare_alive() click to toggle source

Tell the world that you're alive.

# File lib/miga/daemon.rb, line 78
# Writes the current time (+Time.now.to_s+) to +daemon/alive+ under the project
# path, replacing any previous contents.
#
# The block form of File.open is used so the handle is closed even if the
# write raises. Returns +nil+.
def declare_alive
  alive_file = File.expand_path('daemon/alive', project.path)
  File.open(alive_file, 'w') { |fh| fh.print Time.now.to_s }
end
default_options() click to toggle source

Returns Hash containing the default options for the daemon.

# File lib/miga/daemon.rb, line 59
# Builds the base options Hash handed to the daemon runner: normal directory
# mode rooted at the project's +daemon+ folder, single instance, output logged.
def default_options
  daemon_dir = File.expand_path('daemon', project.path)
  {
    dir_mode: :normal,
    dir: daemon_dir,
    multiple: false,
    log_output: true
  }
end
flush!() click to toggle source

Remove finished jobs from the internal queue and launch as many as possible respecting maxjobs.

# File lib/miga/daemon.rb, line 192
# Drops finished jobs from +@jobs_running+ (logging each completion), then
# starts queued jobs until +maxjobs+ are running or the queue is empty.
def flush!
  # Keep only the jobs that still have work pending
  @jobs_running.select! do |job|
    still_running =
      case job[:job].to_s
      when 'd' then !job[:ds].next_preprocessing(false).nil?
      when 'p' then !project.next_task(nil, false).nil?
      else
        # Any other job: ask its dataset, or the project if it has none
        target = job[:ds].nil? ? project : job[:ds]
        target.add_result(job[:job], false).nil?
      end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}." unless still_running
    still_running
  end
  # Start from a random position so no single dataset hogs resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)
  # Fill the free slots with queued jobs
  while jobs_running.size < maxjobs && !jobs_to_run.empty?
    launch_job @jobs_to_run.shift
  end
end
get_job(job, ds = nil) click to toggle source

Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.

# File lib/miga/daemon.rb, line 179
# Looks through the queued jobs and then the running jobs for the first entry
# whose +:job+ equals +job+ and whose dataset matches +ds+.
#
# With +ds+ nil only entries without a dataset match; otherwise datasets are
# compared by name. Returns the matching entry, or +nil+ if there is none.
def get_job(job, ds = nil)
  candidates = jobs_to_run + jobs_running
  candidates.find do |entry|
    entry_ds = entry[:ds]
    same_ds =
      if ds.nil?
        entry_ds.nil?
      else
        !entry_ds.nil? && entry_ds.name == ds.name
      end
    same_ds && entry[:job] == job
  end
end
in_loop() click to toggle source

Run one loop step. Returns a Boolean indicating if the loop should continue.

# File lib/miga/daemon.rb, line 225
# Performs one iteration of the daemon loop.
#
# Each pass marks the daemon alive, reloads the project, queues dataset and
# project jobs, flushes the queue, writes the status file and sleeps for
# +latency+. On the first pass (+loop_i+ == -1) a launch banner is logged and
# the previous status is loaded. Every fourth pass dead jobs are purged and the
# counter is reset.
#
# Returns +false+ when +shutdown_when_done?+ holds and no job is queued or
# running (the loop should stop), +true+ otherwise.
def in_loop
  declare_alive
  project.load
  if loop_i == -1
    rule = '-----------------------------------'
    say rule
    say 'MiGA:%s launched.' % project.name
    say rule
    load_status
    @loop_i = 0
  end
  @loop_i += 1
  check_datasets
  check_project
  idle = (jobs_running.size + jobs_to_run.size).zero? if shutdown_when_done?
  if idle
    say 'Nothing else to do, shutting down.'
    return false
  end
  flush!
  if loop_i == 4
    say 'Housekeeping for sanity'
    @loop_i = 0
    purge!
  end
  report_status
  sleep(latency)
  true
end
last_alive() click to toggle source

When was the last time a daemon for the current project was seen active? Returns Time, or nil if no daemon/alive file exists for the project.

# File lib/miga/daemon.rb, line 53
# Instance-level shortcut: delegates to MiGA::Daemon.last_alive with this
# daemon's +project+ and returns its result unchanged.
def last_alive
  MiGA::Daemon.last_alive(project)
end
load_status() click to toggle source

Load the status of a previous instance.

# File lib/miga/daemon.rb, line 94
# Restores both job queues from +daemon/status.json+ under the project path.
#
# Returns immediately when that file is missing or empty. For every saved job
# entry, the +:ds+ value is replaced by the project's dataset of that name
# (unless it is nil or the entry belongs to 'miga-project') and +:job+ is turned
# back into a Symbol. Dead jobs are then purged and the queue sizes logged.
def load_status
  status_file = File.expand_path('daemon/status.json', project.path)
  return unless File.size?(status_file)
  say 'Loading previous status in daemon/status.json:'
  saved = MiGA::Json.parse(status_file)
  saved.keys.each do |queue|
    saved[queue].each do |entry|
      unless entry[:ds].nil? || entry[:ds_name] == 'miga-project'
        entry[:ds] = project.dataset(entry[:ds_name])
      end
      entry[:job] = entry[:job].to_sym unless entry[:job].nil?
    end
  end
  @jobs_running = saved[:jobs_running]
  @jobs_to_run  = saved[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end
purge!() click to toggle source

Remove dead jobs.

# File lib/miga/daemon.rb, line 217
# Removes dead jobs from +@jobs_running+.
#
# For each job the +:alive+ run option is used as a format string, filled with
# the job's +:pid+ and executed as a shell command. The job is kept only when
# that command's output, converted to an Integer, equals 1.
def purge!
  @jobs_running.select! do |job|
    alive_cmd = sprintf(runopts(:alive), job[:pid])
    # Backticks run the command in a subshell and capture its standard output
    `#{alive_cmd}`.chomp.to_i == 1
  end
end
queue_job(job, ds=nil) click to toggle source

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash) see flush!.

# File lib/miga/daemon.rb, line 146
# Adds the task with key +job+ to +@jobs_to_run+, for dataset +ds+ or for the
# whole project when +ds+ is nil.
#
# Returns +nil+ without queueing if +get_job+ already finds the same task.
# Otherwise it makes sure the log folder +daemon/<job>+ exists under the project
# path, builds the launch command from the +:cmd+, +:var+ and +:varsep+ run
# options, and returns +@jobs_to_run+ after appending the new entry.
def queue_job(job, ds=nil)
  return nil unless get_job(job, ds).nil?
  ds_name = ds.nil? ? 'miga-project' : ds.name
  say 'Queueing %s:%s' % [ds_name, job]

  # Variables handed to the job script
  env = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts(:type),
    'CORES'   => ppn,
    'MIGA'    => MiGA::MiGA.root_path
  }
  env['DATASET'] = ds.name unless ds.nil?

  log_dir = File.expand_path("daemon/#{job}", project.path)
  Dir.mkdir(log_dir) unless Dir.exist? log_dir
  # The task name uses at most the first 10 characters of the project name
  task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}"

  # Positional arguments of the :cmd template, in order:
  # 1: script, 2: vars, 3: CPUs, 4: log file, 5: task name
  script = MiGA::MiGA.script_path(job, miga: env['MIGA'], project: project)
  var_list = env.map { |key, value| sprintf(runopts(:var), key, value) }
  var_list = var_list.join(runopts(:varsep))
  log_file = File.expand_path("#{ds_name}.log", log_dir)
  cmd = sprintf(runopts(:cmd), script, var_list, ppn, log_file, task_name)

  @jobs_to_run << {
    ds: ds, ds_name: ds_name, job: job, task_name: task_name, cmd: cmd
  }
end
report_status() click to toggle source

Report status in a JSON file.

# File lib/miga/daemon.rb, line 86
# Hands both job queues (keys +:jobs_running+ and +:jobs_to_run+) to
# MiGA::Json.generate together with the path of +daemon/status.json+ under the
# project path. Returns whatever MiGA::Json.generate returns.
def report_status
  snapshot = { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run }
  status_file = File.expand_path('daemon/status.json', project.path)
  MiGA::Json.generate(snapshot, status_file)
end
say(*opts) click to toggle source

Send a datestamped message to the log.

# File lib/miga/daemon.rb, line 255
# Prints the pieces in +opts+ on one line, prefixed with the current time in
# square brackets and terminated by a newline.
def say(*opts)
  stamp = "[#{Time.now.inspect}] "
  print(stamp, *opts, "\n")
end
terminate() click to toggle source

Terminates a daemon.

# File lib/miga/daemon.rb, line 261
# Shuts the daemon down: logs a message, writes the status report and deletes
# the +daemon/alive+ file under the project path if it is present.
def terminate
  say 'Terminating daemon...'
  report_status
  alive_file = File.expand_path('daemon/alive', project.path)
  return unless File.exist?(alive_file)
  File.unlink(alive_file)
end