ĐÂU LÀ GIÁ TRỊ CỦA CHÚNG TA!

BIVIET

Đâu là giá trị của chúng ta?

Tạo UDF (User-Defined Funtion)
4.6
(27)

UDF là gì?

UDF (User-Defined Function) là hàm do người dùng tự định nghĩa để mở rộng khả năng của các hệ thống xử lý dữ liệu, chẳng hạn như PySpark. Nó cho phép bạn viết các logic tùy chỉnh mà không có sẵn trong các hàm tích hợp sẵn của Spark.

Trong PySpark, UDF được sử dụng để áp dụng các hàm tùy chỉnh lên các cột của DataFrame. Bạn có thể sử dụng UDF để xử lý dữ liệu theo các quy tắc mà không có trong thư viện PySpark mặc định.

Cách tạo UDF

1. Bạn định nghĩa một hàm trong Python để xử lý dữ liệu theo cách của riêng bạn.
2. Sau đó, bạn đăng ký hàm này như một UDF trong Spark bằng cách sử dụng udf() để áp dụng hàm này lên các cột trong DataFrame.

Ví dụ

Tạo hàm để thực hiện chuyển đổi kiểu dữ liệu string qua datetime trong pyspark. Đáp ứng được nhiều format của datetime khác nhau.

from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType, DateType
from datetime import datetime
import re
# Các format cho kiểu datetime và date
formats = [
    "%Y-%m-%d %H:%M:%S.%f",      #2024-01-01 10:01:01.12345
    "%Y-%m-%d %H:%M:%S",         #2024-01-01 10:01:01
    "%Y-%m-%d %H:%M",            #2024-01-01 10:01
    "%Y-%m-%d",                  #2024-01-01
    "%Y/%m/%d %H:%M:%S.%f",      #2024/01/01 10:01:01.12345
    "%Y/%m/%d %H:%M:%S",         #2024/01/01 10:01:01
    "%Y/%m/%d %H:%M",            #2024/01/01 10:01
    "%Y/%m/%d",                  #2024/01/01
    "%b %d %Y %I:%M%p"           #Jan 01 2024 10:01AM
]
time_formats = [
    # timeデータ型
    "%H:%M:%S.%f",               #10:01:01.12345
    "%H:%M:%S",                  #10:01:01
    "%H:%M"                      #10:01
]
# function để convert timestamp
def parse_timestamp(timestamp_str):
    # Định nghĩa cho ngày đầu giới hạn của datetime
    delta_min_date = datetime(1900,1,1)
    # parquet chấp nhận những ngày từ 1900/01/01 trờ về sau
    if timestamp_str is None:
        return None
    # Xử lý các format của dạng datetime
    for datetime_fmt in formats:
        try:
            # xử lý phần microsecond (phần microsecond không quá 6 ký tự)
            if ('.' in timestamp_str) and (len(timestamp_str.split('.')[-1])>6):
                # Remove các ký tự sau ký tự thứ 6
                timestamp_str = timestamp_str[:timestamp_str.index('.')+7]
                # Xử lý thay đổi đữ liệu
            parsed_datetime = datetime.strptime(timestamp_str, datetime_fmt)
            # Xử lý các dữ liệu nhỏ hơn 1899-12-31
            if parsed_datetime < delta_min_date:
                return datetime(1900,1,1, parsed_datetime.hour, parsed_datetime.minute, parsed_datetime.second, parsed_datetime.microsecond)
            return parsed_datetime
        except ValueError:
            continue
    # xử lý time format
    for time_fmt in time_formats:
        try:
            if ('.' in timestamp_str) and (len(timestamp_str.split('.')[-1])>6):
                # Bỏ các ký tự sau ký tự thứ 6 của microsecond
                timestamp_str = timestamp_str[:timestamp_str.index('.')+7]
            # Xử lý chuyển đổi dữ liệu
            parsed_time = datetime.strptime(timestamp_str, time_fmt)
            return datetime(1900,1,1, parsed_time.hour, parsed_time.minute, parsed_time.second, parsed_time.microsecond)
        except ValueError:
            continue
    return None # Trả None đối với những dữ liệu không có format
# Đăng ký udf
udf_parse_timestamp = udf(parse_timestamp, TimestampType())

Cách dùng

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Tạo Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()

# Giả sử có một DataFrame chứa dữ liệu ngày giờ dưới dạng chuỗi
data = [("2024-09-06 12:30:45.123456",), ("06-09-2024 12:30:45",), ("Sep 06 2024 12:30PM",)]
df = spark.createDataFrame(data, ["timestamp_str"])

# Áp dụng UDF parse_date_udf để chuyển đổi các chuỗi ngày giờ thành kiểu date
df_with_date = df.withColumn("date_column", parse_date_udf(col("timestamp_str")))

# Hiển thị kết quả
df_with_date.show(truncate=False)

Giải thích Format

Định dạng ngày giờ (Format)Ví dụGhi Chú
%Y-%m-%d %H:%M:%S.%f2024-09-06 12:30:45.123456
%Y-%m-%d %H:%M:%S2024-09-06 12:30:45
%Y-%m-%d %H:%M2024-09-06 12:30
%Y/%m/%d %H:%M:%S.%f2024/09/06 12:30:45.123456
%Y/%m/%d %H:%M:%S2024/09/06 12:30:45
%Y/%m/%d %H:%M2024/09/06 12:30
%b %d %Y %I:%M%pSep 06 2024 12:30PM
Các format mẫu trong code

Các hàm tiện dụng khác

Đổi kiểu dữ liệu date

Để đổi kiểu dữ liệu date, chung ta dùng code định dạng dữ liệu datetime trên, nhưng chỉ lấy phần date sau khi chuyển đổi

from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType, DateType
from datetime import datetime
import re
# Các format cho kiểu datetime và date
formats = [
    "%Y-%m-%d %H:%M:%S.%f",      #2024-01-01 10:01:01.12345
    "%Y-%m-%d %H:%M:%S",         #2024-01-01 10:01:01
    "%Y-%m-%d %H:%M",            #2024-01-01 10:01
    "%Y-%m-%d",                  #2024-01-01
    "%Y/%m/%d %H:%M:%S.%f",      #2024/01/01 10:01:01.12345
    "%Y/%m/%d %H:%M:%S",         #2024/01/01 10:01:01
    "%Y/%m/%d %H:%M",            #2024/01/01 10:01
    "%Y/%m/%d",                  #2024/01/01
    "%b %d %Y %I:%M%p"           #Jan 01 2024 10:01AM
]
time_formats = [
    # timeデータ型
    "%H:%M:%S.%f",               #10:01:01.12345
    "%H:%M:%S",                  #10:01:01
    "%H:%M"                      #10:01
]

#function để convert date

def parse_date(date_str):
    # Định nghĩa ngày được parquet chấp nhận
    delta_min_date = datetime(1900,1,1).date()
    if date_str is None:
        return None
    # Xử lý ngày
    for date_fmt in formats:
        try:
            # Xử lý phần microsecond
            if('.' in date_str) and (len(date_str.split('.')[-1])>6):
                # Bỏ các ký tự sau ký tự thứ 6 của microsecond
                date_str = date_str[:date_str.index('.')+7]
            # Xử lý đổi kiểu dữ liệu
            parsed_date = datetime.strptime(date_str, date_fmt).date()
            # Xử lý các dữ liệu trước ngày 1899-12-31
            if parsed_date < delta_min_date:
                return delta_min_date
            return parsed_date
        except ValueError:
            continue
    return None
# Đăng ký udf
udf_parse_date = udf(parsed_date, DateType())

Chuyển đổi các ký tự không được hỗ trợ trong delta table column

Có thể dùng function sau để chuyển đổi tên column trong dataframe để có thể tạo delta table.
Function sau sử dụng để thay thế các ký tự không được hỗ trợ trong chuỗi (cụ thể là các ký tự ()[];,{}\n\t= ) bằng dấu gạch dưới _.

import re
# Các ký tự không dược hỗ trợ
def replace_special_chars(column_name):
    return re.sub(r'[()\[\];,{}\n\t= ]','_',column_name)

Dùng code trên để đổi các column_name trong data frame

from pyspark.sql import SparkSession
import re

# Tạo Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()

# Giả sử có một DataFrame với các cột có tên chứa ký tự đặc biệt
data = [(1, 2, 3)]
columns = ["col(1)", "col[2]", "col;3"]

df = spark.createDataFrame(data, columns)

# Hàm thay thế ký tự đặc biệt trong tên cột
def replace_special_chars(column_name):
    return re.sub(r'[()\[\];,{}\n\t= ]', '_', column_name)

# Đổi tên các cột để thay thế các ký tự đặc biệt
new_columns = [replace_special_chars(col) for col in df.columns]

# Tạo DataFrame mới với tên cột đã được thay thế
df_cleaned = df.toDF(*new_columns)

# Hiển thị DataFrame với tên cột đã được thay thế
df_cleaned.show()

Xóa các ký tự “_” ở vị trí đầu và vị trí cuối trong tên column

import re

# Xoá ký tự "_" ở đầu hoặc cuối của tên column
def remove_start_end_special_chars(column_name):
    return re.sub(r'^_+|_+$','',column_name)

Với cách tạo UDF trên, chúng ta có thể linh động hơn trong việc sử dụng notebook. Các bạn có thể tự tạo cho mình những funtions riêng và tập hợp trên 1 notebook để dễ dàng chỉnh sửa, và sử dụng.

How useful was this post?

Click on a star to rate it!

Average rating 4.6 / 5. Vote count: 27

No votes so far! Be the first to rate this post.

Tạo UDF (User-Defined Funtion)
31

Leave a Reply

Your email address will not be published. Required fields are marked *

Scroll to top