목차
비동기 프로그래밍
- 자동매매 프로젝트의 경우 정확도가 중요합니다. 비동기적으로 처리하면서도, 로직이 꼬이지 않기 위해서 많은 시행착오를 겪었습니다. 그 과정 속에서 배워나갔던 점들을 하나하나 정리해보도록하겠습니다.
asyncio.create_task() vs asyncio.gather()
- asyncio.create_task()와 asyncio.gather()은 async로 작성된 비동기함수를 실행할때 사용합니다. 깊게 들어가면 더 많은 차이점이 있지만, 우선은 큼직한 틀에서 차이점을 살펴보도록하겠습니다.
- asyncio.create_task() vs asyncio.gather()
- asyncio.create_task()와 asyncio.gather()는 모두 async로 작성된 비동기 함수를 실행할 때 사용할 수 있습니다.
- 단, asyncio.create_task()는 하나의 비동기함수를 실행하지만 asyncio.gahter()는 두개 이상의 비동기함수를 한번에 실행시킬수 있습니다.
- 참고 : asyncio.create_task()도 다음과 같이 여러개의 비동기 함수를 한 번에 실행시킬 수 있습니다.
-
import asyncio async def task1(): print("Task 1 시작") await asyncio.sleep(1) print("Task 1 완료") return "Task 1 결과" async def task2(): print("Task 2 시작") await asyncio.sleep(2) print("Task 2 완료") return "Task 2 결과" async def main(): # 여러 Task 생성 task1_handle = asyncio.create_task(task1()) task2_handle = asyncio.create_task(task2()) # 모든 Task가 완료될 때까지 기다림 await task1_handle await task2_handle asyncio.run(main())
-
- 참고 : asyncio.create_task()도 다음과 같이 여러개의 비동기 함수를 한 번에 실행시킬 수 있습니다.
- 추가적으로 asyncio.create_task()는 task를 실행한 뒤에 다음 코드들을 이어서 실행할 수 있지만, asyncio.gather()는 gather()안에 있는 모든 비동기함수들이 종료되어야지만 다음 코드들을 이어서 실행할 수 있습니다.
- 즉, asyncio.create_task()는 await를 지정해주지 않아도 되지만, asyncio.gather()는 await를 지정해주어야합니다.
- asyncio.create_task()는 task=asyncio.create_task()로 지정해주고, 나중에 필요할때 await task를 지정해주면 됩니다.
-
import asyncio async def task1(): await asyncio.sleep(1) return "Task 1 완료" async def task2(): await asyncio.sleep(2) return "Task 2 완료" async def main(): # create_task() 사용 task1_handle = asyncio.create_task(task1()) # gather() 사용 results = await asyncio.gather(task1(), task2()) print(results) # 모든 태스크 완료 후 결과 출력 result1 = await task1_handle # 태스크 완료 후 결과 받기 print(result1) asyncio.run(main())
- 즉, asyncio.create_task()는 await를 지정해주지 않아도 되지만, asyncio.gather()는 await를 지정해주어야합니다.
- asyncio.gather은 이미 실행중인 task들을 한번에 await 처리할때도 유용합니다.
- 아래 코드에서 await running_task1, await running_task2 이런식으로 일일이 하나하나 지정할 필요 없이 gather로 한번에 처리 가능합니다.
-
import asyncio async def task1(): print("Task 1 시작") await asyncio.sleep(2) print("Task 1 완료") async def task2(): print("Task 2 시작") await asyncio.sleep(1) print("Task 2 완료") async def main(): running_task1 = asyncio.create_task(task1()) running_task2 = asyncio.create_task(task2()) # 이미 실행 중인 두 태스크가 모두 완료될 때까지 기다립니다. await asyncio.gather(running_task1, running_task2) print("완료") asyncio.run(main())
asyncio.create_task()
- asyncio.create_task() 에 대해 더 자세히 살펴보도록 하겠습니다.
- asyncio.create_task()를 통해서 비동기 함수를 실행 가능합니다. 즉, CPU를 사용하지 않는 I/O 위주의 작업(예를 들면 API 호출, 웹소켓 통신)들을 실행시킬수 있습니다.
- 웹소켓 통신의 경우, 계속해서 데이터를 받아오다가 특정 조건이 되면 해당 조건을 중지시킬수 있어야합니다.
- 예를 들면, 전종목에 대한 가격 데이터를 계속 받아오면 부담이 될 수 있기 때문에, 조건에 맞는 종목들만 가격 데이터를 받아오도록 설정해야합니다. 이때 조건에 맞으면 가격을 받아오다가, 조건에 맞지 않는 종목일 경우에는 가격을 받아오는 통신을 끊어주어야합니다.
- 이때 사용하는 것이 cancel()입니다.
- 해당 함수가 cancel될 때 asyncio.CancelledError를 통해 실행할 코드를 지정해줄수도 있습니다.
-
async def stock_task(stock_code): # 장시간 실행되는 작업 (예시) try: while True: print(f"Task for stock {stock_code} is running") await asyncio.sleep(1) except asyncio.CancelledError: print(f"Task for stock {stock_code} was cancelled") raise
-
- 예시
- 조건에 맞는 종목들에 대해 실시간 가격을 받아오는 웹소켓을 실행했다고 가정해봅니다.
- 그러다가 조건에 맞지 않거나, 해당 종목을 거래 완료한경우에 더 이상 가격을 받아올 필요가 없어질 경우 해당 웹소켓통신을 종료해주어야합니다. 이때 cancel을 통해 종료시켜줄수 있습니다.
- 이를 이제 종목코드를 key 값, 생성된 task를 value로 하는 공유 task 딕셔너리를 통해 관리해주면 손쉽게 관리할 수 있습니다.
- 참고: 리스트나 딕셔너리를 공유하는 비동기 함수
-
import asyncio # 공유 리스트,딕셔너리를 수정하는 비동기 함수 A async def func_a(input_list, input_dict): await asyncio.sleep(1) input_list.append("A") input_dict["A"] = "a" print("Func A: Added 'A'") # 공유 리스트,딕셔너리를 수정하는 비동기 함수 B async def func_b(input_list, input_dict): await asyncio.sleep(2) input_list.append("B") input_dict["B"] = "b" print("Func B: Added 'B'") # 공유 리스트,딕셔너리를 수정하는 비동기 함수 C async def func_c(input_list, input_dict): await asyncio.sleep(3) input_list.append("C") input_dict["C"] = "c" print("Func C: Added 'C'") async def main(): shared_list = [] shared_dict = {} # 모든 함수를 동시에 실행 await asyncio.gather( func_a(shared_list, shared_dict), func_b(shared_list, shared_dict), func_c(shared_list, shared_dict), ) # 공유 리스트,딕셔너리의 최종 상태 출력 print("Shared List:", shared_list) print("Shared List:", shared_dict) # 이벤트 루프 실행 asyncio.run(main()) --> Func A: Added 'A' Func B: Added 'B' Func C: Added 'C' Shared List: ['A', 'B', 'C'] Shared List: {'A': 'a', 'B': 'b', 'C': 'c'}
-
- 참고: 리스트나 딕셔너리를 공유하는 비동기 함수
- 아래는 10초가 지나면 해당 웹소켓 통신을 취소하는 예시 코드입니다.
-
import asyncio import websockets import json async def stock_price_websocket(stock_code, websocket_url, shared_task_dict): async with websockets.connect(websocket_url) as websocket: # 실시간 가격 데이터를 받아오는 로직 await websocket.send(json.dumps({"command": "subscribe", "stock_code": stock_code})) while True: try: data = await websocket.recv() print(f"Received data for {stock_code}: {data}") except asyncio.CancelledError: print(f"Websocket for {stock_code} cancelled") break async def main(): shared_task_dict = {} # 공유 task 딕셔너리 websocket_url = "wss://example.com/price" # 예시 종목들에 대한 웹소켓 연결 생성 stock_codes = ["005930", "000660", "035420"] for stock_code in stock_codes: task = asyncio.create_task(stock_price_websocket(stock_code, websocket_url, shared_task_dict)) shared_task_dict[stock_code] = task await asyncio.sleep(10) # 여기서는 예시로 10초간 대기 # 특정 종목의 웹소켓 연결 취소 if "005930" in shared_task_dict: shared_task_dict["005930"].cancel() # 모든 웹소켓 연결이 완료될 때까지 대기 await asyncio.gather(*shared_task_dict.values()) asyncio.run(main())
-
- 주의할 점
- 가장 바깥쪽 함수에서는 await를 무조건 해줘야합니다. 그렇지 않으면 그냥 단순히 함수 실행만 하고 완료되는 것을 기달리지 않고 프로그램이 종료되어버릴수가 있습니다.
- 예시
- 아래 코드는 가장 바깥쪽 함수인 main()에서 asyncio.create_task를 await 해주지 않아서 print("main 함수 완료")만 print 되고 프로그램이 종료되어버렸습니다.
-
import asyncio async def do_something_after_delay(delay, message): await asyncio.sleep(delay) print(message) async def main(): # 태스크를 스케줄링하고 있지만, 완료될 때까지 기다리지 않음 asyncio.create_task(do_something_after_delay(1, "첫 번째 메시지")) asyncio.create_task(do_something_after_delay(2, "두 번째 메시지")) print("main 함수 완료") asyncio.run(main()) --> main 함수 완료
asyncio.Lock()
- 위 설명 중에 하나의 리스트나 딕셔너리에 여러 비동기함수가 접근가능하다고 했습니다. 하지만 이럴경우 동기화 문제가 발생할 수 있습니다.
- 동기화 문제 예시
-
import asyncio async def func_a(shared_resource, key, value): current_value = shared_resource.get(key, 0) await asyncio.sleep(0.1) # 다른 코루틴이 실행될 기회 제공 shared_resource[key] = current_value + value async def func_b(shared_resource, key, value): current_value = shared_resource.get(key, 0) await asyncio.sleep(0.1) # 다른 코루틴이 실행될 기회 제공 shared_resource[key] = current_value + value async def main(): shared_resource = {} task1 = asyncio.create_task(func_a(shared_resource, "key", 1)) task2 = asyncio.create_task(func_b(shared_resource, "key", 2)) await task1 await task2 print(shared_resource) asyncio.run(main()) --> {'key': 2}
- 원래 결과는 3이 나와야 맞지만, 2가 결과로 나옵니다. 이는 task1과 task2가 비동기적으로 실행되면서 발생한 동기화문제입니다.
-
- 동기화 문제 예시
- 이러한 동기화 문제는 asyncio.Lock()을 통해 해결해줄수 있습니다. 공유자원에 하나의 함수가 접근하면, lock을 획득하여 다른 함수가 접근하지 못하도록 설정할수 있습니다. 다른 함수는 그 lock이 해제될때까지 기다려야합니다.
-
import asyncio async def func_a(shared_resource, key, value, lock): async with lock: current_value = shared_resource.get(key, 0) await asyncio.sleep(0.1) # 다른 코루틴이 실행될 기회 제공 shared_resource[key] = current_value + value async def func_b(shared_resource, key, value, lock): async with lock: current_value = shared_resource.get(key, 0) await asyncio.sleep(0.1) # 다른 코루틴이 실행될 기회 제공 shared_resource[key] = current_value + value async def main(): shared_resource = {} lock = asyncio.Lock() task1 = asyncio.create_task(func_a(shared_resource, "key", 1, lock)) task2 = asyncio.create_task(func_b(shared_resource, "key", 2, lock)) await task1 await task2 print(shared_resource) asyncio.run(main()) --> {'key': 3}
- func_a가 먼저 lock을 획득하면, func_a의 코드블록이 완전히 실행(정확히는 async with lock:으로 감싼부분)되고 lock이 해제될 때까지 func_b는 대기 상태가 됩니다. 반대로 func_b가 먼저 lock을 획득하면 func_a는 func_b의 실행(정확히는 async with lock:으로 감싼부분)이 완료될때까지 대기합니다.
-
- 이러한 방식으로 asyncio.Lock은 공유 자원에 대한 동시 접근을 제어하며, 예측 가능하고 안전한 데이터 변경을 보장합니다.
asyncio.Event()
- asyncio.Event는 비동기 프로그래밍에서 이벤트를 관리하는데 사용되는 도구입니다. asyncio.Event 객체는 내부적으로 set 또는 clear 상태를 가질 수 있으며, 이를 통해 여러 태스크 간의 동기화를 제어합니다.
- event.set(): 이벤트를 'set' 상태로 만듭니다. 이 상태에서 await event.wait()를 호출하는 모든 태스크는 즉시 진행됩니다.
- event.clear(): 이벤트를 'clear' 상태로 만듭니다. 이 상태에서 await event.wait()를 호출하면 이벤트가 'set' 상태가 될 때까지 태스크가 대기합니다.
- await event.wait(): 이벤트가 'set' 상태가 될 때까지 현재 태스크를 대기시킵니다. 이벤트가 이미 'set' 상태라면 즉시 진행됩니다.
- 예시
- 특정 조건에 맞는 종목들을 검색하는 웹소켓에서 데이터가 검출될때, 특정 함수를 실행해주고 싶은 경우에 유용하게 사용할 수 있습니다.
- 코드
-
import asyncio import json import websockets async def realtime_stock_search(event, condition_list): uri = "wss://example.com/stock-search" async with websockets.connect(uri) as websocket: while True: data = await websocket.recv() stock_data = json.loads(data) condition_list.append(stock_data) # 데이터가 들어오면 condition_list에 추가 event.set() # 이벤트 설정하여 condition_list에 데이터가 추가되었다고 다른 함수에 알림 async def process_stock_data(event, condition_list): while True: await event.wait() # 이벤트가 설정될 때까지 대기 stock = condition_list[-1] # 처리할 주식 데이터 가져오기 print(f"Processing stock: {stock}") # 주식 데이터 처리 event.clear() # 이벤트 초기화 async def main(): event = asyncio.Event() condition_list = [] # 두 함수 동시에 실행 await asyncio.gather( realtime_stock_search(event, condition_list), process_stock_data(event, condition_list) ) asyncio.run(main())
-
- 하지만 위처럼 그냥 단순하게 append 되었을때 신호를 보내고, 마지막 종목에 대해서만 처리를 하게된다면 process_stock_data가 처리되는 동안(즉, event가 set된 상태,event가 clear되지 않은 상태)에 데이터가 들어오게 된다면, 그 데이터처리는 되지 않을 것입니다. 따라서 다음과 같이 이전 길이를 기억해두었다가, 처리해주어야지 놓치는 데이터 없이 처리가 가능합니다.
-
import asyncio import json import websockets async def realtime_stock_search(event, condition_list): uri = "wss://example.com/stock-search" async with websockets.connect(uri) as websocket: while True: data = await websocket.recv() stock_data = json.loads(data) condition_list.append(stock_data) # 데이터가 들어오면 condition_list에 추가 event.set() # 이벤트 설정하여 condition_list에 데이터가 추가되었다고 다른 함수에 알림 async def process_stock_data(event, condition_list): prev_length = 0 while True: await event.wait() # 이벤트가 설정될 때까지 대기 new_length = len(condition_list) if new_length > prev_length: new_items = condition_list[prev_length:] for stock in new_items: print(f"Processing stock: {stock}") # 새로운 주식 데이터 처리 prev_length = new_length event.clear() # 이벤트 초기화 async def main(): event = asyncio.Event() condition_list = [] # 두 함수 동시에 실행 await asyncio.gather( realtime_stock_search(event, condition_list), process_stock_data(event, condition_list) ) asyncio.run(main())
-
asyncio.Queue()
- asyncio.Queue는 비동기 프로그래밍에서 사용되는 FIFO(First-In-First-Out) 큐입니다.
- asyncio.Queue(maxsize=0)를 통해서 queue를 생성할 수 있습니다.
- maxsize 입력인자를 따로 입력하지 않을 경우 0으로 지정됩니다.
- maxsize가 0이하면 큐의 크기는 무한입니다.
- maxsize가 0보다 큰 정수일 경우, maxsize에 도달하면 await put()은 큐에서 항목이 제거될 때까지(get()에 의해 큐에서 값이꺼내질때까지) 대기합니다.
- empty() : 큐가 비어 있으면 True를 반환하고, 그렇지 않으면 False를 반환합니다.
- full() : 큐가 maxsize 항목으로 가득 차 있으면 True를 반환합니다. maxsize=0으로 초기화된 경우 full()은 항상 False를 반환합니다.
- qsize() : 큐에 있는 항목의 수를 반환합니다.
- get() : 큐에서 항목을 제거하고 반환합니다. 큐가 비어 있다면 항목이 사용 가능해질때까지 대기합니다. (await 사용)
- get_nowait() : 큐에서 즉시 사용 가능한 항목을 반환합니다. 사용 가능한 항목이 없으면 (QueueEmpty 예외 발생, await 사용 안함)
- put(item) : 큐에 항목을 추가합니다. 큐가 가득찼다면, 빈 슬롯이 생길 때까지 대기합니다. (await 사용)
- put_nowait(item) : 큐에 항목을 추가하되, 대기하지 않고 즉시 수행합니다. 즉시 빈 슬롯이 없으면 QueueFull 예외를 발생시킵니다.
- join() : 큐의 모든 항목이 처리될 때까지 대기합니다. 항목이 큐에 추가될 때마다 미완료 작업의 수가 증가하고, 소비자 코루틴이 task_done()을 호출해 작업이 완료되었음을 알릴 때마다 감소합니다. 미완료 작업 수가 0이 되면 join()은 더이상 대기하지 않습니다.
- task_done() : 큐에서 가져온 작업이 완료되었음을 나타냅니다. 큐의 소비자에 의해 사용됩니다. get()으로 작업을 가져온 후, task_done()을 호출하면 해당 작업에 대한 처리가 완료되었음을 큐에 알립니다. join()이 현재 대기 중이라면, 큐에 추가된 모든 항목이 처리되었을때 (즉, 큐에 put()된 모든 항목에 대해 task_done() 호출이 수신되었을 때) join()이 재개됩니다.
- asyncio.Queue(maxsize=0)를 통해서 queue를 생성할 수 있습니다.
- 예시
- 기본적인 put과 get을 사용하는 경우
-
import asyncio async def producer(queue): # 큐에 데이터 추가 for i in range(5): await queue.put(i) print(f"Produced: {i}") await asyncio.sleep(1) async def consumer(queue): # 큐에서 데이터 가져오기 while True: item = await queue.get() print(f"Consumed: {item}") queue.task_done() async def main(): queue = asyncio.Queue() producer_task = asyncio.create_task(producer(queue)) consumer_task = asyncio.create_task(consumer(queue)) # 모든 데이터가 처리될 때까지 대기 await producer_task await queue.join() # 모든 항목이 처리될 때까지 대기 consumer_task.cancel() # 소비자 작업 취소 asyncio.run(main())
-
- put_nowait()와 get_nowait() 사용
-
import asyncio async def main(): queue = asyncio.Queue() # 큐에 데이터를 즉시 넣기 try: queue.put_nowait("Hello") queue.put_nowait("World") except asyncio.QueueFull: print("Queue is full") # 큐에서 데이터를 즉시 가져오기 try: print(queue.get_nowait()) print(queue.get_nowait()) except asyncio.QueueEmpty: print("Queue is empty") asyncio.run(main())
-
- put_nowait()와 get()을 사용 (데이터는 누락되면 안되기때문에 put_nowait로 바로바로 데이터를 넣어줍니다.)
-
import asyncio async def producer(queue): # 큐에 데이터를 즉시 넣기 for i in range(5): try: queue.put_nowait(i) print(f"Produced: {i}") except asyncio.QueueFull: print("Queue is full") await asyncio.sleep(1) # 다음 아이템을 생성하기 전에 잠시 대기 async def consumer(queue): # 큐에서 데이터를 비동기적으로 가져오기 while True: item = await queue.get() print(f"Consumed: {item}") queue.task_done() async def main(): queue = asyncio.Queue(maxsize=2) # 최대 크기를 2로 설정 producer_task = asyncio.create_task(producer(queue)) consumer_task = asyncio.create_task(consumer(queue)) # 모든 데이터가 처리될 때까지 대기 await producer_task await queue.join() # 모든 항목이 처리될 때까지 대기 consumer_task.cancel() # 소비자 작업 취소 asyncio.run(main())
-
- 여러 프로듀서와 컨슈머
-
import asyncio async def producer(queue, name): for i in range(3): await queue.put((name, i)) print(f"Producer {name}: Added item {i}") await asyncio.sleep(1) async def consumer(queue, name): while True: producer_name, item = await queue.get() print(f"Consumer {name}: Processed item {item} from {producer_name}") queue.task_done() async def main(): queue = asyncio.Queue() # 여러 프로듀서와 컨슈머 생성 producers = [ asyncio.create_task(producer(queue, f"Producer {i}")) for i in range(2) ] consumers = [ asyncio.create_task(consumer(queue, f"Consumer {i}")) for i in range(2) ] # 모든 프로듀서가 작업을 완료할 때까지 대기 await asyncio.gather(*producers) await queue.join() # 모든 항목이 처리될 때까지 대기 # 모든 컨슈머 작업 취소 for c in consumers: c.cancel() asyncio.run(main())
-
- 기본적인 put과 get을 사용하는 경우
- 그렇다면 이 queue를 자동매매에서 언제 사용할 수 있을까요? 위에서 봤듯이 asyncio.Event를 사용할 경우, 데이터를 처리하는 부분에서 오래 걸리게 되면은 clear()가 늦게 적용되어서 데이터가 누락될 수 있었습니다. 이를 리스트 슬라이싱이 아닌 queue를 통해서 처리를 해줄수 있습니다.
-
import asyncio import json import websockets async def realtime_stock_search(queue): uri = "wss://example.com/stock-search" async with websockets.connect(uri) as websocket: while True: data = await websocket.recv() stock_data = json.loads(data) await queue.put(stock_data) # 데이터가 들어오면 큐에 추가 async def process_stock_data(queue): while True: stock = await queue.get() # 큐에서 주식 데이터 가져오기 print(f"Processing stock: {stock}") # 주식 데이터 처리 queue.task_done() async def main(): queue = asyncio.Queue() # 두 함수 동시에 실행 await asyncio.gather( realtime_stock_search(queue), process_stock_data(queue) ) asyncio.run(main())
-
CPU위주의 작업 처리 (asyncio.get_running_loop())
- 비동기 프로그래밍을 할 때 주의할 점은 CPU 위주의 작업을 잘 처리해주어야 한다는 것입니다. 비동기적으로 코드를 잘 작성해두었지만, 만약 그 중간에 CPU 위주의 작업이 존재할 경우, 다른 IO 처리를 진행하는 비동기 코드들이 멈춰버리게 됩니다. 이럴 경우 원하는대로 로직이 동작하지 않을 수 있습니다.
- 이를 위해 사용할 수 있는 것이 별도의 스레드나 프로세스를 사용하는 것입니다. asyncio는 loop.run_in_executor() 메소드를 사용하여 CPU 집약적인 작업을 별도의 스레드나 프로세스에서 비동기적으로 실행할 수 있습니다. 이렇게 할 경우 CPU 작업이 별도의 실행 컨텍스트에서 처리되므로, 메인 이벤트 루프는 I/O 작업을 계속 처리할 수 있습니다.
- 예시 코드
-
import asyncio import concurrent.futures import websockets import json async def websocket_receive_data(websocket_url): async with websockets.connect(websocket_url) as websocket: while True: data = await websocket.recv() # 받아온 데이터를 처리하는 함수에 전달 asyncio.create_task(process_data(json.loads(data))) async def process_data(data): # CPU 집약적인 데이터 처리 작업을 별도의 스레드에서 실행 loop = asyncio.get_running_loop() with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor(pool, cpu_intensive_task, data) print(f"Processed data: {result}") def cpu_intensive_task(data): # CPU 집약적인 데이터 처리 로직 # 예: 데이터 분석, 복잡한 계산 등 return data # 처리 결과 반환 async def main(): websocket_url = "wss://example.com/your_websocket" await websocket_receive_data(websocket_url) asyncio.run(main())
- 별도의 프로세스를 사용하고 싶은 경우는 concurrent.futures.ThreadPoolExecutor()대신 concurrent.futures.ProcessPoolExecutor()를 사용하면 됩니다.
-
- 참고 : 비동기 프로그래밍 vs 멀티스레드 vs 멀티프로세스
'Algorithm Trading > ComDon 프로그램 개발이야기' 카테고리의 다른 글
10. 디스코드 알림 (0) | 2024.02.04 |
---|---|
8. pyautogui (1) | 2024.01.27 |
7. 이베스트투자증권 OPEN API (0) | 2024.01.22 |
6. .env 파일 작성 (0) | 2024.01.21 |
5. 주문딜레이 시간 체크 / 비동기 처리의 장점 (1) | 2024.01.09 |