from __future__ import annotations
import json
from typing import Any
from consul import Check
from consul.callback import CB
# Client-side wrappers for the Consul agent HTTP API (/v1/agent/...).
class Agent:
    """
    The Agent endpoints are used to interact with a local Consul agent.
    Usually, services and checks are registered with an agent, which then
    takes on the burden of registering with the Catalog and performing
    anti-entropy to recover from outages.

    *agent* is the client object the requests are sent through. This class
    relies on its ``http`` attribute (``get`` / ``put``) and on its
    ``prepare_headers(token)`` method.

    Sub-APIs are exposed as attributes: ``service`` (`Agent.Service`),
    ``check`` (`Agent.Check`) and ``connect`` (`Agent.Connect`).
    """

    def __init__(self, agent) -> None:
        self.agent = agent
        self.service = Agent.Service(agent)
        self.check = Agent.Check(agent)
        self.connect = Agent.Connect(agent)

    def self(self):
        """
        Returns configuration of the local agent and member information.
        """
        return self.agent.http.get(CB.json(), "/v1/agent/self")

    def services(self) -> Any:
        """
        Returns all the services that are registered with the local agent.
        These services were either provided through configuration files, or
        added dynamically using the HTTP API. It is important to note that
        the services known by the agent may be different than those
        reported by the Catalog. This is usually due to changes being made
        while there is no leader elected. The agent performs active
        anti-entropy, so in most situations everything will be in sync
        within a few seconds.
        """
        return self.agent.http.get(CB.json(), "/v1/agent/services")

    def service_definition(self, service_id):
        """
        Returns a service definition for a single instance that is registered
        with the local agent.

        *service_id* is inserted into the request path as-is.
        """
        return self.agent.http.get(CB.json(), f"/v1/agent/service/{service_id}")

    def checks(self) -> Any:
        """
        Returns all the checks that are registered with the local agent.
        These checks were either provided through configuration files, or
        added dynamically using the HTTP API. Similar to services,
        the checks known by the agent may be different than those
        reported by the Catalog. This is usually due to changes being made
        while there is no leader elected. The agent performs active
        anti-entropy, so in most situations everything will be in sync
        within a few seconds.
        """
        return self.agent.http.get(CB.json(), "/v1/agent/checks")

    def members(self, wan: bool = False):
        """
        Returns all the members that this agent currently sees. This may
        vary by agent, use the nodes api of Catalog to retrieve a cluster
        wide consistent view of members.

        For agents running in server mode, setting *wan* to *True* returns
        the list of WAN members instead of the LAN members which is
        default.
        """
        params = []
        if wan:
            params.append(("wan", 1))
        return self.agent.http.get(CB.json(), "/v1/agent/members", params=params)

    def maintenance(self, enable: bool, reason: str | None = None, token: str | None = None):
        """
        The node maintenance endpoint can place the agent into
        "maintenance mode".

        *enable* is a bool: *True* enables maintenance mode, *False*
        disables it. It is always sent as the ``enable`` query parameter.

        *reason* is an optional string. This is simply to aid human
        operators.

        *token* is an optional ACL token, passed to the client's
        ``prepare_headers``.
        """
        params: list[tuple[str, Any]] = [("enable", enable)]
        if reason:
            params.append(("reason", reason))
        headers = self.agent.prepare_headers(token)
        return self.agent.http.put(CB.boolean(), "/v1/agent/maintenance", params=params, headers=headers)

    def join(self, address: str, wan: bool = False, token: str | None = None):
        """
        This endpoint instructs the agent to attempt to connect to a
        given address.

        *address* is the ip to connect to.

        *wan* is a bool. For agents running in server mode, *True* causes
        the agent to attempt to join using the WAN pool. Default is
        *False*.

        *token* is an optional ACL token, passed to the client's
        ``prepare_headers``.
        """
        params = []
        if wan:
            params.append(("wan", 1))
        headers = self.agent.prepare_headers(token)
        return self.agent.http.put(CB.boolean(), f"/v1/agent/join/{address}", params=params, headers=headers)

    def force_leave(self, node: str, token: str | None = None):
        """
        This endpoint instructs the agent to force a node into the left
        state. If a node fails unexpectedly, then it will be in a failed
        state. Once in the failed state, Consul will attempt to reconnect,
        and the services and checks belonging to that node will not be
        cleaned up. Forcing a node into the left state allows its old
        entries to be removed.

        *node* is the node to change state for.

        *token* is an optional ACL token, passed to the client's
        ``prepare_headers``.
        """
        headers = self.agent.prepare_headers(token)
        return self.agent.http.put(CB.boolean(), f"/v1/agent/force-leave/{node}", headers=headers)

    class Service:
        """Service registration endpoints under ``/v1/agent/service``."""

        def __init__(self, agent) -> None:
            self.agent = agent

        @staticmethod
        def _check_fields(check, extra_checks, script=None, interval=None, ttl=None, http=None, timeout=None):
            """Return the health-check part of a service registration payload."""
            if check:
                # Unpacking (rather than ``[check] + extra_checks``) accepts
                # any iterable of extra checks, not only lists.
                return {"checks": [check, *extra_checks]}
            # No explicit check: fall back to the deprecated keyword arguments.
            # ``Check`` resolves to the module-level ``consul.Check`` import here,
            # not to the nested ``Agent.Check`` class.
            return Check._compat(  # pylint: disable=protected-access
                script=script, interval=interval, ttl=ttl, http=http, timeout=timeout
            )

        # pylint: disable=too-many-branches
        def register(
            self,
            name: str,
            service_id=None,
            address=None,
            port: int | None = None,
            tags=None,
            check=None,
            token: str | None = None,
            meta=None,
            weights=None,
            # *deprecated* use check parameter
            script=None,
            interval=None,
            ttl: int | None = None,
            http=None,
            timeout=None,
            enable_tag_override: bool = False,
            extra_checks=None,
            replace_existing_checks=False,
            tagged_addresses: dict | None = None,
            connect: dict[str, Any] | None = None,
        ):
            """
            Add a new service to the local agent. There is more
            documentation on services
            `here <https://developer.hashicorp.com/consul/docs/fundamentals/service>`_.

            *name* is the name of the service.

            If the optional *service_id* is not provided it is set to
            *name*. You cannot have duplicate *service_id* entries per
            agent, so it may be necessary to provide one.

            *address* will default to the address of the agent if not
            provided.

            An optional health *check* can be created for this service; it is
            one of `Check.script`_, `Check.http`_, `Check.tcp`_,
            `Check.ttl`_ or `Check.docker`_.

            *extra_checks* is an optional iterable of additional checks. It is
            only sent together with *check*; without *check* it is ignored.

            *token* is an optional `ACL token`_ to apply to this request.
            Note this call will return successful even if the token doesn't
            have permissions to register this service.

            *meta* specifies arbitrary KV metadata linked to the service
            formatted as {k1:v1, k2:v2}.

            *weights* specifies weights for the service; default to
            {"Passing": 1, "Warning": 1}.

            *tagged_addresses* specifies alternative addresses for the service,
            e.g. for use with Connect. Formatted as { "lan": "<address>", "wan": "<address>" }.

            *connect* specifies configuration for Connect. Formatted as { "sidecar_service": {} }.

            *script*, *interval*, *ttl*, *http*, and *timeout* arguments
            are deprecated. use *check* instead. They are only consulted
            when *check* is not given.

            *replace_existing_checks* Missing health checks from the request will
            be deleted from the agent.
            Using this parameter allows to idempotently register a service and its
            checks without having to manually deregister checks.

            *enable_tag_override* is an optional bool that enable you
            to modify a service tags from servers(consul agent role server)
            Default is set to False.
            This option is only for >=v0.6.0 version on both agent and
            servers.
            for more information
            https://developer.hashicorp.com/consul/docs/fundamentals/service
            """
            if extra_checks is None:
                extra_checks = []

            # Only truthy values are sent; omitted keys are left out of the payload.
            payload: dict[str, Any] = {"name": name}
            if enable_tag_override:
                payload["enabletagoverride"] = enable_tag_override
            if service_id:
                payload["id"] = service_id
            if address:
                payload["address"] = address
            if port:
                payload["port"] = port
            if tags:
                payload["tags"] = tags
            if meta:
                payload["meta"] = meta

            # The deprecated-argument fallback is tied to *check*, not to *weights*.
            payload.update(
                self._check_fields(
                    check, extra_checks, script=script, interval=interval, ttl=ttl, http=http, timeout=timeout
                )
            )
            if weights:
                payload["weights"] = weights
            if tagged_addresses:
                payload["tagged_addresses"] = tagged_addresses
            if connect:
                payload["connect"] = connect

            params = []
            if replace_existing_checks:
                params.append(("replace-existing-checks", "true"))

            headers = self.agent.prepare_headers(token)
            return self.agent.http.put(
                CB.boolean(), "/v1/agent/service/register", params=params, headers=headers, data=json.dumps(payload)
            )

        def deregister(self, service_id: str, token: str | None = None):
            """
            Used to remove a service from the local agent. The agent will
            take care of deregistering the service with the Catalog. If
            there is an associated check, that is also deregistered.
            """
            headers = self.agent.prepare_headers(token)
            return self.agent.http.put(CB.boolean(), f"/v1/agent/service/deregister/{service_id}", headers=headers)

        def maintenance(self, service_id: str, enable: bool, reason: str | None = None, token: str | None = None):
            """
            The service maintenance endpoint allows placing a given service
            into "maintenance mode".

            *service_id* is the id of the service that is to be targeted
            for maintenance.

            *enable* is a bool: *True* enables maintenance mode, *False*
            disables it. It is always sent as the ``enable`` query parameter.

            *reason* is an optional string. This is simply to aid human
            operators.
            """
            params: list[tuple[str, Any]] = [("enable", enable)]
            if reason:
                params.append(("reason", reason))
            headers = self.agent.prepare_headers(token)
            return self.agent.http.put(
                CB.boolean(), f"/v1/agent/service/maintenance/{service_id}", params=params, headers=headers
            )

    class Check:
        """Health-check endpoints under ``/v1/agent/check``."""

        def __init__(self, agent) -> None:
            self.agent = agent

        def register(
            self,
            name: str,
            check=None,
            check_id=None,
            notes=None,
            service_id=None,
            token: str | None = None,
            # *deprecated* use check parameter
            script=None,
            interval=None,
            ttl: int | None = None,
            http=None,
            timeout=None,
        ):
            """
            Register a new check with the local agent. More documentation
            on checks can be found `here
            <https://developer.hashicorp.com/consul/docs/register/health-check/vm>`_.

            *name* is the name of the check.

            *check* is one of `Check.script`_, `Check.http`_, `Check.tcp`_
            `Check.ttl`_ or `Check.docker`_ and is required.

            If the optional *check_id* is not provided it is set to *name*.
            *check_id* must be unique for this agent.

            *notes* is not used by Consul, and is meant to be human
            readable.

            Optionally, a *service_id* can be specified to associate a
            registered check with an existing service.

            *token* is an optional `ACL token`_ to apply to this request.
            Note this call will return successful even if the token doesn't
            have permissions to register this check.

            *script*, *interval*, *ttl*, *http*, and *timeout* arguments
            are deprecated. use *check* instead.

            Raises *AssertionError* when none of *check*, *script*, *ttl* or
            *http* is given.

            Returns *True* on success.
            """
            if not (check or script or ttl or http):
                # Raised explicitly rather than via ``assert`` so the validation
                # is not stripped under ``python -O``; the type is unchanged.
                raise AssertionError("check is required")

            payload = {"name": name}
            if check:
                payload.update(check)
            else:
                # ``Check`` is the module-level ``consul.Check`` import here, not
                # this nested class: method bodies do not see class scope.
                payload.update(
                    Check._compat(script=script, interval=interval, ttl=ttl, http=http, timeout=timeout)["check"]
                )

            if check_id:
                payload["id"] = check_id
            if notes:
                payload["notes"] = notes
            if service_id:
                payload["serviceid"] = service_id

            headers = self.agent.prepare_headers(token)
            return self.agent.http.put(
                CB.boolean(), "/v1/agent/check/register", headers=headers, data=json.dumps(payload)
            )

        def deregister(self, check_id: str, token: str | None = None):
            """
            Remove a check from the local agent.
            """
            headers = self.agent.prepare_headers(token)
            return self.agent.http.put(CB.boolean(), f"/v1/agent/check/deregister/{check_id}", headers=headers)

        def _ttl_update(self, status: str, check_id: str, notes, token: str | None):
            """PUT ``/v1/agent/check/<status>/<check_id>`` with an optional note."""
            params = []
            if notes:
                params.append(("note", notes))
            headers = self.agent.prepare_headers(token)
            return self.agent.http.put(
                CB.boolean(), f"/v1/agent/check/{status}/{check_id}", params=params, headers=headers
            )

        def ttl_pass(self, check_id: str, notes=None, token: str | None = None):
            """
            Mark a ttl based check as passing. Optional notes can be
            attached to describe the status of the check.
            """
            return self._ttl_update("pass", check_id, notes, token)

        def ttl_fail(self, check_id: str, notes=None, token: str | None = None):
            """
            Mark a ttl based check as failing. Optional notes can be
            attached to describe why check is failing. The status of the
            check will be set to critical and the ttl clock will be reset.
            """
            return self._ttl_update("fail", check_id, notes, token)

        def ttl_warn(self, check_id: str, notes=None, token: str | None = None):
            """
            Mark a ttl based check with warning. Optional notes can be
            attached to describe the warning. The status of the
            check will be set to warn and the ttl clock will be reset.
            """
            return self._ttl_update("warn", check_id, notes, token)

    class Connect:
        """Connect endpoints under ``/v1/agent/connect``; CA calls are on ``ca``."""

        def __init__(self, agent) -> None:
            self.agent = agent
            self.ca = Agent.Connect.CA(agent)

        def authorize(self, target, client_cert_uri, client_cert_serial, token: str | None = None):
            """
            Tests whether a connection attempt is authorized between
            two services.
            More information is available
            `here <https://developer.hashicorp.com/consul/api-docs/agent/connect>`_.

            *target* is the name of the service that is being requested.

            *client_cert_uri* The unique identifier for the requesting
            client.

            *client_cert_serial* The colon-hex-encoded serial number for
            the requesting client cert.

            *token* is an optional ACL token, passed to the client's
            ``prepare_headers``.
            """
            payload = {"Target": target, "ClientCertURI": client_cert_uri, "ClientCertSerial": client_cert_serial}
            headers = self.agent.prepare_headers(token)
            return self.agent.http.put(
                CB.json(), "/v1/agent/connect/authorize", headers=headers, data=json.dumps(payload)
            )

        class CA:
            """Certificate-authority endpoints under ``/v1/agent/connect/ca``."""

            def __init__(self, agent) -> None:
                self.agent = agent

            def roots(self):
                """
                Issues ``GET /v1/agent/connect/ca/roots`` and returns what the
                ``CB.json()`` callback produces for the response.
                """
                return self.agent.http.get(CB.json(), "/v1/agent/connect/ca/roots")

            def leaf(self, service, token: str | None = None):
                """
                Issues ``GET /v1/agent/connect/ca/leaf/<service>`` and returns
                what the ``CB.json()`` callback produces for the response.

                *service* is inserted into the request path as-is.

                *token* is an optional ACL token, passed to the client's
                ``prepare_headers``.
                """
                headers = self.agent.prepare_headers(token)
                return self.agent.http.get(CB.json(), f"/v1/agent/connect/ca/leaf/{service}", headers=headers)