Signal Pipeline 09 · Kafka · dbt · Timescale · pgvector · Neo4j · PDPA at producer · k-anon at consumer

Planning · v0.2
Infrastructure layer supporting Dashboard + Cases + Alerting · CLAUDE.md Layer A/B/C/D/E is the spec. PDPA hashing at the producer · k-anonymity ≥5 at the consumer · 19 Kafka topics (locked retention per CLAUDE.md).

Context

Signal Pipeline is A-owned infrastructure · it sits upstream of every runtime · it fulfils CLAUDE.md Layer B (Event Bus) · Layer C (Data Lake) · Layer D (Pipelines) · Layer E (Knowledge Store). Feeds Dashboard (metrics) · Cases (AI analytics + multi-source intake) · Alerting (breach detect) · Wallet (transactions WORM) · City Health (vital signs).

CLAUDE.md §Kafka 19 topics: retention · compression · partitions · key schemas
CLAUDE.md Layer B/C/D/E: Bronze/Silver/Gold · dbt models · embeddings
docs/kb/data/city_health_contract.json: 5 panels · 14 metrics · a_owned_dependency declared
docs/kb/data/dashboard_contract.json: 17 dashboards · primary_metrics consumed from Gold
docs/kb/data/multi_source_intake.json: AI case intake from external signals
docs/kb/data/ai_issue_analytics.json: 12-stage pipeline consumes signals

A-owned Runtime Boundary

B owns (read-only for A)

  • city_health_contract metric list + a_owned_dependency declarations
  • dashboard_contract metric names
  • multi_source_intake source_origins
  • ai_issue_analytics pipeline stage spec

A owns (this runtime)

  • Kafka cluster (Confluent Cloud) + 19 topics config
  • Producer services (booking-svc, payment-svc, behavior-svc, location-svc, ugc-svc)
  • Consumer groups (data-lake-ingester, alerting, kpi-snapshot, rag-embed)
  • dbt project (Bronze→Silver→Gold models)
  • Airflow DAGs (6: ETL, NLP, Features, KPI, Reports, Backfill)
  • TimescaleDB schemas + hypertables
  • pgvector schemas + HNSW indexes
  • Neo4j schema + Cypher loaders
  • PII hashing (SHA-256) at producer before publish
  • GPS fuzz ±50m at producer
  • k-anonymity ≥5 filter at consumer before materialize
  • DLQ pattern (19 auto-paired .dlq topics)
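
The two producer-side PDPA steps in the list above (SHA-256 hashing and GPS fuzz ±50m) can be sketched as follows. This is a minimal illustration, not the producer library itself: the function names, the optional `salt` parameter, the `device_id` field, and the choice of a uniform offset inside a 50 m disc are all assumptions that the plan does not specify.

```python
import hashlib
import math
import random
from typing import Dict, Optional, Tuple

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in metres


def hash_pii(value: str, salt: str = "") -> str:
    """Return the SHA-256 hex digest of a PII field.

    The salt parameter is an assumption; the plan only names SHA-256.
    """
    return hashlib.sha256((salt + value).encode("utf-8")).hexdigest()


def fuzz_gps(lat: float, lon: float, max_offset_m: float = 50.0,
             rng: Optional[random.Random] = None) -> Tuple[float, float]:
    """Shift a coordinate by a random offset of less than max_offset_m metres."""
    rng = rng or random.Random()
    # sqrt() makes the offset uniform over the disc instead of clustered at the centre.
    distance = max_offset_m * math.sqrt(rng.random())
    bearing = rng.uniform(0.0, 2.0 * math.pi)
    north_m = distance * math.cos(bearing)
    east_m = distance * math.sin(bearing)
    dlat = math.degrees(north_m / EARTH_RADIUS_M)
    dlon = math.degrees(east_m / (EARTH_RADIUS_M * math.cos(math.radians(lat))))
    return lat + dlat, lon + dlon


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in metres, useful for a CI check on the fuzz bound."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = p2 - p1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def sanitize_ping(device_id: str, lat: float, lon: float,
                  rng: Optional[random.Random] = None) -> Dict[str, object]:
    """Build the payload to publish: no raw identifier, no exact position."""
    fuzzed_lat, fuzzed_lon = fuzz_gps(lat, lon, rng=rng)
    return {"device_hash": hash_pii(device_id), "lat": fuzzed_lat, "lon": fuzzed_lon}
```

A CI test along the lines of Definition of Done item 2 could call `fuzz_gps` repeatedly with a seeded `random.Random` and assert that `haversine_m` never exceeds the 50 m bound.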

Scope

ER Diagram layered architecture

Data flow · producer → Kafka → lake → warehouse → runtime consumers
Producers (A): booking-svc, payment-svc, wallet-svc, affiliate-svc, behavior-svc, location-svc, ugc-svc, reviews-svc, ai-svc, external-signals, kpi-emitter, audit-emitter
  ↓ hash PII · fuzz GPS
Kafka (19 topics): booking.events, payment.transactions (7yr), wallet-events, affiliate.clicks, behavior.clickstream, location.pings (24h), location.poi-visits, ugc.content-created, reviews.submitted, ai.llm-interactions, external.signals, metrics.kpi-snapshots, audit.trail (7yr), + 6 more · .dlq auto
Consumer Groups: data-lake-ingester, alerting, kpi-snapshot, rag-embed, case-ai-pipeline
  ↓ k-anon ≥5
Data Lake (Iceberg): Bronze (raw · snappy), Silver (validated), Gold (marts), dbt tests · FAIL=block, Airflow DAGs
Warehouse / Store: BigQuery (Gold mart), TimescaleDB (metrics), Postgres (OLTP), pgvector (embed HNSW), Neo4j (relations), Redis (feature store)
Runtime Consumers: Dashboard, Cases (AI · alerting), Generate (audit), Wizard (suggestions), Reporting (PDF)
Governance (A): Schema Registry, DLQ depth watch, Consumer lag, dbt_test fail, Silver freshness, Anomaly (Z/EWMA), PagerDuty P0/P1, Slack P2/P3, Cost monitor, Retention policy, WORM for 7yr

Field Mapping — Kafka Topics 19 topics per CLAUDE.md

| Topic | Partitions | Retention | Producer (A) | Consumer Target | Binding | Notes |
| ptt.booking.events | 12 | 30d | booking-svc | Bronze · Cases (signal) | mapped-not-bound | key: user_id · snappy |
| ptt.payment.transactions | 24 | 7yr | payment-svc | WORM S3 · Wallet | mapped-not-bound | compress=none · Object Lock |
| ptt.payment.wallet-events | 12 | 90d | wallet-svc | Silver · user_wallet_summary | mapped-not-bound | |
| ptt.affiliate.clicks | 8 | 30d | affiliate-svc | Bronze · attribution | mapped-not-bound | key: partner_id |
| ptt.affiliate.conversions | 8 | 365d | affiliate-svc | Gold · affiliate_mart | mapped-not-bound | WHT 3% calc |
| ptt.behavior.clickstream | 48 | 3d | behavior-svc | Bronze · session_analytics | mapped-not-bound | lz4 compression |
| ptt.behavior.search-events | 24 | 14d | behavior-svc | Silver · search_intent | mapped-not-bound | |
| ptt.location.pings | 48 | 24h | location-svc (fuzz ±50m) | Silver · h3_heatmap (k≥5) | placeholder | PDPA: consent_location=true required |
| ptt.location.poi-visits | 12 | 90d | location-svc | Gold · visit_frequency | placeholder | k-anon ≥5 before materialize |
| ptt.ugc.content-created | 8 | ∞ compact | ugc-svc | rag-embed → pgvector | mapped-not-bound | key: poi_id |
| ptt.reviews.submitted | 8 | ∞ compact | reviews-svc | rag-embed + sentiment | mapped-not-bound | |
| ptt.knowledge.ingest-requests | 4 | 7d | ingest-svc | dag_notebooklm_ingestion | placeholder | PDF/Audio/Video → Whisper |
| ptt.ai.llm-interactions | 8 | 90d | ai-svc | Silver · RAGAS eval | mapped-not-bound | Faithfulness log |
| ptt.external.signals | 4 | 30d | external-signals | Bronze · multi_source_intake | mapped-not-bound | key: location_id |
| ptt.metrics.kpi-snapshots | 4 | 365d | kpi-emitter | TimescaleDB · dashboard feed | mapped-not-bound | daily dag_kpi_snapshot |
| ptt.feedback.nps | 4 | 365d | feedback-svc | Silver · nps_trend | mapped-not-bound | |
| ptt.audit.trail | 8 | 7yr | audit-emitter | WORM · compress=none | placeholder | Admin plane audit chain |
| ptt.notifications.outbound | 6 | 7d | notifier-svc | PagerDuty/Slack/LINE | placeholder | alerting_model escalation |
| ptt.booking.status-changes | 12 | 7d | booking-svc | Silver · booking_lifecycle | mapped-not-bound | key: booking_id |
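
To show how rows of this table might translate into topic-level Kafka settings, here is a hedged sketch. `TopicSpec`, `retention_ms`, and the `dlq()` helper are hypothetical names, not part of the plan. The config keys (`cleanup.policy`, `retention.ms`, `compression.type`) are standard Kafka topic configs, with `retention.ms = -1` meaning unlimited and `uncompressed` standing in for the table's "compress=none". Treating a year as 365 days and giving each `.dlq` topic the same partition count as its source are assumptions; the 14d DLQ retention comes from the Risks section.

```python
from dataclasses import dataclass
from typing import Dict

_UNIT_MS = {"h": 3_600_000, "d": 86_400_000, "yr": 365 * 86_400_000}


def retention_ms(label: str) -> int:
    """Convert a retention label such as '24h', '30d' or '7yr' to milliseconds.

    Labels starting with the infinity sign (compacted topics) map to -1.
    """
    label = label.strip()
    if label.startswith("∞"):
        return -1
    for unit in ("yr", "h", "d"):
        if label.endswith(unit):
            return int(label[: -len(unit)]) * _UNIT_MS[unit]
    raise ValueError(f"unrecognised retention label: {label!r}")


@dataclass(frozen=True)
class TopicSpec:
    name: str
    partitions: int
    retention: str
    compression: str = "producer"  # Kafka's topic-level default

    def configs(self) -> Dict[str, str]:
        """Topic-level configs as the string map Kafka admin tools expect."""
        compacted = "compact" in self.retention
        return {
            "cleanup.policy": "compact" if compacted else "delete",
            "retention.ms": str(retention_ms(self.retention)),
            "compression.type": self.compression,
        }

    def dlq(self, dlq_retention: str = "14d") -> "TopicSpec":
        """Paired dead-letter topic; same partition count is an assumption."""
        return TopicSpec(f"{self.name}.dlq", self.partitions, dlq_retention, self.compression)


payments = TopicSpec("ptt.payment.transactions", 24, "7yr", "uncompressed")
ugc = TopicSpec("ptt.ugc.content-created", 8, "∞ compact")
```

Keeping the table as data like this would let a CI job diff the declared specs against the live cluster, which is one way to approach Definition of Done item 1.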

Field Mapping — city_health_contract B-declared metric pipeline dependencies

Source: city_health_contract.panels.*.metrics[*]. Every metric carries a_owned_dependency + refresh_cadence_request.

| B Metric | A Signal Source | Refresh | Binding | Notes |
| vital_sign.fever | AI sentiment from reviews + UGC | 15min | mapped-not-bound | Threshold <7/10 · verbatim CONCEPT-005 |
| vital_sign.heartbeat | Booking completion rate · silver mart | 15min | mapped-not-bound | Threshold <60% · verbatim CONCEPT-005 |
| vital_sign.blood_density | Spend velocity anomaly · Z-score | 1hr | mapped-not-bound | Threshold −20% · verbatim |
| vital_sign.respiration | External AQI API | 1hr | placeholder | Threshold AQI >100 · needs API key |
| vital_sign.pressure | Civic report aggregation | 6hr | placeholder | Threshold +30% volume · verbatim |
| zone_uplift | H3-cell GMV uplift · k-anon ≥5 | daily | placeholder | Proposed · Claude-derived formula · requires_human_review |
| pulse | Real-time activity rate | 1min | mapped-not-bound | Proposed · TimescaleDB continuous aggregate |
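
The Z-score source for vital_sign.blood_density and the "Anomaly (Z/EWMA)" governance item suggest a streaming detector. The sketch below is one possible reading, not the pipeline's actual formula: it scores each new value against an exponentially weighted mean and variance. The class name and the `alpha`, `threshold`, and `warmup` values are illustrative assumptions.

```python
import math
from typing import Iterable, List, Optional


class EwmaZScore:
    """Z-score of each new point against an exponentially weighted mean and variance."""

    def __init__(self, alpha: float = 0.3, threshold: float = 3.0, warmup: int = 5):
        self.alpha = alpha          # smoothing factor (assumed value)
        self.threshold = threshold  # |z| above this counts as an anomaly (assumed value)
        self.warmup = warmup        # points absorbed before scoring starts
        self.mean: Optional[float] = None
        self.var = 0.0
        self.count = 0

    def update(self, x: float) -> Optional[float]:
        """Score x against the state so far, then absorb it. None during warm-up."""
        if self.mean is None:
            self.mean = x
            self.count = 1
            return None
        std = math.sqrt(self.var)
        z = (x - self.mean) / std if self.count >= self.warmup and std > 0 else None
        # Incremental update of the exponentially weighted mean and variance.
        delta = x - self.mean
        self.mean += self.alpha * delta
        self.var = (1 - self.alpha) * (self.var + self.alpha * delta * delta)
        self.count += 1
        return z


def detect(values: Iterable[float], **kwargs: float) -> List[int]:
    """Return the indices of points whose |z| exceeds the detector threshold."""
    detector = EwmaZScore(**kwargs)
    flagged = []
    for i, x in enumerate(values):
        z = detector.update(x)
        if z is not None and abs(z) > detector.threshold:
            flagged.append(i)
    return flagged


spend = [100, 102, 98, 101, 99, 100, 101, 99, 60]
anomalies = detect(spend)  # only the final drop to 60 is flagged
```

Because the score is computed before the point is absorbed, a sudden drop cannot mask itself by inflating the variance first.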

API Sketch pipeline ops + query

GET /api/ops/pipeline/status
Pipeline health overview · silver/gold freshness · DAG status · consumer lag · DLQ depth
// response
{
  "kafka": { "lag_p99_ms": 145, "dlq_depth": { "ptt.booking.events.dlq": 2 } },
  "dbt": { "silver_freshness_min": 12, "gold_freshness_min": 48, "last_test_fail": null },
  "airflow": { "dag_bronze_to_silver": "success", "dag_silver_to_gold": "running" },
  "health": "green"
}

GET /api/metrics/query?metric=gmv_daily&range=30d&dims=tenant
Dashboard metric query · hits TimescaleDB hypertable
// response
{
  "metric": "gmv_daily",
  "unit": "THB",
  "series": [ { "ts": "2026-04-17", "value": 1520000, "tenant": "pty-zeroth" } ],
  "freshness_min": 14,
  "k_anon_applied": true
}

POST /api/ops/backfill
Admin-only backfill trigger · takes a date range + metric · runs as an Airflow DAG
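
As an illustration of how an ops client might read the pipeline status payload, the sketch below walks the sample response and lists anything that looks off. The function name and both thresholds are assumptions: the 15-minute freshness limit is borrowed from the Definition of Done, and the plan does not say how the `health` field itself is derived.

```python
from typing import Any, Dict, List

# Sample payload copied from the GET /api/ops/pipeline/status sketch.
SAMPLE_STATUS: Dict[str, Any] = {
    "kafka": {"lag_p99_ms": 145, "dlq_depth": {"ptt.booking.events.dlq": 2}},
    "dbt": {"silver_freshness_min": 12, "gold_freshness_min": 48, "last_test_fail": None},
    "airflow": {"dag_bronze_to_silver": "success", "dag_silver_to_gold": "running"},
    "health": "green",
}


def summarize_status(status: Dict[str, Any],
                     max_freshness_min: int = 15,
                     max_dlq_depth: int = 0) -> List[str]:
    """Return human-readable findings; an empty list means nothing to report."""
    findings: List[str] = []
    for topic, depth in status["kafka"]["dlq_depth"].items():
        if depth > max_dlq_depth:
            findings.append(f"DLQ {topic} depth {depth}")
    for layer in ("silver", "gold"):
        age = status["dbt"][f"{layer}_freshness_min"]
        if age > max_freshness_min:
            findings.append(f"{layer} freshness {age} min exceeds {max_freshness_min} min")
    if status["dbt"]["last_test_fail"] is not None:
        findings.append(f"dbt test failed: {status['dbt']['last_test_fail']}")
    for dag, state in status["airflow"].items():
        if state == "failed":
            findings.append(f"Airflow DAG {dag} failed")
    return findings


findings = summarize_status(SAMPLE_STATUS)
```

With these assumed thresholds the sample payload yields two findings (a non-empty DLQ and Gold freshness of 48 min) even though it reports `"health": "green"`, so the real health rule presumably uses different limits.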

Sequence Flow PDPA-compliant ingest

Producer → Kafka → Consumer · hash + fuzz + k-anon
Participants: location-svc → Kafka (fuzz'd) → Lake ingester → dbt silver/gold → Dashboard API

  1. ping (consent=true · GPS fuzz ±50m · hash device)
  2. 24h retention topic · poll
  3. write Bronze (Iceberg · snappy)
  4. dbt silver model · H3 aggregate · k-anon ≥5 filter · reject k<5 cells
  5. gold mart · TimescaleDB hypertable
  6. query · return with k_anon_applied=true
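
The k-anon ≥5 filter in the silver step of this flow can be sketched as below. The input shape (pairs of H3 cell ID and hashed device ID) and the function name are assumptions. So is the choice to count distinct devices rather than raw pings, which this sketch makes so that one busy device cannot push a cell over the threshold.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple

K_MIN = 5  # k-anonymity threshold stated in the plan


def k_anon_cells(pings: Iterable[Tuple[str, str]], k: int = K_MIN) -> Dict[str, int]:
    """Aggregate (h3_cell, device_hash) pairs and drop cells with fewer than k devices.

    Returns a mapping of cell -> distinct device count for the surviving cells.
    """
    devices: Dict[str, Set[str]] = defaultdict(set)
    for cell, device_hash in pings:
        devices[cell].add(device_hash)
    return {cell: len(ids) for cell, ids in devices.items() if len(ids) >= k}


# cellA: 5 distinct devices, kept. cellB: 20 pings but only 4 devices, suppressed.
sample = [("cellA", f"d{i}") for i in range(5)] + [("cellB", f"d{i % 4}") for i in range(20)]
kept = k_anon_cells(sample)
```

Only rows that survive this filter would be materialized, which is what lets the Dashboard API return `k_anon_applied=true`.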

Dependencies

Upstream (infra)

  • Confluent Cloud Kafka
  • GCS + Iceberg
  • BigQuery + dbt
  • Cloud Composer 2 (Airflow)
  • Postgres + TimescaleDB + pgvector
  • Neo4j · Redis Cluster
  • Schema Registry

Downstream (enables)

  • Dashboard Runtime
  • Cases AI pipeline
  • Wizard AI suggestions (RAG)
  • Alerting anomaly detect
  • City Health Dashboard
  • Reporting PDF generation

Approval Gates

Risks

| Risk | Severity | Mitigation |
| PII leak via unhashed producer | High | Schema Registry enforces hash fields · CI test · pentest · alert on raw PII detected in Bronze |
| k-anon violated on small H3 cell | High | Consumer-side filter k≥5 · pentest · unit test per dbt model · suppression card on UI |
| Consumer lag causes stale dashboard | Med | Lag alerts · horizontal autoscale consumer group · dashboard shows freshness_min |
| dbt_test fail but gold materializes anyway | High | Fail blocks promote · hard rule · CI/CD enforces · alert on skip |
| 7yr retention accidentally compressed (data loss) | High | compress=none for payment + audit · S3 Object Lock · annual verify |
| DLQ grows unbounded | Med | DLQ 14d retention · depth alert · manual replay tooling · RCA on large DLQ |
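
The "fail blocks promote" rule for the dbt_test risk could be enforced with a gate like the one below. This is a sketch under assumptions: it expects a dict shaped like dbt's `run_results.json` artifact (a `results` list whose entries carry `unique_id` and `status`), and the function and exception names are hypothetical. How the gate is wired into Airflow or CI/CD is not specified in the plan.

```python
from typing import Any, Dict, List

BLOCKING_STATUSES = {"fail", "error"}


class PromotionBlocked(RuntimeError):
    """Raised when Silver must not be promoted to Gold."""


def blocking_results(run_results: Dict[str, Any]) -> List[str]:
    """Return the unique_id of every result whose status blocks promotion."""
    return [
        r["unique_id"]
        for r in run_results.get("results", [])
        if r.get("status") in BLOCKING_STATUSES
    ]


def assert_promotable(run_results: Dict[str, Any]) -> None:
    """Raise PromotionBlocked if any dbt test failed or errored."""
    failed = blocking_results(run_results)
    if failed:
        raise PromotionBlocked("dbt tests blocking promote: " + ", ".join(failed))


# Hypothetical artifact excerpts for illustration only.
green_run = {"results": [{"unique_id": "test.not_null_booking_id", "status": "pass"}]}
red_run = {"results": [
    {"unique_id": "test.not_null_booking_id", "status": "pass"},
    {"unique_id": "test.k_anon_min_5", "status": "fail"},
]}
```

Calling `assert_promotable(green_run)` returns silently, while `assert_promotable(red_run)` raises, so a task that runs this before the Gold step fails closed.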

Definition of Done

  1. All 19 Kafka topics created with correct partitions / retention / compression / DLQ
  2. Producer libraries enforce PII hash + GPS fuzz (CI test passes)
  3. Consumer k-anon filter (k ≥5) tested on all spatial consumers
  4. Bronze/Silver/Gold dbt models pass tests with ≤15min freshness
  5. 6 Airflow DAGs scheduled and green for 7 consecutive days
  6. TimescaleDB hypertables + pgvector HNSW + Neo4j schemas deployed
  7. Retention policy verified (7yr WORM for payment + audit)
  8. Grafana dashboards show consumer lag · DLQ · freshness · anomaly
  9. Penetration test on PDPA compliance passed
  10. All 6 approval gates signed off

Deferred

Signal Pipeline Planning · v0.2 · Session A · A-owned