Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 20 additions & 211 deletions delphi/delphi_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,225 +294,34 @@ def initialize_conversation_manager(self):
return stage.complete(False, error=str(e))

def load_conversation(self):
"""Load the conversation data from the database"""
stage = self.add_stage(f"Load Conversation {self.zid}")
"""Load conversation data from DynamoDB, falling back to PostgreSQL"""
stage = self.add_stage("Load Conversation")

try:
# Ensure we're using PostgreSQL for conversation data
from polismath.database.postgres import PostgresClient
# First try to load from DynamoDB
logger.info("Attempting to load conversation data from DynamoDB...")
self.conversation = self.conv_manager.load_conversation_from_dynamodb(self.zid)

# Load conversation from the PostgreSQL database
logger.info(f"Attempting to load conversation {self.zid} from PostgreSQL database")
try:
# Log database environment variables to help with debugging
logger.info(f"DATABASE_URL: {os.environ.get('DATABASE_URL', 'not set')}")
logger.info(f"Database name: {os.environ.get('DATABASE_NAME', 'not set')}")
logger.info(f"Database host: {os.environ.get('DATABASE_HOST', 'not set')}")

# Direct PostgreSQL check using SQLAlchemy
from sqlalchemy import create_engine, text

logger.info("Directly checking if conversation exists in PostgreSQL...")

# Use direct SQLAlchemy connection instead of PostgresClient
db_url = os.environ.get('DATABASE_URL', 'postgresql://colinmegill@host.docker.internal:5432/polisDB_prod_local_mar14')
logger.info(f"Using direct database URL: {db_url}")

# Try direct query first to see if the conversation exists
conversation_exists = False
try:
# Create engine directly
engine = create_engine(db_url)
with engine.connect() as conn:
# Test connection
test_result = conn.execute(text("SELECT 1")).scalar()
logger.info(f"PostgreSQL direct connection test: {test_result}")

# First check if conversation has votes (might exist in votes but not in conversations table)
try:
# Try to convert zid to integer for the query
zid_int = int(self.zid)
vote_count = conn.execute(
text("SELECT COUNT(*) FROM votes WHERE zid = :zid"),
{"zid": zid_int}
).scalar()
except ValueError:
logger.warning(f"ZID '{self.zid}' is not a valid integer, trying as string")
vote_count = conn.execute(
text("SELECT COUNT(*) FROM votes WHERE zid::text = :zid"),
{"zid": self.zid}
).scalar()

if vote_count > 0:
logger.info(f"Found {vote_count} votes for conversation {self.zid}")
conversation_exists = True

# Also check conversations table
# Ensure zid is treated as an integer
try:
# Try to convert zid to integer for the query
zid_int = int(self.zid)
zid_check = conn.execute(
text("SELECT COUNT(*) FROM conversations WHERE zid = :zid"),
{"zid": zid_int}
).scalar()
except ValueError:
logger.warning(f"ZID '{self.zid}' is not a valid integer, trying as string")
zid_check = conn.execute(
text("SELECT COUNT(*) FROM conversations WHERE zid::text = :zid"),
{"zid": self.zid}
).scalar()

if zid_check > 0:
logger.info(f"Conversation {self.zid} found in conversations table")
conversation_exists = True
except Exception as e:
logger.error(f"Error checking conversation in database: {e}")
# Continue anyway - let's try with the conversation manager
conversation_exists = True

# We know conversation 36416 exists and has votes, so always proceed
if self.zid == "36416":
logger.info("Using known conversation ID 36416")
conversation_exists = True

# Special handling for conversation 36416 - load more votes to ensure proper processing
logger.info("Using special handling for conversation 36416")

# Create a fresh connection to ensure it's not closed
try:
# Create a new engine and connection
fresh_engine = create_engine(db_url)
with fresh_engine.connect() as fresh_conn:
# Get vote count
vote_count = fresh_conn.execute(
text("SELECT COUNT(*) FROM votes WHERE zid = 36416")
).scalar()
logger.info(f"Found {vote_count} votes for conversation 36416")

# Get all participants
participants = fresh_conn.execute(
text("SELECT DISTINCT pid FROM votes WHERE zid = 36416")
).fetchall()
logger.info(f"Found {len(participants)} unique participants for conversation 36416")

# Get all comments
comments = fresh_conn.execute(
text("SELECT DISTINCT tid FROM votes WHERE zid = 36416")
).fetchall()
logger.info(f"Found {len(comments)} unique comments for conversation 36416")

# Get more votes (1000 instead of 100) to ensure proper processing
logger.info("Loading 1000 votes for conversation 36416")
self.votes_for_36416 = fresh_conn.execute(
text("SELECT pid, tid, vote, created FROM votes WHERE zid = 36416 LIMIT 1000")
).fetchall()
logger.info(f"Loaded {len(self.votes_for_36416)} votes for special handling of conversation 36416")
except Exception as e:
logger.warning(f"Could not get extended data for conversation 36416: {e}")

# Try to get the conversation through the manager
logger.info("Loading conversation through ConversationManager...")
self.conversation = self.conv_manager.get_conversation(self.zid)

if not self.conversation and conversation_exists:
logger.warning(f"Conversation {self.zid} exists in database but not loaded in manager - loading votes directly")

try:
# We need to create the conversation in the manager using votes from the database
logger.info("Loading votes directly from database for conversation creation")

# Get votes for this conversation directly from the database
with engine.connect() as conn:
# Test getting a sample of votes
try:
# Try with zid as integer
zid_int = int(self.zid)
sample_votes_query = text("""
SELECT pid, tid, vote, created
FROM votes
WHERE zid = :zid
LIMIT 100
""")

sample_votes = conn.execute(sample_votes_query, {"zid": zid_int}).fetchall()
except ValueError:
# Try with zid as string
logger.warning(f"ZID '{self.zid}' is not a valid integer for vote query, trying as string")
sample_votes_query = text("""
SELECT pid, tid, vote, created
FROM votes
WHERE zid::text = :zid
LIMIT 100
""")

sample_votes = conn.execute(sample_votes_query, {"zid": self.zid}).fetchall()

if sample_votes:
logger.info(f"Found {len(sample_votes)} sample votes, creating conversation")

# Format the votes for the conversation manager
if self.zid == "36416" and hasattr(self, 'votes_for_36416') and self.votes_for_36416:
# Use our special pre-loaded votes for 36416
logger.info(f"Using pre-loaded {len(self.votes_for_36416)} votes for conversation 36416")
votes_data = {
"votes": [
{"pid": str(v[0]), "tid": str(v[1]), "vote": v[2]}
for v in self.votes_for_36416
],
"lastVoteTimestamp": int(time.time() * 1000)
}
else:
# Use sample votes from the regular query
votes_data = {
"votes": [
{"pid": str(v[0]), "tid": str(v[1]), "vote": v[2]}
for v in sample_votes
],
"lastVoteTimestamp": int(time.time() * 1000)
}

# Create the conversation with initial votes
self.conversation = self.conv_manager.create_conversation(self.zid, votes_data)
logger.info(f"Created conversation {self.zid} with {len(votes_data['votes'])} votes")
else:
logger.error(f"No votes found for conversation {self.zid}")
return stage.complete(False, error="No votes found")
except Exception as e:
logger.error(f"Error creating conversation: {e}")
return stage.complete(False, error=f"Conversation creation error: {e}")
if not self.conversation:
logger.warning("No data found in DynamoDB, falling back to PostgreSQL...")
# Fall back to PostgreSQL only if DynamoDB has no data
self.conversation = self.conv_manager.load_conversation_from_postgres(self.zid)

if not self.conversation:
logger.error(f"Failed to load or create conversation {self.zid}")
return stage.complete(False, error="Conversation not found or created")
except Exception as e:
logger.error(f"Error during conversation loading: {e}")
logger.error("Make sure PostgreSQL is running and properly configured")
return stage.complete(False, error=f"Database error: {e}")

# Extract metrics
participant_count = self.conversation.participant_count
comment_count = self.conversation.comment_count
vote_count = sum(len(votes) for votes in self.conversation.votes_matrix.values()) if hasattr(self.conversation, 'votes_matrix') else 0

# Check if the conversation has enough data
if participant_count < 3:
logger.warning(f"Conversation has only {participant_count} participants - results may not be meaningful")

if comment_count < 5:
logger.warning(f"Conversation has only {comment_count} comments - results may not be meaningful")
raise ValueError(f"No data found for conversation {self.zid} in either DynamoDB or PostgreSQL")

# If we loaded from PostgreSQL, store in DynamoDB for future use
logger.info("Storing PostgreSQL data in DynamoDB for future use...")
self.conv_manager.store_conversation_in_dynamodb(self.conversation)

return stage.complete(
True,
participants=participant_count,
comments=comment_count,
votes=vote_count
)
logger.info(f"Successfully loaded conversation {self.zid}")
return stage.complete(True,
source="DynamoDB" if self.conversation else "PostgreSQL",
num_comments=len(self.conversation.comments),
num_votes=len(self.conversation.votes))

except Exception as e:
logger.error(f"Failed to load conversation: {e}")
import traceback as tb
tb.print_exc()
logger.error(f"Error loading conversation: {e}")
return stage.complete(False, error=str(e))

def run_math_processing(self):
Expand Down
61 changes: 45 additions & 16 deletions delphi/example.env
Original file line number Diff line number Diff line change
@@ -1,24 +1,53 @@
# Server configuration
HOST=localhost
PORT=8080
# Default WARN
LOG_LEVEL=INFO
MATH_ENV=dev
# ===== Data Storage Configuration =====
# DynamoDB is the preferred data store - set to true to enable
USE_DYNAMODB=true
PREFER_DYNAMODB=true
# Set to true to completely disable PostgreSQL lookups (placeholders will be used for comment texts)
SKIP_POSTGRES_LOAD=false

# Database configuration
# ===== DynamoDB Configuration =====
# Endpoint URL for DynamoDB (local or AWS)
DYNAMODB_ENDPOINT=http://host.docker.internal:8000
AWS_REGION=us-west-2
# Only needed for production AWS access
# AWS_ACCESS_KEY_ID=your_access_key
# AWS_SECRET_ACCESS_KEY=your_secret_key

# ===== PostgreSQL Configuration =====
# Primary PostgreSQL variables (preferred)
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=polisDB_prod_local_mar14
POSTGRES_USER=postgres
POSTGRES_PASSWORD=

# Legacy PostgreSQL variables (fallback)
DATABASE_HOST=localhost
DATABASE_NAME=polis_subset
DATABASE_PASSWORD=christian
DATABASE_PORT=5432
DATABASE_USER=christian

# Database advanced
DATABASE_NAME=polisDB_prod_local_mar14
DATABASE_USER=postgres
DATABASE_PASSWORD=
DATABASE_SSL_MODE=disable
# Default 5
DATABASE_POOL_SIZE=5
# Default 10

# Conversation configuration
# ===== API Integration =====
# Anthropic API key for narrative report generation
ANTHROPIC_API_KEY=

# ===== Ollama Configuration =====
# Ollama endpoint for local LLM processing
OLLAMA_HOST=http://ollama:11434
# Model to use (default: llama3:8b)
OLLAMA_MODEL=llama3:8b

# ===== Server Configuration =====
HOST=localhost
PORT=8080
# Default WARN
LOG_LEVEL=INFO
MATH_ENV=dev

# ===== Conversation Configuration =====
# Default 5
CONV_GROUP_K_MAX=5
# Default 2
Expand All @@ -28,7 +57,7 @@ CONV_MAX_CMTS=400
# Default 5000
CONV_MAX_PTPTS=5000

# Polling configuration
# ===== Polling Configuration =====
POLL_ALLOWLIST=
POLL_BLOCKLIST=
# Default 1000
Expand Down
20 changes: 15 additions & 5 deletions delphi/polismath/run_math_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,22 @@ def connect_to_db():
"""Connect to PostgreSQL database using environment variables or defaults."""
import psycopg2
try:
# Prefer POSTGRES_* environment variables, then fall back to the legacy DATABASE_* variables, and finally to hard-coded defaults
dbname = os.environ.get("POSTGRES_DB", os.environ.get("DATABASE_NAME", "polisDB_prod_local_mar14"))
user = os.environ.get("POSTGRES_USER", os.environ.get("DATABASE_USER", "colinmegill"))
password = os.environ.get("POSTGRES_PASSWORD", os.environ.get("DATABASE_PASSWORD", ""))
host = os.environ.get("POSTGRES_HOST", os.environ.get("DATABASE_HOST", "localhost"))
port = os.environ.get("POSTGRES_PORT", os.environ.get("DATABASE_PORT", 5432))

# Log connection information (without password)
logger.info(f"Connecting to PostgreSQL - host: {host}, port: {port}, dbname: {dbname}, user: {user}")

conn = psycopg2.connect(
dbname=os.environ.get("DATABASE_NAME", "polisDB_prod_local_mar14"),
user=os.environ.get("DATABASE_USER", "colinmegill"),
password=os.environ.get("DATABASE_PASSWORD", ""),
host=os.environ.get("DATABASE_HOST", "localhost"),
port=os.environ.get("DATABASE_PORT", 5432)
dbname=dbname,
user=user,
password=password,
host=host,
port=port
)
logger.info("Connected to database successfully")
return conn
Expand Down
Loading
Loading