0% found this document useful (0 votes)
10 views20 pages

Xrdsueieieghjejeje

This document outlines a Telegram bot implementation that includes user management, group membership checks, and API integration for phone and email lookups. It features a SQLite database for storing user data, referral systems for credits, and structured command handling for user interactions. The bot also includes logging and error handling mechanisms to ensure robustness during API calls and user operations.

Uploaded by

nishalxk
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, or read online on Scribd
0% found this document useful (0 votes)
10 views20 pages

Xrdsueieieghjejeje

This document outlines a Telegram bot implementation that includes user management, group membership checks, and API integration for phone and email lookups. It features a SQLite database for storing user data, referral systems for credits, and structured command handling for user interactions. The bot also includes logging and error handling mechanisms to ensure robustness during API calls and user operations.

Uploaded by

nishalxk
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, or read online on Scribd
You are on page 1/ 20

import os

import requests
import asyncio
import re
import sqlite3
import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
MessageHandler,
filters,
ContextTypes,
ChatMemberHandler
)
from requests.exceptions import ReadTimeout, ConnectionError, RequestException

# --- Configuration Constants ---

# Bot token, read from the BOT_TOKEN environment variable.
# For local testing you can replace "YOUR_BOT_TOKEN_HERE" with your token, but
# for deployment always use the environment variable.
# SECURITY: never keep a real token as the fallback value. A token that has
# appeared in source must be treated as compromised and revoked via @BotFather.
BOT_TOKEN = os.getenv("BOT_TOKEN", "YOUR_BOT_TOKEN_HERE")

# Your Telegram Group Chat ID (obtained using @RawDataBot)
GROUP_CHAT_ID = -1002509996803

# Your Telegram Group Invite Link
GROUP_INVITE_LINK = "https://t.me/+DWzpnrqU4XM4ODg1"

# Your Admin User ID
ADMIN_ID = 7804252008

# Your bot's username (without the @) for referral links
BOT_USERNAME = "XRDLEG10N_ROBOT"

# Lookup API endpoints. The query value (phone number / email) is appended
# directly to the end of each URL by the lookup handlers.
NUMVERIFY_API_BASE_URL = "https://num-info-api.osinter.workers.dev/?api_keyy=DASTRA&num="
HIBP_API_BASE_BASE_URL = "https://num-info-api.osinter.workers.dev/?api_keyy=DASTRA&emaill_id="
# --- Logging Setup ---

# Configure logging once at import time: INFO and above, with timestamp,
# logger name and level in front of each message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)

# --- Database Setup ---

# SQLite database file opened by the database helper functions.
DB_NAME = "bot_data.db"

def init_db():
    """Create the ``users`` table, migrate older schemas, and seed the admin.

    Safe to call repeatedly: the table is created only if missing, the
    ``username`` and ``referral_bonus_given`` columns are added only when the
    existing table lacks them, and the admin row is inserted with
    ``INSERT OR IGNORE``.

    Raises:
        sqlite3.Error: if any statement fails. The connection is closed in
            every case.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS users (
                user_id INTEGER PRIMARY KEY,
                username TEXT, -- New column for username
                credits INTEGER DEFAULT 0,
                is_blacklisted BOOLEAN DEFAULT FALSE,
                is_admin BOOLEAN DEFAULT FALSE,
                referrer_id INTEGER,
                has_joined_group BOOLEAN DEFAULT FALSE,
                referral_bonus_given BOOLEAN DEFAULT FALSE
            )
        ''')

        # Migrate databases created before these columns existed. Inspect the
        # schema instead of catching OperationalError from ALTER TABLE, which
        # would also hide unrelated failures such as a locked database.
        c.execute("PRAGMA table_info(users)")
        existing_columns = {row[1] for row in c.fetchall()}  # row[1] is the column name
        if "username" not in existing_columns:
            c.execute("ALTER TABLE users ADD COLUMN username TEXT")
        if "referral_bonus_given" not in existing_columns:
            c.execute(
                "ALTER TABLE users ADD COLUMN referral_bonus_given BOOLEAN DEFAULT FALSE"
            )

        # Set the initial admin (default username "admin"); no-op if the row exists.
        c.execute(
            "INSERT OR IGNORE INTO users (user_id, username, is_admin) VALUES (?, ?, ?)",
            (ADMIN_ID, "admin", True),
        )
        conn.commit()
    finally:
        conn.close()

def get_user(user_id):
    """Return the stored row for ``user_id`` as a dict, or ``None`` if absent.

    The columns are selected by name rather than with ``SELECT *``: on
    databases where ``username`` / ``referral_bonus_given`` were appended by
    ``ALTER TABLE`` the physical column order differs from the order in
    ``CREATE TABLE``, and zipping ``SELECT *`` with a fixed key list would
    attach values to the wrong keys.

    Returns:
        dict with keys ``user_id``, ``username``, ``credits``,
        ``is_blacklisted``, ``is_admin``, ``referrer_id``,
        ``has_joined_group`` and ``referral_bonus_given``; or ``None``.
    """
    columns = (
        "user_id", "username", "credits", "is_blacklisted", "is_admin",
        "referrer_id", "has_joined_group", "referral_bonus_given",
    )
    conn = sqlite3.connect(DB_NAME)
    try:
        row = conn.execute(
            f"SELECT {', '.join(columns)} FROM users WHERE user_id = ?",
            (user_id,),
        ).fetchone()
    finally:
        # Close even when the query raises, so the connection is not leaked.
        conn.close()
    if row is None:
        return None
    return dict(zip(columns, row))

def add_or_update_user(user_id, **kwargs):
    """Insert a user row, or update the given columns of an existing one.

    Args:
        user_id: primary key of the row.
        **kwargs: column values. Allowed columns are ``username``,
            ``credits``, ``is_blacklisted``, ``is_admin``, ``referrer_id``,
            ``has_joined_group`` and ``referral_bonus_given``. For a new row,
            columns that are not supplied get the defaults listed below.
            For an existing row, only the supplied columns are changed, and
            calling with no columns is a no-op.

    Raises:
        ValueError: if a keyword is not one of the allowed columns. Column
            names are interpolated into the SQL text, so they are checked
            against a fixed list first.
        sqlite3.Error: if a statement fails. The connection is closed in
            every case.
    """
    defaults = {
        'credits': 0,
        'is_blacklisted': False,
        'is_admin': False,
        'referrer_id': None,
        'has_joined_group': False,
        'referral_bonus_given': False,
        'username': None,  # Default username to None
    }
    unknown = [name for name in kwargs if name not in defaults]
    if unknown:
        raise ValueError(f"Unknown user column(s): {', '.join(map(repr, unknown))}")

    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT 1 FROM users WHERE user_id = ?", (user_id,))
        exists = c.fetchone() is not None

        if exists:
            # An empty SET clause would be invalid SQL, so skip the statement
            # when there is nothing to change.
            if kwargs:
                set_clause = ", ".join(f"{name} = ?" for name in kwargs)
                c.execute(
                    f"UPDATE users SET {set_clause} WHERE user_id = ?",
                    [*kwargs.values(), user_id],
                )
        else:
            # Supplied values override the defaults; column and value order
            # both come from the same dict, so they always line up.
            row = {**defaults, **kwargs}
            columns = ", ".join(["user_id", *row])
            placeholders = ", ".join(["?"] * (len(row) + 1))
            c.execute(
                f"INSERT INTO users ({columns}) VALUES ({placeholders})",
                [user_id, *row.values()],
            )

        conn.commit()
    finally:
        conn.close()

def get_all_users():
    """Return every user as a list of tuples.

    Returns:
        list of ``(user_id, username, credits, is_blacklisted, is_admin)``
        tuples, one per row of the ``users`` table (empty list if none).

    Raises:
        sqlite3.Error: if the query fails. The connection is closed in
            every case.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute(
            "SELECT user_id, username, credits, is_blacklisted, is_admin FROM users"
        )
        return c.fetchall()
    finally:
        # Runs on success and on error, so the connection is never leaked.
        conn.close()

# --- User State Management ---

# In-memory state for multi-step inputs (e.g., awaiting phone number, admin
# commands), keyed by Telegram user id. Each value is a dict with a 'state'
# name plus whatever the step needs (the handlers in this file store 'cost'
# and 'target_id'). It is a plain module-level dict, so it is not persisted.
user_state = {}

# --- Helper functions for API calls ---


async def make_external_api_request(url: str, headers: dict = None, timeout: int = 10, retries: int = 3, backoff_factor: float = 0.5):
    """GET ``url`` and return the decoded JSON body, retrying on request errors.

    ``requests.get`` is a blocking call, so it is run in the event loop's
    default executor; awaiting it here keeps the loop free to serve other
    updates while the HTTP request is in flight.

    Args:
        url: full URL to request.
        headers: optional HTTP headers.
        timeout: per-attempt timeout in seconds, passed to ``requests.get``.
        retries: maximum number of attempts.
        backoff_factor: base delay; the wait before the next attempt is
            ``backoff_factor * 2 ** attempt_index`` seconds.

    Returns:
        Whatever ``response.json()`` yields for the first successful attempt.

    Raises:
        Exception: after ``retries`` failed attempts (chained to the last
            request error), or immediately for any error that is not a
            ``requests`` exception.
    """
    loop = asyncio.get_running_loop()
    last_error = None

    for i in range(retries):
        try:
            response = await loop.run_in_executor(
                None,
                lambda: requests.get(url, headers=headers, timeout=timeout),
            )
            response.raise_for_status()
            return response.json()
        except (ReadTimeout, ConnectionError) as e:
            last_error = e
            logger.warning(f"Attempt {i+1} failed due to network/read timeout: {e}")
        except RequestException as e:
            last_error = e
            logger.error(f"Attempt {i+1} failed due to a request error: {e}")
        except Exception as e:
            # Not a transport problem: retrying will not help, so surface it.
            logger.exception(f"Attempt {i+1} failed due to an unexpected error: {e}")
            raise

        # Exponential backoff, skipped after the final attempt.
        if i < retries - 1:
            await asyncio.sleep(backoff_factor * (2 ** i))

    raise Exception(f"Failed to complete request after {retries} attempts.") from last_error
def format_single_result(result_dict: dict, result_number: int) -> str:
    """Render one lookup result as a Telegram-Markdown text block.

    Each field is shown inside a backtick code span. Missing, ``None`` or
    empty values are shown as ``N/A``, and any backtick inside a value is
    replaced with an apostrophe so the value cannot terminate its own code
    span and break Markdown parsing.

    Args:
        result_dict: mapping read for the keys ``mobile``, ``name``,
            ``fname``, ``address``, ``circle``, ``alt`` and ``id``.
        result_number: 1-based index shown in the heading.

    Returns:
        The formatted multi-line string (no trailing newline).
    """
    def field(key: str) -> str:
        value = result_dict.get(key)
        if value is None or value == '':
            return 'N/A'
        return str(value).replace('`', "'")

    mobile = field('mobile')
    name = field('name')
    fname = field('fname')
    address = field('address')
    circle = field('circle')
    alt_mobile = field('alt')
    result_id = field('id')

    formatted_string = (
        f"➡️ *Result {result_number}*\n"
        f"📱 Mobile: `{mobile}`\n"
        f"👤 Name: `{name}`\n"
        f" Father/Husband: `{fname}`\n"
        f"🏠 Address: `{address}`\n"
        f"🌐 Circle: `{circle}`\n"
        f"🆔 ID: `{result_id}`\n"
        f"📲 Alt Mobile: `{alt_mobile}`\n"
        f"📧 Email: `N/A`"
    )
    return formatted_string

# --- Group Membership Check ---


async def check_group_membership_api(user_id: int, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Report whether ``user_id`` currently belongs to the configured group.

    Asks the Bot API for the user's chat-member record in ``GROUP_CHAT_ID``
    and returns ``True`` when the status is ``member``, ``administrator`` or
    ``creator``. Any error from the lookup is logged and reported as
    ``False`` rather than raised.
    """
    joined_statuses = ['member', 'administrator', 'creator']
    try:
        member = await context.bot.get_chat_member(chat_id=GROUP_CHAT_ID, user_id=user_id)
        return member.status in joined_statuses
    except Exception as e:
        logger.error(f"Error checking group membership for user {user_id} in chat {GROUP_CHAT_ID}: {e}")
        return False

# --- Core Bot Handlers ---

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Register/refresh the user, enforce access rules, and show the main menu.

    Works for both message updates and callback-query updates; other handlers
    in this file call it to send the user back to the main menu.

    Steps, in order:
      1. Parse an optional ``ref_<id>`` referral payload from ``context.args``
         (a user's own id is ignored so nobody can refer themselves).
      2. Create the user row on first contact, or refresh a changed username.
      3. Stop with a notice if the user is blacklisted.
      4. Require group membership. The first time membership is confirmed,
         grant 1 credit if the balance is 0 and credit the referrer once.
      5. Render the main menu and clear any pending multi-step state.
    """
    # Function-scope import so this block carries its own dependency.
    from telegram.helpers import escape_markdown

    user_id = update.effective_user.id
    username = update.effective_user.username  # Get username
    full_name = update.effective_user.full_name  # Get full name for display
    # Message to reply to, whichever kind of update triggered this call.
    reply_target = update.callback_query.message if update.callback_query else update.message

    # Check for referral link in /start command payload
    referred_by_id = None
    if context.args and len(context.args) == 1:
        match = re.match(r'^ref_(\d+)$', context.args[0])
        if match:
            candidate_id = int(match.group(1))
            if candidate_id == user_id:
                logger.info(f"User {user_id} used their own referral link; ignoring it")
            else:
                referred_by_id = candidate_id
                logger.info(f"User {user_id} started with referral from {referred_by_id}")

    # Initialize user in DB if not exists or update username
    user_data = get_user(user_id)
    if not user_data:
        add_or_update_user(
            user_id,
            username=username,
            credits=0,
            is_blacklisted=False,
            is_admin=(user_id == ADMIN_ID),
            referrer_id=referred_by_id,
            has_joined_group=False,
            referral_bonus_given=False,
        )
        user_data = get_user(user_id)  # Re-fetch updated data
    elif user_data.get('username') != username:
        # Always update username in case it changed
        add_or_update_user(user_id, username=username)
        user_data['username'] = username  # Update local dict

    # Blacklist check
    if user_data['is_blacklisted']:
        await reply_target.reply_text(
            "❌ You have been blacklisted from using this bot.",
            parse_mode="Markdown"
        )
        return

    # Group membership check
    joined_group_in_db = user_data['has_joined_group']
    is_member_now = await check_group_membership_api(user_id, context)

    if not joined_group_in_db and not is_member_now:
        # User not in DB as joined, and not currently a member
        keyboard = [[InlineKeyboardButton("Join Our Telegram Group", url=GROUP_INVITE_LINK)]]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await reply_target.reply_text(
            "🚫 You must join our Telegram group to use this bot. Please click the button below to join:",
            reply_markup=reply_markup
        )
        return  # Exit function, don't show main menu

    elif not joined_group_in_db and is_member_now:
        # User is now a member, but DB wasn't updated (e.g., bot was offline,
        # or user joined before /start)
        add_or_update_user(user_id, has_joined_group=True)
        user_data['has_joined_group'] = True  # Update local dict

        # Award joining bonus if this is the first time confirmed joined
        if user_data['credits'] == 0:
            add_or_update_user(user_id, credits=1)
            user_data['credits'] += 1
            await context.bot.send_message(
                chat_id=user_id,
                text="🎉 Welcome! You've received 1 bonus credit for joining our group!",
                parse_mode="Markdown",
            )

        # Award referral bonus if applicable and not already given
        if user_data['referrer_id'] and not user_data['referral_bonus_given']:
            referrer_id = user_data['referrer_id']
            referrer_data = get_user(referrer_id)
            # The second condition also blocks self-referrals already stored in the DB.
            if referrer_data and referrer_id != user_id:
                add_or_update_user(referrer_id, credits=referrer_data['credits'] + 1)
                add_or_update_user(user_id, referral_bonus_given=True)  # Mark bonus as given for this user
                try:
                    # The name is user-controlled text: escape Markdown characters
                    # so it cannot make the message unparsable.
                    await context.bot.send_message(
                        chat_id=referrer_id,
                        text=f"💰 Your referral ({escape_markdown(full_name, version=1)}) joined the group! You've received 1 credit!",
                        parse_mode="Markdown",
                    )
                except Exception as e:
                    # The credit is already stored; a failed notification to the
                    # referrer must not abort /start for the new user.
                    logger.warning(f"Could not notify referrer {referrer_id} about referral bonus: {e}")
                logger.info(f"Referral bonus given for {user_id} to {referrer_id}")
        logger.info(f"User {user_id} detected as joined group during /start and DB updated.")

    # Main menu logic
    display_username = escape_markdown(f"@{username}" if username else full_name, version=1)
    welcome_text = (
        f"Hi **{display_username}** (ID: `{user_id}`),\n"
        "Welcome to **XRDLegion OSINT Bot**! 👋\n\n"
        f"💰 Your current credits: `{user_data['credits']}`\n\n"
        "**Available Searches:**\n"
        "📞 **Number Info Lookup**\n"
        "📧 **Email Lookup**\n\n"
        "Please choose an option below or get more credits:"
    )

    buttons = [
        [InlineKeyboardButton("📱 Phone Lookup (1 Credit)", callback_data="phone_lookup_menu")],
        [InlineKeyboardButton("📧 Email Check (1 Credit)", callback_data="email_check_menu")],
        [InlineKeyboardButton("🔗 Get Referral Link", callback_data="get_referral_link")],
        [InlineKeyboardButton("❓ How to get more Credits?", callback_data="how_to_credits")],
        [InlineKeyboardButton("Contact Admin ", url="https://t.me/devil_x_xrd")]
    ]

    # Add Admin Panel button only for ADMIN_ID
    if user_id == ADMIN_ID:
        buttons.append([InlineKeyboardButton("⚙️ Admin Panel", callback_data="admin_panel_menu")])

    reply_markup = InlineKeyboardMarkup(buttons)

    if update.callback_query:
        await update.callback_query.message.edit_text(
            welcome_text,
            reply_markup=reply_markup,
            parse_mode="Markdown"
        )
    else:
        await update.message.reply_text(
            welcome_text,
            reply_markup=reply_markup,
            parse_mode="Markdown"
        )

    # Clear user state when returning to main menu
    user_state.pop(user_id, None)

async def how_to_credits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Replace the current menu message with the "how to earn credits" help.

    Answers the callback query, then edits the message to show the help text
    with a single button whose callback data is ``main_menu``.
    """
    callback = update.callback_query
    await callback.answer()

    help_lines = [
        "💰 *How to get more Credits:*",
        "",
        "1. *Refer New Users:* Share your unique referral link. When someone joins our Telegram group using your link, you get 1 credit.",
        "2. *Contact Admin:* You can contact the admin (`@devil_x_xrd`) to request a credit top-up.",
    ]
    back_button = InlineKeyboardButton("🔙 Back to Main Menu", callback_data="main_menu")

    await callback.edit_message_text(
        "\n".join(help_lines),
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="Markdown",
    )

async def get_referral_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the caller their personal referral link.

    The link is a ``t.me`` deep link to ``BOT_USERNAME`` whose start payload
    is ``ref_<caller id>``. The current message is edited in place and gets
    a single button whose callback data is ``main_menu``.
    """
    callback = update.callback_query
    await callback.answer()

    referral_link = f"https://t.me/{BOT_USERNAME}?start=ref_{update.effective_user.id}"
    message_parts = [
        "🔗 *Your Referral Link:*\n",
        f"`{referral_link}`\n\n",
        "Share this link! When a new user joins our Telegram group using your link, you'll receive 1 credit.",
    ]
    back_button = InlineKeyboardButton("🔙 Back to Main Menu", callback_data="main_menu")

    await callback.edit_message_text(
        "".join(message_parts),
        reply_markup=InlineKeyboardMarkup([[back_button]]),
        parse_mode="Markdown",
    )

# Phone Lookup (Using NumVerify API) - Initiator


async def phone_lookup_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback handler that starts the phone-lookup flow.

    Checks, in order: the user has a database row and is not blacklisted,
    the user is a group member, and the user has at least 1 credit. If all
    pass, prompts for a number and sets the user's state to
    ``awaiting_phone``. Otherwise the user is shown the reason (where there
    is one) and sent back through ``start``.
    """
    query = update.callback_query
    await query.answer()

    user_id = update.effective_user.id
    user_data = get_user(user_id)

    # The button may be pressed on an old menu message: the row can be missing,
    # or the user may have been blacklisted since. start() creates the row or
    # shows the blacklist notice, so delegate both cases to it.
    if user_data is None or user_data['is_blacklisted']:
        await start(update, context)
        return

    # Re-verify group membership just before proceeding to a paid feature
    if not user_data['has_joined_group'] and not await check_group_membership_api(user_id, context):
        keyboard = [[InlineKeyboardButton("Join Our Telegram Group", url=GROUP_INVITE_LINK)]]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(
            "🚫 You must join our Telegram group to use this bot. Please click the button below to join:",
            reply_markup=reply_markup
        )
        await asyncio.sleep(2)
        await start(update, context)
        return

    if user_data['credits'] < 1:
        await query.edit_message_text(
            "❌ You don't have enough credits for this search. Get more credits by referring users or contacting admin.",
            parse_mode="Markdown",
        )
        await asyncio.sleep(2)
        await start(update, context)  # Return to main menu
        return

    await query.edit_message_text(
        "📱 *Enter a 10-digit phone number (e.g., 8835756477):*",
        parse_mode="Markdown",
    )
    user_state[user_id] = {'state': 'awaiting_phone', 'cost': 1}  # Store cost for refund logic

# Phone Lookup (Using NumVerify API) - Processor


async def process_phone_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the text a user sends while in the ``awaiting_phone`` state.

    Keeps the last 10 digits of the input, charges 1 credit, queries the
    lookup endpoint and replies with each unique result. The credit is
    refunded when no result was shown, whether the lookup returned nothing
    or failed. Finally returns the user to the main menu via ``start``.

    Input with fewer than 10 digits is rejected without changing the state,
    so the user can retry.
    """
    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'awaiting_phone':
        return  # Not in phone lookup state, ignore input

    phone_raw = update.message.text
    digits_only = ''.join(filter(str.isdigit, phone_raw))
    if len(digits_only) < 10:
        await update.message.reply_text(
            "❌ Please enter a valid 10-digit number. The number you provided is too short after removing non-digits.",
            parse_mode="Markdown",
        )
        # Do not clear state here, let user retry or hit /start
        return
    phone = digits_only[-10:]

    user_data = get_user(user_id)
    # Re-check credits just in case; a missing row is treated as no credits.
    if not user_data or user_data['credits'] < 1:
        await update.message.reply_text("❌ You don't have enough credits for this search.", parse_mode="Markdown")
        await asyncio.sleep(1)
        await start(update, context)
        return

    # Deduct credit before API call
    add_or_update_user(user_id, credits=user_data['credits'] - 1)
    await update.message.reply_text(
        f"⏳ Searching for phone number `{phone}`... 1 credit deducted. Your balance: `{user_data['credits'] - 1}`",
        parse_mode="Markdown",
    )

    url = f"{NUMVERIFY_API_BASE_URL}{phone}"
    search_successful_with_result = False  # Flag to track if a useful result was found

    try:
        response_data = await make_external_api_request(url, timeout=15)

        # Only a dict whose 'result' is a list can carry displayable entries;
        # every other shape (including the "No matching users found" string)
        # means there is nothing to show.
        api_results = response_data.get('result') if isinstance(response_data, dict) else None
        results_to_display = _dedupe_lookup_results(api_results) if isinstance(api_results, list) else []

        if results_to_display:
            await update.message.reply_text(
                f"📱 *Found {len(results_to_display)} unique entries for {phone}:*",
                parse_mode="Markdown",
            )
            for i, result in enumerate(results_to_display):
                formatted_output = format_single_result(result, i + 1)
                await update.message.reply_text(formatted_output, parse_mode="Markdown")
            search_successful_with_result = True  # Result was found
        else:
            await update.message.reply_text(
                f"ℹ️ No information found for the number {phone}. Try a different number.",
                parse_mode="Markdown",
            )

    except Exception as e:
        logger.error(f"Error during phone lookup for {phone}: {e}")
        # Keep exception details in the log only. Their text can expose
        # internals and can contain characters that break Markdown parsing,
        # so the user gets a fixed plain-text message.
        await update.message.reply_text("An error occurred during phone lookup. Please try again later.")
    finally:
        # Refund credit if no useful results were found
        if not search_successful_with_result:
            current_user_data = get_user(user_id)
            if current_user_data:
                add_or_update_user(user_id, credits=current_user_data['credits'] + 1)
                await update.message.reply_text("✅ No results found. 1 credit has been refunded.", parse_mode="Markdown")

    await asyncio.sleep(1)
    await start(update, context)  # Return to main menu


def _dedupe_lookup_results(api_results: list) -> list:
    """Return the dict entries of ``api_results`` with duplicates removed.

    Two entries are duplicates when their ``mobile``, ``name``, ``fname``,
    ``address``, ``circle`` and ``id`` values match after trimming and
    lower-casing. Values are converted with ``str`` first, so ``None`` or
    numeric fields do not raise. Entries that are not dicts are skipped.
    """
    key_fields = ('mobile', 'name', 'fname', 'address', 'circle', 'id')
    seen = set()
    unique = []
    for result in api_results:
        if not isinstance(result, dict):
            continue
        key = tuple(str(result.get(name) or '').strip().lower() for name in key_fields)
        if key not in seen:
            seen.add(key)
            unique.append(result)
    return unique

# Email Breach Check (HaveIBeenPwned) - Initiator


async def email_check_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback handler that starts the email-check flow.

    Checks, in order: the user has a database row and is not blacklisted,
    the user is a group member, and the user has at least 1 credit. If all
    pass, prompts for an email and sets the user's state to
    ``awaiting_email``. Otherwise the user is shown the reason (where there
    is one) and sent back through ``start``.
    """
    query = update.callback_query
    await query.answer()

    user_id = update.effective_user.id
    user_data = get_user(user_id)

    # The button may be pressed on an old menu message: the row can be missing,
    # or the user may have been blacklisted since. start() creates the row or
    # shows the blacklist notice, so delegate both cases to it.
    if user_data is None or user_data['is_blacklisted']:
        await start(update, context)
        return

    # Re-verify group membership just before proceeding to a paid feature
    if not user_data['has_joined_group'] and not await check_group_membership_api(user_id, context):
        keyboard = [[InlineKeyboardButton("Join Our Telegram Group", url=GROUP_INVITE_LINK)]]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(
            "🚫 You must join our Telegram group to use this bot. Please click the button below to join:",
            reply_markup=reply_markup
        )
        await asyncio.sleep(2)
        await start(update, context)
        return

    if user_data['credits'] < 1:
        await query.edit_message_text(
            "❌ You don't have enough credits for this search. Get more credits by referring users or contacting admin.",
            parse_mode="Markdown",
        )
        await asyncio.sleep(2)
        await start(update, context)  # Return to main menu
        return

    await query.edit_message_text("📧 *Enter an email to check for breaches:*", parse_mode="Markdown")
    user_state[user_id] = {'state': 'awaiting_email', 'cost': 1}  # Store cost for refund logic

# Email Breach Check (HaveIBeenPwned) - Processor


async def process_email_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the text a user sends while in the ``awaiting_email`` state.

    Validates the address, charges 1 credit, queries the lookup endpoint and
    replies with the breach list. A list response is a breach list (empty
    means none found), ``None`` also means none found, and any other shape
    is reported as "could not retrieve". The credit is refunded whenever no
    breach list was shown. Finally returns the user to the main menu via
    ``start``.

    An address that fails validation is rejected without changing the state,
    so the user can retry.
    """
    # Function-scope import so this block carries its own dependency.
    from telegram.helpers import escape_markdown

    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'awaiting_email':
        return  # Not in email lookup state, ignore input

    email = update.message.text.strip()
    if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
        await update.message.reply_text("❌ Please enter a valid email address.", parse_mode="Markdown")
        return

    user_data = get_user(user_id)
    # Re-check credits; a missing row is treated as no credits.
    if not user_data or user_data['credits'] < 1:
        await update.message.reply_text("❌ You don't have enough credits for this search.", parse_mode="Markdown")
        await asyncio.sleep(1)
        await start(update, context)
        return

    # Deduct credit before API call
    add_or_update_user(user_id, credits=user_data['credits'] - 1)
    await update.message.reply_text(
        f"⏳ Searching for email `{email}`... 1 credit deducted. Your balance: `{user_data['credits'] - 1}`",
        parse_mode="Markdown",
    )

    # Valid addresses may contain "_", which Markdown treats as an italic
    # marker. Escape the address wherever it appears outside a code span.
    safe_email = escape_markdown(email, version=1)

    url = f"{HIBP_API_BASE_BASE_URL}{email}"
    headers = {"User-Agent": "OSINT_Telegram_Bot/1.0"}
    search_successful_with_result = False  # Flag to track if a useful result was found

    try:
        response_data = await make_external_api_request(url, headers=headers, timeout=15)

        if isinstance(response_data, list):
            # Breach names and dates come from the API, so escape them too.
            breach_lines = [
                f"- {escape_markdown(str(b.get('Name', 'N/A')), version=1)} "
                f"({escape_markdown(str(b.get('BreachDate', 'N/A')), version=1)})"
                for b in response_data
                if isinstance(b, dict)
            ]
            if breach_lines:
                breach_list = "\n".join(breach_lines)
                await update.message.reply_text(
                    f"🔓 *Breaches found for* {safe_email}:\n{breach_list}",
                    parse_mode="Markdown",
                )
                search_successful_with_result = True  # Result was found
            elif not response_data:
                # An empty list is a valid "nothing found" answer.
                await update.message.reply_text(f"✅ No breaches found for {safe_email}.", parse_mode="Markdown")
            else:
                # Non-empty list with no usable entries.
                await update.message.reply_text(
                    f"Could not retrieve breach information for {safe_email}. Try a different email.",
                    parse_mode="Markdown",
                )
        elif response_data is None:
            await update.message.reply_text(f"✅ No breaches found for {safe_email}.", parse_mode="Markdown")
        else:
            await update.message.reply_text(
                f"Could not retrieve breach information for {safe_email}. Try a different email.",
                parse_mode="Markdown",
            )

    except Exception as e:
        logger.error(f"Error during email lookup for {email}: {e}")
        # Keep exception details in the log only. Their text can expose
        # internals and can contain characters that break Markdown parsing,
        # so the user gets a fixed plain-text message.
        await update.message.reply_text("An error occurred during email breach check. Please try again later.")
    finally:
        # Refund credit if no useful results were found
        if not search_successful_with_result:
            current_user_data = get_user(user_id)
            if current_user_data:
                add_or_update_user(user_id, credits=current_user_data['credits'] + 1)
                await update.message.reply_text("✅ No breaches found. 1 credit has been refunded.", parse_mode="Markdown")

    await asyncio.sleep(1)
    await start(update, context)  # Return to main menu

# General User Input Handler (for text messages)


async def handle_user_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route a text message to the step handler for the sender's state.

    Admin-only states are considered only when the sender is ``ADMIN_ID``.
    Otherwise, or when the admin is not in an admin state, the regular
    ``awaiting_phone`` / ``awaiting_email`` states are checked. With no
    matching state the user is told so and returned to the main menu.
    """
    user_id = update.effective_user.id
    current_state = user_state.get(user_id, {}).get('state')

    # Basic check for admin commands input, ensure only admin can process
    if user_id == ADMIN_ID:
        admin_steps = {
            'admin_increase_credit_id': admin_process_increase_credit_id,
            'admin_increase_credit_amount': admin_process_increase_credit_amount,
            'admin_decrease_credit_id': admin_process_decrease_credit_id,
            'admin_decrease_credit_amount': admin_process_decrease_credit_amount,
            'admin_blacklist_id': admin_process_blacklist_id,
            'admin_unblacklist_id': admin_process_unblacklist_id,
            'admin_add_admin_id': admin_process_add_admin_id,
        }
        admin_step = admin_steps.get(current_state)
        if admin_step is not None:
            await admin_step(update, context)
            return

    # Process regular user inputs
    user_steps = {
        'awaiting_phone': process_phone_input,
        'awaiting_email': process_email_input,
    }
    user_step = user_steps.get(current_state)
    if user_step is not None:
        await user_step(update, context)
        return

    await update.message.reply_text(
        "I'm not expecting specific input right now. Returning to main menu...",
        parse_mode="Markdown",
    )
    await asyncio.sleep(1)
    await start(update, context)

# --- Admin Panel Handlers ---

async def admin_panel_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the admin panel to ``ADMIN_ID``; refuse everyone else.

    Reached in two ways: as the callback handler for the "Admin Panel"
    button, and by direct calls from the admin text-input handlers in this
    file once they finish. In the second case ``update.callback_query`` is
    ``None``, so the panel is sent as a reply to the admin's message instead
    of editing an existing one.
    """
    query = update.callback_query
    if query:
        await query.answer()
    user_id = update.effective_user.id

    async def show(text, **kwargs):
        # Edit in place for button presses, reply for text-message updates.
        if query:
            await query.edit_message_text(text, **kwargs)
        else:
            await update.message.reply_text(text, **kwargs)

    if user_id != ADMIN_ID:  # Double check admin status
        await show("❌ You are not authorized to view the admin panel.", parse_mode="Markdown")
        await asyncio.sleep(2)
        await start(update, context)
        return

    text = "⚙️ *Admin Panel*\n\nChoose an action:"
    buttons = [
        [InlineKeyboardButton("📊 View Users", callback_data="admin_view_users")],
        [InlineKeyboardButton("⬆️ Increase Credit", callback_data="admin_increase_credit")],
        [InlineKeyboardButton("⬇️ Decrease Credit", callback_data="admin_decrease_credit")],
        [InlineKeyboardButton("🚫 Blacklist User", callback_data="admin_blacklist_user")],
        [InlineKeyboardButton("✅ Remove Blacklist", callback_data="admin_unblacklist_user")],
        [InlineKeyboardButton("➕ Add Admin", callback_data="admin_add_admin")],
        [InlineKeyboardButton("🔙 Back to Main Menu", callback_data="main_menu")]
    ]
    reply_markup = InlineKeyboardMarkup(buttons)
    await show(text, reply_markup=reply_markup, parse_mode="Markdown")

async def admin_view_users(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show the admin a list of all users with credits and status flags.

    Only ``ADMIN_ID`` may see the list; anyone else gets the same refusal
    as the admin panel and is returned to the main menu. Usernames are
    Markdown-escaped because an underscore in one would otherwise make the
    message unparsable. The list is cut off before it reaches Telegram's
    message length limit, with a note saying how many users were omitted.
    """
    # Function-scope import so this block carries its own dependency.
    from telegram.helpers import escape_markdown

    query = update.callback_query
    await query.answer()

    # This handler exposes every user's data, so it checks the caller itself
    # instead of relying on who can see the button.
    if update.effective_user.id != ADMIN_ID:
        await query.edit_message_text("❌ You are not authorized to view the admin panel.", parse_mode="Markdown")
        await asyncio.sleep(2)
        await start(update, context)
        return

    max_text_length = 4000  # stay below Telegram's 4096-character limit for message text

    users_data = get_all_users()  # This now fetches username
    if not users_data:
        text = "No users in the database."
    else:
        text = "*User List:*\n\n"
        shown = 0
        for user_id, username, credits, is_blacklisted, is_admin in users_data:
            status = []
            if is_admin:
                status.append("👑 Admin")
            if is_blacklisted:
                status.append("🚫 Blacklisted")
            status_str = f" ({', '.join(status)})" if status else ""

            # Display @username or N/A
            display_name = f"@{escape_markdown(username, version=1)}" if username else "N/A"
            line = f"ID: `{user_id}` | User: {display_name} | Credits: `{credits}`{status_str}\n"
            if len(text) + len(line) > max_text_length:
                break
            text += line
            shown += 1

        omitted = len(users_data) - shown
        if omitted:
            text += f"\n… and {omitted} more user(s) not shown."

    keyboard = [[InlineKeyboardButton("🔙 Back to Admin Panel", callback_data="admin_panel_menu")]]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await query.edit_message_text(text, reply_markup=reply_markup, parse_mode="Markdown")

async def admin_increase_credit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback handler that begins the "increase credit" admin flow.

    For ``ADMIN_ID`` it sets the state to ``admin_increase_credit_id`` and
    asks for the target user id. Any other caller gets the same refusal as
    the admin panel and is returned to the main menu.
    """
    query = update.callback_query
    await query.answer()
    user_id = update.effective_user.id

    # Same check as admin_panel_menu: never start an admin flow for anyone else.
    if user_id != ADMIN_ID:
        await query.edit_message_text("❌ You are not authorized to view the admin panel.", parse_mode="Markdown")
        await asyncio.sleep(2)
        await start(update, context)
        return

    user_state[user_id] = {'state': 'admin_increase_credit_id'}
    await query.edit_message_text("⬆️ *Enter the User ID to increase credits for:*", parse_mode="Markdown")

async def admin_process_increase_credit_id(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Read the target user id for the "increase credit" admin flow.

    Does nothing unless the sender's state is ``admin_increase_credit_id``.
    Invalid input leaves the state unchanged so the admin can retry. An
    unknown user clears the state and reopens the admin panel. A known user
    advances the state to ``admin_increase_credit_amount`` and stores the
    target id.
    """
    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'admin_increase_credit_id':
        return

    target_user_id_str = update.message.text.strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() cannot parse.
    if not target_user_id_str.isdecimal():
        await update.message.reply_text("❌ Invalid User ID. Please enter a numerical User ID.", parse_mode="Markdown")
        return

    target_user_id = int(target_user_id_str)
    target_user_data = get_user(target_user_id)
    if not target_user_data:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found. (They might need to start the bot once).",
            parse_mode="Markdown",
        )
        user_state.pop(user_id, None)  # Clear state
        await asyncio.sleep(1)
        await admin_panel_menu(update, context)  # Go back to admin panel
        return

    user_state[user_id] = {'state': 'admin_increase_credit_amount', 'target_id': target_user_id}
    await update.message.reply_text(
        f"✅ User `{target_user_id}` found (Credits: `{target_user_data['credits']}`).\n"
        "⬆️ *Now enter the amount of credits to add:*",
        parse_mode="Markdown",
    )

async def admin_process_increase_credit_amount(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Read the amount for the "increase credit" admin flow and apply it.

    Does nothing unless the sender's state is
    ``admin_increase_credit_amount``. Invalid input leaves the state
    unchanged so the admin can retry. Otherwise the amount is added to the
    target stored in the state, the admin gets a confirmation, the target is
    notified on a best-effort basis, the state is cleared and the admin
    panel is reopened.
    """
    user_id = update.effective_user.id
    current_state_info = user_state.get(user_id, {})
    if current_state_info.get('state') != 'admin_increase_credit_amount':
        return

    amount_str = update.message.text.strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() cannot parse.
    if not amount_str.isdecimal() or int(amount_str) <= 0:
        await update.message.reply_text("❌ Invalid amount. Please enter a positive numerical value.", parse_mode="Markdown")
        return

    amount = int(amount_str)
    target_user_id = current_state_info['target_id']
    target_user_data = get_user(target_user_id)

    if target_user_data:
        new_credits = target_user_data['credits'] + amount
        add_or_update_user(target_user_id, credits=new_credits)
        await update.message.reply_text(
            f"✅ Successfully added `{amount}` credits to user `{target_user_id}`. New balance: `{new_credits}`",
            parse_mode="Markdown",
        )
        try:  # Notify target user
            await context.bot.send_message(
                chat_id=target_user_id,
                text=f"🎉 Your credits have been increased by `{amount}`! Your new balance is `{new_credits}`.",
                parse_mode="Markdown",
            )
        except Exception as e:
            # The balance is already updated; failing to notify is not fatal.
            logger.warning(f"Could not notify user {target_user_id} about credit increase: {e}")
    else:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found. (Perhaps deleted from bot?)",
            parse_mode="Markdown",
        )

    user_state.pop(user_id, None)  # Clear state
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)

async def admin_decrease_credit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Callback handler that begins the "decrease credit" admin flow.

    For ``ADMIN_ID`` it sets the state to ``admin_decrease_credit_id`` and
    asks for the target user id. Any other caller gets the same refusal as
    the admin panel and is returned to the main menu.
    """
    query = update.callback_query
    await query.answer()
    user_id = update.effective_user.id

    # Same check as admin_panel_menu: never start an admin flow for anyone else.
    if user_id != ADMIN_ID:
        await query.edit_message_text("❌ You are not authorized to view the admin panel.", parse_mode="Markdown")
        await asyncio.sleep(2)
        await start(update, context)
        return

    user_state[user_id] = {'state': 'admin_decrease_credit_id'}
    await query.edit_message_text("⬇️ *Enter the User ID to decrease credits for:*", parse_mode="Markdown")

async def admin_process_decrease_credit_id(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Read the target user id for the "decrease credit" admin flow.

    Does nothing unless the sender's state is ``admin_decrease_credit_id``.
    Invalid input leaves the state unchanged so the admin can retry. An
    unknown user clears the state and reopens the admin panel. A known user
    advances the state to ``admin_decrease_credit_amount`` and stores the
    target id.
    """
    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'admin_decrease_credit_id':
        return

    target_user_id_str = update.message.text.strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() cannot parse.
    if not target_user_id_str.isdecimal():
        await update.message.reply_text("❌ Invalid User ID. Please enter a numerical User ID.", parse_mode="Markdown")
        return

    target_user_id = int(target_user_id_str)
    target_user_data = get_user(target_user_id)
    if not target_user_data:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found. (They might need to start the bot once).",
            parse_mode="Markdown",
        )
        user_state.pop(user_id, None)
        await asyncio.sleep(1)
        await admin_panel_menu(update, context)
        return

    user_state[user_id] = {'state': 'admin_decrease_credit_amount', 'target_id': target_user_id}
    await update.message.reply_text(
        f"✅ User `{target_user_id}` found (Credits: `{target_user_data['credits']}`).\n"
        "⬇️ *Now enter the amount of credits to subtract:*",
        parse_mode="Markdown",
    )

async def admin_process_decrease_credit_amount(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Read the amount for the "decrease credit" admin flow and apply it.

    Does nothing unless the sender's state is
    ``admin_decrease_credit_amount``. Invalid input leaves the state
    unchanged so the admin can retry. The balance never goes below 0, and
    the messages report the number of credits actually removed, which is
    less than the requested amount when the balance was too small. The
    target is notified on a best-effort basis, then the state is cleared
    and the admin panel is reopened.
    """
    user_id = update.effective_user.id
    current_state_info = user_state.get(user_id, {})
    if current_state_info.get('state') != 'admin_decrease_credit_amount':
        return

    amount_str = update.message.text.strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() cannot parse.
    if not amount_str.isdecimal() or int(amount_str) <= 0:
        await update.message.reply_text("❌ Invalid amount. Please enter a positive numerical value.", parse_mode="Markdown")
        return

    amount = int(amount_str)
    target_user_id = current_state_info['target_id']
    target_user_data = get_user(target_user_id)

    if target_user_data:
        new_credits = max(0, target_user_data['credits'] - amount)  # Don't go below 0
        # What was really taken away; differs from `amount` when clamped at 0.
        deducted = max(0, target_user_data['credits'] - new_credits)
        add_or_update_user(target_user_id, credits=new_credits)
        await update.message.reply_text(
            f"✅ Successfully deducted `{deducted}` credits from user `{target_user_id}`. New balance: `{new_credits}`",
            parse_mode="Markdown",
        )
        try:  # Notify target user
            await context.bot.send_message(
                chat_id=target_user_id,
                text=f"📉 Your credits have been decreased by `{deducted}`. Your new balance is `{new_credits}`.",
                parse_mode="Markdown",
            )
        except Exception as e:
            # The balance is already updated; failing to notify is not fatal.
            logger.warning(f"Could not notify user {target_user_id} about credit decrease: {e}")
    else:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found. (Perhaps deleted from bot?)",
            parse_mode="Markdown",
        )

    user_state.pop(user_id, None)  # Clear state
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)

async def admin_blacklist_user(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Begin the blacklist flow after the matching inline button is pressed.

    Acknowledges the callback query, marks the pressing user as being in the
    ``admin_blacklist_id`` state and replaces the menu message with a prompt
    asking for the user ID to blacklist.
    """
    callback = update.callback_query
    await callback.answer()

    # The next text message from this user is interpreted as the target ID.
    user_state[update.effective_user.id] = {'state': 'admin_blacklist_id'}

    await callback.edit_message_text(
        "🚫 *Enter the User ID to blacklist:*",
        parse_mode="Markdown",
    )

async def admin_process_blacklist_id(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Blacklist the user whose ID the admin just typed.

    Acts only while the sender's ``user_state`` entry is in the
    ``admin_blacklist_id`` state. Non-numeric input gets an error reply and
    keeps the state so the admin can retry. ``ADMIN_ID`` can never be
    blacklisted. A user with no stored record is created already
    blacklisted; an existing, not yet blacklisted user is flagged and
    notified on a best-effort basis. In every completed case the state is
    cleared and the admin panel is shown again.

    Args:
        update: Incoming text-message update from the admin.
        context: Handler context; its bot is used to message the target user.
    """
    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'admin_blacklist_id':
        return

    target_user_id_str = update.message.text.strip()
    # isdecimal() instead of isdigit(): isdigit() also accepts characters such
    # as "²" or "①" that int() rejects, which would raise ValueError below.
    if not target_user_id_str.isdecimal():
        await update.message.reply_text(
            "❌ Invalid User ID. Please enter a numerical User ID.",
            parse_mode="Markdown",
        )
        return

    target_user_id = int(target_user_id_str)
    if target_user_id == ADMIN_ID:
        await update.message.reply_text(
            "🚫 Cannot blacklist the main admin.",
            parse_mode="Markdown",
        )
        user_state.pop(user_id, None)
        await asyncio.sleep(1)
        await admin_panel_menu(update, context)
        return

    target_user_data = get_user(target_user_id)
    if not target_user_data:
        # No stored record yet: create one that is blacklisted from the start.
        add_or_update_user(
            target_user_id,
            is_blacklisted=True,
            credits=0,
            is_admin=False,
            has_joined_group=False,
            referral_bonus_given=False,
            username=None,
        )
        await update.message.reply_text(
            f"✅ User `{target_user_id}` added to database and blacklisted.",
            parse_mode="Markdown",
        )
    elif target_user_data['is_blacklisted']:
        await update.message.reply_text(
            f"ℹ️ User `{target_user_id}` is already blacklisted.",
            parse_mode="Markdown",
        )
    else:
        add_or_update_user(target_user_id, is_blacklisted=True)
        await update.message.reply_text(
            f"✅ User `{target_user_id}` has been blacklisted.",
            parse_mode="Markdown",
        )
        try:
            # Best-effort notification; a send failure must not abort the flow.
            await context.bot.send_message(
                chat_id=target_user_id,
                text="🚫 You have been blacklisted from using this bot.",
                parse_mode="Markdown",
            )
        except Exception as e:
            logger.warning(f"Could not notify user {target_user_id} about blacklist: {e}")

    # pop() rather than del: the entry may have been removed while this
    # handler was suspended on one of the awaits above.
    user_state.pop(user_id, None)
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)

async def admin_unblacklist_user(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Begin the un-blacklist flow after the matching inline button is pressed.

    Acknowledges the callback query, marks the pressing user as being in the
    ``admin_unblacklist_id`` state and replaces the menu message with a
    prompt asking for the user ID to remove from the blacklist.
    """
    callback = update.callback_query
    await callback.answer()

    # The next text message from this user is interpreted as the target ID.
    user_state[update.effective_user.id] = {'state': 'admin_unblacklist_id'}

    await callback.edit_message_text(
        "✅ *Enter the User ID to remove from blacklist:*",
        parse_mode="Markdown",
    )

async def admin_process_unblacklist_id(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Remove the blacklist flag from the user whose ID the admin just typed.

    Acts only while the sender's ``user_state`` entry is in the
    ``admin_unblacklist_id`` state. Non-numeric input gets an error reply and
    keeps the state so the admin can retry. Unknown users and users that are
    not blacklisted only produce an informational reply; a blacklisted user
    is cleared and notified on a best-effort basis. In every completed case
    the state is cleared and the admin panel is shown again.

    Args:
        update: Incoming text-message update from the admin.
        context: Handler context; its bot is used to message the target user.
    """
    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'admin_unblacklist_id':
        return

    target_user_id_str = update.message.text.strip()
    # isdecimal() instead of isdigit(): isdigit() also accepts characters such
    # as "²" or "①" that int() rejects, which would raise ValueError below.
    if not target_user_id_str.isdecimal():
        await update.message.reply_text(
            "❌ Invalid User ID. Please enter a numerical User ID.",
            parse_mode="Markdown",
        )
        return

    target_user_id = int(target_user_id_str)
    target_user_data = get_user(target_user_id)
    if not target_user_data:
        await update.message.reply_text(
            f"❌ User with ID `{target_user_id}` not found in database.",
            parse_mode="Markdown",
        )
    elif not target_user_data['is_blacklisted']:
        await update.message.reply_text(
            f"ℹ️ User `{target_user_id}` is not blacklisted.",
            parse_mode="Markdown",
        )
    else:
        add_or_update_user(target_user_id, is_blacklisted=False)
        await update.message.reply_text(
            f"✅ User `{target_user_id}` has been removed from blacklist.",
            parse_mode="Markdown",
        )
        try:
            # Best-effort notification; a send failure must not abort the flow.
            await context.bot.send_message(
                chat_id=target_user_id,
                text="✅ You have been unblacklisted and can now use the bot.",
                parse_mode="Markdown",
            )
        except Exception as e:
            logger.warning(f"Could not notify user {target_user_id} about unblacklist: {e}")

    # pop() rather than del: the entry may have been removed while this
    # handler was suspended on one of the awaits above.
    user_state.pop(user_id, None)
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)

async def admin_add_admin(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Begin the "grant admin" flow after the matching inline button is pressed.

    Acknowledges the callback query, marks the pressing user as being in the
    ``admin_add_admin_id`` state and replaces the menu message with a prompt
    asking for the user ID that should receive admin privileges.
    """
    callback = update.callback_query
    await callback.answer()

    # The next text message from this user is interpreted as the target ID.
    user_state[update.effective_user.id] = {'state': 'admin_add_admin_id'}

    await callback.edit_message_text(
        "➕ *Enter the User ID to grant admin privileges:*",
        parse_mode="Markdown",
    )

async def admin_process_add_admin_id(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Grant admin privileges to the user whose ID the admin just typed.

    Acts only while the sender's ``user_state`` entry is in the
    ``admin_add_admin_id`` state. Non-numeric input gets an error reply and
    keeps the state so the admin can retry. A user with no stored record is
    created as an admin; an existing non-admin is promoted and notified on a
    best-effort basis; an existing admin only produces an informational
    reply. In every completed case the state is cleared and the admin panel
    is shown again.

    Args:
        update: Incoming text-message update from the admin.
        context: Handler context; its bot is used to message the target user.
    """
    user_id = update.effective_user.id
    if user_state.get(user_id, {}).get('state') != 'admin_add_admin_id':
        return

    target_user_id_str = update.message.text.strip()
    # isdecimal() instead of isdigit(): isdigit() also accepts characters such
    # as "²" or "①" that int() rejects, which would raise ValueError below.
    if not target_user_id_str.isdecimal():
        await update.message.reply_text(
            "❌ Invalid User ID. Please enter a numerical User ID.",
            parse_mode="Markdown",
        )
        return

    target_user_id = int(target_user_id_str)
    target_user_data = get_user(target_user_id)
    if not target_user_data:
        # No stored record yet: create one that is an admin from the start.
        add_or_update_user(
            target_user_id,
            is_admin=True,
            credits=0,
            is_blacklisted=False,
            has_joined_group=False,
            referral_bonus_given=False,
            username=None,
        )
        await update.message.reply_text(
            f"✅ User `{target_user_id}` created in DB and granted admin privileges.",
            parse_mode="Markdown",
        )
    elif target_user_data['is_admin']:
        await update.message.reply_text(
            f"ℹ️ User `{target_user_id}` is already an admin.",
            parse_mode="Markdown",
        )
    else:
        add_or_update_user(target_user_id, is_admin=True)
        await update.message.reply_text(
            f"✅ User `{target_user_id}` has been granted admin privileges.",
            parse_mode="Markdown",
        )
        try:
            # Best-effort notification; a send failure must not abort the flow.
            await context.bot.send_message(
                chat_id=target_user_id,
                text="👑 You have been granted admin privileges for this bot!",
                parse_mode="Markdown",
            )
        except Exception as e:
            logger.warning(f"Could not notify user {target_user_id} about admin status: {e}")

    # pop() rather than del: the entry may have been removed while this
    # handler was suspended on one of the awaits above.
    user_state.pop(user_id, None)
    await asyncio.sleep(1)
    await admin_panel_menu(update, context)

# --- Chat Member Updates Handler (for Group Join Tracking) ---
async def track_chat_members(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Keep stored join status, bonuses and usernames in sync with the group.

    Only chat-member updates for ``GROUP_CHAT_ID`` that concern non-bot users
    are processed.

    * New status ``member``/``administrator``/``creator``: if the user has no
      stored record or is not yet flagged as joined, the username and
      ``has_joined_group`` flag are stored, a 1-credit joining bonus is set
      when the user is new or has 0 credits, and a pending referral bonus
      (+1 credit) is paid to the referrer. Otherwise only a changed username
      is stored.
    * Transition from one of those statuses to ``left``/``kicked``/``banned``:
      ``has_joined_group`` is cleared for known users.

    All direct messages to users are best-effort; send failures are logged
    and do not interrupt processing.

    Args:
        update: Incoming chat-member update.
        context: Handler context; its bot is used to send notifications.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list; ``telegram`` is already a dependency of the file.
    from telegram.helpers import escape_markdown

    # Ensure this is a chat member update for the specified group.
    if update.effective_chat.id != GROUP_CHAT_ID:
        return

    member_change = update.chat_member
    new_status = member_change.new_chat_member.status
    old_status = member_change.old_chat_member.status
    member = member_change.new_chat_member.user

    user_id = member.id
    user_username = member.username
    user_full_name = member.full_name

    # Ignore updates that concern bots (including this bot itself).
    if member.is_bot:
        return

    user_data = get_user(user_id)
    present_statuses = ('member', 'administrator', 'creator')

    if new_status in present_statuses:
        logger.info(f"User {user_id} joined/is member of group {GROUP_CHAT_ID}.")

        if not user_data or not user_data['has_joined_group']:
            add_or_update_user(user_id, username=user_username, has_joined_group=True)

            # Joining bonus for new users or users with 0 credits.
            # NOTE(review): "credits == 0" is only a heuristic for "bonus not
            # yet received"; a user who spends all credits and rejoins would
            # qualify again. Confirm whether that is intended.
            if not user_data or user_data['credits'] == 0:
                add_or_update_user(user_id, credits=1)
                try:
                    await context.bot.send_message(
                        chat_id=user_id,
                        text="🎉 Welcome! You've received 1 bonus credit for joining our group!",
                        parse_mode="Markdown",
                    )
                except Exception as e:
                    logger.warning(f"Could not send joining bonus message to user {user_id}: {e}")

            # Pay the referral bonus once, if a referrer is recorded.
            if user_data and user_data['referrer_id'] and not user_data['referral_bonus_given']:
                referrer_id = user_data['referrer_id']
                referrer_data = get_user(referrer_id)
                if referrer_data:
                    add_or_update_user(referrer_id, credits=referrer_data['credits'] + 1)
                    # Mark the bonus as given so it is not paid again.
                    add_or_update_user(user_id, referral_bonus_given=True)
                    # The display name is user-controlled; unescaped "_", "*",
                    # "`" or "[" would make Telegram reject the Markdown message
                    # and the referrer would never be notified.
                    safe_name = escape_markdown(user_full_name)
                    try:
                        await context.bot.send_message(
                            chat_id=referrer_id,
                            text=(
                                f"💰 Your referral ({safe_name}) joined the group! "
                                "You've received 1 credit!"
                            ),
                            parse_mode="Markdown",
                        )
                    except Exception as e:
                        logger.warning(
                            f"Could not send referral bonus message to referrer {referrer_id}: {e}"
                        )
                    logger.info(
                        f"Referral bonus given for {user_id} to {referrer_id} via group join."
                    )
        else:
            # Already known and flagged as joined: only refresh the username.
            if user_data.get('username') != user_username:
                add_or_update_user(user_id, username=user_username)
            logger.info(
                f"User {user_id} already marked as joined, no credit action on group join. "
                "Username updated if needed."
            )

    # Handle user leaving or being removed.
    elif old_status in present_statuses and new_status in ('left', 'kicked', 'banned'):
        logger.info(f"User {user_id} left/was removed from group {GROUP_CHAT_ID}. Updating DB.")
        if user_data:
            add_or_update_user(user_id, has_joined_group=False)
            try:
                await context.bot.send_message(
                    chat_id=user_id,
                    text=(
                        "ℹ️ You have left the group. "
                        "Some bot features might be restricted until you rejoin."
                    ),
                    parse_mode="Markdown",
                )
            except Exception as e:
                logger.warning(f"Could not send leave group message to user {user_id}: {e}")
        else:
            logger.warning(f"User {user_id} left group but not found in DB.")

# --- Main Bot Setup ---


def main() -> None:
    """Initialise the database, wire up all handlers and start polling.

    Logs an error and returns without starting the bot when ``BOT_TOKEN``
    is empty.
    """
    init_db()

    if not BOT_TOKEN:
        logger.error(
            "Error: Telegram Bot Token not found. Please set the 'BOT_TOKEN' "
            "environment variable or provide it directly."
        )
        return

    builder = Application.builder().token(BOT_TOKEN)
    builder = builder.read_timeout(20).write_timeout(20).pool_timeout(5)
    application = builder.build()

    # /start command.
    application.add_handler(CommandHandler("start", start))

    # Inline-keyboard buttons: (callback, callback-data pattern), registered
    # in this order.
    button_routes = (
        (phone_lookup_menu, "^phone_lookup_menu$"),
        (email_check_menu, "^email_check_menu$"),
        (how_to_credits, "^how_to_credits$"),
        (get_referral_link, "^get_referral_link$"),
        (admin_panel_menu, "^admin_panel_menu$"),  # admin panel main menu
        (admin_view_users, "^admin_view_users$"),
        (admin_increase_credit, "^admin_increase_credit$"),
        (admin_decrease_credit, "^admin_decrease_credit$"),
        (admin_blacklist_user, "^admin_blacklist_user$"),
        (admin_unblacklist_user, "^admin_unblacklist_user$"),
        (admin_add_admin, "^admin_add_admin$"),
        (start, "^main_menu$"),  # back to main menu
    )
    for callback, data_pattern in button_routes:
        application.add_handler(CallbackQueryHandler(callback, pattern=data_pattern))

    # Free-text replies (non-command text) go to the shared input handler.
    text_only = filters.TEXT & ~filters.COMMAND
    application.add_handler(MessageHandler(text_only, handle_user_input))

    # Group join/leave tracking.
    application.add_handler(
        ChatMemberHandler(track_chat_members, ChatMemberHandler.CHAT_MEMBER)
    )

    logger.info("Bot started polling...")
    application.run_polling(allowed_updates=Update.ALL_TYPES)

# Script entry point: start the bot only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

You might also like