# Index Anything, Search Everything: Scalable Vector Search with Replicate AI, MongoDB, and Hookdeck

In this tutorial, we'll build a Flask application that lets users index and search anything on the internet that has a publicly accessible URL. That's right! Ask the app to index an MP3 or WAV file, an HTML or text file, or a MOV or MP4 file, and it will use Replicate AI to create a textual representation of that file and store the results in MongoDB Atlas. As long as an LLM can analyze the resource and create a textual representation, it can be indexed. All of those indexed files, whatever their original file type, can then be searched with text using MongoDB Atlas.

We'll use Hookdeck, an event gateway that provides infrastructure and tooling to build and manage event-driven applications by seamlessly handling webhooks and API requests. In this tutorial, Hookdeck ensures reliable, scalable communication between the Flask application and Replicate, an LLM API platform, by acting as a serverless queue for rate-limiting API calls and guaranteeing webhook delivery.

We'll begin by setting up the required services and getting the Flask application up and running. Then, we'll follow the journey of data through key components and code within the app, covering submitting the indexing request, analyzing the content type, generating a textual representation, and generating and storing a vector embedding. The content is ultimately made available for search within a vector search index.

## Architecture overview

Scalability is often overhyped, but it remains an important aspect of building robust applications. One of the benefits of using serverless and cloud-hosted providers is the ability to offload work to specialized services. Also required in any scalable architecture is a way of ensuring services aren't overloaded and your application is fault-tolerant. In this tutorial, we leverage several such services to handle different aspects of our application.

First, let's take a look at the services:

* [Replicate](https://replicate.com): Provides open-source machine learning models accessible via an API
* [MongoDB Atlas](https://www.mongodb.com/products/platform/atlas-database): An integrated suite of data services centered around a cloud database designed to accelerate and simplify how you build with data
* [Hookdeck](https://hookdeck.com): An event gateway that provides engineering teams with infrastructure and tooling to build and manage event-driven applications

Next, let's see how the services are used.

![All The Things Architecture with Replicate AI, MongoDB, Python Flask and Hookdeck](./images/mongodb-ai/all-the-things-architecture.png)

* MongoDB Atlas: MongoDB Atlas provides database storage and vector search capabilities, ensuring our data is stored efficiently and queried quickly.
* Index All The Things: This is the Flask application.
* Hookdeck: Hookdeck acts as a serverless queue for a) ensuring Replicate API requests do not exceed rate limits and can be retried and b) ingesting, delivering, and retrying webhooks from Replicate to ensure reliable ingestion of events. Note: We'll also use the [Hookdeck CLI](https://hookdeck.com/docs/cli) to receive webhooks in our local development environment.
* Replicate: Replicate handles AI inference, produces text and embeddings, and allows us to offload the computationally intensive tasks of running machine learning models. We use different LLMs to analyze different content types.

By utilizing these cloud-based services, we can focus on building the core functionality of our application while ensuring it remains scalable and efficient. Webhooks, in particular, allow for scalability by enabling [asynchronous AI workflows](https://hookdeck.com/blog/asynchronous-ai), offloading those high compute usage scenarios to the third-party services, and just receiving callbacks via a webhook when work is completed.

## Prerequisites

Before you begin, ensure you have the following:

* A free [Hookdeck account](https://dashboard.hookdeck.com/signup)
* The [Hookdeck CLI installed](https://hookdeck.com/docs/cli)
* A free [MongoDB Atlas account](https://www.mongodb.com/cloud/atlas/register)
* A free [Replicate account](https://replicate.com/signin)
* [Python 3](https://www.python.org/downloads/)
* [Poetry](https://python-poetry.org/docs/#installation) for package management

## Get the app up and running

Let's begin by getting the application running and seeing it in action.

### Get the code

Begin by getting the application codebase.

```bash
git clone https://github.com/hookdeck/index-all-the-things.git
cd index-all-the-things

```

Activate a virtual environment with Poetry:

```bash
poetry shell

```

And install the app dependencies:

```bash
poetry install

```

### Configure the app

The application needs credentials for the services it interacts with.

Copy the example `.env-example` file to a new `.env` file:

```bash
cp .env-example .env

```

Update the values within `.env` as follows:

* `SECRET_KEY`: A secret key that will be used for securely signing the session cookie. See the [`SECRET_KEY` Flask docs](https://flask.palletsprojects.com/en/stable/config/#SECRET_KEY) for more information.
* `MONGODB_CONNECTION_URI`: Populate with a MongoDB Atlas connection string with a Read and write to any database role. See the [Get Connection String docs](https://www.mongodb.com/docs/guides/atlas/connection-string/).
* `HOOKDECK_PROJECT_API_KEY`: Get an API Key from the Project -> Settings -> Secrets section of the [Hookdeck dashboard](https://dashboard.hookdeck.com).
* `HOOKDECK_WEBHOOK_SECRET`: Get a Signing Secret from the Project -> Settings -> Secrets section of the [Hookdeck dashboard](https://dashboard.hookdeck.com).
* `REPLICATE_API_TOKEN`: [Create an API token](https://replicate.com/account/api-tokens) in the Replicate dashboard.
* `REPLICATE_WEBHOOKS_SECRET`: Go to the [Webhooks section](https://replicate.com/account/webhook) of the Replicate dashboard and click the Show signing key button.
* `HOOKDECK_REPLICATE_API_QUEUE_API_KEY`, `HOOKDECK_REPLICATE_API_QUEUE_URL`, `AUDIO_WEBHOOK_URL` and `EMBEDDINGS_WEBHOOK_URL` will be automatically populated in the next step.

### Create Hookdeck connections

[Hookdeck connections](https://hookdeck.com/docs/connections) are used to route inbound HTTP requests received by a [Hookdeck source](https://hookdeck.com/docs/sources) to a [Hookdeck destination](https://hookdeck.com/docs/destinations).

The `create-hookdeck-connections.py` script automatically creates Hookdeck connections that:

1. Route requests made to Hookdeck URLs to the locally running application via the Hookdeck CLI. Here, Hookdeck is used as an inbound queue.
2. Route requests made to a Hookdeck URL to the Replicate API. Here, Hookdeck is used as an outbound queue.

The script also updates the `.env` file with the source URLs that handle the webhooks. Let's go through the details of the script.

First, ensure you have the necessary imports and define the authentication and content type headers for the Hookdeck API request:

```py
import httpx
import re
import hashlib
import os

from config import Config

headers = {
    "Authorization": f"Bearer {Config.HOOKDECK_PROJECT_API_KEY}",
    "Content-Type": "application/json",
}

```

Next, define a function to create a connection to the Hookdeck API:

```py
def create_connection(payload):
    response = httpx.request(
        "PUT",
        "https://api.hookdeck.com/2024-09-01/connections",
        headers=headers,
        json=payload,
    )
    data = response.json()

    if response.status_code != 200:
        raise Exception(f"Failed to create connection: {data}")

    return data

```

This function makes a `PUT` request to the Hookdeck API with the [upsert connection payload](https://hookdeck.com/docs/api#createupdate-a-connection) and handles the response. If the response status is not `200` (OK), an exception is raised. The function returns the parsed JSON response.

The first connection to be created is one for the Replicate API outbound queue:

```py
replicate_api_queue_api_key = hashlib.sha256(os.urandom(32)).hexdigest()
replicate_api_queue = {
    "name": "replicate-api-queue",
    "source": {
        "name": "replicate-api-queue",
        "verification": {
            "type": "API_KEY",
            "configs": {
                "header_key": Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME,
                "api_key": replicate_api_queue_api_key,
            },
        },
    },
    "rules": [
        {
            "type": "retry",
            "strategy": "exponential",
            "count": 5,
            "interval": 30000,
            "response_status_codes": ["429", "500"],
        }
    ],
    "destination": {
        "name": "replicate-api",
        "url": "https://api.replicate.com/v1/",
        "auth_method": {
            "type": "BEARER_TOKEN",
            "config": {
                "token": Config.REPLICATE_API_TOKEN,
            },
        },
    },
}

replicate_api_connection = create_connection(replicate_api_queue)

```

The Connection has a `name`, a `source`, and a `destination`. The `source` also has a `name` and a `verification`. The `verification` instructs Hookdeck how to authenticate requests. Since the connection is acting as an API queue, we're using the `API_KEY` type with the `header_key` set to the value defined in `Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME` and the `api_key` value set to the generated hash stored in `replicate_api_queue_api_key`.

The `rules` define the request retry strategy to use when interacting with the Replicate API. In this case, we're stating that we should retry up to five times, using an interval of `30000` milliseconds, but apply an `exponential` back-off retry strategy. Also, we're using the `response_status_codes` option to inform Hookdeck to only retry on `429` and `500` HTTP responses. See the [Hookdeck Retry docs](https://hookdeck.com/docs/retries) for more information on retries and the [Hookdeck Rules](https://hookdeck.com/docs/connections#connection-rules) docs for details on other types of rules that are available.
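To get a feel for what that retry rule implies, here is a minimal sketch of the resulting delay schedule. It assumes that `exponential` means the delay doubles after each failed attempt, starting from `interval`. That doubling factor is an assumption made for illustration, so treat the Hookdeck Retry docs as the authority on exact timing. The `retry_schedule` helper is hypothetical and not part of the app.

```python
def retry_schedule(interval_ms, count, strategy="exponential"):
    """Return the assumed delay (in milliseconds) before each retry attempt."""
    if strategy == "exponential":
        # Assumption: the delay doubles on every subsequent attempt.
        return [interval_ms * 2**attempt for attempt in range(count)]
    # Otherwise, wait the same interval before every attempt.
    return [interval_ms] * count


delays = retry_schedule(interval_ms=30000, count=5)
print([delay / 1000 for delay in delays])  # delays in seconds
print(sum(delays) / 60000)  # total time spent waiting, in minutes
```

Under that assumption, the five retries are spread over roughly a quarter of an hour, which gives a rate-limited or briefly unavailable API time to recover.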

The `url` on the destination is the base URL for the Replicate API. Hookdeck uses path forwarding by default, so any path appended to the Hookdeck source URL will also be appended to the destination URL. For example, a request to a Hookdeck source with URL `https://hkdk.events/{id}/predictions` will result in a request to a connected destination of `https://api.replicate.com/v1/predictions` where the destination has a base URL of `https://api.replicate.com/v1/`. Hookdeck acts very much like a proxy in this scenario.
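The path forwarding behavior described above can be sketched as a small URL mapping function. This is only an illustration of the rule, not Hookdeck's implementation: `forwarded_url` is a hypothetical helper, and `abc123` stands in for a real source ID.

```python
def forwarded_url(source_url, request_url, destination_base_url):
    """Map a request made to a Hookdeck source URL onto the destination URL."""
    if not request_url.startswith(source_url):
        raise ValueError("Request was not made to this source")

    # Whatever follows the source URL is the path that gets forwarded.
    extra_path = request_url[len(source_url):].lstrip("/")
    return destination_base_url.rstrip("/") + "/" + extra_path


print(
    forwarded_url(
        source_url="https://hkdk.events/abc123",
        request_url="https://hkdk.events/abc123/predictions",
        destination_base_url="https://api.replicate.com/v1/",
    )
)
```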

The `auth_method` on the destination is of type `BEARER_TOKEN` with a `config.token` set to the value of the `REPLICATE_API_TOKEN` environment variable. This allows Hookdeck to make authenticated API calls to Replicate.

Now, create a connection for the Replicate Audio webhooks to handle audio analysis callbacks:

```py
replicate_audio = {
    "name": "replicate-audio",
    "source": {
        "name": "replicate-audio",
        "verification": {
            "type": "REPLICATE",
            "configs": {
                "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET,
            },
        },
    },
    "rules": [
        {
            "type": "retry",
            "count": 5,
            "interval": 30000,
            "strategy": "exponential",
            "response_status_codes": ["!200", "!404"],
        }
    ],
    "destination": {
        "name": "cli-replicate-audio",
        "cli_path": "/webhooks/audio",
    },
}

replicate_audio_connection = create_connection(replicate_audio)

```

The Replicate Audio webhook callback connection uses a `verification` of type `REPLICATE` with a `configs.webhook_secret_key` value set from the `REPLICATE_WEBHOOKS_SECRET` value we stored in the `.env` file. This enables and instructs Hookdeck to verify that the webhook has come from Replicate.

The `rules` for this inbound connection are similar to those for the outbound connection and define a delivery retry strategy to follow if any requests to our application's webhook endpoint fail. The only difference is that `response_status_codes` informs Hookdeck not to retry if it receives a `200` or `404` response.

The `destination` has a `name` and a `cli_path` informing Hookdeck that the destination is the Hookdeck CLI and the path the request should be forwarded to is `/webhooks/audio`.

Next, create a connection for Replicate Embeddings webhook callbacks:

```py
replicate_embedding = {
    "name": "replicate-embedding",
    "source": {
        "name": "replicate-embedding",
        "verification": {
            "type": "REPLICATE",
            "configs": {
                "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET,
            },
        },
    },
    "rules": [
        {
            "type": "retry",
            "count": 5,
            "interval": 30000,
            "strategy": "exponential",
            "response_status_codes": ["!200", "!404"],
        }
    ],
    "destination": {
        "name": "cli-replicate-embedding",
        "cli_path": "/webhooks/embedding",
    },
}

replicate_embedding_connection = create_connection(replicate_embedding)

```

Finally, update the `.env` file with some of the generated values:

```py
# Update .env
with open(".env", "r") as file:
    env_content = file.read()

replicate_api_connection_url = replicate_api_connection["source"]["url"]
audio_webhook_url = replicate_audio_connection["source"]["url"]
embedding_webhook_url = replicate_embedding_connection["source"]["url"]

# Replace the .env URLs in the .env content
env_content = re.sub(
    r"HOOKDECK_REPLICATE_API_QUEUE_API_KEY=.*",
    f"HOOKDECK_REPLICATE_API_QUEUE_API_KEY={replicate_api_queue_api_key}",
    env_content,
)
env_content = re.sub(
    r"HOOKDECK_REPLICATE_API_QUEUE_URL=.*",
    f"HOOKDECK_REPLICATE_API_QUEUE_URL={replicate_api_connection_url}",
    env_content,
)
env_content = re.sub(
    r"AUDIO_WEBHOOK_URL=.*", f"AUDIO_WEBHOOK_URL={audio_webhook_url}", env_content
)
env_content = re.sub(
    r"EMBEDDINGS_WEBHOOK_URL=.*",
    f"EMBEDDINGS_WEBHOOK_URL={embedding_webhook_url}",
    env_content,
)

with open(".env", "w") as file:
    file.write(env_content)

print("Connections created successfully!")

```

This code reads the current `.env` content, uses regular expressions to replace the lines containing the environment variable placeholders, and writes the updated content back to the `.env` file. This ensures that the environment variables, such as the webhook URLs, are up to date.
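The four `re.sub` calls follow the same pattern, so the replacement step could also be expressed as one small helper. The sketch below is a hypothetical variation, not the script's actual code: `set_env_value` anchors the match to the start of a line so that a key cannot match inside another key's name, and it fails loudly if the key is missing from the file.

```python
import re


def set_env_value(env_content, key, value):
    """Replace the `KEY=...` line in .env-style text with `KEY=value`."""
    pattern = re.compile(rf"^{re.escape(key)}=.*$", flags=re.MULTILINE)
    # A function replacement stops backslashes in `value` being read as escapes.
    new_content, replaced = pattern.subn(lambda _match: f"{key}={value}", env_content)
    if replaced == 0:
        raise KeyError(f"{key} not found in .env content")
    return new_content


example = "SECRET_KEY=abc\nAUDIO_WEBHOOK_URL=\nEMBEDDINGS_WEBHOOK_URL=\n"
print(set_env_value(example, "AUDIO_WEBHOOK_URL", "https://hkdk.events/example"))
```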

Run the script:

```bash
poetry run python create-hookdeck-connections.py

```

Check your `.env` file to ensure all values are populated.

Also, navigate to the Connections section of the Hookdeck dashboard and check the visual representation of your connections.

![Hookdeck connection in the Hookdeck dashboard](./images/mongodb-ai/hookdeck-connections.png)

### Create MongoDB Atlas indexes

To search a MongoDB database efficiently, you need indexes. For MongoDB vector search, you must create an [Atlas Vector Search index](https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-overview/#atlas-vector-search-indexes). The `create-indexes.py` script automates the creation and updating of the search indexes in MongoDB using the `pymongo` library.

First, ensure you have the necessary imports and initialize the database connection:

```py
from allthethings.mongo import Database
from pymongo.operations import SearchIndexModel

database = Database()
collection = database.get_collection()

```

`Database` is defined in `allthethings/mongo.py` and provides utility access to the `assets` collection in the `iaat` database, with these string values defined in `config.py`.

Next, ensure that the required collection exists within the database so that the indexes can be created:

```py
if collection.name not in collection.database.list_collection_names():
    print("Creating empty collection so indexes can be created.")
    collection.database.create_collection(collection.name)

```

With the collection created, define a function to create or update search indexes:

```py
def create_or_update_search_index(index_name, index_definition, index_type):
    indexes = list(collection.list_search_indexes(index_name))

    if len(indexes) == 0:
        print(f'Creating search index: "{index_name}"')
        index_model = SearchIndexModel(
            definition=index_definition,
            name=index_name,
            type=index_type,
        )
        collection.create_search_index(model=index_model)

    else:
        print(f'Search index "{index_name}" already exists. Updating.')
        collection.update_search_index(name=index_name, definition=index_definition)

```

This function checks if an index with the given `index_name` already exists. It creates a new search index using the provided definition and type if it does not exist. If it exists, it updates the existing index with the new definition.

Now, create a vector search index for embeddings:

```py
vector_result = create_or_update_search_index(
    "vector_index",
    {
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": 768,
                "similarity": "euclidean",
            }
        ]
    },
    "vectorSearch",
)

```

This creates or updates a vector search index named "vector_index" for the `embedding` field.
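The app's search code is not covered in this part of the walkthrough, but it helps to see how an index like this is typically queried. The sketch below builds a `$vectorSearch` aggregation pipeline against `vector_index`. The helper name, the `num_candidates` default, and the projected fields are assumptions for illustration. The query vector must come from the same embedding model used at indexing time so that it has 768 dimensions.

```python
def build_vector_search_pipeline(query_vector, limit=5, num_candidates=100):
    """Build an aggregation pipeline that queries the `vector_index` index."""
    if len(query_vector) != 768:
        raise ValueError("Query vector must have 768 dimensions to match the index")

    return [
        {
            "$vectorSearch": {
                "index": "vector_index",
                "path": "embedding",
                "queryVector": list(query_vector),
                # Number of nearest-neighbor candidates considered before limiting.
                "numCandidates": num_candidates,
                "limit": limit,
            }
        },
        {
            # Return a few stored fields plus the similarity score.
            "$project": {
                "_id": 0,
                "url": 1,
                "text": 1,
                "score": {"$meta": "vectorSearchScore"},
            }
        },
    ]


pipeline = build_vector_search_pipeline([0.0] * 768, limit=3)
print(pipeline[0]["$vectorSearch"]["limit"])
```

A pipeline like this would be passed to `collection.aggregate(...)` once documents with an `embedding` field exist in the collection.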

Finally, create a search index for the `url` field, as this is used to determine if a URL has already been indexed:

```py
create_or_update_search_index(
    "url_index",
    {
        "mappings": {
            "fields": {
                "url": {
                    "type": "string",
                },
            },
        }
    },
    "search",
)

print("Indexes created successfully!")

```

Run the script:

```bash
poetry run python create-indexes.py

```

Go to the Atlas Search section within the MongoDB Atlas dashboard and check the search indexes have been created.

![MongoDB Atlas dashboard Atlas Search indexes](./images/mongodb-ai/mongodb-atlas-search-indexes.png)

### Check the app is working

In one terminal window, run the Flask application:

```bash
poetry run python -m flask --app app --debug run

```

In a second terminal window, create a local tunnel using the Hookdeck CLI:

```bash
hookdeck listen 5000 '*'

```

This command listens to all Hookdeck sources connected to a CLI destination, routing webhooks to the application running locally on port 5000.

When you run the command, you will see output similar to the following:

```bash
Listening for events on Sources that have Connections with CLI Destinations

Dashboard
👉 Inspect and replay events: https://dashboard.hookdeck.com?team_id=tm_{id}

Sources
🔌 replicate-embedding URL: https://hkdk.events/{id}
🔌 replicate-audio URL: https://hkdk.events/{id}

Connections
replicate-embedding -> replicate-embedding forwarding to /webhooks/embedding
replicate-audio -> replicate-audio forwarding to /webhooks/audio

> Ready! (^C to quit)

```

Open `localhost:5000` in your web browser to ensure the Flask app is running.

![Index All The Things app](./images/mongodb-ai/iaat-first-run.png)

## Submit content for analysis and indexing

With the app running, it's time to submit an asset for indexing.

Click Bruce (mp3) under the Examples header to populate the in-app search bar with a URL and click Submit.

![URL submitted for indexing](./images/mongodb-ai/iaat-bruce-submitted.png)

Submitting the form sends the URL to a `/process` endpoint as a `POST` request. Let's walk through what that code does.

First, define the `/process` route in `app.py`:

```py
@app.route("/process", methods=["POST"])
def process():
    url = request.form["url"]

    parsed_url = urlparse(url)
    if not all([parsed_url.scheme, parsed_url.netloc]):
        flash("Invalid URL")
        return redirect(url_for("index"))

```

This route handles the `POST` request to the `/process` endpoint and retrieves the URL from the form data submitted by the user. It then validates the URL and, if the URL is not valid, redirects to the index page with an error message.
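To see what that validation accepts and rejects, here is the same check extracted into a standalone function. The `is_valid_url` name and the sample URLs are illustrative only.

```python
from urllib.parse import urlparse


def is_valid_url(url):
    """Return True when the URL has both a scheme and a network location."""
    parsed_url = urlparse(url)
    return all([parsed_url.scheme, parsed_url.netloc])


for candidate in [
    "https://example.com/audio.mp3",  # scheme and host are both present
    "example.com/audio.mp3",  # no scheme, so the host is parsed as a path
    "mailto:someone@example.com",  # scheme but no network location
]:
    print(candidate, is_valid_url(candidate))
```

Note that this check only looks at the shape of the URL. It accepts any scheme, so whether the resource can actually be fetched is determined by the later request.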

Next, check if the URL already exists in the database:

```py
    database = Database()
    collection = database.get_collection()

    exists = collection.find_one({"url": url})

    if exists is not None:
        flash("URL has already been indexed")
        return redirect(url_for("index"))

```

If the URL is already indexed, flash a message to the user and redirect them to the index page.

Ensure the resource exists:

```py
    req = urllib.request.Request(url, method="HEAD")
    fetch = urllib.request.urlopen(req)

    if fetch.status != 200:
        flash("URL is not reachable")
        return redirect(url_for("index"))

```

This code sends a `HEAD` request to the URL to avoid downloading the entire file. If the URL is not reachable (status code is not 200), flash a message to the user and redirect them to the index page.
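One caveat worth knowing: `urllib.request.urlopen` raises `urllib.error.HTTPError` for `4xx` and `5xx` responses and `urllib.error.URLError` when the host cannot be reached, so those cases raise an exception before the status check runs. A more defensive variation might look like the sketch below. The `head_headers` helper, its `opener` parameter (there so the function can be exercised without network access), and the 10-second timeout are assumptions for illustration and are not part of the app.

```python
import urllib.request


def head_headers(url, opener=urllib.request.urlopen, timeout=10):
    """Return the response headers for a HEAD request, or None if unreachable."""
    try:
        req = urllib.request.Request(url, method="HEAD")
        with opener(req, timeout=timeout) as response:
            return response.headers if response.status == 200 else None
    except (OSError, ValueError):
        # OSError covers URLError, HTTPError, and socket timeouts.
        # ValueError is raised for URLs without a recognizable scheme.
        return None
```

With a helper like this, the route could flash "URL is not reachable" whenever `None` is returned, and read `Content-Type` and `Content-Length` from the returned headers otherwise.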

Retrieve the content type and length from the response headers:

```py
    content_length = fetch.headers["Content-Length"]
    content_type = fetch.headers["Content-Type"]

```

This code extracts the content length and content type from the response headers.

Retrieve the appropriate asset processor based on the content type:

```py
    processor = get_asset_processor(content_type)

    if processor is None:
        flash('Unsupported content type "' + content_type + '"')
        return redirect(url_for("index"))

```

If no processor is found for the content type, flash a message to the user and redirect them to the index page.

The `get_asset_processor` function, defined in `allthethings/processors.py`, returns a processor used to analyze the contents of an asset based on the `content_type`.

```py
def get_asset_processor(
    content_type,
):
    if "audio/" in content_type:
        return AudioProcessor()
    elif "video/" in content_type:
        return None
    elif "image/" in content_type:
        return None
    else:
        return None

```

In this case, the file is an MP3, and the `content_type` is `audio/mpeg`, so an `AudioProcessor` instance is returned.

Insert the URL, along with its content type and length, into the database with a status of `SUBMITTED`:

```py
    asset = collection.insert_one(
        {
            "url": url,
            "content_type": content_type,
            "content_length": content_length,
            "status": "SUBMITTED",
        }
    )

```

Process the URL using the asset processor, an `AudioProcessor`, and obtain the prediction results:

```py
    try:
        response = processor.process(asset.inserted_id, url)
    except Exception as e:
        app.logger.error("Error processing asset: %s", e)
        collection.update_one(
            filter={"url": url},
            update={
                "$set": {
                    "status": "PROCESSING_ERROR",
                    "error": str(e),
                }
            },
        )
        flash("Error processing asset")
        return redirect(url_for("index"))

```

Let's look at the `AudioProcessor` from `allthethings/processors.py` in more detail to understand what this does:

```py
import httpx
from config import Config

...

class AudioProcessor:
    def process(self, id, url):
        input = {
            "audio": url,
            "model": "large-v3",
            "language": "auto",
            "translate": False,
            "temperature": 0,
            "transcription": "plain text",
            "suppress_tokens": "-1",
            "logprob_threshold": -1,
            "no_speech_threshold": 0.6,
            "condition_on_previous_text": True,
            "compression_ratio_threshold": 2.4,
            "temperature_increment_on_fallback": 0.2,
        }

        payload = {
            "version": "cdd97b257f93cb89dede1c7584e3f3dfc969571b357dbcee08e793740bedd854",
            "input": input,
            "webhook": f"{Config.AUDIO_WEBHOOK_URL}/{id}",
            "webhook_events_filter": ["completed"],
        }

        response = httpx.request(
            "POST",
            f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions",
            headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS,
            json=payload,
        )

        return response.json()

```

The `process` method processes the audio URL by creating a prediction request, passing the `payload` as the JSON body.

The `payload` includes a `webhook` URL, which consists of `Config.AUDIO_WEBHOOK_URL` with an appended path (`/{id}`) that indicates which asset the callback is for. The `webhook_events_filter=["completed"]` filter informs Replicate to only send a webhook when the prediction is completed.

The `payload.version` instructs Replicate to use the [OpenAI Whisper model](https://replicate.com/openai/whisper) for audio-to-text conversion. The `input` includes details such as `language` being set to `auto`, so the language is auto-detected, and `transcription` being set to `plain text`.

Since we're using Hookdeck as an outbound API queue, the request uses `Config.HOOKDECK_REPLICATE_API_QUEUE_URL` with the `/predictions` API path as a suffix. The appropriate auth headers are also used from `Config.HOOKDECK_QUEUE_AUTH_HEADERS`.

Back in `app.py`, update the database with the processing status and pending prediction details:

```py
    collection.update_one(
        filter={"url": url},
        update={
            "$set": {
                "status": "PROCESSING",
                "processor_response": response,
            }
        },
    )

```

The `processor_response` value is stored for debug purposes as it contains a Hookdeck request ID that can be useful.

Flash a success message to the user and redirect them to the index page:

```py
    flash(
        message="Processing: " + url + " with content type: " + content_type,
        category="success",
    )

    return redirect(url_for("index"))

```

At this point, the Flask application has offloaded all the work to Replicate, and, from a data journey perspective, we're waiting for the prediction completed webhook.

### Handle audio to text prediction completion webhook

Once Replicate completes the prediction, it makes a webhook callback to Hookdeck. Hookdeck instantly ingests the webhook, verifies the event came from Replicate, and pushes the data onto a queue for processing and delivery. Based on the current Hookdeck connection setup, the webhook event is delivered to the CLI and then to the `/webhooks/audio/<id>` endpoint of the Flask application. Let's look at the code that handles the `/webhooks/audio/<id>` request.

Here's the `/webhooks/audio/<id>` route definition in `app.py`:

```py
@app.route("/webhooks/audio/<id>", methods=["POST"])
def webhook_audio(id):
    if not verify_webhook(request):
        app.logger.error("Webhook signature verification failed")
        return jsonify({"error": "Webhook signature verification failed"}), 401

    payload = request.json
    app.logger.info("Audio payload received for id %s", id)
    app.logger.debug(payload)

```

This route handles `POST` requests to the `/webhooks/audio/<id>` endpoint. The `id` path parameter represents the asset in the MongoDB database that the audio callback is for. The JSON `payload` is the body of the webhook callback from Replicate.

Before handling the webhook, we check that the webhook came from Hookdeck via a `verify_webhook` function. If the verification fails, a `401` response is returned. Here's the code to verify the webhook:

```py
def verify_webhook(request):
    if Config.HOOKDECK_WEBHOOK_SECRET is None:
        app.logger.error("No HOOKDECK_WEBHOOK_SECRET found.")
        return False

    hmac_header = request.headers.get("x-hookdeck-signature")

    hash = base64.b64encode(
        hmac.new(
            Config.HOOKDECK_WEBHOOK_SECRET.encode(), request.data, hashlib.sha256
        ).digest()
    ).decode()

    verified = hash == hmac_header
    app.logger.debug("Webhook signature verification: %s", verified)
    return verified

```

This reads the Hookdeck webhook secret stored in the `HOOKDECK_WEBHOOK_SECRET` environment variable, generates a hash using the secret from the inbound webhook data, and compares it with the hash that was sent in the `x-hookdeck-signature` header. If they match, the webhook is verified.
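The same scheme can be tried out in isolation. The sketch below mirrors the base64-encoded HMAC-SHA256 computation above, with two variations that are not in the app's code: the comparison uses `hmac.compare_digest`, which takes the same time regardless of where the strings differ, and a missing header is rejected explicitly. The function names and the example secret are hypothetical.

```python
import base64
import hashlib
import hmac


def sign(secret, body):
    """Compute a base64-encoded HMAC-SHA256 signature for a raw request body."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    return base64.b64encode(digest).decode()


def is_valid_signature(secret, body, signature_header):
    """Compare signatures in constant time; a missing header never verifies."""
    if not signature_header:
        return False
    return hmac.compare_digest(sign(secret, body).encode(), signature_header.encode())


body = b'{"status": "succeeded"}'
header = sign("example-secret", body)
print(is_valid_signature("example-secret", body, header))  # True
print(is_valid_signature("example-secret", body + b" ", header))  # False
```

Because the signature covers the raw bytes, even a single extra space in the body invalidates it, which is why the handler hashes `request.data` rather than the parsed JSON.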

Next, the processing status is determined based on the presence of an error in the payload:

```py
    database = Database()
    collection = database.get_collection()

    status = (
        "PROCESSING_ERROR" if "error" in payload and payload["error"] else "PROCESSED"
    )

```

If an error is present, the status is set to `PROCESSING_ERROR`. Otherwise, it is set to `PROCESSED`.

The database is updated with the transcription results and the processing status:

```py
    result = collection.find_one_and_update(
        filter={"_id": ObjectId(id)},
        update={
            "$set": {
                "status": status,
                "text": payload["output"]["transcription"],
                "replicate_response": payload,
            }
        },
        return_document=True,
    )

```

This finds the document in the database with the matching `id` and updates it with the new status, transcription `text`, and the entire Replicate response payload.
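Note that the update reads `payload["output"]["transcription"]` even when the status is `PROCESSING_ERROR`. If a failed prediction arrives without an `output` object, that lookup would raise before the error status is stored. A more defensive way to derive both values might look like the following sketch, which assumes `output` can be missing or null on failure. The `summarize_prediction` helper is hypothetical.

```python
def summarize_prediction(payload):
    """Return a (status, text) tuple for a prediction webhook payload."""
    if payload.get("error"):
        return "PROCESSING_ERROR", None

    # Assumption: `output` may be missing or null, so guard before indexing.
    output = payload.get("output") or {}
    return "PROCESSED", output.get("transcription")


print(summarize_prediction({"error": None, "output": {"transcription": "Hello"}}))
print(summarize_prediction({"error": "Prediction failed", "output": None}))
```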

Next, we check to ensure the document was found:

```py
    if result is None:
        app.logger.error(
            "No document found for id %s to add audio transcript", payload["id"]
        )
        return jsonify({"error": "No document found to add audio transcript"}), 404

```

If no document is found for the given `id`, an error is logged, and a JSON response with an error message is returned. The `404` response will inform Hookdeck that although the request did not succeed, the request should not be retried.

With the audio converted to text and stored, the data journey moves to generating embeddings via Replicate:

```py
    app.logger.info("Transcription updated")
    app.logger.debug(result)

    request_embeddings(id)

    return "OK"

```

The `request_embeddings` function is called to trigger embedding generation for the transcribed audio. The endpoint then returns an `OK` response to inform Hookdeck that the webhook has been successfully processed.

## Generate embedding

The `request_embeddings` function triggers the generation of embeddings for the textual representation of an indexed asset:

```py
def request_embeddings(id):
    app.logger.info("Requesting embeddings for %s", id)

    database = Database()
    collection = database.get_collection()

    asset = collection.find_one({"_id": ObjectId(id)})

    if asset is None:
        raise RuntimeError("Asset not found")

    if asset["status"] != "PROCESSED":
        raise RuntimeError("Asset has not been processed")

```

If the asset with the passed `id` is not found, or the status of the asset is not `PROCESSED` (the status that indicates a textual representation has been created), a `RuntimeError` is raised.

### Trigger embedding generation with webhook callback

Next, the embeddings are generated for the processed asset using the `AsyncEmbeddingsGenerator`:

```py
    generator = AsyncEmbeddingsGenerator()

    try:
        response = generator.generate(id, asset["text"])
    except Exception as e:
        app.logger.error("Error generating embeddings for %s: %s", id, e)
        raise

```

This initializes the `AsyncEmbeddingsGenerator` and calls the `generate` function on the instance, passing the ID of the asset being indexed and the textual representation.

The `AsyncEmbeddingsGenerator` definition in `allthethings/generators.py` follows a similar pattern to the previously used processor:

```py
import httpx
from config import Config

class AsyncEmbeddingsGenerator:
    def generate(self, id, text):
        payload = {
            "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305",
            "input": {"text": text},
            "webhook": f"{Config.EMBEDDINGS_WEBHOOK_URL}/{id}",
            "webhook_events_filter": ["completed"],
        }

        response = httpx.request(
            "POST",
            f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions",
            headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS,
            json=payload,
        )

        return response.json()

```

The `generate` method receives the asset `id` and the `text` that embeddings are to be generated for.

A request `payload` is created containing a `version` that identifies the [replicate/all-mpnet-base-v2](https://replicate.com/replicate/all-mpnet-base-v2) model used to generate embeddings, with the `text` to embed passed within the `input` parameter.

The `webhook` property is set to `Config.EMBEDDINGS_WEBHOOK_URL` with an appended path (`/{id}`) that indicates which asset the callback is for. As before, the use of the `webhook_events_filter=["completed"]` filter informs Replicate to only send a webhook when the prediction is completed.

Since this is an asynchronous call, Hookdeck is again used to queue the Replicate API request via a call to the `HOOKDECK_REPLICATE_API_QUEUE_URL` endpoint with the `/predictions` path.

The method returns the Hookdeck response.
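To make the payload shape concrete, here is a minimal sketch that builds the same request body as a standalone function. The function name `build_embeddings_payload`, the example asset ID, and the example webhook URL are assumptions for illustration; only the payload keys and the model version come from the code above.

```python
# Model version hash copied from the generator above.
EMBEDDINGS_MODEL_VERSION = "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305"


def build_embeddings_payload(asset_id, text, webhook_base_url):
    """Build the body for an asynchronous embeddings prediction request."""
    return {
        "version": EMBEDDINGS_MODEL_VERSION,
        "input": {"text": text},
        # The asset ID is appended so the callback identifies the asset.
        "webhook": f"{webhook_base_url.rstrip('/')}/{asset_id}",
        # Only call back once the prediction has finished.
        "webhook_events_filter": ["completed"],
    }


payload = build_embeddings_payload(
    "66f2c0ffee0123456789abcd",  # hypothetical asset ID
    "A transcript of the indexed audio file",
    "https://example.com/webhooks/embedding",  # hypothetical URL
)
print(payload["webhook"])
```

Keeping the payload construction separate from the HTTP call makes it easy to inspect or test the request body without contacting Hookdeck or Replicate.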

Back in `app.py`, update the database with the status and embedding request ID:

```py
    collection.update_one(
        filter={"_id": ObjectId(id)},
        update={
            "$set": {
                "status": "GENERATING_EMBEDDINGS",
                "generator_response": response,
            }
        },
    )

```

This updates the document with the new status, `GENERATING_EMBEDDINGS`, and stores the Hookdeck queue response in `generator_response`.

The request to asynchronously generate the embeddings has been triggered, and the work has been offloaded to Replicate. When the result is ready, Replicate triggers a webhook callback containing it.

### Handle embedding generation webhook callback

Once Replicate has generated the embedding, a webhook callback is made to the `/webhooks/embedding/<id>` route in our Flask application. This route receives the webhook payload, verifies it came from Hookdeck, updates the database with the embedding results, and sets the appropriate status.

Here's the route definition:

```py
@app.route("/webhooks/embedding/<id>", methods=["POST"])
def webhook_embeddings(id):
    if not verify_webhook(request):
        app.logger.error("Webhook signature verification failed")
        return jsonify({"error": "Webhook signature verification failed"}), 401

    payload = request.json
    app.logger.info("Embeddings payload received for id %s", id)
    app.logger.debug(payload)

```

This route handles `POST` requests to the `/webhooks/embedding/<id>` endpoint and is passed the `id` path parameter. It verifies the request came from Hookdeck and, if so, retrieves the JSON payload from the request. Otherwise, it returns a `401` response.

Next, it checks for errors:

```py
    status = (
        "EMBEDDINGS_ERROR" if "error" in payload and payload["error"] else "SEARCHABLE"
    )

```

If an error is present, the status is set to `EMBEDDINGS_ERROR`. Otherwise, it is set to `SEARCHABLE`.

Next, the vector embedding is extracted from the payload and the database is updated with the embedding details and the new status:

```py
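    # A failed prediction has no usable output, so only read the embedding on success
    embedding = payload["output"][0]["embedding"] if status == "SEARCHABLE" else None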

    database = Database()
    collection = database.get_collection()

    result = collection.update_one(
        filter={"_id": ObjectId(id)},
        update={
            "$set": {
                "status": status,
                "embedding": embedding,
                "replicate_embeddings_response": payload,
            }
        },
    )

```

This finds the document in the database with the matching `id` and updates it with the new status, embedding, and the entire payload.
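As a rough illustration of the status check and extraction steps above, the following sketch applies the same logic to hand-written payloads. The helper name `parse_embedding_payload` and the sample payloads are assumptions; real Replicate payloads contain many more fields. The sketch only reads the embedding when no error is reported, since a failed prediction may not carry an `output` list.

```python
def parse_embedding_payload(payload):
    """Return (status, embedding) for an embeddings webhook payload."""
    status = "EMBEDDINGS_ERROR" if payload.get("error") else "SEARCHABLE"
    embedding = None
    if status == "SEARCHABLE":
        # The output is a list; the first item holds the embedding.
        embedding = payload["output"][0]["embedding"]
    return status, embedding


# Hand-written sample payloads, trimmed to the fields used here.
ok_payload = {"id": "abc123", "error": None, "output": [{"embedding": [0.1, 0.2, 0.3]}]}
failed_payload = {"id": "def456", "error": "Model failed", "output": None}

print(parse_embedding_payload(ok_payload))
print(parse_embedding_payload(failed_payload))
```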

Check if the document was found and updated:

```py
    if result.matched_count == 0:
        app.logger.error(
            "No document found for id %s to update embedding", id
        )
        return jsonify({"error": "No document found to update embedding"}), 404

    return "OK"

```

If no document is found for the given `id`, an error is logged, and a JSON response with an error message is returned with a `404` status. If the update succeeds, `OK` is returned to inform Hookdeck that the webhook has been processed.

With the vector embedding stored in the `embedding` property, it's now searchable with MongoDB due to the previously defined vector search index.
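The statuses used during embedding generation form a small lifecycle. The sketch below is not part of the application; it is a hypothetical helper, with the transition table inferred from the statuses used in this section, showing how the expected moves could be made explicit. Treating `SEARCHABLE` and `EMBEDDINGS_ERROR` as terminal is an assumption of the sketch.

```python
# Hypothetical transition table inferred from the statuses used in this section.
ALLOWED_TRANSITIONS = {
    "PROCESSED": {"GENERATING_EMBEDDINGS"},
    "GENERATING_EMBEDDINGS": {"SEARCHABLE", "EMBEDDINGS_ERROR"},
    "SEARCHABLE": set(),
    "EMBEDDINGS_ERROR": set(),
}


def can_transition(current, new):
    """Return True if moving from `current` to `new` is an expected step."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


print(can_transition("PROCESSED", "GENERATING_EMBEDDINGS"))  # True
print(can_transition("PROCESSED", "SEARCHABLE"))  # False
```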

## Searching using Atlas Vector Search

Search is user-driven. The user enters a search term and submits a form. That search query is handled and processed, and the result is returned and displayed. Ideally, this is a real-time experience, so operations are performed synchronously.

Let's walk through each of those steps.

### Handle search submission

The user navigates to the `/search` endpoint in their web browser, enters a search term, and submits the form, making a `GET` request to the `/search` endpoint:

```py
@app.route("/search", methods=["GET"])
def search():
    query = request.args.get("query")
    if query is None:
        return render_template("search.html", results=[])

    app.logger.info("Query submitted")
    app.logger.debug(query)

    # query_vector_search returns None on failure, so fall back to no results
    results = query_vector_search(query) or []

    results = format_results(results)

    app.logger.debug("Formatted search results: %s", results)

    return render_template("search.html", results=results, query=query)

```

The `search` function handles `GET` requests to the `/search` endpoint. It reads the search `query` from the URL query string using `request.args.get`. If there is no query, the `search.html` template is rendered with an empty result list. Otherwise, a vector search is performed using the `query_vector_search` function, the results are formatted by the `format_results` function, and the formatted results are rendered using the `search.html` template.

### Generating search query embeddings

The `query_vector_search` function generates embeddings for the query, performs a vector search using the query provided by the user, and retrieves matching documents from the MongoDB collection.

```py
def query_vector_search(q):
    generator = SyncEmbeddingsGenerator()

    try:
        generator_response = generator.generate(q)
        app.logger.debug(generator_response)
    except Exception as e:
        app.logger.error("Error generating embeddings: %s", e)
        return None

    # Replicate reports a finished prediction with the status "succeeded"
    if generator_response["status"] != "succeeded":
        app.logger.debug("Embeddings generation timed out")
        return None

    query_embedding = generator_response["output"][0]["embedding"]

```

The function takes the query, `q`, and uses the `SyncEmbeddingsGenerator` to generate the embedding for the search query by calling its `generate` function and passing the query. If the request raises an exception, or the prediction has not finished by the time the response is returned, `None` is returned.

The `SyncEmbeddingsGenerator` is used to synchronously generate embeddings for the search query. This operation is synchronous because the request is user-driven and requires a direct response. `SyncEmbeddingsGenerator` is defined in `allthethings/generators.py`:

```py
class SyncEmbeddingsGenerator:
    def generate(self, text):
        payload = {
            "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305",
            "input": {"text": text},
        }

        response = httpx.request(
            "POST",
            "https://api.replicate.com/v1/predictions",
            headers={**Config.REPLICATE_API_AUTH_HEADERS, "Prefer": "wait"},
            json=payload,
            timeout=60,
        )

        return response.json()

```

The `generate` function receives the `text` to generate an embedding from. A synchronous request is made directly to the Replicate HTTP API, passing the same [replicate/all-mpnet-base-v2](https://replicate.com/replicate/all-mpnet-base-v2) model `version` used in the asynchronous embedding request. The `"Prefer": "wait"` header and the `timeout` value are set to enable long-running synchronous HTTP requests. The Replicate API token is included in the headers via `Config.REPLICATE_API_AUTH_HEADERS`.

The response JSON is returned to the calling function.

### Create Vector Search query

Back in `query_vector_search`, the embedding result is used to construct the vector search query.

```py
    ...

    query_embedding = generator_response["output"][0]["embedding"]

    vs_query = {
        "index": "vector_index",
        "path": "embedding",
        "queryVector": query_embedding,
        "numCandidates": 100,
        "limit": 10,
    }

    new_search_query = {"$vectorSearch": vs_query}

    app.logger.info("Vector search query created")
    app.logger.debug(new_search_query)

```

`vs_query` represents the vector search to be performed. It identifies the `index` to query (`vector_index`), the `path` of the document property that holds the stored embeddings (`embedding`), and the embedding of the user's text query (`"queryVector": query_embedding`). See the [MongoDB Vector Search docs](https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#mongodb-pipeline-pipe.-vectorSearch) for more information, including the purpose of the `numCandidates` and `limit` properties.
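To show how `numCandidates` and `limit` relate, here is a small sketch that builds the stage with basic validation. The helper `build_vector_search_stage` is hypothetical. The constraints it checks (`numCandidates` at least `limit` and at most 10,000) reflect the Atlas Vector Search documentation at the time of writing, so check the linked docs before relying on them.

```python
def build_vector_search_stage(query_embedding, index="vector_index",
                              path="embedding", limit=10, num_candidates=100):
    """Build a $vectorSearch aggregation stage with basic sanity checks."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    # numCandidates is the number of nearest neighbors considered during
    # the search before the top `limit` results are returned.
    if not limit <= num_candidates <= 10_000:
        raise ValueError("num_candidates must be between limit and 10000")
    return {
        "$vectorSearch": {
            "index": index,
            "path": path,
            "queryVector": query_embedding,
            "numCandidates": num_candidates,
            "limit": limit,
        }
    }


stage = build_vector_search_stage([0.1, 0.2, 0.3])
print(stage["$vectorSearch"]["numCandidates"])  # 100
```

A larger `numCandidates` generally trades some latency for better recall, which is why it is kept separate from the number of results returned.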

### Retrieve Vector Search results

Next, the function defines the projection to specify which fields to include in the search results.

```py
    project = {
        "$project": {
            "score": {"$meta": "vectorSearchScore"},
            "_id": 0,
            "url": 1,
            "content_type": 1,
            "content_length": 1,
            "text": 1,
        }
    }

```

The projection includes the vector search score, URL, content type, content length, and text. For more information on the score, see the [Atlas Vector Search Score docs](https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#atlas-vector-search-score).
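For intuition about the score, the sketch below computes it by hand, assuming the index was created with `cosine` similarity (an assumption; check your index definition). According to the Atlas docs linked above, cosine similarity is normalized into the range 0 to 1 using `score = (1 + cosine(v1, v2)) / 2`. The function names are illustrative and the vectors are made up.

```python
import math


def cosine_similarity(v1, v2):
    """Cosine of the angle between two equal-length, non-zero vectors."""
    dot = sum(a * b for a, b in zip(v1, v2))
    norm1 = math.sqrt(sum(a * a for a in v1))
    norm2 = math.sqrt(sum(b * b for b in v2))
    return dot / (norm1 * norm2)


def normalized_cosine_score(v1, v2):
    """Map cosine similarity from [-1, 1] to [0, 1]."""
    return (1 + cosine_similarity(v1, v2)) / 2


print(normalized_cosine_score([1.0, 0.0], [1.0, 0.0]))   # 1.0 (same direction)
print(normalized_cosine_score([1.0, 0.0], [0.0, 1.0]))   # 0.5 (orthogonal)
print(normalized_cosine_score([1.0, 0.0], [-1.0, 0.0]))  # 0.0 (opposite)
```

Under this assumption, a score close to 1 means the query and the indexed text point in nearly the same direction in embedding space.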

The function then performs the aggregation query using the constructed vector search query and projection:

```py
    database = Database()
    collection = database.get_collection()

    app.logger.info("Vector search query without post filter")
    res = list(collection.aggregate([new_search_query, project]))

    app.logger.info("Vector search query run")
    app.logger.debug(res)
    return res

```

Overall, the `query_vector_search` function generates an embedding for the user's query, runs a vector search with that embedding, and returns the matching documents from the MongoDB collection.

### Format and display the Vector Search results

Next, within `search` in `app.py`, the results are formatted for rendering:

```py
    results = format_results(results)

```

And within `format_results`, also defined in `app.py`:

```py
from urllib.parse import urlparse


def format_results(results):
    formatted_results = []
    for result in results:
        # Break the asset URL into parts the template can display
        parse_result = urlparse(result["url"])
        parsed_url = {
            "netloc": parse_result.netloc,
            "path": parse_result.path,
            "params": parse_result.params,
            "query": parse_result.query,
            "fragment": parse_result.fragment,
            "hostname": parse_result.hostname,
            # Final path segment, e.g. the file name
            "last_part": parse_result.path.rstrip("/").split("/")[-1],
        }
        result["parsed_url"] = parsed_url
        formatted_results.append(result)

    return formatted_results

```

The `format_results` function iterates over the vector search results and returns a list in which each result is extended with a `parsed_url` property describing the URL of the indexed asset.
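To see what ends up in `parsed_url`, here is a standalone sketch that applies the same `urlparse` calls to a made-up URL. The URL is hypothetical and only a subset of the fields is shown.

```python
from urllib.parse import urlparse

# Hypothetical URL of an indexed asset.
url = "https://example.com/media/podcasts/episode-1.mp3?download=1"

parse_result = urlparse(url)
parsed_url = {
    "hostname": parse_result.hostname,
    "path": parse_result.path,
    "query": parse_result.query,
    # The final path segment, useful as a short display name.
    "last_part": parse_result.path.rstrip("/").split("/")[-1],
}

print(parsed_url["hostname"])   # example.com
print(parsed_url["last_part"])  # episode-1.mp3
```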

Finally, back in the `GET /search` route, the results are displayed:

```py
@app.route("/search", methods=["GET"])
def search():
    ...

    results = format_results(results)

    return render_template("search.html", results=results, query=query)

```

This renders the `search.html` template, passing the formatted results and the original query to the template for display.

![Vector search results](./images/mongodb-ai/search-result.png)

## Conclusion

In this tutorial, we walked through the components used in a Flask application that can index and run a text search on any asset with a public URL. Data is stored and vector search takes place in MongoDB Atlas. AI inference and embedding generation are performed by Replicate. Hookdeck is used as a serverless queue between the Flask application and Replicate to manage API request rate-limiting to Replicate, and to verify, queue, and guarantee the delivery of asynchronous webhook callbacks from Replicate back to the Flask application.

If you haven't already done so, you can [grab the Index All The Things code on GitHub](https://github.com/hookdeck/index-all-the-things). There are a number of [feature issues](https://github.com/hookdeck/index-all-the-things/labels/enhancement) to add support for additional content types, so feel free to get involved.

Finally, if you've any questions or ideas, please share them either via a GitHub issue on the repo or by messaging me on [X](https://x.com/leggetter) or [Bluesky](https://bsky.app/profile/leggetter.co.uk).