1. 딕셔너리 형태로 사용하기
import asyncio
import websockets
import json
import ccxt
import requests
from datetime import datetime, timedelta
import pandas as pd
import pprint
Futures_binance = ccxt.binance({
'options': {
'defaultType': 'future'
}
})
# -- 절취선 --
# 초기 차트를 불러오는 함수
def Chart_call(symbol, interval, limit):
symbol = symbol.upper()
url = f'https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}'
data = requests.get(url).json()
ETH_Chart = []
for row in data:
r = {
"t": datetime.fromtimestamp(row[0] / 1000).strftime('%Y-%m-%d %H:%M:%S'), # 캔들 시작 시간
"o": float(row[1]), # 시가
"h": float(row[2]), # 고가
"l": float(row[3]), # 저가
"c": float(row[4]), # 종가
"v": float(row[5]), # 거래량
"n": int(row[8]), # 거래회수
"q": round(float(row[7])), # 거래대금
"V": float(row[9]), # 테이커 매수 거래량
"Q": round(float(row[10])), # 테이커 매수 거래대금
}
ETH_Chart.append(r)
return ETH_Chart
# 차트를 업데이트 하는 함수
def update_chart(Chart, data):
last_time = Chart[-1]['t']
new_time = datetime.fromtimestamp(data['k']['t'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
new_candle = {}
for x in data['k']:
if x in Chart[-1]:
if x in ['q', 'Q']:
new_candle[x] = round(float(data['k'][x]))
elif x in 'n':
new_candle[x] = int(data['k'][x])
elif x in 't':
new_candle[x] = new_time
else:
new_candle[x] = float(data['k'][x])
if new_time != last_time: # 새로운 캔들
Chart.append(new_candle)
del Chart[0] # 오래된 캔들 제거
else: # 현재 캔들 업데이트
Chart[-1] = new_candle
print(Chart[-1])
return Chart
# -- 절취선 --
async def ETH_Kline_Stream():
symbol = 'ethusdt' # 종목 이름
interval = '1m' # 차트 시간 단위
limit = '100' # 가져올 차트 길이
ETH_Chart = Chart_call(symbol, interval, limit)
print(ETH_Chart)
uri = f'wss://fstream.binance.com/ws/{symbol}@kline_{interval}'
async with websockets.connect(uri) as websocket:
try:
while True:
data = json.loads(await websocket.recv())
ETH_Chart = update_chart(ETH_Chart, data)
except asyncio.CancelledError:
print("ETH_Kline_Stream 작업이 취소되었습니다.")
except Exception as e:
print(f"ETH_Kline_Stream 예외가 발생했습니다: {e}")
finally:
print("ETH_Kline_Stream 웹소켓 연결을 종료합니다.")
await websocket.close()
async def main():
tasks = [asyncio.create_task(ETH_Kline_Stream())]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
print("메인 작업이 취소되었습니다.")
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
print(f"메인 함수에서 예외가 발생했습니다: {e}")
finally:
print("메인 함수의 작업이 종료되었습니다.")
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("프로그램을 종료합니다.")
리스트 형태로 차트를 실시간으로 갱신합니다.
ETH_Kline_Stream 함수에서
limit = 100
해당 변수는 불러올 캔들의 개수입니다.
호출할 수 있는 최대 길이는 1000입니다.
interval = '1m'
차트의 시간 단위 입니다.
1m' , '3m', '5m', '15m', '1h', '4h', '1d' 등이 있습니다.
출력 결과
{ 't': 2024-12-01 11:04:00, # 시작시간
'Q': 659980.3794, # 테이커 매수 거래대금
'V': 177.999, # 테이커 매수 거래량
'c': 3706.49, # 종가
'h': 3708.99, # 고가
'l': 3706.25, # 저가
'n': 885, # 거래회수
'o': 3708.52, # 시가
'q': 1294397.55122, # 거래대금
'v': 349.124} # 거래량
약간 순서가 뒤죽박죽이 되긴 했는데 괜찮습니다.
2. 데이터 프레임으로 사용하기
# -- 절취선 --
# 초기 차트를 불러오는 함수
def Chart_call(symbol, interval, limit):
symbol = symbol.upper()
url = f'https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}'
data = requests.get(url).json()
ETH_Chart = []
for row in data:
r = {
"t": row[0], # 캔들 시작 시간
"o": float(row[1]), # 시가
"h": float(row[2]), # 고가
"l": float(row[3]), # 저가
"c": float(row[4]), # 종가
"v": float(row[5]), # 거래량
"n": int(row[8]), # 거래회수
"q": round(float(row[7])), # 거래대금
"V": float(row[9]), # 테이커 매수 거래량
"Q": round(float(row[10])), # 테이커 매수 거래대금
}
ETH_Chart.append(r)
# 데이터를 DataFrame으로 변환
df = pd.DataFrame(ETH_Chart)
df['t'] = pd.to_datetime(df['t'], unit='ms') + timedelta(hours=9) # 캔들 시작 시간을 읽기 쉽게 변환
df.set_index('t', inplace=True)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
return df
# 차트를 업데이트 하는 함수
def update_chart(df, data):
last_time = df.index[-1]
new_time = pd.to_datetime(data['k']['t'], unit='ms') + timedelta(hours=9)
new_candle = {}
for x in data['k']:
if x in df.columns:
if x in ['q', 'Q']:
new_candle[x] = round(float(data['k'][x]))
elif x in ['t', 'n']:
new_candle[x] = int(data['k'][x])
elif x in 'T':
pass
else:
new_candle[x] = float(data['k'][x])
new_candle['t'] = new_time
# 새로운 행 추가 및 오래된 행 제거
if new_time != last_time:
new_row = pd.DataFrame([new_candle]).set_index('t')
df = pd.concat([df, new_row])
df = df.iloc[1:]
else: # 현재 캔들 업데이트
df.iloc[-1] = pd.Series(new_candle).reindex(df.columns)
df['MA20'] = df['c'].rolling(window=20).mean()
print(df.tail())
return df
# -- 절취선 --
처음에 올린 전체 코드에서 '# -- 절취선 --' 이라고 표기된 범위의 코드를
해당 코드로 바꿔치기 하시면 됩니다.
출력 결과
t o h l c v
95 09:53:00 3360.17 3362.94 3358.85 3361.92 943.672
96 09:54:00 3361.91 3364.97 3361.91 3364.64 1187.232
97 09:55:00 3364.65 3368.66 3364.17 3367.49 1908.538
98 09:56:00 3367.49 3369.42 3367.25 3368.61 1040.303
99 09:57:00 3368.62 3369.13 3366.65 3368.67 845.391
데이터 프레임의 장점은 간단하게 보조지표를 추가할 수 있습니다.
update_chart() 함수 내에서 return 코드 위에
다음의 코드를 한 줄 추가하면 됩니다.
else: # 현재 캔들 업데이트
df.iloc[-1] = pd.Series(new_candle).reindex(df.columns)
# 20분 이동평균 추가
df['MA20'] = df['c'].rolling(window=20).mean()
print(df.tail())
return Chart
Chart['MA20'] 칼럼을 추가하여 20분 이동평균을 계산하는 코드입니다.
t o h l c v MA20
95 09:53:00 3360.17 3362.94 3358.85 3361.92 943.672 3363.999
96 09:54:00 3361.91 3364.97 3361.91 3364.64 1187.232 3363.890
97 09:55:00 3364.65 3368.66 3364.17 3367.49 1908.538 3363.715
98 09:56:00 3367.49 3369.42 3367.25 3368.61 1040.303 3363.896
99 09:57:00 3368.62 3369.13 3366.65 3368.67 845.391 3364.153
rolling(window = 20) 에서 숫자 20이
이동평균 길이라 생각하시면 됩니다.
(7로 바꾸면 7분 이동평균, 100으로 바꾸면 100분 이동평균...)
다음 글에서 조금 더 자세히 다루겠습니다.
참고로 round()는 반올림 하는 함수입니다.
round( ... , 3 ) 여기서 3이 반올림할 소수점 자리수입니다.
2 = 3364.143 -> 3364.14
1 = 3364.143 -> 3364.1
0 = 3364.143 -> 3364
-1 = 3364.143 -> 3360
만약 데이터 프레임을 print()로 출력할 때
행렬이 생략없이 출력되도록 하고 싶다면
print()로 출력하기 전에 다음의 코드를 넣으시면 됩니다.
pd.set_option('display.max_rows', None) # 행 (세로축)
pd.set_option('display.max_columns', None) # 열 (가로축)
pd.set_option('display.width', 1000) # 최대 폭 확대
print(df)
* Chart_call 함수에 넣어뒀으니 신경쓰지 않으셔도 됩니다.
'파이썬 > 바이낸스 선물 API+WS' 카테고리의 다른 글
[선물 API+WS] 데이터 프레임에 보조지표 추가하기 (0) | 2024.11.23 |
---|