#!
/usr/bin/env python3
"""
bot_uploader_all_in_one_v3.py
Versión actualizada: fuerza X-CSRF en todos los POST, añade Referer/Origin/X-
Requested-With,
prueba POST JSON para get-upload-fee y múltiples variantes de multipart upload.
"""
import os
import re
import time
import json
import shutil
import tempfile
import asyncio
from pathlib import Path
from typing import Optional, Dict, Any
import requests
try:
from requests_toolbelt.multipart.encoder import MultipartEncoder
except Exception:
MultipartEncoder = None
import discord
from discord.ext import commands
# ------------------ USER CONFIG (EDIT) ------------------
DISCORD_TOKEN = "PASTE_YOUR_DISCORD_TOKEN_HERE" # <--- Pega tu token
COOKIE_DEFAULT = "PASTE_YOUR_.ROBLOSECURITY_HERE" # <--- Pega tu .ROBLOSECURITY
# ------------------ SETTINGS ----------------------------
BOT_PREFIX = "$"
PRICE_ROBUX = 5
REQUEST_TIMEOUT = 40
AVATAR_TYPE_IDS = {"S": 11, "P": 12}
LEGACY_ENDPOINTS = [
"https://www.roblox.com/asset/upload",
"https://www.roblox.com/creations/upload",
"https://data.roblox.com/Data/Upload.ashx",
"https://develop.roblox.com/v1/assets/upload",
]
PUBLISH_ENDPOINTS = [
"https://publish.roblox.com/v1/assets/upload",
"https://apis.roblox.com/assets/v1/assets",
]
def itemconfig_fee_ep(type_id: int) -> str:
return f"https://itemconfiguration.roblox.com/v1/avatar-assets/{type_id}/get-
upload-fee"
def itemconfig_upload_ep(type_id: int) -> str:
return f"https://itemconfiguration.roblox.com/v1/avatar-assets/{type_id}/
upload"
SALE_ENDPOINTS = [
"https://economy.roblox.com/v1/assets/{assetId}/resale",
"https://marketplace.roblox.com/v1/items/{assetId}/sell",
"https://marketplace.roblox.com/v1/assets/{assetId}/publish",
"https://develop.roblox.com/v1/assets/{assetId}/sales",
]
# ------------------ util --------------------------------
def sanitize_cookie(raw: str) -> Optional[str]:
if not raw:
return None
s = raw.strip()
m = re.search(r'_\|WARNING:[^\|]*\|_(.+)', s)
if m:
token = m.group(1).split()[0].split(";")[0]
return token
m2 = re.search(r'ROBLOSECURITY\s*=\s*([^; \n\r]+)', s)
if m2:
return m2.group(1)
if len(s) > 30:
return s
return None
def extract_group_id(text: str) -> Optional[str]:
if not text:
return None
txt = text.strip()
m = re.match(r"^(\d+)$", txt)
if m:
return m.group(1)
m = re.search(r"/groups/(\d+)", txt)
if m:
return m.group(1)
m = re.search(r"/communities/(\d+)", txt)
if m:
return m.group(1)
m = re.search(r"(\d{5,})", txt)
if m:
return m.group(1)
return None
# ------------------ Cookie helper -----------------------
class CookieHelper:
def __init__(self, cookie: str):
self.cookie = cookie
self._token = None
self._token_ts = 0.0
self.user_id = None
def session(self) -> requests.Session:
s = requests.Session()
s.headers.update({
"User-Agent": "uploader-bot/1.0",
"Accept": "application/json, text/plain, */*",
"Referer": "https://www.roblox.com",
"Origin": "https://www.roblox.com",
"x-requested-with": "XMLHttpRequest"
})
s.cookies.set(".ROBLOSECURITY", self.cookie, domain=".roblox.com")
return s
def obtain_token(self) -> Optional[str]:
# refresh token every 100s
if self._token and (time.time() - self._token_ts) < 100:
return self._token
try:
s = self.session()
r = s.post("https://auth.roblox.com/v1/logout", timeout=10)
token = r.headers.get("x-csrf-token") or r.headers.get("X-CSRF-TOKEN")
if token:
self._token = token
self._token_ts = time.time()
return token
except Exception:
pass
# try endpoints
for ep in LEGACY_ENDPOINTS + PUBLISH_ENDPOINTS:
try:
s = self.session()
r = s.post(ep, timeout=8)
token = r.headers.get("x-csrf-token") or r.headers.get("X-CSRF-
TOKEN")
if token:
self._token = token
self._token_ts = time.time()
return token
except Exception:
continue
return None
def get_user_id(self) -> Optional[int]:
if self.user_id:
return self.user_id
try:
s = self.session()
r = s.get("https://users.roblox.com/v1/users/authenticated", timeout=8)
if r.status_code == 200:
j = r.json()
self.user_id = int(j.get("id"))
return self.user_id
except Exception:
pass
return None
# ------------------ Upload flows (blocking) ----------------
def try_itemconfig_flow(file_path: str, group_id: str, type_id: int, cookie: str) -
> Dict[str,Any]:
out = {"flow":"itemconfig", "attempts": []}
cookie_clean = sanitize_cookie(cookie)
if not cookie_clean:
return {"ok": False, "error": "invalid_cookie"}
ch = CookieHelper(cookie_clean)
s = ch.session()
fee_ep = itemconfig_fee_ep(type_id)
# try GET
try:
r = s.get(fee_ep, params={"groupId": group_id}, timeout=12)
out["attempts"].append({"endpoint": fee_ep+"?groupId=..", "status":
r.status_code, "text": r.text[:1200], "headers": {k:v for k,v in
r.headers.items()}})
except Exception as e:
out["attempts"].append({"endpoint": fee_ep+"?groupId=..", "exception":
str(e)})
# try POST JSON (some services expect POST)
try:
headers = {"Content-Type": "application/json"}
token = ch.obtain_token()
if token:
headers["x-csrf-token"] = token
r2 = s.post(fee_ep, json={"groupId": int(group_id)}, headers=headers,
timeout=12)
out["attempts"].append({"endpoint": fee_ep+" (POST JSON)", "status":
r2.status_code, "text": r2.text[:1200], "headers": {k:v for k,v in
r2.headers.items()}})
except Exception as e:
out["attempts"].append({"endpoint": fee_ep+" (POST JSON)", "exception":
str(e)})
# now upload variations
upload_ep = itemconfig_upload_ep(type_id)
try:
token = ch.obtain_token()
headers = {"x-csrf-token": token} if token else {}
headers.update({"Referer":"https://www.roblox.com","Origin":"https://
www.roblox.com","x-requested-with":"XMLHttpRequest"})
request_payload = {"displayName": f"Uploaded {int(time.time())}",
"description": "Uploaded via bot", "creationContext": {"creator": {"groupId":
int(group_id)}}}
uid = ch.get_user_id()
if uid:
request_payload["publisherUserId"] = uid
mime = "application/octet-stream"
try:
import mimetypes
mt, _ = mimetypes.guess_type(file_path)
if mt: mime = mt
except Exception:
pass
# Try multipart with request JSON + file, with groupId param
if MultipartEncoder:
fields = {"request": ("request", json.dumps(request_payload),
"application/json"),
"file": (Path(file_path).name, open(file_path, "rb"), mime)}
m = MultipartEncoder(fields=fields)
headers["Content-Type"] = m.content_type
s.headers.update(headers)
r3 = s.post(upload_ep, data=m, params={"groupId": group_id},
timeout=90)
try: fields["file"][1].close()
except: pass
out["attempts"].append({"endpoint": upload_ep+" (multipart
request+file, params groupId)", "status": r3.status_code, "text": r3.text[:1500],
"headers": {k:v for k,v in r3.headers.items()}})
else:
files = {"file": (Path(file_path).name, open(file_path, "rb"), mime)}
data = {"request": json.dumps(request_payload)}
s.headers.update(headers)
r3 = s.post(upload_ep, data=data, files=files, params={"groupId":
group_id}, timeout=90)
try: files["file"][1].close()
except: pass
out["attempts"].append({"endpoint": upload_ep+" (form request+file,
params groupId)", "status": r3.status_code, "text": r3.text[:1500], "headers": {k:v
for k,v in r3.headers.items()}})
# If 403 XSRF, try again with explicit token header from response
if r3.status_code == 403:
hdr_tok = r3.headers.get("x-csrf-token") or r3.headers.get("X-CSRF-
TOKEN")
if hdr_tok:
headers["x-csrf-token"] = hdr_tok
s.headers.update(headers)
# retry simple POST with file only
files2 = {"file": (Path(file_path).name, open(file_path, "rb"),
mime)}
r4 = s.post(upload_ep, files=files2, params={"groupId": group_id},
timeout=80)
try: files2["file"][1].close()
except: pass
out["attempts"].append({"endpoint": upload_ep+" (retry with
hdr_tok)", "status": r4.status_code, "text": r4.text[:1500], "headers": {k:v for
k,v in r4.headers.items()}})
# examine last response r3/r4
last = out["attempts"][-1]
txt = last.get("text","")
parsed = None
try:
parsed = json.loads(txt) if txt else None
except Exception:
parsed = None
# attempt to find asset id
aid = None
if isinstance(parsed, dict):
for k in ("assetId","id","createdAssetId"):
if parsed.get(k):
aid = str(parsed.get(k)); break
if not aid:
m = re.search(r"(\d{5,})", txt or "")
if m: aid = m.group(1)
if aid:
return {"ok": True, "assetId": aid, "attempts": out["attempts"]}
# otherwise return failure with attempts
return {"ok": False, "attempts": out["attempts"], "status":
last.get("status"), "text": last.get("text")}
except Exception as e:
out["attempts"].append({"endpoint": upload_ep, "exception": str(e)})
return {"ok": False, "attempts": out["attempts"], "exception": str(e)}
def try_publish_flow(file_path: str, group_id: str, kind: str, cookie: str) ->
Dict[str,Any]:
attempts = []
cookie_clean = sanitize_cookie(cookie)
if not cookie_clean:
return {"ok": False, "error": "invalid_cookie"}
for ep in PUBLISH_ENDPOINTS:
try:
s = requests.Session()
s.cookies.set(".ROBLOSECURITY", cookie_clean, domain=".roblox.com")
# get token
r0 = s.post(ep, timeout=8)
token = r0.headers.get("x-csrf-token") or r0.headers.get("X-CSRF-
TOKEN")
headers =
{"Referer":"https://www.roblox.com","Origin":"https://www.roblox.com","x-requested-
with":"XMLHttpRequest"}
if token:
headers["x-csrf-token"] = token
# try multipart as before
mime = "application/octet-stream"
try:
import mimetypes
mt, _ = mimetypes.guess_type(file_path)
if mt: mime = mt
except:
pass
if MultipartEncoder:
fields = {"request": ("request", json.dumps({"name": f"{kind}
{int(time.time())}"}), "application/json"),
"file": (Path(file_path).name, open(file_path, "rb"),
mime)}
m = MultipartEncoder(fields=fields)
headers["Content-Type"] = m.content_type
s.headers.update(headers)
r = s.post(ep, data=m, timeout=60)
try: fields["file"][1].close()
except: pass
else:
files = {"file": (Path(file_path).name, open(file_path, "rb"),
mime)}
s.headers.update(headers)
r = s.post(ep, data={"request": json.dumps({"name": f"{kind}
{int(time.time())}"})}, files=files, timeout=60)
try: files["file"][1].close()
except: pass
attempts.append({"endpoint": ep, "status": r.status_code, "text":
r.text[:1500], "headers": {k:v for k,v in r.headers.items()}})
if r.status_code in (200,201,202):
parsed = None
try: parsed = r.json()
except: parsed = None
aid = None
if isinstance(parsed, dict):
for k in ("assetId","id","createdAssetId"):
if parsed.get(k): aid = str(parsed.get(k)); break
if not aid:
m = re.search(r"(\d{5,})", r.text or "")
if m: aid = m.group(1)
return {"ok": True, "assetId": aid, "attempts": attempts}
except Exception as e:
attempts.append({"endpoint": ep, "exception": str(e)})
continue
return {"ok": False, "attempts": attempts}
def try_legacy_flows(file_path: str, group_id: str, kind: str, cookie: str) ->
Dict[str,Any]:
attempts = []
cookie_clean = sanitize_cookie(cookie)
if not cookie_clean:
return {"ok": False, "error": "invalid_cookie"}
for ep in LEGACY_ENDPOINTS:
try:
s = requests.Session()
s.cookies.set(".ROBLOSECURITY", cookie_clean, domain=".roblox.com")
token = None
try:
r0 = s.post(ep, timeout=8)
token = r0.headers.get("x-csrf-token") or r0.headers.get("X-CSRF-
TOKEN")
except Exception:
token = None
headers =
{"Referer":"https://www.roblox.com","Origin":"https://www.roblox.com","x-requested-
with":"XMLHttpRequest"}
if token: headers["x-csrf-token"] = token
mime = "application/octet-stream"
try:
import mimetypes
mt, _ = mimetypes.guess_type(file_path)
if mt: mime = mt
except:
pass
if MultipartEncoder:
fields = {"request": ("request", json.dumps({"name": f"{kind}
{int(time.time())}"}), "application/json"),
"file": (Path(file_path).name, open(file_path, "rb"),
mime)}
m = MultipartEncoder(fields=fields)
headers["Content-Type"] = m.content_type
s.headers.update(headers)
r = s.post(ep, data=m, timeout=60)
try: fields["file"][1].close()
except: pass
else:
files = {"file": (Path(file_path).name, open(file_path, "rb"),
mime)}
s.headers.update(headers)
r = s.post(ep, data={"request": json.dumps({"name": f"{kind}
{int(time.time())}"})}, files=files, timeout=60)
try: files["file"][1].close()
except: pass
attempts.append({"endpoint": ep, "status": r.status_code, "text":
r.text[:1500], "headers": {k:v for k,v in r.headers.items()}})
if r.status_code in (200,201,202):
parsed = None
try: parsed = r.json()
except: parsed = None
aid = None
if isinstance(parsed, dict):
for k in ("assetId","id","createdAssetId"):
if parsed.get(k): aid = str(parsed.get(k)); break
if not aid:
m = re.search(r"(\d{5,})", r.text or "")
if m: aid = m.group(1)
return {"ok": True, "assetId": aid, "attempts": attempts}
except Exception as e:
attempts.append({"endpoint": ep, "exception": str(e)})
continue
return {"ok": False, "attempts": attempts}
def create_asset_best_effort(file_path: str, group_id: str, kind: str, cookie: str)
-> Dict[str,Any]:
type_id = AVATAR_TYPE_IDS.get(kind.upper(), None)
details = {}
if type_id:
res_ic = try_itemconfig_flow(file_path, group_id, type_id, cookie)
details["itemconfig"] = res_ic
if res_ic.get("ok"):
return {"ok": True, "assetId": res_ic.get("assetId"),
"method":"itemconfig", "details": details}
res_pub = try_publish_flow(file_path, group_id, kind, cookie)
details["publish"] = res_pub
if res_pub.get("ok"):
return {"ok": True, "assetId": res_pub.get("assetId"), "method":"publish",
"details": details}
res_leg = try_legacy_flows(file_path, group_id, kind, cookie)
details["legacy"] = res_leg
if res_leg.get("ok"):
return {"ok": True, "assetId": res_leg.get("assetId"), "method":"legacy",
"details": details}
return {"ok": False, "error":"all_failed", "details": details}
def set_asset_for_sale(asset_id: str, price: int, cookie: str) -> Dict[str,Any]:
cookie_clean = sanitize_cookie(cookie)
if not cookie_clean:
return {"ok": False, "error": "invalid_cookie"}
s = requests.Session()
s.cookies.set(".ROBLOSECURITY", cookie_clean, domain=".roblox.com")
for tmpl in SALE_ENDPOINTS:
ep = tmpl.format(assetId=asset_id)
for method in ("POST","PATCH","PUT"):
try:
payload = {"price": price, "isForSale": True}
headers =
{"Content-Type":"application/json","Referer":"https://www.roblox.com","Origin":"htt
ps://www.roblox.com","x-requested-with":"XMLHttpRequest"}
r = s.request(method, ep, json=payload, headers=headers,
timeout=20)
if r.status_code in (200,201,204):
return {"ok": True, "endpoint": ep, "method": method, "status":
r.status_code}
if r.status_code == 403:
tok = r.headers.get("x-csrf-token") or r.headers.get("X-CSRF-
TOKEN")
if tok:
headers["x-csrf-token"] = tok
r2 = s.request(method, ep, json=payload, headers=headers,
timeout=20)
if r2.status_code in (200,201,204):
return {"ok": True, "endpoint": ep, "method": method,
"status": r2.status_code}
else:
return {"ok": False, "endpoint": ep, "method": method,
"status": r2.status_code, "text": r2.text[:400]}
try:
j = r.json()
if j.get("success") or j.get("isForSale") or j.get("status") in
("ok","success"):
return {"ok": True, "endpoint": ep, "method": method,
"status": r.status_code, "json": j}
except Exception:
pass
except Exception:
continue
return {"ok": False, "error": "sale_failed"}
# ------------------ Discord bot ------------------------------
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix=BOT_PREFIX, intents=intents)
async def send_debug(channel: discord.abc.Messageable, data: Any, delete_after:
Optional[float] = 30.0):
try:
txt = json.dumps(data, ensure_ascii=False, indent=2) if not
isinstance(data, str) else data
msg = await channel.send(f"```json\n{txt[:1900]}\n```")
if delete_after:
await asyncio.sleep(delete_after)
try: await msg.delete()
except: pass
except Exception:
pass
@bot.event
async def on_ready():
print("[ready] Bot connected as", bot.user, "id:", bot.user.id)
async def process_upload(ctx: commands.Context, local_path: str, group_arg: str,
kind: str):
raw_cookie = COOKIE_DEFAULT.strip() or os.getenv("DEFAULT_COOKIE","").strip()
cookie = sanitize_cookie(raw_cookie)
if not cookie:
await ctx.reply("❌ COOKIE_DEFAULT no configurada correctamente.",
mention_author=False)
return
gid = extract_group_id(group_arg)
if not gid:
await ctx.reply("❌ No pude extraer ID de grupo válido del argumento.",
mention_author=False)
return
status_msg = await ctx.reply(f"➡️ Iniciando upload ({'Shirt' if kind=='S' else
'Pants'}) a grupo `{gid}` — intentando flujos (itemconfig → publish → legacy).",
mention_author=False)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, create_asset_best_effort, local_path,
gid, kind, cookie)
await send_debug(ctx.channel, result.get("details", result))
if not result.get("ok"):
await status_msg.edit(content=f"❌ Upload falló. Revisa los mensajes de
debug (se autodestruirán).")
return
asset_id = result.get("assetId")
if not asset_id:
await status_msg.edit(content=f"⚠️ Upload parece exitoso pero no devolvió
assetId. Revisa debug.")
return
await status_msg.edit(content=f"✅ Upload completado. AssetId: `{asset_id}`.
Intentando activar venta por {PRICE_ROBUX} Robux...")
sale = await loop.run_in_executor(None, set_asset_for_sale, asset_id,
PRICE_ROBUX, cookie)
await send_debug(ctx.channel, sale)
if sale.get("ok"):
await status_msg.edit(content=f"✅ Upload y venta completados. AssetId:
`{asset_id}`")
else:
await status_msg.edit(content=f"⚠️ Upload completado pero no se pudo
activar la venta. Revisa debug.")
@bot.command(name="S")
async def cmd_shirt(ctx, group: str = None):
if not group:
await ctx.reply("Uso: $S <groupID|groupLink> (adjunta la imagen en el
mismo mensaje)", mention_author=False)
return
if not ctx.message.attachments:
await ctx.reply("❌ Adjunta la imagen en el mismo mensaje del comando.",
mention_author=False)
return
att = ctx.message.attachments[0]
td = tempfile.mkdtemp()
local = os.path.join(td, att.filename)
try:
await att.save(local)
except Exception as e:
await ctx.reply(f"❌ Error guardando adjunto: {e}", mention_author=False)
shutil.rmtree(td, ignore_errors=True)
return
try:
await process_upload(ctx, local, group, "S")
finally:
shutil.rmtree(td, ignore_errors=True)
@bot.command(name="P")
async def cmd_pants(ctx, group: str = None):
if not group:
await ctx.reply("Uso: $P <groupID|groupLink> (adjunta la imagen en el
mismo mensaje)", mention_author=False)
return
if not ctx.message.attachments:
await ctx.reply("❌ Adjunta la imagen en el mismo mensaje del comando.",
mention_author=False)
return
att = ctx.message.attachments[0]
td = tempfile.mkdtemp()
local = os.path.join(td, att.filename)
try:
await att.save(local)
except Exception as e:
await ctx.reply(f"❌ Error guardando adjunto: {e}", mention_author=False)
shutil.rmtree(td, ignore_errors=True)
return
try:
await process_upload(ctx, local, group, "P")
finally:
shutil.rmtree(td, ignore_errors=True)
@bot.event
async def on_command_error(ctx, error):
try:
await ctx.author.send(f"Error procesando comando: {type(error).__name__}")
except Exception:
pass
print("on_command_error:", repr(error))
import traceback as tb
tb.print_exc()
if __name__ == "__main__":
token = DISCORD_TOKEN or os.getenv("DISCORD_TOKEN")
if not token or token.startswith("PASTE"):
print("ERROR: configura DISCORD_TOKEN en el archivo o en la variable de
entorno DISCORD_TOKEN.")
raise SystemExit(1)
if not sanitize_cookie(COOKIE_DEFAULT or os.getenv("DEFAULT_COOKIE","")):
print("WARNING: COOKIE_DEFAULT parece no estar configurada correctamente.")
print("Iniciando bot (v3)... (asegúrate Message Content Intent activado).")
bot.run(token)