不依赖 BigQuery、不依赖 GCP,纯本地 parquet 倒排索引 + DuckDB SQL。
下面是把"持续 ingest + 本地时间序列分析 + 日报推送"三件事拆成三个独立脚本的完整版。三份代码放在同一个目录下,共享同一份 .cache/,互不阻塞。
项目目录结构(最终形态)
github-hunter/
├── main.py ← Part A:持续 ingest(GH Archive → .cache/)
├── analyze.py ← Part B:本地时间序列分析库
├── daily_report.py ← Part C:每日报告生成 + 可选 AI 总结
├── requirements.txt
├── .env
├── .cache/ ← 自动创建,hourly parquet
├── result/ ← main.py 输出
├── reports/ ← daily_report.py 输出
└── web/public/results/ ← 前端消费的 result.csv
requirements.txt
requests
pandas
pyarrow
duckdb
tqdm
python-dotenv
openai
tabulate
.env
GH_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENROUTER_API_KEY=sk-or-v1-xxxxxxxxxxxxxxxxxxxx # 可选
Part A — main.py(ingest + 24h 滑窗结果)
把上一版 main.py 略作收尾,KEEP_DAYS 默认改为 365,从这一刻起 .cache/ 就开始累积一年的 hourly 倒排索引。这一份和上一轮基本一致,贴在这里方便整体复制。
"""
github-hunter (无 BigQuery 版 + 增量缓存 + 长期保留)
数据源: https://data.gharchive.org/<YYYY-MM-DD-H>.json.gz
"""
from __future__ import annotations
import os, io, gzip, json, time, shutil
import requests
import pandas as pd
from collections import Counter
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from dotenv import load_dotenv
load_dotenv()
# Directory holding one parquet file per cached GH Archive hour.
CACHE_DIR = ".cache"
# Directory for the dated result CSVs written by main().
RESULT_DIR = "result"
# Directory that receives a copy of the latest result as result.csv.
WEB_OUT_DIR = "web/public/results"
# Number of past hours aggregated on each run.
HOURS_WINDOW = 24
# Cache files older than this many days are deleted by prune_cache().
KEEP_DAYS = 365  # raised to one year
# An hour is only written to the cache once it is at least this many hours old.
FINAL_LAG_H = 2
# Maximum number of repositories kept in the aggregated ranking.
TOP_N_REPOS = 1000
# Number of leading rows that get an AI summary.
TOP_N_AI = 50
# Make sure the cache directory exists as soon as the module is loaded.
os.makedirs(CACHE_DIR, exist_ok=True)
# ── GH Archive 增量缓存 ───────────────────────────────────────
def _cache_path(ts):
    """Return the parquet cache path for the hour identified by ``ts``.

    The file name is ``<YYYY-MM-DD>-<H>.parquet`` where the hour is not
    zero-padded.
    """
    stem = f"{ts.strftime('%Y-%m-%d')}-{ts.hour}"
    return os.path.join(CACHE_DIR, stem + ".parquet")
def _is_hour_finalized(ts):
    """Tell whether ``ts`` lies at least ``FINAL_LAG_H`` hours before now (UTC)."""
    age = datetime.now(timezone.utc) - ts
    return age >= timedelta(hours=FINAL_LAG_H)
def _download_and_count(ts):
    """Download one GH Archive hour and count WatchEvents per repository.

    Args:
        ts: Datetime identifying the hour to fetch.

    Returns:
        A ``Counter`` mapping repository name to its WatchEvent count, or
        ``None`` when the archive could not be downloaded or decompressed
        (partial counts from a broken archive are discarded).
    """
    import zlib  # local import: only needed to name the decompression error

    url = f"https://data.gharchive.org/{ts.strftime('%Y-%m-%d')}-{ts.hour}.json.gz"
    try:
        r = requests.get(url, timeout=120)
    except requests.RequestException as e:
        print(f"[NET] {url}: {e}")
        return None
    if r.status_code == 404:
        return None
    if r.status_code != 200:
        print(f"[SKIP {r.status_code}] {url}")
        return None

    counter = Counter()
    try:
        with gzip.GzipFile(fileobj=io.BytesIO(r.content)) as gz:
            for line in gz:
                try:
                    ev = json.loads(line)
                except ValueError:
                    # Covers malformed JSON and undecodable bytes; skip the line.
                    continue
                if not isinstance(ev, dict) or ev.get("type") != "WatchEvent":
                    continue
                repo = ev.get("repo")
                name = repo.get("name") if isinstance(repo, dict) else None
                if name:
                    counter[name] += 1
    except (OSError, EOFError, zlib.error) as e:
        # A truncated stream raises EOFError and corrupt deflate data raises
        # zlib.error; neither is an OSError, so all three are listed.
        print(f"[GZIP] {url}: {e}")
        return None
    return counter
def _load_or_fetch(ts):
    """Return per-repository WatchEvent counts for one hour, using the cache.

    A readable cache file is returned as-is. Otherwise the hour is downloaded
    and, when it is finalized and non-empty, written to the cache.

    Args:
        ts: Datetime identifying the hour.

    Returns:
        A ``Counter`` of repository name to count; empty when the download
        failed.
    """
    path = _cache_path(ts)
    if os.path.exists(path):
        try:
            df = pd.read_parquet(path)
            return Counter(dict(zip(df["repo_name"], df["count"])))
        except (OSError, ValueError, KeyError) as e:
            # A damaged or foreign file must not block this hour forever:
            # drop it and fall through to a fresh download.
            print(f"[cache] unreadable {path}: {e}; refetching")
            try:
                os.remove(path)
            except OSError:
                pass  # best effort; the rewrite below replaces it anyway

    counter = _download_and_count(ts)
    if counter is None:
        return Counter()

    if _is_hour_finalized(ts) and counter:
        frame = pd.DataFrame(list(counter.items()), columns=["repo_name", "count"])
        # Write under a name that does not end in ".parquet" and rename
        # afterwards, so neither a crash nor a concurrent reader of
        # "*.parquet" ever sees a half-written file.
        tmp_path = f"{path}.{os.getpid()}.tmp"
        try:
            frame.to_parquet(tmp_path, compression="zstd", index=False)
            os.replace(tmp_path, path)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
    return counter
def prune_cache(keep_days=KEEP_DAYS):
    """Delete cached hourly parquet files that are older than ``keep_days`` days.

    Files whose name does not parse as ``YYYY-MM-DD-H.parquet`` are ignored.
    """
    current = datetime.now(timezone.utc)
    removed = 0
    for entry in os.listdir(CACHE_DIR):
        if not entry.endswith(".parquet"):
            continue
        stem = entry[:-8]  # strip the 8-character ".parquet" suffix
        try:
            stamp = datetime.strptime(stem, "%Y-%m-%d-%H")
        except ValueError:
            continue
        stamp = stamp.replace(tzinfo=timezone.utc)
        if (current - stamp) <= timedelta(days=keep_days):
            continue
        os.remove(os.path.join(CACHE_DIR, entry))
        removed += 1
    if removed:
        print(f"[cache] pruned {removed} files older than {keep_days}d")
def collect_watch_events(hours_back=HOURS_WINDOW, max_workers=8):
    """Aggregate WatchEvent counts over the last ``hours_back`` full hours.

    Cached hours are loaded sequentially; missing hours are fetched in a
    thread pool of ``max_workers`` threads.

    Returns:
        A DataFrame with columns ``repo_name`` and ``star_count`` holding the
        ``TOP_N_REPOS`` most-starred repositories.
    """
    top_of_hour = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
    hits, misses = [], []
    for offset in range(1, hours_back + 1):
        hour = top_of_hour - timedelta(hours=offset)
        bucket = hits if os.path.exists(_cache_path(hour)) else misses
        bucket.append(hour)
    print(f"[cache] window={hours_back}h hit={len(hits)} miss={len(misses)}")

    merged = Counter()
    for hour in hits:
        merged.update(_load_or_fetch(hour))

    if misses:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            pending = [pool.submit(_load_or_fetch, hour) for hour in misses]
            progress = tqdm(as_completed(pending), total=len(pending),
                            desc="GH Archive (miss)")
            for finished in progress:
                merged.update(finished.result())

    ranking = merged.most_common(TOP_N_REPOS)
    return pd.DataFrame(ranking, columns=["repo_name", "star_count"])
# ── GraphQL 补全 ─────────────────────────────────────────────
GRAPHQL_URL = "https://api.github.com/graphql"
TOKEN = os.environ.get("GH_TOKEN")
HEADERS = {"Authorization": f"bearer {TOKEN}", "Content-Type": "application/json"}
# Filled in with str.format(): doubled braces become literal GraphQL braces,
# {repo_owner} and {repo_name} are the placeholders. The two selected fields
# are the ones fetch_repo_details() reads from the response.
GRAPHQL_QUERY_TEMPLATE = """
query {{
  repository(owner: "{repo_owner}", name: "{repo_name}") {{
    createdAt
    stargazerCount
  }}
}}
"""


def fetch_repo_details(repo_name):
    """Look up creation time and current star count of one repository.

    Args:
        repo_name: Repository in ``owner/name`` form.

    Returns:
        ``(createdAt, stargazerCount)`` as returned by the GraphQL API, or
        ``(None, None)`` when the name is malformed, the request fails, or the
        response carries no repository.
    """
    if "/" not in repo_name:
        return None, None
    # The name is spliced into a quoted GraphQL string literal, so refuse
    # anything that could terminate or escape that literal.
    if any(ch in repo_name for ch in '"\\\n\r'):
        return None, None
    owner, name = repo_name.split("/", 1)
    q = GRAPHQL_QUERY_TEMPLATE.format(repo_owner=owner, repo_name=name)

    try:
        r = requests.post(GRAPHQL_URL, json={"query": q}, headers=HEADERS, timeout=30)
    except requests.RequestException as e:
        print(f"[GraphQL ERR] {repo_name}: {e}")
        return None, None
    if r.status_code != 200:
        print(f"[GraphQL {r.status_code}] {repo_name}: {r.text[:120]}")
        return None, None

    try:
        payload = r.json()
    except ValueError as e:
        print(f"[GraphQL ERR] {repo_name}: {e}")
        return None, None
    # "data" may be absent or null when the API reports errors.
    data = payload.get("data") if isinstance(payload, dict) else None
    repo = (data or {}).get("repository")
    if not repo:
        return None, None
    return repo.get("createdAt"), repo.get("stargazerCount")
def fetch_repo_details_parallel(df):
    """Fill ``created_at`` and ``current_star_count`` for every row of ``df``.

    The frame is modified in place (both columns are reset to ``None`` first)
    and also returned. Lookups run in a pool of 10 threads.
    """
    df["created_at"] = None
    df["current_star_count"] = None
    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = {}
        for label, repo in df["repo_name"].items():
            pending[pool.submit(fetch_repo_details, repo)] = label
        for finished in tqdm(as_completed(pending), total=len(pending), desc="Repo details"):
            label = pending[finished]
            created, stargazers = finished.result()
            df.at[label, "created_at"] = created
            df.at[label, "current_star_count"] = stargazers
    return df
# ── OpenRouter AI 总结(可选) ────────────────────────────────
def summarize_with_openrouter(repo_name, star_count, created_at, current_stars):
    """Ask an OpenRouter-hosted model for a short Chinese summary of a repository.

    Returns:
        The stripped completion text, or ``None`` when ``OPENROUTER_API_KEY``
        is not set or the request raises.
    """
    from openai import OpenAI

    key = os.environ.get("OPENROUTER_API_KEY")
    if not key:
        return None

    client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=key)
    prompt = "".join([
        "请分析以下 GitHub 项目,用简洁的中文总结(100字以内):\n",
        f"项目名称: {repo_name}\n",
        f"最近新增星标: {star_count}\n",
        f"当前总星标: {current_stars}\n",
        f"创建时间: {created_at}\n\n",
        "请回答:\n",
        "1. 这个项目是做什么的?(推测)\n",
        "2. 为什么值得关注?\n",
        "输出格式: 直接输出总结内容, 不要标题。",
    ])
    chat = [
        {"role": "system", "content": "你是一个技术分析师,擅长分析 GitHub 项目。"},
        {"role": "user", "content": prompt},
    ]
    try:
        resp = client.chat.completions.create(
            model="xiaomi/mimo-v2-flash:free",
            messages=chat,
        )
        return resp.choices[0].message.content.strip()
    except Exception as e:
        print(f"[OpenRouter ERR] {repo_name}: {e}")
        return None
def generate_summaries(df, top_n=TOP_N_AI):
    """Add an ``ai_summary`` column and fill it for the first ``top_n`` rows.

    Rows lacking ``created_at`` or ``current_star_count`` keep ``None``.
    The frame is modified in place and returned.
    """
    df["ai_summary"] = None

    def _summarize_row(pos):
        record = df.iloc[pos]
        has_details = not (pd.isna(record["created_at"])
                           or record["current_star_count"] is None)
        if not has_details:
            return pos, None
        text = summarize_with_openrouter(
            record["repo_name"], record["star_count"],
            record["created_at"], record["current_star_count"])
        return pos, text

    limit = min(top_n, len(df))
    with ThreadPoolExecutor(max_workers=10) as pool:
        jobs = [pool.submit(_summarize_row, pos) for pos in range(limit)]
        for finished in tqdm(as_completed(jobs), total=len(jobs), desc="AI summaries"):
            pos, text = finished.result()
            df.at[df.index[pos], "ai_summary"] = text
    return df
def main():
    """Run the ingest, enrichment and export pipeline once.

    Steps: aggregate the WatchEvent window, prune the cache, enrich the
    ranking through GraphQL, optionally add AI summaries, then write the
    dated CSV and copy it to the web directory.

    Raises:
        RuntimeError: when ``GH_TOKEN`` is not set.
    """
    if not TOKEN:
        raise RuntimeError("缺少 GH_TOKEN 环境变量")
    t0 = time.time()

    print(">> 1/3 GH Archive ingest ...")
    df = collect_watch_events(HOURS_WINDOW)
    prune_cache(KEEP_DAYS)
    if df.empty:
        print("[WARN] empty result")
        return

    print(">> 2/3 GraphQL ...")
    df = fetch_repo_details_parallel(df)

    if os.environ.get("OPENROUTER_API_KEY"):
        print(f">> 3/3 AI summaries (top {TOP_N_AI}) ...")
        df = generate_summaries(df, TOP_N_AI)
    else:
        print(">> skip AI (OPENROUTER_API_KEY not set)")

    df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce", utc=True)
    df = df.sort_values("created_at", ascending=False)

    # The hour window and the cache are computed in UTC, so date the output
    # file in UTC as well instead of the machine's local time.
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    os.makedirs(RESULT_DIR, exist_ok=True)
    out = os.path.join(RESULT_DIR, f"result_{today}.csv")
    df.to_csv(out, index=False)
    os.makedirs(WEB_OUT_DIR, exist_ok=True)
    shutil.copy(out, os.path.join(WEB_OUT_DIR, "result.csv"))
    print(f"[done] {out} rows={len(df)} elapsed={time.time()-t0:.1f}s")


if __name__ == "__main__":
    main()
Part B — analyze.py(在 .cache/ 上的离线分析库)
"""
github-hunter 离线分析层
依赖 .cache/*.parquet (由 main.py 产出)
用 DuckDB 在原地做 SQL,不需要预合并文件。
"""
from __future__ import annotations
import os
from datetime import datetime, timedelta, timezone
import duckdb
import pandas as pd
CACHE_DIR = ".cache"
# DuckDB glob over the hourly cache files; backslashes are normalised to "/".
GLOB = os.path.join(CACHE_DIR, "*.parquet").replace("\\", "/")
_con = duckdb.connect(database=":memory:")
_con.execute("PRAGMA threads=8")
# The view's `hour` column is a zone-less timestamp parsed from file names
# that are in UTC, while the query helpers bind timezone-aware datetimes.
# Pinning the session time zone to UTC keeps `hour` from being read as local
# time when the two are compared.
# NOTE(review): relies on DuckDB's time zone support being available in the
# installed build - confirm.
_con.execute("SET TimeZone='UTC'")
# Set to True once the `events` view has been created on _con.
_view_registered = False
def _register_view() -> None:
    """Create the ``events`` view over the cached parquet files, once per process.

    The view exposes ``hour`` (parsed from each file's name), ``repo_name``
    and ``count``. Later calls return immediately until ``_view_registered``
    is reset.
    """
    global _view_registered
    if not _view_registered:
        # Adjacent literals keep the statement text exactly as before; only
        # the last piece interpolates the glob.
        view_sql = (
            "\n"
            "CREATE OR REPLACE VIEW events AS\n"
            "SELECT\n"
            "strptime(\n"
            "regexp_extract(\n"
            "regexp_replace(filename, '^.*[\\\\/]', ''),\n"
            "'(\\d{4}-\\d{2}-\\d{2})-(\\d{1,2})\\.parquet$',\n"
            "0),\n"
            "'%Y-%m-%d-%H.parquet'\n"
            ") AS hour,\n"
            "repo_name,\n"
            "count\n"
            f"FROM read_parquet('{GLOB}', filename = true)\n"
        )
        _con.execute(view_sql)
        _view_registered = True
def refresh_view() -> None:
    """Force the ``events`` view to be recreated.

    Intended for long-running processes after new cache files were ingested.
    """
    global _view_registered
    _view_registered = False  # makes the next registration run again
    _register_view()
# ── 1. 单仓库 star 增长曲线 ───────────────────────────────────
def star_curve(repo: str,
               freq: str = "day",
               since: datetime | None = None,
               until: datetime | None = None) -> pd.DataFrame:
    """Return the star growth curve of one repository.

    Args:
        repo: Repository name as stored in the cache (``owner/name``).
        freq: Bucket size: ``"hour"``, ``"day"`` or ``"week"``.
        since: Inclusive window start; defaults to 90 days before ``until``.
        until: Exclusive window end; defaults to the current UTC time.

    Returns:
        A DataFrame with ``bucket``, ``stars`` and the running total
        ``cum_stars``, ordered by bucket.

    Raises:
        KeyError: when ``freq`` is not one of the supported values.
    """
    _register_view()
    if not until:
        until = datetime.now(timezone.utc)
    if not since:
        since = until - timedelta(days=90)
    # The lookup doubles as a whitelist for the value interpolated below.
    truncate = {"hour": "hour", "day": "day", "week": "week"}[freq]
    sql = (
        "\n"
        f"SELECT date_trunc('{truncate}', hour) AS bucket,\n"
        "SUM(count) AS stars\n"
        "FROM events\n"
        "WHERE repo_name = ? AND hour >= ? AND hour < ?\n"
        "GROUP BY bucket ORDER BY bucket\n"
    )
    curve = _con.execute(sql, [repo, since, until]).df()
    curve["cum_stars"] = curve["stars"].cumsum()
    return curve
# ── 2. 任意时段 Top N ────────────────────────────────────────
def top_n_in_range(since: datetime, until: datetime, n: int = 100) -> pd.DataFrame:
    """Return the ``n`` repositories with the most stars in ``[since, until)``.

    The result has the columns ``repo_name`` and ``stars``, highest first.
    """
    _register_view()
    sql = (
        "\n"
        "SELECT repo_name, SUM(count) AS stars\n"
        "FROM events WHERE hour >= ? AND hour < ?\n"
        "GROUP BY repo_name ORDER BY stars DESC LIMIT ?\n"
    )
    params = [since, until, n]
    return _con.execute(sql, params).df()
def top_n_on_day(date: str, n: int = 100) -> pd.DataFrame:
    """Return the top ``n`` repositories of one UTC day given as ``YYYY-MM-DD``."""
    start = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    return top_n_in_range(start, end, n)
def top_n_on_week(week_start: str, n: int = 100) -> pd.DataFrame:
    """Return the top ``n`` repositories of the 7 UTC days from ``week_start``.

    ``week_start`` is a ``YYYY-MM-DD`` string.
    """
    start = datetime.strptime(week_start, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end = start + timedelta(days=7)
    return top_n_in_range(start, end, n)
# ── 3. 连续 N 天高速增长 ──────────────────────────────────────
def fast_growing(streak_days: int = 3,
                 daily_threshold: int = 50,
                 lookback_days: int = 14) -> pd.DataFrame:
    """Find repositories with a run of consecutive high-growth days.

    A day is "hot" when the repository gained at least ``daily_threshold``
    stars. Hot days are grouped into runs of consecutive *calendar* days: each
    hot day is shifted back by its rank among the repository's hot days, so
    days that follow each other without a gap land on the same anchor value.
    A day that is missing from the data (no events, or no cache file) breaks
    the run.

    Args:
        streak_days: Minimum run length in days.
        daily_threshold: Minimum stars per day for a day to count as hot.
        lookback_days: Number of full UTC days before today that are scanned.

    Returns:
        A DataFrame with ``repo_name``, ``streak_len``, ``streak_stars``,
        ``streak_start`` and ``streak_end``, ordered by ``streak_stars``
        descending.
    """
    _register_view()
    until = datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
    since = until - timedelta(days=lookback_days)
    return _con.execute("""
        WITH daily AS (
            SELECT date_trunc('day', hour) AS day,
                   repo_name, SUM(count) AS daily_stars
            FROM events WHERE hour >= ? AND hour < ?
            GROUP BY day, repo_name
        ),
        hot AS (
            SELECT day, repo_name, daily_stars,
                   day - to_days(CAST(ROW_NUMBER() OVER (
                       PARTITION BY repo_name ORDER BY day) AS INTEGER)) AS grp
            FROM daily
            WHERE daily_stars >= ?
        ),
        streaks AS (
            SELECT repo_name, grp,
                   COUNT(*) AS streak_len,
                   SUM(daily_stars) AS streak_stars,
                   MIN(day) AS streak_start,
                   MAX(day) AS streak_end
            FROM hot
            GROUP BY repo_name, grp
            HAVING COUNT(*) >= ?
        )
        SELECT repo_name, streak_len, streak_stars, streak_start, streak_end
        FROM streaks ORDER BY streak_stars DESC
    """, [since, until, daily_threshold, streak_days]).df()
# ── 4. 加速度榜:今日 vs 7日均 ────────────────────────────────
def acceleration_board(min_today: int = 20, top_n: int = 100,
                       as_of: datetime | None = None) -> pd.DataFrame:
    """Rank repositories by recent stars relative to their 7-day daily average.

    The "today" figure is the 24 hours ending at ``as_of`` and the baseline is
    the 7 days immediately before that window. A rolling window is used
    because a calendar-day window is only partly filled for most of the day
    and would be compared against full-day averages.

    Args:
        min_today: Minimum stars in the recent 24 hours to be listed.
        top_n: Maximum number of rows returned.
        as_of: Exclusive end of the recent window as a timezone-aware
            datetime; defaults to the start of the current UTC hour. Pass a
            UTC midnight to compare whole calendar days.

    Returns:
        A DataFrame with ``repo_name``, ``today_stars``, ``avg7`` and
        ``accel`` (``today_stars / (avg7 + 1)``), ordered by ``accel``
        descending.
    """
    _register_view()
    window_end = as_of or datetime.now(timezone.utc).replace(
        minute=0, second=0, microsecond=0)
    recent_start = window_end - timedelta(hours=24)
    baseline_start = recent_start - timedelta(days=7)
    return _con.execute("""
        WITH today AS (
            SELECT repo_name, SUM(count) AS today_stars
            FROM events WHERE hour >= ? AND hour < ?
            GROUP BY repo_name
        ),
        week AS (
            SELECT repo_name, SUM(count) / 7.0 AS avg7
            FROM events WHERE hour >= ? AND hour < ?
            GROUP BY repo_name
        )
        SELECT t.repo_name, t.today_stars,
               COALESCE(w.avg7, 0) AS avg7,
               t.today_stars / (COALESCE(w.avg7, 0) + 1) AS accel
        FROM today t LEFT JOIN week w USING (repo_name)
        WHERE t.today_stars >= ?
        ORDER BY accel DESC LIMIT ?
    """, [recent_start, window_end,
          baseline_start, recent_start,
          min_today, top_n]).df()
# ── 5. 幽灵仓库候选 ───────────────────────────────────────────
def ghost_repo_candidates(lookback_days: int = 7,
                          min_stars: int = 30) -> pd.DataFrame:
    """List repositories that gained stars but have no GraphQL details.

    Repositories with at least ``min_stars`` stars in the last
    ``lookback_days`` days are looked up through ``main``'s GraphQL helper;
    those that come back without ``created_at`` are returned.

    NOTE(review): the helper reports a failed lookup the same way as a missing
    repository, so transient API errors can show up here as false positives.

    Args:
        lookback_days: Size of the window ending now, in days.
        min_stars: Minimum stars within the window.

    Returns:
        A DataFrame with ``repo_name`` and ``stars``, highest first. It is
        empty when nothing qualifies or when ``GH_TOKEN`` is not set.
    """
    _register_view()
    until = datetime.now(timezone.utc)
    since = until - timedelta(days=lookback_days)
    candidates = _con.execute("""
        SELECT repo_name, SUM(count) AS stars
        FROM events WHERE hour >= ? AND hour < ?
        GROUP BY repo_name HAVING SUM(count) >= ?
        ORDER BY stars DESC
    """, [since, until, min_stars]).df()
    if candidates.empty:
        return candidates[["repo_name", "stars"]]

    # Imported lazily so the rest of this module works without main.py.
    # Importing main also loads .env, which is why the token is checked after.
    from main import fetch_repo_details_parallel
    if not os.environ.get("GH_TOKEN"):
        # Without a token every lookup fails and every candidate would be
        # reported as a ghost, so return nothing instead.
        print("[ghost] GH_TOKEN not set; skipping GraphQL lookup")
        return candidates.iloc[0:0][["repo_name", "stars"]]

    enriched = fetch_repo_details_parallel(candidates.copy())
    ghosts = enriched[enriched["created_at"].isna()].copy()
    return ghosts[["repo_name", "stars"]].sort_values("stars", ascending=False)
# ── 6. 命令行直接预览 ─────────────────────────────────────────
if __name__ == "__main__":
    # Command-line preview: print each heading, then compute and print its table.
    pd.set_option("display.max_rows", 30)
    previews = (
        ("\n== acceleration_board(top 20) ==",
         lambda: acceleration_board(min_today=20, top_n=20)),
        ("\n== fast_growing(streak=3, >=50/d, 14d) ==",
         lambda: fast_growing(3, 50, 14).head(20)),
    )
    for heading, build in previews:
        print(heading)
        print(build())
Part C — daily_report.py(每日报告 + 可选 AI 摘要 + 可选推送)
"""
每日定时任务:
- 在 .cache/ 上跑三种分析,写到 reports/
- 可选:用 OpenRouter 生成一段 Markdown 日报
- 可选:把日报推送到 Webhook (Telegram / 企业微信 / Slack 通用)
"""
from __future__ import annotations
import os, json, requests
from datetime import datetime, timezone
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
from analyze import (
fast_growing, acceleration_board, ghost_repo_candidates,
top_n_on_day,
)
load_dotenv()
# Output directory for the per-analysis CSVs and the Markdown report; it is
# created (relative to the working directory) when the module is loaded.
REPORT_DIR = Path("reports"); REPORT_DIR.mkdir(exist_ok=True)
def _today_utc() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d")
def _yesterday_utc() -> str:
from datetime import timedelta
return (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")
def collect() -> dict[str, pd.DataFrame]:
    """Run every analysis, write each result as CSV and return the tables.

    Files are written to ``REPORT_DIR`` as ``<name>_<today>.csv``. The
    returned dict is keyed by the same names.
    """
    today = _today_utc()
    yday = _yesterday_utc()

    data = {}
    data["yesterday_top"] = top_n_on_day(yday, n=50)
    data["acceleration"] = acceleration_board(min_today=20, top_n=50)
    data["fast_growing"] = fast_growing(streak_days=3, daily_threshold=50,
                                        lookback_days=14)
    data["ghosts"] = ghost_repo_candidates(lookback_days=7, min_stars=30)

    for name in data:
        frame = data[name]
        path = REPORT_DIR / f"{name}_{today}.csv"
        frame.to_csv(path, index=False)
        print(f"[report] {path} rows={len(frame)}")
    return data
# ── 用 LLM 把四张表压成一段 Markdown 日报 ─────────────────────
def _markdown_table(df: pd.DataFrame, limit: int) -> str:
    """Render the first ``limit`` rows of ``df`` as a Markdown table."""
    head = df.head(limit)
    try:
        return head.to_markdown(index=False)
    except ImportError:
        # DataFrame.to_markdown() needs the optional 'tabulate' package;
        # without it, build a plain pipe table so the report still renders.
        columns = [str(col) for col in head.columns]
        lines = [
            "| " + " | ".join(columns) + " |",
            "| " + " | ".join("---" for _ in columns) + " |",
        ]
        lines.extend(
            "| " + " | ".join(str(value) for value in row) + " |"
            for row in head.itertuples(index=False, name=None)
        )
        return "\n".join(lines)


def render_markdown(data: dict[str, pd.DataFrame]) -> str:
    """Turn the four analysis tables into a Markdown daily report.

    Each table contributes its first 10 rows. When ``OPENROUTER_API_KEY`` is
    set, a model-written overview is inserted as a block quote under the
    title; if that step fails for any reason the plain report is returned.

    Args:
        data: Tables keyed ``yesterday_top``, ``acceleration``,
            ``fast_growing`` and ``ghosts``.

    Returns:
        The report as a Markdown string.
    """
    api_key = os.environ.get("OPENROUTER_API_KEY")
    today = _today_utc()

    def _fmt(df: pd.DataFrame, limit: int = 10) -> str:
        if df.empty:
            return "_(无数据)_\n"
        return _markdown_table(df, limit) + "\n"

    sections = [
        f"# GitHub Hunter 日报 · {today}\n",
        "## 1. 昨日 Top 50(摘前 10)\n" + _fmt(data["yesterday_top"]),
        "## 2. 今日加速度榜(摘前 10)\n" + _fmt(data["acceleration"]),
        "## 3. 连续 3 天高速增长(摘前 10)\n" + _fmt(data["fast_growing"]),
        "## 4. 幽灵仓库候选(摘前 10)\n" + _fmt(data["ghosts"]),
    ]
    md = "\n".join(sections)
    if not api_key:
        return md  # no LLM configured: return the raw tables only

    prompt = ("你是一名 GitHub 趋势分析师。基于以下四张表,"
              "用中文写一段 200 字以内的总览开篇(不要罗列表格),"
              "突出今天最值得关注的 2~3 个项目,以及任何异常信号。\n\n" + md)
    try:
        # Imported here so a missing or broken openai package only disables
        # the overview instead of the whole report.
        from openai import OpenAI
        client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=api_key)
        resp = client.chat.completions.create(
            model="xiaomi/mimo-v2-flash:free",
            messages=[
                {"role": "system", "content": "你擅长用简洁中文写技术趋势日报。"},
                {"role": "user", "content": prompt},
            ])
        intro = resp.choices[0].message.content.strip()
    except Exception as e:
        print(f"[LLM] {e}")
        return md

    # Prefix every line so a multi-line overview stays inside the block quote.
    quoted = intro.replace("\n", "\n> ")
    return f"# GitHub Hunter 日报 · {today}\n\n> {quoted}\n\n" + "\n".join(sections[1:])
# ── 推送到 Webhook(可选) ───────────────────────────────────
def _truncate_utf8(text: str, max_bytes: int) -> str:
    """Return ``text`` cut so that its UTF-8 encoding is at most ``max_bytes`` long."""
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    # errors="ignore" drops a multi-byte character split by the cut.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


def push_webhook(markdown: str) -> None:
    """Post the report to the webhook configured in the environment.

    Does nothing when ``REPORT_WEBHOOK_URL`` is unset. The payload shape is
    chosen by ``REPORT_WEBHOOK_STYLE`` (``raw`` by default, ``wecom`` or
    ``tg``). Delivery is best effort: network errors are printed, not raised.

    Args:
        markdown: The report text to send.
    """
    url = os.environ.get("REPORT_WEBHOOK_URL")
    if not url:
        return
    style = os.environ.get("REPORT_WEBHOOK_STYLE", "raw")  # raw|wecom|tg

    if style == "wecom":  # WeCom group robot
        # The robot's limit is counted in UTF-8 bytes, and Chinese text takes
        # about three bytes per character, so cut by bytes rather than chars.
        payload = {"msgtype": "markdown",
                   "markdown": {"content": _truncate_utf8(markdown, 4096)}}
    elif style == "tg":  # Telegram sendMessage, URL of the form
        # https://api.telegram.org/bot<token>/sendMessage?chat_id=xxx
        # Sent as plain text: the tables contain unbalanced "_" characters
        # (repo_name, today_stars, ...) that Telegram's Markdown parser
        # rejects.
        payload = {"text": markdown[:4000]}
    else:
        payload = {"text": markdown}

    try:
        r = requests.post(url, json=payload, timeout=30)
    except requests.RequestException as e:
        print(f"[push] {e}")
        return
    print(f"[push] {style} -> {r.status_code}")
    if r.status_code >= 400:
        print(f"[push] response: {r.text[:200]}")
def main():
    """Run the analyses, write the Markdown report, then push it if configured."""
    tables = collect()
    report = render_markdown(tables)
    target = REPORT_DIR / f"daily_{_today_utc()}.md"
    target.write_text(report, encoding="utf-8")
    print(f"[report] {target}")
    push_webhook(report)


if __name__ == "__main__":
    main()
运行方式(Windows 任务计划程序)
把整个 github-hunter/ 放在 E:\mywork\github-hunter\,先手动跑一次确认无误:
cd /d E:\mywork\github-hunter
python -m pip install -r requirements.txt
python main.py :: 首次会下满 24 个 hourly,~5 分钟
python analyze.py :: 直接命令行看两张表
python daily_report.py :: 生成 reports/daily_YYYY-MM-DD.md
然后加两个计划任务(脚本里的 .cache/、result/、reports/ 都是相对路径,所以两个任务的"起始于"都要填 E:\mywork\github-hunter,否则文件会写到任务计划程序的默认工作目录):
- 任务 1:每小时 ingest — 触发器"每天 00:05,每 1 小时一次",操作
python E:\mywork\github-hunter\main.py,这会让 .cache/ 永远保持最新,且稳定运行后每次只需下载最近 2 个 hourly(最近 FINAL_LAG_H=2 小时内的文件不落缓存,下一次运行会重新下载并写入缓存)。
- 任务 2:每天报告 — 触发器"每天 UTC 01:00(国内 09:00)",操作
python E:\mywork\github-hunter\daily_report.py,产出 Markdown 日报并(若配了 REPORT_WEBHOOK_URL)推送到机器人。
可选:推送到企业微信 / Telegram
在 .env 里加两行就能开启:
REPORT_WEBHOOK_URL=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxxxxx
REPORT_WEBHOOK_STYLE=wecom
或 Telegram:
REPORT_WEBHOOK_URL=https://api.telegram.org/bot123456:ABC.../sendMessage?chat_id=987654
REPORT_WEBHOOK_STYLE=tg
留空则只把 Markdown 写到 reports/daily_YYYY-MM-DD.md,不发任何外部消息。
一个常见疑问预答
analyze.py 第一次跑会触发 DuckDB 扫所有 parquet 的 metadata,在 .cache/ 文件数还少(< 100)时几乎瞬时;长到一年(365 × 24 = 8760 个文件)后大约要 3–8 秒。_register_view() 用 _view_registered 这个模块级开关做了一次性注册,所以同一个进程里反复调用各 top_n_* / fast_growing / acceleration_board 都是毫秒级响应。如果你在 Jupyter 里增量加了新数据,显式调一下 analyze.refresh_view() 让视图重建即可。
到这里整套架构就完整了:main.py 持续 ingest,.cache/ 沉淀一年的 hourly 倒排索引,analyze.py 提供 SQL 化的分析能力,daily_report.py 把这些能力变成可推送的日报。 与原版 github-hunter 相比,本地化、零 GCP 依赖、每次运行成本降到下载量的 4%,还多出了"历史回放、加速度榜、幽灵仓库检测、连续增长黑马"四类 BigQuery 原版没有的能力。
内容由 AI 生成仅供参考
不依赖 BigQuery、不依赖 GCP,纯本地 parquet 倒排索引 + DuckDB SQL。
下面是把"持续 ingest + 本地时间序列分析 + 日报推送"三件事拆成三个独立脚本的完整版。三份代码放在同一个目录下,共享同一份
.cache/,互不阻塞。项目目录结构(最终形态)
requirements.txt.envPart A —
main.py(ingest + 24h 滑窗结果)把上一版
main.py略作收尾,KEEP_DAYS默认改为 365,从这一刻起.cache/就开始累积一年的 hourly 倒排索引。这一份和上一轮基本一致,贴在这里方便整体复制。Part B —
analyze.py(在.cache/上的离线分析库)Part C —
daily_report.py(每日报告 + 可选 AI 摘要 + 可选推送)运行方式(Windows 任务计划程序)
把整个
github-hunter/放在E:\mywork\github-hunter\,先手动跑一次确认无误:然后加两个计划任务:
python E:\mywork\github-hunter\main.py,这会让.cache/永远保持最新且每次仅下载 1 个 hourly。python E:\mywork\github-hunter\daily_report.py,产出 Markdown 日报并(若配了REPORT_WEBHOOK_URL)推送到机器人。可选:推送到企业微信 / Telegram
在
.env里加两行就能开启:或 Telegram:
留空则只把 Markdown 写到
reports/daily_YYYY-MM-DD.md,不发任何外部消息。一个常见疑问预答
analyze.py第一次跑会触发 DuckDB 扫所有 parquet 的 metadata,在.cache/文件数还少(< 100)时几乎瞬时;长到一年(~8400 个文件)后大约要 3–8 秒。_register_view()用_view_registered这个模块级开关做了一次性注册,所以同一个进程里反复调用各top_n_*/fast_growing/acceleration_board都是毫秒级响应。如果你在 Jupyter 里增量加了新数据,显式调一下analyze.refresh_view()让视图重建即可。到这里整套架构就完整了:
main.py持续 ingest,.cache/沉淀一年的 hourly 倒排索引,analyze.py提供 SQL 化的分析能力,daily_report.py把这些能力变成可推送的日报。 与原版github-hunter相比,本地化、零 GCP 依赖、每次运行成本降到下载量的 4%,还多出了"历史回放、加速度榜、幽灵仓库检测、连续增长黑马"四类 BigQuery 原版没有的能力。内容由 AI 生成仅供参考