diff --git a/Cargo.toml b/Cargo.toml index fcc13a0aea1e2f03184259812444688feb69938c..273c6fe886084080f8548554cd4418640d251c5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,13 +11,16 @@ tokio={version="1.37.0",features=["macros","rt-multi-thread","time"]} askama = {version="0.12.1"} askama_axum="0.4.0" - +tower-layer = "0.3.3" +tower-service="0.3.3" tower-http = { version = "0.5.2", features = ["fs", "trace","cors"] } sqlx = { version = "0.7.4", features = ["mysql", "runtime-tokio", "macros", "chrono"] } chrono = "0.4.38" cron = "0.12.1" tklog = "0.0.10" +job_scheduler = "1.2.1" +regex = "1.11.1" serde = {version="1.0.214", features = ["derive"]} -serde_json = "1.0.132" \ No newline at end of file +serde_json = "1.0.132" diff --git a/README.md b/README.md index ed4f3e0cf57fb6344cdd2763771b1ce783c6fc3f..4438a0d70a0ff6a931d18dc52aae55d744a009d5 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,6 @@ ### 介绍 - 使用axum 试着创建一个web app, 通过项目学习Rust, tokio, 这是个练习的项目。 ### 设计思想 @@ -13,7 +12,7 @@ 运行时:tokio 数据库:sqlx 日志:tklog -定时任务:cron +定时任务:job_scheduler 模板渲染:aksama, askama-axum 页面:html,css,js, JQuery, Bootstrap @@ -39,10 +38,6 @@ cargo build --release 时并不会帮我们把我们需要的静态文件打包 - 我们还需要把asserts文件(包含此文件夹)一同上传到执行文件的同级目录下, asserts文件夹是程序中ServeDir::new指定了该名称。 - - ### 注意 - - - 文件名建议中文,假如是英文的话,如果路径中使用反斜杠那么名字可能被转移,例如\test.md, \t 就被当作换行符。 - 路径名为全路径名称, 要改 diff --git a/resource/scripts/DDL.sql b/resource/scripts/DDL.sql index 3db8d96e04fa45f0e61c98e82df0c1fb88339436..cc22e7c3503cf4d6a822782e8dae6c7fed201898 100644 --- a/resource/scripts/DDL.sql +++ b/resource/scripts/DDL.sql @@ -28,3 +28,15 @@ CREATE TABLE category( PRIMARY KEY(`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +-- article read count table +CREATE TABLE access_count ( + `url` VARCHAR(64) NULL, + `article_id` INT(11) NULL, + `total_times` INT(11) NOT NULL DEFAULT 0, + `create_on` DATETIME NOT NULL, + `create_by` VARCHAR(32) NOT NULL, + `modify_on` DATETIME NOT NULL, + `modify_by` VARCHAR(32) NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + diff --git a/src/entity/access_count.rs b/src/entity/access_count.rs new file mode 100644 index 0000000000000000000000000000000000000000..84c81c7526ba19304b178a325c3ea92935a414a2 --- /dev/null +++ b/src/entity/access_count.rs @@ -0,0 +1,50 @@ +use std::io::{ Error, ErrorKind}; +use sqlx::MySqlPool; +use tklog::error; + +#[derive(Clone)] +#[derive(Debug)] +pub struct AccessCount<'a> { + pub url :&'a str, + pub article_id: i32, + pub total_times: i32, +} + +impl<'a> AccessCount<'a> { + pub fn new(url:&'a str, article_id:i32, total_times: i32)->Self { + AccessCount{ + url, + article_id, + total_times, + } + } + + + + pub async fn update(&self, pool :&MySqlPool) ->Result<(), Error> { + println!("execute update ---------------------{}", &self.total_times); + let result_update = sqlx::query( + r#" + INSERT INTO access_count (url, article_id, total_times, create_by, create_on, modify_by, modify_on) + VALUES (?, ?, ?, 'Scheduelr', NOW(), 'Scheduelr', NOW()) + ON DUPLICATE KEY UPDATE + total_times = total_times + VALUES(total_times), + modify_by = 'Scheduelr', + modify_on = now(), + create_by = 'Scheduelr', + create_on = now()"# + ) + .bind(self.url) + .bind(self.article_id) + .bind(self.total_times) + .execute(pool) + .await; + match result_update { + Ok(_) => {Ok(())}, + Err(e) => { + error!(e.to_string()); + Err(Error::new(ErrorKind::Other, e.to_string())) + }, + } + } +} diff --git a/src/entity/article_summary.rs b/src/entity/article_summary.rs index 0a01fb1b110d5c494355a66ed3eb1c323360c370..846a8b6d67cb5826befc22bc05b26a303bcf683a 100644 --- a/src/entity/article_summary.rs +++ b/src/entity/article_summary.rs @@ -1,6 +1,4 @@ -use chrono::{NaiveDateTime, Utc}; use serde::{Deserialize, Serialize}; -use serde_json; #[derive(Clone, Debug,Deserialize, Serialize, sqlx::FromRow)] pub struct ArticleSummary { diff --git a/src/entity/mod.rs b/src/entity/mod.rs index d12d61fc4a422493fd3f5e79048ea3f84d797f15..66e4c9dcd522222378cef02a07d967f2394da870 100644 --- a/src/entity/mod.rs +++ b/src/entity/mod.rs @@ -3,3 +3,4 @@ pub mod article_detail; pub mod article_summary; pub mod article_summary_vec; pub mod article_list_view; +pub mod access_count; diff --git a/src/layer/log_layer.rs b/src/layer/log_layer.rs new file mode 100644 index 0000000000000000000000000000000000000000..23678e5638c80331c28c1f9945388e5fc2b7e1de --- /dev/null +++ b/src/layer/log_layer.rs @@ -0,0 +1,50 @@ +use std::{fmt, task::{Context, Poll}}; + +/// a demo https://tower-rs.github.io/tower/tower_layer/trait.Layer.html +use tower_layer::Layer; +use tower_service::Service; + +#[derive(Debug, Clone)] +pub struct LogLayer { + pub target: &'static str, +} + +impl Layer for LogLayer { + type Service = LogService; + + fn layer(&self, service: S) -> Self::Service { + LogService { + target: self.target, + service + } + } +} + +// This service implements the Log behavior +#[derive(Debug, Clone)] +pub struct LogService { + target: &'static str, + service: S, +} + +impl Service for LogService +where + S: Service, + Request: fmt::Debug, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + // Insert log statement here or other functionality + //是单线程还是多线程? + println!("thread id: {:?}",std::thread::current().id()); + println!("request = {:?}, target = {:?}", request, self.target); + self.service.call(request) + } +} \ No newline at end of file diff --git a/src/layer/mod.rs b/src/layer/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..7bd7fa2513153849908a5eac346c77ef347f8088 --- /dev/null +++ b/src/layer/mod.rs @@ -0,0 +1,2 @@ +pub mod log_layer; +pub mod url_layer; \ No newline at end of file diff --git a/src/layer/url_layer.rs b/src/layer/url_layer.rs new file mode 100644 index 0000000000000000000000000000000000000000..aa76c92d70cd1fa14b72500f63a1292687d8fe8d --- /dev/null +++ b/src/layer/url_layer.rs @@ -0,0 +1,107 @@ +use axum::body::Body; +use axum::http::Request; +use regex::Regex; +use std::sync::atomic::AtomicU16; +use std::sync::atomic::Ordering; +use std::sync::{Arc, RwLock}; +use std::{ + fmt, + task::{Context, Poll}, +}; +use tklog::warn; + +/// a demo https://tower-rs.github.io/tower/tower_layer/trait.Layer.html +use tower_layer::Layer; +use tower_service::Service; + +#[derive(Debug, Clone)] +pub struct UrlLayer { + articls_click_vue: Arc>>, +} + +impl UrlLayer { + pub fn new(articls_click_vue: Arc>>) -> Self { + UrlLayer { articls_click_vue } + } +} + +impl Layer for UrlLayer { + type Service = UrlService; + + fn layer(&self, service: S) -> Self::Service { + UrlService { + service, + articls_click_vue: Arc::clone(&self.articls_click_vue), + } + } +} + +// This service implements the Log behavior +#[derive(Debug, Clone)] +pub struct UrlService { + service: S, + articls_click_vue: Arc>>, +} + +impl Service> for UrlService +where + S: Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + let uri = request.uri(); + println!("the path is : {}", uri); + let re = Regex::new(r"/article/(?P\d+)").unwrap(); + + if let Some(captures) = re.captures(uri.path()) { + if let Some(id_match) = captures.name("id") { + println!("Extracted ID: {}", id_match.as_str()); + let index: usize = match id_match.as_str().parse() { + Ok(num) => num, + Err(_) => { + warn!("Failed to parse article ID."); + return self.service.call(request); + } + }; + let mut vec = self.articls_click_vue.write().unwrap(); + // 确保 vec 的长度至少为 index + 1 + while vec.len() <= index { + vec.push(AtomicU16::new(0)); + } + // 更新点击次数 + vec[index].fetch_add(1, Ordering::SeqCst); + println!( + "Article {} has been clicked {} times.", + index, + vec[index].load(Ordering::SeqCst) + ); + } + } else { + println!("No match found."); + } + self.service.call(request) + } +} + +#[cfg(test)] +mod test { + use super::*; + use regex::Regex; + + #[test] + pub fn test_regex() { + let re = Regex::new(r"Hello (?\w+)!").unwrap(); + let Some(caps) = re.captures("Hello Murphy!") else { + println!("no match!"); + return; + }; + println!("The name is: {}", &caps["name"]); + } +} diff --git a/src/lib.rs b/src/lib.rs index 0612770d48618ca7d3c17f50085cb45d797a2c80..4172a6ab38f4862090341fbd04386f090fa7fd25 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,4 +6,6 @@ pub mod entity; pub mod job; pub mod configuration; +pub mod scheduler_job; +pub mod layer; diff --git a/src/main.rs b/src/main.rs index df4d366d32c9022761d864d79e16ec2fc346cfbc..29d32a17b892ab8470e784b9eb67dfe7ab31f3d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,15 @@ +use std::sync::atomic::AtomicU16; +use std::sync::{Arc, RwLock}; + use axum::http::HeaderValue; -use axum::{extract::Path, extract::State, routing::get, Router, Json}; +use axum::{extract::Path, extract::State, routing::get, Json, Router}; use axum_hello::entity::article_list_view::ArticlesListView; -use tower_http::cors::{CorsLayer}; +use axum_hello::layer::log_layer::LogLayer; +use axum_hello::layer::url_layer::UrlLayer; +use axum_hello::scheduler_job::article_read_time_scheduler_job::ArticleReadTimeSchedulerJob; use sqlx::mysql::MySqlPoolOptions; use sqlx::MySqlPool; +use tower_http::cors::CorsLayer; use tklog::info; @@ -38,27 +44,41 @@ async fn main() { let basic_path = Arc::new(args[1].clone()); */ let pool = MySqlPoolOptions::new() - .connect("mysql://wukong:wk(2024)@localhost/Blog") + .connect("mysql://wukong:wk(2024)@47.92.236.188/Blog") .await .expect("failed to connect database."); + // default page_size=9 + let default_vec_size = 9 * 2; + let count_vec = (0..default_vec_size) + .map(|_| AtomicU16::new(0)) + .collect::>(); + let mut articles_click_vec: Arc>> = Arc::new(RwLock::new(count_vec)); + let UrlLayer = UrlLayer::new(Arc::clone(&articles_click_vec)); + let log_layer = LogLayer { + target: "hello layer", + }; let router = Router::new() - .route("/", get(articles_view)) //只返回试图 - .route("/articles",get(articles_view)) //只返回试图 + .route("/", get(articles_view)) //只返回试图 + .route("/articles", get(articles_view)) //只返回试图 .route("/articles/:page_num", get(article_list)) //返回具体信息 .route("/article/:id", get(article_detail)) .route("/me", get(me)) .route("/article/new", get(extract_article)) .layer( //设置同源策略 - CorsLayer::new() - .allow_origin("*".parse::().unwrap()) + CorsLayer::new().allow_origin("*".parse::().unwrap()), ) - .with_state(pool) + .layer(log_layer) + .layer(UrlLayer) + .with_state(pool.clone()) .nest_service("/assets", tower_http::services::ServeDir::new("assets")); + //开启scheudler + let _ = ArticleReadTimeSchedulerJob::new(Arc::clone(&articles_click_vec), pool.clone()).start(); + // run our app with hyper, listening globally on port 3000 - let listener = tokio::net::TcpListener::bind("0.0.0.0:80").await.unwrap(); + let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); axum::serve(listener, router).await.unwrap(); } @@ -72,7 +92,7 @@ async fn say_hello(State(pool): State) -> String { row.0 } -async fn articles_view() -> ArticlesListView{ +async fn articles_view() -> ArticlesListView { ArticlesListView::new() } async fn article_list( diff --git a/src/scheduler_job/article_read_time_scheduler_job.rs b/src/scheduler_job/article_read_time_scheduler_job.rs index fde0cbd161ea2c5c87b1e9728b9e0af965ae710f..ce73a47829dca40bfbba6ec83ad0c031f33d1c02 100644 --- a/src/scheduler_job/article_read_time_scheduler_job.rs +++ b/src/scheduler_job/article_read_time_scheduler_job.rs @@ -1,19 +1,87 @@ +use std::sync::atomic::AtomicU16; +use std::sync::atomic::Ordering; +use std::sync::{Arc, RwLock}; /// 计算文章阅读此书的定时任务 /// AtomicI8 原子类型 /// create: job_scheduler /// 每小时一次 -use std::sync::atomic::AtomicI8; - -use job_scheduler::Job; +use std::thread; use std::time::Duration; +use std::{ + fmt, + task::{Context, Poll}, +}; +use tklog::error; + +use tokio::runtime::Runtime; + +use sqlx::MySqlPool; + +use job_scheduler::{Job, JobScheduler}; +use crate::entity::access_count::AccessCount; -struct article_read_time_secheduler_job { - map: Vec, //我们使用index+1 来代表对应的article_id - job: Job, + +pub struct ArticleReadTimeSchedulerJob { + articls_click_vue: Arc>>, + db_pool: MySqlPool, } -impl article_read_time_secheduler { - pub fn new() -> JobScheduler { - JobScheduler::new() +impl ArticleReadTimeSchedulerJob { + /// 初始化一个新的 ArticleReadTimeSchedulerJob 实例。 + /// + /// 参数: + /// - default_count_num: 文章数量,默认是 page_size * 2。 + pub fn new(articls_click_vue: Arc>>, db_pool: MySqlPool) -> Self { + ArticleReadTimeSchedulerJob { + articls_click_vue, + db_pool, + } } + + pub fn start(&self) { + let articls_click_vue = Arc::clone(&self.articls_click_vue); + let db_pool = self.db_pool.clone(); // 克隆数据库连接池 + // 创建一个全局的 Tokio 运行时实例以供重用。 + let runtime = Runtime::new().expect("Unable to create Tokio runtime"); + + thread::spawn(move || { + let mut sched = JobScheduler::new(); + sched.add(Job::new("1/10 * * * * *".parse().unwrap(), move || { + println!("hello job scheduler."); + + // 使用之前创建的运行时实例。 + runtime.block_on(async { + if let Ok(mut vec) = articls_click_vue.write() { + for (idx, value) in vec.iter_mut().enumerate() { + println!("{}", idx); + let click_times :i32= value.load(Ordering::SeqCst).into(); + if click_times >1 { + let account_count = AccessCount::new("", idx.try_into().unwrap(), click_times); + if let Err(e) = account_count.update(&db_pool).await { + eprintln!("Error updating access count: {}", e); + } + value.store(0, Ordering::SeqCst); + } + + } + } else { + error!("Failed to acquire write lock on articls_click_vue."); + } + }); + })); + + loop { + sched.tick(); + thread::sleep(Duration::from_millis(500)); + } + }); + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + pub fn test_article_read_time_secheduler() {} } diff --git a/src/scheduler_job/mod.rs b/src/scheduler_job/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..acacb9ba50f3e830c72099c6a44100bff529b11b --- /dev/null +++ b/src/scheduler_job/mod.rs @@ -0,0 +1 @@ +pub mod article_read_time_scheduler_job; diff --git a/tklogsize.txt b/tklogsize.txt new file mode 100644 index 0000000000000000000000000000000000000000..22603a16f358b01283ce4afc13d691b36fb0d9c0 --- /dev/null +++ b/tklogsize.txt @@ -0,0 +1,39 @@ +[INFO] 22:01:25 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 22:03:10 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 22:03:44 main.rs 102:page_num: 1 +[INFO] 22:03:44 main.rs 105:the offset is {}0 +[INFO] 22:03:45 main.rs 116:total: 3 +[INFO] 22:03:45 main.rs 122:position: 0 +[INFO] 22:03:45 main.rs 122:position: 1 +[INFO] 22:03:45 main.rs 122:position: 2 +[INFO] 22:03:48 main.rs 134:the article id is 2 +[INFO] 22:03:59 main.rs 134:the article id is 4 +[INFO] 22:04:12 main.rs 134:the article id is 1 +[INFO] 22:04:24 main.rs 134:the article id is 1 +[INFO] 22:04:28 main.rs 134:the article id is 1 +[INFO] 22:30:08 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 22:37:49 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 22:57:14 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 22:58:56 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 23:00:24 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 23:01:56 main.rs 134:the article id is 1 +[INFO] 23:01:57 main.rs 134:the article id is 1 +[INFO] 23:01:57 main.rs 134:the article id is 1 +[INFO] 23:01:58 main.rs 134:the article id is 1 +[INFO] 23:01:58 main.rs 134:the article id is 1 +[INFO] 23:02:53 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 23:03:04 main.rs 134:the article id is 1 +[INFO] 23:03:05 main.rs 134:the article id is 1 +[INFO] 23:03:44 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 23:03:46 main.rs 134:the article id is 1 +[INFO] 23:03:47 main.rs 134:the article id is 1 +[INFO] 23:04:27 main.rs 134:the article id is 2 +[INFO] 23:04:30 main.rs 134:the article id is 4 +[INFO] 23:05:58 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 23:06:00 main.rs 134:the article id is 4 +[INFO] 23:06:58 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 23:08:47 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 23:09:15 main.rs 134:the article id is 4 +[INFO] 23:09:50 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 23:13:57 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello +[INFO] 23:15:00 main.rs 36:Current workspace:: {:?}/home/owen/code/rust_project/axum-hello