import datetime
import email
import email.policy
import email.utils
import functools
import glob
import hashlib
import html
import os
import pathlib
import re
import stat
import subprocess

import bottle
import magic
import pygit2
import pytz
import timeago
from bottle import jinja2_template as template, request, response
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexers import guess_lexer, guess_lexer_for_filename
###############################################################################
# SETTINGS
###############################################################################
# Root folder where Gitolite stores the repositories. The routes below resolve
# repository names relative to this directory.
GITOLITE_REPOSITORIES_ROOT = '/home/git/repositories'
# Path of the gitolite-shell executable and the home directory passed to it.
# These are only used for anonymous clones over HTTP(S).
GITOLITE_SHELL = '/home/git/bin/gitolite-shell'
GITOLITE_HTTP_HOME = '/home/git'
# The domain of this instance. It is used to build mailing list addresses and
# wherever a page needs to display the domain.
INSTANCE_DOMAIN = 'domain.local'
###############################################################################
# UTILITY FUNCTIONS
# This is code that is reused several times within the Bottle controllers
# below, so it's been grouped into functions.
###############################################################################
def list_repositories(root=None):
    """
    Scan a directory tree for Git repositories and return their relative paths.

    A directory counts as a repository when its path ends with ".git". The
    top-level "gitolite-admin.git" repository is never listed because it is the
    administration repository of the instance.

    :param root: Directory to scan. Defaults to GITOLITE_REPOSITORIES_ROOT.
    :return: Sorted list of repository paths relative to the scanned root.
    """
    if root is None:
        root = GITOLITE_REPOSITORIES_ROOT

    repositories = []

    # When topdown is True, the caller can modify the dirnames list in-place and
    # walk() will only recurse into the subdirectories whose names remain in it;
    # this is used to stop descending once a repository has been found.
    # https://docs.python.org/3.12/library/os.html#os.walk
    for path, dirs, _files in os.walk(root, topdown=True):
        if not path.endswith('.git'):
            continue

        # This path is a git repo: no need to recurse any further
        dirs.clear()

        repository = os.path.relpath(path, root)

        # DO NOT LIST gitolite-admin repository!
        # This is the administration repository of this instance!
        if repository.lower() == 'gitolite-admin.git':
            continue

        repositories.append(repository)

    repositories.sort()
    return repositories
def parse_thread_tags(data):
    """
    Parse the "tags" file of a mailing list thread.

    The file holds one "key=value" pair per line and a key may appear on several
    lines. Lines without "=" (blank lines included) are skipped instead of
    raising, so a single malformed line does not discard the whole file.

    :param data: Text content of the tags file.
    :return: Dict mapping each key to the list of its values, in file order.
    """
    tags = {}

    for line in data.splitlines():
        key, separator, value = line.partition('=')
        if not separator:
            continue
        tags.setdefault(key.strip(), []).append(value.strip())

    return tags
###############################################################################
# WEB APP
# Here below are all the Bottle routes and controllers.
###############################################################################
# Refuse to start when the configured repositories root does not exist.
if not os.path.isdir(GITOLITE_REPOSITORIES_ROOT):
    print('Invalid repositories path: {}'.format(GITOLITE_REPOSITORIES_ROOT))
    # exit() is a helper injected by the "site" module for interactive sessions
    # and is not guaranteed to exist (e.g. under "python -S"); raising SystemExit
    # has the same effect and always works.
    raise SystemExit(4)

# This only exists for exporting the bottle app object for a WSGI server such as Gunicorn
application = bottle.app()

# Directories to search for HTML templates
bottle.TEMPLATE_PATH = [ './templates' ]
def human_size(bytes, B=False):
    """
    Convert a file size in bytes to a human friendly form.
    This is only used in templates when showing file sizes.

    :param bytes: Size in bytes.
    :param B: When True, sizes below 1024 get a "B" suffix instead of none.
    :return: Rounded size followed by its unit, right-justified to 5 characters.
    """
    units = [ 'B' if B else '', 'K', 'M', 'G', 'T', 'P' ]

    # "P" is the largest unit available: anything that does not fit a smaller
    # unit is expressed in it, without dividing a further time.
    unit = units[-1]
    for candidate in units[:-1]:
        if bytes < 1024:
            unit = candidate
            break
        bytes = bytes / 1024

    return '{}{}'.format(round(bytes), unit).rjust(5)
def humanct(commit_time, commit_time_offset = 0):
    """
    Format a commit timestamp as a UTC "YYYY-MM-DD HH:MM:SS" string.
    This is exposed to the templates as the "commit_time" global.

    :param commit_time: Commit time, in seconds since the Unix epoch.
    :param commit_time_offset: UTC offset of the commit, in minutes. Kept for
        backward compatibility: the result is always expressed in UTC, so the
        offset cannot change it.
    :return: The formatted UTC date and time.
    """
    # Building the datetime directly in UTC gives the same result as creating it
    # in the commit's timezone and converting afterwards, and it cannot fail on
    # offsets that datetime.timezone() rejects (24 hours or more).
    moment = datetime.datetime.fromtimestamp(commit_time, datetime.timezone.utc)
    return moment.strftime('%Y-%m-%d %H:%M:%S')
# Wrap the jinja2 template function so that every call passes the custom filters
# and globals below to the template engine, with autoescaping turned on.
template = functools.partial(template, template_settings = {
    'filters': {
        # Provided by the "timeago" package
        'ago': timeago.format,
        # Parse a date string and format it for display.
        # NOTE(review): this lambda references "dateutil", which is not imported at
        # the top of this file, so the filter looks like it raises NameError the
        # first time a template uses it - confirm and add the missing import.
        'datetime': lambda date: dateutil.parser.parse(date).strftime('%b %-d, %Y - %H:%M%z%Z'),
        # Convert a file's mode to a string of the form '-rwxrwxrwx'
        'filemode': stat.filemode,
        # Human-friendly file size:
        'human_size': human_size,
    },
    'globals': {
        # Format a commit timestamp (see humanct)
        'commit_time': humanct,
        'instance_domain': INSTANCE_DOMAIN,
        # Current time as a timezone-aware UTC datetime
        'now': lambda: datetime.datetime.now(datetime.timezone.utc),
        'request': request,
        # Build the URL of a named route
        'url': application.get_url,
    },
    'autoescape': True
})
@bottle.error(404)
def error404(error):
    """
    Custom 404 page.

    :param error: bottle.HTTPError given by Bottle when calling abort(404).
    :return: The error message, prefixed with "[404]".
    """
    # The page is served as HTML and, for unmatched routes, the message contains
    # the requested path: escape it so that markup in the URL is not reflected
    # back to the browser.
    return '[404] {}'.format(html.escape(str(error.body)))
@bottle.get('/static/<filename:path>', name='static')
def static(filename):
    """
    Serve a static file.

    :param filename: Path of the requested file, relative to the static folder.
    """
    static_root = './static/'
    return bottle.static_file(filename, root=static_root)
@bottle.get('/', name='about')
def about():
    """
    Render the home page, displayed at https://domain/
    """
    page = template('about.html', domain=INSTANCE_DOMAIN)
    return page
@bottle.get('/explore', name='explore')
def explore():
    """
    Render the page listing the repositories found by list_repositories().
    """
    return template('explore.html', repositories=list_repositories())
@bottle.get('/<repository:path>.git', name='readme')
def readme(repository):
    """
    Show README of the repository.

    NOTE(review): list_repositories() hides gitolite-admin.git, but this route
    still serves it when it is requested by name - confirm that is intended.

    :param repository: Repository path taken from the URL, without the trailing ".git".
    """
    repository += '.git'

    # Normalize the path and refuse anything that escapes the repositories root
    # (an absolute path or ".." segments in the URL).
    root = os.path.normpath(GITOLITE_REPOSITORIES_ROOT)
    path = os.path.normpath(os.path.join(root, repository))
    if not path.startswith(root + os.sep) or not os.path.isdir(path):
        bottle.abort(404, 'No repository at this path.')

    repo = pygit2.Repository(path)
    local_branches = list(repo.branches.local)

    HEAD = None
    ref_name = None

    try:
        HEAD = repo.head.name
        ref_name = HEAD
    except (pygit2.GitError, KeyError):
        # HEAD cannot be resolved (e.g. it points to a branch that does not
        # exist): fall back to the first commonly used branch name that exists.
        for name_candidate in [ 'master', 'main', 'trunk', 'development', 'dev' ]:
            if name_candidate in local_branches:
                ref_name = name_candidate
                break

    readme_text = ''
    if ref_name:
        tree = repo.revparse_single(ref_name).tree

        for entry in tree:
            if entry.name.lower() not in [ 'readme', 'readme.md', 'readme.rst' ]:
                continue

            # A directory can be named "readme" too: only blobs carry content
            if entry.type != pygit2.GIT_OBJ_BLOB or entry.is_binary:
                continue

            # Read the README content, cut at 1MB. The cut may fall in the middle
            # of a multi-byte character and the file may not be valid UTF-8 at
            # all, so undecodable bytes are replaced instead of raising.
            readme_text = entry.data[:1048576].decode('UTF-8', errors='replace')
            break

    repo_size = sum(f.stat().st_size for f in pathlib.Path(path).glob("**/*"))

    return template('repository/readme.html',
                    readme=readme_text,
                    repository=repository,
                    repository_size=human_size(repo_size),
                    head_ref=HEAD)
@bottle.get('/<repository:path>.git/refs', name='refs')
def refs(repository):
    """
    List repository refs (branches and tags).

    :param repository: Repository path taken from the URL, without the trailing ".git".
    """
    repository += '.git'

    # Normalize the path and refuse anything that escapes the repositories root
    # (an absolute path or ".." segments in the URL).
    root = os.path.normpath(GITOLITE_REPOSITORIES_ROOT)
    path = os.path.normpath(os.path.join(root, repository))
    if not path.startswith(root + os.sep) or not os.path.isdir(path):
        bottle.abort(404, 'No repository at this path.')

    repo = pygit2.Repository(path)

    if repo.is_empty:
        return template('repository/refs.html',
                        repository=repository)

    all_refs = list(repo.references)
    heads = sorted(ref for ref in all_refs if ref.startswith('refs/heads/'))
    tags = sorted(ref for ref in all_refs if ref.startswith('refs/tags/'))

    try:
        HEAD = repo.head.name
    except (pygit2.GitError, KeyError):
        # HEAD cannot be resolved: the page is rendered without it
        HEAD = None

    return template('repository/refs.html',
                    repository=repository,
                    heads=heads, tags=tags, head=HEAD)
@bottle.get('/<repository:path>.git/tree/<revision>', name='tree')
@bottle.get('/<repository:path>.git/tree/<revision>/<tree_path:path>', name='tree_path')
def tree(repository, revision, tree_path=None):
    """
    Show the tree of a revision, or the content of one of its blobs.

    :param repository: Repository path taken from the URL, without the trailing ".git".
    :param revision: Any revision that pygit2 can resolve (branch, tag, commit id...).
    :param tree_path: Optional path of a directory or file inside the tree.
    """
    repository += '.git'

    # Normalize the path and refuse anything that escapes the repositories root
    # (an absolute path or ".." segments in the URL).
    root = os.path.normpath(GITOLITE_REPOSITORIES_ROOT)
    repository_path = os.path.normpath(os.path.join(root, repository))
    if not repository_path.startswith(root + os.sep) or not os.path.isdir(repository_path):
        bottle.abort(404, 'No repository at this path.')

    repo = pygit2.Repository(repository_path)

    if repo.is_empty:
        return template('repository/tree.html',
                        repository=repository, revision=revision)

    try:
        git_object = repo.revparse_single(revision)
    except (KeyError, ValueError, pygit2.GitError):
        # Unknown, ambiguous or malformed revision
        git_object = None

    if git_object is None:
        return template('repository/tree.html',
                        repository=repository, revision=revision)

    # List all the references.
    # This is used for allowing the user to switch revision with a selector.
    all_refs = list(repo.references)
    heads = sorted(ref for ref in all_refs if ref.startswith('refs/heads/'))
    tags = sorted(ref for ref in all_refs if ref.startswith('refs/tags/'))

    try:
        HEAD = repo.head.name
    except (pygit2.GitError, KeyError):
        HEAD = None

    # Walk from whatever the revision points at down to the requested object
    if git_object.type == pygit2.GIT_OBJ_TAG:
        git_object = git_object.peel(None)
    if git_object.type == pygit2.GIT_OBJ_COMMIT:
        git_object = git_object.tree
    if git_object.type == pygit2.GIT_OBJ_TREE and tree_path:
        try:
            git_object = git_object[tree_path]
        except (KeyError, pygit2.GitError):
            # The path does not exist in this revision
            bottle.abort(404, 'Object does not exist.')

    if git_object.type == pygit2.GIT_OBJ_TREE:
        return template(
            'repository/tree.html',
            heads=heads, head_ref=HEAD, tags=tags,
            tree=git_object,
            tree_path=tree_path,
            repository=repository, revision=revision)

    if git_object.type == pygit2.GIT_OBJ_BLOB:
        # Highlight blob text
        if git_object.is_binary:
            blob_formatted = ''
        else:
            # A blob that is not detected as binary is still not guaranteed to be
            # valid UTF-8: replace undecodable bytes instead of failing.
            blob_data = git_object.data.decode('UTF-8', errors='replace')

            # Guess Pygments lexer by filename, or by content if we can't find one
            # (no lexer for this filename, or the blob has no name at all).
            try:
                pygments_lexer = guess_lexer_for_filename(git_object.name, blob_data)
            except Exception:
                pygments_lexer = guess_lexer(blob_data)

            pygments_formatter = HtmlFormatter(nobackground=True, linenos=True, anchorlinenos=True,
                                               lineanchors='line')
            blob_formatted = highlight(blob_data, pygments_lexer, pygments_formatter)

        return template(
            'repository/blob.html',
            heads=heads, tags=tags,
            blob=git_object,
            blob_formatted=blob_formatted,
            repository=repository, revision=revision,
            tree_path=tree_path)

    bottle.abort(404)
@bottle.post('/<repository:path>.git/tree', name='tree_change')
def tree_change(repository):
    """
    Switch revision in tree page.
    This route is used by the <form> in the tree page when changing the revision
    to be displayed.

    :param repository: Repository path taken from the URL, without the trailing ".git".
    """
    revision = request.forms.get('revision')

    # Without this check a missing field would redirect to the revision "None"
    if not revision:
        bottle.abort(400, 'Missing revision.')

    bottle.redirect(application.get_url('tree',
                                        repository=repository,
                                        revision=revision))
@bottle.get('/<repository:path>.git/log/<revision>', name='log')
def log(repository, revision):
    """
    Show the commit log of a revision (the 50 most recent commits).

    :param repository: Repository path taken from the URL, without the trailing ".git".
    :param revision: Any revision that pygit2 can resolve (branch, tag, commit id...).
    """
    repository += '.git'

    # Normalize the path and refuse anything that escapes the repositories root
    # (an absolute path or ".." segments in the URL).
    root = os.path.normpath(GITOLITE_REPOSITORIES_ROOT)
    repository_path = os.path.normpath(os.path.join(root, repository))
    if not repository_path.startswith(root + os.sep) or not os.path.isdir(repository_path):
        bottle.abort(404, 'No repository at this path.')

    repo = pygit2.Repository(repository_path)

    if repo.is_empty:
        return template('repository/log.html',
                        repository=repository, revision=revision)

    try:
        git_object = repo.revparse_single(revision)
    except (KeyError, ValueError, pygit2.GitError):
        # Unknown, ambiguous or malformed revision
        git_object = None

    if git_object is None:
        return template('repository/log.html',
                        repository=repository, revision=revision)

    # List all the references.
    # This is used for allowing the user to switch revision with a selector.
    all_refs = list(repo.references)
    heads = sorted(ref for ref in all_refs if ref.startswith('refs/heads/'))
    tags = sorted(ref for ref in all_refs if ref.startswith('refs/tags/'))

    try:
        HEAD = repo.head.name
    except (pygit2.GitError, KeyError):
        HEAD = None

    if git_object.type == pygit2.GIT_OBJ_TAG:
        git_object = git_object.peel(None)

    # Only commits have a history. This is checked after peeling so that a tag
    # pointing at a tree or a blob is rejected too, instead of failing in walk().
    if git_object.type != pygit2.GIT_OBJ_COMMIT:
        return 'Not a valid ref'

    # Read 50 commits
    commits = []
    for commit in repo.walk(git_object.id):
        commits.append(commit)
        if len(commits) >= 50:
            break

    return template(
        'repository/log.html',
        heads=heads, head_ref=HEAD, tags=tags,
        commits=commits,
        repository=repository, revision=revision)
@bottle.get('/<repository:path>.git/raw/<revision>/<tree_path:path>', name='raw')
def raw(repository, revision, tree_path):
    """
    Return a raw blob object.

    :param repository: Repository path taken from the URL, without the trailing ".git".
    :param revision: Revision resolving to a commit (or to a tag of a commit).
    :param tree_path: Path of the file inside the tree of that commit.
    """
    repository += '.git'

    # Normalize the path and refuse anything that escapes the repositories root
    # (an absolute path or ".." segments in the URL).
    root = os.path.normpath(GITOLITE_REPOSITORIES_ROOT)
    repository_path = os.path.normpath(os.path.join(root, repository))
    if not repository_path.startswith(root + os.sep) or not os.path.isdir(repository_path):
        bottle.abort(404, 'No repository at this path.')

    repo = pygit2.Repository(repository_path)

    if repo.is_empty:
        return ""

    # git_object must be bound even when the lookup fails, otherwise the check
    # below raises instead of answering 404.
    try:
        git_object = repo.revparse_single(revision)
    except (KeyError, ValueError, pygit2.GitError):
        git_object = None

    if git_object is not None and git_object.type == pygit2.GIT_OBJ_TAG:
        git_object = git_object.peel(None)

    if git_object is None or git_object.type != pygit2.GIT_OBJ_COMMIT:
        bottle.abort(404, 'Not a valid revision.')

    try:
        blob = git_object.tree[tree_path]
    except (KeyError, pygit2.GitError):
        bottle.abort(404, 'Object does not exist.')

    if blob.type != pygit2.GIT_OBJ_BLOB:
        bottle.abort(404, 'Object is not a blob.')

    content = blob.data

    # Guess the MIME type from (at most) the first 1MB of content
    response.content_type = magic.from_buffer(content[:1048576], mime=True)
    return content
@bottle.get('/<repository:path>.git/info/refs')
@bottle.post('/<repository:path>.git/git-upload-pack')
def git_smart_http(repository):
    """
    This controller proxies Git Smart HTTP requests to gitolite-shell for allowing
    anonymous clones over HTTP. Looks like anonymous clones are not possible via SSH,
    hence why we have this feature.
    Note that this controller only matches "git-upload-pack" (used for fetching)
    but does not match "git-receive-pack" (used for pushing). Pushing should only
    happen via SSH.

    Note: If CLIF is running behind a web server such as httpd or lighttpd, the
    same behavior of this controller can be achieved much more simply by configuring
    the server with CGI and an alias that redirects the URLs above to the gitolite-shell
    script. However, this controller exists so that anonymous HTTP clones can work
    "out of the box" without any manual configuration of the server.

    Documentation useful for understanding how this works:
    https://git-scm.com/docs/http-protocol
    https://bottlepy.org/docs/dev/async.html
    https://gitolite.com/gitolite/http.html#allowing-unauthenticated-access

    :param repository: Repository path matched by the route. It is not used
        directly: gitolite-shell receives the whole request path via PATH_INFO.
    :return: Generator yielding the body of the gitolite-shell response.
    """
    # Environment variables for the Gitolite shell
    # TODO Gitolite gives a warning: "WARNING: Use of uninitialized value in concatenation (.) or string at /home/git/bin/gitolite-shell line 239"
    #      Looks like some non-critical env vars are missing here: REMOTE_PORT SERVER_ADDR SERVER_PORT
    gitenv = {
        **os.environ,
        # https://git-scm.com/docs/git-http-backend#_environment
        'PATH_INFO': request.path,
        'REMOTE_USER': 'anonymous',  # This user must be set in ~/.gitolite.rc like this:
                                     # HTTP_ANON_USER => 'anonymous',
        # Environment values must be strings, and the address can be missing
        'REMOTE_ADDR': request.remote_addr or '',
        'CONTENT_TYPE': request.content_type,
        'QUERY_STRING': request.query_string,
        'REQUEST_METHOD': request.method,
        'GIT_PROJECT_ROOT': GITOLITE_REPOSITORIES_ROOT,
        'GIT_HTTP_EXPORT_ALL': 'true',
        # Additional variables required by Gitolite
        'REQUEST_URI': request.fullpath,
        'GITOLITE_HTTP_HOME': GITOLITE_HTTP_HOME,
        'HOME': GITOLITE_HTTP_HOME,
    }

    # Start a Gitolite shell.
    # Do not replace .Popen() with .run() because it waits for child process to finish before returning.
    proc = subprocess.Popen(
        [ GITOLITE_SHELL ],
        env = gitenv,
        stdin = subprocess.PIPE,
        stdout = subprocess.PIPE)

    try:
        # Write the whole request body to Gitolite stdin.
        # Don't forget to close the pipe or it will hang!
        try:
            proc.stdin.write(request.body.read())
            proc.stdin.close()
        except BrokenPipeError:
            # The shell exited without reading its input (e.g. the request was
            # refused). Whatever it printed is still relayed below.
            pass

        # Now we process the Gitolite response and return it to the client.
        # First we need to scan all the HTTP headers in the response so that we can
        # add them to the bottle response...
        for raw_line in iter(proc.stdout.readline, b''):
            line = raw_line.decode('UTF-8').strip()

            # Empty line means no more headers
            if line == '':
                break

            name, separator, value = line.partition(':')
            if not separator:
                # Not a "Name: value" line; nothing to forward
                continue
            name = name.strip()
            value = value.strip()

            if name.lower() == 'status':
                # "Status: 404 Not Found" is how a CGI program reports the HTTP
                # status: it has to become the response status, not a header.
                try:
                    response.status = int(value.split()[0])
                except (IndexError, ValueError):
                    pass
            else:
                response.set_header(name, value)

        # ...then we can return the rest of the Gitolite response to the client as
        # we read it. The body is binary, so it is relayed in chunks rather than
        # split on newline bytes.
        for chunk in iter(lambda: proc.stdout.read1(65536), b''):
            yield chunk
    finally:
        # Runs when the response is complete and also when the client goes away:
        # release the pipe and reap the child so that it does not stay a zombie.
        proc.stdout.close()
        proc.wait()
@bottle.get('/<repository:path>.mlist', name='threads')
def threads(repository):
    """
    List the email threads of a mailing list, optionally filtered by tags.

    Every directory at the root of the list repository is one thread, named
    "<date> <time> <id> <title>". The query string selects tags: a thread is
    listed only if it carries every requested key=value pair.

    :param repository: Mailing list path taken from the URL, without the trailing ".mlist".
    """
    # List of selected tags, retrieved from the query string
    query_tags = { k: request.query.getall(k) for k in request.query.keys() }

    repository += '.mlist.git'

    # Normalize the path and refuse anything that escapes the repositories root
    # (an absolute path or ".." segments in the URL).
    root = os.path.normpath(GITOLITE_REPOSITORIES_ROOT)
    path = os.path.normpath(os.path.join(root, repository))
    if not path.startswith(root + os.sep) or not os.path.isdir(path):
        bottle.abort(404, 'No repository at this path.')

    # The list address is the repository name without ".mlist.git"
    list_address = '{}@{}'.format(repository[:-10], INSTANCE_DOMAIN)

    repo = pygit2.Repository(path)

    try:
        tree = repo.revparse_single('HEAD').tree
    except (KeyError, ValueError, pygit2.GitError):
        # No commit yet: the list simply has no threads
        tree = []

    threads_list = []
    tags = {}

    for obj in tree:
        if obj.type != pygit2.GIT_OBJ_TREE:
            continue

        # Directories that do not follow the naming scheme are not threads
        fields = obj.name.split(' ', 3)
        if len(fields) != 4:
            continue
        thread_date, thread_time, thread_id, thread_title = fields

        thread_tags = {}
        try:
            thread_tags = parse_thread_tags(obj['tags'].data.decode('UTF-8'))

            # Collect tags for filters
            for k, v in thread_tags.items():
                tags[k] = tags.get(k, []) + v
        except (KeyError, ValueError, AttributeError):
            # No "tags" file in this thread, or one that cannot be read:
            # the thread is treated as having no tags.
            pass

        # Check if we should filter out this thread from the list
        keep = all(value in thread_tags.get(key, [])
                   for key, values in query_tags.items()
                   for value in values)

        if keep:
            threads_list.append({
                'datetime': thread_date + ' ' + thread_time,
                'id': thread_id,
                'title': thread_title,
                'tags': thread_tags
            })

    threads_list.reverse()

    return template('mailing_list/emails.html', threads=threads_list,
                    list_address=list_address,
                    repository=repository,
                    tags=tags, query_tags=query_tags)
def _email_date_utc(value):
    """Format an email date header as UTC "YYYY-MM-DD HH:MM:SS", or '' if it cannot be parsed."""
    if not value:
        return ''
    try:
        parsed = email.utils.parsedate_to_datetime(str(value))
    except (TypeError, ValueError):
        return ''
    return parsed.astimezone(pytz.utc).strftime('%Y-%m-%d %H:%M:%S')


def _parse_thread_email(raw):
    """Turn the raw bytes of one ".email" file into the dict shown by the thread template."""
    message = email.message_from_string(raw.decode('UTF-8', errors='replace'),
                                        policy=email.policy.default)

    message_id = message.get('message-id') or ''
    sent_at = _email_date_utc(message.get('date'))

    # The reception date is the part after the last ";" of the first Received
    # header. Without a usable header, fall back to the sending date.
    received_headers = message.get_all('received') or []
    received_at = ''
    if received_headers:
        received_at = _email_date_utc(str(received_headers[0]).rsplit(';', 1)[-1])
    if not received_at:
        received_at = sent_at

    # Messages without a text/plain part are displayed with an empty body
    body = ''
    body_part = message.get_body(('plain',))
    if body_part is not None:
        try:
            body = body_part.get_content()
        except LookupError:
            # e.g. the part declares a charset Python does not know
            body = ''

    return {
        'id': message.get('message-id'),
        'id_hash': hashlib.sha256(message_id.encode('utf-8')).hexdigest()[:8],
        'from': email.utils.parseaddr(message.get('from', '')),
        'to': email.utils.parseaddr(message.get('to', '')),
        'in_reply_to': message.get('in-reply-to'),
        'sent_at': sent_at,
        'received_at': received_at,
        'subject': message.get('subject'),
        'body': body
    }


@bottle.get('/<repository:path>.mlist/<thread_id>', name='thread')
def thread(repository, thread_id):
    """
    Show a single email thread.

    :param repository: Mailing list path taken from the URL, without the trailing ".mlist".
    :param thread_id: Identifier of the thread, i.e. the third field of the
        thread directory name ("<date> <time> <id> <title>").
    """
    repository += '.mlist.git'

    # Normalize the path and refuse anything that escapes the repositories root
    # (an absolute path or ".." segments in the URL).
    root = os.path.normpath(GITOLITE_REPOSITORIES_ROOT)
    path = os.path.normpath(os.path.join(root, repository))
    if not path.startswith(root + os.sep) or not os.path.isdir(path):
        bottle.abort(404, 'No repository at this path.')

    # The list address is the repository name without ".mlist.git"
    list_address = '{}@{}'.format(repository[:-10], INSTANCE_DOMAIN)

    repo = pygit2.Repository(path)

    try:
        head_tree = repo.revparse_single('HEAD').tree
    except (KeyError, ValueError, pygit2.GitError):
        # No commit yet, hence no thread
        bottle.abort(404, 'Not a valid thread')

    # Prefer the directory whose id field is exactly thread_id. A plain substring
    # test also matches ids that appear inside another thread's title or id, so
    # it is only kept as a fallback for requests that relied on it.
    thread_tree = None
    substring_match = None
    for obj in head_tree:
        if obj.type != pygit2.GIT_OBJ_TREE:
            continue

        fields = obj.name.split(' ', 3)
        if len(fields) == 4 and fields[2] == thread_id:
            thread_tree = obj
            break

        if substring_match is None and thread_id in obj.name:
            substring_match = obj

    if thread_tree is None:
        thread_tree = substring_match
    if thread_tree is None:
        bottle.abort(404, 'Not a valid thread')

    fields = thread_tree.name.split(' ', 3)
    if len(fields) != 4:
        bottle.abort(404, 'Not a valid thread')
    thread_date, thread_time, thread_id, thread_title = fields

    thread_data = {
        'datetime': thread_date + ' ' + thread_time,
        'id': thread_id,
        'title': thread_title
    }

    # Read all the emails in this thread and collect some statistics on the way (for
    # displaying purposes only)
    emails = []
    participants = []
    tags = {}
    for obj in thread_tree:
        if obj.type != pygit2.GIT_OBJ_BLOB:
            continue

        if obj.name == 'tags':
            tags = parse_thread_tags(obj.data.decode('UTF-8', errors='replace'))
            continue

        if not obj.name.endswith('.email'):
            continue

        email_data = _parse_thread_email(obj.data)
        emails.append(email_data)

        if email_data['from'] not in participants:
            participants.append(email_data['from'])

    emails.sort(key = lambda item: item['received_at'])

    return template('mailing_list/emails_thread.html', thread=thread_data, emails=emails,
                    participants=participants, list_address=list_address, tags=tags,
                    repository=repository)