Diffstat (limited to 'processor.go')
-rw-r--r--  processor.go  295
1 file changed, 295 insertions(+), 0 deletions(-)
diff --git a/processor.go b/processor.go
new file mode 100644
index 0000000..1079e3d
--- /dev/null
+++ b/processor.go
@@ -0,0 +1,295 @@
+// PROCESSING PIPELINE
+//
+// Handles batch processing of URLs with rate limiting and fallback strategies.
+//
+// DESIGN:
+// - fixed chunk size (50) to balance API efficiency and error recovery
+// - batching for arxiv/s2 APIs, individual fallback on batch failure
+// - separate handlers for each route type (arxiv, s2, rawhtml)
+// - JSONL logging of every attempt (success/failure) with timestamps
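+//
+// Usage sketch (Config construction and output wiring are assumed from the
+// rest of the repo; the log filename here is illustrative):
+//
+//	enc := json.NewEncoder(os.Stdout)            // article output, one JSON object per line
+//	logFile, err := os.Create("urls.log.jsonl")  // per-URL attempt log
+//	if err != nil { ... }
+//	res := ProcessURLsWithConfig(urls, cfg, enc, logFile)
+//	fmt.Printf("wrote %d articles, %d errors\n", res.ArticlesWritten, res.Errors)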
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "os"
+ "time"
+)
+
+// ProcessResult summarizes a processing run.
+type ProcessResult struct {
+ ArticlesWritten int
+ Errors int
+}
+
+type URLLogEntry struct {
+ Time string `json:"time"` // RFC3339 timestamp of the attempt
+ URL string `json:"url"`
+ Success int `json:"success"` // 1 on success, 0 on failure
+ API string `json:"api"` // route that produced the entry ("arxiv", "s2", "html"); empty for encoding failures
+ Error string `json:"error,omitempty"` // error message; omitted on success
+}
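+
+// A log entry for a successful arXiv fetch looks like (illustrative values):
+//
+//	{"time":"2024-01-02T15:04:05Z","url":"https://arxiv.org/abs/2101.00001","success":1,"api":"arxiv"}
+//
+// On failure, success is 0 and error carries the message; omitempty drops
+// the error field on success.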
+
+// logArticleAttempt records one fetch/encode attempt (success or failure) in the JSONL log.
+func logArticleAttempt(logEncoder *json.Encoder, url, api string, err error) error {
+ success := 0
+ errMsg := ""
+ if err == nil {
+ success = 1
+ } else {
+ errMsg = err.Error()
+ }
+ return logEncoder.Encode(URLLogEntry{
+ Time: time.Now().Format(time.RFC3339),
+ URL: url,
+ Success: success,
+ API: api,
+ Error: errMsg,
+ })
+}
+
+// logEncodingFailure records a JSON-encoding failure for a fetched article.
+func logEncodingFailure(logEncoder *json.Encoder, url string, err error) error {
+ return logEncoder.Encode(URLLogEntry{
+ Time: time.Now().Format(time.RFC3339),
+ URL: url,
+ Success: 0,
+ API: "",
+ Error: fmt.Sprintf("encoding error: %v", err),
+ })
+}
+
+// ProcessURLsWithConfig orchestrates the entire processing pipeline. It
+// splits URLs into fixed-size chunks to balance API efficiency with error
+// recovery.
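+// For example, 120 URLs with chunkSize 50 are processed as urls[0:50],
+// urls[50:100], then urls[100:120].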
+func ProcessURLsWithConfig(urls []string, config *Config, encoder *json.Encoder, logFile io.Writer) ProcessResult {
+ result := ProcessResult{}
+ ctx := context.Background()
+ logEncoder := json.NewEncoder(logFile)
+
+ chunkSize := 50
+
+ processedCount := 0
+
+ // process URLs in chunks
+ for i := 0; i < len(urls); i += chunkSize {
+ end := i + chunkSize
+ if end > len(urls) {
+ end = len(urls)
+ }
+
+ chunk := urls[i:end]
+
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Processing chunk %d-%d of %d URLs", i+1, end, len(urls))
+ }
+
+ // route, group, and process this chunk
+ chunkResult := processChunk(ctx, chunk, config, encoder, logEncoder)
+
+ result.ArticlesWritten += chunkResult.ArticlesWritten
+ result.Errors += chunkResult.Errors
+ processedCount += len(chunk)
+
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Processed %d articles...", processedCount)
+ }
+ }
+
+ return result
+}
+
+// processChunk handles routing, batching, and fallback for a given chunk of URLs.
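+// Route values are assigned by routeArticle (defined elsewhere in the repo);
+// this handler only distinguishes "arxiv", "s2", and a default raw-HTML
+// path, so any other route falls back to HTML fetching.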
+func processChunk(ctx context.Context, urls []string, config *Config, encoder *json.Encoder, logEncoder *json.Encoder) ProcessResult {
+ result := ProcessResult{}
+
+ // create temporary articles for routing and processing
+ articles := make([]*Article, len(urls))
+ for i, url := range urls {
+ articles[i] = &Article{URL: url}
+ }
+
+ // 1. route all articles in the chunk
+ for _, article := range articles {
+ routeArticle(article)
+ }
+
+ // 2. group by type for batching
+ arxivURLs := []string{}
+ s2URLs := []string{}
+ htmlURLs := []string{}
+
+ for _, article := range articles {
+ switch article.Route {
+ case "arxiv":
+ arxivURLs = append(arxivURLs, article.URL)
+ case "s2":
+ s2URLs = append(s2URLs, article.URL)
+ default:
+ htmlURLs = append(htmlURLs, article.URL)
+ }
+ }
+
+ // 3. process each route type (each batch is bounded by the chunk size)
+ if len(arxivURLs) > 0 {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Processing %d arXiv URLs in chunk", len(arxivURLs))
+ }
+ n, err := processArxiv(ctx, arxivURLs, encoder, config, logEncoder)
+ result.ArticlesWritten += n
+ if err != nil {
+ result.Errors++
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error processing arXiv URLs: %v", err)
+ }
+ }
+ }
+
+ if len(s2URLs) > 0 {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Processing %d Semantic Scholar URLs in chunk", len(s2URLs))
+ }
+ n, err := processSemanticScholar(ctx, s2URLs, encoder, config, logEncoder)
+ result.ArticlesWritten += n
+ if err != nil {
+ result.Errors++
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error processing S2 URLs: %v", err)
+ }
+ }
+ }
+
+ if len(htmlURLs) > 0 {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Processing %d raw HTML URLs in chunk", len(htmlURLs))
+ }
+ n, err := processHTML(ctx, htmlURLs, encoder, config, logEncoder)
+ result.ArticlesWritten += n
+ if err != nil {
+ result.Errors++
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error processing HTML URLs: %v", err)
+ }
+ }
+ }
+
+ return result
+}
+
+func processArxiv(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
+ articles, err := fetchArxivBatch(ctx, config, urls)
+ if err != nil {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("arXiv batch failed: %v, falling back to individual processing", err)
+ }
+ return processIndividualArxiv(ctx, urls, encoder, config, logEncoder)
+ }
+
+ written := 0
+ for _, article := range articles {
+ if err := encoder.Encode(article); err != nil {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error encoding article: %v", err)
+ }
+ _ = logEncodingFailure(logEncoder, article.URL, err)
+ } else {
+ written++
+ _ = logArticleAttempt(logEncoder, article.URL, "arxiv", nil)
+ }
+ }
+ return written, nil
+}
+
+func processSemanticScholar(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
+ articles, err := fetchSemanticScholarBatch(ctx, config, urls)
+ if err != nil {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("S2 batch failed: %v, falling back to individual processing", err)
+ }
+ return processIndividualS2(ctx, urls, encoder, config, logEncoder)
+ }
+
+ written := 0
+ for _, article := range articles {
+ if err := encoder.Encode(article); err != nil {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error encoding article: %v", err)
+ }
+ _ = logEncodingFailure(logEncoder, article.URL, err)
+ } else {
+ written++
+ _ = logArticleAttempt(logEncoder, article.URL, "s2", nil)
+ }
+ }
+ return written, nil
+}
+
+func processHTML(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
+ written := 0
+ for _, url := range urls {
+ article, err := fetchRawHTML(ctx, config, url)
+ if err != nil {
+ _ = logArticleAttempt(logEncoder, url, "html", err)
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error fetching HTML %s: %v", url, err)
+ }
+ continue
+ }
+ if err := encoder.Encode(article); err != nil {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error encoding article: %v", err)
+ }
+ _ = logEncodingFailure(logEncoder, url, err)
+ } else {
+ written++
+ _ = logArticleAttempt(logEncoder, url, "html", nil)
+ }
+ }
+ return written, nil
+}
+
+func processIndividualArxiv(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
+ written := 0
+ for _, url := range urls {
+ article, err := fetchArxiv(ctx, config, url)
+ if err != nil {
+ _ = logArticleAttempt(logEncoder, url, "arxiv", err)
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error fetching arXiv %s: %v", url, err)
+ }
+ continue
+ }
+ if err := encoder.Encode(article); err != nil {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error encoding article: %v", err)
+ }
+ _ = logEncodingFailure(logEncoder, url, err)
+ } else {
+ written++
+ _ = logArticleAttempt(logEncoder, url, "arxiv", nil)
+ }
+ }
+ return written, nil
+}
+
+func processIndividualS2(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
+ written := 0
+ for _, url := range urls {
+ article, err := fetchSemanticScholar(ctx, config, url)
+ if err != nil {
+ _ = logArticleAttempt(logEncoder, url, "s2", err)
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error fetching S2 %s: %v", url, err)
+ }
+ continue
+ }
+ if err := encoder.Encode(article); err != nil {
+ if config.Verbose && config.Logger != nil {
+ config.Logger.Printf("Error encoding article: %v", err)
+ }
+ _ = logEncodingFailure(logEncoder, url, err)
+ } else {
+ written++
+ _ = logArticleAttempt(logEncoder, url, "s2", nil)
+ }
+ }
+ return written, nil
+}