Add gitlab webhook support
This commit is contained in:
parent
9d41d56e0c
commit
a1ae717c8f
26 changed files with 1824 additions and 8 deletions
70
ops_bot/gitlab/hook.py
Normal file
70
ops_bot/gitlab/hook.py
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
import re
|
||||
import attr
|
||||
import logging
|
||||
from typing import Any, Tuple
|
||||
from jinja2 import TemplateNotFound
|
||||
|
||||
from mautrix.types import (EventType, RoomID, StateEvent, Membership, MessageType, JSON,
|
||||
TextMessageEventContent, Format, ReactionEventContent, RelationType)
|
||||
from mautrix.util.formatter import parse_html
|
||||
|
||||
from ..util.template import TemplateManager, TemplateUtil
|
||||
|
||||
from .types import EventParse, OTHER_ENUMS, Action
|
||||
|
||||
from ..common import COLOR_ALARM, COLOR_OK, COLOR_UNKNOWN
|
||||
|
||||
spaces = re.compile(" +")
|
||||
space = " "
|
||||
|
||||
|
||||
messages = TemplateManager("gitlab", "messages")
|
||||
templates = TemplateManager("gitlab", "mixins")
|
||||
|
||||
async def parse_event(x_gitlab_event: str, payload: Any) -> Tuple[str, str]:
|
||||
evt = EventParse[x_gitlab_event].deserialize(payload)
|
||||
print("processing", evt)
|
||||
try:
|
||||
tpl = messages[evt.template_name]
|
||||
except TemplateNotFound as e:
|
||||
msg = f"Received unhandled gitlab event type {x_gitlab_event}"
|
||||
logging.info(msg)
|
||||
logging.info(payload)
|
||||
return [(msg, msg)]
|
||||
|
||||
aborted = False
|
||||
|
||||
def abort() -> None:
|
||||
nonlocal aborted
|
||||
aborted = True
|
||||
base_args = {
|
||||
**{field.key: field for field in Action if field.key.isupper()},
|
||||
**OTHER_ENUMS,
|
||||
"util": TemplateUtil,
|
||||
}
|
||||
|
||||
msgs = []
|
||||
for subevt in evt.preprocess():
|
||||
print("preprocessing", subevt)
|
||||
args = {
|
||||
**attr.asdict(subevt, recurse=False),
|
||||
**{key: getattr(subevt, key) for key in subevt.event_properties},
|
||||
"abort": abort,
|
||||
**base_args,
|
||||
}
|
||||
args["templates"] = templates.proxy(args)
|
||||
|
||||
html = tpl.render(**args)
|
||||
if not html or aborted:
|
||||
aborted = False
|
||||
continue
|
||||
html = spaces.sub(space, html.strip())
|
||||
|
||||
content = TextMessageEventContent(msgtype=MessageType.TEXT, format=Format.HTML,
|
||||
formatted_body=html, body=await parse_html(html))
|
||||
content["xyz.maubot.gitlab.webhook"] = {
|
||||
"event_type": x_gitlab_event,
|
||||
**subevt.meta,
|
||||
}
|
||||
msgs.append((content.body, content.formatted_body))
|
||||
return msgs
|
||||
930
ops_bot/gitlab/types.py
Normal file
930
ops_bot/gitlab/types.py
Normal file
|
|
@ -0,0 +1,930 @@
|
|||
# gitlab - A GitLab client and webhook receiver for maubot
|
||||
# Copyright (C) 2019 Lorenz Steinert
|
||||
# Copyright (C) 2021 Tulir Asokan
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import List, Union, Dict, Optional, Type, NewType, ClassVar, Tuple, Iterable
|
||||
from datetime import datetime
|
||||
|
||||
from jinja2 import TemplateNotFound
|
||||
from attr import dataclass
|
||||
from yarl import URL
|
||||
import attr
|
||||
|
||||
from mautrix.types import JSON, ExtensibleEnum, SerializableAttrs, serializer, deserializer
|
||||
|
||||
from ..util.contrast import contrast, hex_to_rgb
|
||||
|
||||
|
||||
@serializer(datetime)
|
||||
def datetime_serializer(dt: datetime) -> JSON:
|
||||
return dt.strftime('%Y-%m-%dT%H:%M:%S%z')
|
||||
|
||||
|
||||
@deserializer(datetime)
|
||||
def datetime_deserializer(data: JSON) -> datetime:
|
||||
try:
|
||||
return datetime.strptime(data, "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
return datetime.strptime(data, "%Y-%m-%dT%H:%M:%S%z")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
return datetime.strptime(data, "%Y-%m-%d %H:%M:%S %z")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
return datetime.strptime(data, "%Y-%m-%dT%H:%M:%SZ")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
return datetime.strptime(data, "%Y-%m-%d %H:%M:%S %Z")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
return datetime.strptime(data, "%Y-%m-%d")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
raise ValueError(data)
|
||||
|
||||
|
||||
class LabelType(ExtensibleEnum):
|
||||
PROJECT = "ProjectLabel"
|
||||
# TODO group?
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GitlabLabel(SerializableAttrs):
|
||||
contrast_threshold: ClassVar[float] = 2.5
|
||||
white_rgb: ClassVar[Tuple[int, int, int]] = (1, 1, 1)
|
||||
white_hex: ClassVar[str] = "#ffffff"
|
||||
black_hex: ClassVar[str] = "#000000"
|
||||
|
||||
id: int
|
||||
title: str
|
||||
color: str
|
||||
project_id: int
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
template: bool
|
||||
description: str
|
||||
type: LabelType
|
||||
group_id: Optional[int]
|
||||
remove_on_close: bool = False
|
||||
|
||||
@property
|
||||
def foreground_color(self) -> str:
|
||||
return (self.white_hex
|
||||
if contrast(hex_to_rgb(self.color), self.white_rgb) >= self.contrast_threshold
|
||||
else self.black_hex)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabProject(SerializableAttrs):
|
||||
id: Optional[int] = None
|
||||
name: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
web_url: Optional[str] = None
|
||||
avatar_url: Optional[str] = None
|
||||
git_ssh_url: Optional[str] = None
|
||||
git_http_url: Optional[str] = None
|
||||
namespace: Optional[str] = None
|
||||
visibility_level: Optional[int] = None
|
||||
path_with_namespace: Optional[str] = None
|
||||
default_branch: Optional[str] = None
|
||||
homepage: Optional[str] = None
|
||||
url: Optional[str] = None
|
||||
ssh_url: Optional[str] = None
|
||||
http_url: Optional[str] = None
|
||||
|
||||
@property
|
||||
def gitlab_base_url(self) -> str:
|
||||
return self.web_url.split(self.path_with_namespace)[0].rstrip("/")
|
||||
|
||||
|
||||
@dataclass(eq=False, hash=False)
|
||||
class GitlabUser(SerializableAttrs):
|
||||
name: str
|
||||
username: Optional[str] = None
|
||||
avatar_url: Optional[str] = None
|
||||
email: Optional[str] = None
|
||||
id: Optional[int] = None
|
||||
web_url: Optional[str] = None
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return self.id
|
||||
|
||||
def __eq__(self, other: 'GitlabUser') -> bool:
|
||||
if not isinstance(other, GitlabUser):
|
||||
return False
|
||||
return self.id == other.id
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabAuthor(SerializableAttrs):
|
||||
name: str
|
||||
email: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class BaseCommit:
|
||||
message: str
|
||||
|
||||
@property
|
||||
def cut_message(self) -> str:
|
||||
max_len = 72
|
||||
message = self.message.strip()
|
||||
if "\n" in message:
|
||||
message = message.split("\n")[0]
|
||||
if len(message) <= max_len:
|
||||
message += " […]"
|
||||
return message
|
||||
if len(message) > max_len:
|
||||
message = message[:max_len] + "…"
|
||||
return message
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabCommit(BaseCommit, SerializableAttrs):
|
||||
id: str
|
||||
timestamp: Optional[datetime] = None
|
||||
url: Optional[str] = None
|
||||
author: Optional[GitlabAuthor] = None
|
||||
|
||||
added: Optional[List[str]] = None
|
||||
modified: Optional[List[str]] = None
|
||||
removed: Optional[List[str]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabRepository(SerializableAttrs):
|
||||
name: str
|
||||
url: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
homepage: Optional[str] = None
|
||||
git_http_url: Optional[str] = None
|
||||
git_ssh_url: Optional[str] = None
|
||||
visibility_level: Optional[int] = None
|
||||
|
||||
@property
|
||||
def path(self) -> str:
|
||||
return URL(self.homepage).path.strip("/")
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabStDiff(SerializableAttrs):
|
||||
diff: str
|
||||
new_path: str
|
||||
old_path: str
|
||||
a_mode: str
|
||||
b_mode: str
|
||||
new_file: bool
|
||||
renamed_file: bool
|
||||
deleted_file: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabSource(SerializableAttrs):
|
||||
name: str
|
||||
namespace: str
|
||||
|
||||
description: Optional[str] = None
|
||||
web_url: Optional[str] = None
|
||||
avatar_url: Optional[str] = None
|
||||
git_ssh_url: Optional[str] = None
|
||||
git_http_url: Optional[str] = None
|
||||
visibility_level: Optional[int] = None
|
||||
path_with_namespace: Optional[str] = None
|
||||
default_branch: Optional[str] = None
|
||||
homepage: Optional[str] = None
|
||||
url: Optional[str] = None
|
||||
ssh_url: Optional[str] = None
|
||||
http_url: Optional[str] = None
|
||||
|
||||
|
||||
GitlabTarget = NewType('GitlabTarget', GitlabSource)
|
||||
|
||||
|
||||
class GitlabChangeWrapper:
|
||||
previous: list
|
||||
current: list
|
||||
|
||||
@property
|
||||
def added(self) -> list:
|
||||
previous_set = set(self.previous)
|
||||
return [item for item in self.current if item not in previous_set]
|
||||
|
||||
@property
|
||||
def removed(self) -> list:
|
||||
current_set = set(self.current)
|
||||
return [item for item in self.previous if item not in current_set]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabDatetimeChange(SerializableAttrs):
|
||||
previous: Optional[datetime]
|
||||
current: datetime
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabAssigneeChanges(GitlabChangeWrapper, SerializableAttrs):
|
||||
previous: List[GitlabUser]
|
||||
current: List[GitlabUser]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabLabelChanges(GitlabChangeWrapper, SerializableAttrs):
|
||||
previous: List[GitlabLabel]
|
||||
current: List[GitlabLabel]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabIntChange(SerializableAttrs):
|
||||
previous: Optional[int]
|
||||
current: Optional[int]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabBoolChange(SerializableAttrs):
|
||||
previous: Optional[bool]
|
||||
current: Optional[bool]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabStringChange(SerializableAttrs):
|
||||
previous: Optional[str]
|
||||
current: Optional[str]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabChanges(SerializableAttrs):
|
||||
created_at: Optional[GitlabDatetimeChange] = None
|
||||
updated_at: Optional[GitlabDatetimeChange] = None
|
||||
updated_by: Optional[GitlabIntChange] = None
|
||||
author_id: Optional[GitlabIntChange] = None
|
||||
id: Optional[GitlabIntChange] = None
|
||||
iid: Optional[GitlabIntChange] = None
|
||||
project_id: Optional[GitlabIntChange] = None
|
||||
milestone_id: Optional[GitlabIntChange] = None
|
||||
description: Optional[GitlabStringChange] = None
|
||||
title: Optional[GitlabStringChange] = None
|
||||
labels: Optional[GitlabLabelChanges] = None
|
||||
assignees: Optional[GitlabAssigneeChanges] = None
|
||||
time_estimate: Optional[GitlabIntChange] = None
|
||||
total_time_spent: Optional[GitlabIntChange] = None
|
||||
weight: Optional[GitlabIntChange] = None
|
||||
due_date: Optional[GitlabDatetimeChange] = None
|
||||
confidential: Optional[GitlabBoolChange] = None
|
||||
discussion_locked: Optional[GitlabBoolChange] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabIssue(SerializableAttrs):
|
||||
id: int
|
||||
issue_id: int = attr.ib(metadata={"json": "iid"})
|
||||
title: str
|
||||
description: str
|
||||
author_id: int
|
||||
project_id: int
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
assignee_id: Optional[int] = None
|
||||
assignee_ids: Optional[List[int]] = None
|
||||
relative_position: Optional[int] = None
|
||||
branch_name: Optional[str] = None
|
||||
milestone_id: Optional[int] = None
|
||||
state: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabSnippet(SerializableAttrs):
|
||||
id: int
|
||||
title: str
|
||||
content: str
|
||||
author_id: int
|
||||
project_id: int
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
file_name: str
|
||||
expires_at: datetime
|
||||
type: str
|
||||
visibility_level: int
|
||||
|
||||
|
||||
class Action(ExtensibleEnum):
|
||||
OPEN = "open"
|
||||
REOPEN = "reopen"
|
||||
CLOSE = "close"
|
||||
UPDATE = "update"
|
||||
CREATE = "create"
|
||||
DELETE = "delete"
|
||||
|
||||
@property
|
||||
def past_tense(self) -> str:
|
||||
action = self.value
|
||||
if not action:
|
||||
return action
|
||||
elif action[-2:] != "ed":
|
||||
if action[-1] == "e":
|
||||
return f"{action}d"
|
||||
return f"{action}ed"
|
||||
return action
|
||||
|
||||
|
||||
class NoteableType(ExtensibleEnum):
|
||||
ISSUE = "Issue"
|
||||
MERGE_REQUEST = "MergeRequest"
|
||||
|
||||
|
||||
class CommentType(ExtensibleEnum):
|
||||
DISCUSSION_NOTE = "DiscussionNote"
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabIssueAttributes(SerializableAttrs):
|
||||
id: int
|
||||
project_id: int
|
||||
issue_id: int = attr.ib(metadata={"json": "iid"})
|
||||
state: str
|
||||
url: str
|
||||
author_id: int
|
||||
title: str
|
||||
description: str
|
||||
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
updated_by_id: Optional[int] = None
|
||||
due_date: Optional[datetime] = None
|
||||
closed_at: Optional[datetime] = None
|
||||
|
||||
time_estimate: Optional[int] = None
|
||||
total_time_spent: Optional[int] = None
|
||||
human_time_estimate: Optional[str] = None
|
||||
human_total_time_spent: Optional[str] = None
|
||||
|
||||
action: Optional[Action] = None
|
||||
assignee_id: Optional[int] = None
|
||||
assignee_ids: Optional[List[int]] = None
|
||||
branch_name: Optional[str] = None
|
||||
confidential: bool = False
|
||||
duplicated_to_id: Optional[int] = None
|
||||
moved_to_id: Optional[int] = None
|
||||
state_id: Optional[int] = None
|
||||
milestone_id: Optional[int] = None
|
||||
labels: Optional[List[GitlabLabel]] = None
|
||||
position: Optional[int] = None
|
||||
original_position: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabCommentAttributes(SerializableAttrs):
|
||||
id: int
|
||||
note: str
|
||||
noteable_type: NoteableType
|
||||
project_id: int
|
||||
url: str
|
||||
author_id: int
|
||||
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
updated_by_id: Optional[int] = None
|
||||
resolved_at: Optional[datetime] = None
|
||||
resolved_by_id: Optional[int] = None
|
||||
|
||||
commit_id: Optional[str] = None
|
||||
noteable_id: Optional[int] = None
|
||||
discussion_id: Optional[str] = None
|
||||
type: Optional[CommentType] = None
|
||||
|
||||
system: Optional[bool] = None
|
||||
line_code: Optional[str] = None
|
||||
st_diff: Optional[GitlabStDiff] = None
|
||||
attachment: Optional[str] = None
|
||||
position: Optional[int] = None
|
||||
original_position: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabMergeRequestAttributes(SerializableAttrs):
|
||||
id: int
|
||||
merge_request_id: int = attr.ib(metadata={"json": "iid"})
|
||||
target_branch: str
|
||||
source_branch: str
|
||||
source: GitlabProject
|
||||
source_project_id: int
|
||||
target: GitlabProject
|
||||
target_project_id: int
|
||||
last_commit: GitlabCommit
|
||||
author_id: int
|
||||
title: str
|
||||
description: str
|
||||
work_in_progress: bool
|
||||
url: str
|
||||
state: str
|
||||
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
updated_by_id: Optional[int] = None
|
||||
last_edited_at: Optional[datetime] = None
|
||||
last_edited_by_id: Optional[int] = None
|
||||
|
||||
merge_commit_sha: Optional[str] = None
|
||||
merge_error: Optional[str] = None
|
||||
merge_status: Optional[str] = None
|
||||
merge_user_id: Optional[int] = None
|
||||
merge_when_pipeline_succeeds: Optional[bool] = False
|
||||
|
||||
time_estimate: Optional[int] = None
|
||||
total_time_spent: Optional[int] = None
|
||||
human_time_estimate: Optional[str] = None
|
||||
human_total_time_spent: Optional[str] = None
|
||||
|
||||
head_pipeline_id: Optional[int] = None
|
||||
milestone_id: Optional[int] = None
|
||||
assignee_id: Optional[int] = None
|
||||
assignee_ids: Optional[List[int]] = None
|
||||
assignee: Optional[GitlabUser] = None
|
||||
action: Optional[Action] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabWikiPageAttributes(SerializableAttrs):
|
||||
title: str
|
||||
content: str
|
||||
format: str
|
||||
slug: str
|
||||
url: str
|
||||
action: Action
|
||||
message: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabVariable(SerializableAttrs):
|
||||
key: str
|
||||
value: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabPipelineAttributes(SerializableAttrs):
|
||||
id: int
|
||||
ref: str
|
||||
tag: bool
|
||||
sha: str
|
||||
before_sha: str
|
||||
source: str
|
||||
status: str
|
||||
stages: List[str]
|
||||
|
||||
created_at: datetime
|
||||
finished_at: datetime
|
||||
duration: int
|
||||
variables: List[GitlabVariable]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabArtifact(SerializableAttrs):
|
||||
filename: str
|
||||
size: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabWiki(SerializableAttrs):
|
||||
web_url: str
|
||||
git_ssh_url: str
|
||||
git_http_url: str
|
||||
path_with_namespace: str
|
||||
default_branch: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabMergeRequest(SerializableAttrs):
|
||||
id: int
|
||||
merge_request_id: int = attr.ib(metadata={"json": "iid"})
|
||||
target_branch: str
|
||||
source_branch: str
|
||||
source_project_id: int
|
||||
assignee_id: int
|
||||
author_id: int
|
||||
title: str
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
milestone_id: int
|
||||
state: str
|
||||
merge_status: str
|
||||
target_project_id: int
|
||||
description: str
|
||||
source: GitlabSource
|
||||
target: GitlabTarget
|
||||
last_commit: GitlabCommit
|
||||
work_in_progress: bool
|
||||
position: Optional[int] = None
|
||||
locked_at: Optional[datetime] = None
|
||||
assignee: Optional[GitlabUser] = None
|
||||
|
||||
|
||||
class BuildStatus(ExtensibleEnum):
|
||||
CREATED = "created"
|
||||
PENDING = "pending"
|
||||
RUNNING = "running"
|
||||
SUCCESS = "success"
|
||||
FAILED = "failed"
|
||||
CANCELED = "canceled"
|
||||
|
||||
@property
|
||||
def color_circle(self) -> str:
|
||||
return _build_status_circles[self]
|
||||
|
||||
|
||||
_build_status_circles: Dict[BuildStatus, str] = {
|
||||
BuildStatus.CREATED: "🟡",
|
||||
BuildStatus.RUNNING: "🔵",
|
||||
BuildStatus.SUCCESS: "🟢",
|
||||
BuildStatus.FAILED: "🔴",
|
||||
BuildStatus.CANCELED: "⚫️",
|
||||
}
|
||||
|
||||
|
||||
class FailureReason(ExtensibleEnum):
|
||||
UNKNOWN = "unknown_failure"
|
||||
SCRIPT = "script_failure"
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabJobCommit(BaseCommit, SerializableAttrs):
|
||||
author_email: str
|
||||
author_name: str
|
||||
author_url: Optional[str]
|
||||
|
||||
id: int
|
||||
sha: str
|
||||
status: BuildStatus
|
||||
started_at: Optional[datetime]
|
||||
finished_at: Optional[datetime]
|
||||
duration: Optional[int]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabBuild(SerializableAttrs):
|
||||
id: int
|
||||
stage: str
|
||||
name: str
|
||||
status: BuildStatus
|
||||
created_at: datetime
|
||||
started_at: datetime
|
||||
finished_at: datetime
|
||||
when: str
|
||||
manual: bool
|
||||
user: GitlabUser
|
||||
runner: str
|
||||
artifacts_file: GitlabArtifact
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabEvent:
|
||||
def preprocess(self) -> List['GitlabEvent']:
|
||||
return [self]
|
||||
|
||||
@property
|
||||
def template_name(self) -> str:
|
||||
raise TemplateNotFound("")
|
||||
|
||||
@property
|
||||
def meta(self) -> JSON:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def event_properties(self) -> Iterable[str]:
|
||||
return []
|
||||
|
||||
@property
|
||||
def message_id(self) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabPushEvent(SerializableAttrs, GitlabEvent):
|
||||
object_kind: str
|
||||
before: str
|
||||
after: str
|
||||
ref: str
|
||||
checkout_sha: str
|
||||
message: Optional[str]
|
||||
user_id: int
|
||||
user_name: str
|
||||
user_username: str
|
||||
user_email: str
|
||||
user_avatar: str
|
||||
project_id: int
|
||||
project: GitlabProject
|
||||
repository: GitlabRepository
|
||||
commits: List[GitlabCommit]
|
||||
total_commits_count: int
|
||||
|
||||
@property
|
||||
def user(self) -> GitlabUser:
|
||||
return GitlabUser(id=self.user_id, name=self.user_name, email=self.user_email,
|
||||
username=self.user_username, avatar_url=self.user_avatar,
|
||||
web_url=f"{self.project.gitlab_base_url}/{self.user_username}")
|
||||
|
||||
@property
|
||||
def template_name(self) -> str:
|
||||
return "tag" if self.ref_type == "tag" else "push"
|
||||
|
||||
@property
|
||||
def event_properties(self) -> Iterable[str]:
|
||||
return ("user", "is_new_ref", "is_deleted_ref", "ref_name", "ref_type", "ref_url",
|
||||
"diff_url")
|
||||
|
||||
@property
|
||||
def diff_url(self) -> str:
|
||||
before = self.project.default_branch if self.is_new_ref else self.before
|
||||
after = self.after
|
||||
return f"{self.project.web_url}/-/compare/{before}...{after}"
|
||||
|
||||
@property
|
||||
def is_new_ref(self) -> bool:
|
||||
return self.before == "0" * len(self.before)
|
||||
|
||||
@property
|
||||
def is_deleted_ref(self) -> bool:
|
||||
return self.after == "0" * len(self.after)
|
||||
|
||||
@property
|
||||
def ref_type(self) -> str:
|
||||
if self.ref.startswith("refs/heads/"):
|
||||
return "branch"
|
||||
elif self.ref.startswith("refs/tags/"):
|
||||
return "tag"
|
||||
else:
|
||||
return "ref"
|
||||
|
||||
@property
|
||||
def ref_name(self) -> str:
|
||||
return self.ref.split("/", 2)[2]
|
||||
|
||||
@property
|
||||
def ref_url(self) -> Optional[str]:
|
||||
if self.ref.startswith("refs/heads/"):
|
||||
return f"{self.project.web_url}/-/tree/{self.ref_name}"
|
||||
elif self.ref.startswith("refs/tags/"):
|
||||
return f"{self.project.web_url}/-/tags/{self.ref_name}"
|
||||
else:
|
||||
return None
|
||||
|
||||
@property
|
||||
def message_id(self) -> str:
|
||||
return f"push-{self.project_id}-{self.checkout_sha}-{self.ref_name}"
|
||||
|
||||
|
||||
def split_updates(evt: Union['GitlabIssueEvent', 'GitlabMergeRequestEvent']) -> List[GitlabEvent]:
|
||||
if not evt.changes:
|
||||
return [evt]
|
||||
output = []
|
||||
# We don't want to handle multiple issue change types in a single Matrix message,
|
||||
# so split each change into a separate event.
|
||||
for field in attr.fields(GitlabChanges):
|
||||
value = getattr(evt.changes, field.name)
|
||||
if value:
|
||||
output.append(attr.evolve(evt, changes=GitlabChanges(**{field.name: value})))
|
||||
return output
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabIssueEvent(SerializableAttrs, GitlabEvent):
|
||||
object_kind: str
|
||||
user: GitlabUser
|
||||
project: GitlabProject
|
||||
repository: GitlabRepository
|
||||
object_attributes: GitlabIssueAttributes
|
||||
assignees: Optional[List[GitlabUser]] = None
|
||||
labels: Optional[List[GitlabLabel]] = None
|
||||
changes: Optional[GitlabChanges] = None
|
||||
|
||||
def preprocess(self) -> List['GitlabIssueEvent']:
|
||||
users_to_mutate = [self.user]
|
||||
if self.changes and self.changes.assignees:
|
||||
users_to_mutate += self.changes.assignees.previous
|
||||
users_to_mutate += self.changes.assignees.current
|
||||
if self.assignees:
|
||||
users_to_mutate += self.assignees
|
||||
for user in users_to_mutate:
|
||||
user.web_url = f"{self.project.gitlab_base_url}/{user.username}"
|
||||
|
||||
return split_updates(self) if self.action == Action.UPDATE else [self]
|
||||
|
||||
@property
|
||||
def template_name(self) -> str:
|
||||
return f"issue_{self.action.key.lower()}"
|
||||
|
||||
@property
|
||||
def event_properties(self) -> Iterable[str]:
|
||||
return "action",
|
||||
|
||||
@property
|
||||
def action(self) -> Action:
|
||||
return self.object_attributes.action
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabCommentEvent(SerializableAttrs, GitlabEvent):
|
||||
object_kind: str
|
||||
user: GitlabUser
|
||||
project_id: int
|
||||
project: GitlabProject
|
||||
repository: GitlabRepository
|
||||
object_attributes: GitlabCommentAttributes
|
||||
merge_request: Optional[GitlabMergeRequest] = None
|
||||
commit: Optional[GitlabCommit] = None
|
||||
issue: Optional[GitlabIssue] = None
|
||||
snippet: Optional[GitlabSnippet] = None
|
||||
|
||||
def preprocess(self) -> List['GitlabCommentEvent']:
|
||||
self.user.web_url = f"{self.project.gitlab_base_url}/{self.user.username}"
|
||||
return [self]
|
||||
|
||||
@property
|
||||
def template_name(self) -> str:
|
||||
return "comment"
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabMergeRequestEvent(SerializableAttrs, GitlabEvent):
|
||||
object_kind: str
|
||||
user: GitlabUser
|
||||
project: GitlabProject
|
||||
repository: GitlabRepository
|
||||
object_attributes: GitlabMergeRequestAttributes
|
||||
labels: List[GitlabLabel]
|
||||
changes: GitlabChanges
|
||||
|
||||
def preprocess(self) -> List['GitlabMergeRequestEvent']:
|
||||
users_to_mutate = [self.user]
|
||||
if self.changes and self.changes.assignees:
|
||||
users_to_mutate += self.changes.assignees.previous
|
||||
users_to_mutate += self.changes.assignees.current
|
||||
for user in users_to_mutate:
|
||||
user.web_url = f"{self.project.gitlab_base_url}/{user.username}"
|
||||
|
||||
return split_updates(self) if self.action == Action.UPDATE else [self]
|
||||
|
||||
@property
|
||||
def template_name(self) -> str:
|
||||
return "issue_update" if self.action == Action.UPDATE else "merge_request"
|
||||
|
||||
@property
|
||||
def event_properties(self) -> Iterable[str]:
|
||||
return "action",
|
||||
|
||||
@property
|
||||
def action(self) -> Action:
|
||||
return self.object_attributes.action
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabWikiPageEvent(SerializableAttrs, GitlabEvent):
|
||||
object_kind: str
|
||||
user: GitlabUser
|
||||
project: GitlabProject
|
||||
wiki: GitlabWiki
|
||||
object_attributes: GitlabWikiPageAttributes
|
||||
|
||||
def preprocess(self) -> List['GitlabWikiPageEvent']:
|
||||
self.user.web_url = f"{self.project.gitlab_base_url}/{self.user.username}"
|
||||
return [self]
|
||||
|
||||
@property
|
||||
def template_name(self) -> str:
|
||||
return "wiki"
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabPipelineEvent(SerializableAttrs, GitlabEvent):
|
||||
object_kind: str
|
||||
object_attributes: GitlabPipelineAttributes
|
||||
user: GitlabUser
|
||||
project: GitlabProject
|
||||
commit: GitlabCommit
|
||||
builds: List[GitlabBuild]
|
||||
|
||||
@property
|
||||
def message_id(self) -> str:
|
||||
return f"pipeline-{self.object_attributes.id}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabRunner(SerializableAttrs):
|
||||
active: bool
|
||||
description: str
|
||||
id: int
|
||||
tags: List[str]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitlabJobEvent(SerializableAttrs, GitlabEvent):
|
||||
object_kind: str
|
||||
ref: str
|
||||
tag: str
|
||||
before_sha: str
|
||||
sha: str
|
||||
pipeline_id: int
|
||||
build_id: int
|
||||
build_name: str
|
||||
build_stage: str
|
||||
build_status: BuildStatus
|
||||
build_started_at: datetime
|
||||
build_finished_at: datetime
|
||||
build_duration: int
|
||||
build_allow_failure: bool
|
||||
build_failure_reason: FailureReason
|
||||
project_id: int
|
||||
project_name: str
|
||||
user: GitlabUser
|
||||
commit: GitlabJobCommit
|
||||
repository: GitlabRepository
|
||||
runner: Optional[GitlabRunner]
|
||||
|
||||
def preprocess(self) -> List['GitlabJobEvent']:
|
||||
base_url = str(URL(self.repository.homepage).with_path(""))
|
||||
self.user.web_url = f"{base_url}/{self.user.username}"
|
||||
return [self]
|
||||
|
||||
@property
|
||||
def template_name(self) -> str:
|
||||
return "job"
|
||||
|
||||
@property
|
||||
def push_id(self) -> str:
|
||||
return f"push-{self.project_id}-{self.sha}-{self.ref}"
|
||||
|
||||
@property
|
||||
def reaction_id(self) -> str:
|
||||
return f"job-{self.project_id}-{self.sha}-{self.ref}-{self.build_name}"
|
||||
|
||||
@property
|
||||
def meta(self) -> JSON:
|
||||
return {
|
||||
"build": {
|
||||
"pipeline_id": self.pipeline_id,
|
||||
"id": self.build_id,
|
||||
"name": self.build_name,
|
||||
"stage": self.build_stage,
|
||||
"status": self.build_status.value,
|
||||
"url": self.build_url,
|
||||
},
|
||||
}
|
||||
|
||||
@property
|
||||
def event_properties(self) -> Iterable[str]:
|
||||
return "build_url",
|
||||
|
||||
@property
|
||||
def build_url(self) -> str:
|
||||
return f"{self.repository.homepage}/-/jobs/{self.build_id}"
|
||||
|
||||
|
||||
GitlabEventType = Union[Type[GitlabPushEvent],
|
||||
Type[GitlabIssueEvent],
|
||||
Type[GitlabCommentEvent],
|
||||
Type[GitlabMergeRequestEvent],
|
||||
Type[GitlabWikiPageEvent],
|
||||
Type[GitlabPipelineEvent],
|
||||
Type[GitlabJobEvent]]
|
||||
|
||||
EventParse: Dict[str, GitlabEventType] = {
|
||||
"Push Hook": GitlabPushEvent,
|
||||
"Tag Push Hook": GitlabPushEvent,
|
||||
"Issue Hook": GitlabIssueEvent,
|
||||
"Confidential Issue Hook": GitlabIssueEvent,
|
||||
"Note Hook": GitlabCommentEvent,
|
||||
"Confidential Note Hook": GitlabCommentEvent,
|
||||
"Merge Request Hook": GitlabMergeRequestEvent,
|
||||
"Wiki Page Hook": GitlabWikiPageEvent,
|
||||
"Pipeline Hook": GitlabPipelineEvent,
|
||||
"Job Hook": GitlabJobEvent
|
||||
}
|
||||
|
||||
OTHER_ENUMS = {
|
||||
"NoteableType": NoteableType,
|
||||
"CommentType": CommentType,
|
||||
"BuildStatus": BuildStatus,
|
||||
"FailureReason": FailureReason,
|
||||
}
|
||||
|
|
@ -1,21 +1,26 @@
|
|||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Literal, Optional, Tuple, cast
|
||||
|
||||
from dotenv import load_dotenv
|
||||
import uvicorn
|
||||
from fastapi import Depends, FastAPI, HTTPException, Request, status
|
||||
from fastapi import Depends, FastAPI, HTTPException, Request, status, Header
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
import pydantic
|
||||
from pydantic import BaseSettings
|
||||
|
||||
from ops_bot import aws, pagerduty
|
||||
from ops_bot.matrix import MatrixClient, MatrixClientSettings
|
||||
from ops_bot.gitlab import hook as gitlab_hook
|
||||
|
||||
load_dotenv()
|
||||
|
||||
class BotSettings(BaseSettings):
|
||||
bearer_token: str
|
||||
routing_keys: Dict[str, str]
|
||||
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
|
||||
matrix: MatrixClientSettings
|
||||
|
||||
class Config:
|
||||
env_prefix = "BOT_"
|
||||
|
|
@ -40,11 +45,14 @@ async def matrix_main(matrix_client: MatrixClient) -> None:
|
|||
|
||||
@app.on_event("startup")
|
||||
async def startup_event() -> None:
|
||||
bot_settings = BotSettings(_env_file=".env", _env_file_encoding="utf-8")
|
||||
# if "config.json" exists read it
|
||||
if Path("config.json").exists():
|
||||
bot_settings = BotSettings.parse_file("config.json")
|
||||
else:
|
||||
bot_settings = BotSettings(_env_file=".env", _env_file_encoding="utf-8")
|
||||
logging.getLogger().setLevel(bot_settings.log_level)
|
||||
matrix_settings = MatrixClientSettings(_env_file=".env", _env_file_encoding="utf-8")
|
||||
matrix_settings.join_rooms = list(bot_settings.routing_keys.values())
|
||||
c = MatrixClient(settings=matrix_settings)
|
||||
bot_settings.matrix.join_rooms = list(bot_settings.routing_keys.values())
|
||||
c = MatrixClient(settings=bot_settings.matrix)
|
||||
app.state.matrix_client = c
|
||||
app.state.bot_settings = bot_settings
|
||||
asyncio.create_task(matrix_main(c))
|
||||
|
|
@ -120,6 +128,29 @@ async def aws_sns_hook(
|
|||
)
|
||||
return {"message": msg_plain, "message_formatted": msg_formatted}
|
||||
|
||||
@app.post("/hook/gitlab/{routing_key}")
|
||||
async def gitlab_webhook(
|
||||
request: Request,
|
||||
x_gitlab_token: str = Header(default=""),
|
||||
x_gitlab_event: str = Header(default=""),
|
||||
matrix_client: MatrixClient = Depends(get_matrix_service)
|
||||
) -> Dict[str, str]:
|
||||
bearer_token = request.app.state.bot_settings.bearer_token
|
||||
if x_gitlab_token != bearer_token:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Incorrect X-Gitlab-Token"
|
||||
)
|
||||
room_id, payload = await receive_helper(request)
|
||||
messages = await gitlab_hook.parse_event(x_gitlab_event, payload)
|
||||
for msg_plain, msg_formatted in messages:
|
||||
await matrix_client.room_send(
|
||||
room_id,
|
||||
msg_plain,
|
||||
message_formatted=msg_formatted,
|
||||
)
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
def start_dev() -> None:
|
||||
uvicorn.run("ops_bot.main:app", port=1111, host="127.0.0.1", reload=True)
|
||||
|
|
|
|||
62
ops_bot/util/contrast.py
Normal file
62
ops_bot/util/contrast.py
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
# Based on https://github.com/gsnedders/wcag-contrast-ratio
|
||||
# Copyright (c) 2015 Geoffrey Sneddon
|
||||
# Copyright (c) 2019 Tulir Asokan
|
||||
# MIT license
|
||||
from typing import Tuple
|
||||
|
||||
RGB = Tuple[float, float, float]
|
||||
|
||||
|
||||
def hex_to_rgb(color: str) -> RGB:
|
||||
color = color.lstrip("#")
|
||||
if len(color) != 3 and len(color) != 6:
|
||||
raise ValueError("Invalid hex length")
|
||||
step = 1 if len(color) == 3 else 2
|
||||
try:
|
||||
r = int(color[0:step], 16)
|
||||
g = int(color[step:2 * step], 16)
|
||||
b = int(color[2 * step:3 * step], 16)
|
||||
except ValueError as e:
|
||||
raise ValueError("Invalid hex value") from e
|
||||
return r / 255, g / 255, b / 255
|
||||
|
||||
|
||||
def rgb_to_hex(rgb: RGB) -> str:
|
||||
r, g, b = rgb
|
||||
r = int(r * 255)
|
||||
g = int(g * 255)
|
||||
b = int(b * 255)
|
||||
return f"{r:02x}{g:02x}{b:02x}"
|
||||
|
||||
|
||||
def contrast(rgb1: RGB, rgb2: RGB) -> float:
|
||||
for r, g, b in (rgb1, rgb2):
|
||||
if not 0.0 <= r <= 1.0:
|
||||
raise ValueError(f"r {r} is out of valid range (0.0 - 1.0)")
|
||||
if not 0.0 <= g <= 1.0:
|
||||
raise ValueError(f"g {g} is out of valid range (0.0 - 1.0)")
|
||||
if not 0.0 <= b <= 1.0:
|
||||
raise ValueError(f"b {b} is out of valid range (0.0 - 1.0)")
|
||||
|
||||
l1 = _relative_luminance(*rgb1)
|
||||
l2 = _relative_luminance(*rgb2)
|
||||
|
||||
if l1 > l2:
|
||||
return (l1 + 0.05) / (l2 + 0.05)
|
||||
else:
|
||||
return (l2 + 0.05) / (l1 + 0.05)
|
||||
|
||||
|
||||
def _relative_luminance(r: float, g: float, b: float) -> float:
|
||||
r = _linearize(r)
|
||||
g = _linearize(g)
|
||||
b = _linearize(b)
|
||||
|
||||
return 0.2126 * r + 0.7152 * g + 0.0722 * b
|
||||
|
||||
|
||||
def _linearize(v: float) -> float:
|
||||
if v <= 0.03928:
|
||||
return v / 12.92
|
||||
else:
|
||||
return ((v + 0.055) / 1.055) ** 2.4
|
||||
36
ops_bot/util/markdown.py
Normal file
36
ops_bot/util/markdown.py
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
# Copyright (c) 2022 Tulir Asokan
|
||||
#
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
import commonmark
|
||||
|
||||
|
||||
class HtmlEscapingRenderer(commonmark.HtmlRenderer):
|
||||
def __init__(self, allow_html: bool = False):
|
||||
super().__init__()
|
||||
self.allow_html = allow_html
|
||||
|
||||
def lit(self, s):
|
||||
if self.allow_html:
|
||||
return super().lit(s)
|
||||
return super().lit(s.replace("<", "<").replace(">", ">"))
|
||||
|
||||
def image(self, node, entering):
|
||||
prev = self.allow_html
|
||||
self.allow_html = True
|
||||
super().image(node, entering)
|
||||
self.allow_html = prev
|
||||
|
||||
|
||||
md_parser = commonmark.Parser()
|
||||
yes_html_renderer = commonmark.HtmlRenderer()
|
||||
no_html_renderer = HtmlEscapingRenderer()
|
||||
|
||||
|
||||
def render(message: str, allow_html: bool = False) -> str:
|
||||
parsed = md_parser.parse(message)
|
||||
if allow_html:
|
||||
return yes_html_renderer.render(parsed)
|
||||
else:
|
||||
return no_html_renderer.render(parsed)
|
||||
132
ops_bot/util/template.py
Normal file
132
ops_bot/util/template.py
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
# gitlab - A GitLab client and webhook receiver for maubot
|
||||
# Copyright (C) 2021 Tulir Asokan
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Dict, Any, Tuple, Callable, Iterable, List, Union
|
||||
import os.path
|
||||
|
||||
from jinja2 import Environment as JinjaEnvironment, Template, BaseLoader, TemplateNotFound, FileSystemLoader
|
||||
|
||||
from ops_bot.util import markdown
|
||||
|
||||
def sync_read_file(path: str) -> str:
|
||||
with open(path) as file:
|
||||
return file.read()
|
||||
|
||||
|
||||
def sync_list_files(directory: str) -> list[str]:
|
||||
return os.listdir(directory)
|
||||
|
||||
class TemplateUtil:
|
||||
@staticmethod
|
||||
def bold_scope(label: str) -> str:
|
||||
try:
|
||||
scope, label = label.rsplit("::", 1)
|
||||
return f"{scope}::<strong>{label}</strong>"
|
||||
except ValueError:
|
||||
return label
|
||||
|
||||
@staticmethod
|
||||
def pluralize(val: int, unit: str) -> str:
|
||||
if val == 1:
|
||||
return f"{val} {unit}"
|
||||
return f"{val} {unit}s"
|
||||
|
||||
@classmethod
|
||||
def format_time(cls, seconds: Union[int, float], enable_days: bool = False) -> str:
|
||||
seconds = abs(seconds)
|
||||
frac_seconds = round(seconds - int(seconds), 1)
|
||||
minutes, seconds = divmod(int(seconds), 60)
|
||||
hours, minutes = divmod(minutes, 60)
|
||||
if enable_days:
|
||||
days, hours = divmod(hours, 24)
|
||||
else:
|
||||
days = 0
|
||||
parts = []
|
||||
if days > 0:
|
||||
parts.append(cls.pluralize(days, "day"))
|
||||
if hours > 0:
|
||||
parts.append(cls.pluralize(hours, "hour"))
|
||||
if minutes > 0:
|
||||
parts.append(cls.pluralize(minutes, "minute"))
|
||||
if seconds > 0 or len(parts) == 0:
|
||||
parts.append(cls.pluralize(seconds + frac_seconds, "second"))
|
||||
|
||||
if len(parts) == 1:
|
||||
return parts[0]
|
||||
return ", ".join(parts[:-1]) + f" and {parts[-1]}"
|
||||
|
||||
@staticmethod
|
||||
def join_human_list(data: List[str], *, joiner: str = ", ", final_joiner: str = " and ",
|
||||
mutate: Callable[[str], str] = lambda val: val) -> str:
|
||||
if not data:
|
||||
return ""
|
||||
elif len(data) == 1:
|
||||
return mutate(data[0])
|
||||
return joiner.join(mutate(val) for val in data[:-1]) + final_joiner + mutate(data[-1])
|
||||
|
||||
|
||||
class TemplateProxy:
|
||||
_env: JinjaEnvironment
|
||||
_args: Dict[str, Any]
|
||||
|
||||
def __init__(self, env: JinjaEnvironment, args: Dict[str, Any]) -> None:
|
||||
self._env = env
|
||||
self._args = args
|
||||
|
||||
def __getattr__(self, item: str) -> str:
|
||||
try:
|
||||
tpl = self._env.get_template(item)
|
||||
except TemplateNotFound:
|
||||
raise AttributeError(item)
|
||||
return tpl.render(**self._args)
|
||||
|
||||
|
||||
class PluginTemplateLoader(BaseLoader):
|
||||
directory: str
|
||||
macros: str
|
||||
|
||||
def __init__(self, base: str, directory: str) -> None:
|
||||
self.directory = os.path.join("templates", base, directory)
|
||||
self.macros = sync_read_file(os.path.join("templates", base, "macros.html"))
|
||||
|
||||
def get_source(self, environment: Any, name: str) -> Tuple[str, str, Callable[[], bool]]:
|
||||
path = f"{os.path.join(self.directory, name)}.html"
|
||||
try:
|
||||
tpl = sync_read_file(path)
|
||||
except KeyError:
|
||||
raise TemplateNotFound(name)
|
||||
return self.macros + tpl, name, lambda: True
|
||||
|
||||
def list_templates(self) -> Iterable[str]:
|
||||
return [os.path.splitext(os.path.basename(path))[0]
|
||||
for path in sync_list_files(self.directory)
|
||||
if path.endswith(".html")]
|
||||
|
||||
class TemplateManager:
|
||||
_env: JinjaEnvironment
|
||||
_loader: PluginTemplateLoader
|
||||
|
||||
def __init__(self, base: str, directory: str) -> None:
|
||||
#self._loader = FileSystemLoader(os.path.join("templates/", base))
|
||||
self._loader = PluginTemplateLoader(base, directory)
|
||||
self._env = JinjaEnvironment(loader=self._loader, lstrip_blocks=True, trim_blocks=True,
|
||||
extensions=["jinja2.ext.do"])
|
||||
self._env.filters["markdown"] = lambda message: markdown.render(message, allow_html=True)
|
||||
|
||||
def __getitem__(self, item: str) -> Template:
|
||||
return self._env.get_template(item)
|
||||
|
||||
def proxy(self, args: Dict[str, Any]) -> TemplateProxy:
|
||||
return TemplateProxy(self._env, args)
|
||||
Loading…
Add table
Add a link
Reference in a new issue