from datetime import datetime from typing import Optional from flask import flash, redirect, url_for, render_template, Response, Blueprint from flask.typing import ResponseReturnValue from flask_wtf import FlaskForm from flask_wtf.file import FileRequired from sqlalchemy import exc from wtforms import StringField, SelectField, SubmitField from flask_wtf.file import FileField from wtforms.validators import DataRequired from app.extensions import db from app.models.base import Group from app.models.onions import Onion from app.portal.util import response_404, view_lifecycle bp = Blueprint("onion", __name__) class NewOnionForm(FlaskForm): # type: ignore domain_name = StringField('Domain Name', validators=[DataRequired()]) description = StringField('Description', validators=[DataRequired()]) onion_private_key = FileField('Onion Private Key', validators=[FileRequired()]) onion_public_key = FileField('Onion Public Key', description="The onion hostname will be automatically calculated from the public key.", validators=[FileRequired()]) tls_private_key = FileField('TLS Private Key (PEM format)', description=("If no TLS key and certificate are provided, a self-signed certificate " "will be generated.")) tls_public_key = FileField('TLS Certificate (PEM format)') group = SelectField('Group', validators=[DataRequired()]) submit = SubmitField('Save Changes') class EditOnionForm(FlaskForm): # type: ignore domain_name = StringField('Domain Name', validators=[DataRequired()]) description = StringField('Description', validators=[DataRequired()]) tls_private_key = FileField('TLS Private Key (PEM format)', description="If no file is submitted, the TLS key will remain unchanged.") tls_public_key = FileField('TLS Certificate (PEM format)', description="If no file is submitted, the TLS certificate will remain unchanged.") group = SelectField('Group', validators=[DataRequired()]) submit = SubmitField('Save Changes') @bp.route("/new", methods=['GET', 'POST']) @bp.route("/new/", methods=['GET', 'POST']) def onion_new(group_id: Optional[int] = None) -> ResponseReturnValue: form = NewOnionForm() form.group.choices = [(x.id, x.group_name) for x in Group.query.all()] if form.validate_on_submit(): onion = Onion() onion.group_id = form.group.data onion.domain_name = form.domain_name.data for at in [ "onion_private_key", "onion_public_key", "tls_private_key", "tls_public_key" ]: if form.__getattribute__(at).data is None: flash(f"Failed to create new onion. {at} was not provided.", "danger") return redirect(url_for("portal.onion.onion_list")) onion.__setattr__(at, form.__getattribute__(at).data.read()) onion.description = form.description.data onion.created = datetime.utcnow() onion.updated = datetime.utcnow() try: db.session.add(onion) db.session.commit() flash(f"Created new onion {onion.onion_name}.", "success") return redirect(url_for("portal.onion.onion_edit", onion_id=onion.id)) except exc.SQLAlchemyError: flash("Failed to create new onion.", "danger") return redirect(url_for("portal.onion.onion_list")) if group_id: form.group.data = group_id return render_template("new.html.j2", section="onion", form=form) @bp.route('/edit/', methods=['GET', 'POST']) def onion_edit(onion_id: int) -> ResponseReturnValue: onion: Optional[Onion] = Onion.query.filter(Onion.id == onion_id).first() if onion is None: return Response(render_template("error.html.j2", section="onion", header="404 Onion Not Found", message="The requested onion service could not be found."), status=404) form = EditOnionForm(group=onion.group_id, domain_name=onion.domain_name, description=onion.description) form.group.choices = [(x.id, x.group_name) for x in Group.query.all()] if form.validate_on_submit(): onion.group_id = form.group.data onion.description = form.description.data onion.domain_name = form.domain_name.data for at in [ "tls_private_key", "tls_public_key" ]: if getattr(form, at).data is not None: # Don't clear the key if no key is uploaded setattr(onion, at, getattr(form, at).data.read()) onion.updated = datetime.utcnow() try: db.session.commit() flash("Saved changes to group.", "success") except exc.SQLAlchemyError: flash("An error occurred saving the changes to the group.", "danger") return render_template("onion.html.j2", section="onion", onion=onion, form=form) @bp.route("/list") def onion_list() -> ResponseReturnValue: onions = Onion.query.order_by(Onion.domain_name).all() return render_template("list.html.j2", section="onion", title="Onion Services", item="onion service", new_link=url_for("portal.onion.onion_new"), items=onions) @bp.route("/destroy/", methods=['GET', 'POST']) def onion_destroy(onion_id: int) -> ResponseReturnValue: onion = Onion.query.filter(Onion.id == onion_id, Onion.destroyed.is_(None)).first() if onion is None: return response_404("The requested onion service could not be found.") return view_lifecycle( header=f"Destroy onion service {onion.onion_name}", message=onion.description, success_message="Successfully removed onion service.", success_view="portal.onion.onion_list", section="onion", resource=onion, action="destroy" )