// PROCESSING PIPELINE
//
// Handles batch processing of URLs with rate limiting and fallback strategies.
//
// DESIGN:
//   - fixed chunk size (50) to balance API efficiency and error recovery
//   - batching for the arXiv/S2 APIs, with individual fallback on batch failure
//   - separate handlers for each route type (arxiv, s2, rawhtml)
//   - JSONL logging of every attempt (success/failure) with timestamps
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"time"
)

// ProcessResult summarizes a processing run.
type ProcessResult struct {
	ArticlesWritten int
	Errors          int
}

// URLLogEntry is one JSONL record describing a single fetch attempt.
type URLLogEntry struct {
	Time    string `json:"time"`
	URL     string `json:"url"`
	Success int    `json:"success"`
	API     string `json:"api"`
	Error   string `json:"error,omitempty"`
}

// logArticleAttempt records the outcome of one fetch attempt as a JSONL line.
func logArticleAttempt(logEncoder *json.Encoder, url, api string, err error) error {
	success := 0
	errMsg := ""
	if err == nil {
		success = 1
	} else {
		errMsg = err.Error()
	}
	return logEncoder.Encode(URLLogEntry{
		Time:    time.Now().Format(time.RFC3339),
		URL:     url,
		Success: success,
		API:     api,
		Error:   errMsg,
	})
}

// logEncodingFailure records a failure to serialize an already-fetched article.
func logEncodingFailure(logEncoder *json.Encoder, url string, err error) error {
	return logEncoder.Encode(URLLogEntry{
		Time:    time.Now().Format(time.RFC3339),
		URL:     url,
		Success: 0,
		API:     "",
		Error:   fmt.Sprintf("encoding error: %v", err),
	})
}

// ProcessURLsWithConfig orchestrates the entire processing pipeline.
// It chunks URLs to balance API efficiency with error recovery.
func ProcessURLsWithConfig(urls []string, config *Config, encoder *json.Encoder, logFile io.Writer) ProcessResult {
	result := ProcessResult{}
	ctx := context.Background()
	logEncoder := json.NewEncoder(logFile)
	chunkSize := 50
	processedCount := 0

	// process URLs in chunks
	for i := 0; i < len(urls); i += chunkSize {
		end := i + chunkSize
		if end > len(urls) {
			end = len(urls)
		}
		chunk := urls[i:end]

		if config.Verbose && config.Logger != nil {
			config.Logger.Printf("Processing chunk %d-%d of %d URLs", i+1, end, len(urls))
		}

		// process this chunk and accumulate its results
		chunkResult := processChunk(ctx, chunk, config, encoder, logEncoder)
		result.ArticlesWritten += chunkResult.ArticlesWritten
		result.Errors += chunkResult.Errors

		processedCount += len(chunk)
		if config.Verbose && config.Logger != nil {
			config.Logger.Printf("Processed %d URLs...", processedCount)
		}
	}

	return result
}
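// Example wiring for ProcessURLsWithConfig: a minimal sketch only. The Config
// literal below assumes just the two fields this file actually reads (Verbose
// and Logger, with Logger assumed to be a *log.Logger, matching the Printf
// calls above); the real Config presumably carries more. With this setup,
// articles stream to stdout as JSON and per-URL attempts land in a
// hypothetical attempts.jsonl:
//
//	logFile, err := os.Create("attempts.jsonl")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer logFile.Close()
//
//	cfg := &Config{Verbose: true, Logger: log.New(os.Stderr, "", log.LstdFlags)}
//	res := ProcessURLsWithConfig(urls, cfg, json.NewEncoder(os.Stdout), logFile)
//	fmt.Printf("wrote %d articles, %d errors\n", res.ArticlesWritten, res.Errors)
//
// Each line in the log is one URLLogEntry; with illustrative values, a
// success and a failure would look like:
//
//	{"time":"2024-01-02T15:04:05Z","url":"https://arxiv.org/abs/2401.00001","success":1,"api":"arxiv"}
//	{"time":"2024-01-02T15:04:06Z","url":"https://example.com/paper","success":0,"api":"","error":"fetch failed"}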
// processChunk handles routing, batching, and fallback for a given chunk of URLs.
func processChunk(ctx context.Context, urls []string, config *Config, encoder *json.Encoder, logEncoder *json.Encoder) ProcessResult {
	result := ProcessResult{}

	// create temporary articles for routing and processing
	articles := make([]*Article, len(urls))
	for i, url := range urls {
		articles[i] = &Article{URL: url}
	}

	// 1. route all articles in the chunk
	for _, article := range articles {
		routeArticle(article)
	}

	// 2. group by type for batching
	arxivURLs := []string{}
	s2URLs := []string{}
	htmlURLs := []string{}
	for _, article := range articles {
		switch article.Route {
		case "arxiv":
			arxivURLs = append(arxivURLs, article.URL)
		case "s2":
			s2URLs = append(s2URLs, article.URL)
		default:
			htmlURLs = append(htmlURLs, article.URL)
		}
	}

	// 3. process each type (batches are limited to the chunk size)
	if len(arxivURLs) > 0 {
		if config.Verbose && config.Logger != nil {
			config.Logger.Printf("Processing %d arXiv URLs in chunk", len(arxivURLs))
		}
		n, err := processArxiv(ctx, arxivURLs, encoder, config, logEncoder)
		result.ArticlesWritten += n
		if err != nil {
			result.Errors++
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error processing arXiv URLs: %v", err)
			}
		}
	}

	if len(s2URLs) > 0 {
		if config.Verbose && config.Logger != nil {
			config.Logger.Printf("Processing %d Semantic Scholar URLs in chunk", len(s2URLs))
		}
		n, err := processSemanticScholar(ctx, s2URLs, encoder, config, logEncoder)
		result.ArticlesWritten += n
		if err != nil {
			result.Errors++
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error processing S2 URLs: %v", err)
			}
		}
	}

	if len(htmlURLs) > 0 {
		if config.Verbose && config.Logger != nil {
			config.Logger.Printf("Processing %d raw HTML URLs in chunk", len(htmlURLs))
		}
		n, err := processHTML(ctx, htmlURLs, encoder, config, logEncoder)
		result.ArticlesWritten += n
		if err != nil {
			result.Errors++
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error processing HTML URLs: %v", err)
			}
		}
	}

	return result
}

// processArxiv fetches a batch of arXiv URLs, falling back to individual
// requests if the batch call fails.
func processArxiv(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
	articles, err := fetchArxivBatch(ctx, config, urls)
	if err != nil {
		if config.Verbose && config.Logger != nil {
			config.Logger.Printf("arXiv batch failed: %v, falling back to individual processing", err)
		}
		return processIndividualArxiv(ctx, urls, encoder, config, logEncoder)
	}
	written := 0
	for _, article := range articles {
		if err := encoder.Encode(article); err != nil {
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error encoding article: %v", err)
			}
			_ = logEncodingFailure(logEncoder, article.URL, err)
		} else {
			written++
			_ = logArticleAttempt(logEncoder, article.URL, "arxiv", nil)
		}
	}
	return written, nil
}

// processSemanticScholar fetches a batch of Semantic Scholar URLs, falling
// back to individual requests if the batch call fails.
func processSemanticScholar(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
	articles, err := fetchSemanticScholarBatch(ctx, config, urls)
	if err != nil {
		if config.Verbose && config.Logger != nil {
			config.Logger.Printf("S2 batch failed: %v, falling back to individual processing", err)
		}
		return processIndividualS2(ctx, urls, encoder, config, logEncoder)
	}
	written := 0
	for _, article := range articles {
		if err := encoder.Encode(article); err != nil {
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error encoding article: %v", err)
			}
			_ = logEncodingFailure(logEncoder, article.URL, err)
		} else {
			written++
			_ = logArticleAttempt(logEncoder, article.URL, "s2", nil)
		}
	}
	return written, nil
}

// processHTML fetches raw HTML URLs one at a time.
func processHTML(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
	written := 0
	for _, url := range urls {
		article, err := fetchRawHTML(ctx, config, url)
		if err != nil {
			_ = logArticleAttempt(logEncoder, url, "", err)
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error fetching HTML %s: %v", url, err)
			}
			continue
		}
		if err := encoder.Encode(article); err != nil {
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error encoding article: %v", err)
			}
			_ = logEncodingFailure(logEncoder, url, err)
		} else {
			written++
			_ = logArticleAttempt(logEncoder, url, "html", nil)
		}
	}
	return written, nil
}
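// The fetch helpers called above are defined elsewhere in the package. From
// the call sites in this file, their signatures are assumed to be the
// following (the []*Article return type for the batch variants is an
// inference; the element type could equally be Article by value):
//
//	func fetchArxivBatch(ctx context.Context, config *Config, urls []string) ([]*Article, error)
//	func fetchSemanticScholarBatch(ctx context.Context, config *Config, urls []string) ([]*Article, error)
//	func fetchArxiv(ctx context.Context, config *Config, url string) (*Article, error)
//	func fetchSemanticScholar(ctx context.Context, config *Config, url string) (*Article, error)
//	func fetchRawHTML(ctx context.Context, config *Config, url string) (*Article, error)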
// processIndividualArxiv fetches arXiv URLs one at a time; used as a fallback
// when the batch request fails.
func processIndividualArxiv(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
	written := 0
	for _, url := range urls {
		article, err := fetchArxiv(ctx, config, url)
		if err != nil {
			_ = logArticleAttempt(logEncoder, url, "", err)
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error fetching arXiv %s: %v", url, err)
			}
			continue
		}
		if err := encoder.Encode(article); err != nil {
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error encoding article: %v", err)
			}
			_ = logEncodingFailure(logEncoder, url, err)
		} else {
			written++
			_ = logArticleAttempt(logEncoder, url, "arxiv", nil)
		}
	}
	return written, nil
}

// processIndividualS2 fetches Semantic Scholar URLs one at a time; used as a
// fallback when the batch request fails.
func processIndividualS2(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
	written := 0
	for _, url := range urls {
		article, err := fetchSemanticScholar(ctx, config, url)
		if err != nil {
			_ = logArticleAttempt(logEncoder, url, "", err)
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error fetching S2 %s: %v", url, err)
			}
			continue
		}
		if err := encoder.Encode(article); err != nil {
			if config.Verbose && config.Logger != nil {
				config.Logger.Printf("Error encoding article: %v", err)
			}
			_ = logEncodingFailure(logEncoder, url, err)
		} else {
			written++
			_ = logArticleAttempt(logEncoder, url, "s2", nil)
		}
	}
	return written, nil
}
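// routeArticle is defined elsewhere; processChunk relies only on it setting
// article.Route to "arxiv", "s2", or anything else (treated as raw HTML).
// A minimal hypothetical sketch of that contract, keyed on the URL host
// (the real implementation may inspect more than the hostname):
//
//	func routeArticle(a *Article) {
//		switch {
//		case strings.Contains(a.URL, "arxiv.org"):
//			a.Route = "arxiv"
//		case strings.Contains(a.URL, "semanticscholar.org"):
//			a.Route = "s2"
//		default:
//			a.Route = "rawhtml"
//		}
//	}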