ETL (Extract, Transform, Load) es el proceso de:
Patrones comunes en ETL:
Herramientas ETL:
Objetivo: Implementar un proceso ETL básico que extraiga datos de ventas, los transforme y cargue en un data warehouse.
Enunciado:
<step> <name>Table Input</name> <type>TableInput</type> <sql>SELECT fecha, producto_id, cantidad, precio FROM ventas WHERE fecha BETWEEN '2023-01-01' AND '2023-12-31'</sql> <connection>BD_Transaccional</connection> </step> <step> <name>Text File Output</name> <type>TextFileOutput</type> <filename>/etl/ventas_2023.csv</filename> <separator>,</separator> <extension>.csv</extension> </step>
<step> <name>CSV Input</name> <type>CSVFileInput</type> <filename>/etl/ventas_2023.csv</filename> <separator>,</separator> </step> <step> <name>Calculator</name> <type>Calculator</type> <calculation> <field>total</field> <calculation>MULTIPLY</calculation> <fieldA>cantidad</fieldA> <fieldB>precio</fieldB> </calculation> </step> <step> <name>Range Lookup</name> <type>RangeLookup</type> <input> <field>total</field> </input> <output> <field>categoria_venta</field> <ranges> <range min="0" max="100">Pequeña</range> <range min="100" max="500">Mediana</range> <range min="500" max="999999">Grande</range> </ranges> </output> </step>
<connection> <name>DW_Ventas</name> <type>MYSQL</type> <host>dw-server</host> <database>ventas_dw</database> </connection> <step> <name>Table Output</name> <type>TableOutput</type> <connection>DW_Ventas</connection> <table>fact_ventas</table> <mapping> <field>fecha</field> <field>producto_id</field> <field>cantidad</field> <field>total</field> <field>categoria_venta</field> </mapping> </step>
# Ejecutar la transformación ETL ./kitchen.sh -file=/etl/ventas_etl.ktr # Programar en cron (ejecución diaria a las 2 AM) 0 2 * * * /path/to/kitchen.sh -file=/etl/ventas_etl.ktr > /etl/logs/etl_$(date +\%Y\%m\%d).log
Nota: En entornos empresariales se usan herramientas como Airflow o Control-M para orquestación avanzada.
OLAP (Online Analytical Processing) permite:
Modelos OLAP:
Herramientas OLAP:
Objetivo: Diseñar e implementar un cubo OLAP para análisis de ventas.
Enunciado:
-- Tabla de hechos CREATE TABLE fact_ventas ( venta_id INT PRIMARY KEY, fecha_id INT FOREIGN KEY REFERENCES dim_tiempo(fecha_id), producto_id INT FOREIGN KEY REFERENCES dim_producto(producto_id), cliente_id INT FOREIGN KEY REFERENCES dim_cliente(cliente_id), cantidad INT, monto DECIMAL(10,2), costo DECIMAL(10,2) ); -- Dimensión Tiempo CREATE TABLE dim_tiempo ( fecha_id INT PRIMARY KEY, fecha DATE, dia INT, mes INT, trimestre INT, año INT, nombre_mes VARCHAR(20), es_fin_de_semana BIT ); -- Dimensión Producto CREATE TABLE dim_producto ( producto_id INT PRIMARY KEY, nombre VARCHAR(100), categoria VARCHAR(50), subcategoria VARCHAR(50), precio DECIMAL(10,2) );
([Measures].[Margen] - [Measures].[Costo]) / [Measures].[Margen]
Drill-down: Año → Trimestre → Mes → Día
Roll-up: Producto → Subcategoría → Categoría
Slice: Filtrar solo para categoría "Electrónicos"
Dice: Ver ventas de electrónicos en Q1 2023
SELECT { [Measures].[Ventas Totales], [Measures].[Margen Bruto] } ON COLUMNS, { [Tiempo].[2023].[Q1], [Tiempo].[2023].[Q2] } ON ROWS FROM [Ventas] WHERE ( [Producto].[Categoría].[Electrónicos] )
Nota: MDX (Multidimensional Expressions) es el lenguaje estándar para consultar cubos OLAP.
Un dashboard efectivo debe:
Componentes típicos:
Herramientas de Visualización:
Objetivo: Crear un dashboard interactivo para análisis de ventas.
Enunciado:
{ "version": "1.0", "connections": [ { "name": "CuboVentas", "connectionType": "analysisServices", "server": "as-server", "database": "VentasOLAP" } ], "queries": [ { "name": "VentasPorRegion", "query": "EVALUATE SUMMARIZECOLUMNS(...)" } ] }
Pasos en Power BI Desktop:
a) Scorecard de KPIs:
b) Gráfico de tendencia:
c) Distribución por categoría:
d) Tabla detallada:
{ "visualizations": [ { "name": "Filtros", "type": "slicer", "fields": ["Tiempo[Año]", "Producto[Categoría]"], "crossFiltering": true }, { "name": "GraficoTendencia", "type": "lineChart", "crossFilteringBehavior": "filter" } ] }
Pasos para configurar:
# Publicar a Power BI Service Publish-PowerBIFile -Path "VentasDashboard.pbix" -WorkspaceId "12345" # Configurar actualización programada Set-PowerBIDatasetRefreshSchedule -DatasetId "67890" -Daily -Time "02:00"
Pasos para compartir:
Técnicas comunes en análisis empresarial:
Herramientas para Análisis:
Objetivo: Realizar un análisis estadístico completo de datos de ventas.
Enunciado:
import pandas as pd import numpy as np # Cargar datos ventas = pd.read_csv('ventas_2023.csv') # Estadísticas básicas stats = ventas[['cantidad', 'monto']].describe() print(stats) # Por categoría stats_categoria = ventas.groupby('categoria')['monto'].agg(['sum', 'mean', 'std', 'count']) print(stats_categoria)
import seaborn as sns import matplotlib.pyplot as plt # Matriz de correlación corr = ventas[['cantidad', 'monto', 'descuento', 'costo']].corr() sns.heatmap(corr, annot=True, cmap='coolwarm') plt.title('Matriz de Correlación') plt.show() # Correlación cruzada pd.plotting.scatter_matrix(ventas[['monto', 'descuento', 'costo']], figsize=(10,8)) plt.show()
# Convertir a serie temporal ventas['fecha'] = pd.to_datetime(ventas['fecha']) ventas.set_index('fecha', inplace=True) # Agregación mensual mensual = ventas.resample('M')['monto'].sum() # Gráfico de tendencia mensual.plot(figsize=(12,6), title='Ventas Mensuales 2023') plt.ylabel('Ventas') plt.grid(True) plt.show() # Descomposición estacional from statsmodels.tsa.seasonal import seasonal_decompose result = seasonal_decompose(mensual, model='additive') result.plot() plt.show()
from sklearn.cluster import KMeans from sklearn.preprocessing import StandardScaler # Preparar datos clientes = ventas.groupby('cliente_id').agg({ 'monto': 'sum', 'cantidad': 'sum', 'fecha': lambda x: (pd.to_datetime('2023-12-31') - x.max()).days }).rename(columns={'fecha': 'dias_desde_ultima_compra'}) # Estandarizar scaler = StandardScaler() clientes_scaled = scaler.fit_transform(clientes) # Determinar número óptimo de clusters (método del codo) inertia = [] for k in range(1, 11): kmeans = KMeans(n_clusters=k, random_state=42) kmeans.fit(clientes_scaled) inertia.append(kmeans.inertia_) plt.plot(range(1,11), inertia, marker='o') plt.xlabel('Número de clusters') plt.ylabel('Inercia') plt.title('Método del Codo') plt.show() # Aplicar K-Means con k=3 kmeans = KMeans(n_clusters=3, random_state=42) clientes['cluster'] = kmeans.fit_predict(clientes_scaled) # Analizar clusters cluster_stats = clientes.groupby('cluster').mean() print(cluster_stats)
Nota: Este análisis puede extenderse con RFM (Recency, Frequency, Monetary) para segmentación más avanzada.
Proceso estructurado para migrar sistemas analíticos:
Retos comunes:
Herramientas de Migración:
Objetivo: Planificar la migración de un sistema analítico de SQL Server a Snowflake.
Enunciado:
-- Consulta SQL para inventario en SQL Server SELECT t.name AS table_name, s.name AS schema_name, p.rows AS row_count, SUM(a.total_pages) * 8 AS total_space_kb FROM sys.tables t INNER JOIN sys.schemas s ON t.schema_id = s.schema_id INNER JOIN sys.partitions p ON t.object_id = p.object_id INNER JOIN sys.allocation_units a ON p.partition_id = a.container_id GROUP BY t.name, s.name, p.rows ORDER BY total_space_kb DESC; -- Documentar: -- * 15 tablas dimensionales (~50GB) -- * 3 procesos SSIS (ETL diario) -- * 12 reportes Power BI -- * 5 stored procedures analíticos
-- Configuración de warehouses CREATE WAREHOUSE analytics_wh WITH WAREHOUSE_SIZE = 'X-LARGE' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; -- Esquema dimensional en Snowflake CREATE DATABASE analytics_prod; CREATE SCHEMA dw; -- Tabla de hechos (ejemplo) CREATE TABLE fact_ventas ( venta_id INTEGER, fecha_id DATE, producto_id INTEGER, cliente_id INTEGER, cantidad INTEGER, monto NUMBER(10,2), costo NUMBER(10,2), -- Particionamiento por fecha PARTITION BY (DATE_TRUNC('MONTH', fecha_id)) CLUSTER BY (producto_id); -- Tablas de dimensiones CREATE TABLE dim_producto ( producto_id INTEGER PRIMARY KEY, nombre VARCHAR(100), categoria VARCHAR(50), subcategoria VARCHAR(50), -- Snowflake permite semiestructurado atributos VARIANT );
# Script Python para migración incremental import snowflake.connector import pyodbc from tqdm import tqdm def migrate_table(source_conn, target_conn, table_name, batch_size=10000): # Conexión origen source_cursor = source_conn.cursor() source_cursor.execute(f"SELECT * FROM {table_name}") # Conexión destino target_cursor = target_conn.cursor() # Obtener columnas columns = [column[0] for column in source_cursor.description] placeholders = ', '.join(['%s'] * len(columns)) insert_sql = f"INSERT INTO {table_name} VALUES ({placeholders})" # Migración por lotes while True: batch = source_cursor.fetchmany(batch_size) if not batch: break target_cursor.executemany(insert_sql, batch) target_conn.commit() # Configurar conexiones source = pyodbc.connect(...) # SQL Server target = snowflake.connector.connect(...) # Migrar tablas tables = ['dim_producto', 'dim_cliente', 'fact_ventas'] for table in tqdm(tables): migrate_table(source, target, table)
Fase | Actividades | Responsable | Duración |
---|---|---|---|
Pre-migración | Backup completo, Documentación | Equipo DBA | 2 días |
Migración inicial | Carga histórica, ETLs | Equipo ETL | 3 días |
Validación | Reconciliación de datos, Pruebas de rendimiento | QA + Usuarios | 5 días |
Corte | Migración incremental final, Redirección de aplicaciones | Equipo completo | 1 día (fin de semana) |
Post-migración | Monitoreo, Optimización | Equipo BI | 2 semanas |
Nota: Implementar un periodo de paralelo (ambos sistemas funcionando) para validación antes del corte definitivo.