/**
 * Marketing campaign dispatcher.
 *
 * Two responsibilities, both run on a 60-second cadence:
 *   (1) FAN-OUT: pick scheduled campaigns whose scheduled_at has passed,
 *       resolve their audience, write campaign_deliveries rows, and flip
 *       the campaign into 'sending'.
 *   (2) DRAIN: claim queued delivery rows (FOR UPDATE SKIP LOCKED), respect
 *       quiet hours and per-branch rate limits, hand off to the channel
 *       (email or whatsapp), and update the row status.
 *
 * Throughput model
 * ────────────────
 * Email   — no artificial cap; SMTP server is the real limit. Up to
 *           EMAIL_MAX_PER_BRANCH_TICK rows are claimed per tick and dispatched
 *           concurrently (EMAIL_CONCURRENCY parallel SMTP sends per sub-batch).
 *
 * WhatsApp — capped at (branch.whatsapp_send_rate ?? 10) msg/s × 60 s per tick
 *            so the dispatcher never exceeds the branch's Meta WABA tier.
 *            Sent with WA_CONCURRENCY parallel API calls per sub-batch to stay
 *            well within the per-second limit.
 *
 * Boots once per process. Idempotent — safe to call startDispatcher() from
 * server boot AND from request handlers (e.g. campaign launch endpoint).
 */

import { db } from '@server/db/drizzle';
import { sql } from 'drizzle-orm';
import { bindValue } from '@server/db/bind';
import { fanOutCampaign } from './campaigns.service';
import { isWithinQuietHours, nextAllowedAt } from './quiet-hours';
import { sendCampaignEmail } from './email-send';
import { sendCampaignWhatsApp } from './whatsapp-send';
import type { CampaignChannel } from './types';
import { registerHmrTimer, clearHmrTimer, isHmrTimerRegistered } from '@server/lib/hmrTimer';

import { childLogger } from '@server/logger';
const log = childLogger('svc.marketing.dispatcher');

// `ticking` is shared across module re-imports via globalThis so the
// re-entrancy guard survives Fast Refresh too (the timer pair itself is
// owned by the HMR helper).
const G = globalThis as unknown as { __marketingDispatcher?: { ticking: boolean } };
G.__marketingDispatcher ||= { ticking: false };
const state = G.__marketingDispatcher;

const TIMER_KEY = 'marketing-dispatcher';
const MODULE_TOKEN = {};
const TICK_INTERVAL_MS = 60_000;
const FIRST_TICK_DELAY_MS = 5_000;

// Global row claim limit per tick. Must be large enough that it never becomes
// the throughput bottleneck: cover the highest realistic WhatsApp rate
// (Meta Standard tier ≈ 80 msg/s × 60 s = 4 800) plus a full email batch.
// Rows above a campaign/branch cap are immediately re-queued after claiming.
const DELIVERY_BATCH = 6000;

// ─── Per-channel throughput limits ────────────────────────────────────────
// Email: no artificial cap — the SMTP server is the bottleneck. We allow up to
// EMAIL_MAX_PER_BRANCH_TICK sends per tick; concurrent SMTP calls are bounded
// by EMAIL_CONCURRENCY so Node.js isn't flooded with sockets.
const EMAIL_MAX_PER_BRANCH_TICK = 500;
const EMAIL_CONCURRENCY = 50;

// ─── SMTP auth circuit-breaker ────────────────────────────────────────────
// After this many consecutive permanent SMTP auth/config errors from the same
// campaign in a single tick, the campaign is immediately flipped to 'failed'
// and all remaining queued deliveries are skipped. This prevents burning N DB
// writes and SMTP connection attempts for a campaign whose credentials are
// simply broken.
const SMTP_AUTH_CIRCUIT_BREAKER_THRESHOLD = 3;

interface CircuitBreakerState {
  /** Set to true once the threshold is hit; causes the dispatch loop to abort. */
  blown: boolean;
  /** Running count of permanent SMTP auth/config errors seen this tick. */
  authFailCount: number;
}

/**
 * Returns true when the error string signals a credentials/configuration
 * problem that affects ALL deliveries for this campaign (not a per-recipient
 * rejection). These are the errors that should trigger the circuit breaker.
 */
function isSmtpConfigError(error: string): boolean {
  const lower = error.toLowerCase();
  return (
    lower.includes('smtp authentication failed') ||
    lower.includes('no smtp transport configured')
  );
}

// WhatsApp: cap = branch.whatsapp_send_rate (msg/s) × TICK_INTERVAL_S.
// Default (10 msg/s) is safe for new / unverified WABA accounts.
// Verified business tiers can go 80+ msg/s — operators update it in branch settings.
// Per-second pacing is enforced by dispatchWhatsAppPaced() so the real-time
// send rate never exceeds the configured value even if latency is low.
const DEFAULT_WHATSAPP_SEND_RATE = 10; // msgs/sec
const WA_CONCURRENCY = 5;              // concurrent WA API calls per sub-batch sub-window

export function startDispatcher(): void {
  const wasRegistered = isHmrTimerRegistered(TIMER_KEY);
  registerHmrTimer({
    key: TIMER_KEY,
    moduleToken: MODULE_TOKEN,
    initialDelayMs: FIRST_TICK_DELAY_MS,
    intervalMs: TICK_INTERVAL_MS,
    tick,
  });
  if (!wasRegistered) log.info('started (interval=60s)');
}

export function stopDispatcher(): void {
  clearHmrTimer(TIMER_KEY);
}

async function tick(): Promise<void> {
  if (state.ticking) return;
  state.ticking = true;
  try {
    await fanOutDueCampaigns();
    await drainQueuedDeliveries();
    await maybeFinalizeCampaigns();
  } catch (err) {
    log.error({ err }, 'MarketingDispatcher tick error');
  } finally {
    state.ticking = false;
  }
}

// ─── (1) Fan-out ───────────────────────────────────────────────────────────

interface CampaignToFanOut { id: string }

async function fanOutDueCampaigns(): Promise<void> {
  const { rows } = await db.execute(sql`
    UPDATE marketing_campaigns
    SET status = 'sending', started_at = COALESCE(started_at, now()), updated_at = now()
    WHERE id IN (
      SELECT id FROM marketing_campaigns
      WHERE status = 'scheduled' AND scheduled_at IS NOT NULL AND scheduled_at <= NOW()
      ORDER BY scheduled_at ASC
      LIMIT 5
      FOR UPDATE SKIP LOCKED
    )
    RETURNING id
  `);
  for (const row of rows as unknown as CampaignToFanOut[]) {
    try {
      const inserted = await fanOutCampaign(row.id);
      log.info({ campaignId: row.id, inserted }, 'fanned out campaign');
    } catch (err) {
      log.error({ err, campaignId: row.id }, 'MarketingDispatcher fan-out failed');
      const msg = err instanceof Error ? err.message : String(err);
      await db.execute(sql`
        UPDATE marketing_campaigns
        SET status = 'failed', last_error = ${msg.slice(0, 500)}, completed_at = now(), updated_at = now()
        WHERE id = ${row.id}
      `);
    }
  }
}

// ─── (2) Drain queued deliveries ───────────────────────────────────────────

interface DeliveryToSend {
  id: string;
  campaign_id: string;
  restaurant_id: string;
  customer_id: string | null;
  channel: CampaignChannel;
  recipient: string;
  attempts: number;
}

interface CampaignContext {
  id: string;
  restaurant_id: string;
  branch_id: string | null;
  channel: CampaignChannel;
  subject: string | null;
  body_html: string | null;
  template_name: string | null;
  template_lang: string | null;
  template_vars: string[];
  ignore_quiet_hours: boolean;
  max_per_minute: number | null;
  max_per_hour: number | null;
  branch_timezone: string | null;
  branch_quiet_enabled: boolean | null;
  branch_quiet_start: number | null;
  branch_quiet_end: number | null;
  /**
   * Meta WABA send-rate cap (msgs/sec) configured on the branch. The dispatcher
   * caps WhatsApp deliveries to (rate × 60) per tick. NULL → use default (10/s).
   */
  branch_whatsapp_send_rate: number | null;
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Dispatch `deliveries` concurrently in sub-batches of `concurrency`. Errors
 * inside processDelivery are caught and recorded per-delivery — a single
 * failure never aborts the rest of the batch.
 *
 * For email: use this directly — SMTP is the bottleneck, no artificial delay.
 *
 * If `cb` is provided the loop checks the circuit-breaker flag before every
 * sub-batch. Once blown, remaining deliveries are left in 'sending' status
 * and cleaned up by the caller after this function returns.
 */
async function dispatchConcurrently(
  deliveries: DeliveryToSend[],
  ctx: CampaignContext,
  concurrency: number,
  cb?: CircuitBreakerState,
): Promise<void> {
  for (let i = 0; i < deliveries.length; i += concurrency) {
    if (cb?.blown) break;
    const batch = deliveries.slice(i, i + concurrency);
    await Promise.allSettled(batch.map((d) => processDelivery(d, ctx, cb)));
  }
}

/**
 * Dispatch WhatsApp deliveries with real per-second pacing.
 *
 * Sends `WA_CONCURRENCY` messages per sub-batch and enforces a minimum wall-
 * clock delay between batches so the actual throughput never exceeds
 * `ratePerSec` messages/second, regardless of API latency.
 *
 * Example: ratePerSec=80, WA_CONCURRENCY=5
 *   → targetBatchMs = ceil(5/80 * 1000) = 63 ms per batch
 *   → over 60 s: ~960 batches × 5 = 4 800 messages ✓
 *
 * Example: ratePerSec=10, WA_CONCURRENCY=5
 *   → targetBatchMs = 500 ms per batch
 *   → over 60 s: 120 batches × 5 = 600 messages ✓
 */
async function dispatchWhatsAppPaced(
  deliveries: DeliveryToSend[],
  ctx: CampaignContext,
  ratePerSec: number,
): Promise<void> {
  // Minimum milliseconds to budget per batch to stay at/below ratePerSec
  const targetBatchMs = Math.ceil((WA_CONCURRENCY / ratePerSec) * 1000);

  for (let i = 0; i < deliveries.length; i += WA_CONCURRENCY) {
    const batchStart = Date.now();
    const batch = deliveries.slice(i, i + WA_CONCURRENCY);
    await Promise.allSettled(batch.map((d) => processDelivery(d, ctx)));
    const elapsed = Date.now() - batchStart;
    const delay = targetBatchMs - elapsed;
    if (delay > 0) await sleep(delay);
  }
}

async function drainQueuedDeliveries(): Promise<void> {
  const { rows } = await db.execute(sql`
    UPDATE campaign_deliveries
    SET status = 'sending', attempts = attempts + 1, updated_at = now()
    WHERE id IN (
      SELECT id FROM campaign_deliveries
      WHERE status = 'queued' AND scheduled_for <= NOW()
      ORDER BY scheduled_for ASC
      LIMIT ${DELIVERY_BATCH}
      FOR UPDATE SKIP LOCKED
    )
    RETURNING id, campaign_id, restaurant_id, customer_id, channel, recipient, attempts
  `);
  const claimed = rows as unknown as DeliveryToSend[];
  if (claimed.length === 0) return;

  // Group by campaign_id so we only fetch each campaign context once per tick.
  const byCampaign = new Map<string, DeliveryToSend[]>();
  for (const d of claimed) {
    const arr = byCampaign.get(d.campaign_id) ?? [];
    arr.push(d);
    byCampaign.set(d.campaign_id, arr);
  }
  // Per-(branch, channel) counter shared across campaigns so multiple campaigns
  // from the same branch respect the same throughput budget.
  const perBranchChannelUsed = new Map<string, number>();

  for (const [campaignId, deliveries] of byCampaign.entries()) {
    const ctx = await loadCampaignContext(campaignId);
    if (!ctx) {
      await markDeliveriesSkipped(deliveries.map((d) => d.id), 'campaign no longer exists');
      continue;
    }

    // Quiet hours gate.
    const branchHours = {
      timezone: ctx.branch_timezone,
      quiet_hours_enabled: ctx.branch_quiet_enabled,
      quiet_hours_start: ctx.branch_quiet_start,
      quiet_hours_end: ctx.branch_quiet_end,
    };
    if (!ctx.ignore_quiet_hours && isWithinQuietHours(branchHours)) {
      const next = nextAllowedAt(branchHours);
      const ids = deliveries.map((d) => d.id);
      await db.execute(sql`
        UPDATE campaign_deliveries
        SET status = 'queued', scheduled_for = ${next.toISOString()},
            attempts = GREATEST(attempts - 1, 0),
            last_error = ${'quiet hours: deferred until ' + next.toISOString()},
            updated_at = now()
        WHERE id = ANY(${bindValue(ids)}::uuid[])
      `);
      continue;
    }

    // Per-branch+channel throughput cap.
    // Email: EMAIL_MAX_PER_BRANCH_TICK (no artificial rate limit — SMTP is the
    //        real constraint). WhatsApp: whatsapp_send_rate × tick_seconds.
    const key = `${ctx.branch_id ?? ctx.restaurant_id}:${ctx.channel}`;
    const alreadyUsed = perBranchChannelUsed.get(key) ?? 0;
    const tickSeconds = TICK_INTERVAL_MS / 1000;
    const maxPerTick = ctx.channel === 'email'
      ? EMAIL_MAX_PER_BRANCH_TICK
      : (ctx.branch_whatsapp_send_rate ?? DEFAULT_WHATSAPP_SEND_RATE) * tickSeconds;
    const branchRemaining = Math.max(0, maxPerTick - alreadyUsed);

    // Per-campaign throttle (optional max_per_minute / max_per_hour).
    let campaignRemaining = Number.POSITIVE_INFINITY;
    if (typeof ctx.max_per_minute === 'number' && ctx.max_per_minute > 0) {
      campaignRemaining = Math.min(campaignRemaining, ctx.max_per_minute);
    }
    let hourlyOldestSentAt: Date | null = null;
    if (typeof ctx.max_per_hour === 'number' && ctx.max_per_hour > 0) {
      const { rows: hRows } = await db.execute(sql`
        SELECT COUNT(*)::int AS n, MIN(sent_at) AS oldest
        FROM campaign_deliveries
        WHERE campaign_id = ${ctx.id}
          AND status = 'sent'
          AND sent_at > NOW() - INTERVAL '1 hour'
      `);
      const stat = (hRows[0] ?? {}) as { n?: number; oldest?: string | Date | null };
      const sentLastHour = Number(stat.n ?? 0);
      const hourlyRemaining = Math.max(0, ctx.max_per_hour - sentLastHour);
      campaignRemaining = Math.min(campaignRemaining, hourlyRemaining);
      if (hourlyRemaining === 0 && stat.oldest) {
        hourlyOldestSentAt = new Date(String(stat.oldest));
      }
    }

    const allowed = Math.min(branchRemaining, campaignRemaining);
    const toProcess = deliveries.slice(0, allowed);
    const toDefer = deliveries.slice(allowed);
    if (toDefer.length > 0) {
      const deferIds = toDefer.map((d) => d.id);
      const limitedByCampaign = campaignRemaining < branchRemaining;
      let nextWhen: Date;
      if (limitedByCampaign && hourlyOldestSentAt && campaignRemaining === 0) {
        const t = hourlyOldestSentAt.getTime() + 60 * 60_000 + 1_000;
        nextWhen = new Date(Math.max(t, Date.now() + TICK_INTERVAL_MS));
      } else {
        nextWhen = new Date(Date.now() + TICK_INTERVAL_MS);
      }
      const reason = limitedByCampaign
        ? 'rate-limited (campaign cap): deferred'
        : 'rate-limited (branch cap): deferred to next tick';
      await db.execute(sql`
        UPDATE campaign_deliveries
        SET status = 'queued',
            scheduled_for = ${nextWhen.toISOString()},
            attempts = GREATEST(attempts - 1, 0),
            last_error = ${reason},
            updated_at = now()
        WHERE id = ANY(${bindValue(deferIds)}::uuid[])
      `);
    }
    perBranchChannelUsed.set(key, alreadyUsed + toProcess.length);

    if (toProcess.length > 0) {
      if (ctx.channel === 'email') {
        // Email: high concurrency — SMTP server is the real bottleneck.
        // A per-campaign circuit breaker aborts the loop early if SMTP auth
        // errors are detected, preventing all N recipients from burning retry
        // slots when credentials are simply misconfigured.
        const cb: CircuitBreakerState = { blown: false, authFailCount: 0 };
        await dispatchConcurrently(toProcess, ctx, EMAIL_CONCURRENCY, cb);
        if (cb.blown) {
          // Any delivery rows that were claimed (status='sending') but not yet
          // processed because the loop was cut short must be cleaned up.
          // By this point Promise.allSettled for the last active sub-batch has
          // resolved, so no row is genuinely in-flight any more.
          const reason = 'SMTP authentication failed — check credentials in Branch Settings.';
          await db.execute(sql`
            UPDATE campaign_deliveries
            SET status = 'skipped', last_error = ${reason}, updated_at = now()
            WHERE campaign_id = ${ctx.id} AND status = 'sending'
          `);
        }
      } else {
        // WhatsApp: paced dispatch enforces real per-second rate limit.
        // dispatchWhatsAppPaced guarantees actual msg/s ≤ configured rate
        // regardless of API response latency.
        const ratePerSec = ctx.branch_whatsapp_send_rate ?? DEFAULT_WHATSAPP_SEND_RATE;
        await dispatchWhatsAppPaced(toProcess, ctx, ratePerSec);
      }
    }
  }
}

async function loadCampaignContext(campaignId: string): Promise<CampaignContext | null> {
  const { rows } = await db.execute(sql`
    SELECT c.id, c.restaurant_id, c.branch_id, c.channel, c.subject, c.body_html,
           c.template_name, c.template_lang, c.template_vars, c.ignore_quiet_hours,
           c.max_per_minute, c.max_per_hour,
           b.timezone                AS branch_timezone,
           b.quiet_hours_enabled     AS branch_quiet_enabled,
           b.quiet_hours_start       AS branch_quiet_start,
           b.quiet_hours_end         AS branch_quiet_end,
           b.whatsapp_send_rate      AS branch_whatsapp_send_rate
    FROM marketing_campaigns c
    LEFT JOIN branches b ON b.id = c.branch_id
    WHERE c.id = ${campaignId} LIMIT 1
  `);
  const r = rows[0] as Record<string, unknown> | undefined;
  if (!r) return null;
  let templateVars: string[] = [];
  const rawTv = r.template_vars;
  if (Array.isArray(rawTv)) templateVars = rawTv.filter((x): x is string => typeof x === 'string');
  else if (typeof rawTv === 'string') {
    try { const p = JSON.parse(rawTv); if (Array.isArray(p)) templateVars = p.filter((x): x is string => typeof x === 'string'); }
    catch { /* */ }
  }
  return {
    id: r.id as string,
    restaurant_id: r.restaurant_id as string,
    branch_id: (r.branch_id as string | null) ?? null,
    channel: r.channel as CampaignChannel,
    subject: (r.subject as string | null) ?? null,
    body_html: (r.body_html as string | null) ?? null,
    template_name: (r.template_name as string | null) ?? null,
    template_lang: (r.template_lang as string | null) ?? null,
    template_vars: templateVars,
    ignore_quiet_hours: Boolean(r.ignore_quiet_hours),
    max_per_minute: r.max_per_minute === null || r.max_per_minute === undefined
      ? null : Number(r.max_per_minute),
    max_per_hour: r.max_per_hour === null || r.max_per_hour === undefined
      ? null : Number(r.max_per_hour),
    branch_timezone: (r.branch_timezone as string | null) ?? null,
    branch_quiet_enabled: r.branch_quiet_enabled === null || r.branch_quiet_enabled === undefined
      ? null : Boolean(r.branch_quiet_enabled),
    branch_quiet_start: r.branch_quiet_start === null || r.branch_quiet_start === undefined
      ? null : Number(r.branch_quiet_start),
    branch_quiet_end:   r.branch_quiet_end === null || r.branch_quiet_end === undefined
      ? null : Number(r.branch_quiet_end),
    branch_whatsapp_send_rate: r.branch_whatsapp_send_rate === null || r.branch_whatsapp_send_rate === undefined
      ? null : Number(r.branch_whatsapp_send_rate),
  };
}

async function markDeliveriesSkipped(ids: string[], reason: string): Promise<void> {
  if (ids.length === 0) return;
  await db.execute(sql`
    UPDATE campaign_deliveries
    SET status = 'skipped', last_error = ${reason}, updated_at = now()
    WHERE id = ANY(${bindValue(ids)}::uuid[])
  `);
}

/**
 * Mark a campaign as failed due to the circuit breaker tripping, then bulk-
 * skip every remaining queued delivery so no further SMTP attempts are made.
 * Deliveries still in 'sending' status (mid-flight in the current batch) are
 * handled by the caller once Promise.allSettled resolves.
 */
async function triggerCircuitBreaker(campaignId: string): Promise<void> {
  const reason = 'SMTP authentication failed — check credentials in Branch Settings.';
  await db.execute(sql`
    UPDATE marketing_campaigns
    SET status = 'failed', last_error = ${reason}, completed_at = now(), updated_at = now()
    WHERE id = ${campaignId} AND status = 'sending'
  `);
  await db.execute(sql`
    UPDATE campaign_deliveries
    SET status = 'skipped', last_error = ${reason}, updated_at = now()
    WHERE campaign_id = ${campaignId} AND status = 'queued'
  `);
  log.warn({ campaignId }, 'circuit breaker: campaign failed — SMTP auth errors');
}

async function processDelivery(
  d: DeliveryToSend,
  ctx: CampaignContext,
  cb?: CircuitBreakerState,
): Promise<void> {
  // Final opt-out check at dispatch time — between fan-out and now the
  // customer may have replied STOP or clicked unsubscribe.
  if (d.customer_id) {
    const { rows: orows } = await db.execute(sql`
      SELECT marketing_opt_out FROM customers
      WHERE id = ${d.customer_id} AND restaurant_id = ${ctx.restaurant_id} LIMIT 1
    `);
    const optedOut = (orows[0] as { marketing_opt_out?: boolean } | undefined)?.marketing_opt_out;
    if (optedOut) {
      await db.execute(sql`
        UPDATE campaign_deliveries
        SET status = 'skipped', last_error = 'customer opted out', updated_at = now()
        WHERE id = ${d.id}
      `);
      await db.execute(sql`
        UPDATE marketing_campaigns SET opt_out_count = opt_out_count + 1, updated_at = now()
        WHERE id = ${ctx.id}
      `);
      return;
    }
  }

  try {
    let result: { messageId: string | null; error?: string; permanent?: boolean };
    if (ctx.channel === 'email') {
      result = await sendCampaignEmail({
        to: d.recipient,
        subject: ctx.subject ?? '(no subject)',
        bodyHtml: ctx.body_html ?? '',
        restaurantId: ctx.restaurant_id,
        branchId: ctx.branch_id,
        customerId: d.customer_id,
      });
    } else {
      if (!ctx.branch_id) {
        result = { messageId: null, error: 'WhatsApp campaigns require a branch with credentials', permanent: true };
      } else {
        result = await sendCampaignWhatsApp({
          branchId: ctx.branch_id,
          toE164: d.recipient,
          templateName: ctx.template_name ?? '',
          templateLang: ctx.template_lang ?? 'en',
          templateVars: ctx.template_vars,
        });
      }
    }

    if (result.messageId !== null && !result.error) {
      await db.execute(sql`
        UPDATE campaign_deliveries
        SET status = 'sent', sent_at = now(), provider_message_id = ${result.messageId},
            last_error = NULL, updated_at = now()
        WHERE id = ${d.id}
      `);
      await db.execute(sql`
        UPDATE marketing_campaigns SET sent_count = sent_count + 1, updated_at = now()
        WHERE id = ${ctx.id}
      `);
      if (d.customer_id) {
        try {
          await db.execute(sql`
            UPDATE customers SET last_campaign_id = ${ctx.id}, last_campaign_at = now()
            WHERE id = ${d.customer_id} AND restaurant_id = ${ctx.restaurant_id}
          `);
        } catch { /* non-critical */ }
      }
      // A successful send proves the credentials work — reset the consecutive
      // auth-failure counter so the circuit breaker only trips on a true run
      // of back-to-back auth errors, not a scattered mix.
      if (cb) cb.authFailCount = 0;
    } else if (result.permanent === true) {
      await skipDelivery(d, result.error ?? 'permanent send error');
      // ── Circuit-breaker: count consecutive SMTP auth/config failures ───
      // Per-recipient permanent errors (bad address, 550 rejections, etc.) do
      // NOT count toward the circuit breaker — only errors that indicate the
      // credentials are broken and every subsequent attempt will also fail.
      // Any non-auth permanent error resets the counter so only a genuine
      // consecutive run of auth failures trips the breaker.
      if (cb && ctx.channel === 'email') {
        if (isSmtpConfigError(result.error ?? '')) {
          cb.authFailCount++;
          if (!cb.blown && cb.authFailCount >= SMTP_AUTH_CIRCUIT_BREAKER_THRESHOLD) {
            cb.blown = true;
            await triggerCircuitBreaker(ctx.id);
          }
        } else {
          cb.authFailCount = 0;
        }
      }
    } else {
      await failDelivery(d, result.error ?? 'send failed (no messageId)', ctx.id);
      // Transient failure — not an auth problem, so reset the consecutive counter.
      if (cb) cb.authFailCount = 0;
    }
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    await failDelivery(d, msg, ctx.id);
    if (cb) cb.authFailCount = 0;
  }
}

async function skipDelivery(d: DeliveryToSend, reason: string): Promise<void> {
  await db.execute(sql`
    UPDATE campaign_deliveries
    SET status = 'skipped', last_error = ${reason.slice(0, 500)}, updated_at = now()
    WHERE id = ${d.id}
  `);
}

async function failDelivery(d: DeliveryToSend, errorMsg: string, campaignId: string): Promise<void> {
  const MAX_ATTEMPTS = 3;
  const willRetry = d.attempts < MAX_ATTEMPTS;
  if (willRetry) {
    const delayMin = Math.min(60, 5 * Math.pow(2, d.attempts - 1));
    await db.execute(sql`
      UPDATE campaign_deliveries
      SET status = 'queued', last_error = ${errorMsg.slice(0, 500)},
          scheduled_for = NOW() + (${delayMin} || ' minutes')::interval, updated_at = now()
      WHERE id = ${d.id}
    `);
  } else {
    await db.execute(sql`
      UPDATE campaign_deliveries
      SET status = 'failed', last_error = ${errorMsg.slice(0, 500)}, updated_at = now()
      WHERE id = ${d.id}
    `);
    await db.execute(sql`
      UPDATE marketing_campaigns SET failed_count = failed_count + 1, updated_at = now()
      WHERE id = ${campaignId}
    `);
  }
}

// ─── Finalize sending campaigns whose deliveries are all done ──────────────

async function maybeFinalizeCampaigns(): Promise<void> {
  // A campaign is "done" when no delivery rows remain in queued or sending
  // state. We then set the terminal status:
  //
  //   completed — at least one message was delivered successfully
  //   failed    — zero messages sent; every delivery was skipped or failed.
  //               This catches broad SMTP auth failures, missing WhatsApp
  //               credentials, and bad-address-only lists so operators get
  //               a clear error signal rather than a silent "completed" run.
  await db.execute(sql`
    UPDATE marketing_campaigns c
    SET status = CASE
          WHEN c.sent_count > 0 THEN 'completed'
          ELSE 'failed'
        END,
        last_error = CASE
          -- At least one real send-side failure → point to credentials / template
          WHEN c.sent_count = 0 AND EXISTS (
            SELECT 1 FROM campaign_deliveries d2
            WHERE d2.campaign_id = c.id AND d2.status = 'failed'
          )
          THEN 'No messages were delivered — check your SMTP credentials or WhatsApp template in Branch Settings.'
          -- All deliveries were skipped (opted out / no contact) → neutral message
          WHEN c.sent_count = 0
          THEN 'No eligible recipients — all contacts were opted out or lacked a valid address for this channel.'
          ELSE NULL
        END,
        completed_at = now(),
        updated_at   = now()
    WHERE c.status = 'sending'
      AND c.started_at IS NOT NULL
      AND NOT EXISTS (
        SELECT 1 FROM campaign_deliveries d
        WHERE d.campaign_id = c.id AND d.status IN ('queued', 'sending')
      )
  `);
}
