¿Cómo conectar SQL Server y PostgreSQL con Azure Databricks?

Repo
Blog / ¿Cómo conectar SQL Server y P… /
¿Cómo conectar SQL Server y PostgreSQL con Azure Databricks?

Por: Leonardo Narváez.

En: 2 de Diciembre de 2024 a las 10:00
Azure Databricks Azure Portal Big Data Databricks GitHub Notebook PySpark Python Scala SQL

Spark JDBC

En el corazón de la integración de Spark con bases de datos relacionales encontramos JDBC (Java Database Connectivity). JDBC actúa como un puente esencial, proporcionando una interfaz estandarizada que permite a las aplicaciones Spark comunicarse con bases de datos relacionales. Esta interfaz no es simplemente un canal de comunicación; es un conjunto completo de protocolos y estándares que facilitan operaciones de lectura y escritura de manera eficiente y segura.

Spark SQL y su Relación con JDBC

Spark SQL emerge como uno de los módulos más poderosos dentro del ecosistema Spark. Este módulo introduce el concepto de DataFrames, una abstracción que permite trabajar con datos estructurados de manera intuitiva y eficiente. Cuando combinamos Spark SQL con JDBC, obtenemos una herramienta extremadamente versátil para el procesamiento de datos.  

La integración de Spark SQL con JDBC va más allá de simples operaciones de lectura y escritura. El sistema permite ejecutar consultas complejas que se benefician del procesamiento distribuido de Spark, mientras mantiene la integridad y las características ACID de las bases de datos relacionales. Esta simbiosis permite aprovechar lo mejor de ambos mundos: la escalabilidad de Spark y la confiabilidad de las bases de datos relacionales.

Arquitectura

La arquitectura de la integración Spark-JDBC se construye sobre varios componentes fundamentales que trabajan en conjunto. El Driver JDBC actúa como el intérprete principal, traduciendo las instrucciones de Spark en comandos que la base de datos puede entender. Este componente maneja no solo la traducción de comandos, sino también la gestión de tipos de datos y la optimización de consultas.  

El Connection Pool representa otro componente crucial en esta arquitectura. En lugar de crear nuevas conexiones para cada operación, mantiene un conjunto de conexiones activas que pueden ser reutilizadas. Este enfoque reduce significativamente la sobrecarga asociada con el establecimiento de conexiones y mejora el rendimiento general del sistema.  

El sistema de particionamiento en esta arquitectura merece especial atención. Permite dividir grandes conjuntos de datos en fragmentos manejables que pueden procesarse en paralelo. Este particionamiento no es arbitrario; se basa en estrategias sofisticadas que consideran la distribución de datos y los recursos disponibles.

Operaciones y Optimización

Las operaciones en el contexto de Spark-JDBC pueden clasificarse en tres categorías principales: lectura, escritura y transformación. Las operaciones de lectura pueden variar desde la simple recuperación de tablas completas hasta consultas complejas con múltiples joins y agregaciones. La escritura, por otro lado, puede implicar inserciones masivas, actualizaciones o operaciones de upsert.  

La optimización en este contexto es un arte complejo. El push-down de predicados representa una de las técnicas más importantes, permitiendo que los filtros se ejecuten en la base de datos antes de que los datos se transfieran a Spark. Esto puede reducir significativamente la cantidad de datos transferidos y mejorar el rendimiento general.  

La gestión de recursos y la configuración de parámetros juegan un papel crucial en el rendimiento. El tamaño del fetch, el número de particiones y el tamaño del batch deben ajustarse cuidadosamente según las características específicas de cada caso de uso. Estos ajustes pueden tener un impacto significativo en el rendimiento y la utilización de recursos.

Ejemplo de conexión de JDBC desde Databricks Community con SQL Server de Azure. 

Creación de SQL Server en Azure  

Crear un grupo de recursos. Luego crear un recurso > Bases de datos > SQL Database


Configura los detalles:  

  • Nombre del servidor SQL: databricks-sql-server-leo
  • Región: Italy North u otra disponible  
  • Autenticación: Habilita SQL Authentication y configura:  

Usuario: adminuser 

Contraseña: ContraseñaFuerte123

  • Marca la casilla de "Habilitar acceso a Azure Services" 

Crear la base de datos: AdventureWorksLT  

  • En la misma sección, crea una base de datos con nombre cualquiera (en este ejemplo le hemos llamado database-sql). Selecciona el servidor creado (databricks-sql-server-leo).Nivel de precio: Elige el plan más económico (Básico - DTU: 5).

  • En Additional settings, en Data Source escoger Sample para que se habilite la carga de AdventureWorksLT. Lo demas se deja por defecto.  

  • Ve al servidor que has creado y en networking configurar el firewall. Añade la IP pública de tu conexión local y habilita "Permitir acceso a todos los servicios de Azure". Habilita la dirección IP publica de databricks, suele ser: 54.200.13.2. Guarda los cambios.  

Configurar Databricks Community Edition  

Ve a la pestaña Compute y selecciona Create Compute con estos requisitos:  

  • Cluster Name: AdventureWorksCluster.  
  • Databricks Runtime Version: 11.3 LTS (Scala 2.12, Spark 3.3.1)  
  • Crear clúster.

Mientras el clúster se esta creando, descarga el controlador JDBC para SQL Server, en este caso usaremos este. O dentro del resositorio se encuentra dentro de la carpeta resources.

  • En Databricks subir el controlador a tu workspace o a tu DBFS.

 Una vez que el cluster esté activo ve al Cluster y en el boton Libraries cargar el controlador haciendo click en `Install New` y le pasas el path donde has guardado el controlador.


Conectar Databricks con SQL Server  

Crea un notebook en Databricks y añade el siguiente código ( con tus datos de configuración) :

# Configuración de conexión JDBC
jdbcHostname = "databricks-sql-server-leo.database.windows.net" # Servidor SQL
jdbcPort = 1433
jdbcDatabase = "database-sql" # Nombre exacto de tu base de datos
jdbcUsername = "adminuser" # Cambiar por tu usuario configurado
jdbcPassword = "ContraseñaFuerte123" # Cambiar por la contraseña configurada

jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};databaseName={jdbcDatabase}"

# Propiedades de conexión
connectionProperties = {
  "user": jdbcUsername,
  "password": jdbcPassword,
  "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

Consulta de Prueba

# Consulta de prueba
query = "(SELECT TOP 10 * FROM SalesLT.Product) AS temp" # Cambia por una tabla válida si es necesario

# Leer datos desde SQL Server
try:
  df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
  df.display() # Mostrar los datos
except Exception as e:
  print(f"Error al conectar: {e}")

Si te da fallo de IP, debe ser por que no esta configurado correctamente las reglas de firewall, usa el siguiente comando y el resultado de esa IP debes actualizarlo en las reglas de firewall de tu servidor

import requests
 
# Obtener la IP pública del nodo
public_ip = requests.get('https://api.ipify.org').text
print(f"La IP pública del nodo es: {public_ip}")

Vuelve a ejecutar el comando anterior de la consulta y veras ya conectado con la base de datos configurada en Azure.

Prueba con consultas simples usando Pyspark:  

 1. Listar todas las tablas disponibles

query = "(SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE') AS temp"
df_tables = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df_tables.show()

2. Productos con precios mayores a $50

query = "(SELECT ProductID, Name, ListPrice FROM SalesLT.Product WHERE ListPrice > 50) AS temp"
df_filtered = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df_filtered.show()

3. Contar productos por categoría

query = """
(SELECT ProductCategoryID, COUNT(*) AS TotalProducts
 FROM SalesLT.Product
 GROUP BY ProductCategoryID) AS temp
"""
df_count = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df_count.show()

  • Ordenamos los datos con Spark.
query = """
(SELECT ProductCategoryID, COUNT(*) AS TotalProducts
 FROM SalesLT.Product
 GROUP BY ProductCategoryID) AS temp
"""
# Ordenar los resultados en Spark
df_count_sorted = df_count.orderBy("TotalProducts", ascending=False)
df_count_sorted.display()

4. Contar el total de productos por tamaño

df.groupBy("Size").count().orderBy("count", ascending=False).show()

  • Hacer algunas mejoras a la consulta:
from pyspark.sql.functions import desc

df.filter(df["Size"].isNotNull()) \
  .groupBy("Size") \
  .count() \
  .withColumnRenamed("count", "TotalCount") \
  .orderBy(desc("TotalCount")) \
  .display()

5. Calcular el precio promedio de los productos

df.selectExpr("AVG(ListPrice) AS AveragePrice").show()

  • Vamos mejorar nuestra consulta, mostrando más información con el nombre del producto, realizamos agrupaciones para facilitar la consulta. Ahora calculamos el precio promedio por producto y agregamos el nombre.
from pyspark.sql.functions import round

df.groupBy("ProductID", "Name") \
  .agg({"ListPrice": "avg"}) \
  .withColumnRenamed("avg(ListPrice)", "AveragePrice") \
  .withColumn("AveragePrice", round("AveragePrice", 2)) \
  .display()

6. Encontrar productos sin categoría asignada

df.filter(df.ProductCategoryID.isNull()).select("ProductID", "Name", "ProductCategoryID").show()

Comprobamos porque esta consulta no nos muestra ningún resultado:

# Verificar si hay registros con ProductCategoryID nulo
count_nulls = df.filter(df.ProductCategoryID.isNull()).count()
print(f"Registros con ProductCategoryID NULL: {count_nulls}")

# Verificar los registros con ProductCategoryID NULL
if count_nulls > 0:
  print("Registros con ProductCategoryID NULL:")
  df.filter(df.ProductCategoryID.isNull()).select("ProductID", "Name", "ProductCategoryID").display()
else:
  print("No hay registros con ProductCategoryID NULL.")

# Verificar si hay registros con ProductCategoryID no nulo
count_non_nulls = df.filter(df.ProductCategoryID.isNotNull()).count()
print(f"Registros con ProductCategoryID NO NULL: {count_non_nulls}")

# Verificar los registros con ProductCategoryID no nulo
if count_non_nulls > 0:
  print("Registros con ProductCategoryID NO NULL:")
  df.filter(df.ProductCategoryID.isNotNull()).select("ProductID", "Name", "ProductCategoryID").display()
else:
  print("No hay registros con ProductCategoryID NO NULL.")

7. Contar productos por color

df.groupBy("Color").count().orderBy("count", ascending=False).show()

8. Calcular el costo total de todos los productos

df.selectExpr("SUM(StandardCost) AS TotalCost").show()

Vamos a mejorar la query anterior, mostrando el total por categoría y ordenando según el costo total.

from pyspark.sql.functions import round

df.groupBy("ProductCategoryID").agg({"StandardCost": "sum"}) \
  .withColumnRenamed("sum(StandardCost)", "TotalCost") \
  .withColumn("TotalCost", round("TotalCost", 2)) \
  .orderBy(desc("TotalCost")) \
  .display()

9. Productos que contienen una palabra específica en su nombre 

# Filtrar productos que contienen la palabra 'Helmet' en el nombre
df.filter(df.Name.contains("Helmet")).select("ProductID", "Name", "ListPrice").show()

Listamos los nombres de todos los productos

df.select("Name").display()

10. Listar productos creados después de 2005

df.filter(df.SellStartDate >= "2005-01-01").select("ProductID", "Name", "SellStartDate").show()