Skip to content

Base

DatafeedBase

Bases: ABC

Abstract base class for market data feed implementations.

A data feed is responsible for connecting to an external data source, managing symbol and bar-period subscriptions, and publishing market data events onto the system event bus.

Concrete subclasses implement the mechanics of connectivity, subscription handling, and lifecycle management for a specific data source.

__init__(event_bus)

Initialize the data feed with an event bus.

Parameters:

Name Type Description Default
event_bus EventBus

Event bus used to publish market data events produced by this data feed.

required
Source code in src/onesecondtrader/datafeeds/base.py
def __init__(self, event_bus: messaging.EventBus) -> None:
    """
    Initialize the data feed with an event bus.

    parameters:
        event_bus:
            Event bus used to publish market data events produced by this data feed.
    """
    self._event_bus = event_bus

_publish(event)

Publish a market data event to the event bus.

This method is intended for use by subclasses to forward incoming data from the external source into the internal event-driven system.

Parameters:

Name Type Description Default
event EventBase

Event instance to be published.

required
Source code in src/onesecondtrader/datafeeds/base.py
def _publish(self, event: events.EventBase) -> None:
    """
    Publish a market data event to the event bus.

    This method is intended for use by subclasses to forward incoming data from the external source into the internal event-driven system.

    parameters:
        event:
            Event instance to be published.
    """
    self._event_bus.publish(event)

connect() abstractmethod

Establish a connection to the underlying data source.

Implementations should perform any required setup, authentication, or resource allocation needed before subscriptions can be registered.

Source code in src/onesecondtrader/datafeeds/base.py
@abc.abstractmethod
def connect(self) -> None:
    """
    Establish a connection to the underlying data source.

    Implementations should perform any required setup, authentication, or resource allocation needed before subscriptions can be registered.
    """
    pass

disconnect() abstractmethod

Terminate the connection to the underlying data source.

Implementations should release resources and ensure that no further events are published after disconnection.

Source code in src/onesecondtrader/datafeeds/base.py
@abc.abstractmethod
def disconnect(self) -> None:
    """
    Terminate the connection to the underlying data source.

    Implementations should release resources and ensure that no further events are published after disconnection.
    """
    pass

subscribe(symbols, bar_period) abstractmethod

Subscribe to market data for one or more symbols at a given bar period.

Parameters:

Name Type Description Default
symbols list[str]

Instrument symbols to subscribe to, interpreted according to the conventions of the underlying data source.

required
bar_period BarPeriod

Bar aggregation period specifying the granularity of market data.

required
Source code in src/onesecondtrader/datafeeds/base.py
@abc.abstractmethod
def subscribe(self, symbols: list[str], bar_period: models.BarPeriod) -> None:
    """
    Subscribe to market data for one or more symbols at a given bar period.

    parameters:
        symbols:
            Instrument symbols to subscribe to, interpreted according to the conventions of the underlying data source.
        bar_period:
            Bar aggregation period specifying the granularity of market data.
    """
    pass

unsubscribe(symbols, bar_period) abstractmethod

Cancel existing subscriptions for one or more symbols at a given bar period.

Parameters:

Name Type Description Default
symbols list[str]

Instrument symbols for which subscriptions should be removed.

required
bar_period BarPeriod

Bar aggregation period associated with the subscriptions.

required
Source code in src/onesecondtrader/datafeeds/base.py
@abc.abstractmethod
def unsubscribe(self, symbols: list[str], bar_period: models.BarPeriod) -> None:
    """
    Cancel existing subscriptions for one or more symbols at a given bar period.

    parameters:
        symbols:
            Instrument symbols for which subscriptions should be removed.
        bar_period:
            Bar aggregation period associated with the subscriptions.
    """
    pass

wait_until_complete()

Block until the data feed has completed all pending work.

This method may be overridden by subclasses that perform asynchronous ingestion or background processing. The default implementation does nothing.

Source code in src/onesecondtrader/datafeeds/base.py
def wait_until_complete(self) -> None:
    """
    Block until the data feed has completed all pending work.

    This method may be overridden by subclasses that perform asynchronous ingestion or background processing.
    The default implementation does nothing.
    """
    pass