diff yolov8.py @ 4:f6990d85161c draft default tip

planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/tools commit c6c9d43a4ecdc88ebdeaf3451453a550f159c506
author bgruening
date Mon, 21 Jul 2025 15:51:13 +0000
parents 97bc82ee2a61
children
line wrap: on
line diff
--- a/yolov8.py	Mon Jul 14 18:28:46 2025 +0000
+++ b/yolov8.py	Mon Jul 21 15:51:13 2025 +0000
@@ -1,4 +1,5 @@
 import argparse
+import csv
 import os
 import pathlib
 import time
@@ -11,7 +12,6 @@
 from tifffile import imwrite
 from ultralytics import YOLO
 
-
 #
 # Input arguments
 #
@@ -79,7 +79,6 @@
 parser.add_argument("--class_names_file",
                     help="Path to the text file containing class names.",
                     type=str)
-
 # For training the model and prediction
 parser.add_argument("--mode",
                     help=(
@@ -130,6 +129,7 @@
 parser.add_argument('--headless', action='store_true')
 parser.add_argument('--nextflow', action='store_true')
 
+
 # For data augmentation
 parser.add_argument("--hsv_h",
                     help="(float) image HSV-Hue augmentation (fraction)",
@@ -171,9 +171,6 @@
                     default=1.0, type=float)
 
 
-#
-# Functions
-#
 # Train a new model on the dataset mentioned in yaml file
 def trainModel(model_path, model_name, yaml_filepath, **kwargs):
     if "imgsz" in kwargs:
@@ -272,13 +269,12 @@
                 translate=aug_translate, shear=aug_shear, scale=aug_scale,
                 perspective=aug_perspective, fliplr=aug_fliplr,
                 flipud=aug_flipud, mosaic=aug_mosaic, crop_fraction=aug_crop_fraction,
-                weight_decay=weight_decay, lr0=init_lr, seed=42)
+                weight_decay=weight_decay, lr0=init_lr)
     return model
 
 
 # Validate the trained model
 def validateModel(model):
-    # Validate the model
     metrics = model.val()  # no args needed, dataset & settings remembered
     metrics.box.map    # map50-95
     metrics.box.map50  # map50
@@ -316,6 +312,7 @@
     run_save_dir = kwargs['run_dir']  # For Galaxy, run_save_dir is always provided via xml wrapper
     if "foldername" in kwargs:
         save_folder_name = kwargs['foldername']
+
     # infer on a local image or directory containing images/videos
     prediction = model.predict(source=source_datapath, save=True, stream=True,
                                conf=confidence, imgsz=image_size,
@@ -329,6 +326,7 @@
 def save_yolo_bounding_boxes_to_txt(predictions, save_dir):
     """
     Function to save YOLO bounding boxes to text files.
+
     Parameters:
     - predictions: List of results from YOLO model inference.
     - save_dir: Directory where the text files will be saved.
@@ -339,12 +337,15 @@
         bounding_boxes = result.boxes.xyxy  # Bounding boxes in xyxy format
         confidence_scores = result.boxes.conf  # Confidence scores
         class_nums = result.boxes.cls  # Class numbers
+
         # Create save directory if it doesn't exist
         save_path = pathlib.Path(save_dir).absolute()
         save_path.mkdir(parents=True, exist_ok=True)
+
         # Construct filename for the text file
         image_filename = pathlib.Path(result.path).stem
         text_filename = save_path / f"{image_filename}.txt"
+
         # Write bounding boxes info into the text file
         with open(text_filename, 'w') as f:
             for i in range(bounding_boxes.shape[0]):
@@ -352,12 +353,14 @@
                 confidence = confidence_scores[i]
                 class_num = int(class_nums[i])
                 f.write(f'{class_num:01} {x1:06.2f} {y1:06.2f} {x2:06.2f} {y2:06.2f} {confidence:0.02} \n')
-        print(colored(f"Bounding boxes saved in: {text_filename}", 'green'))
+            print(colored(f"Bounding boxes saved in: {text_filename}", 'green'))
 
 
+# Main code
 if __name__ == '__main__':
     args = parser.parse_args()
     os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
+
     # Train/load model
     if (args.train):
         model = trainModel(args.model_path, args.model_name, args.yaml_path,
@@ -377,7 +380,7 @@
                                       "train", "weights", "best.pt"))
         else:
             model = YOLO(os.path.join(args.model_path,
-                                      args.model_name + ".pt"))
+                         args.model_name + ".pt"))
         model.info(verbose=True)
         elapsed = time.time() - t
         print(colored(f"\nYOLO model loaded in : '{elapsed}' sec \n", 'white', 'on_yellow'))
@@ -422,76 +425,107 @@
                                   tracker=args.tracker_file,
                                   conf=args.confidence,
                                   iou=args.iou,
-                                  persist=False,
-                                  show=True,
+                                  persist=True,
+                                  show=False,
                                   save=True,
                                   project=args.run_dir,
                                   name=args.foldername)
             # Store the track history
             track_history = defaultdict(lambda: [])
 
-            for result in results:
-                # Get the boxes and track IDs
-                if result.boxes and result.boxes.is_track:
-                    boxes = result.boxes.xywh.cpu()
-                    track_ids = result.boxes.id.int().cpu().tolist()
-                    # Visualize the result on the frame
-                    frame = result.plot()
-                    # Plot the tracks
-                    for box, track_id in zip(boxes, track_ids):
-                        x, y, w, h = box
-                        track = track_history[track_id]
-                        track.append((float(x), float(y)))  # x, y center point
-                        if len(track) > 30:  # retain 30 tracks for 30 frames
-                            track.pop(0)
-
-                        # Draw the tracking lines
-                        points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
-                        cv2.polylines(frame, [points], isClosed=False, color=(230, 230, 230), thickness=2)
-
-                    # Display the annotated frame
-                    cv2.imshow("YOLO11 Tracking", frame)
-                    print(colored(f"Tracking results saved in : '{args.save_dir}' \n", 'green'))
+            tsv_path = os.path.join(args.save_dir, "tracks.tsv")
+            with open(tsv_path, "w", newline="") as tsvfile:
+                writer = csv.writer(tsvfile, delimiter='\t')
+                writer.writerow(['track_id', 'frame', 'class', 'centroid_x', 'centroid_y'])
+                frame_idx = 0
+                for result in results:
+                    # Get the boxes and track IDs
+                    if result.boxes and result.boxes.is_track:
+                        track_ids = result.boxes.id.int().cpu().tolist()
+                        labels = result.boxes.cls.int().cpu().tolist() if hasattr(result.boxes, "cls") else [0] * len(track_ids)
+                        # Prepare mask image
+                        img_shape = result.orig_shape if hasattr(result, "orig_shape") else result.orig_img.shape
+                        mask = np.zeros(img_shape[:2], dtype=np.uint16)
+                        # Check if polygons (masks) are available
+                        if hasattr(result, "masks") and result.masks is not None and hasattr(result.masks, "xy"):
+                            polygons = result.masks.xy
+                            for i, (track_id, label) in enumerate(zip(track_ids, labels)):
+                                if i < len(polygons):
+                                    contour = polygons[i].astype(np.int32)
+                                    contour = contour.reshape(-1, 1, 2)
+                                    cv2.drawContours(mask, [contour], -1, int(track_id), cv2.FILLED)
+                                    # Calculate centroid of the polygon
+                                    M = cv2.moments(contour)
+                                    if M["m00"] != 0:
+                                        cx = float(M["m10"] / M["m00"])
+                                        cy = float(M["m01"] / M["m00"])
+                                    else:
+                                        cx, cy = 0.0, 0.0
+                                    writer.writerow([track_id, frame_idx, label, cx, cy])
+                        else:
+                            # Fallback to bounding boxes if polygons are not available
+                            boxes = result.boxes.xywh.cpu()
+                            xyxy_boxes = result.boxes.xyxy.cpu().numpy()
+                            for i, (box, xyxy, track_id, label) in enumerate(zip(boxes, xyxy_boxes, track_ids, labels)):
+                                x, y, w, h = box
+                                writer.writerow([track_id, frame_idx, label, float(x), float(y)])
+                                x1, y1, x2, y2 = map(int, xyxy)
+                                cv2.rectangle(mask, (x1, y1), (x2, y2), int(track_id), thickness=-1)
+                        # Collect masks for TYX stack
+                        if frame_idx == 0:
+                            mask_stack = []
+                        mask_stack.append(mask)
+                    frame_idx += 1
+            # Save TYX stack (T=frames, Y, X)
+            if 'mask_stack' in locals() and len(mask_stack) > 0:
+                tyx_array = np.stack(mask_stack, axis=0)
+                # Remove string from last underscore in filename
+                stem = pathlib.Path(result.path).stem
+                stem = stem.rsplit('_', 1)[0] if '_' in stem else stem
+                mask_save_as = str(pathlib.Path(os.path.join(args.save_dir, stem + "_mask.tiff")).absolute())
+                imwrite(mask_save_as, tyx_array)
+                print(colored(f"TYX mask stack saved as : '{mask_save_as}'", 'magenta'))
+            print(colored(f"Tracking results saved in : '{args.save_dir}' \n", 'green'))
         elif (args.mode == "segment"):
             # Read class names from the file
             with open(args.class_names_file, 'r') as f:
                 class_names = [line.strip() for line in f.readlines()]
+            # Create a mapping from class names to indices
             class_to_index = {class_name: i for i, class_name in enumerate(class_names)}
 
             # Save polygon coordinates
             for result in predictions:
+                # Create binary mask
                 img = np.copy(result.orig_img)
                 filename = pathlib.Path(result.path).stem
                 b_mask = np.zeros(img.shape[:2], np.uint8)
                 mask_save_as = str(pathlib.Path(os.path.join(args.save_dir, filename + "_mask.tiff")).absolute())
+                # Define output file path for text file
+                output_filename = os.path.splitext(filename)[0] + ".txt"
                 txt_save_as = str(pathlib.Path(os.path.join(args.save_dir, filename + ".txt")).absolute())
-
+                instance_id = 1  # Start instance IDs from 1
                 for c, ci in enumerate(result):
-                    if ci.masks is not None and ci.masks.xy:
-                        #  Extract contour
-                        contour = ci.masks.xy.pop()
-                        contour = contour.astype(np.int32).reshape(-1, 1, 2)
-                        _ = cv2.drawContours(b_mask, [contour], -1, (255, 255, 255), cv2.FILLED)
-
-                        # Normalized polygon points
-                        points = ci.masks.xyn.pop()
-                        obj_class = int(ci.boxes.cls.to("cpu").numpy().item())
-                        confidence = result.boxes.conf.to("cpu").numpy()[c]
+                    # Extract contour result
+                    contour = ci.masks.xy.pop()
+                    contour = contour.astype(np.int32)
+                    contour = contour.reshape(-1, 1, 2)
+                    # Draw contour onto mask with unique instance id
+                    _ = cv2.drawContours(b_mask, [contour], -1, instance_id, cv2.FILLED)
 
-                        with open(txt_save_as, 'a') as f:
-                            segmentation_points = ['{} {}'.format(points[i][0], points[i][1]) for i in range(len(points))]
-                            segmentation_points_string = ' '.join(segmentation_points)
-                            line = '{} {} {}\n'.format(obj_class, segmentation_points_string, confidence)
-                            f.write(line)
-                    else:
-                        print(colored(f"⚠️ No mask found for object {c} in '{filename}'. Skipping.", "yellow"))
+                    # Normalized polygon points
+                    points = ci.masks.xyn.pop()
+                    confidence = result.boxes.conf.to("cpu").numpy()[c]
 
-                # Overlay mask onto original image
-                colored_mask = cv2.merge([b_mask, np.zeros_like(b_mask), np.zeros_like(b_mask)])
-                blended = cv2.addWeighted(img, 1.0, colored_mask, 0.5, 0)
-                overlay_path = os.path.join(args.save_dir, filename + "_overlay.jpg")
-                cv2.imwrite(overlay_path, blended)
+                    with open(txt_save_as, 'a') as f:
+                        segmentation_points = ['{} {}'.format(points[i][0], points[i][1]) for i in range(len(points))]
+                        segmentation_points_string = ' '.join(segmentation_points)
+                        line = '{} {} {}\n'.format(instance_id, segmentation_points_string, confidence)
+                        f.write(line)
 
-                imwrite(mask_save_as, b_mask, imagej=True)
-                print(colored(f"Saved binary mask as : \n '{mask_save_as}' \n", 'magenta'))
+                    instance_id += 1  # Increment for next object
+
+                imwrite(mask_save_as, b_mask, imagej=True)  # save label mask image
+                print(colored(f"Saved label mask as : \n '{mask_save_as}' \n", 'magenta'))
                 print(colored(f"Polygon coordinates saved as : \n '{txt_save_as}' \n", 'cyan'))
+        else:
+            raise Exception(("Currently only 'detect', 'segment' and 'track' modes are available"))