大改git add .!

master
ZGGSONG 3 years ago
parent 33d5b1bf4e
commit 431bf31f39

@ -9,15 +9,13 @@ namespace Ksat.Supplyment.Library.Uploader
public interface IUploaderOperator<T>
{
/// <summary>
/// 上传数据
/// </summary>
IUploaderRequest<T> UploadData(T data);
T ConvertCacheToRequest(string cache);
string ConvertRequestToCachel(T requestData);
/// <summary>

@ -6,7 +6,7 @@ using Ksat.Supplyment.Library.Model.Uploader;
namespace Ksat.Supplyment.Library.Uploader
{
public delegate void UploadFinishHandler(object sender, bool success, string reason) ;
public delegate void UploadFinishHandler(object sender, bool success, string reason);
public interface IUploaderRequest<T>
{

@ -35,7 +35,7 @@ namespace Ksat.Supplyment.Library.Uploader
/// <summary>
/// 最大队列长度
/// </summary>
public int MaxOperatingQueueSize = 5;
public int MaxOperatingQueueSize = 4;
/// <summary>
/// 同时上传的数量
@ -58,8 +58,6 @@ namespace Ksat.Supplyment.Library.Uploader
throw new System.ArgumentNullException();
this.mTaskIdentify = ID;
this.mOperator = op;
//sLog = LoggerFactory.ForContext("Uploader:" + this.mTaskIdentify);
// 入口
mDatasWorkerThread = new System.Threading.Thread(uploadWorker)
{
Name = "upload worker",
@ -67,7 +65,6 @@ namespace Ksat.Supplyment.Library.Uploader
};
mDatasWorkerThread.Start();
// Swap 处理: 超过4s保存至数据库 Caches 表
mSwapClearTimer = new Timer((x) =>
{
var toDBs = new List<UploadModel<T>>();
@ -160,15 +157,16 @@ namespace Ksat.Supplyment.Library.Uploader
if (reqModel == null)
{
//sLog.Formf($"no request found:{request.RequestData}");
Console.WriteLine("no request found:{request.RequestData}");
return;
}
reqModel.ErrorInfo = reason;
//sLog.Formf($"{reqModel.Request} finished:{success}");
string cancelRetryReason = null;
if (success)
{
try
{
using (var db = new CodeFirstDbContext())
{
db.UploadFinishs.Add(new UploadFinish
@ -177,10 +175,16 @@ namespace Ksat.Supplyment.Library.Uploader
RetryAt = reqModel.RetryAt,
RetryCount = reqModel.RetryCount,
RequestData = mOperator.ConvertRequestToCachel(reqModel.Request),
Tag = reqModel.Tag,
UploaderID = mTaskIdentify
});
db.SaveChanges();
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
else if (mOperator.ShouldRetryRequest(reqModel, out cancelRetryReason))
{
@ -195,6 +199,8 @@ namespace Ksat.Supplyment.Library.Uploader
}
}
else
{
try
{
using (var db = new CodeFirstDbContext())
{
@ -205,14 +211,18 @@ namespace Ksat.Supplyment.Library.Uploader
RetryCount = reqModel.RetryCount,
RequestData = mOperator.ConvertRequestToCachel(reqModel.Request),
ErrorInfo = "Task cancel Retry:" + cancelRetryReason ?? "No Reason",
Tag = reqModel.Tag,
UploaderID = mTaskIdentify
});;
});
db.SaveChanges();
}
}
Console.WriteLine($"{DateTime.UtcNow.ToString("mm:ss.fff")} task {x} result:{success}");
//throw new NotImplementedException();
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")} task {x} result:{success}");
}
/// <summary>
@ -228,7 +238,7 @@ namespace Ksat.Supplyment.Library.Uploader
if (reqCount >= MaxCocurrentTaskCount)
{
System.Threading.Thread.Sleep(10);
Console.WriteLine($"{DateTime.UtcNow.ToString("mm:ss.fff")} task exceed {MaxCocurrentTaskCount}, waiting");
Console.WriteLine($@"{DateTime.Now.ToString("HH:mm:ss.fff")} task exceed {MaxCocurrentTaskCount}, waiting");
continue;
}
int newRequestCount = MaxCocurrentTaskCount - reqCount;
@ -242,6 +252,8 @@ namespace Ksat.Supplyment.Library.Uploader
{
string donotStartReason;
if (!mOperator.ShouldStartRequest(d, out donotStartReason))
{
try
{
using (var db = new CodeFirstDbContext())
{
@ -252,10 +264,16 @@ namespace Ksat.Supplyment.Library.Uploader
RetryCount = d.RetryCount,
RequestData = mOperator.ConvertRequestToCachel(d.Request),
ErrorInfo = "Task cancel Start:" + donotStartReason ?? "No Reason",
Tag = d.Tag,
UploaderID = mTaskIdentify
}); ;
db.SaveChanges();
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
continue;
}
d.RetryAt = DateTime.Now;
@ -263,8 +281,6 @@ namespace Ksat.Supplyment.Library.Uploader
var req = mOperator.UploadData(d.Request);
enqueRequest(req, d);
ThreadPool.QueueUserWorkItem(new WaitCallback((o) => req.Start()));
//new System.Threading.Thread(req.Start).Start();
//req.Start();
i++;
continue;
}
@ -290,7 +306,7 @@ namespace Ksat.Supplyment.Library.Uploader
{
using (var db = new CodeFirstDbContext())
{
//sLog.Formf($@"cache requests,{datas.Count()}");
Console.WriteLine($@"cache requests,{datas.Count()}");
var caches = datas.Select((data) =>
new UploadCache
{
@ -307,9 +323,9 @@ namespace Ksat.Supplyment.Library.Uploader
db.SaveChanges();
}
}
catch (Exception)
catch (Exception ex)
{
//sLog.FormErrorf(ex.Message);
Console.WriteLine(ex.Message);
}
}
@ -319,7 +335,8 @@ namespace Ksat.Supplyment.Library.Uploader
/// <param name="datas"></param>
private void cacheRequest(List<UploadModel<T>> datas)
{
//sLog.Formf($"cache requests count,{datas.Count()}");
try
{
using (var db = new CodeFirstDbContext())
{
var caches = datas.Select((data) =>
@ -330,6 +347,7 @@ namespace Ksat.Supplyment.Library.Uploader
RetryAt = data.RetryAt,
RetryCount = data.RetryCount,
ErrorInfo = data.ErrorInfo,
Tag = data.Tag,
RequestData = mOperator.ConvertRequestToCachel(data.Request)
}
);
@ -337,6 +355,11 @@ namespace Ksat.Supplyment.Library.Uploader
db.SaveChanges();
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
/// <summary>
/// 从数据库中提取指定数量的请求数据
@ -355,6 +378,9 @@ namespace Ksat.Supplyment.Library.Uploader
toGet--;
}
try
{
using (var db = new CodeFirstDbContext())
{
var caches = from c in db.UploadCaches
@ -372,15 +398,19 @@ namespace Ksat.Supplyment.Library.Uploader
RetryAt = data.RetryAt,
RetryCount = data.RetryCount,
ErrorInfo = data.ErrorInfo,
Tag = data.Tag,
Request = mOperator.ConvertCacheToRequest(data.RequestData)
}
).ToList();
ret.AddRange(modelsFromDB);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
//if (ret.Count() != 0)
//sLog.Formf($"get {number} cache return {ret.Count()}");
return ret;
}
@ -391,7 +421,6 @@ namespace Ksat.Supplyment.Library.Uploader
{
if (!Interlocked.Equals(mIsCanceled, 1))
{
var requestData = new UploadModel<T>
{
CreateAt = DateTime.Now,
@ -403,18 +432,18 @@ namespace Ksat.Supplyment.Library.Uploader
};
if (mDatas.Count >= MaxOperatingQueueSize)
{
//sLog.Formf($"add request to swap,{data}");
Console.WriteLine("add request to swap");
mSwapDatas.Enqueue(requestData);
}
else
{
//sLog.Formf($"add request to queue,{data}");
Console.WriteLine("add request to queue");
mDatas.Enqueue(requestData);
}
}
else
{
//sLog.Formf("task canceled");
Console.WriteLine("task canceled");
cacheRequest(new List<Tuple<T, string>>() { new Tuple<T, string>(data, tag) });
}
}

@ -2,9 +2,9 @@
using System.Net;
using System.Text;
namespace S3Demo.Model
namespace S3Demo.Helper
{
public class HttpModel
public class HttpHelper
{
/// <summary>
/// POST请求

@ -10,9 +10,9 @@ using Amazon.Runtime;
using Amazon.S3;
using Amazon.S3.Model;
namespace S3Demo.Model
namespace S3Demo.Helper
{
class ObjectModel
class ObjectHelper
{
#region 上传对象(带标签)
/// <summary>
@ -47,6 +47,7 @@ namespace S3Demo.Model
};
PutObjectResponse response = await client.PutObjectAsync(putRequest);
//Console.WriteLine(putRequest.Key + "," + putRequest.BucketName + "," + putRequest.FilePath + "," + response.HttpStatusCode);
return response.HttpStatusCode.ToString();
#region 查看tag

@ -0,0 +1,11 @@
using System.Collections.Generic;
namespace S3Demo.Model.Uploader
{
public class S3UploadModel
{
public string Keyname { get; set; }
public string Path { get; set; }
public List<string> Tags { get; set; }
}
}

@ -2,25 +2,23 @@
using Ksat.Logging;
using Ksat.Supplyment.Library.Uploader;
using Ksat.Supplyment.Library.Model.Uploader;
using S3Demo.Model.Uploader;
namespace S3Demo.Uploader
namespace S3Demo.Model.Uploader
{
public class S3UploadOperator : IUploaderOperator<string>
public class S3UploadOperator : IUploaderOperator<S3UploadModel>
{
//Log
private ILogger sLog = LoggerFactory.ForContext<S3UploadOperator>();
public string ConvertCacheToRequest(string cache)
public S3UploadModel ConvertCacheToRequest(string cache)
{
return cache;
return Newtonsoft.Json.JsonConvert.DeserializeObject<S3UploadModel>(cache);
}
public string ConvertRequestToCachel(string requestData)
public string ConvertRequestToCachel(S3UploadModel requestData)
{
return requestData;
return Newtonsoft.Json.JsonConvert.SerializeObject(requestData);
}
public bool ShouldRetryRequest(UploadModel<string> requestModel, out string cancelReason)
public bool ShouldRetryRequest(UploadModel<S3UploadModel> requestModel, out string cancelReason)
{
if (DateTime.Now.Subtract(requestModel.CreateAt).TotalSeconds > 600)
{
@ -36,7 +34,7 @@ namespace S3Demo.Uploader
return true;
}
public bool ShouldStartRequest(UploadModel<string> requestModel, out string cancelReason)
public bool ShouldStartRequest(UploadModel<S3UploadModel> requestModel, out string cancelReason)
{
if (DateTime.Now.Subtract(requestModel.CreateAt).TotalSeconds > 600)
{
@ -52,7 +50,7 @@ namespace S3Demo.Uploader
return true;
}
public IUploaderRequest<string> UploadData(string data)
public IUploaderRequest<S3UploadModel> UploadData(S3UploadModel data)
{
return new S3UploadRequest() { RequestData = data };
}

@ -1,32 +1,60 @@
using System;
using S3Demo.Uploader;
using S3Demo.Helper;
using Ksat.Supplyment.Library.Uploader;
using Ksat.Supplyment.Library.Model.Uploader;
using Ksat.Logging;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon;
using Amazon.Runtime;
namespace S3Demo.Model.Uploader
{
public class S3UploadRequest : IUploaderRequest<string>
public class S3UploadRequest : IUploaderRequest<S3UploadModel>
{
// 通过固定域名的方式
//private static AmazonS3Config conf = new AmazonS3Config()
//{
// ServiceURL = "https://play.min.io",
// ForcePathStyle = true
//};
//new BasicAWSCredentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")
public string bucketName = "minio/test";
public AmazonS3Client client = new AmazonS3Client(new BasicAWSCredentials("admin", "admin123."), new AmazonS3Config()
{
//Log
UseHttp = true,
ProxyHost = "http://192.168.60.132",
ProxyPort = 9000,
RegionEndpoint = RegionEndpoint.USEast1
});
private ILogger sLog = LoggerFactory.ForContext<S3UploadOperator>();
public string RequestData { get; set; }
public S3UploadModel RequestData { get; set; }
public event UploadFinishHandler UploadFinish;
public void Cancel()
{
mTask?.Dispose();
//TODO:
}
public void Start()
{
var ran = new Random();
//TODO:
System.Threading.Thread.Sleep(1000);
UploadFinish(this, ran.Next() % 3 == 0, "No reason");
_ = objSend();
}
private Task<string> mTask;
public async Task objSend()
{
Console.WriteLine("==开始上传==");
mTask = ObjectHelper.PutObjectsWithTagsAsync(client, bucketName, RequestData.Keyname, RequestData.Path, RequestData.Tags.ToArray());
string res = await mTask;
UploadFinish(this, res.Equals("OK"), res);
Console.WriteLine($"S3 Upload recv: {res}");
}
}
}

@ -1,21 +1,20 @@
using System;
using Newtonsoft.Json;
using Ksat.Supplyment.Library.Uploader;
using S3Demo.Uploader;
namespace S3Demo.Model.Uploader
{
public class S3Uploader
public class S3Uploader : Ksat.CommonModelLibrary.Utils.SingletonUtils<S3Uploader>,IDisposable
{
private static Uploader<string> mUploader;
private static Uploader<S3UploadModel> mUploader;
private static S3UploadOperator mOperator;
public S3Uploader(string UploadID)
private S3Uploader()
{
mOperator = new S3UploadOperator();
mUploader = new Uploader<string>(UploadID, mOperator);
mUploader = new Uploader<S3UploadModel>("xxx", mOperator);
}
public void AddUploadTask(string reqData, string tag)
public void AddUploadTask(S3UploadModel reqData, string tag)
{
mUploader.AddUploadRequest(reqData, tag);
}

@ -11,41 +11,14 @@ using Ksat.Supplyment.Library.Uploader;
using Ksat.Supplyment.Library.Model.Uploader;
using System.Threading.Tasks;
using System.Threading;
using System.Linq;
namespace S3Demo
{
public class Run
{
//private static string url = "http://172.17.204.180:9000/api/tri/trackin";
private static BasicAWSCredentials credentials = new BasicAWSCredentials("admin", "admin123.");
//private static BasicAWSCredentials credentials = new BasicAWSCredentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG");
private static string bucketName = "minio/test";
// 受监控的目录,完整目录如:"D:\ClientDir\line01\AOI\20211129\133055851SN001.png"
private static string[] paths = { @"D:\ClientDir" };
private static string suffix = "*.png";
/// <summary>
/// https://docs.aws.amazon.com/sdkfornet/v3/apidocs/items/S3/TS3Config.html
/// https://blog.csdn.net/tw_tangliang/article/details/118669099
/// https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPI.cs
/// </summary>
// 通过IP + Port方式
private static AmazonS3Config conf = new AmazonS3Config()
{
UseHttp = true,
ProxyHost = "http://192.168.60.132",
ProxyPort = 9000,
RegionEndpoint = RegionEndpoint.USEast1
};
// 通过固定域名的方式
//private static AmazonS3Config conf = new AmazonS3Config()
//{
// ServiceURL = "https://play.min.io",
// ForcePathStyle = true
//};
public static AmazonS3Client client = new AmazonS3Client(credentials, conf);
public static void Main()
{
try
@ -162,77 +135,19 @@ namespace S3Demo
{
Console.WriteLine($"Main: {ex.Message}");
}
Console.ReadKey();
}
#region 上传测试
public class UploaderOperator : IUploaderOperator<string>
{
public string ConvertCacheToRequest(string cache)
{
return cache;
}
public string ConvertRequestToCachel(string requestData)
{
return requestData;
}
public bool ShouldRetryRequest(UploadModel<string> requestModel, out string cancelReason)
{
cancelReason = null;
if (requestModel.RetryCount >= 2)
{
cancelReason = "已经两次fail了";
return false;
}
return true;
}
public bool ShouldStartRequest(UploadModel<string> requestModel, out string cancelReason)
{
cancelReason = null;
return true;
}
public IUploaderRequest<string> UploadData(string data)
{
var request = new UploadRequest();
request.RequestData = data;
return request;
}
}
public class UploadRequest : IUploaderRequest<string>
{
private string mRequestData;
public string RequestData { get => mRequestData; set => mRequestData = value; }
public event UploadFinishHandler UploadFinish;
public void Start()
{
Console.WriteLine($"{DateTime.UtcNow.ToString("mm:ss.fff")} task {RequestData} Started");
System.Threading.Thread.Sleep(100);
var random = new Random();
UploadFinish(this, random.Next() % 2 == 0, "task failed");
}
public void Cancel()
{
}
}
#endregion
/// <summary>
/// 创建文件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private static async void OnCreated(object sender, FileSystemEventArgs e)
private static void OnCreated(object sender, FileSystemEventArgs e)
{
Waiting(e.FullPath);
await DisposeFile(e.FullPath);
//Waiting(e.FullPath);
// 如果创建过程中就上传1、重传机制会载入重传队列2、也可手动添加固定延时等待创建完成在上传
DisposeFile(e.FullPath);
}
/// <summary>
/// 等待onCreated事件处理完成
@ -262,7 +177,7 @@ namespace S3Demo
/// </summary>
/// <param name="path">文件路径</param>
/// <returns></returns>
private static async Task DisposeFile(string path)
private static void DisposeFile(string path)
{
// D:\ClientDir\line01\AOI\20211129\085023891P107CN14T00001.png
string lineName = Path.GetDirectoryName(path).Split('\\')[2];
@ -275,10 +190,9 @@ namespace S3Demo
string now = DateTime.Now.ToString("yyyyMMddHHmmssfff");
String[] tags = new String[] { sn, dt_fomat };
string S3Path = lineName + "/" + equipmentName + "/" + date + "/" + fileName;
string recv = await ObjectModel.PutObjectsWithTagsAsync(client, bucketName, S3Path, path, tags);
Console.WriteLine("recv from S3: " + recv);
//引入统一上传方式
//new S3Uploader(lineName).AddUploadTask(S3Path, sn);
var uploadModel = new S3UploadModel { Keyname = S3Path, Path = path, Tags = tags.ToList() };
S3Uploader.Instance().AddUploadTask(uploadModel, sn);
}
/// <summary>

@ -50,6 +50,9 @@
<Reference Include="EntityFramework.SqlServer, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL">
<HintPath>..\packages\EntityFramework.6.4.4\lib\net45\EntityFramework.SqlServer.dll</HintPath>
</Reference>
<Reference Include="KsatCommonModelLibrary">
<HintPath>..\vendor\KsatCommonModelLibrary.dll</HintPath>
</Reference>
<Reference Include="KsatLogging">
<HintPath>..\vendor\KsatLogging.dll</HintPath>
</Reference>
@ -80,10 +83,11 @@
</ItemGroup>
<ItemGroup>
<Compile Include="Model\BucketModel.cs" />
<Compile Include="Model\HttpModel.cs" />
<Compile Include="Model\ObjectModel.cs" />
<Compile Include="Helper\HttpHelper.cs" />
<Compile Include="Helper\ObjectHelper.cs" />
<Compile Include="Model\DataModel.cs" />
<Compile Include="Model\Uploader\S3Uploader.cs" />
<Compile Include="Model\Uploader\S3UploadModel.cs" />
<Compile Include="Model\Uploader\S3UploadRequest.cs" />
<Compile Include="Storage\CreateBucket.cs" />
<Compile Include="Storage\ListObjects.cs" />

Binary file not shown.
Loading…
Cancel
Save