Go Architecture at Massive Scale: Microservices, Events, and Distributed Systems

Design systems that process millions of events per day. Learn microservice architecture, event-driven patterns, distributed tracing, circuit breakers, and the architectural decisions that separate prototype code from production infrastructure.

By Omar Flores

The Moment Everything Changes

You start with a monolith. One binary, one database, and it works fine for 10,000 users. Then you reach 100,000 users.

The monolith becomes a bottleneck. The database becomes a bottleneck. Your deployment pipeline becomes a bottleneck. Everything becomes a bottleneck.

This is not a problem you solve with smarter code. This is a problem you solve with a different architecture.

This guide shows you how. Not how to scale a monolith. How to build an architecture that scales from day one.


Part 1: Understanding Scale — The Three Dimensions

Before making architectural decisions, understand what “scale” means for your system.

Dimension 1: Data Scale

How much data flows through your system?

Small scale:   100 MB/day (startup)
Medium scale:  100 GB/day (growth stage)
Large scale:   1 TB/day (enterprise)
Massive scale: 10+ TB/day (finance, logistics, telecom)

At 1 TB/day:

  • Your database cannot be a single instance
  • You cannot hold everything in memory
  • You cannot query everything synchronously
  • You need data partitioning by default

Architectural implications:

  • Sharded databases (partition by region, by customer, by time; see the sketch below)
  • Eventual consistency instead of strong consistency
  • Caching layers are mandatory
  • Data should rarely cross network boundaries
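
As a small illustration of time-based partitioning, a sketch (the table-naming scheme is hypothetical) that routes rows to monthly tables, so retiring an old month becomes a metadata operation instead of a giant DELETE:

package partitioning

import (
	"fmt"
	"time"
)

// partitionTable returns a monthly partition name, e.g. "invoices_2026_03".
// Dropping a whole month is then cheap compared to deleting row by row.
func partitionTable(base string, t time.Time) string {
	return fmt.Sprintf("%s_%s", base, t.Format("2006_01"))
}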

Dimension 2: Request Scale

How many requests per second must you handle?

Small scale:   100 RPS (startup API)
Medium scale:  1,000 RPS (mid-market)
Large scale:   10,000 RPS (enterprise)
Massive scale: 100,000+ RPS (hyperscale)

At 100,000 RPS:

  • A single server cannot handle it (roughly 10,000 RPS max per Go instance)
  • You need at least 10 servers
  • Load balancing becomes critical
  • Connection pooling becomes critical
  • Queue-depth monitoring becomes a matter of survival

Architectural implications:

  • Horizontal scaling from the start
  • Stateless services (easy to scale out)
  • API gateways for routing
  • Rate limiting and circuit breakers (see the sketch below)
  • Backpressure handling at every stage
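
A minimal per-client rate limiter, sketched here with the golang.org/x/time/rate token bucket (the limits and client keying are illustrative):

package ratelimit

import (
	"sync"

	"golang.org/x/time/rate"
)

// ClientLimiter keeps one token bucket per client ID.
type ClientLimiter struct {
	mu       sync.Mutex
	limiters map[string]*rate.Limiter
	rps      rate.Limit
	burst    int
}

func NewClientLimiter(rps rate.Limit, burst int) *ClientLimiter {
	return &ClientLimiter{
		limiters: make(map[string]*rate.Limiter),
		rps:      rps,
		burst:    burst,
	}
}

// Allow reports whether clientID may make a request right now.
func (cl *ClientLimiter) Allow(clientID string) bool {
	cl.mu.Lock()
	lim, ok := cl.limiters[clientID]
	if !ok {
		lim = rate.NewLimiter(cl.rps, cl.burst)
		cl.limiters[clientID] = lim
	}
	cl.mu.Unlock()
	return lim.Allow()
}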

Dimension 3: Consistency Scale

How fresh does your data need to be?

Consistency: Immediate (strong consistency)
Staleness:   Seconds (eventual consistency)
Staleness:   Minutes (eventual consistency, acceptable)
Staleness:   Hours (batch processing acceptable)

Strong consistency is the enemy of scale. Every consistency guarantee costs latency. At massive scale, you accept eventual consistency.

Architectural implications:

  • Separate read and write models (CQRS)
  • Event-driven updates instead of synchronous RPCs
  • Accept eventual consistency for user-facing features
  • Strong consistency only where law or the business demands it (payments, compliance; a sketch follows this list)
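
To make the split concrete, a hedged sketch of a payment debit that genuinely needs strong consistency (the table and columns are illustrative), in contrast with the eventually consistent reads everywhere else:

package payments

import (
	"context"
	"database/sql"
	"fmt"
)

// DebitAccount pays the latency cost of strong consistency on purpose:
// a serializable transaction plus a row lock so two debits cannot race.
func DebitAccount(ctx context.Context, db *sql.DB, accountID string, amount int64) error {
	tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op after a successful Commit

	var balance int64
	if err := tx.QueryRowContext(ctx,
		`SELECT balance FROM accounts WHERE id = $1 FOR UPDATE`, accountID).Scan(&balance); err != nil {
		return err
	}
	if balance < amount {
		return fmt.Errorf("insufficient funds")
	}
	if _, err := tx.ExecContext(ctx,
		`UPDATE accounts SET balance = balance - $1 WHERE id = $2`, amount, accountID); err != nil {
		return err
	}
	return tx.Commit()
}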

Part 2: The Architectural Layers — Clean Separation

Building for scale requires clear boundaries between layers.

graph TB
    subgraph "Client Layer"
        WEB["Web Client<br/>Mobile Client"]
    end

    subgraph "API Gateway Layer"
        GW["API Gateway<br/>Authentication<br/>Rate Limiting<br/>Request Routing"]
    end

    subgraph "Service Layer - Independent Services"
        S1["Auth Service<br/>JWT/OAuth"]
        S2["Invoice Service<br/>Fiscal Documents"]
        S3["Analytics Service<br/>Event Consumer"]
        S4["Notification Service<br/>Email/SMS/Webhook"]
    end

    subgraph "Data Layer - Per Service"
        D1["Auth DB<br/>PostgreSQL"]
        D2["Invoice DB<br/>PostgreSQL Sharded"]
        D3["Analytics DB<br/>ClickHouse"]
        D4["Cache<br/>Redis"]
    end

    subgraph "Event Bus - Central Nervous System"
        EB["Event Broker<br/>Kafka/RabbitMQ<br/>Topic: fiscal.invoice.created<br/>Topic: fiscal.invoice.validated<br/>Topic: fiscal.invoice.archived"]
    end

    subgraph "Infrastructure"
        TRACE["Distributed Tracing<br/>Jaeger/Datadog"]
        METRIC["Metrics<br/>Prometheus"]
        LOG["Logging<br/>ELK Stack"]
        MON["Monitoring & Alerting<br/>Grafana/PagerDuty"]
    end

    WEB -->|HTTP/gRPC| GW
    GW -->|Route by path| S1
    GW -->|Route by path| S2
    GW -->|Route by path| S3
    GW -->|Route by path| S4

    S1 -->|Read/Write| D1
    S2 -->|Read/Write| D2
    S3 -->|Read/Write| D3
    S2 -->|Cache| D4
    S3 -->|Cache| D4

    S2 -->|Publish<br/>invoice.created| EB
    S2 -->|Publish<br/>invoice.validated| EB
    S3 -->|Subscribe| EB
    S4 -->|Subscribe| EB

    S1 -.->|Trace spans| TRACE
    S2 -.->|Trace spans| TRACE
    S3 -.->|Trace spans| TRACE
    S4 -.->|Trace spans| TRACE

    S1 -.->|Metrics| METRIC
    S2 -.->|Metrics| METRIC
    S3 -.->|Metrics| METRIC

    TRACE --> MON
    METRIC --> MON

    style GW fill:#2196f3,stroke:#333,stroke-width:2px
    style EB fill:#ff9800,stroke:#333,stroke-width:2px
    style MON fill:#4caf50,stroke:#333,stroke-width:2px

Key principle: every service has a single responsibility and its own independent data store.


Part 3: Service-Oriented Architecture — Core Patterns

Service 1: The Authentication Service

Every system at scale needs authentication. Make it an independent service.

// services/auth/domain/user.go
package domain

import (
	"context"
	"fmt"
	"time"
)

// validatePassword and generateSecureToken are package helpers (not shown).

type User struct {
	ID           string
	Email        string
	PasswordHash string
	Role         string // "admin", "analyst", "viewer"
	Status       string // "active", "suspended", "deleted"
	CreatedAt    time.Time
	UpdatedAt    time.Time
}

type Token struct {
	Value     string
	UserID    string
	ExpiresAt time.Time
	IssuedAt  time.Time
	Scope     []string
}

// Repository contract
type UserRepository interface {
	SaveUser(ctx context.Context, user *User) error
	FindByEmail(ctx context.Context, email string) (*User, error)
	FindByID(ctx context.Context, id string) (*User, error)
	UpdateUser(ctx context.Context, user *User) error
}

type TokenRepository interface {
	SaveToken(ctx context.Context, token *Token) error
	FindToken(ctx context.Context, value string) (*Token, error)
	RevokeToken(ctx context.Context, value string) error
}

// UseCase
type AuthenticateUseCase struct {
	users      UserRepository
	tokens     TokenRepository
	eventBus   EventBus
}

func (uc *AuthenticateUseCase) Execute(ctx context.Context, email, password string) (*Token, error) {
	// Find user
	user, err := uc.users.FindByEmail(ctx, email)
	if err != nil {
		return nil, err
	}

	// Validate password
	if !validatePassword(password, user.PasswordHash) {
		// Publish failed auth event for security monitoring
		uc.eventBus.Publish(&AuthFailedEvent{
			Email:     email,
			Timestamp: time.Now(),
		})
		return nil, fmt.Errorf("invalid credentials")
	}

	// Generate token
	token := &Token{
		Value:     generateSecureToken(),
		UserID:    user.ID,
		IssuedAt:  time.Now(),
		ExpiresAt: time.Now().Add(24 * time.Hour),
		Scope:     []string{"api"},
	}

	if err := uc.tokens.SaveToken(ctx, token); err != nil {
		return nil, err
	}

	// Publish success event
	uc.eventBus.Publish(&AuthSucceededEvent{
		UserID:    user.ID,
		Email:     email,
		Timestamp: time.Now(),
	})

	return token, nil
}

Why a separate service:

  • Authentication is a cross-cutting concern (every service needs it)
  • Single source of truth for user identity
  • Scales independently of the business logic
  • Security updates deploy independently
  • Credentials can be rotated without redeploying the whole system

Service 2: The Invoice Service (Core Business Logic)

// services/invoice/domain/invoice.go
package domain

import (
	"context"
	"fmt"
	"time"

	"github.com/shopspring/decimal" // assuming shopspring for the decimal type
)

type InvoiceStatus string

const (
	StatusPending    InvoiceStatus = "pending"
	StatusValidated  InvoiceStatus = "validated"
	StatusRejected   InvoiceStatus = "rejected"
	StatusProcessing InvoiceStatus = "processing"
	StatusArchived   InvoiceStatus = "archived"
)

type Invoice struct {
	ID              string
	UUID            string
	Issuer          string
	Receiver        string
	Amount          decimal.Decimal
	Currency        string
	IssuanceDate    time.Time
	Status          InvoiceStatus
	FileURL         string // source location, used by ProcessInvoiceUseCase
	FileChecksum    string
	CompressedData  []byte
	ValidationRules string
	CreatedAt       time.Time
	UpdatedAt       time.Time
	Version         int64 // For optimistic locking
}

type InvoiceRepository interface {
	Save(ctx context.Context, invoice *Invoice) error
	FindByID(ctx context.Context, id string) (*Invoice, error)
	FindByStatus(ctx context.Context, status InvoiceStatus, limit int) ([]*Invoice, error)
	UpdateStatus(ctx context.Context, id string, status InvoiceStatus) error
	BulkInsert(ctx context.Context, invoices []*Invoice) error
	FindByUUID(ctx context.Context, uuid string) (*Invoice, error)
}

// CreateInvoiceUseCase handles incoming invoice creation
type CreateInvoiceUseCase struct {
	repo        InvoiceRepository
	eventBus    EventBus
	downloader  FileDownloader
	validator   InvoiceValidator
}

func (uc *CreateInvoiceUseCase) Execute(ctx context.Context, cmd *CreateInvoiceCommand) (*Invoice, error) {
	// Create domain entity
	invoice := &Invoice{
		ID:           generateID(),
		UUID:         cmd.UUID,
		Issuer:       cmd.Issuer,
		Receiver:     cmd.Receiver,
		Amount:       cmd.Amount,
		Currency:     cmd.Currency,
		IssuanceDate: cmd.IssuanceDate,
		Status:       StatusPending,
		CreatedAt:    time.Now(),
	}

	// Save to repository
	if err := uc.repo.Save(ctx, invoice); err != nil {
		return nil, err
	}

	// Publish domain event
	uc.eventBus.Publish(&InvoiceCreatedEvent{
		InvoiceID:   invoice.ID,
		UUID:        invoice.UUID,
		Amount:      invoice.Amount,
		CreatedAt:   invoice.CreatedAt,
	})

	return invoice, nil
}

// ProcessInvoiceUseCase handles validation and archiving
type ProcessInvoiceUseCase struct {
	repo        InvoiceRepository
	downloader  FileDownloader
	validator   InvoiceValidator
	eventBus    EventBus
}

func (uc *ProcessInvoiceUseCase) Execute(ctx context.Context, invoiceID string) error {
	// Read invoice
	invoice, err := uc.repo.FindByID(ctx, invoiceID)
	if err != nil {
		return err
	}

	invoice.Status = StatusProcessing
	uc.repo.UpdateStatus(ctx, invoiceID, StatusProcessing)

	// Download file
	data, checksum, err := uc.downloader.Download(ctx, invoice.FileURL)
	if err != nil {
		invoice.Status = StatusRejected
		uc.repo.UpdateStatus(ctx, invoiceID, StatusRejected)
		uc.eventBus.Publish(&InvoiceFailedEvent{
			InvoiceID: invoice.ID,
			Reason:    err.Error(),
		})
		return err
	}

	// Verify checksum
	if checksum != invoice.FileChecksum {
		invoice.Status = StatusRejected
		uc.repo.UpdateStatus(ctx, invoiceID, StatusRejected)
		return fmt.Errorf("checksum mismatch")
	}

	// Validate
	validationErr := uc.validator.Validate(data)
	if validationErr != nil {
		invoice.Status = StatusRejected
		uc.repo.UpdateStatus(ctx, invoiceID, StatusRejected)
		uc.eventBus.Publish(&InvoiceValidationFailedEvent{
			InvoiceID: invoice.ID,
			Errors:    validationErr.Errors(),
		})
		return validationErr
	}

	// Archive
	invoice.Status = StatusArchived
	invoice.CompressedData = compress(data)
	uc.repo.UpdateStatus(ctx, invoiceID, StatusArchived)

	// Publish success
	uc.eventBus.Publish(&InvoiceArchivedEvent{
		InvoiceID:      invoice.ID,
		UUID:           invoice.UUID,
		Amount:         invoice.Amount,
		CompressedSize: len(invoice.CompressedData),
		ArchivedAt:     time.Now(),
	})

	return nil
}

Key architectural decisions:

  • Domain events instead of synchronous updates (other services learn about state changes via events)
  • Repository pattern for data abstraction (easy to swap databases later; a sample adapter follows this list)
  • Use cases map to business operations (not HTTP endpoints)
  • Failure events for tracking errors (security, compliance)
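
To see what the repository port buys you, a minimal sketch of one Postgres adapter (the package layout and SQL are illustrative, and status is simplified to a string). Swapping databases later means writing another adapter, not touching the use cases:

// infrastructure/postgres/invoice_repository.go
package postgres

import (
	"context"
	"database/sql"
)

// PostgresInvoiceRepository is one adapter behind the InvoiceRepository
// port; the domain layer never sees SQL.
type PostgresInvoiceRepository struct {
	db *sql.DB
}

func NewPostgresInvoiceRepository(db *sql.DB) *PostgresInvoiceRepository {
	return &PostgresInvoiceRepository{db: db}
}

func (r *PostgresInvoiceRepository) UpdateStatus(ctx context.Context, id string, status string) error {
	_, err := r.db.ExecContext(ctx,
		`UPDATE invoices SET status = $1, updated_at = now() WHERE id = $2`,
		status, id)
	return err
}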

Service 3: The Analytics Service (Event Consumer)

// services/analytics/domain/event_consumer.go
package domain

type EventConsumer struct {
	broker       EventBroker
	repository   AnalyticsRepository
	cache        *IndexedCache
	metrics      MetricsCollector
}

// ConsumeEvents listens for domain events and updates the analytics store
func (ec *EventConsumer) ConsumeEvents(ctx context.Context) error {
	subscription := ec.broker.Subscribe("fiscal.invoice.archived")

	for event := range subscription.Events() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		// Assert the concrete event type; skip anything else on the topic
		archivedEvent, ok := event.(*InvoiceArchivedEvent)
		if !ok {
			continue
		}

		// Update analytics
		if err := ec.repository.RecordInvoice(ctx, &AnalyticsRecord{
			InvoiceID:      archivedEvent.InvoiceID,
			UUID:           archivedEvent.UUID,
			Amount:         archivedEvent.Amount,
			CompressedSize: archivedEvent.CompressedSize,
			ProcessedAt:    archivedEvent.ArchivedAt,
		}); err != nil {
			// Dead letter queue for failed processing
			ec.broker.Publish(&ProcessingFailedEvent{
				EventID:   event.EventID(),
				Reason:    err.Error(),
				Timestamp: time.Now(),
			})
			continue
		}

		// Warm cache
		ec.cache.AddRecord(&AnalyticsRecord{
			InvoiceID:      archivedEvent.InvoiceID,
			UUID:           archivedEvent.UUID,
			Amount:         archivedEvent.Amount,
			ProcessedAt:    archivedEvent.ArchivedAt,
		})

		// Record metrics
		ec.metrics.RecordInvoiceProcessed(archivedEvent.Amount)
	}

	return nil
}

Why event-driven:

  • The analytics service does not need to be always available (eventual consistency is fine)
  • The invoice service never waits for analytics to finish
  • Easy to add new services later (they just subscribe to the events; see the sketch below)
  • Services stay decoupled: Invoice does not know Analytics exists
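
For instance, a brand-new notification consumer can appear without a single change to the Invoice service. A hedged sketch, reusing the Subscribe-style broker shown in Part 4 (the Mailer dependency is hypothetical):

// services/notification/consumer.go
package notification

import "context"

type NotificationConsumer struct {
	broker EventBroker // same Subscribe interface the analytics service uses
	mailer Mailer      // hypothetical email dependency
}

// Run attaches to the existing topic; the publisher never knows we exist.
func (n *NotificationConsumer) Run(ctx context.Context) error {
	for event := range n.broker.Subscribe(ctx, "fiscal-invoice") {
		if archived, ok := event.(*InvoiceArchivedEvent); ok {
			n.mailer.SendArchivedReceipt(ctx, archived.InvoiceID)
		}
	}
	return ctx.Err()
}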

Part 4: Event-Driven Architecture — The Nervous System

Events are not a logging mechanism. Events are the primary way services communicate at scale.

Event Design

// shared/events/domain_event.go
package events

import (
	"time"

	"github.com/shopspring/decimal" // assuming shopspring for the decimal type
)

type DomainEvent interface {
	EventID() string
	EventType() string
	AggregateID() string
	Timestamp() time.Time
	Version() int
}

// Define events in shared package (used by all services)
type InvoiceCreatedEvent struct {
	id          string
	eventType   string // "fiscal.invoice.created"
	aggregateID string // invoice ID
	timestamp   time.Time
	version     int

	// Business data
	InvoiceID   string
	UUID        string
	Issuer      string
	Receiver    string
	Amount      decimal.Decimal
	Currency    string
	IssuanceDate time.Time
}

type InvoiceValidatedEvent struct {
	id          string
	eventType   string // "fiscal.invoice.validated"
	aggregateID string
	timestamp   time.Time
	version     int

	// Business data
	InvoiceID      string
	UUID           string
	ValidationTime time.Duration
	Checksum       string
}

type InvoiceArchivedEvent struct {
	id          string
	eventType   string // "fiscal.invoice.archived"
	aggregateID string
	timestamp   time.Time
	version     int

	// Business data
	InvoiceID      string
	UUID           string
	Amount         decimal.Decimal
	CompressedSize int64
	ArchivedAt     time.Time
	StorageURL     string
}

// Implement DomainEvent interface
func (e *InvoiceCreatedEvent) EventID() string      { return e.id }
func (e *InvoiceCreatedEvent) EventType() string    { return e.eventType }
func (e *InvoiceCreatedEvent) AggregateID() string  { return e.aggregateID }
func (e *InvoiceCreatedEvent) Timestamp() time.Time { return e.timestamp }
func (e *InvoiceCreatedEvent) Version() int         { return e.version }

Event Broker Implementation

// infrastructure/events/kafka_broker.go
package events

import (
	"context"
	"encoding/json"
	"log"
	"strings"
	"time"

	"github.com/segmentio/kafka-go"
)

type KafkaBroker struct {
	writer  *kafka.Writer
	readers map[string]*kafka.Reader
}

func NewKafkaBroker(brokerAddr string) *KafkaBroker {
	return &KafkaBroker{
		writer: &kafka.Writer{
			Addr: kafka.TCP(brokerAddr),
		},
		readers: make(map[string]*kafka.Reader),
	}
}

// Publish sends an event to the topic derived from its type
func (kb *KafkaBroker) Publish(ctx context.Context, event DomainEvent) error {
	data, err := json.Marshal(event)
	if err != nil {
		return err
	}

	// Setting Topic on the message (instead of mutating the shared writer)
	// keeps Publish safe for concurrent use. Keying by aggregate ID keeps
	// all events for one invoice on the same partition, preserving order.
	message := kafka.Message{
		Topic: eventTypeTopic(event.EventType()),
		Key:   []byte(event.AggregateID()),
		Value: data,
	}

	// kafka-go's WriteMessages returns only an error
	return kb.writer.WriteMessages(ctx, message)
}

// Subscribe listens on a topic and streams decoded events
func (kb *KafkaBroker) Subscribe(ctx context.Context, topic string) <-chan DomainEvent {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{"localhost:9092"},
		Topic:          topic,
		GroupID:        "analytics-service",
		CommitInterval: time.Second,
	})

	kb.readers[topic] = reader

	events := make(chan DomainEvent, 100)

	go func() {
		defer close(events)
		for {
			msg, err := reader.ReadMessage(ctx)
			if err != nil {
				log.Printf("reader error: %v", err)
				continue
			}

			// JSON cannot be unmarshaled into an interface type directly;
			// decodeEvent (a package helper, not shown) reads the payload's
			// event type field and unmarshals into the matching struct.
			event, err := decodeEvent(msg.Value)
			if err != nil {
				log.Printf("unmarshal error: %v", err)
				continue
			}

			events <- event
		}
	}()

	return events
}

func eventTypeTopic(eventType string) string {
	// "fiscal.invoice.created" → "fiscal-invoice"
	parts := strings.Split(eventType, ".")
	return strings.Join(parts[:2], "-")
}

Event-broker topology for one million events per day:

graph TB
    subgraph "Producers"
        P1["Invoice Service"]
        P2["Auth Service"]
    end

    subgraph "Event Broker - Kafka"
        T1["Topic: fiscal-invoice<br/>3 partitions<br/>Retention: 7 days"]
        T2["Topic: auth-events<br/>3 partitions<br/>Retention: 30 days"]
    end

    subgraph "Consumers"
        C1["Analytics Service<br/>Consumer Group: analytics"]
        C2["Notification Service<br/>Consumer Group: notifications"]
        C3["Archive Service<br/>Consumer Group: archival"]
    end

    P1 -->|Publish| T1
    P2 -->|Publish| T2
    T1 -->|Subscribe| C1
    T1 -->|Subscribe| C2
    T1 -->|Subscribe| C3
    T2 -->|Subscribe| C1

Part 5: Resilience Patterns — Handling Failure at Scale

At massive scale, failure is guaranteed. Build for it.

Pattern 1: Circuit Breaker

Prevents cascading failures by refusing to send requests to a service that is already failing.

// infrastructure/resilience/circuit_breaker.go
package resilience

import (
	"fmt"
	"sync"
	"time"
)

type CircuitBreakerState string

const (
	StateClosed   CircuitBreakerState = "closed"    // healthy
	StateOpen     CircuitBreakerState = "open"      // failing, reject requests
	StateHalfOpen CircuitBreakerState = "half-open" // testing recovery
)

type CircuitBreaker struct {
	state              CircuitBreakerState
	failureCount       int
	successCount       int
	lastFailureTime    time.Time
	threshold          int     // failures before opening
	halfOpenMax        int     // successes before closing
	resetTimeout       time.Duration
	mu                 sync.RWMutex
}

func NewCircuitBreaker(threshold int, resetTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		state:        StateClosed,
		threshold:    threshold,
		halfOpenMax:  3,
		resetTimeout: resetTimeout,
	}
}

func (cb *CircuitBreaker) Call(fn func() error) error {
	cb.mu.Lock()

	// Check if should transition from open to half-open
	if cb.state == StateOpen {
		if time.Since(cb.lastFailureTime) > cb.resetTimeout {
			cb.state = StateHalfOpen
			cb.successCount = 0
		} else {
			cb.mu.Unlock()
			return fmt.Errorf("circuit breaker open")
		}
	}

	cb.mu.Unlock()

	// Execute function
	err := fn()

	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err != nil {
		cb.failureCount++
		cb.lastFailureTime = time.Now()

		if cb.failureCount >= cb.threshold {
			cb.state = StateOpen
			return fmt.Errorf("circuit breaker opened after %d failures: %w", cb.failureCount, err)
		}
		return err
	}

	// Success
	if cb.state == StateHalfOpen {
		cb.successCount++
		if cb.successCount >= cb.halfOpenMax {
			cb.state = StateClosed
			cb.failureCount = 0
		}
	} else {
		cb.failureCount = 0
	}

	return nil
}

// State returns the current state under a read lock, so callers
// outside the package can branch on it safely.
func (cb *CircuitBreaker) State() CircuitBreakerState {
	cb.mu.RLock()
	defer cb.mu.RUnlock()
	return cb.state
}

// Usage
func (s *Service) CallDownstream(ctx context.Context) (*Response, error) {
	var result *Response

	err := s.cb.Call(func() error {
		resp, err := s.downstream.Get(ctx)
		result = resp
		return err
	})

	if err != nil {
		// Fallback: serve cached data while the circuit is open
		if s.cb.State() == StateOpen {
			return s.cache.GetLatest(), nil
		}
		return nil, err
	}

	return result, nil
}

Pattern 2: Bulkhead (Resource Isolation)

Limits how many resources a single operation can consume, so it cannot starve the other operations.

// infrastructure/resilience/bulkhead.go
package resilience

import (
	"context"
	"fmt"
)

type Bulkhead struct {
	semaphore chan struct{}
	name      string
}

func NewBulkhead(name string, maxConcurrent int) *Bulkhead {
	return &Bulkhead{
		semaphore: make(chan struct{}, maxConcurrent),
		name:      name,
	}
}

func (b *Bulkhead) Execute(ctx context.Context, fn func() error) error {
	select {
	case b.semaphore <- struct{}{}:
		defer func() { <-b.semaphore }()
		return fn()
	case <-ctx.Done():
		return fmt.Errorf("context cancelled while waiting for bulkhead %s", b.name)
	default:
		return fmt.Errorf("bulkhead %s exhausted", b.name)
	}
}

// Usage: limit concurrent SFTP downloads
downloadBulkhead := NewBulkhead("sftp-download", 25)

err := downloadBulkhead.Execute(ctx, func() error {
	return downloadFile(ctx)
})

Pattern 3: Retry with Exponential Backoff

// infrastructure/resilience/retry.go
package resilience

import (
	"context"
	"fmt"
	"math"
	"math/rand"
	"time"
)

type RetryPolicy struct {
	maxAttempts    int
	initialBackoff time.Duration
	maxBackoff     time.Duration
	jitter         bool
}

func NewRetryPolicy(maxAttempts int) *RetryPolicy {
	return &RetryPolicy{
		maxAttempts:    maxAttempts,
		initialBackoff: 100 * time.Millisecond,
		maxBackoff:     30 * time.Second,
		jitter:         true,
	}
}

func (rp *RetryPolicy) Execute(ctx context.Context, fn func() error) error {
	var lastErr error

	for attempt := 0; attempt < rp.maxAttempts; attempt++ {
		if err := fn(); err == nil {
			return nil
		} else {
			lastErr = err
		}

		if attempt < rp.maxAttempts-1 {
			backoff := rp.calculateBackoff(attempt)
			select {
			case <-time.After(backoff):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}

	return fmt.Errorf("failed after %d attempts: %w", rp.maxAttempts, lastErr)
}

func (rp *RetryPolicy) calculateBackoff(attempt int) time.Duration {
	// Exponential: 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s...
	backoff := rp.initialBackoff * time.Duration(math.Pow(2, float64(attempt)))

	if backoff > rp.maxBackoff {
		backoff = rp.maxBackoff
	}

	if rp.jitter {
		// Add random jitter: backoff ± 10%
		jitter := time.Duration(rand.Int63n(int64(backoff / 5)))
		backoff = backoff - backoff/10 + jitter
	}

	return backoff
}
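
These patterns compose. A short sketch, reusing the types defined above, that wraps a flaky downstream call in both a breaker and a retry policy:

// The breaker sits inside the retry loop: while the circuit is open,
// each attempt fails immediately, so retries stay cheap instead of
// hammering a struggling dependency.
policy := NewRetryPolicy(5)
breaker := NewCircuitBreaker(10, 30*time.Second)

err := policy.Execute(ctx, func() error {
	return breaker.Call(func() error {
		return downloadFile(ctx)
	})
})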

Part 6: Distributed Tracing — Observability Across Services

At scale, a single request spans multiple services. You need to trace it.

// infrastructure/tracing/tracer.go
package tracing

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

func InitTracer(serviceName string) (*sdktrace.TracerProvider, error) {
	// Agent host/port options must be wrapped in WithAgentEndpoint,
	// and the port is a string in this API.
	exporter, err := jaeger.New(jaeger.WithAgentEndpoint(
		jaeger.WithAgentHost("localhost"),
		jaeger.WithAgentPort("6831"),
	))
	if err != nil {
		return nil, err
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String(serviceName),
		)),
	)

	otel.SetTracerProvider(tp)
	return tp, nil
}

// Usage in service
type InvoiceService struct {
	tracer trace.Tracer
	repo   InvoiceRepository
}

func (s *InvoiceService) ProcessInvoice(ctx context.Context, invoiceID string) error {
	ctx, span := s.tracer.Start(ctx, "ProcessInvoice")
	defer span.End()

	// Add attributes
	span.SetAttributes(
		attribute.String("invoice.id", invoiceID),
	)

	// Call to downstream service (context propagated)
	if err := s.validateInvoice(ctx, invoiceID); err != nil {
		span.RecordError(err)
		return err
	}

	if err := s.archiveInvoice(ctx, invoiceID); err != nil {
		span.RecordError(err)
		return err
	}

	return nil
}

func (s *InvoiceService) validateInvoice(ctx context.Context, invoiceID string) error {
	ctx, span := s.tracer.Start(ctx, "ValidateInvoice")
	defer span.End()

	span.SetAttributes(
		attribute.String("invoice.id", invoiceID),
		attribute.String("validation.type", "schema"),
	)

	// Validation logic...
	return nil
}

Trace visualization (what you see in Jaeger):

graph LR
    A["API Request<br/>POST /invoice/process"]
    B["Auth Service<br/>Verify Token"]
    C["Invoice Service<br/>ProcessInvoice"]
    D["Validate XML<br/>50ms"]
    E["Compress<br/>25ms"]
    F["Upload S3<br/>100ms"]
    G["Publish Event<br/>5ms"]

    A -->|10ms| B
    B -->|5ms propagate| C
    C -->|50ms| D
    D -->|25ms| E
    E -->|100ms| F
    F -->|5ms| G

    style A fill:#2196f3
    style C fill:#ff9800
    style F fill:#4caf50

Parte 7: CQRS (Command Query Responsibility Segregation)

At scale, reads and writes have different characteristics. Separate them.

Write Side (Command)

// application/commands/create_invoice.go
package commands

type CreateInvoiceCommand struct {
	UUID        string
	Issuer      string
	Receiver    string
	Amount      decimal.Decimal
	Currency    string
	IssuanceDate time.Time
	FileURL     string
	Checksum    string
}

type CreateInvoiceHandler struct {
	invoiceRepo InvoiceRepository
	eventBus    EventBus
}

func (h *CreateInvoiceHandler) Handle(ctx context.Context, cmd *CreateInvoiceCommand) (*InvoiceID, error) {
	invoice := &Invoice{
		ID:           uuid.New().String(),
		UUID:         cmd.UUID,
		Issuer:       cmd.Issuer,
		Receiver:     cmd.Receiver,
		Amount:       cmd.Amount,
		Currency:     cmd.Currency,
		IssuanceDate: cmd.IssuanceDate,
		FileURL:      cmd.FileURL,
		FileChecksum: cmd.Checksum,
		Status:       StatusPending,
		CreatedAt:    time.Now(),
	}

	// Write to primary database
	if err := h.invoiceRepo.Save(ctx, invoice); err != nil {
		return nil, err
	}

	// Publish event (will eventually update read model)
	h.eventBus.Publish(&InvoiceCreatedEvent{
		InvoiceID: invoice.ID,
		UUID:      invoice.UUID,
	})

	return &InvoiceID{ID: invoice.ID}, nil
}

Read Side (Query)

// application/queries/invoice_summary.go
package queries

// Read model (optimized for queries)
type InvoiceSummary struct {
	ID              string
	UUID            string
	Issuer          string
	Amount          decimal.Decimal
	Status          string
	CreatedAt       time.Time
	ProcessingTime  int // milliseconds
	CompressedSize  int64
}

type GetInvoiceSummaryQuery struct {
	InvoiceID string
}

// Read repository (different from write repository)
type InvoiceSummaryRepository interface {
	FindByID(ctx context.Context, id string) (*InvoiceSummary, error)
	FindRecent(ctx context.Context, limit int) ([]*InvoiceSummary, error)
	FindByStatus(ctx context.Context, status string) ([]*InvoiceSummary, error)
}

type GetInvoiceSummaryHandler struct {
	summaryRepo InvoiceSummaryRepository
	cache       *Cache // O(1) hits
}

func (h *GetInvoiceSummaryHandler) Handle(ctx context.Context, query *GetInvoiceSummaryQuery) (*InvoiceSummary, error) {
	// Try cache first
	if summary, ok := h.cache.Get(query.InvoiceID); ok {
		return summary, nil
	}

	// Query read model
	summary, err := h.summaryRepo.FindByID(ctx, query.InvoiceID)
	if err != nil {
		return nil, err
	}

	h.cache.Set(query.InvoiceID, summary)
	return summary, nil
}
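
What keeps the read model current is an event-driven projection. A minimal sketch, assuming an Upsert method on a hypothetical write-side repository for the summary table:

// application/projections/invoice_summary.go
package projections

import "context"

type InvoiceSummaryProjection struct {
	summaryRepo SummaryWriter // hypothetical: Upsert(ctx, *InvoiceSummary) error
}

// OnInvoiceArchived denormalizes the event into the read model. The lag
// between event publish and this update is the system's staleness window.
func (p *InvoiceSummaryProjection) OnInvoiceArchived(ctx context.Context, e *InvoiceArchivedEvent) error {
	return p.summaryRepo.Upsert(ctx, &InvoiceSummary{
		ID:             e.InvoiceID,
		UUID:           e.UUID,
		Amount:         e.Amount,
		Status:         "archived",
		CompressedSize: e.CompressedSize,
		CreatedAt:      e.ArchivedAt,
	})
}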

CQRS topology:

graph TB
    subgraph "Write Model"
        CMD["Commands<br/>Create<br/>Update<br/>Delete"]
        PRIMARY["Primary Database<br/>PostgreSQL"]
    end

    subgraph "Event Stream"
        EVT["Events<br/>invoice.created<br/>invoice.archived<br/>invoice.validated"]
    end

    subgraph "Read Models"
        SUMMARY["Summary DB<br/>Denormalized<br/>Optimized for reads"]
        SEARCH["Search Index<br/>Elasticsearch<br/>Full-text search"]
        CACHE["Cache<br/>Redis<br/>Hot data"]
    end

    subgraph "Query Side"
        QRY["Queries<br/>GetInvoice<br/>ListByStatus<br/>SearchByIssuer"]
    end

    CMD -->|Write| PRIMARY
    PRIMARY -->|Publish| EVT
    EVT -->|Async Update| SUMMARY
    EVT -->|Async Update| SEARCH
    EVT -->|Async Update| CACHE
    QRY -->|Read| SUMMARY
    QRY -->|Read| SEARCH
    QRY -->|Read| CACHE

    style PRIMARY fill:#ff9800,stroke:#333,stroke-width:2px
    style EVT fill:#2196f3,stroke:#333,stroke-width:2px
    style SUMMARY fill:#4caf50,stroke:#333,stroke-width:2px

Part 8: Database Sharding for Massive Scale

When you hold 100 TB of data, it cannot live in a single database.

// infrastructure/sharding/shard_key.go
package sharding

import (
	"context"
	"database/sql"
	"fmt"
	"hash/fnv"
	"sync"
)

type ShardKey struct {
	// Primary key: Issuer RFC
	Issuer string

	// Secondary key: Month (for time-series retention)
	Month string // "2026-03"
}

func (sk *ShardKey) Hash() uint32 {
	h := fnv.New32a()
	h.Write([]byte(sk.Issuer))
	return h.Sum32()
}

// Determine which shard to use
func (sk *ShardKey) ShardNumber(totalShards int) int {
	return int(sk.Hash()) % totalShards
}

// Shard mapping
type ShardMapping struct {
	shards map[int]*ShardConnection
	mu     sync.RWMutex
}

func NewShardMapping(totalShards int) *ShardMapping {
	sm := &ShardMapping{
		shards: make(map[int]*ShardConnection),
	}

	// Initialize shard connections; fail fast at startup on a bad DSN
	for i := 0; i < totalShards; i++ {
		dsn := fmt.Sprintf("postgres://user:pass@shard-%d.db.example.com/fiscal", i)
		db, err := sql.Open("postgres", dsn)
		if err != nil {
			panic(fmt.Sprintf("shard %d: %v", i, err))
		}
		sm.shards[i] = &ShardConnection{db: db, shardID: i}
	}

	return sm
}

func (sm *ShardMapping) GetShard(key *ShardKey) *ShardConnection {
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	shardNum := key.ShardNumber(len(sm.shards))
	return sm.shards[shardNum]
}

// Usage
type InvoiceRepository struct {
	sharding *ShardMapping
}

func (r *InvoiceRepository) SaveInvoice(ctx context.Context, invoice *Invoice) error {
	key := &ShardKey{
		Issuer: invoice.Issuer,
		Month:  invoice.IssuanceDate.Format("2006-01"),
	}

	shard := r.sharding.GetShard(key)

	// Execute on the correct shard; ExecContext returns (sql.Result, error),
	// so the result is discarded and only the error is propagated.
	_, err := shard.db.ExecContext(ctx, `
		INSERT INTO invoices (id, uuid, issuer, receiver, amount, created_at)
		VALUES ($1, $2, $3, $4, $5, $6)
	`, invoice.ID, invoice.UUID, invoice.Issuer, invoice.Receiver, invoice.Amount, invoice.CreatedAt)
	return err
}
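
Queries that lack the shard key must fan out to every shard and merge the partial results. A hedged sketch using golang.org/x/sync/errgroup (the column list is abbreviated); avoid this on hot paths by always carrying the shard key:

// FindByStatusAllShards runs the same query on every shard in parallel
// and merges the partial results.
func (r *InvoiceRepository) FindByStatusAllShards(ctx context.Context, status string) ([]*Invoice, error) {
	g, ctx := errgroup.WithContext(ctx)
	results := make([][]*Invoice, len(r.sharding.shards))

	for i, shard := range r.sharding.shards {
		i, shard := i, shard // capture loop variables
		g.Go(func() error {
			rows, err := shard.db.QueryContext(ctx,
				`SELECT id, uuid, issuer FROM invoices WHERE status = $1`, status)
			if err != nil {
				return err
			}
			defer rows.Close()
			for rows.Next() {
				var inv Invoice
				if err := rows.Scan(&inv.ID, &inv.UUID, &inv.Issuer); err != nil {
					return err
				}
				results[i] = append(results[i], &inv)
			}
			return rows.Err()
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	var merged []*Invoice
	for _, part := range results {
		merged = append(merged, part...)
	}
	return merged, nil
}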

Sharding topology for 100 TB of data:

graph TB
    subgraph "Shard Layer"
        S1["Shard 0<br/>Issuer: A-E<br/>10 TB"]
        S2["Shard 1<br/>Issuer: F-J<br/>10 TB"]
        S3["Shard 2<br/>Issuer: K-O<br/>10 TB"]
        S4["Shard 3<br/>Issuer: P-Z<br/>10 TB"]
    end

    subgraph "Shard Mapping (Router)"
        ROUTE["ShardMapping<br/>Hash(issuer) % 4"]
    end

    subgraph "Query"
        READ["Query issuer ABC"]
    end

    READ -->|Hash ABC<br/>= 0| ROUTE
    ROUTE -->|Route to Shard 0| S1
    ROUTE -->|Can route to S2| S2
    ROUTE -->|Can route to S3| S3
    ROUTE -->|Can route to S4| S4

    style ROUTE fill:#2196f3,stroke:#333,stroke-width:2px
    style S1 fill:#4caf50,stroke:#333,stroke-width:2px

Part 9: API Gateway — The Front Door

All external requests flow through the gateway.

// api/gateway/gateway.go
package gateway

type APIGateway struct {
	router        chi.Router
	authService   *grpc.ClientConn
	invoiceConn   *grpc.ClientConn
	rateLimiter   *RateLimiter
	tracer        trace.Tracer
	metricsClient MetricsClient
}

func NewAPIGateway() *APIGateway {
	// Plaintext gRPC inside the cluster; TLS terminates at the edge.
	// Fail fast at startup if a backend cannot be dialed.
	authConn, err := grpc.Dial("auth-service:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	// invoice-service address assumed, mirroring the auth service
	invoiceConn, err := grpc.Dial("invoice-service:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}

	return &APIGateway{
		router:      chi.NewRouter(),
		authService: authConn,
		invoiceConn: invoiceConn,
		rateLimiter: NewRateLimiter(),
	}
}

func (gw *APIGateway) Start() {
	gw.router.Use(gw.loggingMiddleware)
	gw.router.Use(gw.tracingMiddleware)
	gw.router.Use(gw.authMiddleware)
	gw.router.Use(gw.rateLimitMiddleware)

	// Route to services
	gw.router.Post("/api/v1/invoices", gw.createInvoice)
	gw.router.Get("/api/v1/invoices/{id}", gw.getInvoice)
	gw.router.Get("/api/v1/reports/daily", gw.getDailyReport)

	log.Fatal(http.ListenAndServe(":8080", gw.router))
}

func (gw *APIGateway) authMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		token := r.Header.Get("Authorization")
		if token == "" {
			http.Error(w, "missing token", http.StatusUnauthorized)
			return
		}

		// Validate token via Auth Service (cached)
		user, err := gw.validateToken(r.Context(), token)
		if err != nil {
			http.Error(w, "invalid token", http.StatusUnauthorized)
			return
		}

		// Add user to context
		ctx := context.WithValue(r.Context(), "user", user)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

func (gw *APIGateway) rateLimitMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		user := r.Context().Value("user").(*User)

		// Check rate limit per user (tier-based)
		if !gw.rateLimiter.Allow(user.ID, user.Tier) {
			w.Header().Set("X-RateLimit-Retry-After", "60")
			http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
			return
		}

		next.ServeHTTP(w, r)
	})
}

func (gw *APIGateway) createInvoice(w http.ResponseWriter, r *http.Request) {
	var req CreateInvoiceRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid request body", http.StatusBadRequest)
		return
	}

	// Call Invoice Service via gRPC
	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	invoiceService := pb.NewInvoiceServiceClient(gw.invoiceConn)
	resp, err := invoiceService.CreateInvoice(ctx, &pb.CreateInvoiceRequest{
		UUID:      req.UUID,
		Issuer:    req.Issuer,
		Amount:    req.Amount,
		FileURL:   req.FileURL,
		Checksum:  req.Checksum,
	})

	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(resp)
}
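
The validateToken helper used by the middleware is not shown above. One hedged sketch: cache the Auth Service's verdict for a short TTL so the hot path rarely leaves the process (the tokenCache field, the pb client, and its field names are all hypothetical):

// validateToken consults a short-lived local cache before calling the
// Auth Service, trading a few seconds of revocation delay for far fewer
// RPCs on every request.
func (gw *APIGateway) validateToken(ctx context.Context, token string) (*User, error) {
	if user, ok := gw.tokenCache.Get(token); ok { // hypothetical TTL cache
		return user, nil
	}

	client := pb.NewAuthServiceClient(gw.authService) // hypothetical generated client
	resp, err := client.ValidateToken(ctx, &pb.ValidateTokenRequest{Token: token})
	if err != nil {
		return nil, err
	}

	user := &User{ID: resp.UserId, Tier: resp.Tier}
	gw.tokenCache.SetWithTTL(token, user, 30*time.Second)
	return user, nil
}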

Part 10: Deployment Architecture

How to run all of this at scale:

graph TB
    subgraph "Client"
        CLIENT["Users<br/>APIs"]
    end

    subgraph "Edge"
        CDN["CDN<br/>Cloudflare"]
    end

    subgraph "Load Balancer"
        LB["AWS ALB<br/>Multi-AZ"]
    end

    subgraph "Kubernetes Cluster"
        subgraph "API Gateway Pods"
            GW1["Pod 1"]
            GW2["Pod 2"]
            GW3["Pod 3"]
        end

        subgraph "Invoice Service Pods"
            INV1["Pod 1"]
            INV2["Pod 2"]
            INV3["Pod 3"]
        end

        subgraph "Analytics Service Pods"
            ANA1["Pod 1"]
            ANA2["Pod 2"]
        end
    end

    subgraph "Data Layer"
        subgraph "Sharded PostgreSQL"
            PG1["Shard 0<br/>Primary + Replica"]
            PG2["Shard 1<br/>Primary + Replica"]
        end

        REDIS["Redis Cluster<br/>3 nodes"]
    end

    subgraph "Message Broker"
        KAFKA["Kafka<br/>3 brokers<br/>Replication Factor 3"]
    end

    subgraph "Infrastructure"
        JAEGER["Jaeger<br/>Tracing"]
        PROM["Prometheus<br/>Metrics"]
        GRAFANA["Grafana<br/>Dashboards"]
    end

    CLIENT -->|CDN| CDN
    CDN -->|HTTPS| LB
    LB -->|Route| GW1
    LB -->|Route| GW2
    LB -->|Route| GW3

    GW1 -->|gRPC| INV1
    GW2 -->|gRPC| INV2
    GW3 -->|gRPC| INV3

    INV1 -->|Query| PG1
    INV2 -->|Query| PG2
    INV3 -->|Query| PG1

    INV1 -->|Cache| REDIS
    ANA1 -->|Cache| REDIS

    INV1 -->|Publish| KAFKA
    ANA1 -->|Subscribe| KAFKA
    ANA2 -->|Subscribe| KAFKA

    GW1 -.->|Trace| JAEGER
    INV1 -.->|Metrics| PROM
    PROM -->|Display| GRAFANA

    style LB fill:#2196f3,stroke:#333,stroke-width:2px
    style KAFKA fill:#ff9800,stroke:#333,stroke-width:2px
    style GRAFANA fill:#4caf50,stroke:#333,stroke-width:2px

Part 11: Operational Excellence — Running at Scale

Health Checks and Readiness

// api/health/handler.go
package health

type HealthHandler struct {
	db      *sql.DB
	kafka   EventBroker
	redis   *redis.Client
}

// Liveness: is the pod still running?
func (h *HealthHandler) Liveness(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{
		"status": "alive",
	})
}

// Readiness: can the pod handle traffic?
func (h *HealthHandler) Readiness(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
	defer cancel()

	// Check database
	if err := h.db.PingContext(ctx); err != nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		json.NewEncoder(w).Encode(map[string]string{
			"status": "not ready: db unhealthy",
		})
		return
	}

	// Check Kafka
	if err := h.kafka.Health(ctx); err != nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		json.NewEncoder(w).Encode(map[string]string{
			"status": "not ready: kafka unhealthy",
		})
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{
		"status": "ready",
	})
}
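
Wiring both probes, in the style of the usage snippets above; the paths and the dedicated port are conventions (the ones Kubernetes probes typically point at), not requirements:

// Register the probes on their own port so health traffic never
// competes with API traffic.
mux := http.NewServeMux()
mux.HandleFunc("/healthz", h.Liveness)  // livenessProbe target
mux.HandleFunc("/readyz", h.Readiness)  // readinessProbe target
log.Fatal(http.ListenAndServe(":8081", mux))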

Graceful Shutdown

// main.go
func main() {
	app := setupApplication()

	server := &http.Server{
		Addr:         ":8080",
		Handler:      app.router,
		ReadTimeout:  15 * time.Second,
		WriteTimeout: 15 * time.Second,
	}

	// Graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		sig := <-sigChan
		log.Printf("Received signal: %v", sig)

		// Stop accepting new requests
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		// Wait for in-flight requests to complete
		if err := server.Shutdown(ctx); err != nil {
			log.Printf("shutdown error: %v", err)
		}

		// Flush event buffer
		app.eventBus.Close()

		// Close connections
		app.db.Close()
		app.redis.Close()

		os.Exit(0)
	}()

	// Shutdown makes ListenAndServe return ErrServerClosed; that is not fatal
	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Fatal(err)
	}
}

Part 12: The Architecture Checklist for Scale

Before you deploy your system:

Microservices
├─ [ ] Each service has a single responsibility
├─ [ ] Each service has an independent database
├─ [ ] Services communicate via async events
└─ [ ] No service-to-service HTTP calls on the hot path

Resilience
├─ [ ] Circuit breakers on every external call
├─ [ ] Bulkheads isolate resource pools
├─ [ ] Retries with exponential backoff
└─ [ ] Fallback strategies for graceful degradation

Observability
├─ [ ] Distributed tracing across all services
├─ [ ] Prometheus metrics exported
├─ [ ] Structured logging with correlation IDs
└─ [ ] Alerts on error rate > 1%

Data
├─ [ ] Database sharded by a natural key
├─ [ ] Read replicas for analytics queries
├─ [ ] Caching strategy documented
└─ [ ] Data retention policy defined

Deployment
├─ [ ] Health checks (liveness + readiness)
├─ [ ] Graceful shutdown (30s timeout)
├─ [ ] Rolling deployments configured
├─ [ ] Canary deployments for critical changes
└─ [ ] Non-blocking database migrations

Operations
├─ [ ] PagerDuty alerts configured
├─ [ ] Runbooks for common failures
├─ [ ] On-call rotation established
└─ [ ] Mandatory incident postmortems

Conclusion: Architecture as a Competitive Advantage

Companies that process millions of events per day do not succeed through clever code. They succeed through disciplined architecture.

Microservices that decompose the problem correctly. Event-driven patterns that decouple services. Resilience patterns that prevent cascading failures. Distributed tracing that reveals the truth.

This is how Uber processes billions of trip requests, how Stripe moves hundreds of billions of dollars, and how Twitter serves hundreds of millions of tweets per day.

Go is not the only language that can do this. But Go's concurrency model, its interfaces, and its overall simplicity make these patterns straightforward to implement.

The patterns are what matter. The language is secondary.

Scale is not a destination. It is a continuous process of removing bottlenecks, distributing load, and building systems that degrade gracefully under pressure. The best architecture is the one that anticipates failure and designs around it.

Tags

#go #golang #architecture #microservices #event-driven #distributed-systems #scalability #system-design #ddd #cqrs #messaging #resilience #backend