#!/usr/bin/env python3
#
# Copyright 2014 Rackspace Australia
# Copyright 2018-2019 Red Hat, Inc
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
"""
Utility to upload files to google
Run this from the CLI from the zuul/jobs/roles directory with:
python -m gcs-upload.library.gcs_upload
"""

import argparse
import datetime
import json
import logging
import mimetypes
import os
import sys
import threading
import time
try:
    import queue as queuelib
except ImportError:
    # Fall back to the Python 2 name.
    import Queue as queuelib

from google.cloud import storage
import google.auth.compute_engine.credentials as gce_cred

from ansible.module_utils.basic import AnsibleModule

# Give up after this many attempts to upload a file.
POST_ATTEMPTS = 3
# Cap on the number of concurrent upload threads.
MAX_UPLOAD_THREADS = 24


def retry_function(func):
    """Call func, retrying with a growing delay on any exception.

    Re-raises the last exception once POST_ATTEMPTS is exhausted.
    """
    for attempt in range(1, POST_ATTEMPTS + 1):
        try:
            return func()
        except Exception:
            if attempt >= POST_ATTEMPTS:
                raise
            else:
                logging.exception("Error on attempt %d", attempt)
                time.sleep(attempt * 10)
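
# For instance (hypothetical call), any flaky network operation can be
# wrapped in a lambda:
#
#   retry_function(lambda: blob.upload_from_filename('/tmp/logs/foo.txt'))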


class Credentials(gce_cred.Credentials):
    """GCE-style credentials whose token is read from a file on disk.

    The file is expected to be refreshed externally; refresh() simply
    re-reads it.
    """

    def _set_path(self, path):
        """Call this after initialization"""
        self._path = path
        self.refresh(None)

    def refresh(self, request):
        with open(self._path) as f:
            data = json.loads(f.read())
        self.token = data['access_token']
        self.expiry = (datetime.datetime.utcnow() +
                       datetime.timedelta(seconds=data['expires_in']))

    def with_scopes(self, *args, **kw):
        ret = super(Credentials, self).with_scopes(*args, **kw)
        ret._set_path(self._path)
        return ret
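
# The token file is assumed to contain JSON with at least the two fields
# refresh() reads (values illustrative):
#
#   {"access_token": "<token>", "expires_in": 3600}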


class Uploader():
    def __init__(self, client, container, prefix, cache_control):
        self.client = client
        self.prefix = prefix or ''
        if self.prefix and not self.prefix.endswith('/'):
            self.prefix += '/'
        self.cache_control = cache_control
        self.bucket = client.bucket(container)

    def upload(self, file_list):
        """Spin up thread pool to upload to storage"""
        num_threads = min(len(file_list), MAX_UPLOAD_THREADS)
        threads = []
        queue = queuelib.Queue()
        # add items to queue
        for f in file_list:
            queue.put(f)

        for x in range(num_threads):
            t = threading.Thread(target=self.post_thread, args=(queue,))
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

    def post_thread(self, queue):
        while True:
            try:
                file_detail = queue.get_nowait()
                logging.debug("%s: processing job %s",
                              threading.current_thread(),
                              file_detail)
                retry_function(lambda: self._post_file(file_detail))
            except IOError:
                # Do our best to attempt to upload all the files
                logging.exception("Error opening file")
                continue
            except queuelib.Empty:
                # No more work to do
                return

    def _post_file(self, file_detail):
        full_path, relative_path = file_detail
        relative_path = self.prefix + relative_path
        blob = self.bucket.blob(relative_path)
        if self.cache_control:
            blob.cache_control = self.cache_control
        mime_guess, encoding = mimetypes.guess_type(full_path)
        mimetype = mime_guess if mime_guess else 'application/octet-stream'
        # Open with a context manager so the file handle is closed even if
        # the upload raises.
        with open(full_path, 'rb') as data:
            blob.upload_from_file(data, content_type=mimetype)
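
# A minimal sketch of driving Uploader directly (illustrative names;
# normally run() below builds the file list from a directory walk):
#
#   client = storage.Client()
#   uploader = Uploader(client, 'my-bucket', 'builds/123', None)
#   uploader.upload([('/tmp/logs/job-output.txt', 'job-output.txt')])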


def run(container, prefix, root, credentials_file=None, project=None,
        cache_control=None):
    """Walk root and upload everything under it to the container.

    Returns the list of (full_path, relative_path) tuples queued for upload.
    """
    if credentials_file:
        cred = Credentials()
        cred._set_path(credentials_file)
        client = storage.Client(credentials=cred, project=project)
    else:
        client = storage.Client()

    # Build the list of files to hand to the uploader threads.
    file_list = []
    if root.endswith('/'):
        root = root[:-1]
    for path, folders, files in os.walk(root):
        for filename in files:
            full_path = os.path.join(path, filename)
            relative_path = full_path[len(root)+1:]
            file_list.append((full_path, relative_path))

    uploader = Uploader(client, container, prefix, cache_control)
    uploader.upload(file_list)
    return file_list
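
# For example (hypothetical values), uploading a local log tree from Python:
#
#   run('my-bucket', 'builds/123', '/tmp/logs',
#       credentials_file='/var/lib/zuul/gcs-token.json',
#       project='my-project',
#       cache_control='public, max-age=3600')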


def ansible_main():
    module = AnsibleModule(
        argument_spec=dict(
            container=dict(required=True, type='str'),
            root=dict(required=True, type='str'),
            prefix=dict(type='str'),
            credentials_file=dict(type='str'),
            project=dict(type='str'),
            cache_control=dict(type='str'),
        )
    )

    p = module.params
    file_list = run(p.get('container'), p.get('prefix'), p.get('root'),
                    credentials_file=p.get('credentials_file'),
                    project=p.get('project'),
                    cache_control=p.get('cache_control'))
    module.exit_json(changed=True, file_list=file_list)
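
# An illustrative task using this module (the module name depends on where
# this file is installed; assumed here to be library/gcs_upload.py):
#
#   - name: Upload logs to Google Cloud Storage
#     gcs_upload:
#       container: my-bucket
#       root: "{{ log_root }}"
#       prefix: "builds/{{ build_id }}"
#       cache_control: "public, max-age=3600"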


def cli_main():
    parser = argparse.ArgumentParser(
        description="Upload files to Google Cloud Storage"
    )
    parser.add_argument('--verbose', action='store_true',
                        help='show debug information')
    parser.add_argument('--credentials-file',
                        help='A file with Google Cloud credentials')
    parser.add_argument('--project',
                        help='Name of the Google Cloud project (required '
                             'for credentials file)')
    parser.add_argument('container',
                        help='Name of the container to use when uploading')
    parser.add_argument('prefix',
                        help='The prefix under the container root')
    parser.add_argument('cache_control',
                        help='The cache-control header to set')
    parser.add_argument('root',
                        help='The root of the directory to upload')
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
        logging.captureWarnings(True)

    run(args.container, args.prefix, args.root,
        credentials_file=args.credentials_file,
        project=args.project,
        cache_control=args.cache_control)


if __name__ == '__main__':
    # stdin is a TTY when a human runs this directly; Ansible runs the
    # module non-interactively, so dispatch accordingly.
    if sys.stdin.isatty():
        cli_main()
    else:
        ansible_main()