blob: d69dbdf5f29fe5a27c2fa5fceaa576538b8d57f3 [file] [log] [blame]
# Copyright 2008 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Project related views and functions.
This requires Django 0.97.pre.
"""
# Python imports
import os
import cgi
import random
import re
import logging
import binascii
import datetime
import urllib
import md5
from xml.etree import ElementTree
from cStringIO import StringIO
# AppEngine imports
from google.appengine.api import mail
from google.appengine.api import memcache
from google.appengine.api import users
from google.appengine.api import urlfetch
from google.appengine.ext import db
from google.appengine.ext.db import djangoforms
# Django imports
# TODO(guido): Don't import classes/functions directly.
from django import http
from django import forms
from django.http import HttpResponse, HttpResponseRedirect
from django.http import HttpResponseForbidden, HttpResponseNotFound
from django.shortcuts import render_to_response
from django.utils import simplejson
from django.forms import formsets
# Local imports
import models
import engine
import library
import patching
import fields
import views
from view_util import *
## Project CRUD ##
def _assert_project_and_owner(request, project):
"If the current user is not an owner of this project or an admin, give a 404."
if not project:
raise http.Http404()
if not request.user or not ((project.key() in [
p.key() for p in request.projects_owned_by_user])
or request.user_is_admin):
raise http.Http404()
@project_owner_or_admin_required
def project_list(request):
"""/admin/projects - list of all projects"""
if request.user_is_admin:
projects = models.Project.get_all_projects()
else:
projects = models.Project.projects_owned_by_user(request.user)
logging.info("project_list projects=" + str(projects))
if not projects:
raise http.Http404()
return respond(request, 'admin_projects.html', {'projects': projects})
def _keys_for(values):
return [x.key() for x in values]
def _users_to_accounts(users):
return map(models.Account.get_account_for_user, users)
def _combine_users_and_groups(users, groupKeys):
return models.AccountGroup.get(groupKeys) + _users_to_accounts(users)
def _field_to_approval_right(cleaned):
result = models.ApprovalRight()
result.files = cleaned["files"]
(result.approvers_users,result.approvers_groups
) = fields.UserGroupField.get_user_and_group_keys(cleaned["approvers"])
(result.verifiers_users,result.verifiers_groups
) = fields.UserGroupField.get_user_and_group_keys(cleaned["verifiers"])
result.put()
return result
def _approval_right_to_field(key):
val = models.ApprovalRight.get(key)
if not val:
raise KeyError("no ApprovalRight for key: " + str(key))
return {
'files': val.files,
'approvers': _combine_users_and_groups(val.approvers_users,
val.approvers_groups),
'verifiers': _combine_users_and_groups(val.verifiers_users,
val.verifiers_groups),
}
class AdminProjectForm(BaseForm):
_template = 'admin_project.html'
name = forms.CharField(max_length=30)
comment = forms.CharField(required=False,
max_length=10000,
widget=forms.Textarea(attrs={'cols': 60}))
owners = fields.UserGroupField()
code_reviews = fields.ApproversField()
@classmethod
def _init(cls, project):
owners = fields.UserGroupField.field_value_for_keys(
project.owners_users,
project.owners_groups),
return {'initial': {'name': project.name,
'comment': project.comment,
'owners': owners,
'code_reviews': map(_approval_right_to_field,
project.code_reviews),
}}
def _save(self, cd, project):
new_name = cd['name'].strip()
# TODO(joeo): Big race condition here.
#
in_use = models.Project.get_project_for_name(new_name)
if in_use and project.key() != in_use.key():
self.errors['name'] = ['Name is already in use']
if self.is_valid():
project.name = new_name
project.comment = cd['comment']
project.owners_users, project.owners_groups = \
fields.UserGroupField.get_user_and_group_keys(cd['owners'])
project.set_code_reviews(_keys_for(map(_field_to_approval_right,
cd['code_reviews'])))
project.put()
def project_edit(request, name):
"""/admin/project/project - edit this project"""
project = models.Project.get_project_for_name(name)
_assert_project_and_owner(request, project)
def done():
return HttpResponseRedirect('/admin/projects')
return process_form(request, AdminProjectForm, project, done,
{'project':project,
'del_url': '/admin/project_delete/%s' % project.name,
})
class AdminNewProjectForm(AdminProjectForm):
@classmethod
def _init(cls, state):
return {'initial': {'name': '', 'comment': ''}}
def _save(self, cd, state):
name = cd['name'].strip()
# TODO(joeo): Big race condition here.
#
if models.Project.get_project_for_name(name):
self.errors['name'] = ['Name is already in use']
if self.is_valid():
owners_users = fields.UserGroupField.get_users(cd['owners'])
owners_groups = fields.UserGroupField.get_group_keys(cd['owners'])
project = models.Project(name = name,
comment = cd['comment'],
owners_users = owners_users,
owners_groups = owners_groups)
project.set_code_reviews(_keys_for(map(_field_to_approval_right,
cd['code_reviews'])))
project.put()
@admin_required
def project_new(request):
"""/admin/project/GROUP - add a new project"""
def done():
return HttpResponseRedirect('/admin/projects')
return process_form(request, AdminNewProjectForm, [], done)
@gae_admin_required
@xsrf_required
def project_delete(request, name):
"""/admin/project_delete/GROUP - add a new project"""
project = models.Project.get_project_for_name(name)
assert project
project.remove()
return HttpResponseRedirect('/admin/projects')
def _matches_file_pattern(pattern, files):
"""Returns whether the list of files matches the pattern"""
for filename in files:
if regex.match(filename):
return True
return False
def _is_leaf_pattern(pattern):
"""Returns whether the pattern ends with ..."""
return not pattern.endswith("...")
def _split_rules_for_review(project):
"""Gets the approval file pattern rules for a project
Returns:
A tuple of:
0 - A list of tuples of:
0 - leaf pattern rules (rules of the form /.../xxx)
1 - the ApprovalRight object
1 - A list of tuples of:
0 - directory pattern rules (rules of the form /xxx/...)
1 - the ApprovalRight object
"""
leaf_rules = []
dir_rules = []
for approval_right in project.get_code_reviews():
for pattern in approval_right.files:
if _is_leaf_pattern(pattern):
leaf_rules.append((pattern, approval_right))
else:
dir_rules.append((pattern, approval_right))
return (leaf_rules, dir_rules)
def _file_pattern_to_regex(pattern):
if pattern.startswith('/'):
pattern = pattern[1:]
return re.compile('^%s$' % pattern.replace("...", ".*?"))
def _convert_pattern_to_regex(rules):
return [(_file_pattern_to_regex(pattern),pattern,approval_right)
for (pattern,approval_right) in rules]
def _flatten_rule_users(rules):
flattened = {}
def _flatten_approval_right(approval_right):
if not flattened.has_key(approval_right):
flattened[approval_right] = {
'required': approval_right.required,
'approvers': set([u.email() for u in approval_right.approvers()]),
'verifiers': set([u.email() for u in approval_right.verifiers()]),
}
return flattened[approval_right]
return [(regex,pattern,_flatten_approval_right(approval_right))
for (regex,pattern,approval_right) in rules]
def _split_files_for_review(project, files):
"""Return a mapping of files to the set of users that can approve or verify
that file.
Returns:
A tuple of booleans, corresponding to (can_approve, can_verify).
"""
result = {}
(leaf_rules, dir_rules) = _split_rules_for_review(project)
leaf_rules = _convert_pattern_to_regex(leaf_rules)
dir_rules = _convert_pattern_to_regex(dir_rules)
leaf_rules = _flatten_rule_users(leaf_rules)
dir_rules = _flatten_rule_users(dir_rules)
for file in files:
approvers = []
verifiers = []
# check leaf rules -- we want all of those that match
for (regex,pattern,flat_approval_right) in leaf_rules:
if _is_leaf_pattern(pattern):
if regex.match(file):
user_set = flat_approval_right['approvers']
if not user_set in approvers:
approvers.append(user_set)
user_set = flat_approval_right['verifiers']
if not user_set in verifiers:
verifiers.append(user_set)
# check dir rules -- we want all of those that match
dir_rule_approvers = set()
dir_rule_verifiers = set()
for (regex,pattern,flat_approval_right) in reversed(dir_rules):
if not _is_leaf_pattern(pattern):
if regex.match(file):
dir_rule_approvers |= flat_approval_right['approvers']
dir_rule_verifiers |= flat_approval_right['verifiers']
if flat_approval_right['required']:
break
if not dir_rule_approvers in approvers:
approvers.append(dir_rule_approvers)
if not dir_rule_verifiers in verifiers:
verifiers.append(dir_rule_verifiers)
# save the result
result[file] = {
'approvers': approvers,
'verifiers': verifiers,
}
return result
def _check_users(possible, actual):
"""Return whether for each set in possible there is one entry from actual """
for user_set in possible:
inter = user_set.intersection(actual)
if len(user_set.intersection(actual)) == 0:
return False
return True
def _match_users(possible, actual):
"""Return the users who have actually approved/verified a change"""
matched = set()
for user_set in possible:
inter = user_set.intersection(actual)
matched |= inter
return matched
def ready_to_submit(branch, owner, reviewer_status, files):
"""Returns whether the supplied change is ready to submit.
These are the rules for whether a change is considered ready to submit:
1. If the project, repository or branch has code reviews turned off, then
the change is ok.
2. If all of the following are true, the change is ok:
a. Either:
i. Someone who is authorized to lgtm each file has done so.
ii. For each file where the owner is the only approver in the list,
there is at least one other person that has given a positive
score.
b. No one who is authorized to say no has done so.
If 'owner' can lgtm or verify a change, she is added to the respective list.
Args:
branch: The branch that this change is in (a Branch object).
owner: The user that owns the change. (a User object).
reviewer_status: A map as returned by Change.get_reviewer_status()
files: A list of the files that are affected.
Returns:
A tuple containing:
0 - whether the change is ready to submit over all,
1 - whether it has been approved,
2 - whether it has been denied
3 - whether it has been verified.
4 - A tuple of
0 - whether the owner has lgtm rights
1 - whether the owner has verification rights
"""
if branch is None:
return (False, False, False, False, (False, False))
project = branch.project
if not project.is_code_reviewed():
return (True, True, False, True, (False, False))
if not branch.is_code_reviewed():
return (True, True, False, True, (False, False))
approved_cnt = 0
verified_cnt = 0
denied_cnt = 0
owner_auto_lgtm = False
owner_auto_verify = False
lgtm_emails = set([u.email() for u in reviewer_status['lgtm']])
reject_emails = set([u.email() for u in reviewer_status['reject']])
verified_by_emails = set([u.email() for u in reviewer_status['verified_by']])
owner_email = owner.email()
files_to_approve = _split_files_for_review(project, files)
if files_to_approve:
for (file,user_sets) in files_to_approve.iteritems():
# check the real approvals - the owner is checked in this list
if _check_users(user_sets['approvers'], lgtm_emails):
approved_cnt += 1
if _check_users(user_sets['approvers'], reject_emails):
denied_cnt += 1
if _check_users(user_sets['verifiers'], verified_by_emails):
verified_cnt += 1
# check the owner auto-approve
if _check_users(user_sets['approvers'], [owner_email]):
owner_auto_lgtm = True
if _check_users(user_sets['verifiers'], [owner_email]):
owner_auto_verify = True
require_review = False
approved = ((approved_cnt == len(files_to_approve))
or (owner_auto_lgtm
and ((not require_review)
or (len(reviewer_status['yes']) > 0
or len(reviewer_status['lgtm']) > 0))))
verified = verified_cnt == len(files_to_approve) or owner_auto_verify
denied = denied_cnt > 0
else:
approved = False
verified = False
denied = False
real_approvers = _match_users(user_sets['approvers'], lgtm_emails)
real_deniers = _match_users(user_sets['approvers'], reject_emails)
real_verifiers = _match_users(user_sets['verifiers'], verified_by_emails)
return {
'can_submit': approved and verified and (not denied),
'approved': approved,
'denied': denied,
'verified': verified,
'owner_auto_lgtm': owner_auto_lgtm,
'owner_auto_verify': owner_auto_verify,
'real_approvers': real_approvers,
'real_deniers': real_deniers,
'real_verifiers': real_verifiers,
}
def user_can_approve(user, change):
"""Returns whether the user can approve or verify this change.
Args:
user: The user to test (a User object)
branch: The branch that this change is in (a Branch object)
files: A list of files in question
Returns:
A map of 'approve' and 'verify' to booleans of whether the user
can approve or verify at least one of these files.
"""
# pick the files
patchsets = list(change.patchset_set.order('id'))
if not patchsets:
return (False, False)
files = patchsets[-1].filenames
branch = change.dest_branch
project = branch.project
if not project.is_code_reviewed():
return (True, True)
files_to_approve = _split_files_for_review(project, files)
if not files_to_approve:
return (False, False)
email = user.email()
can_approve = False
can_verify = False
for (file,user_sets) in files_to_approve.iteritems():
for user_set in user_sets["approvers"]:
if email in user_set:
can_approve = True
for user_set in user_sets["verifiers"]:
if email in user_set:
can_verify = True
if can_approve and can_verify:
# short circuit
return (True, True)
return (can_approve, can_verify)