Designing an AI Commercial Real Estate Assistant: A Comprehensive Guide with Implementation Scripts - Cirebon Raya Jeh | Artificial Intelligence Financial System

The commercial real estate (CRE) industry is undergoing a fundamental transformation driven by artificial intelligence. What was once a sector defined by manual processes, fragmented data systems, and intuition-based decision-making is rapidly evolving into a data-driven ecosystem where AI-powered assistants are becoming indispensable tools.

CBRE, the world's largest commercial real estate services firm, faced challenges with fragmented property data scattered across 10 distinct sources and four separate databases, forcing property management professionals to manually search through millions of documents and switch between multiple systems. To address this, CBRE partnered with AWS to build a next-generation unified search and digital assistant experience within their PULSE system, achieving a 67% reduction in SQL query generation time, 80% improvement in database query performance, and 95% accuracy in search results.

Similarly, JLL developed JLL Property Assistant built on JLL Falcon, an AI platform specifically designed for the real estate industry. The tool integrates with JLL's Acumen platform, bringing together financial data from systems like Yardi and MRI, operational data from Prism by Building Engines, and other property technology tools. Through a natural language chat interface, property and asset managers can ask freeform questions about their buildings.

This guide provides a comprehensive, code-focused framework for designing and implementing an AI Commercial Real Estate Assistant, with complete, runnable Python scripts for each component.

1. The Core Problems

1.1 Data Fragmentation

CBRE's experience exemplifies the industry-wide challenge: property management professionals had to navigate 10 distinct data sources and four separate databases containing both structured transactional data and unstructured documents (lease agreements, property inspections, maintenance records). This fragmentation created significant productivity losses and made it difficult to derive comprehensive insights about property operations.

1.2 The Expertise Bottleneck

Property professionals are experts in real estate, not database syntax. They need to ask complex questions in natural language, quickly synthesize disparate information, and avoid manually reviewing lengthy documents. AI assistants remove this bottleneck, letting professionals focus on strategic work rather than data retrieval.

1.3 Limited Insight

When data is scattered, professionals cannot easily identify emerging tenant trends, analyze expense patterns at scale, or access financial performance metrics in real-time. JLL Property Assistant addresses this by generating tenancy reports, auto-generating stacking plans, analyzing expense trends, and uncovering tenant retention and occupancy insights.

2. Solution Architecture

2.1 Reference Architecture (CBRE Model)

CBRE's production architecture leverages Amazon Bedrock as the central AI orchestration layer, providing access to multiple foundation models through a single API. The architecture uses:

  • Amazon Nova Pro specifically for SQL query generation

  • Claude Haiku for document interactions

  • Amazon OpenSearch Service for indexing and searching

  • Amazon RDS (PostgreSQL) for structured transactional data

  • Amazon DynamoDB for conversation history

  • Amazon ElastiCache for Redis for user-specific permissions

The architecture is organized around two primary interaction pathways: SQL Interact for structured data and DocInteract for unstructured documents.

2.2 Why RAG

Retrieval-Augmented Generation (RAG) allows an AI tool to pull current market data, specific lease clauses, or underwriting methodologies from a curated knowledge base before generating a response, significantly reducing inaccuracies and keeping outputs grounded in real, up-to-date information.
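
The retrieve-then-generate flow can be sketched in a few lines. This is a minimal illustration, assuming a toy in-memory knowledge base and word-overlap cosine scoring in place of real embeddings; `KNOWLEDGE_BASE`, `retrieve`, and `build_prompt` are hypothetical names, and the sample records are invented for the example.

```python
import re
from collections import Counter
from math import sqrt

# Hypothetical mini knowledge base; real content would come from leases and reports.
KNOWLEDGE_BASE = [
    "Lease 101: Tenant Acme Corp, 12,000 square feet, expires 2026-03-31, one five-year renewal option.",
    "Market note: Downtown office vacancy rose during the last quarter.",
    "Underwriting policy: apply a 5 percent vacancy allowance when projecting NOI.",
]


def tokenize(text: str) -> Counter:
    """Lowercase the text and count alphanumeric tokens."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two bag-of-words vectors."""
    dot = sum(a[t] * b[t] for t in a if t in b)
    norm = sqrt(sum(v * v for v in a.values())) * sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(question: str, k: int = 1) -> list:
    """Return the k knowledge-base entries most similar to the question."""
    q = tokenize(question)
    ranked = sorted(KNOWLEDGE_BASE, key=lambda d: cosine(q, tokenize(d)), reverse=True)
    return ranked[:k]


def build_prompt(question: str, k: int = 1) -> str:
    """Ground the prompt in retrieved context before it is sent to a model."""
    context = "\n".join(retrieve(question, k))
    return f"Context:\n{context}\n\nQuestion: {question}\nAnswer using only the context."


print(build_prompt("When does the Acme lease expire?"))
```

The model only sees what `retrieve` returns, which is why retrieval quality, not just model quality, determines how well grounded the answer is.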

3. Complete Implementation Scripts

3.1 Environment Setup

Create a virtual environment and install dependencies:

bash
# Create and activate virtual environment
python3 -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate

# Create requirements.txt
cat > requirements.txt << 'EOF'
fastapi==0.115.6
uvicorn==0.34.0
langchain==0.3.13
langchain-community==0.3.13
langchain-aws==0.2.6
langchain-openai==0.3.0
boto3==1.35.76
pypdf==5.1.0
python-dotenv==1.0.1
qdrant-client==1.12.1
sentence-transformers==3.3.1
opensearch-py==2.7.1
requests-aws4auth==1.3.1
pydantic==2.10.4
# PostgreSQL driver imported by sql_engine.py
psycopg2-binary==2.9.10
EOF

# Install dependencies
pip install -r requirements.txt

Create environment configuration:

bash
cat > .env << 'EOF'
# AWS Configuration
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key

# Model Configuration
BEDROCK_SQL_MODEL=amazon.nova-pro-v1:0
BEDROCK_DOC_MODEL=anthropic.claude-3-haiku-20240307-v1:0
BEDROCK_EMBED_MODEL=amazon.titan-embed-text-v2:0

# Database Configuration
DB_HOST=localhost
DB_NAME=cre_portfolio
DB_USER=postgres
DB_PASSWORD=your_password

# OpenSearch Configuration
OPENSEARCH_HOST=your-opensearch-host
OPENSEARCH_PORT=443
OPENSEARCH_INDEX=cre_documents

# Qdrant Configuration (if using Qdrant instead of OpenSearch)
QDRANT_HOST=localhost
QDRANT_PORT=6333
COLLECTION_NAME=cre_knowledge_base

# Application Settings
CHUNK_SIZE=500
CHUNK_OVERLAP=50
TOP_K=3
EOF
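
The scripts in this guide read these values one at a time with `os.getenv`. As an optional alternative, a small typed loader can validate them once at startup. The sketch below is an assumption rather than part of the original scripts: `Settings` and `load_settings` are hypothetical names, and only a few of the variables are covered.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    aws_region: str
    chunk_size: int
    chunk_overlap: int
    top_k: int


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read application settings from a mapping (defaults to os.environ) and validate them."""
    env = os.environ if env is None else env
    settings = Settings(
        aws_region=env.get("AWS_REGION", "us-east-1"),
        chunk_size=int(env.get("CHUNK_SIZE", "500")),
        chunk_overlap=int(env.get("CHUNK_OVERLAP", "50")),
        top_k=int(env.get("TOP_K", "3")),
    )
    # An overlap as large as the chunk itself would never advance through the text.
    if settings.chunk_overlap >= settings.chunk_size:
        raise ValueError("CHUNK_OVERLAP must be smaller than CHUNK_SIZE")
    if settings.top_k < 1:
        raise ValueError("TOP_K must be at least 1")
    return settings


print(load_settings({"CHUNK_SIZE": "200", "CHUNK_OVERLAP": "20"}))
```

Passing a plain dictionary instead of `os.environ` keeps the loader easy to unit test.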

3.2 Data Models

Create the data models (models.py):

python
from typing import Optional, List, Dict, Any
from pydantic import BaseModel
from datetime import datetime
from enum import Enum

class QueryType(str, Enum):
    STRUCTURED = "structured"
    UNSTRUCTURED = "unstructured"
    HYBRID = "hybrid"

class QueryRequest(BaseModel):
    question: str
    session_id: Optional[str] = "default"
    user_id: Optional[str] = None
    query_type: Optional[QueryType] = QueryType.HYBRID

class QueryResponse(BaseModel):
    answer: str
    sources: List[Dict[str, Any]]
    query_type: str
    execution_time_ms: float
    token_usage: Optional[Dict[str, int]] = None

class ConversationHistory(BaseModel):
    session_id: str
    messages: List[Dict[str, str]]
    created_at: datetime
    updated_at: datetime

class PropertyData(BaseModel):
    property_id: str
    property_name: str
    address: str
    city: str
    state: str
    property_type: str  # office, retail, industrial, multifamily
    square_feet: float
    occupancy_rate: float
    noi: float  # Net Operating Income
    vacancy_risk: Optional[float] = None
    lease_expirations: List[Dict[str, Any]]
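
The `lease_expirations` field is typed as a free-form list of dictionaries, so its shape is left open. As an illustration of how such a field might be used, the sketch below assumes each entry carries a `tenant` name and an ISO-formatted `end_date`; both keys and the helper `expiring_leases` are hypothetical.

```python
from datetime import date, timedelta
from typing import Any, Dict, List


def expiring_leases(
    lease_expirations: List[Dict[str, Any]],
    as_of: date,
    within_days: int = 180,
) -> List[Dict[str, Any]]:
    """Return leases ending between as_of and as_of + within_days, soonest first."""
    cutoff = as_of + timedelta(days=within_days)
    hits = [
        lease for lease in lease_expirations
        if as_of <= date.fromisoformat(lease["end_date"]) <= cutoff
    ]
    # ISO dates sort correctly as plain strings.
    return sorted(hits, key=lambda lease: lease["end_date"])


# Invented sample data for the example.
sample = [
    {"tenant": "Acme", "end_date": "2025-03-31"},
    {"tenant": "Birch", "end_date": "2026-01-15"},
    {"tenant": "Cole", "end_date": "2025-02-01"},
]
for lease in expiring_leases(sample, as_of=date(2025, 1, 1)):
    print(lease["tenant"], lease["end_date"])
```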

3.3 RAG Implementation with OpenSearch

Create the RAG engine (rag_engine.py):

python
import os
import boto3
import json
from typing import List, Dict, Any, Optional
from langchain_aws import BedrockEmbeddings, ChatBedrock
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import pypdf

class RAGEngine:
    def __init__(self):
        self.region = os.getenv("AWS_REGION", "us-east-1")
        self.embed_model_id = os.getenv("BEDROCK_EMBED_MODEL", "amazon.titan-embed-text-v2:0")
        self.doc_model_id = os.getenv("BEDROCK_DOC_MODEL", "anthropic.claude-3-haiku-20240307-v1:0")
        # Initialize Bedrock client
        self.bedrock_runtime = boto3.client(
            service_name='bedrock-runtime',
            region_name=self.region
        )
        # Initialize embeddings
        self.embeddings = BedrockEmbeddings(
            model_id=self.embed_model_id,
            client=self.bedrock_runtime
        )
        # Initialize LLM for document Q&A
        self.llm = ChatBedrock(
            model_id=self.doc_model_id,
            client=self.bedrock_runtime,
            model_kwargs={"temperature": 0.1, "max_tokens": 500}
        )
        # Initialize OpenSearch connection
        self._init_opensearch()
        # Initialize text splitter
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=int(os.getenv("CHUNK_SIZE", 500)),
            chunk_overlap=int(os.getenv("CHUNK_OVERLAP", 50)),
            separators=["\n\n", "\n", ". ", " ", ""]
        )
        # Initialize prompt template
        self.prompt_template = PromptTemplate(
            template="""
You are a commercial real estate assistant. Use the following context to answer the question.
The context includes property data, lease agreements, and market information.
Context:
{context}
Question: {question}
Provide a clear, concise answer based only on the context provided.
If the context does not contain the answer, say "I don't have enough information to answer this question."
Answer:
""",
            input_variables=["context", "question"]
        )

    def _init_opensearch(self):
        """Initialize OpenSearch connection with AWS Signature V4 authentication"""
        host = os.getenv("OPENSEARCH_HOST")
        if not host:
            raise ValueError("OPENSEARCH_HOST is not set")
        port = int(os.getenv("OPENSEARCH_PORT", 443))
        index_name = os.getenv("OPENSEARCH_INDEX", "cre_documents")
        # AWS Signature V4 authentication. 'aoss' targets OpenSearch Serverless;
        # use 'es' instead for a managed Amazon OpenSearch Service domain.
        service = 'aoss'
        credentials = boto3.Session().get_credentials()
        awsauth = AWS4Auth(
            credentials.access_key,
            credentials.secret_key,
            self.region,
            service,
            session_token=credentials.token
        )
        self.opensearch_client = OpenSearch(
            hosts=[{'host': host, 'port': port}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            timeout=300
        )
        self.vector_store = OpenSearchVectorSearch(
            opensearch_url=f"https://{host}:{port}",
            index_name=index_name,
            embedding_function=self.embeddings,
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )

    def ingest_document(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> int:
        """
        Ingest a document into the vector store.
        Args:
            file_path: Path to the document (PDF, TXT, etc.)
            metadata: Optional metadata for the document
        Returns:
            Number of chunks ingested
        """
        if metadata is None:
            metadata = {}
        # Extract text from PDF
        if file_path.lower().endswith('.pdf'):
            reader = pypdf.PdfReader(file_path)
            text = ""
            for page in reader.pages:
                # Guard against pages that yield no extractable text
                text += (page.extract_text() or "") + "\n"
        else:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()
        # Split text into chunks
        chunks = self.text_splitter.split_text(text)
        # Prepare documents with metadata
        documents = []
        for i, chunk in enumerate(chunks):
            doc_metadata = metadata.copy()
            doc_metadata['chunk_index'] = i
            doc_metadata['source'] = file_path
            doc_metadata['total_chunks'] = len(chunks)
            documents.append((chunk, doc_metadata))
        # Add to vector store
        self.vector_store.add_texts(
            texts=[doc[0] for doc in documents],
            metadatas=[doc[1] for doc in documents]
        )
        return len(documents)

    def ingest_lease_documents(self, lease_data: List[Dict[str, Any]]) -> int:
        """
        Ingest structured lease data as documents.
        Args:
            lease_data: List of lease dictionaries
        Returns:
            Number of documents ingested
        """
        total = 0
        for lease in lease_data:
            # Format lease as text
            lease_text = f"""
Lease Agreement
Property: {lease.get('property_name', 'Unknown')}
Tenant: {lease.get('tenant_name', 'Unknown')}
Start Date: {lease.get('start_date', 'Unknown')}
End Date: {lease.get('end_date', 'Unknown')}
Rent: ${lease.get('monthly_rent', 0):,.2f} per month
Square Feet: {lease.get('square_feet', 0):,}
Lease Type: {lease.get('lease_type', 'Unknown')}
Renewal Options: {lease.get('renewal_options', 'None')}
"""
            metadata = {
                'property_id': lease.get('property_id'),
                'tenant_id': lease.get('tenant_id'),
                'document_type': 'lease',
                'property_name': lease.get('property_name')
            }
            # Add to vector store
            self.vector_store.add_texts(
                texts=[lease_text],
                metadatas=[metadata]
            )
            total += 1
        return total

    def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:
        """
        Query the RAG system.
        Args:
            question: User question
            top_k: Number of documents to retrieve
        Returns:
            Dictionary with answer and sources
        """
        # Retrieve relevant documents
        docs = self.vector_store.similarity_search(question, k=top_k)
        # Prepare context
        context = "\n\n".join([doc.page_content for doc in docs])
        # Format prompt
        prompt = self.prompt_template.format(context=context, question=question)
        # Generate response
        response = self.llm.invoke(prompt)
        # Extract sources
        sources = []
        for doc in docs:
            sources.append({
                'content': doc.page_content[:200] + "...",
                'metadata': doc.metadata
            })
        return {
            'answer': response.content,
            'sources': sources,
            'retrieved_count': len(docs)
        }
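
To make the effect of `CHUNK_SIZE` and `CHUNK_OVERLAP` concrete, here is a simplified fixed-window chunker. It is not how `RecursiveCharacterTextSplitter` works internally (that splitter prefers to break on the configured separators); it only illustrates how an overlap repeats the tail of one chunk at the head of the next. `chunk_text` is a hypothetical helper.

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[str]:
    """Split text into fixed-size character windows that overlap by chunk_overlap."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window already reaches the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks


print(chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1))
# → ['abcd', 'defg', 'ghij']
```

Each chunk shares its first character with the last character of the previous one, so a clause that straddles a boundary still appears whole in at least one chunk when the overlap is large enough.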

3.4 Text-to-SQL Implementation

Create the SQL generation engine (sql_engine.py):

python
import os
import boto3
import json
import psycopg2
from typing import List, Dict, Any, Optional

class TextToSQLEngine:
    """
    Text-to-SQL engine using Amazon Nova Pro via Bedrock.
    Based on AWS sample implementation for data consistency in generative AI.
    """
    def __init__(self):
        self.region = os.getenv("AWS_REGION", "us-east-1")
        self.sql_model_id = os.getenv("BEDROCK_SQL_MODEL", "amazon.nova-pro-v1:0")
        # Initialize Bedrock client
        self.bedrock_runtime = boto3.client(
            service_name='bedrock-runtime',
            region_name=self.region
        )
        # Database connection
        self.db_host = os.getenv("DB_HOST", "localhost")
        self.db_name = os.getenv("DB_NAME", "cre_portfolio")
        self.db_user = os.getenv("DB_USER", "postgres")
        self.db_password = os.getenv("DB_PASSWORD", "")
        # Cache schema
        self.schema_cache = None

    def get_database_schema(self) -> str:
        """
        Retrieve database schema dynamically.
        """
        if self.schema_cache:
            return self.schema_cache
        conn = psycopg2.connect(
            host=self.db_host,
            database=self.db_name,
            user=self.db_user,
            password=self.db_password
        )
        try:
            schema_parts = []
            cursor = conn.cursor()
            # Get tables
            cursor.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public'
            """)
            tables = cursor.fetchall()
            for table in tables:
                table_name = table[0]
                # Bind the table name as a parameter instead of interpolating it into the SQL
                cursor.execute("""
                    SELECT column_name, data_type, is_nullable
                    FROM information_schema.columns
                    WHERE table_schema = 'public' AND table_name = %s
                    ORDER BY ordinal_position
                """, (table_name,))
                columns = cursor.fetchall()
                schema_parts.append(f"Table: {table_name}")
                for col in columns:
                    schema_parts.append(f" - {col[0]} ({col[1]}){' NULL' if col[2] == 'YES' else ' NOT NULL'}")
                schema_parts.append("")
        finally:
            # Release the connection even if a query fails
            conn.close()
        self.schema_cache = "\n".join(schema_parts)
        return self.schema_cache

    def generate_sql(self, natural_language_query: str) -> Dict[str, Any]:
        """
        Generate SQL from natural language using Amazon Nova Pro.
        Args:
            natural_language_query: User question in natural language
        Returns:
            Dictionary with SQL query and explanation
        """
        schema = self.get_database_schema()
        # System prompt for SQL generation
        system_prompt = f"""
You are an expert SQL query generator for a commercial real estate database.
Generate precise, deterministic SQL queries based on the user's natural language question.
Database Schema:
{schema}
Rules:
1. Always use the exact table and column names from the schema
2. Include appropriate WHERE clauses for filtering
3. Use proper JOINs when data spans multiple tables
4. Use aggregate functions (COUNT, SUM, AVG) when appropriate
5. Return only the SQL query, no additional text
6. Do not use SELECT *
"""
        # Format messages for Converse API
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "text": f"Convert this to SQL: {natural_language_query}"
                    }
                ]
            }
        ]
        # Call Bedrock with Nova Pro
        response = self.bedrock_runtime.converse(
            modelId=self.sql_model_id,
            messages=messages,
            system=[{"text": system_prompt}],
            inferenceConfig={
                "temperature": 0.0,  # Deterministic output
                "maxTokens": 500
            }
        )
        sql_query = response['output']['message']['content'][0]['text']
        # Clean up the SQL (remove markdown code blocks if present)
        sql_query = sql_query.replace("```sql", "").replace("```", "").strip()
        return {
            'sql': sql_query,
            'tokens_used': response.get('usage', {}).get('totalTokens', 0)
        }

    def execute_sql(self, sql_query: str) -> List[Dict[str, Any]]:
        """
        Execute SQL query and return results.
        Args:
            sql_query: SQL query string
        Returns:
            List of result rows as dictionaries
        """
        conn = psycopg2.connect(
            host=self.db_host,
            database=self.db_name,
            user=self.db_user,
            password=self.db_password
        )
        try:
            # The SQL is model-generated, so make the session read-only
            # to have PostgreSQL reject any write statement.
            conn.set_session(readonly=True)
            cursor = conn.cursor()
            cursor.execute(sql_query)
            # Statements that return no rows have no description
            if cursor.description is None:
                return []
            # Get column names
            columns = [desc[0] for desc in cursor.description]
            # Fetch results and convert to list of dictionaries
            rows = cursor.fetchall()
            return [dict(zip(columns, row)) for row in rows]
        finally:
            # Release the connection even if execution fails
            conn.close()

    def query(self, natural_language_query: str) -> Dict[str, Any]:
        """
        Complete pipeline: natural language → SQL → execution → results.
        Args:
            natural_language_query: User question
        Returns:
            Dictionary with SQL, results, and formatted answer
        """
        # Generate SQL
        sql_result = self.generate_sql(natural_language_query)
        sql_query = sql_result['sql']
        # Execute SQL
        try:
            results = self.execute_sql(sql_query)
            # Format results for response
            if results:
                # Use Nova Lite to format results
                formatting_prompt = f"""
Format the following query results as a clear, concise answer.
Query: {natural_language_query}
SQL: {sql_query}
Results: {json.dumps(results, indent=2, default=str)}
Provide a professional summary of the results.
"""
                # Use Nova Lite for formatting
                format_response = self.bedrock_runtime.converse(
                    modelId="amazon.nova-lite-v1:0",
                    messages=[{"role": "user", "content": [{"text": formatting_prompt}]}],
                    inferenceConfig={"temperature": 0.1, "maxTokens": 300}
                )
                formatted_answer = format_response['output']['message']['content'][0]['text']
            else:
                formatted_answer = "No results found for your query."
            return {
                'sql': sql_query,
                'results': results,
                'answer': formatted_answer,
                'row_count': len(results)
            }
        except Exception as e:
            return {
                'sql': sql_query,
                'error': str(e),
                'answer': f"An error occurred while executing the query: {str(e)}"
            }
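
Because the SQL comes from a language model, it is worth screening each statement before it reaches the database. The sketch below is a heuristic pre-check, not a SQL parser: `is_safe_select` and the `FORBIDDEN` keyword list are hypothetical, and a keyword inside a string literal would cause a false rejection. A database role limited to read-only access remains the stronger safeguard.

```python
import re

# Keywords that indicate a write or DDL statement (illustrative, not exhaustive).
FORBIDDEN = re.compile(
    r"\b(insert|update|delete|drop|alter|truncate|create|grant|revoke|copy)\b",
    re.IGNORECASE,
)


def is_safe_select(sql: str) -> bool:
    """Accept a single SELECT (or WITH ... SELECT) statement and reject everything else."""
    statement = sql.strip().rstrip(";").strip()
    if not statement:
        return False
    # A remaining semicolon means more than one statement was supplied.
    if ";" in statement:
        return False
    if not re.match(r"^(select|with)\b", statement, re.IGNORECASE):
        return False
    return FORBIDDEN.search(statement) is None


print(is_safe_select("SELECT property_name FROM properties WHERE occupancy_rate < 0.85;"))
print(is_safe_select("SELECT 1; DELETE FROM leases"))
```

The word-boundary match means a column such as `created_at` does not trip the `create` keyword.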

3.5 Orchestration Layer

Create the orchestration engine (orchestrator.py):

python
import os
import time
import json
from typing import Dict, Any, Optional
from datetime import datetime
import boto3
from dynamodb_utils import ConversationManager

class Orchestrator:
    """
    Central orchestration layer for the AI Commercial Real Estate Assistant.
    Routes queries to appropriate engines based on intent.
    """
    def __init__(self):
        self.rag_engine = None
        self.sql_engine = None
        self.conversation_manager = ConversationManager()
        # Initialize engines at startup; a failed engine stays None and is skipped when routing
        self._init_engines()

    def _init_engines(self):
        """Initialize RAG and SQL engines."""
        try:
            from rag_engine import RAGEngine
            self.rag_engine = RAGEngine()
        except Exception as e:
            print(f"Warning: RAG engine initialization failed: {e}")
        try:
            from sql_engine import TextToSQLEngine
            self.sql_engine = TextToSQLEngine()
        except Exception as e:
            print(f"Warning: SQL engine initialization failed: {e}")

    def detect_query_type(self, question: str) -> str:
        """
        Detect whether the query needs structured or unstructured data.
        """
        # Keywords suggesting structured data queries
        structured_keywords = [
            'how many', 'count', 'total', 'sum', 'average', 'list',
            'properties with', 'tenants', 'vacancy', 'occupancy',
            'rent', 'lease', 'noi', 'expense', 'revenue'
        ]
        question_lower = question.lower()
        for keyword in structured_keywords:
            if keyword in question_lower:
                return 'structured'
        # Keywords suggesting unstructured document queries
        unstructured_keywords = [
            'explain', 'describe', 'tell me about', 'what is',
            'why', 'how does', 'definition', 'meaning'
        ]
        for keyword in unstructured_keywords:
            if keyword in question_lower:
                return 'unstructured'
        return 'hybrid'

    def route_query(self, question: str, query_type: Optional[str] = None) -> Dict[str, Any]:
        """
        Route query to appropriate engine.
        Args:
            question: User question
            query_type: Optional forced query type
        Returns:
            Response dictionary
        """
        start_time = time.time()
        if query_type is None:
            query_type = self.detect_query_type(question)
        response = {
            'question': question,
            'query_type': query_type,
            'execution_time_ms': 0,
            'answer': '',
            'sources': []
        }
        if query_type == 'structured' and self.sql_engine:
            # Use Text-to-SQL
            result = self.sql_engine.query(question)
            response['answer'] = result.get('answer', '')
            response['sql'] = result.get('sql')
            response['row_count'] = result.get('row_count', 0)
        elif query_type == 'unstructured' and self.rag_engine:
            # Use RAG
            result = self.rag_engine.query(question)
            response['answer'] = result.get('answer', '')
            response['sources'] = result.get('sources', [])
        else:  # hybrid, or the preferred engine is unavailable
            # Try both and combine results
            structured_result = None
            unstructured_result = None
            if self.sql_engine:
                try:
                    structured_result = self.sql_engine.query(question)
                except Exception as e:
                    print(f"Warning: SQL engine query failed: {e}")
            if self.rag_engine:
                try:
                    unstructured_result = self.rag_engine.query(question)
                except Exception as e:
                    print(f"Warning: RAG engine query failed: {e}")
            # Prefer the SQL answer unless the SQL engine reported an error,
            # in which case fall through to the document answer
            if (structured_result and structured_result.get('answer')
                    and not structured_result.get('error')):
                response['answer'] = structured_result.get('answer')
                response['sql'] = structured_result.get('sql')
                response['sources'] = []
            elif unstructured_result and unstructured_result.get('answer'):
                response['answer'] = unstructured_result.get('answer')
                response['sources'] = unstructured_result.get('sources', [])
            else:
                response['answer'] = "I couldn't find an answer to your question. Please rephrase or provide more specific information."
        response['execution_time_ms'] = (time.time() - start_time) * 1000
        return response

    def chat(self, question: str, session_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Full chat interface with conversation history.
        Args:
            question: User question
            session_id: Optional session identifier
        Returns:
            Complete response with history
        """
        if session_id is None:
            session_id = f"session_{datetime.now().timestamp()}"
        # Get conversation history
        history = self.conversation_manager.get_history(session_id)
        # Add context from history to question
        if history and len(history) > 0:
            # Simple context: last 3 exchanges
            context = "\n".join([
                f"{msg['role']}: {msg['content']}"
                for msg in history[-6:]  # Last 6 messages = 3 exchanges
            ])
            enhanced_question = f"Context from previous conversation:\n{context}\n\nCurrent question: {question}"
        else:
            enhanced_question = question
        # Classify the raw question so keywords in the history do not skew routing
        query_type = self.detect_query_type(question)
        # Get response
        response = self.route_query(enhanced_question, query_type)
        # Store conversation
        self.conversation_manager.add_message(session_id, 'user', question)
        self.conversation_manager.add_message(session_id, 'assistant', response['answer'])
        return {
            'session_id': session_id,
            'question': question,
            'answer': response['answer'],
            'query_type': response.get('query_type'),
            'execution_time_ms': response.get('execution_time_ms'),
            'sources': response.get('sources', [])
        }
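
Keyword routing by plain substring match is easy to fool: "rent" is a substring of "current", and the first matching list wins regardless of how many terms match. One possible refinement, sketched below under stated assumptions, matches whole words only and compares the number of hits on each side. The term lists are illustrative (the term "clause" is an addition for the example), and `classify` is a hypothetical stand-in rather than a drop-in replacement.

```python
import re
from typing import List

# Illustrative term lists; tune them to the questions your users actually ask.
STRUCTURED_TERMS = ["how many", "count", "total", "sum", "average", "list",
                    "occupancy", "vacancy", "noi", "rent"]
UNSTRUCTURED_TERMS = ["explain", "describe", "tell me about", "what is", "why",
                      "how does", "definition", "meaning", "clause"]


def _score(question: str, terms: List[str]) -> int:
    """Count the terms that appear in the question as whole words or phrases."""
    q = question.lower()
    return sum(1 for term in terms if re.search(rf"\b{re.escape(term)}\b", q))


def classify(question: str) -> str:
    """Pick the side with more matching terms; ties (including no matches) are hybrid."""
    structured = _score(question, STRUCTURED_TERMS)
    unstructured = _score(question, UNSTRUCTURED_TERMS)
    if structured > unstructured:
        return "structured"
    if unstructured > structured:
        return "unstructured"
    return "hybrid"


print(classify("How many tenants have leases expiring?"))  # structured
print(classify("Explain the current renewal clause"))      # unstructured
```

In the second example "current" no longer counts as a match for "rent", so the question is routed to document search.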

3.6 Conversation Management with DynamoDB

Create the conversation manager (dynamodb_utils.py):

python
import os
import boto3
import json
from typing import List, Dict, Any, Optional
from datetime import datetime
from decimal import Decimal

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)

class ConversationManager:
    """
    Manages conversation history using Amazon DynamoDB.
    """
    def __init__(self):
        self.table_name = os.getenv("DYNAMODB_TABLE", "cre_conversations")
        # boto3 does not read AWS_REGION on its own, so pass the region explicitly
        self.dynamodb = boto3.resource(
            'dynamodb',
            region_name=os.getenv("AWS_REGION", "us-east-1")
        )
        self.table = self.dynamodb.Table(self.table_name)
        # Create table if it doesn't exist
        self._create_table_if_not_exists()

    def _create_table_if_not_exists(self):
        """Create DynamoDB table if it doesn't exist."""
        try:
            # Accessing table_status triggers a DescribeTable call
            self.table.table_status
        except self.dynamodb.meta.client.exceptions.ResourceNotFoundException:
            # Create table
            self.dynamodb.create_table(
                TableName=self.table_name,
                KeySchema=[
                    {'AttributeName': 'session_id', 'KeyType': 'HASH'},
                    {'AttributeName': 'timestamp', 'KeyType': 'RANGE'}
                ],
                AttributeDefinitions=[
                    {'AttributeName': 'session_id', 'AttributeType': 'S'},
                    {'AttributeName': 'timestamp', 'AttributeType': 'S'}
                ],
                BillingMode='PAY_PER_REQUEST'
            )
            # Wait for table to be created
            self.table.meta.client.get_waiter('table_exists').wait(TableName=self.table_name)

    def get_history(self, session_id: str, limit: int = 50) -> List[Dict[str, str]]:
        """
        Get conversation history for a session.
        Args:
            session_id: Session identifier
            limit: Maximum number of messages to retrieve
        Returns:
            The most recent messages, oldest first, with 'role' and 'content'
        """
        try:
            response = self.table.query(
                KeyConditionExpression='session_id = :sid',
                ExpressionAttributeValues={':sid': session_id},
                Limit=limit,
                ScanIndexForward=False  # Newest first, so Limit keeps the latest messages
            )
            messages = []
            # Reverse so callers receive the messages in chronological order
            for item in reversed(response.get('Items', [])):
                messages.append({
                    'role': item.get('role', 'unknown'),
                    'content': item.get('content', ''),
                    'timestamp': item.get('timestamp', '')
                })
            return messages
        except Exception as e:
            print(f"Error retrieving history: {e}")
            return []

    def add_message(self, session_id: str, role: str, content: str) -> bool:
        """
        Add a message to conversation history.
        Args:
            session_id: Session identifier
            role: 'user' or 'assistant'
            content: Message content
        Returns:
            True if successful, False otherwise
        """
        try:
            timestamp = datetime.now().isoformat()
            self.table.put_item(
                Item={
                    'session_id': session_id,
                    'timestamp': timestamp,
                    'role': role,
                    'content': content,
                    # Expiry 30 days out; items are only removed if TTL is enabled
                    # on the table with 'ttl' as the TTL attribute
                    'ttl': int(datetime.now().timestamp()) + 86400 * 30
                }
            )
            return True
        except Exception as e:
            print(f"Error adding message: {e}")
            return False

    def delete_session(self, session_id: str) -> bool:
        """
        Delete all messages for a session.
        Args:
            session_id: Session identifier
        Returns:
            True if successful
        """
        try:
            # Query all items for the session
            response = self.table.query(
                KeyConditionExpression='session_id = :sid',
                ExpressionAttributeValues={':sid': session_id}
            )
            # Delete each item
            with self.table.batch_writer() as batch:
                for item in response.get('Items', []):
                    batch.delete_item(
                        Key={
                            'session_id': item['session_id'],
                            'timestamp': item['timestamp']
                        }
                    )
            return True
        except Exception as e:
            print(f"Error deleting session: {e}")
            return False
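
For local development or unit tests where DynamoDB is not available, an in-memory stand-in exposing the same three methods can be swapped in. This is a sketch under that assumption; `InMemoryConversationManager` is a hypothetical class and nothing in it persists across restarts.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List


class InMemoryConversationManager:
    """Keeps conversation history in a dict; mirrors add_message/get_history/delete_session."""

    def __init__(self) -> None:
        self._sessions: Dict[str, List[Dict[str, str]]] = defaultdict(list)

    def add_message(self, session_id: str, role: str, content: str) -> bool:
        self._sessions[session_id].append({
            "role": role,
            "content": content,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
        return True

    def get_history(self, session_id: str, limit: int = 50) -> List[Dict[str, str]]:
        """Return up to `limit` of the most recent messages, oldest first."""
        if limit <= 0:
            return []
        # .get avoids creating an empty entry for unknown sessions
        return list(self._sessions.get(session_id, [])[-limit:])

    def delete_session(self, session_id: str) -> bool:
        self._sessions.pop(session_id, None)
        return True


manager = InMemoryConversationManager()
manager.add_message("s1", "user", "What is our total NOI?")
manager.add_message("s1", "assistant", "Let me check.")
print([m["role"] for m in manager.get_history("s1")])
```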

3.7 FastAPI Application

Create the main application (main.py):

python
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from typing import Optional, Dict, Any
import uvicorn
import json
import asyncio
from datetime import datetime
from dotenv import load_dotenv

from models import QueryRequest, QueryResponse, ConversationHistory
from orchestrator import Orchestrator

# Load settings from .env before any engine reads them via os.getenv
load_dotenv()

# Initialize FastAPI app
app = FastAPI(
    title="AI Commercial Real Estate Assistant API",
    description="Enterprise-grade AI assistant for commercial real estate professionals",
    version="1.0.0"
)

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize orchestrator
orchestrator = Orchestrator()

# Health check endpoint
@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": {
            "orchestrator": "ready",
            "rag_engine": "ready" if orchestrator.rag_engine else "unavailable",
            "sql_engine": "ready" if orchestrator.sql_engine else "unavailable"
        }
    }

# Main chat endpoint
@app.post("/chat")
async def chat(request: QueryRequest) -> Dict[str, Any]:
    """
    Send a message to the AI assistant.
    Args:
        request: QueryRequest with question and session_id
    Returns:
        Response with answer and metadata
    """
    try:
        # orchestrator.chat is blocking, so run it in a worker thread
        # to keep the event loop free for other requests
        response = await asyncio.to_thread(
            orchestrator.chat,
            question=request.question,
            session_id=request.session_id
        )
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Streaming chat endpoint
@app.post("/chat/stream")
async def chat_stream(request: QueryRequest):
    """
    Stream the AI assistant's response word by word.
    The full answer is generated first and then emitted in pieces,
    so this simulates streaming rather than forwarding model tokens.
    """
    async def generate():
        try:
            # Get the full response without blocking the event loop
            response = await asyncio.to_thread(
                orchestrator.chat,
                question=request.question,
                session_id=request.session_id
            )
            # Emit the answer one word at a time
            answer = response.get('answer', '')
            for word in answer.split():
                yield json.dumps({
                    "type": "token",
                    "content": word + " "
                }) + "\n"
                await asyncio.sleep(0.01)  # Simulate streaming
            # Send completion
            yield json.dumps({
                "type": "complete",
                "metadata": {
                    "session_id": response.get('session_id'),
                    "query_type": response.get('query_type'),
                    "execution_time_ms": response.get('execution_time_ms')
                }
            }) + "\n"
        except Exception as e:
            yield json.dumps({
                "type": "error",
                "content": str(e)
            }) + "\n"
    return StreamingResponse(generate(), media_type="application/x-ndjson")

# Get conversation history
@app.get("/conversations/{session_id}")
async def get_conversation(session_id: str) -> Dict[str, Any]:
    """
    Get conversation history for a session.
    """
    history = orchestrator.conversation_manager.get_history(session_id)
    return {
        "session_id": session_id,
        "messages": history,
        "count": len(history)
    }

# Delete conversation
@app.delete("/conversations/{session_id}")
async def delete_conversation(session_id: str) -> Dict[str, Any]:
    """
    Delete a conversation session.
    """
    success = orchestrator.conversation_manager.delete_session(session_id)
    return {
        "session_id": session_id,
        "deleted": success
    }

# Ingest document
@app.post("/ingest/document")
async def ingest_document(request: Request):
    """
    Ingest a document into the knowledge base.
    """
    if not orchestrator.rag_engine:
        raise HTTPException(status_code=503, detail="RAG engine not available")
    data = await request.json()
    file_path = data.get('file_path')
    metadata = data.get('metadata', {})
    if not file_path:
        raise HTTPException(status_code=400, detail="file_path required")
    try:
        chunks = orchestrator.rag_engine.ingest_document(file_path, metadata)
        return {
            "status": "success",
            "file_path": file_path,
            "chunks_ingested": chunks
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Ingest lease data
@app.post("/ingest/leases")
async def ingest_leases(request: Request):
    """
    Ingest lease data into the knowledge base.
    """
    if not orchestrator.rag_engine:
        raise HTTPException(status_code=503, detail="RAG engine not available")
    data = await request.json()
    leases = data.get('leases', [])
    if not leases:
        raise HTTPException(status_code=400, detail="leases array required")
    try:
        count = orchestrator.rag_engine.ingest_lease_documents(leases)
        return {
            "status": "success",
            "leases_ingested": count
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Get database schema (for debugging)
@app.get("/schema")
async def get_schema():
    """
    Get the database schema used by the SQL engine.
    """
    if not orchestrator.sql_engine:
        raise HTTPException(status_code=503, detail="SQL engine not available")
    try:
        schema = orchestrator.sql_engine.get_database_schema()
        return {"schema": schema}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )
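
The `/chat/stream` endpoint emits newline-delimited JSON events of type `token`, `complete`, or `error`. A client can reassemble the answer with a small parser like the sketch below. `collect_stream` is a hypothetical helper that accepts any iterable of text lines, so it can be fed from an HTTP client's line iterator or, as here, from a hard-coded sample.

```python
import json
from typing import Any, Dict, Iterable, Tuple


def collect_stream(lines: Iterable[str]) -> Tuple[str, Dict[str, Any]]:
    """Rebuild the answer text and final metadata from NDJSON stream events."""
    parts = []
    metadata: Dict[str, Any] = {}
    for raw in lines:
        line = raw.strip()
        if not line:
            continue  # Skip keep-alive or blank lines
        event = json.loads(line)
        if event["type"] == "token":
            parts.append(event["content"])
        elif event["type"] == "complete":
            metadata = event.get("metadata", {})
        elif event["type"] == "error":
            raise RuntimeError(event["content"])
    return "".join(parts).strip(), metadata


# Invented sample events in the shape the endpoint produces.
sample_lines = [
    '{"type": "token", "content": "NOI "}',
    '{"type": "token", "content": "rose "}',
    '{"type": "complete", "metadata": {"session_id": "s1"}}',
]
print(collect_stream(sample_lines))
```

With the `requests` library, the lines could come from `response.iter_lines(decode_unicode=True)` on a request made with `stream=True`.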

3.8 Frontend Integration (React/Next.js)

Create a simple frontend component (ChatInterface.tsx):

typescript
import React, { useState, useRef, useEffect } from 'react';

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  timestamp: Date;
}

interface ChatResponse {
  session_id: string;
  question: string;
  answer: string;
  query_type?: string;
  execution_time_ms?: number;
  sources?: Array<{ content: string; metadata: any }>;
}

const ChatInterface: React.FC = () => {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isLoading, setIsLoading] = useState(false);
  const [sessionId, setSessionId] = useState<string>(
    `session_${Date.now()}`
  );
  const messagesEndRef = useRef<HTMLDivElement>(null);

  const scrollToBottom = () => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  };

  useEffect(() => {
    scrollToBottom();
  }, [messages]);

  const sendMessage = async () => {
    if (!input.trim() || isLoading) return;

    const userMessage: Message = {
      id: Date.now().toString(),
      role: 'user',
      content: input,
      timestamp: new Date(),
    };

    setMessages((prev) => [...prev, userMessage]);
    setInput('');
    setIsLoading(true);

    try {
      const response = await fetch('http://localhost:8000/chat', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          question: input,
          session_id: sessionId,
        }),
      });

      if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
      }

      const data: ChatResponse = await response.json();

      const assistantMessage: Message = {
        id: (Date.now() + 1).toString(),
        role: 'assistant',
        content: data.answer,
        timestamp: new Date(),
      };

      setMessages((prev) => [...prev, assistantMessage]);

      // Update session ID if provided
      if (data.session_id) {
        setSessionId(data.session_id);
      }

    } catch (error) {
      console.error('Error sending message:', error);
      const errorMessage: Message = {
        id: (Date.now() + 1).toString(),
        role: 'assistant',
        content: 'Sorry, an error occurred. Please try again.',
        timestamp: new Date(),
      };
      setMessages((prev) => [...prev, errorMessage]);
    } finally {
      setIsLoading(false);
    }
  };

  const handleKeyPress = (e: React.KeyboardEvent) => {
    if (e.key === 'Enter' && !e.shiftKey) {
      e.preventDefault();
      sendMessage();
    }
  };

  const clearConversation = async () => {
    try {
      await fetch(`http://localhost:8000/conversations/${sessionId}`, {
        method: 'DELETE',
      });
      setMessages([]);
      setSessionId(`session_${Date.now()}`);
    } catch (error) {
      console.error('Error clearing conversation:', error);
    }
  };

return (
<div className="flex flex-col h-screen max-w-4xl mx-auto p-4">
{/* Header */}
<div className="flex justify-between items-center mb-4 pb-4 border-b">
<h1 className="text-2xl font-bold text-gray-800">
🏢 CRE AI Assistant
</h1>
<button
onClick={clearConversation}
className="px-4 py-2 text-sm text-red-600 border border-red-300 rounded hover:bg-red-50"
>
Clear Chat
</button>
</div>

{/* Messages */}
<div className="flex-1 overflow-y-auto mb-4 space-y-4">
{messages.length === 0 && (
<div className="text-center text-gray-500 mt-20">
<p className="text-lg">Ask me anything about your commercial real estate portfolio</p>
<p className="text-sm mt-2">
Examples: "Show me properties with vacancy risk above 15%" or
"What's our total NOI this quarter?"
</p>
</div>
)}
{messages.map((message) => (
<div
key={message.id}
className={`flex ${
message.role === 'user' ? 'justify-end' : 'justify-start'
}`}
>
<div
className={`max-w-[80%] p-3 rounded-lg ${
message.role === 'user'
? 'bg-blue-600 text-white'
: 'bg-gray-100 text-gray-800'
}`}
>
<p className="whitespace-pre-wrap">{message.content}</p>
<span className="text-xs opacity-70 mt-1 block">
{message.timestamp.toLocaleTimeString()}
</span>
</div>
</div>
))}
{isLoading && (
<div className="flex justify-start">
<div className="bg-gray-100 p-3 rounded-lg">
<span className="inline-block w-2 h-2 bg-gray-500 rounded-full animate-bounce mr-1"></span>
<span className="inline-block w-2 h-2 bg-gray-500 rounded-full animate-bounce mr-1" style={{ animationDelay: '0.2s' }}></span>
<span className="inline-block w-2 h-2 bg-gray-500 rounded-full animate-bounce" style={{ animationDelay: '0.4s' }}></span>
</div>
</div>
)}
<div ref={messagesEndRef} />
</div>

{/* Input */}
<div className="flex gap-2 border-t pt-4">
<textarea
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyDown={handleKeyPress}
placeholder="Ask about your properties, leases, or investments..."
className="flex-1 p-2 border rounded-lg resize-none focus:outline-none focus:ring-2 focus:ring-blue-500"
rows={2}
disabled={isLoading}
/>
<button
onClick={sendMessage}
disabled={isLoading || !input.trim()}
className="px-6 py-2 bg-blue-600 text-white rounded-lg hover:bg-blue-700 disabled:opacity-50 disabled:cursor-not-allowed self-end"
>
Send
</button>
</div>
</div>
);
};

export default ChatInterface;
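
Because the component calls http://localhost:8000 directly from the browser, a frontend served from another origin (for example a Next.js dev server on port 3000) can only reach the API if the backend allows cross-origin requests. The sketch below assumes main.py does not already configure CORS. The ALLOWED_ORIGINS variable and both helper functions are hypothetical; CORSMiddleware is FastAPI's own middleware.

```python
import os


def parse_allowed_origins(raw: str) -> list:
    """Split a comma-separated origin list, dropping blanks and trailing slashes."""
    return [item.strip().rstrip("/") for item in raw.split(",") if item.strip()]


def configure_cors(app, origins: list) -> None:
    """Attach FastAPI's CORSMiddleware to app for the given origins."""
    # Imported lazily so parse_allowed_origins works without FastAPI installed.
    from fastapi.middleware.cors import CORSMiddleware

    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_methods=["GET", "POST", "DELETE"],  # methods the chat UI and /schema use
        allow_headers=["Content-Type"],
    )


# Hypothetical environment variable; the default is the usual Next.js dev origin.
ALLOWED_ORIGINS = parse_allowed_origins(
    os.getenv("ALLOWED_ORIGINS", "http://localhost:3000")
)
```

In main.py this would be called once after the FastAPI app is created, as `configure_cors(app, ALLOWED_ORIGINS)`. Listing explicit origins rather than `"*"` keeps the API closed to unrelated sites.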

3.9 Deployment Script

Create a deployment script (deploy.sh):

bash
#!/bin/bash

# AI Commercial Real Estate Assistant - Deployment Script

echo "🚀 Starting deployment of AI CRE Assistant..."

# Check prerequisites
echo "📋 Checking prerequisites..."
command -v python3 >/dev/null 2>&1 || { echo "Python3 is required but not installed. Aborting." >&2; exit 1; }
command -v aws >/dev/null 2>&1 || { echo "AWS CLI is required but not installed. Aborting." >&2; exit 1; }
command -v docker >/dev/null 2>&1 || { echo "Docker is required but not installed. Aborting." >&2; exit 1; }

# Load environment variables
if [ -f .env ]; then
  echo "📝 Loading environment variables..."
  # Source .env with auto-export so quoted values and values containing spaces survive
  set -a
  . ./.env
  set +a
else
  echo "❌ .env file not found. Please create one from .env.example"
  exit 1
fi

# Install dependencies
echo "📦 Installing Python dependencies..."
python3 -m pip install -r requirements.txt

# Deploy OpenSearch (if using Docker)
echo "🔄 Starting OpenSearch..."
docker run -d \
--name opensearch \
-p 9200:9200 \
-p 9600:9600 \
-e "discovery.type=single-node" \
-e "OPENSEARCH_INITIAL_ADMIN_PASSWORD=Admin123!" \
opensearchproject/opensearch:latest

# Wait for OpenSearch to be ready
echo "⏳ Waiting for OpenSearch to be ready..."
sleep 30

# Create OpenSearch index
echo "📊 Creating OpenSearch index..."
curl --fail -X PUT "https://localhost:9200/cre_documents" \
-H "Content-Type: application/json" \
-d '{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0,
"knn": true
}
},
"mappings": {
"properties": {
"vector": {
"type": "knn_vector",
"dimension": 1024
},
"text": {
"type": "text"
},
"metadata": {
"type": "object",
"enabled": true
}
}
}
}' --insecure -u "admin:Admin123!" || echo "⚠️ Index creation failed (may already exist)"

# Initialize database (if using PostgreSQL)
if [ -n "$DB_HOST" ]; then
  echo "🗄️ Initializing database..."
  python3 -c "
import psycopg2
conn = psycopg2.connect(host='$DB_HOST', database='postgres', user='$DB_USER', password='$DB_PASSWORD')
conn.autocommit = True
cur = conn.cursor()
# PostgreSQL has no CREATE DATABASE IF NOT EXISTS, so check the catalog first
cur.execute('SELECT 1 FROM pg_database WHERE datname = %s', ('$DB_NAME',))
if cur.fetchone() is None: cur.execute('CREATE DATABASE \"$DB_NAME\"')
conn.close()
"
fi

# Start the FastAPI application
echo "🚀 Starting FastAPI application..."
nohup python3 main.py > app.log 2>&1 &

echo "✅ Deployment complete!"
echo "🌐 API available at: http://localhost:8000"
echo "📚 API Documentation: http://localhost:8000/docs"
echo "📄 Logs: tail -f app.log"
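
The fixed `sleep 30` in deploy.sh can be too short on a slow machine and needlessly long on a fast one. A polling loop is one alternative. The sketch below assumes the same local single-node container as the script (HTTPS with a self-signed certificate and basic authentication); the function names wait_until_ready and opensearch_is_up are hypothetical.

```python
import base64
import ssl
import time
import urllib.error
import urllib.request
from typing import Callable


def wait_until_ready(check: Callable[[], bool], timeout_s: float = 60.0,
                     interval_s: float = 2.0,
                     sleep: Callable[[float], None] = time.sleep,
                     clock: Callable[[], float] = time.monotonic) -> bool:
    """Call check() repeatedly until it returns True or timeout_s has elapsed."""
    deadline = clock() + timeout_s
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


def opensearch_is_up(url: str, user: str, password: str, timeout_s: float = 3.0) -> bool:
    """Return True if the node answers an authenticated GET with HTTP 200."""
    ctx = ssl.create_default_context()
    # Mirrors curl --insecure: acceptable for a local dev container only.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})
    try:
        with urllib.request.urlopen(request, timeout=timeout_s, context=ctx) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, TLS not ready yet, HTTP errors: all count as "not ready".
        return False
```

For example, `wait_until_ready(lambda: opensearch_is_up("https://localhost:9200", "admin", "Admin123!"))` returns True as soon as the node responds and False after 60 seconds, so the script can abort instead of creating an index against a node that never started.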

3.10 Running the Application

bash
# 1. Clone or create the project
mkdir cre-ai-assistant
cd cre-ai-assistant

# 2. Set up environment
python3 -m venv .venv
source .venv/bin/activate

# 3. Install dependencies
pip install -r requirements.txt

# 4. Configure environment
cp .env.example .env
# Edit .env with your AWS credentials and configuration

# 5. Start OpenSearch (Docker)
docker run -d \
--name opensearch \
-p 9200:9200 \
-p 9600:9600 \
-e "discovery.type=single-node" \
-e 'OPENSEARCH_INITIAL_ADMIN_PASSWORD=Admin123!' \
opensearchproject/opensearch:latest

# 6. Ingest sample documents
python -c "
from rag_engine import RAGEngine
engine = RAGEngine()
# Add lease documents
leases = [
{
'property_name': 'Downtown Tower',
'tenant_name': 'TechCorp Inc.',
'start_date': '2023-01-01',
'end_date': '2028-12-31',
'monthly_rent': 45000,
'square_feet': 15000,
'lease_type': 'NNN',
'renewal_options': 'Two 5-year options'
}
]
engine.ingest_lease_documents(leases)
print('Documents ingested successfully')
"

# 7. Start the application
python main.py

# 8. Test the API
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"question": "What is the total monthly rent for all properties?", "session_id": "test"}'
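
The curl call in step 8 only shows that the endpoint responds. A small Python client can also check that the reply has the shape the frontend expects. This is a minimal sketch using only the standard library; the required field names come from the ChatResponse interface in section 3.8, while the function names build_chat_request, missing_fields, and ask are hypothetical.

```python
import json
import urllib.request

CHAT_URL = "http://localhost:8000/chat"  # same endpoint as the curl call
REQUIRED_FIELDS = ("session_id", "question", "answer")  # non-optional ChatResponse fields


def build_chat_request(question: str, session_id: str,
                       url: str = CHAT_URL) -> urllib.request.Request:
    """Create the POST request that the curl example sends."""
    body = json.dumps({"question": question, "session_id": session_id}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def missing_fields(payload: dict) -> list:
    """List required fields that are absent or not strings."""
    return [name for name in REQUIRED_FIELDS if not isinstance(payload.get(name), str)]


def ask(question: str, session_id: str = "test", timeout_s: float = 60.0) -> dict:
    """Send one question and return the parsed reply, raising if it is malformed."""
    request = build_chat_request(question, session_id)
    with urllib.request.urlopen(request, timeout=timeout_s) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    problems = missing_fields(payload)
    if problems:
        raise ValueError(f"response is missing fields: {problems}")
    return payload
```

With the application running, `ask("What is the total monthly rent for all properties?")["answer"]` returns the assistant's reply as a string.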

Tools Required

| Category | Tool | Purpose |
|---|---|---|
| Cloud Platform | AWS (Bedrock, OpenSearch, RDS, DynamoDB) | Infrastructure and AI orchestration |
| Foundation Models | Amazon Nova Pro | SQL query generation |
| Foundation Models | Claude Haiku | Document interactions |
| Foundation Models | Titan Embeddings | Vector embeddings |
| Development | Python 3.9+ | Core implementation |
| Development | FastAPI | Backend API |
| Development | LangChain | LLM orchestration |
| Development | React/Next.js | Frontend |
| Vector Database | OpenSearch | Vector storage and search |
| Vector Database | Qdrant | Alternative vector DB |
| Database | PostgreSQL | Structured data storage |
| Security | AWS IAM | Access management |
| Security | Redis | Permission caching |

