Tinkoff Invest API на Python: туториал от регистрации токена до боевого робота
Tinkoff Invest API — это gRPC-доступ к торговому счёту: можно получать рыночные данные, стримить свечи, открывать позиции, управлять портфелем — всё из Python без QUIK и без терминала на компьютере. Разбираем по шагам: как получить токен, поставить SDK, прочитать свечи, выставить заявку, написать момент-робота с управлением риском, развернуть в Docker и не упереться в лимиты API.
Что такое Tinkoff Invest API
Tinkoff Invest API (T-Invest API) — это gRPC-интерфейс к брокерским сервисам Тинькофф: получение рыночных данных, исторических свечей, тикового стрима, отправка заявок, управление портфелем. В отличие от QUIK, здесь нет терминала на компьютере — код работает где угодно (Docker на VPS, AWS Lambda, домашний сервер).
API двух режимов:
- Песочница (sandbox) — изолированный счёт с виртуальными деньгами. Идеально для разработки и тестирования. Все методы работают, но «реального» исполнения нет.
- Продакшен (production) — боевой счёт. Для торговли нужен открытый брокерский счёт в Тинькофф (физлицо или ИП).
Преимущества перед QUIK Lua:
- Любой язык: Python (
tinkoff-investments), Go, Java, .NET, Node.js. Активный community-SDK на Python развивается быстрее остальных. - Облачный деплой — без зависимости от вашего компьютера.
- Удобная разработка: PyCharm/VSCode, тесты, CI/CD, Docker.
- Полная история свечей — до 1 минуты, без задержек.
- Стандартные структуры данных — Decimal, datetime с TZ, чёткие enum.
Главные ограничения:
- Лимиты на запросы — категория «Standard» = 60 заявок в минуту, «Pro» (для крупных клиентов) — до 300. Стрим свечей не считается в этот лимит.
- Без прямых маркет-мейкер-привилегий — это розничный API, а не proximity к матчинговому движку.
- Только инструменты, доступные в Тинькофф Инвестиции — российские акции и облигации, фьючерсы Si/RTS/MX, иностранные акции на СПБ (с ограничениями), валюта, ETF/БПИФ.
Получение токена
- Открыть https://www.tinkoff.ru/invest/settings → раздел API.
- Создать новый токен. Параметры:
- Только просмотр — для чтения данных и тестирования.
- Полный доступ — для боевой торговли. Включает право выставления заявок.
- Скопировать токен сразу при создании. Повторно его увидеть нельзя.
Песочница использует отдельный токен: создаётся в режиме «Песочница», работает изолированно. Удобно держать оба — один для разработки, другой для боевого деплоя.
# .env-файл проекта (НЕ коммитить!)
TINKOFF_SANDBOX_TOKEN=t.xxxxxxxxxxxx
TINKOFF_PROD_TOKEN=t.yyyyyyyyyyyy
TINKOFF_ACCOUNT_ID=2200000000 # видно в личном кабинете
.env — в .gitignore. Для production-деплоя — секреты через docker secrets, k8s secrets или Vault.
Установка SDK
python -m venv venv && source venv/bin/activate
pip install tinkoff-investments
На момент написания актуальная версия — tinkoff-investments>=0.2.0 (синхронный + асинхронный клиенты, gRPC-генерация). Версии <0.2 устарели.
Минимальный hello-world: список своих счетов.
import os
from tinkoff.invest import Client
TOKEN = os.environ["TINKOFF_PROD_TOKEN"]
with Client(TOKEN) as client:
accounts = client.users.get_accounts().accounts
for acc in accounts:
print(f"{acc.id} {acc.name} {acc.access_level}")
Если выдаёт 40003 — Token is invalid — проверьте, нет ли пробелов вокруг токена в .env.
Получение свечей и текущей цены
from datetime import datetime, timedelta, timezone
from tinkoff.invest import CandleInterval, Client
FIGI = "BBG004730N88" # SBER
with Client(TOKEN) as client:
# Последние 200 пятиминуток
end = datetime.now(timezone.utc)
start = end - timedelta(hours=20)
candles = list(client.get_all_candles(
figi=FIGI,
from_=start,
to=end,
interval=CandleInterval.CANDLE_INTERVAL_5_MIN,
))
print(f"loaded {len(candles)} candles")
last = candles[-1]
print(f"close: {quotation_to_float(last.close)}")
def quotation_to_float(q):
"""Tinkoff отдаёт цены как Quotation(units, nano). Конвертируем в float."""
return q.units + q.nano / 1e9
Quotation — особенность T-Invest: цена кодируется как пара (units, nano), чтобы избежать ошибок плавающей точки. На входе ордера тоже отдаём в этом формате.
Стрим свечей (живая лента)
Вместо опроса каждую минуту — gRPC stream, в котором новые свечи прилетают как только формируются на бирже:
import asyncio
from tinkoff.invest import (
AsyncClient, CandleInstrument, MarketDataRequest,
SubscribeCandlesRequest, SubscriptionAction, SubscriptionInterval,
)
async def stream_candles(token, figi):
async with AsyncClient(token) as client:
async def request_iter():
yield MarketDataRequest(
subscribe_candles_request=SubscribeCandlesRequest(
subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
instruments=[CandleInstrument(
figi=figi,
interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_FIVE_MINUTES,
)],
)
)
while True:
await asyncio.sleep(60)
async for msg in client.market_data_stream.market_data_stream(request_iter()):
if msg.candle is None:
continue
print(f"candle close={quotation_to_float(msg.candle.close)}")
asyncio.run(stream_candles(TOKEN, "BBG004730N88"))
Ключевой момент: request_iter() — это бесконечный генератор. Стрим живёт, пока генератор не остановлен. Если стрим разорвётся (сетевой обрыв, перезагрузка серверов) — нужно перезапускать с retry-логикой.
Размер позиции от риска
Базовое правило профессионального трейдинга: фиксированный риск в деньгах, не фиксированный объём в лотах. Считаем размер позиции от стоп-лосса:
def position_size(deposit_rub: float, risk_pct: float,
entry_price: float, stop_price: float,
lot_size: int) -> int:
risk_money = deposit_rub * risk_pct / 100
risk_per_share = abs(entry_price - stop_price)
if risk_per_share < 0.01:
return 0
raw_shares = risk_money / risk_per_share
# Округляем вниз до целого числа лотов
return int(raw_shares // lot_size) * lot_size
Депозит 200 000 ₽, риск 0.5% (то есть максимум 1000 ₽ убытка на сделку), вход в SBER по 280, стоп 274, размер лота 10 акций:
risk_money = 1000 ₽
risk_per_share = 6 ₽
raw_shares = 1000 / 6 ≈ 166
shares = 166 // 10 * 10 = 160 (16 лотов)
Эта же формула работает на любом инструменте — фьючерсах, акциях, валюте — если корректно подставить lot_size.
Отправка заявки
Заявка — это PostOrderRequest с тщательно подобранными полями:
from tinkoff.invest import (
Client, OrderDirection, OrderType, Quotation,
)
from uuid import uuid4
def to_quotation(price: float) -> Quotation:
units = int(price)
nano = int(round((price - units) * 1e9))
return Quotation(units=units, nano=nano)
def submit_order(token, account_id, figi, qty_lots, price, direction):
with Client(token) as client:
return client.orders.post_order(
figi=figi,
quantity=qty_lots,
price=to_quotation(price),
direction=direction,
account_id=account_id,
order_type=OrderType.ORDER_TYPE_LIMIT,
order_id=str(uuid4()), # идемпотентность: повтор не создаст дубликат
)
# Открыть лонг 16 лотов SBER по 280.00
resp = submit_order(
token=TOKEN,
account_id=ACCOUNT_ID,
figi="BBG004730N88",
qty_lots=16,
price=280.00,
direction=OrderDirection.ORDER_DIRECTION_BUY,
)
print(resp.order_id, resp.execution_report_status)
order_id через uuid4 обязателен — если повторно вызвать с тем же order_id, T-Invest вернёт ту же заявку (идемпотентность). Это спасает при сетевых обрывах: повтор не задвоит позицию.
execution_report_status:
ORDER_EXECUTION_REPORT_STATUS_FILL— исполнена (для маркет-заявок).ORDER_EXECUTION_REPORT_STATUS_NEW— в стакане, ждёт исполнения.ORDER_EXECUTION_REPORT_STATUS_REJECTED— отклонена биржей.
Полный пример: momentum-робот
Соберём всё в один файл. Стратегия — пробой 20-периодного Donchian-канала с ATR-фильтром.
# bot.py
import asyncio
import logging
import os
from collections import deque
from datetime import datetime, timedelta, timezone
from uuid import uuid4
from tinkoff.invest import (
AsyncClient, CandleInstrument, CandleInterval, MarketDataRequest,
OrderDirection, OrderType, Quotation, SubscribeCandlesRequest,
SubscriptionAction, SubscriptionInterval,
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
TOKEN = os.environ["TINKOFF_PROD_TOKEN"]
ACCOUNT_ID = os.environ["TINKOFF_ACCOUNT_ID"]
FIGI = "BBG004730N88" # SBER
LOT_SIZE = 10
RISK_PCT = 0.5
DEPOSIT = 200_000
CHANNEL_N = 20
ATR_N = 14
ATR_MULT = 0.5
def q_to_f(q: Quotation) -> float:
return q.units + q.nano / 1e9
def f_to_q(p: float) -> Quotation:
units = int(p)
nano = int(round((p - units) * 1e9))
return Quotation(units=units, nano=nano)
class State:
def __init__(self):
self.candles: deque = deque(maxlen=300)
self.position_qty = 0
self.entry_price = 0.0
self.stop_price = 0.0
state = State()
async def load_history(client):
end = datetime.now(timezone.utc)
start = end - timedelta(hours=24)
async for c in client.get_all_candles(
figi=FIGI, from_=start, to=end,
interval=CandleInterval.CANDLE_INTERVAL_5_MIN,
):
state.candles.append({
"o": q_to_f(c.open), "h": q_to_f(c.high),
"l": q_to_f(c.low), "c": q_to_f(c.close),
"v": c.volume, "t": c.time,
})
log.info(f"loaded {len(state.candles)} candles")
def atr(candles, n=ATR_N):
if len(candles) < n + 1:
return 0.0
trs = []
for i in range(1, len(candles)):
c, p = candles[i], candles[i-1]
tr = max(c["h"] - c["l"], abs(c["h"] - p["c"]), abs(c["l"] - p["c"]))
trs.append(tr)
return sum(trs[-n:]) / n
def channel_top(candles, n=CHANNEL_N):
if len(candles) < n:
return float("inf")
return max(c["h"] for c in list(candles)[-n:])
def position_size(entry: float, stop: float) -> int:
risk_money = DEPOSIT * RISK_PCT / 100
risk_per_share = abs(entry - stop)
if risk_per_share < 0.01:
return 0
return int((risk_money / risk_per_share) // LOT_SIZE) * LOT_SIZE
async def open_long(client, price: float, stop: float):
qty_shares = position_size(price, stop)
if qty_shares == 0:
return
qty_lots = qty_shares // LOT_SIZE
resp = await client.orders.post_order(
figi=FIGI,
quantity=qty_lots,
price=f_to_q(price),
direction=OrderDirection.ORDER_DIRECTION_BUY,
account_id=ACCOUNT_ID,
order_type=OrderType.ORDER_TYPE_LIMIT,
order_id=str(uuid4()),
)
log.info(f"LONG order: {resp.order_id} status={resp.execution_report_status}")
state.position_qty = qty_shares
state.entry_price = price
state.stop_price = stop
async def close_position(client, price: float, reason: str):
qty_lots = state.position_qty // LOT_SIZE
resp = await client.orders.post_order(
figi=FIGI,
quantity=qty_lots,
price=f_to_q(price),
direction=OrderDirection.ORDER_DIRECTION_SELL,
account_id=ACCOUNT_ID,
order_type=OrderType.ORDER_TYPE_LIMIT,
order_id=str(uuid4()),
)
log.info(f"CLOSE ({reason}): {resp.order_id} @ {price}")
state.position_qty = 0
async def on_new_candle(client, candle: dict):
state.candles.append(candle)
if len(state.candles) < CHANNEL_N + ATR_N:
return
cur_atr = atr(state.candles)
top = channel_top(list(state.candles)[:-1]) # без последней
last_close = candle["c"]
if state.position_qty == 0:
if last_close > top + ATR_MULT * cur_atr:
stop = last_close - 2 * cur_atr
await open_long(client, last_close, stop)
else:
if last_close < state.stop_price:
await close_position(client, last_close, "stop-loss")
elif last_close > state.entry_price + 2 * (state.entry_price - state.stop_price):
# Профит 2R — частичная фиксация / трейлинг
await close_position(client, last_close, "take-profit-2R")
async def main():
async with AsyncClient(TOKEN) as client:
await load_history(client)
async def request_iter():
yield MarketDataRequest(
subscribe_candles_request=SubscribeCandlesRequest(
subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
instruments=[CandleInstrument(
figi=FIGI,
interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_FIVE_MINUTES,
)],
)
)
while True:
await asyncio.sleep(60)
async for msg in client.market_data_stream.market_data_stream(request_iter()):
if msg.candle is None:
continue
c = msg.candle
if not c.is_complete:
continue
await on_new_candle(client, {
"o": q_to_f(c.open), "h": q_to_f(c.high),
"l": q_to_f(c.low), "c": q_to_f(c.close),
"v": c.volume, "t": c.time,
})
if __name__ == "__main__":
asyncio.run(main())
Это рабочий каркас на ~150 строк, в котором сделано главное: загрузка истории, стрим свечей, расчёт ATR/Donchian, размер позиции от риска, идемпотентные заявки, закрытие по стопу и тейку.
Что в нём отсутствует для production-боя:
- Перезапуск стрима при разрыве (нужен
try/except+ retry с экспоненциальной паузой). - Хранение состояния (на рестарте позиция «теряется» из памяти; решается через Redis или JSON-файл).
- Управление маржой и проверка свободных средств.
- Переход стопа в безубыток после +1R.
- Отслеживание корпоративных событий (дивиденды, делистинг).
Эти штуки — ещё 200–300 строк, но базовый каркас уже годен для тестирования на песочнице.
Деплой в Docker
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY bot.py .
CMD ["python", "bot.py"]
# docker-compose.yml
services:
bot:
build: .
env_file: .env
restart: unless-stopped
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "5"
Запуск: docker compose up -d. Логи: docker logs -f <id>. Стоп: docker compose down.
В RU-сегменте подходящие хостеры — Selectel, Reg.ru, Timeweb. На английских хостингах (DigitalOcean, Hetzner) запросы к API Тинькофф проходят, но с потенциально большей задержкой.
Лимиты и квоты
T-Invest API имеет несколько уровней лимитов:
- Запросы в минуту (для Standard-аккаунта): 60 — чтения / 100 — заявок. После 60 секунд — счётчик сбрасывается.
- Стрим не считается в лимиты, но количество одновременно открытых стримов ограничено десятком.
- Лимит на заявку — стандартный биржевой (объём, цена в пределах планки).
Если робот ловит RESOURCE_EXHAUSTED (gRPC код 8) — значит упёрлись в rate-limit. Решение — time.sleep(60) + retry. На продакшене — собственный rate-limiter, который не пропускает > 50 запросов/мин с запасом.
Категория Pro даётся клиентам с активным портфелем от 10 млн ₽. Лимиты — ×3–5 от Standard, доступен расширенный набор данных (стакан полным глубиной).
Что запомнить
- T-Invest API (gRPC) — современный путь алготрейдинга на MOEX/СПБ через Python без QUIK.
- Песочница и продакшен — два разных токена; разработка на песочнице, деплой через переменные окружения.
- Обязательный
order_id = uuid4()для идемпотентности — спасает от дублей при сетевых обрывах. - Цены передаются через
Quotation(units, nano)— конверторы пишутся один раз и используются везде. - Базовый момент-робот = ~150 строк; production-готовый = ~400–500 строк с retry, журналом, watchdog.
- Деплой в Docker с
restart: unless-stopped— минимальная необходимая инфраструктура. - Лимит Standard — 60 запросов/мин на чтение; стрим не считается.