"""Qwen-Image-Edit Rapid-AIO / Free CPU via GGUF + stable-diffusion.cpp
Lightning pre-fused, 4 steps natively."""
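# Usage (illustrative; file names and prompts are placeholders):
#   python app.py "turn the photo into watercolor" -i input.jpg -o out.png
#   python app.py "a red fox in the snow" --model "Image-2512 (txt2img)"
#   python app.py              # no positional prompt: launches the Gradio UI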
import os, sys, time, gc, argparse, signal, threading
from PIL import Image
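# Line-buffer stdout/stderr so progress logs show up promptly in container logs.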
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
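# Print current/peak resident memory from /proc/self/status (Linux-only; silently skipped elsewhere).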
def log_mem():
    try:
        with open("/proc/self/status") as f:
            for line in f:
                if line.startswith(("VmRSS", "VmPeak")):
                    print(f" [mem] {line.strip()}", flush=True)
    except Exception: pass
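# On fatal signals, dump memory stats before exiting with the conventional 128+signum code.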
def sighandler(signum, frame):
print(f"\n[FATAL] Signal {signum} ({signal.Signals(signum).name})", flush=True)
log_mem()
sys.exit(128 + signum)
for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGABRT):
    try: signal.signal(sig, sighandler)
    except Exception: pass
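# Usable CPU count, honoring container quotas: cgroup v2 (cpu.max), then cgroup v1
# (cfs_quota_us/cfs_period_us), falling back to os.cpu_count().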
def get_cpu_count() -> int:
    try:
        with open("/sys/fs/cgroup/cpu.max") as f:
            q, p = f.read().strip().split()
        if q != "max": return max(1, int(q) // int(p))
    except Exception: pass
    try:
        with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as f: q = int(f.read().strip())
        with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as f: p = int(f.read().strip())
        if q > 0: return max(1, q // p)
    except Exception: pass
    return max(1, os.cpu_count() or 2)
N_THREADS = get_cpu_count()
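# Cap BLAS/OpenMP pools before any numeric library spins up its threads.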
for k in ["OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS"]:
    os.environ.setdefault(k, str(N_THREADS))
print(f"[init] CPU threads: {N_THREADS}")
MODELS = {
"Rapid-AIO-v23 Q3 (edit)": {
"repo": "Arunk25/Qwen-Image-Edit-Rapid-AIO-GGUF",
"file": "v23/Qwen-Rapid-NSFW-v23_Q3_K.gguf",
"needs_image": True,
},
"Image-2512 (txt2img)": {
"repo": "unsloth/Qwen-Image-2512-GGUF",
"file": "qwen-image-2512-Q3_K_M.gguf",
"needs_image": False,
},
}
DEFAULT_MODEL = "Rapid-AIO-v23 Q3 (edit)"
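# Shared components: Qwen2.5-VL text encoder (GGUF) and the Qwen-Image VAE.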
LLM_REPO = "mradermacher/Qwen2.5-VL-7B-Instruct-abliterated-GGUF"
LLM_FILE = "Qwen2.5-VL-7B-Instruct-abliterated.Q3_K_M.gguf"
VAE_REPO = "Comfy-Org/Qwen-Image_ComfyUI"
VAE_FILE = "split_files/vae/qwen_image_vae.safetensors"
DEFAULT_NEG = "worst quality, low quality, blurry, watermark, text, signature, jpeg artifacts"
MAX_INPUT_PX = 768
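# Heavy imports come after the thread env vars above so the caps take effect.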
from huggingface_hub import hf_hub_download
from stable_diffusion_cpp import StableDiffusion
def ensure_model(repo_id: str, filename: str) -> str:
print(f"[init] Resolving {repo_id}/{filename}...", flush=True)
t0 = time.time()
path = hf_hub_download(repo_id=repo_id, filename=filename)
dt = time.time() - t0
if dt > 1:
print(f"[init] Downloaded in {dt:.1f}s", flush=True)
return path
print("[init] Downloading shared models...", flush=True)
llm_path = ensure_model(LLM_REPO, LLM_FILE)
vae_path = ensure_model(VAE_REPO, VAE_FILE)
print("[init] Pre-caching diffusion models to disk...", flush=True)
model_paths = {}
for name, cfg in MODELS.items():
    model_paths[name] = ensure_model(cfg["repo"], cfg["file"])
    print(f"[init] {name}: OK", flush=True)
SD_ENGINE = None
CURRENT_MODEL = None
def load_engine(model_name=None):
    global SD_ENGINE, CURRENT_MODEL
    model_name = model_name or DEFAULT_MODEL
    if model_name not in model_paths:
        raise ValueError(f"Unknown model: {model_name!r}. Available: {list(model_paths)}")
    if SD_ENGINE is not None and CURRENT_MODEL == model_name:
        return SD_ENGINE
    if SD_ENGINE is not None:
        print(f"[engine] Unloading {CURRENT_MODEL}...", flush=True)
        del SD_ENGINE
        SD_ENGINE = None
        gc.collect()
    print(f"[engine] Loading {model_name}...", flush=True)
    t0 = time.time()
    SD_ENGINE = StableDiffusion(
        diffusion_model_path=model_paths[model_name],
        llm_path=llm_path,
        vae_path=vae_path,
        offload_params_to_cpu=True,
        diffusion_flash_attn=True,
        qwen_image_zero_cond_t=True,
        n_threads=N_THREADS,
        verbose=True,
    )
    CURRENT_MODEL = model_name
    print(f"[engine] Loaded in {time.time()-t0:.1f}s", flush=True)
    log_mem()
    return SD_ENGINE
load_engine(DEFAULT_MODEL)
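# Output sizing: presets are (width, height); "Auto" derives the size from the
# input image, capped at MAX_PIXELS and snapped to ALIGN-pixel multiples.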
ASPECT_PRESETS = {
"Auto (match input, max 512px)": None,
"1:1 512x512": (512, 512),
"16:9 576x320": (576, 320),
"9:16 320x576": (320, 576),
"4:3 576x432": (576, 432),
"3:4 432x576": (432, 576),
}
MAX_PIXELS = 512 * 512
ALIGN = 16
VAE_STEP_THRESHOLD_S = 120
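# Fit the input aspect ratio into the MAX_PIXELS budget, rounding both sides
# down to ALIGN multiples and shaving the longer side if still over budget.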
def calc_output_size(img_w, img_h):
    img_w = max(1, img_w)
    img_h = max(1, img_h)
    ratio = img_w / img_h
    area = min(img_w * img_h, MAX_PIXELS)
    h = max(ALIGN, int((area / ratio) ** 0.5))
    w = max(ALIGN, int(h * ratio))
    w = (w // ALIGN) * ALIGN
    h = (h // ALIGN) * ALIGN
    MIN_DIM = ALIGN * 4
    while w * h > MAX_PIXELS and (w > MIN_DIM or h > MIN_DIM):
        if w >= h and w > MIN_DIM: w -= ALIGN
        elif h > MIN_DIM: h -= ALIGN
        else: break
    return w, h
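# Load as RGB, downscale so the longer side is <= max_px, then optionally
# center-crop to the requested aspect ratio before handing off to the engine.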
def safe_load_image(path, max_px=MAX_INPUT_PX, crop_ratio=None):
    img = Image.open(path).convert("RGB") if isinstance(path, str) else path.convert("RGB")
    w, h = img.size
    if max(w, h) > max_px:
        scale = max_px / max(w, h)
        img = img.resize((int(w * scale), int(h * scale)), Image.Resampling.LANCZOS)
        print(f"[gen] Downscaled input {w}x{h} -> {img.size[0]}x{img.size[1]}", flush=True)
    w, h = img.size
    if crop_ratio is not None:
        target_w, target_h = crop_ratio
        tr = target_w / target_h
        ir = w / h
        if abs(tr - ir) > 0.01:
            if ir > tr:
                new_w = int(h * tr)
                left = (w - new_w) // 2
                img = img.crop((left, 0, left + new_w, h))
            else:
                new_h = int(w / tr)
                top = (h - new_h) // 2
                img = img.crop((0, top, w, top + new_h))
            print(f"[gen] Center-cropped to {img.size[0]}x{img.size[1]} for {target_w}:{target_h} ratio", flush=True)
    return img
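# Generator yielding (image, status) pairs so both the CLI and the Gradio UI
# can stream progress; only the final yield carries the image.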
def generate(prompt, negative_prompt, init_image, model_choice, aspect_ratio, steps, cfg_scale, guidance, seed):
    gc.collect()
    print(f"\n{'='*60}", flush=True)
    print(f"[gen] START {time.strftime('%H:%M:%S')}", flush=True)
    log_mem()
    sd = load_engine(model_choice)
    steps = int(steps)
    try: seed = int(seed)
    except (TypeError, ValueError): seed = -1
    if seed < 0: seed = -1
    preset = ASPECT_PRESETS.get(aspect_ratio)
    pil_input = None
    if init_image is not None:
        pil_input = safe_load_image(init_image, crop_ratio=preset)
    elif MODELS.get(model_choice, {}).get("needs_image"):
        yield None, "Error: this model requires an input image"
        return
    if preset:
        w, h = preset
    elif pil_input is not None:
        w, h = calc_output_size(*pil_input.size)
    else:
        w, h = 512, 512
    kwargs = dict(
        prompt=prompt,
        negative_prompt=negative_prompt or "",
        width=w, height=h,
        sample_steps=steps,
        cfg_scale=cfg_scale,
        guidance=guidance,
        sample_method="euler",
        seed=seed,
        vae_tiling=True,
    )
    if pil_input is not None:
        kwargs["ref_images"] = [pil_input]
    mode = "edit" if pil_input is not None else "txt2img"
    print(f"[gen] {mode} {w}x{h} steps={steps} cfg={cfg_scale} guidance={guidance} seed={seed}", flush=True)
    if negative_prompt:
        print(f"[gen] neg: {negative_prompt[:100]}", flush=True)
state = {"phase": "starting...", "step_times": [], "small_step_rounds": 0}
result_holder = {"images": None, "error": None}
def step_cb(step, steps_total, t_step):
if steps_total > steps * 2:
pct = int(step / max(steps_total, 1) * 100)
state["phase"] = f"preparing {pct}%"
return
is_vae = (t_step < VAE_STEP_THRESHOLD_S and state["small_step_rounds"] == 0 and init_image is not None)
if is_vae:
state["phase"] = f"VAE encode {step}/{steps_total}"
print(f" [VAE {step}/{steps_total}] {t_step:.1f}s", flush=True)
if step >= steps_total:
state["small_step_rounds"] += 1
else:
state["phase"] = f"diffusion {step}/{steps_total}"
state["step_times"].append(t_step)
print(f" [diffusion {step}/{steps_total}] {t_step:.1f}s", flush=True)
    def run_inference():
        try:
            result_holder["images"] = sd.generate_image(**kwargs, progress_callback=step_cb)
        except Exception as e:
            import traceback; traceback.print_exc()
            result_holder["error"] = e
    t0 = time.time()
    thread = threading.Thread(target=run_inference)
    thread.start()
    yield None, f"Starting {mode} {w}x{h}..."
    while thread.is_alive():
        thread.join(timeout=10)
        elapsed = time.time() - t0
        mins = int(elapsed // 60)
        secs = int(elapsed % 60)
        eta = ""
        if state["step_times"]:
            avg = sum(state["step_times"]) / len(state["step_times"])
            done = len(state["step_times"])
            remaining = (steps - done) * avg
            if remaining > 0:
                eta_m = int(remaining // 60)
                eta = f" | ~{eta_m}m left"
        yield None, f"[{mins}m{secs:02d}s] {state['phase']}{eta}"
    elapsed = time.time() - t0
    if result_holder["error"]:
        print(f"[gen] EXCEPTION: {result_holder['error']}", flush=True)
        log_mem(); gc.collect()
        yield None, f"Error after {elapsed:.0f}s: {result_holder['error']}"
        return
    images = result_holder["images"]
    print(f"[gen] Done, {len(images) if images else 0} images", flush=True)
    log_mem(); gc.collect()
    status = f"Done in {elapsed:.0f}s | {mode} {w}x{h}, {steps} steps, seed {seed}"
    print(f"[gen] {status}", flush=True)
    yield (images[0] if images else None), status
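# CLI entry point: drain the generator, save the first returned image.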
def cli_main():
    parser = argparse.ArgumentParser(description="Qwen-Image-Edit Rapid-AIO CPU")
    parser.add_argument("prompt", help="Text prompt")
    parser.add_argument("-o", "--output", default="output.png")
    parser.add_argument("-i", "--init-image", default=None)
    parser.add_argument("-n", "--negative", default=DEFAULT_NEG)
    parser.add_argument("--model", default=DEFAULT_MODEL, choices=list(MODELS.keys()))
    parser.add_argument("--aspect", default="Auto (match input, max 512px)", choices=list(ASPECT_PRESETS.keys()))
    parser.add_argument("--steps", type=int, default=4)
    parser.add_argument("--cfg", type=float, default=2.5)
    parser.add_argument("--guidance", type=float, default=3.0)
    parser.add_argument("--seed", type=int, default=-1)
    args = parser.parse_args()
    for img, status in generate(args.prompt, args.negative, args.init_image, args.model, args.aspect, args.steps, args.cfg, args.guidance, args.seed):
        if img:
            img.save(args.output)
            print(f"Saved: {args.output} ({status})")
            return
        print(f" {status}", flush=True)
    print("Failed")
    sys.exit(1)
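# Gradio entry point: two-panel layout, single concurrent job (CPU-bound).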
def gradio_main():
    import gradio as gr
    def on_model_change(choice):
        return gr.update(visible=MODELS[choice]["needs_image"])
    # theme belongs on gr.Blocks(), not on launch(), so it is set here.
    with gr.Blocks(title="Qwen-Image-Edit CPU", theme="Nymbo/Alyx_Theme") as demo:
        with gr.Row(equal_height=False):
            with gr.Column(variant="panel", scale=1, min_width=280):
                prompt = gr.Textbox(label="Prompt / Qwen-Image-Edit Lightning (~84m/512x512)", lines=2, placeholder="e.g. transform into anime style")
                with gr.Accordion("Negative prompt", open=False):
                    negative_prompt = gr.Textbox(value=DEFAULT_NEG, lines=1, show_label=False)
                init_image = gr.Image(label="Input Image", type="filepath", visible=True, height=160)
                gen_btn = gr.Button("Generate", variant="primary", size="lg")
                with gr.Row():
                    model_choice = gr.Dropdown(choices=list(MODELS.keys()), value=DEFAULT_MODEL, label="Model", scale=2)
                    aspect_ratio = gr.Dropdown(choices=list(ASPECT_PRESETS.keys()), value="Auto (match input, max 512px)", label="Aspect (crop)", scale=2)
                with gr.Row():
                    steps = gr.Slider(1, 50, value=4, step=1, label="Steps", scale=1)
                    cfg_scale = gr.Slider(1.0, 7.0, value=2.5, step=0.5, label="CFG", scale=1)
                    guidance = gr.Slider(1.0, 10.0, value=3.0, step=0.5, label="Guidance", scale=1)
                    seed = gr.Number(value=-1, label="Seed", precision=0, scale=1)
            with gr.Column(variant="panel", scale=1, min_width=280):
                output_image = gr.Image(label="Output", type="pil", height=380)
                status_text = gr.Textbox(label="Status", interactive=False, lines=1)
                gr.Markdown(
                    "[Rapid-AIO](https://huggingface.co/Phr00t/Qwen-Image-Edit-Rapid-AIO) · "
                    "[GGUF](https://huggingface.co/Arunk25/Qwen-Image-Edit-Rapid-AIO-GGUF) · "
                    "[sd.cpp](https://github.com/leejet/stable-diffusion.cpp)")
        model_choice.change(fn=on_model_change, inputs=[model_choice], outputs=[init_image])
        gen_btn.click(
            fn=generate,
            inputs=[prompt, negative_prompt, init_image, model_choice, aspect_ratio, steps, cfg_scale, guidance, seed],
            outputs=[output_image, status_text],
            api_name="infer", concurrency_limit=1,
        )
    demo.queue(default_concurrency_limit=1).launch(ssr_mode=False, show_error=True, mcp_server=True, max_threads=1)
if __name__ == "__main__":
    if len(sys.argv) > 1 and not sys.argv[1].startswith("--"):
        cli_main()
    else:
        gradio_main()
else:
    # Imported as a module (e.g. by a Space runner): launch the UI directly.
    gradio_main()