"""
Inference run execute raic foundry object detection, vectorization and prediction
Inference runs can serve a variety of different purposes. They can operate on both geospatial and non-geospatial imagery formats, taking into account their temporal tags whenever possible.
Here are some quickstart examples. Make sure to first login to Raic Foundry
.. code-block:: python
from raic.foundry.client.context import login_if_not_already
# Login to Raic Foundry (prompted on the command line)
login_if_not_already()
Example: Detect objects and vectorize crops using the default models
.. code-block:: python
from raic.foundry.datasources import Datasource
from raic.foundry.inference import InferenceRun
# Look up existing data source record
data_source = Datasource.from_existing('My Existing Data Source')
# Start new inference run
run = InferenceRun.new(name='My New Inference Run', data_source=data_source)
data_frame = run.wait_and_return_dataframe()
print(data_frame)
Example: Only vectorize the images, skipping object detection (classification only)
.. code-block:: python
from raic.foundry.datasources import Datasource
from raic.foundry.inference import InferenceRun
# Look up existing data source record
data_source = Datasource.from_existing('My Existing Data Source')
# Start new inference run
run = InferenceRun.new(name='My New Inference Run', data_source=data_source, universal_detector=None)
data_frame = run.wait_and_return_dataframe()
print(data_frame)
Example: Fully customize the universal detector, the vectorizer model, and a prediction model
.. code-block:: python
from raic.foundry.datasources import Datasource
from raic.foundry.models import UniversalDetector, VectorizerModel, PredictionModel
from raic.foundry.inference import InferenceRun
# Look up existing data source record
data_source = Datasource.from_existing('My Existing Data Source')
# Look up models from model registry
universal_detector = UniversalDetector.from_existing('baseline', version='latest')
vectorizer_model = VectorizerModel.from_existing('baseline', version='latest')
prediction_model = PredictionModel.from_existing('My Prediction Model', version='latest')
# Start new inference run
run = InferenceRun.new(
name='CM Inference Run',
data_source=data_source,
universal_detector=universal_detector,
vectorizer_model=vectorizer_model,
prediction_model=prediction_model
)
data_frame = run.wait_and_return_dataframe()
print(data_frame)
Example: As an alternative, iterate the results as they are queried
.. code-block:: python
from raic.foundry.inference import InferenceRun
...
for prediction in run.iterate_predictions():
print(prediction)
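Example: Download the crop image of each prediction (the local 'crops' folder here is just illustrative)
.. code-block:: python
from pathlib import Path
from raic.foundry.inference import InferenceRun
...
# Crop images are written to the destination folder, each named by its detection id
crop_paths = list(run.stream_crop_images(destination_path=Path('crops')))
print(crop_paths)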
"""
import re
import csv
import uuid
import time
import tempfile
import pandas as pd
from pathlib import Path
from typing import Optional, Any, Iterator, List
from concurrent.futures import ThreadPoolExecutor, as_completed
from shapely import Point, from_wkt, from_wkb
from raic.foundry.shared.utils import chunk_iterable
from raic.foundry.datasources import Datasource
from raic.foundry.models import UniversalDetector, VectorizerModel, PredictionModel, RaicVisionModel
from raic.foundry.client.inference_job import InferenceClient
from raic.foundry.client.raic_vision_job import CascadeVisionClient
from raic.foundry.cli.console import clear_console
class InferenceRun:
def __init__(self, record: dict, is_raic_vision: bool = False):
"""Manage an inference run
Args:
record (dict): Inference run record from the API
"""
self.id = record['id']
self._record = record
self._is_raic_vision = is_raic_vision
def is_complete(self) -> bool:
"""Check whether the run has completed yet
Returns:
bool: True if run status is Completed
"""
if self._is_raic_vision:
updated_record = CascadeVisionClient().get_run(self.id)
else:
updated_record = InferenceClient().get_inference_run(self.id)
return updated_record['status'] == 'Completed'
def restart(self):
"""In the event that an inference run gets stuck it can be restarted from the beginning. Any frames already processed will be skipped.
"""
if self._is_raic_vision:
CascadeVisionClient().restart_run(self.id)
else:
InferenceClient().restart_inference_run(self.id)
def iterate_predictions(self, include_embeddings: bool = True) -> Iterator[dict]:
"""Iterate through all inference run prediction results as they are queried from the API
Args:
include_embeddings (bool, optional): Include the embedding vector with each prediction. Defaults to True.
Yields:
Iterator[dict]: All of the prediction results as an iterator, optionally including the embeddings for each
"""
if self._is_raic_vision:
return CascadeVisionClient().iterate_predictions(self.id, include_embeddings)
else:
return InferenceClient().iterate_predictions(self.id, include_embeddings)
def fetch_predictions_as_dataframe(self, include_embeddings: bool = True) -> pd.DataFrame:
"""Collect all of the prediction results from the inference run
Args:
include_embeddings (bool, optional): Include the embedding vector with each prediction. Defaults to True.
Returns:
DataFrame: All of the prediction results as a pandas DataFrame, optionally including the embeddings for each
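Example:
.. code-block:: python
# 'run' is an InferenceRun created or looked up earlier; skip embeddings for a smaller frame
data_frame = run.fetch_predictions_as_dataframe(include_embeddings=False)
print(data_frame)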
"""
if self._is_raic_vision:
iterator = CascadeVisionClient().iterate_predictions(self.id, include_embeddings)
else:
iterator = InferenceClient().iterate_predictions(self.id, include_embeddings)
fieldnames=["inference_run_id", "detection_id", "frame_id",
"image_name", "label_class", "confidence", "x0", "y0", "x1", "y1",
"centroid", "extent", "frame_sequence_number", "embedding", "builder_class_id"]
def get_centroid(centroid: str | bytes | None) -> Point | None:
# Geometry may arrive as WKB bytes, a hex-encoded WKB string, or a WKT string
if centroid is None:
return None
elif isinstance(centroid, bytes) or bool(re.fullmatch(r'\b[0-9a-fA-F]+\b', centroid)):
return from_wkb(centroid).centroid
else:
return from_wkt(centroid).centroid
def get_extent(extent: str | bytes | None) -> Any | None:
# Same WKB/WKT detection as get_centroid, but returns the full geometry
if extent is None:
return None
elif isinstance(extent, bytes) or bool(re.fullmatch(r'\b[0-9a-fA-F]+\b', extent)):
return from_wkb(extent)
else:
return from_wkt(extent)
with tempfile.NamedTemporaryFile(mode='w+t', delete=True) as tmpfile:
writer = csv.DictWriter(tmpfile, fieldnames=fieldnames)
writer.writeheader()
for record in iterator:
record['centroid'] = get_centroid(record['centroid'])
record['extent'] = get_extent(record['extent'])
complete_row = {field: record.get(field, '') for field in fieldnames}
writer.writerow(complete_row)
tmpfile.seek(0)
return pd.read_csv(tmpfile)
def wait_and_return_dataframe(self, poll_interval: int = 10, include_embeddings: bool = True) -> pd.DataFrame:
"""Wait for inference run to complete return predictions as a data frame
Args:
poll_interval (int, optional): Polling interval in seconds. Minimum value is 5 seconds. Defaults to 10 seconds.
include_embeddings (bool, optional): Include the embedding vector with each prediction. Defaults to True.
Returns:
DataFrame: All of the prediction results as a pandas DataFrame, optionally including the embeddings for each
"""
if poll_interval is None or poll_interval < 5:
poll_interval = 5
while not self.is_complete():
time.sleep(poll_interval)
return self.fetch_predictions_as_dataframe(include_embeddings)
def stream_crop_images(self, destination_path: Path | str, max_workers: Optional[int] = None) -> Iterator[Path]:
"""Download the crops images for inference run predictions
Each one is named by its prediction identifier
Args:
destination_path (Path | str): Local folder where prediction crops will be downloaded.
max_workers (Optional[int], optional): Max number of worker threads to parallelize download. Defaults to None.
Yields:
Iterator[Path]: Iterator of each crop image's local path as it is downloaded
"""
prediction_iterator = self.iterate_predictions(include_embeddings=False)
def download(prediction: dict):
local_file_path = Path(destination_path, f"{prediction['detection_id']}.jpg")
InferenceClient().download_crop_image(inference_run_id=self.id, detection_id=prediction['detection_id'], save_to_path=local_file_path)
return local_file_path
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(download, prediction): prediction
for prediction in prediction_iterator
}
for future in as_completed(futures):
yield future.result()
def delete(self):
"""Delete the inference run
"""
if self._is_raic_vision:
CascadeVisionClient().update_run(self.id, is_deleted=True)
else:
InferenceClient().delete_inference_run(self.id)
@classmethod
def from_existing(cls, identifier: str):
"""Look up an existing inference run by its UUID or its name
Note: If there are multiple runs with the same name, looking up by name will fail with an Exception
Args:
identifier (str): Either the UUID of the inference run or its name
Raises:
Exception: If multiple runs are returned with the same name
Returns:
InferenceRun
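Example:
.. code-block:: python
# Pass either the run's UUID or its name (the name must be unique)
run = InferenceRun.from_existing('My New Inference Run')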
"""
is_raic_vision = False
if cls._is_uuid(identifier):
try:
run_record = CascadeVisionClient().get_run(identifier)
except Exception:
run_record = None
if run_record is not None:
is_raic_vision = True
else:
run_record = InferenceClient().get_inference_run(identifier)
if run_record is None:
raise Exception(f"Inference run {identifier} cannot be found")
else:
response = InferenceClient().find_inference_runs_by_name(identifier)
if len(response['value']) != 1:
raise Exception(f"{len(response['value'])} inference runs are named '{identifier}'")
run_record = response['value'][0]
return InferenceRun(run_record, is_raic_vision)
@classmethod
def from_prompt(
cls,
data_source: Datasource,
name: Optional[str] = None,
universal_detector: Optional[UniversalDetector] = None,
vectorizer_model: Optional[VectorizerModel] = None,
prediction_model: Optional[PredictionModel] = None,
raic_vision_model: Optional[RaicVisionModel] = None
):
if name:
return cls.new(name=name, data_source=data_source, universal_detector=universal_detector, vectorizer_model=vectorizer_model, prediction_model=prediction_model, raic_vision_model=raic_vision_model)
clear_console()
print(f"Datasource: {data_source._record['name']}")
if raic_vision_model is None:
universal_detector_name = "baseline" if universal_detector is None else universal_detector._record['name']
vectorizer_model_name = "baseline" if vectorizer_model is None else vectorizer_model._record['name']
print(f"Universal Detector: {universal_detector_name}")
print(f"Vectorizer Model: {vectorizer_model_name}")
default_name = f"{data_source._record['name']} ({universal_detector_name}) ({vectorizer_model_name})"
if prediction_model is not None:
print(f"Prediction Model: {prediction_model._record['name']}")
default_name += f" ({prediction_model._record['name']})"
else:
print(f"Raic Vision Model: {raic_vision_model._record['name']}")
default_name = f"{data_source._record['name']} ({raic_vision_model._record['name']})"
print()
selection = input(f"What should this inference run be called? [{default_name}]: ")
if not selection:
return cls.new(name=default_name, data_source=data_source, universal_detector=universal_detector, vectorizer_model=vectorizer_model, prediction_model=prediction_model, raic_vision_model=raic_vision_model)
return cls.new(name=selection, data_source=data_source, universal_detector=universal_detector, vectorizer_model=vectorizer_model, prediction_model=prediction_model, raic_vision_model=raic_vision_model)
@classmethod
def new(
cls,
name: str,
data_source: Datasource,
universal_detector: Optional[UniversalDetector|str] = 'baseline',
vectorizer_model: Optional[VectorizerModel|str] = 'baseline',
prediction_model: Optional[PredictionModel|str] = None,
raic_vision_model: Optional[RaicVisionModel|str] = None
):
"""Create a new inference run
Args:
name (str): Name of new inference run
data_source (Datasource): Data source object representing imagery already uploaded to a blob storage container
universal_detector (Optional[UniversalDetector | str], optional): Model for object detection. Defaults to 'baseline'.
vectorizer_model (Optional[VectorizerModel | str], optional): Model for vectorizing detection crop images. Defaults to 'baseline'.
prediction_model (Optional[PredictionModel | str], optional): Model for classifying detections without needing deep training. Defaults to None.
raic_vision_model (Optional[RaicVisionModel | str], optional): Model combining all three previous models into one. Defaults to None.
Returns:
InferenceRun
"""
if vectorizer_model is None:
vectorizer_model = 'baseline'
if isinstance(universal_detector, str):
universal_detector = UniversalDetector.from_existing(universal_detector)
if isinstance(vectorizer_model, str):
vectorizer_model = VectorizerModel.from_existing(vectorizer_model)
if isinstance(prediction_model, str):
prediction_model = PredictionModel.from_existing(prediction_model)
if isinstance(raic_vision_model, str):
raic_vision_model = RaicVisionModel.from_existing(raic_vision_model)
run_record = InferenceClient().create_inference_run(
name=name,
data_source_id=data_source.datasource_id,
model_id=universal_detector.id if universal_detector is not None else None,
model_version=universal_detector.version if universal_detector is not None else None,
iou=universal_detector.iou if universal_detector is not None else 0,
confidence=universal_detector.confidence if universal_detector is not None else 0,
max_detects=universal_detector.max_detects if universal_detector is not None else 0,
small_objects=universal_detector.small_objects if universal_detector is not None else False,
no_object_detection=universal_detector is None,
vectorizer_id=vectorizer_model.id,
vectorizer_version=vectorizer_model.version,
prediction_model_id=prediction_model.id if prediction_model is not None else None,
prediction_model_version=prediction_model.version if prediction_model is not None else None,
raic_vision_model_id=raic_vision_model.id if raic_vision_model is not None else None,
raic_vision_model_version=raic_vision_model.version if raic_vision_model is not None else None
)
return InferenceRun(run_record, is_raic_vision=raic_vision_model is not None)
@classmethod
def _is_uuid(cls, uuid_to_test: str, version=4) -> bool:
try:
uuid.UUID(uuid_to_test, version=version)
return True
except ValueError:
return False
@classmethod
def suppress_errors(cls, func):
"""Decorator that returns None instead of raising when the wrapped function throws"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
return None
return wrapper