import json
import subprocess
from abc import abstractmethod
from typing import Any, Optional, Tuple

import jinja2

from app.terraform import BaseAutomation


class TerraformAutomation(BaseAutomation):
    """
    An abstract class to be extended by automation plugins using Terraform
    providers to deploy resources.
    """

    parallelism = 10
    """
    Default parallelism for remote API calls.
    """

    provider: str
    """
    Short name for the provider used by this module.
    """

    def automate(self, full: bool = False) -> Tuple[bool, str]:
        """
        Runs the Terraform automation module. The run will follow these steps:

        1. The :func:`tf_prehook` hook is run.
        2. Generate a Terraform configuration and write it to a single
           ``main.tf`` file in the working directory
           (see :func:`working_directory`).
        3. Run ``terraform init``.
        4. Run ``terraform apply``. This will only include a refresh if
           *full* is **True**. The apply will wait up to *lock_timeout*
           minutes for a lock to be released before failing. Up to
           *parallelism* requests will be sent to remote APIs concurrently.
        5. The :func:`tf_posthook` hook is run.
        6. The logs from the apply step are returned as a string.

        :param full: include a Terraform refresh in the automation module run
        :return: success status and Terraform apply logs
        """
        prehook_result = self.tf_prehook()
        self.tf_generate()
        self.tf_init()
        returncode, logs = self.tf_apply(refresh=full)
        self.tf_posthook(prehook_result=prehook_result)
        return returncode == 0, logs

    def tf_apply(self, *,
                 refresh: bool = True,
                 parallelism: Optional[int] = None,
                 lock_timeout: int = 15) -> Tuple[int, str]:
        if not parallelism:
            parallelism = self.parallelism
        tf = subprocess.run(
            ['terraform', 'apply', '-auto-approve', '-json',
             f'-refresh={str(refresh).lower()}',
             f'-parallelism={parallelism}',
             f'-lock-timeout={lock_timeout}m',
             ],
            cwd=self.working_directory(),
            stdout=subprocess.PIPE)
        return tf.returncode, tf.stdout.decode('utf-8')

    @abstractmethod
    def tf_generate(self) -> None:
        raise NotImplementedError()

    def tf_init(self, *, lock_timeout: int = 15) -> None:
        # The init command does not support JSON output.
        subprocess.run(
            ['terraform', 'init',
             f'-lock-timeout={lock_timeout}m',
             ],
            cwd=self.working_directory())

    def tf_output(self) -> Any:
        tf = subprocess.run(
            ['terraform', 'output', '-json'],
            cwd=self.working_directory(),
            stdout=subprocess.PIPE)
        return json.loads(tf.stdout)

    def tf_plan(self, *,
                refresh: bool = True,
                parallelism: Optional[int] = None,
                lock_timeout: int = 15) -> Tuple[int, str]:
        if not parallelism:
            parallelism = self.parallelism
        tf = subprocess.run(
            ['terraform', 'plan', '-json',
             f'-refresh={str(refresh).lower()}',
             f'-parallelism={parallelism}',
             f'-lock-timeout={lock_timeout}m',
             ],
            cwd=self.working_directory(),
            stdout=subprocess.PIPE)
        return tf.returncode, tf.stdout.decode('utf-8')

    def tf_posthook(self, *, prehook_result: Any = None) -> None:
        """
        This hook function is called as part of normal automation, after the
        completion of :func:`tf_apply`.

        The default, if not overridden by a subclass, is to do nothing.

        :param prehook_result: the returned value of :func:`tf_prehook`
        :return: None
        """
        pass

    def tf_prehook(self) -> Optional[Any]:
        """
        This hook function is called as part of normal automation, before
        generating the Terraform configuration file. The return value will be
        passed to :func:`tf_posthook` but is otherwise ignored.

        The default, if not overridden by a subclass, is to do nothing.

        :return: state that is useful to :func:`tf_posthook`, if required
        """
        pass

    def tf_show(self) -> Any:
        terraform = subprocess.run(
            ['terraform', 'show', '-json'],
            cwd=self.working_directory(),
            stdout=subprocess.PIPE)
        return json.loads(terraform.stdout)

    def tf_write(self, template: str, **kwargs: Any) -> None:
        tmpl = jinja2.Template(template)
        with open(self.working_directory("main.tf"), 'w') as tf:
            tf.write(tmpl.render(**kwargs))
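

# --- Illustrative sketch (assumption, not part of the plugin API) ----------
# A minimal example of how a plugin might extend TerraformAutomation, using
# only the interface defined above: set ``provider`` and implement
# ``tf_generate`` by rendering a template with ``tf_write``. The class name,
# the template contents, and the ``pet_length`` variable are hypothetical;
# real plugins render their own provider blocks and resources, and how
# instances are constructed depends on BaseAutomation, which is not shown
# here.
class ExampleProviderAutomation(TerraformAutomation):
    provider = "example"  # hypothetical short name for this provider

    def tf_generate(self) -> None:
        # Render main.tf from an inline Jinja2 template; keyword arguments
        # passed to tf_write are available as template variables.
        self.tf_write(
            """
            terraform {
              required_providers {
                random = {
                  source = "hashicorp/random"
                }
              }
            }

            resource "random_pet" "server" {
              length = {{ pet_length }}
            }
            """,
            pet_length=2,
        )

# A run would then call automate() on an instance, e.g.
# ``ExampleProviderAutomation(...).automate(full=True)``, which generates the
# configuration, runs ``terraform init`` and ``terraform apply``, and returns
# the success flag and apply logs.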