代码:
let requestQueue = [];
let processing = 0;
let defaults = {concurrency:5,timeout:10000,retry:3,priority:'weight'};
let fetchImpl;
if(typeof fetch==='undefined'&&typeof require!=='undefined'){
try{fetchImpl=require('node-fetch').default||require('node-fetch')}catch(e){throw new Error('Please install node-fetch for Node.js compatibility')}
}else{fetchImpl=fetch}
function addRequest(input,init={}){
return new Promise((resolve,reject)=>{
let request={input,init:{signal:init.signal||new AbortController().signal,...init},weight:init.weight||1,retryCount:init.retry||defaults.retry,resolve,reject,timestamp:Date.now()};
let index=requestQueue.length;
while(index>0&&requestQueue[index-1].weight<request.weight)index--;
requestQueue.splice(index,0,request);
processQueue()
})
}
function processQueue(){
if(processing>=defaults.concurrency||requestQueue.length===0)return;
processing++;
let request=requestQueue.shift();
executeRequest(request).catch(error=>{
if(request.retryCount>0){request.retryCount--;requestQueue.unshift(request);processQueue()}else{request.reject(error)}
}).finally(()=>{processing--;processQueue()})
}
async function executeRequest(request){
let controller=new AbortController();
let timeoutId=setTimeout(()=>{
controller.abort();
throw new Error(`Request timed out after ${request.init.timeout||defaults.timeout}ms`)
},request.init.timeout||defaults.timeout);
try{
let response=await Promise.race([
fetchImpl(request.input,{...request.init,signal:controller.signal}),
createTimeoutPromise(request)
]);
if(request.init.streamMatch)return handleStream(response,request);
return response;
}catch(error){throw error}finally{clearTimeout(timeoutId)}
}
function createTimeoutPromise(request){
return new Promise((_,reject)=>{
setTimeout(()=>{
reject(new Error(`Request timed out after ${request.init.timeout||defaults.timeout}ms`))
},request.init.timeout||defaults.timeout)
})
}
async function handleStream(response,request){
let matcher=request.init.streamMatch,reader=response.body.getReader(),receivedLength=0,chunks=[];
while(true){
let {done,value}=await reader.read();
if(done)break;
chunks.push(value);
receivedLength+=value.length;
let combined=Buffer.concat(chunks).toString();
if(typeof matcher==='string'&&combined.includes(matcher)){
reader.cancel();
return {matched:true,data:combined}
}
if(matcher instanceof RegExp&&matcher.test(combined)){
reader.cancel();
return {matched:true,data:combined}
}
}
return {matched:false,data:Buffer.concat(chunks).toString()}
}
function enhancedFetch(config={}){
let newDefaults={...defaults,...config};
return function(input,init={}){
return addRequest(input,{...init,weight:init.weight||(newDefaults.priority==='weight'?1:0)})
}
}
if(typeof module!=='undefined'&&module.exports)module.exports=enhancedFetch;
else if(typeof define==='function'&&define.amd)define(()=>enhancedFetch);
else window.enhancedFetch=enhancedFetch;
感觉还挺不错的,不过需要确保这个代码在不同的环境中都能正常运行(比如浏览器和nodejs)。在低版本Node.js环境中,fetch并不是内置的,所以我们需要手动安装node-fetch(我恨你!!!)
然后这是点用法
// 高权重请求插队
enhancedFetch()('https://some.web.site.?/我很重要 :3', { weight: 10 })
// 设置3次重试+8秒超时
enhancedFetch({ retry: 3 })('https://土豆服务器.api', { timeout: 8000 })
// 监控日志流,发现ERROR就停止
enhancedFetch()('https://some.stream', {
streamMatch: /ERROR/,
onMatch: (data) => sendAlert(data)
})
还能并发控制,不过这代码我是真有点看不懂 🙁
问与瞎答:
Q:能兼容axios吗?
A:猜猜为什么是fetch(暂时不行,但可以你似乎可以自己包装成axios格式)
Q:最大支持多少并发?
A:看你设备
Q:队列数据能持久化吗?
A:你的项目是3s崩一次吗?
Q:为什么用权重不用优先级?
A:简单粗暴,你数字大就优先,能给浏览器的队列干飞(bushi