from time import time

from diffusers import AutoPipelineForText2Image
from diffusers.pipelines.wuerstchen import DEFAULT_STAGE_C_TIMESTEPS
from torch import Generator

from backend.computing import Computing
from backend.wuerstchen.models.setting import WurstchenSetting


class Wuerstchen:
    """Text-to-image generation with the Wuerstchen pipeline from diffusers."""

    def __init__(self, compute: Computing):
        self.compute = compute
        self.pipeline = None
        self.device = self.compute.name

    def get_text_to_image_wuerstchen_pipeline(
        self,
        model_id: str = "warp-ai/wuerstchen",
        low_vram_mode: bool = False,
    ):
        self.model_id = model_id
        self.low_vram_mode = low_vram_mode
        print(f"Wuerstchen - {self.compute.name}, {self.compute.datatype}")
        print(f"Using model {model_id}")
        tic = time()
        self._load_model()
        self._pipeline_to_device()
        delta = time() - tic
        print(f"Model loaded in {delta:.2f}s")

    def text_to_image_wuerstchen(self, setting: WurstchenSetting):
        if self.pipeline is None:
            raise Exception("Text-to-image pipeline not initialized")

        generator = None
        if setting.seed != -1:
            # A fixed seed makes generation reproducible; -1 means random.
            print(f"Using seed {setting.seed}")
            generator = Generator(self.device).manual_seed(setting.seed)

        images = self.pipeline(
            setting.prompt,
            negative_prompt=setting.negative_prompt,
            height=setting.image_height,
            width=setting.image_width,
            prior_timesteps=DEFAULT_STAGE_C_TIMESTEPS,
            prior_guidance_scale=setting.prior_guidance_scale,
            num_images_per_prompt=setting.number_of_images,
            generator=generator,
        ).images

        return images

    def _pipeline_to_device(self):
        if self.low_vram_mode:
            # Sequential CPU offload trades speed for VRAM: submodules are
            # moved to the GPU only while they are actually needed.
            print("Running in low VRAM mode; image generation will be slower")
            self.pipeline.enable_sequential_cpu_offload()
        elif self.compute.name in ("cuda", "mps"):
            self.pipeline = self.pipeline.to(self.compute.name)

    def _load_full_precision_model(self):
        # Fall back to the default checkpoint weights, cast to the compute dtype.
        self.pipeline = AutoPipelineForText2Image.from_pretrained(
            self.model_id,
            torch_dtype=self.compute.datatype,
        )

    def _load_model(self):
        if self.compute.name == "cuda":
            try:
                # Prefer the fp16 weight variant when available. Assumption:
                # the except message implies this was the intent; the original
                # call was identical to the full-precision path.
                self.pipeline = AutoPipelineForText2Image.from_pretrained(
                    self.model_id,
                    torch_dtype=self.compute.datatype,
                    variant="fp16",
                )
            except Exception as ex:
                print(
                    f"fp16 variant of the model not found; using the full precision model ({ex})"
                )
                self._load_full_precision_model()
        else:
            self._load_full_precision_model()
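

# Minimal usage sketch (not part of the original file). It assumes Computing()
# can be constructed with defaults and that WurstchenSetting accepts the keyword
# fields referenced above; both are hypothetical here, so adjust to the real
# backend APIs before running.
if __name__ == "__main__":
    wuerstchen = Wuerstchen(Computing())
    wuerstchen.get_text_to_image_wuerstchen_pipeline(low_vram_mode=False)
    setting = WurstchenSetting(
        prompt="An astronaut riding a horse, photorealistic",
        negative_prompt="",
        image_height=1024,
        image_width=1024,
        prior_guidance_scale=4.0,
        number_of_images=1,
        seed=-1,
    )
    images = wuerstchen.text_to_image_wuerstchen(setting)
    images[0].save("wuerstchen_out.png")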