Can you help me with this code? I am trying to resolve the warning "warning: use of deprecated method `tokio_io::AsyncRead::framed`: Use tokio_codec::Decoder::framed instead"

⚓ Rust    📅 2025-08-12    👤 surdeus    👁️ 4      

surdeus

How can I update this to the newest tokio? (I am using tokio 0.1.9, but the latest is 1.47.1.)

use std::time::Duration;
use tokio::net::TcpListener;
use tokio::prelude::*;

use std::sync::{Arc, Mutex};
use std::io;
use tokio_io::codec::{Decoder, Encoder};

/// Codec state used to frame a byte stream into iSCSI PDUs.
///
/// The `Decoder` and `Encoder` implementations for this type make it usable
/// as the framing layer of a connection.
pub struct IScsiMsg {
    /// Bytes received so far that have not yet been handed out as a frame.
    tbuf: BytesMut,
}

impl IScsiMsg {
    /// Initial capacity, in bytes, of the reassembly buffer (256 KiB).
    const INITIAL_CAPACITY: usize = 262_144;

    /// Creates a codec with an empty, pre-allocated reassembly buffer.
    pub fn new() -> IScsiMsg {
        IScsiMsg {
            tbuf: BytesMut::with_capacity(Self::INITIAL_CAPACITY),
        }
    }
}

impl Default for IScsiMsg {
    /// Equivalent to [`IScsiMsg::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Decoder for IScsiMsg {
    type Item = BytesMut;
    type Error = io::Error;

    /// Extracts at most one complete PDU from the bytes received so far.
    ///
    /// Everything in `buf` is first moved into the codec's own reassembly
    /// buffer, so `buf` is always empty when this returns. Once at least 48
    /// bytes are buffered, `get_pdu_len` is asked for the total PDU length;
    /// when that many bytes are available they are split off and returned,
    /// and any bytes after them stay buffered for the next call.
    ///
    /// Returns `Ok(None)` while more input is needed.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::InvalidData`] error when `get_pdu_len`
    /// reports a length of zero. Such a frame would never consume any input,
    /// so without this check the decoder would yield empty frames forever.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
        // Presumably the size of the fixed iSCSI basic header segment, i.e. the
        // minimum needed before the length can be read - TODO confirm.
        const MIN_BUFFERED: usize = 48;

        // NOTE(review): `buf` is itself a persistent buffer owned by the framing
        // layer, so keeping a second copy in `tbuf` looks redundant - confirm
        // before removing it.
        self.tbuf.extend_from_slice(buf);
        buf.clear();

        if self.tbuf.len() < MIN_BUFFERED {
            return Ok(None);
        }

        let nt = get_pdu_len(&self.tbuf) as usize;

        if nt == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "PDU reports a total length of zero",
            ));
        }

        if self.tbuf.len() < nt {
            return Ok(None);
        }

        // `split_to` hands back the first `nt` bytes and leaves the remainder in
        // `tbuf` without copying either part.
        Ok(Some(self.tbuf.split_to(nt)))
    }
}

impl Encoder for IScsiMsg {
    type Item = Bytes;
    type Error = io::Error;

    /// Appends `msg` to `buf`, followed by zero bytes of padding.
    ///
    /// The amount of padding is the difference between what `add_padding`
    /// returns for `48 + msg.len()` and that unpadded value itself.
    ///
    /// # Panics
    ///
    /// Panics if that difference is larger than four bytes, because the
    /// padding is sliced out of a four-byte array.
    fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
        const ZEROS: [u8; 4] = [0; 4];

        buf.extend_from_slice(&msg);

        let unpadded = 48 + msg.len() as u32;
        let pad_len = (add_padding(unpadded) - unpadded) as usize;

        // An empty slice appends nothing, so the zero-padding case needs no branch.
        buf.extend_from_slice(&ZEROS[..pad_len]);

        Ok(())
    }
}

/// Starts the server: parses the command line, loads the target
/// configuration, binds the listening socket and runs the accept loop.
///
/// Every accepted connection gets its own `State` entry in a shared map and
/// its own spawned task. The task decodes incoming PDUs with [`IScsiMsg`],
/// passes them to `process_bytes` and writes the responses back through the
/// same codec.
///
/// # Panics
///
/// Panics if the address cannot be parsed, if the listener cannot be bound,
/// or if one of the shared mutexes is poisoned.
fn main() {
    let (addr, conf, wb) = parse_command_line();
    println!();
    println!("IP Address: {}", addr);
    println!("Config file: {}", conf);
    println!("Writeback: {}", wb);

    // Per-connection state keyed by connection index, shared by all connection tasks.
    let statemap: Arc<Mutex<FnvHashMap<u32, State>>> = Arc::new(Mutex::new(FnvHashMap::default()));

    // The targets are configured once and then moved straight into the shared handle.
    let targets: Arc<Mutex<ISCSITargets>> = {
        let mut tgts = ISCSITargets::new();
        tgts.open_cfg(&conf, &wb);
        Arc::new(Mutex::new(tgts))
    };

    let addr = addr.parse().expect("Invalid socket address");
    let socket = TcpListener::bind(&addr).expect("Cannot bind to socket address");

    println!("Listening on: {}", addr);
    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            let cl = statemap.clone();
            let tgs = targets.clone();
            let mut idx = get_socket_index(&socket);

            {
                let mut clst = cl.lock().unwrap();

                // Step past indices that are still occupied by other connections.
                while clst.contains_key(&idx) {
                    idx += 100;
                }

                let mut st = State::new();
                st.set_stidx(idx);
                clst.insert(idx, st);
                println!("new conn {} {}", idx, clst.len());
            }

            // Socket tuning is best effort: a failure here is logged instead of
            // panicking, because a panic in this closure would end the accept loop.
            let tuning = [
                socket.set_nodelay(true),
                socket.set_keepalive(Some(Duration::new(5, 0))),
                socket.set_linger(Some(Duration::new(5, 0))),
            ];
            for outcome in tuning.iter() {
                if let Err(e) = outcome {
                    println!("failed to set socket option; error = {:?}", e);
                }
            }
            let _ = socket.set_recv_buffer_size(262144);
            let _ = socket.set_send_buffer_size(262144);

            // `Decoder::framed` replaces the deprecated `AsyncRead::framed`.
            let framed = IScsiMsg::new().framed(socket);
            let (writer, reader) = framed.split();

            let processor = reader
                .map(move |bytes| {
                    let mut clst = cl.lock().unwrap();
                    let mut st = clst.get_mut(&idx).unwrap();
                    let tg = tgs.lock().unwrap();

                    let mut fin = BytesMut::new();

                    let mut ns: usize = 0;
                    let bl = bytes.len();

                    // Walk the frame PDU by PDU and concatenate the responses.
                    while ns < bl {
                        let nt = get_pdu_len(&bytes[ns..]) as usize;
                        let out = process_bytes(&bytes[ns..ns + nt], &mut st, &tg);
                        fin.extend_from_slice(&out);
                        ns += nt;
                    }

                    fin.freeze()
                })
                .forward(writer)
                .map(|_| ())
                .and_then(move |()| {
                    println!("Normal socket close");
                    Ok(())
                })
                .or_else(move |err| {
                    println!("Socket closed with error: {:?}", err);
                    Err(err)
                })
                .then(move |result| {
                    // NOTE(review): the connection's entry is never removed from the
                    // state map, so the map grows with every connection. Confirm
                    // whether it should be removed here.
                    println!("Socket closed with result: {:?}", result);
                    Ok(())
                });

            tokio::spawn(processor)
        });

    tokio::run(done);
}

1 post - 1 participant

Read full topic

🏷️ Rust_feed