Data Extraction, SQL & Automation in Go: Building Scalable Data Systems
Master data engineering in pure Go. Learn SQL querying, data extraction from multiple sources, transformation pipelines, automation workflows, and scientific data processing—all without external dependencies.
Introduction: Data Engineering in Go
Most developers think of data engineering as Python’s domain.
Data scientists use Python. Data engineers use Python. Everyone uses Python for data work.
Wrong.
Go is actually better for production data systems. Here’s why:
- Single binary - Deploy anywhere with zero dependencies
- Concurrency - Process millions of rows in parallel
- Performance - 10-100x faster than Python for CPU-intensive work
- Type safety - Catch errors at compile time, not production
- Memory efficiency - Handle massive datasets without bloat
- SQL first - Go’s database/sql is cleaner than ORMs
This guide teaches you to build production-grade data systems in Go. Not theoretical. Real systems that extract data, transform it, automate workflows, and support scientific analysis.
Chapter 1: Why Go for Data Work
The Python Misconception
Python is great for exploration. Jupyter notebooks are amazing. Pandas is intuitive.
But production Python data systems are:
- Slow (interpreted language)
- Fragile (dependency hell)
- Memory-heavy (garbage collection pauses)
- Hard to deploy (runtime version conflicts)
- Difficult to scale (GIL limits parallelism)
Example: a Python data pipeline that processes 1 million records takes 10 minutes.
The same pipeline rewritten in Go takes about 1 minute. Same logic, different language.
Go’s Advantages for Data Systems
Advantage 1: Concurrency Without Complexity
Process data in parallel:
// Process 1 million records, 10 at a time
sem := make(chan struct{}, 10)
var wg sync.WaitGroup
for _, record := range millionRecords {
wg.Add(1)
sem <- struct{}{} // Acquire semaphore
go func(r Record) {
defer wg.Done()
defer func() { <-sem }() // Release semaphore
processRecord(r)
}(record)
}
wg.Wait()
In Python: Complex with threading/multiprocessing. In Go: Simple goroutines.
Advantage 2: Single Binary Deployment
# Go
go build -o pipeline
./pipeline # Works everywhere
# Python
python pipeline.py # Needs Python 3.9+, pip packages, virtual env...
Advantage 3: Type Safety
// Go
type Record struct {
ID int
Name string
Value float64
}
// Compile error if you access .Namex (typo)
name := record.Namex // ✗ Compile error
// Python
record = {"id": 1, "name": "John", "value": 100}
name = record["nmae"] # ✓ Nothing complains until this line runs, then KeyError in production.
Advantage 4: Performance
Benchmark: Process 10 million CSV rows, extract specific columns, aggregate.
- Python: 45 seconds
- Go: 3 seconds
15x faster. Same algorithm.
Chapter 2: SQL Fundamentals for Data Engineers
You need SQL. Go’s database/sql is minimal but powerful.
Setting Up Database Connection
import "database/sql"
import _ "github.com/lib/pq" // PostgreSQL driver
func main() {
db, err := sql.Open("postgres",
"user=myuser password=mypass dbname=mydb sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// Test connection
if err := db.Ping(); err != nil {
log.Fatal(err)
}
fmt.Println("Connected to database")
}
Simple Query: Select
type User struct {
ID int
Name string
Email string
Age int
}
func getUsers(db *sql.DB) ([]User, error) {
rows, err := db.Query("SELECT id, name, email, age FROM users")
if err != nil {
return nil, err
}
defer rows.Close()
var users []User
for rows.Next() {
var u User
err := rows.Scan(&u.ID, &u.Name, &u.Email, &u.Age)
if err != nil {
return nil, err
}
users = append(users, u)
}
if err := rows.Err(); err != nil {
return nil, err
}
return users, nil
}
Single Row Query
func getUserByID(db *sql.DB, id int) (*User, error) {
var u User
err := db.QueryRow(
"SELECT id, name, email, age FROM users WHERE id = $1",
id,
).Scan(&u.ID, &u.Name, &u.Email, &u.Age)
if err == sql.ErrNoRows {
return nil, fmt.Errorf("user %d not found", id)
}
if err != nil {
return nil, err
}
return &u, nil
}
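Real tables have NULLs. A minimal sketch of scanning a nullable column with sql.NullString, assuming the email column may be NULL (the getUserEmail helper name is illustrative):
func getUserEmail(db *sql.DB, id int) (string, error) {
    var email sql.NullString
    err := db.QueryRow("SELECT email FROM users WHERE id = $1", id).Scan(&email)
    if err != nil {
        return "", err
    }
    if !email.Valid {
        return "", nil // column was NULL in the database
    }
    return email.String, nil
}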
Insert
func insertUser(db *sql.DB, name, email string, age int) (int, error) {
var id int
err := db.QueryRow(
"INSERT INTO users (name, email, age) VALUES ($1, $2, $3) RETURNING id",
name, email, age,
).Scan(&id)
if err != nil {
return 0, err
}
return id, nil
}
Update
func updateUser(db *sql.DB, id int, name, email string) error {
result, err := db.Exec(
"UPDATE users SET name = $1, email = $2 WHERE id = $3",
name, email, id,
)
if err != nil {
return err
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return fmt.Errorf("user %d not found", id)
}
return nil
}
Batch Operations
func batchInsertUsers(db *sql.DB, users []User) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.Prepare(
"INSERT INTO users (name, email, age) VALUES ($1, $2, $3)",
)
if err != nil {
return err
}
defer stmt.Close()
for _, u := range users {
_, err := stmt.Exec(u.Name, u.Email, u.Age)
if err != nil {
return err
}
}
return tx.Commit()
}
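For very large batches, lib/pq also exposes PostgreSQL's COPY protocol via pq.CopyIn. A sketch against the same users table; note it needs a regular (non-blank) import of github.com/lib/pq:
func bulkCopyUsers(db *sql.DB, users []User) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    // pq.CopyIn builds a COPY users (name, email, age) FROM STDIN statement
    stmt, err := tx.Prepare(pq.CopyIn("users", "name", "email", "age"))
    if err != nil {
        return err
    }
    for _, u := range users {
        if _, err := stmt.Exec(u.Name, u.Email, u.Age); err != nil {
            return err
        }
    }
    // An Exec with no arguments flushes the buffered rows
    if _, err := stmt.Exec(); err != nil {
        return err
    }
    if err := stmt.Close(); err != nil {
        return err
    }
    return tx.Commit()
}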
Complex Query: Join & Aggregate
type OrderSummary struct {
UserID int
UserName string
OrderCount int
TotalSpent float64
}
func getOrderSummary(db *sql.DB) ([]OrderSummary, error) {
rows, err := db.Query(`
SELECT
u.id, u.name,
COUNT(o.id) as order_count,
COALESCE(SUM(o.amount), 0) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.name
ORDER BY total_spent DESC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var summaries []OrderSummary
for rows.Next() {
var s OrderSummary
err := rows.Scan(&s.UserID, &s.UserName, &s.OrderCount, &s.TotalSpent)
if err != nil {
return nil, err
}
summaries = append(summaries, s)
}
return summaries, rows.Err()
}
Chapter 3: Data Extraction Patterns
Extract data from different sources.
Pattern 1: Database Extraction
type DataExtractor interface {
Extract(ctx context.Context) ([]Record, error)
}
type DatabaseExtractor struct {
db *sql.DB
query string
}
func (de *DatabaseExtractor) Extract(ctx context.Context) ([]Record, error) {
rows, err := de.db.QueryContext(ctx, de.query)
if err != nil {
return nil, err
}
defer rows.Close()
var records []Record
for rows.Next() {
var r Record
if err := rows.Scan(&r.ID, &r.Value, &r.Timestamp); err != nil {
return nil, err
}
records = append(records, r)
}
return records, rows.Err()
}
Pattern 2: API Extraction
type APIExtractor struct {
url string
client *http.Client
}
func (ae *APIExtractor) Extract(ctx context.Context) ([]Record, error) {
req, err := http.NewRequestWithContext(ctx, "GET", ae.url, nil)
if err != nil {
return nil, err
}
resp, err := ae.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("API returned %d", resp.StatusCode)
}
var result struct {
Data []Record `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return result.Data, nil
}
Pattern 3: File Extraction (CSV)
type CSVExtractor struct {
filepath string
}
func (ce *CSVExtractor) Extract(ctx context.Context) ([]Record, error) {
file, err := os.Open(ce.filepath)
if err != nil {
return nil, err
}
defer file.Close()
reader := csv.NewReader(file)
// Skip header
_, err = reader.Read()
if err != nil {
return nil, err
}
var records []Record
for {
row, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
id, err := strconv.Atoi(row[0])
if err != nil {
return nil, fmt.Errorf("invalid id %q: %w", row[0], err)
}
value, err := strconv.ParseFloat(row[1], 64)
if err != nil {
return nil, fmt.Errorf("invalid value %q: %w", row[1], err)
}
timestamp := row[2]
records = append(records, Record{
ID: id,
Value: value,
Timestamp: timestamp,
})
}
return records, nil
}
Pattern 4: JSON File Extraction
type JSONExtractor struct {
filepath string
}
func (je *JSONExtractor) Extract(ctx context.Context) ([]Record, error) {
data, err := os.ReadFile(je.filepath)
if err != nil {
return nil, err
}
var records []Record
if err := json.Unmarshal(data, &records); err != nil {
return nil, err
}
return records, nil
}
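json.Unmarshal reads the whole file into memory first. For large files, json.Decoder can stream one record at a time. A sketch of a hypothetical ExtractStream variant, assuming the file contains a top-level JSON array:
func (je *JSONExtractor) ExtractStream(ctx context.Context) ([]Record, error) {
    file, err := os.Open(je.filepath)
    if err != nil {
        return nil, err
    }
    defer file.Close()
    dec := json.NewDecoder(file)
    // Consume the opening '[' token of the array
    if _, err := dec.Token(); err != nil {
        return nil, err
    }
    var records []Record
    for dec.More() {
        var r Record
        if err := dec.Decode(&r); err != nil {
            return nil, err
        }
        records = append(records, r)
    }
    return records, nil
}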
Pattern 5: Stream Extraction (Real-time)
type StreamExtractor struct {
channel chan Record
}
func (se *StreamExtractor) Extract(ctx context.Context) ([]Record, error) {
var records []Record
timeout := time.After(5 * time.Second)
for {
select {
case record := <-se.channel:
records = append(records, record)
case <-timeout:
return records, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
Chapter 4: Data Transformation & Processing
Transform extracted data.
Pattern 1: Filter
func filterByThreshold(records []Record, threshold float64) []Record {
var filtered []Record
for _, r := range records {
if r.Value > threshold {
filtered = append(filtered, r)
}
}
return filtered
}
// Concurrent version
func filterByThresholdConcurrent(records []Record, threshold float64, workers int) []Record {
sem := make(chan struct{}, workers)
var mu sync.Mutex
var filtered []Record
var wg sync.WaitGroup
for _, record := range records {
wg.Add(1)
sem <- struct{}{}
go func(r Record) {
defer wg.Done()
defer func() { <-sem }()
if r.Value > threshold {
mu.Lock()
filtered = append(filtered, r)
mu.Unlock()
}
}(record)
}
wg.Wait()
return filtered
}
Pattern 2: Map/Transform
type TransformedRecord struct {
ID int
Value float64
ValueNorm float64 // Normalized
Category string
}
func transformRecords(records []Record) []TransformedRecord {
// Calculate min/max for normalization
min, max := findMinMax(records)
transformed := make([]TransformedRecord, len(records))
for i, r := range records {
norm := (r.Value - min) / (max - min)
category := "high"
if norm < 0.5 {
category = "low"
}
transformed[i] = TransformedRecord{
ID: r.ID,
Value: r.Value,
ValueNorm: norm,
Category: category,
}
}
return transformed
}
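The findMinMax helper used above isn't shown; a minimal sketch (it assumes records is non-empty, and the normalization above also assumes max is greater than min):
// findMinMax returns the smallest and largest Value in records.
func findMinMax(records []Record) (float64, float64) {
    minVal, maxVal := records[0].Value, records[0].Value
    for _, r := range records[1:] {
        if r.Value < minVal {
            minVal = r.Value
        }
        if r.Value > maxVal {
            maxVal = r.Value
        }
    }
    return minVal, maxVal
}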
Pattern 3: Aggregate
type AggregateResult struct {
Count int
Sum float64
Average float64
Min float64
Max float64
StdDev float64
}
func aggregateRecords(records []Record) AggregateResult {
if len(records) == 0 {
return AggregateResult{}
}
sum := 0.0
min := records[0].Value
max := records[0].Value
for _, r := range records {
sum += r.Value
if r.Value < min {
min = r.Value
}
if r.Value > max {
max = r.Value
}
}
avg := sum / float64(len(records))
// Standard deviation
variance := 0.0
for _, r := range records {
variance += math.Pow(r.Value-avg, 2)
}
variance /= float64(len(records))
stdDev := math.Sqrt(variance)
return AggregateResult{
Count: len(records),
Sum: sum,
Average: avg,
Min: min,
Max: max,
StdDev: stdDev,
}
}
Pattern 4: Join/Merge
type EnrichedRecord struct {
ID int
Value float64
Category string
Score int
}
func enrichRecords(records []Record, categories map[int]string, scores map[int]int) []EnrichedRecord {
enriched := make([]EnrichedRecord, len(records))
for i, r := range records {
enriched[i] = EnrichedRecord{
ID: r.ID,
Value: r.Value,
Category: categories[r.ID],
Score: scores[r.ID],
}
}
return enriched
}
Pattern 5: Deduplicate
func deduplicateRecords(records []Record) []Record {
seen := make(map[int]bool)
var deduped []Record
for _, r := range records {
if !seen[r.ID] {
seen[r.ID] = true
deduped = append(deduped, r)
}
}
return deduped
}
// By custom field
func deduplicateByField(records []Record, fieldFunc func(Record) string) []Record {
seen := make(map[string]bool)
var deduped []Record
for _, r := range records {
key := fieldFunc(r)
if !seen[key] {
seen[key] = true
deduped = append(deduped, r)
}
}
return deduped
}
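For example, to deduplicate on the timestamp instead of the ID:
unique := deduplicateByField(records, func(r Record) string {
    return r.Timestamp
})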
Chapter 5: Building a Data Pipeline
Combine extraction, transformation, and loading.
type Pipeline struct {
extractors []DataExtractor
transformers []func([]Record) []Record
loaders []DataLoader
errorHandler func(error)
batchSize int
}
type DataLoader interface {
Load(ctx context.Context, records []Record) error
}
func (p *Pipeline) Run(ctx context.Context) error {
// Extract
var allRecords []Record
for _, extractor := range p.extractors {
records, err := extractor.Extract(ctx)
if err != nil {
p.errorHandler(err)
continue
}
allRecords = append(allRecords, records...)
}
// Transform
transformed := allRecords
for _, transformer := range p.transformers {
transformed = transformer(transformed)
}
// Load in batches
for i := 0; i < len(transformed); i += p.batchSize {
end := i + p.batchSize
if end > len(transformed) {
end = len(transformed)
}
batch := transformed[i:end]
for _, loader := range p.loaders {
if err := loader.Load(ctx, batch); err != nil {
p.errorHandler(err)
}
}
}
return nil
}
// Usage
func main() {
pipeline := &Pipeline{
extractors: []DataExtractor{
&CSVExtractor{filepath: "data.csv"},
&DatabaseExtractor{db: db, query: "SELECT * FROM archive"},
},
transformers: []func([]Record) []Record{
func(r []Record) []Record { return filterByThreshold(r, 10.0) },
func(r []Record) []Record { return deduplicateRecords(r) },
},
loaders: []DataLoader{
&DatabaseLoader{db: db, table: "processed_data"},
&CSVLoader{filepath: "output.csv"},
},
batchSize: 1000,
errorHandler: func(err error) {
log.Printf("Pipeline error: %v", err)
},
}
if err := pipeline.Run(context.Background()); err != nil {
log.Fatal(err)
}
}
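The usage above references DatabaseLoader and CSVLoader, which aren't defined in this guide. A minimal DatabaseLoader sketch, assuming the target table has id, value, and timestamp columns:
type DatabaseLoader struct {
    db    *sql.DB
    table string
}

func (dl *DatabaseLoader) Load(ctx context.Context, records []Record) error {
    tx, err := dl.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    // The table name cannot be a bind parameter, so it is interpolated here;
    // keep it out of user input.
    stmt, err := tx.PrepareContext(ctx,
        fmt.Sprintf("INSERT INTO %s (id, value, timestamp) VALUES ($1, $2, $3)", dl.table))
    if err != nil {
        return err
    }
    defer stmt.Close()
    for _, r := range records {
        if _, err := stmt.ExecContext(ctx, r.ID, r.Value, r.Timestamp); err != nil {
            return err
        }
    }
    return tx.Commit()
}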
Chapter 6: Data Automation Workflows
Automate data pipelines on schedule.
type ScheduledPipeline struct {
name string
pipeline *Pipeline
schedule string // Cron expression
}
func (sp *ScheduledPipeline) Start() error {
c := cron.New()
_, err := c.AddFunc(sp.schedule, func() {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
log.Printf("[%s] Starting pipeline", sp.name)
if err := sp.pipeline.Run(ctx); err != nil {
log.Printf("[%s] Pipeline failed: %v", sp.name, err)
return
}
duration := time.Since(start)
log.Printf("[%s] Pipeline completed in %v", sp.name, duration)
})
if err != nil {
return err
}
c.Start()
return nil
}
// Usage
func main() {
sp := &ScheduledPipeline{
name: "daily_data_sync",
pipeline: myPipeline,
schedule: "0 2 * * *", // 2 AM daily
}
if err := sp.Start(); err != nil {
log.Fatal(err)
}
select {} // Keep running
}
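The cron type here isn't from the standard library. The calls used above (cron.New, AddFunc returning an entry ID and an error, five-field schedules) match github.com/robfig/cron/v3, so the import would be:
import "github.com/robfig/cron/v3"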
Chapter 7: Scientific Data Processing
Work with scientific datasets.
Pattern 1: Time Series Analysis
type TimeSeriesData struct {
Timestamp time.Time
Value float64
}
func analyzeTimeSeries(data []TimeSeriesData) map[string]interface{} {
if len(data) == 0 {
return nil
}
// Sort by timestamp
sort.Slice(data, func(i, j int) bool {
return data[i].Timestamp.Before(data[j].Timestamp)
})
// Extract values
values := make([]float64, len(data))
for i, d := range data {
values[i] = d.Value
}
// Calculate statistics
sum := 0.0
minVal, maxVal := values[0], values[0]
for _, v := range values {
sum += v
if v < minVal {
minVal = v
}
if v > maxVal {
maxVal = v
}
}
mean := sum / float64(len(values))
// Trend detection
trend := calculateTrend(data)
// Anomaly detection
anomalies := detectAnomalies(data, mean)
return map[string]interface{}{
"mean": mean,
"std_dev": calculateStdDev(values, mean),
"trend": trend,
"anomalies": anomalies,
"min": minVal,
"max": maxVal,
}
}
func calculateTrend(data []TimeSeriesData) string {
if len(data) < 2 {
return "insufficient_data"
}
// Linear regression
n := float64(len(data))
sumX := n * (n + 1) / 2
sumY := 0.0
sumXY := 0.0
sumX2 := 0.0
for i, d := range data {
x := float64(i + 1)
y := d.Value
sumY += y
sumXY += x * y
sumX2 += x * x
}
slope := (n*sumXY - sumX*sumY) / (n*sumX2 - sumX*sumX)
if slope > 0.1 {
return "increasing"
} else if slope < -0.1 {
return "decreasing"
}
return "stable"
}
func detectAnomalies(data []TimeSeriesData, mean float64) []int {
var anomalies []int
stdDev := 0.0
// Calculate std dev
for _, d := range data {
stdDev += math.Pow(d.Value-mean, 2)
}
stdDev = math.Sqrt(stdDev / float64(len(data)))
// 3-sigma rule: values more than 3 standard deviations from the mean are anomalies
for i, d := range data {
if math.Abs(d.Value-mean) > 3*stdDev {
anomalies = append(anomalies, i)
}
}
return anomalies
}
Pattern 2: Statistical Analysis
func performTTest(group1, group2 []float64) map[string]float64 {
mean1 := calculateMean(group1)
mean2 := calculateMean(group2)
var1 := calculateVariance(group1, mean1)
var2 := calculateVariance(group2, mean2)
n1 := float64(len(group1))
n2 := float64(len(group2))
// Standard error of the difference in means (Welch's unpooled form)
se := math.Sqrt((var1 / n1) + (var2 / n2))
// T-statistic
tStat := (mean1 - mean2) / se
return map[string]float64{
"t_statistic": tStat,
"mean1": mean1,
"mean2": mean2,
"difference": mean1 - mean2,
}
}
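calculateMean, calculateVariance, and calculateStdDev are used in this chapter but not defined; minimal sketches (calculateVariance here is the population variance, matching the std-dev calculation earlier):
func calculateMean(values []float64) float64 {
    sum := 0.0
    for _, v := range values {
        sum += v
    }
    return sum / float64(len(values))
}

func calculateVariance(values []float64, mean float64) float64 {
    variance := 0.0
    for _, v := range values {
        variance += math.Pow(v-mean, 2)
    }
    return variance / float64(len(values))
}

func calculateStdDev(values []float64, mean float64) float64 {
    return math.Sqrt(calculateVariance(values, mean))
}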
Pattern 3: Data Interpolation
func interpolateLinear(x1, y1, x2, y2, x float64) float64 {
// Linear interpolation between two points
return y1 + (y2-y1)*(x-x1)/(x2-x1)
}
func fillMissingValues(data []float64) []float64 {
filled := make([]float64, len(data))
copy(filled, data)
for i := 0; i < len(filled); i++ {
if filled[i] == 0 { // Assuming 0 means missing
// Find previous and next non-zero values
prev := i - 1
for prev >= 0 && filled[prev] == 0 {
prev--
}
next := i + 1
for next < len(filled) && filled[next] == 0 {
next++
}
if prev >= 0 && next < len(filled) {
// Interpolate
filled[i] = interpolateLinear(
float64(prev), filled[prev],
float64(next), filled[next],
float64(i),
)
}
}
}
return filled
}
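Treating 0 as "missing" breaks down when 0 is a legitimate reading. One alternative is to mark gaps with math.NaN() and test with math.IsNaN; a tiny sketch of that convention:
data := []float64{1.2, math.NaN(), 1.6} // NaN marks the missing reading instead of 0
for i, v := range data {
    if math.IsNaN(v) {
        fmt.Printf("index %d is missing\n", i)
    }
}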
Chapter 8: Performance Optimization
Make data processing fast.
Optimization 1: Batch Processing
func processBatch(records []Record, batchSize int) error {
for i := 0; i < len(records); i += batchSize {
end := i + batchSize
if end > len(records) {
end = len(records)
}
batch := records[i:end]
if err := processBatchItems(batch); err != nil {
return err
}
}
return nil
}
Optimization 2: Memory Pooling
type BufferPool struct {
pool *sync.Pool
}
func NewBufferPool() *BufferPool {
return &BufferPool{
pool: &sync.Pool{
New: func() interface{} {
return make([]byte, 4096)
},
},
}
}
func (bp *BufferPool) Get() []byte {
return bp.pool.Get().([]byte)
}
func (bp *BufferPool) Put(b []byte) {
bp.pool.Put(b)
}
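Typical usage: take a buffer, return it when you're done so other goroutines can reuse it:
bp := NewBufferPool()
buf := bp.Get()
defer bp.Put(buf) // return the buffer to the pool for reuse
// use buf as a scratch buffer, e.g. n, err := file.Read(buf)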
Optimization 3: Concurrent Processing
func processRecordsConcurrent(records []Record, workers int) ([]ProcessedRecord, error) {
sem := make(chan struct{}, workers)
results := make([]ProcessedRecord, len(records))
var mu sync.Mutex
var wg sync.WaitGroup
var errs []error
for i, record := range records {
wg.Add(1)
sem <- struct{}{} // Acquire
go func(idx int, r Record) {
defer wg.Done()
defer func() { <-sem }() // Release
processed, err := processRecord(r)
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
return
}
mu.Lock()
results[idx] = processed
mu.Unlock()
}(i, record)
}
wg.Wait()
if len(errs) > 0 {
return nil, errs[0]
}
return results, nil
}
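If pulling in golang.org/x/sync/errgroup is acceptable, the same pattern gets shorter: the group collects the first error, and recent versions can cap concurrency with SetLimit. A sketch under those assumptions:
func processRecordsErrgroup(records []Record, workers int) ([]ProcessedRecord, error) {
    var g errgroup.Group
    g.SetLimit(workers) // SetLimit is available in recent x/sync/errgroup releases
    results := make([]ProcessedRecord, len(records))
    for i, record := range records {
        i, record := i, record // capture loop variables (needed before Go 1.22)
        g.Go(func() error {
            processed, err := processRecord(record)
            if err != nil {
                return err
            }
            results[i] = processed // each goroutine writes its own index; no mutex needed
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}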
Optimization 4: Index/Cache
type RecordIndex struct {
byID map[int]*Record
byName map[string]*Record
mu sync.RWMutex
}
func (ri *RecordIndex) GetByID(id int) *Record {
ri.mu.RLock()
defer ri.mu.RUnlock()
return ri.byID[id]
}
func (ri *RecordIndex) Index(records []Record) {
ri.mu.Lock()
defer ri.mu.Unlock()
ri.byID = make(map[int]*Record)
ri.byName = make(map[string]*Record)
for i := range records {
ri.byID[records[i].ID] = &records[i]
ri.byName[records[i].Name] = &records[i]
}
}
Chapter 9: Real-World Example: Building a Data Sync Service
type DataSyncService struct {
source DataExtractor
destination DataLoader
db *sql.DB
logger *log.Logger
}
func (dss *DataSyncService) SyncDaily() error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
defer cancel()
// Extract from source
records, err := dss.source.Extract(ctx)
if err != nil {
dss.logger.Printf("Extraction failed: %v", err)
return err
}
dss.logger.Printf("Extracted %d records", len(records))
// Transform
records = filterByThreshold(records, 10.0)
records = deduplicateRecords(records)
dss.logger.Printf("Transformed to %d records", len(records))
// Load to database
tx, err := dss.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.Prepare(
"INSERT INTO data_sync (id, value, timestamp) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET value = $2",
)
if err != nil {
return err
}
defer stmt.Close()
for _, r := range records {
_, err := stmt.Exec(r.ID, r.Value, time.Now())
if err != nil {
dss.logger.Printf("Insert failed: %v", err)
return err
}
}
if err := tx.Commit(); err != nil {
return err
}
dss.logger.Printf("Sync completed successfully")
return nil
}
func (dss *DataSyncService) StartScheduled() {
c := cron.New()
c.AddFunc("0 2 * * *", func() {
if err := dss.SyncDaily(); err != nil {
dss.logger.Printf("Sync error: %v", err)
}
})
c.Start()
}
Chapter 10: Error Handling & Monitoring
Robust data systems need solid error handling.
type PipelineMetrics struct {
RecordsProcessed int64
RecordsFailed int64
Duration time.Duration
LastError error
LastRun time.Time
}
type MonitoredPipeline struct {
pipeline *Pipeline
metrics *PipelineMetrics
mu sync.RWMutex
}
func (mp *MonitoredPipeline) Run(ctx context.Context) error {
start := time.Now()
mp.mu.Lock()
mp.metrics.LastRun = start
mp.mu.Unlock()
err := mp.pipeline.Run(ctx)
duration := time.Since(start)
mp.mu.Lock()
mp.metrics.Duration = duration
if err != nil {
mp.metrics.LastError = err
mp.metrics.RecordsFailed++
} else {
mp.metrics.RecordsProcessed++
}
mp.mu.Unlock()
return err
}
func (mp *MonitoredPipeline) GetMetrics() PipelineMetrics {
mp.mu.RLock()
defer mp.mu.RUnlock()
return *mp.metrics
}
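To make the metrics visible, one option is a small HTTP endpoint that serializes them as JSON (standard net/http and encoding/json; the /metrics path is just a convention):
func serveMetrics(mp *MonitoredPipeline, addr string) error {
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        m := mp.GetMetrics() // Duration is encoded as nanoseconds by encoding/json
        if err := json.NewEncoder(w).Encode(m); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
        }
    })
    return http.ListenAndServe(addr, nil)
}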
Chapter 11: Scaling Data Systems
Handle large-scale data efficiently.
Strategy 1: Sharding
type ShardedLoader struct {
shards map[int]DataLoader
mu sync.RWMutex
}
func (sl *ShardedLoader) Load(ctx context.Context, records []Record) error {
// Group by shard
shardedRecords := make(map[int][]Record)
for _, r := range records {
shard := r.ID % len(sl.shards)
shardedRecords[shard] = append(shardedRecords[shard], r)
}
// Load to each shard
var wg sync.WaitGroup
errChan := make(chan error, len(sl.shards))
for shard, recs := range shardedRecords {
wg.Add(1)
go func(s int, r []Record) {
defer wg.Done()
sl.mu.RLock()
loader := sl.shards[s]
sl.mu.RUnlock()
if err := loader.Load(ctx, r); err != nil {
errChan <- err
}
}(shard, recs)
}
wg.Wait()
close(errChan)
for err := range errChan {
if err != nil {
return err
}
}
return nil
}
Strategy 2: Streaming Processing
type StreamProcessor struct {
recordChan chan Record
workers int
}
func (sp *StreamProcessor) Process() {
for i := 0; i < sp.workers; i++ {
go sp.processWorker()
}
}
func (sp *StreamProcessor) processWorker() {
for record := range sp.recordChan {
// Process record
_ = processRecord(record)
}
}
func (sp *StreamProcessor) Submit(r Record) {
sp.recordChan <- r
}
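As written, the workers run until recordChan is closed and nothing waits for them. A sketch of a variant with a clean shutdown; the StreamProcessorGraceful name and wg field are illustrative:
type StreamProcessorGraceful struct {
    recordChan chan Record
    workers    int
    wg         sync.WaitGroup
}

func (sp *StreamProcessorGraceful) Process() {
    for i := 0; i < sp.workers; i++ {
        sp.wg.Add(1)
        go func() {
            defer sp.wg.Done()
            for record := range sp.recordChan {
                processRecord(record) // results ignored here, as in the original worker
            }
        }()
    }
}

func (sp *StreamProcessorGraceful) Close() {
    close(sp.recordChan) // stop accepting work; range loops drain and exit
    sp.wg.Wait()         // block until every worker has finished
}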
Chapter 12: Building Production Data Systems
Production-ready practices.
Health Checks
func healthCheck(db *sql.DB) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Check database connection
if err := db.PingContext(ctx); err != nil {
return err
}
// Check critical table
var count int
err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM data_sync").Scan(&count)
if err != nil {
return err
}
return nil
}
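Exposed over HTTP, this check doubles as a liveness/readiness probe. A sketch using the standard net/http mux; the /healthz path is a common convention, not a requirement:
func registerHealthEndpoint(mux *http.ServeMux, db *sql.DB) {
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        if err := healthCheck(db); err != nil {
            http.Error(w, err.Error(), http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("ok"))
    })
}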
Graceful Shutdown
func main() {
pipeline := &Pipeline{}
// Cancel this context when a shutdown signal arrives
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-sigChan
log.Println("Shutting down gracefully...")
cancel() // in-flight work sees ctx.Done() and can finish cleanly
}()
// Run pipeline
if err := pipeline.Run(ctx); err != nil {
log.Fatal(err)
}
}
Logging & Observability
type StructuredLogger struct {
*log.Logger
}
func (sl *StructuredLogger) LogEvent(event string, fields map[string]interface{}) {
data, _ := json.Marshal(map[string]interface{}{
"event": event,
"timestamp": time.Now(),
"fields": fields,
})
sl.Println(string(data))
}
// Usage
logger.LogEvent("pipeline_start", map[string]interface{}{
"records": 10000,
"source": "api",
})
Appendix A: SQL Quick Reference
-- Basic queries
SELECT id, name FROM users WHERE age > 30;
INSERT INTO users (name, email) VALUES ('John', 'john@example.com');
UPDATE users SET email = 'new@example.com' WHERE id = 1;
DELETE FROM users WHERE id = 1;
-- Aggregates
SELECT COUNT(*), AVG(age), MAX(salary) FROM users;
-- Joins
SELECT u.name, o.amount FROM users u JOIN orders o ON u.id = o.user_id;
-- Grouping
SELECT category, COUNT(*) FROM products GROUP BY category;
-- Window functions
SELECT name, salary, RANK() OVER (ORDER BY salary DESC) FROM employees;
Appendix B: Go Database/SQL Patterns
// Error handling
if err != nil && err != sql.ErrNoRows {
return err
}
// Connection pooling
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(5 * time.Minute)
// Context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
row := db.QueryRowContext(ctx, query)
Appendix C: Performance Checklist
- Use connection pooling
- Batch insert/update operations
- Create appropriate database indexes
- Use prepared statements
- Process data concurrently (with semaphore)
- Implement memory pooling
- Monitor pipeline execution time
- Log errors and anomalies
- Test with realistic data volume
- Implement graceful shutdown
- Use context with timeouts
- Profile memory and CPU usage
Appendix D: Common Data Patterns
ETL: Extract → Transform → Load
ELT: Extract → Load → Transform
CDC: Change Data Capture (streaming changes)
Data Warehouse: Centralized repository
Data Lake: Raw data storage
Data Mesh: Decentralized data architecture
Conclusion: Build Data Systems That Scale
Go gives you the tools to build data systems that are:
- Fast (compile to machine code)
- Reliable (strong type system)
- Scalable (goroutines)
- Deployable (single binary)
- Observable (structured logging)
Most companies use Python for data work because it’s familiar. But production systems benefit from Go’s performance and reliability.
Start with small data automation tasks. Build confidence. Then tackle larger pipelines.
The data is waiting. Build systems to process it.