A Coding Implementation on Qwen 3.6-35B-A3B Covering Multimodal Inference, Thinking Control, Tool Calling, MoE Routing, RAG, and Session Persistence

by CryptoExpert
synthesia


class QwenChat:
def __init__(self, model, processor, system=None, tools=None):
self.model, self.processor = model, processor
self.tokenizer = processor.tokenizer
self.history: list[dict] = []
if system: self.history.append({“role”: “system”, “content”: system})
self.tools = tools

def user(self, content): self.history.append({“role”:”user”,”content”:content}); return self
def assistant(self, content, reasoning=””):
m = {“role”:”assistant”,”content”:content}
if reasoning: m[“reasoning_content”] = reasoning
self.history.append(m); return self
def tool_result(self, name, result):
self.history.append({“role”:”tool”,”name”:name,
“content”: result if isinstance(result, str) else json.dumps(result)})
return self

def _inputs(self, enable_thinking, preserve_thinking):
return self.processor.apply_chat_template(
self.history, tools=self.tools, tokenize=True,
add_generation_prompt=True, return_dict=True, return_tensors=”pt”,
enable_thinking=enable_thinking, preserve_thinking=preserve_thinking,
).to(self.model.device)

def generate(self, *, enable_thinking=True, preserve_thinking=False,
max_new_tokens=2048, preset=”thinking_general”,
stopping_criteria=None, append_to_history=True):
inp = self._inputs(enable_thinking, preserve_thinking)
cfg = SAMPLING[preset]
gk = dict(**inp, max_new_tokens=max_new_tokens, do_sample=True,
temperature=cfg[“temperature”], top_p=cfg[“top_p”], top_k=cfg[“top_k”],
repetition_penalty=1.0,
pad_token_id=self.tokenizer.pad_token_id or self.tokenizer.eos_token_id)
if stopping_criteria is not None: gk[“stopping_criteria”] = stopping_criteria
with torch.inference_mode(): out = self.model.generate(**gk)
raw = self.tokenizer.decode(out[0, inp[“input_ids”].shape[-1]:], skip_special_tokens=True)
think, ans = split_thinking(raw)
if append_to_history: self.assistant(ans, reasoning=think)
return think, ans

frase

def stream(self, *, enable_thinking=True, preserve_thinking=False,
max_new_tokens=2048, preset=”thinking_general”,
on_thinking=None, on_answer=None):
inp = self._inputs(enable_thinking, preserve_thinking)
cfg = SAMPLING[preset]
streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)
gk = dict(**inp, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=True,
temperature=cfg[“temperature”], top_p=cfg[“top_p”], top_k=cfg[“top_k”],
pad_token_id=self.tokenizer.pad_token_id or self.tokenizer.eos_token_id)
t = threading.Thread(target=self.model.generate, kwargs=gk); t.start()
buf, in_think = “”, enable_thinking
think_text, answer_text = “”, “”
for piece in streamer:
buf += piece
if in_think:
if THINK_CLOSE in buf:
close_at = buf.index(THINK_CLOSE)
resid = buf[:close_at]
if on_thinking: on_thinking(resid[len(think_text):])
think_text = resid
buf = buf[close_at + len(THINK_CLOSE):]
in_think = False
if buf and on_answer: on_answer(buf)
answer_text = buf; buf = “”
else:
if on_thinking: on_thinking(piece)
think_text += piece
else:
if on_answer: on_answer(piece)
answer_text += piece
t.join()
self.assistant(answer_text.strip(), reasoning=think_text.strip())
return think_text.strip(), answer_text.strip()

def save(self, path):
with open(path, “w”) as f:
json.dump({“history”: self.history, “tools”: self.tools}, f, indent=2)
@classmethod
def load(cls, model, processor, path):
with open(path) as f: data = json.load(f)
c = cls(model, processor, tools=data.get(“tools”))
c.history = data[“history”]; return c

class ThinkingBudget(StoppingCriteria):
def __init__(self, tokenizer, budget: int):
self.budget = budget
self.open_ids = tokenizer.encode(THINK_OPEN, add_special_tokens=False)
self.close_ids = tokenizer.encode(THINK_CLOSE, add_special_tokens=False)
self.start = None
def _find(self, seq, needle):
n = len(needle)
for i in range(len(seq)-n+1):
if seq[i:i+n] == needle: return i
return None
def __call__(self, input_ids, scores, **kwargs):
seq = input_ids[0].tolist()
if self.start is None:
idx = self._find(seq, self.open_ids)
if idx is not None: self.start = idx + len(self.open_ids)
return False
if self._find(seq[self.start:], self.close_ids) is not None: return False
return (len(seq) – self.start) >= self.budget

TOOL_CALL_RE = re.compile(r”<tool_call>\s*(\{.*?\})\s*</tool_call>”, re.S)

def run_calculate(expr: str) -> str:
if any(c not in “0123456789+-*/().% ” for c in expr):
return json.dumps({“error”:”illegal chars”})
try: return json.dumps({“result”: eval(expr, {“__builtins__”: {}}, {})})
except Exception as e: return json.dumps({“error”: str(e)})

_DOCS = {
“qwen3.6”: “Qwen3.6-35B-A3B is a 35B MoE with 3B active params and 262k native context.”,
“deltanet”: “Gated DeltaNet is a linear-attention variant used in Qwen3.6’s hybrid layers.”,
“moe”: “Qwen3.6 uses 256 experts with 8 routed + 1 shared per token.”,
}
def run_search_docs(q):
hits = [v for k,v in _DOCS.items() if k in q.lower()]
return json.dumps({“results”: hits or [“no hits”]})
def run_get_time():
import datetime as dt
return json.dumps({“iso”: dt.datetime.utcnow().isoformat()+”Z”})

TOOL_FNS = {
“calculate”: lambda a: run_calculate(a[“expression”]),
“search_docs”: lambda a: run_search_docs(a[“query”]),
“get_time”: lambda a: run_get_time(),
}
TOOLS_SCHEMA = [
{“type”:”function”,”function”:{“name”:”calculate”,”description”:”Evaluate arithmetic.”,
“parameters”:{“type”:”object”,”properties”:{“expression”:{“type”:”string”}},”required”:[“expression”]}}},
{“type”:”function”,”function”:{“name”:”search_docs”,”description”:”Search internal docs.”,
“parameters”:{“type”:”object”,”properties”:{“query”:{“type”:”string”}},”required”:[“query”]}}},
{“type”:”function”,”function”:{“name”:”get_time”,”description”:”Get current UTC time.”,
“parameters”:{“type”:”object”,”properties”:{}}}},
]



Source link

aistudios

You may also like

Subscribe To Our Newsletter

Join our mailing list to receive the latest news and updates from our team.

You have Successfully Subscribed!

Verified by MonsterInsights