Source code for consul.api.acl.policy

from __future__ import annotations

import json

from consul.callback import CB


[docs] class Policy: def __init__(self, agent) -> None: self.agent = agent
[docs] def list(self, token: str | None = None): """ Lists all the active ACL policies. This is a privileged endpoint, and requires a management token. *token* will override this client's default token. Requires a token with acl:read capability. ACLPermissionDenied raised otherwise """ headers = self.agent.prepare_headers(token) return self.agent.http.get(CB.json(), "/v1/acl/policies", headers=headers)
[docs] def read(self, uuid, token: str | None = None): """ Returns the policy information for *id*. Requires a token with acl:read capability. :param uuid: Specifies the UUID of the policy you look up. :param token: token with acl:read capability :return: selected Polic information """ headers = self.agent.prepare_headers(token) return self.agent.http.get(CB.json(), f"/v1/acl/policy/{uuid}", headers=headers)
[docs] def create(self, name: str, token: str | None = None, description: str | None = None, rules=None): """ Create a policy This is a privileged endpoint, and requires a token with acl:write. :param name: Specifies a name for the ACL policy. :param token: token with acl:write capability :param description: Free form human-readable description of the policy. :param rules: Specifies rules for the ACL policy. :return: The cloned token information """ json_data = {"name": name} if rules: json_data["rules"] = json.dumps(rules) if description: json_data["Description"] = description headers = self.agent.prepare_headers(token) return self.agent.http.put( CB.json(), "/v1/acl/policy", headers=headers, data=json.dumps(json_data), )