how can i update this to the newest tokio (I am using tokio 0.1.9, but the latest is 1.47.1)
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::prelude::*;
use std::sync::{Arc, Mutex};
use std::io;
use tokio_io::codec::{Decoder, Encoder};
pub struct IScsiMsg {
tbuf: BytesMut,
}
impl IScsiMsg {
pub fn new() -> IScsiMsg {
IScsiMsg {
tbuf: BytesMut::with_capacity(262144),
}
}
}
impl Decoder for IScsiMsg {
type Item = BytesMut;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
self.tbuf.extend_from_slice(buf);
buf.clear();
if self.tbuf.len() >= 48
{
let nt = get_pdu_len(&self.tbuf) as usize;
if self.tbuf.len() >= nt
{
let mut tk = BytesMut::with_capacity(nt);
tk.extend_from_slice(&self.tbuf[..nt]);
if self.tbuf.len() > nt
{
let remaining = self.tbuf.split_off(nt);
self.tbuf.clear();
self.tbuf.extend_from_slice(&remaining);
}
else
{
self.tbuf.clear();
}
return Ok(Some(tk));
}
return Ok(None);
}
Ok(None)
}
}
impl Encoder for IScsiMsg {
type Item = Bytes;
type Error = io::Error;
fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
buf.extend_from_slice(&msg);
let len = add_padding(48 + msg.len() as u32);
let nt = len - (48 + msg.len() as u32);
if nt > 0
{
let m = [0,0,0,0];
buf.extend_from_slice(&m[0..nt as usize]);
}
Ok(())
}
}
fn main() {
let (addr, conf, wb) = parse_command_line();
println!("");
println!("IP Address: {}", addr);
println!("Config file: {}", conf);
println!("Writeback: {}", wb);
let statemap: Arc<Mutex<FnvHashMap<u32, State>>> = Arc::new(Mutex::new(FnvHashMap::default()));
let targets: Arc<Mutex<ISCSITargets>>;
{
let mut tgts = ISCSITargets::new();
tgts.open_cfg(&conf, &wb);
targets = Arc::new(Mutex::new(tgts.clone()));
}
let addr = addr.parse().expect("Invalid socket address");
let socket = TcpListener::bind(&addr).expect("Cannot bind to socket address");
println!("Listening on: {}", addr);
let done = socket
.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
let cl = statemap.clone();
let tgs = targets.clone();
let mut idx = get_socket_index(&socket);
{
let mut clst = cl.lock().unwrap();
while clst.contains_key(&idx) {
idx += 100;
}
clst.insert(idx, State::new());
let st = clst.get_mut(&idx).unwrap();
st.set_stidx(idx);
println!("new conn {} {}", idx, clst.len());
}
socket.set_nodelay(true).unwrap();
socket.set_keepalive(Some(Duration::new(5, 0))).unwrap();
socket.set_linger(Some(Duration::new(5, 0))).unwrap();
let _ = socket.set_recv_buffer_size(262144);
let _ = socket.set_send_buffer_size(262144);
let framed = socket.framed(IScsiMsg::new());
let (writer, reader) = framed.split();
let processor = reader
.map(move |bytes| {
let mut clst = cl.lock().unwrap();
let mut st = clst.get_mut(&idx).unwrap();
let tg = tgs.lock().unwrap();
let mut fin = BytesMut::new();
let mut ns: usize = 0;
let bl = bytes.len();
while ns < bl {
let nt = get_pdu_len(&bytes[ns..]) as usize;
//print_pdu_info(&bytes[ns..ns+nt]);
let out = process_bytes(&bytes[ns..ns + nt], &mut st, &tg);
fin.extend_from_slice(&out);
ns += nt;
}
fin.freeze()
})
.forward(writer)
.map(|_| ())
.and_then(move |()| {
//let mut clst = cl1.lock().unwrap();
//clst.remove(&idx);
//println!("removed conn {} {}", idx, clst.len());
println!("Normal socket close");
Ok(())
})
.or_else(move |err| {
//let mut clst = cl2.lock().unwrap();
//clst.remove(&idx);
//println!("removed conn {} {}", idx, clst.len());
println!("Socket closed with error: {:?}", err);
Err(err)
})
.then(move |result| {
//let mut clst = cl2.lock().unwrap();
//clst.remove(&idx);
//println!("removed conn {} {}", idx, clst.len());
println!("Socket closed with result: {:?}", result);
Ok(())
});
tokio::spawn(processor)
});
tokio::run(done);
}
1 post - 1 participant
Read full topic
🏷️ Rust_feed