파이썬/바이낸스 선물 API+WS

[선물 API+WS] 차트 실시간 갱신하기

Eluv 2024. 11. 22. 20:06

1. 딕셔너리 형태로 사용하기

import asyncio
import websockets
import json
import ccxt
import requests
from datetime import datetime, timedelta
import pandas as pd
import pprint

Futures_binance = ccxt.binance({
    'options': {
        'defaultType': 'future'
    }
})

# -- 절취선 --

# 초기 차트를 불러오는 함수
def Chart_call(symbol, interval, limit):
    symbol = symbol.upper()

    url = f'https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}'
    data = requests.get(url).json()

    ETH_Chart = []
    for row in data:
        r = {
            "t": datetime.fromtimestamp(row[0] / 1000).strftime('%Y-%m-%d %H:%M:%S'),  # 캔들 시작 시간
            "o": float(row[1]),  # 시가
            "h": float(row[2]),  # 고가
            "l": float(row[3]),  # 저가
            "c": float(row[4]),  # 종가
            "v": float(row[5]),  # 거래량
            "n": int(row[8]),  # 거래회수
            "q": round(float(row[7])),  # 거래대금
            "V": float(row[9]),  # 테이커 매수 거래량
            "Q": round(float(row[10])),  # 테이커 매수 거래대금
        }
        ETH_Chart.append(r)
    return ETH_Chart

# 차트를 업데이트 하는 함수
def update_chart(Chart, data):
    last_time = Chart[-1]['t']
    new_time = datetime.fromtimestamp(data['k']['t'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
    new_candle = {}
    for x in data['k']:
        if x in Chart[-1]:
            if x in ['q', 'Q']:
                new_candle[x] = round(float(data['k'][x]))
            elif x in 'n':
                new_candle[x] = int(data['k'][x])
            elif x in 't':
                new_candle[x] = new_time
            else:
                new_candle[x] = float(data['k'][x])
    if new_time != last_time:  # 새로운 캔들
        Chart.append(new_candle)
        del Chart[0]  # 오래된 캔들 제거
    else:  # 현재 캔들 업데이트
        Chart[-1] = new_candle
    print(Chart[-1])
    return Chart

# -- 절취선 --

async def ETH_Kline_Stream():
    symbol = 'ethusdt'  # 종목 이름
    interval = '1m'  # 차트 시간 단위
    limit = '100'  # 가져올 차트 길이
    ETH_Chart = Chart_call(symbol, interval, limit)
    print(ETH_Chart)
    uri = f'wss://fstream.binance.com/ws/{symbol}@kline_{interval}'
    async with websockets.connect(uri) as websocket:
        try:
            while True:
                data = json.loads(await websocket.recv())
                ETH_Chart = update_chart(ETH_Chart, data)
        except asyncio.CancelledError:
                print("ETH_Kline_Stream 작업이 취소되었습니다.")
        except Exception as e:
            print(f"ETH_Kline_Stream 예외가 발생했습니다: {e}")
        finally:
            print("ETH_Kline_Stream 웹소켓 연결을 종료합니다.")
            await websocket.close()


async def main():
    tasks = [asyncio.create_task(ETH_Kline_Stream())]
    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        print("메인 작업이 취소되었습니다.")
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        print(f"메인 함수에서 예외가 발생했습니다: {e}")
    finally:
        print("메인 함수의 작업이 종료되었습니다.")

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("프로그램을 종료합니다.")

리스트 형태로 차트를 실시간으로 갱신합니다.

 

ETH_Kline_Stream 함수에서

limit = 100

해당 변수는 불러올 캔들의 개수입니다.

호출할 수 있는 최대 길이는 1000입니다. 

 

interval = '1m'

차트의 시간 단위 입니다.

1m' , '3m', '5m', '15m', '1h', '4h', '1d' 등이 있습니다.

 

출력 결과

{ 't': 2024-12-01 11:04:00, # 시작시간
'Q': 659980.3794, # 테이커 매수 거래대금
 'V': 177.999, # 테이커 매수 거래량
 'c': 3706.49, # 종가
 'h': 3708.99, # 고가
 'l': 3706.25, # 저가
 'n': 885, # 거래회수
 'o': 3708.52, # 시가
 'q': 1294397.55122, # 거래대금
 'v': 349.124} # 거래량

약간 순서가 뒤죽박죽이 되긴 했는데 괜찮습니다.

 

2. 데이터 프레임으로 사용하기

# -- 절취선 --

# 초기 차트를 불러오는 함수
def Chart_call(symbol, interval, limit):
    symbol = symbol.upper()

    url = f'https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}'
    data = requests.get(url).json()

    ETH_Chart = []
    for row in data:
        r = {
            "t": row[0],  # 캔들 시작 시간
            "o": float(row[1]),  # 시가
            "h": float(row[2]),  # 고가
            "l": float(row[3]),  # 저가
            "c": float(row[4]),  # 종가
            "v": float(row[5]),  # 거래량
            "n": int(row[8]),  # 거래회수
            "q": round(float(row[7])),  # 거래대금
            "V": float(row[9]),  # 테이커 매수 거래량
            "Q": round(float(row[10])),  # 테이커 매수 거래대금
        }
        ETH_Chart.append(r)

    # 데이터를 DataFrame으로 변환
    df = pd.DataFrame(ETH_Chart)
    df['t'] = pd.to_datetime(df['t'], unit='ms') + timedelta(hours=9)  # 캔들 시작 시간을 읽기 쉽게 변환

    df.set_index('t', inplace=True)
    
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 1000)
    
    return df

# 차트를 업데이트 하는 함수
def update_chart(df, data):
    last_time = df.index[-1]
    new_time = pd.to_datetime(data['k']['t'], unit='ms') + timedelta(hours=9)
    new_candle = {}
    for x in data['k']:
        if x in df.columns:
            if x in ['q', 'Q']:
                new_candle[x] = round(float(data['k'][x]))
            elif x in ['t', 'n']:
                new_candle[x] = int(data['k'][x])
            elif x in 'T':
                pass
            else:
                new_candle[x] = float(data['k'][x])
    new_candle['t'] = new_time

    # 새로운 행 추가 및 오래된 행 제거
    if new_time != last_time:
        new_row = pd.DataFrame([new_candle]).set_index('t')
        df = pd.concat([df, new_row])
        df = df.iloc[1:]
    else:  # 현재 캔들 업데이트
        df.iloc[-1] = pd.Series(new_candle).reindex(df.columns)

    df['MA20'] = df['c'].rolling(window=20).mean()

    print(df.tail())
    return df
    
# -- 절취선 --

 

처음에 올린 전체 코드에서 '# -- 절취선 --' 이라고 표기된 범위의 코드를

해당 코드로 바꿔치기 하시면 됩니다.

 

출력 결과

          t        o        h        l        c         v
95 09:53:00  3360.17  3362.94  3358.85  3361.92   943.672
96 09:54:00  3361.91  3364.97  3361.91  3364.64  1187.232
97 09:55:00  3364.65  3368.66  3364.17  3367.49  1908.538
98 09:56:00  3367.49  3369.42  3367.25  3368.61  1040.303
99 09:57:00  3368.62  3369.13  3366.65  3368.67   845.391

 

데이터 프레임의 장점은 간단하게 보조지표를 추가할 수 있습니다.

update_chart() 함수 내에서 return 코드 위에

다음의 코드를 한 줄 추가하면 됩니다.

    else:  # 현재 캔들 업데이트
        df.iloc[-1] = pd.Series(new_candle).reindex(df.columns)
    
    # 20분 이동평균 추가
    df['MA20'] = df['c'].rolling(window=20).mean()
    
    print(df.tail())
    return Chart

 Chart['MA20'] 칼럼을 추가하여 20분 이동평균을 계산하는 코드입니다. 

          t        o        h        l        c         v      MA20
95 09:53:00  3360.17  3362.94  3358.85  3361.92   943.672  3363.999
96 09:54:00  3361.91  3364.97  3361.91  3364.64  1187.232  3363.890
97 09:55:00  3364.65  3368.66  3364.17  3367.49  1908.538  3363.715
98 09:56:00  3367.49  3369.42  3367.25  3368.61  1040.303  3363.896
99 09:57:00  3368.62  3369.13  3366.65  3368.67   845.391  3364.153

 

rolling(window = 20) 에서 숫자 20이

이동평균 길이라 생각하시면 됩니다.

(7로 바꾸면 7분 이동평균, 100으로 바꾸면 100분 이동평균...)

 

다음 글에서 조금 더 자세히 다루겠습니다.

 

참고로 round()는 반올림 하는 함수입니다.

round( ... , 3 ) 여기서 3이 반올림할 소수점 자리수입니다.

2 = 3364.143 ->  3364.14

1 = 3364.143 ->  3364.1

0 = 3364.143 ->  3364

-1 = 3364.143  ->  3360

 

 


만약 데이터 프레임을 print()로 출력할 때

행렬이 생략없이 출력되도록 하고 싶다면

print()로 출력하기 전에 다음의 코드를 넣으시면 됩니다.

pd.set_option('display.max_rows', None) # 행 (세로축)
pd.set_option('display.max_columns', None) # 열 (가로축)
pd.set_option('display.width', 1000) # 최대 폭 확대

print(df)

* Chart_call 함수에 넣어뒀으니 신경쓰지 않으셔도 됩니다.