"""Snapshot search/insert processing on top of the Resnet and Milvus clients."""
import asyncio
from concurrent import futures
from functools import partial

import grs_service_pb2
from base.utils import log, get_real_distance
from common.metrics import mt_find, mt_insert
from common.milvus_client import MilvusClient, id_to_cname, MilvusClientException
from common.resnet_client import ResnetServiceClient
from s3_comm.grs import SnapshotPersist
from settings import (
    RESNET_HOST,
    RESNET_PORT,
    FILTER_RADIUS,
    FILTER_SIMILARITY_BRANE,
    FILTER_SIMILARITY_GLOBAL,
    GLOBAL_COLLECTION_NAME,
)


def filter_by_similarity(result_snapshot, value):
    """Decide whether a snapshot passes the similarity threshold for saving.

    Args:
        result_snapshot: Object exposing a numeric ``distance`` attribute.
        value: Threshold compared against ``1 - result_snapshot.distance``.

    Returns:
        True when ``1 - distance`` is strictly greater than ``value``;
        otherwise a debug line is logged and False is returned.
    """
    score = 1 - result_snapshot.distance
    if score > value:
        return True
    log.debug(
        f'result: Snapshot not saved because {result_snapshot.distance} {score} < {value}')
    return False


def filter_by_distance(search_snapshot, result_snapshot):
    """Tell whether a result snapshot must be dropped for being too far away.

    The value compared with ``FILTER_RADIUS`` is
    ``abs(real_distance - search_snapshot.accuracy)``, where ``real_distance``
    comes from ``get_real_distance`` on the two coordinate pairs.
    NOTE(review): the units of the distance, accuracy and radius are not
    visible here -- presumably all the same unit; confirm.

    Args:
        search_snapshot: Snapshot being searched for (latitude, longitude,
            accuracy are read).
        result_snapshot: Candidate found by the search (latitude, longitude
            are read).

    Returns:
        False when the candidate is within ``FILTER_RADIUS`` (keep it),
        True when it should be filtered out (a debug line is logged).
    """
    real_distance = get_real_distance(
        result_snapshot.latitude, result_snapshot.longitude,
        search_snapshot.latitude, search_snapshot.longitude,
    )
    accur_distance = abs(real_distance - search_snapshot.accuracy)
    if accur_distance < FILTER_RADIUS:
        return False
    log.debug(
        f'result: Snapshot {search_snapshot.latitude} {search_snapshot.longitude} {search_snapshot.accuracy} not emit because {accur_distance} > {FILTER_RADIUS}')
    return True


def vector_to_snapshot(vector):
    """Build a ``grs_service_pb2.SnapShot`` from one search-result mapping.

    Args:
        vector: Mapping with the keys ``brane_id``, ``lat``, ``lon``,
            ``transformation`` (iterable) and ``distance``.

    Returns:
        A new ``SnapShot`` message populated from those keys.
    """
    snapshot = grs_service_pb2.SnapShot()
    snapshot.braneId = vector['brane_id']
    snapshot.latitude = vector['lat']
    snapshot.longitude = vector['lon']
    snapshot.transform.extend(vector['transformation'])
    snapshot.distance = vector['distance']
    return snapshot


class Processor:
    """Finds and stores snapshots using the Resnet and Milvus clients.

    Calls to the Milvus client made by ``_find`` and ``_add`` are executed in
    ``self.pool`` via ``loop.run_in_executor``.
    """

    def __init__(self) -> None:
        self.persist_snapshots = SnapshotPersist()
        self.resnet_client = ResnetServiceClient(RESNET_HOST, RESNET_PORT)
        self.milvus_client = MilvusClient()
        self.pool = futures.ThreadPoolExecutor(max_workers=128)

    def prepare_result(self, search_snapshot, vectors, _filter_by_distance=True):
        """Convert raw search hits into a ``Result`` with one snapshot per brane.

        Hits are processed in the order given; the first accepted hit of each
        brane is kept and later hits of that brane are skipped. A hit rejected
        by the distance filter does not mark its brane as seen.

        Args:
            search_snapshot: Snapshot the search was made for.
            vectors: Iterable of hit mappings accepted by ``vector_to_snapshot``.
            _filter_by_distance: When true, hits for which
                ``filter_by_distance`` returns True are dropped.

        Returns:
            ``grs_service_pb2.Result`` whose snapshots have ``braneRate`` set
            to the Milvus count of the brane's collection.
        """
        seen_branes = set()
        result = grs_service_pb2.Result()
        for vector in vectors:
            candidate = vector_to_snapshot(vector)
            brane_id = candidate.braneId
            # Explicit membership test instead of relying on the truthiness
            # of a stored protobuf message.
            if brane_id in seen_branes:
                continue
            if _filter_by_distance and filter_by_distance(search_snapshot, candidate):
                continue
            candidate.braneRate = self.milvus_client.count(id_to_cname(brane_id))
            seen_branes.add(brane_id)
            result.snapshots.append(candidate)
        return result

    async def _find(self, snapshot, vector, collection, limit, _filter_by_distance=True):
        """Search ``collection`` for ``vector`` in the thread pool.

        Args:
            snapshot: Snapshot the search is made for (used by the filters).
            vector: Query vector passed to ``search_vectors``.
            collection: Name of the Milvus collection to search.
            limit: Maximum number of hits requested.
            _filter_by_distance: Forwarded to ``prepare_result``.

        Returns:
            The ``Result`` produced by ``prepare_result``.
        """
        @mt_find.time()
        def make_find(collection, vector, limit):
            hits = self.milvus_client.search_vectors(collection, vector, limit)
            return self.prepare_result(snapshot, hits, _filter_by_distance)

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.pool, partial(make_find, collection, vector, limit))

    async def _add(self, vector, snapshot, collection_name, need_add_global):
        """Insert a snapshot vector and persist the snapshot, in the thread pool.

        Args:
            vector: Vector to insert.
            snapshot: Snapshot message; ``braneId``, ``session`` and its
                serialized bytes are persisted.
            collection_name: Per-brane collection that always receives the vector.
            need_add_global: When true the vector is also inserted into
                ``GLOBAL_COLLECTION_NAME``.

        Returns:
            None (the value returned by the worker function).
        """
        @mt_insert.time()
        def make_add(vector, snapshot, collection_name, need_add_global):
            # The caller only reaches this point when the brane insert was
            # approved, so the brane collection is written unconditionally and
            # the flag gates the *global* collection (previously the two
            # targets were swapped).
            if need_add_global:
                self.milvus_client.insert(GLOBAL_COLLECTION_NAME, vector, snapshot)
            self.milvus_client.insert(collection_name, vector, snapshot)
            self.persist_snapshots.add_snapshot(
                snapshot.braneId, snapshot.session, snapshot.SerializeToString())
            log.info(f'Made add. {snapshot.braneId} {snapshot.session} {collection_name}. Global? {need_add_global}')

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.pool,
            partial(make_add, vector, snapshot, collection_name, need_add_global))

    async def _retry_after_refresh(self, collection, make_call):
        """Await ``make_call()``; on a Milvus error refresh ``collection`` and retry once.

        Args:
            collection: Collection name passed to ``refresh_collection``.
            make_call: Zero-argument callable returning a fresh awaitable on
                every invocation.

        Returns:
            Whatever the awaited call returns.

        Raises:
            MilvusClientException: If the retry fails as well.
        """
        try:
            return await make_call()
        except MilvusClientException as e:
            log.exception(e)
            log.error("We have connection or collection error. Try to fix")
            # TODO think about this or remove it, spoon
            self.milvus_client.refresh_collection(collection)
            return await make_call()

    async def find(self, snapshot):
        """Find snapshots similar to ``snapshot`` in the global collection.

        Returns:
            ``grs_service_pb2.Result`` with the filtered hits, or an empty
            ``Result`` when Resnet yields no vector.
        """
        vector = await self.resnet_client.process_snapshot(snapshot)
        if vector is None:
            log.error('No vector from resnet by this snapshot')
            return grs_service_pb2.Result()

        result = await self._retry_after_refresh(
            GLOBAL_COLLECTION_NAME,
            lambda: self._find(snapshot, vector, GLOBAL_COLLECTION_NAME, 128),
        )
        log.info(
            f'Processor. Find result: {snapshot.braneId} {snapshot.latitude} {snapshot.longitude}: {len(result.snapshots)} ')
        return result

    async def add(self, snapshot):
        """Store ``snapshot`` unless its brane already holds a too-similar one.

        The nearest snapshot of the brane collection (limit 1, no distance
        filter) is compared against ``FILTER_SIMILARITY_BRANE`` and
        ``FILTER_SIMILARITY_GLOBAL`` to decide the brane and global inserts.

        Returns:
            An empty ``Result`` when ``snapshot.braneId`` is falsy or Resnet
            yields no vector; otherwise the nearest existing brane snapshot,
            or a ``Result`` wrapping ``snapshot`` itself when the brane
            collection returned nothing.
        """
        if not snapshot.braneId:
            return grs_service_pb2.Result()

        cname = id_to_cname(snapshot.braneId)
        # NOTE(review): these Milvus calls run on the event-loop thread; if
        # they perform blocking I/O, consider moving them to self.pool.
        if not self.milvus_client.has_collection(cname):
            self.milvus_client.create_collection(cname)

        vector = await self.resnet_client.process_snapshot(snapshot)
        if vector is None:
            log.error('No vector from resnet by this snapshot')
            return grs_service_pb2.Result()

        brane_result = await self._retry_after_refresh(
            cname,
            lambda: self._find(snapshot, vector, cname, 1, False),
        )

        add_global = True
        add_brane = True
        if len(brane_result.snapshots) > 0:
            nearest = brane_result.snapshots[0]
            add_brane = filter_by_similarity(nearest, FILTER_SIMILARITY_BRANE)
            add_global = filter_by_similarity(nearest, FILTER_SIMILARITY_GLOBAL)
        else:
            brane_result = grs_service_pb2.Result(snapshots=[snapshot])

        if add_brane:
            await self._retry_after_refresh(
                cname,
                lambda: self._add(vector, snapshot, cname, add_global),
            )

        log.debug(f'Processor.add result {brane_result}')
        return brane_result