Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ def _reply_comment(self, authorperm: str, body: str) -> str:
if self.cache:
self.cache.set(cache_key, "processed")
logger.info(f"Reply comment {authorperm} success body: {body} result {result}")
read_steem_success_count.inc()
post_steem_success_count.inc()
return f"Reply comment {authorperm} success body: {body}"
except Exception as e:
logger.error(f"Reply comment {authorperm} failed e : {str(e)}")
read_steem_failure_count.inc()
post_steem_failure_count.inc()
logger.error(traceback.format_exc())
return f"Reply comment {authorperm} failed e : {str(e)}"

Expand Down
3 changes: 2 additions & 1 deletion packages/sunagent-ext/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "sunagent-ext"
version = "0.0.7b1"
version = "0.0.7b2"
license = {file = "LICENSE-CODE"}
description = "AutoGen extensions library"
readme = "README.md"
Expand All @@ -20,6 +20,7 @@ dependencies = [
"onepassword-sdk>=0.3.0",
"pytz",
"redis",
"prometheus_client",
"requests",
]

Expand Down
79 changes: 79 additions & 0 deletions packages/sunagent-ext/src/sunagent_ext/tweet/tweet_hub_client.py
Comment thread
boboliu-1010 marked this conversation as resolved.
Outdated
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# kol_sdk.py
import json
import logging
from datetime import datetime
from typing import Any, Dict, List

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger("sunagent-ext")

DEFAULT_TIMEOUT = 10 # 秒


class TweetHubClient:
    """
    Lightweight ``requests`` wrapper that supports:
    1. Creating (binding) a KOL list
    2. Deleting (unbinding) a KOL list
    3. Automatic retries + per-request timeout
    4. CJK-friendly payloads (requests encodes JSON bodies as UTF-8)
    """

    def __init__(self, base_url: str, agent_id: str = "hub", timeout: int = DEFAULT_TIMEOUT):
        """
        :param base_url: base endpoint without a trailing slash,
                         e.g. http://127.0.0.1:8084/api/sun
        :param agent_id: default agent_id injected into every payload
        :param timeout: per-request timeout in seconds
        """
        self.base_url = base_url.rstrip("/")
        self.agent_id = agent_id
        self.timeout = timeout
        self._session = requests.Session()

        # Retry policy: 3 attempts, exponential backoff, on 429/5xx status codes.
        retry = Retry(
            total=3,
            backoff_factor=0.3,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods={"POST", "DELETE"},
        )
        self._session.mount("http://", HTTPAdapter(max_retries=retry))
        self._session.mount("https://", HTTPAdapter(max_retries=retry))

    # ---------- internal helpers ----------
    def _request(self, method: str, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send one JSON request and return the decoded JSON response.

        :param method: HTTP verb, e.g. "POST" or "DELETE"
        :param endpoint: path appended to ``base_url``, e.g. "/kol"
        :param payload: JSON body; ``agent_id`` is injected when absent
        :raises requests.exceptions.RequestException: on network or HTTP errors
        """
        url = f"{self.base_url}{endpoint}"
        # Work on a shallow copy so the caller's dict is never mutated
        # (the original setdefault() wrote into the caller's object).
        payload = dict(payload)
        payload.setdefault("agent_id", self.agent_id)
        try:
            resp = self._session.request(
                method=method,
                url=url,
                json=payload,  # requests encodes the body as UTF-8 automatically
                timeout=self.timeout,
            )
            resp.raise_for_status()
            return resp.json()  # type: ignore[no-any-return]
        except requests.exceptions.RequestException as e:
            logger.error(f"[KolClient] {method} {url} error: {e}")
            raise

    # ---------- public API ----------
    def create_kol(self, kol_list: List[str], agent_id: str | None = None) -> Dict[str, Any]:
        """
        Create/bind KOLs in bulk.

        :param kol_list: list of twitter_ids
        :param agent_id: optional; falls back to the instance default
        """
        payload = {"kol_list": kol_list, "agent_id": agent_id or self.agent_id}
        return self._request("POST", "/kol", payload)

    def delete_kol(self, kol_list: List[str], agent_id: str | None = None) -> Dict[str, Any]:
        """
        Delete/unbind KOLs in bulk.

        :param kol_list: list of twitter_ids
        :param agent_id: optional; falls back to the instance default
        """
        payload = {"kol_list": kol_list, "agent_id": agent_id or self.agent_id}
        return self._request("DELETE", "/kol", payload)
101 changes: 97 additions & 4 deletions packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

import asyncio
import logging
import traceback
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, List, Optional, cast

import tweepy
from prometheus_client import Counter, Gauge
from tweepy import Media, NotFound, TwitterServerError, User # 保持原类型注解
from tweepy import Media, NotFound, Tweet, TwitterServerError, User # 保持原类型注解
from tweepy import Response as TwitterResponse

from sunagent_ext.tweet.twitter_client_pool import TwitterClientPool
Expand Down Expand Up @@ -223,7 +225,7 @@ async def _fetch_timeline(
read_tweet_success_count.labels(client_key=client_key).inc(len(resp.data or []))

# 交给中间层处理
tweet_list, next_token = await self.on_twitter_response(agent_id, resp, filter_func)
tweet_list, next_token = await self.on_twitter_response(agent_id, me_id, resp, filter_func)
all_raw.extend(tweet_list)
if not next_token:
break
Expand Down Expand Up @@ -254,6 +256,7 @@ async def _fetch_timeline(
async def on_twitter_response( # type: ignore[no-any-unimported]
self,
agent_id: str,
me_id: str,
response: TwitterResponse,
filter_func: Callable[[Dict[str, Any]], bool],
) -> tuple[list[Dict[str, Any]], Optional[str]]:
Expand All @@ -267,20 +270,23 @@ async def on_twitter_response( # type: ignore[no-any-unimported]
out: list[Dict[str, Any]] = []

for tweet in all_tweets:
if not await self._should_keep(agent_id, tweet, filter_func):
if not await self._should_keep(agent_id, me_id, tweet, filter_func):
continue
norm = await self._normalize_tweet(tweet)
out.append(norm)
return out, next_token

async def _should_keep(
self, agent_id: str, tweet: Dict[str, Any], filter_func: Callable[[Dict[str, Any]], bool]
self, agent_id: str, me_id: str, tweet: Dict[str, Any], filter_func: Callable[[Dict[str, Any]], bool]
) -> bool:
is_processed = await self._check_tweet_process(tweet["id"], agent_id)
if is_processed:
logger.info("already processed %s", tweet["id"])
return False
author_id = str(tweet["author_id"])
if me_id == author_id:
logger.info("skip my tweet %s", tweet["id"])
return False
if author_id in self.block_uids:
logger.info("blocked user %s", author_id)
return False
Expand Down Expand Up @@ -366,6 +372,93 @@ async def _recursive_fetch(self, tweet: Dict[str, Any], chain: list[Dict[str, An
await self._recursive_fetch(parent, chain, depth + 1)
chain.append(tweet)

async def fetch_new_tweets_manual_(  # type: ignore[no-any-unimported]
    self,
    ids: List[str],
    last_seen_id: str | None = None,
) -> tuple[List[Tweet], str | None]:
    """
    Fetch recent tweets authored by the given user ids.

    1. Split the id list into several ``from:`` OR-queries, each within the
       query length budget.
    2. Hand each query to ``fetch_new_tweets_manual_tweets`` for pagination.
    3. Return all collected tweets together with the **largest tweet id seen**,
       usable as the next ``since_id`` checkpoint.

    :param ids: twitter user ids to include in the search query
    :param last_seen_id: id checkpoint forwarded to the inner fetcher
    :return: (tweets, max_tweet_id_as_str_or_None)
    """
    BASE_EXTRA = " -is:retweet"
    # Budget below the 512-char query limit, with headroom for the suffix.
    max_len = 512 - len(BASE_EXTRA) - 10
    queries: List[str] = []

    buf: list[str] = []
    for uid in ids:
        clause = f"from:{uid}"
        # Flush the buffer when adding this clause would exceed the budget.
        if len(" OR ".join(buf + [clause])) > max_len:
            queries.append(" OR ".join(buf) + BASE_EXTRA)
            buf = [clause]
        else:
            buf.append(clause)
    if buf:
        queries.append(" OR ".join(buf) + BASE_EXTRA)

    # Run every query and merge the pages.
    all_tweets: List[tweepy.Tweet] = []  # type: ignore[no-any-unimported]
    for q in queries:
        tweets = await self.fetch_new_tweets_manual_tweets(query=q, last_seen_id=last_seen_id)
        all_tweets.extend(tweets)

    # Use the largest id across all results as the new checkpoint.
    # tweepy exposes ids as ints; convert so the declared str | None
    # return type (and the cache round-trip) stays consistent.
    max_id = max((tw.id for tw in all_tweets), default=None)
    return all_tweets, (str(max_id) if max_id is not None else None)

async def get_kol_tweet(self, kol_ids: List[str]) -> List[Tweet]:  # type: ignore[no-any-unimported]
    """
    Fetch fresh tweets for the given KOL ids, persisting the id checkpoint
    in the cache between runs.

    :param kol_ids: twitter user ids of the KOLs to poll
    :return: newly fetched tweets
    """
    checkpoint_key = "kol_last_seen_id"
    checkpoint = self.cache.get(checkpoint_key)
    tweets, checkpoint = await self.fetch_new_tweets_manual_(ids=kol_ids, last_seen_id=checkpoint)
    logger.info(f"get_kol_tweet tweets: {len(tweets)} last_seen_id: {checkpoint}")
    # Only advance the checkpoint when the fetch actually produced one.
    if checkpoint:
        self.cache.set(checkpoint_key, checkpoint)
    return tweets

async def fetch_new_tweets_manual_tweets(  # type: ignore[no-any-unimported]
    self, query: str, last_seen_id: str | None = None, max_per_page: int = 100, hours: int = 24
) -> List[Tweet]:
    """
    Page through the recent-search endpoint for one query and collect tweets.

    Returns whatever was collected so far on rate-limit or API errors
    (best-effort, never raises tweepy errors to the caller).

    :param query: fully built search query
    :param last_seen_id: when set, passed as ``since_id``; otherwise a
        ``start_time`` window of the last ``hours`` hours is used instead
    :param max_per_page: page size for the API call
    :param hours: lookback window when no ``last_seen_id`` is given
    """
    tweets: List[Tweet] = []  # type: ignore[no-any-unimported]
    next_token = None
    since = datetime.now(timezone.utc) - timedelta(hours=hours)
    start_time = None if last_seen_id else since.isoformat(timespec="seconds")
    logger.info(f"query: {query}")
    while True:
        cli = None
        key = None  # initialized so the except blocks can safely reference it
        try:
            cli, key = await self.pool.acquire()
            resp = cli.search_recent_tweets(
                query=query,
                start_time=start_time,
                since_id=last_seen_id,
                max_results=max_per_page,
                tweet_fields=TWEET_FIELDS,
                next_token=next_token,
                user_auth=True,
            )
            page_data = resp.data or []
            logger.info(f"page_data: {len(page_data)}")
            tweets.extend(page_data)
            read_tweet_success_count.labels(client_key=key).inc(len(page_data))
            next_token = resp.meta.get("next_token")
            if not next_token:
                break
        except tweepy.TooManyRequests:
            logger.error(traceback.format_exc())
            # key may still be None if acquire() itself failed; the original
            # code would raise NameError here and mask the rate-limit error.
            if key is not None:
                read_tweet_failure_count.labels(client_key=key).inc()
            if cli:
                await self.pool.report_failure(cli)
            return tweets
        except tweepy.TweepyException:
            if cli:
                await self.pool.report_failure(cli)
            logger.error(traceback.format_exc())
            return tweets
    return tweets

async def _get_tweet_with_retry(self, tweet_id: str) -> Optional[Dict[str, Any]]:
for attempt in range(3):
cli, client_key = await self.pool.acquire()
Expand Down
Loading