본문 바로가기
Algorithm Trading/ComDon 프로그램 개발이야기

9. 비동기 프로그래밍

by 컴돈AI 2024. 1. 28.

목차

비동기 프로그래밍

  • 자동매매 프로젝트의 경우 정확도가 중요합니다. 비동기적으로 처리하면서도, 로직이 꼬이지 않기 위해서 많은 시행착오를 겪었습니다. 그 과정 속에서 배워나갔던 점들을 하나하나 정리해보도록하겠습니다.

asyncio.create_task() vs asyncio.gather()

  • asyncio.create_task()와 asyncio.gather()은 async로 작성된 비동기함수를 실행할때 사용합니다. 깊게 들어가면 더 많은 차이점이 있지만, 우선은 큼직한 틀에서 차이점을 살펴보도록하겠습니다. 
  • asyncio.create_task() vs asyncio.gather()
    • asyncio.create_task()와 asyncio.gather()는 모두 async로 작성된 비동기 함수를 실행할 때 사용할 수 있습니다. 
    • 단, asyncio.create_task()는 하나의 비동기함수를 실행하지만 asyncio.gahter()는 두개 이상의 비동기함수를 한번에 실행시킬수 있습니다.
      • 참고 : asyncio.create_task()도 다음과 같이 여러개의 비동기 함수를 한 번에 실행시킬 수 있습니다.
        • import asyncio
          
          async def task1():
              print("Task 1 시작")
              await asyncio.sleep(1)
              print("Task 1 완료")
              return "Task 1 결과"
          
          async def task2():
              print("Task 2 시작")
              await asyncio.sleep(2)
              print("Task 2 완료")
              return "Task 2 결과"
          
          async def main():
              # 여러 Task 생성
              task1_handle = asyncio.create_task(task1())
              task2_handle = asyncio.create_task(task2())
          
              # 모든 Task가 완료될 때까지 기다림
              await task1_handle
              await task2_handle
          
          asyncio.run(main())
    • 추가적으로 asyncio.create_task()는 task를 실행한 뒤에 다음 코드들을 이어서 실행할 수 있지만, asyncio.gather()는 gather()안에 있는 모든 비동기함수들이 종료되어야지만 다음 코드들을 이어서 실행할 수 있습니다.
      • 즉, asyncio.create_task()는 await를 지정해주지 않아도 되지만, asyncio.gather()는 await를 지정해주어야합니다. 
        • asyncio.create_task()는 task=asyncio.create_task()로 지정해주고, 나중에 필요할때 await task를 지정해주면 됩니다.
        • import asyncio
          
          async def task1():
              await asyncio.sleep(1)
              return "Task 1 완료"
          
          async def task2():
              await asyncio.sleep(2)
              return "Task 2 완료"
          
          async def main():
              # create_task() 사용
              task1_handle = asyncio.create_task(task1())
          
              # gather() 사용
              results = await asyncio.gather(task1(), task2())
              print(results)  # 모든 태스크 완료 후 결과 출력
          
              result1 = await task1_handle  # 태스크 완료 후 결과 받기
              print(result1)
          
          asyncio.run(main())
  • asyncio.gather은 이미 실행중인 task들을 한번에 await 처리할때도 유용합니다.
    • 아래 코드에서 await running_task1, await running_task2 이런식으로 일일이 하나하나 지정할 필요 없이 gather로 한번에 처리 가능합니다. 
    • import asyncio
      
      
      async def task1():
          print("Task 1 시작")
          await asyncio.sleep(2)
          print("Task 1 완료")
      
      
      async def task2():
          print("Task 2 시작")
          await asyncio.sleep(1)
          print("Task 2 완료")
      
      
      async def main():
          running_task1 = asyncio.create_task(task1())
          running_task2 = asyncio.create_task(task2())
      
          # 이미 실행 중인 두 태스크가 모두 완료될 때까지 기다립니다.
          await asyncio.gather(running_task1, running_task2)
      
          print("완료")
      
      
      asyncio.run(main())

asyncio.create_task() 

  • asyncio.create_task() 에 대해 더 자세히 살펴보도록 하겠습니다.
  • asyncio.create_task()를 통해서 비동기 함수를 실행 가능합니다. 즉, CPU를 사용하지 않는 I/O 위주의 작업(예를 들면 API 호출, 웹소켓 통신)들을 실행시킬수 있습니다.
  • 웹소켓 통신의 경우, 계속해서 데이터를 받아오다가 특정 조건이 되면 해당 조건을 중지시킬수 있어야합니다.
    • 예를 들면, 전종목에 대한 가격 데이터를 계속 받아오면 부담이 될 수 있기 때문에, 조건에 맞는 종목들만 가격 데이터를 받아오도록 설정해야합니다. 이때 조건에 맞으면 가격을 받아오다가, 조건에 맞지 않는 종목일 경우에는 가격을 받아오는 통신을 끊어주어야합니다.
    • 이때 사용하는 것이 cancel()입니다.
    • 해당 함수가 cancel될 때 asyncio.CancelledError를 통해 실행할 코드를 지정해줄수도 있습니다.
      • async def stock_task(stock_code):
            # 장시간 실행되는 작업 (예시)
            try:
                while True:
                    print(f"Task for stock {stock_code} is running")
                    await asyncio.sleep(1)
            except asyncio.CancelledError:
                print(f"Task for stock {stock_code} was cancelled")
                raise
  • 예시
    • 조건에 맞는 종목들에 대해 실시간 가격을 받아오는 웹소켓을 실행했다고 가정해봅니다. 
    • 그러다가 조건에 맞지 않거나, 해당 종목을 거래 완료한경우에 더 이상 가격을 받아올 필요가 없어질 경우 해당 웹소켓통신을 종료해주어야합니다. 이때 cancel을 통해 종료시켜줄수 있습니다.
    • 이를 이제 종목코드를 key 값, 생성된 task를 value로 하는 공유 task 딕셔너리를 통해 관리해주면 손쉽게 관리할 수 있습니다.
      • 참고: 리스트나 딕셔너리를 공유하는 비동기 함수
        • import asyncio
          
          
          # 공유 리스트,딕셔너리를 수정하는 비동기 함수 A
          async def func_a(input_list, input_dict):
              await asyncio.sleep(1)
              input_list.append("A")
              input_dict["A"] = "a"
              print("Func A: Added 'A'")
          
          
          # 공유 리스트,딕셔너리를 수정하는 비동기 함수 B
          async def func_b(input_list, input_dict):
              await asyncio.sleep(2)
              input_list.append("B")
              input_dict["B"] = "b"
              print("Func B: Added 'B'")
          
          
          # 공유 리스트,딕셔너리를 수정하는 비동기 함수 C
          async def func_c(input_list, input_dict):
              await asyncio.sleep(3)
              input_list.append("C")
              input_dict["C"] = "c"
              print("Func C: Added 'C'")
          
          
          async def main():
              shared_list = []
              shared_dict = {}
              # 모든 함수를 동시에 실행
              await asyncio.gather(
                  func_a(shared_list, shared_dict),
                  func_b(shared_list, shared_dict),
                  func_c(shared_list, shared_dict),
              )
          
              # 공유 리스트,딕셔너리의 최종 상태 출력
              print("Shared List:", shared_list)
              print("Shared List:", shared_dict)
          
          
          # 이벤트 루프 실행
          asyncio.run(main())
          
          -->
          Func A: Added 'A'
          Func B: Added 'B'
          Func C: Added 'C'
          Shared List: ['A', 'B', 'C']
          Shared List: {'A': 'a', 'B': 'b', 'C': 'c'}
    • 아래는 10초가 지나면 해당 웹소켓 통신을 취소하는 예시 코드입니다.
      • import asyncio
        import websockets
        import json
        
        async def stock_price_websocket(stock_code, websocket_url, shared_task_dict):
            async with websockets.connect(websocket_url) as websocket:
                # 실시간 가격 데이터를 받아오는 로직
                await websocket.send(json.dumps({"command": "subscribe", "stock_code": stock_code}))
                while True:
                    try:
                        data = await websocket.recv()
                        print(f"Received data for {stock_code}: {data}")
                    except asyncio.CancelledError:
                        print(f"Websocket for {stock_code} cancelled")
                        break
        
        async def main():
            shared_task_dict = {}  # 공유 task 딕셔너리
            websocket_url = "wss://example.com/price"
        
            # 예시 종목들에 대한 웹소켓 연결 생성
            stock_codes = ["005930", "000660", "035420"]
            for stock_code in stock_codes:
                task = asyncio.create_task(stock_price_websocket(stock_code, websocket_url, shared_task_dict))
                shared_task_dict[stock_code] = task
        
            await asyncio.sleep(10)  # 여기서는 예시로 10초간 대기
        
            # 특정 종목의 웹소켓 연결 취소
            if "005930" in shared_task_dict:
                shared_task_dict["005930"].cancel()
        
            # 모든 웹소켓 연결이 완료될 때까지 대기
            await asyncio.gather(*shared_task_dict.values())
        
        asyncio.run(main())
  • 주의할 점
    • 가장 바깥쪽 함수에서는 await를 무조건 해줘야합니다. 그렇지 않으면 그냥 단순히 함수 실행만 하고 완료되는 것을 기달리지 않고 프로그램이 종료되어버릴수가 있습니다.
    • 예시
      • 아래 코드는 가장 바깥쪽 함수인 main()에서 asyncio.create_task를 await 해주지 않아서 print("main 함수 완료")만 print 되고 프로그램이 종료되어버렸습니다.
      • import asyncio
        
        
        async def do_something_after_delay(delay, message):
            await asyncio.sleep(delay)
            print(message)
        
        
        async def main():
            # 태스크를 스케줄링하고 있지만, 완료될 때까지 기다리지 않음
            asyncio.create_task(do_something_after_delay(1, "첫 번째 메시지"))
            asyncio.create_task(do_something_after_delay(2, "두 번째 메시지"))
            print("main 함수 완료")
        
        
        asyncio.run(main())
        -->
        main 함수 완료

asyncio.Lock()

  • 위 설명 중에 하나의 리스트나 딕셔너리에 여러 비동기함수가 접근가능하다고 했습니다. 하지만 이럴경우 동기화 문제가 발생할 수 있습니다.
    • 동기화 문제 예시
      • import asyncio
        
        
        async def func_a(shared_resource, key, value):
            current_value = shared_resource.get(key, 0)
            await asyncio.sleep(0.1)  # 다른 코루틴이 실행될 기회 제공
            shared_resource[key] = current_value + value
        
        
        async def func_b(shared_resource, key, value):
            current_value = shared_resource.get(key, 0)
            await asyncio.sleep(0.1)  # 다른 코루틴이 실행될 기회 제공
            shared_resource[key] = current_value + value
        
        
        async def main():
            shared_resource = {}
            task1 = asyncio.create_task(func_a(shared_resource, "key", 1))
            task2 = asyncio.create_task(func_b(shared_resource, "key", 2))
        
            await task1
            await task2
        
            print(shared_resource)
        
        
        asyncio.run(main())
        -->
        {'key': 2}
      • 원래 결과는 3이 나와야 맞지만, 2가 결과로 나옵니다. 이는 task1과 task2가 비동기적으로 실행되면서 발생한 동기화문제입니다.
  • 이러한 동기화 문제는 asyncio.Lock()을 통해 해결해줄수 있습니다. 공유자원에 하나의 함수가 접근하면, lock을 획득하여 다른 함수가 접근하지 못하도록 설정할수 있습니다. 다른 함수는 그 lock이 해제될때까지 기다려야합니다.
    • import asyncio
      
      
      async def func_a(shared_resource, key, value, lock):
          async with lock:
              current_value = shared_resource.get(key, 0)
              await asyncio.sleep(0.1)  # 다른 코루틴이 실행될 기회 제공
              shared_resource[key] = current_value + value
      
      
      async def func_b(shared_resource, key, value, lock):
          async with lock:
              current_value = shared_resource.get(key, 0)
              await asyncio.sleep(0.1)  # 다른 코루틴이 실행될 기회 제공
              shared_resource[key] = current_value + value
      
      
      async def main():
          shared_resource = {}
          lock = asyncio.Lock()
          task1 = asyncio.create_task(func_a(shared_resource, "key", 1, lock))
          task2 = asyncio.create_task(func_b(shared_resource, "key", 2, lock))
      
          await task1
          await task2
      
          print(shared_resource)
      
      
      asyncio.run(main())
      
      -->
      {'key': 3}
    • func_a가 먼저 lock을 획득하면, func_a의 코드블록이 완전히 실행(정확히는 async with lock:으로 감싼부분)되고 lock이 해제될 때까지 func_b는 대기 상태가 됩니다. 반대로 func_b가 먼저 lock을 획득하면 func_a는 func_b의 실행(정확히는 async with lock:으로 감싼부분)이 완료될때까지 대기합니다.
  • 이러한 방식으로 asyncio.Lock은 공유 자원에 대한 동시 접근을 제어하며, 예측 가능하고 안전한 데이터 변경을 보장합니다.

asyncio.Event()

  • asyncio.Event는 비동기 프로그래밍에서 이벤트를 관리하는데 사용되는 도구입니다. asyncio.Event 객체는 내부적으로 set 또는 clear 상태를 가질 수 있으며, 이를 통해 여러 태스크 간의 동기화를 제어합니다.
    • event.set(): 이벤트를 'set' 상태로 만듭니다. 이 상태에서 await event.wait()를 호출하는 모든 태스크는 즉시 진행됩니다.
    • event.clear(): 이벤트를 'clear' 상태로 만듭니다. 이 상태에서 await event.wait()를 호출하면 이벤트가 'set' 상태가 될 때까지 태스크가 대기합니다.
    • await event.wait(): 이벤트가 'set' 상태가 될 때까지 현재 태스크를 대기시킵니다. 이벤트가 이미 'set' 상태라면 즉시 진행됩니다.
  • 예시
    • 특정 조건에 맞는 종목들을 검색하는 웹소켓에서 데이터가 검출될때, 특정 함수를 실행해주고 싶은 경우에 유용하게 사용할 수 있습니다.
    • 코드
      • import asyncio
        import json
        import websockets
        
        async def realtime_stock_search(event, condition_list):
            uri = "wss://example.com/stock-search"
            async with websockets.connect(uri) as websocket:
                while True:
                    data = await websocket.recv()
                    stock_data = json.loads(data)
                    condition_list.append(stock_data)  # 데이터가 들어오면 condition_list에 추가
                    event.set()  # 이벤트 설정하여 condition_list에 데이터가 추가되었다고 다른 함수에 알림
        
        
        async def process_stock_data(event, condition_list):
            while True:
                await event.wait()  # 이벤트가 설정될 때까지 대기
                stock = condition_list[-1]  # 처리할 주식 데이터 가져오기
                print(f"Processing stock: {stock}")  # 주식 데이터 처리
                event.clear()  # 이벤트 초기화
        
        async def main():
            event = asyncio.Event()
            condition_list = []
        
            # 두 함수 동시에 실행
            await asyncio.gather(
                realtime_stock_search(event, condition_list),
                process_stock_data(event, condition_list)
            )
        
        asyncio.run(main())
    • 하지만 위처럼 그냥 단순하게 append 되었을때 신호를 보내고, 마지막 종목에 대해서만 처리를 하게된다면 process_stock_data가 처리되는 동안(즉, event가 set된 상태,event가 clear되지 않은 상태)에 데이터가 들어오게 된다면, 그 데이터처리는 되지 않을 것입니다. 따라서 다음과 같이 이전 길이를 기억해두었다가, 처리해주어야지 놓치는 데이터 없이 처리가 가능합니다.
      • import asyncio
        import json
        import websockets
        
        async def realtime_stock_search(event, condition_list):
            uri = "wss://example.com/stock-search"
            async with websockets.connect(uri) as websocket:
                while True:
                    data = await websocket.recv()
                    stock_data = json.loads(data)
                    condition_list.append(stock_data)  # 데이터가 들어오면 condition_list에 추가
                    event.set()  # 이벤트 설정하여 condition_list에 데이터가 추가되었다고 다른 함수에 알림
        
        async def process_stock_data(event, condition_list):
            prev_length = 0
            while True:
                await event.wait()  # 이벤트가 설정될 때까지 대기
                new_length = len(condition_list)
                if new_length > prev_length:
                    new_items = condition_list[prev_length:]
                    for stock in new_items:
                        print(f"Processing stock: {stock}")  # 새로운 주식 데이터 처리
                    prev_length = new_length
                event.clear()  # 이벤트 초기화
        
        async def main():
            event = asyncio.Event()
            condition_list = []
        
            # 두 함수 동시에 실행
            await asyncio.gather(
                realtime_stock_search(event, condition_list),
                process_stock_data(event, condition_list)
            )
        
        asyncio.run(main())

asyncio.Queue()

    • asyncio.Queue는 비동기 프로그래밍에서 사용되는 FIFO(First-In-First-Out) 큐입니다.
      • asyncio.Queue(maxsize=0)를 통해서 queue를 생성할 수 있습니다.
        • maxsize 입력인자를 따로 입력하지 않을 경우 0으로 지정됩니다.
        • maxsize가 0이하면 큐의 크기는 무한입니다. 
        • maxsize가 0보다 큰 정수일 경우, maxsize에 도달하면 await put()은 큐에서 항목이 제거될 때까지(get()에 의해 큐에서 값이꺼내질때까지) 대기합니다.
      • empty() : 큐가 비어 있으면 True를 반환하고, 그렇지 않으면 False를 반환합니다.
      • full() : 큐가 maxsize 항목으로 가득 차 있으면 True를 반환합니다. maxsize=0으로 초기화된 경우 full()은 항상 False를 반환합니다.
      • qsize() : 큐에 있는 항목의 수를 반환합니다.
      • get() : 큐에서 항목을 제거하고 반환합니다. 큐가 비어 있다면 항목이 사용 가능해질때까지 대기합니다. (await 사용)
      • get_nowait() : 큐에서 즉시 사용 가능한 항목을 반환합니다. 사용 가능한 항목이 없으면 (QueueEmpty 예외 발생, await 사용 안함)
      • put(item) : 큐에 항목을 추가합니다. 큐가 가득찼다면, 빈 슬롯이 생길 때까지 대기합니다. (await 사용)
      • put_nowait(item) : 큐에 항목을 추가하되, 대기하지 않고 즉시 수행합니다. 즉시 빈 슬롯이 없으면 QueueFull 예외를 발생시킵니다.
      • join() : 큐의 모든 항목이 처리될 때까지 대기합니다. 항목이 큐에 추가될 때마다 미완료 작업의 수가 증가하고, 소비자 코루틴이 task_done()을 호출해 작업이 완료되었음을 알릴 때마다 감소합니다. 미완료 작업 수가 0이 되면 join()은 더이상 대기하지 않습니다.
      • task_done() : 큐에서 가져온 작업이 완료되었음을 나타냅니다. 큐의 소비자에 의해 사용됩니다. get()으로 작업을 가져온 후, task_done()을 호출하면 해당 작업에 대한 처리가 완료되었음을 큐에 알립니다. join()이 현재 대기 중이라면, 큐에 추가된 모든 항목이 처리되었을때 (즉, 큐에 put()된 모든 항목에 대해 task_done() 호출이 수신되었을 때) join()이 재개됩니다.
    • 예시
      • 기본적인 put과 get을 사용하는 경우
        • import asyncio
          
          
          async def producer(queue):
              # 큐에 데이터 추가
              for i in range(5):
                  await queue.put(i)
                  print(f"Produced: {i}")
                  await asyncio.sleep(1)
          
          
          async def consumer(queue):
              # 큐에서 데이터 가져오기
              while True:
                  item = await queue.get()
                  print(f"Consumed: {item}")
                  queue.task_done()
          
          
          async def main():
              queue = asyncio.Queue()
              producer_task = asyncio.create_task(producer(queue))
              consumer_task = asyncio.create_task(consumer(queue))
          
              # 모든 데이터가 처리될 때까지 대기
              await producer_task
              await queue.join()  # 모든 항목이 처리될 때까지 대기
              consumer_task.cancel()  # 소비자 작업 취소
          
          
          asyncio.run(main())
      • put_nowait()와 get_nowait() 사용
        • import asyncio
          
          async def main():
              queue = asyncio.Queue()
          
              # 큐에 데이터를 즉시 넣기
              try:
                  queue.put_nowait("Hello")
                  queue.put_nowait("World")
              except asyncio.QueueFull:
                  print("Queue is full")
          
              # 큐에서 데이터를 즉시 가져오기
              try:
                  print(queue.get_nowait())
                  print(queue.get_nowait())
              except asyncio.QueueEmpty:
                  print("Queue is empty")
          
          asyncio.run(main())
      • put_nowait()와 get()을 사용 (데이터는 누락되면 안되기때문에 put_nowait로 바로바로 데이터를 넣어줍니다.)
        • import asyncio
          
          
          async def producer(queue):
              # 큐에 데이터를 즉시 넣기
              for i in range(5):
                  try:
                      queue.put_nowait(i)
                      print(f"Produced: {i}")
                  except asyncio.QueueFull:
                      print("Queue is full")
                  await asyncio.sleep(1)  # 다음 아이템을 생성하기 전에 잠시 대기
          
          
          async def consumer(queue):
              # 큐에서 데이터를 비동기적으로 가져오기
              while True:
                  item = await queue.get()
                  print(f"Consumed: {item}")
                  queue.task_done()
          
          
          async def main():
              queue = asyncio.Queue(maxsize=2)  # 최대 크기를 2로 설정
          
              producer_task = asyncio.create_task(producer(queue))
              consumer_task = asyncio.create_task(consumer(queue))
          
              # 모든 데이터가 처리될 때까지 대기
              await producer_task
              await queue.join()  # 모든 항목이 처리될 때까지 대기
              consumer_task.cancel()  # 소비자 작업 취소
          
          
          asyncio.run(main())
      • 여러 프로듀서와 컨슈머
        • import asyncio
          
          
          async def producer(queue, name):
              for i in range(3):
                  await queue.put((name, i))
                  print(f"Producer {name}: Added item {i}")
                  await asyncio.sleep(1)
          
          
          async def consumer(queue, name):
              while True:
                  producer_name, item = await queue.get()
                  print(f"Consumer {name}: Processed item {item} from {producer_name}")
                  queue.task_done()
          
          
          async def main():
              queue = asyncio.Queue()
          
              # 여러 프로듀서와 컨슈머 생성
              producers = [
                  asyncio.create_task(producer(queue, f"Producer {i}")) for i in range(2)
              ]
              consumers = [
                  asyncio.create_task(consumer(queue, f"Consumer {i}")) for i in range(2)
              ]
          
              # 모든 프로듀서가 작업을 완료할 때까지 대기
              await asyncio.gather(*producers)
              await queue.join()  # 모든 항목이 처리될 때까지 대기
          
              # 모든 컨슈머 작업 취소
              for c in consumers:
                  c.cancel()
          
          
          asyncio.run(main())
  • 그렇다면 이 queue를 자동매매에서 언제 사용할 수 있을까요? 위에서 봤듯이 asyncio.Event를 사용할 경우, 데이터를 처리하는 부분에서 오래 걸리게 되면은 clear()가 늦게 적용되어서 데이터가 누락될 수 있었습니다. 이를 리스트 슬라이싱이 아닌 queue를 통해서 처리를 해줄수 있습니다.
    • import asyncio
      import json
      import websockets
      
      async def realtime_stock_search(queue):
          uri = "wss://example.com/stock-search"
          async with websockets.connect(uri) as websocket:
              while True:
                  data = await websocket.recv()
                  stock_data = json.loads(data)
                  await queue.put(stock_data)  # 데이터가 들어오면 큐에 추가
      
      async def process_stock_data(queue):
          while True:
              stock = await queue.get()  # 큐에서 주식 데이터 가져오기
              print(f"Processing stock: {stock}")  # 주식 데이터 처리
              queue.task_done()
      
      async def main():
          queue = asyncio.Queue()
      
          # 두 함수 동시에 실행
          await asyncio.gather(
              realtime_stock_search(queue),
              process_stock_data(queue)
          )
      
      asyncio.run(main())

CPU위주의 작업 처리 (asyncio.get_running_loop())

  • 비동기 프로그래밍을 할 때 주의할 점은 CPU 위주의 작업을 잘 처리해주어야 한다는 것입니다. 비동기적으로 코드를 잘 작성해두었지만, 만약 그 중간에 CPU 위주의 작업이 존재할 경우, 다른 IO 처리를 진행하는 비동기 코드들이 멈춰버리게 됩니다. 이럴 경우 원하는대로 로직이 동작하지 않을 수 있습니다.
  • 이를 위해 사용할 수 있는 것이 별도의 스레드나 프로세스를 사용하는 것입니다. asyncio는 loop.run_in_executor() 메소드를 사용하여 CPU 집약적인 작업을 별도의 스레드나 프로세스에서 비동기적으로 실행할 수 있습니다. 이렇게 할 경우 CPU 작업이 별도의 실행 컨텍스트에서 처리되므로, 메인 이벤트 루프는 I/O 작업을 계속 처리할 수 있습니다.
  • 예시 코드
    • import asyncio
      import concurrent.futures
      import websockets
      import json
      
      async def websocket_receive_data(websocket_url):
          async with websockets.connect(websocket_url) as websocket:
              while True:
                  data = await websocket.recv()
                  # 받아온 데이터를 처리하는 함수에 전달
                  asyncio.create_task(process_data(json.loads(data)))
      
      async def process_data(data):
          # CPU 집약적인 데이터 처리 작업을 별도의 스레드에서 실행
          loop = asyncio.get_running_loop()
          with concurrent.futures.ThreadPoolExecutor() as pool:
              result = await loop.run_in_executor(pool, cpu_intensive_task, data)
              print(f"Processed data: {result}")
      
      def cpu_intensive_task(data):
          # CPU 집약적인 데이터 처리 로직
          # 예: 데이터 분석, 복잡한 계산 등
          return data  # 처리 결과 반환
      
      async def main():
          websocket_url = "wss://example.com/your_websocket"
          await websocket_receive_data(websocket_url)
      
      asyncio.run(main())
    • 별도의 프로세스를 사용하고 싶은 경우는 concurrent.futures.ThreadPoolExecutor()대신 concurrent.futures.ProcessPoolExecutor()를 사용하면 됩니다.
  • 참고 : 비동기 프로그래밍 vs 멀티스레드 vs 멀티프로세스