class MiGA::Daemon

MiGA Daemons handling job submissions.

Attributes

jobs_running[R]

Array of jobs currently running

jobs_to_run[R]

Array of jobs next to be executed

options[R]

Options used to setup the daemon

project[R]

MiGA::Project in which the daemon is running

Public Class Methods

daemon_home(project) click to toggle source

Daemon's home inside the MiGA::Project project or a String with the full path to the project's 'daemon' folder

# File lib/miga/daemon.rb, line 19
##
# Resolve the daemon folder for +project+.
#
# A String is taken to already be the folder path and is returned untouched.
# Any other object is asked for its +path+, and 'daemon' is appended to it.
def daemon_home(project)
  case project
  when String then project
  else File.join(project.path, 'daemon')
  end
end
new(project, json = nil) click to toggle source

Initialize an inactive daemon for the MiGA::Project project. See daemon to wake the daemon. If passed, json must be the path to a daemon definition in JSON format. Otherwise, the project-stored daemon definition is used. In either case, missing variables are used as defined in ~/.miga_daemon.json.

# File lib/miga/daemon.rb, line 44
##
# Set up a daemon for +project+ with empty job queues.
#
# The daemon definition is read from +json+ when given, otherwise from
# 'daemon/daemon.json' inside the project path. The file '.miga_daemon.json',
# resolved against ENV['MIGA_HOME'], is handed to the parser as +default+
# only if it exists. Every parsed key/value pair is passed to +runopts+, and
# +update_format_0+ is run afterwards.
def initialize(project, json = nil)
  @project = project
  @runopts = {}
  definition = json || File.join(project.path, 'daemon/daemon.json')
  fallback = File.expand_path('.miga_daemon.json', ENV['MIGA_HOME'])
  fallback = nil unless File.exist?(fallback)
  settings = MiGA::Json.parse(definition, default: fallback)
  settings.each { |key, value| runopts(key, value) }
  update_format_0
  @jobs_to_run = []
  @jobs_running = []
end

Public Instance Methods

check_datasets() click to toggle source

Traverse datasets and return a Boolean indicating whether any reference datasets are incomplete

# File lib/miga/daemon.rb, line 188
##
# Queue a +:d+ job for every dataset whose status is +:incomplete+ and that
# still has a pending preprocessing step.
#
# Unless the log is being shown, also reports progress through +advance+:
# the number of datasets minus the number of queued and running jobs (not
# counting a maintenance job, if one is registered).
#
# Returns +true+ if at least one of the queued datasets is a reference
# dataset, +false+ otherwise.
def check_datasets
  l_say(2, 'Checking datasets')
  pending_ref = false
  project.each_dataset do |dataset|
    incomplete = dataset.status == :incomplete
    if incomplete && !dataset.next_preprocessing(false).nil?
      pending_ref = true if dataset.ref?
      queue_job(:d, dataset)
    end
  end
  return pending_ref if show_log?

  total = project.dataset_names.count
  busy = jobs_to_run.size + jobs_running.size
  busy -= 1 unless get_job(:maintenance).nil?
  advance('Datasets:', total - busy, total, false)
  miga_say if busy.zero?
  pending_ref
end
check_project() click to toggle source

Check if all reference datasets are pre-processed. If yes, check the project-level tasks

# File lib/miga/daemon.rb, line 211
##
# Queue the project-level job (+:p+) when the project has datasets, reports
# that preprocessing is done, and still has a pending task.
#
# Returns whatever +queue_job+ returns when a job is queued, +nil+ otherwise.
def check_project
  l_say(2, 'Checking project')

  # Nothing to do for projects without datasets or with datasets that are
  # still being preprocessed
  has_datasets = !project.dataset_names.empty?
  return unless has_datasets && project.done_preprocessing?

  pending_task = project.next_task(nil, false)
  queue_job(:p) unless pending_task.nil?
end
daemon_first_loop() click to toggle source

Run only in the first loop

# File lib/miga/daemon.rb, line 77
##
# Start-up routine: prints a launch banner and the configuration options,
# announces the log location through +miga_say+ unless the log is already
# being shown, restores any previous status with +load_status+, and forces
# a maintenance job into the queue.
def daemon_first_loop
  divider = '-----------------------------------'
  say divider
  say "MiGA:#{project.name} launched"
  say divider
  miga_say "Saving log to: #{output_file}" unless show_log?
  ['Configuration options:', @runopts.to_s].each { |line| say line }
  load_status
  queue_maintenance(true)
end
daemon_home() click to toggle source

Path to the daemon home

# File lib/miga/daemon.rb, line 59
##
# Path to this daemon's home, obtained by passing the daemon's project to
# the class-level +daemon_home+.
def daemon_home
  owner = self.class
  owner.daemon_home(project)
end
daemon_loop() click to toggle source

Run one loop step. Returns a Boolean indicating if the loop should continue

# File lib/miga/daemon.rb, line 90
##
# Run a single iteration of the daemon.
#
# Reloads the project, checks datasets (and the project, when no reference
# dataset is pending), then either shuts down (returning +false+) if the
# daemon is set to stop when done and no job is queued or running, or
# flushes the queue, saves the status and sleeps for +latency+ (returning
# +true+). Every 12th iteration dead jobs are purged, and maintenance is
# queued every <tt>12 * (skip_maintenance + 1)</tt> iterations.
def daemon_loop
  l_say(3, 'Daemon loop start')
  reload_project
  check_project unless check_datasets

  idle = shutdown_when_done? && (jobs_running.size + jobs_to_run.size).zero?
  if idle
    say 'Nothing else to do, shutting down'
    exit_cleanup
    return false
  end

  flush!
  on_twelfth = (loop_i % 12).zero?
  purge! if on_twelfth
  if on_twelfth && (loop_i % (12 * (skip_maintenance + 1))).zero?
    queue_maintenance
  end
  save_status
  sleep(latency)
  l_say(3, 'Daemon loop end')
  true
end
daemon_name() click to toggle source

Name of the daemon

# File lib/miga/daemon.rb, line 65
##
# Name of the daemon: the project name prefixed with 'MiGA:'.
def daemon_name
  project_name = project.name
  "MiGA:#{project_name}"
end
exit_cleanup() click to toggle source

Remove temporary files on completion

# File lib/miga/daemon.rb, line 121
##
# Delete 'status.json' from the daemon home. A missing file is not an error,
# since the removal is forced.
def exit_cleanup
  status_file = File.join(daemon_home, 'status.json')
  FileUtils.rm_f(status_file)
end
flush!() click to toggle source

Remove finished jobs from the internal queue and launch as many as possible respecting maxjobs or nodelist (if set).

# File lib/miga/daemon.rb, line 285
##
# Remove finished jobs from the internal queue and launch as many queued
# jobs as possible while +next_host+ keeps reporting capacity.
#
# A running job is considered ongoing while:
# - 'd' jobs: the dataset is set and still has a pending preprocessing step
# - 'p' jobs: the project still has a pending task
# - any other job: +add_result+ for that job returns +nil+
#
# Each job that is queued when the launch phase starts is attempted at most
# once per call. +launch_job+ puts a job back in <tt>@jobs_to_run</tt> when
# its submission is unsuccessful, so an unbounded loop would keep
# resubmitting without pause for as long as submissions fail. Jobs that could
# not be launched simply stay queued for the next call.
def flush!
  # Check for finished jobs
  l_say(2, 'Checking for finished jobs')
  @jobs_running.select! do |job|
    ongoing =
      case job[:job].to_s
      when 'd'
        !job[:ds].nil? && !job[:ds].next_preprocessing(false).nil?
      when 'p'
        !project.next_task(nil, false).nil?
      else
        (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false).nil?
      end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}" unless ongoing
    ongoing
  end

  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)

  # Prioritize: Project-wide > MiGA Online queries > Other datasets
  @jobs_to_run.sort_by! do |job|
    if job[:ds].nil?
      1
    elsif job[:ds_name] =~ /^qG_/
      2
    else
      3
    end
  end

  # Launch as many +jobs_to_run+ as possible, trying each one only once
  jobs_to_run.size.times do
    hostk = next_host
    break unless hostk

    launch_job(@jobs_to_run.shift, hostk)
  end
end
get_job(job, ds = nil) click to toggle source

Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.

# File lib/miga/daemon.rb, line 272
##
# Find the first queued or running job registered under the key +job+.
#
# With +ds+ set to +nil+ only jobs without a dataset are considered.
# Otherwise only jobs whose dataset has the same name as +ds+ are.
# Queued jobs are searched before running ones. Returns the job Hash, or
# +nil+ if there is no match.
def get_job(job, ds = nil)
  candidates = jobs_to_run + jobs_running
  if ds.nil?
    candidates.find { |entry| entry[:ds].nil? && entry[:job] == job }
  else
    candidates.find do |entry|
      !entry[:ds].nil? && entry[:ds].name == ds.name && entry[:job] == job
    end
  end
end
job_cmd(to_run) click to toggle source

Construct the command for the given job definition with current daemon settings

# File lib/miga/daemon.rb, line 242
##
# Build the command line for the job definition +to_run+ using the current
# daemon settings.
#
# The job is treated as project-wide when <tt>to_run[:ds]</tt> is +nil+ and
# as dataset-specific otherwise; this selects which settings are read via
# +runopts_for+ and +ppn+. The log folder 'daemon/<job>' is created inside
# the project path as a side effect. Returns the +cmd+ template with its
# variables filled in.
def job_cmd(to_run)
  dataset = to_run[:ds]
  what = dataset.nil? ? :project : :dataset

  # Variables handed over to the job script
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts_for(:type, what),
    'CORES' => ppn(what),
    'MIGA' => MiGA::MiGA.root_path
  }
  vars['DATASET'] = dataset.name unless dataset.nil?

  log_dir = File.expand_path("daemon/#{to_run[:job]}", project.path)
  FileUtils.mkdir_p(log_dir)

  script = MiGA::MiGA.script_path(
    to_run[:job], miga: vars['MIGA'], project: project
  )
  assignments = vars.map do |key, value|
    runopts(:var).miga_variables(key: key, value: value)
  end
  task_name = to_run[:task_name]
  var_hsh = {
    script: script,
    vars: assignments.join(runopts_for(:varsep, what)),
    cpus: ppn(what),
    log: File.join(log_dir, "#{to_run[:ds_name]}.log"),
    task_name: task_name,
    task_name_simple: task_name.gsub(/[^A-Za-z0-9_]/, '-'),
    miga: File.join(MiGA::MiGA.root_path, 'bin/miga').shellescape
  }
  runopts_for(:cmd, what).miga_variables(var_hsh)
end
l_say(level, *msg) click to toggle source

Send msg to say as long as level is at most verbosity

# File lib/miga/daemon.rb, line 127
##
# Forward +msg+ to +say+ only when the current verbosity is +level+ or
# higher. Returns +nil+ when the message is suppressed.
def l_say(level, *msg)
  return unless verbosity >= level

  say(*msg)
end
launch_job(job, hostk = nil) click to toggle source

Launch the job described by Hash job to hostk-th host

# File lib/miga/daemon.rb, line 342
##
# Launch the job described by the Hash +job+, on the +hostk+-th node for
# 'ssh' daemons.
#
# 'ssh' and 'bash' jobs are spawned and detached as child processes (for
# 'ssh', the host index is stored in the job and the host name is filled
# into the command first). Any other daemon type runs the command through
# +MiGA::MiGA.run_cmd+ and takes its chomped output as the job identifier.
#
# A job whose identifier comes back as +nil+, an empty String or 0 is put
# back in the queue. Otherwise it is registered as running.
def launch_job(job, hostk = nil)
  # Execute job
  job[:cmd] = job_cmd(job)
  MiGA::MiGA.DEBUG "CMD: #{job[:cmd]}"

  run_type = runopts(:type)
  case run_type
  when 'ssh', 'bash'
    if run_type == 'ssh'
      # Remote job: pin it to the selected node
      job[:hostk] = hostk
      job[:cmd] = job[:cmd].miga_variables(host: nodelist[hostk])
    end
    job[:pid] = spawn job[:cmd]
    MiGA::MiGA.DEBUG "Detaching PID: #{job[:pid]}"
    Process.detach(job[:pid]) unless [nil, '', 0].include?(job[:pid])
  else
    # Schedule cluster job (qsub, msub, slurm)
    job[:pid] = MiGA::MiGA.run_cmd(job[:cmd], return: :output).chomp
  end

  # Check if registered
  if [nil, '', 0].include?(job[:pid])
    job[:pid] = nil
    @jobs_to_run << job
    return say("Unsuccessful #{job[:task_name]}, rescheduling")
  end

  @jobs_running << job
  job_host = job[:hostk] ? " to #{job[:hostk]}:#{nodelist[job[:hostk]]}" : nil
  say "Spawned pid:#{job[:pid]}#{job_host} for #{job[:task_name]}"
end
load_status() click to toggle source

Load the status of a previous instance.

# File lib/miga/daemon.rb, line 161
##
# Restore the job queues saved by a previous instance in 'status.json'
# inside the daemon home. Does nothing if that file is missing or empty.
#
# For every stored job, the dataset is looked up again in the project by
# name (unless the job has no dataset or is named 'miga-project'), and the
# job key is converted to a Symbol. Jobs that no longer pass +purge!+ are
# dropped from the running list.
def load_status
  f_path = File.join(daemon_home, 'status.json')
  return unless File.size? f_path

  say 'Loading previous status in daemon/status.json:'
  status = MiGA::Json.parse(f_path)
  status.each_value do |jobs|
    jobs.each do |job|
      dataset_job = !job[:ds].nil? && job[:ds_name] != 'miga-project'
      job[:ds] = project.dataset(job[:ds_name]) if dataset_job
      job[:job] = job[:job].to_sym unless job[:job].nil?
    end
  end
  @jobs_running = status[:jobs_running]
  @jobs_to_run  = status[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end
miga_say(*msg)

Rename the original MiGA::MiGA#say as miga_say, allowing external reporting since MiGA::Daemon overwrites say

Alias for: say
next_host() click to toggle source

In SSH daemons, retrieve the host index of an available node, nil if none. In any other daemons, returns true as long as maxjobs is not reached

# File lib/miga/daemon.rb, line 321
##
# Tell whether (and where) another job can be launched.
#
# For daemons of any type other than 'ssh', returns a Boolean: +true+ while
# fewer than +maxjobs+ jobs are running. For 'ssh' daemons, returns the
# lowest index in +nodelist+ that no running job is using, or +nil+ when
# every node is taken.
def next_host
  return jobs_running.size < maxjobs unless runopts(:type) == 'ssh'

  every_index = nodelist.size.times.to_a
  taken = jobs_running.map { |job| job[:hostk] }
  (every_index - taken).first
end
path() click to toggle source

Alias to project.path for compatibility with lairs

# File lib/miga/daemon.rb, line 71
##
# Path of the daemon, which is simply the path of its project.
def path
  project.path
end
purge!() click to toggle source

Remove dead jobs.

# File lib/miga/daemon.rb, line 331
##
# Drop from the running list every job that does not look alive.
#
# Each job is probed by running the +alive+ command template with the
# job's pid filled in; the job is kept only when the output, read as an
# Integer, equals 1. Returns +nil+ when no job was dropped.
def purge!
  say 'Probing running jobs'
  @jobs_running.reject! do |job|
    probe = runopts(:alive).miga_variables(pid: job[:pid])
    reply = MiGA::MiGA.run_cmd(probe, return: :output)
    reply.chomp.to_i != 1
  end
end
queue_job(job, ds = nil) click to toggle source

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash or ssh) see flush!

# File lib/miga/daemon.rb, line 229
##
# Append a task with key +job+ to the internal queue, for dataset +ds+ or
# for the whole project when +ds+ is +nil+.
#
# Nothing is queued (and +nil+ is returned) if +get_job+ already finds that
# task. The task name is built from the first ten characters of the
# project name, the job key and the dataset name ('miga-project' for
# project-wide tasks). Jobs are only submitted later, by +flush!+.
def queue_job(job, ds = nil)
  return nil unless get_job(job, ds).nil?

  ds_name = ds.nil? ? 'miga-project' : ds.name
  prefix = project.metadata[:name][0..9]
  entry = {
    ds: ds,
    ds_name: ds_name,
    job: job,
    task_name: "#{prefix}:#{job}:#{ds_name}"
  }
  say "Queueing #{ds_name}:#{job}"
  @jobs_to_run << entry
end
queue_maintenance(force = false) click to toggle source

Queue maintenance tasks as an analysis job

# File lib/miga/daemon.rb, line 112
##
# Queue the maintenance tasks as a +:maintenance+ job.
#
# Skipped whenever maintenance is bypassed. Also skipped for daemons that
# shut down when done, unless +force+ is set.
def queue_maintenance(force = false)
  return if bypass_maintenance?
  return if !force && shutdown_when_done?

  say 'Queueing maintenance tasks'
  queue_job(:maintenance)
end
reload_project() click to toggle source

Reload the project's metadata

# File lib/miga/daemon.rb, line 144
##
# Ask the project to reload itself by calling its +load+ method, after
# announcing it at verbosity level 2.
def reload_project
  l_say(2, 'Reloading project')
  project.load
end
save_status() click to toggle source

Report status in a JSON file.

# File lib/miga/daemon.rb, line 151
##
# Write the running and queued job lists to 'status.json' in the daemon
# home, through +MiGA::Json.generate+.
def save_status
  l_say(2, 'Saving current status')
  snapshot = { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run }
  target = File.join(daemon_home, 'status.json')
  MiGA::Json.generate(snapshot, target)
end
say(*msg) click to toggle source

Same as l_say with +level = 1+

Calls superclass method MiGA::MiGA#say
# File lib/miga/daemon.rb, line 138
##
# Report +msg+ through the superclass +say+, directing it to the daemon's
# log handle (+logfh+). Silent (returning +nil+) when verbosity is below 1.
def say(*msg)
  return unless verbosity >= 1

  super(logfh, *msg)
end
Also aliased as: miga_say
update_format_0() click to toggle source

Update from daemon JSON format 0 to the latest version

# File lib/miga/daemon.rb, line 378
##
# Convert printf-style option templates into named-placeholder templates.
#
# For each of the options +cmd+, +var+, +alive+ and +kill+, a value that is
# present and contains a <tt>%s</tt> or <tt>%d</tt> directive (optionally
# positional, such as <tt>%1$s</tt>) is stored again through +runopts+ with
# each directive replaced by a <tt>{{name}}</tt> placeholder, taking the
# names in the order listed for that option. Always finishes by calling
# <tt>runopts(:format_version, 1)</tt>.
def update_format_0
  {
    # option key => placeholder names, in the order the directives consume them
    cmd: %w[script vars cpus log task_name],
    var: %w[key value],
    alive: %w[pid],
    kill: %w[pid]
  }.each do |k, v|
    # Only touch options that are set and still contain format directives
    if !runopts(k).nil? && runopts(k) =~ /%(\d+\$)?[ds]/
      runopts(
        # %d becomes %s first, since the values being formatted in are the
        # placeholder Strings rather than numbers
        k, runopts(k).gsub(/%(\d+\$)?d/, '%\1s') % v.map { |i| "{{#{i}}}" }
      )
    end
  end
  runopts(:format_version, 1)
end