본문 바로가기
Algorithm Trading/ComDon 프로그램 개발이야기

9. 비동기 프로그래밍

by 컴돈AI 2024. 1. 28.

목차

    비동기 프로그래밍

    • 자동매매 프로젝트의 경우 정확도가 중요합니다. 비동기적으로 처리하면서도, 로직이 꼬이지 않기 위해서 많은 시행착오를 겪었습니다. 그 과정 속에서 배워나갔던 점들을 하나하나 정리해보도록하겠습니다.

    asyncio.create_task() vs asyncio.gather()

    • asyncio.create_task()와 asyncio.gather()은 async로 작성된 비동기함수를 실행할때 사용합니다. 깊게 들어가면 더 많은 차이점이 있지만, 우선은 큼직한 틀에서 차이점을 살펴보도록하겠습니다. 
    • asyncio.create_task() vs asyncio.gather()
      • asyncio.create_task()와 asyncio.gather()는 모두 async로 작성된 비동기 함수를 실행할 때 사용할 수 있습니다. 
      • 단, asyncio.create_task()는 하나의 비동기함수를 실행하지만 asyncio.gahter()는 두개 이상의 비동기함수를 한번에 실행시킬수 있습니다.
        • 참고 : asyncio.create_task()도 다음과 같이 여러개의 비동기 함수를 한 번에 실행시킬 수 있습니다.
          • import asyncio
            
            async def task1():
                print("Task 1 시작")
                await asyncio.sleep(1)
                print("Task 1 완료")
                return "Task 1 결과"
            
            async def task2():
                print("Task 2 시작")
                await asyncio.sleep(2)
                print("Task 2 완료")
                return "Task 2 결과"
            
            async def main():
                # 여러 Task 생성
                task1_handle = asyncio.create_task(task1())
                task2_handle = asyncio.create_task(task2())
            
                # 모든 Task가 완료될 때까지 기다림
                await task1_handle
                await task2_handle
            
            asyncio.run(main())
      • 추가적으로 asyncio.create_task()는 task를 실행한 뒤에 다음 코드들을 이어서 실행할 수 있지만, asyncio.gather()는 gather()안에 있는 모든 비동기함수들이 종료되어야지만 다음 코드들을 이어서 실행할 수 있습니다.
        • 즉, asyncio.create_task()는 await를 지정해주지 않아도 되지만, asyncio.gather()는 await를 지정해주어야합니다. 
          • asyncio.create_task()는 task=asyncio.create_task()로 지정해주고, 나중에 필요할때 await task를 지정해주면 됩니다.
          • import asyncio
            
            async def task1():
                await asyncio.sleep(1)
                return "Task 1 완료"
            
            async def task2():
                await asyncio.sleep(2)
                return "Task 2 완료"
            
            async def main():
                # create_task() 사용
                task1_handle = asyncio.create_task(task1())
            
                # gather() 사용
                results = await asyncio.gather(task1(), task2())
                print(results)  # 모든 태스크 완료 후 결과 출력
            
                result1 = await task1_handle  # 태스크 완료 후 결과 받기
                print(result1)
            
            asyncio.run(main())
    • asyncio.gather은 이미 실행중인 task들을 한번에 await 처리할때도 유용합니다.
      • 아래 코드에서 await running_task1, await running_task2 이런식으로 일일이 하나하나 지정할 필요 없이 gather로 한번에 처리 가능합니다. 
      • import asyncio
        
        
        async def task1():
            print("Task 1 시작")
            await asyncio.sleep(2)
            print("Task 1 완료")
        
        
        async def task2():
            print("Task 2 시작")
            await asyncio.sleep(1)
            print("Task 2 완료")
        
        
        async def main():
            running_task1 = asyncio.create_task(task1())
            running_task2 = asyncio.create_task(task2())
        
            # 이미 실행 중인 두 태스크가 모두 완료될 때까지 기다립니다.
            await asyncio.gather(running_task1, running_task2)
        
            print("완료")
        
        
        asyncio.run(main())

    asyncio.create_task() 

    • asyncio.create_task() 에 대해 더 자세히 살펴보도록 하겠습니다.
    • asyncio.create_task()를 통해서 비동기 함수를 실행 가능합니다. 즉, CPU를 사용하지 않는 I/O 위주의 작업(예를 들면 API 호출, 웹소켓 통신)들을 실행시킬수 있습니다.
    • 웹소켓 통신의 경우, 계속해서 데이터를 받아오다가 특정 조건이 되면 해당 조건을 중지시킬수 있어야합니다.
      • 예를 들면, 전종목에 대한 가격 데이터를 계속 받아오면 부담이 될 수 있기 때문에, 조건에 맞는 종목들만 가격 데이터를 받아오도록 설정해야합니다. 이때 조건에 맞으면 가격을 받아오다가, 조건에 맞지 않는 종목일 경우에는 가격을 받아오는 통신을 끊어주어야합니다.
      • 이때 사용하는 것이 cancel()입니다.
      • 해당 함수가 cancel될 때 asyncio.CancelledError를 통해 실행할 코드를 지정해줄수도 있습니다.
        • async def stock_task(stock_code):
              # 장시간 실행되는 작업 (예시)
              try:
                  while True:
                      print(f"Task for stock {stock_code} is running")
                      await asyncio.sleep(1)
              except asyncio.CancelledError:
                  print(f"Task for stock {stock_code} was cancelled")
                  raise
    • 예시
      • 조건에 맞는 종목들에 대해 실시간 가격을 받아오는 웹소켓을 실행했다고 가정해봅니다. 
      • 그러다가 조건에 맞지 않거나, 해당 종목을 거래 완료한경우에 더 이상 가격을 받아올 필요가 없어질 경우 해당 웹소켓통신을 종료해주어야합니다. 이때 cancel을 통해 종료시켜줄수 있습니다.
      • 이를 이제 종목코드를 key 값, 생성된 task를 value로 하는 공유 task 딕셔너리를 통해 관리해주면 손쉽게 관리할 수 있습니다.
        • 참고: 리스트나 딕셔너리를 공유하는 비동기 함수
          • import asyncio
            
            
            # 공유 리스트,딕셔너리를 수정하는 비동기 함수 A
            async def func_a(input_list, input_dict):
                await asyncio.sleep(1)
                input_list.append("A")
                input_dict["A"] = "a"
                print("Func A: Added 'A'")
            
            
            # 공유 리스트,딕셔너리를 수정하는 비동기 함수 B
            async def func_b(input_list, input_dict):
                await asyncio.sleep(2)
                input_list.append("B")
                input_dict["B"] = "b"
                print("Func B: Added 'B'")
            
            
            # 공유 리스트,딕셔너리를 수정하는 비동기 함수 C
            async def func_c(input_list, input_dict):
                await asyncio.sleep(3)
                input_list.append("C")
                input_dict["C"] = "c"
                print("Func C: Added 'C'")
            
            
            async def main():
                shared_list = []
                shared_dict = {}
                # 모든 함수를 동시에 실행
                await asyncio.gather(
                    func_a(shared_list, shared_dict),
                    func_b(shared_list, shared_dict),
                    func_c(shared_list, shared_dict),
                )
            
                # 공유 리스트,딕셔너리의 최종 상태 출력
                print("Shared List:", shared_list)
                print("Shared List:", shared_dict)
            
            
            # 이벤트 루프 실행
            asyncio.run(main())
            
            -->
            Func A: Added 'A'
            Func B: Added 'B'
            Func C: Added 'C'
            Shared List: ['A', 'B', 'C']
            Shared List: {'A': 'a', 'B': 'b', 'C': 'c'}
      • 아래는 10초가 지나면 해당 웹소켓 통신을 취소하는 예시 코드입니다.
        • import asyncio
          import websockets
          import json
          
          async def stock_price_websocket(stock_code, websocket_url, shared_task_dict):
              async with websockets.connect(websocket_url) as websocket:
                  # 실시간 가격 데이터를 받아오는 로직
                  await websocket.send(json.dumps({"command": "subscribe", "stock_code": stock_code}))
                  while True:
                      try:
                          data = await websocket.recv()
                          print(f"Received data for {stock_code}: {data}")
                      except asyncio.CancelledError:
                          print(f"Websocket for {stock_code} cancelled")
                          break
          
          async def main():
              shared_task_dict = {}  # 공유 task 딕셔너리
              websocket_url = "wss://example.com/price"
          
              # 예시 종목들에 대한 웹소켓 연결 생성
              stock_codes = ["005930", "000660", "035420"]
              for stock_code in stock_codes:
                  task = asyncio.create_task(stock_price_websocket(stock_code, websocket_url, shared_task_dict))
                  shared_task_dict[stock_code] = task
          
              await asyncio.sleep(10)  # 여기서는 예시로 10초간 대기
          
              # 특정 종목의 웹소켓 연결 취소
              if "005930" in shared_task_dict:
                  shared_task_dict["005930"].cancel()
          
              # 모든 웹소켓 연결이 완료될 때까지 대기
              await asyncio.gather(*shared_task_dict.values())
          
          asyncio.run(main())
    • 주의할 점
      • 가장 바깥쪽 함수에서는 await를 무조건 해줘야합니다. 그렇지 않으면 그냥 단순히 함수 실행만 하고 완료되는 것을 기달리지 않고 프로그램이 종료되어버릴수가 있습니다.
      • 예시
        • 아래 코드는 가장 바깥쪽 함수인 main()에서 asyncio.create_task를 await 해주지 않아서 print("main 함수 완료")만 print 되고 프로그램이 종료되어버렸습니다.
        • import asyncio
          
          
          async def do_something_after_delay(delay, message):
              await asyncio.sleep(delay)
              print(message)
          
          
          async def main():
              # 태스크를 스케줄링하고 있지만, 완료될 때까지 기다리지 않음
              asyncio.create_task(do_something_after_delay(1, "첫 번째 메시지"))
              asyncio.create_task(do_something_after_delay(2, "두 번째 메시지"))
              print("main 함수 완료")
          
          
          asyncio.run(main())
          -->
          main 함수 완료

    asyncio.Lock()

    • 위 설명 중에 하나의 리스트나 딕셔너리에 여러 비동기함수가 접근가능하다고 했습니다. 하지만 이럴경우 동기화 문제가 발생할 수 있습니다.
      • 동기화 문제 예시
        • import asyncio
          
          
          async def func_a(shared_resource, key, value):
              current_value = shared_resource.get(key, 0)
              await asyncio.sleep(0.1)  # 다른 코루틴이 실행될 기회 제공
              shared_resource[key] = current_value + value
          
          
          async def func_b(shared_resource, key, value):
              current_value = shared_resource.get(key, 0)
              await asyncio.sleep(0.1)  # 다른 코루틴이 실행될 기회 제공
              shared_resource[key] = current_value + value
          
          
          async def main():
              shared_resource = {}
              task1 = asyncio.create_task(func_a(shared_resource, "key", 1))
              task2 = asyncio.create_task(func_b(shared_resource, "key", 2))
          
              await task1
              await task2
          
              print(shared_resource)
          
          
          asyncio.run(main())
          -->
          {'key': 2}
        • 원래 결과는 3이 나와야 맞지만, 2가 결과로 나옵니다. 이는 task1과 task2가 비동기적으로 실행되면서 발생한 동기화문제입니다.
    • 이러한 동기화 문제는 asyncio.Lock()을 통해 해결해줄수 있습니다. 공유자원에 하나의 함수가 접근하면, lock을 획득하여 다른 함수가 접근하지 못하도록 설정할수 있습니다. 다른 함수는 그 lock이 해제될때까지 기다려야합니다.
      • import asyncio
        
        
        async def func_a(shared_resource, key, value, lock):
            async with lock:
                current_value = shared_resource.get(key, 0)
                await asyncio.sleep(0.1)  # 다른 코루틴이 실행될 기회 제공
                shared_resource[key] = current_value + value
        
        
        async def func_b(shared_resource, key, value, lock):
            async with lock:
                current_value = shared_resource.get(key, 0)
                await asyncio.sleep(0.1)  # 다른 코루틴이 실행될 기회 제공
                shared_resource[key] = current_value + value
        
        
        async def main():
            shared_resource = {}
            lock = asyncio.Lock()
            task1 = asyncio.create_task(func_a(shared_resource, "key", 1, lock))
            task2 = asyncio.create_task(func_b(shared_resource, "key", 2, lock))
        
            await task1
            await task2
        
            print(shared_resource)
        
        
        asyncio.run(main())
        
        -->
        {'key': 3}
      • func_a가 먼저 lock을 획득하면, func_a의 코드블록이 완전히 실행(정확히는 async with lock:으로 감싼부분)되고 lock이 해제될 때까지 func_b는 대기 상태가 됩니다. 반대로 func_b가 먼저 lock을 획득하면 func_a는 func_b의 실행(정확히는 async with lock:으로 감싼부분)이 완료될때까지 대기합니다.
    • 이러한 방식으로 asyncio.Lock은 공유 자원에 대한 동시 접근을 제어하며, 예측 가능하고 안전한 데이터 변경을 보장합니다.

    asyncio.Event()

    • asyncio.Event는 비동기 프로그래밍에서 이벤트를 관리하는데 사용되는 도구입니다. asyncio.Event 객체는 내부적으로 set 또는 clear 상태를 가질 수 있으며, 이를 통해 여러 태스크 간의 동기화를 제어합니다.
      • event.set(): 이벤트를 'set' 상태로 만듭니다. 이 상태에서 await event.wait()를 호출하는 모든 태스크는 즉시 진행됩니다.
      • event.clear(): 이벤트를 'clear' 상태로 만듭니다. 이 상태에서 await event.wait()를 호출하면 이벤트가 'set' 상태가 될 때까지 태스크가 대기합니다.
      • await event.wait(): 이벤트가 'set' 상태가 될 때까지 현재 태스크를 대기시킵니다. 이벤트가 이미 'set' 상태라면 즉시 진행됩니다.
    • 예시
      • 특정 조건에 맞는 종목들을 검색하는 웹소켓에서 데이터가 검출될때, 특정 함수를 실행해주고 싶은 경우에 유용하게 사용할 수 있습니다.
      • 코드
        • import asyncio
          import json
          import websockets
          
          async def realtime_stock_search(event, condition_list):
              uri = "wss://example.com/stock-search"
              async with websockets.connect(uri) as websocket:
                  while True:
                      data = await websocket.recv()
                      stock_data = json.loads(data)
                      condition_list.append(stock_data)  # 데이터가 들어오면 condition_list에 추가
                      event.set()  # 이벤트 설정하여 condition_list에 데이터가 추가되었다고 다른 함수에 알림
          
          
          async def process_stock_data(event, condition_list):
              while True:
                  await event.wait()  # 이벤트가 설정될 때까지 대기
                  stock = condition_list[-1]  # 처리할 주식 데이터 가져오기
                  print(f"Processing stock: {stock}")  # 주식 데이터 처리
                  event.clear()  # 이벤트 초기화
          
          async def main():
              event = asyncio.Event()
              condition_list = []
          
              # 두 함수 동시에 실행
              await asyncio.gather(
                  realtime_stock_search(event, condition_list),
                  process_stock_data(event, condition_list)
              )
          
          asyncio.run(main())
      • 하지만 위처럼 그냥 단순하게 append 되었을때 신호를 보내고, 마지막 종목에 대해서만 처리를 하게된다면 process_stock_data가 처리되는 동안(즉, event가 set된 상태,event가 clear되지 않은 상태)에 데이터가 들어오게 된다면, 그 데이터처리는 되지 않을 것입니다. 따라서 다음과 같이 이전 길이를 기억해두었다가, 처리해주어야지 놓치는 데이터 없이 처리가 가능합니다.
        • import asyncio
          import json
          import websockets
          
          async def realtime_stock_search(event, condition_list):
              uri = "wss://example.com/stock-search"
              async with websockets.connect(uri) as websocket:
                  while True:
                      data = await websocket.recv()
                      stock_data = json.loads(data)
                      condition_list.append(stock_data)  # 데이터가 들어오면 condition_list에 추가
                      event.set()  # 이벤트 설정하여 condition_list에 데이터가 추가되었다고 다른 함수에 알림
          
          async def process_stock_data(event, condition_list):
              prev_length = 0
              while True:
                  await event.wait()  # 이벤트가 설정될 때까지 대기
                  new_length = len(condition_list)
                  if new_length > prev_length:
                      new_items = condition_list[prev_length:]
                      for stock in new_items:
                          print(f"Processing stock: {stock}")  # 새로운 주식 데이터 처리
                      prev_length = new_length
                  event.clear()  # 이벤트 초기화
          
          async def main():
              event = asyncio.Event()
              condition_list = []
          
              # 두 함수 동시에 실행
              await asyncio.gather(
                  realtime_stock_search(event, condition_list),
                  process_stock_data(event, condition_list)
              )
          
          asyncio.run(main())

    asyncio.Queue()

      • asyncio.Queue는 비동기 프로그래밍에서 사용되는 FIFO(First-In-First-Out) 큐입니다.
        • asyncio.Queue(maxsize=0)를 통해서 queue를 생성할 수 있습니다.
          • maxsize 입력인자를 따로 입력하지 않을 경우 0으로 지정됩니다.
          • maxsize가 0이하면 큐의 크기는 무한입니다. 
          • maxsize가 0보다 큰 정수일 경우, maxsize에 도달하면 await put()은 큐에서 항목이 제거될 때까지(get()에 의해 큐에서 값이꺼내질때까지) 대기합니다.
        • empty() : 큐가 비어 있으면 True를 반환하고, 그렇지 않으면 False를 반환합니다.
        • full() : 큐가 maxsize 항목으로 가득 차 있으면 True를 반환합니다. maxsize=0으로 초기화된 경우 full()은 항상 False를 반환합니다.
        • qsize() : 큐에 있는 항목의 수를 반환합니다.
        • get() : 큐에서 항목을 제거하고 반환합니다. 큐가 비어 있다면 항목이 사용 가능해질때까지 대기합니다. (await 사용)
        • get_nowait() : 큐에서 즉시 사용 가능한 항목을 반환합니다. 사용 가능한 항목이 없으면 (QueueEmpty 예외 발생, await 사용 안함)
        • put(item) : 큐에 항목을 추가합니다. 큐가 가득찼다면, 빈 슬롯이 생길 때까지 대기합니다. (await 사용)
        • put_nowait(item) : 큐에 항목을 추가하되, 대기하지 않고 즉시 수행합니다. 즉시 빈 슬롯이 없으면 QueueFull 예외를 발생시킵니다.
        • join() : 큐의 모든 항목이 처리될 때까지 대기합니다. 항목이 큐에 추가될 때마다 미완료 작업의 수가 증가하고, 소비자 코루틴이 task_done()을 호출해 작업이 완료되었음을 알릴 때마다 감소합니다. 미완료 작업 수가 0이 되면 join()은 더이상 대기하지 않습니다.
        • task_done() : 큐에서 가져온 작업이 완료되었음을 나타냅니다. 큐의 소비자에 의해 사용됩니다. get()으로 작업을 가져온 후, task_done()을 호출하면 해당 작업에 대한 처리가 완료되었음을 큐에 알립니다. join()이 현재 대기 중이라면, 큐에 추가된 모든 항목이 처리되었을때 (즉, 큐에 put()된 모든 항목에 대해 task_done() 호출이 수신되었을 때) join()이 재개됩니다.
      • 예시
        • 기본적인 put과 get을 사용하는 경우
          • import asyncio
            
            
            async def producer(queue):
                # 큐에 데이터 추가
                for i in range(5):
                    await queue.put(i)
                    print(f"Produced: {i}")
                    await asyncio.sleep(1)
            
            
            async def consumer(queue):
                # 큐에서 데이터 가져오기
                while True:
                    item = await queue.get()
                    print(f"Consumed: {item}")
                    queue.task_done()
            
            
            async def main():
                queue = asyncio.Queue()
                producer_task = asyncio.create_task(producer(queue))
                consumer_task = asyncio.create_task(consumer(queue))
            
                # 모든 데이터가 처리될 때까지 대기
                await producer_task
                await queue.join()  # 모든 항목이 처리될 때까지 대기
                consumer_task.cancel()  # 소비자 작업 취소
            
            
            asyncio.run(main())
        • put_nowait()와 get_nowait() 사용
          • import asyncio
            
            async def main():
                queue = asyncio.Queue()
            
                # 큐에 데이터를 즉시 넣기
                try:
                    queue.put_nowait("Hello")
                    queue.put_nowait("World")
                except asyncio.QueueFull:
                    print("Queue is full")
            
                # 큐에서 데이터를 즉시 가져오기
                try:
                    print(queue.get_nowait())
                    print(queue.get_nowait())
                except asyncio.QueueEmpty:
                    print("Queue is empty")
            
            asyncio.run(main())
        • put_nowait()와 get()을 사용 (데이터는 누락되면 안되기때문에 put_nowait로 바로바로 데이터를 넣어줍니다.)
          • import asyncio
            
            
            async def producer(queue):
                # 큐에 데이터를 즉시 넣기
                for i in range(5):
                    try:
                        queue.put_nowait(i)
                        print(f"Produced: {i}")
                    except asyncio.QueueFull:
                        print("Queue is full")
                    await asyncio.sleep(1)  # 다음 아이템을 생성하기 전에 잠시 대기
            
            
            async def consumer(queue):
                # 큐에서 데이터를 비동기적으로 가져오기
                while True:
                    item = await queue.get()
                    print(f"Consumed: {item}")
                    queue.task_done()
            
            
            async def main():
                queue = asyncio.Queue(maxsize=2)  # 최대 크기를 2로 설정
            
                producer_task = asyncio.create_task(producer(queue))
                consumer_task = asyncio.create_task(consumer(queue))
            
                # 모든 데이터가 처리될 때까지 대기
                await producer_task
                await queue.join()  # 모든 항목이 처리될 때까지 대기
                consumer_task.cancel()  # 소비자 작업 취소
            
            
            asyncio.run(main())
        • 여러 프로듀서와 컨슈머
          • import asyncio
            
            
            async def producer(queue, name):
                for i in range(3):
                    await queue.put((name, i))
                    print(f"Producer {name}: Added item {i}")
                    await asyncio.sleep(1)
            
            
            async def consumer(queue, name):
                while True:
                    producer_name, item = await queue.get()
                    print(f"Consumer {name}: Processed item {item} from {producer_name}")
                    queue.task_done()
            
            
            async def main():
                queue = asyncio.Queue()
            
                # 여러 프로듀서와 컨슈머 생성
                producers = [
                    asyncio.create_task(producer(queue, f"Producer {i}")) for i in range(2)
                ]
                consumers = [
                    asyncio.create_task(consumer(queue, f"Consumer {i}")) for i in range(2)
                ]
            
                # 모든 프로듀서가 작업을 완료할 때까지 대기
                await asyncio.gather(*producers)
                await queue.join()  # 모든 항목이 처리될 때까지 대기
            
                # 모든 컨슈머 작업 취소
                for c in consumers:
                    c.cancel()
            
            
            asyncio.run(main())
    • 그렇다면 이 queue를 자동매매에서 언제 사용할 수 있을까요? 위에서 봤듯이 asyncio.Event를 사용할 경우, 데이터를 처리하는 부분에서 오래 걸리게 되면은 clear()가 늦게 적용되어서 데이터가 누락될 수 있었습니다. 이를 리스트 슬라이싱이 아닌 queue를 통해서 처리를 해줄수 있습니다.
      • import asyncio
        import json
        import websockets
        
        async def realtime_stock_search(queue):
            uri = "wss://example.com/stock-search"
            async with websockets.connect(uri) as websocket:
                while True:
                    data = await websocket.recv()
                    stock_data = json.loads(data)
                    await queue.put(stock_data)  # 데이터가 들어오면 큐에 추가
        
        async def process_stock_data(queue):
            while True:
                stock = await queue.get()  # 큐에서 주식 데이터 가져오기
                print(f"Processing stock: {stock}")  # 주식 데이터 처리
                queue.task_done()
        
        async def main():
            queue = asyncio.Queue()
        
            # 두 함수 동시에 실행
            await asyncio.gather(
                realtime_stock_search(queue),
                process_stock_data(queue)
            )
        
        asyncio.run(main())

    CPU위주의 작업 처리 (asyncio.get_running_loop())

    • 비동기 프로그래밍을 할 때 주의할 점은 CPU 위주의 작업을 잘 처리해주어야 한다는 것입니다. 비동기적으로 코드를 잘 작성해두었지만, 만약 그 중간에 CPU 위주의 작업이 존재할 경우, 다른 IO 처리를 진행하는 비동기 코드들이 멈춰버리게 됩니다. 이럴 경우 원하는대로 로직이 동작하지 않을 수 있습니다.
    • 이를 위해 사용할 수 있는 것이 별도의 스레드나 프로세스를 사용하는 것입니다. asyncio는 loop.run_in_executor() 메소드를 사용하여 CPU 집약적인 작업을 별도의 스레드나 프로세스에서 비동기적으로 실행할 수 있습니다. 이렇게 할 경우 CPU 작업이 별도의 실행 컨텍스트에서 처리되므로, 메인 이벤트 루프는 I/O 작업을 계속 처리할 수 있습니다.
    • 예시 코드
      • import asyncio
        import concurrent.futures
        import websockets
        import json
        
        async def websocket_receive_data(websocket_url):
            async with websockets.connect(websocket_url) as websocket:
                while True:
                    data = await websocket.recv()
                    # 받아온 데이터를 처리하는 함수에 전달
                    asyncio.create_task(process_data(json.loads(data)))
        
        async def process_data(data):
            # CPU 집약적인 데이터 처리 작업을 별도의 스레드에서 실행
            loop = asyncio.get_running_loop()
            with concurrent.futures.ThreadPoolExecutor() as pool:
                result = await loop.run_in_executor(pool, cpu_intensive_task, data)
                print(f"Processed data: {result}")
        
        def cpu_intensive_task(data):
            # CPU 집약적인 데이터 처리 로직
            # 예: 데이터 분석, 복잡한 계산 등
            return data  # 처리 결과 반환
        
        async def main():
            websocket_url = "wss://example.com/your_websocket"
            await websocket_receive_data(websocket_url)
        
        asyncio.run(main())
      • 별도의 프로세스를 사용하고 싶은 경우는 concurrent.futures.ThreadPoolExecutor()대신 concurrent.futures.ProcessPoolExecutor()를 사용하면 됩니다.
    • 참고 : 비동기 프로그래밍 vs 멀티스레드 vs 멀티프로세스