WebSocketClient Node

Install the WebSocketClient plugin with Poetry by running the following command: poetry add aineko-plugins-nodes-websocket-client

API Reference

aineko_plugins.nodes.websocket_client.WebSocketClient

Bases: AbstractNode

Node for ingesting data from a WebSocket.

This node is a wrapper around the websocket-client library.

node_params should be a dictionary with the following keys:

url: The WebSocket URL to connect to
header (optional): A dictionary of headers to send to the WebSocket.
    Defaults to None.
init_messages (optional): A list of messages to send to the WebSocket
    upon connection. Defaults to [].
metadata (optional): A dictionary of metadata to attach to outgoing
    messages. Defaults to None.
max_retries (optional): The maximum number of times to retry
    connecting to the WebSocket. Defaults to -1 (retry forever).
retry_sleep (optional): The number of seconds to wait between retries.
    Defaults to 5.

Secrets can be injected from environment variables into the url, header, and init_messages fields by using a placeholder of the form {$SECRET_NAME}. For example, if a secret named SECRET_NAME has the value SECRET_VALUE, passing wss://example.com?secret={$SECRET_NAME} as the url causes the connector to replace {$SECRET_NAME} with SECRET_VALUE before connecting to the WebSocket.
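
The placeholder substitution described above can be sketched as follows. This is a minimal illustration, not the plugin's implementation: the helper name inject_secrets, the regular expression, and the behavior on a missing variable (a KeyError) are all assumptions made for this example.

```python
import os
import re
from typing import Any

# Matches placeholders such as {$SECRET_NAME}. The exact pattern the plugin
# uses is not shown on this page, so this regex is an assumption.
_SECRET_PATTERN = re.compile(r"\{\$([A-Za-z_][A-Za-z0-9_]*)\}")


def inject_secrets(value: Any) -> Any:
    """Recursively replace {$NAME} placeholders with os.environ[NAME].

    Hypothetical helper: raises KeyError if the variable is not set.
    """
    if isinstance(value, str):
        return _SECRET_PATTERN.sub(lambda m: os.environ[m.group(1)], value)
    if isinstance(value, dict):
        return {key: inject_secrets(item) for key, item in value.items()}
    if isinstance(value, list):
        return [inject_secrets(item) for item in value]
    # Non-string scalars (numbers, booleans, None) pass through unchanged.
    return value


# Example values taken from the text above.
os.environ["SECRET_NAME"] = "SECRET_VALUE"
url = inject_secrets("wss://example.com?secret={$SECRET_NAME}")
header = inject_secrets({"auth": "Bearer {$SECRET_NAME}"})
```

Here url becomes wss://example.com?secret=SECRET_VALUE and header becomes {"auth": "Bearer SECRET_VALUE"}. Walking dictionaries and lists recursively is one way to cover header and init_messages with the same helper.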

Example usage in pipeline.yml:

pipeline:
  nodes:
    WebSocketClient:
      class: aineko_plugins.nodes.websocket_client.WebSocketClient
      outputs:
        - test_websocket
      node_params:
        url: "wss://example.com"
        header:
          auth: "Bearer {$SECRET_NAME}"
        init_messages:
          - {"Greeting": "Hello, world!"}
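
According to the create_subscription listing further down this page, each init_messages entry is passed through json.dumps before it is sent, and one reply is read after each send. The short sketch below shows only the serialization step for the entry in the example above. The variable names are illustrative.

```python
import json

# Mirrors the init_messages entry from the pipeline.yml example.
init_messages = [{"Greeting": "Hello, world!"}]

# Each entry is serialized to a JSON string before being sent over the socket.
payloads = [json.dumps(message) for message in init_messages]

print(payloads[0])  # {"Greeting": "Hello, world!"}
```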

retry_count class-attribute instance-attribute

retry_count = 0

create_subscription

create_subscription() -> None

Creates a subscription on the websocket.

Raises:

Type         Description
ValueError   If the retry count exceeds the max retries.

Source code in nodes/aineko-plugins-nodes-websocket-client/aineko_plugins/nodes/websocket_client/websocket_client.py
def create_subscription(self) -> None:
    """Creates a subscription on the websocket.

    Raises:
        ValueError: If the retry count exceeds the max retries.
    """
    try:
        self.log(f"Creating subscription to {self.ws_params.url}...")
        self.ws.connect(
            url=self.ws_params.url, header=self.ws_params.header
        )  # type: ignore

        if self.ws_params.init_messages:
            # Send initialization messages
            for init_msg in self.ws_params.init_messages:
                self.ws.send(json.dumps(init_msg))
                message = self.ws.recv()
                self.log(
                    "Sent initialization message to "
                    f"{self.ws_params.url}. "
                    f"Acknowledged initialization message: {message!r}"
                )

        ack_message = self.ws.recv()
        self.log(
            f"Subscription to {self.ws_params.url} created. "
            f"Acknowledged subscription message: {ack_message!r}"
        )

        self.retry_count = 0
    except Exception as err:  # pylint: disable=broad-except
        if self.retry_count < self.ws_params.max_retries:
            self.log(
                "Encountered error when attempting to connect to "
                f"{self.ws_params.url}. Will retry in "
                f"{self.ws_params.retry_sleep} seconds"
            )
            self.retry_count += 1
            time.sleep(self.ws_params.retry_sleep)
            self.create_subscription()
        else:
            raise ValueError(
                "Retry count exceeded max retries. "
                "Failed to create subscription to "
                f"{self.ws_params.url}. "
                f"The following error occurred: {err}"
            ) from err
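
Read literally, the listing above retries only while retry_count < max_retries, so with the documented default of -1 the comparison is false on the first error. It also retries by recursion, which adds a stack frame per attempt. The sketch below is one possible iterative alternative, not the plugin's code: the function connect_with_retries and its connect callback are hypothetical, and treating a negative max_retries as "retry forever" is an assumption based on the parameter description earlier on this page.

```python
import time
from typing import Callable


def connect_with_retries(
    connect: Callable[[], None],
    max_retries: int = -1,
    retry_sleep: float = 5,
) -> int:
    """Call connect() until it succeeds and return the number of retries used.

    Assumption: a negative max_retries means "retry forever".
    """
    retry_count = 0
    while True:
        try:
            connect()
            return retry_count
        except Exception as err:  # broad on purpose, as in the listing above
            # Give up only when a non-negative limit has been reached.
            if 0 <= max_retries <= retry_count:
                raise ValueError(
                    "Retry count exceeded max retries. "
                    f"The following error occurred: {err}"
                ) from err
            retry_count += 1
            time.sleep(retry_sleep)


# Usage: a simulated connection that fails twice before succeeding.
attempts = []


def flaky_connect() -> None:
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("simulated failure")


# retries_used is 2: the first two attempts fail, the third succeeds.
retries_used = connect_with_retries(flaky_connect, max_retries=-1, retry_sleep=0)
```

A loop keeps the call stack flat regardless of how many attempts are made, which matters when retries are unbounded.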

aineko_plugins.nodes.websocket_client.websocket_client.ParamsWebSocketClient

ParamsWebSocketClient(**data: Any)

Bases: BaseModel

Connector params for WebSocket model.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Source code in pydantic/main.py
def __init__(self, /, **data: Any) -> None:  # type: ignore
    """Create a new model by parsing and validating input data from keyword arguments.

    Raises [`ValidationError`][pydantic_core.ValidationError] if the input data cannot be
    validated to form a valid model.

    `self` is explicitly positional-only to allow `self` as a field name.
    """
    # `__tracebackhide__` tells pytest and some other tools to omit this function from tracebacks
    __tracebackhide__ = True
    self.__pydantic_validator__.validate_python(data, self_instance=self)

header class-attribute instance-attribute

header: Optional[Dict[str, str]] = None

init_messages class-attribute instance-attribute

init_messages: List[Any] = []

max_retries class-attribute instance-attribute

max_retries: int = -1

metadata class-attribute instance-attribute

metadata: Optional[Dict[str, Any]] = None

retry_sleep class-attribute instance-attribute

retry_sleep: float = 5

url instance-attribute

url: str

supported_url classmethod

supported_url(url: str) -> str

Validates that the url is a valid WebSocket URL.

Source code in nodes/aineko-plugins-nodes-websocket-client/aineko_plugins/nodes/websocket_client/websocket_client.py
@field_validator("url")
@classmethod
def supported_url(cls, url: str) -> str:
    """Validates that the url is a valid WebSocket URL."""
    if not (url.startswith("wss://") or url.startswith("ws://")):
        raise ValueError(
            "Invalid url provided to WebSocket params. "
            'Expected url to start with "wss://" or "ws://". '
            f"Provided url was: {url}"
        )
    return url
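
The prefix check in the validator above can be tried in isolation with a standard-library sketch. The function name check_supported_url is hypothetical and Pydantic is deliberately left out. Inside the real model, a ValueError raised by a field validator is reported by Pydantic as a ValidationError.

```python
def check_supported_url(url: str) -> str:
    """Return url unchanged if it starts with ws:// or wss://."""
    # str.startswith accepts a tuple of prefixes, so one call covers both.
    if not url.startswith(("wss://", "ws://")):
        raise ValueError(
            'Expected url to start with "wss://" or "ws://". '
            f"Provided url was: {url}"
        )
    return url


# A secure WebSocket URL passes through unchanged.
accepted = check_supported_url("wss://example.com")

# An HTTP(S) URL is rejected.
try:
    check_supported_url("https://example.com")
    rejected = False
except ValueError:
    rejected = True
```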