Go Architecture a Escala Masiva: Microservicios, Eventos y Sistemas Distribuidos
Diseña sistemas procesando millones de eventos diarios. Aprende arquitectura de microservicios, patrones event-driven, trazado distribuido, circuit breakers e decisiones arquitectónicas que separan código prototipo de infraestructura de producción.
El Momento en Que Todo Cambia
Empiezas con un monolito. Un binario único, una base de datos única, funciona bien para 10,000 usuarios. Luego llegas a 100,000 usuarios.
El monolito se convierte en un cuello de botella. La base de datos se convierte en un cuello de botella. Tu pipeline de despliegue se convierte en un cuello de botella. Todo se convierte en un cuello de botella.
Esto no es un problema a resolver con código más inteligente. Este es un problema a resolver con arquitectura diferente.
Esta guía te muestra cómo. No cómo escalar un monolito. Cómo construir la arquitectura que escala desde el día uno.
Parte 1: Entendiendo la Escala — Las Tres Dimensiones
Antes de tomar decisiones arquitectónicas, entiende qué significa “escala” para tu sistema.
Dimensión 1: Escala de Datos
¿Cuántos datos fluyen a través de tu sistema?
Escala pequeña: 100 MB/día (startup)
Escala media: 100 GB/día (crecimiento)
Escala grande: 1 TB/día (empresa)
Escala masiva: 10+ TB/día (finanzas, logística, telecom)
A 1 TB/día:
- Tu base de datos no puede ser una instancia única
- No puedes almacenar todo en memoria
- No puedes consultar todo de forma sincrónica
- Necesitas particionamiento de datos por defecto
Implicaciones arquitectónicas:
- Bases de datos sharded (particionar por región, por cliente, por tiempo)
- Consistencia eventual en lugar de consistencia fuerte
- Capas de cache son obligatorias
- Los datos no pueden viajar frecuentemente a través de límites de red
Dimensión 2: Escala de Solicitudes
¿Cuántas solicitudes por segundo debes manejar?
Escala pequeña: 100 RPS (API startup)
Escala media: 1,000 RPS (mercado intermedio)
Escala grande: 10,000 RPS (empresa)
Escala masiva: 100,000+ RPS (hiperscala)
A 100,000 RPS:
- Un servidor único no puede manejarlo (máx ~10,000 RPS por instancia Go)
- Necesitas 10+ servidores mínimo
- El balanceo de carga se vuelve crítico
- El pooling de conexiones se vuelve crítico
- El monitoreo de profundidad de cola se vuelve supervivencia
Implicaciones arquitectónicas:
- Escalado horizontal desde el inicio
- Servicios sin estado (fáciles de escalar)
- Puertas de enlace API para enrutamiento
- Rate limiting y circuit breakers
- Manejo de backpressure en cada etapa
Dimensión 3: Escala de Consistencia
¿Qué tan frescos necesitan estar tus datos?
Consistencia: Inmediata (consistencia fuerte)
Antigüedad: Segundos (consistencia eventual)
Antigüedad: Minutos (consistencia eventual, aceptable)
Antigüedad: Horas (procesamiento batch aceptable)
La consistencia fuerte es enemiga de la escala. Cada garantía de consistencia cuesta latencia. A escala masiva, aceptas consistencia eventual.
Implicaciones arquitectónicas:
- Separa modelos de lectura y escritura (CQRS)
- Actualizaciones event-driven en lugar de RPCs sincrónicas
- Acepta consistencia eventual en características de usuario
- Consistencia fuerte solo donde legal/negocio lo requiera (pagos, cumplimiento)
Parte 2: Las Capas Arquitectónicas — Separación Clara
Construir para escala requiere límites claros entre capas.
graph TB
subgraph "Client Layer"
WEB["Web Client<br/>Mobile Client"]
end
subgraph "API Gateway Layer"
GW["API Gateway<br/>Authentication<br/>Rate Limiting<br/>Request Routing"]
end
subgraph "Service Layer - Independent Services"
S1["Auth Service<br/>JWT/OAuth"]
S2["Invoice Service<br/>Fiscal Documents"]
S3["Analytics Service<br/>Event Consumer"]
S4["Notification Service<br/>Email/SMS/Webhook"]
end
subgraph "Data Layer - Per Service"
D1["Auth DB<br/>PostgreSQL"]
D2["Invoice DB<br/>PostgreSQL Sharded"]
D3["Analytics DB<br/>ClickHouse"]
D4["Cache<br/>Redis"]
end
subgraph "Event Bus - Central Nervous System"
EB["Event Broker<br/>Kafka/RabbitMQ<br/>Topic: fiscal.invoice.created<br/>Topic: fiscal.invoice.validated<br/>Topic: fiscal.invoice.archived"]
end
subgraph "Infrastructure"
TRACE["Distributed Tracing<br/>Jaeger/Datadog"]
METRIC["Metrics<br/>Prometheus"]
LOG["Logging<br/>ELK Stack"]
MON["Monitoring & Alerting<br/>Grafana/PagerDuty"]
end
WEB -->|HTTP/gRPC| GW
GW -->|Route by path| S1
GW -->|Route by path| S2
GW -->|Route by path| S3
GW -->|Route by path| S4
S1 -->|Read/Write| D1
S2 -->|Read/Write| D2
S3 -->|Read/Write| D3
S2 -->|Cache| D4
S3 -->|Cache| D4
S2 -->|Publish<br/>invoice.created| EB
S2 -->|Publish<br/>invoice.validated| EB
S3 -->|Subscribe| EB
S4 -->|Subscribe| EB
S1 -.->|Trace spans| TRACE
S2 -.->|Trace spans| TRACE
S3 -.->|Trace spans| TRACE
S4 -.->|Trace spans| TRACE
S1 -.->|Metrics| METRIC
S2 -.->|Metrics| METRIC
S3 -.->|Metrics| METRIC
TRACE --> MON
METRIC --> MON
style GW fill:#2196f3,stroke:#333,stroke-width:2px
style EB fill:#ff9800,stroke:#333,stroke-width:2px
style MON fill:#4caf50,stroke:#333,stroke-width:2px
Principio clave: Cada servicio tiene responsabilidad única, base de datos independiente, con su propia tienda de datos.
Parte 3: Arquitectura Orientada a Servicios — Patrones Core
Servicio 1: El Servicio de Autenticación
Cada sistema a escala necesita autenticación. Hazlo un servicio independiente.
// services/auth/domain/user.go
package domain
import (
"crypto/sha256"
"encoding/hex"
"time"
)
type User struct {
ID string
Email string
PasswordHash string
Role string // "admin", "analyst", "viewer"
Status string // "active", "suspended", "deleted"
CreatedAt time.Time
UpdatedAt time.Time
}
type Token struct {
Value string
UserID string
ExpiresAt time.Time
IssuedAt time.Time
Scope []string
}
// Repository contract
type UserRepository interface {
SaveUser(ctx context.Context, user *User) error
FindByEmail(ctx context.Context, email string) (*User, error)
FindByID(ctx context.Context, id string) (*User, error)
UpdateUser(ctx context.Context, user *User) error
}
type TokenRepository interface {
SaveToken(ctx context.Context, token *Token) error
FindToken(ctx context.Context, value string) (*Token, error)
RevokeToken(ctx context.Context, value string) error
}
// UseCase
type AuthenticateUseCase struct {
users UserRepository
tokens TokenRepository
eventBus EventBus
}
func (uc *AuthenticateUseCase) Execute(ctx context.Context, email, password string) (*Token, error) {
// Find user
user, err := uc.users.FindByEmail(ctx, email)
if err != nil {
return nil, err
}
// Validate password
if !validatePassword(password, user.PasswordHash) {
// Publish failed auth event for security monitoring
uc.eventBus.Publish(&AuthFailedEvent{
Email: email,
Timestamp: time.Now(),
})
return nil, fmt.Errorf("invalid credentials")
}
// Generate token
token := &Token{
Value: generateSecureToken(),
UserID: user.ID,
IssuedAt: time.Now(),
ExpiresAt: time.Now().Add(24 * time.Hour),
Scope: []string{"api"},
}
if err := uc.tokens.SaveToken(ctx, token); err != nil {
return nil, err
}
// Publish success event
uc.eventBus.Publish(&AuthSucceededEvent{
UserID: user.ID,
Email: email,
Timestamp: time.Now(),
})
return token, nil
}
Por qué servicio separado:
- Autenticación es una preocupación transversal (cada servicio lo necesita)
- Fuente única de verdad para identidad de usuario
- Escala independientemente de la lógica de negocio
- Las actualizaciones de seguridad se despliegan independientemente
- Puede rotar credenciales sin redirigir todo el sistema
Servicio 2: El Servicio de Facturas (Lógica de Negocio Central)
// services/invoice/domain/invoice.go
package domain
type InvoiceStatus string
const (
StatusPending InvoiceStatus = "pending"
StatusValidated InvoiceStatus = "validated"
StatusRejected InvoiceStatus = "rejected"
StatusProcessing InvoiceStatus = "processing"
StatusArchived InvoiceStatus = "archived"
)
type Invoice struct {
ID string
UUID string
Issuer string
Receiver string
Amount decimal.Decimal
Currency string
IssuanceDate time.Time
Status InvoiceStatus
FileChecksum string
CompressedData []byte
ValidationRules string
CreatedAt time.Time
UpdatedAt time.Time
Version int64 // For optimistic locking
}
type InvoiceRepository interface {
Save(ctx context.Context, invoice *Invoice) error
FindByID(ctx context.Context, id string) (*Invoice, error)
FindByStatus(ctx context.Context, status InvoiceStatus, limit int) ([]*Invoice, error)
UpdateStatus(ctx context.Context, id string, status InvoiceStatus) error
BulkInsert(ctx context.Context, invoices []*Invoice) error
FindByUUID(ctx context.Context, uuid string) (*Invoice, error)
}
// CreateInvoiceUseCase handles incoming invoice creation
type CreateInvoiceUseCase struct {
repo InvoiceRepository
eventBus EventBus
downloader FileDownloader
validator InvoiceValidator
}
func (uc *CreateInvoiceUseCase) Execute(ctx context.Context, cmd *CreateInvoiceCommand) (*Invoice, error) {
// Create domain entity
invoice := &Invoice{
ID: generateID(),
UUID: cmd.UUID,
Issuer: cmd.Issuer,
Receiver: cmd.Receiver,
Amount: cmd.Amount,
Currency: cmd.Currency,
IssuanceDate: cmd.IssuanceDate,
Status: StatusPending,
CreatedAt: time.Now(),
}
// Save to repository
if err := uc.repo.Save(ctx, invoice); err != nil {
return nil, err
}
// Publish domain event
uc.eventBus.Publish(&InvoiceCreatedEvent{
InvoiceID: invoice.ID,
UUID: invoice.UUID,
Amount: invoice.Amount,
CreatedAt: invoice.CreatedAt,
})
return invoice, nil
}
// ProcessInvoiceUseCase handles validation and archiving
type ProcessInvoiceUseCase struct {
repo InvoiceRepository
downloader FileDownloader
validator InvoiceValidator
eventBus EventBus
}
func (uc *ProcessInvoiceUseCase) Execute(ctx context.Context, invoiceID string) error {
// Read invoice
invoice, err := uc.repo.FindByID(ctx, invoiceID)
if err != nil {
return err
}
invoice.Status = StatusProcessing
uc.repo.UpdateStatus(ctx, invoiceID, StatusProcessing)
// Download file
data, checksum, err := uc.downloader.Download(ctx, invoice.FileURL)
if err != nil {
invoice.Status = StatusRejected
uc.repo.UpdateStatus(ctx, invoiceID, StatusRejected)
uc.eventBus.Publish(&InvoiceFailedEvent{
InvoiceID: invoice.ID,
Reason: err.Error(),
})
return err
}
// Verify checksum
if checksum != invoice.FileChecksum {
invoice.Status = StatusRejected
uc.repo.UpdateStatus(ctx, invoiceID, StatusRejected)
return fmt.Errorf("checksum mismatch")
}
// Validate
validationErr := uc.validator.Validate(data)
if validationErr != nil {
invoice.Status = StatusRejected
uc.repo.UpdateStatus(ctx, invoiceID, StatusRejected)
uc.eventBus.Publish(&InvoiceValidationFailedEvent{
InvoiceID: invoice.ID,
Errors: validationErr.Errors(),
})
return validationErr
}
// Archive
invoice.Status = StatusArchived
invoice.CompressedData = compress(data)
uc.repo.UpdateStatus(ctx, invoiceID, StatusArchived)
// Publish success
uc.eventBus.Publish(&InvoiceArchivedEvent{
InvoiceID: invoice.ID,
UUID: invoice.UUID,
Amount: invoice.Amount,
CompressedSize: len(invoice.CompressedData),
ArchivedAt: time.Now(),
})
return nil
}
Decisiones arquitectónicas clave:
- Eventos de dominio en lugar de actualizaciones sincrónicas (otros servicios aprenden sobre cambios de estado vía eventos)
- Patrón Repository para abstracción de datos (fácil cambiar base de datos después)
- Casos de uso mapean a operaciones de negocio (no endpoints HTTP)
- Eventos de error para rastrear fallas (seguridad, cumplimiento)
Servicio 3: El Servicio de Analytics (Consumidor de Eventos)
// services/analytics/domain/event_consumer.go
package domain
type EventConsumer struct {
broker EventBroker
repository AnalyticsRepository
cache *IndexedCache
metrics MetricsCollector
}
// ConsumeEvents escucha eventos de dominio y actualiza análisis
func (ec *EventConsumer) ConsumeEvents(ctx context.Context) error {
subscription := ec.broker.Subscribe("fiscal.invoice.archived")
for event := range subscription.Events() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Cast to specific event type
archivedEvent := event.(*InvoiceArchivedEvent)
// Update analytics
if err := ec.repository.RecordInvoice(ctx, &AnalyticsRecord{
InvoiceID: archivedEvent.InvoiceID,
UUID: archivedEvent.UUID,
Amount: archivedEvent.Amount,
CompressedSize: archivedEvent.CompressedSize,
ProcessedAt: archivedEvent.ArchivedAt,
}); err != nil {
// Dead letter queue for failed processing
ec.broker.Publish(&ProcessingFailedEvent{
EventID: event.EventID(),
Reason: err.Error(),
Timestamp: time.Now(),
})
continue
}
// Warm cache
ec.cache.AddRecord(&AnalyticsRecord{
InvoiceID: archivedEvent.InvoiceID,
UUID: archivedEvent.UUID,
Amount: archivedEvent.Amount,
ProcessedAt: archivedEvent.ArchivedAt,
})
// Record metrics
ec.metrics.RecordInvoiceProcessed(archivedEvent.Amount)
}
return nil
}
Por qué event-driven:
- El servicio de análisis no necesita estar siempre disponible (consistencia eventual OK)
- El servicio de facturas no espera a que Analytics se complete
- Fácil agregar nuevos servicios después (simplemente subscribirse a eventos)
- Desacopla servicios—Invoice no sabe que Analytics existe
Parte 4: Arquitectura Event-Driven — El Sistema Nervioso
Los eventos no son para logging. Los eventos son la forma principal en que los servicios se comunican a escala.
Diseño de Eventos
// shared/events/domain_event.go
package events
type DomainEvent interface {
EventID() string
EventType() string
AggregateID() string
Timestamp() time.Time
Version() int
}
// Define events in shared package (used by all services)
type InvoiceCreatedEvent struct {
id string
eventType string // "fiscal.invoice.created"
aggregateID string // invoice ID
timestamp time.Time
version int
// Business data
InvoiceID string
UUID string
Issuer string
Receiver string
Amount decimal.Decimal
Currency string
IssuanceDate time.Time
}
type InvoiceValidatedEvent struct {
id string
eventType string // "fiscal.invoice.validated"
aggregateID string
timestamp time.Time
version int
// Business data
InvoiceID string
UUID string
ValidationTime time.Duration
Checksum string
}
type InvoiceArchivedEvent struct {
id string
eventType string // "fiscal.invoice.archived"
aggregateID string
timestamp time.Time
version int
// Business data
InvoiceID string
UUID string
Amount decimal.Decimal
CompressedSize int64
ArchivedAt time.Time
StorageURL string
}
// Implement DomainEvent interface
func (e *InvoiceCreatedEvent) EventID() string { return e.id }
func (e *InvoiceCreatedEvent) EventType() string { return e.eventType }
func (e *InvoiceCreatedEvent) AggregateID() string { return e.aggregateID }
func (e *InvoiceCreatedEvent) Timestamp() time.Time { return e.timestamp }
func (e *InvoiceCreatedEvent) Version() int { return e.version }
Implementación del Event Broker
// infrastructure/events/kafka_broker.go
package events
import (
"context"
"encoding/json"
"log"
"github.com/segmentio/kafka-go"
)
type KafkaBroker struct {
writer *kafka.Writer
readers map[string]*kafka.Reader
}
func NewKafkaBroker(brokerAddr string) *KafkaBroker {
return &KafkaBroker{
writer: &kafka.Writer{
Addr: kafka.TCP(brokerAddr),
},
readers: make(map[string]*kafka.Reader),
}
}
// Publish envía evento al tema
func (kb *KafkaBroker) Publish(ctx context.Context, event DomainEvent) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
message := kafka.Message{
Key: []byte(event.AggregateID()),
Value: data,
}
// Route to topic based on event type
topic := eventTypeTopic(event.EventType())
kb.writer.Topic = topic
_, err = kb.writer.WriteMessages(ctx, message)
return err
}
// Subscribe escucha al tema
func (kb *KafkaBroker) Subscribe(ctx context.Context, topic string) <-chan DomainEvent {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
GroupID: "analytics-service",
CommitInterval: time.Second,
})
kb.readers[topic] = reader
events := make(chan DomainEvent, 100)
go func() {
defer close(events)
for {
msg, err := reader.ReadMessage(ctx)
if err != nil {
log.Printf("reader error: %v", err)
continue
}
var event DomainEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("unmarshal error: %v", err)
continue
}
events <- event
}
}()
return events
}
func eventTypeTopic(eventType string) string {
// "fiscal.invoice.created" → "fiscal-invoice"
parts := strings.Split(eventType, ".")
return strings.Join(parts[:2], "-")
}
Topología de event broker para 1 millón de eventos/día:
graph TB
subgraph "Producers"
P1["Invoice Service"]
P2["Auth Service"]
end
subgraph "Event Broker - Kafka"
T1["Topic: fiscal-invoice<br/>3 partitions<br/>Retention: 7 days"]
T2["Topic: auth-events<br/>3 partitions<br/>Retention: 30 days"]
end
subgraph "Consumers"
C1["Analytics Service<br/>Consumer Group: analytics"]
C2["Notification Service<br/>Consumer Group: notifications"]
C3["Archive Service<br/>Consumer Group: archival"]
end
P1 -->|Publish| T1
P2 -->|Publish| T2
T1 -->|Subscribe| C1
T1 -->|Subscribe| C2
T1 -->|Subscribe| C3
T2 -->|Subscribe| C1
Parte 5: Patrones de Resiliencia — Manejando Fallo a Escala
A escala masiva, el fallo es garantizado. Construye para ello.
Patrón 1: Circuit Breaker
Previene fallos en cascada evitando solicitudes a servicios que fallan.
// infrastructure/resilience/circuit_breaker.go
package resilience
type CircuitBreakerState string
const (
StateClosed CircuitBreakerState = "closed" // healthy
StateOpen CircuitBreakerState = "open" // failing, reject requests
StateHalfOpen CircuitBreakerState = "half-open" // testing recovery
)
type CircuitBreaker struct {
state CircuitBreakerState
failureCount int
successCount int
lastFailureTime time.Time
threshold int // failures before opening
halfOpenMax int // successes before closing
resetTimeout time.Duration
mu sync.RWMutex
}
func NewCircuitBreaker(threshold int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
threshold: threshold,
halfOpenMax: 3,
resetTimeout: resetTimeout,
}
}
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mu.Lock()
// Check if should transition from open to half-open
if cb.state == StateOpen {
if time.Since(cb.lastFailureTime) > cb.resetTimeout {
cb.state = StateHalfOpen
cb.successCount = 0
} else {
cb.mu.Unlock()
return fmt.Errorf("circuit breaker open")
}
}
cb.mu.Unlock()
// Execute function
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failureCount++
cb.lastFailureTime = time.Now()
if cb.failureCount >= cb.threshold {
cb.state = StateOpen
return fmt.Errorf("circuit breaker opened after %d failures", cb.failureCount)
}
return err
}
// Success
if cb.state == StateHalfOpen {
cb.successCount++
if cb.successCount >= cb.halfOpenMax {
cb.state = StateClosed
cb.failureCount = 0
}
} else {
cb.failureCount = 0
}
return nil
}
// Usage
func (s *Service) CallDownstream(ctx context.Context) (*Response, error) {
var result *Response
err := s.cb.Call(func() error {
resp, err := s.downstream.Get(ctx)
result = resp
return err
})
if err != nil {
// Fallback: return cached data if circuit open
if s.cb.state == StateOpen {
return s.cache.GetLatest(), nil
}
return nil, err
}
return result, nil
}
Patrón 2: Bulkhead (Aislamiento de Recursos)
Limita cuántos recursos una operación puede usar para prevenir inanición de otras operaciones.
// infrastructure/resilience/bulkhead.go
package resilience
type Bulkhead struct {
semaphore chan struct{}
name string
}
func NewBulkhead(name string, maxConcurrent int) *Bulkhead {
return &Bulkhead{
semaphore: make(chan struct{}, maxConcurrent),
name: name,
}
}
func (b *Bulkhead) Execute(ctx context.Context, fn func() error) error {
select {
case b.semaphore <- struct{}{}:
defer func() { <-b.semaphore }()
return fn()
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for bulkhead %s", b.name)
default:
return fmt.Errorf("bulkhead %s exhausted", b.name)
}
}
// Usage: limit concurrent SFTP downloads
downloadBulkhead := NewBulkhead("sftp-download", 25)
err := downloadBulkhead.Execute(ctx, func() error {
return downloadFile(ctx)
})
Patrón 3: Retry con Exponential Backoff
// infrastructure/resilience/retry.go
package resilience
type RetryPolicy struct {
maxAttempts int
initialBackoff time.Duration
maxBackoff time.Duration
jitter bool
}
func NewRetryPolicy(maxAttempts int) *RetryPolicy {
return &RetryPolicy{
maxAttempts: maxAttempts,
initialBackoff: 100 * time.Millisecond,
maxBackoff: 30 * time.Second,
jitter: true,
}
}
func (rp *RetryPolicy) Execute(ctx context.Context, fn func() error) error {
var lastErr error
for attempt := 0; attempt < rp.maxAttempts; attempt++ {
if err := fn(); err == nil {
return nil
} else {
lastErr = err
}
if attempt < rp.maxAttempts-1 {
backoff := rp.calculateBackoff(attempt)
select {
case <-time.After(backoff):
case <-ctx.Done():
return ctx.Err()
}
}
}
return fmt.Errorf("failed after %d attempts: %w", rp.maxAttempts, lastErr)
}
func (rp *RetryPolicy) calculateBackoff(attempt int) time.Duration {
// Exponential: 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s...
backoff := rp.initialBackoff * time.Duration(math.Pow(2, float64(attempt)))
if backoff > rp.maxBackoff {
backoff = rp.maxBackoff
}
if rp.jitter {
// Add random jitter: backoff ± 10%
jitter := time.Duration(rand.Int63n(int64(backoff / 5)))
backoff = backoff - backoff/10 + jitter
}
return backoff
}
Parte 6: Trazado Distribuido — Observabilidad Across Services
A escala, una solicitud individual abarca múltiples servicios. Necesitas trazarla.
// infrastructure/tracing/tracer.go
package tracing
import (
"context"
"github.com/opentelemetry/otel"
"github.com/opentelemetry/otel/exporters/jaeger"
"github.com/opentelemetry/otel/sdk/resource"
sdktrace "github.com/opentelemetry/otel/sdk/trace"
)
func InitTracer(serviceName string) (*sdktrace.TracerProvider, error) {
exporter, err := jaeger.New(
jaeger.WithAgentHost("localhost"),
jaeger.WithAgentPort(6831),
)
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.ServiceNameKey.String(serviceName),
)),
)
otel.SetTracerProvider(tp)
return tp, nil
}
// Usage in service
type InvoiceService struct {
tracer trace.Tracer
repo InvoiceRepository
}
func (s *InvoiceService) ProcessInvoice(ctx context.Context, invoiceID string) error {
ctx, span := s.tracer.Start(ctx, "ProcessInvoice")
defer span.End()
// Add attributes
span.SetAttributes(
attribute.String("invoice.id", invoiceID),
)
// Call to downstream service (context propagated)
if err := s.validateInvoice(ctx, invoiceID); err != nil {
span.RecordError(err)
return err
}
if err := s.archiveInvoice(ctx, invoiceID); err != nil {
span.RecordError(err)
return err
}
return nil
}
func (s *InvoiceService) validateInvoice(ctx context.Context, invoiceID string) error {
ctx, span := s.tracer.Start(ctx, "ValidateInvoice")
defer span.End()
span.SetAttributes(
attribute.String("invoice.id", invoiceID),
attribute.String("validation.type", "schema"),
)
// Validation logic...
return nil
}
Visualización de trace (qué ves en Jaeger):
graph LR
A["API Request<br/>POST /invoice/process"]
B["Auth Service<br/>Verify Token"]
C["Invoice Service<br/>ProcessInvoice"]
D["Validate XML<br/>50ms"]
E["Compress<br/>25ms"]
F["Upload S3<br/>100ms"]
G["Publish Event<br/>5ms"]
A -->|10ms| B
B -->|5ms propagate| C
C -->|50ms| D
D -->|25ms| E
E -->|100ms| F
F -->|5ms| G
style A fill:#2196f3
style C fill:#ff9800
style F fill:#4caf50
Parte 7: CQRS (Command Query Responsibility Segregation)
A escala, las lecturas y escrituras tienen características diferentes. Sepáralas.
Lado de Escritura (Command)
// application/commands/create_invoice.go
package commands
type CreateInvoiceCommand struct {
UUID string
Issuer string
Receiver string
Amount decimal.Decimal
Currency string
IssuanceDate time.Time
FileURL string
Checksum string
}
type CreateInvoiceHandler struct {
invoiceRepo InvoiceRepository
eventBus EventBus
}
func (h *CreateInvoiceHandler) Handle(ctx context.Context, cmd *CreateInvoiceCommand) (*InvoiceID, error) {
invoice := &Invoice{
ID: uuid.New().String(),
UUID: cmd.UUID,
Status: StatusPending,
CreatedAt: time.Now(),
}
// Write to primary database
if err := h.invoiceRepo.Save(ctx, invoice); err != nil {
return nil, err
}
// Publish event (will eventually update read model)
h.eventBus.Publish(&InvoiceCreatedEvent{
InvoiceID: invoice.ID,
UUID: invoice.UUID,
})
return &InvoiceID{ID: invoice.ID}, nil
}
Lado de Lectura (Query)
// application/queries/invoice_summary.go
package queries
// Read model (optimized for queries)
type InvoiceSummary struct {
ID string
UUID string
Issuer string
Amount decimal.Decimal
Status string
CreatedAt time.Time
ProcessingTime int // milliseconds
CompressedSize int64
}
type GetInvoiceSummaryQuery struct {
InvoiceID string
}
// Read repository (different from write repository)
type InvoiceSummaryRepository interface {
FindByID(ctx context.Context, id string) (*InvoiceSummary, error)
FindRecent(ctx context.Context, limit int) ([]*InvoiceSummary, error)
FindByStatus(ctx context.Context, status string) ([]*InvoiceSummary, error)
}
type GetInvoiceSummaryHandler struct {
summaryRepo InvoiceSummaryRepository
cache *Cache // O(1) hits
}
func (h *GetInvoiceSummaryHandler) Handle(ctx context.Context, query *GetInvoiceSummaryQuery) (*InvoiceSummary, error) {
// Try cache first
if summary, ok := h.cache.Get(query.InvoiceID); ok {
return summary, nil
}
// Query read model
summary, err := h.summaryRepo.FindByID(ctx, query.InvoiceID)
if err != nil {
return nil, err
}
h.cache.Set(query.InvoiceID, summary)
return summary, nil
}
Topología CQRS:
graph TB
subgraph "Write Model"
CMD["Commands<br/>Create<br/>Update<br/>Delete"]
PRIMARY["Primary Database<br/>PostgreSQL"]
end
subgraph "Event Stream"
EVT["Events<br/>invoice.created<br/>invoice.archived<br/>invoice.validated"]
end
subgraph "Read Models"
SUMMARY["Summary DB<br/>Denormalized<br/>Optimized for reads"]
SEARCH["Search Index<br/>Elasticsearch<br/>Full-text search"]
CACHE["Cache<br/>Redis<br/>Hot data"]
end
subgraph "Query Side"
QRY["Queries<br/>GetInvoice<br/>ListByStatus<br/>SearchByIssuer"]
end
CMD -->|Write| PRIMARY
PRIMARY -->|Publish| EVT
EVT -->|Async Update| SUMMARY
EVT -->|Async Update| SEARCH
EVT -->|Async Update| CACHE
QRY -->|Read| SUMMARY
QRY -->|Read| SEARCH
QRY -->|Read| CACHE
style PRIMARY fill:#ff9800,stroke:#333,stroke-width:2px
style EVT fill:#2196f3,stroke:#333,stroke-width:2px
style SUMMARY fill:#4caf50,stroke:#333,stroke-width:2px
Parte 8: Database Sharding para Escala Masiva
Cuando tienes 100TB de datos, no puede vivir en una base de datos única.
// infrastructure/sharding/shard_key.go
package sharding
type ShardKey struct {
// Primary key: Issuer RFC
Issuer string
// Secondary key: Month (for time-series retention)
Month string // "2026-03"
}
func (sk *ShardKey) Hash() uint32 {
h := fnv.New32a()
h.Write([]byte(sk.Issuer))
return h.Sum32()
}
// Determine which shard to use
func (sk *ShardKey) ShardNumber(totalShards int) int {
return int(sk.Hash()) % totalShards
}
// Shard mapping
type ShardMapping struct {
shards map[int]*ShardConnection
mu sync.RWMutex
}
func NewShardMapping(totalShards int) *ShardMapping {
sm := &ShardMapping{
shards: make(map[int]*ShardConnection),
}
// Initialize shard connections
for i := 0; i < totalShards; i++ {
dsn := fmt.Sprintf("postgres://user:pass@shard-%d.db.example.com/fiscal", i)
db, _ := sql.Open("postgres", dsn)
sm.shards[i] = &ShardConnection{db: db, shardID: i}
}
return sm
}
func (sm *ShardMapping) GetShard(key *ShardKey) *ShardConnection {
sm.mu.RLock()
defer sm.mu.RUnlock()
shardNum := key.ShardNumber(len(sm.shards))
return sm.shards[shardNum]
}
// Usage
type InvoiceRepository struct {
sharding *ShardMapping
}
func (r *InvoiceRepository) SaveInvoice(ctx context.Context, invoice *Invoice) error {
key := &ShardKey{
Issuer: invoice.Issuer,
Month: invoice.IssuanceDate.Format("2006-01"),
}
shard := r.sharding.GetShard(key)
// Execute on correct shard
return shard.db.ExecContext(ctx, `
INSERT INTO invoices (id, uuid, issuer, receiver, amount, created_at)
VALUES ($1, $2, $3, $4, $5, $6)
`, invoice.ID, invoice.UUID, invoice.Issuer, invoice.Receiver, invoice.Amount, invoice.CreatedAt)
}
Topología de sharding para 100TB de datos:
graph TB
subgraph "Shard Layer"
S1["Shard 0<br/>Issuer: A-E<br/>10 TB"]
S2["Shard 1<br/>Issuer: F-J<br/>10 TB"]
S3["Shard 2<br/>Issuer: K-O<br/>10 TB"]
S4["Shard 3<br/>Issuer: P-Z<br/>10 TB"]
end
subgraph "Shard Mapping (Router)"
ROUTE["ShardMapping<br/>Hash(issuer) % 4"]
end
subgraph "Query"
READ["Query issuer ABC"]
end
READ -->|Hash ABC<br/>= 0| ROUTE
ROUTE -->|Route to Shard 0| S1
ROUTE -->|Can route to S2| S2
ROUTE -->|Can route to S3| S3
ROUTE -->|Can route to S4| S4
style ROUTE fill:#2196f3,stroke:#333,stroke-width:2px
style S1 fill:#4caf50,stroke:#333,stroke-width:2px
Parte 9: API Gateway — La Puerta de Entrada
Todas las solicitudes externas fluyen a través de la puerta.
// api/gateway/gateway.go
package gateway
type APIGateway struct {
router chi.Router
authService *grpc.ClientConn
rateLimiter *RateLimiter
tracer trace.Tracer
metricsClient MetricsClient
}
func NewAPIGateway() *APIGateway {
authConn, _ := grpc.Dial("auth-service:50051")
return &APIGateway{
router: chi.NewRouter(),
authService: authConn,
rateLimiter: NewRateLimiter(),
}
}
func (gw *APIGateway) Start() {
gw.router.Use(gw.loggingMiddleware)
gw.router.Use(gw.tracingMiddleware)
gw.router.Use(gw.authMiddleware)
gw.router.Use(gw.rateLimitMiddleware)
// Route to services
gw.router.Post("/api/v1/invoices", gw.createInvoice)
gw.router.Get("/api/v1/invoices/{id}", gw.getInvoice)
gw.router.Get("/api/v1/reports/daily", gw.getDailyReport)
http.ListenAndServe(":8080", gw.router)
}
func (gw *APIGateway) authMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("Authorization")
if token == "" {
http.Error(w, "missing token", http.StatusUnauthorized)
return
}
// Validate token via Auth Service (cached)
user, err := gw.validateToken(r.Context(), token)
if err != nil {
http.Error(w, "invalid token", http.StatusUnauthorized)
return
}
// Add user to context
ctx := context.WithValue(r.Context(), "user", user)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
func (gw *APIGateway) rateLimitMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
user := r.Context().Value("user").(*User)
// Check rate limit per user (tier-based)
if !gw.rateLimiter.Allow(user.ID, user.Tier) {
w.Header().Set("X-RateLimit-Retry-After", "60")
http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
func (gw *APIGateway) createInvoice(w http.ResponseWriter, r *http.Request) {
var req CreateInvoiceRequest
json.NewDecoder(r.Body).Decode(&req)
// Call Invoice Service via gRPC
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
invoiceService := pb.NewInvoiceServiceClient(gw.invoiceConn)
resp, err := invoiceService.CreateInvoice(ctx, &pb.CreateInvoiceRequest{
UUID: req.UUID,
Issuer: req.Issuer,
Amount: req.Amount,
FileURL: req.FileURL,
Checksum: req.Checksum,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(resp)
}
Parte 10: Arquitectura de Deployment
Cómo ejecutar todo esto a escala:
graph TB
subgraph "Client"
CLIENT["Users<br/>APIs"]
end
subgraph "Edge"
CDN["CDN<br/>Cloudflare"]
end
subgraph "Load Balancer"
LB["AWS ALB<br/>Multi-AZ"]
end
subgraph "Kubernetes Cluster"
subgraph "API Gateway Pods"
GW1["Pod 1"]
GW2["Pod 2"]
GW3["Pod 3"]
end
subgraph "Invoice Service Pods"
INV1["Pod 1"]
INV2["Pod 2"]
INV3["Pod 3"]
end
subgraph "Analytics Service Pods"
ANA1["Pod 1"]
ANA2["Pod 2"]
end
end
subgraph "Data Layer"
subgraph "Sharded PostgreSQL"
PG1["Shard 0<br/>Primary + Replica"]
PG2["Shard 1<br/>Primary + Replica"]
end
REDIS["Redis Cluster<br/>3 nodes"]
end
subgraph "Message Broker"
KAFKA["Kafka<br/>3 brokers<br/>Replication Factor 3"]
end
subgraph "Infrastructure"
JAEGER["Jaeger<br/>Tracing"]
PROM["Prometheus<br/>Metrics"]
GRAFANA["Grafana<br/>Dashboards"]
end
CLIENT -->|CDN| CDN
CDN -->|HTTPS| LB
LB -->|Route| GW1
LB -->|Route| GW2
LB -->|Route| GW3
GW1 -->|gRPC| INV1
GW2 -->|gRPC| INV2
GW3 -->|gRPC| INV3
INV1 -->|Query| PG1
INV2 -->|Query| PG2
INV3 -->|Query| PG1
INV1 -->|Cache| REDIS
ANA1 -->|Cache| REDIS
INV1 -->|Publish| KAFKA
ANA1 -->|Subscribe| KAFKA
ANA2 -->|Subscribe| KAFKA
GW1 -.->|Trace| JAEGER
INV1 -.->|Metrics| PROM
PROM -->|Display| GRAFANA
style LB fill:#2196f3,stroke:#333,stroke-width:2px
style KAFKA fill:#ff9800,stroke:#333,stroke-width:2px
style GRAFANA fill:#4caf50,stroke:#333,stroke-width:2px
Parte 11: Excelencia Operacional — Ejecutando a Escala
Health Checks y Readiness
// api/health/handler.go
package health
type HealthHandler struct {
db *sql.DB
kafka EventBroker
redis *redis.Client
}
// Liveness: ¿el pod aún está ejecutándose?
func (h *HealthHandler) Liveness(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"status": "alive",
})
}
// Readiness: ¿puede el pod manejar tráfico?
func (h *HealthHandler) Readiness(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
defer cancel()
// Check database
if err := h.db.PingContext(ctx); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"status": "not ready: db unhealthy",
})
return
}
// Check Kafka
if err := h.kafka.Health(ctx); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"status": "not ready: kafka unhealthy",
})
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"status": "ready",
})
}
Graceful Shutdown
// main.go
func main() {
app := setupApplication()
server := &http.Server{
Addr: ":8080",
Handler: app.router,
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
}
// Graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigChan
log.Printf("Received signal: %v", sig)
// Stop accepting new requests
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Wait for in-flight requests to complete
server.Shutdown(ctx)
// Flush event buffer
app.eventBus.Close()
// Close connections
app.db.Close()
app.redis.Close()
os.Exit(0)
}()
log.Fatal(server.ListenAndServe())
}
Parte 12: La Lista de Verificación Arquitectónica para Escala
Antes de desplegar tu sistema:
Microservices
├─ [ ] Cada servicio tiene responsabilidad única
├─ [ ] Cada servicio tiene base de datos independiente
├─ [ ] Los servicios se comunican vía eventos async
└─ [ ] Sin llamadas HTTP servicio-a-servicio en hot path
Resiliencia
├─ [ ] Circuit breakers en todas las llamadas externas
├─ [ ] Bulkheads aíslan pools de recursos
├─ [ ] Retry con exponential backoff
└─ [ ] Estrategias de fallback para degradación
Observabilidad
├─ [ ] Trazado distribuido a través de todos los servicios
├─ [ ] Métricas Prometheus exportadas
├─ [ ] Logging estructurado con IDs de correlación
└─ [ ] Alertas para error rate > 1%
Datos
├─ [ ] Base de datos sharded por clave natural
├─ [ ] Replicas de lectura para consultas de análisis
├─ [ ] Estrategia de cache documentada
└─ [ ] Política de retención de datos definida
Deployment
├─ [ ] Health checks (liveness + readiness)
├─ [ ] Graceful shutdown (timeout 30s)
├─ [ ] Deployments rolling configurados
├─ [ ] Deployments canary para cambios críticos
└─ [ ] Migraciones de BD no-bloqueantes
Operaciones
├─ [ ] Alertas PagerDuty configuradas
├─ [ ] Runbooks para fallos comunes
├─ [ ] Rotación on-call establecida
└─ [ ] Postmortems de incidentes obligatorios
Conclusión: Arquitectura como Ventaja Competitiva
Las empresas que procesan millones de eventos diarios no tienen éxito con código inteligente. Tienen éxito con arquitectura disciplinada.
Microservicios que descomponen el problema correctamente. Patrones event-driven que desacoplan servicios. Patrones de resiliencia que previenen fallos en cascada. Trazado distribuido que revela la verdad.
Así es como Uber procesa miles de millones de solicitudes de viajes. Cómo Stripe procesa billones de dólares. Cómo Twitter maneja millones de tweets por segundo.
Go no es el único lenguaje que puede hacer esto. Pero el modelo de concurrencia de Go, las interfaces vacías, y su simplicidad hacen que estos patrones sean fáciles de implementar.
Los patrones son lo que importa. El lenguaje es secundario.
La escala no es un destino. Es un proceso continuo de eliminar cuellos de botella, distribuir carga, y construir sistemas que se degradan con elegancia bajo presión. La mejor arquitectura es aquella que anticipa fallo y diseña en torno a él.
Tags
Artículos relacionados
API Versioning Strategies: Cómo Evolucionar APIs sin Romper Clientes
Una guía exhaustiva sobre estrategias de versionado de APIs: URL versioning vs Header versioning, cómo deprecar endpoints sin shock, migration patterns reales, handling de cambios backwards-incompatibles, y decisiones arquitectónicas que importan. Con 50+ ejemplos de código en Go.
Arquitectura de software: Más allá del código
Una guía completa sobre arquitectura de software explicada en lenguaje humano: patrones, organización, estructura y cómo construir sistemas que escalen con tu negocio.
Automatizando tu vida con Go CLI: Guía profesional para crear herramientas de línea de comandos escalables
Una guía exhaustiva y paso a paso sobre cómo crear herramientas CLI escalables con Go 1.25.5: desde lo básico hasta proyectos empresariales complejos con flags, configuración, logging, y ejemplos prácticos para Windows y Linux.