#!/usr/bin/env python3
"""
bot_uploader_all_in_one.py

Single-file Discord bot that uploads Shirt/Pants templates to a Roblox group,
authenticating with a mandatory .ROBLOSECURITY cookie.

Config (edit these in the USER CONFIG section below):
    DISCORD_TOKEN = "<PASTE_YOUR_DISCORD_BOT_TOKEN_HERE>"
    COOKIE_DEFAULT = "<PASTE_YOUR_.ROBLOSECURITY_HERE>"

Commands (prefix $):
    $S <groupId_or_link> -> upload the attached image as a Shirt
    $P <groupId_or_link> -> upload the attached image as Pants

Requirements:
    pip install discord.py requests requests-toolbelt
    (requests-toolbelt is optional; without it the code falls back to the
    multipart support built into requests.)
"""
import asyncio
import json
import mimetypes
import os
import re
import shutil
import tempfile
import time
import traceback
from pathlib import Path
from typing import Optional, Callable, Tuple, Dict, Any, List

import discord
import requests
from discord.ext import commands

try:
    from requests_toolbelt.multipart.encoder import MultipartEncoder
except Exception:
    MultipartEncoder = None
# ------------------ USER CONFIG (EDIT) ------------------
# Paste your bot token and cookie here. These are secrets: never share or commit
# a copy of this file that contains real values.
DISCORD_TOKEN = "PASTE_YOUR_DISCORD_TOKEN_HERE"
COOKIE_DEFAULT = "PASTE_YOUR_.ROBLOSECURITY_HERE"
# You can instead set DISCORD_TOKEN / DEFAULT_COOKIE as environment variables
# and empty the strings above.

# ------------------ Other settings -----------------------
BOT_PREFIX = "$"   # command prefix handed to commands.Bot
PRICE_ROBUX = 5    # price used when putting an uploaded asset on sale
# NOTE(review): REQUEST_TIMEOUT is not referenced by the code visible in this
# file; the HTTP calls pass their own literal timeouts.
REQUEST_TIMEOUT = 40

# Endpoints to try for creating assets (xool-style + common guesses).
CREATE_ENDPOINTS = [
    "https://www.roblox.com/asset/upload",
    "https://www.roblox.com/creations/upload",
    "https://develop.roblox.com/v1/assets/upload",
    "https://data.roblox.com/Data/Upload.ashx",
    # (OpenCloud is handled separately if you want to add it)
]

# Endpoints to try for setting sale (best-effort guesses). Each template is
# filled in with str.format(assetId=...).
SALE_ENDPOINTS = [
    "https://economy.roblox.com/v1/assets/{assetId}/resale",
    "https://marketplace.roblox.com/v1/items/{assetId}/sell",
    "https://marketplace.roblox.com/v1/assets/{assetId}/publish",
    "https://develop.roblox.com/v1/assets/{assetId}/sales",
]
# ------------------ util helpers ------------------------
def sanitize_cookie(raw: str) -> Optional[str]:
    """Extract a bare .ROBLOSECURITY token from loosely formatted input.

    Accepted shapes, tried in this order:
      1. ``_|WARNING:...|_<TOKEN>`` -- the text after the warning banner, cut at
         the first whitespace or ``;``.
      2. ``ROBLOSECURITY=<TOKEN>`` -- e.g. a pasted Cookie header fragment.
      3. Any other string longer than 30 characters is assumed to be the token.

    Args:
        raw: The user-supplied cookie text; may be empty or None.

    Returns:
        The extracted token, or None when nothing usable was found.
    """
    if not raw:
        return None
    s = raw.strip()

    # handle _|WARNING:...|_<TOKEN>
    warned = re.search(r'_\|WARNING:[^\|]*\|_(.+)', s)
    if warned:
        pieces = warned.group(1).split()
        # ``pieces`` is empty when only whitespace follows the banner on that
        # line; indexing it unconditionally used to raise IndexError.
        token = pieces[0].split(";")[0] if pieces else ""
        return token or None

    assigned = re.search(r'ROBLOSECURITY\s*=\s*([^; \n\r]+)', s)
    if assigned:
        return assigned.group(1)

    # if it looks long enough, assume it is the token itself
    if len(s) > 30:
        return s
    return None
def extract_group_id(text: str) -> Optional[str]:
    """Pull a Roblox group id out of a bare number or a group link.

    Resolution order:
      1. The whole (stripped) text is digits -> returned as is.
      2. The digits following ``/groups/``.
      3. The digits following ``/communities/``.
      4. The first run of five or more digits anywhere in the text.

    Args:
        text: Command argument supplied by the user; may be empty or None.

    Returns:
        The id as a string of digits, or None when no candidate was found.
    """
    if not text:
        return None
    txt = text.strip()
    if re.fullmatch(r"\d+", txt):
        return txt
    # Order matters: explicit URL forms win over the loose digit-run fallback.
    for pattern in (r"/groups/(\d+)", r"/communities/(\d+)", r"(\d{5,})"):
        found = re.search(pattern, txt)
        if found:
            return found.group(1)
    return None
# ------------------ Xool-like cookie wrapper -----------------
class CookieHelper:
    """Simple helper to get an x-csrf-token and the user id using the cookie.

    Attributes are plain instance state:
      cookie    -- the .ROBLOSECURITY value sent with every request.
      user_id   -- cached result of get_user_id(), or None.
      _token / _token_ts -- cached x-csrf-token and the monotonic time at which
                            it was obtained.
    """

    # Seconds a cached x-csrf-token is reused before a new one is requested.
    TOKEN_TTL = 100

    def __init__(self, cookie: str):
        self.cookie = cookie
        self._token = None
        self._token_ts = 0.0
        self.user_id = None

    def session(self) -> "requests.Session":
        """Return a new requests session carrying the cookie and default headers.

        The caller owns the session and should close it (``with`` works).
        """
        s = requests.Session()
        s.headers.update({"User-Agent": "uploader-bot/1.0",
                          "Accept": "application/json, text/plain, */*"})
        s.cookies.set(".ROBLOSECURITY", self.cookie, domain=".roblox.com")
        return s

    def _probe_token(self, url: str, timeout: float) -> Optional[str]:
        """POST an empty request to ``url`` and return its x-csrf-token header, if any."""
        with self.session() as s:
            r = s.post(url, timeout=timeout)
        # requests exposes headers case-insensitively, so one lookup is enough.
        return r.headers.get("x-csrf-token")

    def obtain_token(self) -> Optional[str]:
        """Return an x-csrf-token, reusing the cached one while it is fresh.

        Probes the auth logout endpoint first, then each CREATE_ENDPOINTS entry,
        and caches the first token any of them hands back.

        Returns:
            The token, or None when no endpoint returned one.
        """
        # Monotonic clock: the age check must not be affected by wall-clock jumps.
        if self._token and (time.monotonic() - self._token_ts) < self.TOKEN_TTL:
            return self._token
        probes = [("https://auth.roblox.com/v1/logout", 10)]
        probes.extend((ep, 8) for ep in CREATE_ENDPOINTS)
        for url, timeout in probes:
            try:
                token = self._probe_token(url, timeout)
            except requests.RequestException:
                # best effort: an unreachable endpoint just means "try the next"
                continue
            if token:
                self._token = token
                self._token_ts = time.monotonic()
                return token
        return None

    def _get_json(self, url: str, timeout: float = 8):
        """GET ``url``; return the decoded JSON body on HTTP 200, otherwise None."""
        with self.session() as s:
            r = s.get(url, timeout=timeout)
        if r.status_code != 200:
            return None
        return r.json()

    def get_user_id(self) -> Optional[int]:
        """Return the id of the user the cookie belongs to, caching the result.

        Tries the users API first and the legacy mobile userinfo endpoint second.
        Network, decoding and shape errors are treated as "unknown".

        Returns:
            The user id, or None when neither endpoint produced one.
        """
        if self.user_id:
            return self.user_id
        try:
            j = self._get_json("https://users.roblox.com/v1/users/authenticated")
            if j is not None:
                uid = j.get("id") or j.get("Id")
                if uid:
                    self.user_id = int(uid)
                    return self.user_id
        except (requests.RequestException, ValueError, TypeError, AttributeError):
            pass
        try:
            j2 = self._get_json("https://www.roblox.com/mobileapi/userinfo")
            if j2 is not None and "UserID" in j2:
                self.user_id = int(j2["UserID"])
                return self.user_id
        except (requests.RequestException, ValueError, TypeError, AttributeError):
            pass
        return None
# ------------------ Core upload logic (blocking) --------------
def _guess_upload_mime(file_path: str) -> str:
    """Return the MIME type guessed from the file name, or a generic binary type."""
    guessed, _ = mimetypes.guess_type(file_path)
    return guessed or "application/octet-stream"


def _post_upload(session, endpoint: str, file_path: str, mime: str,
                 request_payload: Dict[str, Any]):
    """POST one multipart upload (JSON 'request' part + file part); return the response.

    The file is opened for exactly the duration of the request, so the handle
    is released even when the POST raises.
    """
    request_json = json.dumps(request_payload)
    filename = Path(file_path).name
    with open(file_path, "rb") as fh:
        # prefer MultipartEncoder for correct content-type boundary
        if MultipartEncoder:
            encoder = MultipartEncoder(fields={
                "request": ("request", request_json, "application/json"),
                "file": (filename, fh, mime),
            })
            return session.post(endpoint, data=encoder,
                                headers={"Content-Type": encoder.content_type},
                                timeout=60)
        # fallback to the multipart support built into requests
        return session.post(endpoint, data={"request": request_json},
                            files={"file": (filename, fh, mime)}, timeout=60)


def _summarize_response(resp) -> Tuple[int, str, Any]:
    """Return (status code, first 1500 chars of the body, parsed JSON or None)."""
    snippet = resp.text[:1500] if resp.text else ""
    try:
        parsed = resp.json()
    except ValueError:
        parsed = None
    return resp.status_code, snippet, parsed


def _extract_asset_id(parsed: Any, text: str) -> Optional[str]:
    """Heuristically find an asset id in a response.

    Looks at well-known JSON keys, then at any JSON value that is a run of five
    or more digits, then at the first such run in the raw text.
    """
    if isinstance(parsed, dict):
        for key in ("id", "assetId", "asset_id", "createdAssetId"):
            if parsed.get(key):
                return str(parsed[key])
        for value in parsed.values():
            if isinstance(value, (int, str)) and re.match(r"^\d{5,}$", str(value)):
                return str(value)
    loose = re.search(r"(\d{5,})", text or "")
    return loose.group(1) if loose else None


def create_asset_attempts(file_path: str, group_id: str, kind: str, cookie: str) -> Dict[str, Any]:
    """
    Blocking function that tries each CREATE_ENDPOINTS entry in turn to upload
    an asset and stops at the first one that answers with a success status.

    Args:
        file_path: Local path of the image to upload.
        group_id: Target group id (digits).
        kind: Human-readable asset kind; only used in the display name and
            description of the request payload.
        cookie: Raw .ROBLOSECURITY text; passed through sanitize_cookie().

    Returns:
        A dict with ``ok`` (bool) and, depending on the outcome:
          * ``assetId`` (str or None) and ``results`` on success;
          * ``error`` ("invalid_cookie", "invalid_group_id" or "all_failed")
            and, for "all_failed", ``results`` on failure.
        ``results`` is a list with one dict per attempted endpoint holding
        either ``status``/``text_snippet``/``json`` or ``exception``.
    """
    cookie_clean = sanitize_cookie(cookie)
    if not cookie_clean:
        return {"ok": False, "error": "invalid_cookie"}
    try:
        group_id_int = int(group_id)
    except (TypeError, ValueError):
        # previously this raised out of the function instead of reporting
        return {"ok": False, "error": "invalid_group_id"}

    ch = CookieHelper(cookie_clean)
    # build a simple request payload (xool-style)
    request_payload = {
        "displayName": f"{kind} upload {int(time.time())}",
        "description": f"Uploaded via Discord bot ({kind})",
        "creationContext": {"creator": {"groupId": group_id_int}},
    }
    # try to get user id
    uid = ch.get_user_id()
    if uid:
        request_payload["publisherUserId"] = uid

    mime = _guess_upload_mime(file_path)
    results = []
    for ep in CREATE_ENDPOINTS:
        try:
            with ch.session() as session:
                token = ch.obtain_token()
                if token:
                    session.headers.update({"x-csrf-token": token})
                resp = _post_upload(session, ep, file_path, mime, request_payload)
                # if 403 and the response carries a fresh x-csrf-token, retry once
                if resp.status_code == 403:
                    fresh = resp.headers.get("x-csrf-token")
                    if fresh:
                        session.headers.update({"x-csrf-token": fresh})
                        resp = _post_upload(session, ep, file_path, mime, request_payload)
            status, txt, parsed = _summarize_response(resp)
            results.append({"endpoint": ep, "status": status,
                            "text_snippet": txt, "json": parsed})
            # Only a success status may yield an asset id: error pages routinely
            # contain long digit runs that the loose heuristic would otherwise
            # report as a created asset.
            if 200 <= status < 300:
                asset_id = _extract_asset_id(parsed, txt)
                # 200/201/202 without an id still counts as a success candidate
                if asset_id or status in (200, 201, 202):
                    return {"ok": True, "assetId": asset_id, "results": results}
            # else continue to next endpoint
        except Exception as e:
            # Per-endpoint best effort: record the failure and move on.
            results.append({"endpoint": ep, "exception": str(e)})
            continue
    # all endpoints tried
    return {"ok": False, "error": "all_failed", "results": results}
# ------------------ set price (blocking) --------------------
def set_asset_for_sale(asset_id: str, price: int, cookie: str) -> Dict[str, Any]:
    """Blocking best-effort attempt to put an asset on sale.

    Every SALE_ENDPOINTS template is tried with POST, PATCH and PUT until one
    attempt succeeds. A 403 that carries an x-csrf-token header is retried once
    with that token; a failed retry no longer aborts the remaining attempts.

    Args:
        asset_id: Id substituted into each endpoint template.
        price: Price sent in the JSON payload.
        cookie: Raw .ROBLOSECURITY text; passed through sanitize_cookie().

    Returns:
        On success ``{"ok": True, "endpoint", "method", "status", ...}``.
        On failure ``{"ok": False, "error": ...}`` where error is
        "invalid_cookie" or "sale_failed"; for "sale_failed" the details of the
        last attempt (endpoint, method and status/text or exception) are merged in.
    """
    cookie_clean = sanitize_cookie(cookie)
    if not cookie_clean:
        return {"ok": False, "error": "invalid_cookie"}

    payload = {"price": price, "isForSale": True}
    headers = {"Content-Type": "application/json"}
    last_failure: Dict[str, Any] = {}

    with requests.Session() as s:
        s.cookies.set(".ROBLOSECURITY", cookie_clean, domain=".roblox.com")
        # try each sale endpoint with POST/PATCH/PUT
        for tmpl in SALE_ENDPOINTS:
            ep = tmpl.format(assetId=asset_id)
            for method in ("POST", "PATCH", "PUT"):
                try:
                    r = s.request(method, ep, json=payload, headers=headers, timeout=30)
                    # if 403 and header token -> retry once with it
                    if r.status_code == 403:
                        tok = r.headers.get("x-csrf-token")
                        if tok:
                            s.headers.update({"x-csrf-token": tok})
                            r = s.request(method, ep, json=payload, headers=headers,
                                          timeout=30)
                    if r.status_code in (200, 201, 204):
                        return {"ok": True, "endpoint": ep, "method": method,
                                "status": r.status_code,
                                "text": (r.text[:800] if r.text else "")}
                    # some endpoints may signal success in the JSON body instead
                    try:
                        j = r.json()
                    except ValueError:
                        j = None
                    if isinstance(j, dict) and (
                        j.get("success") or j.get("isForSale")
                        or j.get("status") in ("ok", "success")
                    ):
                        return {"ok": True, "endpoint": ep, "method": method,
                                "status": r.status_code, "json": j}
                    last_failure = {"endpoint": ep, "method": method,
                                    "status": r.status_code,
                                    "text": (r.text[:500] if r.text else "")}
                except Exception as exc:
                    # best effort: remember what went wrong, keep trying
                    last_failure = {"endpoint": ep, "method": method,
                                    "exception": str(exc)}
                    continue
    return {"ok": False, "error": "sale_failed", **last_failure}
# ------------------ Discord bot (async) ---------------------
# Start from the default intents and additionally opt in to message content,
# which prefix commands need in order to read the command text.
intents = discord.Intents.default()
intents.message_content = True

# The single bot instance; commands and event handlers below register on it.
bot = commands.Bot(BOT_PREFIX, intents=intents)
# small helper to safely send and optionally auto-delete debug messages
async def send_debug(channel: "discord.abc.Messageable", text: str,
                     delete_after: Optional[float] = 18.0):
    """Send ``text`` (truncated to 1800 chars) as a code block, best effort.

    Deletion is delegated to the library's ``delete_after`` option of ``send``
    so this coroutine returns as soon as the message is sent; sleeping here
    would stall the caller for the whole delay on every debug message.

    Args:
        channel: Destination to send to.
        text: Debug text to show.
        delete_after: Seconds after which the message is removed; a falsy value
            keeps the message.
    """
    try:
        await channel.send(f"```prolog\n{text[:1800]}\n```",
                           delete_after=delete_after or None)
    except Exception as exc:
        # Debug output must never break the command; report and carry on.
        print("send_debug failed:", repr(exc))
@bot.event
async def on_ready():
    """Print the connected account's name and id when the bot reports ready."""
    print("[ready] Bot connected as", bot.user, "id:", bot.user.id)
async def process_upload(ctx: commands.Context, attachment_path: str, group_arg: str, kind: str):
    """Upload a saved attachment to a group and try to put it on sale.

    Progress is reported by editing a single status reply; per-endpoint details
    go out through send_debug(). The blocking HTTP work runs in the default
    executor so the event loop stays responsive.

    Args:
        ctx: Invocation context of the command.
        attachment_path: Local path of the already-downloaded image.
        group_arg: Raw group id or link as typed by the user.
        kind: "S" for Shirt; anything else is treated as Pants.
    """
    # COOKIE_DEFAULT wins when it holds a usable value; otherwise fall back to
    # the DEFAULT_COOKIE env variable. Sanitizing each candidate separately
    # keeps an untouched placeholder in COOKIE_DEFAULT from masking the env var.
    cookie = None
    for candidate in (COOKIE_DEFAULT, os.getenv("DEFAULT_COOKIE", "")):
        cookie = sanitize_cookie(candidate)
        if cookie:
            break
    if not cookie:
        await ctx.reply(
            "❌ COOKIE_DEFAULT no configurada correctamente. Edita el archivo y pega tu "
            ".ROBLOSECURITY en COOKIE_DEFAULT (o define DEFAULT_COOKIE env).",
            mention_author=False)
        return

    gid = extract_group_id(group_arg)
    if not gid:
        await ctx.reply("❌ No pude extraer ID de grupo válido del argumento.",
                        mention_author=False)
        return

    kind_label = "Shirt" if kind == "S" else "Pants"
    # send initial progress
    status = await ctx.reply(
        f"➡️ Iniciando upload ({kind_label}) a grupo `{gid}` — intentando endpoints... "
        "(esto puede tardar).",
        mention_author=False)

    loop = asyncio.get_running_loop()
    # run blocking create attempts in executor
    res = await loop.run_in_executor(None, create_asset_attempts, attachment_path,
                                     gid, kind_label, cookie)

    # report per-endpoint debug
    if "results" in res:
        for e in res["results"]:
            # compress the debug info
            ep = e.get("endpoint", "<no-endpoint>")
            status_code = e.get("status", e.get("exception", "exc"))
            snippet = e.get("text_snippet") or (e.get("exception") or str(e))
            await send_debug(
                ctx.channel,
                f"Endpoint: {ep}\nStatus: {status_code}\nSnippet: {snippet}")

    if not res.get("ok"):
        await status.edit(
            content=f"❌ Upload falló. Info: {res.get('error','no-info')}. "
                    "Revisa los mensajes de debug (se autodestruyeron).")
        return

    asset_id = res.get("assetId")
    if not asset_id:
        # We succeeded but didn't get an assetId — show candidate and return
        await status.edit(
            content="⚠️ Upload aparentemente exitoso pero no se devolvió assetId. "
                    "Revisa los mensajes de debug y el panel del grupo.")
        return

    await status.edit(
        content=f"✅ Upload completado. AssetId: `{asset_id}`. "
                f"Intentando poner a la venta por {PRICE_ROBUX} Robux...")
    sale = await loop.run_in_executor(None, set_asset_for_sale, asset_id,
                                      PRICE_ROBUX, cookie)
    if sale.get("ok"):
        await status.edit(
            content=f"✅ Upload y venta completados.\nAssetId: `{asset_id}`\n"
                    f"Venta: activada por {PRICE_ROBUX} Robux.\n"
                    f"Detalles venta: {sale.get('endpoint')} {sale.get('method')} "
                    f"status {sale.get('status')}")
    else:
        await send_debug(
            ctx.channel,
            f"Sale attempt failed: {json.dumps(sale, ensure_ascii=False)[:1200]}")
        await status.edit(
            content="⚠️ Upload completado pero no se pudo activar la venta. "
                    "Revisa mensajes de debug.")
# Commands
async def _run_upload_command(ctx, group, kind: str):
    """Shared body of the $S / $P commands.

    Validates the arguments, saves the first attachment into a private
    temporary directory, hands it to process_upload() and always removes the
    directory afterwards.

    Args:
        ctx: Invocation context of the command.
        group: Group id or link typed by the user, or None when omitted.
        kind: "S" (Shirt) or "P" (Pants); also used in the usage message.
    """
    if not group:
        await ctx.reply(
            f"Uso: ${kind} <groupId|groupLink> (adjunta la imagen en el mismo mensaje).",
            mention_author=False)
        return
    if not ctx.message.attachments:
        await ctx.reply("❌ Adjunta la imagen en el mismo mensaje del comando.",
                        mention_author=False)
        return

    att = ctx.message.attachments[0]
    # The file name comes from the uploader: keep only its last path component
    # so it cannot point outside the temporary directory.
    safe_name = os.path.basename(att.filename or "")
    if safe_name in ("", ".", ".."):
        safe_name = "upload"

    td = tempfile.mkdtemp()
    try:
        local = os.path.join(td, safe_name)
        try:
            await att.save(local)
        except Exception as e:
            await ctx.reply(f"❌ Error guardando adjunto: {e}", mention_author=False)
            return
        await process_upload(ctx, local, group, kind)
    finally:
        shutil.rmtree(td, ignore_errors=True)


@bot.command(name="S")
async def cmd_shirt(ctx, group: str = None):
    """$S <groupId|groupLink>: upload the attached image as a Shirt."""
    await _run_upload_command(ctx, group, "S")


@bot.command(name="P")
async def cmd_pants(ctx, group: str = None):
    """$P <groupId|groupLink>: upload the attached image as Pants."""
    await _run_upload_command(ctx, group, "P")
@bot.event
async def on_command_error(ctx, error):
    """Tell the author a command failed and log the error with its traceback.

    Unknown commands are ignored: with "$" as prefix any message that merely
    starts with a dollar sign would otherwise trigger a DM.
    """
    if isinstance(error, commands.CommandNotFound):
        return
    try:
        await ctx.author.send(f"Error procesando comando: {type(error).__name__}")
    except discord.HTTPException:
        # e.g. the author does not accept DMs; nothing more to do
        pass
    print("on_command_error:", repr(error))
    # No exception is being handled at this point, so print_exc() would print
    # nothing useful; format the error object itself. When the error wraps an
    # underlying exception in ``original``, show that one.
    cause = getattr(error, "original", error)
    traceback.print_exception(type(cause), cause, cause.__traceback__)
# ------------------ run bot -----------------------
if __name__ == "__main__":
    # Use the first candidate that is set and is not the untouched placeholder;
    # a plain ``DISCORD_TOKEN or env`` never reaches the env variable while the
    # placeholder string is still in the file.
    token = None
    for candidate in (DISCORD_TOKEN, os.getenv("DISCORD_TOKEN", "")):
        candidate = (candidate or "").strip()
        if candidate and not candidate.startswith("PASTE"):
            token = candidate
            break
    if not token:
        print("ERROR: configura DISCORD_TOKEN en el archivo o en la variable de "
              "entorno DISCORD_TOKEN.")
        raise SystemExit(1)

    # sanitize cookie early so we can warn
    cookie_sources = (COOKIE_DEFAULT, os.getenv("DEFAULT_COOKIE", ""))
    if not any(sanitize_cookie(source) for source in cookie_sources):
        print("WARNING: COOKIE_DEFAULT parece no estar configurada correctamente. "
              "El bot fallará hasta que pongas una cookie válida.")

    print("Iniciando bot... (asegúrate Message Content Intent activado).")
    bot.run(token)