diff --git a/KsatSupplymentLibrary/Uploader/IUploaderOperator.cs b/KsatSupplymentLibrary/Uploader/IUploaderOperator.cs index 932935f..eacbedf 100644 --- a/KsatSupplymentLibrary/Uploader/IUploaderOperator.cs +++ b/KsatSupplymentLibrary/Uploader/IUploaderOperator.cs @@ -9,15 +9,13 @@ namespace Ksat.Supplyment.Library.Uploader public interface IUploaderOperator { - /// /// 上传数据 /// IUploaderRequest UploadData(T data); - - T ConvertCacheToRequest(string cache); + string ConvertRequestToCachel(T requestData); /// diff --git a/KsatSupplymentLibrary/Uploader/IUploaderRequest.cs b/KsatSupplymentLibrary/Uploader/IUploaderRequest.cs index 0757e98..2d35968 100644 --- a/KsatSupplymentLibrary/Uploader/IUploaderRequest.cs +++ b/KsatSupplymentLibrary/Uploader/IUploaderRequest.cs @@ -6,7 +6,7 @@ using Ksat.Supplyment.Library.Model.Uploader; namespace Ksat.Supplyment.Library.Uploader { - public delegate void UploadFinishHandler(object sender, bool success, string reason) ; + public delegate void UploadFinishHandler(object sender, bool success, string reason); public interface IUploaderRequest { diff --git a/KsatSupplymentLibrary/Uploader/Uploader.cs b/KsatSupplymentLibrary/Uploader/Uploader.cs index 1b46bcd..e802592 100644 --- a/KsatSupplymentLibrary/Uploader/Uploader.cs +++ b/KsatSupplymentLibrary/Uploader/Uploader.cs @@ -35,7 +35,7 @@ namespace Ksat.Supplyment.Library.Uploader /// /// 最大队列长度 /// - public int MaxOperatingQueueSize = 5; + public int MaxOperatingQueueSize = 4; /// /// 同时上传的数量 @@ -58,8 +58,6 @@ namespace Ksat.Supplyment.Library.Uploader throw new System.ArgumentNullException(); this.mTaskIdentify = ID; this.mOperator = op; - //sLog = LoggerFactory.ForContext("Uploader:" + this.mTaskIdentify); - // 入口 mDatasWorkerThread = new System.Threading.Thread(uploadWorker) { Name = "upload worker", @@ -67,7 +65,6 @@ namespace Ksat.Supplyment.Library.Uploader }; mDatasWorkerThread.Start(); - // Swap 处理: 超过4s保存至数据库 Caches 表 mSwapClearTimer = new Timer((x) => { var toDBs = new List>(); @@ -160,28 +157,35 @@ namespace Ksat.Supplyment.Library.Uploader if (reqModel == null) { - //sLog.Formf($"no request found:{request.RequestData}"); + Console.WriteLine("no request found:{request.RequestData}"); return; } reqModel.ErrorInfo = reason; - - //sLog.Formf($"{reqModel.Request} finished:{success}"); string cancelRetryReason = null; if (success) - using (var db = new CodeFirstDbContext()) + { + try { - db.UploadFinishs.Add(new UploadFinish + using (var db = new CodeFirstDbContext()) { - CreateAt = reqModel.CreateAt, - RetryAt = reqModel.RetryAt, - RetryCount = reqModel.RetryCount, - RequestData = mOperator.ConvertRequestToCachel(reqModel.Request), - UploaderID = mTaskIdentify - }); - db.SaveChanges(); - + db.UploadFinishs.Add(new UploadFinish + { + CreateAt = reqModel.CreateAt, + RetryAt = reqModel.RetryAt, + RetryCount = reqModel.RetryCount, + RequestData = mOperator.ConvertRequestToCachel(reqModel.Request), + Tag = reqModel.Tag, + UploaderID = mTaskIdentify + }); + db.SaveChanges(); + } + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); } + } else if (mOperator.ShouldRetryRequest(reqModel, out cancelRetryReason)) { @@ -196,23 +200,29 @@ namespace Ksat.Supplyment.Library.Uploader } else { - using (var db = new CodeFirstDbContext()) + try { - db.UploadCancels.Add(new UploadCancel + using (var db = new CodeFirstDbContext()) { - CreateAt = reqModel.CreateAt, - RetryAt = reqModel.RetryAt, - RetryCount = reqModel.RetryCount, - RequestData = mOperator.ConvertRequestToCachel(reqModel.Request), - ErrorInfo = "Task cancel Retry:" + cancelRetryReason ?? "No Reason", - UploaderID = mTaskIdentify - });; - db.SaveChanges(); + db.UploadCancels.Add(new UploadCancel + { + CreateAt = reqModel.CreateAt, + RetryAt = reqModel.RetryAt, + RetryCount = reqModel.RetryCount, + RequestData = mOperator.ConvertRequestToCachel(reqModel.Request), + ErrorInfo = "Task cancel Retry:" + cancelRetryReason ?? "No Reason", + Tag = reqModel.Tag, + UploaderID = mTaskIdentify + }); + db.SaveChanges(); + } + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); } } - - Console.WriteLine($"{DateTime.UtcNow.ToString("mm:ss.fff")} task {x} result:{success}"); - //throw new NotImplementedException(); + Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")} task {x} result:{success}"); } /// @@ -228,7 +238,7 @@ namespace Ksat.Supplyment.Library.Uploader if (reqCount >= MaxCocurrentTaskCount) { System.Threading.Thread.Sleep(10); - Console.WriteLine($"{DateTime.UtcNow.ToString("mm:ss.fff")} task exceed {MaxCocurrentTaskCount}, waiting"); + Console.WriteLine($@"{DateTime.Now.ToString("HH:mm:ss.fff")} task exceed {MaxCocurrentTaskCount}, waiting"); continue; } int newRequestCount = MaxCocurrentTaskCount - reqCount; @@ -243,18 +253,26 @@ namespace Ksat.Supplyment.Library.Uploader string donotStartReason; if (!mOperator.ShouldStartRequest(d, out donotStartReason)) { - using (var db = new CodeFirstDbContext()) + try { - db.UploadCancels.Add(new UploadCancel + using (var db = new CodeFirstDbContext()) { - CreateAt = d.CreateAt, - RetryAt = d.RetryAt, - RetryCount = d.RetryCount, - RequestData = mOperator.ConvertRequestToCachel(d.Request), - ErrorInfo = "Task cancel Start:" + donotStartReason ?? "No Reason", - UploaderID = mTaskIdentify - }); ; - db.SaveChanges(); + db.UploadCancels.Add(new UploadCancel + { + CreateAt = d.CreateAt, + RetryAt = d.RetryAt, + RetryCount = d.RetryCount, + RequestData = mOperator.ConvertRequestToCachel(d.Request), + ErrorInfo = "Task cancel Start:" + donotStartReason ?? "No Reason", + Tag = d.Tag, + UploaderID = mTaskIdentify + }); ; + db.SaveChanges(); + } + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); } continue; } @@ -263,8 +281,6 @@ namespace Ksat.Supplyment.Library.Uploader var req = mOperator.UploadData(d.Request); enqueRequest(req, d); ThreadPool.QueueUserWorkItem(new WaitCallback((o) => req.Start())); - //new System.Threading.Thread(req.Start).Start(); - //req.Start(); i++; continue; } @@ -290,7 +306,7 @@ namespace Ksat.Supplyment.Library.Uploader { using (var db = new CodeFirstDbContext()) { - //sLog.Formf($@"cache requests,{datas.Count()}"); + Console.WriteLine($@"cache requests,{datas.Count()}"); var caches = datas.Select((data) => new UploadCache { @@ -307,9 +323,9 @@ namespace Ksat.Supplyment.Library.Uploader db.SaveChanges(); } } - catch (Exception) + catch (Exception ex) { - //sLog.FormErrorf(ex.Message); + Console.WriteLine(ex.Message); } } @@ -319,22 +335,29 @@ namespace Ksat.Supplyment.Library.Uploader /// private void cacheRequest(List> datas) { - //sLog.Formf($"cache requests count,{datas.Count()}"); - using (var db = new CodeFirstDbContext()) + try { - var caches = datas.Select((data) => - new UploadCache + using (var db = new CodeFirstDbContext()) { - UploaderID = mTaskIdentify, - CreateAt = data.CreateAt, - RetryAt = data.RetryAt, - RetryCount = data.RetryCount, - ErrorInfo = data.ErrorInfo, - RequestData = mOperator.ConvertRequestToCachel(data.Request) + var caches = datas.Select((data) => + new UploadCache + { + UploaderID = mTaskIdentify, + CreateAt = data.CreateAt, + RetryAt = data.RetryAt, + RetryCount = data.RetryCount, + ErrorInfo = data.ErrorInfo, + Tag = data.Tag, + RequestData = mOperator.ConvertRequestToCachel(data.Request) + } + ); + db.UploadCaches.AddRange(caches); + db.SaveChanges(); } - ); - db.UploadCaches.AddRange(caches); - db.SaveChanges(); + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); } } @@ -355,32 +378,39 @@ namespace Ksat.Supplyment.Library.Uploader toGet--; } - using (var db = new CodeFirstDbContext()) - { - var caches = from c in db.UploadCaches - orderby c.RetryAt ascending - where c.UploaderID == this.mTaskIdentify - select c; - var x = caches.Take(toGet).ToList(); - //从数据库中移除 - db.UploadCaches.RemoveRange(x); - db.SaveChanges(); - var modelsFromDB = x.Select((data) => - new UploadModel - { - CreateAt = data.CreateAt, - RetryAt = data.RetryAt, - RetryCount = data.RetryCount, - ErrorInfo = data.ErrorInfo, - Request = mOperator.ConvertCacheToRequest(data.RequestData) - } - ).ToList(); - ret.AddRange(modelsFromDB); + try + { + using (var db = new CodeFirstDbContext()) + { + var caches = from c in db.UploadCaches + orderby c.RetryAt ascending + where c.UploaderID == this.mTaskIdentify + select c; + var x = caches.Take(toGet).ToList(); + //从数据库中移除 + db.UploadCaches.RemoveRange(x); + db.SaveChanges(); + var modelsFromDB = x.Select((data) => + new UploadModel + { + CreateAt = data.CreateAt, + RetryAt = data.RetryAt, + RetryCount = data.RetryCount, + ErrorInfo = data.ErrorInfo, + Tag = data.Tag, + Request = mOperator.ConvertCacheToRequest(data.RequestData) + } + ).ToList(); + + ret.AddRange(modelsFromDB); + } + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); } - //if (ret.Count() != 0) - //sLog.Formf($"get {number} cache return {ret.Count()}"); return ret; } @@ -391,7 +421,6 @@ namespace Ksat.Supplyment.Library.Uploader { if (!Interlocked.Equals(mIsCanceled, 1)) { - var requestData = new UploadModel { CreateAt = DateTime.Now, @@ -403,18 +432,18 @@ namespace Ksat.Supplyment.Library.Uploader }; if (mDatas.Count >= MaxOperatingQueueSize) { - //sLog.Formf($"add request to swap,{data}"); + Console.WriteLine("add request to swap"); mSwapDatas.Enqueue(requestData); } else { - //sLog.Formf($"add request to queue,{data}"); + Console.WriteLine("add request to queue"); mDatas.Enqueue(requestData); } } else { - //sLog.Formf("task canceled"); + Console.WriteLine("task canceled"); cacheRequest(new List>() { new Tuple(data, tag) }); } } diff --git a/S3Demo/Model/HttpModel.cs b/S3Demo/Helper/HttpHelper.cs similarity index 95% rename from S3Demo/Model/HttpModel.cs rename to S3Demo/Helper/HttpHelper.cs index 82f8ebb..4525060 100644 --- a/S3Demo/Model/HttpModel.cs +++ b/S3Demo/Helper/HttpHelper.cs @@ -2,9 +2,9 @@ using System.Net; using System.Text; -namespace S3Demo.Model +namespace S3Demo.Helper { - public class HttpModel + public class HttpHelper { /// /// POST请求 diff --git a/S3Demo/Model/ObjectModel.cs b/S3Demo/Helper/ObjectHelper.cs similarity index 97% rename from S3Demo/Model/ObjectModel.cs rename to S3Demo/Helper/ObjectHelper.cs index c856f2d..3014f7c 100644 --- a/S3Demo/Model/ObjectModel.cs +++ b/S3Demo/Helper/ObjectHelper.cs @@ -10,9 +10,9 @@ using Amazon.Runtime; using Amazon.S3; using Amazon.S3.Model; -namespace S3Demo.Model +namespace S3Demo.Helper { - class ObjectModel + class ObjectHelper { #region 上传对象(带标签) /// @@ -47,6 +47,7 @@ namespace S3Demo.Model }; PutObjectResponse response = await client.PutObjectAsync(putRequest); + //Console.WriteLine(putRequest.Key + "," + putRequest.BucketName + "," + putRequest.FilePath + "," + response.HttpStatusCode); return response.HttpStatusCode.ToString(); #region 查看tag diff --git a/S3Demo/Model/Uploader/S3UploadModel.cs b/S3Demo/Model/Uploader/S3UploadModel.cs new file mode 100644 index 0000000..4f84e54 --- /dev/null +++ b/S3Demo/Model/Uploader/S3UploadModel.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; + +namespace S3Demo.Model.Uploader +{ + public class S3UploadModel + { + public string Keyname { get; set; } + public string Path { get; set; } + public List Tags { get; set; } + } +} diff --git a/S3Demo/Model/Uploader/S3UploadOperator.cs b/S3Demo/Model/Uploader/S3UploadOperator.cs index be0378a..9a42c11 100644 --- a/S3Demo/Model/Uploader/S3UploadOperator.cs +++ b/S3Demo/Model/Uploader/S3UploadOperator.cs @@ -2,25 +2,23 @@ using Ksat.Logging; using Ksat.Supplyment.Library.Uploader; using Ksat.Supplyment.Library.Model.Uploader; -using S3Demo.Model.Uploader; - -namespace S3Demo.Uploader +namespace S3Demo.Model.Uploader { - public class S3UploadOperator : IUploaderOperator + public class S3UploadOperator : IUploaderOperator { //Log private ILogger sLog = LoggerFactory.ForContext(); - public string ConvertCacheToRequest(string cache) + public S3UploadModel ConvertCacheToRequest(string cache) { - return cache; + return Newtonsoft.Json.JsonConvert.DeserializeObject(cache); } - public string ConvertRequestToCachel(string requestData) + public string ConvertRequestToCachel(S3UploadModel requestData) { - return requestData; + return Newtonsoft.Json.JsonConvert.SerializeObject(requestData); } - public bool ShouldRetryRequest(UploadModel requestModel, out string cancelReason) + public bool ShouldRetryRequest(UploadModel requestModel, out string cancelReason) { if (DateTime.Now.Subtract(requestModel.CreateAt).TotalSeconds > 600) { @@ -36,7 +34,7 @@ namespace S3Demo.Uploader return true; } - public bool ShouldStartRequest(UploadModel requestModel, out string cancelReason) + public bool ShouldStartRequest(UploadModel requestModel, out string cancelReason) { if (DateTime.Now.Subtract(requestModel.CreateAt).TotalSeconds > 600) { @@ -52,7 +50,7 @@ namespace S3Demo.Uploader return true; } - public IUploaderRequest UploadData(string data) + public IUploaderRequest UploadData(S3UploadModel data) { return new S3UploadRequest() { RequestData = data }; } diff --git a/S3Demo/Model/Uploader/S3UploadRequest.cs b/S3Demo/Model/Uploader/S3UploadRequest.cs index 6c38df0..26a9839 100644 --- a/S3Demo/Model/Uploader/S3UploadRequest.cs +++ b/S3Demo/Model/Uploader/S3UploadRequest.cs @@ -1,32 +1,60 @@ using System; -using S3Demo.Uploader; +using S3Demo.Helper; using Ksat.Supplyment.Library.Uploader; -using Ksat.Supplyment.Library.Model.Uploader; using Ksat.Logging; +using System.Threading.Tasks; +using Amazon.S3; +using Amazon; +using Amazon.Runtime; namespace S3Demo.Model.Uploader { - public class S3UploadRequest : IUploaderRequest + + public class S3UploadRequest : IUploaderRequest { - //Log + + // 通过固定域名的方式 + //private static AmazonS3Config conf = new AmazonS3Config() + //{ + // ServiceURL = "https://play.min.io", + // ForcePathStyle = true + //}; + //new BasicAWSCredentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG") + public string bucketName = "minio/test"; + public AmazonS3Client client = new AmazonS3Client(new BasicAWSCredentials("admin", "admin123."), new AmazonS3Config() + { + UseHttp = true, + ProxyHost = "http://192.168.60.132", + ProxyPort = 9000, + RegionEndpoint = RegionEndpoint.USEast1 + }); private ILogger sLog = LoggerFactory.ForContext(); - public string RequestData { get; set; } + public S3UploadModel RequestData { get; set; } public event UploadFinishHandler UploadFinish; public void Cancel() { + mTask?.Dispose(); //TODO: } public void Start() { - var ran = new Random(); //TODO: - System.Threading.Thread.Sleep(1000); - UploadFinish(this, ran.Next() % 3 == 0, "No reason"); + _ = objSend(); + } + + private Task mTask; + public async Task objSend() + { + Console.WriteLine("==开始上传=="); + mTask = ObjectHelper.PutObjectsWithTagsAsync(client, bucketName, RequestData.Keyname, RequestData.Path, RequestData.Tags.ToArray()); + string res = await mTask; + UploadFinish(this, res.Equals("OK"), res); + Console.WriteLine($"S3 Upload recv: {res}"); } } } diff --git a/S3Demo/Model/Uploader/S3Uploader.cs b/S3Demo/Model/Uploader/S3Uploader.cs index c33aee0..995b9cb 100644 --- a/S3Demo/Model/Uploader/S3Uploader.cs +++ b/S3Demo/Model/Uploader/S3Uploader.cs @@ -1,21 +1,20 @@ using System; using Newtonsoft.Json; using Ksat.Supplyment.Library.Uploader; -using S3Demo.Uploader; namespace S3Demo.Model.Uploader { - public class S3Uploader + public class S3Uploader : Ksat.CommonModelLibrary.Utils.SingletonUtils,IDisposable { - private static Uploader mUploader; + private static Uploader mUploader; private static S3UploadOperator mOperator; - public S3Uploader(string UploadID) + private S3Uploader() { mOperator = new S3UploadOperator(); - mUploader = new Uploader(UploadID, mOperator); + mUploader = new Uploader("xxx", mOperator); } - public void AddUploadTask(string reqData, string tag) + public void AddUploadTask(S3UploadModel reqData, string tag) { mUploader.AddUploadRequest(reqData, tag); } diff --git a/S3Demo/Run.cs b/S3Demo/Run.cs index 3bde81e..ecf50eb 100644 --- a/S3Demo/Run.cs +++ b/S3Demo/Run.cs @@ -11,41 +11,14 @@ using Ksat.Supplyment.Library.Uploader; using Ksat.Supplyment.Library.Model.Uploader; using System.Threading.Tasks; using System.Threading; +using System.Linq; namespace S3Demo { public class Run { - //private static string url = "http://172.17.204.180:9000/api/tri/trackin"; - private static BasicAWSCredentials credentials = new BasicAWSCredentials("admin", "admin123."); - //private static BasicAWSCredentials credentials = new BasicAWSCredentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"); - private static string bucketName = "minio/test"; - // 受监控的目录,完整目录如:"D:\ClientDir\line01\AOI\20211129\133055851SN001.png" private static string[] paths = { @"D:\ClientDir" }; private static string suffix = "*.png"; - - /// - /// https://docs.aws.amazon.com/sdkfornet/v3/apidocs/items/S3/TS3Config.html - /// https://blog.csdn.net/tw_tangliang/article/details/118669099 - /// https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPI.cs - /// - - // 通过IP + Port方式 - private static AmazonS3Config conf = new AmazonS3Config() - { - UseHttp = true, - ProxyHost = "http://192.168.60.132", - ProxyPort = 9000, - RegionEndpoint = RegionEndpoint.USEast1 - }; - // 通过固定域名的方式 - //private static AmazonS3Config conf = new AmazonS3Config() - //{ - // ServiceURL = "https://play.min.io", - // ForcePathStyle = true - //}; - public static AmazonS3Client client = new AmazonS3Client(credentials, conf); - public static void Main() { try @@ -162,77 +135,19 @@ namespace S3Demo { Console.WriteLine($"Main: {ex.Message}"); } - Console.ReadKey(); } - #region 上传测试 - public class UploaderOperator : IUploaderOperator - { - public string ConvertCacheToRequest(string cache) - { - return cache; - } - - public string ConvertRequestToCachel(string requestData) - { - return requestData; - } - - public bool ShouldRetryRequest(UploadModel requestModel, out string cancelReason) - { - cancelReason = null; - if (requestModel.RetryCount >= 2) - { - cancelReason = "已经两次fail了"; - return false; - } - return true; - } - - public bool ShouldStartRequest(UploadModel requestModel, out string cancelReason) - { - cancelReason = null; - return true; - } - - public IUploaderRequest UploadData(string data) - { - var request = new UploadRequest(); - request.RequestData = data; - return request; - } - } - public class UploadRequest : IUploaderRequest - { - private string mRequestData; - public string RequestData { get => mRequestData; set => mRequestData = value; } - - public event UploadFinishHandler UploadFinish; - - public void Start() - { - Console.WriteLine($"{DateTime.UtcNow.ToString("mm:ss.fff")} task {RequestData} Started"); - System.Threading.Thread.Sleep(100); - var random = new Random(); - - UploadFinish(this, random.Next() % 2 == 0, "task failed"); - } - - public void Cancel() - { - } - } - #endregion /// /// 创建文件 /// /// /// - private static async void OnCreated(object sender, FileSystemEventArgs e) + private static void OnCreated(object sender, FileSystemEventArgs e) { - Waiting(e.FullPath); - await DisposeFile(e.FullPath); + //Waiting(e.FullPath); + // 如果创建过程中就上传:1、重传机制会载入重传队列;2、也可手动添加固定延时等待创建完成在上传 + DisposeFile(e.FullPath); } /// /// 等待onCreated事件处理完成 @@ -262,7 +177,7 @@ namespace S3Demo /// /// 文件路径 /// - private static async Task DisposeFile(string path) + private static void DisposeFile(string path) { // D:\ClientDir\line01\AOI\20211129\085023891P107CN14T00001.png string lineName = Path.GetDirectoryName(path).Split('\\')[2]; @@ -275,10 +190,9 @@ namespace S3Demo string now = DateTime.Now.ToString("yyyyMMddHHmmssfff"); String[] tags = new String[] { sn, dt_fomat }; string S3Path = lineName + "/" + equipmentName + "/" + date + "/" + fileName; - string recv = await ObjectModel.PutObjectsWithTagsAsync(client, bucketName, S3Path, path, tags); - Console.WriteLine("recv from S3: " + recv); - //引入统一上传方式 - //new S3Uploader(lineName).AddUploadTask(S3Path, sn); + + var uploadModel = new S3UploadModel { Keyname = S3Path, Path = path, Tags = tags.ToList() }; + S3Uploader.Instance().AddUploadTask(uploadModel, sn); } /// diff --git a/S3Demo/S3Demo.csproj b/S3Demo/S3Demo.csproj index 66821df..2a633e0 100644 --- a/S3Demo/S3Demo.csproj +++ b/S3Demo/S3Demo.csproj @@ -50,6 +50,9 @@ ..\packages\EntityFramework.6.4.4\lib\net45\EntityFramework.SqlServer.dll + + ..\vendor\KsatCommonModelLibrary.dll + ..\vendor\KsatLogging.dll @@ -80,10 +83,11 @@ - - + + + diff --git a/vendor/KsatCommonModelLibrary.dll b/vendor/KsatCommonModelLibrary.dll new file mode 100644 index 0000000..1c3df36 Binary files /dev/null and b/vendor/KsatCommonModelLibrary.dll differ