|
|
|
|
using System;
|
|
|
|
|
using System.Collections.Concurrent;
|
|
|
|
|
using System.Collections.Generic;
|
|
|
|
|
using System.Linq;
|
|
|
|
|
using System.Threading;
|
|
|
|
|
using Ksat.Supplyment.Library.Model.Uploader;
|
|
|
|
|
using Ksat.Supplyment.Library.Model;
|
|
|
|
|
|
|
|
|
|
namespace Ksat.Supplyment.Library.Uploader
|
|
|
|
|
{
|
|
|
|
|
public class Uploader<T> : IDisposable
|
|
|
|
|
{
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 上传操作执行者
|
|
|
|
|
/// </summary>
|
|
|
|
|
internal IUploaderOperator<T> mOperator;
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 上传数据队列
|
|
|
|
|
/// </summary>
|
|
|
|
|
private readonly ConcurrentQueue<UploadModel<T>> mDatas = new ConcurrentQueue<UploadModel<T>>();
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 交换数据队列
|
|
|
|
|
/// </summary>
|
|
|
|
|
private readonly ConcurrentQueue<UploadModel<T>> mSwapDatas = new ConcurrentQueue<UploadModel<T>>();
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 上传请求
|
|
|
|
|
/// </summary>
|
|
|
|
|
private readonly LinkedList<Tuple<IUploaderRequest<T>, UploadModel<T>>> mRequests = new LinkedList<Tuple<IUploaderRequest<T>, UploadModel<T>>>();
|
|
|
|
|
|
|
|
|
|
private readonly System.Threading.Thread mDatasWorkerThread;
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 最大队列长度
|
|
|
|
|
/// </summary>
|
|
|
|
|
public int MaxOperatingQueueSize = 4;
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 同时上传的数量
|
|
|
|
|
/// </summary>
|
|
|
|
|
public int MaxCocurrentTaskCount = 2;
|
|
|
|
|
|
|
|
|
|
private int mIsCanceled = 0;
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 任务唯一标识,与数据库中UploaderID对应
|
|
|
|
|
/// </summary>
|
|
|
|
|
private string mTaskIdentify;
|
|
|
|
|
|
|
|
|
|
private Timer mSwapClearTimer;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public Uploader(string ID, IUploaderOperator<T> op)
|
|
|
|
|
{
|
|
|
|
|
if (op == null)
|
|
|
|
|
throw new System.ArgumentNullException();
|
|
|
|
|
this.mTaskIdentify = ID;
|
|
|
|
|
this.mOperator = op;
|
|
|
|
|
mDatasWorkerThread = new System.Threading.Thread(uploadWorker)
|
|
|
|
|
{
|
|
|
|
|
Name = "upload worker",
|
|
|
|
|
IsBackground = true
|
|
|
|
|
};
|
|
|
|
|
mDatasWorkerThread.Start();
|
|
|
|
|
|
|
|
|
|
mSwapClearTimer = new Timer((x) =>
|
|
|
|
|
{
|
|
|
|
|
var toDBs = new List<UploadModel<T>>();
|
|
|
|
|
|
|
|
|
|
UploadModel<T> d;
|
|
|
|
|
while (mSwapDatas.TryDequeue(out d))
|
|
|
|
|
{
|
|
|
|
|
toDBs.Add(d);
|
|
|
|
|
}
|
|
|
|
|
if (toDBs.Count != 0)
|
|
|
|
|
cacheRequest(toDBs);
|
|
|
|
|
}, null, 4000, 4000);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void Dispose()
|
|
|
|
|
{
|
|
|
|
|
Interlocked.Exchange(ref mIsCanceled, 1);
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
mSwapClearTimer.Dispose();
|
|
|
|
|
mDatasWorkerThread.Abort();
|
|
|
|
|
|
|
|
|
|
List<UploadModel<T>> toCache = new List<UploadModel<T>>();
|
|
|
|
|
|
|
|
|
|
///取消当前上传中任务,
|
|
|
|
|
lock (mRequests)
|
|
|
|
|
{
|
|
|
|
|
mRequests.ToList().ForEach((r) =>
|
|
|
|
|
{
|
|
|
|
|
r.Item1.UploadFinish -= Req_OnRequestFinished;
|
|
|
|
|
r.Item1.Cancel();
|
|
|
|
|
r.Item2.ErrorInfo = "上传程序中止,任务cancel";
|
|
|
|
|
toCache.Add(r.Item2);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
mDatas.ToList().ForEach((d) => toCache.Add(d));
|
|
|
|
|
mSwapDatas.ToList().ForEach((d) => toCache.Add(d));
|
|
|
|
|
cacheRequest(toCache);
|
|
|
|
|
}
|
|
|
|
|
catch
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 加入到上传任务队列
|
|
|
|
|
/// </summary>
|
|
|
|
|
/// <typeparam name="TReq"></typeparam>
|
|
|
|
|
/// <param name="req"></param>
|
|
|
|
|
private void enqueRequest<TReq>(TReq req, UploadModel<T> requestModel) where TReq : IUploaderRequest<T>
|
|
|
|
|
{
|
|
|
|
|
req.UploadFinish += Req_OnRequestFinished;
|
|
|
|
|
lock (mRequests)
|
|
|
|
|
{
|
|
|
|
|
mRequests.AddLast(new Tuple<IUploaderRequest<T>, UploadModel<T>>(req, requestModel));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 从上传任务队列中移除
|
|
|
|
|
/// </summary>
|
|
|
|
|
/// <typeparam name="TReq"></typeparam>
|
|
|
|
|
/// <param name="req"></param>
|
|
|
|
|
private UploadModel<T> dequeRequest<TReq>(TReq req) where TReq : IUploaderRequest<T>
|
|
|
|
|
{
|
|
|
|
|
req.UploadFinish -= Req_OnRequestFinished;
|
|
|
|
|
UploadModel<T> reqModel = null;
|
|
|
|
|
lock (mRequests)
|
|
|
|
|
{
|
|
|
|
|
var found = mRequests.FirstOrDefault((x) => x.Item1 == req as IUploaderRequest<T>);
|
|
|
|
|
if (found != null)
|
|
|
|
|
{
|
|
|
|
|
reqModel = found.Item2;
|
|
|
|
|
mRequests.Remove(found);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return reqModel;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 任务完成回调
|
|
|
|
|
/// </summary>
|
|
|
|
|
/// <param name="sender"></param>
|
|
|
|
|
/// <param name="success"></param>
|
|
|
|
|
private void Req_OnRequestFinished(object sender, bool success, string reason)
|
|
|
|
|
{
|
|
|
|
|
var request = sender as IUploaderRequest<T>;
|
|
|
|
|
var x = request.RequestData;
|
|
|
|
|
var reqModel = dequeRequest(request);
|
|
|
|
|
|
|
|
|
|
if (reqModel == null)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine("no request found:{request.RequestData}");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
reqModel.ErrorInfo = reason;
|
|
|
|
|
|
|
|
|
|
string cancelRetryReason = null;
|
|
|
|
|
if (success)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
using (var db = new CodeFirstDbContext())
|
|
|
|
|
{
|
|
|
|
|
db.UploadFinishs.Add(new UploadFinish
|
|
|
|
|
{
|
|
|
|
|
CreateAt = reqModel.CreateAt,
|
|
|
|
|
RetryAt = reqModel.RetryAt,
|
|
|
|
|
RetryCount = reqModel.RetryCount,
|
|
|
|
|
RequestData = mOperator.ConvertRequestToCachel(reqModel.Request),
|
|
|
|
|
Tag = reqModel.Tag,
|
|
|
|
|
UploaderID = mTaskIdentify
|
|
|
|
|
});
|
|
|
|
|
db.SaveChanges();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine(ex.Message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else if (mOperator.ShouldRetryRequest(reqModel, out cancelRetryReason))
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
if (mDatas.Count >= MaxOperatingQueueSize)
|
|
|
|
|
{
|
|
|
|
|
mSwapDatas.Enqueue(reqModel);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
mDatas.Enqueue(reqModel);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
using (var db = new CodeFirstDbContext())
|
|
|
|
|
{
|
|
|
|
|
db.UploadCancels.Add(new UploadCancel
|
|
|
|
|
{
|
|
|
|
|
CreateAt = reqModel.CreateAt,
|
|
|
|
|
RetryAt = reqModel.RetryAt,
|
|
|
|
|
RetryCount = reqModel.RetryCount,
|
|
|
|
|
RequestData = mOperator.ConvertRequestToCachel(reqModel.Request),
|
|
|
|
|
ErrorInfo = "Task cancel Retry:" + cancelRetryReason ?? "No Reason",
|
|
|
|
|
Tag = reqModel.Tag,
|
|
|
|
|
UploaderID = mTaskIdentify
|
|
|
|
|
});
|
|
|
|
|
db.SaveChanges();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine(ex.Message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")} task {x} result:{success}");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 上传worker
|
|
|
|
|
/// </summary>
|
|
|
|
|
private void uploadWorker()
|
|
|
|
|
{
|
|
|
|
|
while (!Interlocked.Equals(mIsCanceled, 1))
|
|
|
|
|
{
|
|
|
|
|
int reqCount;
|
|
|
|
|
lock (mRequests)
|
|
|
|
|
reqCount = mRequests.Count;
|
|
|
|
|
if (reqCount >= MaxCocurrentTaskCount)
|
|
|
|
|
{
|
|
|
|
|
System.Threading.Thread.Sleep(10);
|
|
|
|
|
Console.WriteLine($@"{DateTime.Now.ToString("HH:mm:ss.fff")} task exceed {MaxCocurrentTaskCount}, waiting");
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
int newRequestCount = MaxCocurrentTaskCount - reqCount;
|
|
|
|
|
|
|
|
|
|
//只尝试加载一次cache
|
|
|
|
|
bool hasGotCache = false;
|
|
|
|
|
for (int i = 0; i < newRequestCount;)
|
|
|
|
|
{
|
|
|
|
|
UploadModel<T> d;
|
|
|
|
|
if (mDatas.TryDequeue(out d))
|
|
|
|
|
{
|
|
|
|
|
string donotStartReason;
|
|
|
|
|
if (!mOperator.ShouldStartRequest(d, out donotStartReason))
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
using (var db = new CodeFirstDbContext())
|
|
|
|
|
{
|
|
|
|
|
db.UploadCancels.Add(new UploadCancel
|
|
|
|
|
{
|
|
|
|
|
CreateAt = d.CreateAt,
|
|
|
|
|
RetryAt = d.RetryAt,
|
|
|
|
|
RetryCount = d.RetryCount,
|
|
|
|
|
RequestData = mOperator.ConvertRequestToCachel(d.Request),
|
|
|
|
|
ErrorInfo = "Task cancel Start:" + donotStartReason ?? "No Reason",
|
|
|
|
|
Tag = d.Tag,
|
|
|
|
|
UploaderID = mTaskIdentify
|
|
|
|
|
}); ;
|
|
|
|
|
db.SaveChanges();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine(ex.Message);
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
d.RetryAt = DateTime.Now;
|
|
|
|
|
d.RetryCount++;
|
|
|
|
|
var req = mOperator.UploadData(d.Request);
|
|
|
|
|
enqueRequest(req, d);
|
|
|
|
|
ThreadPool.QueueUserWorkItem(new WaitCallback((o) => req.Start()));
|
|
|
|
|
i++;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
else if (!hasGotCache)
|
|
|
|
|
{
|
|
|
|
|
hasGotCache = true;
|
|
|
|
|
var caches = getCaches(MaxOperatingQueueSize);
|
|
|
|
|
if (caches.Count == 0)
|
|
|
|
|
{
|
|
|
|
|
//cache也没有任务了
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
caches.ForEach((t) => { mDatas.Enqueue(t); });
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void cacheRequest(List<Tuple<T, string>> datas)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
using (var db = new CodeFirstDbContext())
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine($@"cache requests,{datas.Count()}");
|
|
|
|
|
var caches = datas.Select((data) =>
|
|
|
|
|
new UploadCache
|
|
|
|
|
{
|
|
|
|
|
UploaderID = mTaskIdentify,
|
|
|
|
|
CreateAt = DateTime.Now,
|
|
|
|
|
RetryAt = new DateTime(1970, 1, 1),
|
|
|
|
|
RetryCount = 0,
|
|
|
|
|
ErrorInfo = "",
|
|
|
|
|
Tag = data.Item2,
|
|
|
|
|
RequestData = mOperator.ConvertRequestToCachel(data.Item1)
|
|
|
|
|
}
|
|
|
|
|
);
|
|
|
|
|
db.UploadCaches.AddRange(caches);
|
|
|
|
|
db.SaveChanges();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine(ex.Message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 缓存请求数据到数据库
|
|
|
|
|
/// </summary>
|
|
|
|
|
/// <param name="datas"></param>
|
|
|
|
|
private void cacheRequest(List<UploadModel<T>> datas)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
using (var db = new CodeFirstDbContext())
|
|
|
|
|
{
|
|
|
|
|
var caches = datas.Select((data) =>
|
|
|
|
|
new UploadCache
|
|
|
|
|
{
|
|
|
|
|
UploaderID = mTaskIdentify,
|
|
|
|
|
CreateAt = data.CreateAt,
|
|
|
|
|
RetryAt = data.RetryAt,
|
|
|
|
|
RetryCount = data.RetryCount,
|
|
|
|
|
ErrorInfo = data.ErrorInfo,
|
|
|
|
|
Tag = data.Tag,
|
|
|
|
|
RequestData = mOperator.ConvertRequestToCachel(data.Request)
|
|
|
|
|
}
|
|
|
|
|
);
|
|
|
|
|
db.UploadCaches.AddRange(caches);
|
|
|
|
|
db.SaveChanges();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine(ex.Message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 从数据库中提取指定数量的请求数据
|
|
|
|
|
/// </summary>
|
|
|
|
|
/// <param name="number"></param>
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
private List<UploadModel<T>> getCaches(int number)
|
|
|
|
|
{
|
|
|
|
|
var ret = new List<UploadModel<T>>();
|
|
|
|
|
var toGet = number;
|
|
|
|
|
UploadModel<T> model;
|
|
|
|
|
|
|
|
|
|
while (toGet > 0 && mSwapDatas.TryDequeue(out model))
|
|
|
|
|
{
|
|
|
|
|
ret.Add(model);
|
|
|
|
|
toGet--;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
using (var db = new CodeFirstDbContext())
|
|
|
|
|
{
|
|
|
|
|
var caches = from c in db.UploadCaches
|
|
|
|
|
orderby c.RetryAt ascending
|
|
|
|
|
where c.UploaderID == this.mTaskIdentify
|
|
|
|
|
select c;
|
|
|
|
|
var x = caches.Take(toGet).ToList();
|
|
|
|
|
//从数据库中移除
|
|
|
|
|
db.UploadCaches.RemoveRange(x);
|
|
|
|
|
db.SaveChanges();
|
|
|
|
|
var modelsFromDB = x.Select((data) =>
|
|
|
|
|
new UploadModel<T>
|
|
|
|
|
{
|
|
|
|
|
CreateAt = data.CreateAt,
|
|
|
|
|
RetryAt = data.RetryAt,
|
|
|
|
|
RetryCount = data.RetryCount,
|
|
|
|
|
ErrorInfo = data.ErrorInfo,
|
|
|
|
|
Tag = data.Tag,
|
|
|
|
|
Request = mOperator.ConvertCacheToRequest(data.RequestData)
|
|
|
|
|
}
|
|
|
|
|
).ToList();
|
|
|
|
|
|
|
|
|
|
ret.AddRange(modelsFromDB);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine(ex.Message);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
/// 添加上传请求
|
|
|
|
|
/// </summary>
|
|
|
|
|
public void AddUploadRequest(T data, string tag)
|
|
|
|
|
{
|
|
|
|
|
if (!Interlocked.Equals(mIsCanceled, 1))
|
|
|
|
|
{
|
|
|
|
|
var requestData = new UploadModel<T>
|
|
|
|
|
{
|
|
|
|
|
CreateAt = DateTime.Now,
|
|
|
|
|
RetryAt = new DateTime(1970, 1, 1),
|
|
|
|
|
RetryCount = 0,
|
|
|
|
|
ErrorInfo = "",
|
|
|
|
|
Tag = tag,
|
|
|
|
|
Request = data
|
|
|
|
|
};
|
|
|
|
|
if (mDatas.Count >= MaxOperatingQueueSize)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine("add request to swap");
|
|
|
|
|
mSwapDatas.Enqueue(requestData);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine("add request to queue");
|
|
|
|
|
mDatas.Enqueue(requestData);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine("task canceled");
|
|
|
|
|
cacheRequest(new List<Tuple<T, string>>() { new Tuple<T, string>(data, tag) });
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|