"""Portal views for creating, editing, listing and destroying onion services."""

from datetime import datetime
from typing import Optional

from flask import flash, redirect, url_for, render_template, Response, Blueprint
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from flask_wtf.file import FileAllowed, FileRequired
from sqlalchemy import exc
from wtforms import StringField, SelectField, SubmitField, FileField
from wtforms.validators import DataRequired

from app.extensions import db
from app.models.base import Group
from app.models.onions import Onion
from app.portal.util import response_404, view_lifecycle

bp = Blueprint("onion", __name__)

# Form fields whose uploaded contents are copied onto the same-named
# attributes of an ``Onion`` instance.
_KEY_FIELDS = (
    "onion_private_key",
    "onion_public_key",
    "tls_private_key",
    "tls_public_key",
)


class NewOnionForm(FlaskForm):  # type: ignore
    """Form for registering a new onion service.

    Both onion key files are mandatory; the TLS key and certificate are
    optional uploads.
    """

    domain_name = StringField('Domain Name', validators=[DataRequired()])
    # There is deliberately no onion-name field: per the public key field's
    # description, the hostname is calculated from the uploaded public key.
    description = StringField('Description', validators=[DataRequired()])
    onion_private_key = FileField('Onion Private Key', validators=[FileRequired()])
    onion_public_key = FileField(
        'Onion Public Key',
        description="The onion hostname will be automatically calculated from the public key.",
        validators=[FileRequired()])
    tls_private_key = FileField(
        'TLS Private Key (PEM format)',
        description=("If no TLS key and certificate are provided, a self-signed certificate "
                     "will be generated."))
    tls_public_key = FileField('TLS Certificate (PEM format)')
    group = SelectField('Group', validators=[DataRequired()])
    submit = SubmitField('Save Changes')


class EditOnionForm(FlaskForm):  # type: ignore
    """Form for editing an existing onion service.

    Every file upload is optional; a key is only replaced when a file is
    actually submitted for it.
    """

    description = StringField('Description', validators=[DataRequired()])
    group = SelectField('Group', validators=[DataRequired()])
    onion_private_key = FileField('Onion Private Key')
    onion_public_key = FileField(
        'Onion Public Key',
        description="The onion hostname will be automatically calculated from the public key.")
    tls_private_key = FileField(
        'TLS Private Key (PEM format)',
        description="If no file is submitted, the TLS key will remain unchanged.")
    tls_public_key = FileField(
        'TLS Certificate (PEM format)',
        description="If no file is submitted, the TLS certificate will remain unchanged.")
    submit = SubmitField('Save Changes')


def _store_uploaded_keys(form: FlaskForm, onion: Onion) -> None:
    """Copy the contents of every submitted key/certificate file onto *onion*.

    Fields without a real upload are skipped so existing values survive.
    """
    for field_name in _KEY_FIELDS:
        upload = getattr(form, field_name).data
        # A file input left blank can arrive as an upload object with an empty
        # filename, which is falsy. Testing only ``is not None`` would read it
        # and overwrite the stored key with empty bytes.
        if not upload:
            continue
        setattr(onion, field_name, upload.read())


@bp.route("/new", methods=['GET', 'POST'])
@bp.route("/new/<group_id>", methods=['GET', 'POST'])
def onion_new(group_id: Optional[int] = None) -> ResponseReturnValue:
    """Render the "new onion service" form and create the onion on submit.

    Args:
        group_id: Optional group to pre-select in the form. It is taken from
            the URL without a converter so it compares equal to the
            string-coerced select choices.

    Returns:
        A redirect to the edit page on success, a redirect to the list page
        when the database write fails, otherwise the rendered form.
    """
    form = NewOnionForm()
    form.group.choices = [(x.id, x.group_name) for x in Group.query.all()]
    if form.validate_on_submit():
        onion = Onion()
        onion.group_id = form.group.data
        onion.domain_name = form.domain_name.data
        _store_uploaded_keys(form, onion)
        onion.description = form.description.data
        # One timestamp for both columns so a fresh row has created == updated.
        now = datetime.utcnow()
        onion.created = now
        onion.updated = now
        try:
            db.session.add(onion)
            db.session.commit()
            flash(f"Created new onion {onion.onion_name}.", "success")
            return redirect(url_for("portal.onion.onion_edit", onion_id=onion.id))
        except exc.SQLAlchemyError:
            # Return the session to a usable state after the failed write.
            db.session.rollback()
            flash("Failed to create new onion.", "danger")
            return redirect(url_for("portal.onion.onion_list"))
    if group_id:
        form.group.data = group_id
    return render_template("new.html.j2", section="onion", form=form)


@bp.route('/edit/<int:onion_id>', methods=['GET', 'POST'])
def onion_edit(onion_id: int) -> ResponseReturnValue:
    """Show an onion service and apply submitted changes to it.

    Args:
        onion_id: Primary key of the onion service to edit.

    Returns:
        A 404 error page when no such onion exists, otherwise the rendered
        onion page (including after a save attempt).
    """
    onion: Optional[Onion] = Onion.query.filter(Onion.id == onion_id).first()
    if onion is None:
        return Response(render_template("error.html.j2",
                                        section="onion",
                                        header="404 Onion Not Found",
                                        message="The requested onion service could not be found."),
                        status=404)
    form = EditOnionForm(group=onion.group_id, description=onion.description)
    form.group.choices = [(x.id, x.group_name) for x in Group.query.all()]
    if form.validate_on_submit():
        onion.group_id = form.group.data
        onion.description = form.description.data
        _store_uploaded_keys(form, onion)
        onion.updated = datetime.utcnow()
        try:
            db.session.commit()
            flash("Saved changes to onion service.", "success")
        except exc.SQLAlchemyError:
            # Discard the failed transaction so the session stays usable.
            db.session.rollback()
            flash("An error occurred saving the changes to the onion service.", "danger")
    return render_template("onion.html.j2", section="onion", onion=onion, form=form)


@bp.route("/list")
def onion_list() -> ResponseReturnValue:
    """Render the list of all onion services, ordered by domain name."""
    onions = Onion.query.order_by(Onion.domain_name).all()
    return render_template("list.html.j2",
                           section="onion",
                           title="Onion Services",
                           item="onion service",
                           new_link=url_for("portal.onion.onion_new"),
                           items=onions)


@bp.route("/destroy/<int:onion_id>", methods=['GET', 'POST'])
def onion_destroy(onion_id: int) -> ResponseReturnValue:
    """Hand a not-yet-destroyed onion service to the shared "destroy" flow.

    Args:
        onion_id: Primary key of the onion service to destroy.

    Returns:
        A 404 response when the onion does not exist or is already destroyed,
        otherwise whatever ``view_lifecycle`` returns for the "destroy"
        action.
    """
    onion = Onion.query.filter(Onion.id == onion_id, Onion.destroyed.is_(None)).first()
    if onion is None:
        return response_404("The requested onion service could not be found.")
    return view_lifecycle(
        header=f"Destroy onion service {onion.onion_name}",
        message=onion.description,
        success_message="You will need to manually remove this from the EOTK configuration.",
        success_view="portal.onion.onion_list",
        section="onion",
        resource=onion,
        action="destroy"
    )