# Source code for lime_trader.clients.account_feed_client
import threading
import time
from logging import Logger
from orjson import orjson
from websocket import WebSocketApp
from lime_trader.converters.cattr_converter import converter
from lime_trader.models.accounts import AccountFeedAction, AccountFeedActionType
class AccountFeedClient(threading.Thread):
    """
    Client used to subscribe to account feeds.

    The client is a thread whose ``run`` drives the supplied ``WebSocketApp``
    via ``run_forever()``. Subscribe/unsubscribe requests are serialized and
    sent over that app once the connection has been reported open.

    Should not be instantiated by end-user.
    """

    # Seconds to sleep between checks while waiting for the connection to open.
    _POLL_INTERVAL_SECONDS = 1

    def __init__(self, websocket_app: "WebSocketApp", logger: Logger) -> None:
        """
        Args:
            websocket_app: Websocket application this thread runs and sends through
            logger: Logger used for lifecycle and action messages
        """
        super().__init__()
        self._websocket_app = websocket_app
        self._logger = logger
        # Flipped by the open/close callbacks below; gates _send_action.
        self._available = False

    def run(self) -> None:
        """Thread entry point: run the websocket app until it stops."""
        self._logger.info("Starting account feed thread")
        self._websocket_app.run_forever()

    def stop(self) -> None:
        """Close the websocket app."""
        self._logger.info("Stopping account feed thread")
        self._websocket_app.close()

    def on_account_streaming_feed_open(self, web_socket_app: "WebSocketApp") -> None:
        """
        Mark the feed as available once the connection is opened.

        Args:
            web_socket_app: Websocket application that opened (unused)
        """
        self._logger.info("Account feed client connection opened")
        self._available = True

    def on_account_streaming_feed_close(self, web_socket_app: "WebSocketApp",
                                        close_status_code=None, close_msg=None) -> None:
        """
        Mark the feed as unavailable once the connection is closed.

        Args:
            web_socket_app: Websocket application that closed (unused)
            close_status_code: Optional close status code (unused)
            close_msg: Optional close message (unused)

        NOTE(review): websocket-client 1.x documents ``on_close`` as being
        called with three arguments (app, status code, message). The two
        trailing parameters are optional so this method works both when it is
        registered directly as that callback and when it is invoked with the
        app only. How it is registered is not visible in this module - confirm
        against the code that builds the ``WebSocketApp``.
        """
        self._logger.info("Account feed client connection closed")
        self._available = False

    def _send_action(self, action: "AccountFeedAction") -> None:
        """
        Serialize the action to JSON and send it over the websocket.

        Blocks the calling thread, checking once per poll interval, until the
        connection has been flagged as open and the websocket app has a socket.
        There is no timeout: if the connection never opens, this call does not
        return.

        Args:
            action: Action to send
        """
        # Lazy %-formatting: the message is only rendered if INFO is enabled.
        self._logger.info("Account feed - sending action %s", action)
        while not self._available or not self._websocket_app.sock:
            time.sleep(self._POLL_INTERVAL_SECONDS)
        payload = orjson.dumps(converter.dump_to_dict(action))
        self._websocket_app.send(data=payload)

    def _request(self, action_type: "AccountFeedActionType", account_number: str) -> None:
        """Build the feed action of the given type for the account and send it."""
        action = AccountFeedAction(action=action_type, account=account_number)
        self._send_action(action=action)

    def subscribe_balance(self, account_number: str) -> None:
        """
        Subscribe to balance changes for specific account

        Args:
            account_number: Account number
        """
        self._request(AccountFeedActionType.SUBSCRIBE_BALANCE, account_number)

    def subscribe_positions(self, account_number: str) -> None:
        """
        Subscribe to position changes for specific account

        Args:
            account_number: Account number
        """
        self._request(AccountFeedActionType.SUBSCRIBE_POSITIONS, account_number)

    def subscribe_orders(self, account_number: str) -> None:
        """
        Subscribe to order changes for specific account

        Args:
            account_number: Account number
        """
        self._request(AccountFeedActionType.SUBSCRIBE_ORDERS, account_number)

    def subscribe_trades(self, account_number: str) -> None:
        """
        Subscribe to trades changes for specific account

        Args:
            account_number: Account number
        """
        self._request(AccountFeedActionType.SUBSCRIBE_TRADES, account_number)

    def unsubscribe_balance(self, account_number: str) -> None:
        """
        Unsubscribe from balance changes for specific account

        Args:
            account_number: Account number
        """
        self._request(AccountFeedActionType.UNSUBSCRIBE_BALANCE, account_number)

    def unsubscribe_positions(self, account_number: str) -> None:
        """
        Unsubscribe from position changes for specific account

        Args:
            account_number: Account number
        """
        self._request(AccountFeedActionType.UNSUBSCRIBE_POSITIONS, account_number)

    def unsubscribe_orders(self, account_number: str) -> None:
        """
        Unsubscribe from order changes for specific account

        Args:
            account_number: Account number
        """
        self._request(AccountFeedActionType.UNSUBSCRIBE_ORDERS, account_number)

    def unsubscribe_trades(self, account_number: str) -> None:
        """
        Unsubscribe from trades changes for specific account

        Args:
            account_number: Account number
        """
        self._request(AccountFeedActionType.UNSUBSCRIBE_TRADES, account_number)