openbsd-ports/infrastructure/bin/dpb
espie e4c671527c add affinity information to restart jobs on the right host preferentially.
- affinity info is similar to locks, but with a completely different
lifetime.
- streamline the main loop of the engine, so that it can do two passes:
first pass shuns paths with the wrong affinity. If no good path is found,
those are considered during the second pass.
- make the Core factory aware of what hosts might be running, so that
affinity info for machines removed from a config file will be ignored.

thanks to landry@ for a few tests.
2012-12-24 17:24:46 +00:00

711 lines
15 KiB
Perl
Executable File

#! /usr/bin/perl
# ex:ts=8 sw=4:
# $OpenBSD: dpb,v 1.63 2012/12/24 17:24:46 espie Exp $
#
# Copyright (c) 2010 Marc Espie <espie@openbsd.org>
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
use strict;
use warnings;
my $ports1;
use FindBin;
BEGIN {
$ports1 = $ENV{PORTSDIR} || '/usr/ports';
}
use lib ("$ports1/infrastructure/lib", "$FindBin::Bin/../lib");
package DPB::State;
our @ISA = qw(OpenBSD::State);
use OpenBSD::State;
use OpenBSD::Paths;
use DPB::Heuristics;
use DPB::PkgPath;
use DPB::Logger;
use DPB::Affinity;
use File::Path;
use File::Basename;
# Tell whether the variable $name has a defined entry in the {subst} table.
sub define_present
{
    my $self = shift;
    my $name = shift;

    return defined $self->{subst}{$name};
}
# Initialise the state object: run the parent class's init, create the
# heuristics object, pick the make program ($MAKE, else OpenBSD::Paths->make)
# and record the values DPB::Vars->get returns for the ports variables.
# Returns $self.
sub init
{
    my $self = shift;

    $self->SUPER::init;
    $self->{no_exports} = 1;
    $self->{heuristics} = DPB::Heuristics->new($self);
    $self->{make} = $ENV{MAKE} || OpenBSD::Paths->make;

    # slot N receives the value of variable N
    my @slots = qw(ports portspath repo localarch distdir);
    my @variables = qw(PORTSDIR PORTSDIR_PATH PACKAGE_REPOSITORY
        MACHINE_ARCH DISTDIR);
    @{$self}{@slots} = DPB::Vars->get($self->make, @variables);

    # the build architecture starts out as the local one
    $self->{arch} = $self->{localarch};
    # PORTSDIR_PATH is colon-separated; keep it as a list
    $self->{portspath} = [ split(/:/, $self->{portspath}) ];
    $self->{starttime} = time();
    return $self;
}
# Format {starttime} (epoch seconds) as a UTC stamp: YYYY-MM-DD@hh:mm:ss.
sub startdate
{
    my $self = shift;
    my ($sec, $min, $hour, $mday, $mon, $year) = gmtime $self->{starttime};

    # gmtime counts years from 1900 and months from 0
    return sprintf '%04d-%02d-%02d@%02d:%02d:%02d',
        $year + 1900, $mon + 1, $mday, $hour, $min, $sec;
}
# Expand the %-escapes of $path and return the result:
#   %p {ports}   %h DPB::Core::Local->hostname   %a {arch}
#   %t {starttime}   %d startdate   %f {distdir}
# Escapes are applied one after the other in that order, so text produced
# by an earlier escape is still seen by the later ones.
sub expand_path
{
    my ($self, $path) = @_;

    # values are computed lazily, only when the escape actually occurs
    my @escapes = (
        [ p => sub { $self->{ports} } ],
        [ h => sub { DPB::Core::Local->hostname } ],
        [ a => sub { $self->{arch} } ],
        [ t => sub { $self->{starttime} } ],
        [ d => sub { $self->startdate } ],
        [ f => sub { $self->{distdir} } ],
    );
    for my $escape (@escapes) {
        my ($letter, $value) = @$escape;
        $path =~ s/\%$letter/$value->()/ge;
    }
    return $path;
}
# Turn one textual package path into a DPB::PkgPath and hand it to $do.
# $path may end in "=weight" and/or "*scale"; a scale found in the text
# overrides the $scale argument.  Trailing slashes and a leading "./" are
# dropped.  $do is called as $do->($pkgpath_object, $weight) as soon as the
# path exists as a directory under one entry of {portspath}; otherwise the
# problem is reported through $state->usage.
sub interpret_path
{
    my ($state, $path, $do, $scale) = @_;

    # "=weight" is removed first, so it swallows everything after the "="
    my $weight = $path =~ s/\=(.*)// ? $1 : undef;
    $scale = $1 if $path =~ s/\*(\d+)$//;
    $path =~ s/\/+$//;
    $path =~ s/^\.\/+//;

    my $p = DPB::PkgPath->new($path);
    $p->{scaled} = $scale if defined $scale;

    for my $root (@{$state->{portspath}}) {
        next unless -d join('/', $root, $p->pkgpath);
        $do->($p, $weight);
        return;
    }
    $state->usage("Bad package path: #1", $path);
}
# Interpret a list of path arguments.  Called as
#     $state->interpret_paths(@args, $do);
# Each argument may carry a trailing "*N" scale.  An argument that names an
# existing plain file is read as a list of package paths, one per line,
# with "#" comments and blank lines ignored; the file's scale applies to
# every line.  Any other argument is itself a package path.  Every path is
# passed on to $state->interpret_path together with $do and the scale.
sub interpret_paths
{
    my $state = shift;
    my $do = pop;

    for my $arg (@_) {
        # Work on a copy: @_ aliases the caller's list (@ARGV, the -P
        # lists), which must not lose its "*N" suffixes.
        my $file = $arg;
        my $scale;
        if ($file =~ s/\*(\d+)$//) {
            $scale = $1;
        }
        if (-f $file) {
            open my $fh, '<', $file or
                $state->usage("Can't open $file");
            # a named variable: lexical $_ ("my $_") no longer exists
            # in perl >= 5.24
            while (my $line = <$fh>) {
                chomp $line;
                $line =~ s/\s*(?:\#.*)?$//;
                next if $line =~ m/^$/;
                $state->interpret_path($line, $do, $scale);
            }
            close $fh;
        } else {
            # the scale was stripped from the name above, so it has to
            # be passed explicitly or it would be lost
            $state->interpret_path($file, $do, $scale);
        }
    }
}
# Parse the command line and derive the run configuration from it.
# Installs option callbacks in $state->{opt}, lets the parent class's
# handle_options do the parsing, then checks -F/-f/-j, applies the
# HISTORY_ONLY settings, expands %-escapes in the configured paths and
# creates the logger.  Problems are reported through $state->usage.
sub handle_options
{
my $state = shift;
$state->{dontclean} = {};
# Callbacks for individual options; each one reads its value with shift.
# Presumably the parent's option parser calls them with the option's
# argument -- confirm against OpenBSD::State.
$state->{opt} = {
# -A: architecture to build for
A => sub {
$state->{arch} = shift;
},
# -L: log directory (may contain %-escapes, expanded further down)
L => sub {
$state->{logdir} = shift;
},
# -r: mark the run as random and tell the heuristics
r => sub {
$state->{random} = 1;
$state->heuristics->random;
},
# -M: threshold passed on to the heuristics
M => sub {
$state->heuristics->set_threshold(shift)
},
# -P, -I, -C: accumulate path arguments in {paths}, {ipaths}, {cpaths}
P => sub {
push(@{$state->{paths}}, shift);
},
I => sub {
push(@{$state->{ipaths}}, shift);
},
C => sub {
push(@{$state->{cpaths}}, shift);
},
# -b: accumulate build log files
b => sub {
push(@{$state->{build_files}}, shift);
},
# -S: size file, parsed right away with the heuristics as consumer
S => sub {
$state->parse_size_file(shift, $state->heuristics);
},
};
# NOTE(review): the option string accepts -q and -S, which the usage text
# below does not mention.
$state->SUPER::handle_options('acemqrRsuUvh:xA:C:f:F:I:j:J:M:p:P:b:L:S:',
"[-acemrRsuUvx] [-A arch] [-C plist] [-f N] [-F N] [-I plist] [-J p] [-j N]",
"[-p parallel] [-P plist] [-h hosts] [-L logdir] [-b log] [-M threshold]",
"[path ...]");
$state->{fullrepo} = join("/", $state->{repo}, $state->arch, "all");
# -L wins, then $LOGDIR, then the default under the ports tree
$state->{logdir} //= $ENV{LOGDIR} // '%p/logs/%a';
# -F N: fetch-only mode, recorded as -f N -j 1 -e; exclusive with -f and -j
if (defined $state->{opt}{F}) {
if (defined $state->{opt}{j} || defined $state->{opt}{f}) {
$state->usage("Can't use -F with -f or -j");
}
$state->{fetch_only} = 1;
$state->{opt}{f} = $state->{opt}{F};
$state->{opt}{j} = 1;
$state->{opt}{e} = 1;
}
# -f defaults to 2 when not given
$state->{opt}{f} //= 2;
if (defined $state->opt('j')) {
if ($state->localarch ne $state->arch) {
$state->usage(
"Can't use -j if -A arch is not local architecture");
}
if ($state->opt('j') !~ m/^\d+$/) {
$state->usage("-j takes a numerical argument");
}
}
if ($state->opt('f') !~ m/^\d+$/) {
$state->usage("-f takes a numerical argument");
}
# a non-zero -f asks for fetch information
if ($state->opt('f')) {
$state->{want_fetchinfo} = 1;
}
# HISTORY_ONLY: scan-only run over the whole tree, with -f 0, -j 1 and -e
# forced.  (Presumably empty() is true when the variable is unset or
# empty -- confirm against OpenBSD::Subst.)
if (!$state->{subst}->empty('HISTORY_ONLY')) {
$state->{want_fetchinfo} = 1;
$state->{opt}{f} = 0;
$state->{opt}{j} = 1;
$state->{opt}{e} = 1;
$state->{all} = 1;
$state->{scan_only} = 1;
# XXX not really random, but no need to use dependencies
$state->{random} = 1;
}
$state->{logdir} = $state->expand_path($state->{logdir});
# -h: hosts file, kept (expanded) in {config}
if ($state->opt('h')) {
$state->{config} = $state->expand_path($state->opt('h'));
}
# unless NO_BUILD_STATS has a true value, the default stats file is added
# as the last build file
if (!$state->{subst}->value("NO_BUILD_STATS")) {
push @{$state->{build_files}}, "%f/build-stats/%a";
}
# expand %-escapes in place: $f aliases the list elements
for my $cat (qw(build_files paths ipaths cpaths)) {
next unless defined $state->{$cat};
for my $f (@{$state->{$cat}}) {
$f = $state->expand_path($f);
}
}
# the last build file is the one that gets rewritten
# NOTE(review): when no build file was collected (NO_BUILD_STATS set and no
# -b), this leaves {permanent_log} undefined -- confirm that is intended.
$state->{permanent_log} = $state->{build_files}[-1];
$state->{logger} = DPB::Logger->new($state->logdir, $state->opt('c'));
$state->heuristics->set_logger($state->logger);
# DISPLAY_TIMEOUT defaults to 10
$state->{display_timeout} =
$state->{subst}->value('DISPLAY_TIMEOUT') // 10;
# build_once follows {all}, unless DONT_BUILD_ONCE is defined
$state->{build_once} = $state->{all};
if ($state->defines("DONT_BUILD_ONCE")) {
$state->{build_once} = 0;
}
# log handle later handed to DPB::Core->log_concurrency
$state->{concurrent} = $state->{logger}->open("concurrent");
}
# Set up the build cores.  Command-line options (-j, -p, -J) and defines
# (STUCK_TIMEOUT, CONNECTION_TIMEOUT, WAIT_TIMEOUT, ALWAYS_CLEAN) become
# override properties.  With a hosts file ({config}) the defaults and the
# overrides are handed to DPB::Core->parse_hosts_file; without one a single
# 'localhost' factory is created from defaults plus overrides.  Finishes
# with DPB::Core::Factory->init_cores and returns its result.
sub start_cores
{
    my $state = shift;

    my %forced;

    # options that are copied when they hold a true value
    for my $pair ([ j => 'jobs' ], [ p => 'parallel' ], [ J => 'junk' ]) {
        my ($letter, $prop) = @$pair;
        my $value = $state->opt($letter);
        $forced{$prop} = $value if $value;
    }

    # defines that are copied as soon as they are present
    for my $pair ([ STUCK_TIMEOUT => 'stuck' ],
        [ CONNECTION_TIMEOUT => 'timeout' ],
        [ WAIT_TIMEOUT => 'wait_timeout' ]) {
        my ($define, $prop) = @$pair;
        next unless $state->define_present($define);
        $forced{$prop} = $state->{subst}->value($define);
    }

    $forced{always_clean} = 1 if $state->defines("ALWAYS_CLEAN");

    my %fallback = (
        junk => 100,
        parallel => '/2',
        wait_timeout => 600,
    );

    DPB::Core->parse_hosts_file($state->{config}, $state,
        \%fallback, \%forced) if $state->{config};

    # {config} is tested again on purpose, after the hosts file was parsed
    unless ($state->{config}) {
        # overrides win over defaults
        DPB::Core::Factory->new('localhost', { %fallback, %forced });
    }
    DPB::Core::Factory->init_cores($state);
}
# Accessor for the {logger} slot (filled in by handle_options).
sub logger
{
    my $self = shift;
    return $self->{logger};
}
# Accessor for the {heuristics} slot (filled in by init).
sub heuristics
{
    my $self = shift;
    return $self->{heuristics};
}
# Accessor for the {locker} slot.
sub locker
{
    my $self = shift;
    return $self->{locker};
}
# Accessor for the {builder} slot.
sub builder
{
    my $self = shift;
    return $self->{builder};
}
# Accessor for the {engine} slot.
sub engine
{
    my $self = shift;
    return $self->{engine};
}
# Accessor for the {grabber} slot.
sub grabber
{
    my $self = shift;
    return $self->{grabber};
}
# Accessor for the {make} slot: the make program chosen by init.
sub make
{
    my $self = shift;
    return $self->{make};
}
# Return the make command as a list: the {make} program, followed by
# BUILD_ONCE=Yes when {build_once} is set.
sub make_args
{
    my $self = shift;

    my @command = $self->{build_once}
        ? ($self->{make}, 'BUILD_ONCE=Yes')
        : ($self->{make});
    return @command;
}
# Accessor for the {ports} slot (PORTSDIR as recorded by init).
sub ports
{
    my $self = shift;
    return $self->{ports};
}
# Accessor for the {fullrepo} slot (computed by handle_options).
sub fullrepo
{
    my $self = shift;
    return $self->{fullrepo};
}
# Accessor for the {distdir} slot (DISTDIR as recorded by init).
sub distdir
{
    my $self = shift;
    return $self->{distdir};
}
# Accessor for the {localarch} slot (MACHINE_ARCH as recorded by init).
sub localarch
{
    my $self = shift;
    return $self->{localarch};
}
# Accessor for the {arch} slot: the local architecture unless -A changed it.
sub arch
{
    my $self = shift;
    return $self->{arch};
}
# Accessor for the {logdir} slot.
sub logdir
{
    my $self = shift;
    return $self->{logdir};
}
# Split one build log line into its whitespace-separated fields.
# Plain function (not a method): the line is the only argument.
sub parse_build_line
{
    my ($line) = @_;
    return split(/\s+/, $line);
}
# Read build statistics from $fname and attach them to the package paths.
# When $fname is not a plain file, "$fname/<arch>/build.log" and then
# "$fname/build.log" are tried instead.  A file that cannot be opened is
# silently ignored.  Lines ending in "!" and lines with fewer than four
# fields are skipped; every other line adds a {host, time, sz} record to
# the {stats} list of its DPB::PkgPath object.
sub parse_build_file
{
    my ($state, $fname) = @_;

    if (!-f $fname) {
        my $arch = $state->arch;
        if (-f "$fname/$arch/build.log") {
            $fname = "$fname/$arch/build.log";
        } elsif (-f "$fname/build.log") {
            $fname = "$fname/build.log";
        }
    }
    open my $fh, '<', $fname or return;
    # a named variable: lexical $_ ("my $_") no longer exists in
    # perl >= 5.24
    while (my $line = <$fh>) {
        chomp $line;
        next if $line =~ m/!$/;
        # any further fields on the line are ignored
        my ($pkgpath, $host, $time, $sz) = parse_build_line($line);
        next if !defined $sz;
        my $o = DPB::PkgPath->new($pkgpath);
        push(@{$o->{stats}}, {host => $host, time => $time, sz => $sz});
    }
    close $fh;
    return;
}
# Feed the collected build statistics to the consumers.  For every package
# path that has a {stats} list, each consumer's add_build_info is called
# with the path object, the host of the last record, the average time and
# the average size.
sub add_build_info
{
    my ($state, @consumers) = @_;

    for my $p (DPB::PkgPath->seen) {
        my $records = $p->{stats};
        next if !defined $records;

        my ($count, $total_time, $total_size, $last_host);
        for my $record (@$records) {
            $count++;
            $total_time += $record->{time};
            $total_size += $record->{sz};
            $last_host = $record->{host};	# XXX
        }
        for my $consumer (@consumers) {
            $consumer->add_build_info($p, $last_host,
                $total_time/$count, $total_size/$count);
        }
    }
}
# Rewrite the permanent build log ({permanent_log}) from the statistics
# held by the package paths: at most the 10 most recent records per path,
# sorted by fullpkgpath.  The {stats} lists are dropped once written.
# The data goes to "<log>.part" first and only replaces the log when it
# was written completely.  Returns the result of rename on success and
# nothing when there is no permanent log or the file could not be written.
sub rewrite_build_info
{
    my $state = shift;
    my $target = $state->{permanent_log};

    # Without any build file there is nothing to rewrite (and dirname
    # croaks on an undefined path).
    return unless defined $target;

    File::Path::mkpath(File::Basename::dirname($target));
    my $partial = $target.'.part';
    open my $f, '>', $partial or return;

    my $ok = 1;
    for my $p (sort {$a->fullpkgpath cmp $b->fullpkgpath}
        DPB::PkgPath->seen) {
        next unless defined $p->{stats};
        # keep the 10 newest records only
        shift @{$p->{stats}} while @{$p->{stats}} > 10;
        for my $s (@{$p->{stats}}) {
            print {$f} join(' ', $p->fullpkgpath, $s->{host},
                $s->{time}, $s->{sz}), "\n" or $ok = 0;
        }
        delete $p->{stats};
    }
    # buffered write errors only show up at close time
    close $f or $ok = 0;
    if (!$ok) {
        # never replace a good log with a truncated one
        unlink $partial;
        return;
    }
    return rename $partial, $target;
}
# Load the build statistics: parse every file of {build_files}, pass the
# result to the heuristics and to DPB::Job::Port, rewrite the permanent
# log and tell the heuristics that parsing is over.  Does nothing in
# fetch-only mode or when there are no build files.  Progress is printed
# on standard output.
sub handle_build_files
{
    my $state = shift;

    return if $state->{fetch_only} || !defined $state->{build_files};

    print "Reading build stats...";
    for my $logfile (@{$state->{build_files}}) {
        $state->parse_build_file($logfile);
    }
    my @consumers = ($state->heuristics, "DPB::Job::Port");
    $state->add_build_info(@consumers);

    print "zapping old stuff...";
    $state->rewrite_build_info($state->{permanent_log});

    print "Done\n";
    $state->heuristics->finished_parsing;
}
# Read package sizes from $fname and pass them to the consumers.
# Each line is "pkgpath size [size2]"; when a second size is present the
# two are added.  Every consumer's add_size_info is called with the
# DPB::PkgPath object and the size.  Lines without a size (blank lines
# included) are skipped.  An unreadable file is reported through
# $state->fatal.
sub parse_size_file
{
    my ($state, $fname, @consumers) = @_;

    open my $fh, '<', $fname or
        $state->fatal("Couldn't open build file #1: #2", $fname, $!);
    # a named variable: lexical $_ ("my $_") no longer exists in
    # perl >= 5.24
    while (my $line = <$fh>) {
        chomp $line;
        my ($pkgpath, $sz, $sz2) = split(/\s+/, $line);
        # same rule as parse_build_file: ignore incomplete lines
        next if !defined $sz;
        if (defined $sz2) {
            $sz += $sz2;
        }
        my $o = DPB::PkgPath->new($pkgpath);
        for my $c (@consumers) {
            $c->add_size_info($o, $sz);
        }
    }
    close $fh;
    return;
}
package main;
use DPB::PkgPath;
use DPB::Core;
use DPB::Vars;
use DPB::PortInfo;
use DPB::Engine;
use DPB::PortBuilder;
use DPB::Reporter;
use OpenBSD::Error;
use DPB::Locks;
use DPB::Job;
use DPB::Grabber;
# Set once the display exists; until then warnings go straight to stderr.
my $reporter;

# inspired by Carp::Always
# Both handlers turn the message into a full backtrace.  The " at FILE
# line N" suffix perl already appended is removed from the last argument,
# since Carp::longmess adds its own location.
# The message lives in a named variable: lexical $_ ("my $_") no longer
# exists in perl >= 5.24.
$SIG{__WARN__} = sub {
    require Carp;
    my $msg = pop @_;
    $msg =~ s/(.*)( at .*? line .*?\n$)/$1/s;
    push @_, $msg;
    # &Carp::longmess without parentheses is deliberate: it passes the
    # current @_ along.
    if (defined $reporter) {
        $reporter->myprint(&Carp::longmess);
    } else {
        warn &Carp::longmess;
    }
};

$SIG{__DIE__} = sub {
    require Carp;
    my $msg = pop @_;
    $msg =~ s/(.*)( at .*? line .*?\n$)/$1/s;
    push @_, $msg;
    if (defined $reporter) {
        $reporter->reset_cursor;
    }
    die &Carp::longmess;
};
# Main program, part one: parse options, start the cores, collect the
# requested package paths and create the objects the build needs.

# subdir list filled with the paths requested explicitly (-P, -I, arguments)
my $subdirlist = {};
my $state = DPB::State->new('dpb');
$state->handle_options;
$state->start_cores;
# assume a full run until an explicit path is seen
$state->{all} = 1;
# Common treatment of a requested path: record it in $subdirlist and note
# that this is not a full run.
# NOTE(review): the wrappers below call this with $p only, so $weight is
# never defined here, and set_weight is not given the weight either --
# check whether "path=weight" is meant to reach the heuristics.
my $default_handling =
sub {
my ($pkgpath, $weight) = @_;
if (defined $weight) {
$state->heuristics->set_weight($pkgpath);
}
$pkgpath->add_to_subdirlist($subdirlist);
$state->{all} = 0;
};
# -P paths and command-line arguments: default handling
$state->interpret_paths(@{$state->{paths}}, @ARGV,
sub {
my $p = shift;
&$default_handling($p);
});
# -I paths: default handling, plus the {wantinstall} mark
$state->interpret_paths(@{$state->{ipaths}},
sub {
my $p = shift;
&$default_handling($p);
$p->{wantinstall} = 1;
});
# -C paths: only recorded in {dontclean}; they do not turn off the full run
$state->interpret_paths(@{$state->{cpaths}},
sub {
my $p = shift;
$state->{dontclean}{$p->pkgpath} = 1;
});
# -a forces a full run even when paths were given
if ($state->opt('a')) {
$state->{all} = 1;
}
$state->handle_build_files;
$state->{builder} = DPB::PortBuilder->new($state);
# locks and affinity information live in subdirectories of the log directory
$state->{locker} = DPB::Locks->new($state, join("/", $state->logdir, "locks"));
$state->{affinity} = DPB::Affinity->new($state, join("/", $state->logdir, "affinity"));
$state->{engine} = DPB::Engine->new($state);
# from here on the __WARN__/__DIE__ handlers see a defined $reporter
$reporter = DPB::Reporter->new($state,
$state->heuristics, "DPB::Core", $state->engine);
# wait, reaping once per second, until a core is available
while (!DPB::Core->avail) {
DPB::Core->reap;
sleep 1;
}
# this core is used for the scanning steps below
my $core = DPB::Core->get;
#my $dump = DPB::Util->make_hot($state->logger->open('dump'));
# SIGINFO: write the engine's info dump to the 'info' log
$SIG{INFO} = sub {
$state->engine->info_dump($state->logger->open('info'));
# perl status may spew some garbage on the display,
# remove it during next refresh
$reporter->refresh;
};
# cleared by handle_non_waiting_jobs once "<logdir>/stop" exists
my $keep_going = 1;
# start one display period in the past, so the first timed report is due
# at once
my $last_time = time() - $state->{display_timeout};
# One scheduling step: reap finished jobs, look for the stop file, start
# as many builds and fetches as cores allow, log the concurrency and
# update the display.
# With a true $need_clock the display is only refreshed when the display
# timeout has elapsed or a job was reaped; otherwise it is always
# refreshed.  Returns the new value of $keep_going.
sub handle_non_waiting_jobs
{
    my ($need_clock) = @_;
    my $checked = !$need_clock;	# XXX

    my $reaped = DPB::Core->reap;

    # a file named "stop" in the log directory stops new jobs
    my $stopfile = $state->logdir."/stop";
    $keep_going = !-e $stopfile;

    $state->engine->recheck_errors if DPB::Core->avail > 1;
    if (DPB::Core->avail) {
        $state->engine->check_buildable(0);
        $checked = 1;
    }

    $state->engine->start_new_job
        while $keep_going && DPB::Core->avail && $state->engine->can_build;

    while ($keep_going && DPB::Core::Fetcher->avail &&
        $state->engine->can_fetch) {
        # check_buildable is called at most once per step before fetching
        unless ($checked) {
            $state->engine->check_buildable(1);
            $checked = 1;
        }
        $state->engine->start_new_fetch;
    }

    my $now = time();
    DPB::Core->log_concurrency($now, $state->{concurrent});

    my $due = $now >= $last_time + $state->{display_timeout};
    if (!$need_clock) {
        $reporter->report;
    } elsif ($due || $reaped) {
        $reporter->report;
        $last_time = $now;
    }
    return $keep_going;
}
# Run scheduling steps until nothing is running and nothing more can be
# built (or, in fetch-only mode, fetched).  With -q the whole pass is
# repeated for as long as the engine's recheck_errors returns true.
sub main_loop
{
    ROUND:
    while (1) {
        PASS:
        while (1) {
            handle_non_waiting_jobs(0);

            # idle and nothing to build: look once more, then give up
            if (!DPB::Core->running &&
                !($keep_going && $state->engine->can_build)) {
                $state->engine->check_buildable(0);
                last PASS unless $state->engine->can_build;
            }

            DPB::Core->reap_wait if DPB::Core->running;

            # same test for fetches, in fetch-only mode
            if ($state->{fetch_only} &&
                !DPB::Core::Fetcher->running &&
                !($keep_going && $state->engine->can_fetch)) {
                $state->engine->check_buildable(0);
                last PASS unless $state->engine->can_fetch;
            }
        }
        last ROUND
            unless $state->opt('q') && $state->engine->recheck_errors;
    }
}
# Main program, part two: scan the requested part of the ports tree with
# the core taken earlier, then run the build loop and print the report.

# The grabber is given a callback that runs one scheduling step with the
# clocked display (presumably called while scanning -- confirm in
# DPB::Grabber).
$state->{grabber} = DPB::Grabber->new($state,
sub { handle_non_waiting_jobs(1) });
if ($state->{all} && !$state->{random}) {
# when restarting interrupted dpb,
# find the most important paths first
my $list = $state->engine->find_best($state->logger->logfile("dependencies"), 10);
# if we have them, list them before the full ports tree walk.
if (@$list > 0) {
my $actual = {};
for my $name (@$list) {
DPB::PkgPath->new($name)->add_to_subdirlist($actual);
}
$state->grabber->grab_subdirs($core, $actual);
}
}
# explicitly requested paths, if any
if (keys %$subdirlist > 0) {
$state->grabber->grab_subdirs($core, $subdirlist);
}
$state->grabber->complete_subdirs($core);
# full run: grab_subdirs is called without a subdir list
if ($state->{all}) {
$state->grabber->grab_subdirs($core);
}
$state->grabber->complete_subdirs($core);
# give back "our" core to the pool.
# set when expire_old_distfiles returns true; the core is then not marked
# ready here
my $occupied = 0;
if ($state->{all}) {
$state->engine->dump_dependencies;
if ($state->opt('f')) {
DPB::Distfile->dump($state->{logger});
}
if ($state->grabber->expire_old_distfiles($core, $state->opt('e'))) {
$occupied = 1;
}
}
# with -e the core is not marked ready either
if (!$state->opt('e') && !$occupied) {
$core->mark_ready;
}
$state->engine->check_buildable(1);
if ($state->{scan_only}) {
# very shortened loop
$reporter->report;
if (DPB::Core->running) {
DPB::Core->reap_wait;
}
} else {
# and let's wait for all jobs now.
DPB::Core->start_clock($state->{display_timeout});
main_loop();
}
# final steps: reset the display, clean up the cores, print the engine's
# report and write the end dump to the 'dump' log
$reporter->reset;
DPB::Core->cleanup;
print $state->engine->report;
$state->engine->end_dump($state->logger->open('dump'));