8c7b5028b3
Submitted by: gonzo Approved by: myself Differential Revision: https://reviews.freebsd.org/D14550
234 lines
7.8 KiB
Python
Executable File
234 lines
7.8 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright (c) 2012 Sofian Brabez <sbz@FreeBSD.org>
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions
|
|
# are met:
|
|
# 1. Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer
|
|
# 2. Redistributions in binary form must reproduce the above copyright
|
|
# notice, this list of conditions and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
|
|
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
|
|
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
|
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
|
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
|
#
|
|
# $FreeBSD$
|
|
#
|
|
# MAINTAINER= sbz@FreeBSD.org
|
|
|
|
import argparse
|
|
import codecs
|
|
import os
|
|
import re
|
|
import ssl
|
|
import sys
|
|
if sys.version_info.major == 3:
|
|
import urllib.request as urllib2
|
|
else:
|
|
import urllib2
|
|
|
|
"""
|
|
FreeBSD getpatch handles Gnats and Bugzilla patch attachments
|
|
"""
|
|
|
|
|
|
def create_ssl_context(cafile):
|
|
if os.path.exists(cafile):
|
|
return ssl.create_default_context(cafile=cafile)
|
|
else:
|
|
return ssl._create_unverified_context()
|
|
|
|
|
|
class GetPatch(object):
|
|
|
|
def __init__(self, pr, category='ports'):
|
|
self.pr = pr
|
|
self.category = category
|
|
self.patchs = []
|
|
self.url = ""
|
|
self.patch = ""
|
|
self.output_stdout = False
|
|
self.default_locale = sys.getdefaultencoding()
|
|
self.ssl_context = create_ssl_context('/usr/local/etc/ssl/cert.pem')
|
|
|
|
def fetch(self, *largs, **kwargs):
|
|
raise NotImplementedError()
|
|
|
|
def write(self, filename, data):
|
|
if filename.endswith(('.patch', '.txt')):
|
|
filename = "{}.diff".format(filename[:filename.rindex('.')])
|
|
f = codecs.open(filename, encoding=self.default_locale, mode='w')
|
|
f.write(data.decode(self.default_locale))
|
|
f.close()
|
|
self.out("[+] {} created".format(filename))
|
|
|
|
def get(self, only_last=False, output_stdout=False):
|
|
self.output_stdout = output_stdout
|
|
self.fetch(self.pr, category=self.category)
|
|
|
|
if len(self.patchs) == 0:
|
|
self.out("[-] No patch found")
|
|
sys.exit(os.EX_UNAVAILABLE)
|
|
|
|
if only_last:
|
|
self.patchs = [self.patchs.pop()]
|
|
|
|
for patch in self.patchs:
|
|
url = patch['url']
|
|
p = patch['name']
|
|
|
|
data = urllib2.urlopen(url, context=self.ssl_context).read()
|
|
|
|
if self.output_stdout:
|
|
sys.stdout.write(data.decode(self.default_locale))
|
|
else:
|
|
self.write(p, data)
|
|
|
|
def add_patch(self, url, name):
|
|
self.patchs.append({'url': url, 'name': name})
|
|
|
|
def out(self, s):
|
|
if not self.output_stdout:
|
|
print(s)
|
|
|
|
|
|
class GnatsGetPatch(GetPatch):
|
|
|
|
URL_BASE = 'https://www.freebsd.org/cgi'
|
|
URL = '{}/query-pr.cgi?pr='.format(URL_BASE)
|
|
REGEX = r'<b>Download <a href="([^"]*)">([^<]*)</a>'
|
|
|
|
def __init__(self, pr, category):
|
|
GetPatch.__init__(self, pr, category)
|
|
|
|
def fetch(self, *largs, **kwargs):
|
|
category = kwargs['category']
|
|
target = ("{}/{}".format(category, self.pr),
|
|
"{}".format(self.pr))[category == '']
|
|
self.out("[+] Fetching patch for pr {}".format(target))
|
|
pattern = re.compile(self.REGEX)
|
|
u = urllib2.urlopen("{}{}".format(self.URL, target),
|
|
context=self.ssl_context)
|
|
data = u.read()
|
|
if data is None:
|
|
self.out("[-] No patch found")
|
|
sys.exit(os.EX_UNAVAILABLE)
|
|
|
|
for patchs in re.findall(pattern, str(data)):
|
|
self.add_patch(patchs[0], patchs[1])
|
|
|
|
|
|
class BzGetPatch(GetPatch):
|
|
|
|
URL_BASE = 'https://bugs.freebsd.org/bugzilla/'
|
|
URL_SHOW = '{}/show_bug.cgi?id='.format(URL_BASE)
|
|
REGEX_ATTACHMENTS_TABLE = r'<table id="attachment_table">(.*?)</table>'
|
|
REGEX_ATTACHMENT_TR = r'(<tr id="a\d+"[^<]+>.*?</tr>)'
|
|
REGEX_URL = r'<a href="([^<]+)">Details</a>'
|
|
REGEX = r'<div class="details">([^ ]+) \(text/plain(?:; charset=[-\w]+)?\)'
|
|
|
|
def __init__(self, pr, category):
|
|
GetPatch.__init__(self, pr, category)
|
|
|
|
def _get_patch_name(self, url):
|
|
data = urllib2.urlopen(url, context=self.ssl_context).read()
|
|
match = re.search(self.REGEX, str(data))
|
|
if match is None:
|
|
return None
|
|
return match.group(1)
|
|
|
|
def _get_patch_url(self, data):
|
|
for url in re.findall(self.REGEX_URL, str(data)):
|
|
url = '{}{}'.format(self.URL_BASE, url)
|
|
file_name = self._get_patch_name(url)
|
|
if file_name is None:
|
|
msg = "[-] Could not determine the patch file name in {}." \
|
|
"Skipping."
|
|
self.out(msg.format(url))
|
|
continue
|
|
download_url = url[:url.find('&')]
|
|
return download_url, file_name
|
|
|
|
def _get_patch_urls(self, data):
|
|
patch_urls = {}
|
|
match = re.search(self.REGEX_ATTACHMENTS_TABLE, str(data), re.DOTALL)
|
|
if match is None:
|
|
return patch_urls
|
|
table = match.group(1)
|
|
for tr in re.findall(self.REGEX_ATTACHMENT_TR, str(data), re.DOTALL):
|
|
if (tr.find('bz_tr_obsolete') >= 0):
|
|
continue
|
|
download_url, file_name = self._get_patch_url(tr)
|
|
patch_urls[download_url] = file_name
|
|
|
|
return patch_urls
|
|
|
|
def fetch(self, *largs, **kwargs):
|
|
category = kwargs['category']
|
|
target = ("{}/{}".format(category, self.pr),
|
|
"{}".format(self.pr))[category == '']
|
|
self.out("[+] Fetching patch for pr {}".format(target))
|
|
u = urllib2.urlopen("{}{}".format(self.URL_SHOW, self.pr),
|
|
context=self.ssl_context)
|
|
data = u.read()
|
|
|
|
if data is None:
|
|
self.out("[-] No patch found")
|
|
sys.exit(os.EX_UNAVAILABLE)
|
|
|
|
patch_urls = self._get_patch_urls(data)
|
|
if not patch_urls:
|
|
self.out("[-] No patch found")
|
|
sys.exit(os.EX_UNAVAILABLE)
|
|
|
|
for url, file_name in patch_urls.items():
|
|
self.add_patch(url, file_name)
|
|
|
|
|
|
def main():
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description='Gets patch attachments from a Bug Tracking System'
|
|
)
|
|
parser.add_argument('pr', metavar='pr', type=str, nargs=1,
|
|
help='Pr id number')
|
|
parser.add_argument('--mode', type=str, choices=['gnats', 'bz'],
|
|
default='bz', help='available modes to retrieve patch')
|
|
parser.add_argument('--last', action='store_true',
|
|
help='only retrieve the latest iteration of a patch')
|
|
parser.add_argument('--stdout', action='store_true',
|
|
help='dump patch on stdout')
|
|
|
|
if len(sys.argv) == 1:
|
|
parser.print_help()
|
|
sys.exit(os.EX_USAGE)
|
|
|
|
args = parser.parse_args()
|
|
|
|
category = ""
|
|
pr = str(args.pr[0])
|
|
|
|
if pr and '/' in pr:
|
|
category, pr = pr.split('/')
|
|
|
|
Clazz = globals()['%sGetPatch' % args.mode.capitalize()]
|
|
gp = Clazz(pr, category)
|
|
gp.get(only_last=args.last, output_stdout=args.stdout)
|
|
|
|
return os.EX_OK
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|