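"""
Data Collection Background Worker - CONFIGURABLE INTERVALS

This worker manages data collection from all sources with:
- Bulk data collection: scheduled at 15-30 minute intervals
- Real-time data: fetched on demand when a client requests it
- Per-category scheduling: each data type has its own interval

COLLECTION INTERVALS (COLLECTION_INTERVALS, in minutes):
- Market data: 15 minutes
- News: 15 minutes
- Sentiment: 15 minutes
- Social: 30 minutes
- On-chain: 30 minutes
- Historical: 30 minutes
- DeFi: 15 minutes
- Technical: 15 minutes

DataCollectionWorker registers collectors only for market, news, sentiment,
on-chain and DeFi data. The social, historical and technical intervals are
configured but have no collector in this module.

REAL-TIME DATA:
- When a client requests data, it is fetched immediately from the source
- Successful results are cached for the TTL configured in CACHE_TTL (seconds)
"""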
|
|
import asyncio
import logging
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import httpx

from utils.logger import setup_logger
|
|
# Module-wide logger, obtained from the project's utils.logger helper.
_LOGGER_NAME = "data_collection_worker"
logger = setup_logger(_LOGGER_NAME)
|
|
| |
|
|
| |
# Scheduled collection interval per data category, in MINUTES
# (collectors pass these values as ``interval_minutes``).
# NOTE(review): "social", "historical" and "technical" have no collector
# registered by DataCollectionWorker here — presumably used elsewhere; verify.
COLLECTION_INTERVALS = dict(
    market=15,
    news=15,
    sentiment=15,
    social=30,
    onchain=30,
    historical=30,
    defi=15,
    technical=15,
)
|
|
| |
# Cache lifetime per data kind, in SECONDS (compared against elapsed
# ``total_seconds()`` by the real-time fetcher's cache check).
CACHE_TTL = dict(
    market=60,
    news=300,
    sentiment=300,
    ohlcv=60,
    fear_greed=3600,
    whale=300,
)
|
|
| |
# Data kinds each upstream source can serve on demand.
# NOTE(review): not referenced by RealTimeDataFetcher in this module —
# presumably consumed by callers for capability lookups; verify.
REALTIME_SOURCES = dict(
    binance=["price", "ohlcv", "trades"],
    coingecko=["price", "market"],
    coincap=["price", "assets"],
    cryptocompare=["price", "ohlcv"],
    fear_greed=["index"],
)
|
|
|
|
| |
|
|
class BaseDataCollector:
    """Base class for scheduled data collectors.

    Subclasses implement :meth:`collect`; :meth:`run` wraps it with interval
    gating, timing, logging and success/error bookkeeping.

    Attributes:
        name: Identifier used in log messages.
        interval_minutes: Minimum minutes between runs (see :meth:`should_run`).
        last_run: Naive UTC datetime of the last completed run, or None.
        is_running: True while :meth:`run` is awaiting :meth:`collect`.
        error_count: Consecutive failed runs; reset to 0 by a successful run.
        success_count: Total successful runs.
        timeout: httpx timeout used by subclasses for their HTTP clients.
    """

    def __init__(self, name: str, interval_minutes: int):
        self.name = name
        self.interval_minutes = interval_minutes
        self.last_run: Optional[datetime] = None
        self.is_running = False
        self.error_count = 0
        self.success_count = 0
        self.timeout = httpx.Timeout(15.0)

    async def collect(self) -> Dict[str, Any]:
        """Fetch data and return a result dict. Must be overridden.

        Collectors in this module return a dict with a boolean ``"success"``
        key and, on failure, an ``"error"`` message; :meth:`run` uses that
        flag for its bookkeeping.
        """
        raise NotImplementedError

    async def should_run(self) -> bool:
        """Return True if not already running and the interval has elapsed."""
        if self.is_running:
            return False
        if self.last_run is None:
            return True
        elapsed = datetime.utcnow() - self.last_run
        return elapsed >= timedelta(minutes=self.interval_minutes)

    async def run(self) -> Optional[Dict[str, Any]]:
        """Run :meth:`collect` if the collector is due.

        Returns:
            None when not due or already running. Otherwise the dict returned
            by ``collect`` or, if ``collect`` raised,
            ``{"success": False, "error": str(exc)}``.

        A result with ``"success": False`` is counted as an error (not a
        success) but still updates ``last_run``, so the next attempt waits a
        full interval. An exception leaves ``last_run`` unchanged, so
        :meth:`should_run` allows a retry on the next scheduling pass.
        """
        if not await self.should_run():
            return None

        self.is_running = True
        # Monotonic clock: immune to wall-clock adjustments during the run.
        start_time = time.monotonic()
        try:
            logger.info(f"[{self.name}] Starting collection...")
            result = await self.collect()
        except Exception as e:
            self.error_count += 1
            logger.error(f"[{self.name}] Collection error: {e}")
            return {"success": False, "error": str(e)}
        finally:
            self.is_running = False

        elapsed = time.monotonic() - start_time
        self.last_run = datetime.utcnow()

        # Collectors report soft failures (all sources down, non-200 replies)
        # via the "success" flag instead of raising; count those as errors.
        if isinstance(result, dict) and result.get("success") is False:
            self.error_count += 1
            logger.warning(
                f"[{self.name}] Collection reported failure after {elapsed:.2f}s: "
                f"{result.get('error')}"
            )
        else:
            self.success_count += 1
            self.error_count = 0
            logger.info(f"[{self.name}] Collection completed in {elapsed:.2f}s")
        return result
|
|
|
|
class MarketDataCollector(BaseDataCollector):
    """Collect market data (price, market cap, volume) for a fixed coin list.

    CoinGecko is queried first. CoinCap is used as a fallback when CoinGecko
    raises, answers with a non-200 status, or returns no rows. If both fail
    the result has ``success: False`` and an ``error`` summary.
    """

    COINGECKO_URL = "https://api.coingecko.com/api/v3"
    COINCAP_URL = "https://api.coincap.io/v2"
    # Maximum number of rows requested from each source.
    PAGE_SIZE = 50

    def __init__(self):
        super().__init__("market_data", COLLECTION_INTERVALS["market"])
        # CoinGecko coin ids (not ticker symbols).
        # NOTE(review): "polygon" and "avalanche" may not be valid CoinGecko
        # ids — confirm against /coins/list; unrecognised ids are presumably
        # just missing from the response.
        self.top_coins = [
            "bitcoin", "ethereum", "binancecoin", "ripple", "cardano",
            "solana", "polkadot", "dogecoin", "polygon", "avalanche",
        ]

    async def collect(self) -> Dict[str, Any]:
        """Collect market rows from the first source that returns data.

        Returns:
            ``{"success", "data", "source"}``. ``data`` is a list of row dicts
            and ``source`` names the provider used. On total failure,
            ``success`` is False, ``source`` stays ``"multi"`` and ``error``
            lists each source's failure.
        """
        results: Dict[str, Any] = {"success": True, "data": [], "source": "multi"}
        errors: List[str] = []
        fetchers = (
            ("coingecko", self._fetch_coingecko),
            ("coincap", self._fetch_coincap),
        )

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            for source, fetch in fetchers:
                try:
                    rows = await fetch(client)
                except Exception as e:
                    logger.warning(f"Market data from {source} failed: {e}")
                    errors.append(f"{source}: {e}")
                    continue
                if rows:
                    # Rows are built per source, so a failing source can never
                    # leave partial rows mixed into the fallback's data.
                    results["data"] = rows
                    results["source"] = source
                    return results
                errors.append(f"{source}: no data")

        results["success"] = False
        results["error"] = "; ".join(errors)
        logger.error(f"All market data sources failed: {results['error']}")
        return results

    async def _fetch_coingecko(self, client: "httpx.AsyncClient") -> List[Dict[str, Any]]:
        """Return normalised rows from CoinGecko /coins/markets; raise on non-200."""
        response = await client.get(
            f"{self.COINGECKO_URL}/coins/markets",
            params={
                "vs_currency": "usd",
                "ids": ",".join(self.top_coins),
                "order": "market_cap_desc",
                "per_page": self.PAGE_SIZE,
                "sparkline": False,
            },
        )
        if response.status_code != 200:
            raise RuntimeError(f"HTTP {response.status_code}")

        timestamp = datetime.utcnow().isoformat()
        return [
            {
                "symbol": (coin.get("symbol") or "").upper(),
                "name": coin.get("name"),
                "price": coin.get("current_price"),
                "market_cap": coin.get("market_cap"),
                "volume_24h": coin.get("total_volume"),
                "change_24h": coin.get("price_change_percentage_24h"),
                "high_24h": coin.get("high_24h"),
                "low_24h": coin.get("low_24h"),
                "source": "coingecko",
                "timestamp": timestamp,
            }
            for coin in response.json()
        ]

    async def _fetch_coincap(self, client: "httpx.AsyncClient") -> List[Dict[str, Any]]:
        """Return normalised rows from CoinCap /assets; raise on non-200."""
        response = await client.get(
            f"{self.COINCAP_URL}/assets", params={"limit": self.PAGE_SIZE}
        )
        if response.status_code != 200:
            raise RuntimeError(f"HTTP {response.status_code}")

        timestamp = datetime.utcnow().isoformat()
        return [
            {
                "symbol": (asset.get("symbol") or "").upper(),
                "name": asset.get("name"),
                "price": self._to_float(asset.get("priceUsd")),
                "market_cap": self._to_float(asset.get("marketCapUsd")),
                "volume_24h": self._to_float(asset.get("volumeUsd24Hr")),
                "change_24h": self._to_float(asset.get("changePercent24Hr")),
                "source": "coincap",
                "timestamp": timestamp,
            }
            for asset in (response.json().get("data") or [])
        ]

    @staticmethod
    def _to_float(value: Any) -> Optional[float]:
        """Parse a string-encoded number; None, "" or unparsable -> None.

        Keeps one null field from aborting the whole CoinCap fallback.
        """
        if value is None or value == "":
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None
|
|
|
|
class NewsDataCollector(BaseDataCollector):
    """Collect crypto news headlines from RSS feeds and the CryptoCompare API."""

    RSS_FEEDS = {
        "decrypt": "https://decrypt.co/feed",
        "cryptoslate": "https://cryptoslate.com/feed/",
        "bitcoinmagazine": "https://bitcoinmagazine.com/feed",
        "coindesk": "https://www.coindesk.com/arc/outboundfeeds/rss/",
    }

    CRYPTOCOMPARE_URL = "https://min-api.cryptocompare.com/data/v2/news/"

    # Per-source article caps and summary truncation length.
    RSS_ITEMS_PER_FEED = 10
    CRYPTOCOMPARE_ITEMS = 20
    SUMMARY_MAX_CHARS = 300
    # feedparser.parse(url) does its own blocking fetch and is not given a
    # timeout, so each feed is bounded by this many seconds instead.
    RSS_TIMEOUT_SECONDS = 15.0

    def __init__(self):
        super().__init__("news_data", COLLECTION_INTERVALS["news"])

    async def collect(self) -> Dict[str, Any]:
        """Collect news from all RSS feeds (concurrently) and CryptoCompare.

        Returns:
            ``{"success", "data", "sources"}``. ``data`` is a list of article
            dicts and ``sources`` names the sources that answered. ``success``
            is False (with ``error``) when no article was collected at all.
        """
        import feedparser  # optional dependency; an ImportError is handled by run()

        results: Dict[str, Any] = {"success": True, "data": [], "sources": []}

        names = list(self.RSS_FEEDS)
        outcomes = await asyncio.gather(
            *(self._fetch_rss(feedparser.parse, name, self.RSS_FEEDS[name]) for name in names),
            return_exceptions=True,
        )
        for name, outcome in zip(names, outcomes):
            if isinstance(outcome, BaseException):
                logger.warning(f"RSS feed {name} failed: {type(outcome).__name__}: {outcome}")
                continue
            results["data"].extend(outcome)
            results["sources"].append(name)

        try:
            articles = await self._fetch_cryptocompare()
        except Exception as e:
            logger.warning(f"CryptoCompare news failed: {e}")
        else:
            results["data"].extend(articles)
            results["sources"].append("cryptocompare")

        if not results["data"]:
            results["success"] = False
            results["error"] = "No news articles collected from any source"
        return results

    async def _fetch_rss(self, parse: Any, source_name: str, feed_url: str) -> List[Dict[str, Any]]:
        """Parse one feed in the default executor and return normalised entries.

        Raises ``asyncio.TimeoutError`` after RSS_TIMEOUT_SECONDS. The executor
        thread running feedparser cannot be interrupted; it finishes in the
        background while the collector moves on.
        """
        loop = asyncio.get_running_loop()
        feed = await asyncio.wait_for(
            loop.run_in_executor(None, parse, feed_url),
            timeout=self.RSS_TIMEOUT_SECONDS,
        )
        fetched_at = datetime.utcnow().isoformat()
        return [
            {
                "title": entry.get("title", ""),
                "link": entry.get("link", ""),
                "published": entry.get("published", ""),
                "summary": (entry.get("summary") or "")[: self.SUMMARY_MAX_CHARS],
                "source": source_name,
                "fetched_at": fetched_at,
            }
            for entry in feed.entries[: self.RSS_ITEMS_PER_FEED]
        ]

    async def _fetch_cryptocompare(self) -> List[Dict[str, Any]]:
        """Return normalised CryptoCompare articles; raise on non-200 or bad payload."""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(self.CRYPTOCOMPARE_URL, params={"lang": "EN"})
            if response.status_code != 200:
                raise RuntimeError(f"HTTP {response.status_code}")
            payload = response.json()

        items = payload.get("Data")
        if not isinstance(items, list):
            raise RuntimeError(payload.get("Message") or "unexpected response shape")

        fetched_at = datetime.utcnow().isoformat()
        articles = []
        for article in items[: self.CRYPTOCOMPARE_ITEMS]:
            # presumably a Unix timestamp; rendered in UTC (not server-local
            # time) and tolerant of a missing/null value.
            published_on = article.get("published_on") or 0
            articles.append({
                "title": article.get("title", ""),
                "link": article.get("url", ""),
                "published": datetime.fromtimestamp(published_on, tz=timezone.utc).isoformat(),
                "summary": (article.get("body") or "")[: self.SUMMARY_MAX_CHARS],
                "source": "cryptocompare",
                "fetched_at": fetched_at,
            })
        return articles
|
|
|
|
class SentimentDataCollector(BaseDataCollector):
    """Collect the Crypto Fear & Greed Index: latest value plus recent history."""

    FEAR_GREED_URL = "https://api.alternative.me/fng/"
    # Number of index points requested and kept in "history".
    HISTORY_LIMIT = 30

    def __init__(self):
        super().__init__("sentiment_data", COLLECTION_INTERVALS["sentiment"])

    async def collect(self) -> Dict[str, Any]:
        """Fetch the Fear & Greed Index.

        Returns:
            ``{"success", "data", "source"}``. On success ``data`` holds
            ``value``, ``classification`` and ``timestamp`` of the first
            (latest) point plus ``history``. A non-200 reply or an empty
            payload yields ``success: False`` with ``error``.
        """
        results: Dict[str, Any] = {"success": True, "data": {}, "source": "fear_greed"}

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    self.FEAR_GREED_URL, params={"limit": self.HISTORY_LIMIT}
                )
                if response.status_code != 200:
                    raise RuntimeError(f"HTTP {response.status_code}")
                fng_data = response.json().get("data") or []

            if not fng_data:
                raise RuntimeError("no index data in response")

            latest = fng_data[0]
            # Built fully before assignment so a parse error leaves data == {}.
            results["data"] = {
                "value": int(latest.get("value", 50)),
                "classification": latest.get("value_classification", "Neutral"),
                "timestamp": latest.get("timestamp"),
                "history": [
                    {
                        "value": int(point.get("value", 50)),
                        "classification": point.get("value_classification"),
                        "timestamp": point.get("timestamp"),
                    }
                    for point in fng_data[: self.HISTORY_LIMIT]
                ],
            }
        except Exception as e:
            logger.error(f"Fear & Greed fetch failed: {e}")
            results["success"] = False
            results["error"] = str(e)

        return results
|
|
|
|
class OnChainDataCollector(BaseDataCollector):
    """Collect network statistics for several chains from Blockchair."""

    BLOCKCHAIR_URL = "https://api.blockchair.com"
    # Chains queried in order; each becomes a key of result["data"].
    CHAINS = ("bitcoin", "ethereum")

    def __init__(self):
        super().__init__("onchain_data", COLLECTION_INTERVALS["onchain"])

    async def collect(self) -> Dict[str, Any]:
        """Fetch ``/<chain>/stats`` for every chain in CHAINS.

        Each chain is fetched independently, so one failure does not prevent
        the others. ``success`` is False (with ``error``) only when no chain
        returned data.
        """
        results: Dict[str, Any] = {"success": True, "data": {}, "source": "blockchair"}
        errors: List[str] = []

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            for chain in self.CHAINS:
                try:
                    response = await client.get(f"{self.BLOCKCHAIR_URL}/{chain}/stats")
                    if response.status_code != 200:
                        raise RuntimeError(f"HTTP {response.status_code}")
                    results["data"][chain] = response.json().get("data", {})
                except Exception as e:
                    logger.warning(f"On-chain stats for {chain} failed: {e}")
                    errors.append(f"{chain}: {e}")

        if not results["data"]:
            results["success"] = False
            results["error"] = "; ".join(errors)
            logger.error(f"On-chain data fetch failed: {results['error']}")

        return results
|
|
|
|
class DeFiDataCollector(BaseDataCollector):
    """Collect DeFi TVL and top-protocol data from DefiLlama."""

    DEFILLAMA_URL = "https://api.llama.fi"
    # Number of protocols kept in "top_protocols".
    TOP_PROTOCOLS = 20

    def __init__(self):
        super().__init__("defi_data", COLLECTION_INTERVALS["defi"])

    async def collect(self) -> Dict[str, Any]:
        """Fetch total TVL and the highest-TVL protocols.

        The two endpoints are fetched independently. ``success`` is False
        (with ``error``) only when neither returned data.
        """
        results: Dict[str, Any] = {"success": True, "data": {}, "source": "defillama"}
        errors: List[str] = []

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # NOTE(review): confirm that GET /tvl without a protocol slug is a
            # valid DefiLlama endpoint; its JSON is stored unmodified.
            try:
                results["data"]["total_tvl"] = await self._get_json(client, "/tvl")
            except Exception as e:
                logger.warning(f"DeFi total TVL fetch failed: {e}")
                errors.append(f"tvl: {e}")

            try:
                protocols = await self._get_json(client, "/protocols")
                results["data"]["top_protocols"] = self._top_protocols(protocols)
            except Exception as e:
                logger.warning(f"DeFi protocols fetch failed: {e}")
                errors.append(f"protocols: {e}")

        if not results["data"]:
            results["success"] = False
            results["error"] = "; ".join(errors)
            logger.error(f"DeFi data fetch failed: {results['error']}")

        return results

    async def _get_json(self, client: "httpx.AsyncClient", path: str) -> Any:
        """GET ``DEFILLAMA_URL + path`` and return its JSON; raise on non-200."""
        response = await client.get(f"{self.DEFILLAMA_URL}{path}")
        if response.status_code != 200:
            raise RuntimeError(f"HTTP {response.status_code}")
        return response.json()

    def _top_protocols(self, protocols: Any) -> List[Any]:
        """Return up to TOP_PROTOCOLS entries ordered by numeric ``tvl``, descending.

        Non-list payloads yield []. Entries without a numeric ``tvl`` sort
        last, and ties keep their API order (sorting is stable).
        """
        if not isinstance(protocols, list):
            return []

        def tvl(protocol: Any) -> float:
            value = protocol.get("tvl") if isinstance(protocol, dict) else None
            return value if isinstance(value, (int, float)) else 0

        return sorted(protocols, key=tvl, reverse=True)[: self.TOP_PROTOCOLS]
|
|
|
|
| |
|
|
class RealTimeDataFetcher:
    """
    Fetch data in real time when a client requests it.

    Used for data that should not wait for the scheduled collectors.
    Successful results are kept in an in-memory cache (``self.cache``), keyed
    by source, data type and every request parameter, and reused until their
    TTL from CACHE_TTL expires. Failures are never cached.
    """

    BINANCE_URL = "https://api.binance.com/api/v3"
    COINGECKO_URL = "https://api.coingecko.com/api/v3"

    def __init__(self):
        # cache_key -> {"data": result dict, "cached_at": naive UTC datetime}
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.timeout = httpx.Timeout(10.0)

    def _get_cache_key(self, source: str, data_type: str, params: Dict) -> str:
        """Build a deterministic cache key; params are sorted so order is irrelevant."""
        params_str = "_".join(f"{k}={v}" for k, v in sorted(params.items()))
        return f"{source}_{data_type}_{params_str}"

    def _is_cache_valid(self, cache_key: str, ttl_seconds: int) -> bool:
        """Return True if *cache_key* is cached and younger than *ttl_seconds*."""
        entry = self.cache.get(cache_key)
        if not entry or not entry.get("cached_at"):
            return False
        return (datetime.utcnow() - entry["cached_at"]).total_seconds() < ttl_seconds

    def _store(self, cache_key: str, result: Dict[str, Any]) -> Dict[str, Any]:
        """Cache *result* under *cache_key*, timestamped now, and return it."""
        self.cache[cache_key] = {"data": result, "cached_at": datetime.utcnow()}
        return result

    async def fetch_price(self, symbol: str, source: str = "binance") -> Dict[str, Any]:
        """Fetch the current USD price of *symbol*.

        Args:
            symbol: For "binance", a ticker such as "BTC" (quoted against
                USDT). For "coingecko", a CoinGecko coin id such as "bitcoin".
            source: "binance" or "coingecko".

        Returns:
            ``{"success": True, "symbol", "price", "source", "timestamp"}``,
            or ``{"success": False, "error": "Failed to fetch price"}`` on any
            failure, including an unknown source or a coin the source does
            not list.
        """
        cache_key = self._get_cache_key(source, "price", {"symbol": symbol})
        ttl = CACHE_TTL.get("market", 60)
        if self._is_cache_valid(cache_key, ttl):
            return self.cache[cache_key]["data"]

        price: Optional[float] = None
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                if source == "binance":
                    price = await self._binance_price(client, symbol)
                elif source == "coingecko":
                    price = await self._coingecko_price(client, symbol)
                else:
                    logger.warning(f"Unsupported real-time price source: {source}")
        except Exception as e:
            logger.error(f"Real-time price fetch error: {e}")

        if price is None:
            return {"success": False, "error": "Failed to fetch price"}

        return self._store(cache_key, {
            "success": True,
            "symbol": symbol,
            "price": price,
            "source": source,
            "timestamp": datetime.utcnow().isoformat(),
        })

    async def _binance_price(self, client: "httpx.AsyncClient", symbol: str) -> Optional[float]:
        """Return the <SYMBOL>USDT price from Binance, or None if unavailable."""
        # params= lets httpx URL-encode the client-supplied symbol.
        response = await client.get(
            f"{self.BINANCE_URL}/ticker/price",
            params={"symbol": f"{symbol.upper()}USDT"},
        )
        if response.status_code != 200:
            return None
        price = response.json().get("price")
        return float(price) if price is not None else None

    async def _coingecko_price(self, client: "httpx.AsyncClient", symbol: str) -> Optional[float]:
        """Return the USD price of coin id *symbol* from CoinGecko, or None if unlisted."""
        coin_id = symbol.lower()
        response = await client.get(
            f"{self.COINGECKO_URL}/simple/price",
            params={"ids": coin_id, "vs_currencies": "usd"},
        )
        if response.status_code != 200:
            return None
        return response.json().get(coin_id, {}).get("usd")

    async def fetch_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> Dict[str, Any]:
        """Fetch up to *limit* Binance <SYMBOL>USDT candles at *interval*.

        Returns:
            ``{"success": True, "symbol", "interval", "data", "source",
            "timestamp"}``. ``data`` is a list of ``{"t", "o", "h", "l", "c",
            "v"}`` dicts. On failure,
            ``{"success": False, "error": "Failed to fetch OHLCV"}``.
        """
        # limit is part of the key; otherwise a cached small request would be
        # served to a later, larger request.
        cache_key = self._get_cache_key(
            "binance", "ohlcv", {"symbol": symbol, "interval": interval, "limit": limit}
        )
        ttl = CACHE_TTL.get("ohlcv", 60)
        if self._is_cache_valid(cache_key, ttl):
            return self.cache[cache_key]["data"]

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.BINANCE_URL}/klines",
                    params={
                        "symbol": f"{symbol.upper()}USDT",
                        "interval": interval,
                        "limit": limit,
                    },
                )
                if response.status_code == 200:
                    # Kline rows: [open_time, open, high, low, close, volume, ...]
                    ohlcv = [
                        {
                            "t": k[0],
                            "o": float(k[1]),
                            "h": float(k[2]),
                            "l": float(k[3]),
                            "c": float(k[4]),
                            "v": float(k[5]),
                        }
                        for k in response.json()
                    ]
                    return self._store(cache_key, {
                        "success": True,
                        "symbol": symbol,
                        "interval": interval,
                        "data": ohlcv,
                        "source": "binance",
                        "timestamp": datetime.utcnow().isoformat(),
                    })
        except Exception as e:
            logger.error(f"OHLCV fetch error: {e}")

        return {"success": False, "error": "Failed to fetch OHLCV"}
|
|
|
|
| |
|
|
class DataCollectionWorker:
    """Own the scheduled collectors and drive them from a polling loop.

    Attributes:
        collectors: name -> collector instance.
        realtime_fetcher: On-demand fetcher owned by this worker.
        is_running: Loop flag; cleared by :meth:`stop`.
        last_results: name -> ``{"data": result, "collected_at": iso str}``
            for the most recent run of each collector that returned a result.
    """

    # Seconds between scheduling passes in worker_loop.
    POLL_INTERVAL_SECONDS = 60

    def __init__(self):
        self.collectors = {
            "market": MarketDataCollector(),
            "news": NewsDataCollector(),
            "sentiment": SentimentDataCollector(),
            "onchain": OnChainDataCollector(),
            "defi": DeFiDataCollector(),
        }
        self.realtime_fetcher = RealTimeDataFetcher()
        self.is_running = False
        self.last_results: Dict[str, Dict[str, Any]] = {}

    async def _run_one(self, name: str, collector: Any) -> Optional[Dict[str, Any]]:
        """Run one collector if due and record a truthy result in last_results."""
        result = await collector.run()
        if result:
            self.last_results[name] = {
                "data": result,
                "collected_at": datetime.utcnow().isoformat(),
            }
        return result

    async def run_all_collectors(self) -> Dict[str, Any]:
        """Run every due collector concurrently.

        Returns:
            name -> result for each collector that actually ran, in
            ``self.collectors`` order. Collectors that were not due are omitted.
        """
        names = list(self.collectors)
        # Collectors are independent network-bound jobs; running them together
        # keeps one slow source from delaying the rest.
        outcomes = await asyncio.gather(
            *(self._run_one(name, self.collectors[name]) for name in names)
        )
        return {name: result for name, result in zip(names, outcomes) if result}

    async def worker_loop(self):
        """Poll the collectors every POLL_INTERVAL_SECONDS until :meth:`stop`.

        ``is_running`` is checked once per pass, so a stop takes effect after
        the current sleep. Unexpected errors are logged and the loop continues.
        """
        self.is_running = True
        logger.info("Starting data collection worker...")
        logger.info(f"Collection intervals: {COLLECTION_INTERVALS}")

        while self.is_running:
            try:
                await self.run_all_collectors()
            except Exception as e:
                logger.error(f"Worker loop error: {e}")
            await asyncio.sleep(self.POLL_INTERVAL_SECONDS)

    def stop(self):
        """Ask the worker loop to exit after its current sleep."""
        self.is_running = False
        logger.info("Stopping data collection worker...")

    @staticmethod
    def _seconds_until_next_run(collector: Any) -> float:
        """Seconds until *collector* is due again; 0 if due or never run."""
        if not collector.last_run:
            return 0
        elapsed = (datetime.utcnow() - collector.last_run).total_seconds()
        return max(0, collector.interval_minutes * 60 - elapsed)

    def get_collector_status(self) -> Dict[str, Any]:
        """Return a JSON-friendly status snapshot for every collector."""
        return {
            name: {
                "last_run": collector.last_run.isoformat() if collector.last_run else None,
                "interval_minutes": collector.interval_minutes,
                "is_running": collector.is_running,
                "success_count": collector.success_count,
                "error_count": collector.error_count,
                "next_run_in": self._seconds_until_next_run(collector),
            }
            for name, collector in self.collectors.items()
        }
|
|
|
|
| |
|
|
# Lazily created process-wide singletons, populated by the get_* accessors.
_worker: Optional["DataCollectionWorker"] = None
_realtime_fetcher: Optional["RealTimeDataFetcher"] = None
|
|
|
|
def get_data_collection_worker() -> DataCollectionWorker:
    """Return the shared DataCollectionWorker, constructing it on first call."""
    global _worker
    if _worker is not None:
        return _worker
    _worker = DataCollectionWorker()
    return _worker
|
|
|
|
def get_realtime_fetcher() -> RealTimeDataFetcher:
    """Return the shared RealTimeDataFetcher, constructing it on first call.

    Note: this instance (and its cache) is distinct from the worker's own
    ``realtime_fetcher`` attribute.
    """
    global _realtime_fetcher
    if _realtime_fetcher is not None:
        return _realtime_fetcher
    _realtime_fetcher = RealTimeDataFetcher()
    return _realtime_fetcher
|
|
|
|
# Strong reference to the background worker-loop task. The event loop keeps
# only weak references to tasks, so an unreferenced task can be
# garbage-collected mid-execution.
_worker_task: Optional["asyncio.Task"] = None


async def start_data_collection_worker() -> "asyncio.Task":
    """Run one collection pass, then start the worker loop in the background.

    Idempotent: if the loop task is already running, it is returned without
    starting a second loop.

    Returns:
        The asyncio task running ``worker.worker_loop()``. Existing callers
        that ignore the return value are unaffected.
    """
    global _worker_task
    if _worker_task is not None and not _worker_task.done():
        logger.info("Data collection worker already running")
        return _worker_task

    worker = get_data_collection_worker()

    logger.info("Running initial data collection...")
    await worker.run_all_collectors()

    _worker_task = asyncio.create_task(worker.worker_loop())
    logger.info("Data collection worker started")
    return _worker_task
|
|
|
|
| |
if __name__ == "__main__":
    async def test():
        """Smoke-test every collector and the real-time fetcher against live APIs."""
        print("=" * 70)
        print("🧪 Testing Data Collection Worker")
        print("=" * 70)

        worker = DataCollectionWorker()

        print("\n📋 Collection Intervals:")
        for data_type, interval in COLLECTION_INTERVALS.items():
            print(f"  • {data_type}: {interval} minutes")

        print("\n🔄 Running all collectors...")
        results = await worker.run_all_collectors()

        for name, result in results.items():
            if result.get("success"):
                data = result.get("data", {})
                count = len(data) if isinstance(data, list) else "object"
                print(f"  ✅ {name}: {count} items")
            else:
                print(f"  ❌ {name}: {result.get('error')}")

        print("\n⚡ Testing Real-time Fetcher...")
        fetcher = RealTimeDataFetcher()

        price = await fetcher.fetch_price("BTC")
        if price.get("success"):
            print(f"  ✅ BTC Price: ${price.get('price')}")
        else:
            print(f"  ❌ Price fetch failed: {price.get('error')}")

        ohlcv = await fetcher.fetch_ohlcv("BTC", "1h", 10)
        if ohlcv.get("success"):
            print(f"  ✅ OHLCV: {len(ohlcv.get('data', []))} candles")
        else:
            print(f"  ❌ OHLCV fetch failed: {ohlcv.get('error')}")

        print("\n" + "=" * 70)
        print("✅ Data Collection Worker Test Complete!")
        print("=" * 70)

    asyncio.run(test())
|
|