import asyncio
import json
import os
import shutil
import tempfile
from datetime import datetime, timezone
from json import JSONDecodeError
from typing import Any, TYPE_CHECKING

import aiofiles
from sqlalchemy.ext.asyncio import AsyncSession

from src.tofu.config import settings
from src.tofu.exceptions import TofuExecutionError, TofuTypeError
from src.tofu.models import TofuInstanceTask, TofuInstanceTaskLog
from src.tofu.security import generate_password, generate_password_hash

# Timestamp layout used when synthesising log records and when parsing the
# "@timestamp" field of the JSON lines read from the tofu process.
_LOG_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def _convert_python_to_tf(value: Any) -> Any:
    """Convert Python types to Terraform-compatible types.

    ``None``, booleans, numbers and strings are returned unchanged. Lists and
    dicts are rebuilt recursively; dict keys are coerced to ``str``.

    Raises:
        TofuTypeError: If ``value`` (or a nested element) is of any other type.
    """
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, list):
        return [_convert_python_to_tf(item) for item in value]
    if isinstance(value, dict):
        return {str(k): _convert_python_to_tf(v) for k, v in value.items()}
    raise TofuTypeError(f"Unsupported type for Terraform conversion: {type(value)}")


class TofuManager:
    """Run OpenTofu commands for a single instance task.

    Intended to be used as an async context manager. Each public command
    (``init``, ``apply``, ``destroy``, ``output``) first writes the instance
    configuration to ``main.tf.json`` in the working directory and then invokes
    the binary at ``settings.OPENTOFU_PATH``.
    """

    def __init__(
        self,
        db: AsyncSession,
        instance_task: TofuInstanceTask,
        working_dir: str | None = None,
    ):
        """Store the collaborators; no I/O happens here.

        Args:
            db: Session used to persist the state password, logs and outputs.
            instance_task: The task whose instance configuration is executed.
            working_dir: Directory to run in. When omitted, a temporary
                directory is created lazily by ``working_dir()``.
        """
        self.db = db
        self.instance_task = instance_task
        # Remember whether the directory is ours to delete on exit.
        self._is_temp_dir = working_dir is None
        self._working_dir = working_dir
        self.tofu_path = settings.OPENTOFU_PATH

    async def __aenter__(self) -> "TofuManager":
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Clear the instance's state password and clean up the temp directory.

        The password hash is always reset and committed. The working directory
        is removed only when this manager created it and the block exited
        without an exception.
        """
        self.instance_task.instance.state_password = None
        await self.db.commit()
        # Only clean up temporary directories if there was no exception
        if self._is_temp_dir and self._working_dir and exc_type is None:
            # rmtree is blocking filesystem work; keep it off the event loop,
            # the same way working_dir() handles mkdtemp.
            await asyncio.to_thread(shutil.rmtree, self._working_dir)

    async def _create_config(self) -> None:
        """Generate a fresh state password and write ``main.tf.json``.

        The plain password is kept on ``self.password`` (it is handed to the
        tofu process by ``_run_command``); only its hash is stored on the
        instance and committed. The written configuration is the instance
        configuration with ``terraform.backend`` replaced by an HTTP backend
        pointing at this instance's state endpoint.
        """
        stored = self.instance_task.instance.configuration
        self.password = generate_password()
        self.instance_task.instance.state_password = generate_password_hash(self.password)
        await self.db.commit()

        state_url = (
            "http://localhost:8000/api/v1/tofu/instances/"
            f"{self.instance_task.instance_id}/state"
        )
        # Build a new mapping rather than editing the instance's stored
        # configuration in place. Key order matches an in-place update, so the
        # JSON written to disk is unchanged.
        config = {
            **stored,
            "terraform": {
                **stored.get("terraform", {}),
                "backend": {
                    "http": {
                        "address": state_url,
                        "lock_address": state_url,
                        "unlock_address": state_url,
                    }
                },
            },
        }
        async with aiofiles.open(await self.config_file(), "w") as f:
            await f.write(json.dumps(config, indent=2))

    async def _process_output_line(self, line: str) -> None:
        """Persist one line of tofu output as a ``TofuInstanceTaskLog`` row.

        A line that is not a JSON object is wrapped in a synthetic
        ``decode-error`` record. If a record has no parseable ``@timestamp``,
        the current UTC time is used for the row's timestamp instead.
        """
        now = datetime.now(tz=timezone.utc)
        try:
            data = json.loads(line)
        except JSONDecodeError:
            data = None
        if not isinstance(data, dict):
            # Covers both undecodable text and valid JSON that is not an
            # object (e.g. a bare number), which cannot carry "@timestamp".
            data = {
                "@level": "info",
                "@timestamp": now.strftime(_LOG_TIMESTAMP_FORMAT),
                "@module": "manager",
                "@message": line,
                "type": "decode-error",
            }
        try:
            timestamp = datetime.strptime(data["@timestamp"], _LOG_TIMESTAMP_FORMAT)
        except (KeyError, TypeError, ValueError):
            # Missing or malformed timestamp: keep the record, stamp it now.
            timestamp = now
        log = TofuInstanceTaskLog(
            instance_task_id=self.instance_task.id,
            timestamp=timestamp,
            log=data,
        )
        self.db.add(log)
        await self.db.commit()

    async def _run_command(
        self,
        command: str,
        *args: str,
        json_output: bool = False,
        log_output: bool = True,
        return_output: bool = False,
    ) -> str | None:
        """Run ``tofu <command> [args...]`` in the working directory.

        Args:
            command: The tofu subcommand.
            *args: Extra command-line arguments.
            json_output: Append ``-json`` to the command line.
            log_output: Store every stdout line via ``_process_output_line``.
            return_output: Collect stdout and return it.

        Returns:
            The captured stdout when ``return_output`` is true, else ``None``.

        Raises:
            TofuExecutionError: If the process exits with a non-zero status.
        """
        cmd = [self.tofu_path, command, *args]
        if json_output:
            cmd.append("-json")
        process = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=await self.working_dir(),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
            # NOTE: the child receives only these variables; the parent
            # process environment is not inherited.
            env={
                "TF_HTTP_USERNAME": "tofu",
                "TF_HTTP_PASSWORD": self.password,
            },
        )
        if TYPE_CHECKING:
            assert process.stdout is not None
        captured: list[str] = []
        async for raw_line in process.stdout:
            text = raw_line.decode("utf-8")
            if return_output:
                captured.append(text)
            if log_output:
                await self._process_output_line(text)
        await process.wait()
        if process.returncode != 0:
            raise TofuExecutionError(f"Tofu command failed: {process.returncode}")
        # Each captured line still ends with its own newline, so concatenate
        # without a separator to reproduce stdout exactly.
        return "".join(captured) if return_output else None

    async def config_file(self) -> str:
        """Return the path of ``main.tf.json`` inside the working directory."""
        return os.path.join(await self.working_dir(), "main.tf.json")

    async def working_dir(self) -> str:
        """Return the working directory, creating a temporary one on first use."""
        if self._working_dir:
            return self._working_dir
        self._working_dir = await asyncio.to_thread(tempfile.mkdtemp, prefix="tofu-")
        return self._working_dir

    async def init(self, upgrade: bool = True) -> None:
        """Write the configuration and run ``tofu init`` (``-upgrade`` by default)."""
        await self._create_config()
        args = ["-upgrade"] if upgrade else []
        await self._run_command("init", *args, json_output=True)

    async def apply(self) -> None:
        """Write the configuration and run ``tofu apply -auto-approve``."""
        await self._create_config()
        await self._run_command("apply", "-auto-approve", json_output=True)

    async def destroy(self) -> None:
        """Write the configuration and run ``tofu destroy -auto-approve``."""
        await self._create_config()
        await self._run_command("destroy", "-auto-approve", json_output=True)

    async def output(self) -> None:
        """Run ``tofu output -json`` and store the parsed result on the instance.

        Raises:
            TofuExecutionError: If the command fails or its output is not
                valid JSON.
        """
        await self._create_config()
        outputs = await self._run_command(
            "output", json_output=True, log_output=False, return_output=True
        )
        try:
            parsed = json.loads(outputs)
        except JSONDecodeError as err:
            raise TofuExecutionError("Could not parse JSON output") from err
        self.instance_task.instance.outputs = parsed
        await self.db.commit()