def normalize(data):
return {"text": (data.get("text") or "").strip().lower()}
def tokenize(data):
text = data.get("text", "")
cleaned = "".join(c if (c.isalnum() or c.isspace()) else " " for c in text)
tokens = [t for t in cleaned.split() if t]
return {"tokens": tokens, "count": len(tokens)}
def sentiment(data):
toks = data.get("tokens", [])
pos = sum(t in POSITIVE for t in toks)
neg = sum(t in NEGATIVE for t in toks)
score = pos - neg
label = "positive" if score > 0 else "negative" if score < 0 else "neutral"
return {"label": label, "score": score, "pos": pos, "neg": neg}
def keywords(data):
toks = data.get("tokens", [])
stop = {"the","a","an","is","it","to","of","and","in","for","on","how"}
freq = Counter(t for t in toks if t not in stop and len(t) > 2)
return {"keywords": freq.most_common(data.get("top_n", 5))}
def analyze(data):
norm = worker.trigger({"function_id": "text::normalize", "payload": {"text": data.get("text","")}})
toks = worker.trigger({"function_id": "text::tokenize", "payload": norm})
sent = worker.trigger({"function_id": "text::sentiment", "payload": toks})
keys = worker.trigger({"function_id": "text::keywords", "payload": {**toks, "top_n": data.get("top_n", 5)}})
with _LOCK:
_STATE["docs_analyzed"] += 1
for k, c in keys["keywords"]:
_STATE["keyword_totals"][k] += c
n = _STATE["docs_analyzed"]
return {"tokens": toks["count"], "sentiment": sent, "keywords": keys["keywords"], "docs_analyzed": n}
def report(data):
with _LOCK:
return {"docs_analyzed": _STATE["docs_analyzed"],
"heartbeats": _STATE["heartbeats"],
"top_keywords_all_docs": _STATE["keyword_totals"].most_common(5)}
def http_analyze(data):
body = data.get("body") or {}
result = worker.trigger({"function_id": "pipeline::analyze", "payload": body})
return {"status_code": 200, "body": result, "headers": {"Content-Type": "application/json"}}
def heartbeat(data):
with _LOCK:
_STATE["heartbeats"] += 1
return {"ok": True}
for fid, fn in [
("text::normalize", normalize), ("text::tokenize", tokenize),
("text::sentiment", sentiment), ("text::keywords", keywords),
("pipeline::analyze", analyze), ("stats::report", report),
("http::analyze", http_analyze), ("cron::heartbeat", heartbeat),
]:
worker.register_function(fid, fn)
Credit: Source link



























