New build scheduler written in python to replace the make+sh job
ordering, which had become too limited.

We now build packages ordered by those that are part of the longest
dependency chains first.  This has the effect of building the deepest
parts of the tree first and levelling out the tree height, hopefully
avoiding the bottlenecks we currently face late in the build, where
the cluster sits mostly idle waiting for a few long dependency chains
to finish before it can become fully loaded again.
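For illustration, the height of a package is one more than the
tallest of its dependencies (a minimal sketch of the idea, not the
scheduler's actual code; the real version is heightdown() in the
script below):

    def height(deps, pkg, memo=None):
        # deps maps each package to the list of packages it depends on
        if memo is None:
            memo = {}
        if pkg not in memo:
            memo[pkg] = 1 + max([height(deps, d, memo) for d in deps[pkg]] + [0])
        return memo[pkg]

    # e.g. deps = {'a': ['b'], 'b': ['c'], 'c': []} gives heights 3, 2, 1,
    # so leaf 'c' is built first because it anchors the longest chain.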

The algorithm sorts the list of remaining packages by height (length
of the longest dependency chain), then adds leaf packages from each
entry in order until the queue holds between 100 and 200 jobs, to
amortise the cost of this queue rebalancing while not losing the
height-averaging property.  Jobs are dispatched from this queue onto
worker threads as machine slots become available.  The refill step is
sketched below.
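In outline, the refill step looks something like this (a condensed
sketch assuming an index of the same shape the script builds; the
real logic is rank() below):

    def refill(index, ready, qlen=100):
        # Visit remaining packages tallest-first, queueing the ready
        # leaves each one is directly waiting on
        sortd = sorted(index, key=lambda p: index[p]['height'], reverse=True)
        remaining = set(ready)
        queue = []
        for pkg in sortd:
            both = remaining.intersection(index[pkg]['deps'])
            queue.extend(both)
            remaining.difference_update(both)
            if len(queue) >= qlen:
                return queue       # full enough; stop rebalancing
        queue.extend(remaining)    # leaves no tall chain is waiting on
        return queue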

Unlike the make-based solution, which required a fixed -j concurrency
value and could not respond to the addition or removal of build
resources, we can now dynamically add new machines to the queue as
they become available.

The other advantage of using python is that we have more
customisability and visibility into the build status, e.g. we
periodically report the number of remaining packages, as well as the
list of deepest packages that we are working on.

TODO:

* Implement mtime checking for parent package staleness, so that
  parents are rebuilt if their dependencies are touched more recently.
  Currently packages will not be rebuilt if they already exist,
  whether or not they are "stale" with respect to their dependencies
  (a possible shape for this check is sketched after this list).

* Offload the machine selection into an external queue manager.
  Currently the queue manager used here doesn't interoperate with the
  old one (getmachine/releasemachine) because it's not possible to use
  the lockf()-based mutual exclusion within a multithreaded client.
  Doing that will also allow for a more flexible job placement
  algorithm as well as finer queue customization.
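A possible shape for the staleness check from the first item (a
hypothetical helper; stale() and its arguments are illustrative, not
part of the script):

    import os

    def stale(pkgdir, pkg, deps):
        # True if pkg's package file is missing or older than any of
        # the packages it depends on (names without the .tbz suffix)
        target = os.path.join(pkgdir, "%s.tbz" % pkg)
        if not os.path.exists(target):
            return True
        mtime = os.path.getmtime(target)
        return any(os.path.getmtime(os.path.join(pkgdir, "%s.tbz" % d)) > mtime
                   for d in deps)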
Kris Kennaway 2008-05-10 13:22:51 +00:00
parent 8012e444ac
commit 778518d745
Notes: svn2git 2021-03-31 03:12:20 +00:00
svn path=/head/; revision=212890
2 changed files with 1040 additions and 0 deletions

Tools/portbuild/scripts/build  (executable file, 520 lines)

@@ -0,0 +1,520 @@
#!/usr/bin/env python
# Improved build scheduler. We try to build leaf packages (those
# which can be built immediately without requiring additional
# dependencies to be built) in the order such that the ones required
# by the longest dependency chains are built first.
#
# This has the effect of favouring deep parts of the package tree and
# evening out the depth over time, hopefully avoiding the situation
# where the entire cluster waits for a deep part of the tree to
# build on a small number of machines
#
# Other advantages are that this system is easily customizable and
# will let us customize things like the matching policy of jobs to
# machines.
#
# TODO:
# * External queue manager
# * Mark completed packages instead of deleting them
# * check mtime for package staleness (cf make)
# * Check for parent mtimes after finishing child
import os, sys, threading, time, subprocess, fcntl, operator
#from itertools import ifilter, imap
from random import choice
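# Each INDEX line is pipe-separated; the fields sliced out below
# correspond to this layout (assumed from the slices used here):
#   name|path|prefix|comment|descr|maintainer|categories|bdep|rdep|www|edep|pdep|fdep
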
def parseindex(indexfile):

    tmp={}
    pkghash={}
    for i in file(indexfile):
        line=i.rstrip().split("|")
        pkg = line[0]
        tmp[pkg] = line[1:]
        # XXX hash category names too

        # Trick python into storing package names by reference instead
        # of copying strings and wasting 60MB
        pkghash[pkg] = pkg

    index=dict.fromkeys(tmp.keys())
    for pkg in tmp.iterkeys():
        line = tmp[pkg]
        data={'name': pkg, 'path':line[0],
              #'prefix':line[1],
              #'comment':line[2],
              #'descr':line[3],
              #'maintainer':line[4],
              'categories':line[5], # XXX duplicates strings
              'bdep':[pkghash[i] for i in line[6].split(None)],
              'rdep':[pkghash[i] for i in line[7].split(None)],
              #'www':line[8],
              'edep':[pkghash[i] for i in line[9].split(None)],
              'pdep':[pkghash[i] for i in line[10].split(None)],
              'fdep':[pkghash[i] for i in line[11].split(None)],
              'height':None}
        if index[pkg] is None:
            index[pkg] = data
        else:
            index[pkg].update(data)

        if not index[pkg].has_key('parents'):
            index[pkg]['parents'] = []

        # XXX iter?
        deps=set()
        for j in ['bdep','rdep','edep','fdep','pdep']:
            deps.update(set(index[pkg][j]))
        index[pkg]['deps'] = [pkghash[i] for i in deps]

        for j in deps:
            # This grossness is needed to avoid a second pass through
            # the index, because we might be about to refer to
            # packages that have not yet been processed
            if index[j] is not None:
                if index[j].has_key('parents'):
                    index[j]['parents'].append(pkghash[pkg])
                else:
                    index[j]['parents'] = [pkghash[pkg]]
            else:
                index[j] = {'parents':[pkghash[pkg]]}

    return index

def gettargets(index, targets):
    """ split command line arguments into list of packages to build. Returns set or iterable """
    # XXX make this return the full recursive list and use this later
    # for processing wqueue

    plist = set()
    if len(targets) == 0:
        targets = ["all"]
    for i in targets:
        if i == "all":
            plist = index.iterkeys()
            break
        if i.endswith("-all"):
            cat = i.rpartition("-")[0]
            plist.update(j for j in index.iterkeys()
                         if cat in index[j]['categories'])
        else:
            # Strip a .tbz suffix if present; rstrip(".tbz") would eat
            # trailing name characters too (e.g. "libxslt" -> "libxsl")
            if i.endswith(".tbz"):
                i = i[:-4]
            if i in index:
                plist.update([i])

    return plist

def heightindex(index, targets):
    """ Initial population of height tree """

    for i in targets:
        heightdown(index, i)

def heightdown(index, pkgname):
    """
    Recursively populate the height tree down from a given package,
    assuming empty values on entries not yet visited
    """

    pkg=index[pkgname]
    if pkg['height'] is None:
        if len(pkg['deps']) > 0:
            max = 0
            for i in pkg['deps']:
                w = heightdown(index, i)
                if w > max:
                    max = w
            pkg['height'] = max + 1
        else:
            pkg['height'] = 1
    return pkg['height']

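# Example: if A depends on B and B depends on C, heightdown() gives C
# height 1 (a leaf), B height 2 and A height 3, so A tops a chain of
# length 3.
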
def heightup(index, pkgname):
    """ Recalculate the height tree going upwards from a package """

    if not index.has_key(pkgname):
        raise KeyError

    parents=set(index[pkgname]['parents'])

    while len(parents) > 0:
        # XXX use a deque?
        pkgname = parents.pop()
        if not index.has_key(pkgname):
            # XXX can this happen?
            continue
        pkg=index[pkgname]
        oldheight=pkg['height']
        if oldheight is None:
            # Parent is not in our build target list
            continue
        if len(pkg['deps']) == 0:
            newheight = 1
        else:
            newheight=max(index[j]['height'] for j in pkg['deps']) + 1
        if newheight > oldheight:
            # Heights may only shrink as packages are built/removed
            print "%s height increasing: %d -> %d" % (pkg, oldheight, newheight)
            assert(False)
        if newheight != oldheight:
            pkg['height'] = newheight
            parents.update(pkg['parents'])

def deleteup(index, pkgname):
    if not index.has_key(pkgname):
        raise KeyError

    parents=set([pkgname])
    children=[]
    removed=[]
    while len(parents) > 0:
        pkgname = parents.pop()
        if not index.has_key(pkgname):
            # Parent was already deleted via another path
            # XXX can happen?
            print "YYYYYYYYYYYYYYYYYYYYYY %s deleted" % pkgname
            continue
        if index[pkgname]['height'] is None:
            # parent is not in our list of build targets
            continue
        pkg=index[pkgname]
        children.extend(pkg['deps'])
        parents.update(pkg['parents'])
        removed.append(pkgname)
        del index[pkgname]

    removed = set(removed)
    children = set(children)

    # print "Removed %d packages, touching %d children" % (len(removed), len(children))

    for i in children.difference(removed):
        par=index[i]['parents']
        index[i]['parents'] = list(set(par).difference(removed))

# XXX return an iter
def selectheights(index, level):
    return [i for i in index.iterkeys() if index[i]['height'] == level]

def rank(index, ready, sortd, max = None):
    """ rank the list of ready packages according to those listed as
    dependencies in successive entries of the sorted list """

    input=set(ready)
    output = []
    count = 0
    print "Working on depth ",
    for i in sortd:
        deps = set(index[i]['deps'])
        both = deps.intersection(input)
        if len(both) > 0:
            print "%d " % index[i]['height'],
            input.difference_update(both)
            output.extend(list(both))
            if len(input) == 0:
                break
            if max:
                count+=len(both)
                if count > max:
                    return output
    print
    output.extend(list(input))
    return output

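# rank() is what keeps tall chains moving: called with the current
# leaves and the package names sorted by height (tallest first), it
# returns the leaves ordered so that those blocking the tallest
# remaining chains are dispatched first.
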
def jobsuccess(index, job):
    pkg = index[job]
    # Build succeeded
    for i in pkg['parents']:
        index[i]['deps'].remove(job)

    # deps/parents tree now partially inconsistent but this is
    # what we need to avoid counting the height of the entry
    # we are about to remove (which would make it a NOP)
    heightup(index, job)
    del index[job]

def jobfailure(index, job):
    # Build failed
    deleteup(index, job)

class worker(threading.Thread):

    lock = threading.Lock()
    # List of running threads
    tlist = []
    # List of running jobs
    running = []
    # Used to signal dispatcher when we finish a job
    event = threading.Event()

    def __init__(self, mach, job, queue, arch, branch):
        threading.Thread.__init__(self)
        self.job = job
        self.mach = mach
        self.queue = queue
        self.arch = arch
        self.branch = branch

    def run(self):
        global index

        pkg = index[self.job]
        if len(pkg['deps']) != 0:
            print "Running job with non-empty deps: %s" % pkg
            assert(False)

        print "Running job %s" % (self.job)
        while True:
            # Tell pdispatch2 which dependency packages each stage needs
            retcode = subprocess.call(
                ["/usr/bin/env",
                 "FD=%s" % " ".join(["%s.tbz" % i for i in pkg['fdep']]),
                 "ED=%s" % " ".join(["%s.tbz" % i for i in pkg['edep']]),
                 "PD=%s" % " ".join(["%s.tbz" % i for i in pkg['pdep']]),
                 "BD=%s" % " ".join(["%s.tbz" % i for i in pkg['bdep']]),
                 "RD=%s" % " ".join(["%s.tbz" % i for i in pkg['rdep']]),
                 "/var/portbuild/scripts/pdispatch2",
                 self.mach, self.arch, self.branch,
                 "/var/portbuild/scripts/portbuild",
                 "%s.tbz" % self.job, pkg['path']])
            self.queue.release(self.mach)
            if retcode != 254:
                break

            # Failed to obtain job slot
            time.sleep(15)
            (self.mach, dummy) = self.queue.pick()
            print "Retrying on %s" % self.mach

        print "Finished job %s" % self.job,
        if retcode == 0:
            status = True
            print
        else:
            status = False
            print " with status %d" % retcode

        worker.lock.acquire()
        worker.running.remove(self.job)
        worker.tlist.remove(self)
        if status == True:
            jobsuccess(index, self.job)
        else:
            jobfailure(index, self.job)

        # Wake up dispatcher in case it was blocked
        worker.event.set()
        worker.event.clear()
        worker.lock.release()

    @staticmethod
    def dispatch(mach, job, queue, arch, branch):
        worker.lock.acquire()
        wrk = worker(mach, job, queue, arch, branch)
        worker.tlist.append(wrk)
        worker.lock.release()
        wrk.start()

class machqueue(object):
    path = ''
    fd = -1

    # fcntl locks are per-process, so the fcntl lock acquisition will
    # succeed if another thread already holds it.  We need the fcntl
    # lock for external visibility between processes but also need an
    # internal lock for protecting against our own threads.
    ilock = threading.Lock()

    def __init__(self, path):
        super(machqueue, self).__init__()
        self.path = path
        self.fd = os.open("%s.lock" % self.path, os.O_RDWR|os.O_CREAT)

        # print "Initializing with %s %d" % (self.path, self.fd)

    def lock(self):
        print "Locking...",
        # ret = fcntl.lockf(self.fd, fcntl.LOCK_EX)
        self.ilock.acquire()
        print "success"

    def unlock(self):
        print "Unlocking fd"
        self.ilock.release()
        # ret = fcntl.lockf(self.fd, fcntl.LOCK_UN)

    def poll(self):
        """ Return currently available machines """

        mfile = file(self.path + "../mlist", "r")
        mlist = mfile.readlines()
        mfile.close()
        mlist = [i.rstrip() for i in mlist] # Chop \n

        list = os.listdir(self.path)
        special = []
        machines = []
        for i in list:
            if i.startswith('.'):
                special.append(i)
            else:
                if i in mlist:
                    machines.append(i)
                else:
                    os.unlink(self.path + i)

        print "Found machines %s" % machines
        return (machines, special)

    def pick(self):
        """ Choose a random machine from the queue """

        min = 999
        while min == 999:
            while True:
                self.lock()
                (machines, special) = self.poll()
                if len(machines):
                    break
                else:
                    self.unlock()
                    time.sleep(15)
                    # XXX Use kqueue to monitor for changes

            list = []
            # XXX Choose as fraction of capacity
            for i in machines:
                f = file(self.path + i, "r")
                out = f.readline().rstrip()
                try:
                    load = int(out)
                except ValueError:
                    print "Bad value for %s: %s" % (i, out)
                    load = 999
                f.close()
                if load < min:
                    min = load
                    list=[]
                if load == min:
                    list.append(i)
            print "(%s, %d)" % (list, load)
            if min == 999:
                print "Bad queue length for %s" % list
                self.unlock()

        machine = choice(list)
        # XXX hook up config files
        if min == 2:
            # Queue full
            os.unlink(self.path + machine)
        else:
            f = file(self.path + machine, "w")
            f.write("%d\n" % (min + 1))
            f.flush()
            f.close()
        self.unlock()

        return (machine, special)

    def release(self, mach):
        self.lock()

        print "Releasing %s" % mach,
        if os.path.exists(self.path + mach):
            f = file(self.path + mach, "r+")
            out = f.readline().rstrip()
            try:
                load = int(out)
            except ValueError:
                print "Queue error on release of %s: %s" % (mach, out)
                load = 3 #XXX
        else:
            f = file(self.path + mach, "w")
            load = 3 #XXX

        # f.truncate(0)
        f.write("%d\n" % (load - 1))
        print "...now %d" % (load - 1)
        f.flush()
        f.close()
        self.unlock()

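# On-disk queue protocol (as used above): each available machine has a
# file in the queue directory, named after it, whose first line is its
# current job count.  pick() increments the count and unlinks the file
# when the machine fills up (count 2, hardcoded for now); release()
# decrements it.  The file is presumably recreated by the machine's
# external load-reporting tooling, which is what lets machines join and
# leave the cluster mid-build.
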
def main(arch, branch, args):
    global index

    basedir="/var/portbuild/"+arch+"/"+branch
    portsdir=basedir+"/ports"
    indexfile=portsdir+"/INDEX-"+branch

    indexfile="/var/portbuild/i386/7-exp/ports/INDEX-7"
    qlen = 100

    q = machqueue("/var/portbuild/%s/queue/" % arch)

    print "parseindex..."
    index=parseindex(indexfile)
    print "length = %s" % len(index)
    targets = gettargets(index, args)

    print "heightindex..."
    heightindex(index, targets)

    sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems()
                    if val["height"] is not None),
                   key=operator.itemgetter(1), reverse=True)
    wqueue = rank(index, selectheights(index, 1), (i[0] for i in sortd), qlen)

    # Main work loop
    while len(sortd) > 0:
        worker.lock.acquire()
        print "Remaining %s" % len(sortd)
        while len(wqueue) > 0:
            job = wqueue.pop(0)
            if os.path.exists("/var/portbuild/%s/%s/packages/All/%s.tbz"
                              % (arch, branch, job)):
                print "Skipping %s since it already exists" % job
                jobsuccess(index, job)
            else:
                worker.running.append(job) # Protect against a queue
                                           # rebalance adding this
                                           # back during build
                worker.lock.release()
                (machine, specials) = q.pick()
                worker.dispatch(machine, job, q, arch, branch)
                worker.lock.acquire()

        if len(wqueue) == 0:
            if len(sortd) == 0:
                # All jobs in progress; drop the lock so workers can
                # finish, then wait for children to exit
                worker.lock.release()
                break
            print "Rebalancing queue...",
            sortd = sorted(((key, val["height"]) for (key, val) in index.iteritems()
                            if val["height"] is not None),
                           key=operator.itemgetter(1), reverse=True)
            if len(sortd) == 0:
                worker.lock.release()
                break
            print sortd[0:3]
            if sortd[0][1] == 1:
                # Everything left is depth 1, no need to waste time
                # rebalancing further
                qlen = len(index)

            # Don't add too many deps at once (e.g. after we build a
            # package like gmake), or we will switch to building lots
            # of shallow packages
            ready = [i for i in selectheights(index, 1) if i not in worker.running]
            wqueue = rank(index, ready, (i[0] for i in sortd), qlen)[:2*qlen]
            print "now %s (%s ready)" % (wqueue, len(ready))
        worker.lock.release()

        if len(wqueue) == 0:
            # Ran out of work, wait for workers to free up some more
            print "No work to do, sleeping on workers"
            worker.event.wait()

    for i in worker.tlist:
        i.join()

    print "Finished"

if __name__ == "__main__":
    # from guppy import hpy; h = hpy()

    main(sys.argv[1], sys.argv[2], sys.argv[3:])

    # index = parseindex("/var/portbuild/i386/7-exp/ports/INDEX-7")
    # print index['gmake-3.81_2']
