Python ile Ağ Tarama ve Port İzleme: socket ve scapy Kullanımı

Ağ yönetiminin en can sıkıcı taraflarından biri, hangi portların açık olduğunu, hangi servislerin çalıştığını ve ağda ne döndüğünü sürekli takip etmek zorunda kalmaktır. Bunu elle yapmak hem zaman alır hem de hata yapma olasılığını artırır. İşte tam da bu noktada Python devreye giriyor. socket ve scapy kütüphaneleriyle kendi ağ tarama ve port monitoring araçlarını yazmak hem öğretici hem de gerçekten işe yarayan bir yöntem. Bu yazıda sıfırdan başlayarak production ortamında kullanabileceğin scriptler yazacağız.

Neden socket ve scapy?

Python’ın standart kütüphanesindeki socket modülü, TCP/UDP bağlantıları üzerinde doğrudan kontrol sağlar. Basit port tarama işlemleri için fazlasıyla yeterlidir ve ek kurulum gerektirmez. scapy ise işi bir üst seviyeye taşır; ham paket oluşturma, ICMP ping, ARP keşfi ve SYN tarama gibi gelişmiş işlemleri mümkün kılar.

Nmap var, zaten her şeyi yapıyor diyebilirsin. Evet, nmap mükemmel bir araç. Ama kendi scriptini yazmak şu avantajları sağlar: çıktıyı istediğin formatta alırsın, başka sistemlerle entegre edersin, özel alarm mekanizmaları kurarsın ve en önemlisi, ne yaptığını tam olarak bilirsin.

Kurulum ve Hazırlık

socket modülü Python ile birlikte gelir, ayrıca kurulum gerekmez. Scapy için:

pip install scapy
pip install netifaces  # Ağ arayüzü bilgileri için
pip install python-nmap  # İleride işe yarar

Linux’ta scapy’yi root yetkisiyle çalıştırman gerekecek, çünkü ham soketlere erişim root izni ister. Windows’ta ise Npcap kurulu olması gerekiyor.

# Linux'ta scapy scriptlerini çalıştırmak için
sudo python3 port_scanner.py

# Veya scapy'ye özel capabilities ver
sudo setcap cap_net_raw+eip /usr/bin/python3

socket ile Temel Port Tarayıcı

İlk olarak en temel yapıyı kuralım. TCP bağlantı bazlı port tarama, bir porta bağlanmaya çalışır; bağlantı başarılıysa port açıktır, timeout gelirse kapalıdır.

#!/usr/bin/env python3
import socket
import concurrent.futures
import sys
from datetime import datetime

def scan_port(host, port, timeout=1):
    """Tek bir portu tara"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        if result == 0:
            return port, True
        return port, False
    except socket.error:
        return port, False

def get_service_name(port):
    """Port numarasından servis adını al"""
    try:
        return socket.getservbyport(port)
    except:
        return "unknown"

def tcp_port_scan(host, start_port=1, end_port=1024, workers=100):
    """Çok iş parçacıklı TCP port tarayıcı"""
    print(f"n[*] {host} hedefi taranıyor...")
    print(f"[*] Port aralığı: {start_port}-{end_port}")
    print(f"[*] Başlangıç: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}n")
    
    open_ports = []
    ports = range(start_port, end_port + 1)
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(scan_port, host, port): port for port in ports}
        
        for future in concurrent.futures.as_completed(futures):
            port, is_open = future.result()
            if is_open:
                service = get_service_name(port)
                open_ports.append((port, service))
                print(f"[+] Port {port:5d}/tcp   OPEN   {service}")
    
    open_ports.sort(key=lambda x: x[0])
    print(f"n[*] Tarama tamamlandı. {len(open_ports)} açık port bulundu.")
    return open_ports

if __name__ == "__main__":
    target = sys.argv[1] if len(sys.argv) > 1 else "192.168.1.1"
    
    # Host adını IP'ye çevir
    try:
        target_ip = socket.gethostbyname(target)
        print(f"[*] {target} -> {target_ip}")
    except socket.gaierror:
        print(f"[-] {target} çözümlenemedi!")
        sys.exit(1)
    
    tcp_port_scan(target_ip, 1, 1024, workers=200)

Bu script 200 thread kullanarak 1-1024 portlarını saniyeler içinde tarar. workers değerini düşürürsen daha az agresif bir tarama yaparsın, bu da IDS sistemlerini tetikleme ihtimalini azaltır.

Banner Grabbing ile Servis Tespiti

Açık portları bulduktan sonra orada çalışan servisi daha iyi anlamak için banner grabbing yapabiliriz. Servisler genellikle bağlandığında kendilerini tanıtan bir başlık mesajı gönderir.

#!/usr/bin/env python3
import socket

def grab_banner(host, port, timeout=3):
    """Servis banner'ını yakala"""
    banner = ""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        sock.connect((host, port))
        
        # Bazı servisler HTTP isteği bekler
        if port in [80, 443, 8080, 8443]:
            sock.send(b"HEAD / HTTP/1.0rnHost: " + host.encode() + b"rnrn")
        elif port == 21:
            pass  # FTP otomatik banner gönderir
        else:
            sock.send(b"rn")
        
        banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
        sock.close()
    except Exception as e:
        banner = f"Banner alınamadı: {str(e)}"
    
    return banner

def scan_with_banner(host, ports):
    """Port tarama ve banner grabbing birlikte"""
    results = []
    for port in ports:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((host, port))
        sock.close()
        
        if result == 0:
            banner = grab_banner(host, port)
            results.append({
                'port': port,
                'status': 'open',
                'banner': banner[:100]  # İlk 100 karakter yeterli
            })
            print(f"[+] Port {port}: OPEN")
            if banner:
                print(f"    Banner: {banner[:80]}")
    
    return results

# Kullanım
if __name__ == "__main__":
    common_ports = [21, 22, 23, 25, 80, 110, 143, 443, 445, 3306, 3389, 5432, 6379, 8080]
    scan_with_banner("192.168.1.100", common_ports)

scapy ile ARP Keşfi

Yerel ağda hangi cihazların aktif olduğunu bulmak için ARP taraması en güvenilir yöntemdir. scapy bunu çok temiz bir şekilde yapmamıza olanak tanır.

#!/usr/bin/env python3
from scapy.all import ARP, Ether, srp, conf
import socket
import ipaddress

conf.verb = 0  # Scapy verbose çıktısını kapat

def arp_scan(network):
    """
    Yerel ağda aktif hostları ARP ile keşfet
    Örnek: arp_scan("192.168.1.0/24")
    """
    print(f"[*] ARP taraması başlatılıyor: {network}")
    
    # ARP paketi oluştur
    arp_request = ARP(pdst=network)
    broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
    arp_request_broadcast = broadcast / arp_request
    
    # Paketleri gönder ve yanıtları bekle
    answered, unanswered = srp(arp_request_broadcast, timeout=3, retry=2)
    
    hosts = []
    print(f"n{'IP Adresi':<20} {'MAC Adresi':<20} {'Hostname'}")
    print("-" * 60)
    
    for sent, received in answered:
        ip = received.psrc
        mac = received.hwsrc
        
        # Hostname çözümlemeyi dene
        try:
            hostname = socket.gethostbyaddr(ip)[0]
        except:
            hostname = "bilinmiyor"
        
        hosts.append({'ip': ip, 'mac': mac, 'hostname': hostname})
        print(f"{ip:<20} {mac:<20} {hostname}")
    
    print(f"n[*] {len(hosts)} aktif host bulundu.")
    return hosts

if __name__ == "__main__":
    # Kendi ağ aralığını gir
    active_hosts = arp_scan("192.168.1.0/24")

scapy ile SYN Tarama

SYN tarama (half-open scan) tam TCP bağlantısı kurmadan port durumunu öğrenmenin yoludur. Daha az iz bırakır ve bazı güvenlik duvarlarını atlatabilir. Ancak bu yöntemi sadece kendi ağında ve yetkili olduğun sistemlerde kullan.

#!/usr/bin/env python3
from scapy.all import IP, TCP, sr1, conf, RandShort
import time

conf.verb = 0

def syn_scan(target, ports, timeout=2):
    """
    SYN tarama - tam bağlantı kurmadan port durumunu öğren
    Root yetkisi gerektirir!
    """
    print(f"n[*] SYN Tarama: {target}")
    print(f"[*] Portlar: {min(ports)}-{max(ports)}n")
    
    open_ports = []
    closed_ports = []
    filtered_ports = []
    
    for port in ports:
        # SYN paketi oluştur
        syn_packet = IP(dst=target) / TCP(
            sport=RandShort(),  # Rastgele kaynak port
            dport=port,
            flags="S"           # SYN flag
        )
        
        # Paketi gönder ve yanıt bekle
        response = sr1(syn_packet, timeout=timeout)
        
        if response is None:
            # Yanıt yok = filtered
            filtered_ports.append(port)
        elif response.haslayer(TCP):
            tcp_flags = response.getlayer(TCP).flags
            
            if tcp_flags == 0x12:  # SYN-ACK: port açık
                open_ports.append(port)
                print(f"[+] Port {port}: OPEN")
                
                # RST gönder, bağlantıyı temizle
                rst = IP(dst=target) / TCP(
                    sport=syn_packet[TCP].sport,
                    dport=port,
                    flags="R"
                )
                sr1(rst, timeout=1)
                
            elif tcp_flags == 0x14:  # RST-ACK: port kapalı
                closed_ports.append(port)
    
    print(f"n[*] Özet:")
    print(f"    Açık: {len(open_ports)} port")
    print(f"    Kapalı: {len(closed_ports)} port")
    print(f"    Filtered: {len(filtered_ports)} port")
    
    return open_ports, closed_ports, filtered_ports

if __name__ == "__main__":
    target = "192.168.1.1"
    # Yaygın portları tara
    ports_to_scan = list(range(20, 25)) + [80, 443, 8080, 3306, 22, 3389]
    
    open_p, closed_p, filtered_p = syn_scan(target, ports_to_scan)
    print(f"nAçık portlar: {open_p}")

Port Monitoring: Sürekli İzleme Scripti

Şimdiye kadar tek seferlik tarama yaptık. Asıl güçlü senaryo ise bir portun durumunu sürekli izlemek ve değiştiğinde alarm vermektir. Bir servis aniden durduğunda veya beklenmedik bir port açıldığında haberdar olmak isteyebilirsin.

#!/usr/bin/env python3
import socket
import time
import json
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
from collections import defaultdict

class PortMonitor:
    def __init__(self, config_file="monitor_config.json"):
        self.config = self.load_config(config_file)
        self.port_states = defaultdict(dict)
        self.alert_history = []
        
    def load_config(self, config_file):
        """Yapılandırma dosyasını yükle"""
        default_config = {
            "targets": [
                {
                    "host": "192.168.1.100",
                    "name": "Web Sunucusu",
                    "ports": [80, 443, 22],
                    "critical": True
                },
                {
                    "host": "192.168.1.101", 
                    "name": "DB Sunucusu",
                    "ports": [3306, 5432, 22],
                    "critical": True
                }
            ],
            "check_interval": 60,
            "timeout": 3,
            "alert_email": "[email protected]",
            "smtp_server": "localhost"
        }
        
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # Default config ile başla
            with open(config_file, 'w') as f:
                json.dump(default_config, f, indent=4)
            return default_config
    
    def check_port(self, host, port):
        """Port durumunu kontrol et"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.config.get('timeout', 3))
            result = sock.connect_ex((host, port))
            sock.close()
            return result == 0
        except:
            return False
    
    def send_alert(self, subject, message):
        """Email ile alarm gönder"""
        try:
            msg = MIMEText(message)
            msg['Subject'] = f"[PORT MONITOR] {subject}"
            msg['From'] = "monitor@localhost"
            msg['To'] = self.config.get('alert_email', 'admin@localhost')
            
            with smtplib.SMTP(self.config.get('smtp_server', 'localhost')) as server:
                server.send_message(msg)
            print(f"[!] Alarm gönderildi: {subject}")
        except Exception as e:
            print(f"[-] Email gönderilemedi: {e}")
    
    def log_event(self, host, port, old_state, new_state):
        """Durum değişikliğini logla"""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        old_str = "OPEN" if old_state else "CLOSED"
        new_str = "OPEN" if new_state else "CLOSED"
        
        log_entry = f"[{timestamp}] {host}:{port} {old_str} -> {new_str}"
        
        with open("port_monitor.log", 'a') as f:
            f.write(log_entry + "n")
        
        print(log_entry)
        self.alert_history.append(log_entry)
    
    def run(self):
        """Ana izleme döngüsü"""
        print(f"[*] Port Monitor başlatıldı")
        print(f"[*] Kontrol aralığı: {self.config['check_interval']} saniye")
        print(f"[*] İzlenen hedefler:")
        
        for target in self.config['targets']:
            print(f"    - {target['name']} ({target['host']}): {target['ports']}")
        
        # İlk durumu kaydet
        for target in self.config['targets']:
            host = target['host']
            for port in target['ports']:
                is_open = self.check_port(host, port)
                self.port_states[host][port] = is_open
                status = "OPEN" if is_open else "CLOSED"
                print(f"[*] İlk durum - {host}:{port} = {status}")
        
        print(f"n[*] İzleme başladı...n")
        
        while True:
            time.sleep(self.config.get('check_interval', 60))
            
            for target in self.config['targets']:
                host = target['host']
                name = target['name']
                
                for port in target['ports']:
                    current_state = self.check_port(host, port)
                    previous_state = self.port_states[host].get(port)
                    
                    if previous_state != current_state:
                        self.log_event(host, port, previous_state, current_state)
                        
                        # Alarm gönder
                        action = "AÇILDI" if current_state else "KAPANDI"
                        subject = f"{name} - Port {port} {action}"
                        message = (
                            f"Host: {host} ({name})n"
                            f"Port: {port}n"
                            f"Durum: {action}n"
                            f"Zaman: {datetime.now()}n"
                        )
                        
                        if target.get('critical', False):
                            self.send_alert(subject, message)
                        
                        # Durumu güncelle
                        self.port_states[host][port] = current_state

if __name__ == "__main__":
    monitor = PortMonitor()
    monitor.run()

Gercek Dunya Senaryosu: Yeni Host Tespiti

Ağda izinsiz bir cihaz belirdiğinde haberdar olmak için ARP tabanlı bir izleme sistemi kurabilirsin. Bu özellikle güvenlik açısından kritik ağlarda işe yarar.

#!/usr/bin/env python3
from scapy.all import ARP, Ether, srp, conf
import time
import json
from datetime import datetime

conf.verb = 0

WHITELIST_FILE = "known_hosts.json"
SCAN_INTERVAL = 300  # 5 dakikada bir tara
NETWORK = "192.168.1.0/24"

def load_known_hosts():
    try:
        with open(WHITELIST_FILE, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}

def save_known_hosts(hosts):
    with open(WHITELIST_FILE, 'w') as f:
        json.dump(hosts, f, indent=4)

def arp_discover(network):
    """Ağdaki aktif hostları bul"""
    arp = ARP(pdst=network)
    ether = Ether(dst="ff:ff:ff:ff:ff:ff")
    packet = ether / arp
    result = srp(packet, timeout=3, retry=1)[0]
    
    hosts = {}
    for sent, received in result:
        hosts[received.psrc] = received.hwsrc
    return hosts

def monitor_new_hosts():
    """Yeni cihaz tespiti için sürekli izleme"""
    print(f"[*] Ağ izleme başlatıldı: {NETWORK}")
    
    known_hosts = load_known_hosts()
    
    if not known_hosts:
        print("[*] Bilinen host listesi boş, ilk tarama yapılıyor...")
        initial_hosts = arp_discover(NETWORK)
        known_hosts = initial_hosts
        save_known_hosts(known_hosts)
        print(f"[*] {len(known_hosts)} host whitelist'e eklendi.")
    
    print(f"[*] {len(known_hosts)} bilinen host izleniyor.n")
    
    while True:
        time.sleep(SCAN_INTERVAL)
        current_hosts = arp_discover(NETWORK)
        
        # Yeni hostları kontrol et
        for ip, mac in current_hosts.items():
            if ip not in known_hosts:
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                alert_msg = f"[UYARI] [{timestamp}] YENİ HOST TESPİT EDİLDİ!"
                alert_msg += f" IP: {ip}, MAC: {mac}"
                
                print(alert_msg)
                
                with open("new_host_alerts.log", 'a') as f:
                    f.write(alert_msg + "n")
            
            # MAC değişimini kontrol et (ARP spoofing tespiti)
            elif known_hosts[ip] != mac:
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                spoof_alert = f"[KRİTİK] [{timestamp}] OLASI ARP SPOOFING!"
                spoof_alert += f" IP: {ip}, Bilinen MAC: {known_hosts[ip]}, Yeni MAC: {mac}"
                print(spoof_alert)
        
        # Kaybolan hostları kontrol et
        for ip in known_hosts:
            if ip not in current_hosts:
                print(f"[*] Host erişilemez durumda: {ip}")

if __name__ == "__main__":
    monitor_new_hosts()

Sonuçları JSON Olarak Kaydetme ve Raporlama

Tarama sonuçlarını yapılandırılmış formatta saklamak, ileride analiz etmek için kritik öneme sahiptir.

#!/usr/bin/env python3
import socket
import json
import concurrent.futures
from datetime import datetime

def generate_scan_report(targets, port_range=(1, 1024)):
    """Birden fazla hedef için kapsamlı tarama raporu"""
    report = {
        "scan_date": datetime.now().isoformat(),
        "scanner": "Python socket scanner v1.0",
        "targets": []
    }
    
    for target in targets:
        host_result = {
            "host": target,
            "scan_start": datetime.now().isoformat(),
            "open_ports": [],
            "total_scanned": port_range[1] - port_range[0] + 1
        }
        
        try:
            ip = socket.gethostbyname(target)
            host_result["ip"] = ip
        except:
            host_result["ip"] = "çözümlenemedi"
            host_result["error"] = "DNS çözümleme başarısız"
            report["targets"].append(host_result)
            continue
        
        # Port tarama
        with concurrent.futures.ThreadPoolExecutor(max_workers=150) as executor:
            futures = {}
            for port in range(port_range[0], port_range[1] + 1):
                future = executor.submit(check_port_with_service, ip, port)
                futures[future] = port
            
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result:
                    host_result["open_ports"].append(result)
        
        host_result["open_ports"].sort(key=lambda x: x["port"])
        host_result["scan_end"] = datetime.now().isoformat()
        report["targets"].append(host_result)
    
    # Raporu kaydet
    filename = f"scan_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=4, ensure_ascii=False)
    
    print(f"[*] Rapor kaydedildi: {filename}")
    return report

def check_port_with_service(host, port, timeout=1):
    """Port kontrolü ve servis bilgisi"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        
        if result == 0:
            try:
                service = socket.getservbyport(port)
            except:
                service = "unknown"
            
            return {
                "port": port,
                "protocol": "tcp",
                "service": service,
                "state": "open"
            }
        return None
    except:
        return None

if __name__ == "__main__":
    targets = ["192.168.1.1", "192.168.1.100", "google.com"]
    report = generate_scan_report(targets, port_range=(1, 1024))
    
    # Özet çıktı
    for target_result in report["targets"]:
        host = target_result["host"]
        open_count = len(target_result.get("open_ports", []))
        print(f"n{host}: {open_count} açık port")
        for port_info in target_result.get("open_ports", []):
            print(f"  - {port_info['port']}/tcp ({port_info['service']})")

Önemli Notlar ve Etik Kullanım

Bu araçları kullanırken dikkat etmen gereken birkaç kritik nokta var:

  • İzin almadan tarama yapma: Sadece kendi ağlarını ve izin verilen sistemleri tara. İzinsiz port tarama pek çok ülkede yasadışıdır.
  • IDS/IPS tetikleme: Yüksek hızlı taramalar güvenlik sistemlerini tetikler ve IP bloğuna yol açar. workers değerini makul tut.
  • Rate limiting: Production ağlarında tarama yaparken time.sleep() ekleyerek taramayı yavaşlat.
  • Log tutma: Tüm tarama aktivitelerini logla. Hem güvenlik hem de troubleshooting için gerekli.
  • Root yetkisi: scapy ham soket işlemleri için root yetkisi ister. Bu yetkiyi dikkatli kullan ve scripti minimum yetkiyle çalıştır.
  • Firewall kuralları: SYN tarama bazı firewall’ları atlatabilir gibi görünse de modern IDS sistemleri bunu kolayca tespit eder.

Sonuç

socket ve scapy kombinasyonu, ağ yönetiminde gerçekten güçlü bir araç seti sunuyor. Basit port taramadan ARP spoofing tespitine, banner grabbing’den sürekli port monitoring’e kadar pek çok senaryo için bu iki kütüphane yeterli.

Yazdığımız scriptleri production ortamında kullanmadan önce test ağında dene. Port monitor scriptini systemd servisi olarak çalıştırabilir, cron job ile periyodik tarama yapabilir ve sonuçları bir veritabanına kaydederek zaman içindeki değişimleri takip edebilirsin. Bir sonraki adım olarak bu scripti bir dashboard ile entegre etmek ve Grafana/Prometheus ile görselleştirmek işleri daha da profesyonel hale getirecektir.

Ağ güvenliği reaktif değil proaktif bir iş; bu scriptler sana o proaktif yaklaşımı kazandırır.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir