UDF là gì?
UDF (User-Defined Function) là hàm do người dùng tự định nghĩa để mở rộng khả năng của các hệ thống xử lý dữ liệu, chẳng hạn như PySpark. Nó cho phép bạn viết các logic tùy chỉnh mà không có sẵn trong các hàm tích hợp sẵn của Spark.
Trong PySpark, UDF được sử dụng để áp dụng các hàm tùy chỉnh lên các cột của DataFrame. Bạn có thể sử dụng UDF để xử lý dữ liệu theo các quy tắc mà không có trong thư viện PySpark mặc định.
Cách tạo UDF
1. Bạn định nghĩa một hàm trong Python để xử lý dữ liệu theo cách của riêng bạn.
2. Sau đó, bạn đăng ký hàm này như một UDF trong Spark bằng cách sử dụng udf()
để áp dụng hàm này lên các cột trong DataFrame.
Ví dụ
Tạo hàm để thực hiện chuyển đổi kiểu dữ liệu string qua datetime trong pyspark. Đáp ứng được nhiều format của datetime khác nhau.
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType, DateType
from datetime import datetime
import re
# Các format cho kiểu datetime và date
formats = [
"%Y-%m-%d %H:%M:%S.%f", #2024-01-01 10:01:01.12345
"%Y-%m-%d %H:%M:%S", #2024-01-01 10:01:01
"%Y-%m-%d %H:%M", #2024-01-01 10:01
"%Y-%m-%d", #2024-01-01
"%Y/%m/%d %H:%M:%S.%f", #2024/01/01 10:01:01.12345
"%Y/%m/%d %H:%M:%S", #2024/01/01 10:01:01
"%Y/%m/%d %H:%M", #2024/01/01 10:01
"%Y/%m/%d", #2024/01/01
"%b %d %Y %I:%M%p" #Jan 01 2024 10:01AM
]
time_formats = [
# timeデータ型
"%H:%M:%S.%f", #10:01:01.12345
"%H:%M:%S", #10:01:01
"%H:%M" #10:01
]
# function để convert timestamp
def parse_timestamp(timestamp_str):
# Định nghĩa cho ngày đầu giới hạn của datetime
delta_min_date = datetime(1900,1,1)
# parquet chấp nhận những ngày từ 1900/01/01 trờ về sau
if timestamp_str is None:
return None
# Xử lý các format của dạng datetime
for datetime_fmt in formats:
try:
# xử lý phần microsecond (phần microsecond không quá 6 ký tự)
if ('.' in timestamp_str) and (len(timestamp_str.split('.')[-1])>6):
# Remove các ký tự sau ký tự thứ 6
timestamp_str = timestamp_str[:timestamp_str.index('.')+7]
# Xử lý thay đổi đữ liệu
parsed_datetime = datetime.strptime(timestamp_str, datetime_fmt)
# Xử lý các dữ liệu nhỏ hơn 1899-12-31
if parsed_datetime < delta_min_date:
return datetime(1900,1,1, parsed_datetime.hour, parsed_datetime.minute, parsed_datetime.second, parsed_datetime.microsecond)
return parsed_datetime
except ValueError:
continue
# xử lý time format
for time_fmt in time_formats:
try:
if ('.' in timestamp_str) and (len(timestamp_str.split('.')[-1])>6):
# Bỏ các ký tự sau ký tự thứ 6 của microsecond
timestamp_str = timestamp_str[:timestamp_str.index('.')+7]
# Xử lý chuyển đổi dữ liệu
parsed_time = datetime.strptime(timestamp_str, time_fmt)
return datetime(1900,1,1, parsed_time.hour, parsed_time.minute, parsed_time.second, parsed_time.microsecond)
except ValueError:
continue
return None # Trả None đối với những dữ liệu không có format
# Đăng ký udf
udf_parse_timestamp = udf(parse_timestamp, TimestampType())
Cách dùng
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Tạo Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()
# Giả sử có một DataFrame chứa dữ liệu ngày giờ dưới dạng chuỗi
data = [("2024-09-06 12:30:45.123456",), ("06-09-2024 12:30:45",), ("Sep 06 2024 12:30PM",)]
df = spark.createDataFrame(data, ["timestamp_str"])
# Áp dụng UDF parse_date_udf để chuyển đổi các chuỗi ngày giờ thành kiểu date
df_with_date = df.withColumn("date_column", parse_date_udf(col("timestamp_str")))
# Hiển thị kết quả
df_with_date.show(truncate=False)
Giải thích Format
Định dạng ngày giờ (Format) | Ví dụ | Ghi Chú |
%Y-%m-%d %H:%M:%S.%f | 2024-09-06 12:30:45.123456 | |
%Y-%m-%d %H:%M:%S | 2024-09-06 12:30:45 | |
%Y-%m-%d %H:%M | 2024-09-06 12:30 | |
%Y/%m/%d %H:%M:%S.%f | 2024/09/06 12:30:45.123456 | |
%Y/%m/%d %H:%M:%S | 2024/09/06 12:30:45 | |
%Y/%m/%d %H:%M | 2024/09/06 12:30 | |
%b %d %Y %I:%M%p | Sep 06 2024 12:30PM |
Các hàm tiện dụng khác
Đổi kiểu dữ liệu date
Để đổi kiểu dữ liệu date, chung ta dùng code định dạng dữ liệu datetime trên, nhưng chỉ lấy phần date sau khi chuyển đổi
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType, DateType
from datetime import datetime
import re
# Các format cho kiểu datetime và date
formats = [
"%Y-%m-%d %H:%M:%S.%f", #2024-01-01 10:01:01.12345
"%Y-%m-%d %H:%M:%S", #2024-01-01 10:01:01
"%Y-%m-%d %H:%M", #2024-01-01 10:01
"%Y-%m-%d", #2024-01-01
"%Y/%m/%d %H:%M:%S.%f", #2024/01/01 10:01:01.12345
"%Y/%m/%d %H:%M:%S", #2024/01/01 10:01:01
"%Y/%m/%d %H:%M", #2024/01/01 10:01
"%Y/%m/%d", #2024/01/01
"%b %d %Y %I:%M%p" #Jan 01 2024 10:01AM
]
time_formats = [
# timeデータ型
"%H:%M:%S.%f", #10:01:01.12345
"%H:%M:%S", #10:01:01
"%H:%M" #10:01
]
#function để convert datedef parse_date(date_str):
# Định nghĩa ngày được parquet chấp nhận
delta_min_date = datetime(1900,1,1).date()
if date_str is None:
return None
# Xử lý ngày
for date_fmt in formats:
try:
# Xử lý phần microsecond
if('.' in date_str) and (len(date_str.split('.')[-1])>6):
# Bỏ các ký tự sau ký tự thứ 6 của microsecond
date_str = date_str[:date_str.index('.')+7]
# Xử lý đổi kiểu dữ liệu
parsed_date = datetime.strptime(date_str, date_fmt).date()
# Xử lý các dữ liệu trước ngày 1899-12-31
if parsed_date < delta_min_date:
return delta_min_date
return parsed_date
except ValueError:
continue
return None
# Đăng ký udf
udf_parse_date = udf(parsed_date, DateType())
Chuyển đổi các ký tự không được hỗ trợ trong delta table column
Có thể dùng function sau để chuyển đổi tên column trong dataframe để có thể tạo delta table.
Function sau sử dụng để thay thế các ký tự không được hỗ trợ trong chuỗi (cụ thể là các ký tự ()[];,{}\n\t=
) bằng dấu gạch dưới _
.
import re
# Các ký tự không dược hỗ trợ
def replace_special_chars(column_name):
return re.sub(r'[()\[\];,{}\n\t= ]','_',column_name)
Dùng code trên để đổi các column_name trong data frame
from pyspark.sql import SparkSession
import re
# Tạo Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()
# Giả sử có một DataFrame với các cột có tên chứa ký tự đặc biệt
data = [(1, 2, 3)]
columns = ["col(1)", "col[2]", "col;3"]
df = spark.createDataFrame(data, columns)
# Hàm thay thế ký tự đặc biệt trong tên cột
def replace_special_chars(column_name):
return re.sub(r'[()\[\];,{}\n\t= ]', '_', column_name)
# Đổi tên các cột để thay thế các ký tự đặc biệt
new_columns = [replace_special_chars(col) for col in df.columns]
# Tạo DataFrame mới với tên cột đã được thay thế
df_cleaned = df.toDF(*new_columns)
# Hiển thị DataFrame với tên cột đã được thay thế
df_cleaned.show()
Xóa các ký tự “_” ở vị trí đầu và vị trí cuối trong tên column
import re
# Xoá ký tự "_" ở đầu hoặc cuối của tên column
def remove_start_end_special_chars(column_name):
return re.sub(r'^_+|_+$','',column_name)
Với cách tạo UDF trên, chúng ta có thể linh động hơn trong việc sử dụng notebook. Các bạn có thể tự tạo cho mình những funtions riêng và tập hợp trên 1 notebook để dễ dàng chỉnh sửa, và sử dụng.