MiGA Daemons handling job submissions.
Array of jobs currently running
Array of jobs next to be executed
Options used to setup the daemon
MiGA::Project in which the daemon is running
Daemon's home inside the MiGA::Project +project+; +project+ may also be a String with the full path to the project's 'daemon' folder.
# File lib/miga/daemon.rb, line 19
##
# Resolve the daemon's home folder for +project+.
#
# +project+ is either a String, which is returned untouched as the full
# path to the 'daemon' folder, or any object responding to +path+ (such as
# a MiGA::Project), in which case 'daemon' is appended to that path.
def daemon_home(project)
  case project
  when String then project
  else File.join(project.path, 'daemon')
  end
end
Initialize an inactive daemon for the MiGA::Project +project+. See +daemon+ to wake the daemon. If passed, +json+ must be the path to a daemon definition in json format. Otherwise, the project-stored daemon definition is used. In either case, missing variables are used as defined in ~/.miga_daemon.json.
# File lib/miga/daemon.rb, line 44
##
# Initialize an inactive daemon for the MiGA::Project +project+.
#
# If passed, +json+ must be the path to a daemon definition in json format.
# Otherwise the definition stored in the project ('daemon/daemon.json'
# under the project path) is used. In either case, '.miga_daemon.json' is
# handed to the parser as the source of defaults whenever that file exists.
#
# Every parsed key/value pair is registered through +runopts+, the
# definition is migrated with +update_format_0+, and both job queues start
# empty.
def initialize(project, json = nil)
  @project = project
  @runopts = {}
  json ||= File.join(project.path, 'daemon/daemon.json')
  # File.expand_path resolves against the current working directory when
  # its second argument is nil, so fall back to the user's home to honor the
  # documented ~/.miga_daemon.json location when MIGA_HOME is not set.
  default_json = File.expand_path(
    '.miga_daemon.json', ENV['MIGA_HOME'] || Dir.home
  )
  MiGA::Json.parse(
    json, default: File.exist?(default_json) ? default_json : nil
  ).each { |k, v| runopts(k, v) }
  update_format_0
  @jobs_to_run = []
  @jobs_running = []
end
Traverse datasets and return a boolean indicating whether any reference dataset is incomplete
# File lib/miga/daemon.rb, line 188
##
# Walk over every dataset in the project and queue a +:d+ job for each one
# whose status is +:incomplete+ and that still has a pending preprocessing
# step. Unless the log is being shown, also report progress via +advance+.
#
# Returns +true+ if at least one of the queued datasets is a reference
# dataset, +false+ otherwise.
def check_datasets
  l_say(2, 'Checking datasets')
  incomplete_ref = false
  project.each_dataset do |dataset|
    if dataset.status == :incomplete &&
       !dataset.next_preprocessing(false).nil?
      incomplete_ref = true if dataset.ref?
      queue_job(:d, dataset)
    end
  end
  return incomplete_ref if show_log?

  total = project.dataset_names.count
  pending = jobs_to_run.size + jobs_running.size
  # The maintenance job is not a dataset, so leave it out of the count
  pending -= 1 unless get_job(:maintenance).nil?
  advance('Datasets:', total - pending, total, false)
  miga_say if pending.zero?
  incomplete_ref
end
Check if all reference datasets are pre-processed. If yes, check the project-level tasks
# File lib/miga/daemon.rb, line 211
##
# Queue the project-level job (+:p+) when the project has datasets, all of
# them are done preprocessing, and the project reports a pending task.
# Does nothing otherwise.
def check_project
  l_say(2, 'Checking project')
  # Skip projects without datasets, then double-check that all are ready
  datasets_ready =
    !project.dataset_names.empty? && project.done_preprocessing?
  return unless datasets_ready

  # Queue project-level job
  queue_job(:p) unless project.next_task(nil, false).nil?
end
Run only in the first loop
# File lib/miga/daemon.rb, line 77
##
# Tasks executed only in the first loop: print the launch banner and the
# configuration options, restore the status of a previous instance, and
# force-queue the maintenance job.
def daemon_first_loop
  rule = '-----------------------------------'
  [rule, 'MiGA:%s launched' % project.name, rule].each do |line|
    say line
  end
  miga_say "Saving log to: #{output_file}" unless show_log?
  say 'Configuration options:'
  say @runopts.to_s
  load_status
  queue_maintenance(true)
end
Path to the daemon home
# File lib/miga/daemon.rb, line 59
##
# Path to the home of this daemon, as resolved by the class-level
# +daemon_home+ for the daemon's own project.
def daemon_home
  resolver = self.class
  resolver.daemon_home(project)
end
Run one loop step. Returns a Boolean indicating if the loop should continue
# File lib/miga/daemon.rb, line 90
##
# Run one step of the daemon loop: reload the project, queue dataset jobs
# (or project jobs when no reference dataset is pending), launch what can
# be launched, periodically purge dead jobs and queue maintenance, persist
# the status, and sleep for +latency+.
#
# Returns +false+ when the daemon should stop (shutdown-when-done is set
# and both queues are empty), +true+ otherwise.
def daemon_loop
  l_say(3, 'Daemon loop start')
  reload_project
  # Project-level tasks are only checked when no reference dataset is pending
  check_project unless check_datasets

  if shutdown_when_done? && jobs_running.empty? && jobs_to_run.empty?
    say 'Nothing else to do, shutting down'
    exit_cleanup
    return false
  end

  flush!
  # Every 12th loop probe running jobs; maintenance is queued at a multiple
  # of that period determined by skip_maintenance
  if (loop_i % 12).zero?
    purge!
    maintenance_period = 12 * (skip_maintenance + 1)
    queue_maintenance if (loop_i % maintenance_period).zero?
  end
  save_status
  sleep(latency)
  l_say(3, 'Daemon loop end')
  true
end
Name of the daemon
# File lib/miga/daemon.rb, line 65
##
# Name of the daemon: the project's name prefixed with 'MiGA:'.
def daemon_name
  project_name = project.name
  "MiGA:#{project_name}"
end
Remove temporary files on completion
# File lib/miga/daemon.rb, line 121
##
# Remove temporary files on completion. Currently this deletes the
# 'status.json' file in the daemon's home, ignoring it if absent.
def exit_cleanup
  status_file = File.join(daemon_home, 'status.json')
  FileUtils.rm_f(status_file)
end
Remove finished jobs from the internal queue and launch as many as possible respecting maxjobs or nodelist (if set).
# File lib/miga/daemon.rb, line 285
##
# Drop finished jobs from the list of running jobs, reorder the pending
# queue, and launch as many pending jobs as +next_host+ allows.
def flush!
  # Check for finished jobs
  l_say(2, 'Checking for finished jobs')
  @jobs_running.reject! do |job|
    target = job[:ds]
    finished =
      case job[:job].to_s
      when 'd'
        # Dataset job: done once no preprocessing step remains
        target.nil? || target.next_preprocessing(false).nil?
      when 'p'
        # Project job: done once no project task remains
        project.next_task(nil, false).nil?
      else
        # Any other job: done once its result can be registered
        !(target.nil? ? project : target).add_result(job[:job], false).nil?
      end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}" if finished
    finished
  end

  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)

  # Prioritize: Project-wide > MiGA Online queries > Other datasets
  @jobs_to_run.sort_by! do |job|
    if job[:ds].nil?
      1
    elsif job[:ds_name] =~ /^qG_/
      2
    else
      3
    end
  end

  # Launch as many +jobs_to_run+ as possible
  while (hostk = next_host) && !jobs_to_run.empty?
    launch_job(@jobs_to_run.shift, hostk)
  end
end
Get the task with key symbol +job+ in dataset +ds+. For project-wide tasks let +ds+ be nil.
# File lib/miga/daemon.rb, line 272
##
# Find the queued or running task with key symbol +job+ for dataset +ds+
# (datasets are matched by name). Pass +ds+ as nil for project-wide tasks.
#
# Returns the job Hash, or nil if no such task is registered.
def get_job(job, ds = nil)
  candidates = jobs_to_run + jobs_running
  candidates.find do |entry|
    queued_ds = entry[:ds]
    same_target =
      if ds.nil?
        queued_ds.nil?
      else
        !queued_ds.nil? && queued_ds.name == ds.name
      end
    same_target && entry[:job] == job
  end
end
Construct the command for the given job definition with current daemon settings
# File lib/miga/daemon.rb, line 242
##
# Build the command for the job definition +to_run+ (a Hash with the keys
# +:ds+, +:ds_name+, +:job+ and +:task_name+) using the current daemon
# settings. As a side effect, the log folder 'daemon/<job>' is created
# inside the project.
#
# Returns the result of expanding the +:cmd+ run option with the job's
# variables.
def job_cmd(to_run)
  dataset = to_run[:ds]
  what = dataset.nil? ? :project : :dataset

  # Variables handed over to the job
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts_for(:type, what),
    'CORES' => ppn(what),
    'MIGA' => MiGA::MiGA.root_path
  }
  vars['DATASET'] = dataset.name unless dataset.nil?

  log_dir = File.expand_path("daemon/#{to_run[:job]}", project.path)
  FileUtils.mkdir_p(log_dir)

  script = MiGA::MiGA.script_path(
    to_run[:job], miga: vars['MIGA'], project: project
  )
  rendered_vars = vars.map do |key, value|
    runopts(:var).miga_variables(key: key, value: value)
  end
  var_hsh = {
    script: script,
    vars: rendered_vars.join(runopts_for(:varsep, what)),
    cpus: ppn(what),
    log: File.join(log_dir, "#{to_run[:ds_name]}.log"),
    task_name: to_run[:task_name],
    # Same name with anything outside [A-Za-z0-9_] replaced by dashes
    task_name_simple: to_run[:task_name].gsub(/[^A-Za-z0-9_]/, '-'),
    miga: File.join(MiGA::MiGA.root_path, 'bin/miga').shellescape
  }
  runopts_for(:cmd, what).miga_variables(var_hsh)
end
Send +msg+ to +say+ as long as +level+ is at most +verbosity+.
# File lib/miga/daemon.rb, line 127
##
# Forward +msg+ to +say+ only when the daemon's +verbosity+ is at least
# +level+; otherwise do nothing.
def l_say(level, *msg)
  return if verbosity < level

  say(*msg)
end
Launch the job described by Hash +job+ to the +hostk+-th host.
# File lib/miga/daemon.rb, line 342
##
# Launch the job described by Hash +job+. For 'ssh' daemons +hostk+ is the
# index in +nodelist+ of the host that receives the job.
#
# The command is built with +job_cmd+ and stored in +job[:cmd]+. 'ssh' and
# 'bash' daemons spawn and detach the process; any other type runs the
# command and takes its (chomped) output as the job id. A job that obtains
# no id is sent back to the pending queue, otherwise it is registered as
# running.
def launch_job(job, hostk = nil)
  # An empty, nil, or zero pid means the job could not be registered
  unregistered = ->(pid) { [nil, '', 0].include?(pid) }

  # Execute job
  job[:cmd] = job_cmd(job)
  MiGA::MiGA.DEBUG "CMD: #{job[:cmd]}"
  run_type = runopts(:type)
  if %w[ssh bash].include?(run_type)
    if run_type == 'ssh'
      # Remote job: bind it to the selected host
      job[:hostk] = hostk
      job[:cmd] = job[:cmd].miga_variables(host: nodelist[hostk])
    end
    job[:pid] = spawn job[:cmd]
    MiGA::MiGA.DEBUG "Detaching PID: #{job[:pid]}"
    Process.detach(job[:pid]) unless unregistered.call(job[:pid])
  else
    # Schedule cluster job (qsub, msub, slurm)
    job[:pid] = MiGA::MiGA.run_cmd(job[:cmd], return: :output).chomp
  end

  # Check if registered
  if unregistered.call(job[:pid])
    job[:pid] = nil
    @jobs_to_run << job
    say "Unsuccessful #{job[:task_name]}, rescheduling"
  else
    @jobs_running << job
    job_host = " to #{job[:hostk]}:#{nodelist[job[:hostk]]}" if job[:hostk]
    say "Spawned pid:#{job[:pid]}#{job_host} for #{job[:task_name]}"
  end
end
Load the status of a previous instance.
# File lib/miga/daemon.rb, line 161
##
# Restore the job queues saved by a previous instance in 'status.json'
# (inside the daemon's home). Does nothing if that file is missing or
# empty. Dataset references are re-resolved by name against the current
# project, job keys are turned back into Symbols, and dead jobs are purged.
def load_status
  status_file = File.join(daemon_home, 'status.json')
  return unless File.size? status_file

  say 'Loading previous status in daemon/status.json:'
  status = MiGA::Json.parse(status_file)
  status.each_value do |jobs|
    jobs.each do |job|
      # Project-wide jobs carry no dataset to re-resolve
      project_wide = job[:ds].nil? || job[:ds_name] == 'miga-project'
      job[:ds] = project.dataset(job[:ds_name]) unless project_wide
      job[:job] = job[:job].to_sym unless job[:job].nil?
    end
  end
  @jobs_running = status[:jobs_running]
  @jobs_to_run = status[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end
Rename the original MiGA::MiGA#say as +miga_say+, allowing external reporting since MiGA::Daemon overrides +say+.
In SSH daemons, retrieve the host index of an available node, nil if none. In any other daemons, returns true as long as maxjobs is not reached
# File lib/miga/daemon.rb, line 321
##
# In 'ssh' daemons, return the index in +nodelist+ of the first host not
# used by any running job, or nil if all are busy. In any other daemon,
# return a Boolean telling whether fewer than +maxjobs+ jobs are running.
def next_host
  unless runopts(:type) == 'ssh'
    return jobs_running.size < maxjobs
  end

  busy_hosts = jobs_running.map { |job| job[:hostk] }
  (nodelist.each_index.to_a - busy_hosts).first
end
Alias to project.path
for compatibility with lairs
# File lib/miga/daemon.rb, line 71
##
# Path of the daemon's project. Shortcut for +project.path+, kept for
# compatibility with lairs.
def path
  owner = project
  owner.path
end
Remove dead jobs.
# File lib/miga/daemon.rb, line 331
##
# Remove dead jobs from the list of running jobs. A job is kept only when
# the +:alive+ command, expanded with the job's pid, outputs 1.
def purge!
  say 'Probing running jobs'
  @jobs_running.reject! do |job|
    probe_cmd = runopts(:alive).miga_variables(pid: job[:pid])
    alive_flag = MiGA::MiGA.run_cmd(probe_cmd, return: :output).chomp.to_i
    alive_flag != 1
  end
end
Add the task to the internal queue with symbol key +job+. If the task is dataset-specific, +ds+ specifies the dataset. To submit jobs to the scheduler (or to bash or ssh) see +flush!+.
# File lib/miga/daemon.rb, line 229
##
# Register the task with symbol key +job+ in the pending queue, for
# dataset +ds+ or for the whole project when +ds+ is nil. Jobs are only
# queued here; see +flush!+ for the actual submission.
#
# Returns nil without queueing if the same task is already queued or
# running, and the pending queue otherwise.
def queue_job(job, ds = nil)
  return unless get_job(job, ds).nil?

  ds_name = ds.nil? ? 'miga-project' : ds.name
  # Only the first 10 characters of the project name go into the task name
  name_prefix = project.metadata[:name][0..9]
  to_run = {
    ds: ds,
    ds_name: ds_name,
    job: job,
    task_name: "#{name_prefix}:#{job}:#{ds_name}"
  }
  say format('Queueing %s:%s', to_run[:ds_name], to_run[:job])
  @jobs_to_run << to_run
end
Queue maintenance tasks as an analysis job
# File lib/miga/daemon.rb, line 112
##
# Queue the maintenance tasks as an analysis job (+:maintenance+). Nothing
# is queued when maintenance is bypassed or, unless +force+ is true, when
# the daemon is set to shut down when done.
def queue_maintenance(force = false)
  return if bypass_maintenance?
  return if !force && shutdown_when_done?

  say 'Queueing maintenance tasks'
  queue_job(:maintenance)
end
Reload the project's metadata
# File lib/miga/daemon.rb, line 144
##
# Reload the project's metadata by calling +load+ on the project, logging
# the action at verbosity level 2.
def reload_project
  l_say(2, 'Reloading project')
  current = project
  current.load
end
Report status in a JSON file.
# File lib/miga/daemon.rb, line 151
##
# Report the current status in a JSON file: both job queues are written to
# 'status.json' in the daemon's home.
def save_status
  l_say(2, 'Saving current status')
  snapshot = { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run }
  status_file = File.join(daemon_home, 'status.json')
  MiGA::Json.generate(snapshot, status_file)
end
Same as +l_say+ with +level = 1+.
# File lib/miga/daemon.rb, line 138
##
# Report +msg+ through the parent implementation of +say+, directed to the
# daemon's log handle (+logfh+). Silent when +verbosity+ is below 1.
def say(*msg)
  return unless verbosity >= 1

  super(logfh, *msg)
end
Update from daemon JSON format 0 to the latest version
# File lib/miga/daemon.rb, line 378
##
# Update the daemon definition from JSON format 0 to the latest version.
#
# Format 0 options use positional printf-style placeholders (+%s+, +%d+,
# +%1$s+, ...). Each such option is rewritten so that the placeholders
# become named <tt>{{variable}}</tt> tags, in the order listed below.
# Finally, +:format_version+ is set to 1.
def update_format_0
  # Variable names, in positional order, for each templated option
  positional_names = {
    cmd: %w[script vars cpus log task_name],
    var: %w[key value],
    alive: %w[pid],
    kill: %w[pid]
  }
  positional_names.each do |option, names|
    template = runopts(option)
    next if template.nil?
    next unless template =~ /%(\d+\$)?[ds]/

    tags = names.map { |name| "{{#{name}}}" }
    # Turn %d into %s first, since the tags are Strings
    runopts(option, template.gsub(/%(\d+\$)?d/, '%\1s') % tags)
  end
  runopts(:format_version, 1)
end