From f4de4981bae25b881d6fd5c3f22e28c7262e6475 Mon Sep 17 00:00:00 2001 From: ZGGSONG Date: Wed, 8 Dec 2021 20:02:48 +0800 Subject: [PATCH] solve async upload --- .../Model/Uploader/UploadModel.cs | 2 +- KsatSupplymentLibrary/Uploader/Uploader.cs | 4 +- README.md | 2 +- S3Demo/Model/ObjectModel.cs | 34 +---- S3Demo/Run.cs | 136 +++++++++++------- 5 files changed, 91 insertions(+), 87 deletions(-) diff --git a/KsatSupplymentLibrary/Model/Uploader/UploadModel.cs b/KsatSupplymentLibrary/Model/Uploader/UploadModel.cs index 828ec6b..cf83132 100644 --- a/KsatSupplymentLibrary/Model/Uploader/UploadModel.cs +++ b/KsatSupplymentLibrary/Model/Uploader/UploadModel.cs @@ -25,7 +25,7 @@ namespace Ksat.Supplyment.Library.Model.Uploader /// public string ErrorInfo; - public string Tag { get; set; } + public string Tag; public T Request; } diff --git a/KsatSupplymentLibrary/Uploader/Uploader.cs b/KsatSupplymentLibrary/Uploader/Uploader.cs index f20047a..1b46bcd 100644 --- a/KsatSupplymentLibrary/Uploader/Uploader.cs +++ b/KsatSupplymentLibrary/Uploader/Uploader.cs @@ -59,6 +59,7 @@ namespace Ksat.Supplyment.Library.Uploader this.mTaskIdentify = ID; this.mOperator = op; //sLog = LoggerFactory.ForContext("Uploader:" + this.mTaskIdentify); + // 入口 mDatasWorkerThread = new System.Threading.Thread(uploadWorker) { Name = "upload worker", @@ -66,6 +67,7 @@ namespace Ksat.Supplyment.Library.Uploader }; mDatasWorkerThread.Start(); + // Swap 处理: 超过4s保存至数据库 Caches 表 mSwapClearTimer = new Timer((x) => { var toDBs = new List>(); @@ -305,7 +307,7 @@ namespace Ksat.Supplyment.Library.Uploader db.SaveChanges(); } } - catch (Exception ex) + catch (Exception) { //sLog.FormErrorf(ex.Message); } diff --git a/README.md b/README.md index 9d3e74f..4493d93 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,6 @@ secretKey: 'zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG' - [Sqlite:Code First模式](https://blog.csdn.net/wucdsg/article/details/78895366) - [https://docs.aws.amazon.com/sdkfornet/v3/apidocs/items/S3/TS3Config.html](https://docs.aws.amazon.com/sdkfornet/v3/apidocs/items/S3/TS3Config.html) -- [https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPI.cs](https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPI.cs) +- [https://github1s.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadObjectExample/UploadObject.cs](https://github1s.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadObjectExample/UploadObject.cs) - [https://blog.csdn.net/tw_tangliang/article/details/118669099](https://blog.csdn.net/tw_tangliang/article/details/118669099) - [https://ecloud.10086.cn/op-help-center/develop/202007021593677916792035060.pdf](https://ecloud.10086.cn/op-help-center/develop/202007021593677916792035060.pdf) diff --git a/S3Demo/Model/ObjectModel.cs b/S3Demo/Model/ObjectModel.cs index 76d5c64..c856f2d 100644 --- a/S3Demo/Model/ObjectModel.cs +++ b/S3Demo/Model/ObjectModel.cs @@ -15,30 +15,6 @@ namespace S3Demo.Model class ObjectModel { #region 上传对象(带标签) - /// - /// 带标签发送对象至存储桶 - /// - /// - /// - /// - /// - /// - /// - /// - public static async Task Putobjectswithtags(string bucketName, string keyName, string filePath, string[] tags, BasicAWSCredentials credentials, AmazonS3Config conf) - { - //string bucketName = "doc-example-bucket"; - //string keyName = "newobject.txt"; - //string filePath = @"*** file path ***"; - - // Specify your bucket region (an example region is shown). - //RegionEndpoint bucketRegion = RegionEndpoint.USWest2; - - var client = new AmazonS3Client(credentials, conf); - string resv = await PutObjectsWithTagsAsync(client, bucketName, keyName, filePath, tags); - return resv; - } - /// /// This method uploads an object with tags. It then shows the tag /// values, changes the tags, and shows the new tags. @@ -64,8 +40,9 @@ namespace S3Demo.Model TagSet = new List { new Tag { Key = "sn", Value = tags[0] }, - new Tag { Key = "date", Value = tags[1] }, - new Tag { Key = "code", Value = tags[2] } + new Tag { Key = "date", Value = tags[1] } + //TODO: 后续添加不良代码,接口获取 + //new Tag { Key = "code", Value = tags[2] } }, }; @@ -117,10 +94,9 @@ namespace S3Demo.Model // .ForEach(t => Console.WriteLine($"Key: {t.Key}, Value: {t.Value}")); #endregion } - catch (AmazonS3Exception ex) + catch (Exception ex) { - Console.WriteLine( - $"Error: '{ex.Message}'"); + Console.WriteLine($"Error: {ex.Message}"); return ex.Message.ToString(); } } diff --git a/S3Demo/Run.cs b/S3Demo/Run.cs index 40b10ac..4597b89 100644 --- a/S3Demo/Run.cs +++ b/S3Demo/Run.cs @@ -8,13 +8,14 @@ using S3Demo.Model; using Ksat.Supplyment.Library.Model; using Ksat.Supplyment.Library.Uploader; using Ksat.Supplyment.Library.Model.Uploader; +using System.Threading.Tasks; +using System.Threading; namespace S3Demo { public class Run { - //请求接口 - private static string url = "http://172.17.204.180:9000/api/tri/trackin"; + //private static string url = "http://172.17.204.180:9000/api/tri/trackin"; private static BasicAWSCredentials credentials = new BasicAWSCredentials("admin", "admin123."); //private static BasicAWSCredentials credentials = new BasicAWSCredentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"); private static string bucketName = "minio/test"; @@ -36,13 +37,13 @@ namespace S3Demo ProxyPort = 9000, RegionEndpoint = RegionEndpoint.USEast1 }; - - //通过固定域名的方式 + // 通过固定域名的方式 //private static AmazonS3Config conf = new AmazonS3Config() //{ // ServiceURL = "https://play.min.io", // ForcePathStyle = true //}; + public static AmazonS3Client client = new AmazonS3Client(credentials, conf); public static void Main() { @@ -61,18 +62,52 @@ namespace S3Demo | NotifyFilters.FileName | NotifyFilters.DirectoryName, }; - watch.Created += new FileSystemEventHandler(OnCreated); + watch.Created += OnCreated; watch.Error += OnError; // 开始监控 watch.EnableRaisingEvents = true; } #endregion + #region 保存至数据库 + //string s3Flag = ObjectModel.Putobjectswithtags(bucketName, S3Path, e.FullPath, tags, credentials, conf).Result; + //if (s3Flag.Equals("OK")) + //{ + //保存至数据库 + //using (var dev = new CodeFirstDbContext()) + // { + // dev.UploadFinishs.Add(new UploadFinish() + // { + // UploaderID = lineName, + // Tag = sn, + // CreateAt = dt_fomat, + // RetryAt = DateTime.ParseExact(now, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture), + // RetryCount = 1, + // RequestData = S3Path + // }); + // int dbFlag = dev.SaveChanges();//1: 保存至数据库成功,否则保存失败 + // //TODO: 请求接口 + // //ReqData req = new ReqData(); + // //RespData resp = new RespData(); + // //req.s3Flag = s3Flag; + // //req.dbFlag = dbFlag; + // //resp = JsonConvert.DeserializeObject(HttpModel.PostUrl(url, JsonConvert.SerializeObject(req))); + // //Console.WriteLine("resp: " + resp.code + " " + resp.message); + // } + + //Console.WriteLine("S3对象存储路径: " + bucketName + "/" + S3Path); + //} + //else + //{ + // Console.WriteLine("上传未完成"); + //} + //Uploader uploader = new Uploader("字符串测试", new UploaderOperator()); - //for (var i = 0; i < 20; i++) + //for (var i = 0; i < 5; i++) //{ - // uploader.AddUploadRequest($"hello{i}"); + // uploader.AddUploadRequest($"hello{i}", $"tag{i}"); //} + #endregion #region S3相关测试 /* @@ -124,12 +159,11 @@ namespace S3Demo } catch (Exception ex) { - Console.WriteLine(ex.ToString()); + Console.WriteLine($"Main: {ex.Message}"); } Console.ReadKey(); } - #region 上传测试 public class UploaderOperator : IUploaderOperator { @@ -188,63 +222,55 @@ namespace S3Demo } } #endregion - /// - /// 创建事件及主要逻辑 + /// 创建文件事件 /// /// /// /// /// 主要逻辑 /// - private static void OnCreated(object sender, FileSystemEventArgs e) + private static async void OnCreated(object sender, FileSystemEventArgs e) + { + string path = e.FullPath; + Waiting(path);//等待文件创建完成 + await DisposeFile(path);//处理文件 + } + + private static async Task DisposeFile(string path) + { + // D:\ClientDir\line01\AOI\20211129\085023891P107CN14T00001.png + string lineName = Path.GetDirectoryName(path).Split('\\')[2]; + string equipmentName = Path.GetDirectoryName(path).Split('\\')[3]; + string date = Path.GetDirectoryName(path).Split('\\')[4]; + string fileName = Path.GetFileName(path); + string datetime = date + fileName.Substring(0, 9); + string dt_fomat = DateTime.ParseExact(datetime, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture).ToString("yyyy-MM-dd HH:mm:ss:fff"); + string sn = fileName.Substring(9, 14); + string now = DateTime.Now.ToString("yyyyMMddHHmmssfff"); + String[] tags = new String[] { sn, dt_fomat }; + string S3Path = lineName + "/" + equipmentName + "/" + date + "/" + fileName; + string recv = await ObjectModel.PutObjectsWithTagsAsync(client, bucketName, S3Path, path, tags); + Console.WriteLine(recv); + } + + private static void Waiting(string path) { try { - string lineName = Path.GetDirectoryName(e.FullPath).Split('\\')[2]; - string equipmentName = Path.GetDirectoryName(e.FullPath).Split('\\')[3]; - string date = Path.GetDirectoryName(e.FullPath).Split('\\')[4]; - string fileName = Path.GetFileName(e.FullPath); - string datetime = date + fileName.Substring(0, 9); - string dt_fomat = DateTime.ParseExact(datetime, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture).ToString("yyyy-MM-dd HH:mm:ss:fff"); - string code = fileName.Substring(fileName.Length - 6, 2); - string sn = fileName.Substring(9, 14); - string now = DateTime.Now.ToString("yyyyMMddHHmmssfff"); - String[] tags = new String[] { sn, dt_fomat, code }; - string S3Path = lineName + "/" + equipmentName + "/" + date + "/" + fileName; - - //发送太快会出问题 - //没有使用异步线程,可能没有足够的时间等待返回结果 [说明](https://www.cnblogs.com/lnwuyaowei/p/12672866.html) - string s3Flag = ObjectModel.Putobjectswithtags(bucketName, S3Path, e.FullPath, tags, credentials, conf).Result; - if (s3Flag.Equals("OK")) + FileInfo fi; + fi = new FileInfo(path); + long len1, len2; + len2 = fi.Length; + do { - //保存至数据库 - using (var dev = new CodeFirstDbContext()) - { - dev.UploadFinishs.Add(new UploadFinish() - { - UploaderID = lineName, - Tag = sn, - CreateAt = DateTime.ParseExact(datetime, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture), - RetryAt = DateTime.ParseExact(now, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture), - RetryCount = 1, - RequestData = S3Path - }); - int dbFlag = dev.SaveChanges();//1: 保存至数据库成功,否则保存失败 - //ReqData req = new ReqData(); - //RespData resp = new RespData(); - //req.s3Flag = s3Flag; - //req.dbFlag = dbFlag; - //resp = JsonConvert.DeserializeObject(HttpModel.PostUrl(url, JsonConvert.SerializeObject(req))); - //Console.WriteLine("resp: " + resp.code + " " + resp.message); - } - } - Console.WriteLine("S3对象存储路径: " + bucketName + "/" + S3Path); - } - catch (Exception ex) - { - Console.WriteLine("Run: " + ex.Message); + len1 = len2; + Thread.Sleep(1000);//等待1秒钟 + fi.Refresh();//这个语句不能漏了 + len2 = fi.Length; + } while (len1 < len2); } + catch { } } /// @@ -258,7 +284,7 @@ namespace S3Demo /// 打印错误 /// /// - private static void PrintException(Exception? ex) + private static void PrintException(Exception ex) { if (ex != null) {