Để tự động chuyển đổi kiểu dữ liệu của các cột trong DataFrame dựa trên một JSON schema, chúng ta có thể dùng cách sau:
1. Định dạng JSON Schema
Giả sử bạn có một JSON schema định nghĩa kiểu dữ liệu cho các cột trong DataFrame như sau:
{
"columns": {
"column1": "string",
"column2": "integer",
"column3": "double",
"column4": "boolean"
}
}
2. Đọc JSON Schema
dùng lớp json để đọc và xử lý JSON schema để lấy ra các cột và kiểu dữ liệu tương ứng
import json
# Đọc JSON schema từ file hoặc chuỗi JSON
json_schema = """
{
"columns": {
"column1": "string",
"column2": "integer",
"column3": "double",
"column4": "boolean"
}
}
"""
schema = json.loads(json_schema)
column_types = schema['columns']
3. Chuyển đổi kiểu dữ liệu của các cột
Sử dụng PySpark, bạn có thể tự động chuyển đổi kiểu dữ liệu của các cột trong DataFrame dựa trên JSON schema
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, DoubleType, BooleanType
# Định nghĩa mapping từ schema type thành PySpark type
type_mapping = {
"string": StringType(),
"integer": IntegerType(),
"double": DoubleType(),
"boolean": BooleanType()
}
# Giả sử df là DataFrame bạn đang làm việc
df = spark.sql("SELECT * FROM your_table")
# Chuyển đổi kiểu dữ liệu cho từng cột theo JSON schema
for column, dtype in column_types.items():
if column in df.columns:
df = df.withColumn(column, col(column).cast(type_mapping[dtype]))
4. Kiểm tra kết quả
Cuối cùng kiểm tra schema đã được thay đổi.
df.printSchema()
5. Ví dụ tự động đổi data type của dữ liệu sử dụng schema lấy từ SQL DB
5.1. Trích xuất schema từ SQL Server
Có thể thực hiện bằng cách sử dụng một truy vấn SQL để lấy thông tin về cột và kiểu dữ liệu.
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'your_table_name';
5.2. Kết nối với SQL Server và lấy schema
import pyodbc
# Kết nối với SQL Server
conn = pyodbc.connect(
'DRIVER={SQL Server};'
'SERVER=your_server_name;'
'DATABASE=your_database_name;'
'UID=your_username;'
'PWD=your_password'
)
cursor = conn.cursor()
# Truy vấn lấy schema
cursor.execute("""
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'your_table_name';
""")
# Lưu schema vào dictionary
column_types = {row.COLUMN_NAME: row.DATA_TYPE for row in cursor.fetchall()}
5.3. Mapping SQL Server Data Types to PySpark Types
Tạo một mapping giữa các kiểu dữ liệu của SQL Server và PySpark
from pyspark.sql.types import StringType, IntegerType, DoubleType, BooleanType
# Định nghĩa mapping từ SQL Server type thành PySpark type
type_mapping = {
"varchar": StringType(),
"nvarchar": StringType(),
"int": IntegerType(),
"bigint": IntegerType(),
"decimal": DoubleType(),
"float": DoubleType(),
"bit": BooleanType()
# Thêm các kiểu dữ liệu khác nếu cần
}
5.4. Chuyển đổi kiểu dữ liệu trong DataFrame
Sử dụng schema đã lấy từ SQL Server để chuyển đổi kiểu dữ liệu của các cột trong DataFrame PySpark
from pyspark.sql.functions import col
# Giả sử df là DataFrame bạn đang làm việc
df = spark.sql("SELECT * FROM your_table")
# Chuyển đổi kiểu dữ liệu cho từng cột theo schema từ SQL Server
for column, dtype in column_types.items():
if column in df.columns and dtype in type_mapping:
df = df.withColumn(column, col(column).cast(type_mapping[dtype]))
5.5. Kiểm tra và xác nhận kết quả
df.printSchema()
5. Tóm tắt
Với phương pháp này, bạn có thể tự động chuyển đổi kiểu dữ liệu của các cột trong DataFrame dựa trên JSON schema mà không cần phải làm thủ công từng cột một.
※Thực thi đoạn mã trên trong notebook PySpark hoặc môi trường PySpark khác để áp dụng chuyển đổi kiểu dữ liệu tự động dựa trên JSON schema.
bài viết hay quá!đúng nội dung mình cần tìm.cám ơn ad đã chia sẻ.
cám ơn ad đã chia sẻ!