파이썬/바이낸스 선물 자동매매

[자동매매] 볼린저 밴드 + RSI

Eluv 2024. 12. 6. 23:47

코드를 정상적으로 실행하려면 바이낸스의 API 키가 필요합니다.

API키 발급 방법 참조 ->   바이낸스 API키 발급

 

만약 실행이 안되면 PC의 시간을 동기화 하시면 됩니다.

윈 11 기준 윈도우 화면 오른쪽 아래 시간을 우클릭 후

날짜 및 시간 조정을 들어가셔서 지금 동기화 버튼을 누르시면 됩니다.

 

그래도 안되면 발생한 문제에 대한 정보가 추가로 필요합니다.

 

import asyncio
import websockets
import json
import ccxt
from datetime import datetime, timedelta
import pandas as pd
import requests
import hmac
import hashlib
import time

# 바이낸스 API 키와 시크릿 로드
with open("D:/코인/binance key.txt") as f:
    lines = f.readlines()
    api_key = lines[0].strip()
    secret_key = lines[1].strip()

Futures_binance = ccxt.binanceusdm({
    'apiKey': api_key,
    'secret': secret_key,
    'enableRateLimit': True,
    'options': {
        'defaultType': 'future',
        'adjustForTimeDifference': True,
    }
})

headers = {
    "X-MBX-APIKEY": api_key,
}

balance = {}
open_orders = {}
target_symbol = 'ETHUSDT' # 거래할 종목
price_gap_entry = 0.3 # 평단가보다 일정 이상 높거나 낮을 때
order_unit_amount = 0.01 # 주문할 기본 수량

# 바이낸스 API 요청 함수
def binance_api(endpoint, params, method):
    params["timestamp"] = int(time.time() * 1000)
    query_string = "&".join([f"{key}={value}" for key, value in params.items()])
    params["signature"] = hmac.new(secret_key.encode(), query_string.encode(), hashlib.sha256).hexdigest()
    url = "https://fapi.binance.com" + endpoint
    try:
        if method == "GET":
            response = requests.get(url, headers=headers, params=params)
        elif method == "POST":
            response = requests.post(url, headers=headers, params=params)
        else:
            raise ValueError(f"잘못된 요청입니다 : {url}, {endpoint}, {params}, {method}")
        return response.json()
    except requests.exceptions.Timeout:
        print(f"시간 초과 {url}, {endpoint}, {params}, {method}")
    except requests.exceptions.RequestException as e:
        print(f"에러 발생 : {e} \n {url}, {endpoint}, {params}, {method}")

# 계좌 잔액 및 포지션 정보 조회
def fetch_account_info():
    endpoint = "/fapi/v2/account"
    params = {}
    return binance_api(endpoint, params, "GET")

# 미체결 주문 정보 조회
def fetch_open_orders(symbol=None):
    endpoint = "/fapi/v1/openOrders"
    params = {}
    if symbol:
        params["symbol"] = symbol.upper()
    return binance_api(endpoint, params, "GET")

# 지정가 주문
def LIMIT_order(symbol, side, positionSide, amount, price, Order_count):
    try:
        cp = 0.005 # 분할 간격 퍼센트. 0.005는 0.5%를 의미함.
        price_multiplier = 1 - cp if positionSide == 'LONG' else 1 + cp

        for i in range(Order_count):
            adjusted_price = price * (price_multiplier ** i)
            order = Futures_binance.create_order(
                symbol=symbol,
                type='limit',
                side=side,
                amount=amount,
                price=adjusted_price,
                params={
                    'positionSide': positionSide,
                    'timeInForce': 'GTC'
                }
            )
            print(
                f"Order Limit placed: {order['info']['symbol']}, {order['info']['price']}, {order['info']['origQty']}")
            #print(f"Order Limit placed: {order}")
            time.sleep(0.5)
    except Exception as e:
        print(f"LIMIT_order Error: {e}")

# 전체 주문 취소
def cancel_all_order(symbol):
    try:
        Futures_binance.cancel_all_orders(symbol)
        return True
    except Exception as e:
        print(f'주문 취소 실패: {e}')
        return False

# 포지션 진입, 정리 조건 확인 함수
def execute_trade_logic(df):
    global balance

    current_close = df['c'].iloc[-1]
    lower_band = df['bb_Lower'].iloc[-1]
    upper_band = df['bb_Upper'].iloc[-1]
    current_rsi = df['RSI'].iloc[-1]

    MA20 = df['MA20'].iloc[-1]
    MA5 = df['MA5'].iloc[-1]

    long_amt = balance[target_symbol]['LONG']['amt']
    long_ent = balance[target_symbol]['LONG']['ent']
    short_amt = balance[target_symbol]['SHORT']['amt']
    short_ent = balance[target_symbol]['SHORT']['ent']
    lev = balance[target_symbol]['leverage']

    long_positon_margin = long_amt * long_ent
    # 롱 포지션 진입 조건 체크
    if long_positon_margin + current_close * order_unit_amount > balance['total'] * lev :
        print('주문 가능 금액 부족')
    elif current_close < lower_band and current_rsi <= 30:
        apg = 1 - price_gap_entry * (1 + 0.5 * (long_amt / order_unit_amount - 1)) / 100
        print('apg :', apg)
        if long_amt > 0 and current_close >= long_ent * apg:
            print(f'평단가 - {price_gap_entry}% 보다는 아직 높음')
        else :
            print(f"롱 포지션 진입중... 현재가 : {current_close}, 볼린저 하단 : {lower_band}, RSI : {current_rsi}")
            LIMIT_order(target_symbol, 'buy', 'LONG', order_unit_amount, current_close, 1)

    # 롱 포지션 정리 조건 체크
    if current_close >= upper_band : # and current_rsi >= 70
        if long_amt > order_unit_amount * 2:
            print("포지션 일부 매도중...")
            LIMIT_order(target_symbol, 'sell', 'LONG', order_unit_amount, current_close, 1)
        elif 0 < long_amt <= order_unit_amount * 2:
            print("포지션 전체 매도중...")
            LIMIT_order(target_symbol, 'sell', 'LONG', long_amt, current_close, 1)

    short_positon_margin = short_amt * short_ent
    # 숏 포지션 진입 조건 체크
    if short_positon_margin + current_close * order_unit_amount > balance['total'] * lev :
        print('주문 가능 금액 부족')
    elif current_close > upper_band and current_rsi >= 70 and long_amt == 0 and current_close >= MA5 * (1 + price_gap_entry / 100):
        apg = 1 + price_gap_entry * (1 + 0.5 * (short_amt / order_unit_amount - 1)) / 100
        print('apg :', apg)
        if short_amt > 0 and current_close <= short_ent * apg:
            print(f'평단가 + {price_gap_entry}% 보다는 아직 낮음')
        else :
            print(f"숏 포지션 진입중... 현재가 : {current_close}, 볼린저 하단 : {upper_band}, RSI : {current_rsi}")
            LIMIT_order(target_symbol, 'sell', 'SHORT', order_unit_amount, current_close, 1)



# 초기 차트를 불러오는 함수
def Chart_call(symbol, interval, limit):
    symbol = symbol.upper()

    url = f'https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}'
    data = requests.get(url).json()

    ETH_Chart = []
    for row in data:
        r = {
            "t": row[0],  # 캔들 시작 시간
            "o": float(row[1]),  # 시가
            "h": float(row[2]),  # 고가
            "l": float(row[3]),  # 저가
            "c": float(row[4]),  # 종가
            "v": float(row[5]),  # 거래량
            "n": int(row[8]),  # 거래회수
            "q": round(float(row[7])),  # 거래대금
            "V": float(row[9]),  # 테이커 매수 거래량
            "Q": round(float(row[10])),  # 테이커 매수 거래대금
        }
        ETH_Chart.append(r)

    # 데이터를 DataFrame으로 변환
    df = pd.DataFrame(ETH_Chart)
    df['t'] = pd.to_datetime(df['t'], unit='ms') + timedelta(hours=9)  # 캔들 시작 시간을 읽기 쉽게 변환

    df.set_index('t', inplace=True)
    pd.set_option('display.max_columns', None)  # 열 (가로축)
    pd.set_option('display.width', 1000)
    return df

# 차트를 업데이트 하는 함수
def update_chart(df, data):
    last_time = df.index[-1]
    new_time = pd.to_datetime(data['k']['t'], unit='ms') + timedelta(hours=9)
    new_candle = {}
    for x in data['k']:
        if x in df.columns:
            if x in ['q', 'Q']:
                new_candle[x] = round(float(data['k'][x]))
            elif x in ['t', 'n']:
                new_candle[x] = int(data['k'][x])
            elif x in 'T':
                pass
            else:
                new_candle[x] = float(data['k'][x])
    new_candle['t'] = new_time

    # 새로운 행 추가 및 오래된 행 제거
    if new_time != last_time:
        if 'MA20' in df :
            execute_trade_logic(df)
        new_row = pd.DataFrame([new_candle]).set_index('t')
        df = pd.concat([df, new_row])
        df = df.iloc[1:]
        print(df.tail())
    else:  # 현재 캔들 업데이트
        df.iloc[-1] = pd.Series(new_candle).reindex(df.columns)

    df['MA20'] = round(df['c'].rolling(window=20).mean(), 2)
    df['MA5'] = round(df['c'].rolling(window=5).mean(), 2)

    df['STD20'] = df['c'].rolling(window=20).std()
    df['bb_Upper'] = df['MA20'] + df['STD20'] * 2
    df['bb_Lower'] = df['MA20'] - df['STD20'] * 2

    # RSI
    delta = df['c'].diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    avg_gain = gain.ewm(alpha=1/14, min_periods=14, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1/14, min_periods=14, adjust=False).mean()
    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))

    # 숏 포지션 정리 조건 체크
    if balance[target_symbol]['SHORT']['amt'] > 0 and df['c'].iloc[-1] <= df['MA5'].iloc[-1]:  # and current_rsi >= 70
        print("포지션 전체 매도중...")
        LIMIT_order(target_symbol, 'buy', 'SHORT', balance[target_symbol]['SHORT']['amt'], df['c'].iloc[-1], 1)

    return df

# 차트 웹소켓 연결 함수
async def Kline_Stream():
    symbol = target_symbol.lower()  # 종목 이름
    interval = '5m'  # 차트 시간 단위
    limit = '100'  # 가져올 차트 길이
    Chart = Chart_call(symbol, interval, limit)
    print(Chart)
    uri = f'wss://fstream.binance.com/ws/{symbol}@kline_{interval}'
    async with websockets.connect(uri) as websocket:
        try:
            while True:
                data = json.loads(await websocket.recv())
                Chart = update_chart(Chart, data)
                #print(Chart.tail())
        except asyncio.CancelledError:
            print("Kline_Stream 작업이 취소되었습니다.")
        except Exception as e:
            print(f"Kline_Stream 예외가 발생했습니다: {e}")
        finally:
            print("Kline_Stream 웹소켓 연결을 종료합니다.")
            await websocket.close()


FUTURES_STREAM_END_POINT_1 = "wss://fstream.binance.com"
BINANCE_FUTURES_END_POINT = "https://fapi.binance.com/fapi/v1/listenKey"

def create_futures_listen_key():
    response = requests.post(url=BINANCE_FUTURES_END_POINT, headers=headers)
    return response.json()['listenKey']

listen_key = create_futures_listen_key()

async def UserDataStream():
    uri = f"{FUTURES_STREAM_END_POINT_1}/ws/{listen_key}"
    async with websockets.connect(uri) as websocket:
        try:
            while True:
                data = json.loads(await websocket.recv())
                print(data)
                if 'e' in data:
                    if data['e'] == "ORDER_TRADE_UPDATE" and 'o' in data:
                        if data['o']['x'] == 'NEW':
                            if data['o']['X'] == 'NEW':
                                open_orders[data['o']['i']] = float(data['o']['q'])
                            # print(data)
                        if data['o']['x'] == 'TRADE':
                            if data['o']['X'] == 'PARTIALLY_FILLED':
                                open_orders[data['o']['i']] = open_orders[data['o']['i']] - float(
                                    data['o']['q'])
                            elif data['o']['X'] == 'FILLED' and data['o']['i'] in open_orders:
                                del open_orders[data['o']['i']]
                            # print(data)
                            if 'rp' in data['o'] and data['o']['rp'] != '0':
                                print('실현손익 : ', data['o']['rp'])
                        if data['o']['x'] == 'CANCELED' and data['o']['i'] in open_orders:
                            del open_orders[data['o']['i']]
                        print('open_orders :',open_orders)
                    if data['e'] == "ACCOUNT_UPDATE" and 'B' in data['a'] and 'P' in data['a'] and data['a'][
                        'm'] == 'ORDER':
                        balance['total'] = float(data['a']['B'][0]['wb'])
                        balance[target_symbol][data['a']['P'][0]['ps']]['amt'] = abs(float(data['a']['P'][0]['pa']))
                        balance[target_symbol][data['a']['P'][0]['ps']]['ent'] = float(data['a']['P'][0]['ep'])
                        print(balance)


                    if data['e'] == "ACCOUNT_CONFIG_UPDATE" and 'ac' in data and 's' in data['ac'] and 'l' in data['ac'] and data['ac']['s'] == target_symbol:
                        balance[data['ac']['s']]['leverage'] = int(data['ac']['l'])
                        print(balance)
        except asyncio.CancelledError:
            print("UserDataStream 작업이 취소되었습니다.")
        except Exception as e:
            print(f"UserDataStream 예외가 발생했습니다: {e}")
        finally:
            print("UserDataStream 웹소켓 연결을 종료합니다.")
            await websocket.close()

async def ping_listen_key():
    while True:
        await asyncio.sleep(1800)  # 30분 간격으로 ping 전송
        response = requests.put(url=BINANCE_FUTURES_END_POINT, headers=headers)
        if response.status_code == 200:
            print('Ping sent successfully')
        else:
            print('Failed to send ping')

async def main():
    global balance

    same_symbol = 'symbol'
    Unfilled_order = fetch_open_orders()
    if len(Unfilled_order) > 0 :
        for x in Unfilled_order:
            if same_symbol != x['symbol']:
                cancel_all_order(x['symbol'])
                same_symbol = x['symbol']
            time.sleep(0.5)

    account_info = fetch_account_info()
    balance = {'total' : float(account_info['totalMarginBalance']), target_symbol : {}}

    for x in account_info['positions']:
        if x['symbol'] == target_symbol:
            balance[target_symbol]['leverage'] = int(x['leverage'])
            balance[target_symbol][x['positionSide']] = {'amt': float(x['positionAmt']), 'ent': float(x['entryPrice'])}
    print(balance)

    time.sleep(0.5)

    tasks = [asyncio.create_task(Kline_Stream()),
             asyncio.create_task(UserDataStream()),
             asyncio.create_task(ping_listen_key())]

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        print("메인 작업이 취소되었습니다.")
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        print(f"메인 함수에서 예외가 발생했습니다: {e}")
    finally:
        print("메인 함수의 작업이 종료되었습니다.")

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("프로그램을 종료합니다.")

 

1. 종목, 거래 수량, 물타기 간격 변경 방법

target_symbol = 'ETHUSDT' # 거래할 종목
price_gap_entry = 0.3 # 평단가보다 일정 이상 높거나 낮을 때
order_unit_amount = 0.01 # 주문할 기본 수량

target_symbol의 종목 이름으로 거래를 하게 됩니다.

 

order_unit_amount는 주문을 넣을 때 코인의 수량입니다.

현재는 이더리움 0.01개를 주문 넣도록 되어있습니다.

너무 적은 수량을 요청하면 에러가 발생할 수 있습니다.

 

price_gap_entry의 값은 평단가보다

몇% 이상 낮은 가격일 때 물타기를 할지에 대한 값입니다.

예를들어 어떤 코인을 1000원에 샀다고 가정하면

평단가보다 0.3% 낮은 997원 밑으로 내려오기 전까진 추가 구매하지 않습니다.

 

price_gap_entry= 1로 변경하면

평단가보다 1% 낮은 가격일 때만 추가 구매를 합니다.

 

다른 코인을 거래한다면 거래할 수량을 직접 생각해서 입력해 주시면 됩니다.

ex)

target_symbol = 'XRPUSDT'

price_gap_entry= 1

order_unit_amount= 10

 

아래는 중요한 얘기는 아니니 넘어가셔도 됩니다.

물론 코드를 읽어보시면 눈치 채셨을 수도 있는데,

apg = 1 - price_gap_entry * (1 + 0.5 * (long_amt / order_unit_amount - 1)) / 100

현재의 평단가에서 1%가 아니라 최초의 평단가에서 1% 입니다.

 

long_ent * (1 - price_gap_entry/ 100) 이렇게 하면

현재 평단가를 기준으로 1%가 됩니다.

 

만약 1000원이 평단가일 때 5% 낮은 950원에 추가 매수 하면

(1000 + 950) / 2로 평단가는 975가 됩니다.

 

그러면 다음 추가 매수는 1000원에서 10% 낮은 900원이 아니라

975에서 5% 낮은 926원에 추가 구매를 하게 됩니다.

 

이런식으로 분할 매수를 할 수록 평단가는 완만하게 낮아지기 때문에

비중이 늘어날수록 예상보다 높은 가격에 반복 매수를 할 가능성이 있습니다.

 

그것을 방지하기 위해 위의 코드를 사용해서

1000, 950, 902, 855... 이런식으로

최초 평단가에서 정확히 5%간격은 아니지만

어느정도 유사한 간격으로 추가 매수를 합니다.

 

2. 차트 시간 단위 조정

async def Kline_Stream():
    symbol = target_symbol.lower()  # 종목 이름
    interval = '5m'  # 차트 시간 단위
    limit = '100'  # 가져올 차트 길이
    Chart = Chart_call(symbol, interval, limit)

매매할 종목 이름은 위에 target_symbol 에서 지정했기 때문에

여기에선 해당 이름은 소문자로 변환하는 것만 합니다.

 

빠른 테스트를 위해서 '1m' 단위를 사용했는데

가급적이면 '5m' 이나 '1h' 같이

더 큰 시간대의 차트를 사용하시는 것을 권장합니다.

 

3. 코드의 기능

코드의 전체적인 기능을 간략하게 요약하면,

 

1. 롱 포지션 진입

캔들이 바뀔 때의 종가가

볼린저 밴드 하단 보다 낮고,

RSI 30 이하 일 때 롱 포지션 0.01개 진입

 

2. 롱 포지션 정리

현재 가격이 볼린저 밴드 상단보다 높으면

보유중인 롱 포지션을 0.01개 매도

 

3. 숏 포지션 진입

만약 보유중인 롱 포지션이 없고,

현재 가격이 볼밴 상단보다 높고,

RSI가 70 이상일 때 숏 포지션을 0.01개 진입

 

4. 숏 포지션 정리

숏포지션을 보유 중일 때

현재 가격이 5분 이평보다 낮으면

숏 포지션 전부 정리

 

 

롱 포지션 진입, 정리, 숏 포지션 진입은 캔들이 바뀔 때만 체크하지만

숏 포지션 정리는 실시간으로 5분 이평보다 낮아지는 순간 바로 정리합니다.

 

일단 여기까지가 비중관리를 위한 기본 골격으로 사용될 코드입니다.

물론 이정도만 해도 아주 쓸모없지는 않지만 수익을 보장할 수 있는 것은 아닙니다.

해당 코드를 응용해서 여러분만의 전략을 만들어 보시는 것을 추천드립니다.

다음 글에서 이어집니다.

'파이썬 > 바이낸스 선물 자동매매' 카테고리의 다른 글

조건문 사용하기  (0) 2024.12.07