Поиск по материалам

Tinkoff Invest API на Python: туториал от регистрации токена до боевого робота

·

Tinkoff Invest API — это gRPC-доступ к торговому счёту: можно получать рыночные данные, стримить свечи, открывать позиции, управлять портфелем — всё из Python без QUIK и без терминала на компьютере. Разбираем по шагам: как получить токен, поставить SDK, прочитать свечи, выставить заявку, написать момент-робота с управлением риском, развернуть в Docker и не упереться в лимиты API.

Что такое Tinkoff Invest API

Tinkoff Invest API (T-Invest API) — это gRPC-интерфейс к брокерским сервисам Тинькофф: получение рыночных данных, исторических свечей, тикового стрима, отправка заявок, управление портфелем. В отличие от QUIK, здесь нет терминала на компьютере — код работает где угодно (Docker на VPS, AWS Lambda, домашний сервер).

API двух режимов:

  • Песочница (sandbox) — изолированный счёт с виртуальными деньгами. Идеально для разработки и тестирования. Все методы работают, но «реального» исполнения нет.
  • Продакшен (production) — боевой счёт. Для торговли нужен открытый брокерский счёт в Тинькофф (физлицо или ИП).

Преимущества перед QUIK Lua:

  • Любой язык: Python (tinkoff-investments), Go, Java, .NET, Node.js. Активный community-SDK на Python развивается быстрее остальных.
  • Облачный деплой — без зависимости от вашего компьютера.
  • Удобная разработка: PyCharm/VSCode, тесты, CI/CD, Docker.
  • Полная история свечей — до 1 минуты, без задержек.
  • Стандартные структуры данных — Decimal, datetime с TZ, чёткие enum.

Главные ограничения:

  • Лимиты на запросы — категория «Standard» = 60 заявок в минуту, «Pro» (для крупных клиентов) — до 300. Стрим свечей не считается в этот лимит.
  • Без прямых маркет-мейкер-привилегий — это розничный API, а не proximity к матчинговому движку.
  • Только инструменты, доступные в Тинькофф Инвестиции — российские акции и облигации, фьючерсы Si/RTS/MX, иностранные акции на СПБ (с ограничениями), валюта, ETF/БПИФ.

Получение токена

  1. Открыть https://www.tinkoff.ru/invest/settings → раздел API.
  2. Создать новый токен. Параметры:
    • Только просмотр — для чтения данных и тестирования.
    • Полный доступ — для боевой торговли. Включает право выставления заявок.
  3. Скопировать токен сразу при создании. Повторно его увидеть нельзя.

Песочница использует отдельный токен: создаётся в режиме «Песочница», работает изолированно. Удобно держать оба — один для разработки, другой для боевого деплоя.

# .env-файл проекта (НЕ коммитить!)
TINKOFF_SANDBOX_TOKEN=t.xxxxxxxxxxxx
TINKOFF_PROD_TOKEN=t.yyyyyyyyyyyy
TINKOFF_ACCOUNT_ID=2200000000   # видно в личном кабинете

.env — в .gitignore. Для production-деплоя — секреты через docker secrets, k8s secrets или Vault.

Установка SDK

python -m venv venv && source venv/bin/activate
pip install tinkoff-investments

На момент написания актуальная версия — tinkoff-investments>=0.2.0 (синхронный + асинхронный клиенты, gRPC-генерация). Версии <0.2 устарели.

Минимальный hello-world: список своих счетов.

import os
from tinkoff.invest import Client

TOKEN = os.environ["TINKOFF_PROD_TOKEN"]

with Client(TOKEN) as client:
    accounts = client.users.get_accounts().accounts
    for acc in accounts:
        print(f"{acc.id}  {acc.name}  {acc.access_level}")

Если выдаёт 40003 — Token is invalid — проверьте, нет ли пробелов вокруг токена в .env.

Получение свечей и текущей цены

from datetime import datetime, timedelta, timezone
from tinkoff.invest import CandleInterval, Client

FIGI = "BBG004730N88"   # SBER

with Client(TOKEN) as client:
    # Последние 200 пятиминуток
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=20)
    candles = list(client.get_all_candles(
        figi=FIGI,
        from_=start,
        to=end,
        interval=CandleInterval.CANDLE_INTERVAL_5_MIN,
    ))
    print(f"loaded {len(candles)} candles")
    last = candles[-1]
    print(f"close: {quotation_to_float(last.close)}")

def quotation_to_float(q):
    """Tinkoff отдаёт цены как Quotation(units, nano). Конвертируем в float."""
    return q.units + q.nano / 1e9

Quotation — особенность T-Invest: цена кодируется как пара (units, nano), чтобы избежать ошибок плавающей точки. На входе ордера тоже отдаём в этом формате.

Стрим свечей (живая лента)

Вместо опроса каждую минуту — gRPC stream, в котором новые свечи прилетают как только формируются на бирже:

import asyncio
from tinkoff.invest import (
    AsyncClient, CandleInstrument, MarketDataRequest,
    SubscribeCandlesRequest, SubscriptionAction, SubscriptionInterval,
)

async def stream_candles(token, figi):
    async with AsyncClient(token) as client:
        async def request_iter():
            yield MarketDataRequest(
                subscribe_candles_request=SubscribeCandlesRequest(
                    subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
                    instruments=[CandleInstrument(
                        figi=figi,
                        interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_FIVE_MINUTES,
                    )],
                )
            )
            while True:
                await asyncio.sleep(60)

        async for msg in client.market_data_stream.market_data_stream(request_iter()):
            if msg.candle is None:
                continue
            print(f"candle close={quotation_to_float(msg.candle.close)}")

asyncio.run(stream_candles(TOKEN, "BBG004730N88"))

Ключевой момент: request_iter() — это бесконечный генератор. Стрим живёт, пока генератор не остановлен. Если стрим разорвётся (сетевой обрыв, перезагрузка серверов) — нужно перезапускать с retry-логикой.

Размер позиции от риска

Базовое правило профессионального трейдинга: фиксированный риск в деньгах, не фиксированный объём в лотах. Считаем размер позиции от стоп-лосса:

def position_size(deposit_rub: float, risk_pct: float,
                  entry_price: float, stop_price: float,
                  lot_size: int) -> int:
    risk_money = deposit_rub * risk_pct / 100
    risk_per_share = abs(entry_price - stop_price)
    if risk_per_share < 0.01:
        return 0
    raw_shares = risk_money / risk_per_share
    # Округляем вниз до целого числа лотов
    return int(raw_shares // lot_size) * lot_size

Депозит 200 000 ₽, риск 0.5% (то есть максимум 1000 ₽ убытка на сделку), вход в SBER по 280, стоп 274, размер лота 10 акций:

risk_money = 1000 ₽
risk_per_share = 6 ₽
raw_shares = 1000 / 6 ≈ 166
shares = 166 // 10 * 10 = 160 (16 лотов)

Эта же формула работает на любом инструменте — фьючерсах, акциях, валюте — если корректно подставить lot_size.

Отправка заявки

Заявка — это PostOrderRequest с тщательно подобранными полями:

from tinkoff.invest import (
    Client, OrderDirection, OrderType, Quotation,
)
from uuid import uuid4

def to_quotation(price: float) -> Quotation:
    units = int(price)
    nano = int(round((price - units) * 1e9))
    return Quotation(units=units, nano=nano)

def submit_order(token, account_id, figi, qty_lots, price, direction):
    with Client(token) as client:
        return client.orders.post_order(
            figi=figi,
            quantity=qty_lots,
            price=to_quotation(price),
            direction=direction,
            account_id=account_id,
            order_type=OrderType.ORDER_TYPE_LIMIT,
            order_id=str(uuid4()),  # идемпотентность: повтор не создаст дубликат
        )

# Открыть лонг 16 лотов SBER по 280.00
resp = submit_order(
    token=TOKEN,
    account_id=ACCOUNT_ID,
    figi="BBG004730N88",
    qty_lots=16,
    price=280.00,
    direction=OrderDirection.ORDER_DIRECTION_BUY,
)
print(resp.order_id, resp.execution_report_status)

order_id через uuid4 обязателен — если повторно вызвать с тем же order_id, T-Invest вернёт ту же заявку (идемпотентность). Это спасает при сетевых обрывах: повтор не задвоит позицию.

execution_report_status:

  • ORDER_EXECUTION_REPORT_STATUS_FILL — исполнена (для маркет-заявок).
  • ORDER_EXECUTION_REPORT_STATUS_NEW — в стакане, ждёт исполнения.
  • ORDER_EXECUTION_REPORT_STATUS_REJECTED — отклонена биржей.

Полный пример: momentum-робот

Соберём всё в один файл. Стратегия — пробой 20-периодного Donchian-канала с ATR-фильтром.

# bot.py
import asyncio
import logging
import os
from collections import deque
from datetime import datetime, timedelta, timezone
from uuid import uuid4

from tinkoff.invest import (
    AsyncClient, CandleInstrument, CandleInterval, MarketDataRequest,
    OrderDirection, OrderType, Quotation, SubscribeCandlesRequest,
    SubscriptionAction, SubscriptionInterval,
)

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)

TOKEN = os.environ["TINKOFF_PROD_TOKEN"]
ACCOUNT_ID = os.environ["TINKOFF_ACCOUNT_ID"]
FIGI = "BBG004730N88"  # SBER
LOT_SIZE = 10
RISK_PCT = 0.5
DEPOSIT = 200_000

CHANNEL_N = 20
ATR_N = 14
ATR_MULT = 0.5

def q_to_f(q: Quotation) -> float:
    return q.units + q.nano / 1e9

def f_to_q(p: float) -> Quotation:
    units = int(p)
    nano = int(round((p - units) * 1e9))
    return Quotation(units=units, nano=nano)

class State:
    def __init__(self):
        self.candles: deque = deque(maxlen=300)
        self.position_qty = 0
        self.entry_price = 0.0
        self.stop_price = 0.0

state = State()

async def load_history(client):
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=24)
    async for c in client.get_all_candles(
        figi=FIGI, from_=start, to=end,
        interval=CandleInterval.CANDLE_INTERVAL_5_MIN,
    ):
        state.candles.append({
            "o": q_to_f(c.open), "h": q_to_f(c.high),
            "l": q_to_f(c.low),  "c": q_to_f(c.close),
            "v": c.volume, "t": c.time,
        })
    log.info(f"loaded {len(state.candles)} candles")

def atr(candles, n=ATR_N):
    if len(candles) < n + 1:
        return 0.0
    trs = []
    for i in range(1, len(candles)):
        c, p = candles[i], candles[i-1]
        tr = max(c["h"] - c["l"], abs(c["h"] - p["c"]), abs(c["l"] - p["c"]))
        trs.append(tr)
    return sum(trs[-n:]) / n

def channel_top(candles, n=CHANNEL_N):
    if len(candles) < n:
        return float("inf")
    return max(c["h"] for c in list(candles)[-n:])

def position_size(entry: float, stop: float) -> int:
    risk_money = DEPOSIT * RISK_PCT / 100
    risk_per_share = abs(entry - stop)
    if risk_per_share < 0.01:
        return 0
    return int((risk_money / risk_per_share) // LOT_SIZE) * LOT_SIZE

async def open_long(client, price: float, stop: float):
    qty_shares = position_size(price, stop)
    if qty_shares == 0:
        return
    qty_lots = qty_shares // LOT_SIZE
    resp = await client.orders.post_order(
        figi=FIGI,
        quantity=qty_lots,
        price=f_to_q(price),
        direction=OrderDirection.ORDER_DIRECTION_BUY,
        account_id=ACCOUNT_ID,
        order_type=OrderType.ORDER_TYPE_LIMIT,
        order_id=str(uuid4()),
    )
    log.info(f"LONG order: {resp.order_id} status={resp.execution_report_status}")
    state.position_qty = qty_shares
    state.entry_price = price
    state.stop_price = stop

async def close_position(client, price: float, reason: str):
    qty_lots = state.position_qty // LOT_SIZE
    resp = await client.orders.post_order(
        figi=FIGI,
        quantity=qty_lots,
        price=f_to_q(price),
        direction=OrderDirection.ORDER_DIRECTION_SELL,
        account_id=ACCOUNT_ID,
        order_type=OrderType.ORDER_TYPE_LIMIT,
        order_id=str(uuid4()),
    )
    log.info(f"CLOSE ({reason}): {resp.order_id} @ {price}")
    state.position_qty = 0

async def on_new_candle(client, candle: dict):
    state.candles.append(candle)
    if len(state.candles) < CHANNEL_N + ATR_N:
        return

    cur_atr = atr(state.candles)
    top = channel_top(list(state.candles)[:-1])  # без последней
    last_close = candle["c"]

    if state.position_qty == 0:
        if last_close > top + ATR_MULT * cur_atr:
            stop = last_close - 2 * cur_atr
            await open_long(client, last_close, stop)
    else:
        if last_close < state.stop_price:
            await close_position(client, last_close, "stop-loss")
        elif last_close > state.entry_price + 2 * (state.entry_price - state.stop_price):
            # Профит 2R — частичная фиксация / трейлинг
            await close_position(client, last_close, "take-profit-2R")

async def main():
    async with AsyncClient(TOKEN) as client:
        await load_history(client)

        async def request_iter():
            yield MarketDataRequest(
                subscribe_candles_request=SubscribeCandlesRequest(
                    subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
                    instruments=[CandleInstrument(
                        figi=FIGI,
                        interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_FIVE_MINUTES,
                    )],
                )
            )
            while True:
                await asyncio.sleep(60)

        async for msg in client.market_data_stream.market_data_stream(request_iter()):
            if msg.candle is None:
                continue
            c = msg.candle
            if not c.is_complete:
                continue
            await on_new_candle(client, {
                "o": q_to_f(c.open), "h": q_to_f(c.high),
                "l": q_to_f(c.low),  "c": q_to_f(c.close),
                "v": c.volume, "t": c.time,
            })

if __name__ == "__main__":
    asyncio.run(main())

Это рабочий каркас на ~150 строк, в котором сделано главное: загрузка истории, стрим свечей, расчёт ATR/Donchian, размер позиции от риска, идемпотентные заявки, закрытие по стопу и тейку.

Что в нём отсутствует для production-боя:

  • Перезапуск стрима при разрыве (нужен try/except + retry с экспоненциальной паузой).
  • Хранение состояния (на рестарте позиция «теряется» из памяти; решается через Redis или JSON-файл).
  • Управление маржой и проверка свободных средств.
  • Переход стопа в безубыток после +1R.
  • Отслеживание корпоративных событий (дивиденды, делистинг).

Эти штуки — ещё 200–300 строк, но базовый каркас уже годен для тестирования на песочнице.

Деплой в Docker

# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY bot.py .
CMD ["python", "bot.py"]
# docker-compose.yml
services:
  bot:
    build: .
    env_file: .env
    restart: unless-stopped
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "5"

Запуск: docker compose up -d. Логи: docker logs -f <id>. Стоп: docker compose down.

В RU-сегменте подходящие хостеры — Selectel, Reg.ru, Timeweb. На английских хостингах (DigitalOcean, Hetzner) запросы к API Тинькофф проходят, но с потенциально большей задержкой.

Лимиты и квоты

T-Invest API имеет несколько уровней лимитов:

  • Запросы в минуту (для Standard-аккаунта): 60 — чтения / 100 — заявок. После 60 секунд — счётчик сбрасывается.
  • Стрим не считается в лимиты, но количество одновременно открытых стримов ограничено десятком.
  • Лимит на заявку — стандартный биржевой (объём, цена в пределах планки).

Если робот ловит RESOURCE_EXHAUSTED (gRPC код 8) — значит упёрлись в rate-limit. Решение — time.sleep(60) + retry. На продакшене — собственный rate-limiter, который не пропускает > 50 запросов/мин с запасом.

Категория Pro даётся клиентам с активным портфелем от 10 млн ₽. Лимиты — ×3–5 от Standard, доступен расширенный набор данных (стакан полным глубиной).

Что запомнить

  • T-Invest API (gRPC) — современный путь алготрейдинга на MOEX/СПБ через Python без QUIK.
  • Песочница и продакшен — два разных токена; разработка на песочнице, деплой через переменные окружения.
  • Обязательный order_id = uuid4() для идемпотентности — спасает от дублей при сетевых обрывах.
  • Цены передаются через Quotation(units, nano) — конверторы пишутся один раз и используются везде.
  • Базовый момент-робот = ~150 строк; production-готовый = ~400–500 строк с retry, журналом, watchdog.
  • Деплой в Docker с restart: unless-stopped — минимальная необходимая инфраструктура.
  • Лимит Standard — 60 запросов/мин на чтение; стрим не считается.
Tinkoff API Python gRPC торговый робот momentum