diff options
| author | Sam Scholten | 2025-12-15 19:35:46 +1000 |
|---|---|---|
| committer | Sam Scholten | 2025-12-15 19:35:57 +1000 |
| commit | 3562d2fd34bb98d29c7cf6e4d4130129a7bb24f2 (patch) | |
| tree | 42b1f0e0a346a1cf087df90e29a100edbd66b3eb /processor.go | |
| download | scholfetch-3562d2fd34bb98d29c7cf6e4d4130129a7bb24f2.tar.gz scholfetch-3562d2fd34bb98d29c7cf6e4d4130129a7bb24f2.zip | |
Diffstat (limited to 'processor.go')
| -rw-r--r-- | processor.go | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/processor.go b/processor.go
new file mode 100644
index 0000000..1079e3d
--- /dev/null
+++ b/processor.go
@@ -0,0 +1,295 @@
// PROCESSING PIPELINE
//
// Handles batch processing of URLs with rate limiting and fallback strategies.
//
// DESIGN:
// - fixed chunk size (50) to balance API efficiency and error recovery
// - batching for arxiv/s2 APIs, individual fallback on batch failure
// - separate handlers for each route type (arxiv, s2, rawhtml)
// - JSONL logging of every attempt (success/failure) with timestamps
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "os"
    "time"
)

type ProcessResult struct {
    ArticlesWritten int
    Errors          int
}

type URLLogEntry struct {
    Time    string `json:"time"`
    URL     string `json:"url"`
    Success int    `json:"success"`
    API     string `json:"api"`
    Error   string `json:"error,omitempty"`
}

func logArticleAttempt(logEncoder *json.Encoder, url, api string, err error) error {
    success := 0
    errMsg := ""
    if err == nil {
        success = 1
    } else {
        errMsg = err.Error()
    }
    return logEncoder.Encode(URLLogEntry{
        Time:    time.Now().Format(time.RFC3339),
        URL:     url,
        Success: success,
        API:     api,
        Error:   errMsg,
    })
}

func logEncodingFailure(logEncoder *json.Encoder, url string, err error) error {
    return logEncoder.Encode(URLLogEntry{
        Time:    time.Now().Format(time.RFC3339),
        URL:     url,
        Success: 0,
        API:     "",
        Error:   fmt.Sprintf("encoding error: %v", err),
    })
}

// ProcessURLsWithConfig orchestrates the entire processing pipeline.
// It chunks URLs to balance API efficiency with error recovery.
func ProcessURLsWithConfig(urls []string, config *Config, encoder *json.Encoder, logFile io.Writer) ProcessResult {
    result := ProcessResult{}
    ctx := context.Background()
    logEncoder := json.NewEncoder(logFile)

    chunkSize := 50

    processedCount := 0

    // process URLs in chunks
    for i := 0; i < len(urls); i += chunkSize {
        end := i + chunkSize
        if end > len(urls) {
            end = len(urls)
        }

        chunk := urls[i:end]

        if config.Verbose && config.Logger != nil {
            config.Logger.Printf("Processing chunk %d-%d of %d URLs", i+1, end, len(urls))
        }

        // route, batch, and fetch this chunk
        chunkResult := processChunk(ctx, chunk, config, encoder, logEncoder)

        result.ArticlesWritten += chunkResult.ArticlesWritten
        result.Errors += chunkResult.Errors
        processedCount += len(chunk)

        if config.Verbose && config.Logger != nil {
            fmt.Fprintf(os.Stderr, "Processed %d articles...\n", processedCount)
        }
    }

    return result
}

// processChunk handles routing, batching, and fallback for a given chunk of URLs.
func processChunk(ctx context.Context, urls []string, config *Config, encoder *json.Encoder, logEncoder *json.Encoder) ProcessResult {
    result := ProcessResult{}

    // create temporary articles for routing and processing
    articles := make([]*Article, len(urls))
    for i, url := range urls {
        articles[i] = &Article{URL: url}
    }

    // 1. route all articles in the chunk
    for _, article := range articles {
        routeArticle(article)
    }

    // 2. group by type for batching
    arxivURLs := []string{}
    s2URLs := []string{}
    htmlURLs := []string{}

    for _, article := range articles {
        switch article.Route {
        case "arxiv":
            arxivURLs = append(arxivURLs, article.URL)
        case "s2":
            s2URLs = append(s2URLs, article.URL)
        default:
            htmlURLs = append(htmlURLs, article.URL)
        }
    }

    // 3. process each type (limited to chunk size)
    if len(arxivURLs) > 0 {
        if config.Verbose && config.Logger != nil {
            config.Logger.Printf("Processing %d arXiv URLs in chunk", len(arxivURLs))
        }
        n, err := processArxiv(ctx, arxivURLs, encoder, config, logEncoder)
        result.ArticlesWritten += n
        if err != nil {
            result.Errors++
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error processing arXiv URLs: %v", err)
            }
        }
    }

    if len(s2URLs) > 0 {
        if config.Verbose && config.Logger != nil {
            config.Logger.Printf("Processing %d Semantic Scholar URLs in chunk", len(s2URLs))
        }
        n, err := processSemanticScholar(ctx, s2URLs, encoder, config, logEncoder)
        result.ArticlesWritten += n
        if err != nil {
            result.Errors++
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error processing S2 URLs: %v", err)
            }
        }
    }

    if len(htmlURLs) > 0 {
        if config.Verbose && config.Logger != nil {
            config.Logger.Printf("Processing %d raw HTML URLs in chunk", len(htmlURLs))
        }
        n, err := processHTML(ctx, htmlURLs, encoder, config, logEncoder)
        result.ArticlesWritten += n
        if err != nil {
            result.Errors++
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error processing HTML URLs: %v", err)
            }
        }
    }

    return result
}

func processArxiv(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
    articles, err := fetchArxivBatch(ctx, config, urls)
    if err != nil {
        if config.Verbose && config.Logger != nil {
            config.Logger.Printf("arXiv batch failed: %v, falling back to individual processing", err)
        }
        return processIndividualArxiv(ctx, urls, encoder, config, logEncoder)
    }

    written := 0
    for _, article := range articles {
        if err := encoder.Encode(article); err != nil {
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error encoding article: %v", err)
            }
            _ = logEncodingFailure(logEncoder, article.URL, err)
        } else {
            written++
            _ = logArticleAttempt(logEncoder, article.URL, "arxiv", nil)
        }
    }
    return written, nil
}

func processSemanticScholar(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
    articles, err := fetchSemanticScholarBatch(ctx, config, urls)
    if err != nil {
        if config.Verbose && config.Logger != nil {
            config.Logger.Printf("S2 batch failed: %v, falling back to individual processing", err)
        }
        return processIndividualS2(ctx, urls, encoder, config, logEncoder)
    }

    written := 0
    for _, article := range articles {
        if err := encoder.Encode(article); err != nil {
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error encoding article: %v", err)
            }
            _ = logEncodingFailure(logEncoder, article.URL, err)
        } else {
            written++
            _ = logArticleAttempt(logEncoder, article.URL, "s2", nil)
        }
    }
    return written, nil
}

func processHTML(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
    written := 0
    for _, url := range urls {
        article, err := fetchRawHTML(ctx, config, url)
        if err != nil {
            _ = logArticleAttempt(logEncoder, url, "", err)
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error fetching HTML %s: %v", url, err)
            }
            continue
        }
        if err := encoder.Encode(article); err != nil {
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error encoding article: %v", err)
            }
            _ = logEncodingFailure(logEncoder, url, err)
        } else {
            written++
            _ = logArticleAttempt(logEncoder, url, "html", nil)
        }
    }
    return written, nil
}

func processIndividualArxiv(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
    written := 0
    for _, url := range urls {
        article, err := fetchArxiv(ctx, config, url)
        if err != nil {
            _ = logArticleAttempt(logEncoder, url, "", err)
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error fetching arXiv %s: %v", url, err)
            }
            continue
        }
        if err := encoder.Encode(article); err != nil {
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error encoding article: %v", err)
            }
            _ = logEncodingFailure(logEncoder, url, err)
        } else {
            written++
            _ = logArticleAttempt(logEncoder, url, "arxiv", nil)
        }
    }
    return written, nil
}

func processIndividualS2(ctx context.Context, urls []string, encoder *json.Encoder, config *Config, logEncoder *json.Encoder) (int, error) {
    written := 0
    for _, url := range urls {
        article, err := fetchSemanticScholar(ctx, config, url)
        if err != nil {
            _ = logArticleAttempt(logEncoder, url, "", err)
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error fetching S2 %s: %v", url, err)
            }
            continue
        }
        if err := encoder.Encode(article); err != nil {
            if config.Verbose && config.Logger != nil {
                config.Logger.Printf("Error encoding article: %v", err)
            }
            _ = logEncodingFailure(logEncoder, url, err)
        } else {
            written++
            _ = logArticleAttempt(logEncoder, url, "s2", nil)
        }
    }
    return written, nil
}
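
For context only (not part of this commit): a minimal sketch of how a caller might drive the pipeline above. It assumes Config exposes Verbose (bool) and Logger (*log.Logger) fields consistent with how processor.go reads them; the output destinations, file name, and runExample helper are illustrative assumptions, not taken from the repository.

// usage_sketch.go (illustrative only, same package as processor.go)
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "os"
)

// runExample wires stdout as the JSONL article stream and attempts.jsonl as
// the per-URL attempt log, then runs the chunked pipeline over a URL list.
// Config fields other than Verbose/Logger are assumptions for this sketch.
func runExample(urls []string) error {
    logFile, err := os.Create("attempts.jsonl") // assumed log destination
    if err != nil {
        return err
    }
    defer logFile.Close()

    cfg := &Config{
        Verbose: true,
        Logger:  log.New(os.Stderr, "scholfetch: ", log.LstdFlags),
    }

    res := ProcessURLsWithConfig(urls, cfg, json.NewEncoder(os.Stdout), logFile)

    // Each attempt also lands in attempts.jsonl as one JSON object per line, e.g.:
    // {"time":"2025-12-15T19:35:46+10:00","url":"https://arxiv.org/abs/...","success":1,"api":"arxiv"}
    fmt.Fprintf(os.Stderr, "wrote %d articles, %d errors\n", res.ArticlesWritten, res.Errors)
    return nil
}

Because articles and attempt logs are both JSONL, the two streams can be kept separate (stdout vs. a file, as sketched here) and post-processed line by line without buffering whole documents.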
