Async Generator: Khi Stream Dữ Liệu Gặp Gỡ Bất Đồng Bộ
Python

Async Generator: Khi Stream Dữ Liệu Gặp Gỡ Bất Đồng Bộ

Author

Admin System

@root

Ngày xuất bản

21 Mar, 2026

Lượt xem

4 Lượt

"async_generator"

Async Generator: Khi Stream Dữ Liệu Gặp Gỡ Bất Đồng Bộ – Flex Sức Mạnh Vô Song!

Chào các bạn "dev-er" Gen Z! Anh Creyt lại lên sóng đây. Hôm nay, chúng ta sẽ "bóc tách" một concept mà nghe thì có vẻ "hack não" nhưng thực ra lại cực kỳ "chill" khi hiểu rõ: async_generator. Nghe đồn là nó giúp code của mình "mượt mà" hơn, "flex" được sức mạnh của lập trình bất đồng bộ (asyncio) trong Python. Vậy nó là cái "quái gì" và làm sao để mình "hút" được hết tiềm năng của nó? Nghe anh Creyt kể chuyện nhé!

1. Async Generator là gì và để làm gì? – Kể chuyện TikTok Stream

Hãy tưởng tượng bạn đang lướt TikTok hoặc xem một livestream game. Bạn có thấy là video cứ thế "trôi" đến, từng đoạn từng đoạn một, mà bạn không phải chờ cả cái video dài ngoẵng đó tải xong mới xem được không? Đó chính là tinh thần của generator nói chung, và async_generator nói riêng, nhưng ở một "level" cao hơn, "pro" hơn!

Generator thường (sync generator): Giống như một đầu bếp làm món ăn, cứ làm xong món nào là bưng ra cho khách món đó. Nhưng nếu món nào cần đợi lâu (ví dụ, hầm xương 3 tiếng), thì cả nhà hàng phải ngồi chờ, không ai được ăn gì khác cho đến khi món đó xong. Nó "block" cả tiến trình.

Async Generator: Giờ tưởng tượng đầu bếp đó có một "đội quân" trợ lý siêu năng lực. Khi món hầm cần 3 tiếng, đầu bếp giao cho trợ lý lo liệu, rồi quay sang làm món gỏi, món khai vị khác. Món nào xong trước thì bưng ra trước. Khách hàng cứ thế được "stream" đồ ăn liên tục, không bị "đứng hình" chờ đợi.

Nói một cách "học thuật" hơn, async_generator là một hàm generator có khả năng tạm dừng thực thi để await một tác vụ bất đồng bộ (ví dụ: đọc file, gọi API, truy vấn database) và sau đó yield ra từng giá trị khi chúng sẵn sàng. Nó cho phép bạn tạo ra một chuỗi các giá trị một cách bất đồng bộ, mà không làm "treo" toàn bộ ứng dụng của bạn. Điều này cực kỳ hữu ích khi bạn làm việc với dữ liệu lớn, stream dữ liệu liên tục, hoặc các tác vụ I/O tốn thời gian.

2. Code Ví Dụ Minh Hoạ – "Flex" Code Ngay và Luôn!

Cú pháp của async_generator khá giống generator truyền thống, nhưng có thêm từ khóa async def và khả năng dùng await. Để "triệu hồi" nó, bạn sẽ dùng async for.

Ví dụ 1: Đếm số bất đồng bộ

Hãy cùng tạo một async_generator đơn giản để đếm số, nhưng mỗi lần đếm lại "nghỉ" một chút, mô phỏng một tác vụ I/O nào đó.

Illustration

import asyncio

async def async_number_generator(limit):
    """
    Một async generator đếm số từ 0 đến limit-1.
    Mỗi lần đếm, nó "nghỉ" 0.5 giây.
    """
    print("Bắt đầu đếm số bất đồng bộ...")
    for i in range(limit):
        await asyncio.sleep(0.5)  # Giả lập tác vụ I/O tốn thời gian
        yield i
        print(f"  Đã yield số: {i}")
    print("Kết thúc đếm số bất đồng bộ.")

async def main():
    print("Chương trình chính bắt đầu.")
    print("Đang lấy các số từ async generator:")
    async for number in async_number_generator(5):
        print(f"    Nhận được số từ generator: {number}")
    
    print("\nLấy các số từ async generator lần 2 (có tác vụ song song):")
    
    # Kết hợp async generator với một tác vụ bất đồng bộ khác
    async def another_task():
        for _ in range(3):
            await asyncio.sleep(0.3)
            print("      Tác vụ khác đang chạy...")

    # Chạy song song generator và tác vụ khác
    await asyncio.gather(
        another_task(),
        async def():
            async for number in async_number_generator(3):
                print(f"    Nhận được số từ generator (song song): {number}")
    )() # Gọi lambda function để chạy async for
    
    print("Chương trình chính kết thúc.")

if __name__ == "__main__":
    asyncio.run(main())

Giải thích code:

  • async def async_number_generator(limit):: Khai báo đây là một async generator.
  • await asyncio.sleep(0.5): Đây là điểm mấu chốt. Thay vì "treo" cả chương trình, nó nhường quyền điều khiển cho asyncio để chạy các tác vụ khác (nếu có) trong lúc chờ đợi. Sau 0.5 giây, nó sẽ "thức dậy" và tiếp tục.
  • yield i: Trả về giá trị i và tạm dừng thực thi, chờ lần next() tiếp theo.
  • async for number in async_number_generator(5):: Cách để "tiêu thụ" các giá trị từ async generator. Nó sẽ await mỗi lần yield giá trị.

Bạn sẽ thấy output không bị "treo" hoàn toàn giữa các lần đếm, đặc biệt ở ví dụ thứ 2 khi có another_task chạy song song. Các thông báo từ another_task sẽ xen kẽ với thông báo từ generator, chứng tỏ tính bất đồng bộ của nó.

3. Mẹo (Best Practices) – Ghi nhớ để "flex" code mượt mà!

  1. Hiểu rõ khi nào cần dùng: Đừng "lạm dụng" async_generator cho mọi thứ. Chỉ dùng khi bạn cần stream dữ liệu (tức là cần trả về từng phần một) VÀ các tác vụ để tạo ra từng phần đó là bất đồng bộ (I/O bound). Nếu chỉ là tính toán CPU thuần túy, generator thường là đủ.
  2. await là chìa khóa: Luôn nhớ rằng await bên trong async_generator là để nhường quyền điều khiển. Nếu bạn không có await nào, nó sẽ chạy như một generator đồng bộ (mặc dù vẫn là async def), và bạn sẽ không tận dụng được lợi thế bất đồng bộ.
  3. Xử lý lỗi: Giống như các generator thông thường, async_generator cũng có thể ném ra ngoại lệ. Hãy dùng try...except để đảm bảo luồng dữ liệu không bị gián đoạn giữa chừng nếu có lỗi xảy ra trong quá trình tạo giá trị.
  4. Đóng generator: Trong một số trường hợp, bạn cần đảm bảo tài nguyên được giải phóng khi async_generator không còn được sử dụng nữa. Python 3.6+ hỗ trợ phương thức aclose() để đóng một async_generator một cách tường minh, hoặc bạn có thể dùng async with nếu async_generator của bạn là một async context manager.

4. Ví Dụ Thực Tế – Ai đang "flex" nó ngoài kia?

async_generator không phải là một thứ gì đó "trên trời", mà nó được ứng dụng rất nhiều trong các hệ thống hiện đại:

  • API Streaming: Khi bạn gọi một API trả về dữ liệu theo từng "chunk" (đoạn nhỏ), ví dụ như API của OpenAI cho ChatGPT stream câu trả lời, hoặc các API stream dữ liệu tài chính theo thời gian thực. async_generator có thể giúp bạn xử lý từng chunk dữ liệu ngay khi nó đến, thay vì phải đợi toàn bộ phản hồi.
  • WebSockets: Trong các ứng dụng dùng WebSocket để nhận dữ liệu liên tục (chat app, real-time dashboards), async_generator có thể được dùng để "yield" từng tin nhắn hoặc sự kiện đến từ server.
  • Xử lý File/Database lớn: Đọc một file log khổng lồ hoặc truy vấn một database với hàng triệu bản ghi theo từng đợt (batch) để tránh tràn bộ nhớ và xử lý bất đồng bộ.
  • Data Pipelines: Trong các hệ thống xử lý dữ liệu nơi bạn cần chuyển đổi và truyền dữ liệu qua nhiều bước, mỗi bước có thể là một async_generator xử lý một phần dữ liệu và yield kết quả cho bước tiếp theo.

5. Thử nghiệm đã từng và nên dùng cho case nào?

Anh Creyt đã từng "chinh chiến" với async_generator trong một dự án xây dựng hệ thống thu thập dữ liệu giá tiền ảo theo thời gian thực. Dữ liệu từ các sàn giao dịch đổ về liên tục qua WebSocket.

Case Study: Ban đầu, team dùng một vòng lặp while True để await nhận tin nhắn, rồi xử lý. Cách này hoạt động, nhưng khi cần "linh hoạt" hơn, ví dụ như có nhiều nguồn dữ liệu hoặc cần "inject" thêm logic kiểm tra vào giữa chừng, thì code trở nên rối rắm.

Khi chuyển sang dùng async_generator, mỗi sàn giao dịch được "đại diện" bởi một async_generator riêng. Nó sẽ await tin nhắn từ WebSocket, yield ra một đối tượng dữ liệu đã được chuẩn hóa. Sau đó, một async for khác sẽ "tiêu thụ" các dữ liệu này, đưa vào hàng đợi xử lý.

# Ví dụ concept đơn giản cho việc thu thập dữ liệu từ nhiều nguồn
import asyncio
import random

async def fetch_data_from_exchange(exchange_name, delay_min, delay_max):
    """
    Giả lập một async generator lấy dữ liệu từ sàn giao dịch.
    """
    while True:
        await asyncio.sleep(random.uniform(delay_min, delay_max)) # Giả lập độ trễ mạng
        price = round(random.uniform(10000, 70000), 2)
        yield {"exchange": exchange_name, "price": price, "timestamp": asyncio.get_event_loop().time()}
        print(f"  [{exchange_name}] Đã fetch và yield giá: {price}")

async def data_consumer(generator, consumer_id):
    """
    Một consumer xử lý dữ liệu từ async generator.
    """
    print(f"Consumer {consumer_id} bắt đầu tiêu thụ dữ liệu.")
    async for data_point in generator:
        # Ở đây bạn có thể lưu vào DB, gửi đến Kafka, xử lý thống kê, v.v.
        print(f"    Consumer {consumer_id} nhận được: {data_point}")
        await asyncio.sleep(0.1) # Giả lập thời gian xử lý

async def main_trading_app():
    print("Ứng dụng giao dịch bắt đầu.")
    
    # Tạo các async generator cho các sàn khác nhau
    binance_gen = fetch_data_from_exchange("Binance", 0.5, 1.5)
    coinbase_gen = fetch_data_from_exchange("Coinbase", 0.3, 1.0)

    # Tạo các consumer để xử lý dữ liệu từ các sàn (có thể chạy song song)
    task1 = asyncio.create_task(data_consumer(binance_gen, "BinanceProcessor"))
    task2 = asyncio.create_task(data_consumer(coinbase_gen, "CoinbaseProcessor"))
    
    # Chạy trong một khoảng thời gian nhất định để thấy hiệu quả
    await asyncio.sleep(10) 
    
    task1.cancel()
    task2.cancel()
    try:
        await asyncio.gather(task1, task2, return_exceptions=True)
    except asyncio.CancelledError:
        print("Các consumer đã dừng.")

    print("Ứng dụng giao dịch kết thúc.")

if __name__ == "__main__":
    asyncio.run(main_trading_app())

Khi nào nên dùng?

  • Khi bạn cần stream dữ liệu: Dữ liệu đến từng phần và bạn muốn xử lý từng phần một thay vì đợi toàn bộ.
  • Khi việc tạo ra mỗi phần dữ liệu là một tác vụ bất đồng bộ: Gọi API, đọc từ network, database, file system.
  • Khi bạn muốn "phân tách" logic: Tách biệt logic tạo dữ liệu khỏi logic tiêu thụ dữ liệu, giúp code dễ đọc, dễ bảo trì và dễ mở rộng hơn.
  • Trong các pipeline xử lý dữ liệu bất đồng bộ: Mỗi bước trong pipeline có thể là một async_generator, nhận đầu vào từ generator trước và yield đầu ra cho generator kế tiếp.

Tóm lại, async_generator là một công cụ cực kỳ mạnh mẽ để "flex" khả năng xử lý bất đồng bộ của Python, giúp ứng dụng của bạn mượt mà, phản hồi nhanh hơn và "cool ngầu" hơn rất nhiều trong thế giới của dữ liệu và stream liên tục. Hãy "thực hành" ngay để không bị "tối cổ" nhé các Gen Z!

Thuộc Series: Python

Bài giảng này được tự động xuất bản ngẫu nhiên từ thư viện kiến thức. Đừng quên đón xem các Từ khoá Hướng Dẫn tiếp theo nhé!

#tech #cyberpunk #laravel
Chỉnh sửa bài viết

Bình luận (0)

Vui lòng Đăng Nhập để Bình luận

Hỗ trợ Markdown cơ bản
Nguyễn Văn A
1 ngày trước

Tính năng này đỉnh quá ad ơi, chờ mãi mới thấy một blog Tiếng Việt có UI/UX xịn như vầy!