Big Michael in a channel
Trace an @BigMichael mention from Teams or Slack: the shared dispatcher parses the command, answers instantly or dispatches async work to the orchestrator, and proactive notifications post results back when the matter task completes.
src/orchestrator.ts · 1137 lines · Orchestrator L133–1137
Outline 30 symbols
- PHASE_SEQUENCES const
- recordOrchestratorCost function
- parseJsonObject function
- Orchestrator class export
- constructor method
- init method
- submitTask method
- getTask method
- listTasks method
- deleteTask method
- assignLawyers method
- listTemplates method
- submitFromTemplate method
- approveGate method
- rejectGate method
- loadExternalAgents method
- loadMikeOSSWorkflows method
- loadLavernWorkflows method
- loadPlugins method
- emit method
- runTask method
- recordAgentOutcomes method
- runPhase method
- generateRoundGoal method
- synthesise method
- tabulate method
- buildSourceTextMap method
- persistTasks method
- restoreTasks method
- waitForGates method
1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 Discover Legal
3// This program is free software: you can redistribute it and/or modify it
4// under the terms of the GNU Affero General Public License as published by
5// the Free Software Foundation, either version 3 of the License, or
6// (at your option) any later version. See <https://www.gnu.org/licenses/>.
7
8/**
9 * Top-level orchestrator — ties the full system together.
10 *
11 * Lifecycle per task:
12 * init → plan phases → for each phase: run DyTopo rounds → apply protocols → gate check → next phase
13 * → final synthesis
14 *
15 * The Root Orchestrator agent (tier 0) generates all RoundGoals via Claude.
16 * The DyTopo engine assembles the agent graph per round from the registry.
17 * Findings flow through the debate + verification protocols before final output.
18 */
19
20import { EventEmitter } from "events";
21import { readdir, readFile, writeFile, rename } from "fs/promises";
22import { join, extname } from "path";
23import { v4 as uuidv4 } from "uuid";
24import { Config } from "./config.js";
25import { logger } from "./logger.js";
26import { getProvider, resolveModelId } from "./providers/index.js";
27import { selectModel, shouldUseThinking } from "./routing/model.js";
28import { auditLogger, ACTOR_SYSTEM } from "./audit/index.js";
29import { AgentRegistry } from "./agents/registry.js";
30import { Agent } from "./agents/base.js";
31import { ROOT_ORCHESTRATOR, ALL_AGENT_DEFINITIONS } from "./agents/definitions.js";
32import { SettingsStore } from "./settings/index.js";
33import { ProfileStore } from "./auth/index.js";
34import { ClientStore } from "./clients/index.js";
35import { TimeStore } from "./time/index.js";
36import { OcgStore } from "./ocg/index.js";
37import { PreBillStore } from "./billing/prebill.js";
38import { agentLearning } from "./learning/index.js";
39import { DyTopoEngine } from "./dytopo/engine.js";
40import type { AgentBillingCtx } from "./dytopo/engine.js";
41import { DocketMonitor } from "./dockets/monitor.js";
42import { InterRoundMemoryStore } from "./memory/index.js";
43import { KnowledgeStore } from "./knowledge/index.js";
44import { TemplateStore } from "./templates/store.js";
45import { LavernAdapter, LavernWorkflowAdapter, instantiateTemplate, fromExternalConfig, fromMikeOSSWorkflow, sanitizePromptContent } from "./adapters/lavern.js";
46import type { TaskTemplate, ExternalAgentConfig, MikeOSSWorkflow } from "./adapters/lavern.js";
47import { pluginRegistry } from "./adapters/plugin.js";
48import {
49 applyCitationGate,
50 runDebate,
51 runVerificationPipeline,
52 identifyGateRequests,
53} from "./protocols/index.js";
54import { detectNosLegal } from "./services/classifier.js";
55import { costStore, calcCostUsd, calcWattHours } from "./cost/index.js";
56import { jobQueue } from "./queue/index.js";
57import { isOllamaModel, isLocalModel } from "./providers/index.js";
58import { BudgetMonitor } from "./budget/index.js";
59import { BudgetPredictor } from "./budget/predictor.js";
60import { ConflictGraph } from "./graph/conflict.js";
61import { RegPulseMonitor } from "./regulatory/pulse.js";
62import { DeadlineEngine } from "./deadlines/engine.js";
63import { CitationEngine } from "./citations/engine.js";
64import { MatterHealthMonitor } from "./matters/health.js";
65import { PlaybookStore, PlaybookBuilder } from "./playbook/index.js";
66import { InvoiceValidator } from "./billing/invoice-validator.js";
67import { RedlineEngine } from "./redline/engine.js";
68import { HeadnoteEngine } from "./headnotes/engine.js";
69import { BriefingEngine } from "./briefing/index.js";
70import { PrecedentGenerator } from "./precedent/generator.js";
71import type {
72 Task,
73 WorkflowType,
74 TaskPhase,
75 RoundGoal,
76 TaskTable,
77} from "./types.js";
78
/**
 * Ordered phase pipeline for every supported workflow type.
 * `submitTask` uses the first entry as the task's initial phase and rejects
 * workflow types that have no entry here; `runTask` walks the list in order.
 */
const PHASE_SEQUENCES: Record<WorkflowType, TaskPhase[]> = {
  // Native workflow types
  counsel: ["intake", "research", "drafting", "delivery"],
  roundtable: ["intake", "research", "analysis", "drafting", "review", "delivery"],
  adversarial: ["intake", "research", "analysis", "review", "verification", "delivery"],
  review: ["intake", "analysis", "review", "verification", "delivery"],
  tabulate: ["intake", "analysis", "delivery"],
  full_bench: ["intake", "research", "analysis", "drafting", "review", "verification", "delivery"],

  // Lavern workflow types
  legal_design: ["intake", "research", "analysis", "drafting", "review", "delivery"],
  pre_engagement: ["intake", "research", "analysis", "delivery"],
};
90
/**
 * Record the usage of one orchestrator-level model call in the cost store.
 *
 * Models identified as Ollama or local are recorded with a null dollar cost
 * and an energy estimate computed from `Config.local.inferenceWatts` and the
 * call duration. Every other model is recorded under the "anthropic" provider
 * with a computed USD cost and no energy estimate.
 *
 * @param response - Provider response; its `usage` and `durationMs` are recorded.
 * @param modelId - Configured model identifier (passed through `resolveModelId` before recording).
 * @param context - Cost context stored alongside the entry.
 * @param taskId - Optional id of the task the call belongs to.
 */
function recordOrchestratorCost(
  response: import("./providers/index.js").ChatResponse,
  modelId: string,
  context: import("./cost/index.js").CostContext,
  taskId?: string,
): void {
  const { usage, durationMs } = response;

  const viaOllama = isOllamaModel(modelId);
  const servedLocally = viaOllama || isLocalModel(modelId);
  const provider = !servedLocally ? "anthropic" : viaOllama ? "ollama" : "local";

  const resolvedModel = resolveModelId(modelId);
  const cacheWrites = usage.cacheWriteTokens ?? 0;
  const cacheReads = usage.cacheReadTokens ?? 0;

  costStore.record({
    model: resolvedModel,
    provider,
    inputTokens: usage.inputTokens,
    outputTokens: usage.outputTokens,
    // Cache counters are only attached when they are non-zero.
    ...(cacheWrites ? { cacheWriteTokens: cacheWrites } : {}),
    ...(cacheReads ? { cacheReadTokens: cacheReads } : {}),
    costUsd: servedLocally
      ? null
      : calcCostUsd(resolvedModel, usage.inputTokens, usage.outputTokens, cacheWrites, cacheReads),
    estimatedWh: servedLocally ? calcWattHours(Config.local.inferenceWatts, durationMs) : null,
    estimatedWatts: servedLocally ? Config.local.inferenceWatts : null,
    durationMs,
    context,
    taskId,
  });
}
120
/**
 * Best-effort extraction of a single JSON object from an LLM response.
 *
 * Markdown code fences are stripped first. The span from the first `{` to the
 * last `}` is then parsed as-is. If that fails (for example because the
 * surrounding prose itself contains braces), the text is scanned for the
 * first brace-balanced `{...}` span that parses as JSON; braces inside JSON
 * string literals are ignored while balancing.
 *
 * @param text - Raw model output.
 * @returns The parsed value, or `undefined` when no parsable object is found.
 */
function parseJsonObject(text: string): unknown | undefined {
  // Upper bound on fallback candidates so pathological input stays cheap.
  const MAX_CANDIDATES = 64;

  const stripped = text.replace(/```(?:json)?/gi, "").trim();
  const first = stripped.indexOf("{");
  const last = stripped.lastIndexOf("}");
  if (first === -1 || last === -1 || last <= first) return undefined;

  // JSON has no `undefined`, so `undefined` unambiguously signals a parse failure.
  const tryParse = (candidate: string): unknown | undefined => {
    try {
      return JSON.parse(candidate);
    } catch {
      return undefined;
    }
  };

  // Index of the `}` that balances the `{` at `start`, or -1 if there is none.
  const balancedEnd = (start: number): number => {
    let depth = 0;
    let inString = false;
    let escaped = false;
    for (let i = start; i <= last; i++) {
      const ch = stripped[i];
      if (inString) {
        if (escaped) escaped = false;
        else if (ch === "\\") escaped = true;
        else if (ch === '"') inString = false;
        continue;
      }
      if (ch === '"') inString = true;
      else if (ch === "{") depth++;
      else if (ch === "}" && --depth === 0) return i;
    }
    return -1;
  };

  // Fast path: the outermost {...} span is the object.
  const outermost = tryParse(stripped.slice(first, last + 1));
  if (outermost !== undefined) return outermost;

  // Fallback: try each opening brace in turn until a balanced span parses.
  let start = first;
  for (let attempts = 0; start !== -1 && start < last && attempts < MAX_CANDIDATES; attempts++) {
    const end = balancedEnd(start);
    if (end !== -1) {
      const parsed = tryParse(stripped.slice(start, end + 1));
      if (parsed !== undefined) return parsed;
    }
    start = stripped.indexOf("{", start + 1);
  }
  return undefined;
}
132
133export class Orchestrator {
// Stores and services that are assigned in the constructor.
readonly registry: AgentRegistry;
readonly memory: InterRoundMemoryStore;
readonly knowledge: KnowledgeStore;
readonly templates: TemplateStore;
readonly settings: SettingsStore;
readonly profiles: ProfileStore;
readonly clients: ClientStore;
readonly time: TimeStore;
readonly ocg: OcgStore;
readonly budgetMonitor: BudgetMonitor;
readonly budgetPredictor: BudgetPredictor;
readonly preBills: PreBillStore;
readonly conflictGraph: ConflictGraph;
readonly regPulse = new RegPulseMonitor();
readonly docketMonitor: DocketMonitor;
readonly deadlines = new DeadlineEngine();
readonly citations = new CitationEngine();
readonly matterHealth = new MatterHealthMonitor();
readonly playbookStore: PlaybookStore;
readonly playbookBuilder = new PlaybookBuilder();
readonly invoiceValidator = new InvoiceValidator();
readonly redline = new RedlineEngine();
readonly headnotes = new HeadnoteEngine();
readonly briefing = new BriefingEngine();
readonly precedents = new PrecedentGenerator();

// In-memory task table keyed by task id.
private readonly tasks = new Map<string, Task>();
// Signals `gates:<taskId>` whenever a gate on that task is approved or rejected.
private readonly gateEmitter = new EventEmitter();
// Publishes `task:<taskId>` progress events (see emit()).
readonly progressEmitter = new EventEmitter();
// Both are created in init(), hence the definite-assignment assertions.
private engine!: DyTopoEngine;
private rootAgent!: Agent;
165
/**
 * Construct every store and monitor the orchestrator owns. Nothing is loaded
 * here; call init() before submitting tasks.
 */
constructor() {
  this.registry = new AgentRegistry();
  this.memory = new InterRoundMemoryStore();
  this.knowledge = new KnowledgeStore();
  this.templates = new TemplateStore();
  this.settings = new SettingsStore();
  this.profiles = new ProfileStore();

  // The budget monitor is wired to the time and client stores, so those come first.
  const clientStore = new ClientStore();
  this.clients = clientStore;
  const timeStore = new TimeStore();
  this.time = timeStore;
  this.ocg = new OcgStore();
  this.budgetMonitor = new BudgetMonitor(timeStore, clientStore);
  this.budgetPredictor = new BudgetPredictor();

  this.preBills = new PreBillStore(Config.persistence.preBillsFile);
  this.conflictGraph = new ConflictGraph();
  this.docketMonitor = new DocketMonitor(Config.dockets.file);

  // Fall back to the default playbook location when none is configured.
  const playbooksFile = Config.persistence.playbooksFile ?? "./data/playbooks.json";
  this.playbookStore = new PlaybookStore(playbooksFile);
}
183
/**
 * Bring the orchestrator to a ready state: initialise the stores, seed and
 * extend the agent registry, load workflow templates and plugins, restore
 * persisted tasks and audit history, build the root agent and DyTopo engine,
 * and start the optional monitors.
 */
async init(): Promise<void> {
  // Persisted admin settings are loaded first so they apply before any task runs.
  await this.settings.init();
  await this.profiles.init();
  await this.clients.init();
  await this.time.init();
  await this.ocg.init();
  await this.preBills.init();
  await this.conflictGraph.connect();
  await this.playbookStore.init();
  await agentLearning.init();

  // These four are started together.
  await Promise.all([
    this.registry.init(),
    this.memory.init(),
    this.knowledge.init(),
    this.templates.load(),
  ]);

  // An empty registry is seeded with the default agent definitions.
  const registered = await this.registry.listAll();
  if (registered.length === 0) {
    logger.info("Seeding agent registry with default agents…");
    await this.registry.registerAll(ALL_AGENT_DEFINITIONS);
  }

  // External and Lavern agents from the filesystem.
  await this.loadExternalAgents();

  // Workflow presets: MikeOSS (native format), then Lavern.
  await this.loadMikeOSSWorkflows();
  await this.loadLavernWorkflows();

  // Generic JSON plugins from adapters/external/ contribute agents,
  // templates and tools to the live stores.
  await this.loadPlugins();

  // Persisted tasks and recent audit history go back into memory.
  await this.restoreTasks();
  await auditLogger.restoreFromFile();

  this.rootAgent = new Agent(ROOT_ORCHESTRATOR);
  this.engine = new DyTopoEngine({
    registry: this.registry,
    memory: this.memory,
    knowledge: this.knowledge,
    pinnedAgents: [ROOT_ORCHESTRATOR],
  });

  // Regulatory pulse monitor (only when enabled); it is handed a task lister.
  if (this.regPulse.isEnabled()) {
    this.regPulse.start(() => this.listTasks());
    logger.info("RegPulseMonitor started");
  }

  // Docket monitor is always initialised, but only started when enabled.
  await this.docketMonitor.init();
  this.docketMonitor.setKnowledgeStore(this.knowledge);
  if (this.docketMonitor.isEnabled()) {
    this.docketMonitor.start();
    logger.info("DocketMonitor started");
  }

  // Deadline rule files.
  await this.deadlines.loadRulesDir(Config.deadlines.rulesDir);

  logger.info("Orchestrator ready");
}
251
252 // ─── Task management ──────────────────────────────────────────────────────
253
/** Maximum number of tasks that may be running or pending at once. */
private static readonly MAX_CONCURRENT_TASKS = 10;
/** Maximum accepted length of a task description, in characters. */
private static readonly MAX_DESCRIPTION_CHARS = 20_000;

/**
 * Validate, register and start a new task.
 *
 * The task runs asynchronously; the returned object is the live task record
 * and callers poll getTask() for status.
 *
 * @param params - Task description, workflow type and optional references
 *   (documents, client/matter numbers, jurisdiction, creating profile).
 * @returns The newly created task, in status "pending".
 * @throws Error when the description is too long, the server is at capacity,
 *   or the workflow type is unknown.
 */
async submitTask(params: {
  description: string;
  workflowType: WorkflowType;
  documentIds?: string[];
  clientNumber?: string;
  matterNumber?: string;
  jurisdiction?: string;
  createdByProfileId?: string;
}): Promise<Task> {
  if (params.description.length > Orchestrator.MAX_DESCRIPTION_CHARS) {
    throw new Error(
      `Task description exceeds the ${Orchestrator.MAX_DESCRIPTION_CHARS.toLocaleString()} character limit ` +
      `(${params.description.length.toLocaleString()} received). Please shorten the description.`,
    );
  }
  const running = Array.from(this.tasks.values())
    .filter((t) => t.status === "running" || t.status === "pending").length;
  if (running >= Orchestrator.MAX_CONCURRENT_TASKS) {
    throw new Error(`Server at capacity: ${running} tasks already running. Please wait for one to complete.`);
  }
  const phases = PHASE_SEQUENCES[params.workflowType];
  if (!phases) {
    throw new Error(`Unknown workflowType '${params.workflowType}'. Valid values: ${Object.keys(PHASE_SEQUENCES).join(", ")}`);
  }
  const task: Task = {
    id: uuidv4(),
    description: params.description,
    // Reference fields are trimmed and length-capped; empty strings become undefined.
    jurisdiction: params.jurisdiction?.trim().toUpperCase().slice(0, 20) || undefined,
    clientNumber: params.clientNumber?.trim().slice(0, 100) || undefined,
    matterNumber: params.matterNumber?.trim().slice(0, 100) || undefined,
    documentIds: params.documentIds ?? [],
    createdByProfileId: params.createdByProfileId,
    workflowType: params.workflowType,
    status: "pending",
    currentPhase: phases[0],
    currentRound: 0,
    maxRounds: Config.dytopo.maxRounds,
    activeAgentIds: [],
    rounds: [],
    findings: [],
    pendingGates: [],
    createdAt: new Date(),
    updatedAt: new Date(),
  };

  // Open a time entry when a profile is associated with this task.
  if (params.createdByProfileId) {
    const profile = this.profiles.get(params.createdByProfileId);
    if (profile) {
      const entry = this.time.open({
        profileId: profile.id,
        profileName: profile.name,
        taskId: task.id,
        matterNumber: task.matterNumber,
        clientNumber: task.clientNumber,
        description: `Task: ${task.description.slice(0, 200)}`,
        event: "task_run",
        startedAt: new Date(),
      });
      task.activeTimeEntryId = entry.id;
    }
  }

  // Register the task BEFORE the first await. The capacity check above only
  // holds while no other submission can interleave; if the task were added
  // after an await, several concurrent submissions could all pass the check
  // and exceed MAX_CONCURRENT_TASKS.
  this.tasks.set(task.id, task);

  // Auto-detect NOSLEGAL taxonomy tags from the description (best-effort, never blocks).
  try {
    task.noslegal = await detectNosLegal(params.description, params.description);
  } catch {
    // detectNosLegal never throws, but guard defensively.
  }

  logger.info("Task submitted", { taskId: task.id, workflow: params.workflowType });
  auditLogger.write({ event: "task.created", actorId: task.createdByProfileId ?? ACTOR_SYSTEM, taskId: task.id, data: { description: params.description, workflowType: params.workflowType } });
  await this.persistTasks();

  // Run asynchronously — callers poll getTask() for status
  this.runTask(task).catch((err: unknown) => {
    // A rejection is not guaranteed to be an Error instance.
    const message = err instanceof Error ? err.message : String(err);
    logger.error("Task execution failed", { taskId: task.id, error: message });
    task.status = "failed";
    task.error = message;
    task.updatedAt = new Date();
    // Close the time entry even on failure.
    if (task.activeTimeEntryId) {
      const closedOnFail = this.time.close(task.activeTimeEntryId);
      task.activeTimeEntryId = undefined;
      if (closedOnFail) {
        jobQueue.enqueue("summarize_time_entry", {
          entryId: closedOnFail.id,
          taskId: task.id,
          clientNumber: task.clientNumber,
        }).catch((e) => logger.warn("Failed to enqueue summarize job", { error: (e as Error).message }));
      }
    }
    this.emit(task.id, "failed", { error: message });
    auditLogger.write({ event: "task.failed", actorId: ACTOR_SYSTEM, taskId: task.id, data: { error: message } });
    // Persist the failed state, as the success path does for completed tasks.
    this.persistTasks().catch((e) => logger.warn("Failed to persist tasks", { error: (e as Error).message }));
  });

  return task;
}
355
/** Look up a task by id; yields null when no such task is held in memory. */
getTask(taskId: string): Task | null {
  const found = this.tasks.get(taskId);
  return found ?? null;
}
359
/** Snapshot array of every task currently held in memory. */
listTasks(): Task[] {
  return [...this.tasks.values()];
}
363
/**
 * Delete a matter and its Qdrant memory entries.
 *
 * @returns false when no task with that id existed, true once it was removed.
 * @throws Error when the task is still running, pending, or awaiting a gate.
 */
deleteTask(taskId: string): boolean {
  const status = this.tasks.get(taskId)?.status;
  if (status === "running" || status === "pending" || status === "awaiting_gate") {
    throw new Error(`Cannot delete a task in status '${status}'; wait for completion or cancel first`);
  }

  if (!this.tasks.delete(taskId)) return false;

  this.persistTasks().catch((err) => logger.warn("Failed to persist tasks", { error: err.message }));
  // Remove the task's inter-round memory vectors too, so deleted task data
  // cannot be surfaced by future semantic memory queries.
  this.memory.deleteByTaskId(taskId).catch((err) =>
    logger.warn("Failed to delete task memory from Qdrant", { taskId, error: (err as Error).message }),
  );
  auditLogger.write({ event: "task.deleted", actorId: ACTOR_SYSTEM, taskId, data: {} });
  logger.info("Task deleted", { taskId });
  return true;
}
385
/**
 * Set the lawyer(s) assigned to a matter (a partner action).
 *
 * The id list is de-duplicated, capped at 50 entries, and then reduced to ids
 * that resolve to a known profile. The audit record lists the resulting
 * assignment together with the ids added and removed relative to the
 * previous one.
 *
 * @returns The updated task, or null when the task does not exist.
 */
assignLawyers(taskId: string, lawyerIds: string[], actorId: string = ACTOR_SYSTEM): Task | null {
  const task = this.tasks.get(taskId);
  if (!task) return null;

  const previous = task.assignedLawyerIds ?? [];
  const candidates = Array.from(new Set(lawyerIds)).slice(0, 50);
  const valid = candidates.filter((id) => Boolean(this.profiles.get(id)));

  task.assignedLawyerIds = valid;
  task.updatedAt = new Date();
  this.persistTasks().catch((err) => logger.warn("Failed to persist tasks", { error: err.message }));

  const added = valid.filter((id) => !previous.includes(id));
  const removed = previous.filter((id) => !valid.includes(id));
  auditLogger.write({ event: "task.assigned", actorId, taskId, data: { lawyerIds: valid, added, removed } });
  return task;
}
400
/** All task templates currently registered in the template store. */
listTemplates(): TaskTemplate[] {
  const registered = this.templates.list();
  return registered;
}
404
/**
 * Instantiate a registered template and submit the result as a new task.
 *
 * @param templateId - Id of the template to instantiate.
 * @param substitutions - Placeholder values handed to `instantiateTemplate`.
 * @param documentIds - Optional documents to attach to the task.
 * @param refs - Optional client/matter numbers and creating profile.
 * @throws Error when the template id is unknown (plus anything submitTask throws).
 */
async submitFromTemplate(
  templateId: string,
  substitutions: Record<string, string> = {},
  documentIds?: string[],
  refs?: { clientNumber?: string; matterNumber?: string; createdByProfileId?: string },
): Promise<Task> {
  const template = this.templates.get(templateId);
  if (!template) {
    throw new Error(`Template not found: ${templateId}`);
  }

  const instantiated = instantiateTemplate(template, substitutions);
  return this.submitTask({
    description: instantiated.description,
    workflowType: instantiated.workflowType,
    documentIds,
    ...refs,
  });
}
416
/**
 * Human approves a gate request; the gated finding stays on the task.
 *
 * Marks the gate approved, writes an audit record, logs a gate_review time
 * entry for the reviewer (when one is given and known), signals
 * `gates:<taskId>` on the gate emitter, and persists tasks in the background.
 *
 * @param taskId - Task that owns the gate.
 * @param gateId - Gate being approved.
 * @param note - Optional reviewer note stored on the gate.
 * @param reviewerProfileId - Optional profile id of the reviewing lawyer.
 * @throws Error when the task or the gate cannot be found.
 */
approveGate(taskId: string, gateId: string, note?: string, reviewerProfileId?: string): void {
  const task = this.tasks.get(taskId);
  if (!task) throw new Error(`Task not found: ${taskId}`);
  const gate = task.pendingGates.find((g) => g.id === gateId);
  if (!gate) throw new Error(`Gate not found: ${gateId}`);
  gate.status = "approved";
  gate.reviewerNote = note;
  gate.reviewedAt = new Date();
  task.updatedAt = new Date();
  auditLogger.write({ event: "gate.approved", actorId: reviewerProfileId ?? ACTOR_SYSTEM, taskId, data: { gateId, note } });

  this.recordGateReviewTime(taskId, task, gate, reviewerProfileId);

  this.gateEmitter.emit(`gates:${taskId}`);
  this.persistTasks().catch((err) => logger.warn("Failed to persist tasks", { error: err.message }));
}

/**
 * Human rejects a gate request; the gated finding is removed from the task.
 *
 * Otherwise mirrors approveGate: audit record, reviewer time entry, gate
 * signal and background persistence.
 *
 * @param taskId - Task that owns the gate.
 * @param gateId - Gate being rejected.
 * @param reason - Rejection reason, stored as the reviewer note.
 * @param reviewerProfileId - Optional profile id of the reviewing lawyer.
 * @throws Error when the task or the gate cannot be found.
 */
rejectGate(taskId: string, gateId: string, reason: string, reviewerProfileId?: string): void {
  const task = this.tasks.get(taskId);
  if (!task) throw new Error(`Task not found: ${taskId}`);
  const gate = task.pendingGates.find((g) => g.id === gateId);
  if (!gate) throw new Error(`Gate not found: ${gateId}`);
  gate.status = "rejected";
  gate.reviewerNote = reason;
  gate.reviewedAt = new Date();
  // Discard the finding the rejected gate was guarding.
  task.findings = task.findings.filter((f) => f.id !== gate.findingId);
  task.updatedAt = new Date();
  auditLogger.write({ event: "gate.rejected", actorId: reviewerProfileId ?? ACTOR_SYSTEM, taskId, data: { gateId, reason } });

  this.recordGateReviewTime(taskId, task, gate, reviewerProfileId);

  this.gateEmitter.emit(`gates:${taskId}`);
  this.persistTasks().catch((err) => logger.warn("Failed to persist tasks", { error: err.message }));
}

/**
 * Record a gate_review time entry for the reviewing lawyer and queue its
 * summarisation. The entry is opened and closed immediately. Does nothing
 * when no reviewer id is given or the id does not resolve to a profile.
 */
private recordGateReviewTime(
  taskId: string,
  task: Task,
  gate: Task["pendingGates"][number],
  reviewerProfileId?: string,
): void {
  if (!reviewerProfileId) return;
  const profile = this.profiles.get(reviewerProfileId);
  if (!profile) return;

  const entry = this.time.open({
    profileId: profile.id,
    profileName: profile.name,
    taskId,
    matterNumber: task.matterNumber,
    clientNumber: task.clientNumber,
    description: `Gate review: ${gate.finding.content.slice(0, 100)}`,
    event: "gate_review",
    startedAt: new Date(),
  });
  const closed = this.time.close(entry.id);
  if (closed) {
    jobQueue.enqueue("summarize_time_entry", {
      entryId: closed.id,
      taskId,
      clientNumber: task.clientNumber,
    }).catch((e) => logger.warn("Failed to enqueue summarize job", { error: (e as Error).message }));
  }
}
501
502 // ─── External agent loader ────────────────────────────────────────────────
503
/**
 * Register agent definitions found as JSON files under agents/external and
 * agents/lavern (relative to the working directory). A file may hold a single
 * config or an array. Missing directories are skipped silently; a file that
 * fails to load is logged and skipped.
 */
private async loadExternalAgents(): Promise<void> {
  const agentsRoot = join(process.cwd(), "agents");
  const sources: Array<{ path: string; type: "external" | "lavern" }> = [
    { path: join(agentsRoot, "external"), type: "external" },
    { path: join(agentsRoot, "lavern"), type: "lavern" },
  ];

  const lavernAdapter = new LavernAdapter();

  for (const source of sources) {
    let fileNames: string[];
    try {
      fileNames = await readdir(source.path);
    } catch {
      // directory doesn't exist or isn't readable — skip silently
      continue;
    }

    const defs = [];
    const jsonFiles = fileNames.filter((name) => extname(name) === ".json");
    for (const fileName of jsonFiles) {
      try {
        const parsed = JSON.parse(await readFile(join(source.path, fileName), "utf8"));
        const items = Array.isArray(parsed) ? parsed : [parsed];
        if (source.type === "lavern") {
          defs.push(...lavernAdapter.fromConfigs(items));
        } else {
          defs.push(...(items as ExternalAgentConfig[]).map(fromExternalConfig));
        }
      } catch (err) {
        logger.warn("Failed to load external agent file", { file: fileName, error: (err as Error).message });
      }
    }

    if (defs.length > 0) {
      await this.registry.registerAll(defs);
      logger.info("External agents registered", { source: source.type, count: defs.length });
    }
  }
}
543
544 // ─── MikeOSS workflow loader ──────────────────────────────────────────────
545
/**
 * Read MikeOSS workflow presets (native MikeOSSWorkflow format) from
 * workflows/mikeoss/ and add each one to the template store via the adapter.
 * MikeOSS workflows are task specifications, not agents — our agent system
 * executes them. A file may contain one workflow or an array of them.
 * A missing directory is skipped silently; a file that fails is logged and skipped.
 */
private async loadMikeOSSWorkflows(): Promise<void> {
  const workflowDir = join(process.cwd(), "workflows", "mikeoss");

  let fileNames: string[];
  try {
    fileNames = await readdir(workflowDir);
  } catch {
    // directory doesn't exist — skip silently
    return;
  }

  let registered = 0;
  const jsonFiles = fileNames.filter((name) => extname(name) === ".json");
  for (const fileName of jsonFiles) {
    try {
      const text = await readFile(join(workflowDir, fileName), "utf8");
      const parsed = JSON.parse(text) as MikeOSSWorkflow | MikeOSSWorkflow[];
      const workflows = Array.isArray(parsed) ? parsed : [parsed];
      for (const workflow of workflows) {
        this.templates.add(fromMikeOSSWorkflow(workflow));
        registered += 1;
      }
    } catch (err) {
      logger.warn("Failed to load MikeOSS workflow file", { file: fileName, error: (err as Error).message });
    }
  }

  if (registered) {
    logger.info("MikeOSS workflows registered as templates", { count: registered });
  }
}
579
/**
 * Load Lavern workflow definitions from workflows/laverne/ through the
 * LavernWorkflowAdapter and add every resulting TaskTemplate to the template
 * store. Lavern workflows specify step pipelines and gate conditions; we map
 * them to our WorkflowType phase sequences.
 */
private async loadLavernWorkflows(): Promise<void> {
  const workflowDir = join(process.cwd(), "workflows", "laverne");
  const loaded = await new LavernWorkflowAdapter().load(workflowDir);

  for (const template of loaded) {
    this.templates.add(template);
  }

  if (loaded.length) {
    logger.info("Lavern workflows registered as templates", { count: loaded.length });
  }
}
592
593 // ─── Generic plugin loader ────────────────────────────────────────────────
594
/**
 * Load JSON plugin packages from adapters/external/ through the
 * PluginRegistry and wire up what they contribute: tools go into
 * globalToolRegistry, agent definitions into the AgentRegistry, and workflow
 * templates into the TemplateStore. Returns early when no plugin was loaded.
 */
private async loadPlugins(): Promise<void> {
  const pluginDir = join(process.cwd(), "adapters", "external");
  await pluginRegistry.loadDirectory(pluginDir);
  if (!pluginRegistry.size) {
    return;
  }

  // The tools module is imported lazily, only once there are plugins to wire up.
  const { globalToolRegistry } = await import("./tools/index.js");

  for (const tool of pluginRegistry.allTools()) {
    globalToolRegistry.register(tool as Parameters<typeof globalToolRegistry.register>[0]);
  }

  const pluginAgents = pluginRegistry.allAgents();
  if (pluginAgents.length) {
    await this.registry.registerAll(pluginAgents);
    logger.info("Plugin agents registered", { count: pluginAgents.length });
  }

  const pluginTemplates = pluginRegistry.allWorkflows();
  for (const template of pluginTemplates) {
    this.templates.add(template);
  }
  if (pluginTemplates.length) {
    logger.info("Plugin templates registered", { count: pluginTemplates.length });
  }
}
621
622 // ─── Internal task runner ─────────────────────────────────────────────────
623
/** Publish a progress event for one task on the `task:<taskId>` channel. */
private emit(taskId: string, type: string, data: unknown): void {
  const channel = `task:${taskId}`;
  this.progressEmitter.emit(channel, { type, data });
}
627
/**
 * Drive a task through its workflow: run every phase in order (pausing for
 * pending gates), synthesise the final output, produce a table for tabulate
 * workflows, then mark the task complete, close its time entry and persist.
 * Rejections propagate to the caller.
 */
private async runTask(task: Task): Promise<void> {
  const taskId = task.id;

  task.status = "running";
  this.emit(taskId, "started", { taskId: task.id, workflowType: task.workflowType });
  auditLogger.write({ event: "task.started", actorId: ACTOR_SYSTEM, taskId: task.id, data: { workflowType: task.workflowType } });

  for (const phase of PHASE_SEQUENCES[task.workflowType]) {
    // Stop scheduling phases once the round budget is used up.
    if (task.currentRound >= task.maxRounds) {
      logger.warn("Task hit maxRounds cap", { taskId: task.id, maxRounds: task.maxRounds });
      break;
    }

    task.currentPhase = phase;
    task.updatedAt = new Date();
    this.emit(taskId, "phase", { phase });
    await this.runPhase(task, phase);

    // Pause until every pending gate has been reviewed.
    const hasPendingGate = task.pendingGates.some((gate) => gate.status === "pending");
    if (hasPendingGate) {
      task.status = "awaiting_gate";
      await this.waitForGates(task);
      task.status = "running";
    }
  }

  // Final synthesis by root orchestrator
  task.output = await this.synthesise(task);

  // Tabulate workflows additionally get a structured spreadsheet-style table;
  // a failure there is logged and the text output stands on its own.
  if (task.workflowType === "tabulate") {
    try {
      task.table = await this.tabulate(task);
    } catch (err) {
      logger.warn("Tabulation failed; falling back to text output only", {
        taskId: task.id,
        error: (err as Error).message,
      });
    }
  }

  task.status = "complete";
  task.completedAt = new Date();
  task.updatedAt = new Date();

  // Close the time entry now that the task has finished.
  const openEntryId = task.activeTimeEntryId;
  if (openEntryId) {
    const closedEntry = this.time.close(openEntryId);
    task.activeTimeEntryId = undefined;
    if (closedEntry) {
      jobQueue.enqueue("summarize_time_entry", {
        entryId: closedEntry.id,
        taskId: task.id,
        clientNumber: task.clientNumber,
      }).catch((e) => logger.warn("Failed to enqueue summarize job", { error: (e as Error).message }));
    }
  }

  // Feed agent performance back into the registry so recommend()-based
  // recruitment improves over time. Not awaited; a failure is only logged.
  this.recordAgentOutcomes(task).catch((err) =>
    logger.warn("Agent outcome recording failed", { error: (err as Error).message }),
  );

  const findingCount = task.findings.length;
  this.emit(taskId, "complete", { findings: findingCount, output: task.output?.slice(0, 200) });
  auditLogger.write({ event: "task.complete", actorId: ACTOR_SYSTEM, taskId: task.id, data: { findings: findingCount } });
  this.persistTasks().catch((err) => logger.warn("Failed to persist tasks", { error: err.message }));

  logger.info("Task complete", { taskId: task.id, findings: task.findings.length });
}
697
/**
 * Feed task outcomes back into the agent registry's performance scores.
 *
 * Findings are grouped per (agent, phase). Each group's reward is the mean
 * finding confidence, where a challenged-and-unresolved finding counts at 30%
 * of its confidence. The reward is sent to the registry (successScore, used by
 * recommend()-based recruitment) and to the per-phase learning table.
 *
 * @param task - The finished task whose findings are scored.
 */
private async recordAgentOutcomes(task: Task): Promise<void> {
  if (!task.findings.length) return;

  // Resolve each finding's phase via a round→phase map (each round runs exactly
  // one phase). Findings whose round is not recorded fall back to the task's
  // current phase.
  const phaseByRound = new Map<number, string>();
  for (const r of task.rounds) phaseByRound.set(r.goal.round, r.goal.phase);

  // Group findings by (agentId, phase). The agent id is kept in the value so
  // it never has to be recovered by splitting the key, which would truncate
  // any id that itself contains "::".
  const groups = new Map<string, { agentId: string; phase: string; scores: number[] }>();
  for (const f of task.findings) {
    const phase = phaseByRound.get(f.round) ?? task.currentPhase;
    const key = `${f.agentId}::${phase}`;
    let group = groups.get(key);
    if (!group) {
      group = { agentId: f.agentId, phase, scores: [] };
      groups.set(key, group);
    }
    const effective = f.challenged && !f.resolved ? f.confidence * 0.3 : f.confidence;
    group.scores.push(effective);
  }

  // The next phase is taken from this task's own workflow sequence, so
  // workflows that skip phases are handled correctly. The canonical order is
  // only a fallback for phases missing from that sequence.
  const workflowPhases: readonly string[] = PHASE_SEQUENCES[task.workflowType] ?? [];
  const canonicalPhases: readonly string[] = ["intake", "research", "analysis", "drafting", "review", "verification", "delivery"];
  const nextPhaseOf = (phase: string): string => {
    for (const sequence of [workflowPhases, canonicalPhases]) {
      const idx = sequence.indexOf(phase);
      // An unknown phase (idx -1) must not resolve to the first phase in the list.
      if (idx !== -1) return sequence[idx + 1] ?? phase;
    }
    return phase;
  };

  const done = true; // task is complete

  for (const { agentId, phase, scores } of groups.values()) {
    const avg = scores.reduce((a, b) => a + b, 0) / scores.length;

    // Update Qdrant successScore (for recommend() in future tasks)
    await this.registry.recordOutcome([agentId], avg);

    // Update RuVector Q-table (for per-phase agent ranking in future tasks)
    await agentLearning.recordEpisode({
      phase,
      nextPhase: nextPhaseOf(phase),
      jurisdiction: task.jurisdiction,
      workflowType: task.workflowType,
      agentId,
      reward: avg,
      done,
    });
  }
}
751
/**
 * Execute a single phase of a task as one DyTopo round.
 *
 * Order of operations: generate the round goal, run the round through the
 * engine, pass the round's findings through the citation gate, debate and
 * verify the survivors, then record the findings and any gate requests on
 * the task and announce the result (event, audit log, app log).
 *
 * @param task  Task being advanced. Mutated in place: `currentRound`,
 *              `rounds`, `findings`, `pendingGates` and `updatedAt`.
 * @param phase Phase to execute.
 */
private async runPhase(task: Task, phase: TaskPhase): Promise<void> {
  logger.info("Phase starting", { taskId: task.id, phase });
  auditLogger.write({ event: "phase.start", actorId: ACTOR_SYSTEM, taskId: task.id, data: { phase } });

  // The root orchestrator writes the goal; the round number is assigned here.
  const goal = await this.generateRoundGoal(task, phase);
  task.currentRound += 1;
  goal.round = task.currentRound;

  // The task's creator (or, failing that, the first assigned lawyer) supplies
  // both the tone profile and the billing attribution.
  const leadLawyerId = task.createdByProfileId ?? task.assignedLawyerIds?.[0];
  const leadLawyer = leadLawyerId ? this.profiles.get(leadLawyerId) : undefined;
  const tone = leadLawyer?.toneProfile;

  // Billing context is only built when agent billing is switched on.
  let billing: AgentBillingCtx | undefined;
  if (Config.agentBilling.enabled) {
    billing = {
      timeStore: this.time,
      responsibleLawyerId: leadLawyerId,
      responsibleLawyerName: leadLawyer?.name,
      matterNumber: task.matterNumber,
      clientNumber: task.clientNumber,
    };
  }

  // Run the DyTopo round and keep its state on the task.
  const round = await this.engine.runRound(task, goal, tone, billing);
  task.rounds.push(round);

  // Citation gate: only findings that pass against the source texts continue.
  const sourceTexts = await this.buildSourceTextMap(task.documentIds);
  const cited = applyCitationGate(round.findings, sourceTexts).passed;

  // Debate every surviving finding. allSettled keeps one failing model call
  // from rejecting the whole phase; a failed debate keeps the finding as-is.
  const debateOutcomes = await Promise.allSettled(
    cited.map((finding) => runDebate(finding, "adversarial-challenger", task.id)),
  );
  const debated = debateOutcomes.map((outcome, idx) => {
    const untouched = cited[idx];
    if (outcome.status === "rejected") {
      logger.warn("Debate failed for finding — keeping it unchanged", { findingId: untouched.id, error: (outcome.reason as Error)?.message });
      return untouched;
    }
    return outcome.value;
  });

  // Verification attaches its result to each finding in place (read later by
  // identifyGateRequests). A failed verification is logged and the finding
  // simply carries no verificationResult.
  const verifyOutcomes = await Promise.allSettled(
    debated.map((finding) => runVerificationPipeline(finding, task.id)),
  );
  verifyOutcomes.forEach((outcome, idx) => {
    if (outcome.status !== "rejected") return;
    logger.warn("Verification failed for finding", { findingId: debated[idx].id, error: (outcome.reason as Error)?.message });
  });

  // Record the findings and whatever gate requests they give rise to.
  task.findings.push(...debated);
  const gates = identifyGateRequests(task.id, debated);
  task.pendingGates.push(...gates);

  task.updatedAt = new Date();
  const counts = { findings: debated.length, gates: gates.length };
  this.emit(task.id, "round", { round: task.currentRound, phase, ...counts });
  auditLogger.write({ event: "phase.complete", actorId: ACTOR_SYSTEM, taskId: task.id, data: { phase, ...counts } });
  logger.info("Phase complete", { taskId: task.id, phase, ...counts });
}
835
/**
 * Ask the root orchestrator model for a round goal for the given phase.
 *
 * The prompt requests a `DESCRIPTION:` paragraph followed by numbered
 * `EXPECTED_OUTPUT_n:` lines, which are parsed back out of the response text.
 * When no usable description can be extracted (no text block, no
 * `DESCRIPTION:` marker, or a blank description) a generic description built
 * from the sanitised task description is used instead.
 *
 * @param task  Task the goal is generated for; read only.
 * @param phase Phase the goal applies to.
 * @returns A new goal with a fresh id. `round` is set to the task's current
 *          round number at call time.
 */
private async generateRoundGoal(task: Task, phase: TaskPhase): Promise<RoundGoal> {
  const safeDesc = sanitizePromptContent(task.description);
  const priorPhases = task.rounds.map((r) => r.goal.phase);
  const prompt = `TASK: ${safeDesc}

WORKFLOW: ${task.workflowType}
CURRENT PHASE: ${phase}
PRIOR PHASES COMPLETED: ${priorPhases.join(", ") || "none"}
FINDINGS SO FAR: ${task.findings.length}

Generate a specific, actionable round goal for the ${phase} phase.
Format:
DESCRIPTION: <one paragraph describing what agents should do this round>
EXPECTED_OUTPUT_1: <first expected output>
EXPECTED_OUTPUT_2: <second expected output>
EXPECTED_OUTPUT_3: <third expected output>`;

  const model = selectModel({ tier: 0, taskType: "synthesis" });
  const provider = getProvider(model);
  const response = await provider.chat({
    model: resolveModelId(model),
    maxTokens: 600,
    system: ROOT_ORCHESTRATOR.systemPrompt,
    messages: [{ role: "user", content: prompt }],
    cacheSystem: true,
  });

  recordOrchestratorCost(response, model, "round_goal", task.id);
  const textBlock = response.content.find((b) => b.type === "text");
  const text = textBlock?.type === "text" ? textBlock.text : "";

  // The description runs from the marker up to the first EXPECTED_OUTPUT line
  // (or the end of the text); each expected output is a single line.
  const descMatch = text.match(/DESCRIPTION:\s*([\s\S]+?)(?=EXPECTED_OUTPUT|$)/i);
  const outputMatches = [...text.matchAll(/EXPECTED_OUTPUT_\d+:\s*(.+)/gi)];

  // `||` rather than `??`: a whitespace-only capture trims to "", which must
  // fall back to the generic description instead of yielding an empty goal.
  const parsedDescription = descMatch?.[1]?.trim();
  const description = parsedDescription || `Execute the ${phase} phase for: ${safeDesc}`;

  return {
    id: uuidv4(),
    round: task.currentRound,
    phase,
    description,
    // Drop outputs that are blank once trimmed.
    expectedOutputs: outputMatches.map((m) => m[1].trim()).filter((o) => o.length > 0),
  };
}
877
/**
 * Produce the final output text for a task from its accumulated findings.
 *
 * Findings whose gate was rejected are left out. The remaining findings are
 * numbered, individually capped and concatenated (with an overall cap) into
 * the prompt, optionally followed by the primary lawyer's tone snippet.
 *
 * @param task Task to synthesise; read only.
 * @returns The text of the first text block in the model response, or an
 *          empty string when the response contains none.
 */
private async synthesise(task: Task): Promise<string> {
  const safeDesc = sanitizePromptContent(task.description);

  // Ids of findings whose gate was rejected — collected once so the filter
  // below is a set lookup rather than a scan of every gate per finding.
  const rejectedFindingIds = new Set(
    task.pendingGates.filter((g) => g.status === "rejected").map((g) => g.findingId),
  );

  // Cap each finding and the total to prevent synthesis prompts from exceeding
  // practical context-window limits when many rounds produce large findings.
  const findingsSummary = task.findings
    .filter((f) => !rejectedFindingIds.has(f.id))
    .map((f, i) => `[${i + 1}] (${f.agentName}, Round ${f.round}) ${f.content.slice(0, 5_000)}`)
    .join("\n\n")
    .slice(0, 200_000);

  // Tone comes from the task's creator, or the first assigned lawyer.
  const profileId = task.createdByProfileId ?? task.assignedLawyerIds?.[0];
  const lawyerTone = profileId ? this.profiles.get(profileId)?.toneProfile : undefined;

  const toneBlock = lawyerTone
    ? `\nLAWYER TONE PROFILE — write the final output in this voice:\n${sanitizePromptContent(lawyerTone.injectionSnippet)}\n`
    : "";

  const prompt = `TASK: ${safeDesc}

ALL FINDINGS FROM ALL ROUNDS:
${findingsSummary}
${toneBlock}
Produce the final legal output for this task. Structure appropriately for the workflow type: ${task.workflowType}.
Every claim must trace to a specific finding number from the list above.`;

  const model = selectModel({ tier: 0, taskType: "synthesis" });
  const useThinking = shouldUseThinking({ modelId: model, taskType: "synthesis", tier: 0 });
  const provider = getProvider(model);

  // When thinking is on, budget_tokens + output tokens must fit within
  // max_tokens. The 16k default is kept, but it is raised whenever the
  // configured thinking budget would otherwise leave fewer than OUTPUT_TOKENS
  // for the answer (or exceed max_tokens outright).
  const OUTPUT_TOKENS = 4_000;
  const THINKING_DEFAULT_MAX_TOKENS = 16_000;
  let maxTokens = OUTPUT_TOKENS;
  if (useThinking) {
    const budget = Number(Config.anthropic.thinkingBudgetTokens);
    maxTokens = Number.isFinite(budget)
      ? Math.max(THINKING_DEFAULT_MAX_TOKENS, budget + OUTPUT_TOKENS)
      : THINKING_DEFAULT_MAX_TOKENS;
  }

  const response = await provider.chat({
    model: resolveModelId(model),
    maxTokens,
    system: ROOT_ORCHESTRATOR.systemPrompt,
    messages: [{ role: "user", content: prompt }],
    cacheSystem: true,
    ...(useThinking && { thinking: { budgetTokens: Config.anthropic.thinkingBudgetTokens } }),
  });

  recordOrchestratorCost(response, model, "synthesis", task.id);
  const textBlock = response.content.find((b) => b.type === "text");
  return textBlock?.type === "text" ? textBlock.text : "";
}
923
/**
 * Extract the task's findings into a structured table for spreadsheet-style
 * review. The root orchestrator model is asked to choose the columns and to
 * return rows as JSON, each listing its contributing finding ids.
 *
 * Findings whose gate was rejected are excluded. Each returned row carries the
 * display columns plus four internal, underscore-prefixed keys:
 * - `_findingIds`: comma-separated ids of all contributing findings, highest
 *   confidence first;
 * - `_findingId`: the first of those ids (or "" when none was recognised);
 * - `_confidence`: highest confidence among the contributors, two decimals;
 * - `_sources`: number of contributing findings.
 *
 * Rows whose visible cells are identical are collapsed into one, and the
 * result is sorted by confidence, highest first.
 *
 * @param task Task whose findings are tabulated; read only.
 * @returns The table, or `undefined` when there are no eligible findings or
 *          the model output cannot be parsed into `columns` and `rows` arrays.
 */
private async tabulate(task: Task): Promise<TaskTable | undefined> {
  const safeDesc = sanitizePromptContent(task.description);

  // Ids of findings whose gate was rejected, collected once for O(1) lookup.
  const rejectedFindingIds = new Set(
    task.pendingGates.filter((g) => g.status === "rejected").map((g) => g.findingId),
  );
  const findings = task.findings.filter((f) => !rejectedFindingIds.has(f.id));
  if (findings.length === 0) return undefined;

  // Same caps as synthesise(): bound each finding and the total so the prompt
  // stays within practical context-window limits. Whole entries are dropped
  // at the total cap (never cut mid-entry) so a finding id is never truncated.
  const MAX_FINDING_CHARS = 5_000;
  const MAX_SUMMARY_CHARS = 200_000;
  const ENTRY_SEPARATOR = "\n\n";
  const entries: string[] = [];
  let summaryChars = 0;
  for (const f of findings) {
    const entry = `id=${f.id} | ${f.agentName} (R${f.round}, conf ${f.confidence.toFixed(2)}): ${f.content.slice(0, MAX_FINDING_CHARS)}`;
    const cost = entry.length + (entries.length > 0 ? ENTRY_SEPARATOR.length : 0);
    if (summaryChars + cost > MAX_SUMMARY_CHARS) {
      logger.warn("Tabulation prompt truncated — omitting findings beyond the size cap", {
        taskId: task.id,
        included: entries.length,
        total: findings.length,
      });
      break;
    }
    entries.push(entry);
    summaryChars += cost;
  }
  const findingsSummary = entries.join(ENTRY_SEPARATOR);

  const prompt = `TASK: ${safeDesc}

FINDINGS:
${findingsSummary}

Extract these findings into a structured table suitable for a spreadsheet review grid.
If the TASK above names the columns it wants, use exactly those column names and order.
Otherwise choose 3–6 columns that best capture the structured content for THIS subject matter.

Respond with ONLY valid JSON (no prose, no markdown fences) in exactly this shape:
{
 "columns": ["Column A", "Column B"],
 "rows": [
 { "Column A": "value", "Column B": "value", "_findingIds": ["<source finding id>", "..."] }
 ]
}

Rules:
- MERGE findings that make the same substantive point into a SINGLE row.
- Keep findings that make genuinely DIFFERENT points as SEPARATE rows, even when they concern the same clause or topic.
- "_findingIds" MUST list the bare id(s) of every finding that contributes to that row (no "id=" prefix or brackets).
- Do NOT add columns for source ids or confidence — those are attached automatically.
- Do NOT use column names beginning with an underscore.
- Every column listed in "columns" must appear as a key in every row.
- Keep cell values concise (a phrase or sentence, not paragraphs).`;

  const model = selectModel({ tier: 0, taskType: "synthesis" });
  const provider = getProvider(model);
  const response = await provider.chat({
    model: resolveModelId(model),
    maxTokens: 4000,
    system: ROOT_ORCHESTRATOR.systemPrompt,
    messages: [{ role: "user", content: prompt }],
    cacheSystem: true,
  });

  recordOrchestratorCost(response, model, "tabulate", task.id);
  const textBlock = response.content.find((b) => b.type === "text");
  const text = textBlock?.type === "text" ? textBlock.text : "";

  const parsed = parseJsonObject(text) as
    | { columns?: unknown; rows?: unknown }
    | undefined;
  if (!parsed || !Array.isArray(parsed.columns) || !Array.isArray(parsed.rows)) {
    logger.warn("Tabulation produced unparseable output", { taskId: task.id });
    return undefined;
  }

  const confByFinding = new Map(findings.map((f) => [f.id, f.confidence]));
  const validIds = new Set(findings.map((f) => f.id));
  // Display columns never include underscore-prefixed internal keys.
  const columns = parsed.columns.map((c) => String(c)).filter((c) => !c.startsWith("_"));

  const UUID = /[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/gi;
  // Collect every known finding id referenced anywhere in the row (the model
  // may use `_findingIds`, `_findingId`, or embed ids inside a stray value).
  // Only ids belonging to this task's eligible findings are accepted.
  const contributorsOf = (r: Record<string, unknown>): string[] => {
    const found = new Set<string>();
    const pool: string[] = [];
    const raw = (r._findingIds ?? r._findingId) as unknown;
    if (Array.isArray(raw)) pool.push(...raw.map(String));
    else if (raw != null) pool.push(String(raw));
    for (const v of Object.values(r)) pool.push(String(v ?? ""));
    for (const s of pool) for (const m of s.matchAll(UUID)) if (validIds.has(m[0])) found.add(m[0]);
    return [...found];
  };

  // Orders finding ids by confidence, highest first; unknown ids count as 0.
  const byConfDesc = (a: string, b: string) => (confByFinding.get(b) ?? 0) - (confByFinding.get(a) ?? 0);

  const rawRows = (parsed.rows as Array<Record<string, unknown>>)
    .filter((r) => r && typeof r === "object")
    .map((r) => {
      const row: Record<string, string> = {};
      for (const col of columns) row[col] = r[col] != null ? String(r[col]) : "";
      const contributors = contributorsOf(r).sort(byConfDesc);
      const confs = contributors.map((id) => confByFinding.get(id)).filter((c): c is number => c != null);
      row._findingIds = contributors.join(",");
      row._findingId = contributors[0] ?? "";
      row._confidence = confs.length ? Math.max(...confs).toFixed(2) : "";
      row._sources = String(contributors.length);
      return row;
    });

  // Safety net: collapse rows whose visible cells are identical (the model
  // missed a merge). Genuinely distinct points differ in their cells and are
  // preserved — each keeps its own confidence.
  const merged = new Map<string, Record<string, string>>();
  for (const row of rawRows) {
    // The key is the JSON-encoded cell list, so cell boundaries are part of
    // the key: ["ab", "c"] and ["a", "bc"] must not be treated as the same row.
    const key = JSON.stringify(columns.map((c) => (row[c] ?? "").trim().toLowerCase()));
    const existing = merged.get(key);
    if (!existing) { merged.set(key, { ...row }); continue; }
    const ids = [...new Set([
      ...existing._findingIds.split(",").filter(Boolean),
      ...row._findingIds.split(",").filter(Boolean),
    ])].sort(byConfDesc);
    existing._findingIds = ids.join(",");
    existing._findingId = ids[0] ?? existing._findingId;
    existing._sources = String(ids.length);
    const maxConf = Math.max(parseFloat(existing._confidence || "0"), parseFloat(row._confidence || "0"));
    existing._confidence = maxConf ? maxConf.toFixed(2) : "";
  }

  const rows = [...merged.values()].sort(
    (a, b) => parseFloat(b._confidence || "0") - parseFloat(a._confidence || "0"),
  );
  const sourceFindingIds = [...new Set(rows.flatMap((r) => r._findingIds.split(",").filter(Boolean)))];

  return { columns, rows, sourceFindingIds, generatedAt: new Date() };
}
1049
/**
 * Load the full text of each document from the knowledge store, concurrently.
 *
 * @param docIds Ids of the documents to load.
 * @returns Map from document id to its full text. Documents for which the
 *          store returns a falsy value are left out.
 */
private async buildSourceTextMap(docIds: string[]): Promise<Map<string, string>> {
  const textsById = new Map<string, string>();

  const loadInto = async (docId: string): Promise<void> => {
    const fullText = await this.knowledge.getFullText(docId);
    if (!fullText) return;
    textsById.set(docId, fullText);
  };

  await Promise.all(docIds.map((docId) => loadInto(docId)));
  return textsById;
}
1060
1061 // ─── Persistence ──────────────────────────────────────────────────────────
1062
/**
 * Write every in-memory task to the configured tasks file as pretty-printed
 * JSON, with the task dates converted to ISO strings.
 *
 * The JSON is written to a sibling `.tmp` file first and then renamed over
 * the real file.
 */
async persistTasks(): Promise<void> {
  const target = Config.persistence.tasksFile;

  // Dates are replaced by ISO strings; every other field is copied as-is.
  const records = Array.from(this.tasks.values(), (task) => ({
    ...task,
    createdAt: task.createdAt.toISOString(),
    updatedAt: task.updatedAt.toISOString(),
    completedAt: task.completedAt?.toISOString(),
  }));
  const payload = JSON.stringify(records, null, 2);

  const staging = `${target}.tmp`;
  await writeFile(staging, payload, "utf8");
  await rename(staging, target);

  logger.debug("Tasks persisted", { count: this.tasks.size, path: target });
}
1076
/**
 * Load previously persisted tasks from the configured tasks file into memory.
 *
 * Never throws: a missing file is treated as a first run, and any other
 * problem is logged. Records are validated one by one, so a single malformed
 * entry is skipped (with a warning) rather than aborting the whole restore.
 * A record is skipped when it is not an object, has no string id, has an
 * unknown workflowType, or has an unparseable createdAt/updatedAt — the last
 * because an invalid Date would make every later persistTasks() call throw in
 * toISOString(). An unparseable completedAt is dropped rather than failing
 * the record.
 *
 * After loading, every task in memory whose status is "running", "pending" or
 * "awaiting_gate" is marked "failed", since its execution did not survive the
 * restart.
 */
async restoreTasks(): Promise<void> {
  const path = Config.persistence.tasksFile;
  let raw: string;
  try {
    raw = await readFile(path, "utf8");
  } catch (err) {
    // ENOENT means no file yet — first run. Anything else (permissions, I/O)
    // is still non-fatal but is logged instead of being silently ignored.
    const code = (err as { code?: unknown } | null)?.code;
    if (code !== "ENOENT") {
      logger.warn("Failed to read persisted tasks", {
        path,
        error: err instanceof Error ? err.message : String(err),
      });
    }
    return;
  }

  // Parses a persisted timestamp; returns undefined for anything that does
  // not produce a valid Date.
  const toDate = (value: unknown): Date | undefined => {
    if (typeof value !== "string" && typeof value !== "number") return undefined;
    const date = new Date(value);
    return Number.isNaN(date.getTime()) ? undefined : date;
  };

  try {
    const parsed: unknown = JSON.parse(raw);
    if (!Array.isArray(parsed)) {
      logger.warn("Failed to restore tasks", { error: "tasks file does not contain a JSON array" });
      return;
    }

    for (const entry of parsed as unknown[]) {
      if (entry === null || typeof entry !== "object" || Array.isArray(entry)) {
        logger.warn("Skipping restored task that is not an object");
        continue;
      }
      const item = entry as Record<string, unknown>;

      if (typeof item.id !== "string" || item.id.length === 0) {
        logger.warn("Skipping restored task with missing id", { id: item.id });
        continue;
      }

      // Validate workflowType before restoring — a tampered or corrupted file
      // could produce an invalid type that crashes phase execution at runTask().
      // The own-property check keeps inherited keys such as "constructor" from
      // passing as a known workflow.
      const workflowType = item.workflowType;
      if (
        typeof workflowType !== "string" ||
        !Object.prototype.hasOwnProperty.call(PHASE_SEQUENCES, workflowType) ||
        !PHASE_SEQUENCES[workflowType as WorkflowType]
      ) {
        logger.warn("Skipping restored task with unknown workflowType", { id: item.id, workflowType: item.workflowType });
        continue;
      }

      const createdAt = toDate(item.createdAt);
      const updatedAt = toDate(item.updatedAt);
      if (!createdAt || !updatedAt) {
        logger.warn("Skipping restored task with invalid timestamps", {
          id: item.id,
          createdAt: item.createdAt,
          updatedAt: item.updatedAt,
        });
        continue;
      }

      const task = {
        ...item,
        createdAt,
        updatedAt,
        completedAt: toDate(item.completedAt),
      } as Task;
      this.tasks.set(task.id, task);
    }

    // Anything that was mid-flight when the process stopped cannot resume.
    for (const task of this.tasks.values()) {
      if (task.status === "running" || task.status === "pending" || task.status === "awaiting_gate") {
        task.status = "failed";
        task.error = "Server restarted during execution";
      }
    }
    logger.info("Tasks restored from disk", { count: this.tasks.size, path });
  } catch (err) {
    logger.warn("Failed to restore tasks", { error: err instanceof Error ? err.message : String(err) });
  }
}
1114
/**
 * Wait until none of the task's gates is still pending.
 *
 * Resolves immediately when nothing is pending. Otherwise the state is
 * re-checked each time the task's gate event fires, and the promise resolves
 * once every gate has left the "pending" status. After 72 hours it resolves
 * anyway, with a warning. The promise never rejects.
 *
 * @param task Task whose `pendingGates` are observed.
 */
private waitForGates(task: Task): Promise<void> {
  const hasPendingGate = (): boolean => task.pendingGates.some((gate) => gate.status === "pending");

  return new Promise<void>((resolve) => {
    if (!hasPendingGate()) {
      resolve();
      return;
    }

    const eventName = `gates:${task.id}`;
    const timeoutMs = 72 * 60 * 60 * 1000;

    // Runs on every gate event; only finishes once nothing is pending.
    const onGateEvent = (): void => {
      if (hasPendingGate()) return;
      clearTimeout(expiry);
      this.gateEmitter.off(eventName, onGateEvent);
      resolve();
    };

    // Fallback so a task never waits on a gate forever.
    const expiry = setTimeout(() => {
      this.gateEmitter.off(eventName, onGateEvent);
      logger.warn("Gate timeout — auto-resolving", { taskId: task.id });
      resolve();
    }, timeoutMs);

    this.gateEmitter.on(eventName, onGateEvent);
  });
}
1137}