feat: geo risk scores

This commit is contained in:
Iain Learmonth 2023-10-29 15:45:10 +00:00
parent 315dae7f06
commit 0e0d499428
17 changed files with 558 additions and 54 deletions

View file

@ -3,17 +3,18 @@ from datetime import datetime
from typing import Optional, List
import requests
import sqlalchemy
from flask import flash, redirect, url_for, render_template, Response, Blueprint
from flask.typing import ResponseReturnValue
from flask_wtf import FlaskForm
from sqlalchemy import exc
from wtforms import StringField, SelectField, SubmitField, BooleanField
from wtforms import StringField, SelectField, SubmitField, BooleanField, IntegerField
from wtforms.validators import DataRequired
from app.extensions import db
from app.models.base import Group
from app.models.mirrors import Origin
from app.portal.util import response_404, view_lifecycle
from app.models.mirrors import Origin, Country
from app.portal.util import response_404, view_lifecycle, LifecycleForm
bp = Blueprint("origin", __name__)
@ -28,15 +29,22 @@ class NewOriginForm(FlaskForm): # type: ignore
submit = SubmitField('Save Changes')
class EditOriginForm(FlaskForm): # type: ignore
class EditOriginForm(FlaskForm): # type: ignore[misc]
description = StringField('Description', validators=[DataRequired()])
group = SelectField('Group', validators=[DataRequired()])
auto_rotate = BooleanField("Enable auto-rotation?")
smart_proxy = BooleanField("Requires smart proxy?")
asset_domain = BooleanField("Used to host assets for other domains?", default=False)
risk_level_override = BooleanField("Force Risk Level Override?")
risk_level_override_number = IntegerField("Forced Risk Level", description="Number from 0 to 20", default=0)
submit = SubmitField('Save Changes')
class CountrySelectForm(FlaskForm): # type: ignore[misc]
country = SelectField("Country", validators=[DataRequired()])
submit = SubmitField('Save Changes', render_kw={"class": "btn btn-success"})
def final_domain_name(domain_name: str) -> str:
session = requests.Session()
r = session.get(f"https://{domain_name}/", allow_redirects=True, timeout=10)
@ -84,7 +92,9 @@ def origin_edit(origin_id: int) -> ResponseReturnValue:
description=origin.description,
auto_rotate=origin.auto_rotation,
smart_proxy=origin.smart,
asset_domain=origin.assets)
asset_domain=origin.assets,
risk_level_override=origin.risk_level_override is not None,
risk_level_override_number=origin.risk_level_override)
form.group.choices = [(x.id, x.group_name) for x in Group.query.all()]
if form.validate_on_submit():
origin.group_id = form.group.data
@ -92,12 +102,16 @@ def origin_edit(origin_id: int) -> ResponseReturnValue:
origin.auto_rotation = form.auto_rotate.data
origin.smart = form.smart_proxy.data
origin.assets = form.asset_domain.data
if form.risk_level_override.data:
origin.risk_level_override = form.risk_level_override_number.data
else:
origin.risk_level_override = None
origin.updated = datetime.utcnow()
try:
db.session.commit()
flash("Saved changes to group.", "success")
flash(f"Saved changes for origin {origin.domain_name}.", "success")
except exc.SQLAlchemyError:
flash("An error occurred saving the changes to the group.", "danger")
flash("An error occurred saving the changes to the origin.", "danger")
return render_template("origin.html.j2",
section="origin",
origin=origin, form=form)
@ -144,3 +158,74 @@ def origin_destroy(origin_id: int) -> ResponseReturnValue:
resource=origin,
action="destroy"
)
@bp.route('/country_remove/<origin_id>/<country_id>', methods=['GET', 'POST'])
def origin_country_remove(origin_id: int, country_id: int) -> ResponseReturnValue:
origin = Origin.query.filter(Origin.id == origin_id).first()
if origin is None:
return Response(render_template("error.html.j2",
section="origin",
header="404 Pool Not Found",
message="The requested origin could not be found."),
status=404)
country = Country.query.filter(Country.id == country_id).first()
if country is None:
return Response(render_template("error.html.j2",
section="origin",
header="404 Country Not Found",
message="The requested country could not be found."),
status=404)
if country not in origin.countries:
return Response(render_template("error.html.j2",
section="origin",
header="404 Country Not In Pool",
message="The requested country could not be found in the specified origin."),
status=404)
form = LifecycleForm()
if form.validate_on_submit():
origin.countries.remove(country)
try:
db.session.commit()
flash("Saved changes to origin.", "success")
return redirect(url_for("portal.origin.origin_edit", origin_id=origin.id))
except sqlalchemy.exc.SQLAlchemyError:
flash("An error occurred saving the changes to the origin.", "danger")
return render_template("lifecycle.html.j2",
header=f"Remove {country.country_name} from the {origin.domain_name} origin?",
message="Stop monitoring in this country.",
section="origin",
origin=origin, form=form)
@bp.route('/country_add/<origin_id>', methods=['GET', 'POST'])
def origin_country_add(origin_id: int) -> ResponseReturnValue:
origin = Origin.query.filter(Origin.id == origin_id).first()
if origin is None:
return Response(render_template("error.html.j2",
section="origin",
header="404 Origin Not Found",
message="The requested origin could not be found."),
status=404)
form = CountrySelectForm()
form.country.choices = [(x.id, f"{x.country_code} - {x.description}") for x in Country.query.all()]
if form.validate_on_submit():
country = Country.query.filter(Country.id == form.country.data).first()
if country is None:
return Response(render_template("error.html.j2",
section="origin",
header="404 Country Not Found",
message="The requested country could not be found."),
status=404)
origin.countries.append(country)
try:
db.session.commit()
flash("Saved changes to origin.", "success")
return redirect(url_for("portal.origin.origin_edit", origin_id=origin.id))
except sqlalchemy.exc.SQLAlchemyError:
flash("An error occurred saving the changes to the origin.", "danger")
return render_template("lifecycle.html.j2",
header=f"Add a country to {origin.domain_name}",
message="Enable monitoring from this country:",
section="origin",
origin=origin, form=form)