You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

412 lines
14 KiB

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Ksat.Supplyment.Library.Model.Uploader;
using Ksat.Supplyment.Library.Model;
namespace Ksat.Supplyment.Library.Uploader
{
public class Uploader<T> : IDisposable
{
/// <summary>
/// 上传操作执行者
/// </summary>
internal IUploaderOperator<T> mOperator;
/// <summary>
/// 上传数据队列
/// </summary>
private readonly ConcurrentQueue<UploadModel<T>> mDatas = new ConcurrentQueue<UploadModel<T>>();
/// <summary>
/// 交换数据队列
/// </summary>
private readonly ConcurrentQueue<UploadModel<T>> mSwapDatas = new ConcurrentQueue<UploadModel<T>>();
/// <summary>
/// 上传请求
/// </summary>
private readonly LinkedList<Tuple<IUploaderRequest<T>, UploadModel<T>>> mRequests = new LinkedList<Tuple<IUploaderRequest<T>, UploadModel<T>>>();
private readonly System.Threading.Thread mDatasWorkerThread;
/// <summary>
/// 最大队列长度
/// </summary>
public int MaxOperatingQueueSize = 5;
/// <summary>
/// 同时上传的数量
/// </summary>
public int MaxCocurrentTaskCount = 2;
private int mIsCanceled = 0;
/// <summary>
/// 任务唯一标识与数据库中UploaderID对应
/// </summary>
private string mTaskIdentify;
private Timer mSwapClearTimer;
public Uploader(string ID, IUploaderOperator<T> op)
{
if (op == null)
throw new System.ArgumentNullException();
this.mTaskIdentify = ID;
this.mOperator = op;
//sLog = LoggerFactory.ForContext("Uploader:" + this.mTaskIdentify);
mDatasWorkerThread = new System.Threading.Thread(uploadWorker)
{
Name = "upload worker",
IsBackground = true
};
mDatasWorkerThread.Start();
mSwapClearTimer = new Timer((x) =>
{
var toDBs = new List<UploadModel<T>>();
UploadModel<T> d;
while (mSwapDatas.TryDequeue(out d))
{
toDBs.Add(d);
}
if (toDBs.Count != 0)
cacheRequest(toDBs);
}, null, 4000, 4000);
}
public void Dispose()
{
Interlocked.Exchange(ref mIsCanceled, 1);
try
{
mSwapClearTimer.Dispose();
mDatasWorkerThread.Abort();
List<UploadModel<T>> toCache = new List<UploadModel<T>>();
///取消当前上传中任务,
lock (mRequests)
{
mRequests.ToList().ForEach((r) =>
{
r.Item1.UploadFinish -= Req_OnRequestFinished;
r.Item1.Cancel();
r.Item2.ErrorInfo = "上传程序中止任务cancel";
toCache.Add(r.Item2);
});
}
mDatas.ToList().ForEach((d) => toCache.Add(d));
mSwapDatas.ToList().ForEach((d) => toCache.Add(d));
cacheRequest(toCache);
}
catch
{
}
}
/// <summary>
/// 加入到上传任务队列
/// </summary>
/// <typeparam name="TReq"></typeparam>
/// <param name="req"></param>
private void enqueRequest<TReq>(TReq req, UploadModel<T> requestModel) where TReq : IUploaderRequest<T>
{
req.UploadFinish += Req_OnRequestFinished;
lock (mRequests)
{
mRequests.AddLast(new Tuple<IUploaderRequest<T>, UploadModel<T>>(req, requestModel));
}
}
/// <summary>
/// 从上传任务队列中移除
/// </summary>
/// <typeparam name="TReq"></typeparam>
/// <param name="req"></param>
private UploadModel<T> dequeRequest<TReq>(TReq req) where TReq : IUploaderRequest<T>
{
req.UploadFinish -= Req_OnRequestFinished;
UploadModel<T> reqModel = null;
lock (mRequests)
{
var found = mRequests.FirstOrDefault((x) => x.Item1 == req as IUploaderRequest<T>);
if (found != null)
{
reqModel = found.Item2;
mRequests.Remove(found);
}
}
return reqModel;
}
/// <summary>
/// 任务完成回调
/// </summary>
/// <param name="sender"></param>
/// <param name="success"></param>
private void Req_OnRequestFinished(object sender, bool success, string reason)
{
var request = sender as IUploaderRequest<T>;
var x = (request.RequestData);
var reqModel = dequeRequest(request);
if (reqModel == null)
{
//sLog.Formf($"no request found:{request.RequestData}");
return;
}
reqModel.ErrorInfo = reason;
//sLog.Formf($"{reqModel.Request} finished:{success}");
string cancelRetryReason = null;
if (success)
using (var db = new CodeFirstDbContext())
{
db.UploadFinishs.Add(new UploadFinish
{
CreateAt = reqModel.CreateAt,
RetryAt = reqModel.RetryAt,
RetryCount = reqModel.RetryCount,
RequestData = mOperator.ConvertRequestToCachel(reqModel.Request),
UploaderID = mTaskIdentify
});
db.SaveChanges();
}
else if (mOperator.ShouldRetryRequest(reqModel, out cancelRetryReason))
{
if (mDatas.Count >= MaxOperatingQueueSize)
{
mSwapDatas.Enqueue(reqModel);
}
else
{
mDatas.Enqueue(reqModel);
}
}
else
{
using (var db = new CodeFirstDbContext())
{
db.UploadCancels.Add(new UploadCancel
{
CreateAt = reqModel.CreateAt,
RetryAt = reqModel.RetryAt,
RetryCount = reqModel.RetryCount,
RequestData = mOperator.ConvertRequestToCachel(reqModel.Request),
ErrorInfo = "Task cancel Retry:" + cancelRetryReason ?? "No Reason",
UploaderID = mTaskIdentify
});;
db.SaveChanges();
}
}
Console.WriteLine($"{DateTime.UtcNow.ToString("mm:ss.fff")} task {x} result:{success}");
//throw new NotImplementedException();
}
/// <summary>
/// 上传worker
/// </summary>
private void uploadWorker()
{
while (!Interlocked.Equals(mIsCanceled, 1))
{
int reqCount;
lock (mRequests)
reqCount = mRequests.Count;
if (reqCount >= MaxCocurrentTaskCount)
{
System.Threading.Thread.Sleep(10);
Console.WriteLine($"{DateTime.UtcNow.ToString("mm:ss.fff")} task exceed {MaxCocurrentTaskCount}, waiting");
continue;
}
int newRequestCount = MaxCocurrentTaskCount - reqCount;
//只尝试加载一次cache
bool hasGotCache = false;
for (int i = 0; i < newRequestCount;)
{
UploadModel<T> d;
if (mDatas.TryDequeue(out d))
{
string donotStartReason;
if (!mOperator.ShouldStartRequest(d, out donotStartReason))
{
using (var db = new CodeFirstDbContext())
{
db.UploadCancels.Add(new UploadCancel
{
CreateAt = d.CreateAt,
RetryAt = d.RetryAt,
RetryCount = d.RetryCount,
RequestData = mOperator.ConvertRequestToCachel(d.Request),
ErrorInfo = "Task cancel Start:" + donotStartReason ?? "No Reason",
UploaderID = mTaskIdentify
}); ;
db.SaveChanges();
}
continue;
}
d.RetryAt = DateTime.Now;
d.RetryCount++;
var req = mOperator.UploadData(d.Request);
enqueRequest(req, d);
ThreadPool.QueueUserWorkItem(new WaitCallback((o) => req.Start()));
//new System.Threading.Thread(req.Start).Start();
//req.Start();
i++;
continue;
}
else if (!hasGotCache)
{
hasGotCache = true;
var caches = getCaches(MaxOperatingQueueSize);
if (caches.Count == 0)
{
//cache也没有任务了
break;
}
caches.ForEach((t) => { mDatas.Enqueue(t); });
continue;
}
}
}
}
private void cacheRequest(List<T> datas)
{
using (var db = new CodeFirstDbContext())
{
//sLog.Formf($"cache requests,{datas.Count()}");
var caches = datas.Select((data) =>
new UploadCache
{
UploaderID = mTaskIdentify,
CreateAt = DateTime.Now,
RetryAt = new DateTime(1970, 1, 1),
RetryCount = 0,
ErrorInfo = "",
RequestData = mOperator.ConvertRequestToCachel(data)
}
);
db.UploadCaches.AddRange(caches);
db.SaveChanges();
}
}
/// <summary>
/// 缓存请求数据到数据库
/// </summary>
/// <param name="datas"></param>
private void cacheRequest(List<UploadModel<T>> datas)
{
//sLog.Formf($"cache requests count,{datas.Count()}");
using (var db = new CodeFirstDbContext())
{
var caches = datas.Select((data) =>
new UploadCache
{
UploaderID = mTaskIdentify,
CreateAt = data.CreateAt,
RetryAt = data.RetryAt,
RetryCount = data.RetryCount,
ErrorInfo = data.ErrorInfo,
RequestData = mOperator.ConvertRequestToCachel(data.Request)
}
);
db.UploadCaches.AddRange(caches);
db.SaveChanges();
}
}
/// <summary>
/// 从数据库中提取指定数量的请求数据
/// </summary>
/// <param name="number"></param>
/// <returns></returns>
private List<UploadModel<T>> getCaches(int number)
{
var ret = new List<UploadModel<T>>();
var toGet = number;
UploadModel<T> model;
while (toGet > 0 && mSwapDatas.TryDequeue(out model))
{
ret.Add(model);
toGet--;
}
using (var db = new CodeFirstDbContext())
{
var caches = from c in db.UploadCaches
orderby c.RetryAt ascending
where c.UploaderID == this.mTaskIdentify
select c;
var x = caches.Take(toGet).ToList();
//从数据库中移除
db.UploadCaches.RemoveRange(x);
db.SaveChanges();
var modelsFromDB = x.Select((data) =>
new UploadModel<T>
{
CreateAt = data.CreateAt,
RetryAt = data.RetryAt,
RetryCount = data.RetryCount,
ErrorInfo = data.ErrorInfo,
Request = mOperator.ConvertCacheToRequest(data.RequestData)
}
).ToList();
ret.AddRange(modelsFromDB);
}
//if (ret.Count() != 0)
//sLog.Formf($"get {number} cache return {ret.Count()}");
return ret;
}
/// <summary>
/// 添加上传请求
/// </summary>
public void AddUploadRequest(T data)
{
if (!Interlocked.Equals(mIsCanceled, 1))
{
var requestData = new UploadModel<T>
{
CreateAt = DateTime.Now,
RetryAt = new DateTime(1970, 1, 1),
RetryCount = 0,
ErrorInfo = "",
Request = data
};
if (mDatas.Count >= MaxOperatingQueueSize)
{
//sLog.Formf($"add request to swap,{data}");
mSwapDatas.Enqueue(requestData);
}
else
{
//sLog.Formf($"add request to queue,{data}");
mDatas.Enqueue(requestData);
}
}
else
{
//sLog.Formf("task canceled");
cacheRequest(new List<T>() { data });
}
}
}
}