"""Portal views for creating, editing, listing and destroying webhooks."""
from datetime import datetime
from typing import Optional

from flask import Blueprint, flash, Response, render_template, redirect, url_for
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import StringField, SelectField, SubmitField
from wtforms.validators import DataRequired

from app.extensions import db
from app.models.activity import Webhook
from app.portal.util import response_404, view_lifecycle

bp = Blueprint("webhook", __name__)


@bp.app_template_filter("webhook_format_name")
def webhook_format_name(s: str) -> str:
    """Template filter mapping a webhook format key to its display name.

    Returns "Telegram" or "Matrix" for the matching keys and "Unknown" for
    any other value.
    """
    if s == "telegram":
        return "Telegram"
    if s == "matrix":
        return "Matrix"
    return "Unknown"


class NewWebhookForm(FlaskForm):  # type: ignore
    """Form used both to create a webhook and to edit an existing one."""

    description = StringField('Description', validators=[DataRequired()])
    format = SelectField(
        'Format',
        choices=[
            ("telegram", "Telegram"),
            ("matrix", "Matrix"),
        ],
        validators=[DataRequired()],
    )
    url = StringField('URL', validators=[DataRequired()])
    submit = SubmitField('Save Changes')


@bp.route("/new", methods=['GET', 'POST'])
def webhook_new() -> ResponseReturnValue:
    """Render the new-webhook form and create the webhook on a valid submit.

    On success the user is redirected to the edit view of the new webhook;
    on a database error the user is redirected to the webhook list.
    """
    form = NewWebhookForm()
    if form.validate_on_submit():
        webhook = Webhook(
            description=form.description.data,
            format=form.format.data,
            url=form.url.data,
        )
        try:
            db.session.add(webhook)
            db.session.commit()
            flash(f"Created new webhook {webhook.url}.", "success")
            return redirect(url_for("portal.webhook.webhook_edit", webhook_id=webhook.id))
        except exc.SQLAlchemyError:
            # Discard the failed transaction so the session is usable again.
            db.session.rollback()
            flash("Failed to create new webhook.", "danger")
            return redirect(url_for("portal.webhook.webhook_list"))
    return render_template("new.html.j2", section="webhook", form=form)


# The URL variable is required: the view takes ``webhook_id`` and
# ``webhook_new`` builds links to it with ``url_for(..., webhook_id=...)``.
@bp.route('/edit/<int:webhook_id>', methods=['GET', 'POST'])
def webhook_edit(webhook_id: int) -> ResponseReturnValue:
    """Render the edit form for a webhook and save changes on a valid submit.

    Responds with a 404 error page when no webhook has the given id.
    """
    webhook = Webhook.query.filter(Webhook.id == webhook_id).first()
    if webhook is None:
        return Response(
            render_template(
                "error.html.j2",
                section="webhook",
                header="404 Webhook Not Found",
                message="The requested webhook could not be found.",
            ),
            status=404,
        )
    form = NewWebhookForm(
        description=webhook.description,
        format=webhook.format,
        url=webhook.url,
    )
    if form.validate_on_submit():
        # Each model attribute is taken from its own form field.
        webhook.description = form.description.data
        webhook.format = form.format.data
        webhook.url = form.url.data
        webhook.updated = datetime.utcnow()
        try:
            db.session.commit()
            flash("Saved changes to webhook.", "success")
        except exc.SQLAlchemyError:
            # Roll back so the template below can still read ``webhook``
            # from a session that is in a valid state.
            db.session.rollback()
            flash("An error occurred saving the changes to the webhook.", "danger")
    return render_template(
        "edit.html.j2",
        section="webhook",
        title="Edit Webhook",
        item=webhook,
        form=form,
    )


@bp.route("/list")
def webhook_list() -> ResponseReturnValue:
    """Render the list of all webhooks."""
    webhooks = Webhook.query.all()
    return render_template(
        "list.html.j2",
        section="webhook",
        title="Webhooks",
        item="webhook",
        new_link=url_for("portal.webhook.webhook_new"),
        items=webhooks,
    )


@bp.route("/destroy/<int:webhook_id>", methods=['GET', 'POST'])
def webhook_destroy(webhook_id: int) -> ResponseReturnValue:
    """Hand a not-yet-destroyed webhook to the shared "destroy" lifecycle view.

    Responds via ``response_404`` when no webhook with the given id exists
    or when it already has a ``destroyed`` value.
    """
    webhook: Optional[Webhook] = Webhook.query.filter(
        Webhook.id == webhook_id,
        Webhook.destroyed.is_(None),
    ).first()
    if webhook is None:
        return response_404("The requested webhook could not be found.")
    return view_lifecycle(
        header=f"Destroy webhook {webhook.url}",
        message=webhook.description,
        success_message="Webhook destroyed.",
        success_view="portal.webhook.webhook_list",
        section="webhook",
        resource=webhook,
        action="destroy",
    )