import os
import requests
import asyncio
import re
import sqlite3
import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
MessageHandler,
filters,
ContextTypes,
ChatMemberHandler
)
from requests.exceptions import ReadTimeout, ConnectionError, RequestException
# --- Configuration Constants ---
# The bot token is read from the BOT_TOKEN environment variable.
#
# SECURITY (review): the fallback below is a real token committed to source.
# Anything stored in source control must be treated as compromised: revoke it
# via @BotFather and supply the replacement through the environment. The
# fallback is kept only so deployments that currently rely on it keep starting.
BOT_TOKEN = os.getenv(
    "BOT_TOKEN",
    "8082989597:AAH02ZzvqZauy6flAKwPsQGiUJ3CEThbyFc",
)

# Your Telegram Group Chat ID (obtained using @RawDataBot)
GROUP_CHAT_ID = -1002509996803

# Your Telegram Group Invite Link
GROUP_INVITE_LINK = "https://t.me/+DWzpnrqU4XM4ODg1"

# Your Admin User ID
ADMIN_ID = 7804252008

# Your bot's username (without the @) for referral links
BOT_USERNAME = "XRDLEG10N_ROBOT"

# Lookup API endpoints. The looked-up value (phone number / email address) is
# appended directly to these prefixes, so each must stay a single unbroken
# string ending in "=". Both constants point at the same worker host; the
# NUMVERIFY/HIBP names do not match the host in the URLs.
# NOTE(review): the API key is embedded in the URL; consider moving it to the
# environment as well.
NUMVERIFY_API_BASE_URL = "https://num-info-api.osinter.workers.dev/?api_keyy=DASTRA&num="
HIBP_API_BASE_BASE_URL = "https://num-info-api.osinter.workers.dev/?api_keyy=DASTRA&emaill_id="

# --- Logging Setup ---
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Database Setup ---
DB_NAME = 'bot_data.db'
def init_db():
    """Create the ``users`` table, apply column migrations and seed the admin row.

    Safe to call repeatedly: the table is created only if missing, columns are
    added only when absent, and the admin row is inserted with
    ``INSERT OR IGNORE``.

    Raises:
        sqlite3.Error: if any statement fails. Real schema errors are no
            longer masked as "column already exists".
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS users (
                user_id INTEGER PRIMARY KEY,
                username TEXT,
                credits INTEGER DEFAULT 0,
                is_blacklisted BOOLEAN DEFAULT FALSE,
                is_admin BOOLEAN DEFAULT FALSE,
                referrer_id INTEGER,
                has_joined_group BOOLEAN DEFAULT FALSE,
                referral_bonus_given BOOLEAN DEFAULT FALSE
            )
        ''')

        # Migrate databases created before these columns existed. Inspect the
        # schema instead of attempting ALTER TABLE and swallowing every
        # OperationalError, which would also hide locked-database or typo errors.
        existing_columns = {row[1] for row in c.execute("PRAGMA table_info(users)")}
        migrations = (
            ("username", "TEXT"),
            ("referral_bonus_given", "BOOLEAN DEFAULT FALSE"),
        )
        for column, definition in migrations:
            if column not in existing_columns:
                c.execute(f"ALTER TABLE users ADD COLUMN {column} {definition}")

        # Set the initial admin (no-op if the row already exists).
        c.execute(
            "INSERT OR IGNORE INTO users (user_id, username, is_admin) VALUES (?, ?, ?)",
            (ADMIN_ID, "admin", True),
        )
        conn.commit()
    finally:
        # Always release the connection, even when a statement raised.
        conn.close()
def get_user(user_id):
    """Return the ``users`` row for ``user_id`` as a dict, or ``None`` if absent.

    Columns are selected by name rather than with ``SELECT *``. On a database
    that was migrated with ``ALTER TABLE ... ADD COLUMN`` the added columns sit
    at the end of the table, so zipping ``SELECT *`` against a fixed key list
    would label values with the wrong keys.

    Returns:
        dict | None: keys ``user_id``, ``username``, ``credits``,
        ``is_blacklisted``, ``is_admin``, ``referrer_id``,
        ``has_joined_group``, ``referral_bonus_given``.
    """
    columns = (
        "user_id", "username", "credits", "is_blacklisted", "is_admin",
        "referrer_id", "has_joined_group", "referral_bonus_given",
    )
    conn = sqlite3.connect(DB_NAME)
    try:
        row = conn.execute(
            f"SELECT {', '.join(columns)} FROM users WHERE user_id = ?",
            (user_id,),
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        return None
    return dict(zip(columns, row))
def add_or_update_user(user_id, **kwargs):
    """Insert a user row, or update the given columns of an existing one.

    Args:
        user_id: primary key of the row.
        **kwargs: column/value pairs. For an existing user only these columns
            are updated; with no kwargs the call is a no-op. For a new user,
            columns not supplied receive the defaults listed below.

    Raises:
        ValueError: if a keyword is not a known column. Column names are
            interpolated into the SQL text, so they are checked against an
            allow-list; values are always bound as parameters.
        sqlite3.Error: if the database operation fails.
    """
    defaults = {
        'credits': 0,
        'is_blacklisted': False,
        'is_admin': False,
        'referrer_id': None,
        'has_joined_group': False,
        'referral_bonus_given': False,
        'username': None,
    }
    unknown = sorted(set(kwargs) - set(defaults))
    if unknown:
        raise ValueError(f"Unknown users column(s): {', '.join(unknown)}")

    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT user_id FROM users WHERE user_id = ?", (user_id,))
        if c.fetchone():
            # An empty SET clause is a syntax error, so skip the UPDATE entirely.
            if kwargs:
                set_clause = ", ".join(f"{column} = ?" for column in kwargs)
                c.execute(
                    f"UPDATE users SET {set_clause} WHERE user_id = ?",
                    [*kwargs.values(), user_id],
                )
        else:
            # Caller-supplied values win over defaults; kwargs is not mutated.
            row = {**defaults, **kwargs}
            columns = ", ".join(["user_id", *row])
            placeholders = ", ".join("?" * (len(row) + 1))
            c.execute(
                f"INSERT INTO users ({columns}) VALUES ({placeholders})",
                [user_id, *row.values()],
            )
        conn.commit()
    finally:
        conn.close()
def get_all_users():
    """Return every user as ``(user_id, username, credits, is_blacklisted, is_admin)``.

    Returns:
        list[tuple]: one tuple per row of the ``users`` table, possibly empty.

    Raises:
        sqlite3.Error: if the query fails; the connection is closed regardless.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        cursor = conn.execute(
            "SELECT user_id, username, credits, is_blacklisted, is_admin FROM users"
        )
        return cursor.fetchall()
    finally:
        conn.close()
# --- User State Management ---
# In-memory state for multi-step inputs, keyed by user id (e.g. awaiting a
# phone number, or an admin command in progress).
user_state = {}
# --- Helper functions for API calls ---
async def make_external_api_request(url: str, headers: dict = None, timeout: int = 10,
                                    retries: int = 3, backoff_factor: float = 0.5):
    """GET ``url`` and return the decoded JSON body, retrying on request errors.

    The blocking ``requests.get`` call runs in the default executor so the
    event loop stays free to serve other updates while the request is in
    flight.

    Args:
        url: full request URL.
        headers: optional HTTP headers.
        timeout: per-attempt timeout in seconds, passed to ``requests.get``.
        retries: maximum number of attempts.
        backoff_factor: base delay; attempt ``i`` (0-based) is followed by a
            sleep of ``backoff_factor * 2**i`` seconds unless it was the last.

    Returns:
        The parsed JSON response.

    Raises:
        Exception: after ``retries`` failed attempts (chained to the last
            request error), or immediately for any error that is not a
            ``RequestException``.
    """
    loop = asyncio.get_running_loop()
    last_error = None
    for i in range(retries):
        try:
            response = await loop.run_in_executor(
                None,
                lambda: requests.get(url, headers=headers, timeout=timeout),
            )
            response.raise_for_status()
            return response.json()
        except (ReadTimeout, ConnectionError) as e:
            last_error = e
            logger.warning(f"Attempt {i+1} failed due to network/read timeout: {e}")
        except RequestException as e:
            last_error = e
            logger.error(f"Attempt {i+1} failed due to a request error: {e}")
        except Exception as e:
            logger.exception(f"Attempt {i+1} failed due to an unexpected error: {e}")
            raise
        # Only reached after a retryable failure; no sleep after the final attempt.
        if i < retries - 1:
            await asyncio.sleep(backoff_factor * (2 ** i))
    raise Exception(f"Failed to complete request after {retries} attempts.") from last_error
def format_single_result(result_dict: dict, result_number: int) -> str:
    """Render one lookup result as a Telegram (legacy) Markdown message.

    Each value is placed inside an inline code span. A backtick inside a value
    would end the span early and make the message unparseable, so backticks
    are replaced with apostrophes. Missing, ``None`` and blank values are
    shown as ``N/A``.

    Args:
        result_dict: one API result; the keys read are ``mobile``, ``name``,
            ``fname``, ``address``, ``circle``, ``alt`` and ``id``.
        result_number: 1-based position shown in the heading.

    Returns:
        The formatted multi-line message text.
    """
    def field(key: str) -> str:
        value = result_dict.get(key)
        text = "" if value is None else str(value).strip()
        return text.replace("`", "'") if text else "N/A"

    formatted_string = (
        f"➡️ *Result {result_number}*\n"
        f"📱 Mobile: `{field('mobile')}`\n"
        f"👤 Name: `{field('name')}`\n"
        f" Father/Husband: `{field('fname')}`\n"
        f"🏠 Address: `{field('address')}`\n"
        f"🌐 Circle: `{field('circle')}`\n"
        f"🆔 ID: `{field('id')}`\n"
        f"📲 Alt Mobile: `{field('alt')}`\n"
        f"📧 Email: `N/A`"
    )
    return formatted_string
# --- Group Membership Check ---
async def check_group_membership_api(user_id: int, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Ask the Bot API whether ``user_id`` currently belongs to the configured group.

    Returns True when the reported status is member, administrator or creator.
    Any error from the lookup is logged and treated as "not a member".
    """
    accepted_statuses = ('member', 'administrator', 'creator')
    try:
        membership = await context.bot.get_chat_member(
            chat_id=GROUP_CHAT_ID,
            user_id=user_id,
        )
        return membership.status in accepted_statuses
    except Exception as err:
        logger.error(f"Error checking group membership for user {user_id} in chat {GROUP_CHAT_ID}: {err}")
        return False
# --- Core Bot Handlers ---
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Entry point for /start and the "back to main menu" path.

    Steps, in order:
      1. Parse an optional ``ref_<id>`` payload (self-referrals are ignored).
      2. Create the user row, or refresh the stored username.
      3. Stop with a notice if the user is blacklisted.
      4. Require group membership; on the first confirmed join, grant the
         joining bonus and credit the referrer.
      5. Show the main menu (editing the current message when invoked from a
         callback query, replying otherwise) and clear any pending input state.

    User-supplied names are escaped before being embedded in Markdown text,
    since a lone ``_`` or ``*`` in a username makes the message unparseable.
    """
    # Local import: part of python-telegram-bot, which this file already uses.
    from telegram.helpers import escape_markdown

    user_id = update.effective_user.id
    username = update.effective_user.username
    full_name = update.effective_user.full_name
    # When invoked from a button press, answer in the chat of that message.
    reply_target = update.callback_query.message if update.callback_query else update.message

    # Check for referral link in /start command payload
    referred_by_id = None
    if context.args and len(context.args) == 1:
        match = re.match(r'^ref_(\d+)$', context.args[0])
        if match:
            candidate_id = int(match.group(1))
            if candidate_id == user_id:
                # A user must not earn a referral credit from their own link.
                logger.info(f"User {user_id} tried to use their own referral link; ignored")
            else:
                referred_by_id = candidate_id
                logger.info(f"User {user_id} started with referral from {referred_by_id}")

    # Initialize user in DB if not exists or update username
    user_data = get_user(user_id)
    if not user_data:
        add_or_update_user(
            user_id,
            username=username,
            credits=0,
            is_blacklisted=False,
            is_admin=(user_id == ADMIN_ID),
            referrer_id=referred_by_id,
            has_joined_group=False,
            referral_bonus_given=False,
        )
        user_data = get_user(user_id)  # Re-fetch updated data
    elif user_data.get('username') != username:
        # Always update username in case it changed
        add_or_update_user(user_id, username=username)
        user_data['username'] = username

    # Blacklist check
    if user_data['is_blacklisted']:
        await reply_target.reply_text(
            "❌ You have been blacklisted from using this bot.",
            parse_mode="Markdown"
        )
        return

    # Group membership check
    joined_group_in_db = user_data['has_joined_group']
    is_member_now = await check_group_membership_api(user_id, context)
    if not joined_group_in_db and not is_member_now:
        # User not in DB as joined, and not currently a member
        keyboard = [[InlineKeyboardButton("Join Our Telegram Group", url=GROUP_INVITE_LINK)]]
        await reply_target.reply_text(
            "🚫 You must join our Telegram group to use this bot. Please click the button below to join:",
            reply_markup=InlineKeyboardMarkup(keyboard)
        )
        return  # Don't show main menu

    if not joined_group_in_db and is_member_now:
        # User is a member but the DB was never updated (e.g. bot was offline,
        # or the user joined before /start).
        add_or_update_user(user_id, has_joined_group=True)
        user_data['has_joined_group'] = True

        # Award joining bonus if this is the first time confirmed joined
        if user_data['credits'] == 0:
            add_or_update_user(user_id, credits=1)
            user_data['credits'] += 1
            await context.bot.send_message(
                chat_id=user_id,
                text="🎉 Welcome! You've received 1 bonus credit for joining our group!",
                parse_mode="Markdown"
            )

        # Award referral bonus if applicable and not already given. The
        # self-reference guard also covers rows stored before self-referrals
        # were rejected.
        referrer_id = user_data['referrer_id']
        if referrer_id and referrer_id != user_id and not user_data['referral_bonus_given']:
            referrer_data = get_user(referrer_id)
            if referrer_data:
                add_or_update_user(referrer_id, credits=referrer_data['credits'] + 1)
                add_or_update_user(user_id, referral_bonus_given=True)
                try:
                    await context.bot.send_message(
                        chat_id=referrer_id,
                        text=(
                            f"💰 Your referral ({escape_markdown(full_name, version=1)}) "
                            "joined the group! You've received 1 credit!"
                        ),
                        parse_mode="Markdown"
                    )
                except Exception as e:
                    # The referrer may have blocked the bot; that must not stop
                    # the new user from reaching the main menu.
                    logger.warning(f"Could not notify referrer {referrer_id} about referral bonus: {e}")
                logger.info(f"Referral bonus given for {user_id} to {referrer_id}")
        logger.info(f"User {user_id} detected as joined group during /start and DB updated.")

    # Main menu logic
    display_username = escape_markdown(f"@{username}" if username else full_name, version=1)
    welcome_text = (
        f"Hi **{display_username}** (ID: `{user_id}`),\n"
        "Welcome to **XRDLegion OSINT Bot**! 👋\n\n"
        f"💰 Your current credits: `{user_data['credits']}`\n\n"
        "**Available Searches:**\n"
        "📞 **Number Info Lookup**\n"
        "📧 **Email Lookup**\n\n"
        "Please choose an option below or get more credits:"
    )
    buttons = [
        [InlineKeyboardButton("📱 Phone Lookup (1 Credit)", callback_data="phone_lookup_menu")],
        [InlineKeyboardButton("📧 Email Check (1 Credit)", callback_data="email_check_menu")],
        [InlineKeyboardButton("🔗 Get Referral Link", callback_data="get_referral_link")],
        [InlineKeyboardButton("❓ How to get more Credits?", callback_data="how_to_credits")],
        [InlineKeyboardButton("Contact Admin ", url="https://t.me/devil_x_xrd")]
    ]
    # Add Admin Panel button only for ADMIN_ID
    if user_id == ADMIN_ID:
        buttons.append([InlineKeyboardButton("⚙️ Admin Panel", callback_data="admin_panel_menu")])
    reply_markup = InlineKeyboardMarkup(buttons)

    if update.callback_query:
        await update.callback_query.message.edit_text(
            welcome_text,
            reply_markup=reply_markup,
            parse_mode="Markdown"
        )
    else:
        await update.message.reply_text(
            welcome_text,
            reply_markup=reply_markup,
            parse_mode="Markdown"
        )

    # Clear user state when returning to main menu
    user_state.pop(user_id, None)
async def how_to_credits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Replace the current menu message with the instructions for earning credits."""
    callback = update.callback_query
    await callback.answer()

    lines = [
        "💰 *How to get more Credits:*",
        "",
        "1. *Refer New Users:* Share your unique referral link. When someone joins our Telegram group using your link, you get 1 credit.",
        "2. *Contact Admin:* You can contact the admin (`@devil_x_xrd`) to request a credit top-up.",
    ]
    back_button = InlineKeyboardButton("🔙 Back to Main Menu", callback_data="main_menu")
    await callback.edit_message_text(
        "\n".join(lines),
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="Markdown",
    )
async def get_referral_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the caller's personal deep link (``?start=ref_<user id>``)."""
    callback = update.callback_query
    await callback.answer()

    link = f"https://t.me/{BOT_USERNAME}?start=ref_{update.effective_user.id}"
    lines = [
        "🔗 *Your Referral Link:*",
        f"`{link}`",
        "",
        "Share this link! When a new user joins our Telegram group using your link, you'll receive 1 credit.",
    ]
    back_button = InlineKeyboardButton("🔙 Back to Main Menu", callback_data="main_menu")
    await callback.edit_message_text(
        "\n".join(lines),
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="Markdown",
    )
# Phone Lookup (Using NumVerify API) - Initiator
async def phone_lookup_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback handler that starts the phone-lookup flow.

    Checks, in order: the user has a DB row and is not blacklisted, has joined
    the group, and has at least one credit. On success it prompts for a number
    and records the ``awaiting_phone`` state; otherwise it explains why and
    returns to the main menu.
    """
    query = update.callback_query
    await query.answer()
    user_id = update.effective_user.id
    user_data = get_user(user_id)

    # A button on an old menu can be pressed by a user with no DB row (crash on
    # subscripting None) or by a blacklisted user. start() creates the row and
    # shows the blacklist notice, so hand both cases over to it.
    if user_data is None or user_data['is_blacklisted']:
        await start(update, context)
        return

    # Re-verify group membership just before proceeding to a paid feature
    if not user_data['has_joined_group'] and not await check_group_membership_api(user_id, context):
        keyboard = [[InlineKeyboardButton("Join Our Telegram Group", url=GROUP_INVITE_LINK)]]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(
            "🚫 You must join our Telegram group to use this bot. Please click the button below to join:",
            reply_markup=reply_markup
        )
        await asyncio.sleep(2)
        await start(update, context)
        return

    if user_data['credits'] < 1:
        await query.edit_message_text(
            "❌ You don't have enough credits for this search. Get more credits by referring users or contacting admin.",
            parse_mode="Markdown"
        )
        await asyncio.sleep(2)
        await start(update, context)  # Return to main menu
        return

    await query.edit_message_text(
        "📱 *Enter a 10-digit phone number (e.g., 8835756477):*",
        parse_mode="Markdown"
    )
    # Store cost for refund logic
    user_state[user_id] = {'state': 'awaiting_phone', 'cost': 1}
# Phone Lookup (Using NumVerify API) - Processor
async def process_phone_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the text sent while the user is in the ``awaiting_phone`` state.

    Takes the last 10 digits of the input, deducts one credit, queries the
    lookup API and sends each unique result. The credit is refunded when no
    result was delivered (nothing found, or an error). Always finishes by
    returning to the main menu, except when the input is too short, in which
    case the state is kept so the user can retry.
    """
    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'awaiting_phone':
        return  # Not in phone lookup state, ignore input

    digits_only = ''.join(filter(str.isdigit, update.message.text or ''))
    if len(digits_only) < 10:
        await update.message.reply_text(
            "❌ Please enter a valid 10-digit number. The number you provided is too short after removing non-digits.",
            parse_mode="Markdown"
        )
        # Do not clear state here, let user retry or hit /start
        return
    phone = digits_only[-10:]

    user_data = get_user(user_id)
    # Re-check credits just in case; a missing row is treated as zero credits
    # (start() will create it).
    if user_data is None or user_data['credits'] < 1:
        await update.message.reply_text(
            "❌ You don't have enough credits for this search.",
            parse_mode="Markdown"
        )
        await asyncio.sleep(1)
        await start(update, context)
        return

    # Deduct credit before API call
    remaining = user_data['credits'] - 1
    add_or_update_user(user_id, credits=remaining)
    await update.message.reply_text(
        f"⏳ Searching for phone number `{phone}`... 1 credit deducted. Your balance: `{remaining}`",
        parse_mode="Markdown"
    )

    url = f"{NUMVERIFY_API_BASE_URL}{phone}"
    search_successful_with_result = False  # Set only after results were actually sent
    dedup_fields = ('mobile', 'name', 'fname', 'address', 'circle', 'id')
    try:
        response_data = await make_external_api_request(url, timeout=15)
        api_results = response_data.get('result') if isinstance(response_data, dict) else None

        # Every response shape other than a non-empty list of records (missing
        # key, the "No matching users found" string, empty list) means the
        # same thing to the user: nothing found.
        results_to_display = []
        if isinstance(api_results, list):
            seen_keys = set()
            for result in api_results:
                if not isinstance(result, dict):
                    continue
                # str(... or '') tolerates None and numeric field values.
                unique_key = tuple(
                    str(result.get(name) or '').strip().lower() for name in dedup_fields
                )
                if unique_key not in seen_keys:
                    seen_keys.add(unique_key)
                    results_to_display.append(result)

        if results_to_display:
            await update.message.reply_text(
                f"📱 *Found {len(results_to_display)} unique entries for {phone}:*",
                parse_mode="Markdown"
            )
            for position, result in enumerate(results_to_display, start=1):
                await update.message.reply_text(
                    format_single_result(result, position),
                    parse_mode="Markdown"
                )
            search_successful_with_result = True
        else:
            await update.message.reply_text(
                f"ℹ️ No information found for the number {phone}. Try a different number.",
                parse_mode="Markdown"
            )
    except Exception as e:
        logger.error(f"Error during phone lookup for {phone}: {e}")
        # Keep exception details in the log only: the text can contain the
        # request URL (with the API key) and characters that break Markdown.
        await update.message.reply_text(
            "An error occurred during phone lookup. Please try again later."
        )
    finally:
        # Refund credit if no useful results were found
        if not search_successful_with_result:
            current_user_data = get_user(user_id)
            add_or_update_user(user_id, credits=current_user_data['credits'] + 1)
            await update.message.reply_text(
                "✅ No results found. 1 credit has been refunded.",
                parse_mode="Markdown"
            )
        await asyncio.sleep(1)
        await start(update, context)  # Return to main menu
# Email Breach Check (HaveIBeenPwned) - Initiator
async def email_check_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback handler that starts the email-check flow.

    Checks, in order: the user has a DB row and is not blacklisted, has joined
    the group, and has at least one credit. On success it prompts for an email
    and records the ``awaiting_email`` state; otherwise it explains why and
    returns to the main menu.
    """
    query = update.callback_query
    await query.answer()
    user_id = update.effective_user.id
    user_data = get_user(user_id)

    # A button on an old menu can be pressed by a user with no DB row (crash on
    # subscripting None) or by a blacklisted user. start() creates the row and
    # shows the blacklist notice, so hand both cases over to it.
    if user_data is None or user_data['is_blacklisted']:
        await start(update, context)
        return

    # Re-verify group membership just before proceeding to a paid feature
    if not user_data['has_joined_group'] and not await check_group_membership_api(user_id, context):
        keyboard = [[InlineKeyboardButton("Join Our Telegram Group", url=GROUP_INVITE_LINK)]]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(
            "🚫 You must join our Telegram group to use this bot. Please click the button below to join:",
            reply_markup=reply_markup
        )
        await asyncio.sleep(2)
        await start(update, context)
        return

    if user_data['credits'] < 1:
        await query.edit_message_text(
            "❌ You don't have enough credits for this search. Get more credits by referring users or contacting admin.",
            parse_mode="Markdown"
        )
        await asyncio.sleep(2)
        await start(update, context)  # Return to main menu
        return

    await query.edit_message_text(
        "📧 *Enter an email to check for breaches:*",
        parse_mode="Markdown"
    )
    # Store cost for refund logic
    user_state[user_id] = {'state': 'awaiting_email', 'cost': 1}
# Email Breach Check (HaveIBeenPwned) - Processor
async def process_email_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the text sent while the user is in the ``awaiting_email`` state.

    Validates the address, deducts one credit, queries the lookup API and
    reports the breaches returned. The credit is refunded when no breach list
    was delivered (none found, unexpected response, or an error). Always
    finishes by returning to the main menu, except when the address is invalid,
    in which case the state is kept so the user can retry.
    """
    # Local import: part of python-telegram-bot, which this file already uses.
    from telegram.helpers import escape_markdown

    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'awaiting_email':
        return  # Not in email lookup state, ignore input

    email = (update.message.text or '').strip()
    if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
        await update.message.reply_text(
            "❌ Please enter a valid email address.",
            parse_mode="Markdown"
        )
        return

    user_data = get_user(user_id)
    # Re-check credits; a missing row is treated as zero credits (start() will
    # create it).
    if user_data is None or user_data['credits'] < 1:
        await update.message.reply_text(
            "❌ You don't have enough credits for this search.",
            parse_mode="Markdown"
        )
        await asyncio.sleep(1)
        await start(update, context)
        return

    # Deduct credit before API call
    remaining = user_data['credits'] - 1
    add_or_update_user(user_id, credits=remaining)
    await update.message.reply_text(
        f"⏳ Searching for email `{email}`... 1 credit deducted. Your balance: `{remaining}`",
        parse_mode="Markdown"
    )

    url = f"{HIBP_API_BASE_BASE_URL}{email}"
    headers = {"User-Agent": "OSINT_Telegram_Bot/1.0"}
    search_successful_with_result = False  # Set only after breaches were actually sent
    try:
        response_data = await make_external_api_request(url, headers=headers, timeout=15)

        # The address is shown inside a code span because the validation regex
        # allows "_" which would otherwise open an unterminated Markdown
        # entity; it cannot contain a backtick.
        if isinstance(response_data, list) and response_data:
            breach_list = "\n".join(
                "- {} ({})".format(
                    escape_markdown(str(b.get('Name', 'N/A')), version=1),
                    escape_markdown(str(b.get('BreachDate', 'N/A')), version=1),
                )
                for b in response_data
                if isinstance(b, dict)
            )
            await update.message.reply_text(
                f"🔓 *Breaches found for* `{email}`:\n{breach_list}",
                parse_mode="Markdown"
            )
            search_successful_with_result = True
        elif response_data is None or response_data == []:
            # An empty list is a valid "nothing found" answer, not a failure.
            await update.message.reply_text(
                f"✅ No breaches found for `{email}`.",
                parse_mode="Markdown"
            )
        else:
            await update.message.reply_text(
                f"Could not retrieve breach information for `{email}`. Try a different email.",
                parse_mode="Markdown"
            )
    except Exception as e:
        logger.error(f"Error during email lookup for {email}: {e}")
        # Keep exception details in the log only: the text can contain the
        # request URL (with the API key) and characters that break Markdown.
        await update.message.reply_text(
            "An error occurred during email breach check. Please try again later."
        )
    finally:
        # Refund credit if no useful results were found. The wording does not
        # claim "no breaches" because this path also runs after failures.
        if not search_successful_with_result:
            current_user_data = get_user(user_id)
            add_or_update_user(user_id, credits=current_user_data['credits'] + 1)
            await update.message.reply_text(
                "✅ 1 credit has been refunded.",
                parse_mode="Markdown"
            )
        await asyncio.sleep(1)
        await start(update, context)  # Return to main menu
# General User Input Handler (for text messages)
async def handle_user_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route a plain text message to the processor matching the sender's state.

    Admin states are honoured only for ADMIN_ID. A message that matches no
    pending state gets a short notice and the main menu.
    """
    sender_id = update.effective_user.id
    pending_state = user_state.get(sender_id, {}).get('state')

    # Admin multi-step commands: only the configured admin may reach these.
    if sender_id == ADMIN_ID:
        admin_routes = {
            'admin_increase_credit_id': admin_process_increase_credit_id,
            'admin_increase_credit_amount': admin_process_increase_credit_amount,
            'admin_decrease_credit_id': admin_process_decrease_credit_id,
            'admin_decrease_credit_amount': admin_process_decrease_credit_amount,
            'admin_blacklist_id': admin_process_blacklist_id,
            'admin_unblacklist_id': admin_process_unblacklist_id,
            'admin_add_admin_id': admin_process_add_admin_id,
        }
        admin_processor = admin_routes.get(pending_state)
        if admin_processor is not None:
            await admin_processor(update, context)
            return

    # Regular lookup inputs.
    user_routes = {
        'awaiting_phone': process_phone_input,
        'awaiting_email': process_email_input,
    }
    processor = user_routes.get(pending_state)
    if processor is not None:
        await processor(update, context)
        return

    await update.message.reply_text(
        "I'm not expecting specific input right now. Returning to main menu...",
        parse_mode="Markdown",
    )
    await asyncio.sleep(1)
    await start(update, context)
# --- Admin Panel Handlers ---
async def admin_panel_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the admin panel.

    This is invoked both from a button press and directly by the admin text
    processors after they finish. In the second case the update carries a
    message and ``update.callback_query`` is None, so the panel is sent as a
    reply instead of editing a message.
    """
    query = update.callback_query
    if query:
        await query.answer()

    async def show(text: str, reply_markup=None) -> None:
        # Edit the pressed message when there is one, otherwise reply.
        if query:
            await query.edit_message_text(text, reply_markup=reply_markup, parse_mode="Markdown")
        else:
            await update.message.reply_text(text, reply_markup=reply_markup, parse_mode="Markdown")

    user_id = update.effective_user.id
    if user_id != ADMIN_ID:  # Double check admin status
        await show("❌ You are not authorized to view the admin panel.")
        await asyncio.sleep(2)
        await start(update, context)
        return

    text = "⚙️ *Admin Panel*\n\nChoose an action:"
    buttons = [
        [InlineKeyboardButton("📊 View Users", callback_data="admin_view_users")],
        [InlineKeyboardButton("⬆️ Increase Credit", callback_data="admin_increase_credit")],
        [InlineKeyboardButton("⬇️ Decrease Credit", callback_data="admin_decrease_credit")],
        [InlineKeyboardButton("🚫 Blacklist User", callback_data="admin_blacklist_user")],
        [InlineKeyboardButton("✅ Remove Blacklist", callback_data="admin_unblacklist_user")],
        [InlineKeyboardButton("➕ Add Admin", callback_data="admin_add_admin")],
        [InlineKeyboardButton("🔙 Back to Main Menu", callback_data="main_menu")]
    ]
    await show(text, InlineKeyboardMarkup(buttons))
async def admin_view_users(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """List all users (id, username, credits, flags) for the admin.

    Only ADMIN_ID may view the list. Usernames are Markdown-escaped, because a
    single username containing ``_`` would otherwise make the whole message
    unparseable. Output is split across several messages when it would exceed
    Telegram's per-message length limit; the "back" button is attached to the
    last one.
    """
    # Local import: part of python-telegram-bot, which this file already uses.
    from telegram.helpers import escape_markdown

    query = update.callback_query
    await query.answer()

    if update.effective_user.id != ADMIN_ID:
        await query.edit_message_text(
            "❌ You are not authorized to view the admin panel.",
            parse_mode="Markdown"
        )
        await asyncio.sleep(2)
        await start(update, context)
        return

    keyboard = [[InlineKeyboardButton("🔙 Back to Admin Panel", callback_data="admin_panel_menu")]]
    reply_markup = InlineKeyboardMarkup(keyboard)

    users_data = get_all_users()
    if not users_data:
        await query.edit_message_text(
            "No users in the database.",
            reply_markup=reply_markup,
            parse_mode="Markdown"
        )
        return

    # Stay safely below Telegram's 4096-character message limit.
    max_chunk_len = 4000
    chunks = []
    current_parts = ["*User List:*\n\n"]
    current_len = len(current_parts[0])
    for user_id, username, credits, is_blacklisted, is_admin in users_data:
        status = []
        if is_admin:
            status.append("👑 Admin")
        if is_blacklisted:
            status.append("🚫 Blacklisted")
        status_str = f" ({', '.join(status)})" if status else ""
        display_name = f"@{escape_markdown(username, version=1)}" if username else "N/A"
        line = f"ID: `{user_id}` | User: {display_name} | Credits: `{credits}`{status_str}\n"
        if current_len + len(line) > max_chunk_len:
            chunks.append("".join(current_parts))
            current_parts, current_len = [], 0
        current_parts.append(line)
        current_len += len(line)
    chunks.append("".join(current_parts))

    last_index = len(chunks) - 1
    await query.edit_message_text(
        chunks[0],
        reply_markup=reply_markup if last_index == 0 else None,
        parse_mode="Markdown"
    )
    for index, chunk in enumerate(chunks[1:], start=1):
        await query.message.reply_text(
            chunk,
            reply_markup=reply_markup if index == last_index else None,
            parse_mode="Markdown"
        )
async def admin_increase_credit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Begin the "increase credit" flow: record the pending step, then ask for a user ID."""
    callback = update.callback_query
    await callback.answer()

    # The state is stored before prompting, matching the order the text router relies on.
    user_state[update.effective_user.id] = {'state': 'admin_increase_credit_id'}

    prompt = "⬆️ *Enter the User ID to increase credits for:*"
    await callback.edit_message_text(prompt, parse_mode="Markdown")
async def admin_process_increase_credit_id(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Second step of "increase credit": validate the target user ID sent as text.

    On a non-numeric ID the state is kept so the admin can retry. On an unknown
    ID the state is cleared and the admin panel is shown again. Otherwise the
    flow advances to the amount step, remembering the target ID.
    """
    admin_id = update.effective_user.id
    if user_state.get(admin_id, {}).get('state') != 'admin_increase_credit_id':
        return

    raw_id = update.message.text.strip()
    if not raw_id.isdigit():
        await update.message.reply_text(
            "❌ Invalid User ID. Please enter a numerical User ID.",
            parse_mode="Markdown",
        )
        return

    target_user_id = int(raw_id)
    target = get_user(target_user_id)
    if not target:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found. (They might need to start the bot once).",
            parse_mode="Markdown",
        )
        del user_state[admin_id]  # Abort this flow
        await asyncio.sleep(1)
        await admin_panel_menu(update, context)
        return

    user_state[admin_id] = {
        'state': 'admin_increase_credit_amount',
        'target_id': target_user_id,
    }
    found_line = f"✅ User `{target_user_id}` found (Credits: `{target['credits']}`).\n"
    await update.message.reply_text(
        found_line + "⬆️ *Now enter the amount of credits to add:*",
        parse_mode="Markdown",
    )
async def admin_process_increase_credit_amount(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Final step of "increase credit": add the entered amount to the target's balance.

    An invalid amount keeps the state so the admin can retry. Otherwise the
    balance is updated (when the target still exists), the target is notified
    on a best-effort basis, the state is cleared and the admin panel is shown.
    """
    admin_id = update.effective_user.id
    flow = user_state.get(admin_id, {})
    if flow.get('state') != 'admin_increase_credit_amount':
        return

    raw_amount = update.message.text.strip()
    if not raw_amount.isdigit() or int(raw_amount) <= 0:
        await update.message.reply_text(
            "❌ Invalid amount. Please enter a positive numerical value.",
            parse_mode="Markdown",
        )
        return

    amount = int(raw_amount)
    target_user_id = flow['target_id']
    target = get_user(target_user_id)

    if not target:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found. (Perhaps deleted from bot?)",
            parse_mode="Markdown",
        )
    else:
        new_credits = target['credits'] + amount
        add_or_update_user(target_user_id, credits=new_credits)
        await update.message.reply_text(
            f"✅ Successfully added `{amount}` credits to user `{target_user_id}`. New balance: `{new_credits}`",
            parse_mode="Markdown",
        )
        # Notifying the target is best-effort; a failure is only logged.
        try:
            await context.bot.send_message(
                chat_id=target_user_id,
                text=f"🎉 Your credits have been increased by `{amount}`! Your new balance is `{new_credits}`.",
                parse_mode="Markdown",
            )
        except Exception as err:
            logger.warning(f"Could not notify user {target_user_id} about credit increase: {err}")

    del user_state[admin_id]  # Flow finished
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)
async def admin_decrease_credit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Begin the "decrease credit" flow: record the pending step, then ask for a user ID."""
    callback = update.callback_query
    await callback.answer()

    # The state is stored before prompting, matching the order the text router relies on.
    user_state[update.effective_user.id] = {'state': 'admin_decrease_credit_id'}

    prompt = "⬇️ *Enter the User ID to decrease credits for:*"
    await callback.edit_message_text(prompt, parse_mode="Markdown")
async def admin_process_decrease_credit_id(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Second step of "decrease credit": validate the target user ID sent as text.

    On a non-numeric ID the state is kept so the admin can retry. On an unknown
    ID the state is cleared and the admin panel is shown again. Otherwise the
    flow advances to the amount step, remembering the target ID.
    """
    admin_id = update.effective_user.id
    if user_state.get(admin_id, {}).get('state') != 'admin_decrease_credit_id':
        return

    raw_id = update.message.text.strip()
    if not raw_id.isdigit():
        await update.message.reply_text(
            "❌ Invalid User ID. Please enter a numerical User ID.",
            parse_mode="Markdown",
        )
        return

    target_user_id = int(raw_id)
    target = get_user(target_user_id)
    if not target:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found. (They might need to start the bot once).",
            parse_mode="Markdown",
        )
        del user_state[admin_id]  # Abort this flow
        await asyncio.sleep(1)
        await admin_panel_menu(update, context)
        return

    user_state[admin_id] = {
        'state': 'admin_decrease_credit_amount',
        'target_id': target_user_id,
    }
    found_line = f"✅ User `{target_user_id}` found (Credits: `{target['credits']}`).\n"
    await update.message.reply_text(
        found_line + "⬇️ *Now enter the amount of credits to subtract:*",
        parse_mode="Markdown",
    )
async def admin_process_decrease_credit_amount(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Final step of "decrease credit": subtract the entered amount from the target.

    The balance never goes below 0. The confirmation and the notification
    report the amount actually removed, which is smaller than the requested
    amount when the balance was clamped at 0.

    An invalid amount keeps the state so the admin can retry. Otherwise the
    state is cleared and the admin panel is shown again.
    """
    user_id = update.effective_user.id
    current_state_info = user_state.get(user_id, {})
    if current_state_info.get('state') != 'admin_decrease_credit_amount':
        return

    amount_str = update.message.text.strip()
    if not amount_str.isdigit() or int(amount_str) <= 0:
        await update.message.reply_text(
            "❌ Invalid amount. Please enter a positive numerical value.",
            parse_mode="Markdown"
        )
        return

    amount = int(amount_str)
    target_user_id = current_state_info['target_id']
    target_user_data = get_user(target_user_id)
    if target_user_data:
        current_credits = target_user_data['credits']
        new_credits = max(0, current_credits - amount)  # Don't go below 0
        # What was really taken; differs from `amount` when clamped.
        deducted = max(0, current_credits - new_credits)
        add_or_update_user(target_user_id, credits=new_credits)
        await update.message.reply_text(
            f"✅ Successfully deducted `{deducted}` credits from user `{target_user_id}`. New balance: `{new_credits}`",
            parse_mode="Markdown"
        )
        try:  # Notify target user (best-effort)
            await context.bot.send_message(
                chat_id=target_user_id,
                text=f"📉 Your credits have been decreased by `{deducted}`. Your new balance is `{new_credits}`.",
                parse_mode="Markdown"
            )
        except Exception as e:
            logger.warning(f"Could not notify user {target_user_id} about credit decrease: {e}")
    else:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found. (Perhaps deleted from bot?)",
            parse_mode="Markdown"
        )

    del user_state[user_id]  # Clear state
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)
async def admin_blacklist_user(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Begin the "blacklist user" flow: record the pending step, then ask for a user ID."""
    callback = update.callback_query
    await callback.answer()

    # The state is stored before prompting, matching the order the text router relies on.
    user_state[update.effective_user.id] = {'state': 'admin_blacklist_id'}

    prompt = "🚫 *Enter the User ID to blacklist:*"
    await callback.edit_message_text(prompt, parse_mode="Markdown")
async def admin_process_blacklist_id(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Blacklist the user whose ID the admin sent as text.

    A non-numeric ID keeps the state so the admin can retry. The main admin
    cannot be blacklisted. An unknown user is inserted already blacklisted; a
    known user is flagged and notified on a best-effort basis. Every completed
    path clears the state and returns to the admin panel.
    """
    admin_id = update.effective_user.id
    if user_state.get(admin_id, {}).get('state') != 'admin_blacklist_id':
        return

    raw_id = update.message.text.strip()
    if not raw_id.isdigit():
        await update.message.reply_text(
            "❌ Invalid User ID. Please enter a numerical User ID.",
            parse_mode="Markdown",
        )
        return

    target_user_id = int(raw_id)
    if target_user_id == ADMIN_ID:
        await update.message.reply_text(
            "🚫 Cannot blacklist the main admin.",
            parse_mode="Markdown",
        )
    else:
        target = get_user(target_user_id)
        if not target:
            # Unknown user: create the row already blacklisted.
            add_or_update_user(
                target_user_id,
                is_blacklisted=True,
                credits=0,
                is_admin=False,
                has_joined_group=False,
                referral_bonus_given=False,
                username=None,
            )
            await update.message.reply_text(
                f"✅ User `{target_user_id}` added to database and blacklisted.",
                parse_mode="Markdown",
            )
        elif target['is_blacklisted']:
            await update.message.reply_text(
                f"ℹ️ User `{target_user_id}` is already blacklisted.",
                parse_mode="Markdown",
            )
        else:
            add_or_update_user(target_user_id, is_blacklisted=True)
            await update.message.reply_text(
                f"✅ User `{target_user_id}` has been blacklisted.",
                parse_mode="Markdown",
            )
            # Notifying the target is best-effort; a failure is only logged.
            try:
                await context.bot.send_message(
                    chat_id=target_user_id,
                    text="🚫 You have been blacklisted from using this bot.",
                    parse_mode="Markdown",
                )
            except Exception as err:
                logger.warning(f"Could not notify user {target_user_id} about blacklist: {err}")

    del user_state[admin_id]
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)
async def admin_unblacklist_user(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Ask the admin which user ID should be taken off the blacklist.

    Acknowledges the callback query, stores the 'admin_unblacklist_id' state
    for the pressing user (the state that admin_process_unblacklist_id checks
    for), and replaces the message text with the prompt.
    """
    callback = update.callback_query
    await callback.answer()
    # Pending input is tracked per user, keyed by the ID of whoever pressed
    # the button.
    user_state[update.effective_user.id] = dict(state='admin_unblacklist_id')
    prompt = "✅ *Enter the User ID to remove from blacklist:*"
    await callback.edit_message_text(prompt, parse_mode="Markdown")
async def admin_process_unblacklist_id(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Remove the blacklist flag from the user whose ID was sent as text.

    Does nothing unless the sender's state is 'admin_unblacklist_id'. Invalid
    input keeps the state in place so the ID can be re-entered. In every
    other case the state is cleared and, after a one-second pause, control is
    handed back to admin_panel_menu.

    Args:
        update: Incoming update; its message text is read as the target ID.
        context: Handler context; its bot is used to notify the target user.
    """
    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'admin_unblacklist_id':
        return

    target_user_id_str = update.message.text.strip()
    # isdecimal() instead of isdigit(): isdigit() also accepts characters
    # such as superscript digits ("²") which int() rejects with ValueError.
    if not target_user_id_str.isdecimal():
        await update.message.reply_text(
            "❌ Invalid User ID. Please enter a numerical User ID.",
            parse_mode="Markdown",
        )
        return
    target_user_id = int(target_user_id_str)

    target_user_data = get_user(target_user_id)
    if not target_user_data:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found in database.",
            parse_mode="Markdown",
        )
    elif not target_user_data['is_blacklisted']:
        await update.message.reply_text(
            f"ℹ️ User `{target_user_id}` is not blacklisted.",
            parse_mode="Markdown",
        )
    else:
        add_or_update_user(target_user_id, is_blacklisted=False)
        await update.message.reply_text(
            f"✅ User `{target_user_id}` has been removed from blacklist.",
            parse_mode="Markdown",
        )
        # Best-effort notification: a failure to deliver it is only logged
        # and does not undo the change.
        try:
            await context.bot.send_message(
                chat_id=target_user_id,
                text="✅ You have been unblacklisted and can now use the bot.",
                parse_mode="Markdown",
            )
        except Exception as e:
            logger.warning(
                "Could not notify user %s about unblacklist: %s",
                target_user_id,
                e,
            )

    # pop() rather than del: the entry may already be gone by the time the
    # awaits above have completed, and cleanup must not raise KeyError.
    user_state.pop(user_id, None)
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)
async def admin_add_admin(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Ask the admin which user ID should receive admin privileges.

    Acknowledges the callback query, stores the 'admin_add_admin_id' state
    for the pressing user (the state that admin_process_add_admin_id checks
    for), and replaces the message text with the prompt.
    """
    callback = update.callback_query
    await callback.answer()
    # Pending input is tracked per user, keyed by the ID of whoever pressed
    # the button.
    user_state[update.effective_user.id] = dict(state='admin_add_admin_id')
    prompt = "➕ *Enter the User ID to grant admin privileges:*"
    await callback.edit_message_text(prompt, parse_mode="Markdown")
async def admin_process_add_admin_id(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Grant admin privileges to the user whose ID was sent as text.

    Does nothing unless the sender's state is 'admin_add_admin_id'. Invalid
    input keeps the state in place so the ID can be re-entered. In every
    other case the state is cleared and, after a one-second pause, control is
    handed back to admin_panel_menu.

    Args:
        update: Incoming update; its message text is read as the target ID.
        context: Handler context; its bot is used to notify the target user.
    """
    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'admin_add_admin_id':
        return

    target_user_id_str = update.message.text.strip()
    # isdecimal() instead of isdigit(): isdigit() also accepts characters
    # such as superscript digits ("²") which int() rejects with ValueError.
    if not target_user_id_str.isdecimal():
        await update.message.reply_text(
            "❌ Invalid User ID. Please enter a numerical User ID.",
            parse_mode="Markdown",
        )
        return
    target_user_id = int(target_user_id_str)

    target_user_data = get_user(target_user_id)
    if not target_user_data:
        # If user not in DB, add them and make admin.
        add_or_update_user(
            target_user_id,
            is_admin=True,
            credits=0,
            is_blacklisted=False,
            has_joined_group=False,
            referral_bonus_given=False,
            username=None,
        )
        await update.message.reply_text(
            f"✅ User `{target_user_id}` created in DB and granted admin privileges.",
            parse_mode="Markdown",
        )
    elif target_user_data['is_admin']:
        await update.message.reply_text(
            f"ℹ️ User `{target_user_id}` is already an admin.",
            parse_mode="Markdown",
        )
    else:
        add_or_update_user(target_user_id, is_admin=True)
        await update.message.reply_text(
            f"✅ User `{target_user_id}` has been granted admin privileges.",
            parse_mode="Markdown",
        )
        # Best-effort notification: a failure to deliver it is only logged
        # and does not undo the change.
        try:
            await context.bot.send_message(
                chat_id=target_user_id,
                text="👑 You have been granted admin privileges for this bot!",
                parse_mode="Markdown",
            )
        except Exception as e:
            logger.warning(
                "Could not notify user %s about admin status: %s",
                target_user_id,
                e,
            )

    # pop() rather than del: the entry may already be gone by the time the
    # awaits above have completed, and cleanup must not raise KeyError.
    user_state.pop(user_id, None)
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)
# --- Chat Member Updates Handler (for Group Join Tracking) ---
async def track_chat_members(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Keep the users table in sync with membership of the tracked group.

    Updates for other chats and for bot accounts are ignored.

    On a join (new status member/administrator/creator) of a user who is not
    yet marked as joined, the user is stored with has_joined_group=True, gets
    one credit if they are unknown or have zero credits, and their referrer
    (if recorded and not yet rewarded) gets one credit. On a leave the
    has_joined_group flag is cleared. All direct messages are best-effort:
    delivery failures are logged and otherwise ignored.

    Args:
        update: Chat-member update; old and new member status are read from it.
        context: Handler context; its bot is used to send direct messages.
    """
    # Function-scope import so this block does not depend on a change to the
    # file's import section.
    from telegram.helpers import escape_markdown

    # Ensure this is a chat member update for the specified group.
    if update.effective_chat.id != GROUP_CHAT_ID:
        return

    new_member = update.chat_member.new_chat_member
    old_member = update.chat_member.old_chat_member
    member = new_member.user

    # Ignore bot's own updates or updates for other bots.
    if member.is_bot:
        return

    user_id = member.id
    user_username = member.username
    user_full_name = member.full_name
    user_data = get_user(user_id)

    present_statuses = ('member', 'administrator', 'creator')
    gone_statuses = ('left', 'kicked', 'banned')

    # Handle user joining.
    if new_member.status in present_statuses:
        logger.info(
            "User %s joined/is member of group %s.", user_id, GROUP_CHAT_ID
        )
        # If user is new to DB or their has_joined_group flag is false.
        if not user_data or not user_data['has_joined_group']:
            # Update user's username and joined status.
            add_or_update_user(
                user_id, username=user_username, has_joined_group=True
            )

            # Give joining bonus if new user or hadn't received it
            # (credits == 0 is a simple heuristic).
            # NOTE(review): a user who spends the credit, leaves and rejoins
            # satisfies this check again — confirm whether that is intended.
            if not user_data or user_data['credits'] == 0:
                add_or_update_user(user_id, credits=1)
                try:
                    await context.bot.send_message(
                        chat_id=user_id,
                        text="🎉 Welcome! You've received 1 bonus credit for joining our group!",
                        parse_mode="Markdown",
                    )
                except Exception as e:
                    logger.warning(
                        "Could not send joining bonus message to user %s: %s",
                        user_id,
                        e,
                    )

            # Process referral bonus if applicable and not already given.
            if (
                user_data
                and user_data['referrer_id']
                and not user_data['referral_bonus_given']
            ):
                referrer_id = user_data['referrer_id']
                referrer_data = get_user(referrer_id)
                if referrer_data:
                    add_or_update_user(
                        referrer_id, credits=referrer_data['credits'] + 1
                    )
                    # Mark bonus as given for this user.
                    add_or_update_user(user_id, referral_bonus_given=True)
                    # The display name is user-controlled; unescaped
                    # Markdown characters in it (e.g. "_" or "*") would make
                    # Telegram reject the message and the referrer would
                    # never be notified.
                    safe_name = escape_markdown(user_full_name)
                    try:
                        await context.bot.send_message(
                            chat_id=referrer_id,
                            text=f"💰 Your referral ({safe_name}) joined the group! You've received 1 credit!",
                            parse_mode="Markdown",
                        )
                    except Exception as e:
                        logger.warning(
                            "Could not send referral bonus message to referrer %s: %s",
                            referrer_id,
                            e,
                        )
                    logger.info(
                        "Referral bonus given for %s to %s via group join.",
                        user_id,
                        referrer_id,
                    )
        else:
            # User already in DB and marked as joined, just update username
            # if it changed.
            if user_data.get('username') != user_username:
                add_or_update_user(user_id, username=user_username)
            logger.info(
                "User %s already marked as joined, no credit action on group join. Username updated if needed.",
                user_id,
            )
    # Handle user leaving (optional, but good for accuracy).
    elif (
        old_member.status in present_statuses
        and new_member.status in gone_statuses
    ):
        logger.info(
            "User %s left/was removed from group %s. Updating DB.",
            user_id,
            GROUP_CHAT_ID,
        )
        if user_data:
            add_or_update_user(user_id, has_joined_group=False)
            try:
                await context.bot.send_message(
                    chat_id=user_id,
                    text="ℹ️ You have left the group. Some bot features might be restricted until you rejoin.",
                    parse_mode="Markdown",
                )
            except Exception as e:
                logger.warning(
                    "Could not send leave group message to user %s: %s",
                    user_id,
                    e,
                )
        else:
            logger.warning("User %s left group but not found in DB.", user_id)
# --- Main Bot Setup ---
def main() -> None:
    """Initialise the database, register every handler and start polling.

    Logs an error and returns without starting the bot when BOT_TOKEN is
    empty.
    """
    init_db()  # Initialize the database

    if not BOT_TOKEN:
        logger.error(
            "Error: Telegram Bot Token not found. Please set the 'BOT_TOKEN' environment variable or provide it directly."
        )
        return

    builder = Application.builder().token(BOT_TOKEN)
    builder = builder.read_timeout(20).write_timeout(20).pool_timeout(5)
    application = builder.build()

    # Command Handlers
    application.add_handler(CommandHandler("start", start))

    # Callback Query Handlers (for inline keyboard buttons), registered in
    # this order; each button's callback data is matched exactly.
    button_routes = (
        ("phone_lookup_menu", phone_lookup_menu),
        ("email_check_menu", email_check_menu),
        ("how_to_credits", how_to_credits),
        ("get_referral_link", get_referral_link),
        ("admin_panel_menu", admin_panel_menu),  # Admin Panel main menu
        ("admin_view_users", admin_view_users),
        ("admin_increase_credit", admin_increase_credit),
        ("admin_decrease_credit", admin_decrease_credit),
        ("admin_blacklist_user", admin_blacklist_user),
        ("admin_unblacklist_user", admin_unblacklist_user),
        ("admin_add_admin", admin_add_admin),
        ("main_menu", start),  # Back to Main Menu
    )
    for callback_data, callback in button_routes:
        application.add_handler(
            CallbackQueryHandler(callback, pattern=f"^{callback_data}$")
        )

    # Message Handler (for text input when awaiting specific data from user)
    text_only = filters.TEXT & ~filters.COMMAND
    application.add_handler(MessageHandler(text_only, handle_user_input))

    # Chat Member Updates Handler (for tracking group joins/leaves)
    application.add_handler(
        ChatMemberHandler(track_chat_members, ChatMemberHandler.CHAT_MEMBER)
    )

    logger.info("Bot started polling...")
    application.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    main()