# majuna/app/portal/storage.py
#
# 70 lines
# 2.6 KiB
# Python
# Raw Normal View History

from datetime import datetime
from typing import Optional
from flask import render_template, flash, Response, Blueprint, current_app
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import SubmitField, BooleanField
from app.extensions import db
from app.models.automation import Automation, AutomationLogs
from app.models.tfstate import TerraformState
from app.portal.util import view_lifecycle, response_404
# Blueprint on which the views defined in this module are registered.
bp = Blueprint(name="storage", import_name=__name__)
_SECTION_TEMPLATE_VARS = {
"section": "automation",
"help_url": "https://bypass.censorship.guide/user/automation.html"
}
class EditStorageForm(FlaskForm):  # type: ignore
    """Form used by the storage edit view.

    Fields:
        force_unlock: Checkbox labelled "Force Unlock".
        submit: Submit button labelled "Save Changes".
    """

    force_unlock = BooleanField(label='Force Unlock')
    submit = SubmitField(label='Save Changes')
@bp.route('/edit/<storage_key>', methods=['GET', 'POST'])
def storage_edit(storage_key: str) -> ResponseReturnValue:
    """Show a Terraform state entry and optionally force-unlock it.

    Looks up the ``TerraformState`` row whose ``key`` equals *storage_key*.
    When the submitted form validates and "Force Unlock" is ticked, the
    row's ``lock`` is cleared, ``updated`` is set to the current UTC time
    and the change is committed. A flash message reports the outcome.

    Args:
        storage_key: Key of the Terraform state, taken from the URL.

    Returns:
        The rendered ``storage.html.j2`` page, or a 404 error page when no
        state with that key exists.
    """
    storage: Optional[TerraformState] = TerraformState.query.filter(
        TerraformState.key == storage_key).first()
    if storage is None:
        return Response(render_template("error.html.j2",
                                        header="404 Storage Key Not Found",
                                        message="The requested storage could not be found.",
                                        **_SECTION_TEMPLATE_VARS),
                        status=404)
    form = EditStorageForm()
    if form.validate_on_submit() and form.force_unlock.data:
        storage.lock = None
        # NOTE(review): utcnow() yields a naive datetime; kept as-is because
        # the column's timezone handling is not visible from this module.
        storage.updated = datetime.utcnow()
        try:
            db.session.commit()
        except exc.SQLAlchemyError:
            # A failed commit leaves the session unusable until it is rolled
            # back; without this the template rendering below, which reads
            # ``storage`` through the same session, could fail as well.
            db.session.rollback()
            current_app.logger.exception(
                "Failed to force unlock storage %r", storage_key)
            flash("An error occurred unlocking the storage.", "danger")
        else:
            flash("Storage has been force unlocked.", "success")
    return render_template("storage.html.j2",
                           storage=storage,
                           form=form,
                           **_SECTION_TEMPLATE_VARS)
@bp.route("/kick/<automation_id>", methods=['GET', 'POST'])
def automation_kick(automation_id: int) -> ResponseReturnValue:
    """Ask for confirmation and then "kick" an automation job.

    Looks up the ``Automation`` with the given ID that has not been
    destroyed and hands it to ``view_lifecycle`` with the ``"kick"`` action.

    Args:
        automation_id: ID of the automation, taken from the URL.

    Returns:
        Whatever ``view_lifecycle`` returns, or the result of
        ``response_404`` when no matching automation exists.
    """
    # NOTE(review): the route declares no ``int:`` converter, so Flask passes
    # the ID as a string despite the ``int`` annotation; the comparison below
    # relies on the database layer coercing it. Confirm before tightening.
    automation = Automation.query.filter(
        Automation.id == automation_id,
        Automation.destroyed.is_(None)).first()
    if automation is None:
        # The message previously referred to a "bridge configuration",
        # which is not what this view looks up.
        return response_404("The requested automation could not be found.")
    return view_lifecycle(
        header="Kick automation timer?",
        message=automation.description,
        section="automation",
        success_view="portal.automation.automation_list",
        success_message="This automation job will next run within 1 minute.",
        resource=automation,
        action="kick"
    )