Skip to content

Python SDK

The NCAE Python SDK provides a convenient and efficient way to interact with the NCAE API using Python. It offers both synchronous and asynchronous clients, allowing you to choose the best fit for your use case. The SDK also includes a FastAPI module for building custom NCAE modules with ease.

See the official project on PyPI for more details: https://pypi.org/project/ncae-sdk/

Basic Usage

This snippet shows various features of the NCAE Python SDK using its asynchronous client. To use the synchronous client instead, import Client rather than AsyncClient and remove all async/await keywords.

import asyncio
from datetime import datetime, timedelta

from ncae_sdk import AsyncClient, SessionAuth
from ncae_sdk.types import ExtApiAuthType, InfluxDataPoint, InfluxRetention, InfluxTagFilter


async def main() -> None:
    # Walks through the SDK's resource APIs against a test instance, in order:
    # devices, device groups, users, external API auths, Influx repositories,
    # tag relations and service instance logs. Every step prints what it got.
    #
    # NOTE(review): the password below is an empty string while demo() and the
    # other secrets in this function use "..." - presumably a placeholder.
    # NOTE(review): verify=False presumably disables TLS certificate
    # verification; acceptable for a lab host only - confirm.
    async with AsyncClient(
        base_url="https://ncae.test.lab/api/",
        auth=SessionAuth(username="admin", password=""),
        verify=False,
    ) as client:
        print(">> List multiple")
        devices = await client.devices.list()
        async for device in devices:
            print(device)

        print(">> Find exactly one")
        cool_device = await client.devices.find(host="10.241.120.201")
        print(cool_device)

        print(">> Get by id")
        some_device = await client.devices.get(45)
        print(some_device)

        # Delete every device whose name starts with "sdk-device" before
        # creating a new one below (presumably leftovers from earlier runs).
        async for device in await client.devices.list():
            if device.name.startswith("sdk-device"):
                await client.devices.delete(device.id)

        print(">> Create with minimum props")
        new_device = await client.devices.create(
            name="sdk-device",
            host="sdk-device.test.lab",
            device_model_id=1,
            credential_id=1,
        )
        print(new_device)

        print(">> Update device")
        updated_device = await client.devices.update(
            new_device.id,
            name="sdk-device-new",
            host="new.test.lab",
            extra_vars={"shenanigans": True},
        )
        print(updated_device)

        # Delete groups named like the one created below. The prefix has to be
        # "sdk-group" to match the name passed to device_groups.create().
        groups = await client.device_groups.list()
        async for group in groups:
            if group.name.startswith("sdk-group"):
                await client.device_groups.delete(group.id)

        print(">> New group")
        new_group = await client.device_groups.create(name="sdk-group", device_ids=[new_device.id])
        print(new_group)

        print(">> Update group slug")
        updated_group = await client.device_groups.update(new_group.id, slug="cool_slug")
        print(updated_group)

        print(">> Find our group by slug")
        found_group = await client.device_groups.find(slug="cool_slug")
        print(found_group)

        print(">> Do some user management")
        users = await client.users.list()
        async for user in users:
            print(user)

        # Create the user only when the lookup returned a falsy result.
        sdk_user = await client.users.find(username="sdk-user")
        if not sdk_user:
            sdk_user = await client.users.create(username="sdk-user")

        await client.users.update(sdk_user.id, is_active=True, is_staff=True, is_superuser=True)
        await client.users.set_password(sdk_user.id, "...")

        print(">> And do something with enums")
        # Print every auth entry and delete those named like the one created below.
        auths = await client.ext_api_auths.list()
        async for auth in auths:
            print(auth)
            if auth.name.startswith("sdk-auth"):
                await client.ext_api_auths.delete(auth.id)

        new_auth = await client.ext_api_auths.create(
            name="sdk-auth",
            type=ExtApiAuthType.BASIC,
            auth_username="top",
            auth_value="...",
        )
        print(new_auth)

        print(">> Listing Influx repositories")
        # Remember the repository named "sdk-repo" if the listing contains one.
        sdk_repo = None
        items = await client.influx_repositories.list()
        async for item in items:
            print(item)
            if item.name == "sdk-repo":
                sdk_repo = item

        print(">> Creating Influx repository if not found")
        if not sdk_repo:
            sdk_repo = await client.influx_repositories.create(
                uid="sdk-repo",
                name="sdk-repo",
                system_id=1,
                module_id=1,
                retention=InfluxRetention.KEEP_1_HOUR,
            )
        print(sdk_repo)

        print(">> Ingesting metrics")
        points = [
            InfluxDataPoint(measurement="sdk", tag_data={"test": "hello"}, field_data={"value": 1}),
            InfluxDataPoint(measurement="sdk", tag_data={"test": "world"}, field_data={"value": 2}),
        ]
        await client.influx_repositories.ingest_metrics(sdk_repo.id, points)

        print(">> Querying metrics")
        # Query window: from one minute ago until one minute from now.
        # NOTE(review): datetime.now() is naive local time - confirm whether
        # the API expects timezone-aware values.
        date_from = datetime.now() - timedelta(minutes=1)
        date_to = date_from + timedelta(minutes=2)
        query = await client.influx_repositories.query_metrics(
            sdk_repo.id,
            date_from=date_from,
            date_to=date_to,
            tag_filters=[InfluxTagFilter(key="test", value="hello")],
        )
        print(query)

        print(">> Do some tagging")
        tag_relations = await client.tag_relations.list(object_type="serviceinstance", object_ids=[6483])
        async for tag_relation in tag_relations:
            print(tag_relation)

        # Set two tags on the object, then set a single tag on the same object.
        response = await client.tag_relations.set(object_type="serviceinstance", object_id=6483, tag_ids=[9, 5])
        print(response)

        response = await client.tag_relations.set(object_type="serviceinstance", object_id=6483, tag_ids=[9])
        print(response)

        print(">> Do some service instance logging with header hacks")
        # NOTE(review): this writes to a private attribute (`_client`) of the
        # session to inject a header; it relies on SDK internals and
        # presumably affects the requests made afterwards - confirm.
        client.session._client.headers["X-Transaction-Id"] = "fea6737a-f07d-4719-a6c5-d503c0bf5a98"
        await client.service_instance_logs.create(
            hostname="server-abc.example.com",
            title="Hello World",
            text="This is a test message from the Python SDK",
            service_instance_id=6793,
        )


async def demo() -> None:
    # Lists all upload files and prints each one, then downloads upload file
    # 22 and prints the length of every chunk received.
    session_auth = SessionAuth(username="admin", password="...")
    async with AsyncClient(
        base_url="https://ncae.test.lab/api/",
        auth=session_auth,
        verify=False,
    ) as client:
        async for upload in await client.upload_files.list():
            print(upload)

        # download() is used as an async context manager that yields an
        # async-iterable of chunks.
        async with client.upload_files.download(22) as stream:
            async for part in stream:
                print("Chunk", len(part))


# Script entry point: only demo() is executed here; main() is defined above
# but is not invoked by this snippet.
asyncio.run(demo())

FastAPI Module Example (sync)

This snippet showcases the FastAPI integration of the NCAE Python SDK for building custom NCAE modules. It demonstrates the available decorators for easily dealing with ExtApiPhase and (OneTime)Report routes, as well as the usage of the executor context and device workers for parallel processing on devices.

import random
import time
import uuid
from typing import Optional

from pydantic import BaseModel

from ncae_sdk.fastapi import create_module_app
from ncae_sdk.fastapi.extapi import (
    ExtApiDevice,
    ExtApiPhaseContext,
    ExtApiReportContext,
    ExtApiReportResponse,
    extapi_phase_route,
    extapi_report_route,
)

# Module-level application object; it is passed as the first argument to the
# extapi_phase_route / extapi_report_route decorators on the handlers below.
app = create_module_app("Example Module")


class Invite(BaseModel):
    # One invite entry: two string fields, neither of which has a default.
    first_name: str
    last_name: str


class Cmdb(BaseModel):
    # Used as the first type parameter of ExtApiPhaseContext in the phase
    # handlers, which read it via ctx.cmdb_data; holds a list of Invite entries.
    invites: list[Invite]


class Callback(BaseModel):
    # Used as the second type parameter of ExtApiPhaseContext in phase_one,
    # which assigns a generated UUID string to psk_key on ctx.callback_data.
    psk_key: Optional[str] = None


@extapi_phase_route(app, "/phase1")
def phase_one(ctx: ExtApiPhaseContext[Cmdb, Callback]) -> None:
    # Phase handler for "/phase1": logs a greeting and the tenant slug, logs
    # every device name from the client, stores a fresh UUID in the callback
    # data, and prints each invite from the CMDB data.
    ctx.log_info("Hello from an NCAE module!")
    tenant_slug = ctx.body.ncae_tenant_slug
    ctx.log_debug("This is running in tenant %s", tenant_slug)

    # The synchronous client is iterated directly, without await.
    for found in ctx.client.devices.list():
        ctx.internal_logger.debug("> Found device: %s", found.name)

    # Show the callback data before and after writing the new key.
    print("Old Callback Data", ctx.callback_data)
    fresh_key = str(uuid.uuid4())
    ctx.callback_data.psk_key = fresh_key
    ctx.update_callback_data()
    print("New Callback Data", ctx.callback_data)

    invites = ctx.cmdb_data.invites
    for invite in invites:
        print(f"Invite: {invite.first_name} {invite.last_name}")


@extapi_phase_route(app, "/phase2")
def phase_two(ctx: ExtApiPhaseContext[Cmdb]) -> None:
    # Phase handler for "/phase2": runs my_device_worker through the device
    # runner, logs each result or failure, logs the runner's progress, and
    # finally raises an exception on purpose.
    ctx.internal_logger.info("Hello World")

    runner = ctx.get_device_runner()
    # The three extra strings are forwarded to the worker as arg1..arg3.
    for outcome in runner.run_sync(my_device_worker, "Hello", "From", "World"):
        if outcome.ok:
            ctx.log_info("Got another device result: %s", outcome.value)
        else:
            ctx.log_error("Device worker failed: %s", outcome.exception)

    ctx.log_info("Runner Stats: %s", runner.progress)

    ctx.log_warning("Phase 2 is running!")
    # Always fails: the handler ends by raising a generic exception.
    raise Exception("Oops, something went wrong!")


def my_device_worker(ctx: ExtApiPhaseContext[Cmdb], device: ExtApiDevice, arg1: str, arg2: str, arg3: str) -> str:
    # Per-device worker: a place for complex, time-consuming processing on one device.
    # It is run automatically for each device in the service instance,
    # always inside a thread pool so that devices can be processed in parallel.
    # Whatever it returns is collected and handed back to the executor context.

    # Stand-in for real work: block for a random 0.5 to 5 seconds.
    delay = random.uniform(0.5, 5.0)
    time.sleep(delay)

    log = ctx.internal_logger
    log.debug("Doing some heavy lifting on device: %s", device.name)
    log.debug("I also support additional arguments: %s, %s, %s", arg1, arg2, arg3)

    return device.host


class Query(BaseModel):
    # Type parameter of ExtApiReportContext in report(), which reads these
    # fields via ctx.query; both are optional and default to None.
    my_string: Optional[str] = None
    my_number: Optional[int] = None


@extapi_report_route(app, "/report")
def report(ctx: ExtApiReportContext[Query]) -> ExtApiReportResponse:
    # Report handler for "/report": prints the devices and both query fields,
    # then returns a fixed five-row table with the columns pid, name, hobby.
    print("Devices", ctx.devices)
    print("My String", ctx.query.my_string)
    print("My Number", ctx.query.my_number)

    columns = ["pid", "name", "hobby"]
    people = [
        (1, "Alice", "chess"),
        (2, "Bob", "hiking"),
        (3, "Charlie", "gaming"),
        (4, "Diana", "reading"),
        (5, "Eve", "cooking"),
    ]
    # One dict per row, keyed by column name in column order.
    rows = [dict(zip(columns, person)) for person in people]
    return ExtApiReportResponse(field_names=columns, field_data=rows)


@extapi_report_route(app, "/report-light")
def report_light(ctx: ExtApiReportContext) -> ExtApiReportResponse:
    # Report handler for "/report-light": uses the context without a query
    # model, prints the tenant id, and returns a fixed three-row table with
    # the columns pid, name, food.
    print("Tenant ID", ctx.body.ncae_tenant_id)

    columns = ["pid", "name", "food"]
    meals = [
        (1, "Alice", "pizza"),
        (2, "Bob", "sushi"),
        (3, "Charlie", "pasta"),
    ]
    # One dict per row, keyed by column name in column order.
    rows = [dict(zip(columns, meal)) for meal in meals]
    return ExtApiReportResponse(field_names=columns, field_data=rows)
Last updated on