/**
 * Email outbox + worker.
 *
 *  - enqueueEmail() persists a message; immediate-channel rows are scheduled
 *    for now(), digest rows are scheduled to the next 30-minute boundary
 *    (per-restaurant, per-template_kind grouping is achieved by the worker
 *    coalescing rows that share template_key+restaurant_id+kind).
 *  - The worker picks up due rows in a loop, renders+sends, marks status,
 *    and respects soft caps (60 transactional/hour per recipient).
 *  - The worker auto-starts on first enqueue and on first DB init.
 */

import { db } from '@server/db/drizzle';
import { sql } from 'drizzle-orm';
import { renderTemplate } from './render';
import { sendRaw } from './transport';
import { registerHmrTimer, clearHmrTimer, isHmrTimerRegistered } from '@server/lib/hmrTimer';

import { childLogger } from '@server/logger';
const log = childLogger('svc.email.outbox');

export type Channel = 'immediate' | 'digest';

export interface EnqueueOptions {
  to: string;
  templateKey: string;
  vars?: Record<string, unknown>;
  channel?: Channel;
  /** Free-form grouping label used by digest worker (e.g. "order", "booking"). */
  kind?: string;
  restaurantId?: string | null;
  branchId?: string | null;
  /** Schedule send at a specific time (overrides channel schedule). */
  scheduledFor?: Date;
}

const DIGEST_WINDOW_MS = 30 * 60 * 1000; // 30 min

function nextDigestBoundary(now = new Date()): Date {
  const t = now.getTime();
  return new Date(Math.ceil(t / DIGEST_WINDOW_MS) * DIGEST_WINDOW_MS);
}

export async function enqueueEmail(opts: EnqueueOptions): Promise<string> {
  const channel: Channel = opts.channel ?? 'immediate';
  const scheduled = opts.scheduledFor
    ? opts.scheduledFor
    : channel === 'digest' ? nextDigestBoundary() : new Date();

  const { rows } = await db.execute(sql`
    INSERT INTO email_outbox (restaurant_id, branch_id, recipient, template_key, payload, channel, kind, status, scheduled_for)
    VALUES (${opts.restaurantId ?? null}, ${opts.branchId ?? null}, ${opts.to}, ${opts.templateKey},
            ${JSON.stringify(opts.vars ?? {})}::jsonb, ${channel}, ${opts.kind ?? null}, 'queued', ${scheduled.toISOString()})
    RETURNING id
  `);
  startWorker();
  return (rows[0] as { id: string }).id;
}

// ---------- Worker ----------

const TIMER_KEY = 'email-outbox';
// Fresh identity per module evaluation; same identity across repeated
// runtime calls from this module instance (so startWorker(), called from
// enqueueEmail() / retryOutboxRow() on every request, stays a no-op once
// armed). A Fast-Refresh re-import yields a new object and the helper
// replaces the prior timer.
const MODULE_TOKEN = {};
const RECIPIENT_HOURLY_CAP = 60;
// First tick after 2s, then every 30s.
const FIRST_TICK_DELAY_MS = 2_000;
const TICK_INTERVAL_MS = 30_000;

export function startWorker(): void {
  const wasRegistered = isHmrTimerRegistered(TIMER_KEY);
  registerHmrTimer({
    key: TIMER_KEY,
    moduleToken: MODULE_TOKEN,
    initialDelayMs: FIRST_TICK_DELAY_MS,
    intervalMs: TICK_INTERVAL_MS,
    tick,
  });
  if (!wasRegistered) log.info('worker started (interval=30s)');
}

async function recentSendCount(recipient: string): Promise<number> {
  try {
    const { rows } = await db.execute(sql`
      SELECT COUNT(*)::int AS n FROM email_outbox
      WHERE recipient = ${recipient} AND status = 'sent' AND sent_at > NOW() - INTERVAL '1 hour'
    `);
    return (rows[0] as { n: number })?.n ?? 0;
  } catch { return 0; }
}

interface OutboxRow {
  id: string;
  restaurant_id: string | null;
  branch_id: string | null;
  recipient: string;
  template_key: string;
  payload: Record<string, unknown>;
  channel: Channel;
  kind: string | null;
  attempts: number;
}

async function claimDueRows(limit = 25): Promise<OutboxRow[]> {
  // Atomic claim: flip queued/throttled→sending in one statement (throttled rows
  // are eligible again once their re-scheduled_for passes).
  const { rows } = await db.execute(sql`
    UPDATE email_outbox
    SET status = 'sending', attempts = attempts + 1
    WHERE id IN (
      SELECT id FROM email_outbox
      WHERE status IN ('queued','throttled') AND scheduled_for <= NOW()
      ORDER BY scheduled_for ASC
      LIMIT ${limit}
      FOR UPDATE SKIP LOCKED
    )
    RETURNING id, restaurant_id, branch_id, recipient, template_key, payload, channel, kind, attempts
  `);
  return rows as unknown as OutboxRow[];
}

async function processRow(row: OutboxRow): Promise<void> {
  try {
    const cap = await recentSendCount(row.recipient);
    if (cap >= RECIPIENT_HOURLY_CAP) {
      await db.execute(sql`UPDATE email_outbox SET status='throttled', scheduled_for = NOW() + INTERVAL '15 minutes', last_error='throttled: recipient hourly cap' WHERE id = ${row.id}`);
      return;
    }

    // Digest coalescing: claim sibling pending rows that share recipient+template_key+restaurant_id
    // and merge their payloads so we send ONE consolidated email per window.
    let payload = row.payload || {};
    let mergedIds: string[] = [];
    if (row.channel === 'digest') {
      const { rows: siblings } = await db.execute(sql`
        UPDATE email_outbox SET status='sending', attempts = attempts + 1
        WHERE id IN (
          SELECT id FROM email_outbox
          WHERE status = 'queued'
            AND channel = 'digest'
            AND recipient = ${row.recipient}
            AND template_key = ${row.template_key}
            AND COALESCE(restaurant_id::text,'') = COALESCE(${row.restaurant_id ?? null}::text,'')
            AND scheduled_for <= NOW()
            AND id <> ${row.id}
          ORDER BY scheduled_for ASC
          LIMIT 200
          FOR UPDATE SKIP LOCKED
        )
        RETURNING id, payload
      `);
      for (const s of siblings as { id: string; payload: Record<string, unknown> }[]) {
        payload = mergeDigestPayloads(payload, s.payload || {});
        mergedIds.push(s.id);
      }
    }

    const rendered = await renderTemplate(row.template_key, payload, row.restaurant_id);
    // Attachments are passed via the special `_attachments` payload key
    // (an array of { filename, content_b64, content_type }). Strip it so it
    // never reaches template rendering.
    const attachments = Array.isArray((payload as Record<string, unknown>)._attachments)
      ? ((payload as Record<string, unknown>)._attachments as Array<{ filename: string; content_b64: string; content_type?: string }>)
      : undefined;
    const result = await sendRaw(row.recipient, rendered.subject, rendered.html, row.branch_id, row.restaurant_id, attachments);
    if (!result) {
      await db.execute(sql`UPDATE email_outbox SET status='failed', last_error='no SMTP configured', sent_at=NOW() WHERE id = ${row.id}`);
      if (mergedIds.length > 0) {
        await db.execute(sql`UPDATE email_outbox SET status='failed', last_error='no SMTP configured', sent_at=NOW() WHERE id = ANY(${mergedIds}::uuid[])`);
      }
      return;
    }
    await db.execute(sql`UPDATE email_outbox SET status='sent', sent_at=NOW(), last_error=NULL WHERE id = ${row.id}`);
    if (mergedIds.length > 0) {
      await db.execute(sql`UPDATE email_outbox SET status='sent', sent_at=NOW(), last_error=${'coalesced into ' + row.id} WHERE id = ANY(${mergedIds}::uuid[])`);
    }
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    const willRetry = row.attempts < 5;
    if (willRetry) {
      const delaySec = Math.min(3600, 60 * Math.pow(2, row.attempts));
      await db.execute(sql`UPDATE email_outbox SET status='queued', last_error=${msg.slice(0, 500)}, scheduled_for = NOW() + (${delaySec} || ' seconds')::interval WHERE id = ${row.id}`);
    } else {
      await db.execute(sql`UPDATE email_outbox SET status='failed', last_error=${msg.slice(0, 500)} WHERE id = ${row.id}`);
    }
  }
}

let ticking = false;
async function tick(): Promise<void> {
  if (ticking) return;
  ticking = true;
  try {
    const rows = await claimDueRows(25);
    for (const row of rows) {
      await processRow(row);
    }
    await purgeExpiredPendingSignups();
  } catch (err) {
    log.error({ err }, 'EmailOutbox tick error');
  } finally {
    ticking = false;
  }
}

/** Merge two digest payloads: numbers add, strings/HTML concat, arrays append, scalars favor latest. */
function mergeDigestPayloads(a: Record<string, unknown>, b: Record<string, unknown>): Record<string, unknown> {
  const out: Record<string, unknown> = { ...a };
  for (const [k, v] of Object.entries(b)) {
    const av = out[k];
    if (typeof av === 'number' && typeof v === 'number') out[k] = av + v;
    else if (Array.isArray(av) && Array.isArray(v)) out[k] = [...av, ...v];
    else if (typeof av === 'string' && typeof v === 'string' && (k.endsWith('Html') || k.endsWith('html'))) out[k] = av + v;
    else if (av === undefined) out[k] = v;
  }
  return out;
}

/** Manual retry from admin UI. */
export async function retryOutboxRow(id: string): Promise<void> {
  await db.execute(sql`UPDATE email_outbox SET status='queued', scheduled_for=NOW(), last_error=NULL WHERE id = ${id}`);
  startWorker();
}

/** Periodic purge of expired pending_signups rows. Runs lazily from the worker tick. */
let lastPurgeAt = 0;
export async function purgeExpiredPendingSignups(): Promise<void> {
  const now = Date.now();
  if (now - lastPurgeAt < 60 * 60 * 1000) return; // at most once per hour
  lastPurgeAt = now;
  try {
    await db.execute(sql`DELETE FROM pending_signups WHERE expires_at <= NOW()`);
  } catch (e) { log.warn({ err: e }, 'EmailOutbox purge pending_signups failed'); }
}

export function stopWorker(): void {
  clearHmrTimer(TIMER_KEY);
}
