Reference

Entities

client.entities.publish_entity(...) -> AsyncHttpResponse[Entity]

📝 Description

Publish an entity for ingest into the Entities API. Entities created with this method are "owned" by the originator: other sources, such as the UI, may not edit or delete these entities. The server validates entities at API call time and returns an error if the entity is invalid.

An entity ID must be provided when calling this endpoint. If the entity referenced by the entity ID does not exist then it will be created. Otherwise the entity will be updated. An entity will only be updated if its provenance.sourceUpdateTime is greater than the provenance.sourceUpdateTime of the existing entity.
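The create-or-update rule above can be sketched as a small pure function. This only illustrates the timestamp comparison and is not the server's implementation; the `StoredEntity` type and the `should_apply` name are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class StoredEntity:
    """Hypothetical stand-in for an entity and its provenance.sourceUpdateTime."""
    entity_id: str
    source_update_time: datetime


def should_apply(existing: Optional[StoredEntity], incoming: StoredEntity) -> bool:
    """Return True if publishing `incoming` would create or update the entity."""
    if existing is None:
        # Unknown entity ID: the entity is created.
        return True
    # Known entity ID: update only if the incoming timestamp is strictly newer.
    return incoming.source_update_time > existing.source_update_time


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t1 = datetime(2024, 1, 2, tzinfo=timezone.utc)
print(should_apply(None, StoredEntity("a", t0)))                   # create
print(should_apply(StoredEntity("a", t0), StoredEntity("a", t1)))  # newer: update
print(should_apply(StoredEntity("a", t1), StoredEntity("a", t0)))  # older: skipped
```

In practice this means a publisher should advance provenance.sourceUpdateTime on every update it wants applied.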

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.entities.publish_entity()

⚙️ Parameters

entity_id: typing.Optional[str]

A Globally Unique Identifier (GUID) for your entity. This is a required field.

description: typing.Optional[str]

A human-readable entity description that's helpful for debugging purposes and human traceability. If this field is empty, the Entity Manager API generates one for you.

is_live: typing.Optional[bool]

Indicates the entity is active and should have a lifecycle state of CREATE or UPDATE. Set this field to true when publishing an entity.

created_time: typing.Optional[dt.datetime]

The time when the entity was first known to the entity producer. If this field is empty, the Entity Manager API uses the current timestamp of when the entity is first received. For example, when a drone is first powered on, it might report its startup time as the created time. The timestamp doesn't change for the lifetime of an entity.

expiry_time: typing.Optional[dt.datetime]

Future time that expires an entity and updates the is_live flag. For entities that are constantly updating, the expiry time also updates. In some cases, this may differ from is_live. Example: Entities with tasks exported to an external system must remain active even after they expire. This field is required when publishing a prepopulated entity. The expiry time must be in the future, but less than 30 days from the current time.
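The expiry window stated above (in the future, but less than 30 days away) can be checked on the client before publishing. This is a minimal sketch; the `is_valid_expiry` helper is hypothetical and the server remains the authority on validation.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_EXPIRY_WINDOW = timedelta(days=30)  # limit stated in the description above


def is_valid_expiry(expiry_time: datetime, now: Optional[datetime] = None) -> bool:
    """Check that expiry_time is in the future and less than 30 days away."""
    now = now or datetime.now(timezone.utc)
    return now < expiry_time < now + MAX_EXPIRY_WINDOW


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(is_valid_expiry(now + timedelta(minutes=10), now))  # inside the window
print(is_valid_expiry(now + timedelta(days=31), now))     # too far in the future
print(is_valid_expiry(now - timedelta(seconds=1), now))   # already in the past
```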

no_expiry: typing.Optional[bool]

Use noExpiry only when the entity contains information that should be available to other tasks or integrations beyond its immediate operational context. For example, use noExpiry for long-living geographical entities that maintain persistent relevance across multiple operations or tasks.

status: typing.Optional[Status] — Human-readable descriptions of what the entity is currently doing.

location: typing.Optional[Location] — Geospatial data related to the entity, including its position, kinematics, and orientation.

location_uncertainty: typing.Optional[LocationUncertainty] — Indicates uncertainty of the entity's position and kinematics.

geo_shape: typing.Optional[GeoShape] — Geospatial representation of the entity, including entities that cover an area rather than a fixed point.

geo_details: typing.Optional[GeoDetails] — Additional details on what the geospatial area or point represents, along with visual display details.

aliases: typing.Optional[Aliases] — Entity name displayed in the Lattice UI side panel. Also includes identifiers that other systems can use to reference the same entity.

tracked: typing.Optional[Tracked] — If this entity is tracked by another entity, this component contains data related to how it's being tracked.

correlation: typing.Optional[Correlation] — If this entity has been correlated or decorrelated to another one, this component contains information on the correlation or decorrelation.

mil_view: typing.Optional[MilView] — View of the entity.

ontology: typing.Optional[Ontology] — Ontology defines an entity's categorization in Lattice, and improves data retrieval and integration. Builds a standardized representation of the entity.

sensors: typing.Optional[Sensors] — Details an entity's available sensors.

payloads: typing.Optional[Payloads] — Details an entity's available payloads.

power_state: typing.Optional[PowerState] — Details the entity's power source.

provenance: typing.Optional[Provenance] — The primary data source provenance for this entity.

overrides: typing.Optional[Overrides] — Provenance of override data.

indicators: typing.Optional[Indicators]

Describes an entity's specific characteristics and the operations that can be performed on the entity. For example, "simulated" informs the operator that the entity is from a simulation, and "deletable" informs the operator (and system) that the delete operation is valid against the entity.

target_priority: typing.Optional[TargetPriority] — The prioritization associated with an entity, such as if it's a threat or a high-value target.

signal: typing.Optional[Signal] — Describes an entity's signal characteristics, primarily used when an entity is a signal of interest.

transponder_codes: typing.Optional[TransponderCodes] — A message describing any transponder codes associated with Mode 1, 2, 3, 4, 5, S interrogations. These are related to ADS-B modes.

data_classification: typing.Optional[Classification]

Describes an entity's security classification levels at an overall classification level and on a per field level.

task_catalog: typing.Optional[TaskCatalog] — A catalog of tasks that can be performed by an entity.

media: typing.Optional[Media] — Media associated with an entity, such as videos, images, or thumbnails.

relationships: typing.Optional[Relationships] — The relationships between this entity and other entities in the common operational picture (COP).

visual_details: typing.Optional[VisualDetails] — Visual details associated with the display of an entity in the client.

dimensions: typing.Optional[Dimensions] — Physical dimensions of the entity.

route_details: typing.Optional[RouteDetails] — Additional information about an entity's route.

schedules: typing.Optional[Schedules] — Schedules associated with this entity.

health: typing.Optional[Health] — Health metrics or connection status reported by the entity.

group_details: typing.Optional[GroupDetails] — Details for the group associated with this entity.

supplies: typing.Optional[Supplies] — Contains relevant supply information for the entity, such as fuel.

orbit: typing.Optional[Orbit] — Orbit information for space objects.

symbology: typing.Optional[Symbology] — Symbology/iconography for the entity respecting an existing standard.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.entities.get_entity(...) -> AsyncHttpResponse[Entity]

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.entities.get_entity(
    entity_id="entityId",
)

⚙️ Parameters

entity_id: str — ID of the entity to return

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.entities.override_entity(...) -> AsyncHttpResponse[Entity]

📝 Description

Only fields marked with overridable can be overridden. Please refer to our documentation to see the comprehensive list of fields that can be overridden. The entity in the request body should only have a value set on the field specified in the field path parameter. Field paths are rooted in the base entity object and must be represented using lower_snake_case. Do not include "entity" in the field path.

Note that overrides are applied in an eventually consistent manner. If multiple overrides are created concurrently for the same field path, the last writer wins.
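The field path rules above (lower_snake_case segments, rooted in the base entity, no leading "entity") can be checked before sending a request. This is a client-side sketch only; the `check_field_path` helper and its regular expression are assumptions, and it does not check whether a field is actually overridable.

```python
import re

# One or more lower_snake_case segments separated by dots,
# for example "mil_view.disposition".
_SEGMENT = r"[a-z][a-z0-9]*(?:_[a-z0-9]+)*"
_FIELD_PATH = re.compile(rf"{_SEGMENT}(?:\.{_SEGMENT})*")


def check_field_path(field_path: str) -> str:
    """Return field_path unchanged, or raise ValueError if it breaks the rules."""
    if not _FIELD_PATH.fullmatch(field_path):
        raise ValueError(f"field path must be lower_snake_case: {field_path!r}")
    if field_path.split(".")[0] == "entity":
        raise ValueError("field path must not include 'entity'")
    return field_path


print(check_field_path("mil_view.disposition"))
```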

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.entities.override_entity(
    entity_id="entityId",
    field_path="mil_view.disposition",
)

⚙️ Parameters

entity_id: str — The unique ID of the entity to override

field_path: str — The fieldPath to override.

entity: typing.Optional[Entity]

The entity containing the overridden fields. The service will extract the overridable fields from the object and ignore all other fields.

provenance: typing.Optional[Provenance] — Additional information about the source of the override.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.entities.remove_entity_override(...) -> AsyncHttpResponse[Entity]

📝 Description

This operation clears the override value from the specified field path on the entity.

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.entities.remove_entity_override(
    entity_id="entityId",
    field_path="mil_view.disposition",
)

⚙️ Parameters

entity_id: str — The unique ID of the entity to undo an override from.

field_path: str — The fieldPath to clear overrides from.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.entities.long_poll_entity_events(...) -> AsyncHttpResponse[EntityEventResponse]

📝 Description

This is a long-polling API that first returns all pre-existing data and then returns new data as it becomes available.

To start a new polling session, send a request with an empty sessionToken in the request body. The server returns a new session token in the response. To retrieve the next batch of results from an existing session, send the session token you received from the server in the request body.

If no new data is available, the server holds the connection open for up to 5 minutes. After the 5-minute timeout, the server closes the connection with no results, and you may resume polling with the same session token. If your session falls behind by more than 3x the total number of entities in the environment, the server terminates your session. In that case, start a new session by sending a request with an empty session token.
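The session-token handshake described above can be sketched as a loop. To keep the example runnable without a server, the network call is replaced by a hypothetical `poll` callable, and the result is modeled as a plain dict with assumed keys `session_token` and `entity_events`; the real response type may differ.

```python
from typing import Any, Callable, Dict, Iterator

# Hypothetical shape of one poll result:
# {"session_token": str, "entity_events": list}
PollResult = Dict[str, Any]


def poll_events(poll: Callable[[str], PollResult], max_requests: int) -> Iterator[Any]:
    """Yield events from successive long-poll requests, reusing the session token."""
    session_token = ""  # an empty token starts a new session
    for _ in range(max_requests):
        result = poll(session_token)
        # Keep the latest token so the next request resumes the same session.
        session_token = result.get("session_token") or session_token
        # An empty batch means the request timed out with no new data.
        yield from result.get("entity_events", [])


def fake_poll(token: str) -> PollResult:
    """Stand-in for the real call: returns canned batches, no network."""
    batches = {"": ["e1", "e2"], "s1": [], "s2": ["e3"]}
    next_token = {"": "s1", "s1": "s2", "s2": "s3"}
    return {"session_token": next_token[token], "entity_events": batches[token]}


print(list(poll_events(fake_poll, max_requests=3)))  # ['e1', 'e2', 'e3']
```

A production loop would also need to detect a terminated session and restart with an empty token, which this sketch leaves out.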

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.entities.long_poll_entity_events(
    session_token="sessionToken",
)

⚙️ Parameters

session_token: str — Long-poll session identifier. Leave empty to start a new polling session.

batch_size: typing.Optional[int] — Maximum size of response batch. Defaults to 100. Must be between 1 and 2000 (inclusive).

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.entities.stream_entities(...) -> typing.AsyncIterator[ AsyncHttpResponse[typing.AsyncIterator[StreamEntitiesResponse]] ]

📝 Description

Establishes a server-sent events (SSE) connection that streams entity data in real-time. This is a one-way connection from server to client that follows the SSE protocol with text/event-stream content type.

This endpoint enables clients to maintain a real-time view of the common operational picture (COP) by first streaming all pre-existing entities that match filter criteria, then continuously delivering updates as entities are created, modified, or deleted.

The server first sends events with type PREEXISTING for all live entities matching the filter that existed before the stream was opened, then streams CREATE events for newly created entities, UPDATE events when existing entities change, and DELETED events when entities are removed. The stream remains open indefinitely unless preExistingOnly is set to true.

Heartbeat messages can be configured to maintain connection health and detect disconnects by setting the heartbeatIntervalMS parameter. These heartbeats help keep the connection alive and allow clients to verify the server is still responsive.

Clients can optimize bandwidth usage by specifying which entity components they need populated using the componentsToInclude parameter. This allows receiving only relevant data instead of complete entities.

The connection automatically recovers from temporary disconnections, resuming the stream where it left off. Unlike polling approaches, this provides real-time updates with minimal latency and reduced server load.
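One way to maintain a local view of the COP from the event types described above is to fold each event into a dictionary keyed by entity ID. This is a sketch under assumptions: events are modeled as simple `(event_type, entity_id, entity)` tuples, which is a hypothetical simplification of the streamed response objects.

```python
from typing import Any, Dict, Iterable, Tuple

# Hypothetical simplified event: (event_type, entity_id, entity_payload).
Event = Tuple[str, str, Any]


def apply_events(view: Dict[str, Any], events: Iterable[Event]) -> Dict[str, Any]:
    """Fold stream events into a local {entity_id: entity} view."""
    for event_type, entity_id, entity in events:
        if event_type in ("PREEXISTING", "CREATE", "UPDATE"):
            # All three carry the latest known state of the entity.
            view[entity_id] = entity
        elif event_type == "DELETED":
            view.pop(entity_id, None)
        # Anything else (for example a heartbeat) is ignored here.
    return view


events = [
    ("PREEXISTING", "a", {"name": "alpha"}),
    ("CREATE", "b", {"name": "bravo"}),
    ("UPDATE", "a", {"name": "alpha-2"}),
    ("DELETED", "b", None),
]
print(apply_events({}, events))  # {'a': {'name': 'alpha-2'}}
```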

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
response = client.entities.stream_entities()
for chunk in response.data:
    print(chunk)  # handle each streamed event here

⚙️ Parameters

heartbeat_interval_ms: typing.Optional[int] — The interval, in milliseconds, at which to send heartbeat events. Defaults to 30 seconds.

pre_existing_only: typing.Optional[bool] — If true, streams only the pre-existing entities in the environment and then closes the connection. Defaults to false.

components_to_include: typing.Optional[typing.Sequence[str]] — List of components to include. Leave empty to include all components.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

Tasks

client.tasks.create_task(...) -> AsyncHttpResponse[Task]

📝 Description

Creates a new Task in the system with the specified parameters.

This method initiates a new task with a unique ID (either provided or auto-generated), sets the initial task state to STATUS_CREATED, and establishes task ownership. The task can be assigned to a specific agent through the Relations field.

Once created, a task enters the lifecycle workflow and can be tracked, updated, and managed through other Tasks API endpoints.

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.tasks.create_task()

⚙️ Parameters

task_id: typing.Optional[str]

If non-empty, will set the requested Task ID, otherwise will generate a new random GUID. Will reject if supplied Task ID does not match [A-Za-z0-9_-.]{5,36}.
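A client-side check for the Task ID format might look like the sketch below. Python's `re` module reads `_-.` inside a character class as a range and rejects it, so the hyphen is moved to the end of the class; the accepted characters and length bounds are unchanged. The `is_valid_task_id` helper is hypothetical, and matching the whole string is an assumption about how the server applies the pattern.

```python
import re

# Same character set and length bounds as the documented pattern, with the
# hyphen placed last so Python treats it as a literal character.
TASK_ID_PATTERN = re.compile(r"[A-Za-z0-9_.-]{5,36}")


def is_valid_task_id(task_id: str) -> bool:
    """Return True if the whole string matches the Task ID pattern."""
    return TASK_ID_PATTERN.fullmatch(task_id) is not None


print(is_valid_task_id("task-001"))                              # True
print(is_valid_task_id("123e4567-e89b-12d3-a456-426614174000"))  # True
print(is_valid_task_id("abcd"))                                  # False: too short
```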

display_name: typing.Optional[str] — Human-readable display name for this Task. Should be short (<100 chars).

description: typing.Optional[str] — Longer, free-form, human-readable description of this Task.

specification: typing.Optional[GoogleProtobufAny] — The path for the Protobuf task definition, and the complete task data.

author: typing.Optional[Principal]

relations: typing.Optional[Relations]

Any relationships associated with this Task, such as a parent Task or an assignee this Task is designated to for execution.

is_executed_elsewhere: typing.Optional[bool]

If set, the service will not trigger execution of this task on an agent. Useful when ingesting tasks from an external system that triggers execution of tasks on agents itself.

initial_entities: typing.Optional[typing.Sequence[TaskEntity]]

Indicates an initial set of entities that can be used to execute an entity aware task. For example, an entity Objective, an entity Keep In Zone, etc.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.tasks.get_task(...) -> AsyncHttpResponse[Task]

📝 Description

Retrieves a specific Task by its ID, with options to select a particular task version or view.

This method returns detailed information about a task including its current status, specification, relations, and other metadata. The response includes the complete Task object with all associated fields.

By default, the method returns the latest definition version of the task from the manager's perspective.

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.tasks.get_task(
    task_id="taskId",
)

⚙️ Parameters

task_id: str — ID of task to return

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.tasks.update_task_status(...) -> AsyncHttpResponse[Task]

📝 Description

Updates the status of a Task as it progresses through its lifecycle.

This method allows agents or operators to report the current state of a task, including status changes and error information.

Each status update increments the task's status_version. When updating status, clients must provide the current version to ensure consistency. The system rejects updates with mismatched versions to prevent race conditions.

Terminal states (STATUS_DONE_OK and STATUS_DONE_NOT_OK) are permanent; once a task reaches these states, no further updates are allowed.
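The versioning and terminal-state rules above behave like optimistic concurrency control. The toy model below illustrates that behavior in memory; it is not the service's implementation. The `TaskRecord` class, the starting version of 1, and the intermediate state name `STATUS_EXECUTING` are assumptions made for the example.

```python
from dataclasses import dataclass

TERMINAL_STATES = {"STATUS_DONE_OK", "STATUS_DONE_NOT_OK"}


@dataclass
class TaskRecord:
    """Hypothetical in-memory stand-in for a task's status bookkeeping."""
    status: str = "STATUS_CREATED"
    status_version: int = 1  # assumed starting value

    def update_status(self, new_status: str, status_version: int) -> bool:
        """Apply the update and return True, or return False if it is refused."""
        if self.status in TERMINAL_STATES:
            return False  # terminal states are permanent
        if status_version != self.status_version:
            return False  # the caller's view of the task is out of date
        self.status = new_status
        self.status_version += 1  # every accepted update bumps the version
        return True


task = TaskRecord()
print(task.update_status("STATUS_EXECUTING", 1))  # accepted, version becomes 2
print(task.update_status("STATUS_DONE_OK", 1))    # refused: stale version
print(task.update_status("STATUS_DONE_OK", 2))    # accepted, task is now terminal
print(task.update_status("STATUS_EXECUTING", 3))  # refused: terminal state
```

A client that is refused for a stale version would typically re-read the task to learn the current status_version before retrying.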

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.tasks.update_task_status(
    task_id="taskId",
)

⚙️ Parameters

task_id: str — ID of task to update status of

status_version: typing.Optional[int]

The status version of the task to update. This version number increments to indicate the task's current stage in its status lifecycle. Specifically, whenever a task's status updates, the status version increments by one. Any status updates received with a lower status version number than what is known are considered stale and ignored.

new_status: typing.Optional[TaskStatus] — The new status of the task.

author: typing.Optional[Principal]

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.tasks.query_tasks(...) -> AsyncHttpResponse[TaskQueryResults]

📝 Description

Searches for Tasks that match specified filtering criteria and returns matching tasks in paginated form.

This method allows filtering tasks based on multiple criteria including:

  • Parent task relationships
  • Task status (with inclusive or exclusive filtering)
  • Update time ranges
  • Task view (manager or agent perspective)
  • Task assignee
  • Task type (via exact URL matches or prefix matching)

Results are returned in pages. When more results are available than can be returned in a single response, a page_token is provided that can be used in subsequent requests to retrieve the next set of results.

By default, this returns the latest task version for each matching task from the manager's perspective.
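The paging behavior described above can be sketched as a loop that keeps requesting pages until no page_token comes back. The network call is replaced by a hypothetical `fetch_page` callable, and each page is modeled as a dict with assumed keys `tasks` and `page_token`; the real result type may expose these differently.

```python
from typing import Any, Callable, Dict, Iterator, Optional

# Hypothetical shape of one page: {"tasks": list, "page_token": str or None}
Page = Dict[str, Any]


def iter_all_tasks(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[Any]:
    """Yield tasks from every page, following page_token until it is absent."""
    page_token: Optional[str] = None
    while True:
        page = fetch_page(page_token)
        yield from page.get("tasks", [])
        page_token = page.get("page_token")
        if not page_token:
            break  # no token means this was the last page


def fake_fetch(page_token: Optional[str]) -> Page:
    """Stand-in for the real query: two canned pages, no network."""
    pages = {
        None: {"tasks": ["t1", "t2"], "page_token": "p2"},
        "p2": {"tasks": ["t3"], "page_token": None},
    }
    return pages[page_token]


print(list(iter_all_tasks(fake_fetch)))  # ['t1', 't2', 't3']
```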

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.tasks.query_tasks()

⚙️ Parameters

page_token: typing.Optional[str] — If set, returns results starting from the given pageToken.

parent_task_id: typing.Optional[str]

If present, matches Tasks with this parent Task ID. Note: this parameter is mutually exclusive with all other query parameters. Provide either the parent Task ID or any of the remaining parameters, but not both.

status_filter: typing.Optional[TaskQueryStatusFilter]

update_time_range: typing.Optional[TaskQueryUpdateTimeRange] — If provided, only provides Tasks updated within the time range.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.tasks.stream_tasks(...) -> typing.AsyncIterator[ AsyncHttpResponse[typing.AsyncIterator[StreamTasksResponse]] ]

📝 Description

Establishes a server streaming connection that delivers task updates in real-time using Server-Sent Events (SSE).

The stream delivers all existing non-terminal tasks when first connected, followed by real-time updates for task creation and status changes. Additionally, heartbeat messages are sent periodically to maintain the connection.

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
response = client.tasks.stream_tasks()
for chunk in response.data:
    print(chunk)  # handle each streamed task event here

⚙️ Parameters

heartbeat_interval_ms: typing.Optional[int] — The time interval, in milliseconds, that determines the frequency at which to send heartbeat events. Defaults to 30000 (30 seconds).

rate_limit: typing.Optional[int]

The time interval, in milliseconds, after an update for a given task before another one will be sent for the same task. If set, value must be >= 250.

exclude_preexisting_tasks: typing.Optional[bool]

Optional flag to only include tasks created or updated after the stream is initiated, and not any previous preexisting tasks. If unset or false, the stream will include any new tasks and task updates, as well as all preexisting tasks.

task_type: typing.Optional[TaskStreamRequestTaskType] — Optional filter that only returns tasks with specific types. If not provided, all task types will be streamed.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.tasks.listen_as_agent(...) -> AsyncHttpResponse[AgentRequest]

📝 Description

Establishes a server streaming connection that delivers tasks to taskable agents for execution.

This method creates a persistent connection from Tasks API to an agent, allowing the server to push tasks to the agent as they become available. The agent receives a stream of tasks that match its selector criteria (entity IDs).

The stream delivers three types of requests:

  • ExecuteRequest: Contains a new task for the agent to execute
  • CancelRequest: Indicates a task should be canceled
  • CompleteRequest: Indicates a task should be completed

This is the primary method for taskable agents to receive and process tasks in real-time. Agents should maintain this connection and process incoming tasks according to their capabilities.

When an agent receives a task, it should update the task status using the UpdateStatus endpoint to provide progress information back to Tasks API.

This is a long-polling API that blocks until a new task is ready for delivery. If no new task is available, the server holds your request for up to 5 minutes. After that timeout, you are expected to send a new request.
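An agent has to route each of the three request types listed above to the right handler. The sketch below shows one way to do that; the request is modeled as a plain dict, and the key names `execute_request`, `cancel_request`, and `complete_request` are assumptions rather than confirmed field names.

```python
from typing import Any, Callable, Dict, Optional

# Assumed key names for the three request kinds an agent can receive.
REQUEST_KINDS = ("execute_request", "cancel_request", "complete_request")


def dispatch(
    agent_request: Dict[str, Any],
    handlers: Dict[str, Callable[[Any], None]],
) -> Optional[str]:
    """Call the handler for whichever request kind is populated; return its name."""
    for kind in REQUEST_KINDS:
        payload = agent_request.get(kind)
        if payload is not None:
            handlers[kind](payload)
            return kind
    return None  # nothing to do, for example after a timeout with no task


log = []
handlers = {
    "execute_request": lambda p: log.append(("execute", p)),
    "cancel_request": lambda p: log.append(("cancel", p)),
    "complete_request": lambda p: log.append(("complete", p)),
}
dispatch({"execute_request": {"task_id": "t1"}}, handlers)
dispatch({"cancel_request": {"task_id": "t1"}}, handlers)
print(log)
```

Each handler would normally report progress back through the status update endpoint once it has acted on the task.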

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.tasks.listen_as_agent()

⚙️ Parameters

agent_selector: typing.Optional[EntityIdsSelector] — Selector criteria to determine which Agent Tasks the agent receives

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.tasks.stream_as_agent(...) -> typing.AsyncIterator[ AsyncHttpResponse[typing.AsyncIterator[StreamAsAgentResponse]] ]

📝 Description

Establishes a server streaming connection that delivers tasks to taskable agents for execution using Server-Sent Events (SSE).

This method creates a connection from the Tasks API to an agent that streams relevant tasks to the listener agent. The agent receives a stream of tasks that match the entities specified by the tasks' selector criteria.

The stream delivers three types of requests:

  • ExecuteRequest: Contains a new task for the agent to execute
  • CancelRequest: Indicates a task should be canceled
  • CompleteRequest: Indicates a task should be completed

Additionally, heartbeat messages are sent periodically to maintain the connection.

This is the recommended method for taskable agents to receive and process tasks in real-time. Agents should maintain a connection to this stream and process incoming tasks according to their capabilities.

When an agent receives a task, it should update the task status using the UpdateStatus endpoint to provide progress information back to Tasks API.

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
response = client.tasks.stream_as_agent()
for chunk in response.data:
    print(chunk)  # handle each streamed agent request here

⚙️ Parameters

agent_selector: typing.Optional[EntityIdsSelector] — The selector criteria to determine which tasks the agent receives.

heartbeat_interval_ms: typing.Optional[int] — The time interval, in milliseconds, that determines the frequency at which to send heartbeat events. Defaults to 30000 (30 seconds).

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

Objects

client.objects.list_objects(...) -> AsyncPager[PathMetadata, ListResponse]

📝 Description

Lists objects in your environment. You can define a prefix to list a subset of your objects. If you do not set a prefix, Lattice returns all available objects. By default this endpoint will list local objects only.

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
response = client.objects.list_objects()
for item in response:
    print(item)
# Alternatively, you can paginate page-by-page:
for page in response.iter_pages():
    print(page)

⚙️ Parameters

prefix: typing.Optional[str] — Filters the objects based on the specified prefix path. If no path is specified, all objects are returned.

since_timestamp: typing.Optional[dt.datetime] — Sets the age for the oldest objects to query across the environment.

page_token: typing.Optional[str] — Base64 and URL-encoded cursor returned by the service to continue paging.

all_objects_in_mesh: typing.Optional[bool] — Lists objects across all environment nodes in a Lattice Mesh.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.objects.get_object(...) -> typing.AsyncIterator[AsyncHttpResponse[typing.AsyncIterator[bytes]]]

📝 Description

Fetches an object from your environment using the objectPath path parameter.

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.objects.get_object(
    object_path="objectPath",
)

⚙️ Parameters

object_path: str — The path of the object to fetch.

accept_encoding: typing.Optional[GetObjectRequestAcceptEncoding] — If set, Lattice will compress the response using the specified compression method. If the header is not defined, or the compression method is set to identity, no compression will be applied to the response.

priority: typing.Optional[str] — Indicates a client's preference for the priority of the response. The value is a structured header as defined in RFC 9218. If you do not set the header, Lattice uses the default priority set for the environment. Incremental delivery directives are not supported and will be ignored.
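In RFC 9218, the Priority value carries an urgency parameter `u`, an integer from 0 (highest) to 7 (lowest) with a default of 3. A small helper for building such a value is sketched below; the `priority_value` name is hypothetical, and which urgency levels a given environment honors is not stated here.

```python
def priority_value(urgency: int) -> str:
    """Build an RFC 9218 Priority value that carries only the urgency parameter."""
    if not 0 <= urgency <= 7:
        raise ValueError("urgency must be between 0 and 7")
    # The incremental parameter is omitted because it is ignored by the service.
    return f"u={urgency}"


print(priority_value(1))  # u=1
```

The resulting string is what you would pass as the priority argument.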

request_options: typing.Optional[RequestOptions] — Request-specific configuration. You can pass in configuration such as chunk_size, and more to customize the request and response.

client.objects.upload_object(...) -> AsyncHttpResponse[PathMetadata]

📝 Description

Uploads an object. The object must be 1 GiB or smaller.
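Because the request body may be supplied as an iterator of bytes (see the request parameter), a large file can be streamed in chunks while the 1 GiB limit is enforced on the client. This is a minimal sketch; the `iter_chunks` helper, its default chunk size, and the client-side check are assumptions, and the server still applies its own limit.

```python
import io
from typing import BinaryIO, Iterator

MAX_OBJECT_BYTES = 1024 ** 3  # 1 GiB, the limit stated above


def iter_chunks(
    stream: BinaryIO,
    chunk_size: int = 64 * 1024,
    limit: int = MAX_OBJECT_BYTES,
) -> Iterator[bytes]:
    """Yield the stream in chunks, raising ValueError once it exceeds the limit."""
    total = 0
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return
        total += len(chunk)
        if total > limit:
            raise ValueError(f"object exceeds the {limit}-byte upload limit")
        yield chunk


print(list(iter_chunks(io.BytesIO(b"abcdef"), chunk_size=4)))  # [b'abcd', b'ef']
```

Note that the check fires only while the iterator is being consumed, so an oversized upload would fail partway through rather than up front.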

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.objects.upload_object(
    object_path="objectPath",
    request=b"object contents",
)

⚙️ Parameters

object_path: str — Path of the Object that is to be uploaded.

request: typing.Union[bytes, typing.Iterator[bytes], typing.AsyncIterator[bytes]]

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.objects.delete_object(...) -> AsyncHttpResponse[None]

📝 Description

Deletes an object from your environment given the objectPath path parameter.

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.objects.delete_object(
    object_path="objectPath",
)

⚙️ Parameters

object_path: str — The path of the object to delete.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

client.objects.get_object_metadata(...) -> AsyncHttpResponse[None]

📝 Description

Returns metadata for a specified object path. Use this to fetch metadata such as object size (size_bytes), its expiry time (expiry_time), or its latest update timestamp (last_updated_at).

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.objects.get_object_metadata(
    object_path="objectPath",
)

⚙️ Parameters

object_path: str — The path of the object to query.

request_options: typing.Optional[RequestOptions] — Request-specific configuration.

OAuth

client.oauth.get_token(...) -> AsyncHttpResponse[GetTokenResponse]

📝 Description

Gets a new short-lived token using the specified client credentials.
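If you request tokens yourself rather than letting the client manage them, a short-lived token is usually cached and refreshed shortly before it expires. The sketch below shows that pattern in isolation; the `TokenCache` class is hypothetical, and it assumes a `fetch` callable that returns the token together with its lifetime in seconds, which may not match the real response shape.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Cache a short-lived token and refetch it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        margin_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch        # hypothetical: returns (token, lifetime in seconds)
        self._margin_s = margin_s  # refresh this long before the expiry
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return the cached token, fetching a new one if it is missing or near expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin_s:
            self._token, lifetime_s = self._fetch()
            self._expires_at = now + lifetime_s
        return self._token


# Demonstration with a fake clock and a fake fetch function (no network).
fake_now = [0.0]
issued = []


def fake_fetch() -> Tuple[str, float]:
    issued.append(1)
    return f"tok{len(issued)}", 100.0


cache = TokenCache(fake_fetch, margin_s=10.0, clock=lambda: fake_now[0])
print(cache.get())  # tok1
fake_now[0] = 95.0
print(cache.get())  # tok2, because the first token is within 10 s of expiring
```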

🔌 Usage

from anduril import Lattice

client = Lattice(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
)
client.oauth.get_token()

⚙️ Parameters

client_id: typing.Optional[str] — The client identifier

client_secret: typing.Optional[str] — The client secret

request_options: typing.Optional[RequestOptions] — Request-specific configuration.