The commercial real estate (CRE) industry is undergoing a fundamental transformation driven by artificial intelligence. What was once a sector defined by manual processes, fragmented data systems, and intuition-based decision-making is rapidly evolving into a data-driven ecosystem where AI-powered assistants are becoming indispensable tools.
CBRE, the world's largest commercial real estate services firm, had property data scattered across 10 distinct sources and four separate databases, so property management professionals had to search manually through millions of documents and switch between multiple systems. To address this, CBRE partnered with AWS to build a next-generation unified search and digital assistant experience within its PULSE system, reporting a 67% reduction in SQL query generation time, an 80% improvement in database query performance, and 95% accuracy in search results.
Similarly, JLL developed JLL Property Assistant, which is built on JLL Falcon, an AI platform designed specifically for the real estate industry. The tool integrates with JLL's Acumen platform, bringing together financial data from systems like Yardi and MRI, operational data from Prism by Building Engines, and other property technology tools. Through a natural language chat interface, property and asset managers can ask freeform questions about their buildings.
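This guide provides a code-focused framework for designing and implementing an AI Commercial Real Estate Assistant, with a Python script for each backend component, a React frontend component, and a deployment script. The scripts assume access to Amazon Bedrock, OpenSearch, DynamoDB, and a PostgreSQL database.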
The Core Problems
1.1 Data Fragmentation
CBRE's experience exemplifies the industry-wide challenge: property management professionals had to navigate 10 distinct data sources and four separate databases containing both structured transactional data and unstructured documents (lease agreements, property inspections, maintenance records). This fragmentation created significant productivity losses and made it difficult to derive comprehensive insights about property operations.
1.2 The Expertise Bottleneck
Property managers are experts in real estate, not in database syntax. They need to ask complex questions in natural language, synthesize information from disparate sources quickly, and avoid manually reviewing lengthy documents. AI assistants remove this bottleneck, freeing professionals to focus on strategic work rather than data retrieval.
1.3 Limited Insight
When data is scattered, professionals cannot easily identify emerging tenant trends, analyze expense patterns at scale, or access financial performance metrics in real time. JLL Property Assistant addresses this by generating tenancy reports, auto-generating stacking plans, analyzing expense trends, and surfacing tenant retention and occupancy insights.
Solution Architecture
2.1 Reference Architecture (CBRE Model)
CBRE's production architecture leverages Amazon Bedrock as the central AI orchestration layer, providing access to multiple foundation models through a single API. The architecture uses:
Amazon OpenSearch Service for indexing and searching
Amazon RDS (PostgreSQL) for structured transactional data
Amazon DynamoDB for conversation history
The architecture is organized around two primary interaction pathways: SQL Interact for structured data and DocInteract for unstructured documents.
2.2 Why RAG
Retrieval-Augmented Generation (RAG) allows an AI tool to pull current market data, specific lease clauses, or underwriting methodologies from a curated knowledge base before generating a response, significantly reducing inaccuracies and keeping outputs grounded in real, up-to-date information.
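The retrieve-then-generate loop can be illustrated without any AWS services. The sketch below is a toy: `embed`, `cosine`, `retrieve`, and `build_prompt` are hypothetical helpers, and the bag-of-words vectors stand in for a real embedding model such as the Bedrock Titan model used later. It ranks knowledge-base chunks by cosine similarity to the question and places only the top matches in the prompt.

```python
import math
from collections import Counter
from typing import List, Tuple


def embed(text: str) -> Counter:
    """Toy bag-of-words 'embedding' standing in for a real embedding model."""
    return Counter(word.strip(".,?!").lower() for word in text.split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[k] * b[k] for k in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(question: str, chunks: List[str], top_k: int = 3) -> List[Tuple[float, str]]:
    """Rank knowledge-base chunks by similarity to the question and keep the top k."""
    q = embed(question)
    scored = [(cosine(q, embed(chunk)), chunk) for chunk in chunks]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:top_k]


def build_prompt(question: str, chunks: List[str], top_k: int = 3) -> str:
    """Ground the model by placing only the retrieved chunks in the prompt."""
    context = "\n\n".join(chunk for _, chunk in retrieve(question, chunks, top_k))
    return f"Context:\n{context}\n\nQuestion: {question}\nAnswer using only the context."


knowledge_base = [
    "Tenant Acme Corp lease at 100 Main St expires on 2026-06-30 with one 5-year renewal option.",
    "The roof at 200 Oak Ave was inspected in March and needs membrane repairs.",
    "Market report: downtown office vacancy rose to 18% this quarter.",
]
print(build_prompt("When does the Acme lease expire?", knowledge_base, top_k=1))
```

The generation step then sends this prompt to the LLM; because the model sees only retrieved text, its answer can be traced back to specific chunks.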
Complete Implementation Scripts
3.1 Environment Setup
Create a virtual environment and install dependencies:
```bash
# Create and activate a virtual environment
python3 -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Create requirements.txt
cat > requirements.txt << 'EOF'
fastapi==0.115.6
uvicorn==0.34.0
langchain==0.3.13
langchain-community==0.3.13
langchain-aws==0.2.6
langchain-openai==0.3.0
boto3==1.35.76
pypdf==5.1.0
python-dotenv==1.0.1
qdrant-client==1.12.1
sentence-transformers==3.3.1
opensearch-py==2.7.1
requests-aws4auth==1.3.1
pydantic==2.10.4
# PostgreSQL driver imported by sql_engine.py
psycopg2-binary==2.9.10
EOF

# Install dependencies
pip install -r requirements.txt
```
Create environment configuration:
```bash
cat > .env << 'EOF'
# AWS configuration
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key

# Model configuration
BEDROCK_SQL_MODEL=amazon.nova-pro-v1:0
BEDROCK_DOC_MODEL=anthropic.claude-3-haiku-20240307-v1:0
BEDROCK_EMBED_MODEL=amazon.titan-embed-text-v2:0

# Database configuration
DB_HOST=localhost
DB_NAME=cre_portfolio
DB_USER=postgres
DB_PASSWORD=your_password

# OpenSearch configuration
OPENSEARCH_HOST=your-opensearch-host
OPENSEARCH_PORT=443
OPENSEARCH_INDEX=cre_documents

# Qdrant configuration (if using Qdrant instead of OpenSearch)
QDRANT_HOST=localhost
QDRANT_PORT=6333
COLLECTION_NAME=cre_knowledge_base

# DynamoDB table used by dynamodb_utils.py
DYNAMODB_TABLE=cre_conversations

# Application settings
CHUNK_SIZE=500
CHUNK_OVERLAP=50
TOP_K=3
EOF
```
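Each engine reads these variables with its own `os.getenv` defaults, so a typo in `.env` surfaces only when a request fails. A minimal sketch of validating the configuration once at startup follows; `Settings`, `load_settings`, and the choice of which keys are required are hypothetical, not part of the scripts in this guide.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    aws_region: str
    opensearch_host: str
    db_host: str
    db_name: str
    chunk_size: int
    chunk_overlap: int
    top_k: int


# Assumption: these keys have no sensible default and must be set explicitly
REQUIRED = ("OPENSEARCH_HOST", "DB_HOST", "DB_NAME")


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read and validate .env-style settings; fail fast on missing or bad values."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED if not env.get(key)]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")

    settings = Settings(
        aws_region=env.get("AWS_REGION", "us-east-1"),
        opensearch_host=env["OPENSEARCH_HOST"],
        db_host=env["DB_HOST"],
        db_name=env["DB_NAME"],
        chunk_size=int(env.get("CHUNK_SIZE", "500")),
        chunk_overlap=int(env.get("CHUNK_OVERLAP", "50")),
        top_k=int(env.get("TOP_K", "3")),
    )
    # Overlap must be smaller than the chunk, or chunking cannot advance
    if not 0 <= settings.chunk_overlap < settings.chunk_size:
        raise ValueError("CHUNK_OVERLAP must be >= 0 and smaller than CHUNK_SIZE")
    return settings
```

Calling `load_settings()` once after `load_dotenv()` turns a misconfigured `.env` into a clear startup error instead of a failed AWS or database call later.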
3.2 Data Models
Create the data models (models.py):
```python
from typing import Optional, List, Dict, Any
from pydantic import BaseModel
from datetime import datetime
from enum import Enum


class QueryType(str, Enum):
    STRUCTURED = "structured"
    UNSTRUCTURED = "unstructured"
    HYBRID = "hybrid"


class QueryRequest(BaseModel):
    question: str
    # None lets the orchestrator create a new session instead of every
    # client sharing a single "default" conversation
    session_id: Optional[str] = None
    user_id: Optional[str] = None
    query_type: Optional[QueryType] = QueryType.HYBRID


class QueryResponse(BaseModel):
    answer: str
    sources: List[Dict[str, Any]]
    query_type: str
    execution_time_ms: float
    token_usage: Optional[Dict[str, int]] = None


class ConversationHistory(BaseModel):
    session_id: str
    messages: List[Dict[str, str]]
    created_at: datetime
    updated_at: datetime


class PropertyData(BaseModel):
    property_id: str
    property_name: str
    address: str
    city: str
    state: str
    property_type: str  # office, retail, industrial, multifamily
    square_feet: float
    occupancy_rate: float
    noi: float  # Net Operating Income
    vacancy_risk: Optional[float] = None
    lease_expirations: List[Dict[str, Any]]
```
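`PropertyData.lease_expirations` is typed loosely as a list of dictionaries, which leaves room for the kind of tenant-retention analysis described in section 1.3. The sketch below rolls expirations up by year as a share of the building's area. The keys `square_feet` and `expiration_date` (ISO dates) and the function name `expiration_schedule` are assumptions for illustration.

```python
from collections import defaultdict
from datetime import date
from typing import Any, Dict, List


def expiration_schedule(lease_expirations: List[Dict[str, Any]], building_sf: float) -> Dict[int, Dict[str, float]]:
    """Group expiring square feet by year and express it as a share of the building."""
    by_year: Dict[int, float] = defaultdict(float)
    for lease in lease_expirations:
        year = date.fromisoformat(lease["expiration_date"]).year
        by_year[year] += float(lease["square_feet"])
    return {
        year: {"square_feet": sf, "pct_of_building": round(100 * sf / building_sf, 1)}
        for year, sf in sorted(by_year.items())
    }


leases = [
    {"tenant": "Acme Corp", "square_feet": 12000, "expiration_date": "2026-06-30"},
    {"tenant": "Beta LLC", "square_feet": 8000, "expiration_date": "2026-12-31"},
    {"tenant": "Gamma Inc", "square_feet": 15000, "expiration_date": "2028-03-31"},
]
print(expiration_schedule(leases, building_sf=50000))
# {2026: {'square_feet': 20000.0, 'pct_of_building': 40.0}, 2028: {'square_feet': 15000.0, 'pct_of_building': 30.0}}
```

A year in which a large share of the building expires is a rollover concentration that an assistant could flag alongside `vacancy_risk`.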
3.3 RAG Implementation with OpenSearch
Create the RAG engine (rag_engine.py):
```python
import os
from typing import List, Dict, Any, Optional

import boto3
import pypdf
from langchain_aws import BedrockEmbeddings, ChatBedrock
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_core.prompts import PromptTemplate
from langchain_text_splitters import RecursiveCharacterTextSplitter
from opensearchpy import RequestsHttpConnection
from requests_aws4auth import AWS4Auth


class RAGEngine:
    def __init__(self):
        self.region = os.getenv("AWS_REGION", "us-east-1")
        self.embed_model_id = os.getenv("BEDROCK_EMBED_MODEL", "amazon.titan-embed-text-v2:0")
        self.doc_model_id = os.getenv("BEDROCK_DOC_MODEL", "anthropic.claude-3-haiku-20240307-v1:0")
        self.top_k = int(os.getenv("TOP_K", 3))

        # Bedrock runtime client shared by the embedding model and the LLM
        self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=self.region)

        # Embedding model used both for indexing and for similarity search
        self.embeddings = BedrockEmbeddings(model_id=self.embed_model_id, client=self.bedrock_runtime)

        # LLM that answers questions over the retrieved documents
        self.llm = ChatBedrock(
            model_id=self.doc_model_id,
            client=self.bedrock_runtime,
            model_kwargs={"temperature": 0.1, "max_tokens": 500},
        )

        self._init_opensearch()

        # Split long documents into overlapping chunks before embedding
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=int(os.getenv("CHUNK_SIZE", 500)),
            chunk_overlap=int(os.getenv("CHUNK_OVERLAP", 50)),
            separators=["\n\n", "\n", ". ", " ", ""],
        )

        self.prompt_template = PromptTemplate(
            template="""You are a commercial real estate assistant.
Use the following context to answer the question.
The context includes property data, lease agreements, and market information.

Context:
{context}

Question: {question}

Provide a clear, concise answer based only on the context provided.
If the context does not contain the answer, say "I don't have enough information to answer this question."

Answer:""",
            input_variables=["context", "question"],
        )

    def _init_opensearch(self):
        """Connect to OpenSearch using AWS Signature V4 authentication."""
        host = os.getenv("OPENSEARCH_HOST")
        port = int(os.getenv("OPENSEARCH_PORT", 443))
        index_name = os.getenv("OPENSEARCH_INDEX", "cre_documents")
        if not host:
            raise ValueError("OPENSEARCH_HOST is not set")

        # 'aoss' signs requests for OpenSearch Serverless; managed OpenSearch
        # Service domains use 'es'. OPENSEARCH_SERVICE is an assumed variable.
        service = os.getenv("OPENSEARCH_SERVICE", "aoss")
        credentials = boto3.Session().get_credentials()
        awsauth = AWS4Auth(
            credentials.access_key,
            credentials.secret_key,
            self.region,
            service,
            session_token=credentials.token,
        )

        self.vector_store = OpenSearchVectorSearch(
            opensearch_url=f"https://{host}:{port}",
            index_name=index_name,
            embedding_function=self.embeddings,
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            timeout=300,
        )

    def ingest_document(self, file_path: str, metadata: Optional[Dict[str, Any]] = None) -> int:
        """Ingest a PDF or plain-text file and return the number of chunks indexed."""
        metadata = dict(metadata or {})

        # extract_text() can return an empty string, e.g. for scanned pages
        if file_path.lower().endswith(".pdf"):
            reader = pypdf.PdfReader(file_path)
            text = "\n".join((page.extract_text() or "") for page in reader.pages)
        else:
            with open(file_path, "r", encoding="utf-8") as f:
                text = f.read()

        chunks = self.text_splitter.split_text(text)
        if not chunks:
            return 0

        # Per-chunk metadata lets answers cite their source
        metadatas = [
            {**metadata, "chunk_index": i, "source": file_path, "total_chunks": len(chunks)}
            for i in range(len(chunks))
        ]
        self.vector_store.add_texts(texts=chunks, metadatas=metadatas)
        return len(chunks)

    def ingest_lease_documents(self, lease_data: List[Dict[str, Any]]) -> int:
        """Render structured lease records as text and index them in one batch."""
        texts, metadatas = [], []
        for lease in lease_data:
            texts.append(
                f"""Lease Agreement
Property: {lease.get('property_name', 'Unknown')}
Tenant: {lease.get('tenant_name', 'Unknown')}
Start Date: {lease.get('start_date', 'Unknown')}
End Date: {lease.get('end_date', 'Unknown')}
Rent: ${lease.get('monthly_rent', 0):,.2f} per month
Square Feet: {lease.get('square_feet', 0):,}
Lease Type: {lease.get('lease_type', 'Unknown')}
Renewal Options: {lease.get('renewal_options', 'None')}"""
            )
            metadatas.append({
                "property_id": lease.get("property_id"),
                "tenant_id": lease.get("tenant_id"),
                "document_type": "lease",
                "property_name": lease.get("property_name"),
            })

        if texts:
            self.vector_store.add_texts(texts=texts, metadatas=metadatas)
        return len(texts)

    def query(self, question: str, top_k: Optional[int] = None) -> Dict[str, Any]:
        """Retrieve the most relevant chunks and answer from them."""
        docs = self.vector_store.similarity_search(question, k=top_k or self.top_k)
        context = "\n\n".join(doc.page_content for doc in docs)
        prompt = self.prompt_template.format(context=context, question=question)
        response = self.llm.invoke(prompt)

        sources = [
            {"content": doc.page_content[:200] + "...", "metadata": doc.metadata}
            for doc in docs
        ]
        return {"answer": response.content, "sources": sources, "retrieved_count": len(docs)}
```
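`CHUNK_SIZE` and `CHUNK_OVERLAP` control how documents are cut before embedding. `RecursiveCharacterTextSplitter` prefers to break on paragraph, line, and sentence separators; the simplified fixed-window version below (a hypothetical `sliding_window_chunks`, not the LangChain algorithm) shows only what the two numbers mean: each window advances by `chunk_size - chunk_overlap` characters, so consecutive chunks share `chunk_overlap` characters of context.

```python
from typing import List


def sliding_window_chunks(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[str]:
    """Split text into fixed-size character windows that share chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


sample = "".join(str(i % 10) for i in range(1200))  # 1,200 characters
parts = sliding_window_chunks(sample, chunk_size=500, chunk_overlap=50)
print([len(p) for p in parts])  # [500, 500, 300]
```

The overlap keeps a clause that straddles a boundary retrievable from at least one chunk, at the cost of storing roughly `chunk_overlap / chunk_size` extra text.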
3.4 Text-to-SQL Implementation
Create the SQL generation engine (sql_engine.py):
```python
import os
import json
from typing import List, Dict, Any

import boto3
import psycopg2


class TextToSQLEngine:
    """Text-to-SQL engine using Amazon Nova Pro via Bedrock.
    Based on an AWS sample implementation for data consistency in generative AI.
    """

    def __init__(self):
        self.region = os.getenv("AWS_REGION", "us-east-1")
        self.sql_model_id = os.getenv("BEDROCK_SQL_MODEL", "amazon.nova-pro-v1:0")
        # Smaller model that only turns result rows into prose
        # (BEDROCK_FORMAT_MODEL is an assumed variable, not in .env)
        self.format_model_id = os.getenv("BEDROCK_FORMAT_MODEL", "amazon.nova-lite-v1:0")

        self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=self.region)

        self.db_params = {
            "host": os.getenv("DB_HOST", "localhost"),
            "database": os.getenv("DB_NAME", "cre_portfolio"),
            "user": os.getenv("DB_USER", "postgres"),
            "password": os.getenv("DB_PASSWORD", ""),
        }
        self.schema_cache = None

    def _connect(self):
        """Open a read-only connection so generated SQL cannot modify data."""
        conn = psycopg2.connect(**self.db_params)
        conn.set_session(readonly=True)
        return conn

    def get_database_schema(self) -> str:
        """Describe every table in the public schema (cached after the first call)."""
        if self.schema_cache:
            return self.schema_cache

        schema_parts = []
        conn = self._connect()
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
                )
                tables = [row[0] for row in cursor.fetchall()]

                for table_name in tables:
                    # Parameterized instead of interpolating the name into SQL
                    cursor.execute(
                        "SELECT column_name, data_type, is_nullable "
                        "FROM information_schema.columns "
                        "WHERE table_schema = 'public' AND table_name = %s "
                        "ORDER BY ordinal_position",
                        (table_name,),
                    )
                    schema_parts.append(f"Table: {table_name}")
                    for name, data_type, nullable in cursor.fetchall():
                        null_flag = "NULL" if nullable == "YES" else "NOT NULL"
                        schema_parts.append(f"  - {name} ({data_type}) {null_flag}")
                    schema_parts.append("")
        finally:
            conn.close()

        self.schema_cache = "\n".join(schema_parts)
        return self.schema_cache

    def generate_sql(self, natural_language_query: str) -> Dict[str, Any]:
        """Generate a SQL query from natural language using Amazon Nova Pro."""
        schema = self.get_database_schema()

        system_prompt = f"""You are an expert SQL query generator for a commercial real estate database.
Generate precise, deterministic PostgreSQL queries based on the user's natural language question.

Database Schema:
{schema}

Rules:
1. Always use the exact table and column names from the schema
2. Include appropriate WHERE clauses for filtering
3. Use proper JOINs when data spans multiple tables
4. Use aggregate functions (COUNT, SUM, AVG) when appropriate
5. Return only the SQL query, no additional text
6. Do not use SELECT *
7. Generate a single read-only SELECT statement"""

        # Converse API message format
        messages = [{"role": "user", "content": [{"text": f"Convert this to SQL: {natural_language_query}"}]}]

        response = self.bedrock_runtime.converse(
            modelId=self.sql_model_id,
            messages=messages,
            system=[{"text": system_prompt}],
            inferenceConfig={"temperature": 0.0, "maxTokens": 500},  # deterministic output
        )
        sql_query = response["output"]["message"]["content"][0]["text"]

        # Remove markdown code fences if the model added them
        fence = "`" * 3
        sql_query = sql_query.replace(fence + "sql", "").replace(fence, "").strip()

        return {
            "sql": sql_query,
            "tokens_used": response.get("usage", {}).get("totalTokens", 0),
        }

    def execute_sql(self, sql_query: str) -> List[Dict[str, Any]]:
        """Execute a read-only query and return rows as dictionaries."""
        # Cheap guard on top of the read-only session: accept only SELECT/WITH
        stripped = sql_query.strip()
        first_word = stripped.split(None, 1)[0].upper() if stripped else ""
        if first_word not in ("SELECT", "WITH"):
            raise ValueError(f"Refusing to run non-SELECT statement: {sql_query[:80]}")

        conn = self._connect()
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql_query)
                if cursor.description is None:
                    return []
                columns = [desc[0] for desc in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            conn.close()

    def query(self, natural_language_query: str) -> Dict[str, Any]:
        """Full pipeline: natural language -> SQL -> execution -> summarized answer."""
        sql_query = self.generate_sql(natural_language_query)["sql"]

        try:
            results = self.execute_sql(sql_query)
            if results:
                # Cap the rows sent to the model to keep the prompt small (50 is arbitrary)
                preview = results[:50]
                formatting_prompt = f"""Format the following query results as a clear, concise answer.
Query: {natural_language_query}
SQL: {sql_query}
Results: {json.dumps(preview, indent=2, default=str)}

Provide a professional summary of the results."""
                # A smaller model (Nova Lite by default) summarizes the rows
                format_response = self.bedrock_runtime.converse(
                    modelId=self.format_model_id,
                    messages=[{"role": "user", "content": [{"text": formatting_prompt}]}],
                    inferenceConfig={"temperature": 0.1, "maxTokens": 300},
                )
                formatted_answer = format_response["output"]["message"]["content"][0]["text"]
            else:
                formatted_answer = "No results found for your query."

            return {
                "sql": sql_query,
                "results": results,
                "answer": formatted_answer,
                "row_count": len(results),
            }
        except Exception as e:
            return {
                "sql": sql_query,
                "error": str(e),
                "answer": f"An error occurred while executing the query: {e}",
            }
```
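Because the SQL comes from a model, it is worth validating before execution rather than only stripping markdown fences. The sketch below, a hypothetical `clean_and_check_sql` helper, accepts a single `SELECT` or `WITH` statement and rejects data-modifying keywords. The keyword scan is deliberately coarse (it also rejects a string literal containing the word "update"), so a read-only database role should remain the primary control.

```python
import re

FENCE = "`" * 3  # markdown code fence, built here so the snippet stays fence-safe
WRITE_KEYWORDS = re.compile(
    r"\b(INSERT|UPDATE|DELETE|MERGE|DROP|ALTER|TRUNCATE|CREATE|GRANT|REVOKE|COPY)\b",
    re.IGNORECASE,
)


def clean_and_check_sql(raw: str) -> str:
    """Strip markdown fences from model output and accept only one read-only query."""
    sql = raw.replace(FENCE + "sql", "").replace(FENCE, "").strip()
    sql = sql.rstrip(";").strip()  # a single trailing semicolon is harmless
    if ";" in sql:
        raise ValueError("Multiple statements are not allowed")
    if not re.match(r"(?is)^\s*(SELECT|WITH)\b", sql):
        raise ValueError("Only SELECT or WITH queries are allowed")
    if WRITE_KEYWORDS.search(sql):
        raise ValueError("Query contains a data-modifying keyword")
    return sql
```

The `WITH` check matters because PostgreSQL allows data-modifying statements inside a CTE, so "starts with WITH" alone does not guarantee a read-only query.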
3.5 Orchestration Layer
Create the orchestration engine (orchestrator.py):
```python
import logging
import time
import uuid
from typing import Dict, Any, Optional

from dynamodb_utils import ConversationManager

logger = logging.getLogger(__name__)


class Orchestrator:
    """Central orchestration layer for the AI Commercial Real Estate Assistant.
    Routes queries to the appropriate engine based on detected intent.
    """

    def __init__(self):
        self.rag_engine = None
        self.sql_engine = None
        self.conversation_manager = ConversationManager()
        self._init_engines()

    def _init_engines(self):
        """Initialize the RAG and SQL engines; either may be unavailable."""
        try:
            from rag_engine import RAGEngine
            self.rag_engine = RAGEngine()
        except Exception as e:
            logger.warning("RAG engine initialization failed: %s", e)

        try:
            from sql_engine import TextToSQLEngine
            self.sql_engine = TextToSQLEngine()
        except Exception as e:
            logger.warning("SQL engine initialization failed: %s", e)

    def detect_query_type(self, question: str) -> str:
        """Classify a question as 'structured', 'unstructured', or 'hybrid' by keyword.

        A simple baseline: structured keywords are checked first with substring
        matching, so "Explain the renewal clause in this lease" goes to SQL
        (it contains "lease") and "current" matches "rent".
        """
        question_lower = question.lower()

        structured_keywords = [
            "how many", "count", "total", "sum", "average", "list",
            "properties with", "tenants", "vacancy", "occupancy",
            "rent", "lease", "noi", "expense", "revenue",
        ]
        if any(keyword in question_lower for keyword in structured_keywords):
            return "structured"

        unstructured_keywords = [
            "explain", "describe", "tell me about", "what is",
            "why", "how does", "definition", "meaning",
        ]
        if any(keyword in question_lower for keyword in unstructured_keywords):
            return "unstructured"

        return "hybrid"

    def route_query(self, question: str, query_type: Optional[str] = None) -> Dict[str, Any]:
        """Route a question to the SQL engine, the RAG engine, or both."""
        start_time = time.perf_counter()
        if query_type is None:
            query_type = self.detect_query_type(question)

        response = {
            "question": question,
            "query_type": query_type,
            "execution_time_ms": 0,
            "answer": "",
            "sources": [],
        }

        if query_type == "structured" and self.sql_engine:
            result = self.sql_engine.query(question)
            response["answer"] = result.get("answer", "")
            response["sql"] = result.get("sql")
            response["row_count"] = result.get("row_count", 0)
        elif query_type == "unstructured" and self.rag_engine:
            result = self.rag_engine.query(question)
            response["answer"] = result.get("answer", "")
            response["sources"] = result.get("sources", [])
        else:
            # Hybrid, or the preferred engine is unavailable: try both
            structured_result = None
            unstructured_result = None

            if self.sql_engine:
                try:
                    structured_result = self.sql_engine.query(question)
                except Exception:
                    logger.exception("SQL engine failed")
            if self.rag_engine:
                try:
                    unstructured_result = self.rag_engine.query(question)
                except Exception:
                    logger.exception("RAG engine failed")

            # Prefer SQL only if it ran without error and returned rows;
            # its error message is a non-empty "answer" and must not win
            sql_ok = (
                structured_result is not None
                and "error" not in structured_result
                and structured_result.get("row_count", 0) > 0
            )
            if sql_ok:
                response["answer"] = structured_result["answer"]
                response["sql"] = structured_result.get("sql")
            elif unstructured_result and unstructured_result.get("answer"):
                response["answer"] = unstructured_result["answer"]
                response["sources"] = unstructured_result.get("sources", [])
            else:
                response["answer"] = (
                    "I couldn't find an answer to your question. "
                    "Please rephrase or provide more specific information."
                )

        response["execution_time_ms"] = (time.perf_counter() - start_time) * 1000
        return response

    def chat(self, question: str, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Answer a question using the session's recent conversation history."""
        if session_id is None:
            session_id = f"session_{uuid.uuid4().hex}"

        # Classify the raw question so keywords from earlier turns do not skew routing
        query_type = self.detect_query_type(question)

        history = self.conversation_manager.get_history(session_id)
        if history:
            # Last 6 messages = 3 exchanges
            context = "\n".join(f"{msg['role']}: {msg['content']}" for msg in history[-6:])
            enhanced_question = f"Context from previous conversation:\n{context}\n\nCurrent question: {question}"
        else:
            enhanced_question = question

        response = self.route_query(enhanced_question, query_type)

        self.conversation_manager.add_message(session_id, "user", question)
        self.conversation_manager.add_message(session_id, "assistant", response["answer"])

        return {
            "session_id": session_id,
            "question": question,
            "answer": response["answer"],
            "query_type": response.get("query_type"),
            "execution_time_ms": response.get("execution_time_ms"),
            "sources": response.get("sources", []),
            "sql": response.get("sql"),
        }
```
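`detect_query_type` returns as soon as any structured keyword appears as a substring, which over-routes to SQL. One alternative, sketched below with hypothetical keyword lists and a hypothetical `classify` function, scores both intents using whole-word matches and falls back to hybrid on a tie. The sketch leaves "lease" out of both lists because it appears in both kinds of questions, and adds "clause" as an assumed unstructured cue.

```python
import re
from typing import Dict, List

STRUCTURED = ["how many", "count", "total", "sum", "average", "list", "vacancy",
              "occupancy", "rent", "noi", "expense", "revenue"]
UNSTRUCTURED = ["explain", "describe", "tell me about", "what is", "why",
                "how does", "definition", "meaning", "clause"]


def _hits(question: str, keywords: List[str]) -> int:
    """Count keywords that occur as whole words or phrases (so 'current' is not 'rent')."""
    return sum(1 for kw in keywords if re.search(rf"\b{re.escape(kw)}\b", question))


def classify(question: str) -> Dict[str, object]:
    """Score both intents and fall back to 'hybrid' on a tie."""
    q = question.lower()
    s, u = _hits(q, STRUCTURED), _hits(q, UNSTRUCTURED)
    if s > u:
        label = "structured"
    elif u > s:
        label = "unstructured"
    else:
        label = "hybrid"
    return {"label": label, "structured": s, "unstructured": u}


print(classify("What is the current status of 100 Main St?"))
# {'label': 'unstructured', 'structured': 0, 'unstructured': 1}
```

Returning the scores alongside the label makes misroutes easy to log and tune; the keyword lists remain a heuristic either way.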
3.6 Conversation Management with DynamoDB
Create the conversation manager (dynamodb_utils.py):
```python
import os
import json
from datetime import datetime, timezone
from decimal import Decimal
from typing import List, Dict

import boto3


class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder for the datetime and Decimal values DynamoDB returns."""

    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)


class ConversationManager:
    """Manages conversation history using Amazon DynamoDB."""

    TTL_SECONDS = 86400 * 30  # messages expire after 30 days

    def __init__(self):
        self.table_name = os.getenv("DYNAMODB_TABLE", "cre_conversations")
        self.dynamodb = boto3.resource("dynamodb", region_name=os.getenv("AWS_REGION", "us-east-1"))
        self.table = self.dynamodb.Table(self.table_name)
        self._create_table_if_not_exists()

    def _create_table_if_not_exists(self):
        """Create the table (partition key session_id, sort key timestamp) and enable TTL."""
        client = self.dynamodb.meta.client
        try:
            client.describe_table(TableName=self.table_name)
        except client.exceptions.ResourceNotFoundException:
            client.create_table(
                TableName=self.table_name,
                KeySchema=[
                    {"AttributeName": "session_id", "KeyType": "HASH"},
                    {"AttributeName": "timestamp", "KeyType": "RANGE"},
                ],
                AttributeDefinitions=[
                    {"AttributeName": "session_id", "AttributeType": "S"},
                    {"AttributeName": "timestamp", "AttributeType": "S"},
                ],
                BillingMode="PAY_PER_REQUEST",
            )
            client.get_waiter("table_exists").wait(TableName=self.table_name)
            # The 'ttl' attribute written by add_message only expires items once TTL is enabled
            client.update_time_to_live(
                TableName=self.table_name,
                TimeToLiveSpecification={"Enabled": True, "AttributeName": "ttl"},
            )

    def get_history(self, session_id: str, limit: int = 50) -> List[Dict[str, str]]:
        """Return up to `limit` of the most recent messages, oldest first."""
        try:
            response = self.table.query(
                KeyConditionExpression="session_id = :sid",
                ExpressionAttributeValues={":sid": session_id},
                Limit=limit,
                ScanIndexForward=False,  # newest first, so Limit keeps the latest messages
            )
            items = list(reversed(response.get("Items", [])))
            return [
                {
                    "role": item.get("role", "unknown"),
                    "content": item.get("content", ""),
                    "timestamp": item.get("timestamp", ""),
                }
                for item in items
            ]
        except Exception as e:
            print(f"Error retrieving history: {e}")
            return []

    def add_message(self, session_id: str, role: str, content: str) -> bool:
        """Append a 'user' or 'assistant' message; return True on success."""
        try:
            now = datetime.now(timezone.utc)
            self.table.put_item(
                Item={
                    "session_id": session_id,
                    "timestamp": now.isoformat(),
                    "role": role,
                    "content": content,
                    "ttl": int(now.timestamp()) + self.TTL_SECONDS,
                }
            )
            return True
        except Exception as e:
            print(f"Error adding message: {e}")
            return False

    def delete_session(self, session_id: str) -> bool:
        """Delete every message in a session, following query pagination."""
        try:
            query_kwargs = {
                "KeyConditionExpression": "session_id = :sid",
                "ExpressionAttributeValues": {":sid": session_id},
            }
            with self.table.batch_writer() as batch:
                while True:
                    response = self.table.query(**query_kwargs)
                    for item in response.get("Items", []):
                        batch.delete_item(Key={"session_id": item["session_id"], "timestamp": item["timestamp"]})
                    if "LastEvaluatedKey" not in response:
                        break
                    query_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
            return True
        except Exception as e:
            print(f"Error deleting session: {e}")
            return False
```
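The orchestrator prepends the last six messages returned by `get_history` to each question, so a few long answers can inflate every subsequent prompt. A minimal sketch of bounding that context by both message count and a character budget follows; `recent_context` and its default limits are hypothetical, and the budget ignores the newline separators.

```python
from typing import Dict, List


def recent_context(history: List[Dict[str, str]], max_exchanges: int = 3, max_chars: int = 2000) -> str:
    """Format the newest messages (oldest first) within a message and character budget."""
    window = history[-2 * max_exchanges:]  # one exchange = user + assistant message
    lines: List[str] = []
    used = 0
    # Walk backwards so the newest messages are kept when the budget runs out
    for msg in reversed(window):
        line = f"{msg['role']}: {msg['content']}"
        if used + len(line) > max_chars:
            break
        lines.append(line)
        used += len(line)
    return "\n".join(reversed(lines))
```

Dropping the oldest turns first preserves the messages most likely to resolve pronouns such as "that property" in the current question.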
3.7 FastAPI Application
Create the main application (main.py):
```python
import asyncio
import json
from datetime import datetime
from typing import Dict, Any

from dotenv import load_dotenv

# Load .env before any engine reads os.environ
load_dotenv()

import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

from models import QueryRequest
from orchestrator import Orchestrator

app = FastAPI(
    title="AI Commercial Real Estate Assistant API",
    description="Enterprise-grade AI assistant for commercial real estate professionals",
    version="1.0.0",
)

# "*" is for local development; list explicit origins in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

orchestrator = Orchestrator()


@app.get("/health")
def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": {
            "orchestrator": "ready",
            "rag_engine": "ready" if orchestrator.rag_engine else "unavailable",
            "sql_engine": "ready" if orchestrator.sql_engine else "unavailable",
        },
    }


# Plain `def` endpoints run in FastAPI's threadpool, so blocking boto3 and
# psycopg2 calls do not stall the event loop
@app.post("/chat")
def chat(request: QueryRequest) -> Dict[str, Any]:
    """Send a message to the AI assistant and return the full answer."""
    try:
        return orchestrator.chat(question=request.question, session_id=request.session_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/chat/stream")
async def chat_stream(request: QueryRequest):
    """Return the answer as newline-delimited JSON (NDJSON) events.

    The answer is generated in full first and then emitted word by word, so this
    simulates streaming rather than streaming tokens from the model.
    """
    async def generate():
        try:
            response = await run_in_threadpool(
                orchestrator.chat, question=request.question, session_id=request.session_id
            )
            for word in response.get("answer", "").split():
                yield json.dumps({"type": "token", "content": word + " "}) + "\n"
                await asyncio.sleep(0.01)  # pacing for the simulated stream

            yield json.dumps({
                "type": "complete",
                "metadata": {
                    "session_id": response.get("session_id"),
                    "query_type": response.get("query_type"),
                    "execution_time_ms": response.get("execution_time_ms"),
                },
            }) + "\n"
        except Exception as e:
            yield json.dumps({"type": "error", "content": str(e)}) + "\n"

    return StreamingResponse(generate(), media_type="application/x-ndjson")


@app.get("/conversations/{session_id}")
def get_conversation(session_id: str) -> Dict[str, Any]:
    """Get conversation history for a session."""
    history = orchestrator.conversation_manager.get_history(session_id)
    return {"session_id": session_id, "messages": history, "count": len(history)}


@app.delete("/conversations/{session_id}")
def delete_conversation(session_id: str) -> Dict[str, Any]:
    """Delete a conversation session."""
    success = orchestrator.conversation_manager.delete_session(session_id)
    return {"session_id": session_id, "deleted": success}


@app.post("/ingest/document")
async def ingest_document(request: Request):
    """Ingest a file into the knowledge base.

    file_path is read from the API server's filesystem; restrict it to a known
    upload directory before exposing this endpoint.
    """
    if not orchestrator.rag_engine:
        raise HTTPException(status_code=503, detail="RAG engine not available")

    data = await request.json()
    file_path = data.get("file_path")
    metadata = data.get("metadata", {})
    if not file_path:
        raise HTTPException(status_code=400, detail="file_path required")

    try:
        chunks = await run_in_threadpool(orchestrator.rag_engine.ingest_document, file_path, metadata)
        return {"status": "success", "file_path": file_path, "chunks_ingested": chunks}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/ingest/leases")
async def ingest_leases(request: Request):
    """Ingest lease records into the knowledge base."""
    if not orchestrator.rag_engine:
        raise HTTPException(status_code=503, detail="RAG engine not available")

    data = await request.json()
    leases = data.get("leases", [])
    if not leases:
        raise HTTPException(status_code=400, detail="leases array required")

    try:
        count = await run_in_threadpool(orchestrator.rag_engine.ingest_lease_documents, leases)
        return {"status": "success", "leases_ingested": count}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/schema")
def get_schema():
    """Return the database schema used by the SQL engine (for debugging)."""
    if not orchestrator.sql_engine:
        raise HTTPException(status_code=503, detail="SQL engine not available")
    try:
        return {"schema": orchestrator.sql_engine.get_database_schema()}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    # reload=True is for development only
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
```
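A client of `/chat/stream` receives one JSON object per line with `type` set to `token`, `complete`, or `error`. The sketch below, a hypothetical `read_ndjson_stream`, reassembles those events; it takes any iterable of text lines, so in practice it could be fed something like `requests`' `Response.iter_lines(decode_unicode=True)` on a streamed response (an assumption; the HTTP call itself is not shown).

```python
import json
from typing import Any, Dict, Iterable


def read_ndjson_stream(lines: Iterable[str]) -> Dict[str, Any]:
    """Reassemble /chat/stream events into the answer text and completion metadata."""
    tokens = []
    metadata: Dict[str, Any] = {}
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines between events
        event = json.loads(line)
        if event["type"] == "token":
            tokens.append(event["content"])
        elif event["type"] == "complete":
            metadata = event.get("metadata", {})
        elif event["type"] == "error":
            raise RuntimeError(event.get("content", "stream error"))
    return {"answer": "".join(tokens).strip(), "metadata": metadata}


sample_stream = [
    '{"type": "token", "content": "Occupancy "}',
    '{"type": "token", "content": "is "}',
    '{"type": "token", "content": "92%. "}',
    '{"type": "complete", "metadata": {"session_id": "s1", "query_type": "structured"}}',
]
print(read_ndjson_stream(sample_stream))
# {'answer': 'Occupancy is 92%.', 'metadata': {'session_id': 's1', 'query_type': 'structured'}}
```

A UI would append each `token` to the visible message as it arrives instead of joining them at the end.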
3.8 Frontend Integration (React/Next.js)
Create a simple frontend component (ChatInterface.tsx):
```tsx
import React, { useState, useRef, useEffect } from 'react';

// Base URL of the FastAPI backend; adjust for non-local deployments.
const API_BASE_URL = 'http://localhost:8000';

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  timestamp: Date;
}

// Shape of the JSON returned by POST /chat
interface ChatResponse {
  session_id: string;
  question: string;
  answer: string;
  query_type?: string;
  execution_time_ms?: number;
  sources?: Array<{ content: string; metadata: Record<string, unknown> }>;
}

const ChatInterface: React.FC = () => {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isLoading, setIsLoading] = useState(false);
  const [sessionId, setSessionId] = useState<string>(`session_${Date.now()}`);
  const messagesEndRef = useRef<HTMLDivElement>(null);

  // Keep the newest message in view
  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  }, [messages]);

  const sendMessage = async () => {
    const question = input.trim();
    if (!question || isLoading) return;

    const userMessage: Message = {
      id: `${Date.now()}-user`,
      role: 'user',
      content: question,
      timestamp: new Date(),
    };
    setMessages((prev) => [...prev, userMessage]);
    setInput('');
    setIsLoading(true);

    try {
      const response = await fetch(`${API_BASE_URL}/chat`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ question, session_id: sessionId }),
      });
      if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
      }
      const data: ChatResponse = await response.json();

      const assistantMessage: Message = {
        id: `${Date.now()}-assistant`,
        role: 'assistant',
        content: data.answer,
        timestamp: new Date(),
      };
      setMessages((prev) => [...prev, assistantMessage]);

      // Adopt the session ID returned by the server, if provided
      if (data.session_id) {
        setSessionId(data.session_id);
      }
    } catch (error) {
      console.error('Error sending message:', error);
      const errorMessage: Message = {
        id: `${Date.now()}-error`,
        role: 'assistant',
        content: 'Sorry, an error occurred. Please try again.',
        timestamp: new Date(),
      };
      setMessages((prev) => [...prev, errorMessage]);
    } finally {
      setIsLoading(false);
    }
  };

  // Enter sends; Shift+Enter inserts a newline (onKeyPress is deprecated in React)
  const handleKeyDown = (e: React.KeyboardEvent<HTMLTextAreaElement>) => {
    if (e.key === 'Enter' && !e.shiftKey) {
      e.preventDefault();
      sendMessage();
    }
  };

  const clearConversation = async () => {
    try {
      await fetch(`${API_BASE_URL}/conversations/${sessionId}`, { method: 'DELETE' });
      setMessages([]);
      setSessionId(`session_${Date.now()}`);
    } catch (error) {
      console.error('Error clearing conversation:', error);
    }
  };

  return (
    <div className="flex flex-col h-screen max-w-4xl mx-auto p-4">
      {/* Header */}
      <div className="flex justify-between items-center mb-4 pb-4 border-b">
        <h1 className="text-2xl font-bold text-gray-800">🏢 CRE AI Assistant</h1>
        <button
          onClick={clearConversation}
          className="px-4 py-2 text-sm text-red-600 border border-red-300 rounded hover:bg-red-50"
        >
          Clear Chat
        </button>
      </div>

      {/* Messages */}
      <div className="flex-1 overflow-y-auto mb-4 space-y-4">
        {messages.length === 0 && (
          <div className="text-center text-gray-500 mt-20">
            <p className="text-lg">Ask me anything about your commercial real estate portfolio</p>
            <p className="text-sm mt-2">
              Examples: "Show me properties with vacancy risk above 15%" or "What's our total NOI this quarter?"
            </p>
          </div>
        )}
        {messages.map((message) => (
          <div
            key={message.id}
            className={`flex ${message.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-[80%] p-3 rounded-lg ${
                message.role === 'user' ? 'bg-blue-600 text-white' : 'bg-gray-100 text-gray-800'
              }`}
            >
              <p className="whitespace-pre-wrap">{message.content}</p>
              <span className="text-xs opacity-70 mt-1 block">
                {message.timestamp.toLocaleTimeString()}
              </span>
            </div>
          </div>
        ))}
        {isLoading && (
          <div className="flex justify-start">
            <div className="bg-gray-100 p-3 rounded-lg">
              <span className="inline-block w-2 h-2 bg-gray-500 rounded-full animate-bounce mr-1"></span>
              <span className="inline-block w-2 h-2 bg-gray-500 rounded-full animate-bounce mr-1" style={{ animationDelay: '0.2s' }}></span>
              <span className="inline-block w-2 h-2 bg-gray-500 rounded-full animate-bounce" style={{ animationDelay: '0.4s' }}></span>
            </div>
          </div>
        )}
        <div ref={messagesEndRef} />
      </div>

      {/* Input */}
      <div className="flex gap-2 border-t pt-4">
        <textarea
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={handleKeyDown}
          placeholder="Ask about your properties, leases, or investments..."
          className="flex-1 p-2 border rounded-lg resize-none focus:outline-none focus:ring-2 focus:ring-blue-500"
          rows={2}
          disabled={isLoading}
        />
        <button
          onClick={sendMessage}
          disabled={isLoading || !input.trim()}
          className="px-6 py-2 bg-blue-600 text-white rounded-lg hover:bg-blue-700 disabled:opacity-50 disabled:cursor-not-allowed self-end"
        >
          Send
        </button>
      </div>
    </div>
  );
};

export default ChatInterface;
```
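The component trusts the backend to return JSON that matches `ChatResponse`, although it only reads `answer` and `session_id`. You may want to enforce the same contract on the Python side, for instance in an integration test against `main.py`. The stdlib sketch below mirrors the TypeScript interface for that purpose. The `ChatResponse` dataclass and the `parse_chat_response` helper are hypothetical names introduced here. They are not part of the guide's backend.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional

# Fields the frontend treats as always present (non-optional in the TS interface)
REQUIRED_FIELDS = ("session_id", "question", "answer")


@dataclass
class ChatResponse:
    """Python mirror of the TypeScript ChatResponse interface (hypothetical helper)."""

    session_id: str
    question: str
    answer: str
    query_type: Optional[str] = None
    execution_time_ms: Optional[float] = None
    sources: list[dict[str, Any]] = field(default_factory=list)


def parse_chat_response(raw: str) -> ChatResponse:
    """Decode a /chat response body and reject it if it breaks the frontend's contract."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("response body must be a JSON object")
    missing = [name for name in REQUIRED_FIELDS if not isinstance(data.get(name), str)]
    if missing:
        raise ValueError(f"missing or non-string fields: {missing}")
    sources = data.get("sources") or []
    for src in sources:
        # Each source needs text content plus a metadata object, as the UI type declares
        if not (
            isinstance(src, dict)
            and isinstance(src.get("content"), str)
            and isinstance(src.get("metadata"), dict)
        ):
            raise ValueError(f"malformed source entry: {src!r}")
    return ChatResponse(
        session_id=data["session_id"],
        question=data["question"],
        answer=data["answer"],
        query_type=data.get("query_type"),
        execution_time_ms=data.get("execution_time_ms"),
        sources=sources,
    )
```

In a test, feed the raw body of a real `/chat` response to `parse_chat_response`. A `ValueError` then signals that a backend change would break the chat UI, before anyone sees it in the browser.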
3.9 Deployment Script
Create a deployment script (deploy.sh):
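```bash
#!/bin/bash
# AI Commercial Real Estate Assistant - Deployment Script
set -euo pipefail

echo "🚀 Starting deployment of AI CRE Assistant..."

# Check prerequisites
echo "📋 Checking prerequisites..."
command -v python3 >/dev/null 2>&1 || { echo "Python3 is required but not installed. Aborting." >&2; exit 1; }
command -v aws >/dev/null 2>&1 || { echo "AWS CLI is required but not installed. Aborting." >&2; exit 1; }
command -v docker >/dev/null 2>&1 || { echo "Docker is required but not installed. Aborting." >&2; exit 1; }

# Load environment variables (set -a exports everything defined in .env,
# and unlike `export $(cat .env | xargs)` it handles quoted values with spaces)
if [ -f .env ]; then
  echo "📝 Loading environment variables..."
  set -a
  source .env
  set +a
else
  echo "❌ .env file not found. Please create one from .env.example"
  exit 1
fi

# OpenSearch admin password; override it in .env for anything beyond local testing
OPENSEARCH_ADMIN_PASSWORD="${OPENSEARCH_ADMIN_PASSWORD:-Admin123!}"

# Install dependencies
echo "📦 Installing Python dependencies..."
python3 -m pip install -r requirements.txt

# Deploy OpenSearch (if using Docker); reuse the container on re-runs
echo "🔄 Starting OpenSearch..."
if docker container inspect opensearch >/dev/null 2>&1; then
  docker start opensearch >/dev/null
else
  docker run -d \
    --name opensearch \
    -p 9200:9200 \
    -p 9600:9600 \
    -e "discovery.type=single-node" \
    -e "OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_ADMIN_PASSWORD}" \
    opensearchproject/opensearch:latest
fi

# Wait for OpenSearch to be ready: poll instead of a fixed sleep, give up after ~2 minutes
# (-k: the demo TLS certificate is self-signed)
echo "⏳ Waiting for OpenSearch to be ready..."
ready=false
for _ in $(seq 1 24); do
  if curl -sfk -u "admin:${OPENSEARCH_ADMIN_PASSWORD}" https://localhost:9200 >/dev/null; then
    ready=true
    break
  fi
  sleep 5
done
if [ "$ready" != true ]; then
  echo "❌ OpenSearch did not become ready in time" >&2
  exit 1
fi

# Create the k-NN index (dimension must match the embedding model's output size)
echo "📊 Creating OpenSearch index..."
curl -sfk -X PUT "https://localhost:9200/cre_documents" \
  -u "admin:${OPENSEARCH_ADMIN_PASSWORD}" \
  -H "Content-Type: application/json" \
  -d '{
    "settings": {"index": {"number_of_shards": 1, "number_of_replicas": 0, "knn": true}},
    "mappings": {
      "properties": {
        "vector": {"type": "knn_vector", "dimension": 1024},
        "text": {"type": "text"},
        "metadata": {"type": "object", "enabled": true}
      }
    }
  }' || echo "⚠️ Index creation failed (may already exist)"

# Initialize database (if using PostgreSQL).
# PostgreSQL has no CREATE DATABASE IF NOT EXISTS, so check pg_database first.
if [ -n "${DB_HOST:-}" ]; then
  echo "🗄️ Initializing database..."
  python3 - <<'PY'
import os
import psycopg2
from psycopg2 import sql

conn = psycopg2.connect(
    host=os.environ["DB_HOST"],
    dbname="postgres",
    user=os.environ["DB_USER"],
    password=os.environ["DB_PASSWORD"],
)
conn.autocommit = True  # CREATE DATABASE cannot run inside a transaction block
with conn.cursor() as cur:
    cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (os.environ["DB_NAME"],))
    if cur.fetchone() is None:
        cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(os.environ["DB_NAME"])))
conn.close()
PY
fi

# Start the FastAPI application in the background
echo "🚀 Starting FastAPI application..."
nohup python3 main.py > app.log 2>&1 &

echo "✅ Deployment complete!"
echo "🌐 API available at: http://localhost:8000"
echo "📚 API Documentation: http://localhost:8000/docs"
echo "📄 Logs: tail -f app.log"
```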
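The index mapping hardcodes `"dimension": 1024`. That number must equal the length of the vectors your embedding model returns. For example, 1024 is the default output size of Amazon Titan Text Embeddings V2; which model the guide's `RAGEngine` actually uses is an assumption here. A mismatch usually shows up only as indexing errors during ingestion. One way to catch it earlier is to keep the dimension in a single Python constant, build the same index body from it, and validate documents before sending them. `EMBEDDING_DIM`, `build_index_body`, and `validate_document` are hypothetical names for this sketch.

```python
from typing import Any

EMBEDDING_DIM = 1024  # assumption: must equal your embedding model's output length


def build_index_body(dim: int = EMBEDDING_DIM) -> dict[str, Any]:
    """Return the same settings/mappings that deploy.sh sends to PUT /cre_documents."""
    return {
        "settings": {
            "index": {"number_of_shards": 1, "number_of_replicas": 0, "knn": True}
        },
        "mappings": {
            "properties": {
                "vector": {"type": "knn_vector", "dimension": dim},
                "text": {"type": "text"},
                "metadata": {"type": "object", "enabled": True},
            }
        },
    }


def validate_document(doc: dict[str, Any], dim: int = EMBEDDING_DIM) -> None:
    """Raise ValueError if a document does not fit the knn_vector mapping above."""
    vector = doc.get("vector")
    if not isinstance(vector, list) or len(vector) != dim:
        got = len(vector) if isinstance(vector, list) else type(vector).__name__
        raise ValueError(f"vector must be a list of {dim} numbers, got {got}")
    if not all(isinstance(x, (int, float)) for x in vector):
        raise ValueError("vector contains non-numeric values")
    if not isinstance(doc.get("text"), str):
        raise ValueError("text must be a string")
```

Serializing `build_index_body()` with `json.dumps` yields a payload equivalent to the script's `-d` body. Deriving both the mapping and the validation from one constant means a model switch (say, to a 512-dimension configuration) only needs one change.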
3.10 Running the Application
```bash
# 1. Clone or create the project
mkdir cre-ai-assistant
cd cre-ai-assistant
# (add the files from the previous sections: requirements.txt, .env.example, main.py, rag_engine.py, ...)

# 2. Set up environment
python3 -m venv .venv
source .venv/bin/activate

# 3. Install dependencies
pip install -r requirements.txt

# 4. Configure environment
cp .env.example .env
# Edit .env with your AWS credentials and configuration

# 5. Start OpenSearch (Docker)
docker run -d \
  --name opensearch \
  -p 9200:9200 \
  -p 9600:9600 \
  -e "discovery.type=single-node" \
  -e "OPENSEARCH_INITIAL_ADMIN_PASSWORD=Admin123!" \
  opensearchproject/opensearch:latest

# 6. Ingest sample documents (wait until https://localhost:9200 responds first)
python -c "from rag_engine import RAGEngine

engine = RAGEngine()

# Add lease documents
leases = [
    {
        'property_name': 'Downtown Tower',
        'tenant_name': 'TechCorp Inc.',
        'start_date': '2023-01-01',
        'end_date': '2028-12-31',
        'monthly_rent': 45000,
        'square_feet': 15000,
        'lease_type': 'NNN',
        'renewal_options': 'Two 5-year options',
    }
]
engine.ingest_lease_documents(leases)
print('Documents ingested successfully')
"

# 7. Start the application (this blocks; run step 8 from a second terminal)
python main.py

# 8. Test the API
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"question": "What is the total monthly rent for all properties?", "session_id": "test"}'
```
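If the Downtown Tower lease is the only record loaded, the test question in step 8 has a known answer: total monthly rent should be $45,000. The stdlib sketch below makes that check repeatable. It computes the expected figure directly from the same lease records, sends the same request as the curl command, and loosely checks that the number appears in the answer. The helper names (`expected_total_rent`, `build_chat_request`, `answer_mentions_amount`, `ask`) are hypothetical. The endpoint, the payload fields, and the `answer` field come from the API used above.

```python
import json
import re
import urllib.request
from typing import Any

API_BASE_URL = "http://localhost:8000"  # same address the React client uses


def expected_total_rent(leases: list[dict[str, Any]]) -> float:
    """Ground-truth total monthly rent computed directly from the ingested records."""
    return float(sum(lease["monthly_rent"] for lease in leases))


def build_chat_request(
    question: str, session_id: str, base_url: str = API_BASE_URL
) -> urllib.request.Request:
    """Build the same POST /chat request as the curl command in step 8."""
    payload = json.dumps({"question": question, "session_id": session_id}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/chat",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def answer_mentions_amount(answer: str, amount: float) -> bool:
    """Loose check: the whole-dollar amount appears in the answer (commas ignored).

    Lookarounds stop 45000 from matching inside a larger number such as 145000.
    """
    text = answer.replace(",", "")
    return re.search(rf"(?<!\d){int(round(amount))}(?!\d)", text) is not None


def ask(question: str, session_id: str = "test", timeout: float = 60.0) -> dict[str, Any]:
    """Send the question to the running API and return the decoded JSON body."""
    request = build_chat_request(question, session_id)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the server from step 7 running, call `ask("What is the total monthly rent for all properties?")`. Then pass its `"answer"` together with `expected_total_rent(leases)` to `answer_mentions_amount`. The check is deliberately loose, because the model may phrase the figure in many ways. Portfolios that contain other properties will also make the expected total differ.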