import hashlib
import re
import sqlite3
from freepost import random, settings
db = sqlite3.connect(settings['sqlite']['database'])
# Returns SQLite rows as dictionaries instead of tuples.
# https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.row_factory
db.row_factory = sqlite3.Row
# A custom function to compute SHA-512 because it's not built into SQLite
db.create_function('SHA512', 1, lambda text:
None if text is None else hashlib.sha512(text.encode('UTF-8')).hexdigest())
# The REGEXP operator is a special syntax for the regexp() user function. No
# regexp() user function is defined by default and so use of the REGEXP operator
# will normally result in an error message. If an application-defined SQL
# function named "regexp" is added at run-time, then the "X REGEXP Y" operator
# will be implemented as a call to "regexp(Y,X)".
db.create_function('REGEXP', 2, lambda pattern, string:
re.search(pattern, string, flags=re.IGNORECASE) is not None)
# Store a new session_id for a user that has logged in
# The session token is stored in the user cookies during login, here
# we store the hash value of that token.
def new_session(user_id, session_token):
with db:
db.execute(
"""
UPDATE user
SET session = SHA512(:session)
WHERE id = :user
""",
{
'user': user_id,
'session': session_token
}
)
# Delete user session token on logout
def delete_session (user_id):
with db:
db.execute (
"""
UPDATE user
SET session = NULL
WHERE id = :user
""",
{
'user': user_id
}
)
# Check user login credentials
#
# @return None if bad credentials, otherwise return the user
def check_user_credentials (username, password):
with db:
cursor = db.execute (
"""
SELECT *
FROM user
WHERE username = :username
AND password = SHA512(:password || salt)
AND isActive = 1
""",
{
'username': username,
'password': password
}
)
return cursor.fetchone ()
# Check if username exists
def username_exists (username, case_sensitive = True):
if not username:
return None
if case_sensitive:
where = 'WHERE username = :username'
else:
where = 'WHERE LOWER(username) = LOWER(:username)'
with db:
cursor = db.execute(
"""
SELECT *
FROM user
""" +
where,
{
'username': username
}
)
return cursor.fetchone() is not None
# Create new user account
def new_user (username, password):
# Create a hash_id for the new post
hash_id = random.alphanumeric_string (10)
# Create a salt for user's password
salt = random.ascii_string (16)
# Add user to database
with db:
db.execute (
"""
INSERT INTO user (hashId, isActive, password, registered, salt, username)
VALUES (:hash_id, 1, SHA512(:password || :salt), DATE(), :salt, :username)
""",
{
'hash_id': hash_id,
'password': password,
'salt': salt,
'username': username
}
)
# Check if session token exists
def is_valid_session (token):
return get_user_by_session_token (token) is not None
# Return the number of unread replies
def count_unread_messages (user_id):
with db:
cursor = db.execute (
"""
SELECT COUNT(1) AS new_messages
FROM comment
WHERE parentUserId = :user AND userId != :user AND `read` = 0
""",
{
'user': user_id
}
)
return cursor.fetchone ()['new_messages']
# Retrieve a user
def get_user_by_username (username):
if not username:
return None
with db:
cursor = db.execute(
"""
SELECT *
FROM user
WHERE username = :username
""",
{
'username': username
}
)
return cursor.fetchone()
# Retrieve a user from a session cookie
def get_user_by_session_token(session_token):
with db:
cursor = db.execute(
"""
SELECT *
FROM user
WHERE session = SHA512(:session)
""",
{
'session': session_token
}
)
return cursor.fetchone()
# Get posts by date (for homepage)
def get_posts (page = 0, session_user_id = None, sort = 'hot', topic = None):
if sort == 'new':
sort = 'ORDER BY P.created DESC'
else:
sort = 'ORDER BY P.dateCreated DESC, P.vote DESC, P.commentsCount DESC'
if topic:
topic_name = 'WHERE T.name = :topic'
else:
topic_name = ''
with db:
cursor = db.execute (
"""
SELECT P.*,
U.username,
V.vote AS user_vote,
GROUP_CONCAT(T.name, " ") AS topics
FROM post AS P
JOIN user AS U ON P.userId = U.id
LEFT JOIN vote_post as V ON V.postId = P.id AND V.userId = :user
LEFT JOIN topic as T ON T.post_id = P.id
{topic}
GROUP BY P.id
{order}
LIMIT :limit
OFFSET :offset
""".format (topic=topic_name, order=sort),
{
'user': session_user_id,
'limit': settings['defaults']['items_per_page'],
'offset': page * settings['defaults']['items_per_page'],
'topic': topic
}
)
return cursor.fetchall ()
# Retrieve user's own posts
def get_user_posts (user_id):
with db:
cursor = db.execute (
"""
SELECT *
FROM post
WHERE userId = :user
ORDER BY created DESC
LIMIT 50
""",
{
'user': user_id
}
)
return cursor.fetchall()
# Retrieve user's own comments
def get_user_comments (user_id):
with db:
cursor = db.execute (
"""
SELECT C.*,
P.title AS postTitle,
P.hashId AS postHashId
FROM comment AS C
JOIN post AS P ON P.id = C.postId
WHERE C.userId = :user
ORDER BY C.created DESC
LIMIT 50
""",
{
'user': user_id
}
)
return cursor.fetchall()
# Retrieve user's own replies to other people
def get_user_replies (user_id):
with db:
cursor = db.execute(
"""
SELECT C.*,
P.title AS postTitle,
P.hashId AS postHashId,
U.username AS username
FROM comment AS C
JOIN post AS P ON P.id = C.postId
JOIN user AS U ON U.id = C.userId
WHERE C.parentUserId = :user AND C.userId != :user
ORDER BY C.created DESC
LIMIT 50
""",
{
'user': user_id
}
)
return cursor.fetchall()
# Update user information
def update_user (user_id, about, email, email_notifications, preferred_feed):
with db:
# Update user info, but not email address
db.execute(
"""
UPDATE user
SET about = :about,
email_notifications = :notifications,
preferred_feed = :preferred_feed
WHERE id = :user
""",
{
'about': about,
'notifications': email_notifications,
'user': user_id,
'preferred_feed': preferred_feed
}
)
# Update email address. Convert all addresses to LOWER() case. This
# prevents two users from using the same address with different case.
# IGNORE update if the email address is already specified. This is
# necessary to avoid an "duplicate key" exception when updating value.
db.execute (
"""
UPDATE OR IGNORE user
SET email = LOWER(:email)
WHERE id = :user
""",
{
'email': email,
'user': user_id
}
)
# Set user replies as read
def set_replies_as_read (user_id):
with db:
db.execute(
"""
UPDATE comment
SET `read` = 1
WHERE parentUserId = :user AND `read` = 0
""",
{
'user': user_id
}
)
# Submit a new post/link
def new_post (title, link, text, user_id):
# Create a hash_id for the new post
hash_id = random.alphanumeric_string (10)
with db:
db.execute(
"""
INSERT INTO post (hashId, created, dateCreated, title,
link, text, vote, commentsCount, userId)
VALUES (:hash_id, DATETIME(), DATE(), :title, :link,
:text, 0, 0, :user)
""",
{
'hash_id': hash_id,
'title': title,
'link': link,
'text': text,
'user': user_id
}
)
return hash_id
# Set topics post. Deletes existing ones.
def replace_post_topics (post_id, topics = ''):
if not topics:
return
# Normalize topics
# 1. Split topics by space
# 2. Remove empty strings
# 3. Lower case topic name
topics = [ topic.lower () for topic in topics.split (' ') if topic ]
if len (topics) == 0:
return
# Remove extra topics if the list is too long
topics = topics[:settings['defaults']['topics_per_post']]
with db:
# First we delete the existing topics
db.execute (
"""
DELETE
FROM topic
WHERE post_id = :post
""",
{
'post': post_id
}
)
# Now insert the new topics.
# IGNORE duplicates that trigger UNIQUE constraint.
db.executemany (
"""
INSERT OR IGNORE INTO topic (post_id, name)
VALUES (?, ?)
""",
[ (post_id, topic) for topic in topics ]
)
# Retrieve a post
def get_post (hash, session_user_id = None):
with db:
cursor = db.execute (
"""
SELECT P.*,
U.username,
V.vote AS user_vote
FROM post AS P
JOIN user AS U ON P.userId = U.id
LEFT JOIN vote_post as V ON V.postId = P.id AND V.userId = :user
WHERE P.hashId = :post
""",
{
'user': session_user_id,
'post': hash
}
)
return cursor.fetchone ()
# Update a post
def update_post (title, link, text, post_hash_id, user_id):
with db:
db.execute (
"""
UPDATE post
SET title = :title,
link = :link,
text = :text
WHERE hashId = :hash_id
AND userId = :user
""",
{
'title': title,
'link': link,
'text': text,
'hash_id': post_hash_id,
'user': user_id
}
)
# Retrieve all comments for a specific post
def get_post_comments (post_id, session_user_id = None):
with db:
cursor = db.execute (
"""
SELECT C.*,
U.username,
V.vote AS user_vote
FROM comment AS C
JOIN user AS U ON C.userId = U.id
LEFT JOIN vote_comment as V ON V.commentId = C.id AND V.userId = :user
WHERE C.postId = :post
ORDER BY C.vote DESC,
C.created ASC
""",
{
'user': session_user_id,
'post': post_id
}
)
return cursor.fetchall ()
# Retrieve all topics for a specific post
def get_post_topics (post_id):
with db:
cursor = db.execute (
"""
SELECT T.name
FROM topic AS T
WHERE T.post_id = :post
ORDER BY T.name ASC
""",
{
'post': post_id
}
)
return cursor.fetchall ()
# Submit a new comment to a post
def new_comment (comment_text, post_hash_id, user_id, parent_user_id = None, parent_comment_id = None):
# Create a hash_id for the new comment
hash_id = random.alphanumeric_string (10)
# Retrieve post
post = get_post (post_hash_id)
with db:
db.execute (
"""
INSERT INTO comment (hashId, created, dateCreated, `read`, text, vote,
parentId, parentUserId, postId, userId)
VALUES (:hash_id, DATETIME(), DATE(), 0, :text, 0, :parent_id,
:parent_user_id, :post_id, :user)
""",
{
'hash_id': hash_id,
'text': comment_text,
'parent_id': parent_comment_id,
'parent_user_id': parent_user_id,
'post_id': post['id'],
'user': user_id
}
)
# Increase comments count for post
db.execute (
"""
UPDATE post
SET commentsCount = commentsCount + 1
WHERE id = :post
""",
{
'post': post['id']
}
)
return hash_id
# Retrieve a single comment
def get_comment (hash_id, session_user_id = None):
with db:
cursor = db.execute(
"""
SELECT C.*,
P.hashId AS postHashId,
P.title AS postTitle,
U.username,
V.vote AS user_vote
FROM comment AS C
JOIN user AS U ON C.userId = U.id
JOIN post AS P ON P.id = C.postId
LEFT JOIN vote_comment as V ON V.commentId = C.id AND V.userId = :user
WHERE C.hashId = :comment
""",
{
'user': session_user_id,
'comment': hash_id
}
)
return cursor.fetchone()
# Retrieve last N newest comments
def get_latest_comments ():
with db:
cursor = db.execute (
"""
SELECT C.*,
P.hashId AS postHashId,
P.title AS postTitle,
U.username
FROM comment AS C
JOIN user AS U ON C.userId = U.id
JOIN post AS P ON P.id = C.postId
ORDER BY C.id DESC
LIMIT 50
""",
{
}
)
return cursor.fetchall ()
# Update a comment
def update_comment (text, comment_hash_id, user_id):
with db:
db.execute (
"""
UPDATE comment
SET text = :text
WHERE hashId = :comment AND userId = :user
""",
{
'text': text,
'comment': comment_hash_id,
'user': user_id
}
)
# Add or update vote to a post
def vote_post (post_id, user_id, vote):
with db:
# Create a new vote for this post, if one doesn't already exist
db.execute(
"""
INSERT OR IGNORE INTO vote_post (vote, datetime, postId, userId)
VALUES (0, DATETIME(), :post, :user)
""",
{
'post': post_id,
'user': user_id
}
)
# Update user vote (+1 or -1)
db.execute(
"""
UPDATE vote_post
SET vote = vote + :vote
WHERE postId = :post AND userId = :user
""",
{
'vote': vote,
'post': post_id,
'user': user_id
}
)
# Update post's total
db.execute (
"""
UPDATE post
SET vote = vote + :vote
WHERE id = :post
""",
{
'vote': vote,
'post': post_id
}
)
# Add or update vote to a comment
def vote_comment (comment_id, user_id, vote):
with db:
# Create a new vote for this post, if one doesn't already exist
db.execute (
"""
INSERT INTO vote_comment (vote, datetime, commentId, userId)
VALUES (0, DATETIME(), :comment, :user)
""",
{
'comment': comment_id,
'user': user_id
}
)
# Update user vote (+1 or -1)
db.execute (
"""
UPDATE vote_comment
SET vote = vote + :vote
WHERE commentId = :comment AND userId = :user
""",
{
'vote': vote,
'comment': comment_id,
'user': user_id
}
)
# Update comment's total
db.execute (
"""
UPDATE comment
SET vote = vote + :vote
WHERE id = :comment
""",
{
'vote': vote,
'comment': comment_id
}
)
# Search posts
def search (query, sort='newest', page=0):
if not query:
return []
# Remove multiple white spaces and replace with '|' (for query REGEXP)
query = re.sub (' +', '|', query.strip ())
if len (query) == 0:
return []
if sort == 'newest':
sort = 'P.created DESC'
if sort == 'points':
sort = 'P.vote DESC'
with db:
cursor = db.execute (
"""
SELECT P.*,
U.username
FROM post AS P
JOIN user AS U ON P.userId = U.id
WHERE P.title REGEXP :query
ORDER BY {sort}
LIMIT :limit
OFFSET :offset
""".format (sort=sort),
{
'query': query,
'sort': sort,
'limit': settings['defaults']['search_results_per_page'],
'offset': page * settings['defaults']['search_results_per_page']
}
)
return cursor.fetchall ()
# Set reset token for user email
def set_password_reset_token (user_id = None, token = None):
if not user_id or not token:
return
with db:
db.execute (
"""
UPDATE user
SET passwordResetToken = SHA512(:token),
passwordResetTokenExpire = NOW() + INTERVAL 1 HOUR
WHERE id = :user
""",
{
'user': user_id,
'token': token
}
)
# Delete the password reset token for a user
def delete_password_reset_token (user_id = None):
with db:
db.execute (
"""
UPDATE user
SET passwordResetToken = NULL,
passwordResetTokenExpire = NULL
WHERE id = :user
""",
{
'user': user_id
}
)
# Check if a reset token has expired.
def is_password_reset_token_valid (user_id = None):
with db:
cursor = db.execute(
"""
SELECT COUNT(1) AS valid
FROM user
WHERE id = :user
AND passwordResetToken IS NOT NULL
AND passwordResetTokenExpire IS NOT NULL
AND passwordResetTokenExpire > DATE()
""",
{
'user': user_id
}
)
return cursor.fetchone()['valid'] == 1
# Reset user password
def reset_password (username = None, email = None, new_password = None, secret_token = None):
if not new_password:
return
with db:
db.execute (
"""
UPDATE user
SET password = SHA512(:password || `salt`),
passwordResetToken = NULL,
passwordResetTokenExpire = NULL
WHERE username = :user
AND email = :email
AND passwordResetToken = SHA512(:token)
AND passwordResetTokenExpire > DATE()
""",
{
'password': new_password,
'user': username,
'email': email,
'token': secret_token
}
)