Skip to content

AsyncClient

lightkube.AsyncClient(config: Union[SingleConfig, KubeConfig, None] = None, namespace: Optional[str] = None, timeout: Optional[httpx.Timeout] = None, lazy: bool = True, field_manager: Optional[str] = None, trust_env: bool = True, dry_run: bool = False, transport: Optional[httpx.AsyncBaseTransport] = None, proxy: Optional[str] = None, http2: bool = False)

Create a new lightkube client

Parameters:

  • config (Union[SingleConfig, KubeConfig, None], default: None ) –

    Instance of SingleConfig or KubeConfig. When not set the configuration will be detected automatically using the following order: in-cluster config, KUBECONFIG environment variable, ~/.kube/config file.

  • namespace (Optional[str], default: None ) –

    Default namespace to use. This attribute is used in case namespaced resources are called without defining a namespace. If not specified, the default namespace set in your kube configuration will be used.

  • timeout (Optional[Timeout], default: None ) –

    Instance of httpx.Timeout. By default all timeouts are set to 10 seconds. Notice that read timeout is ignored when watching changes.

  • lazy (bool, default: True ) –

    When set, the returned objects will be decoded from the JSON payload in a lazy way, i.e. only when accessed.

  • field_manager (Optional[str], default: None ) –

    Name associated with the actor or entity that is making these changes.

  • trust_env (bool, default: True ) –

    Ignore environment variables, also passed through to httpx.AsyncClient trust_env. See its docs for further description. If False, empty config will be derived from_file(DEFAULT_KUBECONFIG)

  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

  • transport (Optional[AsyncBaseTransport], default: None ) –

    Custom httpx transport.

  • proxy (Optional[str], default: None ) –

    HTTP proxy for the httpx client.

Source code in src/lightkube/core/async_client.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(
    self,
    config: Union[SingleConfig, KubeConfig, None] = None,
    namespace: Optional[str] = None,
    timeout: Optional[httpx.Timeout] = None,
    lazy: bool = True,
    field_manager: Optional[str] = None,
    trust_env: bool = True,
    dry_run: bool = False,
    transport: Optional[httpx.AsyncBaseTransport] = None,
    proxy: Optional[str] = None,
    http2: bool = False,
):
    if timeout is None:
        timeout = httpx.Timeout(10.0)
    self._client = GenericAsyncClient(
        ConnectionParams(timeout=timeout, trust_env=trust_env, transport=transport, proxy=proxy, http2=http2),
        config,
        namespace=namespace,
        lazy=lazy,
        field_manager=field_manager,
        dry_run=dry_run,
    )

config: SingleConfig property

Return the kubernetes configuration used in this client

namespace property

Return the default namespace that will be used when a namespace has not been specified

apply(obj, name=None, *, namespace: Optional[str] = None, field_manager: Optional[str] = None, force=False, dry_run: bool = False) async

apply(
    obj: GlobalSubResource,
    name: str,
    *,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> GlobalSubResource
apply(
    obj: NamespacedSubResource,
    name: str,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> NamespacedSubResource
apply(
    obj: GlobalResource,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> GlobalResource
apply(
    obj: NamespacedResource,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> NamespacedResource

Create or configure an object. This method uses the server-side apply functionality.

Parameters:

  • obj

    object to create. This need to be an instance of a resource kind.

  • name

    Required only for sub-resources: Name of the resource to which this object belongs.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources). If the namespace doesn't exist, lightkube.ApiError is raised.

  • field_manager (Optional[str], default: None ) –

    Name associated with the actor or entity that is making these changes.

  • force

    Force is going to "force" Apply requests. It means user will re-acquire conflicting fields owned by other people.

  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/async_client.py
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
async def apply(
    self,
    obj,
    name=None,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    force=False,
    dry_run: bool = False,
):
    """Create or configure an object. This method uses the
    [server-side apply](https://kubernetes.io/docs/reference/using-api/server-side-apply/) functionality.

    Parameters:
      obj: object to create. This need to be an instance of a resource kind.
      name: Required only for sub-resources: Name of the resource to which this object belongs.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
          If the namespace doesn't exist, `lightkube.ApiError` is raised.
      field_manager: Name associated with the actor or entity that is making these changes.
      force: Force is going to "force" Apply requests. It means user will re-acquire conflicting
          fields owned by other people.
      dry_run: Apply server-side dry-run and guarantee that modifications will not
          be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
          to `kubectl` commands.
    """
    if namespace is None and isinstance(obj, r.NamespacedResource) and obj.metadata.namespace:
        namespace = obj.metadata.namespace
    if name is None and obj.metadata.name:
        name = obj.metadata.name
    return await self.patch(
        type(obj),
        name,
        obj,
        namespace=namespace,
        patch_type=PatchType.APPLY,
        field_manager=field_manager,
        force=force,
        dry_run=dry_run,
    )

close() async

Close the underline httpx client

Source code in src/lightkube/core/async_client.py
887
888
889
async def close(self):
    """Close the underline httpx client"""
    await self._client.close()

create(obj, name=None, *, namespace: Optional[str] = None, field_manager: Optional[str] = None, dry_run: bool = False) async

create(
    obj: GlobalSubResource,
    name: str,
    *,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalSubResource
create(
    obj: NamespacedSubResource,
    name: str,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedSubResource
create(
    obj: GlobalResource,
    *,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalResource
create(
    obj: NamespacedResource,
    *,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedResource

Create a new object and return its representation. Raise lightkube.ApiError if the object already exist.

Parameters:

  • obj

    object to create. This need to be an instance of a resource kind.

  • name

    Required only for sub-resources: Name of the resource to which this object belongs.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources). If the namespace doesn't exist, lightkube.ApiError is raised.

  • field_manager (Optional[str], default: None ) –

    Name associated with the actor or entity that is making these changes. This parameter overrides the corresponding Client initialization parameter.

  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/async_client.py
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
async def create(
    self,
    obj,
    name=None,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
):
    """Create a new object and return its representation.
    Raise `lightkube.ApiError` if the object already exist.

    Parameters:
      obj: object to create. This need to be an instance of a resource kind.
      name: Required only for sub-resources: Name of the resource to which this object belongs.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
          If the namespace doesn't exist, `lightkube.ApiError` is raised.
      field_manager: Name associated with the actor or entity that is making these changes.
          This parameter overrides the corresponding `Client` initialization parameter.
      dry_run: Apply server-side dry-run and guarantee that modifications will not
          be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
          to `kubectl` commands.
    """
    return await self._client.request(
        "post",
        name=name,
        namespace=namespace,
        obj=obj,
        params={
            "fieldManager": field_manager,
            "dryRun": "All" if dry_run else None,
        },
    )

delete(res, name: str, *, namespace: Optional[str] = None, grace_period: Optional[int] = None, cascade: Optional[CascadeType] = None, dry_run: bool = False) async

delete(
    res: Type[GlobalResource],
    name: str,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
) -> None
delete(
    res: Type[NamespacedResource],
    name: str,
    *,
    namespace: Optional[str] = None,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
) -> None

Delete an object. Raise lightkube.ApiError if the object doesn't exist.

Parameters:

  • res

    Resource kind.

  • name (str) –

    Name of the object to delete.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources).

  • grace_period (Optional[int], default: None ) –

    The duration in seconds before the object should be deleted. Value must be non-negative integer. The value zero indicates delete immediately. If this value is None (default), the default grace period for the specified type will be used. Defaults to a per object value if not specified. Zero means delete immediately.

  • cascade (Optional[CascadeType], default: None ) –

    Whether and how garbage collection will be performed. Either this field or OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set in the metadata.finalizers and the resource-specific default policy. Acceptable values are:

    • 'CascadeType.ORPHAN' - orphan the dependents;
    • 'CascadeType.BACKGROUND' - allow the garbage collector to delete the dependents in the background;
    • 'CascadeType.FOREGROUND' - a cascading policy that deletes all dependents in the foreground.
  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/async_client.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
async def delete(
    self,
    res,
    name: str,
    *,
    namespace: Optional[str] = None,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
):
    """Delete an object. Raise `lightkube.ApiError` if the object doesn't exist.

    Parameters:
      res: Resource kind.
      name: Name of the object to delete.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
      grace_period: The duration in seconds before the object should be deleted.
          Value must be non-negative integer. The value zero indicates delete immediately. If this value is `None`
          (default), the default grace period for the specified type will be used. Defaults to a per object value if
          not specified. Zero means delete immediately.
      cascade: Whether and how garbage collection will be performed. Either this field or
          OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set
          in the metadata.finalizers and the resource-specific default policy. Acceptable values are:

          * 'CascadeType.ORPHAN' - orphan the dependents;
          * 'CascadeType.BACKGROUND' - allow the garbage collector to delete the dependents in the background;
          * 'CascadeType.FOREGROUND' - a cascading policy that deletes all dependents in the foreground.
      dry_run: Apply server-side dry-run and guarantee that modifications will not
          be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
          to `kubectl` commands.
    """
    return await self._client.request(
        "delete",
        res=res,
        name=name,
        namespace=namespace,
        params={
            "gracePeriodSeconds": grace_period,
            "propagationPolicy": cascade.value if cascade else None,
            "dryRun": "All" if dry_run else None,
        },
    )

deletecollection(res, *, namespace: Optional[str] = None, grace_period: Optional[int] = None, cascade: Optional[CascadeType] = None, dry_run: bool = False) async

deletecollection(
    res: Type[GlobalResource],
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
) -> None
deletecollection(
    res: Type[NamespacedResource],
    *,
    namespace: Optional[str] = None,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
) -> None

Delete all objects of the given kind

Parameters:

  • res

    Resource kind.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources).

  • grace_period (Optional[int], default: None ) –

    The duration in seconds before the object should be deleted. Value must be non-negative integer. The value zero indicates delete immediately. If this value is None (default), the default grace period for the specified type will be used. Defaults to a per object value if not specified. Zero means delete immediately.

  • cascade (Optional[CascadeType], default: None ) –

    Whether and how garbage collection will be performed. Either this field or OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set in the metadata.finalizers and the resource-specific default policy. Acceptable values are:

    • 'CascadeType.ORPHAN' - orphan the dependents;
    • 'CascadeType.BACKGROUND' - allow the garbage collector to delete the dependents in the background;
    • 'CascadeType.FOREGROUND' - a cascading policy that deletes all dependents in the foreground.
  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/async_client.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
async def deletecollection(
    self,
    res,
    *,
    namespace: Optional[str] = None,
    grace_period: Optional[int] = None,
    cascade: Optional[CascadeType] = None,
    dry_run: bool = False,
):
    """Delete all objects of the given kind

    Parameters:
      res: Resource kind.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
      grace_period: The duration in seconds before the object should be deleted.
          Value must be non-negative integer. The value zero indicates delete immediately. If this value is `None`
          (default), the default grace period for the specified type will be used. Defaults to a per object value if
          not specified. Zero means delete immediately.
      cascade: Whether and how garbage collection will be performed. Either this field or
          OrphanDependents may be set, but not both. The default policy is decided by the existing finalizer set
          in the metadata.finalizers and the resource-specific default policy. Acceptable values are:

          * 'CascadeType.ORPHAN' - orphan the dependents;
          * 'CascadeType.BACKGROUND' - allow the garbage collector to delete the dependents in the background;
          * 'CascadeType.FOREGROUND' - a cascading policy that deletes all dependents in the foreground.
      dry_run: Apply server-side dry-run and guarantee that modifications will not
          be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
          to `kubectl` commands.
    """
    return await self._client.request(
        "deletecollection",
        res=res,
        namespace=namespace,
        params={
            "gracePeriodSeconds": grace_period,
            "propagationPolicy": cascade.value if cascade else None,
            "dryRun": "All" if dry_run else None,
        },
    )

exec(name: str, *, namespace: Optional[str] = None, container: Optional[str] = None, command: Union[str, Iterable[str]], stdin: Union[str, bytes, BinaryIO, None] = None, stdout: Union[BinaryIO, bool] = False, stderr: Union[BinaryIO, bool] = False, decode: Optional[str] = 'utf-8', raise_on_error: bool = False, timeout: Optional[float] = None) -> ExecResponse async

Execute a command in a Pod and return stdout/stderr.

Parameters:

  • name (str) –

    Name of the Pod.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the Pod.

  • container (Optional[str], default: None ) –

    Name of the container in the pod to execute the command in.

  • command (Union[str, Iterable[str]]) –

    Command to execute in the Pod.

  • stdin (Union[str, bytes, BinaryIO, None], default: None ) –

    Data to send to stdin. This can be either a string, bytes or a binary stream. Strings will be encoded as utf-8 before sending.

  • stdout (Union[BinaryIO, bool], default: False ) –

    If True, the command's stdout will be captured and returned in the response. If a binary stream is passed, the command's stdout will be written to it instead.

  • stderr (Union[BinaryIO, bool], default: False ) –

    If True, the command's stderr will be captured and returned in the response. If a binary stream is passed, the command's stderr will be written to it instead.

  • decode (Optional[str], default: 'utf-8' ) –

    Decode captured stdout/stderr in ExecResponse using this encoding as strings. If you expect a binary output, set stdout and/or stderr to a binary stream or set this parameter to None.

  • raise_on_error (bool, default: False ) –

    If True, an exception will be raised if the command exits with a non-zero status code. Note that other exceptions may still be raised for other types of errors, such as connection issues, missing pod or timeouts.

  • timeout (Optional[float], default: None ) –

    If set, the maximum amount of time in seconds to wait for the command to complete before raising a timeout exception. By default, there is no timeout and the method will wait until the command completes or an error occurs.

Source code in src/lightkube/core/async_client.py
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
async def exec(
    self,
    name: str,
    *,
    namespace: Optional[str] = None,
    container: Optional[str] = None,
    command: Union[str, Iterable[str]],
    stdin: Union[str, bytes, BinaryIO, None] = None,
    stdout: Union[BinaryIO, bool] = False,
    stderr: Union[BinaryIO, bool] = False,
    decode: Optional[str] = "utf-8",
    raise_on_error: bool = False,
    timeout: Optional[float] = None,
) -> ExecResponse:
    """Execute a command in a Pod and return stdout/stderr.

    Parameters:
        name: Name of the Pod.
        namespace: Name of the namespace containing the Pod.
        container: Name of the container in the pod to execute the command in.
        command: Command to execute in the Pod.
        stdin: Data to send to stdin. This can be either a string, bytes or a binary stream.
          Strings will be encoded as utf-8 before sending.
        stdout: If `True`, the command's stdout will be captured and returned in the response.
          If a binary stream is passed, the command's stdout will be written to it instead.
        stderr: If `True`, the command's stderr will be captured and returned in the response.
          If a binary stream is passed, the command's stderr will be written to it instead.
        decode: Decode captured stdout/stderr in `ExecResponse` using this encoding as strings.
            If you expect a binary output, set `stdout` and/or `stderr` to a binary stream or set this parameter to `None`.
        raise_on_error: If `True`, an exception will be raised if the command exits with a non-zero status code.
          Note that other exceptions may still be raised for other types of errors, such as connection issues, missing
          pod or timeouts.
        timeout: If set, the maximum amount of time in seconds to wait for the command to complete before raising a
          timeout exception. By default, there is no timeout and the method will wait until the command completes
          or an error occurs.
    """
    commands = [command] if isinstance(command, str) else list(command)
    params = {"command": commands, "stdout": stdout, "stderr": stderr, "stdin": stdin, "container": container}
    return await self._client.ws_request(
        "exec",
        name=name,
        namespace=namespace,
        params=params,
        raise_on_error=raise_on_error,
        decode=decode,
        timeout=timeout,
    )

get(res, name, *, namespace: Optional[str] = None) async

get(res: Type[GlobalResource], name: str) -> GlobalResource
get(
    res: Type[AllNamespacedResource],
    name: str,
    *,
    namespace: Optional[str] = None,
) -> AllNamespacedResource

Return an object. Raise lightkube.ApiError if the object doesn't exist.

Parameters:

  • res

    Resource kind.

  • name

    Name of the object to fetch.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources).

Source code in src/lightkube/core/async_client.py
218
219
220
221
222
223
224
225
226
async def get(self, res, name, *, namespace: Optional[str] = None):
    """Return an object. Raise `lightkube.ApiError` if the object doesn't exist.

    Parameters:
      res: Resource kind.
      name: Name of the object to fetch.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
    """
    return await self._client.request("get", res=res, name=name, namespace=namespace)

list(res, *, namespace=None, chunk_size=None, labels=None, fields=None)

list(
    res: Type[GlobalResource],
    *,
    chunk_size: Optional[int] = None,
    labels: Optional[LabelSelector] = None,
    fields: Optional[FieldSelector] = None,
) -> ListAsyncIterable[GlobalResource]
list(
    res: Type[NamespacedResource],
    *,
    namespace: Optional[str] = None,
    chunk_size: Optional[int] = None,
    labels: Optional[LabelSelector] = None,
    fields: Optional[FieldSelector] = None,
) -> ListAsyncIterable[NamespacedResource]

Return an iterator of objects matching the selection criteria.

Parameters:

  • res

    resource kind.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources).

  • chunk_size

    Limit the amount of objects returned for each rest API call. This method will automatically execute all subsequent calls until no more data is available.

  • labels

    Limit the returned objects by labels. More details.

  • fields

    Limit the returned objects by fields. More details.

Source code in src/lightkube/core/async_client.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None):
    """Return an iterator of objects matching the selection criteria.

    Parameters:
      res: resource kind.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
      chunk_size: Limit the amount of objects returned for each rest API call.
        This method will automatically execute all subsequent calls until no more data is available.
      labels: Limit the returned objects by labels. More [details](../selectors.md).
      fields: Limit the returned objects by fields. More [details](../selectors.md).
    """

    br = self._client.prepare_request(
        "list",
        res=res,
        namespace=namespace,
        params={
            "limit": chunk_size,
            "labelSelector": build_selector(labels) if labels else None,
            "fieldSelector": (build_selector(fields, for_fields=True) if fields else None),
        },
    )
    return self._client.list(br)

log(name, *, namespace: Optional[str] = None, container: Optional[str] = None, follow: bool = False, since: Optional[int] = None, tail_lines: Optional[int] = None, timestamps: bool = False, newlines: bool = True)

log(
    name: str,
    *,
    namespace: Optional[str] = None,
    container: Optional[str] = None,
    follow: bool = False,
    since: Optional[int] = None,
    tail_lines: Optional[int] = None,
    timestamps: bool = False,
    newlines: bool = True,
) -> AsyncIterable[str]

Return log lines for the given Pod. Raise lightkube.ApiError if the Pod doesn't exist or has not yet started.

Parameters:

  • name

    Name of the Pod.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the Pod.

  • container (Optional[str], default: None ) –

    The container for which to stream logs. Defaults to only container if there is one container in the pod.

  • follow (bool, default: False ) –

    If True, follow the log stream of the pod.

  • since (Optional[int], default: None ) –

    If set, a relative time in seconds before the current time from which to fetch logs.

  • tail_lines (Optional[int], default: None ) –

    If set, the number of lines from the end of the logs to fetch.

  • timestamps (bool, default: False ) –

    If True, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line of log output.

  • newlines (bool, default: True ) –

    If True, each line will end with a newline, otherwise the newlines will be stripped.

Source code in src/lightkube/core/async_client.py
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
def log(
    self,
    name,
    *,
    namespace: Optional[str] = None,
    container: Optional[str] = None,
    follow: bool = False,
    since: Optional[int] = None,
    tail_lines: Optional[int] = None,
    timestamps: bool = False,
    newlines: bool = True,
):
    """Return log lines for the given Pod. Raise `lightkube.ApiError` if the Pod doesn't exist or has not yet started.

    Parameters:
      name: Name of the Pod.
      namespace: Name of the namespace containing the Pod.
      container: The container for which to stream logs. Defaults to only container if there is one container in the pod.
      follow: If `True`, follow the log stream of the pod.
      since: If set, a relative time in seconds before the current time from which to fetch logs.
      tail_lines: If set, the number of lines from the end of the logs to fetch.
      timestamps: If `True`, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line of log output.
      newlines: If `True`, each line will end with a newline, otherwise the newlines will be stripped.
    """
    br = self._client.prepare_request(
        "get",
        core_v1.PodLog,
        name=name,
        namespace=namespace,
        params={
            "timestamps": timestamps,
            "tailLines": tail_lines,
            "container": container,
            "sinceSeconds": since,
            "follow": follow,
        },
    )
    req = self._client.build_adapter_request(br)

    async def stream_log() -> AsyncIterator[str]:
        resp = await self._client.send(req, stream=follow)
        if resp.is_error and follow:
            # The body must be read into memory before accessing it when building the exception.
            resp.read()
        self._client.raise_for_status(resp)
        async for line in resp.aiter_lines():
            yield line + "\n" if newlines else line

    return stream_log()

patch(res, name, obj, *, namespace: Optional[str] = None, patch_type=PatchType.STRATEGIC, field_manager: Optional[str] = None, force=False, dry_run: bool = False) async

patch(
    res: Type[GlobalSubResource],
    name: str,
    obj: Union[GlobalSubResource, Dict, List],
    *,
    patch_type: PatchType = PatchType.STRATEGIC,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> GlobalSubResource
patch(
    res: Type[GlobalResource],
    name: str,
    obj: Union[GlobalResource, Dict, List],
    *,
    patch_type: PatchType = PatchType.STRATEGIC,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> GlobalResource
patch(
    res: Type[AllNamespacedResource],
    name: str,
    obj: Union[AllNamespacedResource, Dict, List],
    *,
    namespace: Optional[str] = None,
    patch_type: PatchType = PatchType.STRATEGIC,
    field_manager: Optional[str] = None,
    force: bool = False,
    dry_run: bool = False,
) -> AllNamespacedResource

Patch an object. Raise lightkube.ApiError if the object doesn't exist.

Parameters:

  • res

    Resource kind.

  • name

    Name of the object to patch.

  • obj

    patch object.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources).

  • patch_type

    Type of patch to execute. Default PatchType.STRATEGIC.

  • field_manager (Optional[str], default: None ) –

    Name associated with the actor or entity that is making these changes. This parameter overrides the corresponding Client initialization parameter. NOTE: This parameter is mandatory (here or at Client creation time) for PatchType.APPLY.

  • force

    Force is going to "force" Apply requests. It means user will re-acquire conflicting fields owned by other people. This parameter is ignored for non-apply patch types

  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/async_client.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
async def patch(
    self,
    res,
    name,
    obj,
    *,
    namespace: Optional[str] = None,
    patch_type=PatchType.STRATEGIC,
    field_manager: Optional[str] = None,
    force=False,
    dry_run: bool = False,
):
    """Patch an object. Raise `lightkube.ApiError` if the object doesn't exist.

    Parameters:
      res: Resource kind.
      name: Name of the object to patch.
      obj: patch object.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
      patch_type: Type of patch to execute. Default `PatchType.STRATEGIC`.
      field_manager: Name associated with the actor or entity that is making these changes.
          This parameter overrides the corresponding `Client` initialization parameter.
          **NOTE**: This parameter is mandatory (here or at `Client` creation time) for `PatchType.APPLY`.
      force: Force is going to "force" Apply requests. It means user will re-acquire conflicting
        fields owned by other people. This parameter is ignored for non-apply patch types
      dry_run: Apply server-side dry-run and guarantee that modifications will not
          be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
          to `kubectl` commands.
    """
    force_param = "true" if force and patch_type == PatchType.APPLY else None
    return await self._client.request(
        "patch",
        res=res,
        name=name,
        namespace=namespace,
        obj=obj,
        headers={"Content-Type": patch_type.value},
        params={
            "force": force_param,
            "fieldManager": field_manager,
            "dryRun": "All" if dry_run else None,
        },
    )

replace(obj, name=None, *, namespace: Optional[str] = None, field_manager: Optional[str] = None, dry_run: bool = False) async

replace(
    obj: GlobalSubResource,
    name: str,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalSubResource
replace(
    obj: NamespacedSubResource,
    name: str,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedSubResource
replace(
    obj: GlobalResource,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalResource
replace(
    obj: NamespacedResource,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedResource

Replace an existing resource. Raise lightkube.ApiError if the object doesn't exist.

Parameters:

  • obj

    new object. This need to be an instance of a resource kind.

  • name

    Required only for sub-resources: Name of the resource to which this object belongs.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources).

  • field_manager (Optional[str], default: None ) –

    Name associated with the actor or entity that is making these changes. This parameter overrides the corresponding Client initialization parameter.

  • dry_run (bool, default: False ) –

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/async_client.py
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
async def replace(
    self,
    obj,
    name=None,
    *,
    namespace: Optional[str] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
):
    """Replace an existing resource. Raise `lightkube.ApiError` if the object doesn't exist.

    Parameters:
      obj: new object. This need to be an instance of a resource kind.
      name: Required only for sub-resources: Name of the resource to which this object belongs.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
      field_manager: Name associated with the actor or entity that is making these changes.
          This parameter overrides the corresponding `Client` initialization parameter.
      dry_run: Apply server-side dry-run and guarantee that modifications will not
          be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
          to `kubectl` commands.
    """
    return await self._client.request(
        "put",
        name=name,
        namespace=namespace,
        obj=obj,
        params={
            "fieldManager": field_manager,
            "dryRun": "All" if dry_run else None,
        },
    )

set(res, name, *, namespace=None, labels=None, annotations=None, field_manager=None, dry_run=False) async

set(
    res: Type[GlobalResource],
    name: str,
    *,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> GlobalResource
set(
    res: Type[NamespacedResource],
    name: str,
    *,
    namespace: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
    field_manager: Optional[str] = None,
    dry_run: bool = False,
) -> NamespacedResource

Set labels and annotations to an object. Raise lightkube.ApiError if the object doesn't exist. This is a convenience method that use patch internally.

Parameters:

  • res

    Resource kind.

  • name

    Name of the object to patch.

  • namespace

    Name of the namespace containing the object (Only for namespaced resources).

  • labels

    Labels to set. Labels are defined as key/value pairs. To remove a label, set its value to None.

  • annotations

    Annotations to set. Annotations are defined as key/value pairs. To remove an annotation, set its value to None.

  • field_manager

    Name associated with the actor or entity that is making these changes. This parameter overrides the corresponding Client initialization parameter.

  • dry_run

    Apply server-side dry-run and guarantee that modifications will not be persisted in storage. Setting this field to True is equivalent of passing --dry-run=server to kubectl commands.

Source code in src/lightkube/core/async_client.py
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
async def set(
    self,
    res,
    name,
    *,
    namespace=None,
    labels=None,
    annotations=None,
    field_manager=None,
    dry_run=False,
):
    """Set labels and annotations to an object. Raise lightkube.ApiError if the object doesn't exist.
    This is a convenience method that use `patch` internally.

    Parameters:
        res: Resource kind.
        name: Name of the object to patch.
        namespace: Name of the namespace containing the object (Only for namespaced resources).
        labels: Labels to set. Labels are defined as key/value pairs. To remove a label, set its value to `None`.
        annotations: Annotations to set. Annotations are defined as key/value pairs. To remove an annotation,
            set its value to `None`.
        field_manager: Name associated with the actor or entity that is making these changes.
            This parameter overrides the corresponding `Client` initialization parameter.
        dry_run: Apply server-side dry-run and guarantee that modifications will not
            be persisted in storage. Setting this field to `True` is equivalent of passing `--dry-run=server`
            to `kubectl` commands.
    """
    metadata = [("labels", labels), ("annotations", annotations)]
    payload = {"metadata": {k: v for k, v in metadata if v is not None}}
    return await self.patch(
        res, name, payload, namespace=namespace, field_manager=field_manager, dry_run=dry_run, patch_type=PatchType.MERGE
    )

wait(res, name: str, *, for_conditions: Iterable[str], namespace: Optional[str] = None, raise_for_conditions: Iterable[str] = ()) async

wait(
    res: Type[GlobalResource],
    name: str,
    *,
    for_conditions: Iterable[str],
    raise_for_conditions: Iterable[str] = (),
) -> GlobalResource
wait(
    res: Type[AllNamespacedResource],
    name: str,
    *,
    for_conditions: Iterable[str],
    namespace: Optional[str] = None,
    raise_for_conditions: Iterable[str] = (),
) -> AllNamespacedResource

Wait for specified conditions. Raise lightkube.ObjectDeleted if the object get deleted during waiting.

Parameters:

  • res

    Resource kind.

  • name (str) –

    Name of resource to wait for.

  • for_conditions (Iterable[str]) –

    Condition types that are considered a success and will end the wait.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources).

  • raise_for_conditions (Iterable[str], default: () ) –

    Condition types that are considered failures and will exit the wait early with lightkube.ConditionError.

Source code in src/lightkube/core/async_client.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
async def wait(
    self,
    res,
    name: str,
    *,
    for_conditions: Iterable[str],
    namespace: Optional[str] = None,
    raise_for_conditions: Iterable[str] = (),
):
    """Wait for specified conditions.
    Raise `lightkube.ObjectDeleted` if the object get deleted during waiting.

    Parameters:
      res: Resource kind.
      name: Name of resource to wait for.
      for_conditions: Condition types that are considered a success and will end the wait.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
      raise_for_conditions: Condition types that are considered failures and will exit the wait early
          with `lightkube.ConditionError`.
    """

    kind = r.api_info(res).plural
    full_name = f"{kind}/{name}"

    for_conditions = list(for_conditions)
    raise_for_conditions = list(raise_for_conditions)

    watch = self.watch(res, namespace=namespace, fields={"metadata.name": name})
    try:
        async for op, obj in watch:
            if obj.status is None:
                continue

            if op == "DELETED":
                raise ObjectDeleted(full_name)

            try:
                status = obj.status.to_dict()
            except AttributeError:
                status = obj.status

            conditions = [c for c in status.get("conditions", []) if c["status"] == "True"]
            if any(c["type"] in for_conditions for c in conditions):
                return obj

            failures = [c for c in conditions if c["type"] in raise_for_conditions]

            if failures:
                raise ConditionError(full_name, [f.get("message", f["type"]) for f in failures])
    finally:
        # we ensure the async generator is closed before returning
        await watch.aclose()

watch(res, *, namespace: Optional[str] = None, labels: Optional[LabelSelector] = None, fields: Optional[FieldSelector] = None, server_timeout: Optional[int] = None, resource_version: Optional[str] = None, on_error: OnErrorHandler = on_error_raise)

watch(
    res: Type[GlobalResource],
    *,
    labels: Optional[LabelSelector] = None,
    fields: Optional[FieldSelector] = None,
    server_timeout: Optional[int] = None,
    resource_version: Optional[str] = None,
    on_error: OnErrorHandler = on_error_raise,
) -> AsyncIterable[Tuple[str, GlobalResource]]
watch(
    res: Type[NamespacedResource],
    *,
    namespace: Optional[str] = None,
    labels: Optional[LabelSelector] = None,
    fields: Optional[FieldSelector] = None,
    server_timeout: Optional[int] = None,
    resource_version: Optional[str] = None,
    on_error: OnErrorHandler = on_error_raise,
) -> AsyncIterable[Tuple[str, NamespacedResource]]

Watch changes to objects

Parameters:

  • res

    resource kind.

  • namespace (Optional[str], default: None ) –

    Name of the namespace containing the object (Only for namespaced resources).

  • labels (Optional[LabelSelector], default: None ) –

    Limit the returned objects by labels. More details.

  • fields (Optional[FieldSelector], default: None ) –

    Limit the returned objects by fields. More details.

  • server_timeout (Optional[int], default: None ) –

    Server side timeout in seconds to close a watch request. This method will automatically create a new request whenever the backend close the connection without errors.

  • resource_version (Optional[str], default: None ) –

    When set, only modification events following this version will be returned.

  • on_error (OnErrorHandler, default: on_error_raise ) –

    Function that control what to do in case of errors. The default implementation will raise any error.

Source code in src/lightkube/core/async_client.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def watch(
    self,
    res,
    *,
    namespace: Optional[str] = None,
    labels: Optional[LabelSelector] = None,
    fields: Optional[FieldSelector] = None,
    server_timeout: Optional[int] = None,
    resource_version: Optional[str] = None,
    on_error: OnErrorHandler = on_error_raise,
):
    """Watch changes to objects

    Parameters:
      res: resource kind.
      namespace: Name of the namespace containing the object (Only for namespaced resources).
      labels: Limit the returned objects by labels. More [details](../selectors.md).
      fields: Limit the returned objects by fields. More [details](../selectors.md).
      server_timeout: Server side timeout in seconds to close a watch request.
          This method will automatically create a new request whenever the backend close the connection
          without errors.
      resource_version: When set, only modification events following this version will be returned.
      on_error: Function that control what to do in case of errors.
          The default implementation will raise any error.
    """
    br = self._client.prepare_request(
        "list",
        res=res,
        namespace=namespace,
        watch=True,
        params={
            "timeoutSeconds": server_timeout,
            "resourceVersion": resource_version,
            "labelSelector": build_selector(labels) if labels else None,
            "fieldSelector": (build_selector(fields, for_fields=True) if fields else None),
        },
    )
    return self._client.watch(br, on_error=on_error)