HTTPPoller Node

The HTTPPoller plugin can be installed with Poetry using the following command:

poetry add aineko-plugins-nodes-http-poller

API Reference

aineko_plugins.nodes.http_poller.http_poller.HTTPPoller

Bases: AbstractNode

Connects to an endpoint over HTTP or HTTPS and polls it at a fixed interval.

This node is a wrapper around the requests library.

Example usage in pipeline.yml:

pipeline.yml
pipeline:
  nodes:
    HTTPPoller:
      class: aineko_plugins.nodes.http_poller.HTTPPoller
      outputs:
        - test_http
      node_params:
        url: "https://example.com"
        headers:
          auth: "Bearer {$SECRET_NAME}"
        data: {"Greeting": "Hello, world!"}

Secrets can be injected from the environment into the url, headers, and data fields by passing a string of the form {$SECRET_NAME}. For example, if an environment variable named SECRET_NAME contains the value SECRET_VALUE, you can inject it into the url field by passing https://example.com?secret={$SECRET_NAME} as the url. The node then replaces {$SECRET_NAME} with SECRET_VALUE before connecting to the HTTP endpoint.
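The substitution described above can be sketched in a few lines of Python. This is not the plugin's own code: inject_secrets and its env parameter are hypothetical names, and the sketch assumes placeholder names follow shell-style variable rules and that a missing variable should raise KeyError. The plugin's actual error handling for missing variables may differ.

```python
import os
import re
from typing import Any, Mapping, Optional

# Matches placeholders such as {$SECRET_NAME}; group 1 is the variable name.
_SECRET_PATTERN = re.compile(r"\{\$([A-Za-z_][A-Za-z0-9_]*)\}")


def inject_secrets(value: Any, env: Optional[Mapping[str, str]] = None) -> Any:
    """Hypothetical helper: replace {$NAME} placeholders with values from env.

    Strings are substituted directly; dicts and lists (e.g. headers, data)
    are walked recursively; any other value is returned unchanged.
    """
    env = os.environ if env is None else env
    if isinstance(value, str):
        # Assumption: an unset variable raises KeyError rather than being left as-is.
        return _SECRET_PATTERN.sub(lambda m: env[m.group(1)], value)
    if isinstance(value, dict):
        return {key: inject_secrets(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [inject_secrets(item, env) for item in value]
    return value
```

Passing env explicitly keeps the helper testable without touching the real process environment; omitting it falls back to os.environ.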

Note that the outputs field is required and must contain exactly one output dataset. The output dataset will contain the data returned by the endpoint.

By default, this node polls the endpoint every 5 seconds (poll_interval) and times out after 10 seconds (timeout). If a request fails, it retries every 5 seconds (retry_sleep) indefinitely. Status codes 200 through 208 and 226 (success_codes) are treated as success, and no headers, data, auth, params, or json are attached to the request.
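The default schedule described above can be sketched as a generator. poll, fetch, sleep, and limit are hypothetical names: fetch stands in for whatever performs the HTTP GET and returns (status_code, body), and is assumed to enforce its own timeout; sleep is injectable so the timing can be checked without waiting. This illustrates the described behavior only; the node itself tracks state such as last_poll_time and retry_count on the instance.

```python
import time
from typing import Callable, Iterable, Iterator, Optional, Tuple

DEFAULT_SUCCESS_CODES = (200, 201, 202, 203, 204, 205, 206, 207, 208, 226)


def poll(
    fetch: Callable[[], Tuple[int, str]],
    poll_interval: float = 5.0,
    retry_sleep: float = 5.0,
    success_codes: Iterable[int] = DEFAULT_SUCCESS_CODES,
    sleep: Callable[[float], None] = time.sleep,
    limit: Optional[int] = None,
) -> Iterator[str]:
    """Yield successful response bodies, retrying failures indefinitely.

    limit (hypothetical) stops after that many successful polls; None means
    poll forever, as the node does.
    """
    codes = set(success_codes)
    yielded = 0
    while True:
        try:
            status, text = fetch()
        except Exception:  # connection errors, timeouts, ...: wait, then retry
            sleep(retry_sleep)
            continue
        if status not in codes:
            # A non-success status is handled like a failed request.
            sleep(retry_sleep)
            continue
        yield text
        yielded += 1
        if limit is not None and yielded >= limit:
            return
        sleep(poll_interval)
```

Because failures sleep for retry_sleep while successes sleep for poll_interval, the two delays can be tuned independently, mirroring the separate parameters in ParamsHTTPPoller.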

last_poll_time class-attribute instance-attribute

last_poll_time = time()

retry_count class-attribute instance-attribute

retry_count = 0

parse_data

parse_data(raw_data: str) -> Optional[Dict[str, Any]]

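Parses the raw endpoint response with the JSON parser. If metadata is set, the parsed payload is wrapped as {"metadata": ..., "data": ...}. If parsing fails and retries remain, the error is logged, the node sleeps for retry_sleep seconds, and None is returned.

Parameters:

raw_data (str, required): The raw, unprocessed data returned by the endpoint. This is the result of calling response.text on the response object.

Raises:

Exception: If the response cannot be parsed as JSON and retry_count has reached max_retries. Because retry_count starts at 0, the default max_retries of -1 means a parse failure raises immediately.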

Source code in nodes/aineko-plugins-nodes-http-poller/aineko_plugins/nodes/http_poller/http_poller.py
def parse_data(self, raw_data: str) -> Optional[Dict[str, Any]]:
    """Parses raw endpoint response using JSON parser.

    Args:
        raw_data: The raw unprocessed data returned by the endpoint. This
            is the result of calling `response.text` on the response object.

    Raises:
        Exception: If the retry count exceeds the max retries.
    """
    try:
        data = json.loads(raw_data)
        if self.http_poller_params.metadata is not None:
            data = {
                "metadata": self.http_poller_params.metadata,
                "data": data,
            }
        return data
    except json.decoder.JSONDecodeError as err:
        if self.retry_count < self.http_poller_params.max_retries:
            self.retry_count += 1
            self.log(
                f"Failed to parse data: {raw_data}. "
                f"The following error occurred: {err} "
                f"Will retry in {self.http_poller_params.retry_sleep} "
                "seconds...",
                level="error",
            )
            time.sleep(self.http_poller_params.retry_sleep)
            return None
        else:
            raise Exception(  # pylint: disable=broad-exception-raised
                "Retry count exceeded max retries "
                f"({self.http_poller_params.max_retries}). "
                f"Failed to parse data: {raw_data}. "
                f"The following error occurred: {err}"
            ) from err
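The parsing step can be reproduced as a pure function, which makes the retry logic easy to test. parse_with_budget is a hypothetical name; it returns the updated retry count instead of storing it on the node, and leaves any sleeping to the caller. Unlike the source above, it treats a negative max_retries as unlimited, matching how the ParamsHTTPPoller attributes describe the default of -1.

```python
import json
from typing import Any, Dict, Optional, Tuple


def parse_with_budget(
    raw_data: str,
    retry_count: int,
    max_retries: int = -1,
    metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[Optional[Any], int]:
    """Parse JSON and return (payload or None, updated retry count)."""
    try:
        data = json.loads(raw_data)
    except json.JSONDecodeError as err:
        # Assumption: a negative max_retries means "retry forever".
        if max_retries < 0 or retry_count < max_retries:
            return None, retry_count + 1
        raise RuntimeError(
            f"Retry count exceeded max retries ({max_retries}). "
            f"Failed to parse data: {raw_data!r}"
        ) from err
    if metadata is not None:
        # Same envelope shape that parse_data produces.
        data = {"metadata": metadata, "data": data}
    return data, retry_count
```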

poll_endpoint

poll_endpoint() -> Optional[str]

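Polls the endpoint for data. Returns the response body (response.text) if the status code is in success_codes. Otherwise, or if the request itself raises, it logs the error, sleeps for retry_sleep seconds, increments retry_count, replaces the session with a new requests.Session, and returns None.

Raises:

Exception: Raised inside the method when the status code is not in success_codes. It is caught by the method's own exception handler, so callers receive None rather than an exception.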

Source code in nodes/aineko-plugins-nodes-http-poller/aineko_plugins/nodes/http_poller/http_poller.py
def poll_endpoint(self) -> Optional[str]:
    """Polls the endpoint for data.

    Raises:
        Exception: If the request fails.
    """
    try:
        # Poll the endpoint
        response = self.session.get(
            self.http_poller_params.url,
            timeout=self.http_poller_params.timeout,
            headers=self.http_poller_params.headers,
            data=self.http_poller_params.data,
            params=self.http_poller_params.params,
            json=self.http_poller_params.json_,
            auth=self.http_poller_params.auth,
        )
        # Check if the request was successful
        if (
            response.status_code
            not in self.http_poller_params.success_codes
        ):
            # pylint: disable=broad-exception-raised
            raise Exception(
                f"Request to url {self.http_poller_params.url} "
                "failed with status code: "
                f"{response.status_code}"
            )
        return response.text
    except Exception as err:  # pylint: disable=broad-except
        # If request fails, log the error and sleep
        self.log(
            "Request failed. "
            f"Sleeping for {self.http_poller_params.retry_sleep} "
            f"seconds. Error: {err}",
            level="error",
        )
        time.sleep(self.http_poller_params.retry_sleep)
        self.retry_count += 1
        # Reset the session
        self.log(
            "Creating new session to HTTP endpoint "
            f"{self.http_poller_params.url}."
        )
        self.session = requests.Session()
        return None

aineko_plugins.nodes.http_poller.http_poller.ParamsHTTPPoller

ParamsHTTPPoller(**data: Any)

Bases: BaseModel

Parameters for the HTTPPoller node.

Attributes:

timeout (int): The number of seconds to wait for the endpoint to respond. Defaults to 10.

url (str): The URL to connect to. Must start with http:// or https://.

headers (Optional[Dict[str, Any]]): A dictionary of headers to send to the endpoint. Defaults to None.

data (Optional[Dict[str, Any]]): A dictionary of data to send in the body of the request. Defaults to None.

params (Optional[Union[Dict[str, Any], List[tuple], bytes]]): A dictionary, list of tuples, or bytes to send in the query string of the request. Defaults to None.

json_ (Optional[Dict[str, Any]]): A JSON-serializable dictionary to send as the JSON body of the request (passed to requests as json). Defaults to None.

auth (Optional[Tuple[str, str]]): A (username, password) tuple to use for Basic authentication. Defaults to None.

poll_interval (float): The number of seconds to wait between polls. Defaults to 5.0.

max_retries (int): The maximum number of times to retry connecting to the endpoint. Defaults to -1 (retry forever).

metadata (Optional[Dict[str, Any]]): A dictionary of metadata to attach to outgoing messages. When set, each message has the form {"metadata": ..., "data": ...}. Defaults to None.

retry_sleep (float): The number of seconds to wait between retries. Defaults to 5.0.

success_codes (List[int]): A list of status codes that indicate success. Defaults to [200, 201, 202, 203, 204, 205, 206, 207, 208, 226].

Raises:

ValueError: If the url does not start with http:// or https://. Pydantic surfaces this as a ValidationError, which is a subclass of ValueError.
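For readers without pydantic at hand, the fields and defaults listed above can be mirrored with a standard-library dataclass. HTTPPollerParamsSketch is a hypothetical stand-in, not the plugin's model: it reproduces the documented defaults and the same URL prefix check, but none of pydantic's type coercion or error reporting.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Union

DEFAULT_SUCCESS_CODES = [200, 201, 202, 203, 204, 205, 206, 207, 208, 226]


@dataclass
class HTTPPollerParamsSketch:
    """Hypothetical stdlib mirror of ParamsHTTPPoller's fields and defaults."""

    url: str
    timeout: int = 10
    headers: Optional[Dict[str, Any]] = None
    data: Optional[Dict[str, Any]] = None
    params: Optional[Union[Dict[str, Any], List[tuple], bytes]] = None
    json_: Optional[Dict[str, Any]] = None
    auth: Optional[Tuple[str, str]] = None
    poll_interval: float = 5.0
    max_retries: int = -1
    metadata: Optional[Dict[str, Any]] = None
    retry_sleep: float = 5.0
    # default_factory gives each instance its own list.
    success_codes: List[int] = field(
        default_factory=lambda: list(DEFAULT_SUCCESS_CODES)
    )

    def __post_init__(self) -> None:
        # Same prefix check as the supported_url validator.
        if not self.url.startswith(("https://", "http://")):
            raise ValueError(
                'Expected url to start with "https://" or "http://". '
                f"Provided url was: {self.url}"
            )
```

Dataclasses reject a bare list as a field default, which is why success_codes uses default_factory here.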

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Source code in pydantic/main.py
def __init__(self, /, **data: Any) -> None:  # type: ignore
    """Create a new model by parsing and validating input data from keyword arguments.

    Raises [`ValidationError`][pydantic_core.ValidationError] if the input data cannot be
    validated to form a valid model.

    `self` is explicitly positional-only to allow `self` as a field name.
    """
    # `__tracebackhide__` tells pytest and some other tools to omit this function from tracebacks
    __tracebackhide__ = True
    self.__pydantic_validator__.validate_python(data, self_instance=self)

auth class-attribute instance-attribute

auth: Optional[Tuple[str, str]] = None

data class-attribute instance-attribute

data: Optional[Dict[str, Any]] = None

headers class-attribute instance-attribute

headers: Optional[Dict[str, Any]] = None

json_ class-attribute instance-attribute

json_: Optional[Dict[str, Any]] = None

max_retries class-attribute instance-attribute

max_retries: int = -1

metadata class-attribute instance-attribute

metadata: Optional[Dict[str, Any]] = None

params class-attribute instance-attribute

params: Optional[
    Union[Dict[str, Any], List[tuple], bytes]
] = None

poll_interval class-attribute instance-attribute

poll_interval: float = 5.0

retry_sleep class-attribute instance-attribute

retry_sleep: float = 5

success_codes class-attribute instance-attribute

success_codes: List[int] = [
    200,
    201,
    202,
    203,
    204,
    205,
    206,
    207,
    208,
    226,
]

timeout class-attribute instance-attribute

timeout: int = 10

url instance-attribute

url: str

supported_url classmethod

supported_url(url: str) -> str

Validates that the url is a valid HTTP or HTTPS URL.

Source code in nodes/aineko-plugins-nodes-http-poller/aineko_plugins/nodes/http_poller/http_poller.py
@field_validator("url")
@classmethod
def supported_url(cls, url: str) -> str:
    """Validates that the url is a valid HTTP or HTTPS URL."""
    if not (url.startswith("https://") or url.startswith("http://")):
        raise ValueError(
            "Invalid url provided to HTTPPoller. "
            'Expected url to start with "https://" or "http://". '
            f"Provided url was: {url}"
        )
    return url
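The validator above checks only the string prefix, so it rejects an upper-case scheme such as HTTPS://example.com and accepts a URL with no host such as https://. If stricter checking is wanted, one option is the standard library's urllib.parse.urlsplit, a swapped-in technique rather than what the plugin does; is_http_url is a hypothetical name.

```python
from urllib.parse import urlsplit


def is_http_url(url: str) -> bool:
    """Hypothetical stricter alternative to the prefix check in supported_url."""
    parts = urlsplit(url)
    # urlsplit lowercases the scheme, so "HTTPS://..." is accepted;
    # an empty netloc (no host) is rejected.
    return parts.scheme in ("http", "https") and bool(parts.netloc)
```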