Skip to content

Client

lightkube.Client(config: Union[SingleConfig, KubeConfig, None] = None, namespace: Optional[str] = None, timeout: Optional[httpx.Timeout] = None, lazy: bool = True, field_manager: Optional[str] = None, trust_env: bool = True, dry_run: bool = False, transport: Optional[httpx.BaseTransport] = None, proxy: Optional[str] = None, http2: bool = False)

Create a new lightkube client

Parameters:

  • config (Union[SingleConfig, KubeConfig, None], default: None ) –

    Instance of SingleConfig or KubeConfig. When not set the configuration will be detected automatically using the following order: in-cluster config, KUBECONFIG environment variable, ~/.kube/config file.

  • namespace (Optional[str], default: None ) –

    Default namespace to use. This attribute is used in case namespaced resources are called without defining a namespace. If not specified, the default namespace set in your kube configuration will be used.

  • timeout (Optional[Timeout], default: None ) –

    Instance of httpx.Timeout. By default, all timeouts are set to 10 seconds. Notice that read timeout is ignored when watching changes.

  • lazy (bool, default: True ) –

    When set, the returned objects will be decoded from the JSON payload in a lazy way, i.e. only when accessed.

  • field_manager (Optional[str], default: None ) –

    Name associated with the actor or entity that is making these changes.

  • trust_env (bool, default: True ) –

    Ignore environment variables, also passed through to httpx.Client trust_env. See its docs for further description. If False, empty config will be derived from_file(DEFAULT_KUBECONFIG)

  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

  • transport (Optional[BaseTransport], default: None ) –

    Custom httpx transport.

  • proxy (Optional[str], default: None ) –

    HTTP proxy for the httpx client.

Source code in src/lightkube/core/client.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    config: Union[SingleConfig, KubeConfig, None] = None,
    namespace: Optional[str] = None,
    timeout: Optional[httpx.Timeout] = None,
    lazy: bool = True,
    field_manager: Optional[str] = None,
    trust_env: bool = True,
    dry_run: bool = False,
    transport: Optional[httpx.BaseTransport] = None,
    proxy: Optional[str] = None,
    http2: bool = False,
):
    if timeout is None:
        timeout = httpx.Timeout(10.0)
    self._client = GenericSyncClient(
        ConnectionParams(timeout=timeout, trust_env=trust_env, transport=transport, proxy=proxy, http2=http2),
        config,
        namespace=namespace,
        lazy=lazy,
        field_manager=field_manager,
        dry_run=dry_run,
    )

config: SingleConfig property

Return the kubernetes configuration used in this client

namespace property

Return the default namespace that will be used when a namespace has not been specified

apply(obj, name=None, *, namespace=None, field_manager=None, force=False, dry_run=False)

apply(
    obj: GlobalSubResource,
    name: str,
    *,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> GlobalSubResource
apply(
    obj: NamespacedSubResource,
    name: str,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> NamespacedSubResource
apply(
    obj: GlobalResource,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> GlobalResource
apply(
    obj: NamespacedResource,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> NamespacedResource

Create or configure an object. This method uses the server-side apply functionality.

Parameters:

  • obj

    object to create. This need to be an instance of a resource kind.

  • name

    Required only for sub-resources: Name of the resource to which this object belongs.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources). If the namespace doesn't exist, lightkube.ApiError is raised.

  • field_manager

    Name associated with the actor or entity that is making these changes.

  • force

    Force is going to "force" Apply requests. It means user will re-acquire conflicting fields owned by other people.

  • dry_run

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/client.py
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
def apply(
    self,
    obj,
    name=None,
    *,
    namespace=None,
    field_manager=None,
    force=False,
    dry_run=False,
):
    """Create or configure an object. This method uses the
    [server-side apply](https://kubernetes.io/docs/reference/using-api/server-side-apply/) functionality.

    Parameters:
        obj: object to create. This need to be an instance of a resource kind.
        name: Required only for sub-resources: Name of the resource to which this object belongs.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
            If the namespace doesn't exist, `lightkube.ApiError` is raised.
        field_manager: Name associated with the actor or entity that is making these changes.
        force: Force is going to "force" Apply requests. It means user will re-acquire conflicting
            fields owned by other people.
        dry_run: Apply server-side dry-run and guarantee that modifications will not
            be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
            to `kubectl` commands.
    """
    if namespace is None and isinstance(obj, r.NamespacedResource) and obj.metadata.namespace:
        namespace = obj.metadata.namespace
    if name is None and obj.metadata.name:
        name = obj.metadata.name
    return self.patch(
        type(obj),
        name,
        obj,
        namespace=namespace,
        patch_type=PatchType.APPLY,
        field_manager=field_manager,
        force=force,
        dry_run=dry_run,
    )

create(obj, name=None, *, namespace=None, field_manager=None, dry_run=False)

create(
    obj: GlobalSubResource,
    name: str,
    *,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalSubResource
create(
    obj: NamespacedSubResource,
    name: str,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedSubResource
create(
    obj: GlobalResource,
    *,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalResource
create(
    obj: NamespacedResource,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedResource

Create a new object and return its representation. Raise lightkube.ApiError if the object already exist.

Parameters:

  • obj

    object to create. This need to be an instance of a resource kind.

  • name

    Required only for sub-resources: Name of the resource to which this object belongs.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources). If the namespace doesn't exist, lightkube.ApiError is raised.

  • field_manager

    Name associated with the actor or entity that is making these changes. This parameter overrides the corresponding Client initialization parameter.

  • dry_run

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/client.py
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
def create(self, obj, name=None, *, namespace=None, field_manager=None, dry_run=False):
    """Create a new object and return its representation.
    Raise lightkube.ApiError if the object already exist.

    Parameters:
        obj: object to create. This need to be an instance of a resource kind.
        name: Required only for sub-resources: Name of the resource to which this object belongs.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
            If the namespace doesn't exist, `lightkube.ApiError` is raised.
        field_manager: Name associated with the actor or entity that is making these changes.
            This parameter overrides the corresponding `Client` initialization parameter.
        dry_run: Apply server-side dry-run and guarantee that modifications will not
            be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
            to `kubectl` commands.
    """
    return self._client.request(
        "post",
        name=name,
        namespace=namespace,
        obj=obj,
        params={
            "fieldManager": field_manager,
            "dryRun": "All" if dry_run else None,
        },
    )

delete(res, name: str, *, namespace: Optional[str] = None, grace_period: Optional[int] = None, cascade: Optional[CascadeType] = None, dry_run: bool = False)

delete(
    res: Type[GlobalResource],
    name: str,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
) -> None
delete(
    res: Type[NamespacedResource],
    name: str,
    *,
    namespace: Optional[str] = None,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
) -> None

Delete an object. Raise lightkube.ApiError if the object doesn't exist.

Parameters:

  • res

    Resource kind.

  • name (str) –

    Name of the object to delete.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources).

  • grace_period (Optional[int], default: None ) –

    The duration in seconds before the object should be deleted. Value must be non-negative integer. The value zero indicates delete immediately. If this value is None (default), the default grace period for the specified type will be used. Defaults to a per object value if not specified. Zero means delete immediately.

  • cascade (Optional[CascadeType], default: None ) –

    Whether and how garbage collection will be performed. Either this field or OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set in the metadata.finalizers and the resource-specific default policy. Acceptable values are:

    • CascadeType.ORPHAN - orphan the dependents;
    • CascadeType.BACKGROUND - allow the garbage collector to delete the dependents in the background;
    • CascadeType.FOREGROUND - a cascading policy that deletes all dependents in the foreground.
  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/client.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def delete(
    self,
    res,
    name: str,
    *,
    namespace: Optional[str] = None,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
):
    """Delete an object. Raise [lightkube.ApiError][] if the object doesn't exist.

    Parameters:
        res: Resource kind.
        name: Name of the object to delete.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
        grace_period: The duration in seconds before the object should be deleted.
            Value must be non-negative integer. The value zero indicates delete immediately. If this value is `None`
            (default), the default grace period for the specified type will be used. Defaults to a per object value if
            not specified. Zero means delete immediately.
        cascade: Whether and how garbage collection will be performed. Either this field or
            OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set
            in the metadata.finalizers and the resource-specific default policy. Acceptable values are:

            * `CascadeType.ORPHAN` - orphan the dependents;
            * `CascadeType.BACKGROUND` - allow the garbage collector to delete the dependents in the background;
            * `CascadeType.FOREGROUND` - a cascading policy that deletes all dependents in the foreground.
        dry_run: Apply server-side dry-run and guarantee that modifications will not
            be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
            to `kubectl` commands.
    """
    return self._client.request(
        "delete",
        res=res,
        name=name,
        namespace=namespace,
        params={
            "gracePeriodSeconds": grace_period,
            "propagationPolicy": cascade.value if cascade else None,
            "dryRun": "All" if dry_run else None,
        },
    )

deletecollection(res, *, namespace: Optional[str] = None, grace_period: Optional[int] = None, cascade: Optional[CascadeType] = None, dry_run: bool = False)

deletecollection(
    res: Type[GlobalResource],
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
) -> None
deletecollection(
    res: Type[NamespacedResource],
    *,
    namespace: Optional[str] = None,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
) -> None

Delete all objects of the given kind

Parameters:

  • res

    Resource kind.

  • namespace (Optional[str], default: None ) –

    (optional) Name of the namespace containing the object (Only for namespaced resources).

  • grace_period (Optional[int], default: None ) –

    The duration in seconds before the objects should be deleted. Value must be non-negative integer. The value zero indicates delete immediately. If this value is None (default), the default grace period for the specified type will be used. Defaults to a per object value if not specified. Zero means delete immediately.

  • cascade (Optional[CascadeType], default: None ) –

    Whether and how garbage collection will be performed. Either this field or OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set in the metadata.finalizers and the resource-specific default policy. Acceptable values are:

    • 'CascadeType.ORPHAN' - orphan the dependents;
    • 'CascadeType.BACKGROUND' - allow the garbage collector to delete the dependents in the background;
    • 'CascadeType.FOREGROUND' - a cascading policy that deletes all dependents in the foreground.
  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/client.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def deletecollection(
    self,
    res,
    *,
    namespace: Optional[str] = None,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
):
    """Delete all objects of the given kind

    Parameters:
        res: Resource kind.
        namespace: *(optional)* Name of the namespace containing the object (Only for namespaced resources).
        grace_period: The duration in seconds before the objects should be deleted.
            Value must be non-negative integer. The value zero indicates delete immediately. If this value is `None`
            (default), the default grace period for the specified type will be used. Defaults to a per object value if
            not specified. Zero means delete immediately.
        cascade: Whether and how garbage collection will be performed. Either this field or
            OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set
            in the metadata.finalizers and the resource-specific default policy. Acceptable values are:

            * 'CascadeType.ORPHAN' - orphan the dependents;
            * 'CascadeType.BACKGROUND' - allow the garbage collector to delete the dependents in the background;
            * 'CascadeType.FOREGROUND' - a cascading policy that deletes all dependents in the foreground.
        dry_run: Apply server-side dry-run and guarantee that modifications will not
            be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
            to `kubectl` commands.
    """
    return self._client.request(
        "deletecollection",
        res=res,
        namespace=namespace,
        params={
            "gracePeriodSeconds": grace_period,
            "propagationPolicy": cascade.value if cascade else None,
            "dryRun": "All" if dry_run else None,
        },
    )

exec(name: str, *, namespace: Optional[str] = None, container: Optional[str] = None, command: Union[str, Iterable[str]], stdin: Union[str, bytes, BinaryIO, None] = None, stdout: Union[BinaryIO, bool] = False, stderr: Union[BinaryIO, bool] = False, decode: Optional[str] = 'utf-8', raise_on_error: bool = False, timeout: Optional[float] = None) -> ExecResponse

Execute a command in a Pod and return stdout/stderr.

Parameters:

  • name (str) –

    Name of the Pod.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the Pod.

  • container (Optional[str], default: None ) –

    Name of the container in the pod to execute the command in.

  • command (Union[str, Iterable[str]]) –

    Command to execute in the Pod.

  • stdin (Union[str, bytes, BinaryIO, None], default: None ) –

    Data to send to stdin. This can be either a string, bytes or a binary stream. Strings will be encoded as utf-8 before sending.

  • stdout (Union[BinaryIO, bool], default: False ) –

    If True, the command's stdout will be captured and returned in the response. If a binary stream is passed, the command's stdout will be written to it instead.

  • stderr (Union[BinaryIO, bool], default: False ) –

    If True, the command's stderr will be captured and returned in the response. If a binary stream is passed, the command's stderr will be written to it instead.

  • decode (Optional[str], default: 'utf-8' ) –

    Decode captured stdout/stderr in ExecResponse using this encoding as strings. If you expect a binary output, set stdout and/or stderr to a binary stream or set this parameter to None.

  • raise_on_error (bool, default: False ) –

    If True, an exception will be raised if the command exits with a non-zero status code. Note that other exceptions may still be raised for other types of errors, such as connection issues, missing pod or timeouts.

  • timeout (Optional[float], default: None ) –

    If set, the maximum amount of time in seconds to wait for the command to complete before raising a timeout exception. By default, there is no timeout and the method will wait until the command completes or an error occurs.

Source code in src/lightkube/core/client.py
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
def exec(
    self,
    name: str,
    *,
    namespace: Optional[str] = None,
    container: Optional[str] = None,
    command: Union[str, Iterable[str]],
    stdin: Union[str, bytes, BinaryIO, None] = None,
    stdout: Union[BinaryIO, bool] = False,
    stderr: Union[BinaryIO, bool] = False,
    decode: Optional[str] = "utf-8",
    raise_on_error: bool = False,
    timeout: Optional[float] = None,
) -> ExecResponse:
    """Execute a command in a Pod and return stdout/stderr.

    Parameters:
        name: Name of the Pod.
        namespace: Name of the namespace containing the Pod.
        container: Name of the container in the pod to execute the command in.
        command: Command to execute in the Pod.
        stdin: Data to send to stdin. This can be either a string, bytes or a binary stream.
          Strings will be encoded as utf-8 before sending.
        stdout: If `True`, the command's stdout will be captured and returned in the response.
          If a binary stream is passed, the command's stdout will be written to it instead.
        stderr: If `True`, the command's stderr will be captured and returned in the response.
          If a binary stream is passed, the command's stderr will be written to it instead.
        decode: Decode captured stdout/stderr in `ExecResponse` using this encoding as strings.
          If you expect a binary output, set `stdout` and/or `stderr` to a binary stream or set this parameter to `None`.
        raise_on_error: If `True`, an exception will be raised if the command exits with a non-zero status code.
          Note that other exceptions may still be raised for other types of errors, such as connection issues, missing
          pod or timeouts.
        timeout: If set, the maximum amount of time in seconds to wait for the command to complete before raising a
          timeout exception. By default, there is no timeout and the method will wait until the command completes
          or an error occurs.
    """
    commands = [command] if isinstance(command, str) else list(command)
    params = {"command": commands, "stdout": stdout, "stderr": stderr, "stdin": stdin, "container": container}
    return self._client.ws_request(
        "exec",
        name=name,
        namespace=namespace,
        params=params,
        raise_on_error=raise_on_error,
        decode=decode,
        timeout=timeout,
    )

get(res, name, *, namespace=None)

get(res: Type[GlobalResource], name: str) -> GlobalResource
get(
    res: Type[AllNamespacedResource],
    name: str,
    *,
    namespace: Optional[str] = None,
) -> AllNamespacedResource

Return an object. Raise lightkube.ApiError if the object doesn't exist.

Parameters:

  • res

    Resource kind.

  • name

    Name of the object to fetch.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources).

Source code in src/lightkube/core/client.py
221
222
223
224
225
226
227
228
229
def get(self, res, name, *, namespace=None):
    """Return an object. Raise `lightkube.ApiError` if the object doesn't exist.

    Parameters:
        res: Resource kind.
        name: Name of the object to fetch.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
    """
    return self._client.request("get", res=res, name=name, namespace=namespace)

list(res, *, namespace=None, chunk_size=None, labels=None, fields=None)

list(
    res: Type[GlobalResource],
    *,
    chunk_size: Optional[int] = None,
    labels: Optional[LabelSelector] = None,
    fields: Optional[FieldSelector] = None,
) -> ListIterable[GlobalResource]
list(
    res: Type[NamespacedResource],
    *,
    namespace: Optional[str] = None,
    chunk_size: Optional[int] = None,
    labels: Optional[LabelSelector] = None,
    fields: Optional[FieldSelector] = None,
) -> ListIterable[NamespacedResource]

Return an iterator of objects matching the selection criteria.

Parameters:

  • res

    resource kind.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources).

  • chunk_size

    Limit the amount of objects returned for each rest API call. This method will automatically execute all subsequent calls until no more data is available.

  • labels

    Limit the returned objects by labels. More details.

  • fields

    Limit the returned objects by fields. More details.

Source code in src/lightkube/core/client.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None):
    """Return an iterator of objects matching the selection criteria.

    Parameters:
        res: resource kind.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
        chunk_size: Limit the amount of objects returned for each rest API call.
             This method will automatically execute all subsequent calls until no more data is available.
        labels: Limit the returned objects by labels. More [details](../selectors.md).
        fields: Limit the returned objects by fields. More [details](../selectors.md).
    """

    br = self._client.prepare_request(
        "list",
        res=res,
        namespace=namespace,
        params={
            "limit": chunk_size,
            "labelSelector": build_selector(labels) if labels else None,
            "fieldSelector": (build_selector(fields, for_fields=True) if fields else None),
        },
    )
    return self._client.list(br)

log(name, *, namespace=None, container=None, follow=False, since=None, tail_lines=None, timestamps=False, newlines=True)

log(
    name: str,
    *,
    namespace: Optional[str] = None,
    container: Optional[str] = None,
    follow: bool = False,
    since: Optional[int] = None,
    tail_lines: Optional[int] = None,
    timestamps: bool = False,
    newlines: bool = True,
) -> Iterator[str]

Return log lines for the given Pod. Raise lightkube.ApiError if the Pod doesn't exist or has not yet started.

Parameters:

  • name

    Name of the Pod.

  • namespace

    Name of the namespace containing the Pod.

  • container

    The container for which to stream logs. Defaults to only container if there is one container in the pod.

  • follow

    If True, follow the log stream of the pod.

  • since

    If set, a relative time in seconds before the current time from which to fetch logs.

  • tail_lines

    If set, the number of lines from the end of the logs to fetch.

  • timestamps

    If True, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line of log output.

  • newlines

    If True, each line will end with a newline, otherwise the newlines will be stripped.

Source code in src/lightkube/core/client.py
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
def log(
    self,
    name,
    *,
    namespace=None,
    container=None,
    follow=False,
    since=None,
    tail_lines=None,
    timestamps=False,
    newlines=True,
):
    """Return log lines for the given Pod. Raise `lightkube.ApiError` if the Pod doesn't exist or has not yet started.

    Parameters:
        name: Name of the Pod.
        namespace: Name of the namespace containing the Pod.
        container: The container for which to stream logs. Defaults to only container if there is one container in the pod.
        follow: If `True`, follow the log stream of the pod.
        since: If set, a relative time in seconds before the current time from which to fetch logs.
        tail_lines: If set, the number of lines from the end of the logs to fetch.
        timestamps: If `True`, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line of log output.
        newlines: If `True`, each line will end with a newline, otherwise the newlines will be stripped.
    """
    br = self._client.prepare_request(
        "get",
        core_v1.PodLog,
        name=name,
        namespace=namespace,
        params={
            "timestamps": timestamps,
            "tailLines": tail_lines,
            "container": container,
            "sinceSeconds": since,
            "follow": follow,
        },
    )
    req = self._client.build_adapter_request(br)
    resp = self._client.send(req, stream=follow)
    if resp.is_error and follow:
        # The body must be read into memory before accessing it when building the exception.
        resp.read()
    self._client.raise_for_status(resp)
    return (line + "\n" if newlines else line for line in resp.iter_lines())

patch(res, name, obj, *, namespace=None, patch_type=PatchType.STRATEGIC, field_manager=None, force=False, dry_run=False)

patch(
    res: Type[GlobalSubResource],
    name: str,
    obj: Union[GlobalSubResource, Dict, List],
    *,
    patch_type: PatchType = PatchType.STRATEGIC,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> GlobalSubResource
patch(
    res: Type[GlobalResource],
    name: str,
    obj: Union[GlobalResource, Dict, List],
    *,
    patch_type: PatchType = PatchType.STRATEGIC,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> GlobalResource
patch(
    res: Type[AllNamespacedResource],
    name: str,
    obj: Union[AllNamespacedResource, Dict, List],
    *,
    namespace: Optional[str] = None,
    patch_type: PatchType = PatchType.STRATEGIC,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> AllNamespacedResource

Patch an object. Raise lightkube.ApiError if the object doesn't exist.

Parameters:

  • res

    Resource kind.

  • name

    Name of the object to patch.

  • obj

    patch object.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources).

  • patch_type

    Type of patch to execute. Default PatchType.STRATEGIC.

  • field_manager

    Name associated with the actor or entity that is making these changes. This parameter overrides the corresponding Client initialization parameter. NOTE: This parameter is mandatory (here or at Client creation time) for PatchType.APPLY.

  • force

    Force is going to "force" Apply requests. It means user will re-acquire conflicting fields owned by other people. This parameter is ignored for non-apply patch types

  • dry_run

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/client.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
def patch(
    self,
    res,
    name,
    obj,
    *,
    namespace=None,
    patch_type=PatchType.STRATEGIC,
    field_manager=None,
    force=False,
    dry_run=False,
):
    """Patch an object. Raise lightkube.ApiError if the object doesn't exist.

    Parameters:
        res: Resource kind.
        name: Name of the object to patch.
        obj: patch object.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
        patch_type: Type of patch to execute. Default `PatchType.STRATEGIC`.
        field_manager: Name associated with the actor or entity that is making these changes.
            This parameter overrides the corresponding `Client` initialization parameter.
            **NOTE**: This parameter is mandatory (here or at `Client` creation time) for `PatchType.APPLY`.
        force: Force is going to "force" Apply requests. It means user will re-acquire conflicting
            fields owned by other people. This parameter is ignored for non-apply patch types
        dry_run: Apply server-side dry-run and guarantee that modifications will not
            be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
            to `kubectl` commands.
    """
    force_param = "true" if force and patch_type == PatchType.APPLY else None
    return self._client.request(
        "patch",
        res=res,
        name=name,
        namespace=namespace,
        obj=obj,
        headers={"Content-Type": patch_type.value},
        params={
            "force": force_param,
            "fieldManager": field_manager,
            "dryRun": "All" if dry_run else None,
        },
    )

replace(obj, name=None, *, namespace=None, field_manager=None, dry_run: bool = False)

replace(
    obj: GlobalSubResource,
    name: str,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalSubResource
replace(
    obj: NamespacedSubResource,
    name: str,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedSubResource
replace(
    obj: GlobalResource,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalResource
replace(
    obj: NamespacedResource,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedResource

Replace an existing resource. Raise lightkube.ApiError if the object doesn't exist.

Parameters:

  • obj

    new object. This need to be an instance of a resource kind.

  • name

    Required only for sub-resources: Name of the resource to which this object belongs.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources).

  • field_manager

    Name associated with the actor or entity that is making these changes. This parameter overrides the corresponding Client initialization parameter.

  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/client.py
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
def replace(
    self,
    obj,
    name=None,
    *,
    namespace=None,
    field_manager=None,
    dry_run: bool = False,
):
    """Replace an existing resource. Raise `lightkube.ApiError` if the object doesn't exist.

    Parameters:
        obj: new object. This need to be an instance of a resource kind.
        name: Required only for sub-resources: Name of the resource to which this object belongs.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
        field_manager: Name associated with the actor or entity that is making these changes.
            This parameter overrides the corresponding `Client` initialization parameter.
        dry_run: Apply server-side dry-run and guarantee that modifications will not
            be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
            to `kubectl` commands.
    """
    return self._client.request(
        "put",
        name=name,
        namespace=namespace,
        obj=obj,
        params={
            "fieldManager": field_manager,
            "dryRun": "All" if dry_run else None,
        },
    )

set(res, name, *, namespace=None, labels=None, annotations=None, field_manager=None, dry_run=False)

set(
    res: Type[GlobalResource],
    name: str,
    *,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalResource
set(
    res: Type[NamespacedResource],
    name: str,
    *,
    namespace: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedResource

Set labels and annotations to an object. Raise lightkube.ApiError if the object doesn't exist. This is a convenience method that use patch internally.

Parameters:

  • res

    Resource kind.

  • name

    Name of the object to patch.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources).

  • labels

    Labels to set. Labels are defined as key/value pairs. To remove a label, set its value to None.

  • annotations

    Annotations to set. Annotations are defined as key/value pairs. To remove an annotation, set its value to None.

  • field_manager

    Name associated with the actor or entity that is making these changes. This parameter overrides the corresponding Client initialization parameter.

  • dry_run

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/client.py
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
def set(
    self,
    res,
    name,
    *,
    namespace=None,
    labels=None,
    annotations=None,
    field_manager=None,
    dry_run=False,
):
    """Set labels and annotations to an object. Raise lightkube.ApiError if the object doesn't exist.
    This is a convenience method that use `patch` internally.

    Parameters:
        res: Resource kind.
        name: Name of the object to patch.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
        labels: Labels to set. Labels are defined as key/value pairs. To remove a label, set its value to `None`.
        annotations: Annotations to set. Annotations are defined as key/value pairs. To remove an annotation,
            set its value to `None`.
        field_manager: Name associated with the actor or entity that is making these changes.
            This parameter overrides the corresponding `Client` initialization parameter.
        dry_run: Apply server-side dry-run and guarantee that modifications will not
            be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
            to `kubectl` commands.
    """
    metadata = [("labels", labels), ("annotations", annotations)]
    payload = {"metadata": {k: v for k, v in metadata if v is not None}}
    return self.patch(
        res, name, payload, namespace=namespace, field_manager=field_manager, dry_run=dry_run, patch_type=PatchType.MERGE
    )

wait(res, name: str, *, for_conditions: Iterable[str], namespace=None, raise_for_conditions: Iterable[str] = ())

wait(
    res: Type[GlobalResource],
    name: str,
    *,
    for_conditions: Iterable[str],
    raise_for_conditions: Iterable[str] = (),
) -> GlobalResource
wait(
    res: Type[AllNamespacedResource],
    name: str,
    *,
    for_conditions: Iterable[str],
    namespace: Optional[str] = None,
    raise_for_conditions: Iterable[str] = (),
) -> AllNamespacedResource

Wait for the specified conditions. Raise lightkube.ObjectDeleted if the object get deleted during waiting.

Parameters:

  • res

    Resource kind.

  • name (str) –

    Name of resource to wait for.

  • for_conditions (Iterable[str]) –

    Condition types that are considered a success and will end the wait.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources).

  • raise_for_conditions (Iterable[str], default: () ) –

    Condition types that are considered failures and will exit the wait early with lightkube.ConditionError.

Source code in src/lightkube/core/client.py
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
def wait(
    self,
    res,
    name: str,
    *,
    for_conditions: Iterable[str],
    namespace=None,
    raise_for_conditions: Iterable[str] = (),
):
    """Wait for the specified conditions.
    Raise `lightkube.ObjectDeleted` if the object get deleted during waiting.

    Parameters:
        res: Resource kind.
        name: Name of resource to wait for.
        for_conditions: Condition types that are considered a success and will end the wait.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
        raise_for_conditions: Condition types that are considered failures and will exit the wait early
            with `lightkube.ConditionError`.
    """

    kind = r.api_info(res).plural
    full_name = f"{kind}/{name}"

    for_conditions = list(for_conditions)
    raise_for_conditions = list(raise_for_conditions)

    for op, obj in self.watch(res, namespace=namespace, fields={"metadata.name": name}):
        if obj.status is None:
            continue

        if op == "DELETED":
            raise ObjectDeleted(full_name)

        try:
            status = obj.status.to_dict()
        except AttributeError:
            status = obj.status

        conditions = [c for c in status.get("conditions", []) if c["status"] == "True"]
        if any(c["type"] in for_conditions for c in conditions):
            return obj

        failures = [c for c in conditions if c["type"] in raise_for_conditions]

        if failures:
            raise ConditionError(full_name, [f.get("message", f["type"]) for f in failures])

watch(res, *, namespace=None, labels=None, fields=None, server_timeout=None, resource_version=None, on_error=on_error_raise)

watch(
    res: Type[GlobalResource],
    *,
    labels: Optional[LabelSelector] = None,
    fields: Optional[FieldSelector] = None,
    server_timeout: Optional[int] = None,
    resource_version: Optional[str] = None,
    on_error: OnErrorHandler = on_error_raise,
) -> Iterator[Tuple[str, GlobalResource]]
watch(
    res: Type[NamespacedResource],
    *,
    namespace: Optional[str] = None,
    labels: Optional[LabelSelector] = None,
    fields: Optional[FieldSelector] = None,
    server_timeout: Optional[int] = None,
    resource_version: Optional[str] = None,
    on_error: OnErrorHandler = on_error_raise,
) -> Iterator[Tuple[str, NamespacedResource]]

Watch changes to objects

Parameters:

  • res

    resource kind.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources).

  • labels

    Limit the returned objects by labels. More details.

  • fields

    Limit the returned objects by fields. More details.

  • server_timeout

    Server side timeout in seconds to close a watch request. This method will automatically create a new request whenever the backend close the connection without errors.

  • resource_version

    When set, only modification events following this version will be returned.

  • on_error

    Function that control what to do in case of errors. The default implementation will raise any error.

Source code in src/lightkube/core/client.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def watch(
    self,
    res,
    *,
    namespace=None,
    labels=None,
    fields=None,
    server_timeout=None,
    resource_version=None,
    on_error=on_error_raise,
):
    """Watch changes to objects

    Parameters:
        res: resource kind.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
        labels: Limit the returned objects by labels. More [details](../selectors.md).
        fields: Limit the returned objects by fields. More [details](../selectors.md).
        server_timeout: Server side timeout in seconds to close a watch request.
            This method will automatically create a new request whenever the backend close the connection
            without errors.
        resource_version: When set, only modification events following this version will be returned.
        on_error: Function that control what to do in case of errors.
            The default implementation will raise any error.
    """
    br = self._client.prepare_request(
        "list",
        res=res,
        namespace=namespace,
        watch=True,
        params={
            "timeoutSeconds": server_timeout,
            "resourceVersion": resource_version,
            "labelSelector": build_selector(labels) if labels else None,
            "fieldSelector": (build_selector(fields, for_fields=True) if fields else None),
        },
    )
    return self._client.watch(br, on_error=on_error)

namespace parameter

All API calls for namespaced resources will need to refer to a specific namespace. By default the namespace provided in the kubeconfig file is used. This default can be overridden when instantiating the client class with a different value. You can also specify a specific namespace for a single call using the namespace parameter.

The methods create or replace will use the namespace defined in the object when it's present. Notice that if the object namespace and the method's namespace parameter are set, both must have the same value.

Override rules summary:

  • client.method(..., namespace=..)
  • [obj.metadata.namespace] (Only when calling create or replace)
  • Client(..., namespace=...)
  • kubernetes config file

List or watch objects in all namespaces

The methods list and watch can also return objects for all namespaces using namespace='*'.