solve async upload

master
ZGGSONG 3 years ago
parent 93dcc82a20
commit f4de4981ba

@ -25,7 +25,7 @@ namespace Ksat.Supplyment.Library.Model.Uploader
/// </summary> /// </summary>
public string ErrorInfo; public string ErrorInfo;
public string Tag { get; set; } public string Tag;
public T Request; public T Request;
} }

@ -59,6 +59,7 @@ namespace Ksat.Supplyment.Library.Uploader
this.mTaskIdentify = ID; this.mTaskIdentify = ID;
this.mOperator = op; this.mOperator = op;
//sLog = LoggerFactory.ForContext("Uploader:" + this.mTaskIdentify); //sLog = LoggerFactory.ForContext("Uploader:" + this.mTaskIdentify);
// 入口
mDatasWorkerThread = new System.Threading.Thread(uploadWorker) mDatasWorkerThread = new System.Threading.Thread(uploadWorker)
{ {
Name = "upload worker", Name = "upload worker",
@ -66,6 +67,7 @@ namespace Ksat.Supplyment.Library.Uploader
}; };
mDatasWorkerThread.Start(); mDatasWorkerThread.Start();
// Swap 处理: 超过4s保存至数据库 Caches 表
mSwapClearTimer = new Timer((x) => mSwapClearTimer = new Timer((x) =>
{ {
var toDBs = new List<UploadModel<T>>(); var toDBs = new List<UploadModel<T>>();
@ -305,7 +307,7 @@ namespace Ksat.Supplyment.Library.Uploader
db.SaveChanges(); db.SaveChanges();
} }
} }
catch (Exception ex) catch (Exception)
{ {
//sLog.FormErrorf(ex.Message); //sLog.FormErrorf(ex.Message);
} }

@ -27,6 +27,6 @@ secretKey: 'zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG'
- [SqliteCode First模式](https://blog.csdn.net/wucdsg/article/details/78895366) - [SqliteCode First模式](https://blog.csdn.net/wucdsg/article/details/78895366)
- [https://docs.aws.amazon.com/sdkfornet/v3/apidocs/items/S3/TS3Config.html](https://docs.aws.amazon.com/sdkfornet/v3/apidocs/items/S3/TS3Config.html) - [https://docs.aws.amazon.com/sdkfornet/v3/apidocs/items/S3/TS3Config.html](https://docs.aws.amazon.com/sdkfornet/v3/apidocs/items/S3/TS3Config.html)
- [https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPI.cs](https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPIExample/UploadFileMPUHighLevelAPI.cs) - [https://github1s.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadObjectExample/UploadObject.cs](https://github1s.com/awsdocs/aws-doc-sdk-examples/blob/main/dotnetv3/S3/UploadObjectExample/UploadObject.cs)
- [https://blog.csdn.net/tw_tangliang/article/details/118669099](https://blog.csdn.net/tw_tangliang/article/details/118669099) - [https://blog.csdn.net/tw_tangliang/article/details/118669099](https://blog.csdn.net/tw_tangliang/article/details/118669099)
- [https://ecloud.10086.cn/op-help-center/develop/202007021593677916792035060.pdf](https://ecloud.10086.cn/op-help-center/develop/202007021593677916792035060.pdf) - [https://ecloud.10086.cn/op-help-center/develop/202007021593677916792035060.pdf](https://ecloud.10086.cn/op-help-center/develop/202007021593677916792035060.pdf)

@ -15,30 +15,6 @@ namespace S3Demo.Model
class ObjectModel class ObjectModel
{ {
#region 上传对象(带标签) #region 上传对象(带标签)
/// <summary>
/// 带标签发送对象至存储桶
/// </summary>
/// <param name="bucketName"></param>
/// <param name="keyName"></param>
/// <param name="filePath"></param>
/// <param name="tags"></param>
/// <param name="credentials"></param>
/// <param name="conf"></param>
/// <returns></returns>
public static async Task<string> Putobjectswithtags(string bucketName, string keyName, string filePath, string[] tags, BasicAWSCredentials credentials, AmazonS3Config conf)
{
//string bucketName = "doc-example-bucket";
//string keyName = "newobject.txt";
//string filePath = @"*** file path ***";
// Specify your bucket region (an example region is shown).
//RegionEndpoint bucketRegion = RegionEndpoint.USWest2;
var client = new AmazonS3Client(credentials, conf);
string resv = await PutObjectsWithTagsAsync(client, bucketName, keyName, filePath, tags);
return resv;
}
/// <summary> /// <summary>
/// This method uploads an object with tags. It then shows the tag /// This method uploads an object with tags. It then shows the tag
/// values, changes the tags, and shows the new tags. /// values, changes the tags, and shows the new tags.
@ -64,8 +40,9 @@ namespace S3Demo.Model
TagSet = new List<Tag> TagSet = new List<Tag>
{ {
new Tag { Key = "sn", Value = tags[0] }, new Tag { Key = "sn", Value = tags[0] },
new Tag { Key = "date", Value = tags[1] }, new Tag { Key = "date", Value = tags[1] }
new Tag { Key = "code", Value = tags[2] } //TODO: 后续添加不良代码,接口获取
//new Tag { Key = "code", Value = tags[2] }
}, },
}; };
@ -117,10 +94,9 @@ namespace S3Demo.Model
// .ForEach(t => Console.WriteLine($"Key: {t.Key}, Value: {t.Value}")); // .ForEach(t => Console.WriteLine($"Key: {t.Key}, Value: {t.Value}"));
#endregion #endregion
} }
catch (AmazonS3Exception ex) catch (Exception ex)
{ {
Console.WriteLine( Console.WriteLine($"Error: {ex.Message}");
$"Error: '{ex.Message}'");
return ex.Message.ToString(); return ex.Message.ToString();
} }
} }

@ -8,13 +8,14 @@ using S3Demo.Model;
using Ksat.Supplyment.Library.Model; using Ksat.Supplyment.Library.Model;
using Ksat.Supplyment.Library.Uploader; using Ksat.Supplyment.Library.Uploader;
using Ksat.Supplyment.Library.Model.Uploader; using Ksat.Supplyment.Library.Model.Uploader;
using System.Threading.Tasks;
using System.Threading;
namespace S3Demo namespace S3Demo
{ {
public class Run public class Run
{ {
//请求接口 //private static string url = "http://172.17.204.180:9000/api/tri/trackin";
private static string url = "http://172.17.204.180:9000/api/tri/trackin";
private static BasicAWSCredentials credentials = new BasicAWSCredentials("admin", "admin123."); private static BasicAWSCredentials credentials = new BasicAWSCredentials("admin", "admin123.");
//private static BasicAWSCredentials credentials = new BasicAWSCredentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"); //private static BasicAWSCredentials credentials = new BasicAWSCredentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG");
private static string bucketName = "minio/test"; private static string bucketName = "minio/test";
@ -36,13 +37,13 @@ namespace S3Demo
ProxyPort = 9000, ProxyPort = 9000,
RegionEndpoint = RegionEndpoint.USEast1 RegionEndpoint = RegionEndpoint.USEast1
}; };
// 通过固定域名的方式 // 通过固定域名的方式
//private static AmazonS3Config conf = new AmazonS3Config() //private static AmazonS3Config conf = new AmazonS3Config()
//{ //{
// ServiceURL = "https://play.min.io", // ServiceURL = "https://play.min.io",
// ForcePathStyle = true // ForcePathStyle = true
//}; //};
public static AmazonS3Client client = new AmazonS3Client(credentials, conf);
public static void Main() public static void Main()
{ {
@ -61,18 +62,52 @@ namespace S3Demo
| NotifyFilters.FileName | NotifyFilters.FileName
| NotifyFilters.DirectoryName, | NotifyFilters.DirectoryName,
}; };
watch.Created += new FileSystemEventHandler(OnCreated); watch.Created += OnCreated;
watch.Error += OnError; watch.Error += OnError;
// 开始监控 // 开始监控
watch.EnableRaisingEvents = true; watch.EnableRaisingEvents = true;
} }
#endregion #endregion
#region 保存至数据库
//string s3Flag = ObjectModel.Putobjectswithtags(bucketName, S3Path, e.FullPath, tags, credentials, conf).Result;
//if (s3Flag.Equals("OK"))
//{
//保存至数据库
//using (var dev = new CodeFirstDbContext())
// {
// dev.UploadFinishs.Add(new UploadFinish()
// {
// UploaderID = lineName,
// Tag = sn,
// CreateAt = dt_fomat,
// RetryAt = DateTime.ParseExact(now, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture),
// RetryCount = 1,
// RequestData = S3Path
// });
// int dbFlag = dev.SaveChanges();//1: 保存至数据库成功,否则保存失败
// //TODO: 请求接口
// //ReqData req = new ReqData();
// //RespData resp = new RespData();
// //req.s3Flag = s3Flag;
// //req.dbFlag = dbFlag;
// //resp = JsonConvert.DeserializeObject<RespData>(HttpModel.PostUrl(url, JsonConvert.SerializeObject(req)));
// //Console.WriteLine("resp: " + resp.code + " " + resp.message);
// }
//Console.WriteLine("S3对象存储路径: " + bucketName + "/" + S3Path);
//}
//else
//{
// Console.WriteLine("上传未完成");
//}
//Uploader<string> uploader = new Uploader<string>("字符串测试", new UploaderOperator()); //Uploader<string> uploader = new Uploader<string>("字符串测试", new UploaderOperator());
//for (var i = 0; i < 20; i++) //for (var i = 0; i < 5; i++)
//{ //{
// uploader.AddUploadRequest($"hello{i}"); // uploader.AddUploadRequest($"hello{i}", $"tag{i}");
//} //}
#endregion
#region S3相关测试 #region S3相关测试
/* /*
@ -124,12 +159,11 @@ namespace S3Demo
} }
catch (Exception ex) catch (Exception ex)
{ {
Console.WriteLine(ex.ToString()); Console.WriteLine($"Main: {ex.Message}");
} }
Console.ReadKey(); Console.ReadKey();
} }
#region 上传测试 #region 上传测试
public class UploaderOperator : IUploaderOperator<string> public class UploaderOperator : IUploaderOperator<string>
{ {
@ -188,63 +222,55 @@ namespace S3Demo
} }
} }
#endregion #endregion
/// <summary> /// <summary>
/// 创建事件及主要逻辑 /// 创建文件事件
/// </summary> /// </summary>
/// <param name="sender"></param> /// <param name="sender"></param>
/// <param name="e"></param> /// <param name="e"></param>
/// <remarks> /// <remarks>
/// 主要逻辑 /// 主要逻辑
/// </remarks> /// </remarks>
private static void OnCreated(object sender, FileSystemEventArgs e) private static async void OnCreated(object sender, FileSystemEventArgs e)
{ {
try string path = e.FullPath;
Waiting(path);//等待文件创建完成
await DisposeFile(path);//处理文件
}
private static async Task DisposeFile(string path)
{ {
string lineName = Path.GetDirectoryName(e.FullPath).Split('\\')[2]; // D:\ClientDir\line01\AOI\20211129\085023891P107CN14T00001.png
string equipmentName = Path.GetDirectoryName(e.FullPath).Split('\\')[3]; string lineName = Path.GetDirectoryName(path).Split('\\')[2];
string date = Path.GetDirectoryName(e.FullPath).Split('\\')[4]; string equipmentName = Path.GetDirectoryName(path).Split('\\')[3];
string fileName = Path.GetFileName(e.FullPath); string date = Path.GetDirectoryName(path).Split('\\')[4];
string fileName = Path.GetFileName(path);
string datetime = date + fileName.Substring(0, 9); string datetime = date + fileName.Substring(0, 9);
string dt_fomat = DateTime.ParseExact(datetime, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture).ToString("yyyy-MM-dd HH:mm:ss:fff"); string dt_fomat = DateTime.ParseExact(datetime, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture).ToString("yyyy-MM-dd HH:mm:ss:fff");
string code = fileName.Substring(fileName.Length - 6, 2);
string sn = fileName.Substring(9, 14); string sn = fileName.Substring(9, 14);
string now = DateTime.Now.ToString("yyyyMMddHHmmssfff"); string now = DateTime.Now.ToString("yyyyMMddHHmmssfff");
String[] tags = new String[] { sn, dt_fomat, code }; String[] tags = new String[] { sn, dt_fomat };
string S3Path = lineName + "/" + equipmentName + "/" + date + "/" + fileName; string S3Path = lineName + "/" + equipmentName + "/" + date + "/" + fileName;
string recv = await ObjectModel.PutObjectsWithTagsAsync(client, bucketName, S3Path, path, tags);
Console.WriteLine(recv);
}
//发送太快会出问题 private static void Waiting(string path)
//没有使用异步线程,可能没有足够的时间等待返回结果 [说明](https://www.cnblogs.com/lnwuyaowei/p/12672866.html)
string s3Flag = ObjectModel.Putobjectswithtags(bucketName, S3Path, e.FullPath, tags, credentials, conf).Result;
if (s3Flag.Equals("OK"))
{ {
//保存至数据库 try
using (var dev = new CodeFirstDbContext())
{
dev.UploadFinishs.Add(new UploadFinish()
{ {
UploaderID = lineName, FileInfo fi;
Tag = sn, fi = new FileInfo(path);
CreateAt = DateTime.ParseExact(datetime, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture), long len1, len2;
RetryAt = DateTime.ParseExact(now, "yyyyMMddHHmmssfff", System.Globalization.CultureInfo.CurrentCulture), len2 = fi.Length;
RetryCount = 1, do
RequestData = S3Path
});
int dbFlag = dev.SaveChanges();//1: 保存至数据库成功,否则保存失败
//ReqData req = new ReqData();
//RespData resp = new RespData();
//req.s3Flag = s3Flag;
//req.dbFlag = dbFlag;
//resp = JsonConvert.DeserializeObject<RespData>(HttpModel.PostUrl(url, JsonConvert.SerializeObject(req)));
//Console.WriteLine("resp: " + resp.code + " " + resp.message);
}
}
Console.WriteLine("S3对象存储路径: " + bucketName + "/" + S3Path);
}
catch (Exception ex)
{ {
Console.WriteLine("Run: " + ex.Message); len1 = len2;
Thread.Sleep(1000);//等待1秒钟
fi.Refresh();//这个语句不能漏了
len2 = fi.Length;
} while (len1 < len2);
} }
catch { }
} }
/// <summary> /// <summary>
@ -258,7 +284,7 @@ namespace S3Demo
/// 打印错误 /// 打印错误
/// </summary> /// </summary>
/// <param name="ex"></param> /// <param name="ex"></param>
private static void PrintException(Exception? ex) private static void PrintException(Exception ex)
{ {
if (ex != null) if (ex != null)
{ {

Loading…
Cancel
Save