Inside a DyTopo round
Open one round of Dynamic Topology Routing: agents declare what they Need and Offer, cosine similarity wires them into a comm graph, jurisdiction-ineligible agents are dropped, then everyone runs an agentic loop whose findings are gated, debated, verified, and rolled up into memory.
src/dytopo/engine.ts · 443 lines · DyTopoEngine L72–443
Outline 10 symbols
- AgentBillingCtx interface export
- DyTopoOptions interface export
- DyTopoEngine class export
- constructor method
- runRound method
- recruitAgents method
- fetchAgentMemories method
- buildCommGraph method
- routeMessages method
- persistRoundMemory method
// SPDX-License-Identifier: AGPL-3.0-only
// Copyright (C) 2026 Discover Legal
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. See <https://www.gnu.org/licenses/>.

/**
 * DyTopo Engine — Dynamic Topology Routing for Multi-Agent Reasoning
 *
 * Based on arXiv:2602.06039 (Lu et al., 2026).
 * Each reasoning round:
 *   1. Manager issues a RoundGoal (natural language).
 *   2. Recruited agents emit Need + Offer descriptors conditioned on the goal.
 *   3. Need/Offer embeddings are matched via cosine similarity.
 *   4. A sparse directed communication graph is constructed (edges above threshold).
 *   5. Messages are routed along edges (offering agent's content → needing agent's context).
 *   6. Agents process their context and produce findings.
 *   7. Round state is written to inter-round memory.
 *
 * Extended beyond the paper: agents are recruited from a live vector DB (AgentRegistry)
 * based on semantic match against the round goal, not from a fixed roster.
 */

import { v4 as uuidv4 } from "uuid";
import { Config } from "../config.js";
import { embed, embedBatch, cosineSimilarity } from "../embeddings.js";
import { logger } from "../logger.js";
import { auditLogger, ACTOR_SYSTEM } from "../audit/index.js";
import { Agent } from "../agents/base.js";
import { AgentRegistry } from "../agents/registry.js";
import { globalToolRegistry } from "../tools/index.js";
import { IntraRoundMemoryStore, InterRoundMemoryStore } from "../memory/index.js";
import { getProvider, resolveModelId } from "../providers/index.js";
import { selectModel } from "../routing/model.js";
import { agentLearning } from "../learning/index.js";
import type { KnowledgeStore } from "../knowledge/index.js";
import type { TimeStore } from "../time/index.js";
import type {
  AgentDefinition,
  AgentMessage,
  CommunicationEdge,
  Finding,
  MemoryEntry,
  NeedDescriptor,
  OfferDescriptor,
  RoundGoal,
  RoundState,
  Task,
  ToneProfile,
} from "../types.js";
import { jurisdictionMatch } from "./jurisdiction.js";

/**
 * Billing context forwarded from the orchestrator into each agent's
 * `process()` call.
 *
 * The engine does not interpret these values; `runRound` copies each field
 * verbatim into the agent's processing context. The whole object is optional
 * at the call site — it is only supplied when a billing rate is configured.
 */
export interface AgentBillingCtx {
  /** Time store handed to every agent (presumably where billable time is recorded — confirm against TimeStore). */
  timeStore: TimeStore;
  /** Identifier of the lawyer responsible for the work, when known. */
  responsibleLawyerId?: string;
  /** Display name of the responsible lawyer, when known. */
  responsibleLawyerName?: string;
  /** Matter number the work is attributed to, when known. */
  matterNumber?: string;
  /** Client number the work is attributed to, when known. */
  clientNumber?: string;
}

// Re-export the jurisdiction predicate so it can also be imported from this module.
export { jurisdictionMatch } from "./jurisdiction.js";

/** Dependencies and fixed inputs used to construct a {@link DyTopoEngine}. */
export interface DyTopoOptions {
  /** Agent registry queried (search / recommend) to recruit agents for each round. */
  registry: AgentRegistry;
  /** Inter-round memory store: queried before agents run, written to after the round. */
  memory: InterRoundMemoryStore;
  /** Knowledge store passed through unchanged to every agent's `process()` call. */
  knowledge: KnowledgeStore;
  /**
   * Agents pre-selected for every round (e.g. the tier-0 root). They are placed
   * ahead of recruited agents; defaults to an empty list when omitted.
   * Note: pinned agents still pass through the jurisdiction filter and the
   * per-round agent cap applied in `runRound`.
   */
  pinnedAgents?: AgentDefinition[];
}

/**
 * Runs individual rounds of DyTopo (Dynamic Topology Routing) orchestration.
 *
 * A single {@link DyTopoEngine.runRound} call recruits agents, collects their
 * Need/Offer descriptors, builds a similarity-thresholded communication graph,
 * routes offer messages along its edges, runs every agent, and persists the
 * round's findings plus a rollup summary to inter-round memory.
 */
export class DyTopoEngine {
  private readonly registry: AgentRegistry;
  private readonly memory: InterRoundMemoryStore;
  private readonly knowledge: KnowledgeStore;
  private readonly pinnedAgents: AgentDefinition[];

  /**
   * @param opts Stores and registry the engine depends on; `pinnedAgents`
   *   defaults to an empty list.
   */
  constructor(opts: DyTopoOptions) {
    this.registry = opts.registry;
    this.memory = opts.memory;
    this.knowledge = opts.knowledge;
    this.pinnedAgents = opts.pinnedAgents ?? [];
  }

  /**
   * Execute one round of DyTopo orchestration.
   *
   * @param task The task being worked on; its prior rounds bias recruitment.
   * @param goal The manager-issued goal for this round.
   * @param lawyerTone Optional tone profile forwarded to every agent.
   * @param billingCtx Optional billing context forwarded to every agent.
   * @returns The completed RoundState including all messages, edges, and findings.
   * @throws Propagates failures from recruitment, memory queries, Need/Offer
   *   generation, embedding, and memory persistence. Failures (or timeouts) of
   *   an individual agent's `process()` call are logged and skipped instead.
   */
  async runRound(
    task: Task,
    goal: RoundGoal,
    lawyerTone?: ToneProfile,
    billingCtx?: AgentBillingCtx,
  ): Promise<RoundState> {
    const roundId = uuidv4();
    // Captured before any work happens so the returned state reports the real
    // start of the round rather than the moment the state object is assembled.
    const startedAt = new Date();
    const intraMemory = new IntraRoundMemoryStore(roundId);

    auditLogger.write({
      event: "round.start",
      actorId: ACTOR_SYSTEM,
      taskId: task.id,
      data: { round: goal.round, phase: goal.phase, roundId },
    });

    logger.info("DyTopo round starting", {
      taskId: task.id,
      round: goal.round,
      phase: goal.phase,
      goal: goal.description.slice(0, 80),
    });

    // ── Step 1: Recruit agents ──────────────────────────────────────────────
    // Pinned agents are inserted first so they keep the leading positions; the
    // Map de-duplicates by id. Everything is then jurisdiction-filtered and capped.
    const recruitedAgents = await this.recruitAgents(goal, task);
    const agentMap = new Map<string, AgentDefinition>();
    for (const definition of [...this.pinnedAgents, ...recruitedAgents]) {
      agentMap.set(definition.id, definition);
    }

    const activeDefinitions = Array.from(agentMap.values())
      .filter((definition) => jurisdictionMatch(definition, task.jurisdiction))
      .slice(0, Config.dytopo.maxAgentsPerRound);
    const activeAgents = activeDefinitions.map((definition) => new Agent(definition));

    logger.info("Agents recruited for round", {
      round: goal.round,
      agents: activeDefinitions.map((a) => a.name),
    });

    // ── Step 2: Retrieve inter-round memory for each agent ─────────────────
    const agentMemories = await this.fetchAgentMemories(activeDefinitions, task, goal);

    // ── Step 3: Need/Offer descriptors ─────────────────────────────────────
    const needsOffers = await Promise.all(
      activeAgents.map((agent) =>
        agent.generateNeedOffer({
          roundGoal: goal,
          incomingMessages: [],
          memoryEntries: agentMemories.get(agent.definition.id) ?? [],
          taskDescription: task.description,
        }),
      ),
    );
    const needs = needsOffers.map((no) => no.need);
    const offers = needsOffers.map((no) => no.offer);

    // ── Step 4: Build sparse directed comm graph ────────────────────────────
    const edges = await this.buildCommGraph(needs, offers, activeDefinitions);
    logger.info("Communication graph built", {
      round: goal.round,
      edges: edges.length,
      threshold: Config.dytopo.similarityThreshold,
    });

    // ── Step 5: Route messages along edges ─────────────────────────────────
    const messages = this.routeMessages(edges, offers, goal.round);
    for (const msg of messages) intraMemory.recordMessage(msg.to, msg);

    // ── Step 6: Agents process — full agentic loops ─────────────────────────
    // allSettled, not all: one agent throwing (e.g. a transient provider error)
    // must not discard every other agent's findings and fail the whole round.
    // Each call is additionally capped by a per-agent wall-clock timeout.
    const settled = await Promise.allSettled(
      activeAgents.map((agent) =>
        this.withRoundTimeout(
          agent.process({
            roundGoal: goal,
            incomingMessages: intraMemory.getMessagesFor(agent.definition.id),
            memoryEntries: agentMemories.get(agent.definition.id) ?? [],
            taskDescription: task.description,
            taskId: task.id,
            toolRegistry: globalToolRegistry,
            knowledge: this.knowledge,
            memory: this.memory,
            ownerId: task.createdByProfileId,
            assignedLawyerTone: lawyerTone,
            // Agent billing context — only present when a billing rate is configured
            timeStore: billingCtx?.timeStore,
            responsibleLawyerId: billingCtx?.responsibleLawyerId,
            responsibleLawyerName: billingCtx?.responsibleLawyerName,
            matterNumber: billingCtx?.matterNumber,
            clientNumber: billingCtx?.clientNumber,
          }),
          agent.definition.id,
        ),
      ),
    );
    const allFindings = settled.flatMap((result, index) => {
      if (result.status === "fulfilled") return result.value;
      logger.warn("Agent failed during round — skipping its findings", {
        agentId: activeAgents[index]?.definition.id,
        round: goal.round,
        // Rejections are not guaranteed to be Error instances.
        error: result.reason instanceof Error ? result.reason.message : String(result.reason),
      });
      return [];
    });

    for (const finding of allFindings) {
      finding.round = goal.round;
      intraMemory.recordFinding(finding.agentId, finding);
      // Write to the intra-round whiteboard so persistRoundMemory can roll it up
      intraMemory.addSharedContext(
        `[${finding.agentName}] ${finding.content.replace(/\s+/g, " ").slice(0, 200)}`,
      );
    }

    // ── Step 7: Rollup → inter-round memory ─────────────────────────────────
    await this.persistRoundMemory(task, goal, allFindings, intraMemory);

    const state: RoundState = {
      roundId,
      goal,
      activeAgentIds: activeDefinitions.map((a) => a.id),
      edges,
      messages,
      findings: allFindings,
      status: "complete",
      startedAt,
      completedAt: new Date(),
    };

    auditLogger.write({
      event: "round.complete",
      actorId: ACTOR_SYSTEM,
      taskId: task.id,
      data: {
        round: goal.round,
        phase: goal.phase,
        roundId,
        agents: activeDefinitions.map((a) => ({ id: a.id, name: a.name })),
        findings: allFindings.length,
        edges: edges.length,
      },
    });

    logger.info("DyTopo round complete", {
      round: goal.round,
      findings: allFindings.length,
      messages: messages.length,
    });

    return state;
  }

  // ─── Private helpers ────────────────────────────────────────────────────────

  /**
   * Race `work` against the configured per-agent round timeout.
   *
   * Rejects with an Error naming the agent if `Config.agents.roundTimeoutMs`
   * elapses first. The underlying work is not cancelled; only its result is
   * abandoned. The timer is cleared as soon as either side settles so finished
   * agents do not leave pending timers behind.
   */
  private async withRoundTimeout<T>(work: Promise<T>, agentId: string): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const deadline = new Promise<T>((_resolve, reject) => {
      timer = setTimeout(
        () => reject(new Error(`Agent ${agentId} exceeded round timeout`)),
        Config.agents.roundTimeoutMs,
      );
      // Where the runtime supports it, do not let this timer keep the process alive.
      timer.unref?.();
    });
    try {
      return await Promise.race([work, deadline]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Pick candidate agents for the round from the registry.
   *
   * Uses `recommend` (biased by agents seen in the task's earlier rounds) when
   * at least one unchallenged finding exists, otherwise a plain semantic
   * `search`; the candidates are then re-ordered by the learning layer.
   *
   * @returns Candidate definitions in learned rank order (ids the ranker
   *   returns that are not among the candidates are dropped).
   */
  private async recruitAgents(goal: RoundGoal, task: Task): Promise<AgentDefinition[]> {
    const phaseQueries: Record<string, { tier?: 1 | 2 | 3 }> = {
      intake: { tier: 1 },
      research: { tier: 2 },
      analysis: { tier: 2 },
      drafting: { tier: 2 },
      review: { tier: 2 },
      verification: { tier: 2 },
      delivery: { tier: 1 },
    };
    const tierOpt = phaseQueries[goal.phase] ?? {};
    // One fewer than the round cap (presumably leaving room for a pinned agent — confirm).
    const topK = Config.dytopo.maxAgentsPerRound - 1;

    // From prior rounds in this task, collect agents whose findings were not
    // challenged (positive) and agents whose findings were challenged (negative).
    // Sets de-duplicate while keeping first-seen order.
    const positive = new Set<string>();
    const negative = new Set<string>();
    for (const round of task.rounds) {
      for (const finding of round.findings) {
        (finding.challenged ? negative : positive).add(finding.agentId);
      }
    }

    // Cap to avoid blowing out the recommend() request.
    const uniquePositive = [...positive].slice(0, 8);
    const uniqueNegative = [...negative].slice(0, 4);

    // recommend() is only used when there is at least one positive example;
    // otherwise fall back to plain semantic search on the goal text.
    const candidates = uniquePositive.length
      ? await this.registry.recommend(goal.description, {
          positive: uniquePositive,
          negative: uniqueNegative,
          ...tierOpt,
          topK,
        })
      : await this.registry.search(goal.description, { ...tierOpt, topK });

    // Learning-layer rerank for this (phase, jurisdiction, workflowType) combination.
    const rankedIds = agentLearning.rankCandidates(
      goal.phase,
      task.jurisdiction,
      task.workflowType,
      candidates.map((a) => a.id),
    );

    // Restore AgentDefinition objects in the new ranked order.
    const byId = new Map(candidates.map((a) => [a.id, a]));
    return rankedIds.map((id) => byId.get(id)).filter((a): a is AgentDefinition => a != null);
  }

  /**
   * Load prior-round memory for each agent.
   *
   * Every agent receives its own entries (top 6, filtered by agent id)
   * followed by the task-level entries (top 4, no agent filter). The task-level
   * query does not depend on the agent, so it is issued once and shared rather
   * than repeated per agent.
   *
   * @returns Map from agent id to that agent's memory entries.
   */
  private async fetchAgentMemories(
    agents: AgentDefinition[],
    task: Task,
    goal: RoundGoal,
  ): Promise<Map<string, MemoryEntry[]>> {
    const memories = new Map<string, MemoryEntry[]>();
    if (agents.length === 0) return memories;

    const [taskEntries, perAgent] = await Promise.all([
      this.memory.query(goal.description, {
        taskId: task.id,
        beforeRound: goal.round,
        topK: 4,
      }),
      Promise.all(
        agents.map(async (agent) => ({
          agentId: agent.id,
          entries: await this.memory.query(goal.description, {
            taskId: task.id,
            agentId: agent.id,
            beforeRound: goal.round,
            topK: 6,
          }),
        })),
      ),
    ]);

    for (const { agentId, entries } of perAgent) {
      // Fresh array per agent so no two agents share a mutable list.
      memories.set(agentId, [...entries, ...taskEntries]);
    }
    return memories;
  }

  /**
   * Build the directed communication graph for the round.
   *
   * Embeds all Need and Offer texts in one batch and adds an edge
   * offer-agent → need-agent for every cross-agent pair whose cosine
   * similarity is at or above `Config.dytopo.similarityThreshold`.
   *
   * @param agents Currently unused; kept for signature compatibility.
   * @returns Edges sorted by similarity, highest first. Empty when there are
   *   no needs or no offers (the embedding call is skipped in that case).
   */
  private async buildCommGraph(
    needs: NeedDescriptor[],
    offers: OfferDescriptor[],
    agents: AgentDefinition[],
  ): Promise<CommunicationEdge[]> {
    // No possible pairs — avoid sending an empty/pointless batch to the embedder.
    if (needs.length === 0 || offers.length === 0) return [];

    // Embed all descriptors in one batch: needs first, then offers.
    const embeddings = await embedBatch([
      ...needs.map((n) => n.text),
      ...offers.map((o) => o.text),
    ]);
    const needEmbeddings = embeddings.slice(0, needs.length).map((e) => e.embedding);
    const offerEmbeddings = embeddings.slice(needs.length).map((e) => e.embedding);

    const threshold = Config.dytopo.similarityThreshold;
    const edges: CommunicationEdge[] = [];

    for (let i = 0; i < needs.length; i++) {
      for (let j = 0; j < offers.length; j++) {
        // An agent does not route messages to itself
        if (needs[i].agentId === offers[j].agentId) continue;
        const sim = cosineSimilarity(needEmbeddings[i], offerEmbeddings[j]);
        if (sim < threshold) continue;
        edges.push({
          from: offers[j].agentId, // offering agent → sends to needing agent
          to: needs[i].agentId,
          similarity: sim,
          offerText: offers[j].text,
        });
      }
    }

    // Sort edges by similarity descending for cleaner logs
    // (in place — `edges` is local to this call).
    return edges.sort((a, b) => b.similarity - a.similarity);
  }

  /**
   * Turn each graph edge into a message carrying the offering agent's offer
   * text (truncated to 500 characters) addressed to the needing agent.
   */
  private routeMessages(
    edges: CommunicationEdge[],
    offers: OfferDescriptor[],
    round: number,
  ): AgentMessage[] {
    const offerTextByAgent = new Map(offers.map((o) => [o.agentId, o.text]));
    return edges.map((edge) => {
      const offerText = (offerTextByAgent.get(edge.from) ?? "").slice(0, 500);
      return {
        id: uuidv4(),
        from: edge.from,
        to: edge.to,
        content: `[Offer from ${edge.from}] ${offerText}`,
        round,
        timestamp: new Date(),
      };
    });
  }

  /**
   * Persist the round's findings as individual memory entries, then write a
   * round-level rollup summary.
   *
   * The rollup is a model-generated 2-3 sentence digest of up to 12 findings.
   * If the model call throws, the failure is logged and a truncated
   * concatenation of the first three findings is stored instead; if the model
   * returns no usable text, the bullet list itself is stored. A summary is
   * therefore always written once the finding writes have succeeded.
   *
   * @param intraMemory Intra-round store for this round (not read here).
   * @throws Propagates failures from the memory store writes.
   */
  private async persistRoundMemory(
    task: Task,
    goal: RoundGoal,
    findings: Finding[],
    intraMemory: IntraRoundMemoryStore,
  ): Promise<void> {
    // Write individual finding memories in parallel
    await Promise.all(
      findings.map((f) =>
        this.memory.writeFindingMemory({
          taskId: task.id,
          round: goal.round,
          phase: goal.phase,
          agentId: f.agentId,
          finding: f,
        }),
      ),
    );

    let summaryContent: string;
    if (findings.length === 0) {
      summaryContent = `Round ${goal.round} (${goal.phase}): No findings produced.`;
    } else {
      const bulletList = findings
        .slice(0, 12)
        .map((f) => `- [${f.agentName}] ${f.content.replace(/\s+/g, " ").slice(0, 150)}`)
        .join("\n");
      try {
        const model = selectModel({ tier: 3, type: "tool", taskType: "descriptor" });
        const provider = getProvider(model);
        const response = await provider.chat({
          model: resolveModelId(model),
          maxTokens: 300,
          system: "You are a legal analysis synthesizer. Produce a concise inter-round memory digest.",
          messages: [{
            role: "user",
            content: `Round ${goal.round} (${goal.phase}) findings:\n${bulletList}\n\nSummarise the key legal conclusions from this round in 2-3 sentences. Be specific — name parties, statutes, or doctrines where present. This summary will be retrieved as memory by agents in the next round.`,
          }],
        });
        const textBlock = response.content.find((b) => b.type === "text");
        const digest = textBlock?.type === "text" ? textBlock.text.trim() : "";
        // A blank reply is useless as memory — keep the bullet list instead.
        summaryContent = digest.length > 0 ? digest : bulletList;
      } catch (err: unknown) {
        // Best-effort: memory must still be written, but the failure should be visible.
        logger.warn("Round rollup synthesis failed — using truncated findings fallback", {
          taskId: task.id,
          round: goal.round,
          error: err instanceof Error ? err.message : String(err),
        });
        summaryContent = `Round ${goal.round} key findings: ${findings.slice(0, 3).map((f) => f.content.slice(0, 100)).join("; ")}`;
      }
    }

    await this.memory.writeRoundSummary({
      taskId: task.id,
      round: goal.round,
      phase: goal.phase,
      summary: summaryContent,
      findingCount: findings.length,
    });
  }
}