vendor_connectors.base
Base class for all vendor connectors.
This module provides VendorConnectorBase - the foundation for ALL connectors in this library. It extends DirectedInputsClass and provides:
- Credential loading from env vars, stdin, or direct inputs
- HTTP client with retries and rate limiting
- MCP server scaffolding
- LangChain tool registration helpers
ALL connectors should extend this class instead of DirectedInputsClass directly.
Usage: from vendor_connectors.base import VendorConnectorBase
class MyConnector(VendorConnectorBase): API_KEY_ENV = "MY_API_KEY" # Required env var name BASE_URL = "https://api.example.com"
def __init__(self, api_key: str | None = None, **kwargs): super().__init__(**kwargs) self._api_key = api_key or self.get_input(self.API_KEY_ENV, required=True)
def my_operation(self) -> dict: return self.request("GET", "/endpoint")Module Contents
Section titled “Module Contents”Classes
Section titled “Classes”VendorConnectorBase | Base class for all vendor connectors. |
|---|
exception vendor_connectors.base.RateLimitError
Section titled “exception vendor_connectors.base.RateLimitError”Bases: Exception
Raised when API rate limit is hit - triggers retry.
Initialization
Section titled “Initialization”Initialize self. See help(type(self)) for accurate signature.
exception vendor_connectors.base.ConnectorAPIError(message: str, status_code: int | None = None)
Section titled “exception vendor_connectors.base.ConnectorAPIError(message: str, status_code: int | None = None)”Bases: Exception
Raised when API returns an error.
Initialization
Section titled “Initialization”Initialize self. See help(type(self)) for accurate signature.
class vendor_connectors.base.VendorConnectorBase(api_key: str | None = None, base_url: str | None = None, timeout: float | None = None, logger: lifecyclelogging.Logging | None = None, **kwargs)
Section titled “class vendor_connectors.base.VendorConnectorBase(api_key: str | None = None, base_url: str | None = None, timeout: float | None = None, logger: lifecyclelogging.Logging | None = None, **kwargs)”Bases: directed_inputs_class.DirectedInputsClass, abc.ABC
Base class for all vendor connectors.
Provides:
- DirectedInputsClass for credential loading (env, stdin, direct)
- HTTP client with connection pooling
- Automatic retries with exponential backoff
- Rate limiting
- MCP tool registration scaffolding
- LangChain tool helpers
Class Attributes: BASE_URL: API base URL (required for HTTP connectors) API_KEY_ENV: Environment variable name for API key TIMEOUT: HTTP timeout in seconds (default 300) MIN_REQUEST_INTERVAL: Minimum seconds between requests (rate limiting) MAX_RETRIES: Maximum retry attempts (default 5)
Instance Attributes: logger: Logger instance _client: HTTP client (lazy-initialized) _tools: Registered LangChain tools
Initialization
Section titled “Initialization”Initialize the connector.
Args: api_key: API key (overrides environment variable) base_url: Base URL (overrides class default) timeout: HTTP timeout in seconds logger: Logger instance **kwargs: Passed to DirectedInputsClass
BASE_URL : ClassVar[str] =
Section titled “BASE_URL : ClassVar[str] = ”API_KEY_ENV : ClassVar[str] =
Section titled “API_KEY_ENV : ClassVar[str] = ”300.0
MIN_REQUEST_INTERVAL : ClassVar[float]
Section titled “MIN_REQUEST_INTERVAL : ClassVar[float]”0.0
MAX_RETRIES : ClassVar[int]
Section titled “MAX_RETRIES : ClassVar[int]”5
_rate_limit_locks : ClassVar[dict[type, threading.Lock]]
Section titled “_rate_limit_locks : ClassVar[dict[type, threading.Lock]]”None
_last_request_times : ClassVar[dict[type, float]]
Section titled “_last_request_times : ClassVar[dict[type, float]]”None
property api_key : str
Section titled “property api_key : str”Get API key, raising if not set.
property client : httpx.Client
Section titled “property client : httpx.Client”Get or create HTTP client with connection pooling.
close() → None
Section titled “close() → None”Close HTTP client and release resources.
_rate_limit() → None
Section titled “_rate_limit() → None”Apply rate limiting between requests.
Rate limiting is per-connector-type (not global), so different connector types (e.g., MeshyConnector vs AWSConnector) don’t interfere with each other.
_build_headers() → dict[str, str]
Section titled “_build_headers() → dict[str, str]”Build request headers. Override in subclasses for custom auth.
_build_url(endpoint: str) → str
Section titled “_build_url(endpoint: str) → str”Build full URL from endpoint.
request(method: str, endpoint: str, , headers: dict[str, str] | None = None, **kwargs) → httpx.Response
Section titled “request(method: str, endpoint: str, , headers: dict[str, str] | None = None, **kwargs) → httpx.Response”Make HTTP request with retries and rate limiting.
Args: method: HTTP method (GET, POST, PUT, DELETE, etc.) endpoint: API endpoint (relative to BASE_URL) headers: Additional headers (merged with defaults) **kwargs: Passed to httpx.request (json, params, data, etc.)
Returns: httpx.Response
Raises: RateLimitError: On 429 (will retry automatically) ConnectorAPIError: On other API errors
get(endpoint: str, **kwargs) → httpx.Response
Section titled “get(endpoint: str, **kwargs) → httpx.Response”HTTP GET request.
post(endpoint: str, **kwargs) → httpx.Response
Section titled “post(endpoint: str, **kwargs) → httpx.Response”HTTP POST request.
put(endpoint: str, **kwargs) → httpx.Response
Section titled “put(endpoint: str, **kwargs) → httpx.Response”HTTP PUT request.
delete(endpoint: str, **kwargs) → httpx.Response
Section titled “delete(endpoint: str, **kwargs) → httpx.Response”HTTP DELETE request.
patch(endpoint: str, **kwargs) → httpx.Response
Section titled “patch(endpoint: str, **kwargs) → httpx.Response”HTTP PATCH request.
download(url: str, output_path: str) → int
Section titled “download(url: str, output_path: str) → int”Download file from URL.
Args: url: URL to download from output_path: Local path to save to
Returns: File size in bytes
register_tool(func: collections.abc.Callable, name: str | None = None, description: str | None = None, schema: type[pydantic.BaseModel] | None = None) → None
Section titled “register_tool(func: collections.abc.Callable, name: str | None = None, description: str | None = None, schema: type[pydantic.BaseModel] | None = None) → None”Register a function as a LangChain tool.
Args: func: The function to register name: Tool name (defaults to function name) description: Tool description (defaults to docstring) schema: Pydantic model for input schema.
get_tools() → list[langchain_core.tools.StructuredTool]
Section titled “get_tools() → list[langchain_core.tools.StructuredTool]”Get all registered tools as LangChain StructuredTools.
Returns: List of StructuredTool instances
get_ai_tool_definitions() → list[dict[str, Any]]
Section titled “get_ai_tool_definitions() → list[dict[str, Any]]”Get tool definitions in Vercel AI SDK-compatible format.
Returns: List of AI tool definition dicts
handle_ai_tool_call(name: str, arguments: dict[str, Any]) → Any
Section titled “handle_ai_tool_call(name: str, arguments: dict[str, Any]) → Any”Handle an AI tool call.
Args: name: Tool name arguments: Tool arguments
Returns: Tool result