"""Qwen-Image-Edit Rapid-AIO / Free CPU via GGUF + stable-diffusion.cpp Lightning pre-fused, 4 steps natively.""" import os, sys, time, gc, argparse, signal, threading from PIL import Image sys.stdout.reconfigure(line_buffering=True) sys.stderr.reconfigure(line_buffering=True) def log_mem(): try: with open("/proc/self/status") as f: for line in f: if line.startswith(("VmRSS", "VmPeak")): print(f" [mem] {line.strip()}", flush=True) except Exception: pass def sighandler(signum, frame): print(f"\n[FATAL] Signal {signum} ({signal.Signals(signum).name})", flush=True) log_mem() sys.exit(128 + signum) for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGABRT): try: signal.signal(sig, sighandler) except Exception: pass def get_cpu_count() -> int: try: with open("/sys/fs/cgroup/cpu.max") as f: q, p = f.read().strip().split() if q != "max": return max(1, int(q) // int(p)) except Exception: pass try: with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as f: q = int(f.read().strip()) with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as f: p = int(f.read().strip()) if q > 0: return max(1, q // p) except Exception: pass return max(1, os.cpu_count() or 2) N_THREADS = get_cpu_count() for k in ["OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS"]: os.environ.setdefault(k, str(N_THREADS)) print(f"[init] CPU threads: {N_THREADS}") MODELS = { "Rapid-AIO-v23 Q3 (edit)": { "repo": "Arunk25/Qwen-Image-Edit-Rapid-AIO-GGUF", "file": "v23/Qwen-Rapid-NSFW-v23_Q3_K.gguf", "needs_image": True, }, "Image-2512 (txt2img)": { "repo": "unsloth/Qwen-Image-2512-GGUF", "file": "qwen-image-2512-Q3_K_M.gguf", "needs_image": False, }, } DEFAULT_MODEL = "Rapid-AIO-v23 Q3 (edit)" LLM_REPO = "mradermacher/Qwen2.5-VL-7B-Instruct-abliterated-GGUF" LLM_FILE = "Qwen2.5-VL-7B-Instruct-abliterated.Q3_K_M.gguf" VAE_REPO = "Comfy-Org/Qwen-Image_ComfyUI" VAE_FILE = "split_files/vae/qwen_image_vae.safetensors" DEFAULT_NEG = "worst quality, low quality, blurry, watermark, text, signature, jpeg artifacts" MAX_INPUT_PX = 768 from huggingface_hub import hf_hub_download from stable_diffusion_cpp import StableDiffusion def ensure_model(repo_id: str, filename: str) -> str: print(f"[init] Resolving {repo_id}/{filename}...", flush=True) t0 = time.time() path = hf_hub_download(repo_id=repo_id, filename=filename) dt = time.time() - t0 if dt > 1: print(f"[init] Downloaded in {dt:.1f}s", flush=True) return path print("[init] Downloading shared models...", flush=True) llm_path = ensure_model(LLM_REPO, LLM_FILE) vae_path = ensure_model(VAE_REPO, VAE_FILE) print("[init] Pre-caching diffusion models to disk...", flush=True) model_paths = {} for name, cfg in MODELS.items(): model_paths[name] = ensure_model(cfg["repo"], cfg["file"]) print(f"[init] {name}: OK", flush=True) SD_ENGINE = None CURRENT_MODEL = None def load_engine(model_name=None): model_name = model_name or DEFAULT_MODEL if model_name not in model_paths: raise ValueError(f"Unknown model: {model_name!r}. Available: {list(model_paths)}") global SD_ENGINE, CURRENT_MODEL if SD_ENGINE is not None and CURRENT_MODEL == model_name: return SD_ENGINE if SD_ENGINE is not None: print(f"[engine] Unloading {CURRENT_MODEL}...", flush=True) del SD_ENGINE SD_ENGINE = None gc.collect() print(f"[engine] Loading {model_name}...", flush=True) t0 = time.time() SD_ENGINE = StableDiffusion( diffusion_model_path=model_paths[model_name], llm_path=llm_path, vae_path=vae_path, offload_params_to_cpu=True, diffusion_flash_attn=True, qwen_image_zero_cond_t=True, n_threads=N_THREADS, verbose=True, ) CURRENT_MODEL = model_name print(f"[engine] Loaded in {time.time()-t0:.1f}s", flush=True) log_mem() return SD_ENGINE load_engine(DEFAULT_MODEL) ASPECT_PRESETS = { "Auto (match input, max 512px)": None, "1:1 512x512": (512, 512), "16:9 576x320": (576, 320), "9:16 320x576": (320, 576), "4:3 576x432": (576, 432), "3:4 432x576": (432, 576), } MAX_PIXELS = 512 * 512 ALIGN = 16 VAE_STEP_THRESHOLD_S = 120 def calc_output_size(img_w, img_h): img_w = max(1, img_w) img_h = max(1, img_h) ratio = img_w / img_h area = min(img_w * img_h, MAX_PIXELS) h = max(ALIGN, int((area / ratio) ** 0.5)) w = max(ALIGN, int(h * ratio)) w = (w // ALIGN) * ALIGN h = (h // ALIGN) * ALIGN MIN_DIM = ALIGN * 4 while w * h > MAX_PIXELS and (w > MIN_DIM or h > MIN_DIM): if w >= h and w > MIN_DIM: w -= ALIGN elif h > MIN_DIM: h -= ALIGN else: break return w, h def safe_load_image(path, max_px=MAX_INPUT_PX, crop_ratio=None): img = Image.open(path).convert("RGB") if isinstance(path, str) else path.convert("RGB") w, h = img.size if max(w, h) > max_px: scale = max_px / max(w, h) img = img.resize((int(w * scale), int(h * scale)), Image.Resampling.LANCZOS) print(f"[gen] Downscaled input {w}x{h} -> {img.size[0]}x{img.size[1]}", flush=True) w, h = img.size if crop_ratio is not None: target_w, target_h = crop_ratio tr = target_w / target_h ir = w / h if abs(tr - ir) > 0.01: if ir > tr: new_w = int(h * tr) left = (w - new_w) // 2 img = img.crop((left, 0, left + new_w, h)) else: new_h = int(w / tr) top = (h - new_h) // 2 img = img.crop((0, top, w, top + new_h)) print(f"[gen] Center-cropped to {img.size[0]}x{img.size[1]} for {target_w}:{target_h} ratio", flush=True) return img def generate(prompt, negative_prompt, init_image, model_choice, aspect_ratio, steps, cfg_scale, guidance, seed): gc.collect() print(f"\n{'='*60}", flush=True) print(f"[gen] START {time.strftime('%H:%M:%S')}", flush=True) log_mem() sd = load_engine(model_choice) steps = int(steps) try: seed = int(seed) except (TypeError, ValueError): seed = -1 if seed < 0: seed = -1 preset = ASPECT_PRESETS.get(aspect_ratio) pil_input = None if init_image is not None: pil_input = safe_load_image(init_image, crop_ratio=preset) elif MODELS.get(model_choice, {}).get("needs_image"): yield None, "Error: this model requires an input image" return if preset: w, h = preset elif pil_input is not None: w, h = calc_output_size(*pil_input.size) else: w, h = 512, 512 kwargs = dict( prompt=prompt, negative_prompt=negative_prompt or "", width=w, height=h, sample_steps=steps, cfg_scale=cfg_scale, guidance=guidance, sample_method="euler", seed=seed, vae_tiling=True, ) if pil_input is not None: kwargs["ref_images"] = [pil_input] mode = "edit" if pil_input else "txt2img" print(f"[gen] {mode} {w}x{h} steps={steps} cfg={cfg_scale} guidance={guidance} seed={seed}", flush=True) if negative_prompt: print(f"[gen] neg: {negative_prompt[:100]}", flush=True) state = {"phase": "starting...", "step_times": [], "small_step_rounds": 0} result_holder = {"images": None, "error": None} def step_cb(step, steps_total, t_step): if steps_total > steps * 2: pct = int(step / max(steps_total, 1) * 100) state["phase"] = f"preparing {pct}%" return is_vae = (t_step < VAE_STEP_THRESHOLD_S and state["small_step_rounds"] == 0 and init_image is not None) if is_vae: state["phase"] = f"VAE encode {step}/{steps_total}" print(f" [VAE {step}/{steps_total}] {t_step:.1f}s", flush=True) if step >= steps_total: state["small_step_rounds"] += 1 else: state["phase"] = f"diffusion {step}/{steps_total}" state["step_times"].append(t_step) print(f" [diffusion {step}/{steps_total}] {t_step:.1f}s", flush=True) def run_inference(): try: result_holder["images"] = sd.generate_image(**kwargs, progress_callback=step_cb) except Exception as e: import traceback; traceback.print_exc() result_holder["error"] = e t0 = time.time() thread = threading.Thread(target=run_inference) thread.start() yield None, f"Starting {mode} {w}x{h}..." while thread.is_alive(): thread.join(timeout=10) elapsed = time.time() - t0 mins = int(elapsed // 60) secs = int(elapsed % 60) eta = "" if state["step_times"]: avg = sum(state["step_times"]) / len(state["step_times"]) done = len(state["step_times"]) remaining = (steps - done) * avg if remaining > 0: eta_m = int(remaining // 60) eta = f" | ~{eta_m}m left" yield None, f"[{mins}m{secs:02d}s] {state['phase']}{eta}" elapsed = time.time() - t0 if result_holder["error"]: print(f"[gen] EXCEPTION: {result_holder['error']}", flush=True) log_mem(); gc.collect() yield None, f"Error after {elapsed:.0f}s: {result_holder['error']}" return images = result_holder["images"] print(f"[gen] Done, {len(images) if images else 0} images", flush=True) log_mem(); gc.collect() status = f"Done in {elapsed:.0f}s | {mode} {w}x{h}, {steps} steps, seed {seed}" print(f"[gen] {status}", flush=True) yield (images[0] if images else None), status def cli_main(): parser = argparse.ArgumentParser(description="Qwen-Image-Edit Rapid-AIO CPU") parser.add_argument("prompt", help="Text prompt") parser.add_argument("-o", "--output", default="output.png") parser.add_argument("-i", "--init-image", default=None) parser.add_argument("-n", "--negative", default=DEFAULT_NEG) parser.add_argument("--model", default=DEFAULT_MODEL, choices=list(MODELS.keys())) parser.add_argument("--aspect", default="Auto (match input, max 512px)", choices=list(ASPECT_PRESETS.keys())) parser.add_argument("--steps", type=int, default=4) parser.add_argument("--cfg", type=float, default=2.5) parser.add_argument("--guidance", type=float, default=3.0) parser.add_argument("--seed", type=int, default=-1) args = parser.parse_args() for img, status in generate(args.prompt, args.negative, args.init_image, args.model, args.aspect, args.steps, args.cfg, args.guidance, args.seed): if img: img.save(args.output) print(f"Saved: {args.output} ({status})") return print(f" {status}", flush=True) print("Failed") sys.exit(1) def gradio_main(): import gradio as gr def on_model_change(choice): return gr.update(visible=MODELS[choice]["needs_image"]) with gr.Blocks(title="Qwen-Image-Edit CPU") as demo: with gr.Row(equal_height=False): with gr.Column(variant="panel", scale=1, min_width=280): prompt = gr.Textbox(label="Prompt / Qwen-Image-Edit Lightning (~84m/512x512)", lines=2, placeholder="e.g. transform into anime style") with gr.Accordion("Negative prompt", open=False): negative_prompt = gr.Textbox(value=DEFAULT_NEG, lines=1, show_label=False) init_image = gr.Image(label="Input Image", type="filepath", visible=True, height=160) gen_btn = gr.Button("Generate", variant="primary", size="lg") with gr.Row(): model_choice = gr.Dropdown(choices=list(MODELS.keys()), value=DEFAULT_MODEL, label="Model", scale=2) aspect_ratio = gr.Dropdown(choices=list(ASPECT_PRESETS.keys()), value="Auto (match input, max 512px)", label="Aspect (crop)", scale=2) with gr.Row(): steps = gr.Slider(1, 50, value=4, step=1, label="Steps", scale=1) cfg_scale = gr.Slider(1.0, 7.0, value=2.5, step=0.5, label="CFG", scale=1) guidance = gr.Slider(1.0, 10.0, value=3.0, step=0.5, label="Guidance", scale=1) seed = gr.Number(value=-1, label="Seed", precision=0, scale=1) with gr.Column(variant="panel", scale=1, min_width=280): output_image = gr.Image(label="Output", type="pil", height=380) status_text = gr.Textbox(label="Status", interactive=False, lines=1) gr.Markdown( "[Rapid-AIO](https://huggingface.co/Phr00t/Qwen-Image-Edit-Rapid-AIO) · " "[GGUF](https://huggingface.co/Arunk25/Qwen-Image-Edit-Rapid-AIO-GGUF) · " "[sd.cpp](https://github.com/leejet/stable-diffusion.cpp)") model_choice.change(fn=on_model_change, inputs=[model_choice], outputs=[init_image]) gen_btn.click( fn=generate, inputs=[prompt, negative_prompt, init_image, model_choice, aspect_ratio, steps, cfg_scale, guidance, seed], outputs=[output_image, status_text], api_name="infer", concurrency_limit=1, ) demo.queue(default_concurrency_limit=1).launch(ssr_mode=False, show_error=True, mcp_server=True, max_threads=1, theme="Nymbo/Alyx_Theme") if __name__ == "__main__": if len(sys.argv) > 1 and not sys.argv[1].startswith("--"): cli_main() else: gradio_main() else: gradio_main()