blob: 7cd26cb7cde9bf40cccd02f246bd510c1fef2a23 [file] [log] [blame]
# Copyright 2008 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
from google.appengine.ext import db
from codereview.models import gql
from codereview.git_models import *
from bundle_store_pb2 import BundleStoreService
from next_received_bundle_pb2 import *
from update_received_bundle_pb2 import *
from prune_bundles_pb2 import *
from util import InternalAPI, u, commit_to_revision, automatic_retry
MAX_PRUNE = 10
MAX_SEGS_PER_PRUNE = 10
class BundleStoreServiceImp(BundleStoreService, InternalAPI):
@automatic_retry
def NextReceivedBundle(self, rpc_controller, req, done):
rsp = NextReceivedBundleResponse()
rb = ReceivedBundle.lock_next_new()
if rb:
rsp.status_code = NextReceivedBundleResponse.BUNDLE_AVAILABLE
rsp.bundle_key = str(rb.key())
rsp.dest_project = str(rb.dest_project.name)
rsp.dest_project_key = str(rb.dest_project.key())
rsp.dest_branch_key = str(rb.dest_branch.key())
rsp.owner = str(rb.owner.email())
rsp.n_segments = rb.n_segments
seg = rb.get_segment(1)
if seg:
rsp.bundle_data = seg.bundle_data
else:
rsp.status_code = NextReceivedBundleResponse.QUEUE_EMPTY
done(rsp)
@automatic_retry
def BundleSegment(self, rpc_controller, req, done):
rsp = BundleSegmentResponse()
rb = db.get(db.Key(req.bundle_key))
if not rb:
rsp.status_code = BundleSegmentResponse.UNKNOWN_BUNDLE
done(rsp)
return
seg = rb.get_segment(req.segment_id)
if seg:
rsp.status_code = BundleSegmentResponse.DATA
rsp.bundle_data = seg.bundle_data
else:
rsp.status_code = BundleSegmentResponse.UNKNOWN_SEGMENT
done(rsp)
@automatic_retry
def UpdateReceivedBundle(self, rpc_controller, req, done):
rsp = UpdateReceivedBundleResponse()
old_state = ReceivedBundle.STATE_UNPACKING
sc = req.status_code
if UpdateReceivedBundleRequest.UNPACKED_OK == sc:
new_state = ReceivedBundle.STATE_UNPACKED
err_msg = None
elif UpdateReceivedBundleRequest.SUSPEND_BUNDLE == sc:
new_state = ReceivedBundle.STATE_SUSPENDED
err_msg = req.error_details
else:
new_state = ReceivedBundle.STATE_INVALID
err_msg = req.error_details
try:
ReceivedBundle.update_state(req.bundle_key, old_state, new_state, err_msg)
rsp.status_code = UpdateReceivedBundleResponse.UPDATED
except InvalidBundleId, err:
logging.warn("Invalid bundle id %s: %s" % (req.bundle_key, err))
rsp.status_code = UpdateReceivedBundleResponse.INVALID_BUNDLE
except InvalidBundleState, err:
logging.warn("Invalid bundle state %s: %s" % (req.bundle_key, err))
rsp.status_code = UpdateReceivedBundleResponse.INVALID_STATE
done(rsp)
@automatic_retry
def PruneBundles(self, rpc_controller, req, done):
rsp = PruneBundlesResponse()
rb_list = []
to_rm = []
for m in [_AgedUploading,
_AgedInvalid,
_AgedSuspended,
_AgedUnpacking,
_AgedUnpacked,
]:
rb_list.extend(m())
if len(rb_list) >= MAX_PRUNE:
break
for rb in rb_list:
segs = gql(ReceivedBundleSegment,
'WHERE ANCESTOR IS :1',
rb).fetch(MAX_SEGS_PER_PRUNE)
if len(segs) < MAX_SEGS_PER_PRUNE:
to_rm.append(rb)
to_rm.extend(segs)
if to_rm:
db.delete(to_rm)
rsp.status_code = PruneBundlesResponse.BUNDLES_PRUNED
else:
rsp.status_code = PruneBundlesResponse.QUEUE_EMPTY
done(rsp)
def _AgedUploading():
aged = datetime.datetime.now() - datetime.timedelta(days=7)
return gql(ReceivedBundle,
'WHERE state = :1 AND created <= :2',
ReceivedBundle.STATE_UPLOADING, aged).fetch(MAX_PRUNE)
def _AgedInvalid():
aged = datetime.datetime.now() - datetime.timedelta(days=2)
return gql(ReceivedBundle,
'WHERE state = :1 AND created <= :2',
ReceivedBundle.STATE_INVALID, aged).fetch(MAX_PRUNE)
def _AgedSuspended():
aged = datetime.datetime.now() - datetime.timedelta(days=7)
return gql(ReceivedBundle,
'WHERE state = :1 AND created <= :2',
ReceivedBundle.STATE_SUSPENDED, aged).fetch(MAX_PRUNE)
def _AgedUnpacking():
aged = datetime.datetime.now() - datetime.timedelta(days=7)
return gql(ReceivedBundle,
'WHERE state = :1 AND created <= :2',
ReceivedBundle.STATE_UNPACKING, aged).fetch(MAX_PRUNE)
def _AgedUnpacked():
aged = datetime.datetime.now() - datetime.timedelta(hours=1)
return gql(ReceivedBundle,
'WHERE state = :1 AND created <= :2',
ReceivedBundle.STATE_UNPACKED, aged).fetch(MAX_PRUNE)